rtmp: extract socket code into separate file

This commit is contained in:
Dan Kortschak 2018-09-19 16:21:32 +09:30
parent 29f7d24623
commit f25661ec2d
2 changed files with 222 additions and 183 deletions

View File

@ -33,18 +33,6 @@ LICENSE
package rtmp
/*
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
typedef struct sockaddr_in sockaddr_in;
typedef struct sockaddr sockaddr;
*/
import "C"
import (
"bytes"
"encoding/binary"
@ -54,11 +42,8 @@ import (
"math/rand"
"strconv"
"strings"
"syscall"
"time"
"unsafe"
"github.com/chamaken/cgolmnl/inet"
)
const _Gi = 1 << 30
@ -333,70 +318,6 @@ func C_RTMP_SetupURL(r *C_RTMP, addr string) (ok bool) {
return true
}
// int add_addr_info(struct sockaddr_in *service, AVal *host, int port)
// rtmp.c +869
func C_add_addr_info(service *C.sockaddr_in, hostname string, port uint16) (ok bool) {
h := (*C.char)(unsafe.Pointer(goStrToCStr(hostname)))
service.sin_addr.s_addr = C.inet_addr(h)
if service.sin_addr.s_addr == C.INADDR_NONE {
host := C.gethostbyname(h)
if host == nil || *host.h_addr_list == nil {
//RTMP_Log(RTMP_LOGERROR, "Problem accessing the DNS. (addr: %s)", hostname)
return false
}
service.sin_addr = *(*C.struct_in_addr)(unsafe.Pointer(*host.h_addr_list))
}
service.sin_port = C.ushort(inet.Htons(port))
return true
}
// int RTMP_Connect0(RTMP *r, struct sockaddr* service);
// rtmp.c +906
func C_RTMP_Connect0(r *C_RTMP, service *C.sockaddr) (ok bool) {
r.m_sb.sb_timedout = false
r.m_pausing = 0
r.m_fDuration = 0
r.m_sb.sb_socket = int32(C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP))
if r.m_sb.sb_socket != -1 {
if C.connect(C.int(r.m_sb.sb_socket), service, C.socklen_t(unsafe.Sizeof(*service))) < 0 {
log.Println("C_RTMP_Connect0, failed to connect socket.")
}
if r.Link.socksport != 0 {
if debugMode {
log.Println("C_RTMP_Connect0: socks negotiation.")
}
if !C_SocksNegotiate(r) {
log.Println("C_RTMP_Connect0: SOCKS negotiation failed.")
return false
}
}
} else {
log.Println("C_RTMP_Connect0: failed to create socket.")
return false
}
{
var tv int32
SET_RCVTIMEO(&tv, r.Link.timeout)
if C.setsockopt(C.int(r.m_sb.sb_socket), C.SOL_SOCKET, C.SO_RCVTIMEO,
unsafe.Pointer(&tv), C.socklen_t(unsafe.Sizeof(tv))) != 0 {
log.Println("C_RTMP_Connect0: Setting socket timeout failed")
}
}
on := C.int(1)
C.setsockopt(C.int(r.m_sb.sb_socket), C.IPPROTO_TCP, C.TCP_NODELAY,
unsafe.Pointer(&on), C.socklen_t(unsafe.Sizeof(on)))
return true
}
// int RTMP_Connect1(RTMP* r, RTMPPacket* cp);
// rtmp.c +978
func C_RTMP_Connect1(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
@ -417,70 +338,6 @@ func C_RTMP_Connect1(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
return true
}
// int RTMP_Connect(RTMP *r, RTMPPacket* cp);
// rtmp.c +1032
func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
// TODO: port this
var service C.sockaddr_in
if r.Link.hostname == "" {
return false
}
// TODO: port this
service.sin_family = C.AF_INET
if r.Link.socksport != 0 {
if !C_add_addr_info(&service, r.Link.sockshost, r.Link.socksport) {
return false
}
} else {
// connect directly
if !C_add_addr_info(&service, r.Link.hostname, r.Link.port) {
return false
}
}
if !C_RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) {
return false
}
r.m_bSendCounter = true
return C_RTMP_Connect1(r, cp)
}
// int SocksNegotiate(RTMP* r);
// rtmp.c +1062
func C_SocksNegotiate(r *C_RTMP) (ok bool) {
var addr int32
var service C.sockaddr_in
C_add_addr_info(&service, r.Link.hostname, r.Link.port)
addr = int32(inet.Htonl(uint32(service.sin_addr.s_addr)))
packet := []byte{
4, 1,
byte((r.Link.port >> 8) & 0xFF),
byte((r.Link.port) & 0xFF),
byte(addr>>24) & 0xFF, byte(addr>>16) & 0xFF,
byte(addr>>8) & 0xFF, byte(addr) & 0xFF,
0,
}
C_WriteN(r, packet)
if C_ReadN(r, packet[:8]) != 8 {
return false
}
if packet[0] == 0 && packet[1] == 90 {
return true
}
// TODO: use new logger here
log.Println("C_SocksNegotitate: SOCKS returned error code!")
return false
}
// int RTMP_ConnectStream(RTMP* r, int seekTime);
// rtmp.c +1099
func C_RTMP_ConnectStream(r *C_RTMP, seekTime int32) (playing bool) {
@ -1735,46 +1592,7 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) {
}
}
// int RTMPSockBuf_Fill(RTMPSockBuf* sb);
// rtmp.c +4253
func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int {
if sb.sb_size == 0 {
sb.sb_start = 0
}
nBytes := C.long((len(sb.sb_buf) - 1) - (sb.sb_size + sb.sb_start))
nBytes, err := C.recv(C.int(sb.sb_socket), unsafe.Pointer(&sb.sb_buf[sb.sb_start+sb.sb_size]), C.size_t(nBytes), 0)
if nBytes == -1 {
log.Printf("C_RTMPSockBuf_Fill: recv error: %v", err)
if err == syscall.EWOULDBLOCK || err == syscall.EAGAIN {
sb.sb_timedout = true
nBytes = 0
}
} else {
sb.sb_size += int(nBytes)
}
return int(nBytes)
}
// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len);
// rtmp.c +4297
// TODO replace send with golang net connection send
func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 {
return int32(C.send(C.int(sb.sb_socket), unsafe.Pointer(bAddr(buf)), C.ulong(len(buf)), 0))
}
// int
// RTMPSockBuf_Close(RTMPSockBuf *sb)
// rtmp.c +4369
func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 {
if sb.sb_socket != -1 {
return int32(C.close(C.int(sb.sb_socket)))
}
return 0
}
// int RTMP_Write(RTMP* r, const char* buf, int size);
/// int RTMP_Write(RTMP* r, const char* buf, int size);
// rtmp.c +5136
func C_RTMP_Write(r *C_RTMP, buf []byte) int {
// TODO: port RTMPPacket

221
rtmp/socket.go Normal file
View File

@ -0,0 +1,221 @@
/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
/*
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
typedef struct sockaddr_in sockaddr_in;
typedef struct sockaddr sockaddr;
*/
import "C"
import (
"log"
"syscall"
"unsafe"
"github.com/chamaken/cgolmnl/inet"
)
// int add_addr_info(struct sockaddr_in *service, AVal *host, int port)
// rtmp.c +869
func C_add_addr_info(service *C.sockaddr_in, hostname string, port uint16) (ok bool) {
h := (*C.char)(unsafe.Pointer(goStrToCStr(hostname)))
service.sin_addr.s_addr = C.inet_addr(h)
if service.sin_addr.s_addr == C.INADDR_NONE {
host := C.gethostbyname(h)
if host == nil || *host.h_addr_list == nil {
//RTMP_Log(RTMP_LOGERROR, "Problem accessing the DNS. (addr: %s)", hostname)
return false
}
service.sin_addr = *(*C.struct_in_addr)(unsafe.Pointer(*host.h_addr_list))
}
service.sin_port = C.ushort(inet.Htons(port))
return true
}
// int RTMP_Connect0(RTMP *r, struct sockaddr* service);
// rtmp.c +906
func C_RTMP_Connect0(r *C_RTMP, service *C.sockaddr) (ok bool) {
r.m_sb.sb_timedout = false
r.m_pausing = 0
r.m_fDuration = 0
r.m_sb.sb_socket = int32(C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP))
if r.m_sb.sb_socket != -1 {
if C.connect(C.int(r.m_sb.sb_socket), service, C.socklen_t(unsafe.Sizeof(*service))) < 0 {
log.Println("C_RTMP_Connect0, failed to connect socket.")
}
if r.Link.socksport != 0 {
if debugMode {
log.Println("C_RTMP_Connect0: socks negotiation.")
}
if !C_SocksNegotiate(r) {
log.Println("C_RTMP_Connect0: SOCKS negotiation failed.")
return false
}
}
} else {
log.Println("C_RTMP_Connect0: failed to create socket.")
return false
}
{
var tv int32
SET_RCVTIMEO(&tv, r.Link.timeout)
if C.setsockopt(C.int(r.m_sb.sb_socket), C.SOL_SOCKET, C.SO_RCVTIMEO,
unsafe.Pointer(&tv), C.socklen_t(unsafe.Sizeof(tv))) != 0 {
log.Println("C_RTMP_Connect0: Setting socket timeout failed")
}
}
on := C.int(1)
C.setsockopt(C.int(r.m_sb.sb_socket), C.IPPROTO_TCP, C.TCP_NODELAY,
unsafe.Pointer(&on), C.socklen_t(unsafe.Sizeof(on)))
return true
}
// int RTMP_Connect(RTMP *r, RTMPPacket* cp);
// rtmp.c +1032
func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
// TODO: port this
var service C.sockaddr_in
if r.Link.hostname == "" {
return false
}
// TODO: port this
service.sin_family = C.AF_INET
if r.Link.socksport != 0 {
if !C_add_addr_info(&service, r.Link.sockshost, r.Link.socksport) {
return false
}
} else {
// connect directly
if !C_add_addr_info(&service, r.Link.hostname, r.Link.port) {
return false
}
}
if !C_RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) {
return false
}
r.m_bSendCounter = true
return C_RTMP_Connect1(r, cp)
}
// int SocksNegotiate(RTMP* r);
// rtmp.c +1062
func C_SocksNegotiate(r *C_RTMP) (ok bool) {
var addr int32
var service C.sockaddr_in
C_add_addr_info(&service, r.Link.hostname, r.Link.port)
addr = int32(inet.Htonl(uint32(service.sin_addr.s_addr)))
packet := []byte{
4, 1,
byte((r.Link.port >> 8) & 0xFF),
byte((r.Link.port) & 0xFF),
byte(addr>>24) & 0xFF, byte(addr>>16) & 0xFF,
byte(addr>>8) & 0xFF, byte(addr) & 0xFF,
0,
}
C_WriteN(r, packet)
if C_ReadN(r, packet[:8]) != 8 {
return false
}
if packet[0] == 0 && packet[1] == 90 {
return true
}
// TODO: use new logger here
log.Println("C_SocksNegotitate: SOCKS returned error code!")
return false
}
// int RTMPSockBuf_Fill(RTMPSockBuf* sb);
// rtmp.c +4253
func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int {
if sb.sb_size == 0 {
sb.sb_start = 0
}
nBytes := C.long((len(sb.sb_buf) - 1) - (sb.sb_size + sb.sb_start))
nBytes, err := C.recv(C.int(sb.sb_socket), unsafe.Pointer(&sb.sb_buf[sb.sb_start+sb.sb_size]), C.size_t(nBytes), 0)
if nBytes == -1 {
log.Printf("C_RTMPSockBuf_Fill: recv error: %v", err)
if err == syscall.EWOULDBLOCK || err == syscall.EAGAIN {
sb.sb_timedout = true
nBytes = 0
}
} else {
sb.sb_size += int(nBytes)
}
return int(nBytes)
}
// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len);
// rtmp.c +4297
// TODO replace send with golang net connection send
func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 {
return int32(C.send(C.int(sb.sb_socket), unsafe.Pointer(bAddr(buf)), C.ulong(len(buf)), 0))
}
// int
// RTMPSockBuf_Close(RTMPSockBuf *sb)
// rtmp.c +4369
func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 {
if sb.sb_socket != -1 {
return int32(C.close(C.int(sb.sb_socket)))
}
return 0
}