From f61bd0a1933ba3703f69af606c28611932d9bf2e Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 30 Sep 2018 18:59:01 +0930 Subject: [PATCH] rtmp: use std net for connections --- rtmp/rtmp.go | 14 +++- rtmp/rtmp_headers.go | 4 +- rtmp/socket.go | 177 +++++++------------------------------------ 3 files changed, 40 insertions(+), 155 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 71737619..5a6bd9e7 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -201,7 +201,6 @@ func C_RTMP_Alloc() *C_RTMP { // rtmp.c +329 func C_RTMP_Init(r *C_RTMP) { *r = C_RTMP{} - r.m_sb.sb_socket = -1 r.m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE r.m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE r.m_nBufferMS = 30000 @@ -223,7 +222,7 @@ func C_RTMP_EnableWrite(r *C_RTMP) { // int RTMP_IsConnected(RTMP *r); // rtmp.c +363 func C_RTMP_IsConnected(r *C_RTMP) int32 { - if r.m_sb.sb_socket != -1 { + if r.m_sb.conn != nil { return 1 } return 0 @@ -1400,7 +1399,14 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) { nChunkSize := int(r.m_outChunkSize) if debugMode { - log.Printf("C_RTMP_SendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize) + if r.m_sb.conn != nil { + f, err := r.m_sb.conn.File() + if err != nil { + log.Printf("could not get file: %v", err) + } else { + log.Printf("C_RTMP_SendPacket: fd=%d, size=%v", f.Fd(), nSize) + } + } } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. @@ -1492,7 +1498,7 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) { } r.m_stream_id = -1 - r.m_sb.sb_socket = -1 + r.m_sb.conn = nil r.m_nBWCheckCounter = 0 r.m_nBytesIn = 0 r.m_nBytesInSent = 0 diff --git a/rtmp/rtmp_headers.go b/rtmp/rtmp_headers.go index 8296814c..62e667e1 100644 --- a/rtmp/rtmp_headers.go +++ b/rtmp/rtmp_headers.go @@ -31,6 +31,8 @@ LICENSE */ package rtmp +import "net" + const ( RTMPT_OPEN = iota RTMPT_SEND @@ -137,7 +139,7 @@ type C_RTMPPacket struct { // typedef struct RTMPSockBuf // rtmp.h +127 type C_RTMPSockBuf struct { - sb_socket int32 + conn *net.TCPConn sb_size int sb_start int sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const diff --git a/rtmp/socket.go b/rtmp/socket.go index e67c68e1..0e53b255 100644 --- a/rtmp/socket.go +++ b/rtmp/socket.go @@ -33,122 +33,30 @@ LICENSE package rtmp -/* -#include -#include -#include -#include -#include - -typedef struct sockaddr_in sockaddr_in; -typedef struct sockaddr sockaddr; -*/ -import "C" - import ( - "log" - "syscall" - "unsafe" - - "github.com/chamaken/cgolmnl/inet" + "fmt" + "net" ) -func bAddr(buf []byte) *byte { - if len(buf) == 0 { - return nil - } - return &buf[0] -} - -// int add_addr_info(struct sockaddr_in *service, AVal *host, int port) -// rtmp.c +869 -func C_add_addr_info(service *C.sockaddr_in, hostname string, port uint16) (ok bool) { - h := C.CString(hostname) - defer C.free(unsafe.Pointer(h)) - service.sin_addr.s_addr = C.inet_addr(h) - if service.sin_addr.s_addr == C.INADDR_NONE { - host := C.gethostbyname(h) - if host == nil || *host.h_addr_list == nil { - //RTMP_Log(RTMP_LOGERROR, "Problem accessing the DNS. (addr: %s)", hostname) - return false - } - service.sin_addr = *(*C.struct_in_addr)(unsafe.Pointer(*host.h_addr_list)) - } - - service.sin_port = C.ushort(inet.Htons(port)) - return true -} - -// int RTMP_Connect0(RTMP *r, struct sockaddr* service); -// rtmp.c +906 -func C_RTMP_Connect0(r *C_RTMP, service *C.sockaddr) (ok bool) { - r.m_sb.sb_timedout = false - r.m_pausing = 0 - r.m_fDuration = 0 - - r.m_sb.sb_socket = int32(C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP)) - - if r.m_sb.sb_socket != -1 { - if C.connect(C.int(r.m_sb.sb_socket), service, C.socklen_t(unsafe.Sizeof(*service))) < 0 { - log.Println("C_RTMP_Connect0, failed to connect socket.") - } - - if r.Link.socksport != 0 { - if debugMode { - log.Println("C_RTMP_Connect0: socks negotiation.") - } - - if !C_SocksNegotiate(r) { - log.Println("C_RTMP_Connect0: SOCKS negotiation failed.") - return false - } - } - } else { - log.Println("C_RTMP_Connect0: failed to create socket.") - return false - } - - { - var tv int32 - SET_RCVTIMEO(&tv, r.Link.timeout) - - if C.setsockopt(C.int(r.m_sb.sb_socket), C.SOL_SOCKET, C.SO_RCVTIMEO, - unsafe.Pointer(&tv), C.socklen_t(unsafe.Sizeof(tv))) != 0 { - log.Println("C_RTMP_Connect0: Setting socket timeout failed") - } - } - - on := C.int(1) - C.setsockopt(C.int(r.m_sb.sb_socket), C.IPPROTO_TCP, C.TCP_NODELAY, - unsafe.Pointer(&on), C.socklen_t(unsafe.Sizeof(on))) - - return true -} - // int RTMP_Connect(RTMP *r, RTMPPacket* cp); // rtmp.c +1032 func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { - // TODO: port this - var service C.sockaddr_in - if r.Link.hostname == "" { return false } - // TODO: port this - service.sin_family = C.AF_INET - + var hostname string if r.Link.socksport != 0 { - if !C_add_addr_info(&service, r.Link.sockshost, r.Link.socksport) { - return false - } + hostname = fmt.Sprintf("%s:%d", r.Link.sockshost, r.Link.socksport) } else { - // connect directly - if !C_add_addr_info(&service, r.Link.hostname, r.Link.port) { - return false - } + hostname = fmt.Sprintf("%s:%d", r.Link.hostname, r.Link.port) } - if !C_RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) { + addr, err := net.ResolveTCPAddr("tcp4", hostname) + if err != nil { + return false + } + r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr) + if err != nil { return false } @@ -157,38 +65,6 @@ func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { return C_RTMP_Connect1(r, cp) } -// int SocksNegotiate(RTMP* r); -// rtmp.c +1062 -func C_SocksNegotiate(r *C_RTMP) (ok bool) { - var addr int32 - var service C.sockaddr_in - - C_add_addr_info(&service, r.Link.hostname, r.Link.port) - addr = int32(inet.Htonl(uint32(service.sin_addr.s_addr))) - - packet := []byte{ - 4, 1, - byte((r.Link.port >> 8) & 0xFF), - byte((r.Link.port) & 0xFF), - byte(addr>>24) & 0xFF, byte(addr>>16) & 0xFF, - byte(addr>>8) & 0xFF, byte(addr) & 0xFF, - 0, - } - - C_WriteN(r, packet) - - if C_ReadN(r, packet[:8]) != 8 { - return false - } - - if packet[0] == 0 && packet[1] == 90 { - return true - } - // TODO: use new logger here - log.Println("C_SocksNegotitate: SOCKS returned error code!") - return false -} - // int RTMPSockBuf_Fill(RTMPSockBuf* sb); // rtmp.c +4253 func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int { @@ -196,34 +72,35 @@ func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int { sb.sb_start = 0 } - nBytes := C.ssize_t((len(sb.sb_buf) - 1) - (sb.sb_size + sb.sb_start)) - nBytes, err := C.recv(C.int(sb.sb_socket), unsafe.Pointer(&sb.sb_buf[sb.sb_start+sb.sb_size]), C.size_t(nBytes), 0) - if nBytes == -1 { - log.Printf("C_RTMPSockBuf_Fill: recv error: %v", err) - if err == syscall.EWOULDBLOCK || err == syscall.EAGAIN { - sb.sb_timedout = true - nBytes = 0 - } - } else { - sb.sb_size += int(nBytes) + n, err := sb.conn.Read(sb.sb_buf[sb.sb_start+sb.sb_size:]) + if err != nil { + return 0 } - - return int(nBytes) + sb.sb_size += n + return n } // int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len); // rtmp.c +4297 // TODO replace send with golang net connection send func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 { - return int32(C.send(C.int(sb.sb_socket), unsafe.Pointer(bAddr(buf)), C.size_t(len(buf)), 0)) + n, err := sb.conn.Write(buf) + if err != nil { + return -1 + } + return int32(n) } // int // RTMPSockBuf_Close(RTMPSockBuf *sb) // rtmp.c +4369 func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 { - if sb.sb_socket != -1 { - return int32(C.close(C.int(sb.sb_socket))) + if sb.conn != nil { + err := sb.conn.Close() + sb.conn = nil + if err == nil { + return 1 + } } return 0 }