/* NAME rtmp.go DESCRIPTION RTMP command functionality. AUTHORS Saxon Nelson-Milton Dan Kortschak Alan Noble LICENSE rtmp.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. Derived from librtmp under the GNU Lesser General Public License 2.1 Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org Copyright (C) 2008-2009 Andrej Stepanchuk Copyright (C) 2009-2010 Howard Chu */ package rtmp import ( "bytes" "encoding/binary" "errors" "math/rand" "net" "strconv" "time" ) const ( pkg = "rtmp:" minDataSize = 11 // ToDo: this should be the same as fullHeaderSize signatureSize = 1536 fullHeaderSize = 12 ) // Link flags. const ( linkAuth = 0x0001 // using auth param linkLive = 0x0002 // stream is live linkSWF = 0x0004 // do SWF verification - not implemented linkPlaylist = 0x0008 // send playlist before play - not implemented linkBufx = 0x0010 // toggle stream on BufferEmpty msg - not implemented ) // Protocol features. const ( featureHTTP = 0x01 // not implemented featureEncode = 0x02 // not implemented featureSSL = 0x04 // not implemented featureMFP = 0x08 // not implemented featureWrite = 0x10 // publish, not play featureHTTP2 = 0x20 // server-side RTMPT - not implemented ) // RTMP protocols. const ( protoRTMP = 0 protoRTMPE = featureEncode protoRTMPT = featureHTTP protoRTMPS = featureSSL protoRTMPTE = (featureHTTP | featureEncode) protoRTMPTS = (featureHTTP | featureSSL) protoRTMFP = featureMFP ) // RTMP tokens (lexemes). // NB: Underscores are deliberately preserved in const names where they exist in the corresponding tokens. const ( av_checkbw = "_checkbw" av_onbwcheck = "_onbwcheck" av_onbwdone = "_onbwdone" av_result = "_result" avApp = "app" avAudioCodecs = "audioCodecs" avCapabilities = "capabilities" avClose = "close" avCode = "code" avConnect = "connect" avCreatestream = "createStream" avDeletestream = "deleteStream" avFCPublish = "FCPublish" avFCUnpublish = "FCUnpublish" avFlashver = "flashVer" avFpad = "fpad" avLevel = "level" avLive = "live" avNetConnectionConnectInvalidApp = "NetConnection.Connect.InvalidApp" avNetStreamFailed = "NetStream.Failed" avNetStreamPauseNotify = "NetStream.Pause.Notify" avNetStreamPlayComplete = "NetStream.Play.Complete" avNetStreamPlayFailed = "NetStream.Play.Failed" avNetStreamPlayPublishNotify = "NetStream.Play.PublishNotify" avNetStreamPlayStart = "NetStream.Play.Start" avNetStreamPlayStop = "NetStream.Play.Stop" avNetStreamPlayStreamNotFound = "NetStream.Play.StreamNotFound" avNetStreamPlayUnpublishNotify = "NetStream.Play.UnpublishNotify" avNetStreamPublish_Start = "NetStream.Publish.Start" avNetStreamSeekNotify = "NetStream.Seek.Notify" avNonprivate = "nonprivate" avObjectEncoding = "objectEncoding" avOnBWDone = "onBWDone" avOnFCSubscribe = "onFCSubscribe" avOnFCUnsubscribe = "onFCUnsubscribe" avOnStatus = "onStatus" avPageUrl = "pageUrl" avPing = "ping" avPlay = "play" avPlaylist_ready = "playlist_ready" avPublish = "publish" avReleasestream = "releaseStream" avSecureToken = "secureToken" avSet_playlist = "set_playlist" avSwfUrl = "swfUrl" avTcUrl = "tcUrl" avType = "type" avVideoCodecs = "videoCodecs" avVideoFunction = "videoFunction" ) // RTMP protocol strings. var rtmpProtocolStrings = [...]string{ "rtmp", "rtmpt", "rtmpe", "rtmpte", "rtmps", "rtmpts", "", "", "rtmfp", } // RTMP errors. var ( errUnknownScheme = errors.New("rtmp: unknown scheme") errConnected = errors.New("rtmp: already connected") errNotConnected = errors.New("rtmp: not connected") errNotWritable = errors.New("rtmp: connection not writable") errHandshake = errors.New("rtmp: handshake failed") errConnSend = errors.New("rtmp: connection send error") errConnStream = errors.New("rtmp: connection stream error") errInvalidHeader = errors.New("rtmp: invalid header") errInvalidBody = errors.New("rtmp: invalid body") errTinyPacket = errors.New("rtmp: packet too small") errEncoding = errors.New("rtmp: encoding error") errDecoding = errors.New("rtmp: decoding error") errUnimplemented = errors.New("rtmp: unimplemented feature") ) // setupURL parses the RTMP URL. func setupURL(s *Session) (err error) { s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(s.url) if err != nil { return err } if s.link.tcUrl == "" { if s.link.app != "" { s.link.tcUrl = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app } else { s.link.tcUrl = s.url } } if s.link.port == 0 { switch { case (s.link.protocol & featureSSL) != 0: s.link.port = 433 s.log(FatalLevel, pkg+"SSL not supported") case (s.link.protocol & featureHTTP) != 0: s.link.port = 80 default: s.link.port = 1935 } } return nil } // connect establishes an RTMP connection. func connect(s *Session) error { addr, err := net.ResolveTCPAddr("tcp4", s.link.host+":"+strconv.Itoa(int(s.link.port))) if err != nil { return err } s.link.conn, err = net.DialTCP("tcp4", nil, addr) if err != nil { s.log(WarnLevel, pkg+"dial failed", "error", err.Error()) return err } s.log(DebugLevel, pkg+"connected") err = handshake(s) if err != nil { s.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) return errHandshake } s.log(DebugLevel, pkg+"handshaked") err = sendConnectPacket(s) if err != nil { s.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) return errConnSend } return nil } // connectStream reads a packet and handles it func connectStream(s *Session) error { var err error for !s.isPlaying { pkt := packet{} err = pkt.read(s) if err != nil { break } switch pkt.packetType { case packetTypeAudio, packetTypeVideo, packetTypeInfo: s.log(WarnLevel, pkg+"got packet before play; ignoring", "type", pkt.packetType) default: err = handlePacket(s, &pkt) if err != nil { break } } } if !s.isPlaying { return err } return nil } // handlePacket handles a packet that the client has received. // NB: cases have been commented out that are not currently used by AusOcean func handlePacket(s *Session, pkt *packet) error { switch pkt.packetType { case packetTypeChunkSize: if pkt.bodySize >= 4 { s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) } case packetTypeBytesReadReport: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case packetTypeControl: s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl") case packetTypeServerBW: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case packetTypeClientBW: s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) if pkt.bodySize > 4 { s.clientBW2 = pkt.body[4] } else { s.clientBW2 = 0xff } case packetTypeAudio: s.log(FatalLevel, pkg+"unsupported packet type packetTypeAudio") case packetTypeVideo: s.log(FatalLevel, pkg+"unsupported packet type packetTypeVideo") case packetTypeFlexMessage: s.log(FatalLevel, pkg+"unsupported packet type packetTypeFlexMessage") case packetTypeInfo: s.log(FatalLevel, pkg+"unsupported packet type packetTypeInfo") case packetTypeInvoke: err := handleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { // This will never happen with the methods we implement. s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) return err } case packetTypeFlashVideo: s.log(FatalLevel, pkg+"unsupported packet type packetType_FLASHVideo") default: s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) } return nil } func sendConnectPacket(s *Session) error { var pbuf [4096]byte pkt := packet{ channel: chanControl, headerType: headerSizeLarge, packetType: packetTypeInvoke, header: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body enc = C_AMF_EncodeString(enc, avConnect) if enc == nil { return errEncoding } s.numInvokes += 1 enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_OBJECT enc = enc[1:] enc = C_AMF_EncodeNamedString(enc, avApp, s.link.app) if enc == nil { return errEncoding } if s.link.protocol&featureWrite != 0 { enc = C_AMF_EncodeNamedString(enc, avType, avNonprivate) if enc == nil { return errEncoding } } if s.link.flashVer != "" { enc = C_AMF_EncodeNamedString(enc, avFlashver, s.link.flashVer) if enc == nil { return errEncoding } } if s.link.swfUrl != "" { enc = C_AMF_EncodeNamedString(enc, avSwfUrl, s.link.swfUrl) if enc == nil { return errEncoding } } if s.link.tcUrl != "" { enc = C_AMF_EncodeNamedString(enc, avTcUrl, s.link.tcUrl) if enc == nil { return errEncoding } } if s.link.protocol&featureWrite == 0 { enc = C_AMF_EncodeNamedBoolean(enc, avFpad, false) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, avCapabilities, 15) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, avVideoFunction, 1) if enc == nil { return errEncoding } if s.link.pageUrl != "" { enc = C_AMF_EncodeNamedString(enc, avPageUrl, s.link.pageUrl) if enc == nil { return errEncoding } } } if s.encoding != 0.0 || s.sendEncoding { enc = C_AMF_EncodeNamedNumber(enc, avObjectEncoding, s.encoding) if enc == nil { return errEncoding } } copy(enc, []byte{0, 0, AMF_OBJECT_END}) enc = enc[3:] // add auth string if s.link.auth != "" { enc = C_AMF_EncodeBoolean(enc, s.link.flags&linkAuth != 0) if enc == nil { return errEncoding } enc = C_AMF_EncodeString(enc, s.link.auth) if enc == nil { return errEncoding } } for i := range s.link.extras.o_props { enc = C_AMF_PropEncode(&s.link.extras.o_props[i], enc) if enc == nil { return errEncoding } } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, true) // response expected } func sendCreateStream(s *Session) error { var pbuf [256]byte pkt := packet{ channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, header: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body enc = C_AMF_EncodeString(enc, avCreatestream) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, true) // response expected } func sendReleaseStream(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, header: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body enc = C_AMF_EncodeString(enc, avReleasestream) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, false) } func sendFCPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, header: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body enc = C_AMF_EncodeString(enc, avFCPublish) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, false) } func sendFCUnpublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, header: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body enc = C_AMF_EncodeString(enc, avFCUnpublish) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, false) } func sendPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: chanSource, headerType: headerSizeLarge, packetType: packetTypeInvoke, header: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body enc = C_AMF_EncodeString(enc, avPublish) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } enc = C_AMF_EncodeString(enc, avLive) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, true) // response expected } func sendDeleteStream(s *Session, dStreamId float64) error { var pbuf [256]byte pkt := packet{ channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, header: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body enc = C_AMF_EncodeString(enc, avDeletestream) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeNumber(enc, dStreamId) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) /* no response expected */ return pkt.write(s, false) } // sendBytesReceived tells the server how many bytes the client has received. func sendBytesReceived(s *Session) error { var pbuf [256]byte pkt := packet{ channel: chanBytesRead, headerType: headerSizeMedium, packetType: packetTypeBytesReadReport, header: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body s.nBytesInSent = s.nBytesIn enc = C_AMF_EncodeInt32(enc, s.nBytesIn) if enc == nil { return errEncoding } pkt.bodySize = 4 return pkt.write(s, false) } func sendCheckBW(s *Session) error { var pbuf [256]byte pkt := packet{ channel: chanControl, headerType: headerSizeLarge, packetType: packetTypeInvoke, header: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_checkbw) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, false) } func eraseMethod(m []method, i int) []method { copy(m[i:], m[i+1:]) m[len(m)-1] = method{} return m[:len(m)-1] } // int handleInvoke handles a packet invoke request // Side effects: s.isPlaying set to true upon avNetStreamPublish_Start func handleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody } var obj C_AMFObject nRes := C_AMF_Decode(&obj, body, 0) if nRes < 0 { return errDecoding } meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) s.log(DebugLevel, pkg+"invoking method "+meth) txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) switch meth { case av_result: var methodInvoked string for i, m := range s.methodCalls { if float64(m.num) == txn { methodInvoked = m.name s.methodCalls = eraseMethod(s.methodCalls, i) break } } if methodInvoked == "" { s.log(WarnLevel, pkg+"received result without matching request", "id", txn) goto leave } s.log(DebugLevel, pkg+"received result for "+methodInvoked) switch methodInvoked { case avConnect: if s.link.token != "" { s.log(FatalLevel, pkg+"no support for link token") } if (s.link.protocol & featureWrite) == 0 { return errNotWritable } err := sendReleaseStream(s) if err != nil { return err } err = sendFCPublish(s) if err != nil { return err } err = sendCreateStream(s) if err != nil { return err } case avCreatestream: s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) if s.link.protocol&featureWrite == 0 { return errNotWritable } err := sendPublish(s) if err != nil { return err } case avPlay, avPublish: s.log(FatalLevel, pkg+"unsupported method avPlay/avPublish") } case avOnBWDone: if s.checkCounter == 0 { // ToDo: why is this always zero? err := sendCheckBW(s) if err != nil { return err } } case avOnFCUnsubscribe, avOnFCSubscribe: s.log(FatalLevel, pkg+"unsupported method avOnFCUnsubscribe/avOonfcsubscribe") case avPing: s.log(FatalLevel, pkg+"unsupported method avPing") case av_onbwcheck: s.log(FatalLevel, pkg+"unsupported method av_onbwcheck") case av_onbwdone: s.log(FatalLevel, pkg+"unsupported method av_onbwdone") case avClose: s.log(FatalLevel, pkg+"unsupported method avClose") case avOnStatus: var obj2 C_AMFObject C_AMFProp_GetObject(C_AMF_GetProp(&obj, "", 3), &obj2) code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avCode, -1)) level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avLevel, -1)) s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) switch code { case avNetStreamFailed, avNetStreamPlayFailed, avNetStreamPlayStreamNotFound, avNetConnectionConnectInvalidApp: s.log(FatalLevel, pkg+"unsupported method avNetStream/avNetStreamPlayFailed/avNetstream_play_streamnotfound/av_netConnection_Connect_invalidApp") case avNetStreamPlayStart, avNetStreamPlayPublishNotify: s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayStart/avNetStreamPlayPublishNotify") case avNetStreamPublish_Start: s.log(DebugLevel, pkg+"playing") s.isPlaying = true for i, m := range s.methodCalls { if m.name == avPublish { s.methodCalls = eraseMethod(s.methodCalls, i) break } } // ToDo: handle case when avPublish method not found case avNetStreamPlayComplete, avNetStreamPlayStop, avNetStreamPlayUnpublishNotify: s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayComplete/avNetStreamPlayStop/avNetStreamPlayUnpublishNotify") case avNetStreamSeekNotify: s.log(FatalLevel, pkg+"unsupported method avNetstream_seek_notify") case avNetStreamPauseNotify: s.log(FatalLevel, pkg+"unsupported method avNetStreamPauseNotify") } case avPlaylist_ready: s.log(FatalLevel, pkg+"unsupported method avPlaylist_ready") default: s.log(FatalLevel, pkg+"unknown method "+meth) } leave: C_AMF_Reset(&obj) return nil } func handshake(s *Session) error { var clientbuf [signatureSize + 1]byte clientsig := clientbuf[1:] var serversig [signatureSize]byte clientbuf[0] = chanControl binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000)) copy(clientsig[4:8], []byte{0, 0, 0, 0}) for i := 8; i < signatureSize; i++ { clientsig[i] = byte(rand.Intn(256)) } err := s.write(clientbuf[:]) if err != nil { return err } s.log(DebugLevel, pkg+"handshake sent") var typ [1]byte err = s.read(typ[:]) if err != nil { return err } s.log(DebugLevel, pkg+"handshake received") if typ[0] != clientbuf[0] { s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) } err = s.read(serversig[:]) if err != nil { return err } // decode server response suptime := binary.BigEndian.Uint32(serversig[:4]) s.log(DebugLevel, pkg+"server uptime", "uptime", suptime) // 2nd part of handshake err = s.write(serversig[:]) if err != nil { return err } err = s.read(serversig[:]) if err != nil { return err } if !bytes.Equal(serversig[:signatureSize], clientbuf[1:signatureSize+1]) { s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:signatureSize], "clientsig", clientbuf[1:signatureSize+1]) } return nil }