diff --git a/rtmp/packet.go b/rtmp/packet.go index 6d33731c..dac3d803 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -3,7 +3,7 @@ NAME packet.go DESCRIPTION - See Readme.md + RTMP packet functionality. AUTHORS Saxon Nelson-Milton @@ -35,37 +35,41 @@ LICENSE package rtmp import ( - "io" "encoding/binary" + "io" ) +// Packet types. const ( - RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01 - RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03 - RTMP_PACKET_TYPE_CONTROL = 0x04 - RTMP_PACKET_TYPE_SERVER_BW = 0x05 - RTMP_PACKET_TYPE_CLIENT_BW = 0x06 - RTMP_PACKET_TYPE_AUDIO = 0x08 - RTMP_PACKET_TYPE_VIDEO = 0x09 - RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F - RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10 - RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11 - RTMP_PACKET_TYPE_INFO = 0x12 - RTMP_PACKET_TYPE_INVOKE = 0x14 - RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16 + packetTypeChunkSize = 0x01 + packetTypeBytesReadReport = 0x03 + packetTypeControl = 0x04 + packetTypeServerBW = 0x05 + packetTypeClientBW = 0x06 + packetTypeAudio = 0x08 + packetTypeVideo = 0x09 + packetTypeFlexStreamSend = 0x0F // not implemented + packetTypeFlexSharedObject = 0x10 // not implemented + packetTypeFlexMessage = 0x11 // not implemented + packetTypeInfo = 0x12 + packetTypeInvoke = 0x14 + packetTypeFlashVideo = 0x16 // not implemented ) +// Header sizes. const ( - RTMP_PACKET_SIZE_LARGE = 0 - RTMP_PACKET_SIZE_MEDIUM = 1 - RTMP_PACKET_SIZE_SMALL = 2 - RTMP_PACKET_SIZE_MINIMUM = 3 + headerSizeLarge = 0 + headerSizeMedium = 1 + headerSizeSmall = 2 + headerSizeMinimum = 3 + headerSizeAuto = 4 ) +// Special channels. const ( - RTMP_CHANNEL_BYTES_READ = 0x02 - RTMP_CHANNEL_CONTROL = 0x03 - RTMP_CHANNEL_SOURCE = 0x04 + chanBytesRead = 0x02 + chanControl = 0x03 + chanSource = 0x04 ) // headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively: @@ -94,16 +98,15 @@ type packet struct { type chunk struct { headerSize int32 data []byte - header [RTMP_MAX_HEADER_SIZE]byte + header [fullHeaderSize]byte } -// ToDo: Consider making the following functions into methods. -// readPacket reads a packet. -func readPacket(s *Session, pkt *packet) error { - var hbuf [RTMP_MAX_HEADER_SIZE]byte +// read reads a packet. +func (pkt *packet) read(s *Session) error { + var hbuf [fullHeaderSize]byte header := hbuf[:] - err := readN(s, header[:1]) + _, err := s.read(header[:1]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) if err == io.EOF { @@ -117,7 +120,7 @@ func readPacket(s *Session, pkt *packet) error { switch { case pkt.channel == 0: - err = readN(s, header[:1]) + _, err = s.read(header[:1]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error()) return err @@ -126,7 +129,7 @@ func readPacket(s *Session, pkt *packet) error { pkt.channel = int32(header[0]) + 64 case pkt.channel == 1: - err = readN(s, header[:2]) + _, err = s.read(header[:2]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error()) return err @@ -160,9 +163,9 @@ func readPacket(s *Session, pkt *packet) error { size := headerSizes[pkt.headerType] switch { - case size == RTMP_LARGE_HEADER_SIZE: + case size == fullHeaderSize: pkt.hasAbsTimestamp = true - case size < RTMP_LARGE_HEADER_SIZE: + case size < fullHeaderSize: if s.channelsIn[pkt.channel] != nil { *pkt = *(s.channelsIn[pkt.channel]) } @@ -170,7 +173,7 @@ func readPacket(s *Session, pkt *packet) error { size-- if size > 0 { - err = readN(s, header[:size]) + _, err = s.read(header[:size]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error()) return err @@ -196,7 +199,7 @@ func readPacket(s *Session, pkt *packet) error { extendedTimestamp := pkt.timestamp == 0xffffff if extendedTimestamp { - err = readN(s, header[size:size+4]) + _, err = s.read(header[size : size+4]) if err != nil { s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error()) return err @@ -207,7 +210,7 @@ func readPacket(s *Session, pkt *packet) error { } if pkt.bodySize > 0 && pkt.body == nil { - resizePacket(pkt, pkt.bodySize, (hbuf[0]&0xc0)>>6) + pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) } toRead := int32(pkt.bodySize - pkt.bytesRead) @@ -218,12 +221,13 @@ func readPacket(s *Session, pkt *packet) error { } if pkt.chunk != nil { + panic("non-nil chunk") pkt.chunk.headerSize = int32(hSize) copy(pkt.chunk.header[:], hbuf[:hSize]) pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)] } - err = readN(s, pkt.body[pkt.bytesRead:][:chunkSize]) + _, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err @@ -257,20 +261,39 @@ func readPacket(s *Session, pkt *packet) error { return nil } -// resizePacket adjusts the packet's storage to accommodate a body of the given size. -func resizePacket(pkt *packet, size uint32, ht uint8) { - buf := make([]byte, RTMP_MAX_HEADER_SIZE+size) - pkt.headerType = ht +// resize adjusts the packet's storage to accommodate a body of the given size and header type. +func (pkt *packet) resize(size uint32, ht uint8) { + buf := make([]byte, fullHeaderSize+size) pkt.header = buf - pkt.body = buf[RTMP_MAX_HEADER_SIZE:] + pkt.body = buf[fullHeaderSize:] + if ht != headerSizeAuto { + pkt.headerType = ht + return + } + switch pkt.packetType { + case packetTypeVideo, packetTypeAudio: + if pkt.timestamp == 0 { + pkt.headerType = headerSizeLarge + } else { + pkt.headerType = headerSizeMedium + } + case packetTypeInfo: + pkt.headerType = headerSizeLarge + pkt.bodySize += 16 + default: + pkt.headerType = headerSizeMedium + } } -// sendPacket sends a packet. -func sendPacket(s *Session, pkt *packet, queue bool) error { - var prevPkt *packet - var last int +// write sends a packet. +// When queue is true, we expect a response to this request and cache the method on s.methodCalls. +func (pkt *packet) write(s *Session, queue bool) error { + if pkt.body == nil { + return errInvalidBody + } if pkt.channel >= s.channelsAllocatedOut { + s.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel) n := int(pkt.channel + 10) var pkts []*packet @@ -287,16 +310,17 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { s.channelsAllocatedOut = int32(n) } - prevPkt = s.channelsOut[pkt.channel] - if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE { + prevPkt := s.channelsOut[pkt.channel] + var last int + if prevPkt != nil && pkt.headerType != headerSizeLarge { // compress a bit by using the prev packet's attributes - if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM { - pkt.headerType = RTMP_PACKET_SIZE_SMALL + if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium { + pkt.headerType = headerSizeSmall } - if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL { - pkt.headerType = RTMP_PACKET_SIZE_MINIMUM + if prevPkt.timestamp == pkt.timestamp && pkt.headerType == headerSizeSmall { + pkt.headerType = headerSizeMinimum } last = int(prevPkt.timestamp) @@ -307,20 +331,14 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { return errInvalidHeader } - var headBytes []byte - var origIdx int - if pkt.body != nil { - // Span from -packetsize for the type to the start of the body. - headBytes = pkt.header - origIdx = RTMP_MAX_HEADER_SIZE - headerSizes[pkt.headerType] - } else { - // Allocate a new header and allow 6 bytes of movement backward. - var hbuf [RTMP_MAX_HEADER_SIZE]byte - headBytes = hbuf[:] - origIdx = 6 - } + // The complete packet starts from headerSize _before_ the start the body. + // origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header. + headBytes := pkt.header + hSize := headerSizes[pkt.headerType] + origIdx := fullHeaderSize - hSize - var cSize int + // adjust 1 or 2 bytes for the channel + cSize := 0 switch { case pkt.channel > 319: cSize = 2 @@ -328,12 +346,12 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { cSize = 1 } - hSize := headerSizes[pkt.headerType] if cSize != 0 { origIdx -= cSize hSize += cSize } + // adjust 4 bytes for the timestamp var ts uint32 if prevPkt != nil { ts = uint32(int(pkt.timestamp) - last) @@ -386,8 +404,8 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { } if headerSizes[pkt.headerType] > 8 { - n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) - headerIdx += n + binary.LittleEndian.PutUint32(headBytes[headerIdx:headerIdx+4], uint32(pkt.info)) + headerIdx += 4 // 32bits } if ts >= 0xffffff { @@ -398,33 +416,39 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { size := int(pkt.bodySize) chunkSize := int(s.outChunkSize) - s.log(DebugLevel, pkg+"sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) - - if s.deferred != nil && len(s.deferred)+size+hSize > chunkSize { - err := writeN(s, s.deferred) - if err != nil { - return err + if s.deferred == nil { + // Defer sending small audio packets (at most once). + if pkt.packetType == packetTypeAudio && size < chunkSize { + s.deferred = headBytes[origIdx:][:size+hSize] + s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) + return nil + } + } else { + // Send previously deferrd packet if combining it with the next one would exceed the chunk size. + if len(s.deferred)+size+hSize > chunkSize { + s.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(s.deferred)) + _, err := s.write(s.deferred) + if err != nil { + return err + } + s.deferred = nil } - s.deferred = nil } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. + s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size) for size+hSize != 0 { - if s.deferred == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize { - s.deferred = headBytes[origIdx:][:size+hSize] - s.log(DebugLevel, pkg+"deferred sending packet") - break - } if chunkSize > size { chunkSize = size } bytes := headBytes[origIdx:][:chunkSize+hSize] if s.deferred != nil { // Prepend the previously deferred packet and write it with the current one. + s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred)) bytes = append(s.deferred, bytes...) } - err := writeN(s, bytes) + _, err := s.write(bytes) if err != nil { return err } @@ -461,12 +485,12 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { } // We invoked a remote method - if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { + if pkt.packetType == packetTypeInvoke { buf := pkt.body[1:] meth := C_AMF_DecodeString(buf) + s.log(DebugLevel, pkg+"invoking method "+meth) // keep it in call queue till result arrives if queue { - s.log(DebugLevel, pkg+"queuing method "+meth) buf = buf[3+len(meth):] txn := int32(C_AMF_DecodeNumber(buf[:8])) s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index 97110eb8..eae4277e 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -8,9 +8,10 @@ DESCRIPTION AUTHOR Dan Kortschak Saxon Nelson-Milton + Alan Noble LICENSE - parseurl.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) + parseurl.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -33,7 +34,6 @@ LICENSE package rtmp import ( - "log" "net/url" "path" "strconv" @@ -41,30 +41,29 @@ import ( ) // parseURL parses an RTMP URL (ok, technically it is lexing). +// func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { u, err := url.Parse(addr) if err != nil { - log.Printf("failed to parse addr: %v", err) return protocol, host, port, app, playpath, err } switch u.Scheme { case "rtmp": - protocol = RTMP_PROTOCOL_RTMP + protocol = protoRTMP case "rtmpt": - protocol = RTMP_PROTOCOL_RTMPT + protocol = protoRTMPT case "rtmps": - protocol = RTMP_PROTOCOL_RTMPS + protocol = protoRTMPS case "rtmpe": - protocol = RTMP_PROTOCOL_RTMPE + protocol = protoRTMPE case "rtmfp": - protocol = RTMP_PROTOCOL_RTMFP + protocol = protoRTMFP case "rtmpte": - protocol = RTMP_PROTOCOL_RTMPTE + protocol = protoRTMPTE case "rtmpts": - protocol = RTMP_PROTOCOL_RTMPTS + protocol = protoRTMPTS default: - log.Printf("unknown scheme: %q", u.Scheme) return protocol, host, port, app, playpath, errUnknownScheme } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 2c6300d5..4b334587 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -3,7 +3,7 @@ NAME rtmp.go DESCRIPTION - See Readme.md + RTMP command functionality. AUTHORS Saxon Nelson-Milton @@ -38,7 +38,6 @@ import ( "bytes" "encoding/binary" "errors" - "io" "math/rand" "net" "strconv" @@ -46,62 +45,94 @@ import ( ) const ( - pkg = "rtmp:" - minDataSize = 11 + pkg = "rtmp:" + minDataSize = 11 // ToDo: this should be the same as fullHeaderSize + signatureSize = 1536 + fullHeaderSize = 12 ) +// Link flags. const ( - setDataFrame = "@setDataFrame" + linkAuth = 0x0001 // using auth param + linkLive = 0x0002 // stream is live + linkSWF = 0x0004 // do SWF verification - not implemented + linkPlaylist = 0x0008 // send playlist before play - not implemented + linkBufx = 0x0010 // toggle stream on BufferEmpty msg - not implemented +) - av__checkbw = "_checkbw" - av__onbwcheck = "_onbwcheck" - av__onbwdone = "_onbwdone" - av__result = "_result" - av_app = "app" - av_audioCodecs = "audioCodecs" - av_capabilities = "capabilities" - av_close = "close" - av_code = "code" - av_connect = "connect" - av_createStream = "createStream" - av_deleteStream = "deleteStream" - av_FCPublish = "FCPublish" - av_FCUnpublish = "FCUnpublish" - av_flashVer = "flashVer" - av_fpad = "fpad" - av_level = "level" - av_live = "live" - av_NetConnection_Connect_InvalidApp = "NetConnection.Connect.InvalidApp" - av_NetStream_Failed = "NetStream.Failed" - av_NetStream_Pause_Notify = "NetStream.Pause.Notify" - av_NetStream_Play_Complete = "NetStream.Play.Complete" - av_NetStream_Play_Failed = "NetStream.Play.Failed" - av_NetStream_Play_PublishNotify = "NetStream.Play.PublishNotify" - av_NetStream_Play_Start = "NetStream.Play.Start" - av_NetStream_Play_Stop = "NetStream.Play.Stop" - av_NetStream_Play_StreamNotFound = "NetStream.Play.StreamNotFound" - av_NetStream_Play_UnpublishNotify = "NetStream.Play.UnpublishNotify" - av_NetStream_Publish_Start = "NetStream.Publish.Start" - av_NetStream_Seek_Notify = "NetStream.Seek.Notify" - av_nonprivate = "nonprivate" - av_objectEncoding = "objectEncoding" - av_onBWDone = "onBWDone" - av_onFCSubscribe = "onFCSubscribe" - av_onFCUnsubscribe = "onFCUnsubscribe" - av_onStatus = "onStatus" - av_pageUrl = "pageUrl" - av_ping = "ping" - av_play = "play" - av_playlist_ready = "playlist_ready" - av_publish = "publish" - av_releaseStream = "releaseStream" - av_secureToken = "secureToken" - av_set_playlist = "set_playlist" - av_swfUrl = "swfUrl" - av_tcUrl = "tcUrl" - av_type = "type" - av_videoCodecs = "videoCodecs" - av_videoFunction = "videoFunction" +// Protocol features. +const ( + featureHTTP = 0x01 // not implemented + featureEncode = 0x02 // not implemented + featureSSL = 0x04 // not implemented + featureMFP = 0x08 // not implemented + featureWrite = 0x10 // publish, not play + featureHTTP2 = 0x20 // server-side RTMPT - not implemented +) + +// RTMP protocols. +const ( + protoRTMP = 0 + protoRTMPE = featureEncode + protoRTMPT = featureHTTP + protoRTMPS = featureSSL + protoRTMPTE = (featureHTTP | featureEncode) + protoRTMPTS = (featureHTTP | featureSSL) + protoRTMFP = featureMFP +) + +// RTMP tokens (lexemes). +// NB: Underscores are deliberately preserved in const names where they exist in the corresponding tokens. +const ( + av_checkbw = "_checkbw" + av_onbwcheck = "_onbwcheck" + av_onbwdone = "_onbwdone" + av_result = "_result" + avApp = "app" + avAudioCodecs = "audioCodecs" + avCapabilities = "capabilities" + avClose = "close" + avCode = "code" + avConnect = "connect" + avCreatestream = "createStream" + avDeletestream = "deleteStream" + avFCPublish = "FCPublish" + avFCUnpublish = "FCUnpublish" + avFlashver = "flashVer" + avFpad = "fpad" + avLevel = "level" + avLive = "live" + avNetConnectionConnectInvalidApp = "NetConnection.Connect.InvalidApp" + avNetStreamFailed = "NetStream.Failed" + avNetStreamPauseNotify = "NetStream.Pause.Notify" + avNetStreamPlayComplete = "NetStream.Play.Complete" + avNetStreamPlayFailed = "NetStream.Play.Failed" + avNetStreamPlayPublishNotify = "NetStream.Play.PublishNotify" + avNetStreamPlayStart = "NetStream.Play.Start" + avNetStreamPlayStop = "NetStream.Play.Stop" + avNetStreamPlayStreamNotFound = "NetStream.Play.StreamNotFound" + avNetStreamPlayUnpublishNotify = "NetStream.Play.UnpublishNotify" + avNetStreamPublish_Start = "NetStream.Publish.Start" + avNetStreamSeekNotify = "NetStream.Seek.Notify" + avNonprivate = "nonprivate" + avObjectEncoding = "objectEncoding" + avOnBWDone = "onBWDone" + avOnFCSubscribe = "onFCSubscribe" + avOnFCUnsubscribe = "onFCUnsubscribe" + avOnStatus = "onStatus" + avPageUrl = "pageUrl" + avPing = "ping" + avPlay = "play" + avPlaylist_ready = "playlist_ready" + avPublish = "publish" + avReleasestream = "releaseStream" + avSecureToken = "secureToken" + avSet_playlist = "set_playlist" + avSwfUrl = "swfUrl" + avTcUrl = "tcUrl" + avType = "type" + avVideoCodecs = "videoCodecs" + avVideoFunction = "videoFunction" ) // RTMP protocol strings. @@ -122,6 +153,7 @@ var ( errUnknownScheme = errors.New("rtmp: unknown scheme") errConnected = errors.New("rtmp: already connected") errNotConnected = errors.New("rtmp: not connected") + errNotWritable = errors.New("rtmp: connection not writable") errHandshake = errors.New("rtmp: handshake failed") errConnSend = errors.New("rtmp: connection send error") errConnStream = errors.New("rtmp: connection stream error") @@ -130,12 +162,12 @@ var ( errTinyPacket = errors.New("rtmp: packet too small") errEncoding = errors.New("rtmp: encoding error") errDecoding = errors.New("rtmp: decoding error") - errCopying = errors.New("rtmp: copying error") + errUnimplemented = errors.New("rtmp: unimplemented feature") ) // setupURL parses the RTMP URL. -func setupURL(s *Session, addr string) (err error) { - s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(addr) +func setupURL(s *Session) (err error) { + s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(s.url) if err != nil { return err } @@ -143,18 +175,17 @@ func setupURL(s *Session, addr string) (err error) { if s.link.tcUrl == "" { if s.link.app != "" { s.link.tcUrl = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app - s.link.lFlags |= RTMP_LF_FTCU } else { - s.link.tcUrl = addr + s.link.tcUrl = s.url } } if s.link.port == 0 { switch { - case (s.link.protocol & RTMP_FEATURE_SSL) != 0: + case (s.link.protocol & featureSSL) != 0: s.link.port = 433 s.log(FatalLevel, pkg+"SSL not supported") - case (s.link.protocol & RTMP_FEATURE_HTTP) != 0: + case (s.link.protocol & featureHTTP) != 0: s.link.port = 80 default: s.link.port = 1935 @@ -192,25 +223,21 @@ func connect(s *Session) error { // connectStream reads a packet and handles it func connectStream(s *Session) error { var err error - for !s.isPlaying && s.isConnected() { + for !s.isPlaying { pkt := packet{} - err = readPacket(s, &pkt) + err = pkt.read(s) if err != nil { break } - if pkt.bodySize == 0 { - continue - } - if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || - pkt.packetType == RTMP_PACKET_TYPE_VIDEO || - pkt.packetType == RTMP_PACKET_TYPE_INFO { - s.log(DebugLevel, pkg+"got packet before play; ignoring") - continue - } - err = handlePacket(s, &pkt) - if err != nil { - break + switch pkt.packetType { + case packetTypeAudio, packetTypeVideo, packetTypeInfo: + s.log(WarnLevel, pkg+"got packet before play; ignoring", "type", pkt.packetType) + default: + err = handlePacket(s, &pkt) + if err != nil { + break + } } } @@ -224,22 +251,21 @@ func connectStream(s *Session) error { // NB: cases have been commented out that are not currently used by AusOcean func handlePacket(s *Session, pkt *packet) error { switch pkt.packetType { - - case RTMP_PACKET_TYPE_CHUNK_SIZE: + case packetTypeChunkSize: if pkt.bodySize >= 4 { s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) } - case RTMP_PACKET_TYPE_BYTES_READ_REPORT: + case packetTypeBytesReadReport: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) - case RTMP_PACKET_TYPE_CONTROL: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_CONTROL") + case packetTypeControl: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl") - case RTMP_PACKET_TYPE_SERVER_BW: + case packetTypeServerBW: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) - case RTMP_PACKET_TYPE_CLIENT_BW: + case packetTypeClientBW: s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) if pkt.bodySize > 4 { s.clientBW2 = pkt.body[4] @@ -247,19 +273,19 @@ func handlePacket(s *Session, pkt *packet) error { s.clientBW2 = 0xff } - case RTMP_PACKET_TYPE_AUDIO: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_AUDIO") + case packetTypeAudio: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeAudio") - case RTMP_PACKET_TYPE_VIDEO: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_VIDEO") + case packetTypeVideo: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeVideo") - case RTMP_PACKET_TYPE_FLEX_MESSAGE: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_FLEX_MESSAGE") + case packetTypeFlexMessage: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeFlexMessage") - case RTMP_PACKET_TYPE_INFO: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_INFO") + case packetTypeInfo: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeInfo") - case RTMP_PACKET_TYPE_INVOKE: + case packetTypeInvoke: err := handleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { // This will never happen with the methods we implement. @@ -267,8 +293,8 @@ func handlePacket(s *Session, pkt *packet) error { return err } - case RTMP_PACKET_TYPE_FLASH_VIDEO: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_FLASH_VIDEO") + case packetTypeFlashVideo: + s.log(FatalLevel, pkg+"unsupported packet type packetType_FLASHVideo") default: s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) @@ -276,52 +302,18 @@ func handlePacket(s *Session, pkt *packet) error { return nil } -func readN(s *Session, buf []byte) error { - err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) - if err != nil { - return err - } - n, err := io.ReadFull(s.link.conn, buf) - if err != nil { - s.log(DebugLevel, pkg+"read failed", "error", err.Error()) - return err - } - s.nBytesIn += int32(n) - if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { - err := sendBytesReceived(s) - if err != nil { - return err - } - } - return nil -} - -func writeN(s *Session, buf []byte) error { - //ToDo: consider using a different timeout for writes than for reads - err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) - if err != nil { - return err - } - _, err = s.link.conn.Write(buf) - if err != nil { - s.log(WarnLevel, pkg+"write failed", "error", err.Error()) - return err - } - return nil -} - func sendConnectPacket(s *Session) error { var pbuf [4096]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeLarge, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_connect) + enc = C_AMF_EncodeString(enc, avConnect) if enc == nil { return errEncoding } @@ -333,60 +325,60 @@ func sendConnectPacket(s *Session) error { enc[0] = AMF_OBJECT enc = enc[1:] - enc = C_AMF_EncodeNamedString(enc, av_app, s.link.app) + enc = C_AMF_EncodeNamedString(enc, avApp, s.link.app) if enc == nil { return errEncoding } - if s.link.protocol&RTMP_FEATURE_WRITE != 0 { - enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate) + if s.link.protocol&featureWrite != 0 { + enc = C_AMF_EncodeNamedString(enc, avType, avNonprivate) if enc == nil { return errEncoding } } if s.link.flashVer != "" { - enc = C_AMF_EncodeNamedString(enc, av_flashVer, s.link.flashVer) + enc = C_AMF_EncodeNamedString(enc, avFlashver, s.link.flashVer) if enc == nil { return errEncoding } } if s.link.swfUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_swfUrl, s.link.swfUrl) + enc = C_AMF_EncodeNamedString(enc, avSwfUrl, s.link.swfUrl) if enc == nil { return errEncoding } } if s.link.tcUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_tcUrl, s.link.tcUrl) + enc = C_AMF_EncodeNamedString(enc, avTcUrl, s.link.tcUrl) if enc == nil { return errEncoding } } - if s.link.protocol&RTMP_FEATURE_WRITE == 0 { - enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false) + if s.link.protocol&featureWrite == 0 { + enc = C_AMF_EncodeNamedBoolean(enc, avFpad, false) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_capabilities, 15) + enc = C_AMF_EncodeNamedNumber(enc, avCapabilities, 15) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, s.audioCodecs) + enc = C_AMF_EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, s.videoCodecs) + enc = C_AMF_EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_videoFunction, 1) + enc = C_AMF_EncodeNamedNumber(enc, avVideoFunction, 1) if enc == nil { return errEncoding } if s.link.pageUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_pageUrl, s.link.pageUrl) + enc = C_AMF_EncodeNamedString(enc, avPageUrl, s.link.pageUrl) if enc == nil { return errEncoding } @@ -394,7 +386,7 @@ func sendConnectPacket(s *Session) error { } if s.encoding != 0.0 || s.sendEncoding { - enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, s.encoding) + enc = C_AMF_EncodeNamedNumber(enc, avObjectEncoding, s.encoding) if enc == nil { return errEncoding } @@ -405,7 +397,7 @@ func sendConnectPacket(s *Session) error { // add auth string if s.link.auth != "" { - enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) + enc = C_AMF_EncodeBoolean(enc, s.link.flags&linkAuth != 0) if enc == nil { return errEncoding } @@ -422,23 +414,23 @@ func sendConnectPacket(s *Session) error { } } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, true) // response expected + return pkt.write(s, true) // response expected } func sendCreateStream(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_createStream) + enc = C_AMF_EncodeString(enc, avCreatestream) if enc == nil { return errEncoding } @@ -450,23 +442,23 @@ func sendCreateStream(s *Session) error { enc[0] = AMF_NULL enc = enc[1:] - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, true) // response expected + return pkt.write(s, true) // response expected } func sendReleaseStream(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_releaseStream) + enc = C_AMF_EncodeString(enc, avReleasestream) if enc == nil { return errEncoding } @@ -481,23 +473,23 @@ func sendReleaseStream(s *Session) error { if enc == nil { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } func sendFCPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_FCPublish) + enc = C_AMF_EncodeString(enc, avFCPublish) if enc == nil { return errEncoding } @@ -513,23 +505,23 @@ func sendFCPublish(s *Session) error { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } func sendFCUnpublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_FCUnpublish) + enc = C_AMF_EncodeString(enc, avFCUnpublish) if enc == nil { return errEncoding } @@ -545,23 +537,23 @@ func sendFCUnpublish(s *Session) error { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } func sendPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_SOURCE, - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanSource, + headerType: headerSizeLarge, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_publish) + enc = C_AMF_EncodeString(enc, avPublish) if enc == nil { return errEncoding } @@ -576,28 +568,28 @@ func sendPublish(s *Session) error { if enc == nil { return errEncoding } - enc = C_AMF_EncodeString(enc, av_live) + enc = C_AMF_EncodeString(enc, avLive) if enc == nil { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, true) // response expected + return pkt.write(s, true) // response expected } func sendDeleteStream(s *Session, dStreamId float64) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_deleteStream) + enc = C_AMF_EncodeString(enc, avDeletestream) if enc == nil { return errEncoding } @@ -612,21 +604,20 @@ func sendDeleteStream(s *Session, dStreamId float64) error { if enc == nil { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - /* no response expected */ - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } // sendBytesReceived tells the server how many bytes the client has received. func sendBytesReceived(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_BYTES_READ, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, + channel: chanBytesRead, + headerType: headerSizeMedium, + packetType: packetTypeBytesReadReport, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -637,21 +628,21 @@ func sendBytesReceived(s *Session) error { } pkt.bodySize = 4 - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } func sendCheckBW(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeLarge, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av__checkbw) + enc = C_AMF_EncodeString(enc, av_checkbw) if enc == nil { return errEncoding } @@ -663,9 +654,9 @@ func sendCheckBW(s *Session) error { enc[0] = AMF_NULL enc = enc[1:] - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } func eraseMethod(m []method, i int) []method { @@ -675,7 +666,7 @@ func eraseMethod(m []method, i int) []method { } // int handleInvoke handles a packet invoke request -// Side effects: s.isPlaying set to true upon av_NetStream_Publish_Start +// Side effects: s.isPlaying set to true upon avNetStreamPublish_Start func handleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody @@ -691,7 +682,7 @@ func handleInvoke(s *Session, body []byte) error { txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) switch meth { - case av__result: + case av_result: var methodInvoked string for i, m := range s.methodCalls { if float64(m.num) == txn { @@ -707,94 +698,101 @@ func handleInvoke(s *Session, body []byte) error { s.log(DebugLevel, pkg+"received result for "+methodInvoked) switch methodInvoked { - case av_connect: + case avConnect: if s.link.token != "" { s.log(FatalLevel, pkg+"no support for link token") - } - if (s.link.protocol & RTMP_FEATURE_WRITE) != 0 { - sendReleaseStream(s) - sendFCPublish(s) - } else { - s.log(FatalLevel, pkg+"link protocol has no RTMP_FEATURE_WRITE") + if (s.link.protocol & featureWrite) == 0 { + return errNotWritable + } + err := sendReleaseStream(s) + if err != nil { + return err + } + err = sendFCPublish(s) + if err != nil { + return err + } + err = sendCreateStream(s) + if err != nil { + return err } - sendCreateStream(s) - if (s.link.protocol & RTMP_FEATURE_WRITE) == 0 { - s.log(FatalLevel, pkg+"link protocol has no RTMP_FEATURE_WRITE") - } - - case av_createStream: + case avCreatestream: s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) - - if s.link.protocol&RTMP_FEATURE_WRITE != 0 { - sendPublish(s) - } else { - s.log(FatalLevel, pkg+"link protocol has no RTMP_FEATURE_WRITE") + if s.link.protocol&featureWrite == 0 { + return errNotWritable + } + err := sendPublish(s) + if err != nil { + return err } - case av_play, av_publish: - s.log(FatalLevel, pkg+"unsupported method av_play/av_publish") + case avPlay, avPublish: + s.log(FatalLevel, pkg+"unsupported method avPlay/avPublish") } - case av_onBWDone: + case avOnBWDone: if s.checkCounter == 0 { // ToDo: why is this always zero? - sendCheckBW(s) + err := sendCheckBW(s) + if err != nil { + return err + } } - case av_onFCUnsubscribe, av_onFCSubscribe: - s.log(FatalLevel, pkg+"unsupported method av_onFCUnsubscribe/av_onFCSubscribe") + case avOnFCUnsubscribe, avOnFCSubscribe: + s.log(FatalLevel, pkg+"unsupported method avOnFCUnsubscribe/avOonfcsubscribe") - case av_ping: - s.log(FatalLevel, pkg+"unsupported method av_ping") + case avPing: + s.log(FatalLevel, pkg+"unsupported method avPing") - case av__onbwcheck: + case av_onbwcheck: s.log(FatalLevel, pkg+"unsupported method av_onbwcheck") - case av__onbwdone: + case av_onbwdone: s.log(FatalLevel, pkg+"unsupported method av_onbwdone") - case av_close: - s.log(FatalLevel, pkg+"unsupported method av_close") + case avClose: + s.log(FatalLevel, pkg+"unsupported method avClose") - case av_onStatus: + case avOnStatus: var obj2 C_AMFObject C_AMFProp_GetObject(C_AMF_GetProp(&obj, "", 3), &obj2) - code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_code, -1)) - level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_level, -1)) + code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avCode, -1)) + level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avLevel, -1)) s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) switch code { - case av_NetStream_Failed, av_NetStream_Play_Failed, - av_NetStream_Play_StreamNotFound, av_NetConnection_Connect_InvalidApp: - s.log(FatalLevel, pkg+"unsupported method av_NetStream/av_NetStream_Play_Failed/av_netSTream_Play_StreamNotFound/av_netConnection_Connect_invalidApp") + case avNetStreamFailed, avNetStreamPlayFailed, + avNetStreamPlayStreamNotFound, avNetConnectionConnectInvalidApp: + s.log(FatalLevel, pkg+"unsupported method avNetStream/avNetStreamPlayFailed/avNetstream_play_streamnotfound/av_netConnection_Connect_invalidApp") - case av_NetStream_Play_Start, av_NetStream_Play_PublishNotify: - s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") + case avNetStreamPlayStart, avNetStreamPlayPublishNotify: + s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayStart/avNetStreamPlayPublishNotify") - case av_NetStream_Publish_Start: + case avNetStreamPublish_Start: s.log(DebugLevel, pkg+"playing") s.isPlaying = true for i, m := range s.methodCalls { - if m.name == av_publish { + if m.name == avPublish { s.methodCalls = eraseMethod(s.methodCalls, i) break } } - // ToDo: handle case when av_publish method not found + // ToDo: handle case when avPublish method not found - case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify: - s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify") + case avNetStreamPlayComplete, avNetStreamPlayStop, avNetStreamPlayUnpublishNotify: + s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayComplete/avNetStreamPlayStop/avNetStreamPlayUnpublishNotify") - case av_NetStream_Seek_Notify: - s.log(FatalLevel, pkg+"unsupported method av_netStream_Seek_Notify") + case avNetStreamSeekNotify: + s.log(FatalLevel, pkg+"unsupported method avNetstream_seek_notify") - case av_NetStream_Pause_Notify: - s.log(FatalLevel, pkg+"unsupported method av_NetStream_Pause_Notify") + case avNetStreamPauseNotify: + s.log(FatalLevel, pkg+"unsupported method avNetStreamPauseNotify") } - case av_playlist_ready: - s.log(FatalLevel, pkg+"unsupported method av_playlist_ready") + case avPlaylist_ready: + s.log(FatalLevel, pkg+"unsupported method avPlaylist_ready") default: s.log(FatalLevel, pkg+"unknown method "+meth) @@ -805,26 +803,26 @@ leave: } func handshake(s *Session) error { - var clientbuf [RTMP_SIG_SIZE + 1]byte + var clientbuf [signatureSize + 1]byte clientsig := clientbuf[1:] - var serversig [RTMP_SIG_SIZE]byte - clientbuf[0] = RTMP_CHANNEL_CONTROL + var serversig [signatureSize]byte + clientbuf[0] = chanControl binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000)) copy(clientsig[4:8], []byte{0, 0, 0, 0}) - for i := 8; i < RTMP_SIG_SIZE; i++ { + for i := 8; i < signatureSize; i++ { clientsig[i] = byte(rand.Intn(256)) } - err := writeN(s, clientbuf[:]) + _, err := s.write(clientbuf[:]) if err != nil { return err } s.log(DebugLevel, pkg+"handshake sent") var typ [1]byte - err = readN(s, typ[:]) + _, err = s.read(typ[:]) if err != nil { return err } @@ -833,7 +831,7 @@ func handshake(s *Session) error { if typ[0] != clientbuf[0] { s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) } - err = readN(s, serversig[:]) + _, err = s.read(serversig[:]) if err != nil { return err } @@ -843,94 +841,18 @@ func handshake(s *Session) error { s.log(DebugLevel, pkg+"server uptime", "uptime", suptime) // 2nd part of handshake - err = writeN(s, serversig[:]) + _, err = s.write(serversig[:]) if err != nil { return err } - err = readN(s, serversig[:]) + _, err = s.read(serversig[:]) if err != nil { return err } - if !bytes.Equal(serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) { - s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:RTMP_SIG_SIZE], "clientsig", clientbuf[1:RTMP_SIG_SIZE+1]) - } - return nil -} - -// write prepares data to write then sends it. -func (s *Session) write(buf []byte) error { - pkt := packet{ - channel: RTMP_CHANNEL_SOURCE, - info: s.streamID, - } - var enc []byte - - for len(buf) != 0 { - if pkt.bytesRead == 0 { - if len(buf) < minDataSize { - return errTinyPacket - } - - if buf[0] == 'F' && buf[1] == 'L' && buf[2] == 'V' { - buf = buf[13:] - } - - pkt.packetType = buf[0] - buf = buf[1:] - pkt.bodySize = C_AMF_DecodeInt24(buf[:3]) - buf = buf[3:] - pkt.timestamp = C_AMF_DecodeInt24(buf[:3]) - buf = buf[3:] - pkt.timestamp |= uint32(buf[0]) << 24 - buf = buf[4:] - - headerType := uint8(RTMP_PACKET_SIZE_MEDIUM) - switch pkt.packetType { - case RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_AUDIO: - if pkt.timestamp == 0 { - headerType = RTMP_PACKET_SIZE_LARGE - } - case RTMP_PACKET_TYPE_INFO: - headerType = RTMP_PACKET_SIZE_LARGE - pkt.bodySize += 16 - } - - resizePacket(&pkt, pkt.bodySize, headerType) - - enc = pkt.body[:pkt.bodySize] - if pkt.packetType == RTMP_PACKET_TYPE_INFO { - enc = C_AMF_EncodeString(enc, setDataFrame) - if enc == nil { - return errEncoding - } - pkt.bytesRead = uint32(len(pkt.body) - len(enc)) - } - - } else { - enc = pkt.body[:pkt.bodySize][pkt.bytesRead:] - } - num := int(pkt.bodySize - pkt.bytesRead) - if num > len(buf) { - num = len(buf) - } - - copy(enc[:num], buf[:num]) - pkt.bytesRead += uint32(num) - buf = buf[num:] - if pkt.bytesRead == pkt.bodySize { - err := sendPacket(s, &pkt, false) - pkt.body = nil - pkt.bytesRead = 0 - if err != nil { - return err - } - if len(buf) < 4 { - return nil - } - buf = buf[4:] - } + if !bytes.Equal(serversig[:signatureSize], clientbuf[1:signatureSize+1]) { + s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:signatureSize], "clientsig", clientbuf[1:signatureSize+1]) } return nil } diff --git a/rtmp/rtmp_headers.go b/rtmp/rtmp_headers.go deleted file mode 100644 index becb48be..00000000 --- a/rtmp/rtmp_headers.go +++ /dev/null @@ -1,119 +0,0 @@ -/* -NAME - rtmp_headers.go - -DESCRIPTION - See Readme.md - -AUTHORS - Saxon Nelson-Milton - Dan Kortschak - Alan Noble - -LICENSE - rtmp_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. - - Derived from librtmp under the GNU Lesser General Public License 2.1 - Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org - Copyright (C) 2008-2009 Andrej Stepanchuk - Copyright (C) 2009-2010 Howard Chu -*/ -package rtmp - -import ( - "net" -) - -const ( - RTMPT_OPEN = iota - RTMPT_SEND - RTMPT_IDLE - RTMPT_CLOSE -) - -const ( - RTMP_READ_HEADER = 0x01 - RTMP_READ_RESUME = 0x02 - RTMP_READ_NO_IGNORE = 0x04 - RTMP_READ_GOTKF = 0x08 - RTMP_READ_GOTFLVK = 0x10 - RTMP_READ_SEEKING = 0x20 - RTMP_READ_COMPLETE = -3 - RTMP_READ_ERROR = -2 - RTMP_READ_EOF = -1 - RTMP_READ_IGNORE = 0 -) - -const ( - RTMP_LF_AUTH = 0x0001 /* using auth param */ - RTMP_LF_LIVE = 0x0002 /* stream is live */ - RTMP_LF_SWFV = 0x0004 /* do SWF verification */ - RTMP_LF_PLST = 0x0008 /* send playlist before play */ - RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */ - RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */ - RTMP_LF_FAPU = 0x0040 /* free app on close */ -) - -const ( - RTMP_FEATURE_HTTP = 0x01 - RTMP_FEATURE_ENC = 0x02 - RTMP_FEATURE_SSL = 0x04 - RTMP_FEATURE_MFP = 0x08 /* not yet supported */ - RTMP_FEATURE_WRITE = 0x10 /* publish, not play */ - RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */ -) - -const ( - RTMP_PROTOCOL_RTMP = 0 - RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC - RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP - RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL - RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC) - RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL) - RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP -) - -const ( - RTMP_DEFAULT_CHUNKSIZE = 128 - RTMP_BUFFER_CACHE_SIZE = (16 * 1024) - RTMP_SIG_SIZE = 1536 - RTMP_LARGE_HEADER_SIZE = 12 - RTMP_MAX_HEADER_SIZE = 18 -) - -type link struct { - host string - playpath string - tcUrl string - swfUrl string - pageUrl string - app string - auth string - flashVer string - token string - extras C_AMFObject - lFlags int32 - swfAge int32 - protocol int32 - timeout uint - port uint16 - conn *net.TCPConn -} - -type method struct { - name string - num int32 -} diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index 296809c6..01dad10f 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -6,6 +6,8 @@ DESCRIPTION RTMP tests AUTHORS + Saxon Nelson-Milton + Dan Kortschak Alan Noble LICENSE @@ -49,20 +51,22 @@ const ( testDataDir = "../../test/test-data/av/input" ) -// testVerbosity controls the amount of output +// testVerbosity controls the amount of output. // NB: This is not the log level, which is DebugLevel. // 0: suppress logging completely // 1: log messages only // 2: log messages with errors, if any var testVerbosity = 1 -// testKey is the RTMP key required for YouTube streaming (RTMP_TEST_KEY env var) +// testKey is the YouTube RTMP key required for YouTube streaming (RTMP_TEST_KEY env var). +// NB: don't share your key with others. var testKey string -// testFile is the test video file (RTMP_TEST_FILE env var) +// testFile is the test video file (RTMP_TEST_FILE env var). +// betterInput.h264 is a good one to use. var testFile string -// testLog is a bare bones logger that logs to stdout. +// testLog is a bare bones logger that logs to stdout, and exits upon either an error or fatal error. func testLog(level int8, msg string, params ...interface{}) { logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"} if testVerbosity == 0 { @@ -71,21 +75,28 @@ func testLog(level int8, msg string, params ...interface{}) { if level < -1 || level > 5 { panic("Invalid log level") } - if testVerbosity == 2 && len(params) >= 2 { - // extract the params we know about, otherwise just print the message - switch params[0].(string) { - case "error": - fmt.Printf("%s: %s, error=%v\n", logLevels[level+1], msg, params[1].(string)) - case "size": - fmt.Printf("%s: %s, size=%d\n", logLevels[level+1], msg, params[1].(int)) - default: + switch testVerbosity { + case 0: + // silence is golden + case 1: + fmt.Printf("%s: %s\n", logLevels[level+1], msg) + case 2: + // extract the first param if it is one we care about, otherwise just print the message + if len(params) >= 2 { + switch params[0].(string) { + case "error": + fmt.Printf("%s: %s, error=%v\n", logLevels[level+1], msg, params[1].(string)) + case "size": + fmt.Printf("%s: %s, size=%d\n", logLevels[level+1], msg, params[1].(int)) + default: + fmt.Printf("%s: %s\n", logLevels[level+1], msg) + } + } else { fmt.Printf("%s: %s\n", logLevels[level+1], msg) } - } else { - fmt.Printf("%s: %s\n", logLevels[level+1], msg) } - if level == 5 { - // Fatal + if level >= 4 { + // Error or Fatal buf := make([]byte, 1<<16) size := runtime.Stack(buf, true) fmt.Printf("%s\n", string(buf[:size])) @@ -113,7 +124,7 @@ func TestSetupURL(t *testing.T) { if s.url != testBaseURL && s.link.timeout != testTimeout { t.Errorf("NewSession failed") } - err := setupURL(s, s.url) + err := setupURL(s) if err != nil { t.Errorf("setupURL(testBaseURL) failed with error: %v", err) } @@ -166,12 +177,16 @@ func TestFromFrame(t *testing.T) { } // Pass RTMP session, true for audio, true for video, and 25 FPS + // ToDo: fix this. Although we can encode the file and YouTube + // doesn't complain, YouTube doesn't play it (even when we + // send 1 minute's worth). flvEncoder := flv.NewEncoder(s, true, true, 25) for i := 0; i < 25; i++ { err := flvEncoder.Encode(b) if err != nil { t.Errorf("Encoding failed with error: %v", err) } + time.Sleep(time.Millisecond / 25) // rate limit to 1/25s } err = s.Close() diff --git a/rtmp/session.go b/rtmp/session.go index a402a320..eb97c938 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -3,7 +3,7 @@ NAME session.go DESCRIPTION - See Readme.md + RTMP session functionality. AUTHORS Saxon Nelson-Milton @@ -33,6 +33,12 @@ LICENSE */ package rtmp +import ( + "io" + "net" + "time" +) + // Session holds the state for an RTMP session. type Session struct { url string @@ -62,6 +68,32 @@ type Session struct { log Log } +// link represents RTMP URL and connection information. +type link struct { + host string + playpath string + tcUrl string + swfUrl string + pageUrl string + app string + auth string + flashVer string + token string + extras C_AMFObject + flags int32 + swfAge int32 + protocol int32 + timeout uint + port uint16 + conn *net.TCPConn +} + +// method represents an RTMP method. +type method struct { + name string + num int32 +} + // Log defines the RTMP logging function. type Log func(level int8, message string, params ...interface{}) @@ -95,63 +127,46 @@ func NewSession(url string, timeout uint, log Log) *Session { // Open establishes an rtmp connection with the url passed into the constructor. func (s *Session) Open() error { + s.log(DebugLevel, pkg+"Session.Open") if s.isConnected() { return errConnected } - err := s.start() + err := setupURL(s) if err != nil { return err } - return nil -} - -// start does the heavylifting for Open(). -func (s *Session) start() error { - s.log(DebugLevel, pkg+"Session.start") - err := setupURL(s, s.url) - if err != nil { - s.close() - return err - } s.enableWrite() err = connect(s) if err != nil { - s.close() + s.Close() return err } err = connectStream(s) if err != nil { - s.close() + s.Close() return err } return nil } -// Close terminates the rtmp connection, +// Close terminates the rtmp connection. +// NB: Close is idempotent and the session value is cleared completely. func (s *Session) Close() error { + s.log(DebugLevel, pkg+"Session.Close") if !s.isConnected() { return errNotConnected } - s.close() - return nil -} - -// close does the heavylifting for Close(). -// Any errors are ignored as it is often called in response to an earlier error. -func (s *Session) close() { - s.log(DebugLevel, pkg+"Session.close") - if s.isConnected() { - if s.streamID > 0 { - if s.link.protocol&RTMP_FEATURE_WRITE != 0 { - sendFCUnpublish(s) - } - sendDeleteStream(s, float64(s.streamID)) + if s.streamID > 0 { + if s.link.protocol&featureWrite != 0 { + sendFCUnpublish(s) } - s.link.conn.Close() + sendDeleteStream(s, float64(s.streamID)) } + s.link.conn.Close() *s = Session{} + return nil } // Write writes a frame (flv tag) to the rtmp connection. @@ -159,13 +174,70 @@ func (s *Session) Write(data []byte) (int, error) { if !s.isConnected() { return 0, errNotConnected } - err := s.write(data) + if len(data) < minDataSize { + return 0, errTinyPacket + } + if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { + return 0, errUnimplemented + } + + pkt := packet{ + packetType: data[0], + bodySize: C_AMF_DecodeInt24(data[1:4]), + timestamp: C_AMF_DecodeInt24(data[4:7]) | uint32(data[7])<<24, + channel: chanSource, + info: s.streamID, + } + + pkt.resize(pkt.bodySize, headerSizeAuto) + copy(pkt.body, data[minDataSize:minDataSize+pkt.bodySize]) + err := pkt.write(s, false) if err != nil { return 0, err } return len(data), nil } +// I/O functions + +// read from an RTMP connection. Sends a bytes received message if the +// number of bytes received (nBytesIn) is greater than the number sent +// (nBytesInSent) by 10% of the bandwidth. +func (s *Session) read(buf []byte) (int, error) { + err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) + if err != nil { + return 0, err + } + n, err := io.ReadFull(s.link.conn, buf) + if err != nil { + s.log(DebugLevel, pkg+"read failed", "error", err.Error()) + return 0, err + } + s.nBytesIn += int32(n) + if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { + err := sendBytesReceived(s) + if err != nil { + return n, err // NB: we still read n bytes, even though send bytes failed + } + } + return n, nil +} + +// write to an RTMP connection. +func (s *Session) write(buf []byte) (int, error) { + //ToDo: consider using a different timeout for writes than for reads + err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) + if err != nil { + return 0, err + } + n, err := s.link.conn.Write(buf) + if err != nil { + s.log(WarnLevel, pkg+"write failed", "error", err.Error()) + return 0, err + } + return n, nil +} + // isConnected returns true if the RTMP connection is up. func (s *Session) isConnected() bool { return s.link.conn != nil @@ -173,5 +245,5 @@ func (s *Session) isConnected() bool { // enableWrite enables the current session for writing. func (s *Session) enableWrite() { - s.link.protocol |= RTMP_FEATURE_WRITE + s.link.protocol |= featureWrite }