From 26157f47a2e4702168ba79f6ab3009405b7bb7ff Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 6 Jan 2019 10:11:31 +1030 Subject: [PATCH] Refactored C_ReadN() to use io.ReadFull() and removed now obsolete RTMPSockBuf_Fill(). --- rtmp/rtmp.go | 72 +++++++++++++++----------------------------- rtmp/rtmp_headers.go | 4 +-- rtmp/socket.go | 14 ++------- 3 files changed, 28 insertions(+), 62 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 342f1369..f4790488 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -38,9 +38,9 @@ import ( "encoding/binary" "errors" "fmt" + "io" "log" "math/rand" - "net" "strconv" "time" ) @@ -404,52 +404,31 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { // int ReadN(RTMP* r, char* buffer, int n); // rtmp.c +1390 func C_ReadN(r *C_RTMP, buf []byte) int { - nOriginalSize := len(buf) - - for len(buf) != 0 { - nBytes := 0 - var nRead int - - avail := int(r.m_sb.sb_size) - if avail == 0 { - _, err := C_RTMPSockBuf_Fill(&r.m_sb) - if err != nil { - if err, ok := err.(net.Error); ok && err.Timeout() { - return 0 - } - } - avail = int(r.m_sb.sb_size) - } - - if len(buf) < avail { - nRead = len(buf) - } else { - nRead = avail - } - - if nRead > 0 { - copy(buf, r.m_sb.sb_buf[r.m_sb.sb_start:][:nRead]) - r.m_sb.sb_start += nRead - r.m_sb.sb_size -= nRead - nBytes = nRead - r.m_nBytesIn += int32(nRead) - if r.m_nBytesIn > (r.m_nBytesInSent + r.m_nClientBW/10) { - if !C_SendBytesReceived(r) { - return 0 - } - } - } - - if nBytes == 0 { - log.Println("RTMP socket closed by peer") - C_RTMP_Close(r) - break - } - - buf = buf[nBytes:] + err := r.m_sb.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(r.m_sb.timeout))) + if err != nil { + return 0 } - - return nOriginalSize - len(buf) + n, err := io.ReadFull(r.m_sb.conn, buf) + if err != nil { + if debugMode { + log.Printf("C_ReadN error: %v\n", err) + } + return 0 + } + if n == 0 { + if debugMode { + log.Println("RTMP socket closed by peer") + } + C_RTMP_Close(r) + return 0 + } + r.m_nBytesIn += int32(n) + if r.m_nBytesIn > (r.m_nBytesInSent + r.m_nClientBW/10) { + if !C_SendBytesReceived(r) { + return 0 + } + } + return n } // int WriteN(RTMP* r, const char* buffer, int n); @@ -1519,7 +1498,6 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) { r.m_numInvokes = 0 r.m_bPlaying = false - r.m_sb.sb_size = 0 r.m_msgCounter = 0 r.m_resplen = 0 diff --git a/rtmp/rtmp_headers.go b/rtmp/rtmp_headers.go index 6ae32190..e1ea2168 100644 --- a/rtmp/rtmp_headers.go +++ b/rtmp/rtmp_headers.go @@ -138,12 +138,10 @@ type C_RTMPPacket struct { // typedef struct RTMPSockBuf // rtmp.h +127 +// TODO: Incorporate what is left of C_RTMPSockBuf into C_RTMP_LNK type C_RTMPSockBuf struct { conn *net.TCPConn timeout uint - sb_size int - sb_start int - sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const } // RTMPPacket_IsReady(a) diff --git a/rtmp/socket.go b/rtmp/socket.go index 22675c14..453d079c 100644 --- a/rtmp/socket.go +++ b/rtmp/socket.go @@ -64,18 +64,8 @@ func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error { // int RTMPSockBuf_Fill(RTMPSockBuf* sb); // rtmp.c +4253 -func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) (int, error) { - if sb.sb_size == 0 { - sb.sb_start = 0 - } - err := sb.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(sb.timeout))) - if err != nil { - return 0, err - } - n, err := sb.conn.Read(sb.sb_buf[sb.sb_start+sb.sb_size:]) - sb.sb_size += n - return n, err -} +// DELETED + // int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len); // rtmp.c +4297