Idiomatic names for constants (1st pass).

This commit is contained in:
scruzin 2019-01-11 09:52:21 +10:30
parent 8a23609f6e
commit 5be5aad6cf
3 changed files with 258 additions and 282 deletions

View File

@ -39,34 +39,37 @@ import (
"io"
)
// Packet types.
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
packetTypeChunkSize = 0x01
packetTypeBytesReadReport = 0x03
packetTypeControl = 0x04
packetTypeServerBW = 0x05
packetTypeClientBW = 0x06
packetTypeAudio = 0x08
packetTypeVideo = 0x09
packetTypeFlexStreamSend = 0x0F // not implemented
packetTypeFlexSharedObject = 0x10 // not implemented
packetTypeFlexMessage = 0x11 // not implemented
packetTypeInfo = 0x12
packetTypeInvoke = 0x14
packetTypeFlashVideo = 0x16 // not implemented
)
// Header sizes.
const (
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_SIZE_MINIMUM = 3
RTMP_PACKET_SIZE_AUTO = 4
headerSizeLarge = 0
headerSizeMedium = 1
headerSizeSmall = 2
headerSizeMinimum = 3
headerSizeAuto = 4
)
// Special channels.
const (
RTMP_CHANNEL_BYTES_READ = 0x02
RTMP_CHANNEL_CONTROL = 0x03
RTMP_CHANNEL_SOURCE = 0x04
chanBytesRead = 0x02
chanControl = 0x03
chanSource = 0x04
)
// headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively:
@ -95,13 +98,12 @@ type packet struct {
type chunk struct {
headerSize int32
data []byte
header [RTMP_MAX_HEADER_SIZE]byte
header [fullHeaderSize]byte
}
// ToDo: Consider making the following functions into methods.
// read reads a packet.
func (pkt *packet) read(s *Session) error {
var hbuf [RTMP_MAX_HEADER_SIZE]byte
var hbuf [fullHeaderSize]byte
header := hbuf[:]
err := s.read(header[:1])
@ -161,9 +163,9 @@ func (pkt *packet) read(s *Session) error {
size := headerSizes[pkt.headerType]
switch {
case size == RTMP_LARGE_HEADER_SIZE:
case size == fullHeaderSize:
pkt.hasAbsTimestamp = true
case size < RTMP_LARGE_HEADER_SIZE:
case size < fullHeaderSize:
if s.channelsIn[pkt.channel] != nil {
*pkt = *(s.channelsIn[pkt.channel])
}
@ -219,6 +221,7 @@ func (pkt *packet) read(s *Session) error {
}
if pkt.chunk != nil {
panic("non-nil chunk")
pkt.chunk.headerSize = int32(hSize)
copy(pkt.chunk.header[:], hbuf[:hSize])
pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)]
@ -260,25 +263,25 @@ func (pkt *packet) read(s *Session) error {
// resize adjusts the packet's storage to accommodate a body of the given size.
func (pkt *packet) resize(size uint32, ht uint8) {
buf := make([]byte, RTMP_MAX_HEADER_SIZE+size)
buf := make([]byte, fullHeaderSize+size)
pkt.header = buf
pkt.body = buf[RTMP_MAX_HEADER_SIZE:]
if ht != RTMP_PACKET_SIZE_AUTO {
pkt.body = buf[fullHeaderSize:]
if ht != headerSizeAuto {
pkt.headerType = ht
return
}
switch pkt.packetType {
case RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_AUDIO:
case packetTypeVideo, packetTypeAudio:
if pkt.timestamp == 0 {
pkt.headerType = RTMP_PACKET_SIZE_LARGE
pkt.headerType = headerSizeLarge
} else {
pkt.headerType = RTMP_PACKET_SIZE_MEDIUM
pkt.headerType = headerSizeMedium
}
case RTMP_PACKET_TYPE_INFO:
pkt.headerType = RTMP_PACKET_SIZE_LARGE
case packetTypeInfo:
pkt.headerType = headerSizeLarge
pkt.bodySize += 16
default:
pkt.headerType = RTMP_PACKET_SIZE_MEDIUM
pkt.headerType = headerSizeMedium
}
}
@ -309,14 +312,14 @@ func (pkt *packet) write(s *Session, queue bool) error {
prevPkt := s.channelsOut[pkt.channel]
var last int
if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE {
if prevPkt != nil && pkt.headerType != headerSizeLarge {
// compress a bit by using the prev packet's attributes
if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM {
pkt.headerType = RTMP_PACKET_SIZE_SMALL
if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium {
pkt.headerType = headerSizeSmall
}
if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL {
pkt.headerType = RTMP_PACKET_SIZE_MINIMUM
if prevPkt.timestamp == pkt.timestamp && pkt.headerType == headerSizeSmall {
pkt.headerType = headerSizeMinimum
}
last = int(prevPkt.timestamp)
@ -331,7 +334,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
// origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header.
headBytes := pkt.header
hSize := headerSizes[pkt.headerType]
origIdx := RTMP_MAX_HEADER_SIZE - hSize
origIdx := fullHeaderSize - hSize
// adjust 1 or 2 bytes for the channel
cSize := 0
@ -414,7 +417,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
if s.deferred == nil {
// Defer sending small audio packets (at most once).
if pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize {
if pkt.packetType == packetTypeAudio && size < chunkSize {
s.deferred = headBytes[origIdx:][:size+hSize]
s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr())
return nil
@ -443,7 +446,6 @@ func (pkt *packet) write(s *Session, queue bool) error {
// Prepend the previously deferred packet and write it with the current one.
s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred))
bytes = append(s.deferred, bytes...)
s.deferred = nil
}
err := s.write(bytes)
if err != nil {
@ -482,7 +484,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
}
// We invoked a remote method
if pkt.packetType == RTMP_PACKET_TYPE_INVOKE {
if pkt.packetType == packetTypeInvoke {
buf := pkt.body[1:]
meth := C_AMF_DecodeString(buf)
s.log(DebugLevel, pkg+"invoking method "+meth)

View File

@ -46,118 +46,93 @@ import (
const (
pkg = "rtmp:"
minDataSize = 11
minDataSize = 11 // ToDo: this should be the same as fullHeaderSize
signatureSize = 1536
fullHeaderSize = 12
)
// Link flags.
const (
RTMPT_OPEN = iota
RTMPT_SEND
RTMPT_IDLE
RTMPT_CLOSE
linkAuth = 0x0001 // using auth param
linkLive = 0x0002 // stream is live
linkSWF = 0x0004 // do SWF verification - not implemented
linkPlaylist = 0x0008 // send playlist before play - not implemented
linkBufx = 0x0010 // toggle stream on BufferEmpty msg - not implemented
)
// Protocol features.
const (
RTMP_READ_HEADER = 0x01
RTMP_READ_RESUME = 0x02
RTMP_READ_NO_IGNORE = 0x04
RTMP_READ_GOTKF = 0x08
RTMP_READ_GOTFLVK = 0x10
RTMP_READ_SEEKING = 0x20
RTMP_READ_COMPLETE = -3
RTMP_READ_ERROR = -2
RTMP_READ_EOF = -1
RTMP_READ_IGNORE = 0
featureHTTP = 0x01 // not implemented
featureEncode = 0x02 // not implemented
featureSSL = 0x04 // not implemented
featureMFP = 0x08 // not implemented
featureWrite = 0x10 // publish, not play
featureHTTP2 = 0x20 // server-side RTMPT - not implemented
)
// RTMP protocols.
const (
RTMP_LF_AUTH = 0x0001 /* using auth param */
RTMP_LF_LIVE = 0x0002 /* stream is live */
RTMP_LF_SWFV = 0x0004 /* do SWF verification */
RTMP_LF_PLST = 0x0008 /* send playlist before play */
RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */
RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */
RTMP_LF_FAPU = 0x0040 /* free app on close */
protoRTMP = 0
protoRTMPE = featureEncode
protoRTMPT = featureHTTP
protoRTMPS = featureSSL
protoRTMPTE = (featureHTTP | featureEncode)
protoRTMPTS = (featureHTTP | featureSSL)
protoRTMFP = featureMFP
)
// RTMP tokens (lexemes).
// NB: Underscores are deliberately preserved in const names where they exist in the corresponding tokens.
const (
RTMP_FEATURE_HTTP = 0x01
RTMP_FEATURE_ENC = 0x02
RTMP_FEATURE_SSL = 0x04
RTMP_FEATURE_MFP = 0x08 /* not yet supported */
RTMP_FEATURE_WRITE = 0x10 /* publish, not play */
RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */
)
const (
RTMP_PROTOCOL_RTMP = 0
RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC
RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP
RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL
RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC)
RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL)
RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP
)
const (
RTMP_DEFAULT_CHUNKSIZE = 128
RTMP_BUFFER_CACHE_SIZE = (16 * 1024)
RTMP_SIG_SIZE = 1536
RTMP_LARGE_HEADER_SIZE = 12
RTMP_MAX_HEADER_SIZE = RTMP_LARGE_HEADER_SIZE
)
const (
setDataFrame = "@setDataFrame"
av__checkbw = "_checkbw"
av__onbwcheck = "_onbwcheck"
av__onbwdone = "_onbwdone"
av__result = "_result"
av_app = "app"
av_audioCodecs = "audioCodecs"
av_capabilities = "capabilities"
av_close = "close"
av_code = "code"
av_connect = "connect"
av_createStream = "createStream"
av_deleteStream = "deleteStream"
av_FCPublish = "FCPublish"
av_FCUnpublish = "FCUnpublish"
av_flashVer = "flashVer"
av_fpad = "fpad"
av_level = "level"
av_live = "live"
av_NetConnection_Connect_InvalidApp = "NetConnection.Connect.InvalidApp"
av_NetStream_Failed = "NetStream.Failed"
av_NetStream_Pause_Notify = "NetStream.Pause.Notify"
av_NetStream_Play_Complete = "NetStream.Play.Complete"
av_NetStream_Play_Failed = "NetStream.Play.Failed"
av_NetStream_Play_PublishNotify = "NetStream.Play.PublishNotify"
av_NetStream_Play_Start = "NetStream.Play.Start"
av_NetStream_Play_Stop = "NetStream.Play.Stop"
av_NetStream_Play_StreamNotFound = "NetStream.Play.StreamNotFound"
av_NetStream_Play_UnpublishNotify = "NetStream.Play.UnpublishNotify"
av_NetStream_Publish_Start = "NetStream.Publish.Start"
av_NetStream_Seek_Notify = "NetStream.Seek.Notify"
av_nonprivate = "nonprivate"
av_objectEncoding = "objectEncoding"
av_onBWDone = "onBWDone"
av_onFCSubscribe = "onFCSubscribe"
av_onFCUnsubscribe = "onFCUnsubscribe"
av_onStatus = "onStatus"
av_pageUrl = "pageUrl"
av_ping = "ping"
av_play = "play"
av_playlist_ready = "playlist_ready"
av_publish = "publish"
av_releaseStream = "releaseStream"
av_secureToken = "secureToken"
av_set_playlist = "set_playlist"
av_swfUrl = "swfUrl"
av_tcUrl = "tcUrl"
av_type = "type"
av_videoCodecs = "videoCodecs"
av_videoFunction = "videoFunction"
av_checkbw = "_checkbw"
av_onbwcheck = "_onbwcheck"
av_onbwdone = "_onbwdone"
av_result = "_result"
avApp = "app"
avAudioCodecs = "audioCodecs"
avCapabilities = "capabilities"
avClose = "close"
avCode = "code"
avConnect = "connect"
avCreatestream = "createStream"
avDeletestream = "deleteStream"
avFCPublish = "FCPublish"
avFCUnpublish = "FCUnpublish"
avFlashver = "flashVer"
avFpad = "fpad"
avLevel = "level"
avLive = "live"
avNetConnectionConnectInvalidApp = "NetConnection.Connect.InvalidApp"
avNetStreamFailed = "NetStream.Failed"
avNetStreamPauseNotify = "NetStream.Pause.Notify"
avNetStreamPlayComplete = "NetStream.Play.Complete"
avNetStreamPlayFailed = "NetStream.Play.Failed"
avNetStreamPlayPublishNotify = "NetStream.Play.PublishNotify"
avNetStreamPlayStart = "NetStream.Play.Start"
avNetStreamPlayStop = "NetStream.Play.Stop"
avNetStreamPlayStreamNotFound = "NetStream.Play.StreamNotFound"
avNetStreamPlayUnpublishNotify = "NetStream.Play.UnpublishNotify"
avNetStreamPublish_Start = "NetStream.Publish.Start"
avNetStreamSeekNotify = "NetStream.Seek.Notify"
avNonprivate = "nonprivate"
avObjectEncoding = "objectEncoding"
avOnBWDone = "onBWDone"
avOnFCSubscribe = "onFCSubscribe"
avOnFCUnsubscribe = "onFCUnsubscribe"
avOnStatus = "onStatus"
avPageUrl = "pageUrl"
avPing = "ping"
avPlay = "play"
avPlaylist_ready = "playlist_ready"
avPublish = "publish"
avReleasestream = "releaseStream"
avSecureToken = "secureToken"
avSet_playlist = "set_playlist"
avSwfUrl = "swfUrl"
avTcUrl = "tcUrl"
avType = "type"
avVideoCodecs = "videoCodecs"
avVideoFunction = "videoFunction"
)
// RTMP protocol strings.
@ -200,7 +175,6 @@ func setupURL(s *Session) (err error) {
if s.link.tcUrl == "" {
if s.link.app != "" {
s.link.tcUrl = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app
s.link.lFlags |= RTMP_LF_FTCU
} else {
s.link.tcUrl = s.url
}
@ -208,10 +182,10 @@ func setupURL(s *Session) (err error) {
if s.link.port == 0 {
switch {
case (s.link.protocol & RTMP_FEATURE_SSL) != 0:
case (s.link.protocol & featureSSL) != 0:
s.link.port = 433
s.log(FatalLevel, pkg+"SSL not supported")
case (s.link.protocol & RTMP_FEATURE_HTTP) != 0:
case (s.link.protocol & featureHTTP) != 0:
s.link.port = 80
default:
s.link.port = 1935
@ -257,7 +231,7 @@ func connectStream(s *Session) error {
}
switch pkt.packetType {
case RTMP_PACKET_TYPE_AUDIO, RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_INFO:
case packetTypeAudio, packetTypeVideo, packetTypeInfo:
s.log(WarnLevel, pkg+"got packet before play; ignoring", "type", pkt.packetType)
default:
err = handlePacket(s, &pkt)
@ -277,21 +251,21 @@ func connectStream(s *Session) error {
// NB: cases have been commented out that are not currently used by AusOcean
func handlePacket(s *Session, pkt *packet) error {
switch pkt.packetType {
case RTMP_PACKET_TYPE_CHUNK_SIZE:
case packetTypeChunkSize:
if pkt.bodySize >= 4 {
s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4]))
}
case RTMP_PACKET_TYPE_BYTES_READ_REPORT:
case packetTypeBytesReadReport:
s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4]))
case RTMP_PACKET_TYPE_CONTROL:
s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_CONTROL")
case packetTypeControl:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl")
case RTMP_PACKET_TYPE_SERVER_BW:
case packetTypeServerBW:
s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4]))
case RTMP_PACKET_TYPE_CLIENT_BW:
case packetTypeClientBW:
s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4]))
if pkt.bodySize > 4 {
s.clientBW2 = pkt.body[4]
@ -299,19 +273,19 @@ func handlePacket(s *Session, pkt *packet) error {
s.clientBW2 = 0xff
}
case RTMP_PACKET_TYPE_AUDIO:
s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_AUDIO")
case packetTypeAudio:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeAudio")
case RTMP_PACKET_TYPE_VIDEO:
s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_VIDEO")
case packetTypeVideo:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeVideo")
case RTMP_PACKET_TYPE_FLEX_MESSAGE:
s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_FLEX_MESSAGE")
case packetTypeFlexMessage:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeFlexMessage")
case RTMP_PACKET_TYPE_INFO:
s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_INFO")
case packetTypeInfo:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeInfo")
case RTMP_PACKET_TYPE_INVOKE:
case packetTypeInvoke:
err := handleInvoke(s, pkt.body[:pkt.bodySize])
if err != nil {
// This will never happen with the methods we implement.
@ -319,8 +293,8 @@ func handlePacket(s *Session, pkt *packet) error {
return err
}
case RTMP_PACKET_TYPE_FLASH_VIDEO:
s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_FLASH_VIDEO")
case packetTypeFlashVideo:
s.log(FatalLevel, pkg+"unsupported packet type packetType_FLASHVideo")
default:
s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType)
@ -331,15 +305,15 @@ func handlePacket(s *Session, pkt *packet) error {
func sendConnectPacket(s *Session) error {
var pbuf [4096]byte
pkt := packet{
channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_LARGE,
packetType: RTMP_PACKET_TYPE_INVOKE,
channel: chanControl,
headerType: headerSizeLarge,
packetType: packetTypeInvoke,
header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:],
body: pbuf[fullHeaderSize:],
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, av_connect)
enc = C_AMF_EncodeString(enc, avConnect)
if enc == nil {
return errEncoding
}
@ -351,60 +325,60 @@ func sendConnectPacket(s *Session) error {
enc[0] = AMF_OBJECT
enc = enc[1:]
enc = C_AMF_EncodeNamedString(enc, av_app, s.link.app)
enc = C_AMF_EncodeNamedString(enc, avApp, s.link.app)
if enc == nil {
return errEncoding
}
if s.link.protocol&RTMP_FEATURE_WRITE != 0 {
enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate)
if s.link.protocol&featureWrite != 0 {
enc = C_AMF_EncodeNamedString(enc, avType, avNonprivate)
if enc == nil {
return errEncoding
}
}
if s.link.flashVer != "" {
enc = C_AMF_EncodeNamedString(enc, av_flashVer, s.link.flashVer)
enc = C_AMF_EncodeNamedString(enc, avFlashver, s.link.flashVer)
if enc == nil {
return errEncoding
}
}
if s.link.swfUrl != "" {
enc = C_AMF_EncodeNamedString(enc, av_swfUrl, s.link.swfUrl)
enc = C_AMF_EncodeNamedString(enc, avSwfUrl, s.link.swfUrl)
if enc == nil {
return errEncoding
}
}
if s.link.tcUrl != "" {
enc = C_AMF_EncodeNamedString(enc, av_tcUrl, s.link.tcUrl)
enc = C_AMF_EncodeNamedString(enc, avTcUrl, s.link.tcUrl)
if enc == nil {
return errEncoding
}
}
if s.link.protocol&RTMP_FEATURE_WRITE == 0 {
enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false)
if s.link.protocol&featureWrite == 0 {
enc = C_AMF_EncodeNamedBoolean(enc, avFpad, false)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, av_capabilities, 15)
enc = C_AMF_EncodeNamedNumber(enc, avCapabilities, 15)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, s.audioCodecs)
enc = C_AMF_EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, s.videoCodecs)
enc = C_AMF_EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, av_videoFunction, 1)
enc = C_AMF_EncodeNamedNumber(enc, avVideoFunction, 1)
if enc == nil {
return errEncoding
}
if s.link.pageUrl != "" {
enc = C_AMF_EncodeNamedString(enc, av_pageUrl, s.link.pageUrl)
enc = C_AMF_EncodeNamedString(enc, avPageUrl, s.link.pageUrl)
if enc == nil {
return errEncoding
}
@ -412,7 +386,7 @@ func sendConnectPacket(s *Session) error {
}
if s.encoding != 0.0 || s.sendEncoding {
enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, s.encoding)
enc = C_AMF_EncodeNamedNumber(enc, avObjectEncoding, s.encoding)
if enc == nil {
return errEncoding
}
@ -423,7 +397,7 @@ func sendConnectPacket(s *Session) error {
// add auth string
if s.link.auth != "" {
enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0)
enc = C_AMF_EncodeBoolean(enc, s.link.flags&linkAuth != 0)
if enc == nil {
return errEncoding
}
@ -440,7 +414,7 @@ func sendConnectPacket(s *Session) error {
}
}
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, true) // response expected
}
@ -448,15 +422,15 @@ func sendConnectPacket(s *Session) error {
func sendCreateStream(s *Session) error {
var pbuf [256]byte
pkt := packet{
channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_INVOKE,
channel: chanControl,
headerType: headerSizeMedium,
packetType: packetTypeInvoke,
header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:],
body: pbuf[fullHeaderSize:],
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, av_createStream)
enc = C_AMF_EncodeString(enc, avCreatestream)
if enc == nil {
return errEncoding
}
@ -468,7 +442,7 @@ func sendCreateStream(s *Session) error {
enc[0] = AMF_NULL
enc = enc[1:]
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, true) // response expected
}
@ -476,15 +450,15 @@ func sendCreateStream(s *Session) error {
func sendReleaseStream(s *Session) error {
var pbuf [1024]byte
pkt := packet{
channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_INVOKE,
channel: chanControl,
headerType: headerSizeMedium,
packetType: packetTypeInvoke,
header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:],
body: pbuf[fullHeaderSize:],
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, av_releaseStream)
enc = C_AMF_EncodeString(enc, avReleasestream)
if enc == nil {
return errEncoding
}
@ -499,7 +473,7 @@ func sendReleaseStream(s *Session) error {
if enc == nil {
return errEncoding
}
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false)
}
@ -507,15 +481,15 @@ func sendReleaseStream(s *Session) error {
func sendFCPublish(s *Session) error {
var pbuf [1024]byte
pkt := packet{
channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_INVOKE,
channel: chanControl,
headerType: headerSizeMedium,
packetType: packetTypeInvoke,
header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:],
body: pbuf[fullHeaderSize:],
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, av_FCPublish)
enc = C_AMF_EncodeString(enc, avFCPublish)
if enc == nil {
return errEncoding
}
@ -531,7 +505,7 @@ func sendFCPublish(s *Session) error {
return errEncoding
}
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false)
}
@ -539,15 +513,15 @@ func sendFCPublish(s *Session) error {
func sendFCUnpublish(s *Session) error {
var pbuf [1024]byte
pkt := packet{
channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_INVOKE,
channel: chanControl,
headerType: headerSizeMedium,
packetType: packetTypeInvoke,
header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:],
body: pbuf[fullHeaderSize:],
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, av_FCUnpublish)
enc = C_AMF_EncodeString(enc, avFCUnpublish)
if enc == nil {
return errEncoding
}
@ -563,7 +537,7 @@ func sendFCUnpublish(s *Session) error {
return errEncoding
}
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false)
}
@ -571,15 +545,15 @@ func sendFCUnpublish(s *Session) error {
func sendPublish(s *Session) error {
var pbuf [1024]byte
pkt := packet{
channel: RTMP_CHANNEL_SOURCE,
headerType: RTMP_PACKET_SIZE_LARGE,
packetType: RTMP_PACKET_TYPE_INVOKE,
channel: chanSource,
headerType: headerSizeLarge,
packetType: packetTypeInvoke,
header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:],
body: pbuf[fullHeaderSize:],
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, av_publish)
enc = C_AMF_EncodeString(enc, avPublish)
if enc == nil {
return errEncoding
}
@ -594,12 +568,12 @@ func sendPublish(s *Session) error {
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeString(enc, av_live)
enc = C_AMF_EncodeString(enc, avLive)
if enc == nil {
return errEncoding
}
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, true) // response expected
}
@ -607,15 +581,15 @@ func sendPublish(s *Session) error {
func sendDeleteStream(s *Session, dStreamId float64) error {
var pbuf [256]byte
pkt := packet{
channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_INVOKE,
channel: chanControl,
headerType: headerSizeMedium,
packetType: packetTypeInvoke,
header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:],
body: pbuf[fullHeaderSize:],
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, av_deleteStream)
enc = C_AMF_EncodeString(enc, avDeletestream)
if enc == nil {
return errEncoding
}
@ -630,7 +604,7 @@ func sendDeleteStream(s *Session, dStreamId float64) error {
if enc == nil {
return errEncoding
}
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
/* no response expected */
return pkt.write(s, false)
@ -640,11 +614,11 @@ func sendDeleteStream(s *Session, dStreamId float64) error {
func sendBytesReceived(s *Session) error {
var pbuf [256]byte
pkt := packet{
channel: RTMP_CHANNEL_BYTES_READ,
headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT,
channel: chanBytesRead,
headerType: headerSizeMedium,
packetType: packetTypeBytesReadReport,
header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:],
body: pbuf[fullHeaderSize:],
}
enc := pkt.body
@ -661,15 +635,15 @@ func sendBytesReceived(s *Session) error {
func sendCheckBW(s *Session) error {
var pbuf [256]byte
pkt := packet{
channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_LARGE,
packetType: RTMP_PACKET_TYPE_INVOKE,
channel: chanControl,
headerType: headerSizeLarge,
packetType: packetTypeInvoke,
header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:],
body: pbuf[fullHeaderSize:],
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, av__checkbw)
enc = C_AMF_EncodeString(enc, av_checkbw)
if enc == nil {
return errEncoding
}
@ -681,7 +655,7 @@ func sendCheckBW(s *Session) error {
enc[0] = AMF_NULL
enc = enc[1:]
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false)
}
@ -693,7 +667,7 @@ func eraseMethod(m []method, i int) []method {
}
// int handleInvoke handles a packet invoke request
// Side effects: s.isPlaying set to true upon av_NetStream_Publish_Start
// Side effects: s.isPlaying set to true upon avNetStreamPublish_Start
func handleInvoke(s *Session, body []byte) error {
if body[0] != 0x02 {
return errInvalidBody
@ -709,7 +683,7 @@ func handleInvoke(s *Session, body []byte) error {
txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1))
switch meth {
case av__result:
case av_result:
var methodInvoked string
for i, m := range s.methodCalls {
if float64(m.num) == txn {
@ -725,11 +699,11 @@ func handleInvoke(s *Session, body []byte) error {
s.log(DebugLevel, pkg+"received result for "+methodInvoked)
switch methodInvoked {
case av_connect:
case avConnect:
if s.link.token != "" {
s.log(FatalLevel, pkg+"no support for link token")
}
if (s.link.protocol & RTMP_FEATURE_WRITE) == 0 {
if (s.link.protocol & featureWrite) == 0 {
return errNotWritable
}
err := sendReleaseStream(s)
@ -745,9 +719,9 @@ func handleInvoke(s *Session, body []byte) error {
return err
}
case av_createStream:
case avCreatestream:
s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3)))
if s.link.protocol&RTMP_FEATURE_WRITE == 0 {
if s.link.protocol&featureWrite == 0 {
return errNotWritable
}
err := sendPublish(s)
@ -755,11 +729,11 @@ func handleInvoke(s *Session, body []byte) error {
return err
}
case av_play, av_publish:
s.log(FatalLevel, pkg+"unsupported method av_play/av_publish")
case avPlay, avPublish:
s.log(FatalLevel, pkg+"unsupported method avPlay/avPublish")
}
case av_onBWDone:
case avOnBWDone:
if s.checkCounter == 0 { // ToDo: why is this always zero?
err := sendCheckBW(s)
if err != nil {
@ -767,59 +741,59 @@ func handleInvoke(s *Session, body []byte) error {
}
}
case av_onFCUnsubscribe, av_onFCSubscribe:
s.log(FatalLevel, pkg+"unsupported method av_onFCUnsubscribe/av_onFCSubscribe")
case avOnFCUnsubscribe, avOnFCSubscribe:
s.log(FatalLevel, pkg+"unsupported method avOnFCUnsubscribe/avOonfcsubscribe")
case av_ping:
s.log(FatalLevel, pkg+"unsupported method av_ping")
case avPing:
s.log(FatalLevel, pkg+"unsupported method avPing")
case av__onbwcheck:
case av_onbwcheck:
s.log(FatalLevel, pkg+"unsupported method av_onbwcheck")
case av__onbwdone:
case av_onbwdone:
s.log(FatalLevel, pkg+"unsupported method av_onbwdone")
case av_close:
s.log(FatalLevel, pkg+"unsupported method av_close")
case avClose:
s.log(FatalLevel, pkg+"unsupported method avClose")
case av_onStatus:
case avOnStatus:
var obj2 C_AMFObject
C_AMFProp_GetObject(C_AMF_GetProp(&obj, "", 3), &obj2)
code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_code, -1))
level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_level, -1))
code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avCode, -1))
level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avLevel, -1))
s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level)
switch code {
case av_NetStream_Failed, av_NetStream_Play_Failed,
av_NetStream_Play_StreamNotFound, av_NetConnection_Connect_InvalidApp:
s.log(FatalLevel, pkg+"unsupported method av_NetStream/av_NetStream_Play_Failed/av_netSTream_Play_StreamNotFound/av_netConnection_Connect_invalidApp")
case avNetStreamFailed, avNetStreamPlayFailed,
avNetStreamPlayStreamNotFound, avNetConnectionConnectInvalidApp:
s.log(FatalLevel, pkg+"unsupported method avNetStream/avNetStreamPlayFailed/avNetstream_play_streamnotfound/av_netConnection_Connect_invalidApp")
case av_NetStream_Play_Start, av_NetStream_Play_PublishNotify:
s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify")
case avNetStreamPlayStart, avNetStreamPlayPublishNotify:
s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayStart/avNetStreamPlayPublishNotify")
case av_NetStream_Publish_Start:
case avNetStreamPublish_Start:
s.log(DebugLevel, pkg+"playing")
s.isPlaying = true
for i, m := range s.methodCalls {
if m.name == av_publish {
if m.name == avPublish {
s.methodCalls = eraseMethod(s.methodCalls, i)
break
}
}
// ToDo: handle case when av_publish method not found
// ToDo: handle case when avPublish method not found
case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify:
s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify")
case avNetStreamPlayComplete, avNetStreamPlayStop, avNetStreamPlayUnpublishNotify:
s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayComplete/avNetStreamPlayStop/avNetStreamPlayUnpublishNotify")
case av_NetStream_Seek_Notify:
s.log(FatalLevel, pkg+"unsupported method av_netStream_Seek_Notify")
case avNetStreamSeekNotify:
s.log(FatalLevel, pkg+"unsupported method avNetstream_seek_notify")
case av_NetStream_Pause_Notify:
s.log(FatalLevel, pkg+"unsupported method av_NetStream_Pause_Notify")
case avNetStreamPauseNotify:
s.log(FatalLevel, pkg+"unsupported method avNetStreamPauseNotify")
}
case av_playlist_ready:
s.log(FatalLevel, pkg+"unsupported method av_playlist_ready")
case avPlaylist_ready:
s.log(FatalLevel, pkg+"unsupported method avPlaylist_ready")
default:
s.log(FatalLevel, pkg+"unknown method "+meth)
@ -830,15 +804,15 @@ leave:
}
func handshake(s *Session) error {
var clientbuf [RTMP_SIG_SIZE + 1]byte
var clientbuf [signatureSize + 1]byte
clientsig := clientbuf[1:]
var serversig [RTMP_SIG_SIZE]byte
clientbuf[0] = RTMP_CHANNEL_CONTROL
var serversig [signatureSize]byte
clientbuf[0] = chanControl
binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000))
copy(clientsig[4:8], []byte{0, 0, 0, 0})
for i := 8; i < RTMP_SIG_SIZE; i++ {
for i := 8; i < signatureSize; i++ {
clientsig[i] = byte(rand.Intn(256))
}
@ -878,8 +852,8 @@ func handshake(s *Session) error {
return err
}
if !bytes.Equal(serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) {
s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:RTMP_SIG_SIZE], "clientsig", clientbuf[1:RTMP_SIG_SIZE+1])
if !bytes.Equal(serversig[:signatureSize], clientbuf[1:signatureSize+1]) {
s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:signatureSize], "clientsig", clientbuf[1:signatureSize+1])
}
return nil
}

