Moved AMF functions into a new rtmp/amf package; names otherwise unchanged.

This commit is contained in:
scruzin 2019-01-12 07:13:27 +10:30
parent 355e069b5b
commit f13d4010cc
4 changed files with 241 additions and 228 deletions

View File

@ -33,42 +33,51 @@ LICENSE
Copyright (C) 2008-2009 Andrej Stepanchuk Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu Copyright (C) 2009-2010 Howard Chu
*/ */
package rtmp
// amf implements Action Message Format (AMF) encoding and decoding.
// See https://en.wikipedia.org/wiki/Action_Message_Format.
package amf
import ( import (
"encoding/binary" "encoding/binary"
"math" "math"
) )
// AMF data types.
const ( const (
amfNumber = iota Number = iota
amfBoolean Boolean
amfString String
amfObject Object
amfMovieClip // reserved, not implemented MovieClip // reserved, not implemented
amfNull Null
amfUndefined Undefined
amfReference Reference
amfEcmaArray EcmaArray
amfObjectEnd ObjectEnd
amfStrictArray StrictArray
amfDate Date
amfLongSring LongSring
amfUnsupported Unsupported
amfRecordset // reserved, not implemented Recordset // reserved, not implemented
amfXmlDoc XmlDoc
amfTypedObject TypedObject
amfAvmplus // reserved, not implemented Avmplus // reserved, not implemented
amfInvalid = 0xff Invalid = 0xff
) )
// DataType represents an AMF data type, which is simply an index into the above.
type DataType int32
// AMF represents an AMF message, which is simply a collection of properties.
type AMF struct { type AMF struct {
props []AMFProperty Props []Property // ToDo: consider not exporting this
} }
type AMFProperty struct { // Property represents an AMF property.
type Property struct {
name string name string
atype AMFDataType atype DataType
vu vu vu vu
UTCoffset int16 UTCoffset int16
} }
@ -79,45 +88,43 @@ type vu struct {
obj AMF obj AMF
} }
type AMFDataType int32
var ( var (
amfObjInvalid AMF ObjInvalid AMF
amfPropInvalid = AMFProperty{atype: amfInvalid} PropInvalid = Property{atype: Invalid}
) )
func amfDecodeInt16(data []byte) uint16 { func DecodeInt16(data []byte) uint16 {
return uint16(binary.BigEndian.Uint16(data)) return uint16(binary.BigEndian.Uint16(data))
} }
func amfDecodeInt24(data []byte) uint32 { func DecodeInt24(data []byte) uint32 {
return uint32(data[0])<<16 | uint32(data[1])<<8 | uint32(data[2]) return uint32(data[0])<<16 | uint32(data[1])<<8 | uint32(data[2])
// return uint16(data[0])<<8 | uint16(data[1]) // return uint16(data[0])<<8 | uint16(data[1])
} }
func amfDecodeInt32(data []byte) uint32 { func DecodeInt32(data []byte) uint32 {
return uint32(binary.BigEndian.Uint32(data)) return uint32(binary.BigEndian.Uint32(data))
} }
func amfDecodeString(data []byte) string { func DecodeString(data []byte) string {
n := amfDecodeInt16(data) n := DecodeInt16(data)
return string(data[2 : 2+n]) return string(data[2 : 2+n])
} }
func amfDecodeLongString(data []byte) string { func DecodeLongString(data []byte) string {
n := amfDecodeInt32(data) n := DecodeInt32(data)
return string(data[2 : 2+n]) return string(data[2 : 2+n])
} }
func amfDecodeNumber(data []byte) float64 { func DecodeNumber(data []byte) float64 {
return math.Float64frombits(binary.BigEndian.Uint64(data)) return math.Float64frombits(binary.BigEndian.Uint64(data))
} }
func amfDecodeBoolean(data []byte) bool { func DecodeBoolean(data []byte) bool {
return data[0] != 0 return data[0] != 0
} }
func amfEncodeInt24(dst []byte, val int32) []byte { func EncodeInt24(dst []byte, val int32) []byte {
if len(dst) < 3 { if len(dst) < 3 {
return nil return nil
} }
@ -130,7 +137,7 @@ func amfEncodeInt24(dst []byte, val int32) []byte {
return dst[3:] return dst[3:]
} }
func amfEncodeInt32(dst []byte, val int32) []byte { func EncodeInt32(dst []byte, val int32) []byte {
if len(dst) < 4 { if len(dst) < 4 {
return nil return nil
} }
@ -141,7 +148,7 @@ func amfEncodeInt32(dst []byte, val int32) []byte {
return dst[4:] return dst[4:]
} }
func amfEncodeString(dst []byte, val string) []byte { func EncodeString(dst []byte, val string) []byte {
const typeSize = 1 const typeSize = 1
if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(dst) { if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(dst) {
return nil return nil
@ -151,7 +158,7 @@ func amfEncodeString(dst []byte, val string) []byte {
} }
if len(val) < 65536 { if len(val) < 65536 {
dst[0] = amfString dst[0] = String
dst = dst[1:] dst = dst[1:]
binary.BigEndian.PutUint16(dst[:2], uint16(len(val))) binary.BigEndian.PutUint16(dst[:2], uint16(len(val)))
dst = dst[2:] dst = dst[2:]
@ -161,7 +168,7 @@ func amfEncodeString(dst []byte, val string) []byte {
} }
return dst[len(val):] return dst[len(val):]
} }
dst[0] = amfLongSring dst[0] = LongSring
dst = dst[1:] dst = dst[1:]
binary.BigEndian.PutUint32(dst[:4], uint32(len(val))) binary.BigEndian.PutUint32(dst[:4], uint32(len(val)))
dst = dst[4:] dst = dst[4:]
@ -172,21 +179,21 @@ func amfEncodeString(dst []byte, val string) []byte {
return dst[len(val):] return dst[len(val):]
} }
func amfEncodeNumber(dst []byte, val float64) []byte { func EncodeNumber(dst []byte, val float64) []byte {
if len(dst) < 9 { if len(dst) < 9 {
return nil return nil
} }
dst[0] = amfNumber dst[0] = Number
dst = dst[1:] dst = dst[1:]
binary.BigEndian.PutUint64(dst, math.Float64bits(val)) binary.BigEndian.PutUint64(dst, math.Float64bits(val))
return dst[8:] return dst[8:]
} }
func amfEncodeBoolean(dst []byte, val bool) []byte { func EncodeBoolean(dst []byte, val bool) []byte {
if len(dst) < 2 { if len(dst) < 2 {
return nil return nil
} }
dst[0] = amfBoolean dst[0] = Boolean
if val { if val {
dst[1] = 1 dst[1] = 1
} }
@ -197,7 +204,7 @@ func amfEncodeBoolean(dst []byte, val bool) []byte {
} }
func amfEncodeNamedString(dst []byte, key, val string) []byte { func EncodeNamedString(dst []byte, key, val string) []byte {
if 2+len(key) > len(dst) { if 2+len(key) > len(dst) {
return nil return nil
} }
@ -207,10 +214,10 @@ func amfEncodeNamedString(dst []byte, key, val string) []byte {
if len(key) == len(dst) { if len(key) == len(dst) {
return nil return nil
} }
return amfEncodeString(dst[len(key):], val) return EncodeString(dst[len(key):], val)
} }
func amfEncodeNamedNumber(dst []byte, key string, val float64) []byte { func EncodeNamedNumber(dst []byte, key string, val float64) []byte {
if 2+len(key) > len(dst) { if 2+len(key) > len(dst) {
return nil return nil
} }
@ -220,10 +227,10 @@ func amfEncodeNamedNumber(dst []byte, key string, val float64) []byte {
if len(key) == len(dst) { if len(key) == len(dst) {
return nil return nil
} }
return amfEncodeNumber(dst[len(key):], val) return EncodeNumber(dst[len(key):], val)
} }
func amfEncodeNamedBoolean(dst []byte, key string, val bool) []byte { func EncodeNamedBoolean(dst []byte, key string, val bool) []byte {
if 2+len(key) > len(dst) { if 2+len(key) > len(dst) {
return nil return nil
} }
@ -233,42 +240,42 @@ func amfEncodeNamedBoolean(dst []byte, key string, val bool) []byte {
if len(key) == len(dst) { if len(key) == len(dst) {
return nil return nil
} }
return amfEncodeBoolean(dst[len(key):], val) return EncodeBoolean(dst[len(key):], val)
} }
func amfPropSetName(prop *AMFProperty, name string) { func PropSetName(prop *Property, name string) {
prop.name = name prop.name = name
} }
func amfPropGetNumber(prop *AMFProperty) float64 { func PropGetNumber(prop *Property) float64 {
return prop.vu.number return prop.vu.number
} }
func amfPropGetString(prop *AMFProperty) string { func PropGetString(prop *Property) string {
if prop.atype == amfString { if prop.atype == String {
return prop.vu.aval return prop.vu.aval
} }
return "" return ""
} }
func amfPropGetObject(prop *AMFProperty, a *AMF) { func PropGetObject(prop *Property, a *AMF) {
if prop.atype == amfObject { if prop.atype == Object {
*a = prop.vu.obj *a = prop.vu.obj
} else { } else {
*a = amfObjInvalid *a = ObjInvalid
} }
} }
func amfPropEncode(p *AMFProperty, dst []byte) []byte { func PropEncode(p *Property, dst []byte) []byte {
if p.atype == amfInvalid { if p.atype == Invalid {
return nil return nil
} }
if p.atype != amfNull && len(p.name)+2+1 >= len(dst) { if p.atype != Null && len(p.name)+2+1 >= len(dst) {
return nil return nil
} }
if p.atype != amfNull && len(p.name) != 0 { if p.atype != Null && len(p.name) != 0 {
binary.BigEndian.PutUint16(dst[:2], uint16(len(p.name))) binary.BigEndian.PutUint16(dst[:2], uint16(len(p.name)))
dst = dst[2:] dst = dst[2:]
copy(dst, p.name) copy(dst, p.name)
@ -276,32 +283,32 @@ func amfPropEncode(p *AMFProperty, dst []byte) []byte {
} }
switch p.atype { switch p.atype {
case amfNumber: case Number:
dst = amfEncodeNumber(dst, p.vu.number) dst = EncodeNumber(dst, p.vu.number)
case amfBoolean: case Boolean:
dst = amfEncodeBoolean(dst, p.vu.number != 0) dst = EncodeBoolean(dst, p.vu.number != 0)
case amfString: case String:
dst = amfEncodeString(dst, p.vu.aval) dst = EncodeString(dst, p.vu.aval)
case amfNull: case Null:
if len(dst) < 2 { if len(dst) < 2 {
return nil return nil
} }
dst[0] = amfNull dst[0] = Null
dst = dst[1:] dst = dst[1:]
case amfObject: case Object:
dst = amfEncode(&p.vu.obj, dst) dst = Encode(&p.vu.obj, dst)
case amfEcmaArray: case EcmaArray:
dst = amfEncodeEcmaArray(&p.vu.obj, dst) dst = EncodeEcmaArray(&p.vu.obj, dst)
case amfStrictArray: case StrictArray:
dst = amfEncodeArray(&p.vu.obj, dst) dst = EncodeArray(&p.vu.obj, dst)
default: default:
// ??? log.Println("amfPropEncode: invalid type!") // ??? log.Println("PropEncode: invalid type!")
dst = nil dst = nil
} }
return dst return dst
} }
func amfProDecode(prop *AMFProperty, data []byte, bDecodeName int32) int32 { func PropDecode(prop *Property, data []byte, bDecodeName int32) int32 {
prop.name = "" prop.name = ""
nOriginalSize := len(data) nOriginalSize := len(data)
@ -319,14 +326,14 @@ func amfProDecode(prop *AMFProperty, data []byte, bDecodeName int32) int32 {
} }
if bDecodeName != 0 { if bDecodeName != 0 {
nNameSize := amfDecodeInt16(data[:2]) nNameSize := DecodeInt16(data[:2])
if int(nNameSize) > len(data)-2 { if int(nNameSize) > len(data)-2 {
// TODO use new logger here // TODO use new logger here
//RTMLog(RTMLOGDEBUG, "%s: Name size out of range: namesize (%d) > len (%d) - 2",__FUNCTION__, nNameSize, nSize); //RTMLog(RTMLOGDEBUG, "%s: Name size out of range: namesize (%d) > len (%d) - 2",__FUNCTION__, nNameSize, nSize);
return -1 return -1
} }
prop.name = amfDecodeString(data) prop.name = DecodeString(data)
data = data[2+nNameSize:] data = data[2+nNameSize:]
} }
@ -334,85 +341,85 @@ func amfProDecode(prop *AMFProperty, data []byte, bDecodeName int32) int32 {
return -1 return -1
} }
prop.atype = AMFDataType(data[0]) prop.atype = DataType(data[0])
data = data[1:] data = data[1:]
var nRes int32 var nRes int32
switch prop.atype { switch prop.atype {
case amfNumber: case Number:
if len(data) < 8 { if len(data) < 8 {
return -1 return -1
} }
prop.vu.number = amfDecodeNumber(data[:8]) prop.vu.number = DecodeNumber(data[:8])
data = data[8:] data = data[8:]
case amfBoolean: case Boolean:
panic("amfBoolean not supported") panic("Boolean not supported")
case amfString: case String:
nStringSize := amfDecodeInt16(data[:2]) nStringSize := DecodeInt16(data[:2])
if len(data) < int(nStringSize+2) { if len(data) < int(nStringSize+2) {
return -1 return -1
} }
prop.vu.aval = amfDecodeString(data) prop.vu.aval = DecodeString(data)
data = data[2+nStringSize:] data = data[2+nStringSize:]
case amfObject: case Object:
nRes := amfDecode(&prop.vu.obj, data, 1) nRes := Decode(&prop.vu.obj, data, 1)
if nRes == -1 { if nRes == -1 {
return -1 return -1
} }
data = data[nRes:] data = data[nRes:]
case amfMovieClip: case MovieClip:
// TODO use new logger here // TODO use new logger here
// ??? log.Println("AMFProDecode: MAF_MOVIECLIP reserved!") // ??? log.Println("PropDecode: MAF_MOVIECLIP reserved!")
//RTMLog(RTMLOGERROR, "amfMovieClip reserved!"); //RTMLog(RTMLOGERROR, "MovieClip reserved!");
return -1 return -1
case amfNull, amfUndefined, amfUnsupported: case Null, Undefined, Unsupported:
prop.atype = amfNull prop.atype = Null
case amfReference: case Reference:
// TODO use new logger here // TODO use new logger here
// ??? log.Println("AMFProDecode: amfReference not supported!") // ??? log.Println("PropDecode: Reference not supported!")
//RTMLog(RTMLOGERROR, "amfReference not supported!"); //RTMLog(RTMLOGERROR, "Reference not supported!");
return -1 return -1
case amfEcmaArray: case EcmaArray:
// next comes the rest, mixed array has a final 0x000009 mark and names, so its an object // next comes the rest, mixed array has a final 0x000009 mark and names, so its an object
data = data[4:] data = data[4:]
nRes = amfDecode(&prop.vu.obj, data, 1) nRes = Decode(&prop.vu.obj, data, 1)
if nRes == -1 { if nRes == -1 {
return -1 return -1
} }
data = data[nRes:] data = data[nRes:]
case amfObjectEnd: case ObjectEnd:
return -1 return -1
case amfStrictArray: case StrictArray:
panic("amfStrictArray not supported") panic("StrictArray not supported")
case amfDate: case Date:
panic("amfDate not supported") panic("Date not supported")
case amfLongSring, amfXmlDoc: case LongSring, XmlDoc:
panic("amfLongSring, amfXmlDoc not supported") panic("LongSring, XmlDoc not supported")
case amfRecordset: case Recordset:
// TODO use new logger here // TODO use new logger here
// ??? log.Println("AMFProDecode: amfRecordset reserved!") // ??? log.Println("PropDecode: Recordset reserved!")
//RTMLog(RTMLOGERROR, "amfRecordset reserved!"); //RTMLog(RTMLOGERROR, "Recordset reserved!");
return -1 return -1
case amfTypedObject: case TypedObject:
// TODO use new logger here // TODO use new logger here
// RTMLog(RTMLOGERROR, "amfTyped_object not supported!") // RTMLog(RTMLOGERROR, "Typed_object not supported!")
return -1 return -1
case amfAvmplus: case Avmplus:
panic("amfAvmplus not supported") panic("Avmplus not supported")
default: default:
// TODO use new logger here // TODO use new logger here
@ -424,28 +431,28 @@ func amfProDecode(prop *AMFProperty, data []byte, bDecodeName int32) int32 {
return int32(nOriginalSize - len(data)) return int32(nOriginalSize - len(data))
} }
func amfPropReset(prop *AMFProperty) { func PropReset(prop *Property) {
if prop.atype == amfObject || prop.atype == amfEcmaArray || if prop.atype == Object || prop.atype == EcmaArray ||
prop.atype == amfStrictArray { prop.atype == StrictArray {
amfReset(&prop.vu.obj) Reset(&prop.vu.obj)
} else { } else {
prop.vu.aval = "" prop.vu.aval = ""
} }
prop.atype = amfInvalid prop.atype = Invalid
} }
func amfEncode(a *AMF, dst []byte) []byte { func Encode(a *AMF, dst []byte) []byte {
if len(dst) < 5 { if len(dst) < 5 {
return nil return nil
} }
dst[0] = amfObject dst[0] = Object
dst = dst[1:] dst = dst[1:]
for i := 0; i < len(a.props); i++ { for i := 0; i < len(a.Props); i++ {
dst = amfPropEncode(&a.props[i], dst) dst = PropEncode(&a.Props[i], dst)
if dst == nil { if dst == nil {
// ??? log.Println("amfEncode: failed to encode property in index") // ??? log.Println("Encode: failed to encode property in index")
break break
} }
} }
@ -453,23 +460,23 @@ func amfEncode(a *AMF, dst []byte) []byte {
if len(dst) < 4 { if len(dst) < 4 {
return nil return nil
} }
return amfEncodeInt24(dst, amfObjectEnd) return EncodeInt24(dst, ObjectEnd)
} }
func amfEncodeEcmaArray(a *AMF, dst []byte) []byte { func EncodeEcmaArray(a *AMF, dst []byte) []byte {
if len(dst) < 5 { if len(dst) < 5 {
return nil return nil
} }
dst[0] = amfEcmaArray dst[0] = EcmaArray
dst = dst[1:] dst = dst[1:]
binary.BigEndian.PutUint32(dst[:4], uint32(len(a.props))) binary.BigEndian.PutUint32(dst[:4], uint32(len(a.Props)))
dst = dst[4:] dst = dst[4:]
for i := 0; i < len(a.props); i++ { for i := 0; i < len(a.Props); i++ {
dst = amfPropEncode(&a.props[i], dst) dst = PropEncode(&a.Props[i], dst)
if dst == nil { if dst == nil {
// ??? log.Println("amfEncodeEcmaArray: failed to encode property!") // ??? log.Println("EncodeEcmaArray: failed to encode property!")
break break
} }
} }
@ -477,24 +484,24 @@ func amfEncodeEcmaArray(a *AMF, dst []byte) []byte {
if len(dst) < 4 { if len(dst) < 4 {
return nil return nil
} }
return amfEncodeInt24(dst, amfObjectEnd) return EncodeInt24(dst, ObjectEnd)
} }
// not used? // not used?
func amfEncodeArray(a *AMF, dst []byte) []byte { func EncodeArray(a *AMF, dst []byte) []byte {
if len(dst) < 5 { if len(dst) < 5 {
return nil return nil
} }
dst[0] = amfStrictArray dst[0] = StrictArray
dst = dst[1:] dst = dst[1:]
binary.BigEndian.PutUint32(dst[:4], uint32(len(a.props))) binary.BigEndian.PutUint32(dst[:4], uint32(len(a.Props)))
dst = dst[4:] dst = dst[4:]
for i := 0; i < len(a.props); i++ { for i := 0; i < len(a.Props); i++ {
dst = amfPropEncode(&a.props[i], dst) dst = PropEncode(&a.Props[i], dst)
if dst == nil { if dst == nil {
// ??? log.Println("amfEncodeArray: failed to encode property!") // ??? log.Println("EncodeArray: failed to encode property!")
break break
} }
} }
@ -502,48 +509,48 @@ func amfEncodeArray(a *AMF, dst []byte) []byte {
return dst return dst
} }
func amfDecode(a *AMF, data []byte, bDecodeName int32) int32 { func Decode(a *AMF, data []byte, bDecodeName int32) int32 {
nOriginalSize := len(data) nOriginalSize := len(data)
a.props = a.props[:0] a.Props = a.Props[:0]
for len(data) != 0 { for len(data) != 0 {
if len(data) >= 3 && amfDecodeInt24(data[:3]) == amfObjectEnd { if len(data) >= 3 && DecodeInt24(data[:3]) == ObjectEnd {
data = data[3:] data = data[3:]
break break
} }
var prop AMFProperty var prop Property
nRes := amfProDecode(&prop, data, bDecodeName) nRes := PropDecode(&prop, data, bDecodeName)
// nRes = int32(C.AMFProDecode(&prop, (*byte)(unsafe.Pointer(pBuffer)), // nRes = int32(C.PropDecode(&prop, (*byte)(unsafe.Pointer(pBuffer)),
// int32(nSize), int32(bDecodeName))) // int32(nSize), int32(bDecodeName)))
if nRes == -1 { if nRes == -1 {
return -1 return -1
} }
data = data[nRes:] data = data[nRes:]
a.props = append(a.props, prop) a.Props = append(a.Props, prop)
} }
return int32(nOriginalSize - len(data)) return int32(nOriginalSize - len(data))
} }
func amfGetProp(a *AMF, name string, idx int32) *AMFProperty { func GetProp(a *AMF, name string, idx int32) *Property {
if idx >= 0 { if idx >= 0 {
if idx < int32(len(a.props)) { if idx < int32(len(a.Props)) {
return &a.props[idx] return &a.Props[idx]
} }
} else { } else {
for i, p := range a.props { for i, p := range a.Props {
if p.name == name { if p.name == name {
return &a.props[i] return &a.Props[i]
} }
} }
} }
return &amfPropInvalid return &PropInvalid
} }
func amfReset(a *AMF) { func Reset(a *AMF) {
for i := range a.props { for i := range a.Props {
amfPropReset(&a.props[i]) PropReset(&a.Props[i])
} }
*a = AMF{} *a = AMF{}
} }

View File

@ -37,6 +37,8 @@ package rtmp
import ( import (
"encoding/binary" "encoding/binary"
"io" "io"
"bitbucket.org/ausocean/av/rtmp/amf"
) )
// Packet types. // Packet types.
@ -182,9 +184,9 @@ func (pkt *packet) read(s *Session) error {
hSize := len(hbuf) - len(header) + size hSize := len(hbuf) - len(header) + size
if size >= 3 { if size >= 3 {
pkt.timestamp = amfDecodeInt24(header[:3]) pkt.timestamp = amf.DecodeInt24(header[:3])
if size >= 6 { if size >= 6 {
pkt.bodySize = amfDecodeInt24(header[3:6]) pkt.bodySize = amf.DecodeInt24(header[3:6])
pkt.bytesRead = 0 pkt.bytesRead = 0
if size > 6 { if size > 6 {
@ -205,7 +207,7 @@ func (pkt *packet) read(s *Session) error {
return err return err
} }
// TODO: port this // TODO: port this
pkt.timestamp = amfDecodeInt32(header[size : size+4]) pkt.timestamp = amf.DecodeInt32(header[size : size+4])
hSize += 4 hSize += 4
} }
@ -392,12 +394,12 @@ func (pkt *packet) write(s *Session, queue bool) error {
if ts > 0xffffff { if ts > 0xffffff {
res = 0xffffff res = 0xffffff
} }
amfEncodeInt24(headBytes[headerIdx:], int32(res)) amf.EncodeInt24(headBytes[headerIdx:], int32(res))
headerIdx += 3 // 24bits headerIdx += 3 // 24bits
} }
if headerSizes[pkt.headerType] > 4 { if headerSizes[pkt.headerType] > 4 {
amfEncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) amf.EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize))
headerIdx += 3 // 24bits headerIdx += 3 // 24bits
headBytes[headerIdx] = pkt.packetType headBytes[headerIdx] = pkt.packetType
headerIdx++ headerIdx++
@ -409,7 +411,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
} }
if ts >= 0xffffff { if ts >= 0xffffff {
amfEncodeInt32(headBytes[headerIdx:], int32(ts)) amf.EncodeInt32(headBytes[headerIdx:], int32(ts))
headerIdx += 4 // 32bits headerIdx += 4 // 32bits
} }
@ -479,7 +481,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
} }
if ts >= 0xffffff { if ts >= 0xffffff {
extendedTimestamp := headBytes[origIdx+1+cSize:] extendedTimestamp := headBytes[origIdx+1+cSize:]
amfEncodeInt32(extendedTimestamp[:4], int32(ts)) amf.EncodeInt32(extendedTimestamp[:4], int32(ts))
} }
} }
} }
@ -487,12 +489,12 @@ func (pkt *packet) write(s *Session, queue bool) error {
// We invoked a remote method // We invoked a remote method
if pkt.packetType == packetTypeInvoke { if pkt.packetType == packetTypeInvoke {
buf := pkt.body[1:] buf := pkt.body[1:]
meth := amfDecodeString(buf) meth := amf.DecodeString(buf)
s.log(DebugLevel, pkg+"invoking method "+meth) s.log(DebugLevel, pkg+"invoking method "+meth)
// keep it in call queue till result arrives // keep it in call queue till result arrives
if queue { if queue {
buf = buf[3+len(meth):] buf = buf[3+len(meth):]
txn := int32(amfDecodeNumber(buf[:8])) txn := int32(amf.DecodeNumber(buf[:8]))
s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})
} }
} }

View File

@ -42,6 +42,8 @@ import (
"net" "net"
"strconv" "strconv"
"time" "time"
"bitbucket.org/ausocean/av/rtmp/amf"
) )
const ( const (
@ -252,20 +254,20 @@ func handlePacket(s *Session, pkt *packet) error {
switch pkt.packetType { switch pkt.packetType {
case packetTypeChunkSize: case packetTypeChunkSize:
if pkt.bodySize >= 4 { if pkt.bodySize >= 4 {
s.inChunkSize = int32(amfDecodeInt32(pkt.body[:4])) s.inChunkSize = int32(amf.DecodeInt32(pkt.body[:4]))
} }
case packetTypeBytesReadReport: case packetTypeBytesReadReport:
s.serverBW = int32(amfDecodeInt32(pkt.body[:4])) s.serverBW = int32(amf.DecodeInt32(pkt.body[:4]))
case packetTypeControl: case packetTypeControl:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl") s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl")
case packetTypeServerBW: case packetTypeServerBW:
s.serverBW = int32(amfDecodeInt32(pkt.body[:4])) s.serverBW = int32(amf.DecodeInt32(pkt.body[:4]))
case packetTypeClientBW: case packetTypeClientBW:
s.clientBW = int32(amfDecodeInt32(pkt.body[:4])) s.clientBW = int32(amf.DecodeInt32(pkt.body[:4]))
if pkt.bodySize > 4 { if pkt.bodySize > 4 {
s.clientBW2 = pkt.body[4] s.clientBW2 = pkt.body[4]
} else { } else {
@ -312,72 +314,72 @@ func sendConnectPacket(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = amfEncodeString(enc, avConnect) enc = amf.EncodeString(enc, avConnect)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
s.numInvokes += 1 s.numInvokes += 1
enc = amfEncodeNumber(enc, float64(s.numInvokes)) enc = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc[0] = amfObject enc[0] = amf.Object
enc = enc[1:] enc = enc[1:]
enc = amfEncodeNamedString(enc, avApp, s.link.app) enc = amf.EncodeNamedString(enc, avApp, s.link.app)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
if s.link.protocol&featureWrite != 0 { if s.link.protocol&featureWrite != 0 {
enc = amfEncodeNamedString(enc, avType, avNonprivate) enc = amf.EncodeNamedString(enc, avType, avNonprivate)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
} }
if s.link.flashVer != "" { if s.link.flashVer != "" {
enc = amfEncodeNamedString(enc, avFlashver, s.link.flashVer) enc = amf.EncodeNamedString(enc, avFlashver, s.link.flashVer)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
} }
if s.link.swfUrl != "" { if s.link.swfUrl != "" {
enc = amfEncodeNamedString(enc, avSwfUrl, s.link.swfUrl) enc = amf.EncodeNamedString(enc, avSwfUrl, s.link.swfUrl)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
} }
if s.link.tcUrl != "" { if s.link.tcUrl != "" {
enc = amfEncodeNamedString(enc, avTcUrl, s.link.tcUrl) enc = amf.EncodeNamedString(enc, avTcUrl, s.link.tcUrl)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
} }
if s.link.protocol&featureWrite == 0 { if s.link.protocol&featureWrite == 0 {
enc = amfEncodeNamedBoolean(enc, avFpad, false) enc = amf.EncodeNamedBoolean(enc, avFpad, false)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc = amfEncodeNamedNumber(enc, avCapabilities, 15) enc = amf.EncodeNamedNumber(enc, avCapabilities, 15)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc = amfEncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs) enc = amf.EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc = amfEncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs) enc = amf.EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc = amfEncodeNamedNumber(enc, avVideoFunction, 1) enc = amf.EncodeNamedNumber(enc, avVideoFunction, 1)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
if s.link.pageUrl != "" { if s.link.pageUrl != "" {
enc = amfEncodeNamedString(enc, avPageUrl, s.link.pageUrl) enc = amf.EncodeNamedString(enc, avPageUrl, s.link.pageUrl)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
@ -385,29 +387,29 @@ func sendConnectPacket(s *Session) error {
} }
if s.encoding != 0.0 || s.sendEncoding { if s.encoding != 0.0 || s.sendEncoding {
enc = amfEncodeNamedNumber(enc, avObjectEncoding, s.encoding) enc = amf.EncodeNamedNumber(enc, avObjectEncoding, s.encoding)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
} }
copy(enc, []byte{0, 0, amfObjectEnd}) copy(enc, []byte{0, 0, amf.ObjectEnd})
enc = enc[3:] enc = enc[3:]
// add auth string // add auth string
if s.link.auth != "" { if s.link.auth != "" {
enc = amfEncodeBoolean(enc, s.link.flags&linkAuth != 0) enc = amf.EncodeBoolean(enc, s.link.flags&linkAuth != 0)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc = amfEncodeString(enc, s.link.auth) enc = amf.EncodeString(enc, s.link.auth)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
} }
for i := range s.link.extras.props { for i := range s.link.extras.Props {
enc = amfPropEncode(&s.link.extras.props[i], enc) enc = amf.PropEncode(&s.link.extras.Props[i], enc)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
@ -429,16 +431,16 @@ func sendCreateStream(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = amfEncodeString(enc, avCreatestream) enc = amf.EncodeString(enc, avCreatestream)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
s.numInvokes++ s.numInvokes++
enc = amfEncodeNumber(enc, float64(s.numInvokes)) enc = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc[0] = amfNull enc[0] = amf.Null
enc = enc[1:] enc = enc[1:]
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
@ -457,18 +459,18 @@ func sendReleaseStream(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = amfEncodeString(enc, avReleasestream) enc = amf.EncodeString(enc, avReleasestream)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
s.numInvokes++ s.numInvokes++
enc = amfEncodeNumber(enc, float64(s.numInvokes)) enc = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc[0] = amfNull enc[0] = amf.Null
enc = enc[1:] enc = enc[1:]
enc = amfEncodeString(enc, s.link.playpath) enc = amf.EncodeString(enc, s.link.playpath)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
@ -488,18 +490,18 @@ func sendFCPublish(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = amfEncodeString(enc, avFCPublish) enc = amf.EncodeString(enc, avFCPublish)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
s.numInvokes++ s.numInvokes++
enc = amfEncodeNumber(enc, float64(s.numInvokes)) enc = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc[0] = amfNull enc[0] = amf.Null
enc = enc[1:] enc = enc[1:]
enc = amfEncodeString(enc, s.link.playpath) enc = amf.EncodeString(enc, s.link.playpath)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
@ -520,18 +522,18 @@ func sendFCUnpublish(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = amfEncodeString(enc, avFCUnpublish) enc = amf.EncodeString(enc, avFCUnpublish)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
s.numInvokes++ s.numInvokes++
enc = amfEncodeNumber(enc, float64(s.numInvokes)) enc = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc[0] = amfNull enc[0] = amf.Null
enc = enc[1:] enc = enc[1:]
enc = amfEncodeString(enc, s.link.playpath) enc = amf.EncodeString(enc, s.link.playpath)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
@ -552,22 +554,22 @@ func sendPublish(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = amfEncodeString(enc, avPublish) enc = amf.EncodeString(enc, avPublish)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
s.numInvokes++ s.numInvokes++
enc = amfEncodeNumber(enc, float64(s.numInvokes)) enc = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc[0] = amfNull enc[0] = amf.Null
enc = enc[1:] enc = enc[1:]
enc = amfEncodeString(enc, s.link.playpath) enc = amf.EncodeString(enc, s.link.playpath)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc = amfEncodeString(enc, avLive) enc = amf.EncodeString(enc, avLive)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
@ -588,18 +590,18 @@ func sendDeleteStream(s *Session, dStreamId float64) error {
} }
enc := pkt.body enc := pkt.body
enc = amfEncodeString(enc, avDeletestream) enc = amf.EncodeString(enc, avDeletestream)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
s.numInvokes++ s.numInvokes++
enc = amfEncodeNumber(enc, float64(s.numInvokes)) enc = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc[0] = amfNull enc[0] = amf.Null
enc = enc[1:] enc = enc[1:]
enc = amfEncodeNumber(enc, dStreamId) enc = amf.EncodeNumber(enc, dStreamId)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
@ -621,7 +623,7 @@ func sendBytesReceived(s *Session) error {
enc := pkt.body enc := pkt.body
s.nBytesInSent = s.nBytesIn s.nBytesInSent = s.nBytesIn
enc = amfEncodeInt32(enc, s.nBytesIn) enc = amf.EncodeInt32(enc, s.nBytesIn)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
@ -641,16 +643,16 @@ func sendCheckBW(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = amfEncodeString(enc, av_checkbw) enc = amf.EncodeString(enc, av_checkbw)
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
s.numInvokes++ s.numInvokes++
enc = amfEncodeNumber(enc, float64(s.numInvokes)) enc = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if enc == nil {
return errEncoding return errEncoding
} }
enc[0] = amfNull enc[0] = amf.Null
enc = enc[1:] enc = enc[1:]
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
@ -670,15 +672,15 @@ func handleInvoke(s *Session, body []byte) error {
if body[0] != 0x02 { if body[0] != 0x02 {
return errInvalidBody return errInvalidBody
} }
var obj AMF var obj amf.AMF
nRes := amfDecode(&obj, body, 0) nRes := amf.Decode(&obj, body, 0)
if nRes < 0 { if nRes < 0 {
return errDecoding return errDecoding
} }
meth := amfPropGetString(amfGetProp(&obj, "", 0)) meth := amf.PropGetString(amf.GetProp(&obj, "", 0))
s.log(DebugLevel, pkg+"invoking method "+meth) s.log(DebugLevel, pkg+"invoking method "+meth)
txn := amfPropGetNumber(amfGetProp(&obj, "", 1)) txn := amf.PropGetNumber(amf.GetProp(&obj, "", 1))
switch meth { switch meth {
case av_result: case av_result:
@ -718,7 +720,7 @@ func handleInvoke(s *Session, body []byte) error {
} }
case avCreatestream: case avCreatestream:
s.streamID = int32(amfPropGetNumber(amfGetProp(&obj, "", 3))) s.streamID = int32(amf.PropGetNumber(amf.GetProp(&obj, "", 3)))
if s.link.protocol&featureWrite == 0 { if s.link.protocol&featureWrite == 0 {
return errNotWritable return errNotWritable
} }
@ -755,10 +757,10 @@ func handleInvoke(s *Session, body []byte) error {
s.log(FatalLevel, pkg+"unsupported method avClose") s.log(FatalLevel, pkg+"unsupported method avClose")
case avOnStatus: case avOnStatus:
var obj2 AMF var obj2 amf.AMF
amfPropGetObject(amfGetProp(&obj, "", 3), &obj2) amf.PropGetObject(amf.GetProp(&obj, "", 3), &obj2)
code := amfPropGetString(amfGetProp(&obj2, avCode, -1)) code := amf.PropGetString(amf.GetProp(&obj2, avCode, -1))
level := amfPropGetString(amfGetProp(&obj2, avLevel, -1)) level := amf.PropGetString(amf.GetProp(&obj2, avLevel, -1))
s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level)
switch code { switch code {
@ -797,7 +799,7 @@ func handleInvoke(s *Session, body []byte) error {
s.log(FatalLevel, pkg+"unknown method "+meth) s.log(FatalLevel, pkg+"unknown method "+meth)
} }
leave: leave:
amfReset(&obj) amf.Reset(&obj)
return nil return nil
} }

View File

@ -37,6 +37,8 @@ import (
"io" "io"
"net" "net"
"time" "time"
"bitbucket.org/ausocean/av/rtmp/amf"
) )
// Session holds the state for an RTMP session. // Session holds the state for an RTMP session.
@ -79,7 +81,7 @@ type link struct {
auth string auth string
flashVer string flashVer string
token string token string
extras AMF extras amf.AMF
flags int32 flags int32
swfAge int32 swfAge int32
protocol int32 protocol int32
@ -187,8 +189,8 @@ func (s *Session) Write(data []byte) (int, error) {
pkt := packet{ pkt := packet{
packetType: data[0], packetType: data[0],
bodySize: amfDecodeInt24(data[1:4]), bodySize: amf.DecodeInt24(data[1:4]),
timestamp: amfDecodeInt24(data[4:7]) | uint32(data[7])<<24, timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24,
channel: chanSource, channel: chanSource,
info: s.streamID, info: s.streamID,
} }