Merged in sockets-cleanup (pull request #96)

Socket-related code cleanup #2

Approved-by: Saxon Milton <saxon.milton@gmail.com>
Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Alan Noble 2019-01-05 22:38:08 +00:00
commit 8381d1b1e8
3 changed files with 51 additions and 115 deletions

View File

@ -40,8 +40,8 @@ import (
"fmt" "fmt"
"log" "log"
"math/rand" "math/rand"
"net"
"strconv" "strconv"
"strings"
"time" "time"
) )
@ -141,10 +141,11 @@ func startSession(rtmp *C_RTMP, u string, timeout uint32) (*C_RTMP, error) {
C_RTMP_EnableWrite(rtmp) C_RTMP_EnableWrite(rtmp)
C_RTMP_SetBufferMS(rtmp, 3600*1000) C_RTMP_SetBufferMS(rtmp, 3600*1000)
if !C_RTMP_Connect(rtmp, nil) { err := C_RTMP_Connect(rtmp, nil)
if err != nil {
C_RTMP_Close(rtmp) C_RTMP_Close(rtmp)
//C.RTMP_Free(rtmp) //C.RTMP_Free(rtmp)
return nil, errors.New("rtmp startSession: Failed to connect!") return nil, errors.New("rtmp startSession: Failed to connect with error: " + err.Error())
} }
// TODO: port this // TODO: port this
@ -236,30 +237,13 @@ func C_RTMP_SetBufferMS(r *C_RTMP, size int32) {
// void SocksSetup(RTMP *r, C_AVal* sockshost); // void SocksSetup(RTMP *r, C_AVal* sockshost);
// rtmp.c +410 // rtmp.c +410
func C_SocksSetup(r *C_RTMP, sockshost string) { // DELETED
if sockshost != "" {
p := strings.SplitN(sockshost, ":", 2)
r.Link.sockshost = p[0]
r.Link.socksport = 1080
if len(p) != 1 {
port, err := strconv.Atoi(p[1])
if err != nil {
port = 1080
log.Println("C_SocksSetup: bad string conversion!")
}
r.Link.socksport = uint16(port)
}
} else {
r.Link.sockshost = ""
r.Link.socksport = 0
}
}
// int RTMP_SetupURL(RTMP *r, char* url); // int RTMP_SetupURL(RTMP *r, char* url);
// rtmp.c +757 // rtmp.c +757
// NOTE: code dealing with rtmp over http has been disregarded // NOTE: code dealing with rtmp over http has been disregarded
func C_RTMP_SetupURL(r *C_RTMP, addr string) (ok bool) { func C_RTMP_SetupURL(r *C_RTMP, addr string) (ok bool) {
r.Link.protocol, r.Link.hostname, r.Link.port, r.Link.app, r.Link.playpath0, ok = C_RTMP_ParseURL(addr) r.Link.protocol, r.Link.host, r.Link.port, r.Link.app, r.Link.playpath0, ok = C_RTMP_ParseURL(addr)
if !ok { if !ok {
return false return false
} }
@ -268,15 +252,13 @@ func C_RTMP_SetupURL(r *C_RTMP, addr string) (ok bool) {
if r.Link.tcUrl == "" { if r.Link.tcUrl == "" {
if r.Link.app != "" { if r.Link.app != "" {
r.Link.tcUrl = fmt.Sprintf("%v://%v:%v/%v", r.Link.tcUrl = fmt.Sprintf("%v://%v:%v/%v",
RTMPProtocolStringsLower[r.Link.protocol], r.Link.hostname, r.Link.port, r.Link.app) RTMPProtocolStringsLower[r.Link.protocol], r.Link.host, r.Link.port, r.Link.app)
r.Link.lFlags |= RTMP_LF_FTCU r.Link.lFlags |= RTMP_LF_FTCU
} else { } else {
r.Link.tcUrl = addr r.Link.tcUrl = addr
} }
} }
C_SocksSetup(r, r.Link.sockshost)
if r.Link.port == 0 { if r.Link.port == 0 {
switch { switch {
case (r.Link.protocol & RTMP_FEATURE_SSL) != 0: case (r.Link.protocol & RTMP_FEATURE_SSL) != 0:
@ -411,7 +393,6 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 {
// rtmp.c +1390 // rtmp.c +1390
func C_ReadN(r *C_RTMP, buf []byte) int { func C_ReadN(r *C_RTMP, buf []byte) int {
nOriginalSize := len(buf) nOriginalSize := len(buf)
r.m_sb.sb_timedout = false
for len(buf) != 0 { for len(buf) != 0 {
nBytes := 0 nBytes := 0
@ -419,8 +400,9 @@ func C_ReadN(r *C_RTMP, buf []byte) int {
avail := int(r.m_sb.sb_size) avail := int(r.m_sb.sb_size)
if avail == 0 { if avail == 0 {
if C_RTMPSockBuf_Fill(&r.m_sb) < 1 { _, err := C_RTMPSockBuf_Fill(&r.m_sb)
if !r.m_sb.sb_timedout { if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
return 0 return 0
} }
} }
@ -439,7 +421,7 @@ func C_ReadN(r *C_RTMP, buf []byte) int {
r.m_sb.sb_size -= nRead r.m_sb.sb_size -= nRead
nBytes = nRead nBytes = nRead
r.m_nBytesIn += int32(nRead) r.m_nBytesIn += int32(nRead)
if r.m_bSendCounter && r.m_nBytesIn > (r.m_nBytesInSent+r.m_nClientBW/10) { if r.m_nBytesIn > (r.m_nBytesInSent + r.m_nClientBW/10) {
if !C_SendBytesReceived(r) { if !C_SendBytesReceived(r) {
return 0 return 0
} }
@ -462,20 +444,19 @@ func C_ReadN(r *C_RTMP, buf []byte) int {
// rtmp.c +1502 // rtmp.c +1502
func C_WriteN(r *C_RTMP, buf []byte) (ok bool) { func C_WriteN(r *C_RTMP, buf []byte) (ok bool) {
for len(buf) != 0 { for len(buf) != 0 {
nBytes := int(C_RTMPSockBuf_Send(&r.m_sb, buf)) n, err := C_RTMPSockBuf_Send(&r.m_sb, buf)
if nBytes < 0 { if n < 0 {
if debugMode { if debugMode {
log.Println("C_WriteN, RTMP send error") log.Printf("C_WriteN, RTMP send error: %v\n", err)
} }
C_RTMP_Close(r) C_RTMP_Close(r)
return false return false
} }
if nBytes == 0 { if n == 0 {
break break
} }
buf = buf[nBytes:] buf = buf[n:]
} }
// !ok here is equivalent to io.ErrShortWrite. // !ok here is equivalent to io.ErrShortWrite.
@ -1494,7 +1475,10 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) {
} }
C_SendDeleteStream(r, float64(i)) C_SendDeleteStream(r, float64(i))
} }
C_RTMPSockBuf_Close(&r.m_sb) err := C_RTMPSockBuf_Close(&r.m_sb)
if err != nil && debugMode {
log.Printf("C_RTMPSockBuf_Close error: %v\n", err)
}
} }
r.m_stream_id = -1 r.m_stream_id = -1

View File

@ -144,7 +144,6 @@ type C_RTMPSockBuf struct {
sb_size int sb_size int
sb_start int sb_start int
sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const
sb_timedout bool
} }
// RTMPPacket_IsReady(a) // RTMPPacket_IsReady(a)
@ -156,8 +155,7 @@ func C_RTMPPacket_IsReady(p *C_RTMPPacket) bool {
// typedef struct RTMP_LNK // typedef struct RTMP_LNK
// rtmp.h +144 // rtmp.h +144
type C_RTMP_LNK struct { type C_RTMP_LNK struct {
hostname string host string
sockshost string
playpath0 string playpath0 string
playpath string playpath string
tcUrl string tcUrl string
@ -173,7 +171,6 @@ type C_RTMP_LNK struct {
swfAge int32 swfAge int32
protocol int32 protocol int32
timeout int32 timeout int32
socksport uint16
port uint16 port uint16
} }
@ -201,7 +198,6 @@ type C_RTMP struct {
m_nClientBW2 uint8 m_nClientBW2 uint8
m_bPlaying bool m_bPlaying bool
m_bSendEncoding bool m_bSendEncoding bool
m_bSendCounter bool
m_numInvokes int32 m_numInvokes int32
m_methodCalls []C_RTMP_METHOD m_methodCalls []C_RTMP_METHOD
m_channelsAllocatedIn int32 m_channelsAllocatedIn int32

View File

@ -34,113 +34,69 @@ LICENSE
package rtmp package rtmp
import ( import (
"fmt" "errors"
"log"
"net" "net"
"strconv"
"time" "time"
) )
// int RTMP_Connect(RTMP *r, RTMPPacket* cp); // int RTMP_Connect(RTMP *r, RTMPPacket* cp);
// rtmp.c +1032 // rtmp.c +1032
func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error {
if r.Link.hostname == "" { if r.Link.host == "" {
return false return errors.New("Empty host")
} }
addr, err := net.ResolveTCPAddr("tcp4", r.Link.host+":"+strconv.Itoa(int(r.Link.port)))
var hostname string
if r.Link.socksport != 0 {
hostname = fmt.Sprintf("%s:%d", r.Link.sockshost, r.Link.socksport)
} else {
hostname = fmt.Sprintf("%s:%d", r.Link.hostname, r.Link.port)
}
addr, err := net.ResolveTCPAddr("tcp4", hostname)
if err != nil { if err != nil {
return false return err
} }
r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr) r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr)
if err != nil { if err != nil {
return false return err
} }
if r.Link.socksport != 0 {
if !C_SocksNegotiate(r, addr) {
return false
}
}
r.m_sb.timeout = r.Link.timeout r.m_sb.timeout = r.Link.timeout
r.m_bSendCounter = true if !C_RTMP_Connect1(r, cp) {
return C_RTMP_Connect1(r, cp) return errors.New("RTMP connection failed")
}
return nil
} }
// int SocksNegotiate(RTMP* r); // int SocksNegotiate(RTMP* r);
// rtmp.c +1062 // rtmp.c +1062
func C_SocksNegotiate(r *C_RTMP, addr *net.TCPAddr) (ok bool) { // DELETED
ip := addr.IP.To4()
packet := []byte{
0x4, // SOCKS version
0x1, // Establish a TCP/IP stream connection
byte(r.Link.port >> 8), byte(r.Link.port),
ip[0], ip[1], ip[2], ip[3],
0x0, // Empty user ID string
}
C_WriteN(r, packet)
if C_ReadN(r, packet[:8]) != 8 {
return false
}
if packet[0] == 0x0 && packet[1] == 0x5a {
return true
}
// TODO: use new logger here
log.Println("C_SocksNegotitate: SOCKS returned error code!")
return false
}
// int RTMPSockBuf_Fill(RTMPSockBuf* sb); // int RTMPSockBuf_Fill(RTMPSockBuf* sb);
// rtmp.c +4253 // rtmp.c +4253
func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int { func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) (int, error) {
if sb.sb_size == 0 { if sb.sb_size == 0 {
sb.sb_start = 0 sb.sb_start = 0
} }
err := sb.conn.SetReadDeadline(time.Now().Local().Add(time.Second * time.Duration(sb.timeout))) err := sb.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(sb.timeout)))
if err != nil { if err != nil {
return -1 return 0, err
} }
n, err := sb.conn.Read(sb.sb_buf[sb.sb_start+sb.sb_size:]) n, err := sb.conn.Read(sb.sb_buf[sb.sb_start+sb.sb_size:])
if err != nil {
return 0
}
sb.sb_size += n sb.sb_size += n
return n return n, err
} }
// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len); // int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len);
// rtmp.c +4297 // rtmp.c +4297
// TODO replace send with golang net connection send // TODO replace send with golang net connection send
func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 { func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) (int, error) {
err := sb.conn.SetWriteDeadline(time.Now().Local().Add(time.Second * time.Duration(sb.timeout))) err := sb.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(sb.timeout)))
if err != nil { if err != nil {
return -1 return 0, err
} }
n, err := sb.conn.Write(buf) return sb.conn.Write(buf)
if err != nil {
return -1
}
return int32(n)
} }
// int // int
// RTMPSockBuf_Close(RTMPSockBuf *sb) // RTMPSockBuf_Close(RTMPSockBuf *sb)
// rtmp.c +4369 // rtmp.c +4369
func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 { func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) error {
if sb.conn != nil { if sb.conn == nil {
err := sb.conn.Close() return nil
sb.conn = nil
if err == nil {
return 1
} }
} return sb.conn.Close()
return 0
} }