/* NAME rtmp.go DESCRIPTION See Readme.md AUTHORS Saxon Nelson-Milton Dan Kortschak Alan Noble LICENSE rtmp.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. Derived from librtmp under the GNU Lesser General Public License 2.1 Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org Copyright (C) 2008-2009 Andrej Stepanchuk Copyright (C) 2009-2010 Howard Chu */ package rtmp import ( "bytes" "encoding/binary" "errors" "math/rand" "net" "strconv" "time" ) const ( pkg = "rtmp:" minDataSize = 11 ) const ( setDataFrame = "@setDataFrame" av__checkbw = "_checkbw" av__onbwcheck = "_onbwcheck" av__onbwdone = "_onbwdone" av__result = "_result" av_app = "app" av_audioCodecs = "audioCodecs" av_capabilities = "capabilities" av_close = "close" av_code = "code" av_connect = "connect" av_createStream = "createStream" av_deleteStream = "deleteStream" av_FCPublish = "FCPublish" av_FCUnpublish = "FCUnpublish" av_flashVer = "flashVer" av_fpad = "fpad" av_level = "level" av_live = "live" av_NetConnection_Connect_InvalidApp = "NetConnection.Connect.InvalidApp" av_NetStream_Failed = "NetStream.Failed" av_NetStream_Pause_Notify = "NetStream.Pause.Notify" av_NetStream_Play_Complete = "NetStream.Play.Complete" av_NetStream_Play_Failed = "NetStream.Play.Failed" av_NetStream_Play_PublishNotify = "NetStream.Play.PublishNotify" av_NetStream_Play_Start = "NetStream.Play.Start" av_NetStream_Play_Stop = "NetStream.Play.Stop" av_NetStream_Play_StreamNotFound = "NetStream.Play.StreamNotFound" av_NetStream_Play_UnpublishNotify = "NetStream.Play.UnpublishNotify" av_NetStream_Publish_Start = "NetStream.Publish.Start" av_NetStream_Seek_Notify = "NetStream.Seek.Notify" av_nonprivate = "nonprivate" av_objectEncoding = "objectEncoding" av_onBWDone = "onBWDone" av_onFCSubscribe = "onFCSubscribe" av_onFCUnsubscribe = "onFCUnsubscribe" av_onStatus = "onStatus" av_pageUrl = "pageUrl" av_ping = "ping" av_play = "play" av_playlist_ready = "playlist_ready" av_publish = "publish" av_releaseStream = "releaseStream" av_secureToken = "secureToken" av_set_playlist = "set_playlist" av_swfUrl = "swfUrl" av_tcUrl = "tcUrl" av_type = "type" av_videoCodecs = "videoCodecs" av_videoFunction = "videoFunction" ) // RTMP protocol strings. var rtmpProtocolStrings = [...]string{ "rtmp", "rtmpt", "rtmpe", "rtmpte", "rtmps", "rtmpts", "", "", "rtmfp", } // RTMP errors. var ( errUnknownScheme = errors.New("rtmp: unknown scheme") errConnected = errors.New("rtmp: already connected") errNotConnected = errors.New("rtmp: not connected") errNotWritable = errors.New("rtmp: connection not writable") errHandshake = errors.New("rtmp: handshake failed") errConnSend = errors.New("rtmp: connection send error") errConnStream = errors.New("rtmp: connection stream error") errInvalidHeader = errors.New("rtmp: invalid header") errInvalidBody = errors.New("rtmp: invalid body") errTinyPacket = errors.New("rtmp: packet too small") errEncoding = errors.New("rtmp: encoding error") errDecoding = errors.New("rtmp: decoding error") errUnimplemented = errors.New("rtmp: unimplemented feature") ) // setupURL parses the RTMP URL. func setupURL(s *Session, addr string) (err error) { s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(addr) if err != nil { return err } if s.link.tcUrl == "" { if s.link.app != "" { s.link.tcUrl = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app s.link.lFlags |= RTMP_LF_FTCU } else { s.link.tcUrl = addr } } if s.link.port == 0 { switch { case (s.link.protocol & RTMP_FEATURE_SSL) != 0: s.link.port = 433 s.log(FatalLevel, pkg+"SSL not supported") case (s.link.protocol & RTMP_FEATURE_HTTP) != 0: s.link.port = 80 default: s.link.port = 1935 } } return nil } // connect establishes an RTMP connection. func connect(s *Session) error { addr, err := net.ResolveTCPAddr("tcp4", s.link.host+":"+strconv.Itoa(int(s.link.port))) if err != nil { return err } s.link.conn, err = net.DialTCP("tcp4", nil, addr) if err != nil { s.log(WarnLevel, pkg+"dial failed", "error", err.Error()) return err } s.log(DebugLevel, pkg+"connected") err = handshake(s) if err != nil { s.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) return errHandshake } s.log(DebugLevel, pkg+"handshaked") err = sendConnectPacket(s) if err != nil { s.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) return errConnSend } return nil } // connectStream reads a packet and handles it func connectStream(s *Session) error { var err error for !s.isPlaying { pkt := packet{} err = pkt.read(s) if err != nil { break } switch pkt.packetType { case RTMP_PACKET_TYPE_AUDIO, RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_INFO: s.log(WarnLevel, pkg+"got packet before play; ignoring", "type", pkt.packetType) default: handlePacket(s, &pkt) // ignore errors } err = handlePacket(s, &pkt) if err != nil { break } } if !s.isPlaying { return err } return nil } // handlePacket handles a packet that the client has received. // NB: cases have been commented out that are not currently used by AusOcean func handlePacket(s *Session, pkt *packet) error { switch pkt.packetType { case RTMP_PACKET_TYPE_CHUNK_SIZE: if pkt.bodySize >= 4 { s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) } case RTMP_PACKET_TYPE_BYTES_READ_REPORT: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case RTMP_PACKET_TYPE_CONTROL: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_CONTROL") case RTMP_PACKET_TYPE_SERVER_BW: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case RTMP_PACKET_TYPE_CLIENT_BW: s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) if pkt.bodySize > 4 { s.clientBW2 = pkt.body[4] } else { s.clientBW2 = 0xff } case RTMP_PACKET_TYPE_AUDIO: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_AUDIO") case RTMP_PACKET_TYPE_VIDEO: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_VIDEO") case RTMP_PACKET_TYPE_FLEX_MESSAGE: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_FLEX_MESSAGE") case RTMP_PACKET_TYPE_INFO: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_INFO") case RTMP_PACKET_TYPE_INVOKE: err := handleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { // This will never happen with the methods we implement. s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) return err } case RTMP_PACKET_TYPE_FLASH_VIDEO: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_FLASH_VIDEO") default: s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) } return nil } func sendConnectPacket(s *Session) error { var pbuf [4096]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_LARGE, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_connect) if enc == nil { return errEncoding } s.numInvokes += 1 enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_OBJECT enc = enc[1:] enc = C_AMF_EncodeNamedString(enc, av_app, s.link.app) if enc == nil { return errEncoding } if s.link.protocol&RTMP_FEATURE_WRITE != 0 { enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate) if enc == nil { return errEncoding } } if s.link.flashVer != "" { enc = C_AMF_EncodeNamedString(enc, av_flashVer, s.link.flashVer) if enc == nil { return errEncoding } } if s.link.swfUrl != "" { enc = C_AMF_EncodeNamedString(enc, av_swfUrl, s.link.swfUrl) if enc == nil { return errEncoding } } if s.link.tcUrl != "" { enc = C_AMF_EncodeNamedString(enc, av_tcUrl, s.link.tcUrl) if enc == nil { return errEncoding } } if s.link.protocol&RTMP_FEATURE_WRITE == 0 { enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_capabilities, 15) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, s.audioCodecs) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, s.videoCodecs) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_videoFunction, 1) if enc == nil { return errEncoding } if s.link.pageUrl != "" { enc = C_AMF_EncodeNamedString(enc, av_pageUrl, s.link.pageUrl) if enc == nil { return errEncoding } } } if s.encoding != 0.0 || s.sendEncoding { enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, s.encoding) if enc == nil { return errEncoding } } copy(enc, []byte{0, 0, AMF_OBJECT_END}) enc = enc[3:] // add auth string if s.link.auth != "" { enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) if enc == nil { return errEncoding } enc = C_AMF_EncodeString(enc, s.link.auth) if enc == nil { return errEncoding } } for i := range s.link.extras.o_props { enc = C_AMF_PropEncode(&s.link.extras.o_props[i], enc) if enc == nil { return errEncoding } } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return pkt.write(s, true) // response expected } func sendCreateStream(s *Session) error { var pbuf [256]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_createStream) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return pkt.write(s, true) // response expected } func sendReleaseStream(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_releaseStream) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return pkt.write(s, false) } func sendFCPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_FCPublish) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return pkt.write(s, false) } func sendFCUnpublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_FCUnpublish) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return pkt.write(s, false) } func sendPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: RTMP_CHANNEL_SOURCE, headerType: RTMP_PACKET_SIZE_LARGE, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_publish) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } enc = C_AMF_EncodeString(enc, av_live) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return pkt.write(s, true) // response expected } func sendDeleteStream(s *Session, dStreamId float64) error { var pbuf [256]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_deleteStream) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeNumber(enc, dStreamId) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) /* no response expected */ return pkt.write(s, false) } // sendBytesReceived tells the server how many bytes the client has received. func sendBytesReceived(s *Session) error { var pbuf [256]byte pkt := packet{ channel: RTMP_CHANNEL_BYTES_READ, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body s.nBytesInSent = s.nBytesIn enc = C_AMF_EncodeInt32(enc, s.nBytesIn) if enc == nil { return errEncoding } pkt.bodySize = 4 return pkt.write(s, false) } func sendCheckBW(s *Session) error { var pbuf [256]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_LARGE, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av__checkbw) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return pkt.write(s, false) } func eraseMethod(m []method, i int) []method { copy(m[i:], m[i+1:]) m[len(m)-1] = method{} return m[:len(m)-1] } // int handleInvoke handles a packet invoke request // Side effects: s.isPlaying set to true upon av_NetStream_Publish_Start func handleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody } var obj C_AMFObject nRes := C_AMF_Decode(&obj, body, 0) if nRes < 0 { return errDecoding } meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) s.log(DebugLevel, pkg+"invoking method "+meth) txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) switch meth { case av__result: var methodInvoked string for i, m := range s.methodCalls { if float64(m.num) == txn { methodInvoked = m.name s.methodCalls = eraseMethod(s.methodCalls, i) break } } if methodInvoked == "" { s.log(WarnLevel, pkg+"received result without matching request", "id", txn) goto leave } s.log(DebugLevel, pkg+"received result for "+methodInvoked) switch methodInvoked { case av_connect: if s.link.token != "" { s.log(FatalLevel, pkg+"no support for link token") } if (s.link.protocol & RTMP_FEATURE_WRITE) == 0 { return errNotWritable } err := sendReleaseStream(s) if err != nil { return err } err = sendFCPublish(s) if err != nil { return err } err = sendCreateStream(s) if err != nil { return err } case av_createStream: s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) if s.link.protocol&RTMP_FEATURE_WRITE == 0 { return errNotWritable } err := sendPublish(s) if err != nil { return err } case av_play, av_publish: s.log(FatalLevel, pkg+"unsupported method av_play/av_publish") } case av_onBWDone: if s.checkCounter == 0 { // ToDo: why is this always zero? err := sendCheckBW(s) if err != nil { return err } } case av_onFCUnsubscribe, av_onFCSubscribe: s.log(FatalLevel, pkg+"unsupported method av_onFCUnsubscribe/av_onFCSubscribe") case av_ping: s.log(FatalLevel, pkg+"unsupported method av_ping") case av__onbwcheck: s.log(FatalLevel, pkg+"unsupported method av_onbwcheck") case av__onbwdone: s.log(FatalLevel, pkg+"unsupported method av_onbwdone") case av_close: s.log(FatalLevel, pkg+"unsupported method av_close") case av_onStatus: var obj2 C_AMFObject C_AMFProp_GetObject(C_AMF_GetProp(&obj, "", 3), &obj2) code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_code, -1)) level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_level, -1)) s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) switch code { case av_NetStream_Failed, av_NetStream_Play_Failed, av_NetStream_Play_StreamNotFound, av_NetConnection_Connect_InvalidApp: s.log(FatalLevel, pkg+"unsupported method av_NetStream/av_NetStream_Play_Failed/av_netSTream_Play_StreamNotFound/av_netConnection_Connect_invalidApp") case av_NetStream_Play_Start, av_NetStream_Play_PublishNotify: s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") case av_NetStream_Publish_Start: s.log(DebugLevel, pkg+"playing") s.isPlaying = true for i, m := range s.methodCalls { if m.name == av_publish { s.methodCalls = eraseMethod(s.methodCalls, i) break } } // ToDo: handle case when av_publish method not found case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify: s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify") case av_NetStream_Seek_Notify: s.log(FatalLevel, pkg+"unsupported method av_netStream_Seek_Notify") case av_NetStream_Pause_Notify: s.log(FatalLevel, pkg+"unsupported method av_NetStream_Pause_Notify") } case av_playlist_ready: s.log(FatalLevel, pkg+"unsupported method av_playlist_ready") default: s.log(FatalLevel, pkg+"unknown method "+meth) } leave: C_AMF_Reset(&obj) return nil } func handshake(s *Session) error { var clientbuf [RTMP_SIG_SIZE + 1]byte clientsig := clientbuf[1:] var serversig [RTMP_SIG_SIZE]byte clientbuf[0] = RTMP_CHANNEL_CONTROL binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000)) copy(clientsig[4:8], []byte{0, 0, 0, 0}) for i := 8; i < RTMP_SIG_SIZE; i++ { clientsig[i] = byte(rand.Intn(256)) } err := s.write(clientbuf[:]) if err != nil { return err } s.log(DebugLevel, pkg+"handshake sent") var typ [1]byte err = s.read(typ[:]) if err != nil { return err } s.log(DebugLevel, pkg+"handshake received") if typ[0] != clientbuf[0] { s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) } err = s.read(serversig[:]) if err != nil { return err } // decode server response suptime := binary.BigEndian.Uint32(serversig[:4]) s.log(DebugLevel, pkg+"server uptime", "uptime", suptime) // 2nd part of handshake err = s.write(serversig[:]) if err != nil { return err } err = s.read(serversig[:]) if err != nil { return err } if !bytes.Equal(serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) { s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:RTMP_SIG_SIZE], "clientsig", clientbuf[1:RTMP_SIG_SIZE+1]) } return nil }