diff --git a/revid/senders.go b/revid/senders.go index a06762c6..145362f5 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -266,7 +266,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg var sess *rtmp.Session var err error for n := 0; n < retries; n++ { - sess = rtmp.NewSession(url, timeout) + sess = rtmp.NewSession(url, timeout, log) err = sess.Open() if err == nil { break @@ -312,7 +312,7 @@ func (s *rtmpSender) restart() error { return err } for n := 0; n < s.retries; n++ { - s.sess = rtmp.NewSession(s.url, s.timeout) + s.sess = rtmp.NewSession(s.url, s.timeout, s.log) err = s.sess.Open() if err == nil { break diff --git a/rtmp/packet.go b/rtmp/packet.go index 95efb658..dac3d803 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -3,7 +3,7 @@ NAME packet.go DESCRIPTION - See Readme.md + RTMP packet functionality. AUTHORS Saxon Nelson-Milton @@ -36,40 +36,48 @@ package rtmp import ( "encoding/binary" - "log" + "io" ) +// Packet types. const ( - RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01 - RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03 - RTMP_PACKET_TYPE_CONTROL = 0x04 - RTMP_PACKET_TYPE_SERVER_BW = 0x05 - RTMP_PACKET_TYPE_CLIENT_BW = 0x06 - RTMP_PACKET_TYPE_AUDIO = 0x08 - RTMP_PACKET_TYPE_VIDEO = 0x09 - RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F - RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10 - RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11 - RTMP_PACKET_TYPE_INFO = 0x12 - RTMP_PACKET_TYPE_INVOKE = 0x14 - RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16 + packetTypeChunkSize = 0x01 + packetTypeBytesReadReport = 0x03 + packetTypeControl = 0x04 + packetTypeServerBW = 0x05 + packetTypeClientBW = 0x06 + packetTypeAudio = 0x08 + packetTypeVideo = 0x09 + packetTypeFlexStreamSend = 0x0F // not implemented + packetTypeFlexSharedObject = 0x10 // not implemented + packetTypeFlexMessage = 0x11 // not implemented + packetTypeInfo = 0x12 + packetTypeInvoke = 0x14 + packetTypeFlashVideo = 0x16 // not implemented ) +// Header sizes. const ( - RTMP_PACKET_SIZE_LARGE = 0 - RTMP_PACKET_SIZE_MEDIUM = 1 - RTMP_PACKET_SIZE_SMALL = 2 - RTMP_PACKET_SIZE_MINIMUM = 3 + headerSizeLarge = 0 + headerSizeMedium = 1 + headerSizeSmall = 2 + headerSizeMinimum = 3 + headerSizeAuto = 4 ) +// Special channels. const ( - RTMP_CHANNEL_BYTES_READ = 0x02 - RTMP_CHANNEL_CONTROL = 0x03 - RTMP_CHANNEL_SOURCE = 0x04 + chanBytesRead = 0x02 + chanControl = 0x03 + chanSource = 0x04 ) -// packetSize defines valid packet sizes. -var packetSize = [...]int{12, 8, 4, 1} +// headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively: +// 0: full header (12 bytes) +// 1: header without message ID (8 bytes) +// 2: basic header + timestamp (4 byes) +// 3: basic header (chunk type and stream ID) (1 byte) +var headerSizes = [...]int{12, 8, 4, 1} // packet defines an RTMP packet. type packet struct { @@ -90,18 +98,20 @@ type packet struct { type chunk struct { headerSize int32 data []byte - header [RTMP_MAX_HEADER_SIZE]byte + header [fullHeaderSize]byte } -// ToDo: Consider making the following functions into methods. -// readPacket reads a packet. -func readPacket(s *Session, pkt *packet) error { - var hbuf [RTMP_MAX_HEADER_SIZE]byte +// read reads a packet. +func (pkt *packet) read(s *Session) error { + var hbuf [fullHeaderSize]byte header := hbuf[:] - err := readN(s, header[:1]) + _, err := s.read(header[:1]) if err != nil { - log.Println("readPacket: failed to read RTMP packet header!") + s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) + if err == io.EOF { + s.log(WarnLevel, pkg+"EOF error; connection likely terminated") + } return err } pkt.headerType = (header[0] & 0xc0) >> 6 @@ -110,23 +120,22 @@ func readPacket(s *Session, pkt *packet) error { switch { case pkt.channel == 0: - err = readN(s, header[:1]) + _, err = s.read(header[:1]) if err != nil { - log.Println("readPacket: failed to read rtmp packet header 2nd byte.") + s.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error()) return err } header = header[1:] pkt.channel = int32(header[0]) + 64 case pkt.channel == 1: - err = readN(s, header[:2]) + _, err = s.read(header[:2]) if err != nil { - log.Println("readPacket: failed to read RTMP packet 3rd byte") + s.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error()) return err } header = header[2:] pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64 - } if pkt.channel >= s.channelsAllocatedIn { @@ -134,49 +143,46 @@ func readPacket(s *Session, pkt *packet) error { timestamp := append(s.channelTimestamp, make([]int32, 10)...) var pkts []*packet - if s.vecChannelsIn == nil { + if s.channelsIn == nil { pkts = make([]*packet, n) } else { - pkts = append(s.vecChannelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) + pkts = append(s.channelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) } s.channelTimestamp = timestamp - s.vecChannelsIn = pkts + s.channelsIn = pkts for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ { s.channelTimestamp[i] = 0 } for i := int(s.channelsAllocatedIn); i < int(n); i++ { - s.vecChannelsIn[i] = nil + s.channelsIn[i] = nil } s.channelsAllocatedIn = n } - size := packetSize[pkt.headerType] + size := headerSizes[pkt.headerType] switch { - case size == RTMP_LARGE_HEADER_SIZE: + case size == fullHeaderSize: pkt.hasAbsTimestamp = true - case size < RTMP_LARGE_HEADER_SIZE: - if s.vecChannelsIn[pkt.channel] != nil { - *pkt = *(s.vecChannelsIn[pkt.channel]) + case size < fullHeaderSize: + if s.channelsIn[pkt.channel] != nil { + *pkt = *(s.channelsIn[pkt.channel]) } } - size-- if size > 0 { - err = readN(s, header[:size]) + _, err = s.read(header[:size]) if err != nil { - log.Println("readPacket: failed to read rtmp packet heades.") + s.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error()) return err } } - hSize := len(hbuf) - len(header) + size if size >= 3 { pkt.timestamp = C_AMF_DecodeInt24(header[:3]) - if size >= 6 { pkt.bodySize = C_AMF_DecodeInt24(header[3:6]) pkt.bytesRead = 0 @@ -193,9 +199,9 @@ func readPacket(s *Session, pkt *packet) error { extendedTimestamp := pkt.timestamp == 0xffffff if extendedTimestamp { - err = readN(s, header[size:size+4]) + _, err = s.read(header[size : size+4]) if err != nil { - log.Println("readPacket: Failed to read extended timestamp") + s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error()) return err } // TODO: port this @@ -204,7 +210,7 @@ func readPacket(s *Session, pkt *packet) error { } if pkt.bodySize > 0 && pkt.body == nil { - resizePacket(pkt, pkt.bodySize, (hbuf[0]&0xc0)>>6) + pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) } toRead := int32(pkt.bodySize - pkt.bytesRead) @@ -215,27 +221,28 @@ func readPacket(s *Session, pkt *packet) error { } if pkt.chunk != nil { + panic("non-nil chunk") pkt.chunk.headerSize = int32(hSize) copy(pkt.chunk.header[:], hbuf[:hSize]) pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)] } - err = readN(s, pkt.body[pkt.bytesRead:][:chunkSize]) + _, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize]) if err != nil { - log.Println("readPacket: failed to read RTMP packet body") + s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err } pkt.bytesRead += uint32(chunkSize) // keep the packet as ref for other packets on this channel - if s.vecChannelsIn[pkt.channel] == nil { - s.vecChannelsIn[pkt.channel] = &packet{} + if s.channelsIn[pkt.channel] == nil { + s.channelsIn[pkt.channel] = &packet{} } - *(s.vecChannelsIn[pkt.channel]) = *pkt + *(s.channelsIn[pkt.channel]) = *pkt if extendedTimestamp { - s.vecChannelsIn[pkt.channel].timestamp = 0xffffff + s.channelsIn[pkt.channel].timestamp = 0xffffff } if pkt.bytesRead != pkt.bodySize { @@ -248,77 +255,90 @@ func readPacket(s *Session, pkt *packet) error { } s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) - s.vecChannelsIn[pkt.channel].body = nil - s.vecChannelsIn[pkt.channel].bytesRead = 0 - s.vecChannelsIn[pkt.channel].hasAbsTimestamp = false + s.channelsIn[pkt.channel].body = nil + s.channelsIn[pkt.channel].bytesRead = 0 + s.channelsIn[pkt.channel].hasAbsTimestamp = false return nil } -// resizePacket adjust the packet's storage to accommodate a body of the given size. -func resizePacket(pkt *packet, size uint32, ht uint8) { - buf := make([]byte, RTMP_MAX_HEADER_SIZE+size) - pkt.headerType = ht +// resize adjusts the packet's storage to accommodate a body of the given size and header type. +func (pkt *packet) resize(size uint32, ht uint8) { + buf := make([]byte, fullHeaderSize+size) pkt.header = buf - pkt.body = buf[RTMP_MAX_HEADER_SIZE:] + pkt.body = buf[fullHeaderSize:] + if ht != headerSizeAuto { + pkt.headerType = ht + return + } + switch pkt.packetType { + case packetTypeVideo, packetTypeAudio: + if pkt.timestamp == 0 { + pkt.headerType = headerSizeLarge + } else { + pkt.headerType = headerSizeMedium + } + case packetTypeInfo: + pkt.headerType = headerSizeLarge + pkt.bodySize += 16 + default: + pkt.headerType = headerSizeMedium + } } -// sendPacket sends a packet. -func sendPacket(s *Session, pkt *packet, queue bool) error { - var prevPkt *packet - var last int +// write sends a packet. +// When queue is true, we expect a response to this request and cache the method on s.methodCalls. +func (pkt *packet) write(s *Session, queue bool) error { + if pkt.body == nil { + return errInvalidBody + } if pkt.channel >= s.channelsAllocatedOut { + s.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel) n := int(pkt.channel + 10) var pkts []*packet - if s.vecChannelsOut == nil { + if s.channelsOut == nil { pkts = make([]*packet, n) } else { - pkts = append(s.vecChannelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) + pkts = append(s.channelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) } - s.vecChannelsOut = pkts + s.channelsOut = pkts for i := int(s.channelsAllocatedOut); i < n; i++ { - s.vecChannelsOut[i] = nil + s.channelsOut[i] = nil } s.channelsAllocatedOut = int32(n) } - prevPkt = s.vecChannelsOut[pkt.channel] - if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE { + prevPkt := s.channelsOut[pkt.channel] + var last int + if prevPkt != nil && pkt.headerType != headerSizeLarge { // compress a bit by using the prev packet's attributes - if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM { - pkt.headerType = RTMP_PACKET_SIZE_SMALL + if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium { + pkt.headerType = headerSizeSmall } - if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL { - pkt.headerType = RTMP_PACKET_SIZE_MINIMUM + if prevPkt.timestamp == pkt.timestamp && pkt.headerType == headerSizeSmall { + pkt.headerType = headerSizeMinimum } last = int(prevPkt.timestamp) } if pkt.headerType > 3 { - log.Printf("Sanity failed! trying to send header of type: 0x%02x.", - pkt.headerType) + s.log(WarnLevel, pkg+"unexpected header type", "type", pkt.headerType) return errInvalidHeader } - var headBytes []byte - var origIdx int - if pkt.body != nil { - // Span from -packetsize for the type to the start of the body. - headBytes = pkt.header - origIdx = RTMP_MAX_HEADER_SIZE - packetSize[pkt.headerType] - } else { - // Allocate a new header and allow 6 bytes of movement backward. - var hbuf [RTMP_MAX_HEADER_SIZE]byte - headBytes = hbuf[:] - origIdx = 6 - } + // The complete packet starts from headerSize _before_ the start the body. + // origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header. + headBytes := pkt.header + hSize := headerSizes[pkt.headerType] + origIdx := fullHeaderSize - hSize - var cSize int + // adjust 1 or 2 bytes for the channel + cSize := 0 switch { case pkt.channel > 319: cSize = 2 @@ -326,12 +346,12 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { cSize = 1 } - hSize := packetSize[pkt.headerType] if cSize != 0 { origIdx -= cSize hSize += cSize } + // adjust 4 bytes for the timestamp var ts uint32 if prevPkt != nil { ts = uint32(int(pkt.timestamp) - last) @@ -339,7 +359,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { if ts >= 0xffffff { origIdx -= 4 hSize += 4 - log.Printf("Larger timestamp than 24-bit: 0x%v", ts) + s.log(DebugLevel, pkg+"larger timestamp than 24 bits", "timestamp", ts) } headerIdx := origIdx @@ -367,7 +387,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { } } - if packetSize[pkt.headerType] > 1 { + if headerSizes[pkt.headerType] > 1 { res := ts if ts > 0xffffff { res = 0xffffff @@ -376,16 +396,16 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { headerIdx += 3 // 24bits } - if packetSize[pkt.headerType] > 4 { + if headerSizes[pkt.headerType] > 4 { C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) headerIdx += 3 // 24bits headBytes[headerIdx] = pkt.packetType headerIdx++ } - if packetSize[pkt.headerType] > 8 { - n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) - headerIdx += n + if headerSizes[pkt.headerType] > 8 { + binary.LittleEndian.PutUint32(headBytes[headerIdx:headerIdx+4], uint32(pkt.info)) + headerIdx += 4 // 32bits } if ts >= 0xffffff { @@ -396,39 +416,43 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { size := int(pkt.bodySize) chunkSize := int(s.outChunkSize) - if debugMode { - log.Printf("sendPacket: %v->%v, size=%v", s.link.conn.LocalAddr(), s.link.conn.RemoteAddr(), size) - } - - // Send the previously deferred packet if combining it with the next packet would exceed the chunk size. - if s.defered != nil && len(s.defered)+size+hSize > chunkSize { - err := writeN(s, s.defered) - if err != nil { - return err + if s.deferred == nil { + // Defer sending small audio packets (at most once). + if pkt.packetType == packetTypeAudio && size < chunkSize { + s.deferred = headBytes[origIdx:][:size+hSize] + s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) + return nil + } + } else { + // Send previously deferrd packet if combining it with the next one would exceed the chunk size. + if len(s.deferred)+size+hSize > chunkSize { + s.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(s.deferred)) + _, err := s.write(s.deferred) + if err != nil { + return err + } + s.deferred = nil } - s.defered = nil } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. + s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size) for size+hSize != 0 { - if s.defered == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize { - s.defered = headBytes[origIdx:][:size+hSize] - break - } if chunkSize > size { chunkSize = size } bytes := headBytes[origIdx:][:chunkSize+hSize] - if s.defered != nil { + if s.deferred != nil { // Prepend the previously deferred packet and write it with the current one. - bytes = append(s.defered, bytes...) + s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred)) + bytes = append(s.deferred, bytes...) } - err := writeN(s, bytes) + _, err := s.write(bytes) if err != nil { return err } - s.defered = nil + s.deferred = nil size -= chunkSize origIdx += chunkSize + hSize @@ -461,13 +485,10 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { } // We invoked a remote method - if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { + if pkt.packetType == packetTypeInvoke { buf := pkt.body[1:] meth := C_AMF_DecodeString(buf) - - if debugMode { - log.Printf("invoking %v", meth) - } + s.log(DebugLevel, pkg+"invoking method "+meth) // keep it in call queue till result arrives if queue { buf = buf[3+len(meth):] @@ -476,10 +497,10 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { } } - if s.vecChannelsOut[pkt.channel] == nil { - s.vecChannelsOut[pkt.channel] = &packet{} + if s.channelsOut[pkt.channel] == nil { + s.channelsOut[pkt.channel] = &packet{} } - *(s.vecChannelsOut[pkt.channel]) = *pkt + *(s.channelsOut[pkt.channel]) = *pkt return nil } diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index 97110eb8..eae4277e 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -8,9 +8,10 @@ DESCRIPTION AUTHOR Dan Kortschak Saxon Nelson-Milton + Alan Noble LICENSE - parseurl.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) + parseurl.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -33,7 +34,6 @@ LICENSE package rtmp import ( - "log" "net/url" "path" "strconv" @@ -41,30 +41,29 @@ import ( ) // parseURL parses an RTMP URL (ok, technically it is lexing). +// func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { u, err := url.Parse(addr) if err != nil { - log.Printf("failed to parse addr: %v", err) return protocol, host, port, app, playpath, err } switch u.Scheme { case "rtmp": - protocol = RTMP_PROTOCOL_RTMP + protocol = protoRTMP case "rtmpt": - protocol = RTMP_PROTOCOL_RTMPT + protocol = protoRTMPT case "rtmps": - protocol = RTMP_PROTOCOL_RTMPS + protocol = protoRTMPS case "rtmpe": - protocol = RTMP_PROTOCOL_RTMPE + protocol = protoRTMPE case "rtmfp": - protocol = RTMP_PROTOCOL_RTMFP + protocol = protoRTMFP case "rtmpte": - protocol = RTMP_PROTOCOL_RTMPTE + protocol = protoRTMPTE case "rtmpts": - protocol = RTMP_PROTOCOL_RTMPTS + protocol = protoRTMPTS default: - log.Printf("unknown scheme: %q", u.Scheme) return protocol, host, port, app, playpath, errUnknownScheme } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index d73c2fc2..4b334587 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -3,7 +3,7 @@ NAME rtmp.go DESCRIPTION - See Readme.md + RTMP command functionality. AUTHORS Saxon Nelson-Milton @@ -38,9 +38,6 @@ import ( "bytes" "encoding/binary" "errors" - "fmt" - "io" - "log" "math/rand" "net" "strconv" @@ -48,63 +45,94 @@ import ( ) const ( - minDataSize = 11 - debugMode = false - length = 512 + pkg = "rtmp:" + minDataSize = 11 // ToDo: this should be the same as fullHeaderSize + signatureSize = 1536 + fullHeaderSize = 12 ) +// Link flags. const ( - setDataFrame = "@setDataFrame" + linkAuth = 0x0001 // using auth param + linkLive = 0x0002 // stream is live + linkSWF = 0x0004 // do SWF verification - not implemented + linkPlaylist = 0x0008 // send playlist before play - not implemented + linkBufx = 0x0010 // toggle stream on BufferEmpty msg - not implemented +) - av__checkbw = "_checkbw" - av__onbwcheck = "_onbwcheck" - av__onbwdone = "_onbwdone" - av__result = "_result" - av_app = "app" - av_audioCodecs = "audioCodecs" - av_capabilities = "capabilities" - av_close = "close" - av_code = "code" - av_connect = "connect" - av_createStream = "createStream" - av_deleteStream = "deleteStream" - av_FCPublish = "FCPublish" - av_FCUnpublish = "FCUnpublish" - av_flashVer = "flashVer" - av_fpad = "fpad" - av_level = "level" - av_live = "live" - av_NetConnection_Connect_InvalidApp = "NetConnection.Connect.InvalidApp" - av_NetStream_Failed = "NetStream.Failed" - av_NetStream_Pause_Notify = "NetStream.Pause.Notify" - av_NetStream_Play_Complete = "NetStream.Play.Complete" - av_NetStream_Play_Failed = "NetStream.Play.Failed" - av_NetStream_Play_PublishNotify = "NetStream.Play.PublishNotify" - av_NetStream_Play_Start = "NetStream.Play.Start" - av_NetStream_Play_Stop = "NetStream.Play.Stop" - av_NetStream_Play_StreamNotFound = "NetStream.Play.StreamNotFound" - av_NetStream_Play_UnpublishNotify = "NetStream.Play.UnpublishNotify" - av_NetStream_Publish_Start = "NetStream.Publish.Start" - av_NetStream_Seek_Notify = "NetStream.Seek.Notify" - av_nonprivate = "nonprivate" - av_objectEncoding = "objectEncoding" - av_onBWDone = "onBWDone" - av_onFCSubscribe = "onFCSubscribe" - av_onFCUnsubscribe = "onFCUnsubscribe" - av_onStatus = "onStatus" - av_pageUrl = "pageUrl" - av_ping = "ping" - av_play = "play" - av_playlist_ready = "playlist_ready" - av_publish = "publish" - av_releaseStream = "releaseStream" - av_secureToken = "secureToken" - av_set_playlist = "set_playlist" - av_swfUrl = "swfUrl" - av_tcUrl = "tcUrl" - av_type = "type" - av_videoCodecs = "videoCodecs" - av_videoFunction = "videoFunction" +// Protocol features. +const ( + featureHTTP = 0x01 // not implemented + featureEncode = 0x02 // not implemented + featureSSL = 0x04 // not implemented + featureMFP = 0x08 // not implemented + featureWrite = 0x10 // publish, not play + featureHTTP2 = 0x20 // server-side RTMPT - not implemented +) + +// RTMP protocols. +const ( + protoRTMP = 0 + protoRTMPE = featureEncode + protoRTMPT = featureHTTP + protoRTMPS = featureSSL + protoRTMPTE = (featureHTTP | featureEncode) + protoRTMPTS = (featureHTTP | featureSSL) + protoRTMFP = featureMFP +) + +// RTMP tokens (lexemes). +// NB: Underscores are deliberately preserved in const names where they exist in the corresponding tokens. +const ( + av_checkbw = "_checkbw" + av_onbwcheck = "_onbwcheck" + av_onbwdone = "_onbwdone" + av_result = "_result" + avApp = "app" + avAudioCodecs = "audioCodecs" + avCapabilities = "capabilities" + avClose = "close" + avCode = "code" + avConnect = "connect" + avCreatestream = "createStream" + avDeletestream = "deleteStream" + avFCPublish = "FCPublish" + avFCUnpublish = "FCUnpublish" + avFlashver = "flashVer" + avFpad = "fpad" + avLevel = "level" + avLive = "live" + avNetConnectionConnectInvalidApp = "NetConnection.Connect.InvalidApp" + avNetStreamFailed = "NetStream.Failed" + avNetStreamPauseNotify = "NetStream.Pause.Notify" + avNetStreamPlayComplete = "NetStream.Play.Complete" + avNetStreamPlayFailed = "NetStream.Play.Failed" + avNetStreamPlayPublishNotify = "NetStream.Play.PublishNotify" + avNetStreamPlayStart = "NetStream.Play.Start" + avNetStreamPlayStop = "NetStream.Play.Stop" + avNetStreamPlayStreamNotFound = "NetStream.Play.StreamNotFound" + avNetStreamPlayUnpublishNotify = "NetStream.Play.UnpublishNotify" + avNetStreamPublish_Start = "NetStream.Publish.Start" + avNetStreamSeekNotify = "NetStream.Seek.Notify" + avNonprivate = "nonprivate" + avObjectEncoding = "objectEncoding" + avOnBWDone = "onBWDone" + avOnFCSubscribe = "onFCSubscribe" + avOnFCUnsubscribe = "onFCUnsubscribe" + avOnStatus = "onStatus" + avPageUrl = "pageUrl" + avPing = "ping" + avPlay = "play" + avPlaylist_ready = "playlist_ready" + avPublish = "publish" + avReleasestream = "releaseStream" + avSecureToken = "secureToken" + avSet_playlist = "set_playlist" + avSwfUrl = "swfUrl" + avTcUrl = "tcUrl" + avType = "type" + avVideoCodecs = "videoCodecs" + avVideoFunction = "videoFunction" ) // RTMP protocol strings. @@ -123,7 +151,9 @@ var rtmpProtocolStrings = [...]string{ // RTMP errors. var ( errUnknownScheme = errors.New("rtmp: unknown scheme") + errConnected = errors.New("rtmp: already connected") errNotConnected = errors.New("rtmp: not connected") + errNotWritable = errors.New("rtmp: connection not writable") errHandshake = errors.New("rtmp: handshake failed") errConnSend = errors.New("rtmp: connection send error") errConnStream = errors.New("rtmp: connection stream error") @@ -132,31 +162,30 @@ var ( errTinyPacket = errors.New("rtmp: packet too small") errEncoding = errors.New("rtmp: encoding error") errDecoding = errors.New("rtmp: decoding error") - errCopying = errors.New("rtmp: copying error") + errUnimplemented = errors.New("rtmp: unimplemented feature") ) // setupURL parses the RTMP URL. -func setupURL(s *Session, addr string) (err error) { - s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(addr) +func setupURL(s *Session) (err error) { + s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(s.url) if err != nil { return err } if s.link.tcUrl == "" { if s.link.app != "" { - s.link.tcUrl = fmt.Sprintf("%v://%v:%v/%v", - rtmpProtocolStrings[s.link.protocol], s.link.host, s.link.port, s.link.app) - s.link.lFlags |= RTMP_LF_FTCU + s.link.tcUrl = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app } else { - s.link.tcUrl = addr + s.link.tcUrl = s.url } } if s.link.port == 0 { switch { - case (s.link.protocol & RTMP_FEATURE_SSL) != 0: + case (s.link.protocol & featureSSL) != 0: s.link.port = 433 - case (s.link.protocol & RTMP_FEATURE_HTTP) != 0: + s.log(FatalLevel, pkg+"SSL not supported") + case (s.link.protocol & featureHTTP) != 0: s.link.port = 80 default: s.link.port = 1935 @@ -173,22 +202,19 @@ func connect(s *Session) error { } s.link.conn, err = net.DialTCP("tcp4", nil, addr) if err != nil { + s.log(WarnLevel, pkg+"dial failed", "error", err.Error()) return err } - if debugMode { - log.Println("... connected, handshaking...") - } + s.log(DebugLevel, pkg+"connected") err = handshake(s) if err != nil { - log.Println("connect: handshake failed") + s.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) return errHandshake } - if debugMode { - log.Println("... handshaked...") - } + s.log(DebugLevel, pkg+"handshaked") err = sendConnectPacket(s) if err != nil { - log.Println("connect: sendConnect failed") + s.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) return errConnSend } return nil @@ -196,57 +222,50 @@ func connect(s *Session) error { // connectStream reads a packet and handles it func connectStream(s *Session) error { - var pkt packet - - for !s.isPlaying && s.isConnected() { - err := readPacket(s, &pkt) + var err error + for !s.isPlaying { + pkt := packet{} + err = pkt.read(s) if err != nil { break } - if pkt.bodySize == 0 { - continue + switch pkt.packetType { + case packetTypeAudio, packetTypeVideo, packetTypeInfo: + s.log(WarnLevel, pkg+"got packet before play; ignoring", "type", pkt.packetType) + default: + err = handlePacket(s, &pkt) + if err != nil { + break + } } - - if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || - pkt.packetType == RTMP_PACKET_TYPE_VIDEO || - pkt.packetType == RTMP_PACKET_TYPE_INFO { - log.Println("connectStream: got packet before play()! Ignoring.") - pkt.body = nil - continue - } - - handlePacket(s, &pkt) - pkt.body = nil } if !s.isPlaying { - return errConnStream + return err } return nil } // handlePacket handles a packet that the client has received. // NB: cases have been commented out that are not currently used by AusOcean -func handlePacket(s *Session, pkt *packet) int32 { - var hasMediaPacket int32 +func handlePacket(s *Session, pkt *packet) error { switch pkt.packetType { - - case RTMP_PACKET_TYPE_CHUNK_SIZE: + case packetTypeChunkSize: if pkt.bodySize >= 4 { s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) } - case RTMP_PACKET_TYPE_BYTES_READ_REPORT: + case packetTypeBytesReadReport: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) - case RTMP_PACKET_TYPE_CONTROL: - panic("Unsupported packet type RTMP_PACKET_TYPE_CONTROL") + case packetTypeControl: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl") - case RTMP_PACKET_TYPE_SERVER_BW: + case packetTypeServerBW: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) - case RTMP_PACKET_TYPE_CLIENT_BW: + case packetTypeClientBW: s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) if pkt.bodySize > 4 { s.clientBW2 = pkt.body[4] @@ -254,75 +273,31 @@ func handlePacket(s *Session, pkt *packet) int32 { s.clientBW2 = 0xff } - case RTMP_PACKET_TYPE_AUDIO: - panic("Unsupported packet type RTMP_PACKET_TYPE_AUDIO") + case packetTypeAudio: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeAudio") - case RTMP_PACKET_TYPE_VIDEO: - panic("Unsupported packet type RTMP_PACKET_TYPE_VIDEO") + case packetTypeVideo: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeVideo") - case RTMP_PACKET_TYPE_FLEX_MESSAGE: - panic("Unsupported packet type RTMP_PACKET_TYPE_FLEX_MESSAGE") + case packetTypeFlexMessage: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeFlexMessage") - case RTMP_PACKET_TYPE_INFO: - panic("Unsupported packet type RTMP_PACKET_TYPE_INFO") + case packetTypeInfo: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeInfo") - case RTMP_PACKET_TYPE_INVOKE: - if debugMode { - log.Println("RTMP_PACKET_TYPE_INVOKE:") - } + case packetTypeInvoke: err := handleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { // This will never happen with the methods we implement. - log.Println("HasMediaPacket") - hasMediaPacket = 2 - } - - case RTMP_PACKET_TYPE_FLASH_VIDEO: - panic("Unsupported packet type RTMP_PACKET_TYPE_FLASH_VIDEO") - - default: - // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,pkt.packetType); - } - return hasMediaPacket -} - -func readN(s *Session, buf []byte) error { - err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) - if err != nil { - return err - } - n, err := io.ReadFull(s.link.conn, buf) - if err != nil { - if debugMode { - log.Printf("readN error: %v\n", err) - } - s.close() - return err - } - s.nBytesIn += int32(n) - if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { - err := sendBytesReceived(s) - if err != nil { + s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) return err } - } - return nil -} -func writeN(s *Session, buf []byte) error { - //ToDo: consider using a different timeout for writes than for reads - err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) - if err != nil { - return err - } - _, err = s.link.conn.Write(buf) - if err != nil { - if debugMode { - log.Printf("writeN, RTMP send error: %v\n", err) - } - s.close() - return err + case packetTypeFlashVideo: + s.log(FatalLevel, pkg+"unsupported packet type packetType_FLASHVideo") + + default: + s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) } return nil } @@ -330,15 +305,15 @@ func writeN(s *Session, buf []byte) error { func sendConnectPacket(s *Session) error { var pbuf [4096]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeLarge, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_connect) + enc = C_AMF_EncodeString(enc, avConnect) if enc == nil { return errEncoding } @@ -350,60 +325,60 @@ func sendConnectPacket(s *Session) error { enc[0] = AMF_OBJECT enc = enc[1:] - enc = C_AMF_EncodeNamedString(enc, av_app, s.link.app) + enc = C_AMF_EncodeNamedString(enc, avApp, s.link.app) if enc == nil { return errEncoding } - if s.link.protocol&RTMP_FEATURE_WRITE != 0 { - enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate) + if s.link.protocol&featureWrite != 0 { + enc = C_AMF_EncodeNamedString(enc, avType, avNonprivate) if enc == nil { return errEncoding } } if s.link.flashVer != "" { - enc = C_AMF_EncodeNamedString(enc, av_flashVer, s.link.flashVer) + enc = C_AMF_EncodeNamedString(enc, avFlashver, s.link.flashVer) if enc == nil { return errEncoding } } if s.link.swfUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_swfUrl, s.link.swfUrl) + enc = C_AMF_EncodeNamedString(enc, avSwfUrl, s.link.swfUrl) if enc == nil { return errEncoding } } if s.link.tcUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_tcUrl, s.link.tcUrl) + enc = C_AMF_EncodeNamedString(enc, avTcUrl, s.link.tcUrl) if enc == nil { return errEncoding } } - if s.link.protocol&RTMP_FEATURE_WRITE == 0 { - enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false) + if s.link.protocol&featureWrite == 0 { + enc = C_AMF_EncodeNamedBoolean(enc, avFpad, false) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_capabilities, 15) + enc = C_AMF_EncodeNamedNumber(enc, avCapabilities, 15) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, s.audioCodecs) + enc = C_AMF_EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, s.videoCodecs) + enc = C_AMF_EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_videoFunction, 1) + enc = C_AMF_EncodeNamedNumber(enc, avVideoFunction, 1) if enc == nil { return errEncoding } if s.link.pageUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_pageUrl, s.link.pageUrl) + enc = C_AMF_EncodeNamedString(enc, avPageUrl, s.link.pageUrl) if enc == nil { return errEncoding } @@ -411,20 +386,18 @@ func sendConnectPacket(s *Session) error { } if s.encoding != 0.0 || s.sendEncoding { - enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, s.encoding) + enc = C_AMF_EncodeNamedNumber(enc, avObjectEncoding, s.encoding) if enc == nil { return errEncoding } } - if copy(enc, []byte{0, 0, AMF_OBJECT_END}) != 3 { - return errCopying // TODO: is this even possible? - } + copy(enc, []byte{0, 0, AMF_OBJECT_END}) enc = enc[3:] - /* add auth string */ + // add auth string if s.link.auth != "" { - enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) + enc = C_AMF_EncodeBoolean(enc, s.link.flags&linkAuth != 0) if enc == nil { return errEncoding } @@ -441,23 +414,23 @@ func sendConnectPacket(s *Session) error { } } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, true) + return pkt.write(s, true) // response expected } func sendCreateStream(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_createStream) + enc = C_AMF_EncodeString(enc, avCreatestream) if enc == nil { return errEncoding } @@ -469,23 +442,23 @@ func sendCreateStream(s *Session) error { enc[0] = AMF_NULL enc = enc[1:] - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, true) + return pkt.write(s, true) // response expected } func sendReleaseStream(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_releaseStream) + enc = C_AMF_EncodeString(enc, avReleasestream) if enc == nil { return errEncoding } @@ -500,23 +473,23 @@ func sendReleaseStream(s *Session) error { if enc == nil { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } func sendFCPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_FCPublish) + enc = C_AMF_EncodeString(enc, avFCPublish) if enc == nil { return errEncoding } @@ -532,23 +505,23 @@ func sendFCPublish(s *Session) error { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } func sendFCUnpublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_FCUnpublish) + enc = C_AMF_EncodeString(enc, avFCUnpublish) if enc == nil { return errEncoding } @@ -564,23 +537,23 @@ func sendFCUnpublish(s *Session) error { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } func sendPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_SOURCE, - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanSource, + headerType: headerSizeLarge, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_publish) + enc = C_AMF_EncodeString(enc, avPublish) if enc == nil { return errEncoding } @@ -595,28 +568,28 @@ func sendPublish(s *Session) error { if enc == nil { return errEncoding } - enc = C_AMF_EncodeString(enc, av_live) + enc = C_AMF_EncodeString(enc, avLive) if enc == nil { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, true) + return pkt.write(s, true) // response expected } func sendDeleteStream(s *Session, dStreamId float64) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_deleteStream) + enc = C_AMF_EncodeString(enc, avDeletestream) if enc == nil { return errEncoding } @@ -631,21 +604,20 @@ func sendDeleteStream(s *Session, dStreamId float64) error { if enc == nil { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - /* no response expected */ - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } // sendBytesReceived tells the server how many bytes the client has received. func sendBytesReceived(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_BYTES_READ, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, + channel: chanBytesRead, + headerType: headerSizeMedium, + packetType: packetTypeBytesReadReport, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -656,21 +628,21 @@ func sendBytesReceived(s *Session) error { } pkt.bodySize = 4 - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } func sendCheckBW(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeLarge, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av__checkbw) + enc = C_AMF_EncodeString(enc, av_checkbw) if enc == nil { return errEncoding } @@ -682,9 +654,9 @@ func sendCheckBW(s *Session) error { enc[0] = AMF_NULL enc = enc[1:] - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return sendPacket(s, &pkt, false) + return pkt.write(s, false) } func eraseMethod(m []method, i int) []method { @@ -694,7 +666,7 @@ func eraseMethod(m []method, i int) []method { } // int handleInvoke handles a packet invoke request -// Side effects: s.isPlaying set to true upon av_NetStream_Publish_Start +// Side effects: s.isPlaying set to true upon avNetStreamPublish_Start func handleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody @@ -706,12 +678,11 @@ func handleInvoke(s *Session, body []byte) error { } meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) + s.log(DebugLevel, pkg+"invoking method "+meth) txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) - // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val); switch meth { - case av__result: + case av_result: var methodInvoked string for i, m := range s.methodCalls { if float64(m.num) == txn { @@ -721,109 +692,110 @@ func handleInvoke(s *Session, body []byte) error { } } if methodInvoked == "" { - // TODO use new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request", - //__FUNCTION__, txn); + s.log(WarnLevel, pkg+"received result without matching request", "id", txn) goto leave } - // TODO use new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__, - //methodInvoked.av_val); + s.log(DebugLevel, pkg+"received result for "+methodInvoked) + switch methodInvoked { - case av_connect: + case avConnect: if s.link.token != "" { - panic("No support for link token") - + s.log(FatalLevel, pkg+"no support for link token") } - if (s.link.protocol & RTMP_FEATURE_WRITE) != 0 { - sendReleaseStream(s) - sendFCPublish(s) - } else { - panic("Link protocol has no RTMP_FEATURE_WRITE") + if (s.link.protocol & featureWrite) == 0 { + return errNotWritable + } + err := sendReleaseStream(s) + if err != nil { + return err + } + err = sendFCPublish(s) + if err != nil { + return err + } + err = sendCreateStream(s) + if err != nil { + return err } - sendCreateStream(s) - if (s.link.protocol & RTMP_FEATURE_WRITE) == 0 { - panic("Link protocol has no RTMP_FEATURE_WRITE") - } - - case av_createStream: + case avCreatestream: s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) - - if s.link.protocol&RTMP_FEATURE_WRITE != 0 { - sendPublish(s) - } else { - panic("Link protocol has no RTMP_FEATURE_WRITE") + if s.link.protocol&featureWrite == 0 { + return errNotWritable + } + err := sendPublish(s) + if err != nil { + return err } - case av_play, av_publish: - panic("Unsupported method av_play/av_publish") - } - //C.free(unsafe.Pointer(methodInvoked.av_val)) - - case av_onBWDone: - if s.bwCheckCounter == 0 { - sendCheckBW(s) + case avPlay, avPublish: + s.log(FatalLevel, pkg+"unsupported method avPlay/avPublish") } - case av_onFCUnsubscribe, av_onFCSubscribe: - panic("Unsupported method av_onFCUnsubscribe/av_onFCSubscribe") + case avOnBWDone: + if s.checkCounter == 0 { // ToDo: why is this always zero? + err := sendCheckBW(s) + if err != nil { + return err + } + } - case av_ping: - panic("Unsupported method av_ping") + case avOnFCUnsubscribe, avOnFCSubscribe: + s.log(FatalLevel, pkg+"unsupported method avOnFCUnsubscribe/avOonfcsubscribe") - case av__onbwcheck: - panic("Unsupported method av_onbwcheck") + case avPing: + s.log(FatalLevel, pkg+"unsupported method avPing") - case av__onbwdone: - panic("Unsupported method av_onbwdone") + case av_onbwcheck: + s.log(FatalLevel, pkg+"unsupported method av_onbwcheck") - case av_close: - panic("Unsupported method av_close") + case av_onbwdone: + s.log(FatalLevel, pkg+"unsupported method av_onbwdone") - case av_onStatus: + case avClose: + s.log(FatalLevel, pkg+"unsupported method avClose") + + case avOnStatus: var obj2 C_AMFObject C_AMFProp_GetObject(C_AMF_GetProp(&obj, "", 3), &obj2) - code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_code, -1)) - - level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_level, -1)) // Not used. - _ = level - - // TODO use new logger - // RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val); + code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avCode, -1)) + level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avLevel, -1)) + s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) switch code { - case av_NetStream_Failed, av_NetStream_Play_Failed, - av_NetStream_Play_StreamNotFound, av_NetConnection_Connect_InvalidApp: - panic("Unsupported method av_NetStream/av_NetStream_Play_Failed/av_netSTream_Play_StreamNotFound/av_netConnection_Connect_invalidApp") + case avNetStreamFailed, avNetStreamPlayFailed, + avNetStreamPlayStreamNotFound, avNetConnectionConnectInvalidApp: + s.log(FatalLevel, pkg+"unsupported method avNetStream/avNetStreamPlayFailed/avNetstream_play_streamnotfound/av_netConnection_Connect_invalidApp") - case av_NetStream_Play_Start, av_NetStream_Play_PublishNotify: - panic("Unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") + case avNetStreamPlayStart, avNetStreamPlayPublishNotify: + s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayStart/avNetStreamPlayPublishNotify") - case av_NetStream_Publish_Start: + case avNetStreamPublish_Start: + s.log(DebugLevel, pkg+"playing") s.isPlaying = true for i, m := range s.methodCalls { - if m.name == av_publish { + if m.name == avPublish { s.methodCalls = eraseMethod(s.methodCalls, i) break } } + // ToDo: handle case when avPublish method not found - case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify: - panic("Unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify") + case avNetStreamPlayComplete, avNetStreamPlayStop, avNetStreamPlayUnpublishNotify: + s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayComplete/avNetStreamPlayStop/avNetStreamPlayUnpublishNotify") - case av_NetStream_Seek_Notify: - panic("Unsupported method av_netStream_Seek_Notify") + case avNetStreamSeekNotify: + s.log(FatalLevel, pkg+"unsupported method avNetstream_seek_notify") - case av_NetStream_Pause_Notify: - panic("Unsupported method av_NetStream_Pause_Notify") + case avNetStreamPauseNotify: + s.log(FatalLevel, pkg+"unsupported method avNetStreamPauseNotify") } - case av_playlist_ready: - panic("Unsupported method av_playlist_ready") + case avPlaylist_ready: + s.log(FatalLevel, pkg+"unsupported method avPlaylist_ready") default: - panic(fmt.Sprintf("unknown method: %q", meth)) + s.log(FatalLevel, pkg+"unknown method "+meth) } leave: C_AMF_Reset(&obj) @@ -831,138 +803,56 @@ leave: } func handshake(s *Session) error { - var clientbuf [RTMP_SIG_SIZE + 1]byte + var clientbuf [signatureSize + 1]byte clientsig := clientbuf[1:] - var serversig [RTMP_SIG_SIZE]byte - clientbuf[0] = RTMP_CHANNEL_CONTROL + var serversig [signatureSize]byte + clientbuf[0] = chanControl binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000)) copy(clientsig[4:8], []byte{0, 0, 0, 0}) - for i := 8; i < RTMP_SIG_SIZE; i++ { + for i := 8; i < signatureSize; i++ { clientsig[i] = byte(rand.Intn(256)) } - err := writeN(s, clientbuf[:]) + _, err := s.write(clientbuf[:]) if err != nil { return err } + s.log(DebugLevel, pkg+"handshake sent") var typ [1]byte - err = readN(s, typ[:]) + _, err = s.read(typ[:]) if err != nil { return err } + s.log(DebugLevel, pkg+"handshake received") - if debugMode { - log.Printf("handshake: Type answer: %v\n", typ[0]) - } if typ[0] != clientbuf[0] { - log.Printf("handshake: type mismatch: client sent %v, server sent: %v\n", - clientbuf[0], typ) + s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) } - err = readN(s, serversig[:]) + _, err = s.read(serversig[:]) if err != nil { return err } // decode server response suptime := binary.BigEndian.Uint32(serversig[:4]) - _ = suptime - // RTMP_Log(RTMP_LOGDEBUG, "%s: Server Uptime : %d", __FUNCTION__, suptime) - // RTMP_Log(RTMP_LOGDEBUG, "%s: FMS Version : %d.%d.%d.%d", __FUNCTION__, - // serversig[4], serversig[5], serversig[6], serversig[7]) + s.log(DebugLevel, pkg+"server uptime", "uptime", suptime) // 2nd part of handshake - err = writeN(s, serversig[:]) + _, err = s.write(serversig[:]) if err != nil { return err } - err = readN(s, serversig[:]) + _, err = s.read(serversig[:]) if err != nil { return err } - if !bytes.Equal(serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) { - log.Printf("Client signature does not match: %q != %q", - serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) - } - return nil -} - -// write prepares data to write then sends it. -func (s *Session) write(buf []byte) error { - pkt := packet{ - channel: RTMP_CHANNEL_SOURCE, - info: s.streamID, - } - var enc []byte - - for len(buf) != 0 { - if pkt.bytesRead == 0 { - if len(buf) < minDataSize { - return errTinyPacket - } - - if buf[0] == 'F' && buf[1] == 'L' && buf[2] == 'V' { - buf = buf[13:] - } - - pkt.packetType = buf[0] - buf = buf[1:] - pkt.bodySize = C_AMF_DecodeInt24(buf[:3]) - buf = buf[3:] - pkt.timestamp = C_AMF_DecodeInt24(buf[:3]) - buf = buf[3:] - pkt.timestamp |= uint32(buf[0]) << 24 - buf = buf[4:] - - headerType := uint8(RTMP_PACKET_SIZE_MEDIUM) - switch pkt.packetType { - case RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_AUDIO: - if pkt.timestamp == 0 { - headerType = RTMP_PACKET_SIZE_LARGE - } - case RTMP_PACKET_TYPE_INFO: - headerType = RTMP_PACKET_SIZE_LARGE - pkt.bodySize += 16 - } - - resizePacket(&pkt, pkt.bodySize, headerType) - - enc = pkt.body[:pkt.bodySize] - if pkt.packetType == RTMP_PACKET_TYPE_INFO { - enc = C_AMF_EncodeString(enc, setDataFrame) - if enc == nil { - return errEncoding - } - pkt.bytesRead = uint32(len(pkt.body) - len(enc)) - } - - } else { - enc = pkt.body[:pkt.bodySize][pkt.bytesRead:] - } - num := int(pkt.bodySize - pkt.bytesRead) - if num > len(buf) { - num = len(buf) - } - - copy(enc[:num], buf[:num]) - pkt.bytesRead += uint32(num) - buf = buf[num:] - if pkt.bytesRead == pkt.bodySize { - err := sendPacket(s, &pkt, false) - pkt.body = nil - pkt.bytesRead = 0 - if err != nil { - return err - } - if len(buf) < 4 { - return nil - } - buf = buf[4:] - } + if !bytes.Equal(serversig[:signatureSize], clientbuf[1:signatureSize+1]) { + s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:signatureSize], "clientsig", clientbuf[1:signatureSize+1]) } return nil } diff --git a/rtmp/rtmp_headers.go b/rtmp/rtmp_headers.go deleted file mode 100644 index becb48be..00000000 --- a/rtmp/rtmp_headers.go +++ /dev/null @@ -1,119 +0,0 @@ -/* -NAME - rtmp_headers.go - -DESCRIPTION - See Readme.md - -AUTHORS - Saxon Nelson-Milton - Dan Kortschak - Alan Noble - -LICENSE - rtmp_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. - - Derived from librtmp under the GNU Lesser General Public License 2.1 - Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org - Copyright (C) 2008-2009 Andrej Stepanchuk - Copyright (C) 2009-2010 Howard Chu -*/ -package rtmp - -import ( - "net" -) - -const ( - RTMPT_OPEN = iota - RTMPT_SEND - RTMPT_IDLE - RTMPT_CLOSE -) - -const ( - RTMP_READ_HEADER = 0x01 - RTMP_READ_RESUME = 0x02 - RTMP_READ_NO_IGNORE = 0x04 - RTMP_READ_GOTKF = 0x08 - RTMP_READ_GOTFLVK = 0x10 - RTMP_READ_SEEKING = 0x20 - RTMP_READ_COMPLETE = -3 - RTMP_READ_ERROR = -2 - RTMP_READ_EOF = -1 - RTMP_READ_IGNORE = 0 -) - -const ( - RTMP_LF_AUTH = 0x0001 /* using auth param */ - RTMP_LF_LIVE = 0x0002 /* stream is live */ - RTMP_LF_SWFV = 0x0004 /* do SWF verification */ - RTMP_LF_PLST = 0x0008 /* send playlist before play */ - RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */ - RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */ - RTMP_LF_FAPU = 0x0040 /* free app on close */ -) - -const ( - RTMP_FEATURE_HTTP = 0x01 - RTMP_FEATURE_ENC = 0x02 - RTMP_FEATURE_SSL = 0x04 - RTMP_FEATURE_MFP = 0x08 /* not yet supported */ - RTMP_FEATURE_WRITE = 0x10 /* publish, not play */ - RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */ -) - -const ( - RTMP_PROTOCOL_RTMP = 0 - RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC - RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP - RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL - RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC) - RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL) - RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP -) - -const ( - RTMP_DEFAULT_CHUNKSIZE = 128 - RTMP_BUFFER_CACHE_SIZE = (16 * 1024) - RTMP_SIG_SIZE = 1536 - RTMP_LARGE_HEADER_SIZE = 12 - RTMP_MAX_HEADER_SIZE = 18 -) - -type link struct { - host string - playpath string - tcUrl string - swfUrl string - pageUrl string - app string - auth string - flashVer string - token string - extras C_AMFObject - lFlags int32 - swfAge int32 - protocol int32 - timeout uint - port uint16 - conn *net.TCPConn -} - -type method struct { - name string - num int32 -} diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go new file mode 100644 index 00000000..01dad10f --- /dev/null +++ b/rtmp/rtmp_test.go @@ -0,0 +1,232 @@ +/* +NAME + rtmp_test.go + +DESCRIPTION + RTMP tests + +AUTHORS + Saxon Nelson-Milton + Dan Kortschak + Alan Noble + +LICENSE + rtmp_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package rtmp + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "testing" + "time" + + "bitbucket.org/ausocean/av/stream/flv" + "bitbucket.org/ausocean/av/stream/lex" +) + +const ( + rtmpProtocol = "rtmp" + testHost = "a.rtmp.youtube.com" + testApp = "live2" + testBaseURL = rtmpProtocol + "://" + testHost + "/" + testApp + "/" + testTimeout = 30 + testDataDir = "../../test/test-data/av/input" +) + +// testVerbosity controls the amount of output. +// NB: This is not the log level, which is DebugLevel. +// 0: suppress logging completely +// 1: log messages only +// 2: log messages with errors, if any +var testVerbosity = 1 + +// testKey is the YouTube RTMP key required for YouTube streaming (RTMP_TEST_KEY env var). +// NB: don't share your key with others. +var testKey string + +// testFile is the test video file (RTMP_TEST_FILE env var). +// betterInput.h264 is a good one to use. +var testFile string + +// testLog is a bare bones logger that logs to stdout, and exits upon either an error or fatal error. +func testLog(level int8, msg string, params ...interface{}) { + logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"} + if testVerbosity == 0 { + return + } + if level < -1 || level > 5 { + panic("Invalid log level") + } + switch testVerbosity { + case 0: + // silence is golden + case 1: + fmt.Printf("%s: %s\n", logLevels[level+1], msg) + case 2: + // extract the first param if it is one we care about, otherwise just print the message + if len(params) >= 2 { + switch params[0].(string) { + case "error": + fmt.Printf("%s: %s, error=%v\n", logLevels[level+1], msg, params[1].(string)) + case "size": + fmt.Printf("%s: %s, size=%d\n", logLevels[level+1], msg, params[1].(int)) + default: + fmt.Printf("%s: %s\n", logLevels[level+1], msg) + } + } else { + fmt.Printf("%s: %s\n", logLevels[level+1], msg) + } + } + if level >= 4 { + // Error or Fatal + buf := make([]byte, 1<<16) + size := runtime.Stack(buf, true) + fmt.Printf("%s\n", string(buf[:size])) + os.Exit(1) + } +} + +// TestKey tests that the RTMP_TEST_KEY environment variable is present +func TestKey(t *testing.T) { + testLog(0, "TestKey") + testKey = os.Getenv("RTMP_TEST_KEY") + if testKey == "" { + msg := "RTMP_TEST_KEY environment variable not defined" + testLog(0, msg) + t.Skip(msg) + } + testLog(0, "Testing against URL "+testBaseURL+testKey) +} + +// TestSetupURL tests URL parsing. +func TestSetupURL(t *testing.T) { + testLog(0, "TestSetupURL") + // test with just the base URL + s := NewSession(testBaseURL, testTimeout, testLog) + if s.url != testBaseURL && s.link.timeout != testTimeout { + t.Errorf("NewSession failed") + } + err := setupURL(s) + if err != nil { + t.Errorf("setupURL(testBaseURL) failed with error: %v", err) + } + // test the parts are as expected + if rtmpProtocolStrings[s.link.protocol] != rtmpProtocol { + t.Errorf("setupURL returned wrong protocol: %v", s.link.protocol) + } + if s.link.host != testHost { + t.Errorf("setupURL returned wrong host: %v", s.link.host) + } + if s.link.app != testApp { + t.Errorf("setupURL returned wrong app: %v", s.link.app) + } +} + +// TestOpenClose tests opening an closing an RTMP connection. +func TestOpenClose(t *testing.T) { + testLog(0, "TestOpenClose") + if testKey == "" { + t.Skip("Skipping TestOpenClose since no RTMP_TEST_KEY") + } + s := NewSession(testBaseURL+testKey, testTimeout, testLog) + err := s.Open() + if err != nil { + t.Errorf("Open failed with error: %v", err) + return + } + err = s.Close() + if err != nil { + t.Errorf("Close failed with error: %v", err) + return + } +} + +// TestFromFrame tests streaming from a single H.264 frame which is repeated. +func TestFromFrame(t *testing.T) { + testLog(0, "TestFromFrame") + if testKey == "" { + t.Skip("Skipping TestFromFrame since no RTMP_TEST_KEY") + } + s := NewSession(testBaseURL+testKey, testTimeout, testLog) + err := s.Open() + if err != nil { + t.Errorf("Session.Open failed with error: %v", err) + } + + b, err := ioutil.ReadFile(filepath.Join(testDataDir, "AusOcean_logo_1080p.h264")) + if err != nil { + t.Errorf("ReadFile failed with error: %v", err) + } + + // Pass RTMP session, true for audio, true for video, and 25 FPS + // ToDo: fix this. Although we can encode the file and YouTube + // doesn't complain, YouTube doesn't play it (even when we + // send 1 minute's worth). + flvEncoder := flv.NewEncoder(s, true, true, 25) + for i := 0; i < 25; i++ { + err := flvEncoder.Encode(b) + if err != nil { + t.Errorf("Encoding failed with error: %v", err) + } + time.Sleep(time.Millisecond / 25) // rate limit to 1/25s + } + + err = s.Close() + if err != nil { + t.Errorf("Session.Close failed with error: %v", err) + } +} + +// TestFromFile tests streaming from an video file comprising raw H.264. +// The test file is supplied via the RTMP_TEST_FILE environment variable. +func TestFromFile(t *testing.T) { + testLog(0, "TestFromFile") + testFile := os.Getenv("RTMP_TEST_FILE") + if testFile == "" { + t.Skip("Skipping TestFromFile since no RTMP_TEST_FILE") + } + if testKey == "" { + t.Skip("Skipping TestFromFile since no RTMP_TEST_KEY") + } + s := NewSession(testBaseURL+testKey, testTimeout, testLog) + err := s.Open() + if err != nil { + t.Errorf("Session.Open failed with error: %v", err) + } + + f, err := os.Open(testFile) + if err != nil { + t.Errorf("Open failed with error: %v", err) + } + defer f.Close() + + // Pass RTMP session, true for audio, true for video, and 25 FPS + flvEncoder := flv.NewEncoder(s, true, true, 25) + err = lex.H264(flvEncoder, f, time.Second/time.Duration(25)) + if err != nil { + t.Errorf("Lexing and encoding failed with error: %v", err) + } + + err = s.Close() + if err != nil { + t.Errorf("Session.Close failed with error: %v", err) + } +} diff --git a/rtmp/session.go b/rtmp/session.go index 6d1fad40..eb97c938 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -3,7 +3,7 @@ NAME session.go DESCRIPTION - See Readme.md + RTMP session functionality. AUTHORS Saxon Nelson-Milton @@ -34,16 +34,17 @@ LICENSE package rtmp import ( - "errors" + "io" + "net" + "time" ) // Session holds the state for an RTMP session. type Session struct { url string - timeout uint inChunkSize int32 outChunkSize int32 - bwCheckCounter int32 + checkCounter int32 nBytesIn int32 nBytesInSent int32 streamID int32 @@ -56,96 +57,116 @@ type Session struct { methodCalls []method channelsAllocatedIn int32 channelsAllocatedOut int32 - vecChannelsIn []*packet - vecChannelsOut []*packet + channelsIn []*packet + channelsOut []*packet channelTimestamp []int32 audioCodecs float64 videoCodecs float64 encoding float64 - defered []byte + deferred []byte link link + log Log } +// link represents RTMP URL and connection information. +type link struct { + host string + playpath string + tcUrl string + swfUrl string + pageUrl string + app string + auth string + flashVer string + token string + extras C_AMFObject + flags int32 + swfAge int32 + protocol int32 + timeout uint + port uint16 + conn *net.TCPConn +} + +// method represents an RTMP method. +type method struct { + name string + num int32 +} + +// Log defines the RTMP logging function. +type Log func(level int8, message string, params ...interface{}) + +// Log levels used by Log. +const ( + DebugLevel int8 = -1 + InfoLevel int8 = 0 + WarnLevel int8 = 1 + ErrorLevel int8 = 2 + FatalLevel int8 = 5 +) + // NewSession returns a new Session. -func NewSession(url string, connectTimeout uint) *Session { +func NewSession(url string, timeout uint, log Log) *Session { return &Session{ - url: url, - timeout: connectTimeout, + url: url, + inChunkSize: 128, + outChunkSize: 128, + clientBW: 2500000, + clientBW2: 2, + serverBW: 2500000, + audioCodecs: 3191.0, + videoCodecs: 252.0, + log: log, + link: link{ + timeout: timeout, + swfAge: 30, + }, } } // Open establishes an rtmp connection with the url passed into the constructor. func (s *Session) Open() error { + s.log(DebugLevel, pkg+"Session.Open") if s.isConnected() { - return errors.New("rtmp: attempt to start already running session") + return errConnected } - err := s.start() + err := setupURL(s) if err != nil { return err } - return nil -} - -// start does the heavylifting for Open(). -func (s *Session) start() error { - s.init() - err := setupURL(s, s.url) - if err != nil { - s.close() - return err - } s.enableWrite() err = connect(s) if err != nil { - s.close() + s.Close() return err } err = connectStream(s) if err != nil { - s.close() + s.Close() return err } return nil } -// init initializes various RTMP defauls. -// ToDo: define consts for the magic numbers. -func (s *Session) init() { - s.inChunkSize = RTMP_DEFAULT_CHUNKSIZE - s.outChunkSize = RTMP_DEFAULT_CHUNKSIZE - s.clientBW = 2500000 - s.clientBW2 = 2 - s.serverBW = 2500000 - s.audioCodecs = 3191.0 - s.videoCodecs = 252.0 - s.link.timeout = s.timeout - s.link.swfAge = 30 -} - -// Close terminates the rtmp connection, +// Close terminates the rtmp connection. +// NB: Close is idempotent and the session value is cleared completely. func (s *Session) Close() error { + s.log(DebugLevel, pkg+"Session.Close") if !s.isConnected() { return errNotConnected } - s.close() - return nil -} - -// close does the heavylifting for Close(). -// Any errors are ignored as it is often called in response to an earlier error. -func (s *Session) close() { - if s.isConnected() { - if s.streamID > 0 { - if s.link.protocol&RTMP_FEATURE_WRITE != 0 { - sendFCUnpublish(s) - } - sendDeleteStream(s, float64(s.streamID)) + if s.streamID > 0 { + if s.link.protocol&featureWrite != 0 { + sendFCUnpublish(s) } - s.link.conn.Close() + sendDeleteStream(s, float64(s.streamID)) } + s.link.conn.Close() *s = Session{} + return nil } // Write writes a frame (flv tag) to the rtmp connection. @@ -153,13 +174,70 @@ func (s *Session) Write(data []byte) (int, error) { if !s.isConnected() { return 0, errNotConnected } - err := s.write(data) + if len(data) < minDataSize { + return 0, errTinyPacket + } + if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { + return 0, errUnimplemented + } + + pkt := packet{ + packetType: data[0], + bodySize: C_AMF_DecodeInt24(data[1:4]), + timestamp: C_AMF_DecodeInt24(data[4:7]) | uint32(data[7])<<24, + channel: chanSource, + info: s.streamID, + } + + pkt.resize(pkt.bodySize, headerSizeAuto) + copy(pkt.body, data[minDataSize:minDataSize+pkt.bodySize]) + err := pkt.write(s, false) if err != nil { return 0, err } return len(data), nil } +// I/O functions + +// read from an RTMP connection. Sends a bytes received message if the +// number of bytes received (nBytesIn) is greater than the number sent +// (nBytesInSent) by 10% of the bandwidth. +func (s *Session) read(buf []byte) (int, error) { + err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) + if err != nil { + return 0, err + } + n, err := io.ReadFull(s.link.conn, buf) + if err != nil { + s.log(DebugLevel, pkg+"read failed", "error", err.Error()) + return 0, err + } + s.nBytesIn += int32(n) + if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { + err := sendBytesReceived(s) + if err != nil { + return n, err // NB: we still read n bytes, even though send bytes failed + } + } + return n, nil +} + +// write to an RTMP connection. +func (s *Session) write(buf []byte) (int, error) { + //ToDo: consider using a different timeout for writes than for reads + err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) + if err != nil { + return 0, err + } + n, err := s.link.conn.Write(buf) + if err != nil { + s.log(WarnLevel, pkg+"write failed", "error", err.Error()) + return 0, err + } + return n, nil +} + // isConnected returns true if the RTMP connection is up. func (s *Session) isConnected() bool { return s.link.conn != nil @@ -167,5 +245,5 @@ func (s *Session) isConnected() bool { // enableWrite enables the current session for writing. func (s *Session) enableWrite() { - s.link.protocol |= RTMP_FEATURE_WRITE + s.link.protocol |= featureWrite } diff --git a/stream/mts/psi/psi.go b/stream/mts/psi/psi.go index c9c6b4bb..55f50f78 100644 --- a/stream/mts/psi/psi.go +++ b/stream/mts/psi/psi.go @@ -114,109 +114,6 @@ type Desc struct { Dd []byte // Descriptor data } -// ReadPSI creates a PSI data structure from a given byte slice that represents a PSI -func ReadPSI(data []byte) *PSI { - psi := PSI{} - pos := 0 - psi.Pf = data[pos] - if psi.Pf != 0 { - panic("No support for pointer filler bytes") - } - psi.Tid = data[pos] - pos++ - psi.Ssi = data[pos]&0x80 != 0 - psi.Pb = data[pos]&0x40 != 0 - psi.Sl = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1]) - pos += 2 - psi.Tss = readTSS(data[pos:], &psi) - return &psi -} - -// ReadTSS creates a TSS data structure from a given byte slice that represents a TSS -func readTSS(data []byte, p *PSI) *TSS { - tss := TSS{} - pos := 0 - tss.Tide = uint16(data[pos])<<8 | uint16(data[pos+1]) - pos += 2 - tss.V = (data[pos] & 0x3e) >> 1 - tss.Cni = data[pos]&0x01 != 0 - pos++ - tss.Sn = data[pos] - pos++ - tss.Lsn = data[pos] - pos++ - switch p.Tid { - case patID: - tss.Sd = readPAT(data[pos:], &tss) - case pmtID: - tss.Sd = readPMT(data[pos:], &tss) - default: - panic("Can't yet deal with tables that are not PAT or PMT") - } - return &tss -} - -// readPAT creates a pat struct based on a bytes slice representing a pat -func readPAT(data []byte, p *TSS) *PAT { - pat := PAT{} - pos := 0 - pat.Pn = uint16(data[pos])<<8 | uint16(data[pos+1]) - pos += 2 - pat.Pmpid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1]) - return &pat -} - -// readPMT creates a pmt struct based on a bytes slice that represents a pmt -func readPMT(data []byte, p *TSS) *PAT { - pmt := PMT{} - pos := 0 - pmt.Pcrpid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1]) - pos += 2 - pmt.Pil = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1]) - pos += 2 - if pmt.Pil != 0 { - pmt.Pd = readDescs(data[pos:], int(pmt.Pil)) - } - pos += int(pmt.Pil) - // TODO Read ES stuff - pmt.Essd = readEssd(data[pos:]) - return nil -} - -// readDescs reads provides a slice of Descs given a byte slice that represents Descs -// and the no of bytes that the descs accumilate -func readDescs(data []byte, descLen int) (o []Desc) { - pos := 0 - o = make([]Desc, 1) - o[0].Dt = data[pos] - pos++ - o[0].Dl = data[pos] - pos++ - o[0].Dd = make([]byte, o[0].Dl) - for i := 0; i < int(o[0].Dl); i++ { - o[0].Dd[i] = data[pos] - pos++ - } - if 2+len(o[0].Dd) != descLen { - panic("No support for reading more than one descriptor") - } - return -} - -// readEESD creates an ESSD struct based on a bytes slice that represents ESSD -func readEssd(data []byte) *ESSD { - essd := ESSD{} - pos := 0 - essd.St = data[pos] - pos++ - essd.Epid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1]) - pos += 2 - essd.Esil = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1]) - pos += 2 - essd.Esd = readDescs(data[pos:], int(essd.Esil)) - return &essd -} - // Bytes outputs a byte slice representation of the PSI func (p *PSI) Bytes() []byte { out := make([]byte, 4)