diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 5a6bd9e7..d4dcd18e 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -40,8 +40,8 @@ import ( "fmt" "log" "math/rand" + "net" "strconv" - "strings" "time" ) @@ -141,10 +141,11 @@ func startSession(rtmp *C_RTMP, u string, timeout uint32) (*C_RTMP, error) { C_RTMP_EnableWrite(rtmp) C_RTMP_SetBufferMS(rtmp, 3600*1000) - if !C_RTMP_Connect(rtmp, nil) { + err := C_RTMP_Connect(rtmp, nil) + if err != nil { C_RTMP_Close(rtmp) //C.RTMP_Free(rtmp) - return nil, errors.New("rtmp startSession: Failed to connect!") + return nil, errors.New("rtmp startSession: Failed to connect with error: " + err.Error()) } // TODO: port this @@ -236,30 +237,13 @@ func C_RTMP_SetBufferMS(r *C_RTMP, size int32) { // void SocksSetup(RTMP *r, C_AVal* sockshost); // rtmp.c +410 -func C_SocksSetup(r *C_RTMP, sockshost string) { - if sockshost != "" { - p := strings.SplitN(sockshost, ":", 2) - r.Link.sockshost = p[0] - r.Link.socksport = 1080 - if len(p) != 1 { - port, err := strconv.Atoi(p[1]) - if err != nil { - port = 1080 - log.Println("C_SocksSetup: bad string conversion!") - } - r.Link.socksport = uint16(port) - } - } else { - r.Link.sockshost = "" - r.Link.socksport = 0 - } -} +// DELETED // int RTMP_SetupURL(RTMP *r, char* url); // rtmp.c +757 // NOTE: code dealing with rtmp over http has been disregarded func C_RTMP_SetupURL(r *C_RTMP, addr string) (ok bool) { - r.Link.protocol, r.Link.hostname, r.Link.port, r.Link.app, r.Link.playpath0, ok = C_RTMP_ParseURL(addr) + r.Link.protocol, r.Link.host, r.Link.port, r.Link.app, r.Link.playpath0, ok = C_RTMP_ParseURL(addr) if !ok { return false } @@ -268,15 +252,13 @@ func C_RTMP_SetupURL(r *C_RTMP, addr string) (ok bool) { if r.Link.tcUrl == "" { if r.Link.app != "" { r.Link.tcUrl = fmt.Sprintf("%v://%v:%v/%v", - RTMPProtocolStringsLower[r.Link.protocol], r.Link.hostname, r.Link.port, r.Link.app) + RTMPProtocolStringsLower[r.Link.protocol], r.Link.host, r.Link.port, r.Link.app) r.Link.lFlags |= RTMP_LF_FTCU } else { r.Link.tcUrl = addr } } - C_SocksSetup(r, r.Link.sockshost) - if r.Link.port == 0 { switch { case (r.Link.protocol & RTMP_FEATURE_SSL) != 0: @@ -411,7 +393,6 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { // rtmp.c +1390 func C_ReadN(r *C_RTMP, buf []byte) int { nOriginalSize := len(buf) - r.m_sb.sb_timedout = false for len(buf) != 0 { nBytes := 0 @@ -419,8 +400,9 @@ func C_ReadN(r *C_RTMP, buf []byte) int { avail := int(r.m_sb.sb_size) if avail == 0 { - if C_RTMPSockBuf_Fill(&r.m_sb) < 1 { - if !r.m_sb.sb_timedout { + _, err := C_RTMPSockBuf_Fill(&r.m_sb) + if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { return 0 } } @@ -439,7 +421,7 @@ func C_ReadN(r *C_RTMP, buf []byte) int { r.m_sb.sb_size -= nRead nBytes = nRead r.m_nBytesIn += int32(nRead) - if r.m_bSendCounter && r.m_nBytesIn > (r.m_nBytesInSent+r.m_nClientBW/10) { + if r.m_nBytesIn > (r.m_nBytesInSent + r.m_nClientBW/10) { if !C_SendBytesReceived(r) { return 0 } @@ -462,20 +444,19 @@ func C_ReadN(r *C_RTMP, buf []byte) int { // rtmp.c +1502 func C_WriteN(r *C_RTMP, buf []byte) (ok bool) { for len(buf) != 0 { - nBytes := int(C_RTMPSockBuf_Send(&r.m_sb, buf)) - if nBytes < 0 { + n, err := C_RTMPSockBuf_Send(&r.m_sb, buf) + if n < 0 { if debugMode { - log.Println("C_WriteN, RTMP send error") + log.Printf("C_WriteN, RTMP send error: %v\n", err) } - C_RTMP_Close(r) return false } - if nBytes == 0 { + if n == 0 { break } - buf = buf[nBytes:] + buf = buf[n:] } // !ok here is equivalent to io.ErrShortWrite. @@ -1494,7 +1475,10 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) { } C_SendDeleteStream(r, float64(i)) } - C_RTMPSockBuf_Close(&r.m_sb) + err := C_RTMPSockBuf_Close(&r.m_sb) + if err != nil && debugMode { + log.Printf("C_RTMPSockBuf_Close error: %v\n", err) + } } r.m_stream_id = -1 diff --git a/rtmp/rtmp_headers.go b/rtmp/rtmp_headers.go index eb33814f..2e440713 100644 --- a/rtmp/rtmp_headers.go +++ b/rtmp/rtmp_headers.go @@ -139,12 +139,11 @@ type C_RTMPPacket struct { // typedef struct RTMPSockBuf // rtmp.h +127 type C_RTMPSockBuf struct { - conn *net.TCPConn - timeout int32 - sb_size int - sb_start int - sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const - sb_timedout bool + conn *net.TCPConn + timeout int32 + sb_size int + sb_start int + sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const } // RTMPPacket_IsReady(a) @@ -156,8 +155,7 @@ func C_RTMPPacket_IsReady(p *C_RTMPPacket) bool { // typedef struct RTMP_LNK // rtmp.h +144 type C_RTMP_LNK struct { - hostname string - sockshost string + host string playpath0 string playpath string tcUrl string @@ -173,7 +171,6 @@ type C_RTMP_LNK struct { swfAge int32 protocol int32 timeout int32 - socksport uint16 port uint16 } @@ -201,7 +198,6 @@ type C_RTMP struct { m_nClientBW2 uint8 m_bPlaying bool m_bSendEncoding bool - m_bSendCounter bool m_numInvokes int32 m_methodCalls []C_RTMP_METHOD m_channelsAllocatedIn int32 diff --git a/rtmp/socket.go b/rtmp/socket.go index beef8e5c..80b0e253 100644 --- a/rtmp/socket.go +++ b/rtmp/socket.go @@ -34,113 +34,69 @@ LICENSE package rtmp import ( - "fmt" - "log" + "errors" "net" + "strconv" "time" ) // int RTMP_Connect(RTMP *r, RTMPPacket* cp); // rtmp.c +1032 -func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { - if r.Link.hostname == "" { - return false +func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error { + if r.Link.host == "" { + return errors.New("Empty host") } - - var hostname string - if r.Link.socksport != 0 { - hostname = fmt.Sprintf("%s:%d", r.Link.sockshost, r.Link.socksport) - } else { - hostname = fmt.Sprintf("%s:%d", r.Link.hostname, r.Link.port) - } - addr, err := net.ResolveTCPAddr("tcp4", hostname) + addr, err := net.ResolveTCPAddr("tcp4", r.Link.host+":"+strconv.Itoa(int(r.Link.port))) if err != nil { - return false + return err } r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr) if err != nil { - return false + return err } - if r.Link.socksport != 0 { - if !C_SocksNegotiate(r, addr) { - return false - } - } - r.m_sb.timeout = r.Link.timeout - r.m_bSendCounter = true - return C_RTMP_Connect1(r, cp) + if !C_RTMP_Connect1(r, cp) { + return errors.New("RTMP connection failed") + } + return nil } // int SocksNegotiate(RTMP* r); // rtmp.c +1062 -func C_SocksNegotiate(r *C_RTMP, addr *net.TCPAddr) (ok bool) { - ip := addr.IP.To4() - packet := []byte{ - 0x4, // SOCKS version - 0x1, // Establish a TCP/IP stream connection - byte(r.Link.port >> 8), byte(r.Link.port), - ip[0], ip[1], ip[2], ip[3], - 0x0, // Empty user ID string - } - - C_WriteN(r, packet) - - if C_ReadN(r, packet[:8]) != 8 { - return false - } - - if packet[0] == 0x0 && packet[1] == 0x5a { - return true - } - // TODO: use new logger here - log.Println("C_SocksNegotitate: SOCKS returned error code!") - return false -} +// DELETED // int RTMPSockBuf_Fill(RTMPSockBuf* sb); // rtmp.c +4253 -func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int { +func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) (int, error) { if sb.sb_size == 0 { sb.sb_start = 0 } - err := sb.conn.SetReadDeadline(time.Now().Local().Add(time.Second * time.Duration(sb.timeout))) + err := sb.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(sb.timeout))) if err != nil { - return -1 + return 0, err } n, err := sb.conn.Read(sb.sb_buf[sb.sb_start+sb.sb_size:]) - if err != nil { - return 0 - } sb.sb_size += n - return n + return n, err } // int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len); // rtmp.c +4297 // TODO replace send with golang net connection send -func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 { - err := sb.conn.SetWriteDeadline(time.Now().Local().Add(time.Second * time.Duration(sb.timeout))) +func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) (int, error) { + err := sb.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(sb.timeout))) if err != nil { - return -1 + return 0, err } - n, err := sb.conn.Write(buf) - if err != nil { - return -1 - } - return int32(n) + return sb.conn.Write(buf) } // int // RTMPSockBuf_Close(RTMPSockBuf *sb) // rtmp.c +4369 -func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 { - if sb.conn != nil { - err := sb.conn.Close() - sb.conn = nil - if err == nil { - return 1 - } +func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) error { + if sb.conn == nil { + return nil } - return 0 + return sb.conn.Close() }