Ported RTMP_Connect0 to rtmpConnect0 - and tested with success

This commit is contained in:
saxon 2018-07-22 18:18:46 +09:30
parent 30cf9bd6e3
commit e3eaa99284
4 changed files with 84 additions and 10 deletions

View File

@ -37,7 +37,9 @@ package rtmp
#include <string.h> #include <string.h>
#include <rtmp.h> #include <rtmp.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/tcp.h>
typedef enum { typedef enum {
RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE
@ -262,7 +264,7 @@ type RTMPSockBuf struct {
sb_socket int sb_socket int
sb_size int sb_size int
sb_start *byte sb_start *byte
sb_buf [C.RTMP_BUFFER_CACHE_SIZE]byte // port const sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const
sb_timedout int sb_timedout int
sb_ssl uintptr sb_ssl uintptr
} }
@ -352,6 +354,7 @@ func (s *session) Write(data []byte) (int, error) {
if s.rtmp == nil { if s.rtmp == nil {
return 0, Err(3) return 0, Err(3)
} }
// TODO: port this
if C.RTMP_IsConnected(s.rtmp) == 0 { if C.RTMP_IsConnected(s.rtmp) == 0 {
return 0, Err(1) return 0, Err(1)
} }
@ -381,6 +384,7 @@ func startSession(rtmp *C.RTMP, u string, timeout uint32) (*C.RTMP, error) {
return nil, errors.New("rtmp startSession: Failed to connect!") return nil, errors.New("rtmp startSession: Failed to connect!")
} }
// TODO: port this
if C.RTMP_ConnectStream(rtmp, 0) == 0 { if C.RTMP_ConnectStream(rtmp, 0) == 0 {
//C.RTMP_Close(rtmp) //C.RTMP_Close(rtmp)
//C.RTMP_Free(rtmp) //C.RTMP_Free(rtmp)
@ -461,7 +465,7 @@ func rtmpSetupUrl(r *C.RTMP, u string) int32 {
r.Link.tcUrl.av_len = C.int(strlen(url)) r.Link.tcUrl.av_len = C.int(strlen(url))
} }
} }
// TODO: port this
C.SocksSetup(r, &r.Link.sockshost) C.SocksSetup(r, &r.Link.sockshost)
if r.Link.port == 0 { if r.Link.port == 0 {
@ -477,6 +481,7 @@ func rtmpSetupUrl(r *C.RTMP, u string) int32 {
return 1 return 1
} }
/*
func rtmpClose(r *C.RTMP) { func rtmpClose(r *C.RTMP) {
closeInternal(r, 0) closeInternal(r, 0)
} }
@ -558,6 +563,7 @@ func closeInternal(r *C.RTMP, reconnect int32) {
r.Link.playpath0.av_val = nil r.Link.playpath0.av_val = nil
} }
} }
*/
func rtmpEnableWrite(r *C.RTMP) { func rtmpEnableWrite(r *C.RTMP) {
r.Link.protocol |= RTMP_FEATURE_WRITE r.Link.protocol |= RTMP_FEATURE_WRITE
@ -568,6 +574,7 @@ func rtmpSetBufferMS(r *C.RTMP, size int32) {
} }
func rtmpConnect(r *C.RTMP, cp *C.RTMPPacket) int { func rtmpConnect(r *C.RTMP, cp *C.RTMPPacket) int {
// TODO: port this
var service C.sockaddr_in var service C.sockaddr_in
if r.Link.hostname.av_len == 0 { if r.Link.hostname.av_len == 0 {
@ -576,28 +583,77 @@ func rtmpConnect(r *C.RTMP, cp *C.RTMPPacket) int {
var tmp C.sockaddr_in var tmp C.sockaddr_in
memset((*byte)(unsafe.Pointer(&service)), 0, int(unsafe.Sizeof(tmp))) memset((*byte)(unsafe.Pointer(&service)), 0, int(unsafe.Sizeof(tmp)))
// TODO: port this
service.sin_family = C.AF_INET service.sin_family = C.AF_INET
if r.Link.socksport != 0 { if r.Link.socksport != 0 {
// TODO: port this
if C.add_addr_info(&service, &r.Link.sockshost, C.int(r.Link.socksport)) == 0 { if C.add_addr_info(&service, &r.Link.sockshost, C.int(r.Link.socksport)) == 0 {
return 0 return 0
} }
} else { } else {
// connect directly // connect directly
if C.add_addr_info(&service, (*C.AVal)(unsafe.Pointer(&r.Link.hostname)), C.int(r.Link.port)) == 0 { if C.add_addr_info(&service, (*C.AVal)(unsafe.Pointer(&r.Link.hostname)),
C.int(r.Link.port)) == 0 {
return 0 return 0
} }
} }
// TODO: port this
if C.RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) == 0 { if C.RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) == 0 {
return 0 return 0
} }
r.m_bSendCounter = 1 r.m_bSendCounter = 1
// TODO: port this
return int(C.RTMP_Connect1(r, cp)) return int(C.RTMP_Connect1(r, cp))
} }
func rtmpConnect0(r *C.RTMP, service *C.sockaddr) int {
on := 1
r.m_sb.sb_timedout = 0
r.m_pausing = 0
r.m_fDuration = 0.0
r.m_sb.sb_socket = C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP)
if r.m_sb.sb_socket != -1 {
if C.connect(r.m_sb.sb_socket, service, C.uint(unsafe.Sizeof(*service))) < 0 {
log.Println("rtmpConnect0, failed to connect socket.")
}
if r.Link.socksport != 0 {
if debugMode {
log.Println("rtmpConnect0: socks negotiation.")
}
if C.SocksNegotiate(r) == 0 {
log.Println("rtmpConnect0: SOCKS negotiation failed.")
return 0
}
}
} else {
log.Println("rtmpConnect0: failed to create socket.")
return 0
}
{
//C.SET_RCVTIMEO(tv, r.Link.timeout)
tv := r.Link.timeout * 1000
if C.setsockopt(r.m_sb.sb_socket, C.SOL_SOCKET, C.SO_RCVTIMEO,
unsafe.Pointer(&tv), C.uint(unsafe.Sizeof(tv))) != 0 {
log.Println("rtmpConnect0: Setting socket timeout failed")
}
}
C.setsockopt(r.m_sb.sb_socket, C.IPPROTO_TCP, C.TCP_NODELAY,
unsafe.Pointer(&on), C.uint(unsafe.Sizeof(on)))
return 1
}
/* /*
func socksSetup(r *RTMP, sockshost *AVal) { func socksSetup(r *RTMP, sockshost *AVal) {
if sockshost.av_len != 0 { if sockshost.av_len != 0 {
@ -611,8 +667,8 @@ func endSession(rtmp *C.RTMP) uint32 {
return 3 return 3
} }
C.RTMP_Close(rtmp) //C.RTMP_Close(rtmp)
C.RTMP_Free(rtmp) //C.RTMP_Free(rtmp)
return 0 return 0
} }
@ -812,6 +868,7 @@ func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
if packet.m_nChannel >= r.m_channelsAllocatedOut { if packet.m_nChannel >= r.m_channelsAllocatedOut {
log.Println("Resize") log.Println("Resize")
n := int(packet.m_nChannel + 10) n := int(packet.m_nChannel + 10)
// TODO: port this
packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut), packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut),
C.size_t(unsafe.Sizeof(packet)*uintptr(n))) C.size_t(unsafe.Sizeof(packet)*uintptr(n)))
@ -822,6 +879,7 @@ func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
return 0 return 0
} }
r.m_vecChannelsOut = (**C.RTMPPacket)(packets) r.m_vecChannelsOut = (**C.RTMPPacket)(packets)
// TODO: replace this with my memset
C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut), C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut),
int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet)* int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet)*
uintptr(n-int(r.m_channelsAllocatedOut)))) uintptr(n-int(r.m_channelsAllocatedOut))))
@ -841,7 +899,6 @@ func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
if prevPacket.m_nTimeStamp == packet.m_nTimeStamp && if prevPacket.m_nTimeStamp == packet.m_nTimeStamp &&
packet.m_headerType == RTMP_PACKET_SIZE_SMALL { packet.m_headerType == RTMP_PACKET_SIZE_SMALL {
// TODO: port this constant
packet.m_headerType = RTMP_PACKET_SIZE_MINIMUM packet.m_headerType = RTMP_PACKET_SIZE_MINIMUM
} }

View File

@ -110,7 +110,7 @@ typedef enum {
static int DumpMetaData(AMFObject *obj); static int DumpMetaData(AMFObject *obj);
static int HandShake(RTMP *r, int FP9HandShake); static int HandShake(RTMP *r, int FP9HandShake);
static int SocksNegotiate(RTMP *r); int SocksNegotiate(RTMP *r);
static int SendConnectPacket(RTMP *r, RTMPPacket *cp); static int SendConnectPacket(RTMP *r, RTMPPacket *cp);
static int SendCheckBW(RTMP *r); static int SendCheckBW(RTMP *r);
@ -866,7 +866,7 @@ int RTMP_SetupURL(RTMP *r, char *url)
return TRUE; return TRUE;
} }
static int int
add_addr_info(struct sockaddr_in *service, AVal *host, int port) add_addr_info(struct sockaddr_in *service, AVal *host, int port)
{ {
char *hostname; char *hostname;
@ -1058,8 +1058,7 @@ RTMP_Connect(RTMP *r, RTMPPacket *cp)
return RTMP_Connect1(r, cp); return RTMP_Connect1(r, cp);
} }
static int int SocksNegotiate(RTMP *r)
SocksNegotiate(RTMP *r)
{ {
unsigned long addr; unsigned long addr;
struct sockaddr_in service; struct sockaddr_in service;

View File

@ -293,6 +293,8 @@ extern "C"
void SocksSetup(RTMP *r, AVal *sockshost); void SocksSetup(RTMP *r, AVal *sockshost);
int SocksNegotiate(RTMP *r);
int SendFCUnpublish(RTMP *r); int SendFCUnpublish(RTMP *r);
int SendDeleteStream(RTMP *r, double dStreamId); int SendDeleteStream(RTMP *r, double dStreamId);

View File

@ -47,6 +47,22 @@ var (
errMsg = "Obtained: %v, but wanted: %v" errMsg = "Obtained: %v, but wanted: %v"
) )
func TestMemset(t *testing.T) {
size := 10
setNum := 5
testVal := byte('A')
mem := allocate(uintptr(size))
memset((*byte)(mem), testVal, setNum)
for i := 0; i < size; i++ {
if i > setNum-1 {
testVal = byte(0)
}
if *indxBytePtr(mem, i) != testVal {
t.Errorf("mem doesn't match expected values at: %v", i)
}
}
}
func TestGoStrToCStr(t *testing.T) { func TestGoStrToCStr(t *testing.T) {
goStr := "string\000" goStr := "string\000"
bStr := goStrToCStr(goStr) bStr := goStrToCStr(goStr)