diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index a1d10c2c..62cc8175 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -37,7 +37,9 @@ package rtmp #include #include #include +#include #include +#include typedef enum { RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE @@ -262,7 +264,7 @@ type RTMPSockBuf struct { sb_socket int sb_size int sb_start *byte - sb_buf [C.RTMP_BUFFER_CACHE_SIZE]byte // port const + sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const sb_timedout int sb_ssl uintptr } @@ -352,6 +354,7 @@ func (s *session) Write(data []byte) (int, error) { if s.rtmp == nil { return 0, Err(3) } + // TODO: port this if C.RTMP_IsConnected(s.rtmp) == 0 { return 0, Err(1) } @@ -381,6 +384,7 @@ func startSession(rtmp *C.RTMP, u string, timeout uint32) (*C.RTMP, error) { return nil, errors.New("rtmp startSession: Failed to connect!") } + // TODO: port this if C.RTMP_ConnectStream(rtmp, 0) == 0 { //C.RTMP_Close(rtmp) //C.RTMP_Free(rtmp) @@ -461,7 +465,7 @@ func rtmpSetupUrl(r *C.RTMP, u string) int32 { r.Link.tcUrl.av_len = C.int(strlen(url)) } } - + // TODO: port this C.SocksSetup(r, &r.Link.sockshost) if r.Link.port == 0 { @@ -477,6 +481,7 @@ func rtmpSetupUrl(r *C.RTMP, u string) int32 { return 1 } +/* func rtmpClose(r *C.RTMP) { closeInternal(r, 0) } @@ -558,6 +563,7 @@ func closeInternal(r *C.RTMP, reconnect int32) { r.Link.playpath0.av_val = nil } } +*/ func rtmpEnableWrite(r *C.RTMP) { r.Link.protocol |= RTMP_FEATURE_WRITE @@ -568,6 +574,7 @@ func rtmpSetBufferMS(r *C.RTMP, size int32) { } func rtmpConnect(r *C.RTMP, cp *C.RTMPPacket) int { + // TODO: port this var service C.sockaddr_in if r.Link.hostname.av_len == 0 { @@ -576,28 +583,77 @@ func rtmpConnect(r *C.RTMP, cp *C.RTMPPacket) int { var tmp C.sockaddr_in memset((*byte)(unsafe.Pointer(&service)), 0, int(unsafe.Sizeof(tmp))) + // TODO: port this service.sin_family = C.AF_INET if r.Link.socksport != 0 { + // TODO: port this if C.add_addr_info(&service, &r.Link.sockshost, C.int(r.Link.socksport)) == 0 { return 0 } } else { // connect directly - if C.add_addr_info(&service, (*C.AVal)(unsafe.Pointer(&r.Link.hostname)), C.int(r.Link.port)) == 0 { + if C.add_addr_info(&service, (*C.AVal)(unsafe.Pointer(&r.Link.hostname)), + C.int(r.Link.port)) == 0 { return 0 } } + // TODO: port this if C.RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) == 0 { return 0 } r.m_bSendCounter = 1 + // TODO: port this return int(C.RTMP_Connect1(r, cp)) } +func rtmpConnect0(r *C.RTMP, service *C.sockaddr) int { + on := 1 + r.m_sb.sb_timedout = 0 + r.m_pausing = 0 + r.m_fDuration = 0.0 + + r.m_sb.sb_socket = C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP) + + if r.m_sb.sb_socket != -1 { + if C.connect(r.m_sb.sb_socket, service, C.uint(unsafe.Sizeof(*service))) < 0 { + log.Println("rtmpConnect0, failed to connect socket.") + } + + if r.Link.socksport != 0 { + if debugMode { + log.Println("rtmpConnect0: socks negotiation.") + } + + if C.SocksNegotiate(r) == 0 { + log.Println("rtmpConnect0: SOCKS negotiation failed.") + return 0 + } + } + } else { + log.Println("rtmpConnect0: failed to create socket.") + return 0 + } + + { + //C.SET_RCVTIMEO(tv, r.Link.timeout) + tv := r.Link.timeout * 1000 + + if C.setsockopt(r.m_sb.sb_socket, C.SOL_SOCKET, C.SO_RCVTIMEO, + unsafe.Pointer(&tv), C.uint(unsafe.Sizeof(tv))) != 0 { + log.Println("rtmpConnect0: Setting socket timeout failed") + } + } + + C.setsockopt(r.m_sb.sb_socket, C.IPPROTO_TCP, C.TCP_NODELAY, + unsafe.Pointer(&on), C.uint(unsafe.Sizeof(on))) + + return 1 +} + /* func socksSetup(r *RTMP, sockshost *AVal) { if sockshost.av_len != 0 { @@ -611,8 +667,8 @@ func endSession(rtmp *C.RTMP) uint32 { return 3 } - C.RTMP_Close(rtmp) - C.RTMP_Free(rtmp) + //C.RTMP_Close(rtmp) + //C.RTMP_Free(rtmp) return 0 } @@ -812,6 +868,7 @@ func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int { if packet.m_nChannel >= r.m_channelsAllocatedOut { log.Println("Resize") n := int(packet.m_nChannel + 10) + // TODO: port this packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut), C.size_t(unsafe.Sizeof(packet)*uintptr(n))) @@ -822,6 +879,7 @@ func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int { return 0 } r.m_vecChannelsOut = (**C.RTMPPacket)(packets) + // TODO: replace this with my memset C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut), int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet)* uintptr(n-int(r.m_channelsAllocatedOut)))) @@ -841,7 +899,6 @@ func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int { if prevPacket.m_nTimeStamp == packet.m_nTimeStamp && packet.m_headerType == RTMP_PACKET_SIZE_SMALL { - // TODO: port this constant packet.m_headerType = RTMP_PACKET_SIZE_MINIMUM } diff --git a/rtmp/rtmp_c/librtmp/rtmp.c b/rtmp/rtmp_c/librtmp/rtmp.c index 465496ad..81532de0 100644 --- a/rtmp/rtmp_c/librtmp/rtmp.c +++ b/rtmp/rtmp_c/librtmp/rtmp.c @@ -110,7 +110,7 @@ typedef enum { static int DumpMetaData(AMFObject *obj); static int HandShake(RTMP *r, int FP9HandShake); -static int SocksNegotiate(RTMP *r); +int SocksNegotiate(RTMP *r); static int SendConnectPacket(RTMP *r, RTMPPacket *cp); static int SendCheckBW(RTMP *r); @@ -866,7 +866,7 @@ int RTMP_SetupURL(RTMP *r, char *url) return TRUE; } -static int +int add_addr_info(struct sockaddr_in *service, AVal *host, int port) { char *hostname; @@ -1058,8 +1058,7 @@ RTMP_Connect(RTMP *r, RTMPPacket *cp) return RTMP_Connect1(r, cp); } -static int -SocksNegotiate(RTMP *r) +int SocksNegotiate(RTMP *r) { unsigned long addr; struct sockaddr_in service; diff --git a/rtmp/rtmp_c/librtmp/rtmp.h b/rtmp/rtmp_c/librtmp/rtmp.h index ca9fafd1..56fd4f52 100644 --- a/rtmp/rtmp_c/librtmp/rtmp.h +++ b/rtmp/rtmp_c/librtmp/rtmp.h @@ -293,6 +293,8 @@ extern "C" void SocksSetup(RTMP *r, AVal *sockshost); + int SocksNegotiate(RTMP *r); + int SendFCUnpublish(RTMP *r); int SendDeleteStream(RTMP *r, double dStreamId); diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index 650078c0..17445969 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -47,6 +47,22 @@ var ( errMsg = "Obtained: %v, but wanted: %v" ) +func TestMemset(t *testing.T) { + size := 10 + setNum := 5 + testVal := byte('A') + mem := allocate(uintptr(size)) + memset((*byte)(mem), testVal, setNum) + for i := 0; i < size; i++ { + if i > setNum-1 { + testVal = byte(0) + } + if *indxBytePtr(mem, i) != testVal { + t.Errorf("mem doesn't match expected values at: %v", i) + } + } +} + func TestGoStrToCStr(t *testing.T) { goStr := "string\000" bStr := goStrToCStr(goStr)