diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 2e7fa865..d555a3ec 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -161,6 +161,8 @@ const ( const ( minDataSize = 11 debugMode = false + TRUE = 1 + FALSE = 0 ) const ( @@ -781,8 +783,8 @@ func handShake(r *C.RTMP, FP9HandShake int32) int { return 0 } - if C.ReadN(r, (*C.char)(unsafe.Pointer(&typ)), 1) != 1 { - //if readN(r, (*byte)(unsafe.Pointer(&typ)), 1) != 1 { + //if C.ReadN(r, (*C.char)(unsafe.Pointer(&typ)), 1) != 1 { + if readN(r, (*byte)(unsafe.Pointer(&typ)), 1) != 1 { return 0 } @@ -807,8 +809,8 @@ func handShake(r *C.RTMP, FP9HandShake int32) int { return 0 } - //if readN(r, (*byte)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE { - if C.ReadN(r, (*C.char)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE { + if readN(r, (*byte)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE { + //if C.ReadN(r, (*C.char)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE { return 0 } @@ -931,7 +933,8 @@ func sendBytesReceived(r *C.RTMP) int { r.m_nBytesInSent = r.m_nBytesIn - return int(C.RTMP_SendPacket(r, &packet, 0)) + //return int(C.RTMP_SendPacket(r, &packet, 0)) + return rtmpSendPacket(r, &packet, 0) } func sendConnectPacket(r *C.RTMP, cp *C.RTMPPacket) int { @@ -1286,6 +1289,139 @@ func rtmpConnectStream(r *C.RTMP, seekTime int32) int { return int(r.m_bPlaying) } +func rtmpReadPacket(r *C.RTMP, packet *C.RTMPPacket) int32 { + var hbuf [RTMP_MAX_HEADER_SIZE]uint8 + memset((*byte)(hbuf), RTMP_MAX_HEADER_SIZE) + var header *byte + header = (*byte)(unsafe.Pointer(&hbuf[0])) + var nSize, hSize, nToRead, nChunk int32 + var didAlloc int32 = 0 + var extendedTimestamp int32 + + if readN(r, (*byte)(&hbuf[0]), 1) == 0 { + log.Println("rtmpReadPacket: failed to read RTMP packet header!") + return 0 + } + + packet.m_headerType = C.int((hbuf[0] & 0xc0) >> 6) + packet.m_Channel = C.int(hbuf[0] & 0x3f) + header = (*byte)(incBytePtr(unsafe.Pointer(header), 1)) + + switch { + case packet.m_nChannel == 0: + if readN(r, (*byte)(&hbuf[1]), 1) != 1 { + log.Println("rtmpReadPacket: failed to read rtmp packet header 2nd byte.") + return 0 + } + + packet.m_nChannel = hbuf[1] + packet.m_nChannel += 64 + header = (*byte)(incBytePtr(unsafe.Pointer(header), 1)) + case packet.m_nChannel == 1: + var tmp int32 + + if readN(r, (*byte)(&hubf[1]), 2) != 2 { + log.Println("rtmpReadPacket: failed to read RTMP packet 3rd byte") + return 0 + } + + tmp = (hbuf[2] << 8) + hbuf[1] + packet.m_nChannel = C.int(tmp + 64) + header = (*byte)(incBytePtr(unsafe.Pointer(header), 2)) + } + + nSize = packetSize[packet.m_headerType] + + if packet.m_nChannel >= r.m_channelsAllocatedIn { + var n int32 = packet.m_nChannel + 10 + timestamp := (*int32)(C.realloc(r.m_channelTimestamp, unsafe.Sizeof(n)*n)) + var packets **C.RTMPPacket = (**C.RTMPPacket)(C.realloc(r.m_vecChannelsIn, unsafe.Sizeof(*packets)*n)) + if timestamp == nil { + C.free(r.m_channelTimestamp) + } + + if packets == nil { + C.free(r.m_vecChannelsIn) + } + + r.m_channelTimestamp = timestamp + r.m_vecChannelsIn = packets + + if timestamp == nil || packets == nil { + r.m_channelsAllocatedInt = 0 + return 0 + } + + memset(r.m_channelTimestamp+r.m_channelsAllocatedInt, 0, int32Size*(n- + r.m_channelsAllocatedIn)) + memset(incPtr(unsafe.Pointer(r.m_vecChannelsIn), int(uintptr(unsafe.Pointer( + r.m_channelsAllocatedIn))), unsafe.Sizeof(*r.m_vecChannelsIn)), 0, + unsafe.Sizeof(*packets)*(n-r.m_channelsAllocatedIn)) + r.m_channelsAllocatedIn = n + } + + switch { + case nSize == RTMP_LARGE_HEADER_SIZE: + packet.m_hasAbsTimestamp = 1 + case nSize < RTMP_LARGE_HEADER_SIZE: + if r.m_vecChannelsIn[packet.m_nChannel] != nil { + memmove(unsafe.Pointer(packet), unsafe.Pointer( + r.m_vecChannelsIn[packet.m_nChannel]), unsafe.Sizeof(**packets)) + } + } + + nSize-- + + if nSize > 0 && readN(r, header, nSize) != nSize { + log.Println("rtmpReadPacket: failed to read rtmp packet header.") + return 0 + } + + hSize = nSize + decBytePtr(unsafe.Pointer(header), int(uintptr( + unsafe.Pointer(&hbuf[0])))) + + if nSize >= 3 { + packet.m_nTimeStamp = amfDecodeInt24(header) + + if nSize >= 6 { + packet.m_nBodySize = amfDecodeInt24((*byte)(incBytePtr(unsafe.Pointer(header), 3))) + packet.m_nBytesRead = 0 + + if nSize > 6 { + packet.m_packetType = C.char(*indxBytePtr(unsafe.Pointer(header), 6)) + + if nSize == 11 { + // TODO: port this + packet.m_nInfoField2 = C.DecodeInt32LE((*C.char)(incBytePtr(unsafe.Pointer(header), 7))) + } + } + } + } + extendedTimestamp = 0 + if packet.m_nTimeStamp == 0xffffff { + extendedTimestamp = 1 + } + + if extendedTimestamp != 0 { + if readN(r, (*byte)(incBytePtr(unsafe.Pointer(header), nSize)), 4) != 4 { + log.Println("RTMPRead_Packet: Failed to read extended timestamp") + return 0 + } + // TODO: port this + packet.m_nTimeStamp = amfDecodeInt32((*C.char)(incBytePtr(unsafe.Pointer(header), nSize))) + hSize += 4 + } + + if packet.m_nBodySize > 0 && packet.m_body == nil { + // TODO: port this + if C.RTMPPacket_Alloc(packet, packet.m_nBodySize) == 0 { + log.Println("RTMPRead_Packet: failed to allocate packet") + return 0 + } + didAlloc = TRUE + } +} + func rtmpPacketIsReady(p *C.RTMPPacket) int { if p.m_nBytesRead == p.m_nBodySize { return 1 diff --git a/rtmp/rtmp_c/librtmp/rtmp.c b/rtmp/rtmp_c/librtmp/rtmp.c index 59927b28..83021dbf 100644 --- a/rtmp/rtmp_c/librtmp/rtmp.c +++ b/rtmp/rtmp_c/librtmp/rtmp.c @@ -3523,7 +3523,7 @@ HandleClientBW(RTMP *r, const RTMPPacket *packet) r->m_nClientBW2); } -static int +int DecodeInt32LE(const char *data) { unsigned char *c = (unsigned char *)data; diff --git a/rtmp/rtmp_c/librtmp/rtmp.h b/rtmp/rtmp_c/librtmp/rtmp.h index 8cbf2986..dd19f0c0 100644 --- a/rtmp/rtmp_c/librtmp/rtmp.h +++ b/rtmp/rtmp_c/librtmp/rtmp.h @@ -299,6 +299,8 @@ extern "C" int HandShake(RTMP *r, int FP9HandShake); + int DecodeInt32LE(const char *data); + int SendBytesReceived(RTMP *r); int SendConnectPacket(RTMP *r, RTMPPacket *cp);