diff --git a/rtmp/amf.go b/rtmp/amf/amf.go similarity index 52% rename from rtmp/amf.go rename to rtmp/amf/amf.go index c0212c5f..18199cf3 100644 --- a/rtmp/amf.go +++ b/rtmp/amf/amf.go @@ -33,42 +33,51 @@ LICENSE Copyright (C) 2008-2009 Andrej Stepanchuk Copyright (C) 2009-2010 Howard Chu */ -package rtmp + +// amf implements Action Message Format (AMF) encoding and decoding. +// See https://en.wikipedia.org/wiki/Action_Message_Format. +package amf import ( "encoding/binary" "math" ) +// AMF data types. const ( - amfNumber = iota - amfBoolean - amfString - amfObject - amfMovieClip // reserved, not implemented - amfNull - amfUndefined - amfReference - amfEcmaArray - amfObjectEnd - amfStrictArray - amfDate - amfLongSring - amfUnsupported - amfRecordset // reserved, not implemented - amfXmlDoc - amfTypedObject - amfAvmplus // reserved, not implemented - amfInvalid = 0xff + Number = iota + Boolean + String + Object + MovieClip // reserved, not implemented + Null + Undefined + Reference + EcmaArray + ObjectEnd + StrictArray + Date + LongSring + Unsupported + Recordset // reserved, not implemented + XmlDoc + TypedObject + Avmplus // reserved, not implemented + Invalid = 0xff ) +// DataType represents an AMF data type, which is simply an index into the above. +type DataType int32 + +// AMF represents an AMF message, which is simply a collection of properties. type AMF struct { - props []AMFProperty + Props []Property // ToDo: consider not exporting this } -type AMFProperty struct { +// Property represents an AMF property. +type Property struct { name string - atype AMFDataType + atype DataType vu vu UTCoffset int16 } @@ -79,45 +88,43 @@ type vu struct { obj AMF } -type AMFDataType int32 - var ( - amfObjInvalid AMF - amfPropInvalid = AMFProperty{atype: amfInvalid} + ObjInvalid AMF + PropInvalid = Property{atype: Invalid} ) -func amfDecodeInt16(data []byte) uint16 { +func DecodeInt16(data []byte) uint16 { return uint16(binary.BigEndian.Uint16(data)) } -func amfDecodeInt24(data []byte) uint32 { +func DecodeInt24(data []byte) uint32 { return uint32(data[0])<<16 | uint32(data[1])<<8 | uint32(data[2]) // return uint16(data[0])<<8 | uint16(data[1]) } -func amfDecodeInt32(data []byte) uint32 { +func DecodeInt32(data []byte) uint32 { return uint32(binary.BigEndian.Uint32(data)) } -func amfDecodeString(data []byte) string { - n := amfDecodeInt16(data) +func DecodeString(data []byte) string { + n := DecodeInt16(data) return string(data[2 : 2+n]) } -func amfDecodeLongString(data []byte) string { - n := amfDecodeInt32(data) +func DecodeLongString(data []byte) string { + n := DecodeInt32(data) return string(data[2 : 2+n]) } -func amfDecodeNumber(data []byte) float64 { +func DecodeNumber(data []byte) float64 { return math.Float64frombits(binary.BigEndian.Uint64(data)) } -func amfDecodeBoolean(data []byte) bool { +func DecodeBoolean(data []byte) bool { return data[0] != 0 } -func amfEncodeInt24(dst []byte, val int32) []byte { +func EncodeInt24(dst []byte, val int32) []byte { if len(dst) < 3 { return nil } @@ -130,7 +137,7 @@ func amfEncodeInt24(dst []byte, val int32) []byte { return dst[3:] } -func amfEncodeInt32(dst []byte, val int32) []byte { +func EncodeInt32(dst []byte, val int32) []byte { if len(dst) < 4 { return nil } @@ -141,7 +148,7 @@ func amfEncodeInt32(dst []byte, val int32) []byte { return dst[4:] } -func amfEncodeString(dst []byte, val string) []byte { +func EncodeString(dst []byte, val string) []byte { const typeSize = 1 if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(dst) { return nil @@ -151,7 +158,7 @@ func amfEncodeString(dst []byte, val string) []byte { } if len(val) < 65536 { - dst[0] = amfString + dst[0] = String dst = dst[1:] binary.BigEndian.PutUint16(dst[:2], uint16(len(val))) dst = dst[2:] @@ -161,7 +168,7 @@ func amfEncodeString(dst []byte, val string) []byte { } return dst[len(val):] } - dst[0] = amfLongSring + dst[0] = LongSring dst = dst[1:] binary.BigEndian.PutUint32(dst[:4], uint32(len(val))) dst = dst[4:] @@ -172,21 +179,21 @@ func amfEncodeString(dst []byte, val string) []byte { return dst[len(val):] } -func amfEncodeNumber(dst []byte, val float64) []byte { +func EncodeNumber(dst []byte, val float64) []byte { if len(dst) < 9 { return nil } - dst[0] = amfNumber + dst[0] = Number dst = dst[1:] binary.BigEndian.PutUint64(dst, math.Float64bits(val)) return dst[8:] } -func amfEncodeBoolean(dst []byte, val bool) []byte { +func EncodeBoolean(dst []byte, val bool) []byte { if len(dst) < 2 { return nil } - dst[0] = amfBoolean + dst[0] = Boolean if val { dst[1] = 1 } @@ -197,7 +204,7 @@ func amfEncodeBoolean(dst []byte, val bool) []byte { } -func amfEncodeNamedString(dst []byte, key, val string) []byte { +func EncodeNamedString(dst []byte, key, val string) []byte { if 2+len(key) > len(dst) { return nil } @@ -207,10 +214,10 @@ func amfEncodeNamedString(dst []byte, key, val string) []byte { if len(key) == len(dst) { return nil } - return amfEncodeString(dst[len(key):], val) + return EncodeString(dst[len(key):], val) } -func amfEncodeNamedNumber(dst []byte, key string, val float64) []byte { +func EncodeNamedNumber(dst []byte, key string, val float64) []byte { if 2+len(key) > len(dst) { return nil } @@ -220,10 +227,10 @@ func amfEncodeNamedNumber(dst []byte, key string, val float64) []byte { if len(key) == len(dst) { return nil } - return amfEncodeNumber(dst[len(key):], val) + return EncodeNumber(dst[len(key):], val) } -func amfEncodeNamedBoolean(dst []byte, key string, val bool) []byte { +func EncodeNamedBoolean(dst []byte, key string, val bool) []byte { if 2+len(key) > len(dst) { return nil } @@ -233,42 +240,42 @@ func amfEncodeNamedBoolean(dst []byte, key string, val bool) []byte { if len(key) == len(dst) { return nil } - return amfEncodeBoolean(dst[len(key):], val) + return EncodeBoolean(dst[len(key):], val) } -func amfPropSetName(prop *AMFProperty, name string) { +func PropSetName(prop *Property, name string) { prop.name = name } -func amfPropGetNumber(prop *AMFProperty) float64 { +func PropGetNumber(prop *Property) float64 { return prop.vu.number } -func amfPropGetString(prop *AMFProperty) string { - if prop.atype == amfString { +func PropGetString(prop *Property) string { + if prop.atype == String { return prop.vu.aval } return "" } -func amfPropGetObject(prop *AMFProperty, a *AMF) { - if prop.atype == amfObject { +func PropGetObject(prop *Property, a *AMF) { + if prop.atype == Object { *a = prop.vu.obj } else { - *a = amfObjInvalid + *a = ObjInvalid } } -func amfPropEncode(p *AMFProperty, dst []byte) []byte { - if p.atype == amfInvalid { +func PropEncode(p *Property, dst []byte) []byte { + if p.atype == Invalid { return nil } - if p.atype != amfNull && len(p.name)+2+1 >= len(dst) { + if p.atype != Null && len(p.name)+2+1 >= len(dst) { return nil } - if p.atype != amfNull && len(p.name) != 0 { + if p.atype != Null && len(p.name) != 0 { binary.BigEndian.PutUint16(dst[:2], uint16(len(p.name))) dst = dst[2:] copy(dst, p.name) @@ -276,32 +283,32 @@ func amfPropEncode(p *AMFProperty, dst []byte) []byte { } switch p.atype { - case amfNumber: - dst = amfEncodeNumber(dst, p.vu.number) - case amfBoolean: - dst = amfEncodeBoolean(dst, p.vu.number != 0) - case amfString: - dst = amfEncodeString(dst, p.vu.aval) - case amfNull: + case Number: + dst = EncodeNumber(dst, p.vu.number) + case Boolean: + dst = EncodeBoolean(dst, p.vu.number != 0) + case String: + dst = EncodeString(dst, p.vu.aval) + case Null: if len(dst) < 2 { return nil } - dst[0] = amfNull + dst[0] = Null dst = dst[1:] - case amfObject: - dst = amfEncode(&p.vu.obj, dst) - case amfEcmaArray: - dst = amfEncodeEcmaArray(&p.vu.obj, dst) - case amfStrictArray: - dst = amfEncodeArray(&p.vu.obj, dst) + case Object: + dst = Encode(&p.vu.obj, dst) + case EcmaArray: + dst = EncodeEcmaArray(&p.vu.obj, dst) + case StrictArray: + dst = EncodeArray(&p.vu.obj, dst) default: - // ??? log.Println("amfPropEncode: invalid type!") + // ??? log.Println("PropEncode: invalid type!") dst = nil } return dst } -func amfProDecode(prop *AMFProperty, data []byte, bDecodeName int32) int32 { +func PropDecode(prop *Property, data []byte, bDecodeName int32) int32 { prop.name = "" nOriginalSize := len(data) @@ -319,14 +326,14 @@ func amfProDecode(prop *AMFProperty, data []byte, bDecodeName int32) int32 { } if bDecodeName != 0 { - nNameSize := amfDecodeInt16(data[:2]) + nNameSize := DecodeInt16(data[:2]) if int(nNameSize) > len(data)-2 { // TODO use new logger here //RTMLog(RTMLOGDEBUG, "%s: Name size out of range: namesize (%d) > len (%d) - 2",__FUNCTION__, nNameSize, nSize); return -1 } - prop.name = amfDecodeString(data) + prop.name = DecodeString(data) data = data[2+nNameSize:] } @@ -334,85 +341,85 @@ func amfProDecode(prop *AMFProperty, data []byte, bDecodeName int32) int32 { return -1 } - prop.atype = AMFDataType(data[0]) + prop.atype = DataType(data[0]) data = data[1:] var nRes int32 switch prop.atype { - case amfNumber: + case Number: if len(data) < 8 { return -1 } - prop.vu.number = amfDecodeNumber(data[:8]) + prop.vu.number = DecodeNumber(data[:8]) data = data[8:] - case amfBoolean: - panic("amfBoolean not supported") + case Boolean: + panic("Boolean not supported") - case amfString: - nStringSize := amfDecodeInt16(data[:2]) + case String: + nStringSize := DecodeInt16(data[:2]) if len(data) < int(nStringSize+2) { return -1 } - prop.vu.aval = amfDecodeString(data) + prop.vu.aval = DecodeString(data) data = data[2+nStringSize:] - case amfObject: - nRes := amfDecode(&prop.vu.obj, data, 1) + case Object: + nRes := Decode(&prop.vu.obj, data, 1) if nRes == -1 { return -1 } data = data[nRes:] - case amfMovieClip: + case MovieClip: // TODO use new logger here - // ??? log.Println("AMFProDecode: MAF_MOVIECLIP reserved!") - //RTMLog(RTMLOGERROR, "amfMovieClip reserved!"); + // ??? log.Println("PropDecode: MAF_MOVIECLIP reserved!") + //RTMLog(RTMLOGERROR, "MovieClip reserved!"); return -1 - case amfNull, amfUndefined, amfUnsupported: - prop.atype = amfNull + case Null, Undefined, Unsupported: + prop.atype = Null - case amfReference: + case Reference: // TODO use new logger here - // ??? log.Println("AMFProDecode: amfReference not supported!") - //RTMLog(RTMLOGERROR, "amfReference not supported!"); + // ??? log.Println("PropDecode: Reference not supported!") + //RTMLog(RTMLOGERROR, "Reference not supported!"); return -1 - case amfEcmaArray: + case EcmaArray: // next comes the rest, mixed array has a final 0x000009 mark and names, so its an object data = data[4:] - nRes = amfDecode(&prop.vu.obj, data, 1) + nRes = Decode(&prop.vu.obj, data, 1) if nRes == -1 { return -1 } data = data[nRes:] - case amfObjectEnd: + case ObjectEnd: return -1 - case amfStrictArray: - panic("amfStrictArray not supported") + case StrictArray: + panic("StrictArray not supported") - case amfDate: - panic("amfDate not supported") + case Date: + panic("Date not supported") - case amfLongSring, amfXmlDoc: - panic("amfLongSring, amfXmlDoc not supported") + case LongSring, XmlDoc: + panic("LongSring, XmlDoc not supported") - case amfRecordset: + case Recordset: // TODO use new logger here - // ??? log.Println("AMFProDecode: amfRecordset reserved!") - //RTMLog(RTMLOGERROR, "amfRecordset reserved!"); + // ??? log.Println("PropDecode: Recordset reserved!") + //RTMLog(RTMLOGERROR, "Recordset reserved!"); return -1 - case amfTypedObject: + case TypedObject: // TODO use new logger here - // RTMLog(RTMLOGERROR, "amfTyped_object not supported!") + // RTMLog(RTMLOGERROR, "Typed_object not supported!") return -1 - case amfAvmplus: - panic("amfAvmplus not supported") + case Avmplus: + panic("Avmplus not supported") default: // TODO use new logger here @@ -424,28 +431,28 @@ func amfProDecode(prop *AMFProperty, data []byte, bDecodeName int32) int32 { return int32(nOriginalSize - len(data)) } -func amfPropReset(prop *AMFProperty) { - if prop.atype == amfObject || prop.atype == amfEcmaArray || - prop.atype == amfStrictArray { - amfReset(&prop.vu.obj) +func PropReset(prop *Property) { + if prop.atype == Object || prop.atype == EcmaArray || + prop.atype == StrictArray { + Reset(&prop.vu.obj) } else { prop.vu.aval = "" } - prop.atype = amfInvalid + prop.atype = Invalid } -func amfEncode(a *AMF, dst []byte) []byte { +func Encode(a *AMF, dst []byte) []byte { if len(dst) < 5 { return nil } - dst[0] = amfObject + dst[0] = Object dst = dst[1:] - for i := 0; i < len(a.props); i++ { - dst = amfPropEncode(&a.props[i], dst) + for i := 0; i < len(a.Props); i++ { + dst = PropEncode(&a.Props[i], dst) if dst == nil { - // ??? log.Println("amfEncode: failed to encode property in index") + // ??? log.Println("Encode: failed to encode property in index") break } } @@ -453,23 +460,23 @@ func amfEncode(a *AMF, dst []byte) []byte { if len(dst) < 4 { return nil } - return amfEncodeInt24(dst, amfObjectEnd) + return EncodeInt24(dst, ObjectEnd) } -func amfEncodeEcmaArray(a *AMF, dst []byte) []byte { +func EncodeEcmaArray(a *AMF, dst []byte) []byte { if len(dst) < 5 { return nil } - dst[0] = amfEcmaArray + dst[0] = EcmaArray dst = dst[1:] - binary.BigEndian.PutUint32(dst[:4], uint32(len(a.props))) + binary.BigEndian.PutUint32(dst[:4], uint32(len(a.Props))) dst = dst[4:] - for i := 0; i < len(a.props); i++ { - dst = amfPropEncode(&a.props[i], dst) + for i := 0; i < len(a.Props); i++ { + dst = PropEncode(&a.Props[i], dst) if dst == nil { - // ??? log.Println("amfEncodeEcmaArray: failed to encode property!") + // ??? log.Println("EncodeEcmaArray: failed to encode property!") break } } @@ -477,24 +484,24 @@ func amfEncodeEcmaArray(a *AMF, dst []byte) []byte { if len(dst) < 4 { return nil } - return amfEncodeInt24(dst, amfObjectEnd) + return EncodeInt24(dst, ObjectEnd) } // not used? -func amfEncodeArray(a *AMF, dst []byte) []byte { +func EncodeArray(a *AMF, dst []byte) []byte { if len(dst) < 5 { return nil } - dst[0] = amfStrictArray + dst[0] = StrictArray dst = dst[1:] - binary.BigEndian.PutUint32(dst[:4], uint32(len(a.props))) + binary.BigEndian.PutUint32(dst[:4], uint32(len(a.Props))) dst = dst[4:] - for i := 0; i < len(a.props); i++ { - dst = amfPropEncode(&a.props[i], dst) + for i := 0; i < len(a.Props); i++ { + dst = PropEncode(&a.Props[i], dst) if dst == nil { - // ??? log.Println("amfEncodeArray: failed to encode property!") + // ??? log.Println("EncodeArray: failed to encode property!") break } } @@ -502,48 +509,48 @@ func amfEncodeArray(a *AMF, dst []byte) []byte { return dst } -func amfDecode(a *AMF, data []byte, bDecodeName int32) int32 { +func Decode(a *AMF, data []byte, bDecodeName int32) int32 { nOriginalSize := len(data) - a.props = a.props[:0] + a.Props = a.Props[:0] for len(data) != 0 { - if len(data) >= 3 && amfDecodeInt24(data[:3]) == amfObjectEnd { + if len(data) >= 3 && DecodeInt24(data[:3]) == ObjectEnd { data = data[3:] break } - var prop AMFProperty - nRes := amfProDecode(&prop, data, bDecodeName) - // nRes = int32(C.AMFProDecode(&prop, (*byte)(unsafe.Pointer(pBuffer)), + var prop Property + nRes := PropDecode(&prop, data, bDecodeName) + // nRes = int32(C.PropDecode(&prop, (*byte)(unsafe.Pointer(pBuffer)), // int32(nSize), int32(bDecodeName))) if nRes == -1 { return -1 } data = data[nRes:] - a.props = append(a.props, prop) + a.Props = append(a.Props, prop) } return int32(nOriginalSize - len(data)) } -func amfGetProp(a *AMF, name string, idx int32) *AMFProperty { +func GetProp(a *AMF, name string, idx int32) *Property { if idx >= 0 { - if idx < int32(len(a.props)) { - return &a.props[idx] + if idx < int32(len(a.Props)) { + return &a.Props[idx] } } else { - for i, p := range a.props { + for i, p := range a.Props { if p.name == name { - return &a.props[i] + return &a.Props[i] } } } - return &amfPropInvalid + return &PropInvalid } -func amfReset(a *AMF) { - for i := range a.props { - amfPropReset(&a.props[i]) +func Reset(a *AMF) { + for i := range a.Props { + PropReset(&a.Props[i]) } *a = AMF{} } diff --git a/rtmp/packet.go b/rtmp/packet.go index a61d7bc3..adbe32f3 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -37,6 +37,8 @@ package rtmp import ( "encoding/binary" "io" + + "bitbucket.org/ausocean/av/rtmp/amf" ) // Packet types. @@ -182,9 +184,9 @@ func (pkt *packet) read(s *Session) error { hSize := len(hbuf) - len(header) + size if size >= 3 { - pkt.timestamp = amfDecodeInt24(header[:3]) + pkt.timestamp = amf.DecodeInt24(header[:3]) if size >= 6 { - pkt.bodySize = amfDecodeInt24(header[3:6]) + pkt.bodySize = amf.DecodeInt24(header[3:6]) pkt.bytesRead = 0 if size > 6 { @@ -205,7 +207,7 @@ func (pkt *packet) read(s *Session) error { return err } // TODO: port this - pkt.timestamp = amfDecodeInt32(header[size : size+4]) + pkt.timestamp = amf.DecodeInt32(header[size : size+4]) hSize += 4 } @@ -392,12 +394,12 @@ func (pkt *packet) write(s *Session, queue bool) error { if ts > 0xffffff { res = 0xffffff } - amfEncodeInt24(headBytes[headerIdx:], int32(res)) + amf.EncodeInt24(headBytes[headerIdx:], int32(res)) headerIdx += 3 // 24bits } if headerSizes[pkt.headerType] > 4 { - amfEncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) + amf.EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) headerIdx += 3 // 24bits headBytes[headerIdx] = pkt.packetType headerIdx++ @@ -409,7 +411,7 @@ func (pkt *packet) write(s *Session, queue bool) error { } if ts >= 0xffffff { - amfEncodeInt32(headBytes[headerIdx:], int32(ts)) + amf.EncodeInt32(headBytes[headerIdx:], int32(ts)) headerIdx += 4 // 32bits } @@ -479,7 +481,7 @@ func (pkt *packet) write(s *Session, queue bool) error { } if ts >= 0xffffff { extendedTimestamp := headBytes[origIdx+1+cSize:] - amfEncodeInt32(extendedTimestamp[:4], int32(ts)) + amf.EncodeInt32(extendedTimestamp[:4], int32(ts)) } } } @@ -487,12 +489,12 @@ func (pkt *packet) write(s *Session, queue bool) error { // We invoked a remote method if pkt.packetType == packetTypeInvoke { buf := pkt.body[1:] - meth := amfDecodeString(buf) + meth := amf.DecodeString(buf) s.log(DebugLevel, pkg+"invoking method "+meth) // keep it in call queue till result arrives if queue { buf = buf[3+len(meth):] - txn := int32(amfDecodeNumber(buf[:8])) + txn := int32(amf.DecodeNumber(buf[:8])) s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) } } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index c530ed38..0088613d 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -42,6 +42,8 @@ import ( "net" "strconv" "time" + + "bitbucket.org/ausocean/av/rtmp/amf" ) const ( @@ -252,20 +254,20 @@ func handlePacket(s *Session, pkt *packet) error { switch pkt.packetType { case packetTypeChunkSize: if pkt.bodySize >= 4 { - s.inChunkSize = int32(amfDecodeInt32(pkt.body[:4])) + s.inChunkSize = int32(amf.DecodeInt32(pkt.body[:4])) } case packetTypeBytesReadReport: - s.serverBW = int32(amfDecodeInt32(pkt.body[:4])) + s.serverBW = int32(amf.DecodeInt32(pkt.body[:4])) case packetTypeControl: s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl") case packetTypeServerBW: - s.serverBW = int32(amfDecodeInt32(pkt.body[:4])) + s.serverBW = int32(amf.DecodeInt32(pkt.body[:4])) case packetTypeClientBW: - s.clientBW = int32(amfDecodeInt32(pkt.body[:4])) + s.clientBW = int32(amf.DecodeInt32(pkt.body[:4])) if pkt.bodySize > 4 { s.clientBW2 = pkt.body[4] } else { @@ -312,72 +314,72 @@ func sendConnectPacket(s *Session) error { } enc := pkt.body - enc = amfEncodeString(enc, avConnect) + enc = amf.EncodeString(enc, avConnect) if enc == nil { return errEncoding } s.numInvokes += 1 - enc = amfEncodeNumber(enc, float64(s.numInvokes)) + enc = amf.EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } - enc[0] = amfObject + enc[0] = amf.Object enc = enc[1:] - enc = amfEncodeNamedString(enc, avApp, s.link.app) + enc = amf.EncodeNamedString(enc, avApp, s.link.app) if enc == nil { return errEncoding } if s.link.protocol&featureWrite != 0 { - enc = amfEncodeNamedString(enc, avType, avNonprivate) + enc = amf.EncodeNamedString(enc, avType, avNonprivate) if enc == nil { return errEncoding } } if s.link.flashVer != "" { - enc = amfEncodeNamedString(enc, avFlashver, s.link.flashVer) + enc = amf.EncodeNamedString(enc, avFlashver, s.link.flashVer) if enc == nil { return errEncoding } } if s.link.swfUrl != "" { - enc = amfEncodeNamedString(enc, avSwfUrl, s.link.swfUrl) + enc = amf.EncodeNamedString(enc, avSwfUrl, s.link.swfUrl) if enc == nil { return errEncoding } } if s.link.tcUrl != "" { - enc = amfEncodeNamedString(enc, avTcUrl, s.link.tcUrl) + enc = amf.EncodeNamedString(enc, avTcUrl, s.link.tcUrl) if enc == nil { return errEncoding } } if s.link.protocol&featureWrite == 0 { - enc = amfEncodeNamedBoolean(enc, avFpad, false) + enc = amf.EncodeNamedBoolean(enc, avFpad, false) if enc == nil { return errEncoding } - enc = amfEncodeNamedNumber(enc, avCapabilities, 15) + enc = amf.EncodeNamedNumber(enc, avCapabilities, 15) if enc == nil { return errEncoding } - enc = amfEncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs) + enc = amf.EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs) if enc == nil { return errEncoding } - enc = amfEncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs) + enc = amf.EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs) if enc == nil { return errEncoding } - enc = amfEncodeNamedNumber(enc, avVideoFunction, 1) + enc = amf.EncodeNamedNumber(enc, avVideoFunction, 1) if enc == nil { return errEncoding } if s.link.pageUrl != "" { - enc = amfEncodeNamedString(enc, avPageUrl, s.link.pageUrl) + enc = amf.EncodeNamedString(enc, avPageUrl, s.link.pageUrl) if enc == nil { return errEncoding } @@ -385,29 +387,29 @@ func sendConnectPacket(s *Session) error { } if s.encoding != 0.0 || s.sendEncoding { - enc = amfEncodeNamedNumber(enc, avObjectEncoding, s.encoding) + enc = amf.EncodeNamedNumber(enc, avObjectEncoding, s.encoding) if enc == nil { return errEncoding } } - copy(enc, []byte{0, 0, amfObjectEnd}) + copy(enc, []byte{0, 0, amf.ObjectEnd}) enc = enc[3:] // add auth string if s.link.auth != "" { - enc = amfEncodeBoolean(enc, s.link.flags&linkAuth != 0) + enc = amf.EncodeBoolean(enc, s.link.flags&linkAuth != 0) if enc == nil { return errEncoding } - enc = amfEncodeString(enc, s.link.auth) + enc = amf.EncodeString(enc, s.link.auth) if enc == nil { return errEncoding } } - for i := range s.link.extras.props { - enc = amfPropEncode(&s.link.extras.props[i], enc) + for i := range s.link.extras.Props { + enc = amf.PropEncode(&s.link.extras.Props[i], enc) if enc == nil { return errEncoding } @@ -429,16 +431,16 @@ func sendCreateStream(s *Session) error { } enc := pkt.body - enc = amfEncodeString(enc, avCreatestream) + enc = amf.EncodeString(enc, avCreatestream) if enc == nil { return errEncoding } s.numInvokes++ - enc = amfEncodeNumber(enc, float64(s.numInvokes)) + enc = amf.EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } - enc[0] = amfNull + enc[0] = amf.Null enc = enc[1:] pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) @@ -457,18 +459,18 @@ func sendReleaseStream(s *Session) error { } enc := pkt.body - enc = amfEncodeString(enc, avReleasestream) + enc = amf.EncodeString(enc, avReleasestream) if enc == nil { return errEncoding } s.numInvokes++ - enc = amfEncodeNumber(enc, float64(s.numInvokes)) + enc = amf.EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } - enc[0] = amfNull + enc[0] = amf.Null enc = enc[1:] - enc = amfEncodeString(enc, s.link.playpath) + enc = amf.EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } @@ -488,18 +490,18 @@ func sendFCPublish(s *Session) error { } enc := pkt.body - enc = amfEncodeString(enc, avFCPublish) + enc = amf.EncodeString(enc, avFCPublish) if enc == nil { return errEncoding } s.numInvokes++ - enc = amfEncodeNumber(enc, float64(s.numInvokes)) + enc = amf.EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } - enc[0] = amfNull + enc[0] = amf.Null enc = enc[1:] - enc = amfEncodeString(enc, s.link.playpath) + enc = amf.EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } @@ -520,18 +522,18 @@ func sendFCUnpublish(s *Session) error { } enc := pkt.body - enc = amfEncodeString(enc, avFCUnpublish) + enc = amf.EncodeString(enc, avFCUnpublish) if enc == nil { return errEncoding } s.numInvokes++ - enc = amfEncodeNumber(enc, float64(s.numInvokes)) + enc = amf.EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } - enc[0] = amfNull + enc[0] = amf.Null enc = enc[1:] - enc = amfEncodeString(enc, s.link.playpath) + enc = amf.EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } @@ -552,22 +554,22 @@ func sendPublish(s *Session) error { } enc := pkt.body - enc = amfEncodeString(enc, avPublish) + enc = amf.EncodeString(enc, avPublish) if enc == nil { return errEncoding } s.numInvokes++ - enc = amfEncodeNumber(enc, float64(s.numInvokes)) + enc = amf.EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } - enc[0] = amfNull + enc[0] = amf.Null enc = enc[1:] - enc = amfEncodeString(enc, s.link.playpath) + enc = amf.EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } - enc = amfEncodeString(enc, avLive) + enc = amf.EncodeString(enc, avLive) if enc == nil { return errEncoding } @@ -588,18 +590,18 @@ func sendDeleteStream(s *Session, dStreamId float64) error { } enc := pkt.body - enc = amfEncodeString(enc, avDeletestream) + enc = amf.EncodeString(enc, avDeletestream) if enc == nil { return errEncoding } s.numInvokes++ - enc = amfEncodeNumber(enc, float64(s.numInvokes)) + enc = amf.EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } - enc[0] = amfNull + enc[0] = amf.Null enc = enc[1:] - enc = amfEncodeNumber(enc, dStreamId) + enc = amf.EncodeNumber(enc, dStreamId) if enc == nil { return errEncoding } @@ -621,7 +623,7 @@ func sendBytesReceived(s *Session) error { enc := pkt.body s.nBytesInSent = s.nBytesIn - enc = amfEncodeInt32(enc, s.nBytesIn) + enc = amf.EncodeInt32(enc, s.nBytesIn) if enc == nil { return errEncoding } @@ -641,16 +643,16 @@ func sendCheckBW(s *Session) error { } enc := pkt.body - enc = amfEncodeString(enc, av_checkbw) + enc = amf.EncodeString(enc, av_checkbw) if enc == nil { return errEncoding } s.numInvokes++ - enc = amfEncodeNumber(enc, float64(s.numInvokes)) + enc = amf.EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } - enc[0] = amfNull + enc[0] = amf.Null enc = enc[1:] pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) @@ -670,15 +672,15 @@ func handleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody } - var obj AMF - nRes := amfDecode(&obj, body, 0) + var obj amf.AMF + nRes := amf.Decode(&obj, body, 0) if nRes < 0 { return errDecoding } - meth := amfPropGetString(amfGetProp(&obj, "", 0)) + meth := amf.PropGetString(amf.GetProp(&obj, "", 0)) s.log(DebugLevel, pkg+"invoking method "+meth) - txn := amfPropGetNumber(amfGetProp(&obj, "", 1)) + txn := amf.PropGetNumber(amf.GetProp(&obj, "", 1)) switch meth { case av_result: @@ -718,7 +720,7 @@ func handleInvoke(s *Session, body []byte) error { } case avCreatestream: - s.streamID = int32(amfPropGetNumber(amfGetProp(&obj, "", 3))) + s.streamID = int32(amf.PropGetNumber(amf.GetProp(&obj, "", 3))) if s.link.protocol&featureWrite == 0 { return errNotWritable } @@ -755,10 +757,10 @@ func handleInvoke(s *Session, body []byte) error { s.log(FatalLevel, pkg+"unsupported method avClose") case avOnStatus: - var obj2 AMF - amfPropGetObject(amfGetProp(&obj, "", 3), &obj2) - code := amfPropGetString(amfGetProp(&obj2, avCode, -1)) - level := amfPropGetString(amfGetProp(&obj2, avLevel, -1)) + var obj2 amf.AMF + amf.PropGetObject(amf.GetProp(&obj, "", 3), &obj2) + code := amf.PropGetString(amf.GetProp(&obj2, avCode, -1)) + level := amf.PropGetString(amf.GetProp(&obj2, avLevel, -1)) s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) switch code { @@ -797,7 +799,7 @@ func handleInvoke(s *Session, body []byte) error { s.log(FatalLevel, pkg+"unknown method "+meth) } leave: - amfReset(&obj) + amf.Reset(&obj) return nil } diff --git a/rtmp/session.go b/rtmp/session.go index aaba6e85..62496921 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -37,6 +37,8 @@ import ( "io" "net" "time" + + "bitbucket.org/ausocean/av/rtmp/amf" ) // Session holds the state for an RTMP session. @@ -79,7 +81,7 @@ type link struct { auth string flashVer string token string - extras AMF + extras amf.AMF flags int32 swfAge int32 protocol int32 @@ -187,8 +189,8 @@ func (s *Session) Write(data []byte) (int, error) { pkt := packet{ packetType: data[0], - bodySize: amfDecodeInt24(data[1:4]), - timestamp: amfDecodeInt24(data[4:7]) | uint32(data[7])<<24, + bodySize: amf.DecodeInt24(data[1:4]), + timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, channel: chanSource, info: s.streamID, }