Refactored C_ReadN() to use io.ReadFull() and removed now obsolete RTMPSockBuf_Fill().

This commit is contained in:
scruzin 2019-01-06 10:11:31 +10:30
parent be1610b67f
commit 26157f47a2
3 changed files with 28 additions and 62 deletions

View File

@ -38,9 +38,9 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"io"
"log" "log"
"math/rand" "math/rand"
"net"
"strconv" "strconv"
"time" "time"
) )
@ -404,52 +404,31 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 {
// int ReadN(RTMP* r, char* buffer, int n); // int ReadN(RTMP* r, char* buffer, int n);
// rtmp.c +1390 // rtmp.c +1390
func C_ReadN(r *C_RTMP, buf []byte) int { func C_ReadN(r *C_RTMP, buf []byte) int {
nOriginalSize := len(buf) err := r.m_sb.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(r.m_sb.timeout)))
for len(buf) != 0 {
nBytes := 0
var nRead int
avail := int(r.m_sb.sb_size)
if avail == 0 {
_, err := C_RTMPSockBuf_Fill(&r.m_sb)
if err != nil { if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
return 0 return 0
} }
n, err := io.ReadFull(r.m_sb.conn, buf)
if err != nil {
if debugMode {
log.Printf("C_ReadN error: %v\n", err)
} }
avail = int(r.m_sb.sb_size) return 0
} }
if n == 0 {
if len(buf) < avail { if debugMode {
nRead = len(buf) log.Println("RTMP socket closed by peer")
} else {
nRead = avail
} }
C_RTMP_Close(r)
if nRead > 0 { return 0
copy(buf, r.m_sb.sb_buf[r.m_sb.sb_start:][:nRead]) }
r.m_sb.sb_start += nRead r.m_nBytesIn += int32(n)
r.m_sb.sb_size -= nRead
nBytes = nRead
r.m_nBytesIn += int32(nRead)
if r.m_nBytesIn > (r.m_nBytesInSent + r.m_nClientBW/10) { if r.m_nBytesIn > (r.m_nBytesInSent + r.m_nClientBW/10) {
if !C_SendBytesReceived(r) { if !C_SendBytesReceived(r) {
return 0 return 0
} }
} }
} return n
if nBytes == 0 {
log.Println("RTMP socket closed by peer")
C_RTMP_Close(r)
break
}
buf = buf[nBytes:]
}
return nOriginalSize - len(buf)
} }
// int WriteN(RTMP* r, const char* buffer, int n); // int WriteN(RTMP* r, const char* buffer, int n);
@ -1519,7 +1498,6 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) {
r.m_numInvokes = 0 r.m_numInvokes = 0
r.m_bPlaying = false r.m_bPlaying = false
r.m_sb.sb_size = 0
r.m_msgCounter = 0 r.m_msgCounter = 0
r.m_resplen = 0 r.m_resplen = 0

View File

@ -138,12 +138,10 @@ type C_RTMPPacket struct {
// typedef struct RTMPSockBuf // typedef struct RTMPSockBuf
// rtmp.h +127 // rtmp.h +127
// TODO: Incorporate what is left of C_RTMPSockBuf into C_RTMP_LNK
type C_RTMPSockBuf struct { type C_RTMPSockBuf struct {
conn *net.TCPConn conn *net.TCPConn
timeout uint timeout uint
sb_size int
sb_start int
sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const
} }
// RTMPPacket_IsReady(a) // RTMPPacket_IsReady(a)

View File

@ -64,18 +64,8 @@ func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error {
// int RTMPSockBuf_Fill(RTMPSockBuf* sb); // int RTMPSockBuf_Fill(RTMPSockBuf* sb);
// rtmp.c +4253 // rtmp.c +4253
func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) (int, error) { // DELETED
if sb.sb_size == 0 {
sb.sb_start = 0
}
err := sb.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(sb.timeout)))
if err != nil {
return 0, err
}
n, err := sb.conn.Read(sb.sb_buf[sb.sb_start+sb.sb_size:])
sb.sb_size += n
return n, err
}
// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len); // int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len);
// rtmp.c +4297 // rtmp.c +4297