Ported HandleInvoke - need to test

This commit is contained in:
saxon 2018-08-14 18:22:37 +09:30
parent 73e2c9977a
commit da99cbe86b
1 changed files with 194 additions and 155 deletions

View File

@ -205,6 +205,36 @@ var (
av_videoFunction = AVC("videoFunction") av_videoFunction = AVC("videoFunction")
av_pageUrl = AVC("pageUrl") av_pageUrl = AVC("pageUrl")
av_objectEncoding = AVC("objectEncoding") av_objectEncoding = AVC("objectEncoding")
av__result = AVC("_result")
av_secureToken = AVC("secureToken")
av_createStream = AVC("createStream")
av_play = AVC("play")
av_publish = AVC("publish")
av_onBWDone = AVC("onBWDone")
av_onFCSubscribe = AVC("onFCSubscribe")
av_onFCUnsubscribe = AVC("onFCUnsubscribe")
av__onbwcheck = AVC("_onbwcheck")
av__onbwdone = AVC("_onbwdone")
av_ping = AVC("ping")
av__checkbw = AVC("_checkbw")
av_close = AVC("close")
av_onStatus = AVC("onStatus")
av_code = AVC("code")
av_level = AVC("level")
av_NetStream_Failed = AVC("NetStream_Failed")
av_NetStream_Play_Failed = AVC("NetStream_Play_Failed")
av_NetConnection_Connect_InvalidApp = AVC("NetConnection_Connect_InvalidApp")
av_NetStream_Play_StreamNotFound = AVC("NetStream_Play_StreamNotFound")
av_NetStream_Play_Start = AVC("NetStream_Play_Start")
av_NetStream_Play_PublishNotify = AVC("NetStream_Play_PublishNotify")
av_NetStream_Publish_Start = AVC("NetStream_Publish_Start")
av_NetStream_Play_Complete = AVC("NetStream_Play_Complete")
av_NetStream_Play_Stop = AVC("NetStream_Play_Stop")
av_NetStream_Play_UnpublishNotify = AVC("NetStream_Play_UnpublishNotify")
av_NetStream_Seek_Notify = AVC("NetStream_Play_Seek_Notify")
av_NetStream_Pause_Notify = AVC("NetStream_Pause_Notify")
av_playlist_ready = AVC("playlist_ready")
av_set_playlist = AVC("set_playlist")
) )
var packetSize = [...]int{12, 8, 4, 1} var packetSize = [...]int{12, 8, 4, 1}
@ -1301,7 +1331,7 @@ func C_ReadN(r *C.RTMP, buffer *byte, n int) int {
if nBytes == 0 { if nBytes == 0 {
log.Println("RTMP socket closed by peer") log.Println("RTMP socket closed by peer")
// RTMP_Close(r) // C.RTMP_Close(r)
break break
} }
@ -1930,6 +1960,7 @@ func C_HandleClientBW(r *C.RTMP, packet *C.RTMPPacket) {
// TODO port SendCheckBWResult // TODO port SendCheckBWResult
// TODO port AMFProp_GetObject // TODO port AMFProp_GetObject
// TODO port AV_erase // TODO port AV_erase
// TODO port AMF_Decode
func C_HandleInvoke(r *C.RTMP, body *byte, nBodySize uint32) int32 { func C_HandleInvoke(r *C.RTMP, body *byte, nBodySize uint32) int32 {
var obj C.AMFObject var obj C.AMFObject
var method C.AVal var method C.AVal
@ -1944,182 +1975,190 @@ func C_HandleInvoke(r *C.RTMP, body *byte, nBodySize uint32) int32 {
return 0 return 0
} }
// nRes = C.AMF_Decode(&obj, body, nBodySize, 0) nRes = int32(C.AMF_Decode(&obj, (*C.char)(unsafe.Pointer(body)), C.int(nBodySize), 0))
nRes = C_AMF_Decode(&obj, body, nBodySize, 0); // nRes = C_AMF_Decode(&obj, body, nBodySize, 0)
if nRes < 0 { if nRes < 0 {
// TODO use new logger here // TODO use new logger here
//RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__); //RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
return 0 return 0
} }
C.AMF_Dump(&obj); C.AMF_Dump(&obj)
C.AMFProp_GetString(C.AMF_GetProp(&obj, NULL, 0), &method); C.AMFProp_GetString(C.AMF_GetProp(&obj, nil, 0), &method)
txn = C.AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1)); txn = float32(C.AMFProp_GetNumber(C.AMF_GetProp(&obj, nil, 1)))
// TODO use new logger here // TODO use new logger here
// RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val); // RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val);
switch { switch {
case C_AVMATCH(&method, &av__result): case C_AVMATCH(&method, &av__result) != 0:
var methodInvoked C.AVal var methodInvoked C.AVal
var i int32 var i int32
for i=0; i < r.m_numCalls; i++ { for i = 0; i < int32(r.m_numCalls); i++ {
if r.m_methodCalls[i].num == (int)txn { if float32((*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i),
methodInvoked = r.m_methodCalls[i].name; int(unsafe.Sizeof(*r.m_methodCalls))))).num) == txn {
C.AV_erase(r.m_methodCalls, &r.m_numCalls, i, FALSE); methodInvoked = (*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls),
break; int(i), int(unsafe.Sizeof(*r.m_methodCalls))))).name
C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 0)
break
} }
} }
if methodInvoked.av_val == 0 { if methodInvoked.av_val == nil {
// TODO use new logger here // TODO use new logger here
//RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request", //RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request",
//__FUNCTION__, txn); //__FUNCTION__, txn);
goto leave; goto leave
} }
// TODO use new logger here // TODO use new logger here
//RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__, //RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__,
//methodInvoked.av_val); //methodInvoked.av_val);
switch { switch {
case C_AVMATCH(&methodInvoked, &av_connect): case C_AVMATCH(&methodInvoked, &av_connect) != 0:
if r.Link.token.av_len != 0 { if r.Link.token.av_len != 0 {
var p C.AMFObjectProperty var p C.AMFObjectProperty
if C.RTMP_FindFirstMatchingProperty(&obj, &av_secureToken, &p){ if C.RTMP_FindFirstMatchingProperty(&obj, &av_secureToken, &p) != 0 {
C.DecodeTEA(&r.Link.token, &p.p_vu.p_aval); C.DecodeTEA(&r.Link.token, &p.p_vu.p_aval)
C.SendSecureTokenResponse(r, &p.p_vu.p_aval); C.SendSecureTokenResponse(r, &p.p_vu.p_aval)
} }
} }
if (r.Link.protocol & RTMP_FEATURE_WRITE){ if (r.Link.protocol & RTMP_FEATURE_WRITE) != 0 {
C.SendReleaseStream(r); C.SendReleaseStream(r)
C.SendFCPublish(r); C.SendFCPublish(r)
} else { } else {
C.RTMP_SendServerBW(r); C.RTMP_SendServerBW(r)
C.RTMP_SendCtrl(r, 3, 0, 300); C.RTMP_SendCtrl(r, 3, 0, 300)
} }
C.RTMP_SendCreateStream(r); C.RTMP_SendCreateStream(r)
if (r.Link.protocol & RTMP_FEATURE_WRITE) == 0 { if (r.Link.protocol & RTMP_FEATURE_WRITE) == 0 {
/* Authenticate on Justin.tv legacy servers before sending FCSubscribe */ /* Authenticate on Justin.tv legacy servers before sending FCSubscribe */
if r.Link.usherToken.av_len != 0 { if r.Link.usherToken.av_len != 0 {
C.SendUsherToken(r, &r.Link.usherToken); C.SendUsherToken(r, &r.Link.usherToken)
} }
/* Send the FCSubscribe if live stream or if subscribepath is set */ /* Send the FCSubscribe if live stream or if subscribepath is set */
switch { switch {
case r.Link.subscribepath.av_len: case r.Link.subscribepath.av_len != 0:
C.SendFCSubscribe(r, &r.Link.subscribepath); C.SendFCSubscribe(r, &r.Link.subscribepath)
case r.Link.lFlags & RTMP_LF_LIV: case (r.Link.lFlags & RTMP_LF_LIVE) != 0:
C.SendFCSubscribe(r, &r.Link.playpath); C.SendFCSubscribe(r, &r.Link.playpath)
} }
} }
case C_AVMATCH(&methodInvoked, &av_createStream): case C_AVMATCH(&methodInvoked, &av_createStream) != 0:
r.m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3)); r.m_stream_id = C.int(C.AMFProp_GetNumber(C.AMF_GetProp(&obj, nil, 3)))
if (r.Link.protocol & RTMP_FEATURE_WRITE){ if (r.Link.protocol & RTMP_FEATURE_WRITE) != 0 {
C.SendPublish(r); C.SendPublish(r)
} else { } else {
if (r.Link.lFlags & RTMP_LF_PLST){ if (r.Link.lFlags & RTMP_LF_PLST) != 0 {
C.SendPlaylist(r); C.SendPlaylist(r)
} }
C.SendPlay(r); C.SendPlay(r)
C.RTMP_SendCtrl(r, 3, r.m_stream_id, r.m_nBufferMS); C.RTMP_SendCtrl(r, 3, C.uint(r.m_stream_id), C.uint(r.m_nBufferMS))
} }
case C_AVMATCH(&methodInvoked, &av_play) || case C_AVMATCH(&methodInvoked, &av_play) != 0 ||
C_AVMATCH(&methodInvoked, &av_publish): C_AVMATCH(&methodInvoked, &av_publish) != 0:
r.m_bPlaying = TRUE; r.m_bPlaying = 1
} }
free(methodInvoked.av_val); //free(methodInvoked.av_val)
case (C_AVMATCH(&method, &av_onBWDone)): case C_AVMATCH(&method, &av_onBWDone) != 0:
if (!r.m_nBWCheckCounter){ if r.m_nBWCheckCounter == 0 {
C.SendCheckBW(r); C.SendCheckBW(r)
} }
case C_AVMATCH(&method, &av_onFCSubscribe): case C_AVMATCH(&method, &av_onFCSubscribe) != 0:
/* SendOnFCSubscribe(); */ /* SendOnFCSubscribe(); */
case C_AVMATCH(&method, &av_onFCUnsubscribe): case C_AVMATCH(&method, &av_onFCUnsubscribe) != 0:
RTMP_Close(r); C.RTMP_Close(r)
ret = 1; ret = 1
case C_AVMATCH(&method, &av_ping): case C_AVMATCH(&method, &av_ping) != 0:
C.SendPong(r, txn); C.SendPong(r, C.double(txn))
case C_AVMATCH(&method, &av__onbwcheck): case C_AVMATCH(&method, &av__onbwcheck) != 0:
C.SendCheckBWResult(r, txn); C.SendCheckBWResult(r, C.double(txn))
case C_AVMATCH(&method, &av__onbwdone): case C_AVMATCH(&method, &av__onbwdone) != 0:
int i; var i int32
for (i = 0; i < r.m_numCalls; i++){ for i = 0; i < int32(r.m_numCalls); i++ {
if (C_AVMATCH(&r.m_methodCalls[i].name, &av__checkbw)){ if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i),
C.AV_erase(r.m_methodCalls, &r.m_numCalls, i, TRUE); int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av__checkbw) != 0 {
break; C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1)
break
} }
} }
case C_AVMATCH(&method, &av_close): case C_AVMATCH(&method, &av_close) != 0:
// TODO use new logger // TODO use new logger
//RTMP_Log(RTMP_LOGERROR, "rtmp server requested close"); //RTMP_Log(RTMP_LOGERROR, "rtmp server requested close");
RTMP_Close(r); C.RTMP_Close(r)
case C_AVMATCH(&method, &av_onStatus): case C_AVMATCH(&method, &av_onStatus) != 0:
var obj2 C.AMFObject var obj2 C.AMFObject
var code, level C.AVal var code, level C.AVal
C.AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2); C.AMFProp_GetObject(C.AMF_GetProp(&obj, nil, 3), &obj2)
AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code); C.AMFProp_GetString(C.AMF_GetProp(&obj2, &av_code, -1), &code)
AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level); C.AMFProp_GetString(C.AMF_GetProp(&obj2, &av_level, -1), &level)
// TODO use new logger // TODO use new logger
// RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val); // RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);
switch { switch {
case C_AVMATCH(&code, &av_NetStream_Failed) case C_AVMATCH(&code, &av_NetStream_Failed) != 0 ||
|| C_AVMATCH(&code, &av_NetStream_Play_Failed) C_AVMATCH(&code, &av_NetStream_Play_Failed) != 0 ||
|| C_AVMATCH(&code, &av_NetStream_Play_StreamNotFound) C_AVMATCH(&code, &av_NetStream_Play_StreamNotFound) != 0 ||
|| C_AVMATCH(&code, &av_NetConnection_Connect_InvalidApp): C_AVMATCH(&code, &av_NetConnection_Connect_InvalidApp) != 0:
r.m_stream_id = -1; r.m_stream_id = -1
RTMP_Close(r); C.RTMP_Close(r)
// TODO use new logger // TODO use new logger
// RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val); // RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val);
case C_AVMATCH(&code, &av_NetStream_Play_Start) case C_AVMATCH(&code, &av_NetStream_Play_Start) != 0 ||
|| C_AVMATCH(&code, &av_NetStream_Play_PublishNotify): C_AVMATCH(&code, &av_NetStream_Play_PublishNotify) != 0:
int i; var i int32
r.m_bPlaying = TRUE; r.m_bPlaying = 1
for (i = 0; i < r.m_numCalls; i++){ for i = 0; i < int32(r.m_numCalls); i++ {
if (C_AVMATCH(&r.m_methodCalls[i].name, &av_play)){ if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i),
C.AV_erase(r.m_methodCalls, &r.m_numCalls, i, TRUE); int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av_play) != 0 {
break; C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1)
break
} }
} }
case C_AVMATCH(&code, &av_NetStream_Publish_Start): case C_AVMATCH(&code, &av_NetStream_Publish_Start) != 0:
int i; var i int32
r.m_bPlaying = TRUE; r.m_bPlaying = 1
for (i = 0; i < r.m_numCalls; i++){ for i = 0; i < int32(r.m_numCalls); i++ {
if (C_AVMATCH(&r.m_methodCalls[i].name, &av_publish)){ if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i),
C.AV_erase(r.m_methodCalls, &r.m_numCalls, i, TRUE); int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av_publish) != 0 {
break; C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1)
break
} }
} }
case C_AVMATCH(&code, &av_NetStream_Play_Complete) case C_AVMATCH(&code, &av_NetStream_Play_Complete) != 0 ||
|| C_AVMATCH(&code, &av_NetStream_Play_Stop) C_AVMATCH(&code, &av_NetStream_Play_Stop) != 0 ||
|| C_AVMATCH(&code, &av_NetStream_Play_UnpublishNotify): C_AVMATCH(&code, &av_NetStream_Play_UnpublishNotify) != 0:
RTMP_Close(r); C.RTMP_Close(r)
ret = 1; ret = 1
case C_AVMATCH(&code, &av_NetStream_Seek_Notify): case C_AVMATCH(&code, &av_NetStream_Seek_Notify) != 0:
r.m_read.flags &= ~RTMP_READ_SEEKING; // NOTE ~ has been replace by ^ - is this correct ?
case C_AVMATCH(&code, &av_NetStream_Pause_Notify): r.m_read.flags = C.uint8_t(int8(r.m_read.flags) & ^RTMP_READ_SEEKING)
if (r.m_pausing == 1 || r.m_pausing == 2){ case C_AVMATCH(&code, &av_NetStream_Pause_Notify) != 0:
RTMP_SendPause(r, FALSE, r.m_pauseStamp); if r.m_pausing == 1 || r.m_pausing == 2 {
r.m_pausing = 3; C.RTMP_SendPause(r, 0, C.int(r.m_pauseStamp))
r.m_pausing = 3
} }
} }
case C_AVMATCH(&method, &av_playlist_ready): case C_AVMATCH(&method, &av_playlist_ready) != 0:
int i; var i int32
for (i = 0; i < r.m_numCalls; i++){ for i = 0; i < int32(r.m_numCalls); i++ {
if (C_AVMATCH(&r.m_methodCalls[i].name, &av_set_playlist)){ if C_AVMATCH(&(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(r.m_methodCalls), int(i),
C.AV_erase(r.m_methodCalls, &r.m_numCalls, i, TRUE); int(unsafe.Sizeof(*r.m_methodCalls))))).name, &av_set_playlist) != 0 {
break; C.AV_erase(r.m_methodCalls, &r.m_numCalls, C.int(i), 1)
break
} }
} }
default: default:
} }
leave: leave:
C.AMF_Reset(&obj); C.AMF_Reset(&obj)
return ret; return ret
} }
// #define AVMATCH(a1,a2) // #define AVMATCH(a1,a2)
// amf.h +63 // amf.h +63
func C_AVMATCH(a1, a2 C.AVal) int32 { func C_AVMATCH(a1, a2 *C.AVal) int32 {
if a1.av_len == a2.av_len && memcmp(unsafe.Pointer(a1.av_val), unsafe.Pointer(a2.av_val), a1.av_len) { if a1.av_len == a2.av_len && memcmp(unsafe.Pointer(a1.av_val),
unsafe.Pointer(a2.av_val), int(a1.av_len)) != 0 {
return 1 return 1
} else { } else {
return 0 return 0