diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 28a86a60..268d9371 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -221,18 +221,18 @@ var ( av_onStatus = AVC("onStatus") av_code = AVC("code") av_level = AVC("level") - av_NetStream_Failed = AVC("NetStream_Failed") - av_NetStream_Play_Failed = AVC("NetStream_Play_Failed") - av_NetConnection_Connect_InvalidApp = AVC("NetConnection_Connect_InvalidApp") - av_NetStream_Play_StreamNotFound = AVC("NetStream_Play_StreamNotFound") - av_NetStream_Play_Start = AVC("NetStream_Play_Start") - av_NetStream_Play_PublishNotify = AVC("NetStream_Play_PublishNotify") - av_NetStream_Publish_Start = AVC("NetStream_Publish_Start") - av_NetStream_Play_Complete = AVC("NetStream_Play_Complete") - av_NetStream_Play_Stop = AVC("NetStream_Play_Stop") - av_NetStream_Play_UnpublishNotify = AVC("NetStream_Play_UnpublishNotify") - av_NetStream_Seek_Notify = AVC("NetStream_Play_Seek_Notify") - av_NetStream_Pause_Notify = AVC("NetStream_Pause_Notify") + av_NetStream_Failed = AVC("NetStream.Failed") + av_NetStream_Play_Failed = AVC("NetStream.Play.Failed") + av_NetConnection_Connect_InvalidApp = AVC("NetConnection.Connect.InvalidApp") + av_NetStream_Play_StreamNotFound = AVC("NetStream.Play.StreamNotFound") + av_NetStream_Play_Start = AVC("NetStream.Play.Start") + av_NetStream_Play_PublishNotify = AVC("NetStream.Play.PublishNotify") + av_NetStream_Publish_Start = AVC("NetStream.Publish.Start") + av_NetStream_Play_Complete = AVC("NetStream.Play.Complete") + av_NetStream_Play_Stop = AVC("NetStream.Play.Stop") + av_NetStream_Play_UnpublishNotify = AVC("NetStream.Play.UnpublishNotify") + av_NetStream_Seek_Notify = AVC("NetStream.Seek.Notify") + av_NetStream_Pause_Notify = AVC("NetStream.Pause.Notify") av_playlist_ready = AVC("playlist_ready") av_set_playlist = AVC("set_playlist") ) @@ -1847,8 +1847,9 @@ func C_RTMP_ClientPacket(r *C.RTMP, packet *C.RTMPPacket) int32 { // TODO use new logger here //RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,packet.m_nBodySize); - if C.HandleInvoke(r, packet.m_body, C.uint(packet.m_nBodySize)) == 1 { - //if C_HandleInvoke(r, (*byte)(unsafe.Pointer(packet.m_body), uint32(packet.m_nBodySize)) == 1 { + //if C.HandleInvoke(r, packet.m_body, C.uint(packet.m_nBodySize)) == 1 { + if C_HandleInvoke(r, (*byte)(unsafe.Pointer(packet.m_body)), uint32(packet.m_nBodySize)) == 1 { + log.Println("HasMediaPacket") bHasMediaPacket = 2 } /* @@ -1964,11 +1965,12 @@ func C_HandleClientBW(r *C.RTMP, packet *C.RTMPPacket) { func C_HandleInvoke(r *C.RTMP, body *byte, nBodySize uint32) int32 { var obj C.AMFObject var method C.AVal - var txn float32 + var txn float64 var ret int32 = 0 var nRes int32 if *body != 0x02 { + log.Println("here") // TODO use new logger here //RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet", //__FUNCTION__); @@ -1978,6 +1980,7 @@ func C_HandleInvoke(r *C.RTMP, body *byte, nBodySize uint32) int32 { nRes = int32(C.AMF_Decode(&obj, (*C.char)(unsafe.Pointer(body)), C.int(nBodySize), 0)) // nRes = C_AMF_Decode(&obj, body, nBodySize, 0) if nRes < 0 { + log.Println("here2") // TODO use new logger here //RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__); return 0 @@ -1985,169 +1988,234 @@ func C_HandleInvoke(r *C.RTMP, body *byte, nBodySize uint32) int32 { C.AMF_Dump(&obj) C.AMFProp_GetString(C.AMF_GetProp(&obj, nil, 0), &method) - txn = float32(C.AMFProp_GetNumber(C.AMF_GetProp(&obj, nil, 1))) + txn = float64(C.AMFProp_GetNumber(C.AMF_GetProp(&obj, nil, 1))) // TODO use new logger here // RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val); switch { case C_AVMATCH(&method, &av__result) != 0: - var methodInvoked C.AVal - var i int32 - for i = 0; i < int32(r.m_numCalls); i++ { - if float32((*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i), - int(unsafe.Sizeof(*r.m_methodCalls))))).num) == txn { - methodInvoked = (*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), - int(i), int(unsafe.Sizeof(*r.m_methodCalls))))).name - C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 0) - break + { + log.Println("1") + var methodInvoked C.AVal + var i int32 + for i = 0; i < int32(r.m_numCalls); i++ { + if float64((*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i), + int(unsafe.Sizeof(*r.m_methodCalls))))).num) == txn { + methodInvoked = (*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), + int(i), int(unsafe.Sizeof(*r.m_methodCalls))))).name + C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 0) + break + } + } + if methodInvoked.av_val == nil { + // TODO use new logger here + //RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request", + //__FUNCTION__, txn); + goto leave } - } - if methodInvoked.av_val == nil { // TODO use new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request", - //__FUNCTION__, txn); - goto leave - } - // TODO use new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__, - //methodInvoked.av_val); - switch { - case C_AVMATCH(&methodInvoked, &av_connect) != 0: - if r.Link.token.av_len != 0 { - var p C.AMFObjectProperty - if C.RTMP_FindFirstMatchingProperty(&obj, &av_secureToken, &p) != 0 { - C.DecodeTEA(&r.Link.token, &p.p_vu.p_aval) - C.SendSecureTokenResponse(r, &p.p_vu.p_aval) - } - } - if (r.Link.protocol & RTMP_FEATURE_WRITE) != 0 { - C.SendReleaseStream(r) - C.SendFCPublish(r) - } else { - C.RTMP_SendServerBW(r) - C.RTMP_SendCtrl(r, 3, 0, 300) - } - C.RTMP_SendCreateStream(r) + //RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__, + //methodInvoked.av_val); + switch { + case C_AVMATCH(&methodInvoked, &av_connect) != 0: + { + log.Println("2") + if r.Link.token.av_len != 0 { + var p C.AMFObjectProperty + if C.RTMP_FindFirstMatchingProperty(&obj, &av_secureToken, &p) != 0 { + C.DecodeTEA(&r.Link.token, &p.p_vu.p_aval) + C.SendSecureTokenResponse(r, &p.p_vu.p_aval) + } + } + if (r.Link.protocol & RTMP_FEATURE_WRITE) != 0 { + C.SendReleaseStream(r) + C.SendFCPublish(r) + } else { + C.RTMP_SendServerBW(r) + C.RTMP_SendCtrl(r, 3, 0, 300) + } + C.RTMP_SendCreateStream(r) - if (r.Link.protocol & RTMP_FEATURE_WRITE) == 0 { - /* Authenticate on Justin.tv legacy servers before sending FCSubscribe */ - if r.Link.usherToken.av_len != 0 { - C.SendUsherToken(r, &r.Link.usherToken) + if (r.Link.protocol & RTMP_FEATURE_WRITE) == 0 { + /* Authenticate on Justin.tv legacy servers before sending FCSubscribe */ + if r.Link.usherToken.av_len != 0 { + C.SendUsherToken(r, &r.Link.usherToken) + } + /* Send the FCSubscribe if live stream or if subscribepath is set */ + switch { + case r.Link.subscribepath.av_len != 0: + { + log.Println("3") + C.SendFCSubscribe(r, &r.Link.subscribepath) + } + case (r.Link.lFlags & RTMP_LF_LIVE) != 0: + { + log.Println("4") + C.SendFCSubscribe(r, &r.Link.playpath) + } + } + } } - /* Send the FCSubscribe if live stream or if subscribepath is set */ - switch { - case r.Link.subscribepath.av_len != 0: - C.SendFCSubscribe(r, &r.Link.subscribepath) - case (r.Link.lFlags & RTMP_LF_LIVE) != 0: - C.SendFCSubscribe(r, &r.Link.playpath) - } - } - case C_AVMATCH(&methodInvoked, &av_createStream) != 0: - r.m_stream_id = C.int(C.AMFProp_GetNumber(C.AMF_GetProp(&obj, nil, 3))) + case C_AVMATCH(&methodInvoked, &av_createStream) != 0: + { + log.Println("5") + r.m_stream_id = C.int(C.AMFProp_GetNumber(C.AMF_GetProp(&obj, nil, 3))) - if (r.Link.protocol & RTMP_FEATURE_WRITE) != 0 { - C.SendPublish(r) - } else { - if (r.Link.lFlags & RTMP_LF_PLST) != 0 { - C.SendPlaylist(r) + if (r.Link.protocol & RTMP_FEATURE_WRITE) != 0 { + C.SendPublish(r) + } else { + if (r.Link.lFlags & RTMP_LF_PLST) != 0 { + C.SendPlaylist(r) + } + C.SendPlay(r) + C.RTMP_SendCtrl(r, 3, C.uint(r.m_stream_id), C.uint(r.m_nBufferMS)) + } + } + case C_AVMATCH(&methodInvoked, &av_play) != 0 || + C_AVMATCH(&methodInvoked, &av_publish) != 0: + { + log.Println("6") + r.m_bPlaying = 1 } - C.SendPlay(r) - C.RTMP_SendCtrl(r, 3, C.uint(r.m_stream_id), C.uint(r.m_nBufferMS)) } - case C_AVMATCH(&methodInvoked, &av_play) != 0 || - C_AVMATCH(&methodInvoked, &av_publish) != 0: - r.m_bPlaying = 1 + C.free(unsafe.Pointer(methodInvoked.av_val)) } - //free(methodInvoked.av_val) case C_AVMATCH(&method, &av_onBWDone) != 0: - if r.m_nBWCheckCounter == 0 { - C.SendCheckBW(r) + { + log.Println("7") + if r.m_nBWCheckCounter == 0 { + C.SendCheckBW(r) + } } case C_AVMATCH(&method, &av_onFCSubscribe) != 0: /* SendOnFCSubscribe(); */ case C_AVMATCH(&method, &av_onFCUnsubscribe) != 0: - C.RTMP_Close(r) - ret = 1 + { + log.Println("8") + C.RTMP_Close(r) + ret = 1 + } case C_AVMATCH(&method, &av_ping) != 0: - C.SendPong(r, C.double(txn)) + { + log.Println("9") + C.SendPong(r, C.double(txn)) + } case C_AVMATCH(&method, &av__onbwcheck) != 0: - C.SendCheckBWResult(r, C.double(txn)) + { + log.Println("10") + C.SendCheckBWResult(r, C.double(txn)) + } case C_AVMATCH(&method, &av__onbwdone) != 0: - var i int32 - for i = 0; i < int32(r.m_numCalls); i++ { - if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i), - int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av__checkbw) != 0 { - C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1) - break + { + log.Println("11") + var i int32 + for i = 0; i < int32(r.m_numCalls); i++ { + if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i), + int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av__checkbw) != 0 { + C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1) + break + } } } case C_AVMATCH(&method, &av_close) != 0: - // TODO use new logger - //RTMP_Log(RTMP_LOGERROR, "rtmp server requested close"); - C.RTMP_Close(r) - case C_AVMATCH(&method, &av_onStatus) != 0: - var obj2 C.AMFObject - var code, level C.AVal - C.AMFProp_GetObject(C.AMF_GetProp(&obj, nil, 3), &obj2) - C.AMFProp_GetString(C.AMF_GetProp(&obj2, &av_code, -1), &code) - C.AMFProp_GetString(C.AMF_GetProp(&obj2, &av_level, -1), &level) - - // TODO use new logger - // RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val); - switch { - case C_AVMATCH(&code, &av_NetStream_Failed) != 0 || - C_AVMATCH(&code, &av_NetStream_Play_Failed) != 0 || - C_AVMATCH(&code, &av_NetStream_Play_StreamNotFound) != 0 || - C_AVMATCH(&code, &av_NetConnection_Connect_InvalidApp) != 0: - r.m_stream_id = -1 - C.RTMP_Close(r) + { + log.Println("12") // TODO use new logger - // RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val); - case C_AVMATCH(&code, &av_NetStream_Play_Start) != 0 || - C_AVMATCH(&code, &av_NetStream_Play_PublishNotify) != 0: - var i int32 - r.m_bPlaying = 1 - for i = 0; i < int32(r.m_numCalls); i++ { - if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i), - int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av_play) != 0 { - C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1) - break - } - } - case C_AVMATCH(&code, &av_NetStream_Publish_Start) != 0: - var i int32 - r.m_bPlaying = 1 - for i = 0; i < int32(r.m_numCalls); i++ { - if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i), - int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av_publish) != 0 { - C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1) - break - } - } - case C_AVMATCH(&code, &av_NetStream_Play_Complete) != 0 || - C_AVMATCH(&code, &av_NetStream_Play_Stop) != 0 || - C_AVMATCH(&code, &av_NetStream_Play_UnpublishNotify) != 0: + //RTMP_Log(RTMP_LOGERROR, "rtmp server requested close"); C.RTMP_Close(r) - ret = 1 - case C_AVMATCH(&code, &av_NetStream_Seek_Notify) != 0: - // NOTE ~ has been replace by ^ - is this correct ? - r.m_read.flags = C.uint8_t(int8(r.m_read.flags) & ^RTMP_READ_SEEKING) - case C_AVMATCH(&code, &av_NetStream_Pause_Notify) != 0: - if r.m_pausing == 1 || r.m_pausing == 2 { - C.RTMP_SendPause(r, 0, C.int(r.m_pauseStamp)) - r.m_pausing = 3 + } + case C_AVMATCH(&method, &av_onStatus) != 0: + { + log.Println("13") + var obj2 C.AMFObject + log.Println("13.1") + var code, level C.AVal + log.Println("13.2") + C.AMFProp_GetObject(C.AMF_GetProp(&obj, nil, 3), &obj2) + log.Println("13.3") + C.AMFProp_GetString(C.AMF_GetProp(&obj2, &av_code, -1), &code) + log.Println("13.4") + C.AMFProp_GetString(C.AMF_GetProp(&obj2, &av_level, -1), &level) + + // TODO use new logger + // RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val); + switch { + case C_AVMATCH(&code, &av_NetStream_Failed) != 0 || + C_AVMATCH(&code, &av_NetStream_Play_Failed) != 0 || + C_AVMATCH(&code, &av_NetStream_Play_StreamNotFound) != 0 || + C_AVMATCH(&code, &av_NetConnection_Connect_InvalidApp) != 0: + { + log.Println("14") + r.m_stream_id = -1 + C.RTMP_Close(r) + // TODO use new logger + // RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val); + } + case C_AVMATCH(&code, &av_NetStream_Play_Start) != 0 || + C_AVMATCH(&code, &av_NetStream_Play_PublishNotify) != 0: + { + log.Println("15") + var i int32 + r.m_bPlaying = 1 + for i = 0; i < int32(r.m_numCalls); i++ { + if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i), + int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av_play) != 0 { + C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1) + break + } + } + } + case C_AVMATCH(&code, &av_NetStream_Publish_Start) != 0: + { + log.Println("16") + var i int32 + r.m_bPlaying = 1 + for i = 0; i < int32(r.m_numCalls); i++ { + if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i), + int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av_publish) != 0 { + C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1) + break + } + } + } + case C_AVMATCH(&code, &av_NetStream_Play_Complete) != 0 || + C_AVMATCH(&code, &av_NetStream_Play_Stop) != 0 || + C_AVMATCH(&code, &av_NetStream_Play_UnpublishNotify) != 0: + { + log.Println("17") + C.RTMP_Close(r) + ret = 1 + } + case C_AVMATCH(&code, &av_NetStream_Seek_Notify) != 0: + { + log.Println("18") + // NOTE ~ has been replace by ^ - is this correct ? + r.m_read.flags = C.uint8_t(int8(r.m_read.flags) & ^RTMP_READ_SEEKING) + } + case C_AVMATCH(&code, &av_NetStream_Pause_Notify) != 0: + { + log.Println("19") + if r.m_pausing == 1 || r.m_pausing == 2 { + C.RTMP_SendPause(r, 0, C.int(r.m_pauseStamp)) + r.m_pausing = 3 + } + } } } case C_AVMATCH(&method, &av_playlist_ready) != 0: - var i int32 - for i = 0; i < int32(r.m_numCalls); i++ { - if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i), - int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av_set_playlist) != 0 { - C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1) - break + { + log.Println("19") + var i int32 + for i = 0; i < int32(r.m_numCalls); i++ { + if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i), + int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av_set_playlist) != 0 { + C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1) + break + } } } default: + log.Println("20") } leave: C.AMF_Reset(&obj) @@ -2158,7 +2226,7 @@ leave: // amf.h +63 func C_AVMATCH(a1, a2 *C.AVal) int32 { if a1.av_len == a2.av_len && memcmp(unsafe.Pointer(a1.av_val), - unsafe.Pointer(a2.av_val), int(a1.av_len)) != 0 { + unsafe.Pointer(a2.av_val), int(a1.av_len)) == 0 { return 1 } else { return 0