diff --git a/rtmp/packet.go b/rtmp/packet.go index 4fc2141b..3c50b59d 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -39,34 +39,37 @@ import ( "io" ) +// Packet types. const ( - RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01 - RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03 - RTMP_PACKET_TYPE_CONTROL = 0x04 - RTMP_PACKET_TYPE_SERVER_BW = 0x05 - RTMP_PACKET_TYPE_CLIENT_BW = 0x06 - RTMP_PACKET_TYPE_AUDIO = 0x08 - RTMP_PACKET_TYPE_VIDEO = 0x09 - RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F - RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10 - RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11 - RTMP_PACKET_TYPE_INFO = 0x12 - RTMP_PACKET_TYPE_INVOKE = 0x14 - RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16 + packetTypeChunkSize = 0x01 + packetTypeBytesReadReport = 0x03 + packetTypeControl = 0x04 + packetTypeServerBW = 0x05 + packetTypeClientBW = 0x06 + packetTypeAudio = 0x08 + packetTypeVideo = 0x09 + packetTypeFlexStreamSend = 0x0F // not implemented + packetTypeFlexSharedObject = 0x10 // not implemented + packetTypeFlexMessage = 0x11 // not implemented + packetTypeInfo = 0x12 + packetTypeInvoke = 0x14 + packetTypeFlashVideo = 0x16 // not implemented ) +// Header sizes. const ( - RTMP_PACKET_SIZE_LARGE = 0 - RTMP_PACKET_SIZE_MEDIUM = 1 - RTMP_PACKET_SIZE_SMALL = 2 - RTMP_PACKET_SIZE_MINIMUM = 3 - RTMP_PACKET_SIZE_AUTO = 4 + headerSizeLarge = 0 + headerSizeMedium = 1 + headerSizeSmall = 2 + headerSizeMinimum = 3 + headerSizeAuto = 4 ) +// Special channels. const ( - RTMP_CHANNEL_BYTES_READ = 0x02 - RTMP_CHANNEL_CONTROL = 0x03 - RTMP_CHANNEL_SOURCE = 0x04 + chanBytesRead = 0x02 + chanControl = 0x03 + chanSource = 0x04 ) // headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively: @@ -95,13 +98,12 @@ type packet struct { type chunk struct { headerSize int32 data []byte - header [RTMP_MAX_HEADER_SIZE]byte + header [fullHeaderSize]byte } -// ToDo: Consider making the following functions into methods. // read reads a packet. func (pkt *packet) read(s *Session) error { - var hbuf [RTMP_MAX_HEADER_SIZE]byte + var hbuf [fullHeaderSize]byte header := hbuf[:] err := s.read(header[:1]) @@ -161,9 +163,9 @@ func (pkt *packet) read(s *Session) error { size := headerSizes[pkt.headerType] switch { - case size == RTMP_LARGE_HEADER_SIZE: + case size == fullHeaderSize: pkt.hasAbsTimestamp = true - case size < RTMP_LARGE_HEADER_SIZE: + case size < fullHeaderSize: if s.channelsIn[pkt.channel] != nil { *pkt = *(s.channelsIn[pkt.channel]) } @@ -219,6 +221,7 @@ func (pkt *packet) read(s *Session) error { } if pkt.chunk != nil { + panic("non-nil chunk") pkt.chunk.headerSize = int32(hSize) copy(pkt.chunk.header[:], hbuf[:hSize]) pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)] @@ -260,25 +263,25 @@ func (pkt *packet) read(s *Session) error { // resize adjusts the packet's storage to accommodate a body of the given size. func (pkt *packet) resize(size uint32, ht uint8) { - buf := make([]byte, RTMP_MAX_HEADER_SIZE+size) + buf := make([]byte, fullHeaderSize+size) pkt.header = buf - pkt.body = buf[RTMP_MAX_HEADER_SIZE:] - if ht != RTMP_PACKET_SIZE_AUTO { + pkt.body = buf[fullHeaderSize:] + if ht != headerSizeAuto { pkt.headerType = ht return } switch pkt.packetType { - case RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_AUDIO: + case packetTypeVideo, packetTypeAudio: if pkt.timestamp == 0 { - pkt.headerType = RTMP_PACKET_SIZE_LARGE + pkt.headerType = headerSizeLarge } else { - pkt.headerType = RTMP_PACKET_SIZE_MEDIUM + pkt.headerType = headerSizeMedium } - case RTMP_PACKET_TYPE_INFO: - pkt.headerType = RTMP_PACKET_SIZE_LARGE + case packetTypeInfo: + pkt.headerType = headerSizeLarge pkt.bodySize += 16 default: - pkt.headerType = RTMP_PACKET_SIZE_MEDIUM + pkt.headerType = headerSizeMedium } } @@ -309,14 +312,14 @@ func (pkt *packet) write(s *Session, queue bool) error { prevPkt := s.channelsOut[pkt.channel] var last int - if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE { + if prevPkt != nil && pkt.headerType != headerSizeLarge { // compress a bit by using the prev packet's attributes - if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM { - pkt.headerType = RTMP_PACKET_SIZE_SMALL + if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium { + pkt.headerType = headerSizeSmall } - if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL { - pkt.headerType = RTMP_PACKET_SIZE_MINIMUM + if prevPkt.timestamp == pkt.timestamp && pkt.headerType == headerSizeSmall { + pkt.headerType = headerSizeMinimum } last = int(prevPkt.timestamp) @@ -331,7 +334,7 @@ func (pkt *packet) write(s *Session, queue bool) error { // origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header. headBytes := pkt.header hSize := headerSizes[pkt.headerType] - origIdx := RTMP_MAX_HEADER_SIZE - hSize + origIdx := fullHeaderSize - hSize // adjust 1 or 2 bytes for the channel cSize := 0 @@ -414,7 +417,7 @@ func (pkt *packet) write(s *Session, queue bool) error { if s.deferred == nil { // Defer sending small audio packets (at most once). - if pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize { + if pkt.packetType == packetTypeAudio && size < chunkSize { s.deferred = headBytes[origIdx:][:size+hSize] s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) return nil @@ -443,7 +446,6 @@ func (pkt *packet) write(s *Session, queue bool) error { // Prepend the previously deferred packet and write it with the current one. s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred)) bytes = append(s.deferred, bytes...) - s.deferred = nil } err := s.write(bytes) if err != nil { @@ -482,7 +484,7 @@ func (pkt *packet) write(s *Session, queue bool) error { } // We invoked a remote method - if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { + if pkt.packetType == packetTypeInvoke { buf := pkt.body[1:] meth := C_AMF_DecodeString(buf) s.log(DebugLevel, pkg+"invoking method "+meth) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index e9392019..f48e9e53 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -45,119 +45,94 @@ import ( ) const ( - pkg = "rtmp:" - minDataSize = 11 + pkg = "rtmp:" + minDataSize = 11 // ToDo: this should be the same as fullHeaderSize + signatureSize = 1536 + fullHeaderSize = 12 ) +// Link flags. const ( - RTMPT_OPEN = iota - RTMPT_SEND - RTMPT_IDLE - RTMPT_CLOSE + linkAuth = 0x0001 // using auth param + linkLive = 0x0002 // stream is live + linkSWF = 0x0004 // do SWF verification - not implemented + linkPlaylist = 0x0008 // send playlist before play - not implemented + linkBufx = 0x0010 // toggle stream on BufferEmpty msg - not implemented ) +// Protocol features. const ( - RTMP_READ_HEADER = 0x01 - RTMP_READ_RESUME = 0x02 - RTMP_READ_NO_IGNORE = 0x04 - RTMP_READ_GOTKF = 0x08 - RTMP_READ_GOTFLVK = 0x10 - RTMP_READ_SEEKING = 0x20 - RTMP_READ_COMPLETE = -3 - RTMP_READ_ERROR = -2 - RTMP_READ_EOF = -1 - RTMP_READ_IGNORE = 0 + featureHTTP = 0x01 // not implemented + featureEncode = 0x02 // not implemented + featureSSL = 0x04 // not implemented + featureMFP = 0x08 // not implemented + featureWrite = 0x10 // publish, not play + featureHTTP2 = 0x20 // server-side RTMPT - not implemented ) +// RTMP protocols. const ( - RTMP_LF_AUTH = 0x0001 /* using auth param */ - RTMP_LF_LIVE = 0x0002 /* stream is live */ - RTMP_LF_SWFV = 0x0004 /* do SWF verification */ - RTMP_LF_PLST = 0x0008 /* send playlist before play */ - RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */ - RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */ - RTMP_LF_FAPU = 0x0040 /* free app on close */ + protoRTMP = 0 + protoRTMPE = featureEncode + protoRTMPT = featureHTTP + protoRTMPS = featureSSL + protoRTMPTE = (featureHTTP | featureEncode) + protoRTMPTS = (featureHTTP | featureSSL) + protoRTMFP = featureMFP ) +// RTMP tokens (lexemes). +// NB: Underscores are deliberately preserved in const names where they exist in the corresponding tokens. const ( - RTMP_FEATURE_HTTP = 0x01 - RTMP_FEATURE_ENC = 0x02 - RTMP_FEATURE_SSL = 0x04 - RTMP_FEATURE_MFP = 0x08 /* not yet supported */ - RTMP_FEATURE_WRITE = 0x10 /* publish, not play */ - RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */ -) - -const ( - RTMP_PROTOCOL_RTMP = 0 - RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC - RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP - RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL - RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC) - RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL) - RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP -) - -const ( - RTMP_DEFAULT_CHUNKSIZE = 128 - RTMP_BUFFER_CACHE_SIZE = (16 * 1024) - RTMP_SIG_SIZE = 1536 - RTMP_LARGE_HEADER_SIZE = 12 - RTMP_MAX_HEADER_SIZE = RTMP_LARGE_HEADER_SIZE -) - -const ( - setDataFrame = "@setDataFrame" - - av__checkbw = "_checkbw" - av__onbwcheck = "_onbwcheck" - av__onbwdone = "_onbwdone" - av__result = "_result" - av_app = "app" - av_audioCodecs = "audioCodecs" - av_capabilities = "capabilities" - av_close = "close" - av_code = "code" - av_connect = "connect" - av_createStream = "createStream" - av_deleteStream = "deleteStream" - av_FCPublish = "FCPublish" - av_FCUnpublish = "FCUnpublish" - av_flashVer = "flashVer" - av_fpad = "fpad" - av_level = "level" - av_live = "live" - av_NetConnection_Connect_InvalidApp = "NetConnection.Connect.InvalidApp" - av_NetStream_Failed = "NetStream.Failed" - av_NetStream_Pause_Notify = "NetStream.Pause.Notify" - av_NetStream_Play_Complete = "NetStream.Play.Complete" - av_NetStream_Play_Failed = "NetStream.Play.Failed" - av_NetStream_Play_PublishNotify = "NetStream.Play.PublishNotify" - av_NetStream_Play_Start = "NetStream.Play.Start" - av_NetStream_Play_Stop = "NetStream.Play.Stop" - av_NetStream_Play_StreamNotFound = "NetStream.Play.StreamNotFound" - av_NetStream_Play_UnpublishNotify = "NetStream.Play.UnpublishNotify" - av_NetStream_Publish_Start = "NetStream.Publish.Start" - av_NetStream_Seek_Notify = "NetStream.Seek.Notify" - av_nonprivate = "nonprivate" - av_objectEncoding = "objectEncoding" - av_onBWDone = "onBWDone" - av_onFCSubscribe = "onFCSubscribe" - av_onFCUnsubscribe = "onFCUnsubscribe" - av_onStatus = "onStatus" - av_pageUrl = "pageUrl" - av_ping = "ping" - av_play = "play" - av_playlist_ready = "playlist_ready" - av_publish = "publish" - av_releaseStream = "releaseStream" - av_secureToken = "secureToken" - av_set_playlist = "set_playlist" - av_swfUrl = "swfUrl" - av_tcUrl = "tcUrl" - av_type = "type" - av_videoCodecs = "videoCodecs" - av_videoFunction = "videoFunction" + av_checkbw = "_checkbw" + av_onbwcheck = "_onbwcheck" + av_onbwdone = "_onbwdone" + av_result = "_result" + avApp = "app" + avAudioCodecs = "audioCodecs" + avCapabilities = "capabilities" + avClose = "close" + avCode = "code" + avConnect = "connect" + avCreatestream = "createStream" + avDeletestream = "deleteStream" + avFCPublish = "FCPublish" + avFCUnpublish = "FCUnpublish" + avFlashver = "flashVer" + avFpad = "fpad" + avLevel = "level" + avLive = "live" + avNetConnectionConnectInvalidApp = "NetConnection.Connect.InvalidApp" + avNetStreamFailed = "NetStream.Failed" + avNetStreamPauseNotify = "NetStream.Pause.Notify" + avNetStreamPlayComplete = "NetStream.Play.Complete" + avNetStreamPlayFailed = "NetStream.Play.Failed" + avNetStreamPlayPublishNotify = "NetStream.Play.PublishNotify" + avNetStreamPlayStart = "NetStream.Play.Start" + avNetStreamPlayStop = "NetStream.Play.Stop" + avNetStreamPlayStreamNotFound = "NetStream.Play.StreamNotFound" + avNetStreamPlayUnpublishNotify = "NetStream.Play.UnpublishNotify" + avNetStreamPublish_Start = "NetStream.Publish.Start" + avNetStreamSeekNotify = "NetStream.Seek.Notify" + avNonprivate = "nonprivate" + avObjectEncoding = "objectEncoding" + avOnBWDone = "onBWDone" + avOnFCSubscribe = "onFCSubscribe" + avOnFCUnsubscribe = "onFCUnsubscribe" + avOnStatus = "onStatus" + avPageUrl = "pageUrl" + avPing = "ping" + avPlay = "play" + avPlaylist_ready = "playlist_ready" + avPublish = "publish" + avReleasestream = "releaseStream" + avSecureToken = "secureToken" + avSet_playlist = "set_playlist" + avSwfUrl = "swfUrl" + avTcUrl = "tcUrl" + avType = "type" + avVideoCodecs = "videoCodecs" + avVideoFunction = "videoFunction" ) // RTMP protocol strings. @@ -200,7 +175,6 @@ func setupURL(s *Session) (err error) { if s.link.tcUrl == "" { if s.link.app != "" { s.link.tcUrl = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app - s.link.lFlags |= RTMP_LF_FTCU } else { s.link.tcUrl = s.url } @@ -208,10 +182,10 @@ func setupURL(s *Session) (err error) { if s.link.port == 0 { switch { - case (s.link.protocol & RTMP_FEATURE_SSL) != 0: + case (s.link.protocol & featureSSL) != 0: s.link.port = 433 s.log(FatalLevel, pkg+"SSL not supported") - case (s.link.protocol & RTMP_FEATURE_HTTP) != 0: + case (s.link.protocol & featureHTTP) != 0: s.link.port = 80 default: s.link.port = 1935 @@ -257,7 +231,7 @@ func connectStream(s *Session) error { } switch pkt.packetType { - case RTMP_PACKET_TYPE_AUDIO, RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_INFO: + case packetTypeAudio, packetTypeVideo, packetTypeInfo: s.log(WarnLevel, pkg+"got packet before play; ignoring", "type", pkt.packetType) default: err = handlePacket(s, &pkt) @@ -277,21 +251,21 @@ func connectStream(s *Session) error { // NB: cases have been commented out that are not currently used by AusOcean func handlePacket(s *Session, pkt *packet) error { switch pkt.packetType { - case RTMP_PACKET_TYPE_CHUNK_SIZE: + case packetTypeChunkSize: if pkt.bodySize >= 4 { s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) } - case RTMP_PACKET_TYPE_BYTES_READ_REPORT: + case packetTypeBytesReadReport: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) - case RTMP_PACKET_TYPE_CONTROL: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_CONTROL") + case packetTypeControl: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl") - case RTMP_PACKET_TYPE_SERVER_BW: + case packetTypeServerBW: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) - case RTMP_PACKET_TYPE_CLIENT_BW: + case packetTypeClientBW: s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) if pkt.bodySize > 4 { s.clientBW2 = pkt.body[4] @@ -299,19 +273,19 @@ func handlePacket(s *Session, pkt *packet) error { s.clientBW2 = 0xff } - case RTMP_PACKET_TYPE_AUDIO: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_AUDIO") + case packetTypeAudio: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeAudio") - case RTMP_PACKET_TYPE_VIDEO: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_VIDEO") + case packetTypeVideo: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeVideo") - case RTMP_PACKET_TYPE_FLEX_MESSAGE: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_FLEX_MESSAGE") + case packetTypeFlexMessage: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeFlexMessage") - case RTMP_PACKET_TYPE_INFO: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_INFO") + case packetTypeInfo: + s.log(FatalLevel, pkg+"unsupported packet type packetTypeInfo") - case RTMP_PACKET_TYPE_INVOKE: + case packetTypeInvoke: err := handleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { // This will never happen with the methods we implement. @@ -319,8 +293,8 @@ func handlePacket(s *Session, pkt *packet) error { return err } - case RTMP_PACKET_TYPE_FLASH_VIDEO: - s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_FLASH_VIDEO") + case packetTypeFlashVideo: + s.log(FatalLevel, pkg+"unsupported packet type packetType_FLASHVideo") default: s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) @@ -331,15 +305,15 @@ func handlePacket(s *Session, pkt *packet) error { func sendConnectPacket(s *Session) error { var pbuf [4096]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeLarge, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_connect) + enc = C_AMF_EncodeString(enc, avConnect) if enc == nil { return errEncoding } @@ -351,60 +325,60 @@ func sendConnectPacket(s *Session) error { enc[0] = AMF_OBJECT enc = enc[1:] - enc = C_AMF_EncodeNamedString(enc, av_app, s.link.app) + enc = C_AMF_EncodeNamedString(enc, avApp, s.link.app) if enc == nil { return errEncoding } - if s.link.protocol&RTMP_FEATURE_WRITE != 0 { - enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate) + if s.link.protocol&featureWrite != 0 { + enc = C_AMF_EncodeNamedString(enc, avType, avNonprivate) if enc == nil { return errEncoding } } if s.link.flashVer != "" { - enc = C_AMF_EncodeNamedString(enc, av_flashVer, s.link.flashVer) + enc = C_AMF_EncodeNamedString(enc, avFlashver, s.link.flashVer) if enc == nil { return errEncoding } } if s.link.swfUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_swfUrl, s.link.swfUrl) + enc = C_AMF_EncodeNamedString(enc, avSwfUrl, s.link.swfUrl) if enc == nil { return errEncoding } } if s.link.tcUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_tcUrl, s.link.tcUrl) + enc = C_AMF_EncodeNamedString(enc, avTcUrl, s.link.tcUrl) if enc == nil { return errEncoding } } - if s.link.protocol&RTMP_FEATURE_WRITE == 0 { - enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false) + if s.link.protocol&featureWrite == 0 { + enc = C_AMF_EncodeNamedBoolean(enc, avFpad, false) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_capabilities, 15) + enc = C_AMF_EncodeNamedNumber(enc, avCapabilities, 15) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, s.audioCodecs) + enc = C_AMF_EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, s.videoCodecs) + enc = C_AMF_EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_videoFunction, 1) + enc = C_AMF_EncodeNamedNumber(enc, avVideoFunction, 1) if enc == nil { return errEncoding } if s.link.pageUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_pageUrl, s.link.pageUrl) + enc = C_AMF_EncodeNamedString(enc, avPageUrl, s.link.pageUrl) if enc == nil { return errEncoding } @@ -412,7 +386,7 @@ func sendConnectPacket(s *Session) error { } if s.encoding != 0.0 || s.sendEncoding { - enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, s.encoding) + enc = C_AMF_EncodeNamedNumber(enc, avObjectEncoding, s.encoding) if enc == nil { return errEncoding } @@ -423,7 +397,7 @@ func sendConnectPacket(s *Session) error { // add auth string if s.link.auth != "" { - enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) + enc = C_AMF_EncodeBoolean(enc, s.link.flags&linkAuth != 0) if enc == nil { return errEncoding } @@ -440,7 +414,7 @@ func sendConnectPacket(s *Session) error { } } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, true) // response expected } @@ -448,15 +422,15 @@ func sendConnectPacket(s *Session) error { func sendCreateStream(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_createStream) + enc = C_AMF_EncodeString(enc, avCreatestream) if enc == nil { return errEncoding } @@ -468,7 +442,7 @@ func sendCreateStream(s *Session) error { enc[0] = AMF_NULL enc = enc[1:] - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, true) // response expected } @@ -476,15 +450,15 @@ func sendCreateStream(s *Session) error { func sendReleaseStream(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_releaseStream) + enc = C_AMF_EncodeString(enc, avReleasestream) if enc == nil { return errEncoding } @@ -499,7 +473,7 @@ func sendReleaseStream(s *Session) error { if enc == nil { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, false) } @@ -507,15 +481,15 @@ func sendReleaseStream(s *Session) error { func sendFCPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_FCPublish) + enc = C_AMF_EncodeString(enc, avFCPublish) if enc == nil { return errEncoding } @@ -531,7 +505,7 @@ func sendFCPublish(s *Session) error { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, false) } @@ -539,15 +513,15 @@ func sendFCPublish(s *Session) error { func sendFCUnpublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_FCUnpublish) + enc = C_AMF_EncodeString(enc, avFCUnpublish) if enc == nil { return errEncoding } @@ -563,7 +537,7 @@ func sendFCUnpublish(s *Session) error { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, false) } @@ -571,15 +545,15 @@ func sendFCUnpublish(s *Session) error { func sendPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: RTMP_CHANNEL_SOURCE, - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanSource, + headerType: headerSizeLarge, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_publish) + enc = C_AMF_EncodeString(enc, avPublish) if enc == nil { return errEncoding } @@ -594,12 +568,12 @@ func sendPublish(s *Session) error { if enc == nil { return errEncoding } - enc = C_AMF_EncodeString(enc, av_live) + enc = C_AMF_EncodeString(enc, avLive) if enc == nil { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, true) // response expected } @@ -607,15 +581,15 @@ func sendPublish(s *Session) error { func sendDeleteStream(s *Session, dStreamId float64) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeMedium, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_deleteStream) + enc = C_AMF_EncodeString(enc, avDeletestream) if enc == nil { return errEncoding } @@ -630,7 +604,7 @@ func sendDeleteStream(s *Session, dStreamId float64) error { if enc == nil { return errEncoding } - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) /* no response expected */ return pkt.write(s, false) @@ -640,11 +614,11 @@ func sendDeleteStream(s *Session, dStreamId float64) error { func sendBytesReceived(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_BYTES_READ, - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, + channel: chanBytesRead, + headerType: headerSizeMedium, + packetType: packetTypeBytesReadReport, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -661,15 +635,15 @@ func sendBytesReceived(s *Session) error { func sendCheckBW(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: RTMP_CHANNEL_CONTROL, - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, + channel: chanControl, + headerType: headerSizeLarge, + packetType: packetTypeInvoke, header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + body: pbuf[fullHeaderSize:], } enc := pkt.body - enc = C_AMF_EncodeString(enc, av__checkbw) + enc = C_AMF_EncodeString(enc, av_checkbw) if enc == nil { return errEncoding } @@ -681,7 +655,7 @@ func sendCheckBW(s *Session) error { enc[0] = AMF_NULL enc = enc[1:] - pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) return pkt.write(s, false) } @@ -693,7 +667,7 @@ func eraseMethod(m []method, i int) []method { } // int handleInvoke handles a packet invoke request -// Side effects: s.isPlaying set to true upon av_NetStream_Publish_Start +// Side effects: s.isPlaying set to true upon avNetStreamPublish_Start func handleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody @@ -709,7 +683,7 @@ func handleInvoke(s *Session, body []byte) error { txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) switch meth { - case av__result: + case av_result: var methodInvoked string for i, m := range s.methodCalls { if float64(m.num) == txn { @@ -725,11 +699,11 @@ func handleInvoke(s *Session, body []byte) error { s.log(DebugLevel, pkg+"received result for "+methodInvoked) switch methodInvoked { - case av_connect: + case avConnect: if s.link.token != "" { s.log(FatalLevel, pkg+"no support for link token") } - if (s.link.protocol & RTMP_FEATURE_WRITE) == 0 { + if (s.link.protocol & featureWrite) == 0 { return errNotWritable } err := sendReleaseStream(s) @@ -745,9 +719,9 @@ func handleInvoke(s *Session, body []byte) error { return err } - case av_createStream: + case avCreatestream: s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) - if s.link.protocol&RTMP_FEATURE_WRITE == 0 { + if s.link.protocol&featureWrite == 0 { return errNotWritable } err := sendPublish(s) @@ -755,11 +729,11 @@ func handleInvoke(s *Session, body []byte) error { return err } - case av_play, av_publish: - s.log(FatalLevel, pkg+"unsupported method av_play/av_publish") + case avPlay, avPublish: + s.log(FatalLevel, pkg+"unsupported method avPlay/avPublish") } - case av_onBWDone: + case avOnBWDone: if s.checkCounter == 0 { // ToDo: why is this always zero? err := sendCheckBW(s) if err != nil { @@ -767,59 +741,59 @@ func handleInvoke(s *Session, body []byte) error { } } - case av_onFCUnsubscribe, av_onFCSubscribe: - s.log(FatalLevel, pkg+"unsupported method av_onFCUnsubscribe/av_onFCSubscribe") + case avOnFCUnsubscribe, avOnFCSubscribe: + s.log(FatalLevel, pkg+"unsupported method avOnFCUnsubscribe/avOonfcsubscribe") - case av_ping: - s.log(FatalLevel, pkg+"unsupported method av_ping") + case avPing: + s.log(FatalLevel, pkg+"unsupported method avPing") - case av__onbwcheck: + case av_onbwcheck: s.log(FatalLevel, pkg+"unsupported method av_onbwcheck") - case av__onbwdone: + case av_onbwdone: s.log(FatalLevel, pkg+"unsupported method av_onbwdone") - case av_close: - s.log(FatalLevel, pkg+"unsupported method av_close") + case avClose: + s.log(FatalLevel, pkg+"unsupported method avClose") - case av_onStatus: + case avOnStatus: var obj2 C_AMFObject C_AMFProp_GetObject(C_AMF_GetProp(&obj, "", 3), &obj2) - code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_code, -1)) - level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_level, -1)) + code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avCode, -1)) + level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avLevel, -1)) s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) switch code { - case av_NetStream_Failed, av_NetStream_Play_Failed, - av_NetStream_Play_StreamNotFound, av_NetConnection_Connect_InvalidApp: - s.log(FatalLevel, pkg+"unsupported method av_NetStream/av_NetStream_Play_Failed/av_netSTream_Play_StreamNotFound/av_netConnection_Connect_invalidApp") + case avNetStreamFailed, avNetStreamPlayFailed, + avNetStreamPlayStreamNotFound, avNetConnectionConnectInvalidApp: + s.log(FatalLevel, pkg+"unsupported method avNetStream/avNetStreamPlayFailed/avNetstream_play_streamnotfound/av_netConnection_Connect_invalidApp") - case av_NetStream_Play_Start, av_NetStream_Play_PublishNotify: - s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") + case avNetStreamPlayStart, avNetStreamPlayPublishNotify: + s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayStart/avNetStreamPlayPublishNotify") - case av_NetStream_Publish_Start: + case avNetStreamPublish_Start: s.log(DebugLevel, pkg+"playing") s.isPlaying = true for i, m := range s.methodCalls { - if m.name == av_publish { + if m.name == avPublish { s.methodCalls = eraseMethod(s.methodCalls, i) break } } - // ToDo: handle case when av_publish method not found + // ToDo: handle case when avPublish method not found - case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify: - s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify") + case avNetStreamPlayComplete, avNetStreamPlayStop, avNetStreamPlayUnpublishNotify: + s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayComplete/avNetStreamPlayStop/avNetStreamPlayUnpublishNotify") - case av_NetStream_Seek_Notify: - s.log(FatalLevel, pkg+"unsupported method av_netStream_Seek_Notify") + case avNetStreamSeekNotify: + s.log(FatalLevel, pkg+"unsupported method avNetstream_seek_notify") - case av_NetStream_Pause_Notify: - s.log(FatalLevel, pkg+"unsupported method av_NetStream_Pause_Notify") + case avNetStreamPauseNotify: + s.log(FatalLevel, pkg+"unsupported method avNetStreamPauseNotify") } - case av_playlist_ready: - s.log(FatalLevel, pkg+"unsupported method av_playlist_ready") + case avPlaylist_ready: + s.log(FatalLevel, pkg+"unsupported method avPlaylist_ready") default: s.log(FatalLevel, pkg+"unknown method "+meth) @@ -830,15 +804,15 @@ leave: } func handshake(s *Session) error { - var clientbuf [RTMP_SIG_SIZE + 1]byte + var clientbuf [signatureSize + 1]byte clientsig := clientbuf[1:] - var serversig [RTMP_SIG_SIZE]byte - clientbuf[0] = RTMP_CHANNEL_CONTROL + var serversig [signatureSize]byte + clientbuf[0] = chanControl binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000)) copy(clientsig[4:8], []byte{0, 0, 0, 0}) - for i := 8; i < RTMP_SIG_SIZE; i++ { + for i := 8; i < signatureSize; i++ { clientsig[i] = byte(rand.Intn(256)) } @@ -878,8 +852,8 @@ func handshake(s *Session) error { return err } - if !bytes.Equal(serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) { - s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:RTMP_SIG_SIZE], "clientsig", clientbuf[1:RTMP_SIG_SIZE+1]) + if !bytes.Equal(serversig[:signatureSize], clientbuf[1:signatureSize+1]) { + s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:signatureSize], "clientsig", clientbuf[1:signatureSize+1]) } return nil } diff --git a/rtmp/session.go b/rtmp/session.go index aa1ee80b..aa4c0ac2 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -80,7 +80,7 @@ type link struct { flashVer string token string extras C_AMFObject - lFlags int32 + flags int32 swfAge int32 protocol int32 timeout uint @@ -159,7 +159,7 @@ func (s *Session) Close() error { return errNotConnected } if s.streamID > 0 { - if s.link.protocol&RTMP_FEATURE_WRITE != 0 { + if s.link.protocol&featureWrite != 0 { sendFCUnpublish(s) } sendDeleteStream(s, float64(s.streamID)) @@ -174,7 +174,7 @@ func (s *Session) Write(data []byte) (int, error) { if !s.isConnected() { return 0, errNotConnected } - if data[0] == RTMP_PACKET_TYPE_INFO || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { + if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { return 0, errUnimplemented } if len(data) < minDataSize { @@ -185,11 +185,11 @@ func (s *Session) Write(data []byte) (int, error) { packetType: data[0], bodySize: C_AMF_DecodeInt24(data[1:4]), timestamp: C_AMF_DecodeInt24(data[4:7]) | uint32(data[7])<<24, - channel: RTMP_CHANNEL_SOURCE, + channel: chanSource, info: s.streamID, } - pkt.resize(pkt.bodySize, RTMP_PACKET_SIZE_AUTO) + pkt.resize(pkt.bodySize, headerSizeAuto) copy(pkt.body, data[minDataSize:minDataSize+pkt.bodySize]) err := pkt.write(s, false) if err != nil { @@ -245,5 +245,5 @@ func (s *Session) isConnected() bool { // enableWrite enables the current session for writing. func (s *Session) enableWrite() { - s.link.protocol |= RTMP_FEATURE_WRITE + s.link.protocol |= featureWrite }