diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index b66369cf..3b5abe66 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -33,18 +33,6 @@ LICENSE package rtmp -/* -#include -#include -#include -#include -#include - -typedef struct sockaddr_in sockaddr_in; -typedef struct sockaddr sockaddr; -*/ -import "C" - import ( "bytes" "encoding/binary" @@ -54,11 +42,8 @@ import ( "math/rand" "strconv" "strings" - "syscall" "time" "unsafe" - - "github.com/chamaken/cgolmnl/inet" ) const _Gi = 1 << 30 @@ -333,70 +318,6 @@ func C_RTMP_SetupURL(r *C_RTMP, addr string) (ok bool) { return true } -// int add_addr_info(struct sockaddr_in *service, AVal *host, int port) -// rtmp.c +869 -func C_add_addr_info(service *C.sockaddr_in, hostname string, port uint16) (ok bool) { - h := (*C.char)(unsafe.Pointer(goStrToCStr(hostname))) - service.sin_addr.s_addr = C.inet_addr(h) - if service.sin_addr.s_addr == C.INADDR_NONE { - host := C.gethostbyname(h) - if host == nil || *host.h_addr_list == nil { - //RTMP_Log(RTMP_LOGERROR, "Problem accessing the DNS. (addr: %s)", hostname) - return false - } - service.sin_addr = *(*C.struct_in_addr)(unsafe.Pointer(*host.h_addr_list)) - } - - service.sin_port = C.ushort(inet.Htons(port)) - return true -} - -// int RTMP_Connect0(RTMP *r, struct sockaddr* service); -// rtmp.c +906 -func C_RTMP_Connect0(r *C_RTMP, service *C.sockaddr) (ok bool) { - r.m_sb.sb_timedout = false - r.m_pausing = 0 - r.m_fDuration = 0 - - r.m_sb.sb_socket = int32(C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP)) - - if r.m_sb.sb_socket != -1 { - if C.connect(C.int(r.m_sb.sb_socket), service, C.socklen_t(unsafe.Sizeof(*service))) < 0 { - log.Println("C_RTMP_Connect0, failed to connect socket.") - } - - if r.Link.socksport != 0 { - if debugMode { - log.Println("C_RTMP_Connect0: socks negotiation.") - } - - if !C_SocksNegotiate(r) { - log.Println("C_RTMP_Connect0: SOCKS negotiation failed.") - return false - } - } - } else { - log.Println("C_RTMP_Connect0: failed to create socket.") - return false - } - - { - var tv int32 - SET_RCVTIMEO(&tv, r.Link.timeout) - - if C.setsockopt(C.int(r.m_sb.sb_socket), C.SOL_SOCKET, C.SO_RCVTIMEO, - unsafe.Pointer(&tv), C.socklen_t(unsafe.Sizeof(tv))) != 0 { - log.Println("C_RTMP_Connect0: Setting socket timeout failed") - } - } - - on := C.int(1) - C.setsockopt(C.int(r.m_sb.sb_socket), C.IPPROTO_TCP, C.TCP_NODELAY, - unsafe.Pointer(&on), C.socklen_t(unsafe.Sizeof(on))) - - return true -} - // int RTMP_Connect1(RTMP* r, RTMPPacket* cp); // rtmp.c +978 func C_RTMP_Connect1(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { @@ -417,70 +338,6 @@ func C_RTMP_Connect1(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { return true } -// int RTMP_Connect(RTMP *r, RTMPPacket* cp); -// rtmp.c +1032 -func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { - // TODO: port this - var service C.sockaddr_in - - if r.Link.hostname == "" { - return false - } - - // TODO: port this - service.sin_family = C.AF_INET - - if r.Link.socksport != 0 { - if !C_add_addr_info(&service, r.Link.sockshost, r.Link.socksport) { - return false - } - } else { - // connect directly - if !C_add_addr_info(&service, r.Link.hostname, r.Link.port) { - return false - } - } - if !C_RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) { - return false - } - - r.m_bSendCounter = true - - return C_RTMP_Connect1(r, cp) -} - -// int SocksNegotiate(RTMP* r); -// rtmp.c +1062 -func C_SocksNegotiate(r *C_RTMP) (ok bool) { - var addr int32 - var service C.sockaddr_in - - C_add_addr_info(&service, r.Link.hostname, r.Link.port) - addr = int32(inet.Htonl(uint32(service.sin_addr.s_addr))) - - packet := []byte{ - 4, 1, - byte((r.Link.port >> 8) & 0xFF), - byte((r.Link.port) & 0xFF), - byte(addr>>24) & 0xFF, byte(addr>>16) & 0xFF, - byte(addr>>8) & 0xFF, byte(addr) & 0xFF, - 0, - } - - C_WriteN(r, packet) - - if C_ReadN(r, packet[:8]) != 8 { - return false - } - - if packet[0] == 0 && packet[1] == 90 { - return true - } - // TODO: use new logger here - log.Println("C_SocksNegotitate: SOCKS returned error code!") - return false -} - // int RTMP_ConnectStream(RTMP* r, int seekTime); // rtmp.c +1099 func C_RTMP_ConnectStream(r *C_RTMP, seekTime int32) (playing bool) { @@ -1735,46 +1592,7 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) { } } -// int RTMPSockBuf_Fill(RTMPSockBuf* sb); -// rtmp.c +4253 -func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int { - if sb.sb_size == 0 { - sb.sb_start = 0 - } - - nBytes := C.long((len(sb.sb_buf) - 1) - (sb.sb_size + sb.sb_start)) - nBytes, err := C.recv(C.int(sb.sb_socket), unsafe.Pointer(&sb.sb_buf[sb.sb_start+sb.sb_size]), C.size_t(nBytes), 0) - if nBytes == -1 { - log.Printf("C_RTMPSockBuf_Fill: recv error: %v", err) - if err == syscall.EWOULDBLOCK || err == syscall.EAGAIN { - sb.sb_timedout = true - nBytes = 0 - } - } else { - sb.sb_size += int(nBytes) - } - - return int(nBytes) -} - -// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len); -// rtmp.c +4297 -// TODO replace send with golang net connection send -func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 { - return int32(C.send(C.int(sb.sb_socket), unsafe.Pointer(bAddr(buf)), C.ulong(len(buf)), 0)) -} - -// int -// RTMPSockBuf_Close(RTMPSockBuf *sb) -// rtmp.c +4369 -func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 { - if sb.sb_socket != -1 { - return int32(C.close(C.int(sb.sb_socket))) - } - return 0 -} - -// int RTMP_Write(RTMP* r, const char* buf, int size); +/// int RTMP_Write(RTMP* r, const char* buf, int size); // rtmp.c +5136 func C_RTMP_Write(r *C_RTMP, buf []byte) int { // TODO: port RTMPPacket diff --git a/rtmp/socket.go b/rtmp/socket.go new file mode 100644 index 00000000..88fc9582 --- /dev/null +++ b/rtmp/socket.go @@ -0,0 +1,221 @@ +/* +NAME + rtmp.go + +DESCRIPTION + See Readme.md + +AUTHORS + Saxon Nelson-Milton + Dan Kortschak + +LICENSE + rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + + Derived from librtmp under the GNU Lesser General Public License 2.1 + Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org + Copyright (C) 2008-2009 Andrej Stepanchuk + Copyright (C) 2009-2010 Howard Chu +*/ + +package rtmp + +/* +#include +#include +#include +#include +#include + +typedef struct sockaddr_in sockaddr_in; +typedef struct sockaddr sockaddr; +*/ +import "C" + +import ( + "log" + "syscall" + "unsafe" + + "github.com/chamaken/cgolmnl/inet" +) + +// int add_addr_info(struct sockaddr_in *service, AVal *host, int port) +// rtmp.c +869 +func C_add_addr_info(service *C.sockaddr_in, hostname string, port uint16) (ok bool) { + h := (*C.char)(unsafe.Pointer(goStrToCStr(hostname))) + service.sin_addr.s_addr = C.inet_addr(h) + if service.sin_addr.s_addr == C.INADDR_NONE { + host := C.gethostbyname(h) + if host == nil || *host.h_addr_list == nil { + //RTMP_Log(RTMP_LOGERROR, "Problem accessing the DNS. (addr: %s)", hostname) + return false + } + service.sin_addr = *(*C.struct_in_addr)(unsafe.Pointer(*host.h_addr_list)) + } + + service.sin_port = C.ushort(inet.Htons(port)) + return true +} + +// int RTMP_Connect0(RTMP *r, struct sockaddr* service); +// rtmp.c +906 +func C_RTMP_Connect0(r *C_RTMP, service *C.sockaddr) (ok bool) { + r.m_sb.sb_timedout = false + r.m_pausing = 0 + r.m_fDuration = 0 + + r.m_sb.sb_socket = int32(C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP)) + + if r.m_sb.sb_socket != -1 { + if C.connect(C.int(r.m_sb.sb_socket), service, C.socklen_t(unsafe.Sizeof(*service))) < 0 { + log.Println("C_RTMP_Connect0, failed to connect socket.") + } + + if r.Link.socksport != 0 { + if debugMode { + log.Println("C_RTMP_Connect0: socks negotiation.") + } + + if !C_SocksNegotiate(r) { + log.Println("C_RTMP_Connect0: SOCKS negotiation failed.") + return false + } + } + } else { + log.Println("C_RTMP_Connect0: failed to create socket.") + return false + } + + { + var tv int32 + SET_RCVTIMEO(&tv, r.Link.timeout) + + if C.setsockopt(C.int(r.m_sb.sb_socket), C.SOL_SOCKET, C.SO_RCVTIMEO, + unsafe.Pointer(&tv), C.socklen_t(unsafe.Sizeof(tv))) != 0 { + log.Println("C_RTMP_Connect0: Setting socket timeout failed") + } + } + + on := C.int(1) + C.setsockopt(C.int(r.m_sb.sb_socket), C.IPPROTO_TCP, C.TCP_NODELAY, + unsafe.Pointer(&on), C.socklen_t(unsafe.Sizeof(on))) + + return true +} + +// int RTMP_Connect(RTMP *r, RTMPPacket* cp); +// rtmp.c +1032 +func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { + // TODO: port this + var service C.sockaddr_in + + if r.Link.hostname == "" { + return false + } + + // TODO: port this + service.sin_family = C.AF_INET + + if r.Link.socksport != 0 { + if !C_add_addr_info(&service, r.Link.sockshost, r.Link.socksport) { + return false + } + } else { + // connect directly + if !C_add_addr_info(&service, r.Link.hostname, r.Link.port) { + return false + } + } + if !C_RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) { + return false + } + + r.m_bSendCounter = true + + return C_RTMP_Connect1(r, cp) +} + +// int SocksNegotiate(RTMP* r); +// rtmp.c +1062 +func C_SocksNegotiate(r *C_RTMP) (ok bool) { + var addr int32 + var service C.sockaddr_in + + C_add_addr_info(&service, r.Link.hostname, r.Link.port) + addr = int32(inet.Htonl(uint32(service.sin_addr.s_addr))) + + packet := []byte{ + 4, 1, + byte((r.Link.port >> 8) & 0xFF), + byte((r.Link.port) & 0xFF), + byte(addr>>24) & 0xFF, byte(addr>>16) & 0xFF, + byte(addr>>8) & 0xFF, byte(addr) & 0xFF, + 0, + } + + C_WriteN(r, packet) + + if C_ReadN(r, packet[:8]) != 8 { + return false + } + + if packet[0] == 0 && packet[1] == 90 { + return true + } + // TODO: use new logger here + log.Println("C_SocksNegotitate: SOCKS returned error code!") + return false +} + +// int RTMPSockBuf_Fill(RTMPSockBuf* sb); +// rtmp.c +4253 +func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int { + if sb.sb_size == 0 { + sb.sb_start = 0 + } + + nBytes := C.long((len(sb.sb_buf) - 1) - (sb.sb_size + sb.sb_start)) + nBytes, err := C.recv(C.int(sb.sb_socket), unsafe.Pointer(&sb.sb_buf[sb.sb_start+sb.sb_size]), C.size_t(nBytes), 0) + if nBytes == -1 { + log.Printf("C_RTMPSockBuf_Fill: recv error: %v", err) + if err == syscall.EWOULDBLOCK || err == syscall.EAGAIN { + sb.sb_timedout = true + nBytes = 0 + } + } else { + sb.sb_size += int(nBytes) + } + + return int(nBytes) +} + +// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len); +// rtmp.c +4297 +// TODO replace send with golang net connection send +func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 { + return int32(C.send(C.int(sb.sb_socket), unsafe.Pointer(bAddr(buf)), C.ulong(len(buf)), 0)) +} + +// int +// RTMPSockBuf_Close(RTMPSockBuf *sb) +// rtmp.c +4369 +func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 { + if sb.sb_socket != -1 { + return int32(C.close(C.int(sb.sb_socket))) + } + return 0 +}