diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 1693a7f1..16300f4c 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -241,20 +241,19 @@ func connectStream(s *Session) error { } // handlePacket handles a packet that the client has received. -// NB: cases have been commented out that are not currently used by AusOcean +// NB: Unsupported packet types are logged fatally. func handlePacket(s *Session, pkt *packet) error { + if pkt.bodySize < 4 { + return errInvalidBody + } + switch pkt.packetType { case packetTypeChunkSize: - if pkt.bodySize >= 4 { - s.inChunkSize = int32(amf.DecodeInt32(pkt.body[:4])) - } + s.inChunkSize = int32(amf.DecodeInt32(pkt.body[:4])) case packetTypeBytesReadReport: s.serverBW = int32(amf.DecodeInt32(pkt.body[:4])) - case packetTypeControl: - s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl") - case packetTypeServerBW: s.serverBW = int32(amf.DecodeInt32(pkt.body[:4])) @@ -266,18 +265,6 @@ func handlePacket(s *Session, pkt *packet) error { s.clientBW2 = 0xff } - case packetTypeAudio: - s.log(FatalLevel, pkg+"unsupported packet type packetTypeAudio") - - case packetTypeVideo: - s.log(FatalLevel, pkg+"unsupported packet type packetTypeVideo") - - case packetTypeFlexMessage: - s.log(FatalLevel, pkg+"unsupported packet type packetTypeFlexMessage") - - case packetTypeInfo: - s.log(FatalLevel, pkg+"unsupported packet type packetTypeInfo") - case packetTypeInvoke: err := handleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { @@ -286,8 +273,8 @@ func handlePacket(s *Session, pkt *packet) error { return err } - case packetTypeFlashVideo: - s.log(FatalLevel, pkg+"unsupported packet type packetType_FLASHVideo") + case packetTypeControl, packetTypeAudio, packetTypeVideo, packetTypeFlashVideo, packetTypeFlexMessage, packetTypeInfo: + s.log(FatalLevel, pkg+"unsupported packet type "+strconv.Itoa(int(pkt.packetType))) default: s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) @@ -315,49 +302,21 @@ func sendConnectPacket(s *Session) error { if err != nil { return err } + enc[0] = amf.TypeObject enc = enc[1:] - enc, err = amf.EncodeNamedString(enc, avApp, s.link.app) if err != nil { return err } - if s.link.protocol&featureWrite != 0 { - enc, err = amf.EncodeNamedString(enc, avType, avNonprivate) - if err != nil { - return err - } + enc, err = amf.EncodeNamedString(enc, avType, avNonprivate) + if err != nil { + return err } - if s.link.url != "" { - enc, err = amf.EncodeNamedString(enc, avTcUrl, s.link.url) - if err != nil { - return err - } + enc, err = amf.EncodeNamedString(enc, avTcUrl, s.link.url) + if err != nil { + return err } - if s.link.protocol&featureWrite == 0 { - enc, err = amf.EncodeNamedBoolean(enc, avFpad, false) - if err != nil { - return err - } - enc, err = amf.EncodeNamedNumber(enc, avCapabilities, 15) - if err != nil { - return err - } - enc, err = amf.EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs) - if err != nil { - return err - } - enc, err = amf.EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs) - if err != nil { - return err - } - enc, err = amf.EncodeNamedNumber(enc, avVideoFunction, 1) - if err != nil { - return err - } - } - - // terminate the AMF object enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd) if err != nil { return err @@ -707,8 +666,8 @@ func handleInvoke(s *Session, body []byte) error { return err } - case avPlay, avPublish: - s.log(FatalLevel, pkg+"unsupported method avPlay/avPublish") + default: + s.log(FatalLevel, pkg+"unexpected method invoked"+methodInvoked) } case avOnBWDone: @@ -719,21 +678,6 @@ func handleInvoke(s *Session, body []byte) error { } } - case avOnFCUnsubscribe, avOnFCSubscribe: - s.log(FatalLevel, pkg+"unsupported method avOnFCUnsubscribe/avOonfcsubscribe") - - case avPing: - s.log(FatalLevel, pkg+"unsupported method avPing") - - case av_onbwcheck: - s.log(FatalLevel, pkg+"unsupported method av_onbwcheck") - - case av_onbwdone: - s.log(FatalLevel, pkg+"unsupported method av_onbwdone") - - case avClose: - s.log(FatalLevel, pkg+"unsupported method avClose") - case avOnStatus: obj2, err := obj.ObjectProperty("", 3) if err != nil { @@ -749,40 +693,20 @@ func handleInvoke(s *Session, body []byte) error { } s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) - switch code { - case avNetStreamFailed, avNetStreamPlayFailed, - avNetStreamPlayStreamNotFound, avNetConnectionConnectInvalidApp: - s.log(FatalLevel, pkg+"unsupported method avNetStream/avNetStreamPlayFailed/avNetstream_play_streamnotfound/av_netConnection_Connect_invalidApp") - - case avNetStreamPlayStart, avNetStreamPlayPublishNotify: - s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayStart/avNetStreamPlayPublishNotify") - - case avNetStreamPublish_Start: - s.log(DebugLevel, pkg+"playing") - s.isPlaying = true - for i, m := range s.methodCalls { - if m.name == avPublish { - s.methodCalls = eraseMethod(s.methodCalls, i) - break - } + if code != avNetStreamPublish_Start { + s.log(ErrorLevel, pkg+"unexpected response "+code) + return errUnimplemented + } + s.log(DebugLevel, pkg+"playing") + s.isPlaying = true + for i, m := range s.methodCalls { + if m.name == avPublish { + s.methodCalls = eraseMethod(s.methodCalls, i) } - // ToDo: handle case when avPublish method not found - - case avNetStreamPlayComplete, avNetStreamPlayStop, avNetStreamPlayUnpublishNotify: - s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayComplete/avNetStreamPlayStop/avNetStreamPlayUnpublishNotify") - - case avNetStreamSeekNotify: - s.log(FatalLevel, pkg+"unsupported method avNetstream_seek_notify") - - case avNetStreamPauseNotify: - s.log(FatalLevel, pkg+"unsupported method avNetStreamPauseNotify") } - case avPlaylist_ready: - s.log(FatalLevel, pkg+"unsupported method avPlaylist_ready") - default: - s.log(FatalLevel, pkg+"unknown method "+meth) + s.log(FatalLevel, pkg+"unsuppoted method "+meth) } return nil }