View File

@ -80,7 +80,7 @@ type link struct {
flashVer string
token string
extras C_AMFObject
lFlags int32
flags int32
swfAge int32
protocol int32
timeout uint
@ -159,7 +159,7 @@ func (s *Session) Close() error {
return errNotConnected
}
if s.streamID > 0 {
if s.link.protocol&RTMP_FEATURE_WRITE != 0 {
if s.link.protocol&featureWrite != 0 {
sendFCUnpublish(s)
}
sendDeleteStream(s, float64(s.streamID))
@ -174,7 +174,7 @@ func (s *Session) Write(data []byte) (int, error) {
if !s.isConnected() {
return 0, errNotConnected
}
if data[0] == RTMP_PACKET_TYPE_INFO || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') {
if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') {
return 0, errUnimplemented
}
if len(data) < minDataSize {
@ -185,11 +185,11 @@ func (s *Session) Write(data []byte) (int, error) {
packetType: data[0],
bodySize: C_AMF_DecodeInt24(data[1:4]),
timestamp: C_AMF_DecodeInt24(data[4:7]) | uint32(data[7])<<24,
channel: RTMP_CHANNEL_SOURCE,
channel: chanSource,
info: s.streamID,
}
pkt.resize(pkt.bodySize, RTMP_PACKET_SIZE_AUTO)
pkt.resize(pkt.bodySize, headerSizeAuto)
copy(pkt.body, data[minDataSize:minDataSize+pkt.bodySize])
err := pkt.write(s, false)
if err != nil {
@ -245,5 +245,5 @@ func (s *Session) isConnected() bool {
// enableWrite enables the current session for writing.
func (s *Session) enableWrite() {
s.link.protocol |= RTMP_FEATURE_WRITE
s.link.protocol |= featureWrite
}