rtmp: use std net for connections

This commit is contained in:
Dan Kortschak 2018-09-30 18:59:01 +09:30
parent fcfdd69b8e
commit f61bd0a193
3 changed files with 40 additions and 155 deletions

View File

@ -201,7 +201,6 @@ func C_RTMP_Alloc() *C_RTMP {
// rtmp.c +329
func C_RTMP_Init(r *C_RTMP) {
*r = C_RTMP{}
r.m_sb.sb_socket = -1
r.m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE
r.m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE
r.m_nBufferMS = 30000
@ -223,7 +222,7 @@ func C_RTMP_EnableWrite(r *C_RTMP) {
// int RTMP_IsConnected(RTMP *r);
// rtmp.c +363
func C_RTMP_IsConnected(r *C_RTMP) int32 {
if r.m_sb.sb_socket != -1 {
if r.m_sb.conn != nil {
return 1
}
return 0
@ -1400,7 +1399,14 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) {
nChunkSize := int(r.m_outChunkSize)
if debugMode {
log.Printf("C_RTMP_SendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize)
if r.m_sb.conn != nil {
f, err := r.m_sb.conn.File()
if err != nil {
log.Printf("could not get file: %v", err)
} else {
log.Printf("C_RTMP_SendPacket: fd=%d, size=%v", f.Fd(), nSize)
}
}
}
// TODO(kortschak): Rewrite this horrific peice of premature optimisation.
@ -1492,7 +1498,7 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) {
}
r.m_stream_id = -1
r.m_sb.sb_socket = -1
r.m_sb.conn = nil
r.m_nBWCheckCounter = 0
r.m_nBytesIn = 0
r.m_nBytesInSent = 0

View File

@ -31,6 +31,8 @@ LICENSE
*/
package rtmp
import "net"
const (
RTMPT_OPEN = iota
RTMPT_SEND
@ -137,7 +139,7 @@ type C_RTMPPacket struct {
// typedef struct RTMPSockBuf
// rtmp.h +127
type C_RTMPSockBuf struct {
sb_socket int32
conn *net.TCPConn
sb_size int
sb_start int
sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const

View File

@ -33,122 +33,30 @@ LICENSE
package rtmp
/*
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
typedef struct sockaddr_in sockaddr_in;
typedef struct sockaddr sockaddr;
*/
import "C"
import (
"log"
"syscall"
"unsafe"
"github.com/chamaken/cgolmnl/inet"
"fmt"
"net"
)
func bAddr(buf []byte) *byte {
if len(buf) == 0 {
return nil
}
return &buf[0]
}
// int add_addr_info(struct sockaddr_in *service, AVal *host, int port)
// rtmp.c +869
func C_add_addr_info(service *C.sockaddr_in, hostname string, port uint16) (ok bool) {
h := C.CString(hostname)
defer C.free(unsafe.Pointer(h))
service.sin_addr.s_addr = C.inet_addr(h)
if service.sin_addr.s_addr == C.INADDR_NONE {
host := C.gethostbyname(h)
if host == nil || *host.h_addr_list == nil {
//RTMP_Log(RTMP_LOGERROR, "Problem accessing the DNS. (addr: %s)", hostname)
return false
}
service.sin_addr = *(*C.struct_in_addr)(unsafe.Pointer(*host.h_addr_list))
}
service.sin_port = C.ushort(inet.Htons(port))
return true
}
// int RTMP_Connect0(RTMP *r, struct sockaddr* service);
// rtmp.c +906
func C_RTMP_Connect0(r *C_RTMP, service *C.sockaddr) (ok bool) {
r.m_sb.sb_timedout = false
r.m_pausing = 0
r.m_fDuration = 0
r.m_sb.sb_socket = int32(C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP))
if r.m_sb.sb_socket != -1 {
if C.connect(C.int(r.m_sb.sb_socket), service, C.socklen_t(unsafe.Sizeof(*service))) < 0 {
log.Println("C_RTMP_Connect0, failed to connect socket.")
}
if r.Link.socksport != 0 {
if debugMode {
log.Println("C_RTMP_Connect0: socks negotiation.")
}
if !C_SocksNegotiate(r) {
log.Println("C_RTMP_Connect0: SOCKS negotiation failed.")
return false
}
}
} else {
log.Println("C_RTMP_Connect0: failed to create socket.")
return false
}
{
var tv int32
SET_RCVTIMEO(&tv, r.Link.timeout)
if C.setsockopt(C.int(r.m_sb.sb_socket), C.SOL_SOCKET, C.SO_RCVTIMEO,
unsafe.Pointer(&tv), C.socklen_t(unsafe.Sizeof(tv))) != 0 {
log.Println("C_RTMP_Connect0: Setting socket timeout failed")
}
}
on := C.int(1)
C.setsockopt(C.int(r.m_sb.sb_socket), C.IPPROTO_TCP, C.TCP_NODELAY,
unsafe.Pointer(&on), C.socklen_t(unsafe.Sizeof(on)))
return true
}
// int RTMP_Connect(RTMP *r, RTMPPacket* cp);
// rtmp.c +1032
func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
// TODO: port this
var service C.sockaddr_in
if r.Link.hostname == "" {
return false
}
// TODO: port this
service.sin_family = C.AF_INET
var hostname string
if r.Link.socksport != 0 {
if !C_add_addr_info(&service, r.Link.sockshost, r.Link.socksport) {
return false
}
hostname = fmt.Sprintf("%s:%d", r.Link.sockshost, r.Link.socksport)
} else {
// connect directly
if !C_add_addr_info(&service, r.Link.hostname, r.Link.port) {
return false
}
hostname = fmt.Sprintf("%s:%d", r.Link.hostname, r.Link.port)
}
if !C_RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) {
addr, err := net.ResolveTCPAddr("tcp4", hostname)
if err != nil {
return false
}
r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr)
if err != nil {
return false
}
@ -157,38 +65,6 @@ func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
return C_RTMP_Connect1(r, cp)
}
// int SocksNegotiate(RTMP* r);
// rtmp.c +1062
func C_SocksNegotiate(r *C_RTMP) (ok bool) {
var addr int32
var service C.sockaddr_in
C_add_addr_info(&service, r.Link.hostname, r.Link.port)
addr = int32(inet.Htonl(uint32(service.sin_addr.s_addr)))
packet := []byte{
4, 1,
byte((r.Link.port >> 8) & 0xFF),
byte((r.Link.port) & 0xFF),
byte(addr>>24) & 0xFF, byte(addr>>16) & 0xFF,
byte(addr>>8) & 0xFF, byte(addr) & 0xFF,
0,
}
C_WriteN(r, packet)
if C_ReadN(r, packet[:8]) != 8 {
return false
}
if packet[0] == 0 && packet[1] == 90 {
return true
}
// TODO: use new logger here
log.Println("C_SocksNegotitate: SOCKS returned error code!")
return false
}
// int RTMPSockBuf_Fill(RTMPSockBuf* sb);
// rtmp.c +4253
func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int {
@ -196,34 +72,35 @@ func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int {
sb.sb_start = 0
}
nBytes := C.ssize_t((len(sb.sb_buf) - 1) - (sb.sb_size + sb.sb_start))
nBytes, err := C.recv(C.int(sb.sb_socket), unsafe.Pointer(&sb.sb_buf[sb.sb_start+sb.sb_size]), C.size_t(nBytes), 0)
if nBytes == -1 {
log.Printf("C_RTMPSockBuf_Fill: recv error: %v", err)
if err == syscall.EWOULDBLOCK || err == syscall.EAGAIN {
sb.sb_timedout = true
nBytes = 0
}
} else {
sb.sb_size += int(nBytes)
n, err := sb.conn.Read(sb.sb_buf[sb.sb_start+sb.sb_size:])
if err != nil {
return 0
}
return int(nBytes)
sb.sb_size += n
return n
}
// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len);
// rtmp.c +4297
// TODO replace send with golang net connection send
func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 {
return int32(C.send(C.int(sb.sb_socket), unsafe.Pointer(bAddr(buf)), C.size_t(len(buf)), 0))
n, err := sb.conn.Write(buf)
if err != nil {
return -1
}
return int32(n)
}
// int
// RTMPSockBuf_Close(RTMPSockBuf *sb)
// rtmp.c +4369
func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 {
if sb.sb_socket != -1 {
return int32(C.close(C.int(sb.sb_socket)))
if sb.conn != nil {
err := sb.conn.Close()
sb.conn = nil
if err == nil {
return 1
}
}
return 0
}