rtmp: make C_RTMPPacket.m_body a []byte

Punt on, but prepare for, the more ridiculous C-isms.
This commit is contained in:
Dan Kortschak 2018-09-19 14:31:42 +09:30
parent 467984513b
commit cb47db73c8
2 changed files with 48 additions and 42 deletions

View File

@ -219,7 +219,8 @@ func C_RTMP_GetTime() int32 {
// rtmp.c +189 // rtmp.c +189
func C_RTMPPacket_Alloc(p *C_RTMPPacket, nSize uint32) (ok bool) { func C_RTMPPacket_Alloc(p *C_RTMPPacket, nSize uint32) (ok bool) {
buf := make([]byte, RTMP_MAX_HEADER_SIZE+nSize) buf := make([]byte, RTMP_MAX_HEADER_SIZE+nSize)
p.m_body = &buf[RTMP_MAX_HEADER_SIZE] p.m_header = buf[:RTMP_MAX_HEADER_SIZE]
p.m_body = buf[RTMP_MAX_HEADER_SIZE:]
p.m_nBytesRead = 0 p.m_nBytesRead = 0
return true return true
} }
@ -561,7 +562,7 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 {
// TODO use new logger here // TODO use new logger here
//RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,packet.m_nBodySize); //RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,packet.m_nBodySize);
if C_HandleInvoke(r, pl2b(packet.m_body, int(packet.m_nBodySize))) { if C_HandleInvoke(r, packet.m_body[:packet.m_nBodySize]) {
// This will never happen with the methods we implement. // This will never happen with the methods we implement.
log.Println("HasMediaPacket") log.Println("HasMediaPacket")
bHasMediaPacket = 2 bHasMediaPacket = 2
@ -667,9 +668,10 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
m_nTimeStamp: 0, m_nTimeStamp: 0,
m_nInfoField2: 0, m_nInfoField2: 0,
m_hasAbsTimestamp: false, m_hasAbsTimestamp: false,
m_body: &pbuf[RTMP_MAX_HEADER_SIZE], m_header: pbuf[:RTMP_MAX_HEADER_SIZE],
m_body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
enc := pbuf[RTMP_MAX_HEADER_SIZE:] enc := packet.m_body
enc = C_AMF_EncodeString(enc, av_connect) enc = C_AMF_EncodeString(enc, av_connect)
@ -786,9 +788,10 @@ func C_RTMP_SendCreateStream(r *C_RTMP) (ok bool) {
m_nTimeStamp: 0, m_nTimeStamp: 0,
m_nInfoField2: 0, m_nInfoField2: 0,
m_hasAbsTimestamp: false, m_hasAbsTimestamp: false,
m_body: &pbuf[RTMP_MAX_HEADER_SIZE], m_header: pbuf[:RTMP_MAX_HEADER_SIZE],
m_body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
enc := pbuf[RTMP_MAX_HEADER_SIZE:] enc := packet.m_body
enc = C_AMF_EncodeString(enc, av_createStream) enc = C_AMF_EncodeString(enc, av_createStream)
r.m_numInvokes++ r.m_numInvokes++
@ -812,9 +815,10 @@ func C_SendReleaseStream(r *C_RTMP) (ok bool) {
m_nTimeStamp: 0, m_nTimeStamp: 0,
m_nInfoField2: 0, m_nInfoField2: 0,
m_hasAbsTimestamp: false, m_hasAbsTimestamp: false,
m_body: &pbuf[RTMP_MAX_HEADER_SIZE], m_header: pbuf[:RTMP_MAX_HEADER_SIZE],
m_body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
enc := pbuf[RTMP_MAX_HEADER_SIZE:] enc := packet.m_body
enc = C_AMF_EncodeString(enc, av_releaseStream) enc = C_AMF_EncodeString(enc, av_releaseStream)
r.m_numInvokes++ r.m_numInvokes++
@ -842,9 +846,10 @@ func C_SendFCPublish(r *C_RTMP) (ok bool) {
m_nTimeStamp: 0, m_nTimeStamp: 0,
m_nInfoField2: 0, m_nInfoField2: 0,
m_hasAbsTimestamp: false, m_hasAbsTimestamp: false,
m_body: &pbuf[RTMP_MAX_HEADER_SIZE], m_header: pbuf[:RTMP_MAX_HEADER_SIZE],
m_body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
enc := pbuf[RTMP_MAX_HEADER_SIZE:] enc := packet.m_body
enc = C_AMF_EncodeString(enc, av_FCPublish) enc = C_AMF_EncodeString(enc, av_FCPublish)
r.m_numInvokes++ r.m_numInvokes++
@ -872,9 +877,10 @@ func C_SendFCUnpublish(r *C_RTMP) (ok bool) {
m_nTimeStamp: 0, m_nTimeStamp: 0,
m_nInfoField2: 0, m_nInfoField2: 0,
m_hasAbsTimestamp: false, m_hasAbsTimestamp: false,
m_body: &pbuf[RTMP_MAX_HEADER_SIZE], m_header: pbuf[:RTMP_MAX_HEADER_SIZE],
m_body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
enc := pbuf[RTMP_MAX_HEADER_SIZE:] enc := packet.m_body
enc = C_AMF_EncodeString(enc, av_FCUnpublish) enc = C_AMF_EncodeString(enc, av_FCUnpublish)
r.m_numInvokes++ r.m_numInvokes++
@ -903,9 +909,10 @@ func C_SendPublish(r *C_RTMP) (ok bool) {
m_nTimeStamp: 0, m_nTimeStamp: 0,
m_nInfoField2: r.m_stream_id, m_nInfoField2: r.m_stream_id,
m_hasAbsTimestamp: false, m_hasAbsTimestamp: false,
m_body: &pbuf[RTMP_MAX_HEADER_SIZE], m_header: pbuf[:RTMP_MAX_HEADER_SIZE],
m_body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
enc := pbuf[RTMP_MAX_HEADER_SIZE:] enc := packet.m_body
enc = C_AMF_EncodeString(enc, av_publish) enc = C_AMF_EncodeString(enc, av_publish)
r.m_numInvokes++ r.m_numInvokes++
@ -940,9 +947,10 @@ func C_SendDeleteStream(r *C_RTMP, dStreamId float64) (ok bool) {
m_nTimeStamp: 0, m_nTimeStamp: 0,
m_nInfoField2: 0, m_nInfoField2: 0,
m_hasAbsTimestamp: false, m_hasAbsTimestamp: false,
m_body: &pbuf[RTMP_MAX_HEADER_SIZE], m_header: pbuf[:RTMP_MAX_HEADER_SIZE],
m_body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
enc := pbuf[RTMP_MAX_HEADER_SIZE:] enc := packet.m_body
enc = C_AMF_EncodeString(enc, av_deleteStream) enc = C_AMF_EncodeString(enc, av_deleteStream)
r.m_numInvokes++ r.m_numInvokes++
@ -968,9 +976,10 @@ func C_SendBytesReceived(r *C_RTMP) (ok bool) {
m_nTimeStamp: 0, m_nTimeStamp: 0,
m_nInfoField2: 0, m_nInfoField2: 0,
m_hasAbsTimestamp: false, m_hasAbsTimestamp: false,
m_body: &pbuf[RTMP_MAX_HEADER_SIZE], m_header: pbuf[:RTMP_MAX_HEADER_SIZE],
m_body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
enc := pbuf[RTMP_MAX_HEADER_SIZE:] enc := packet.m_body
r.m_nBytesInSent = r.m_nBytesIn r.m_nBytesInSent = r.m_nBytesIn
C_AMF_EncodeInt32(enc, r.m_nBytesIn) C_AMF_EncodeInt32(enc, r.m_nBytesIn)
@ -991,9 +1000,10 @@ func C_SendCheckBW(r *C_RTMP) (ok bool) {
m_nTimeStamp: 0, m_nTimeStamp: 0,
m_nInfoField2: 0, m_nInfoField2: 0,
m_hasAbsTimestamp: false, m_hasAbsTimestamp: false,
m_body: &pbuf[RTMP_MAX_HEADER_SIZE], m_header: pbuf[:RTMP_MAX_HEADER_SIZE],
m_body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
enc := pbuf[RTMP_MAX_HEADER_SIZE:] enc := packet.m_body
enc = C_AMF_EncodeString(enc, av__checkbw) enc = C_AMF_EncodeString(enc, av__checkbw)
r.m_numInvokes++ r.m_numInvokes++
@ -1165,7 +1175,7 @@ leave:
func C_HandleChangeChunkSize(r *C_RTMP, packet *C_RTMPPacket) { func C_HandleChangeChunkSize(r *C_RTMP, packet *C_RTMPPacket) {
if packet.m_nBodySize >= 4 { if packet.m_nBodySize >= 4 {
//r.m_inChunkSize = int32(C.AMF_DecodeInt32((*byte)(unsafe.Pointer(packet.m_body)))) //r.m_inChunkSize = int32(C.AMF_DecodeInt32((*byte)(unsafe.Pointer(packet.m_body))))
r.m_inChunkSize = int32(C_AMF_DecodeInt32((*[_Gi]byte)(unsafe.Pointer(packet.m_body))[:4])) r.m_inChunkSize = int32(C_AMF_DecodeInt32(packet.m_body[:4]))
// TODO use new logger here // TODO use new logger here
// RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__, r.m_inChunkSize); // RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__, r.m_inChunkSize);
} }
@ -1174,7 +1184,7 @@ func C_HandleChangeChunkSize(r *C_RTMP, packet *C_RTMPPacket) {
// void HandleServerBW(RTMP* r, const RTMPPacket* packet); // void HandleServerBW(RTMP* r, const RTMPPacket* packet);
// rtmp.c +3508 // rtmp.c +3508
func C_HandlServerBW(r *C_RTMP, packet *C_RTMPPacket) { func C_HandlServerBW(r *C_RTMP, packet *C_RTMPPacket) {
r.m_nServerBW = int32(C_AMF_DecodeInt32((*[_Gi]byte)(unsafe.Pointer(packet.m_body))[:4])) r.m_nServerBW = int32(C_AMF_DecodeInt32(packet.m_body[:4]))
// TODO use new logger here // TODO use new logger here
// RTMP_Log(RTMP_LOGDEBUG, "%s: server BW = %d", __FUNCTION__, r.m_nServerBW); // RTMP_Log(RTMP_LOGDEBUG, "%s: server BW = %d", __FUNCTION__, r.m_nServerBW);
} }
@ -1182,11 +1192,11 @@ func C_HandlServerBW(r *C_RTMP, packet *C_RTMPPacket) {
// void HandleClientBW(RTMP* r, const RTMPPacket* packet); // void HandleClientBW(RTMP* r, const RTMPPacket* packet);
// rtmp.c +3515 // rtmp.c +3515
func C_HandleClientBW(r *C_RTMP, packet *C_RTMPPacket) { func C_HandleClientBW(r *C_RTMP, packet *C_RTMPPacket) {
r.m_nClientBW = int32(C_AMF_DecodeInt32((*[_Gi]byte)(unsafe.Pointer(packet.m_body))[:4])) r.m_nClientBW = int32(C_AMF_DecodeInt32(packet.m_body[:4]))
//r.m_nClientBW = int32(C.AMF_DecodeInt32((*byte)(unsafe.Pointer(packet.m_body)))) //r.m_nClientBW = int32(C.AMF_DecodeInt32((*byte)(unsafe.Pointer(packet.m_body))))
if packet.m_nBodySize > 4 { if packet.m_nBodySize > 4 {
r.m_nClientBW2 = (*[_Gi]byte)(unsafe.Pointer(packet.m_body))[4] r.m_nClientBW2 = packet.m_body[4]
} else { } else {
r.m_nClientBW2 = 0xff r.m_nClientBW2 = 0xff
} }
@ -1341,10 +1351,10 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) {
if packet.m_chunk != nil { if packet.m_chunk != nil {
packet.m_chunk.c_headerSize = hSize packet.m_chunk.c_headerSize = hSize
copy(packet.m_chunk.c_header[:], hbuf[:hSize]) copy(packet.m_chunk.c_header[:], hbuf[:hSize])
packet.m_chunk.c_chunk = (*[_Gi]byte)(unsafe.Pointer(packet.m_body))[packet.m_nBytesRead : packet.m_nBytesRead+uint32(nChunk)] packet.m_chunk.c_chunk = packet.m_body[packet.m_nBytesRead : packet.m_nBytesRead+uint32(nChunk)]
} }
if C_ReadN(r, pl2b((*byte)(incBytePtr(unsafe.Pointer(packet.m_body), int(packet.m_nBytesRead))), int(nChunk))) != int(nChunk) { if C_ReadN(r, packet.m_body[packet.m_nBytesRead:][:nChunk]) != int(nChunk) {
log.Println("C_RTMP_ReadPacket: failed to read RTMP packet body") log.Println("C_RTMP_ReadPacket: failed to read RTMP packet body")
return false return false
} }
@ -1499,8 +1509,8 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) {
t = uint32(int(packet.m_nTimeStamp) - last) t = uint32(int(packet.m_nTimeStamp) - last)
if packet.m_body != nil { if packet.m_body != nil {
header = decBytePtr(unsafe.Pointer(packet.m_body), nSize) header = decBytePtr(unsafe.Pointer(&packet.m_body[0]), nSize)
hend = unsafe.Pointer(packet.m_body) hend = unsafe.Pointer(&packet.m_body[0])
} else { } else {
header = incBytePtr(hbuf, 6) header = incBytePtr(hbuf, 6)
// TODO: be cautious about this sizeof - make sure it works how you think it // TODO: be cautious about this sizeof - make sure it works how you think it
@ -1573,7 +1583,7 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) {
} }
nSize = int(packet.m_nBodySize) nSize = int(packet.m_nBodySize)
buffer = unsafe.Pointer(packet.m_body) buffer = unsafe.Pointer(&packet.m_body[0])
nChunkSize := int(r.m_outChunkSize) nChunkSize := int(r.m_outChunkSize)
if debugMode { if debugMode {
@ -1642,18 +1652,16 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) {
// We invoked a remote method // We invoked a remote method
// TODO: port the const // TODO: port the const
if packet.m_packetType == RTMP_PACKET_TYPE_INVOKE { if packet.m_packetType == RTMP_PACKET_TYPE_INVOKE {
// TODO: port C_AVal buf := packet.m_body[1:]
var ptr unsafe.Pointer method := C_AMF_DecodeString(buf)
ptr = incBytePtr(unsafe.Pointer(packet.m_body), 1)
method := C_AMF_DecodeString((*[_Gi]byte)(unsafe.Pointer(ptr))[:2])
if debugMode { if debugMode {
log.Printf("invoking %v", method) log.Printf("invoking %v", method)
} }
// keep it in call queue till result arrives // keep it in call queue till result arrives
if queue != 0 { if queue != 0 {
ptr = incBytePtr(ptr, 3+len(method)) buf = buf[3+len(method):]
txn := int32(C_AMF_DecodeNumber((*[_Gi]byte)(unsafe.Pointer(ptr))[:8])) txn := int32(C_AMF_DecodeNumber(buf[:8]))
r.m_methodCalls = append(r.m_methodCalls, C_RTMP_METHOD{name: method, num: txn}) r.m_methodCalls = append(r.m_methodCalls, C_RTMP_METHOD{name: method, num: txn})
} }
} }
@ -1818,17 +1826,14 @@ func C_RTMP_Write(r *C_RTMP, buf []byte) int {
return 0 return 0
} }
enc = (*[_Gi]byte)(unsafe.Pointer(pkt.m_body))[:pkt.m_nBodySize] enc = pkt.m_body[:pkt.m_nBodySize]
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO { if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
enc = C_AMF_EncodeString(enc, setDataFrame) enc = C_AMF_EncodeString(enc, setDataFrame)
pkt.m_nBytesRead = uint32(len(pkt.m_body) - len(enc))
// TODO: work out what to do with this
pkt.m_nBytesRead = uint32(float64(uintptr(unsafe.Pointer(&enc[0])) -
uintptr(unsafe.Pointer(pkt.m_body))))
} }
} else { } else {
enc = ((*[_Gi]byte)(unsafe.Pointer(pkt.m_body))[:pkt.m_nBodySize])[pkt.m_nBytesRead:] enc = pkt.m_body[:pkt.m_nBodySize][pkt.m_nBytesRead:]
} }
num = int(pkt.m_nBodySize - pkt.m_nBytesRead) num = int(pkt.m_nBodySize - pkt.m_nBytesRead)
if num > len(buf) { if num > len(buf) {

View File

@ -130,7 +130,8 @@ type C_RTMPPacket struct {
m_nBodySize uint32 m_nBodySize uint32
m_nBytesRead uint32 m_nBytesRead uint32
m_chunk *C_RTMPChunk m_chunk *C_RTMPChunk
m_body *byte m_header []byte
m_body []byte
} }
// typedef struct RTMPSockBuf // typedef struct RTMPSockBuf