diff --git a/rtmp/amf.go b/rtmp/amf.go deleted file mode 100644 index d46ea16e..00000000 --- a/rtmp/amf.go +++ /dev/null @@ -1,579 +0,0 @@ -/* -NAME - amf.go - -DESCRIPTION - See Readme.md - -AUTHORS - Saxon Nelson-Milton - Dan Kortschak - Jake Lane - -LICENSE - amf.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. - - Derived from librtmp under the GNU Lesser General Public License 2.1 - Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org - Copyright (C) 2008-2009 Andrej Stepanchuk - Copyright (C) 2009-2010 Howard Chu -*/ -package rtmp - -import ( - "encoding/binary" - "log" - "math" -) - -var ( - AMFObj_Invalid C_AMFObject - AMFProp_Invalid = C_AMFObjectProperty{p_type: AMF_INVALID} -) - -const ( - AMF3_INTEGER_MAX = 268435455 - AMF3_INTEGER_MIN = -268435456 -) - -// unsigned short AMF_DecodeInt16(const char* data); -// amf.c +41 -func C_AMF_DecodeInt16(data []byte) uint16 { - return uint16(data[0])<<8 | uint16(data[1]) -} - -// unsigned int AMF_DecodeInt24(const char* data); -// amf.c +50 -func C_AMF_DecodeInt24(data []byte) uint32 { - return uint32(data[0])<<16 | uint32(data[1])<<8 | uint32(data[2]) -} - -// unsigned int AMF_DeocdeInt32(const char* data); -// amf.c +59 -func C_AMF_DecodeInt32(data []byte) uint32 { - return uint32(data[0])<<24 | uint32(data[1])<<16 | uint32(data[2])<<8 | uint32(data[3]) -} - -// void AMF_DecodeString(const char* data, C_AVal* bv); -// amf.c +68 -func C_AMF_DecodeString(data []byte) string { - n := C_AMF_DecodeInt16(data) - return string(data[2 : 2+n]) -} - -// void AMF_DecodeLongString(const char *data, AVal *bv); -// amf.c +75 -func C_AMF_DecodeLongString(data []byte) string { - n := C_AMF_DecodeInt32(data) - return string(data[2 : 2+n]) -} - -// double AMF_DecodeNumber(const char* data); -// amf.c +82 -func C_AMF_DecodeNumber(data []byte) float64 { - return math.Float64frombits(binary.BigEndian.Uint64(data)) -} - -// int AMF_DecodeBoolean(const char *data); -// amf.c +132 -func C_AMF_DecodeBoolean(data []byte) bool { - return data[0] != 0 -} - -// char* AMF_EncodeInt24(char* output, char* outend, int nVal); -// amf.c +149 -func C_AMF_EncodeInt24(dst []byte, val int32) []byte { - if len(dst) < 3 { - return nil - } - _ = dst[2] - dst[0] = byte(val >> 16) - dst[1] = byte(val >> 8) - dst[2] = byte(val) - if len(dst) == 3 { - return nil - } - return dst[3:] -} - -// char* AMF_EncodeInt32(char* output, char* outend, int nVal); -// amf.c +160 -func C_AMF_EncodeInt32(dst []byte, val int32) []byte { - if len(dst) < 4 { - return nil - } - binary.BigEndian.PutUint32(dst, uint32(val)) - if len(dst) == 4 { - return nil - } - return dst[4:] -} - -// char* AMF_EncodeString(char* output, char* outend, const C_AVal* bv); -// amf.c +173 -func C_AMF_EncodeString(dst []byte, val string) []byte { - const typeSize = 1 - if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(dst) { - return nil - } - if len(val)+typeSize+binary.Size(int32(0)) > len(dst) { - return nil - } - - if len(val) < 65536 { - dst[0] = AMF_STRING - dst = dst[1:] - binary.BigEndian.PutUint16(dst[:2], uint16(len(val))) - dst = dst[2:] - copy(dst, val) - if len(dst) == len(val) { - return nil - } - return dst[len(val):] - } - dst[0] = AMF_LONG_STRING - dst = dst[1:] - binary.BigEndian.PutUint32(dst[:4], uint32(len(val))) - dst = dst[4:] - copy(dst, val) - if len(dst) == len(val) { - return nil - } - return dst[len(val):] -} - -// char* AMF_EncodeNumber(char* output, char* outend, double dVal); -// amf.c +199 -func C_AMF_EncodeNumber(dst []byte, val float64) []byte { - if len(dst) < 9 { - return nil - } - dst[0] = AMF_NUMBER - dst = dst[1:] - binary.BigEndian.PutUint64(dst, math.Float64bits(val)) - return dst[8:] -} - -// char* AMF_EncodeBoolean(char* output, char* outend, int bVal); -// amf.c +260 -func C_AMF_EncodeBoolean(dst []byte, val bool) []byte { - if len(dst) < 2 { - return nil - } - dst[0] = AMF_BOOLEAN - if val { - dst[1] = 1 - } - if len(dst) == 2 { - return nil - } - return dst[2:] - -} - -// char* AMF_EncodeNamedString(char* output, char* outend, const C_AVal* strName, const C_AVal* strValue); -// amf.c +273 -func C_AMF_EncodeNamedString(dst []byte, key, val string) []byte { - if 2+len(key) > len(dst) { - return nil - } - binary.BigEndian.PutUint16(dst[:2], uint16(len(key))) - dst = dst[2:] - copy(dst, key) - if len(key) == len(dst) { - return nil - } - return C_AMF_EncodeString(dst[len(key):], val) -} - -// char* AMF_EncodeNamedNumber(char* output, char* outend, const C_AVal* strName, double dVal); -// amf.c +286 -func C_AMF_EncodeNamedNumber(dst []byte, key string, val float64) []byte { - if 2+len(key) > len(dst) { - return nil - } - binary.BigEndian.PutUint16(dst[:2], uint16(len(key))) - dst = dst[2:] - copy(dst, key) - if len(key) == len(dst) { - return nil - } - return C_AMF_EncodeNumber(dst[len(key):], val) -} - -// char* AMF_EncodeNamedBoolean(char* output, char* outend, const C_AVal* strname, int bVal); -// amf.c +299 -func C_AMF_EncodeNamedBoolean(dst []byte, key string, val bool) []byte { - if 2+len(key) > len(dst) { - return nil - } - binary.BigEndian.PutUint16(dst[:2], uint16(len(key))) - dst = dst[2:] - copy(dst, key) - if len(key) == len(dst) { - return nil - } - return C_AMF_EncodeBoolean(dst[len(key):], val) -} - -// void AMFProp_SetName(AMFObjectProperty *prop, AVal *name); -// amf.c +318 -func C_AMFProp_SetName(prop *C_AMFObjectProperty, name string) { - prop.p_name = name -} - -// double AMFProp_GetNumber(AMFObjectProperty* prop); -// amf.c +330 -func C_AMFProp_GetNumber(prop *C_AMFObjectProperty) float64 { - return prop.p_vu.p_number -} - -// void AMFProp_GetString(AMFObjectProperty* prop, AVal* str); -// amf.c +341 -func C_AMFProp_GetString(prop *C_AMFObjectProperty) string { - if prop.p_type == AMF_STRING { - return prop.p_vu.p_aval - } - return "" -} - -// void AMFProp_GetObject(AMFObjectProperty *prop, AMFObject *obj); -// amf.c +351 -func C_AMFProp_GetObject(prop *C_AMFObjectProperty, obj *C_AMFObject) { - if prop.p_type == AMF_OBJECT { - *obj = prop.p_vu.p_object - } else { - *obj = AMFObj_Invalid - } -} - -// char* AMFPropEncode(AMFOBjectProperty* prop, char* pBufer, char* pBufEnd); -// amf.c +366 -func C_AMF_PropEncode(p *C_AMFObjectProperty, dst []byte) []byte { - if p.p_type == AMF_INVALID { - return nil - } - - if p.p_type != AMF_NULL && len(p.p_name)+2+1 >= len(dst) { - return nil - } - - if p.p_type != AMF_NULL && len(p.p_name) != 0 { - binary.BigEndian.PutUint16(dst[:2], uint16(len(p.p_name))) - dst = dst[2:] - copy(dst, p.p_name) - dst = dst[len(p.p_name):] - } - - switch p.p_type { - case AMF_NUMBER: - dst = C_AMF_EncodeNumber(dst, p.p_vu.p_number) - case AMF_BOOLEAN: - dst = C_AMF_EncodeBoolean(dst, p.p_vu.p_number != 0) - case AMF_STRING: - dst = C_AMF_EncodeString(dst, p.p_vu.p_aval) - case AMF_NULL: - if len(dst) < 2 { - return nil - } - dst[0] = AMF_NULL - dst = dst[1:] - case AMF_OBJECT: - dst = C_AMF_Encode(&p.p_vu.p_object, dst) - case AMF_ECMA_ARRAY: - dst = C_AMF_EncodeEcmaArray(&p.p_vu.p_object, dst) - case AMF_STRICT_ARRAY: - dst = C_AMF_EncodeArray(&p.p_vu.p_object, dst) - default: - log.Println("C_AMF_PropEncode: invalid type!") - dst = nil - } - return dst -} - -// int AMFProp_Decode(C_AMFObjectProperty* prop, const char* pBuffer, int nSize, int bDecodeName); -// amf.c +619 -func C_AMFProp_Decode(prop *C_AMFObjectProperty, data []byte, bDecodeName int32) int32 { - prop.p_name = "" - - nOriginalSize := len(data) - if len(data) == 0 { - // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s: Empty buffer/no buffer pointer!", __FUNCTION__); - return -1 - } - - if bDecodeName != 0 && len(data) < 4 { - // at least name (length + at least 1 byte) and 1 byte of data - // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s: Not enough data for decoding with name, less than 4 bytes!",__FUNCTION__); - return -1 - } - - if bDecodeName != 0 { - nNameSize := C_AMF_DecodeInt16(data[:2]) - if int(nNameSize) > len(data)-2 { - // TODO use new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s: Name size out of range: namesize (%d) > len (%d) - 2",__FUNCTION__, nNameSize, nSize); - return -1 - } - - prop.p_name = C_AMF_DecodeString(data) - data = data[2+nNameSize:] - } - - if len(data) == 0 { - return -1 - } - - prop.p_type = C_AMFDataType(data[0]) - data = data[1:] - - var nRes int32 - switch prop.p_type { - case AMF_NUMBER: - if len(data) < 8 { - return -1 - } - prop.p_vu.p_number = C_AMF_DecodeNumber(data[:8]) - data = data[8:] - - case AMF_BOOLEAN: - panic("AMF_BOOLEAN not supported") - - case AMF_STRING: - nStringSize := C_AMF_DecodeInt16(data[:2]) - if len(data) < int(nStringSize+2) { - return -1 - } - prop.p_vu.p_aval = C_AMF_DecodeString(data) - data = data[2+nStringSize:] - - case AMF_OBJECT: - nRes := C_AMF_Decode(&prop.p_vu.p_object, data, 1) - if nRes == -1 { - return -1 - } - data = data[nRes:] - - case AMF_MOVIECLIP: - // TODO use new logger here - log.Println("AMFProp_Decode: MAF_MOVIECLIP reserved!") - //RTMP_Log(RTMP_LOGERROR, "AMF_MOVIECLIP reserved!"); - return -1 - - case AMF_NULL, AMF_UNDEFINED, AMF_UNSUPPORTED: - prop.p_type = AMF_NULL - - case AMF_REFERENCE: - // TODO use new logger here - log.Println("AMFProp_Decode: AMF_REFERENCE not supported!") - //RTMP_Log(RTMP_LOGERROR, "AMF_REFERENCE not supported!"); - return -1 - - case AMF_ECMA_ARRAY: - // next comes the rest, mixed array has a final 0x000009 mark and names, so its an object - data = data[4:] - nRes = C_AMF_Decode(&prop.p_vu.p_object, data, 1) - if nRes == -1 { - return -1 - } - data = data[nRes:] - - case AMF_OBJECT_END: - return -1 - - case AMF_STRICT_ARRAY: - panic("AMF_STRICT_ARRAY not supported") - - case AMF_DATE: - panic("AMF_DATE not supported") - - case AMF_LONG_STRING, AMF_XML_DOC: - panic("AMF_LONG_STRING, AMF_XML_DOC not supported") - - case AMF_RECORDSET: - // TODO use new logger here - log.Println("AMFProp_Decode: AMF_RECORDSET reserved!") - //RTMP_Log(RTMP_LOGERROR, "AMF_RECORDSET reserved!"); - return -1 - - case AMF_TYPED_OBJECT: - // TODO use new logger here - // RTMP_Log(RTMP_LOGERROR, "AMF_TYPED_OBJECT not supported!") - return -1 - - case AMF_AVMPLUS: - panic("AMF_AVMPLUS not supported") - - default: - // TODO use new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s - unknown datatype 0x%02x, @%p", __FUNCTION__, - //prop.p_type, pBuffer - 1); - return -1 - } - - return int32(nOriginalSize - len(data)) -} - -// void AMFProp_Reset(AMFObjectProperty* prop); -// amf.c +875 -func C_AMFProp_Reset(prop *C_AMFObjectProperty) { - if prop.p_type == AMF_OBJECT || prop.p_type == AMF_ECMA_ARRAY || - prop.p_type == AMF_STRICT_ARRAY { - C_AMF_Reset(&prop.p_vu.p_object) - } else { - prop.p_vu.p_aval = "" - } - prop.p_type = AMF_INVALID -} - -// char* AMF_Encode(AMFObject* obj, char* pBuffer, char* pBufEnd); -// amf.c +891 -func C_AMF_Encode(obj *C_AMFObject, dst []byte) []byte { - if len(dst) < 5 { - return nil - } - - dst[0] = AMF_OBJECT - dst = dst[1:] - - for i := 0; i < len(obj.o_props); i++ { - dst = C_AMF_PropEncode(&obj.o_props[i], dst) - if dst == nil { - log.Println("C_AMF_Encode: failed to encode property in index") - break - } - } - - if len(dst) < 4 { - return nil - } - return C_AMF_EncodeInt24(dst, AMF_OBJECT_END) -} - -// char* AMF_EncodeEcmaArray(AMFObject* obj, char* pBuffer, char* pBufEnd); -// amf.c +924 -func C_AMF_EncodeEcmaArray(obj *C_AMFObject, dst []byte) []byte { - if len(dst) < 5 { - return nil - } - - dst[0] = AMF_ECMA_ARRAY - dst = dst[1:] - binary.BigEndian.PutUint32(dst[:4], uint32(len(obj.o_props))) - dst = dst[4:] - - for i := 0; i < len(obj.o_props); i++ { - dst = C_AMF_PropEncode(&obj.o_props[i], dst) - if dst == nil { - log.Println("C_AMF_EncodeEcmaArray: failed to encode property!") - break - } - } - - if len(dst) < 4 { - return nil - } - return C_AMF_EncodeInt24(dst, AMF_OBJECT_END) -} - -// char* AMF_EncodeArray(AMFObject* obj, char* pBuffer, char* pBufEnd); -// amf.c +959 -func C_AMF_EncodeArray(obj *C_AMFObject, dst []byte) []byte { - if len(dst) < 5 { - return nil - } - - dst[0] = AMF_STRICT_ARRAY - dst = dst[1:] - binary.BigEndian.PutUint32(dst[:4], uint32(len(obj.o_props))) - dst = dst[4:] - - for i := 0; i < len(obj.o_props); i++ { - dst = C_AMF_PropEncode(&obj.o_props[i], dst) - if dst == nil { - log.Println("C_AMF_EncodeArray: failed to encode property!") - break - } - } - - return dst -} - -// int AMF_Decode(AMFObject *obj, const char* pBuffer, int nSize, int bDecodeName); -// amf.c +1180 -func C_AMF_Decode(obj *C_AMFObject, data []byte, bDecodeName int32) int32 { - nOriginalSize := len(data) - - obj.o_props = obj.o_props[:0] - for len(data) != 0 { - if len(data) >= 3 && C_AMF_DecodeInt24(data[:3]) == AMF_OBJECT_END { - data = data[3:] - break - } - - var prop C_AMFObjectProperty - nRes := C_AMFProp_Decode(&prop, data, bDecodeName) - // nRes = int32(C.AMFProp_Decode(&prop, (*byte)(unsafe.Pointer(pBuffer)), - // int32(nSize), int32(bDecodeName))) - if nRes == -1 { - return -1 - } - data = data[nRes:] - obj.o_props = append(obj.o_props, prop) - } - - return int32(nOriginalSize - len(data)) -} - -// AMFObjectProperty* AMF_GetProp(AMFObject *obj, const AVal* name, int nIndex); -// amf.c + 1249 -func C_AMF_GetProp(obj *C_AMFObject, name string, idx int32) *C_AMFObjectProperty { - if idx >= 0 { - if idx < int32(len(obj.o_props)) { - return &obj.o_props[idx] - } - } else { - for i, p := range obj.o_props { - if p.p_name == name { - return &obj.o_props[i] - } - } - } - return &AMFProp_Invalid -} - -// void AMF_Reset(AMFObject* obj); -// amf.c +1282 -func C_AMF_Reset(obj *C_AMFObject) { - for i := range obj.o_props { - C_AMFProp_Reset(&obj.o_props[i]) - } - obj.o_props = obj.o_props[:0] -} - -/* -// void AMF3CD_AddProp(AMF3ClassDef *cd, AVal *prop); -// amf.c +1298 -func AMF3CD_AddProp(cd *C.AMF3ClassDef, prop *C_AVal) { - if cd.cd_num&0x0f == 0 { - cd.cd_props = (*C_AVal)(realloc(unsafe.Pointer(cd.cd_props), int(uintptr(cd.cd_num+16)*unsafe.Sizeof(C_AVal{})))) - } - *(*C_AVal)(incPtr(unsafe.Pointer(cd.cd_props), int(cd.cd_num), int(unsafe.Sizeof(C_AVal{})))) = *prop - cd.cd_num++ -} -*/ diff --git a/rtmp/amf/amf.go b/rtmp/amf/amf.go new file mode 100644 index 00000000..837531bb --- /dev/null +++ b/rtmp/amf/amf.go @@ -0,0 +1,498 @@ +/* +NAME + amf.go + +DESCRIPTION + Action Message Format (AMF) encoding/decoding functions. + See https://en.wikipedia.org/wiki/Action_Message_Format. + +AUTHORS + Saxon Nelson-Milton + Dan Kortschak + Jake Lane + Alan Noble + +LICENSE + amf.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + + Derived from librtmp under the GNU Lesser General Public License 2.1 + Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org + Copyright (C) 2008-2009 Andrej Stepanchuk + Copyright (C) 2009-2010 Howard Chu +*/ + +// Package amf implements Action Message Format (AMF) encoding and decoding. +// In AMF, encoding of numbers is big endian by default, unless specified otherwise, +// and numbers are all unsigned. +// See https://en.wikipedia.org/wiki/Action_Message_Format. +package amf + +import ( + "encoding/binary" + "errors" + "math" +) + +// AMF data types, as defined by the AMF specification. +// NB: we export these sparingly. +const ( + typeNumber = 0x00 + typeBoolean = 0x01 + typeString = 0x02 + TypeObject = 0x03 + typeMovieClip = 0x04 + TypeNull = 0x05 + typeUndefined = 0x06 + typeReference = 0x07 + typeEcmaArray = 0x08 + TypeObjectEnd = 0x09 + typeStrictArray = 0x0A + typeDate = 0x0B + typeLongString = 0x0C + typeUnsupported = 0x0D + typeRecordset = 0x0E + typeXmlDoc = 0x0F + typeTypedObject = 0x10 + typeAvmplus = 0x11 + typeInvalid = 0xff +) + +// AMF represents an AMF object, which is simply a collection of properties. +type Object struct { + Properties []Property +} + +// Property represents an AMF property, which is effectively a +// union. The Type is the AMF data type (uint8 per the specification), +// and specifies which member holds a value. Numeric types use +// Number, string types use String and arrays and objects use +// Object. The Name is optional. +type Property struct { + Type uint8 + + Name string + Number float64 + String string + Object Object +} + +// AMF errors: +var ( + ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short. + ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder. + ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding. + ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found. +) + +// DecodeInt16 decodes a 16-bit integer. +func DecodeInt16(buf []byte) uint16 { + return binary.BigEndian.Uint16(buf) +} + +// DecodeInt24 decodes a 24-bit integer. +func DecodeInt24(buf []byte) uint32 { + return uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2]) +} + +// DecodeInt32 decodes a 32-bit integer. +func DecodeInt32(buf []byte) uint32 { + return binary.BigEndian.Uint32(buf) +} + +// DecodeInt32LE decodes a 32-bit little-endian integer. +func DecodeInt32LE(buf []byte) uint32 { + return binary.LittleEndian.Uint32(buf) +} + +// DecodeString decodes a string that is less than 2^16 bytes long. +func DecodeString(buf []byte) string { + n := DecodeInt16(buf) + return string(buf[2 : 2+n]) +} + +// DecodeLongString decodes a long string. +func DecodeLongString(buf []byte) string { + n := DecodeInt32(buf) + return string(buf[2 : 2+n]) +} + +// DecodeNumber decodes a 64-bit floating-point number. +func DecodeNumber(buf []byte) float64 { + return math.Float64frombits(binary.BigEndian.Uint64(buf)) +} + +// DecodeBoolean decodes a boolean. +func DecodeBoolean(buf []byte) bool { + return buf[0] != 0 +} + +// EncodeInt24 encodes a 24-bit integer. +func EncodeInt24(buf []byte, val uint32) ([]byte, error) { + if len(buf) < 3 { + return nil, ErrShortBuffer + } + buf[0] = byte(val >> 16) + buf[1] = byte(val >> 8) + buf[2] = byte(val) + return buf[3:], nil +} + +// EncodeInt32 encodes a 32-bit integer. +func EncodeInt32(buf []byte, val uint32) ([]byte, error) { + if len(buf) < 4 { + return nil, ErrShortBuffer + } + binary.BigEndian.PutUint32(buf, val) + return buf[4:], nil +} + +// EncodeString encodes a string. +func EncodeString(buf []byte, val string) ([]byte, error) { + const typeSize = 1 + if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) { + return nil, ErrShortBuffer + } + + if len(val)+typeSize+binary.Size(uint32(0)) > len(buf) { + return nil, ErrShortBuffer + } + + if len(val) < 65536 { + buf[0] = typeString + buf = buf[1:] + binary.BigEndian.PutUint16(buf[:2], uint16(len(val))) + buf = buf[2:] + copy(buf, val) + return buf[len(val):], nil + } + + buf[0] = typeLongString + buf = buf[1:] + binary.BigEndian.PutUint32(buf[:4], uint32(len(val))) + buf = buf[4:] + copy(buf, val) + return buf[len(val):], nil +} + +// EncodeNumber encodes a 64-bit floating-point number. +func EncodeNumber(buf []byte, val float64) ([]byte, error) { + if len(buf) < 9 { + return nil, ErrShortBuffer + } + buf[0] = typeNumber + buf = buf[1:] + binary.BigEndian.PutUint64(buf, math.Float64bits(val)) + return buf[8:], nil +} + +// EncodeBoolean encodes a boolean. +func EncodeBoolean(buf []byte, val bool) ([]byte, error) { + if len(buf) < 2 { + return nil, ErrShortBuffer + } + buf[0] = typeBoolean + if val { + buf[1] = 1 + } else { + buf[1] = 0 + } + return buf[2:], nil + +} + +// EncodeNamedString encodes a named string, where key is the name and val is the string value. +func EncodeNamedString(buf []byte, key, val string) ([]byte, error) { + if 2+len(key) > len(buf) { + return nil, ErrShortBuffer + } + binary.BigEndian.PutUint16(buf[:2], uint16(len(key))) + buf = buf[2:] + copy(buf, key) + return EncodeString(buf[len(key):], val) +} + +// EncodeNamedNumber encodes a named number, where key is the name and val is the number value. +func EncodeNamedNumber(buf []byte, key string, val float64) ([]byte, error) { + if 2+len(key) > len(buf) { + return nil, ErrShortBuffer + } + binary.BigEndian.PutUint16(buf[:2], uint16(len(key))) + buf = buf[2:] + copy(buf, key) + return EncodeNumber(buf[len(key):], val) +} + +// EncodeNamedNumber encodes a named boolean, where key is the name and val is the boolean value. +func EncodeNamedBoolean(buf []byte, key string, val bool) ([]byte, error) { + if 2+len(key) > len(buf) { + return nil, ErrShortBuffer + } + binary.BigEndian.PutUint16(buf[:2], uint16(len(key))) + buf = buf[2:] + copy(buf, key) + return EncodeBoolean(buf[len(key):], val) +} + +// EncodeProperty encodes a property. +func EncodeProperty(prop *Property, buf []byte) ([]byte, error) { + if prop.Type != TypeNull && prop.Name != "" { + if len(buf) < 2+len(prop.Name) { + return nil, ErrShortBuffer + } + binary.BigEndian.PutUint16(buf[:2], uint16(len(prop.Name))) + buf = buf[2:] + copy(buf, prop.Name) + buf = buf[len(prop.Name):] + } + + switch prop.Type { + case typeNumber: + return EncodeNumber(buf, prop.Number) + case typeBoolean: + return EncodeBoolean(buf, prop.Number != 0) + case typeString: + return EncodeString(buf, prop.String) + case TypeNull: + if len(buf) < 2 { + return nil, ErrShortBuffer + } + buf[0] = TypeNull + buf = buf[1:] + case TypeObject: + return Encode(&prop.Object, buf) + case typeEcmaArray: + return EncodeEcmaArray(&prop.Object, buf) + case typeStrictArray: + return EncodeArray(&prop.Object, buf) + default: + return nil, ErrInvalidType + } + return buf, nil +} + +// DecodeProperty decodes a property, returning the number of bytes consumed from the supplied buffer. +func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { + sz := len(buf) + + if decodeName { + if len(buf) < 4 { + return 0, ErrShortBuffer + } + n := DecodeInt16(buf[:2]) + if int(n) > len(buf)-2 { + return 0, ErrShortBuffer + } + + prop.Name = DecodeString(buf) + buf = buf[2+n:] + } else { + prop.Name = "" + } + + prop.Type = buf[0] + buf = buf[1:] + + switch prop.Type { + case typeNumber: + if len(buf) < 8 { + return 0, ErrShortBuffer + } + prop.Number = DecodeNumber(buf[:8]) + buf = buf[8:] + + case typeBoolean: + if len(buf) < 1 { + return 0, ErrShortBuffer + } + prop.Number = float64(buf[0]) + buf = buf[1:] + + case typeString: + n := DecodeInt16(buf[:2]) + if len(buf) < int(n+2) { + return 0, ErrShortBuffer + } + prop.String = DecodeString(buf) + buf = buf[2+n:] + + case TypeObject: + n, err := Decode(&prop.Object, buf, true) + if err != nil { + return 0, err + } + buf = buf[n:] + + case TypeNull, typeUndefined, typeUnsupported: + prop.Type = TypeNull + + case typeEcmaArray: + buf = buf[4:] + n, err := Decode(&prop.Object, buf, true) + if err != nil { + return 0, err + } + buf = buf[n:] + + default: + return 0, ErrUnexpectedType + } + + return sz - len(buf), nil +} + +// Encode encodes an Object into its AMF representation. +// This is the top-level encoding function and is typically the only function callers will need to use. +func Encode(obj *Object, buf []byte) ([]byte, error) { + if len(buf) < 5 { + return nil, ErrShortBuffer + } + + buf[0] = TypeObject + buf = buf[1:] + + for i := 0; i < len(obj.Properties); i++ { + var err error + buf, err = EncodeProperty(&obj.Properties[i], buf) + if err != nil { + return nil, err + } + } + + if len(buf) < 3 { + return nil, ErrShortBuffer + } + return EncodeInt24(buf, TypeObjectEnd) +} + +// EncodeEcmaArray encodes an ECMA array. +func EncodeEcmaArray(obj *Object, buf []byte) ([]byte, error) { + if len(buf) < 5 { + return nil, ErrShortBuffer + } + + buf[0] = typeEcmaArray + buf = buf[1:] + binary.BigEndian.PutUint32(buf[:4], uint32(len(obj.Properties))) + buf = buf[4:] + + for i := 0; i < len(obj.Properties); i++ { + var err error + buf, err = EncodeProperty(&obj.Properties[i], buf) + if err != nil { + return nil, err + } + } + + if len(buf) < 3 { + return nil, ErrShortBuffer + } + return EncodeInt24(buf, TypeObjectEnd) +} + +// EncodeArray encodes an array. +func EncodeArray(obj *Object, buf []byte) ([]byte, error) { + if len(buf) < 5 { + return nil, ErrShortBuffer + } + + buf[0] = typeStrictArray + buf = buf[1:] + binary.BigEndian.PutUint32(buf[:4], uint32(len(obj.Properties))) + buf = buf[4:] + + for i := 0; i < len(obj.Properties); i++ { + var err error + buf, err = EncodeProperty(&obj.Properties[i], buf) + if err != nil { + return nil, err + } + } + + return buf, nil +} + +// Decode decodes an object. Property names are only decoded if decodeName is true. +func Decode(obj *Object, buf []byte, decodeName bool) (int, error) { + sz := len(buf) + + obj.Properties = obj.Properties[:0] + for len(buf) != 0 { + if len(buf) >= 3 && DecodeInt24(buf[:3]) == TypeObjectEnd { + buf = buf[3:] + break + } + var prop Property + n, err := DecodeProperty(&prop, buf, decodeName) + if err != nil { + return 0, err + } + buf = buf[n:] + obj.Properties = append(obj.Properties, prop) + } + + return sz - len(buf), nil +} + +// Object methods: + +// Property returns a property, either by its index when idx is non-negative, or by its name otherwise. +// If the requested property is not found or the type does not match, an ErrPropertyNotFound error is returned. +func (obj *Object) Property(name string, idx int, typ uint8) (*Property, error) { + var prop *Property + if idx >= 0 { + if idx < len(obj.Properties) { + prop = &obj.Properties[idx] + } + } else { + for i, p := range obj.Properties { + if p.Name == name { + prop = &obj.Properties[i] + break + } + } + } + if prop == nil || prop.Type != typ { + return nil, ErrPropertyNotFound + } + return prop, nil +} + +// NumberProperty is a wrapper for Property that returns a Number property's value, if any. +func (obj *Object) NumberProperty(name string, idx int) (float64, error) { + prop, err := obj.Property(name, idx, typeNumber) + if err != nil { + return 0, err + } + return prop.Number, nil +} + +// StringProperty is a wrapper for Property that returns a String property's value, if any. +func (obj *Object) StringProperty(name string, idx int) (string, error) { + prop, err := obj.Property(name, idx, typeString) + if err != nil { + return "", err + } + return prop.String, nil +} + +// ObjectProperty is a wrapper for Property that returns an Object property's value, if any. +func (obj *Object) ObjectProperty(name string, idx int) (*Object, error) { + prop, err := obj.Property(name, idx, TypeObject) + if err != nil { + return nil, err + } + return &prop.Object, nil +} diff --git a/rtmp/amf/amf_test.go b/rtmp/amf/amf_test.go new file mode 100644 index 00000000..59548c09 --- /dev/null +++ b/rtmp/amf/amf_test.go @@ -0,0 +1,308 @@ +/* +NAME + amf_test.go + +DESCRIPTION + AMF test suite. + +AUTHORS + Saxon Nelson-Milton + Dan Kortschak + Alan Noble + +LICENSE + amf_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package amf + +import ( + "testing" +) + +// Test data. +var testStrings = [...]string{ + "", + "foo", + "bar", + "bazz", +} + +var testNumbers = [...]uint32{ + 0, + 1, + 0xababab, + 0xffffff, +} + +// TestSanity checks that we haven't accidentally changed constants. +func TestSanity(t *testing.T) { + if TypeObjectEnd != 0x09 { + t.Errorf("TypeObjectEnd has wrong value; got %d, expected %d", TypeObjectEnd, 0x09) + } +} + +// TestStrings tests string encoding and decoding. +func TestStrings(t *testing.T) { + // Short string encoding is as follows: + // enc[0] = data type (typeString) + // end[1:3] = size + // enc[3:] = data + for _, s := range testStrings { + buf := make([]byte, len(s)+7) + _, err := EncodeString(buf, s) + if err != nil { + t.Errorf("EncodeString failed") + } + if buf[0] != typeString { + t.Errorf("Expected typeString, got %v", buf[0]) + } + ds := DecodeString(buf[1:]) + if s != ds { + t.Errorf("DecodeString did not produce original string, got %v", ds) + } + } + // Long string encoding is as follows: + // enc[0] = data type (typeString) + // end[1:5] = size + // enc[5:] = data + s := string(make([]byte, 65536)) + buf := make([]byte, len(s)+7) + _, err := EncodeString(buf, s) + if err != nil { + t.Errorf("EncodeString failed") + } + if buf[0] != typeLongString { + t.Errorf("Expected typeLongString, got %v", buf[0]) + } + ds := DecodeLongString(buf[1:]) + if s != ds { + t.Errorf("DecodeLongString did not produce original string, got %v", ds) + } +} + +// TestNumbers tests number encoding and encoding. +func TestNumbers(t *testing.T) { + for _, n := range testNumbers { + buf := make([]byte, 4) // NB: encoder requires an extra byte for some reason + _, err := EncodeInt24(buf, n) + if err != nil { + t.Errorf("EncodeInt24 failed") + } + dn := DecodeInt24(buf) + if n != dn { + t.Errorf("DecodeInt24 did not produce original Number, got %v", dn) + } + _, err = EncodeInt32(buf, n) + if err != nil { + t.Errorf("EncodeInt32 failed") + } + dn = DecodeInt32(buf) + if n != dn { + t.Errorf("DecodeInt32 did not produce original Number, got %v", dn) + } + } +} + +// TestProperties tests encoding and decoding of properties. +func TestProperties(t *testing.T) { + var buf [1024]byte + + // Encode/decode Number properties. + enc := buf[:] + var err error + for i := range testNumbers { + enc, err = EncodeProperty(&Property{Type: typeNumber, Number: float64(testNumbers[i])}, enc) + if err != nil { + t.Errorf("EncodeProperty of Number failed") + } + + } + + prop := Property{} + dec := buf[:] + for i := range testNumbers { + n, err := DecodeProperty(&prop, dec, false) + if err != nil { + t.Errorf("DecodeProperty of Number failed") + } + if prop.Number != float64(testNumbers[i]) { + t.Errorf("EncodeProperty/DecodeProperty returned wrong Number; got %v, expected %v", int32(prop.Number), testNumbers[i]) + } + dec = dec[n:] + } + + // Encode/decode string properties. + enc = buf[:] + for i := range testStrings { + enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc) + if err != nil { + t.Errorf("EncodeProperty of string failed") + } + + } + prop = Property{} + dec = buf[:] + for i := range testStrings { + n, err := DecodeProperty(&prop, dec, false) + if err != nil { + t.Errorf("DecodeProperty of string failed") + } + if prop.String != testStrings[i] { + t.Errorf("EncodeProperty/DecodeProperty returned wrong string; got %s, expected %s", prop.String, testStrings[i]) + } + dec = dec[n:] + } + +} + +// TestObject tests encoding and decoding of objects. +func TestObject(t *testing.T) { + var buf [1024]byte + + // Construct a simple object that has one property, the Number 42. + prop1 := Property{Type: typeNumber, Number: 42} + obj1 := Object{} + obj1.Properties = append(obj1.Properties, prop1) + + // Encode it + enc := buf[:] + var err error + enc, err = Encode(&obj1, enc) + if err != nil { + t.Errorf("Encode of object failed") + } + + // Check the encoding + if buf[0] != TypeObject { + t.Errorf("Encoded wrong type; expected %d, got %v", TypeObject, buf[0]) + } + if buf[1] != typeNumber { + t.Errorf("Encoded wrong type; expected %d, got %v", typeNumber, buf[0]) + } + num := DecodeNumber(buf[2:10]) + if num != 42 { + t.Errorf("Encoded wrong Number") + } + end := int32(DecodeInt24(buf[10:13])) + if end != TypeObjectEnd { + t.Errorf("Did not encode TypeObjectEnd") + } + + // Decode it + dec := buf[1:] + var dobj1 Object + _, err = Decode(&dobj1, dec, false) + if err != nil { + t.Errorf("Decode of object failed") + } + + // Change the object's property to a named boolean. + obj1.Properties[0].Name = "on" + obj1.Properties[0].Type = typeBoolean + obj1.Properties[0].Number = 1 + + // Re-encode it + enc = buf[:] + enc, err = Encode(&obj1, enc) + if err != nil { + t.Errorf("Encode of object failed") + } + + // Decode it, this time with named set to true. + dec = buf[1:] + _, err = Decode(&dobj1, dec, true) + if err != nil { + t.Errorf("Decode of object failed with error: %v", err) + } + if dobj1.Properties[0].Number != 1 { + t.Errorf("Decoded wrong boolean value") + } + + // Construct a more complicated object that includes a nested object. + var obj2 Object + for i := range testStrings { + obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]}) + obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])}) + } + obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1}) + obj2.Properties = append(obj2.Properties, Property{Type: typeBoolean, Number: 1}) + + // Retrieve nested object + obj, err := obj2.ObjectProperty("", 8) + if err != err { + t.Errorf("Failed to retrieve object") + } + if len(obj.Properties) < 1 { + t.Errorf("Properties missing for nested object") + } + if obj.Properties[0].Type != typeBoolean { + t.Errorf("Wrong property type for nested object") + } + + // Encode it. + enc = buf[:] + enc, err = Encode(&obj2, enc) + if err != nil { + t.Errorf("Encode of object failed") + } + + // Decode it. + dec = buf[1:] + var dobj2 Object + _, err = Decode(&dobj2, dec, false) + if err != nil { + t.Errorf("Decode of object failed with error: %v", err) + } + + // Find some properties that exist. + s, err := obj2.StringProperty("", 2) + if err != nil { + t.Errorf("StringProperty failed") + } + if s != "foo" { + t.Errorf("StringProperty returned wrong String") + } + n, err := obj2.NumberProperty("", 3) + if err != nil { + t.Errorf("NumberProperty failed") + } + if n != 1 { + t.Errorf("NumberProperty returned wrong Number") + } + obj, err = obj2.ObjectProperty("", 8) + if err != nil { + t.Errorf("ObjectProperty failed") + return + } + if obj.Properties[0].Type != typeBoolean { + t.Errorf("ObjectProperty returned object with wrong property") + } + prop, err := obj2.Property("", 9, typeBoolean) + if err != nil { + t.Errorf("Property failed") + return + } + if prop.Number != 1 { + t.Errorf("Property returned wrong Property") + } + + // Try to find one that doesn't exist. + prop, err = obj2.Property("", 10, TypeObject) + if err != ErrPropertyNotFound { + t.Errorf("Property(10) failed") + } +} diff --git a/rtmp/amf_headers.go b/rtmp/amf_headers.go deleted file mode 100644 index 453b38de..00000000 --- a/rtmp/amf_headers.go +++ /dev/null @@ -1,81 +0,0 @@ -/* -NAME - amf_headers.go - -DESCRIPTION - See Readme.md - -AUTHORS - Saxon Nelson-Milton - -LICENSE - amf_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. - - Derived from librtmp under the GNU Lesser General Public License 2.1 - Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org - Copyright (C) 2008-2009 Andrej Stepanchuk - Copyright (C) 2009-2010 Howard Chu -*/ -package rtmp - -const ( - AMF_NUMBER = iota - AMF_BOOLEAN - AMF_STRING - AMF_OBJECT - AMF_MOVIECLIP /* reserved, not used */ - AMF_NULL - AMF_UNDEFINED - AMF_REFERENCE - AMF_ECMA_ARRAY - AMF_OBJECT_END - AMF_STRICT_ARRAY - AMF_DATE - AMF_LONG_STRING - AMF_UNSUPPORTED - AMF_RECORDSET /* reserved, not used */ - AMF_XML_DOC - AMF_TYPED_OBJECT - AMF_AVMPLUS /* switch to AMF3 */ - AMF_INVALID = 0xff -) - -// typedef enum -// amf.h +40 -type C_AMFDataType int32 - -// typedef struct AMF_Object -// amf.h +67 -type C_AMFObject struct { - o_props []C_AMFObjectProperty -} - -// typedef struct P_vu -// amf.h +73 -type P_vu struct { - p_number float64 - p_aval string - p_object C_AMFObject -} - -// typedef struct AMFObjectProperty -// amf.h +79 -type C_AMFObjectProperty struct { - p_name string - p_type C_AMFDataType - p_vu P_vu - p_UTCoffset int16 -} diff --git a/rtmp/packet.go b/rtmp/packet.go index dac3d803..b88b15bc 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -37,6 +37,8 @@ package rtmp import ( "encoding/binary" "io" + + "bitbucket.org/ausocean/av/rtmp/amf" ) // Packet types. @@ -89,20 +91,12 @@ type packet struct { info int32 bodySize uint32 bytesRead uint32 - chunk *chunk header []byte body []byte } -// chunk defines an RTMP packet chunk. -type chunk struct { - headerSize int32 - data []byte - header [fullHeaderSize]byte -} - -// read reads a packet. -func (pkt *packet) read(s *Session) error { +// readFrom reads a packet from the RTMP connection. +func (pkt *packet) readFrom(s *Session) error { var hbuf [fullHeaderSize]byte header := hbuf[:] @@ -182,16 +176,15 @@ func (pkt *packet) read(s *Session) error { hSize := len(hbuf) - len(header) + size if size >= 3 { - pkt.timestamp = C_AMF_DecodeInt24(header[:3]) + pkt.timestamp = amf.DecodeInt24(header[:3]) if size >= 6 { - pkt.bodySize = C_AMF_DecodeInt24(header[3:6]) + pkt.bodySize = amf.DecodeInt24(header[3:6]) pkt.bytesRead = 0 if size > 6 { pkt.packetType = header[6] - if size == 11 { - pkt.info = decodeInt32LE(header[7:11]) + pkt.info = int32(amf.DecodeInt32LE(header[7:11])) } } } @@ -204,8 +197,7 @@ func (pkt *packet) read(s *Session) error { s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error()) return err } - // TODO: port this - pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4]) + pkt.timestamp = amf.DecodeInt32(header[size : size+4]) hSize += 4 } @@ -213,20 +205,13 @@ func (pkt *packet) read(s *Session) error { pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) } - toRead := int32(pkt.bodySize - pkt.bytesRead) + toRead := pkt.bodySize - pkt.bytesRead chunkSize := s.inChunkSize if toRead < chunkSize { chunkSize = toRead } - if pkt.chunk != nil { - panic("non-nil chunk") - pkt.chunk.headerSize = int32(hSize) - copy(pkt.chunk.header[:], hbuf[:hSize]) - pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)] - } - _, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) @@ -235,7 +220,7 @@ func (pkt *packet) read(s *Session) error { pkt.bytesRead += uint32(chunkSize) - // keep the packet as ref for other packets on this channel + // Keep the packet as a reference for other packets on this channel. if s.channelsIn[pkt.channel] == nil { s.channelsIn[pkt.channel] = &packet{} } @@ -245,12 +230,8 @@ func (pkt *packet) read(s *Session) error { s.channelsIn[pkt.channel].timestamp = 0xffffff } - if pkt.bytesRead != pkt.bodySize { - panic("readPacket: bytesRead != bodySize") - } - if !pkt.hasAbsTimestamp { - // timestamps seem to always be relative + // Timestamps seem to always be relative. pkt.timestamp += uint32(s.channelTimestamp[pkt.channel]) } s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) @@ -262,6 +243,7 @@ func (pkt *packet) read(s *Session) error { } // resize adjusts the packet's storage to accommodate a body of the given size and header type. +// When headerSizeAuto is specified, the header type is computed based on packet type. func (pkt *packet) resize(size uint32, ht uint8) { buf := make([]byte, fullHeaderSize+size) pkt.header = buf @@ -285,10 +267,12 @@ func (pkt *packet) resize(size uint32, ht uint8) { } } -// write sends a packet. +// writeTo writes a packet to the RTMP connection. +// Packets are written in chunks which are Session.chunkSize in length (128 bytes in length). +// We defer sending small audio packets and combine consecutive small audio packets where possible to reduce I/O. // When queue is true, we expect a response to this request and cache the method on s.methodCalls. -func (pkt *packet) write(s *Session, queue bool) error { - if pkt.body == nil { +func (pkt *packet) writeTo(s *Session, queue bool) error { + if pkt.body == nil || pkt.bodySize == 0 { return errInvalidBody } @@ -314,7 +298,7 @@ func (pkt *packet) write(s *Session, queue bool) error { prevPkt := s.channelsOut[pkt.channel] var last int if prevPkt != nil && pkt.headerType != headerSizeLarge { - // compress a bit by using the prev packet's attributes + // Compress header by using the previous packet's attributes. if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium { pkt.headerType = headerSizeSmall } @@ -337,7 +321,7 @@ func (pkt *packet) write(s *Session, queue bool) error { hSize := headerSizes[pkt.headerType] origIdx := fullHeaderSize - hSize - // adjust 1 or 2 bytes for the channel + // Adjust 1 or 2 bytes depending on the channel. cSize := 0 switch { case pkt.channel > 319: @@ -351,7 +335,7 @@ func (pkt *packet) write(s *Session, queue bool) error { hSize += cSize } - // adjust 4 bytes for the timestamp + // Adjust 4 bytes for the timestamp. var ts uint32 if prevPkt != nil { ts = uint32(int(pkt.timestamp) - last) @@ -388,16 +372,16 @@ func (pkt *packet) write(s *Session, queue bool) error { } if headerSizes[pkt.headerType] > 1 { - res := ts + tmp := ts if ts > 0xffffff { - res = 0xffffff + tmp = 0xffffff } - C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res)) + amf.EncodeInt24(headBytes[headerIdx:], tmp) headerIdx += 3 // 24bits } if headerSizes[pkt.headerType] > 4 { - C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) + amf.EncodeInt24(headBytes[headerIdx:], pkt.bodySize) headerIdx += 3 // 24bits headBytes[headerIdx] = pkt.packetType headerIdx++ @@ -409,7 +393,7 @@ func (pkt *packet) write(s *Session, queue bool) error { } if ts >= 0xffffff { - C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts)) + amf.EncodeInt32(headBytes[headerIdx:], ts) headerIdx += 4 // 32bits } @@ -436,7 +420,6 @@ func (pkt *packet) write(s *Session, queue bool) error { } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. - // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size) for size+hSize != 0 { if chunkSize > size { @@ -459,6 +442,7 @@ func (pkt *packet) write(s *Session, queue bool) error { hSize = 0 if size > 0 { + // We are writing the 2nd or subsequent chunk. origIdx -= 1 + cSize hSize = 1 + cSize @@ -479,22 +463,19 @@ func (pkt *packet) write(s *Session, queue bool) error { } if ts >= 0xffffff { extendedTimestamp := headBytes[origIdx+1+cSize:] - C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts)) + amf.EncodeInt32(extendedTimestamp[:4], ts) } } } - // We invoked a remote method - if pkt.packetType == packetTypeInvoke { + // If we invoked a remote method and queue is true, we queue the method until the result arrives. + if pkt.packetType == packetTypeInvoke && queue { buf := pkt.body[1:] - meth := C_AMF_DecodeString(buf) - s.log(DebugLevel, pkg+"invoking method "+meth) - // keep it in call queue till result arrives - if queue { - buf = buf[3+len(meth):] - txn := int32(C_AMF_DecodeNumber(buf[:8])) - s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) - } + meth := amf.DecodeString(buf) + s.log(DebugLevel, pkg+"queuing method "+meth) + buf = buf[3+len(meth):] + txn := int32(amf.DecodeNumber(buf[:8])) + s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) } if s.channelsOut[pkt.channel] == nil { @@ -504,12 +485,3 @@ func (pkt *packet) write(s *Session, queue bool) error { return nil } - -func decodeInt32LE(data []byte) int32 { - return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0]) -} - -func encodeInt32LE(dst []byte, v int32) int32 { - binary.LittleEndian.PutUint32(dst, uint32(v)) - return 4 -} diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 4b334587..11a9d71b 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -42,11 +42,12 @@ import ( "net" "strconv" "time" + + "bitbucket.org/ausocean/av/rtmp/amf" ) const ( pkg = "rtmp:" - minDataSize = 11 // ToDo: this should be the same as fullHeaderSize signatureSize = 1536 fullHeaderSize = 12 ) @@ -151,35 +152,25 @@ var rtmpProtocolStrings = [...]string{ // RTMP errors. var ( errUnknownScheme = errors.New("rtmp: unknown scheme") + errInvalidURL = errors.New("rtmp: invalid URL") errConnected = errors.New("rtmp: already connected") errNotConnected = errors.New("rtmp: not connected") errNotWritable = errors.New("rtmp: connection not writable") - errHandshake = errors.New("rtmp: handshake failed") - errConnSend = errors.New("rtmp: connection send error") - errConnStream = errors.New("rtmp: connection stream error") errInvalidHeader = errors.New("rtmp: invalid header") errInvalidBody = errors.New("rtmp: invalid body") - errTinyPacket = errors.New("rtmp: packet too small") - errEncoding = errors.New("rtmp: encoding error") - errDecoding = errors.New("rtmp: decoding error") + errInvalidFlvTag = errors.New("rtmp: invalid FLV tag") errUnimplemented = errors.New("rtmp: unimplemented feature") ) -// setupURL parses the RTMP URL. -func setupURL(s *Session) (err error) { +// init initialises the Session link +func (s *Session) init() (err error) { s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(s.url) if err != nil { return err } - - if s.link.tcUrl == "" { - if s.link.app != "" { - s.link.tcUrl = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app - } else { - s.link.tcUrl = s.url - } + if s.link.app == "" { + return errInvalidURL } - if s.link.port == 0 { switch { case (s.link.protocol & featureSSL) != 0: @@ -191,6 +182,8 @@ func setupURL(s *Session) (err error) { s.link.port = 1935 } } + s.link.url = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app + s.link.protocol |= featureWrite return nil } @@ -209,13 +202,13 @@ func connect(s *Session) error { err = handshake(s) if err != nil { s.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) - return errHandshake + return err } s.log(DebugLevel, pkg+"handshaked") err = sendConnectPacket(s) if err != nil { s.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) - return errConnSend + return err } return nil } @@ -225,7 +218,7 @@ func connectStream(s *Session) error { var err error for !s.isPlaying { pkt := packet{} - err = pkt.read(s) + err = pkt.readFrom(s) if err != nil { break } @@ -248,53 +241,39 @@ func connectStream(s *Session) error { } // handlePacket handles a packet that the client has received. -// NB: cases have been commented out that are not currently used by AusOcean +// NB: Unsupported packet types are logged fatally. func handlePacket(s *Session, pkt *packet) error { + if pkt.bodySize < 4 { + return errInvalidBody + } + switch pkt.packetType { case packetTypeChunkSize: - if pkt.bodySize >= 4 { - s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) - } + s.inChunkSize = amf.DecodeInt32(pkt.body[:4]) case packetTypeBytesReadReport: - s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) - - case packetTypeControl: - s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl") + s.serverBW = amf.DecodeInt32(pkt.body[:4]) case packetTypeServerBW: - s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) + s.serverBW = amf.DecodeInt32(pkt.body[:4]) case packetTypeClientBW: - s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) + s.clientBW = amf.DecodeInt32(pkt.body[:4]) if pkt.bodySize > 4 { s.clientBW2 = pkt.body[4] } else { s.clientBW2 = 0xff } - case packetTypeAudio: - s.log(FatalLevel, pkg+"unsupported packet type packetTypeAudio") - - case packetTypeVideo: - s.log(FatalLevel, pkg+"unsupported packet type packetTypeVideo") - - case packetTypeFlexMessage: - s.log(FatalLevel, pkg+"unsupported packet type packetTypeFlexMessage") - - case packetTypeInfo: - s.log(FatalLevel, pkg+"unsupported packet type packetTypeInfo") - case packetTypeInvoke: err := handleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { - // This will never happen with the methods we implement. s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) return err } - case packetTypeFlashVideo: - s.log(FatalLevel, pkg+"unsupported packet type packetType_FLASHVideo") + case packetTypeControl, packetTypeAudio, packetTypeVideo, packetTypeFlashVideo, packetTypeFlexMessage, packetTypeInfo: + s.log(FatalLevel, pkg+"unsupported packet type "+strconv.Itoa(int(pkt.packetType))) default: s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) @@ -313,110 +292,50 @@ func sendConnectPacket(s *Session) error { } enc := pkt.body - enc = C_AMF_EncodeString(enc, avConnect) - if enc == nil { - return errEncoding + enc, err := amf.EncodeString(enc, avConnect) + if err != nil { + return err } s.numInvokes += 1 - enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) - if enc == nil { - return errEncoding + enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + if err != nil { + return err } - enc[0] = AMF_OBJECT + + enc[0] = amf.TypeObject enc = enc[1:] - - enc = C_AMF_EncodeNamedString(enc, avApp, s.link.app) - if enc == nil { - return errEncoding + enc, err = amf.EncodeNamedString(enc, avApp, s.link.app) + if err != nil { + return err } - if s.link.protocol&featureWrite != 0 { - enc = C_AMF_EncodeNamedString(enc, avType, avNonprivate) - if enc == nil { - return errEncoding - } + enc, err = amf.EncodeNamedString(enc, avType, avNonprivate) + if err != nil { + return err + } + enc, err = amf.EncodeNamedString(enc, avTcUrl, s.link.url) + if err != nil { + return err + } + enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd) + if err != nil { + return err } - if s.link.flashVer != "" { - enc = C_AMF_EncodeNamedString(enc, avFlashver, s.link.flashVer) - if enc == nil { - return errEncoding - } - } - if s.link.swfUrl != "" { - enc = C_AMF_EncodeNamedString(enc, avSwfUrl, s.link.swfUrl) - if enc == nil { - return errEncoding - } - } - - if s.link.tcUrl != "" { - enc = C_AMF_EncodeNamedString(enc, avTcUrl, s.link.tcUrl) - if enc == nil { - return errEncoding - } - } - - if s.link.protocol&featureWrite == 0 { - enc = C_AMF_EncodeNamedBoolean(enc, avFpad, false) - if enc == nil { - return errEncoding - } - enc = C_AMF_EncodeNamedNumber(enc, avCapabilities, 15) - if enc == nil { - return errEncoding - } - enc = C_AMF_EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs) - if enc == nil { - return errEncoding - } - enc = C_AMF_EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs) - if enc == nil { - return errEncoding - } - enc = C_AMF_EncodeNamedNumber(enc, avVideoFunction, 1) - if enc == nil { - return errEncoding - } - if s.link.pageUrl != "" { - enc = C_AMF_EncodeNamedString(enc, avPageUrl, s.link.pageUrl) - if enc == nil { - return errEncoding - } - } - } - - if s.encoding != 0.0 || s.sendEncoding { - enc = C_AMF_EncodeNamedNumber(enc, avObjectEncoding, s.encoding) - if enc == nil { - return errEncoding - } - } - - copy(enc, []byte{0, 0, AMF_OBJECT_END}) - enc = enc[3:] - - // add auth string + // add auth string, if any if s.link.auth != "" { - enc = C_AMF_EncodeBoolean(enc, s.link.flags&linkAuth != 0) - if enc == nil { - return errEncoding + enc, err = amf.EncodeBoolean(enc, s.link.flags&linkAuth != 0) + if err != nil { + return err } - enc = C_AMF_EncodeString(enc, s.link.auth) - if enc == nil { - return errEncoding - } - } - - for i := range s.link.extras.o_props { - enc = C_AMF_PropEncode(&s.link.extras.o_props[i], enc) - if enc == nil { - return errEncoding + enc, err = amf.EncodeString(enc, s.link.auth) + if err != nil { + return err } } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.write(s, true) // response expected + return pkt.writeTo(s, true) // response expected } func sendCreateStream(s *Session) error { @@ -430,21 +349,21 @@ func sendCreateStream(s *Session) error { } enc := pkt.body - enc = C_AMF_EncodeString(enc, avCreatestream) - if enc == nil { - return errEncoding + enc, err := amf.EncodeString(enc, avCreatestream) + if err != nil { + return err } s.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) - if enc == nil { - return errEncoding + enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + if err != nil { + return err } - enc[0] = AMF_NULL + enc[0] = amf.TypeNull enc = enc[1:] pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.write(s, true) // response expected + return pkt.writeTo(s, true) // response expected } func sendReleaseStream(s *Session) error { @@ -458,24 +377,24 @@ func sendReleaseStream(s *Session) error { } enc := pkt.body - enc = C_AMF_EncodeString(enc, avReleasestream) - if enc == nil { - return errEncoding + enc, err := amf.EncodeString(enc, avReleasestream) + if err != nil { + return err } s.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) - if enc == nil { - return errEncoding + enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + if err != nil { + return err } - enc[0] = AMF_NULL + enc[0] = amf.TypeNull enc = enc[1:] - enc = C_AMF_EncodeString(enc, s.link.playpath) - if enc == nil { - return errEncoding + enc, err = amf.EncodeString(enc, s.link.playpath) + if err != nil { + return err } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.write(s, false) + return pkt.writeTo(s, false) } func sendFCPublish(s *Session) error { @@ -489,25 +408,25 @@ func sendFCPublish(s *Session) error { } enc := pkt.body - enc = C_AMF_EncodeString(enc, avFCPublish) - if enc == nil { - return errEncoding + enc, err := amf.EncodeString(enc, avFCPublish) + if err != nil { + return err } s.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) - if enc == nil { - return errEncoding + enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + if err != nil { + return err } - enc[0] = AMF_NULL + enc[0] = amf.TypeNull enc = enc[1:] - enc = C_AMF_EncodeString(enc, s.link.playpath) - if enc == nil { - return errEncoding + enc, err = amf.EncodeString(enc, s.link.playpath) + if err != nil { + return err } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.write(s, false) + return pkt.writeTo(s, false) } func sendFCUnpublish(s *Session) error { @@ -521,25 +440,25 @@ func sendFCUnpublish(s *Session) error { } enc := pkt.body - enc = C_AMF_EncodeString(enc, avFCUnpublish) - if enc == nil { - return errEncoding + enc, err := amf.EncodeString(enc, avFCUnpublish) + if err != nil { + return err } s.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) - if enc == nil { - return errEncoding + enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + if err != nil { + return err } - enc[0] = AMF_NULL + enc[0] = amf.TypeNull enc = enc[1:] - enc = C_AMF_EncodeString(enc, s.link.playpath) - if enc == nil { - return errEncoding + enc, err = amf.EncodeString(enc, s.link.playpath) + if err != nil { + return err } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.write(s, false) + return pkt.writeTo(s, false) } func sendPublish(s *Session) error { @@ -553,29 +472,29 @@ func sendPublish(s *Session) error { } enc := pkt.body - enc = C_AMF_EncodeString(enc, avPublish) - if enc == nil { - return errEncoding + enc, err := amf.EncodeString(enc, avPublish) + if err != nil { + return err } s.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) - if enc == nil { - return errEncoding + enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + if err != nil { + return err } - enc[0] = AMF_NULL + enc[0] = amf.TypeNull enc = enc[1:] - enc = C_AMF_EncodeString(enc, s.link.playpath) - if enc == nil { - return errEncoding + enc, err = amf.EncodeString(enc, s.link.playpath) + if err != nil { + return err } - enc = C_AMF_EncodeString(enc, avLive) - if enc == nil { - return errEncoding + enc, err = amf.EncodeString(enc, avLive) + if err != nil { + return err } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.write(s, true) // response expected + return pkt.writeTo(s, true) // response expected } func sendDeleteStream(s *Session, dStreamId float64) error { @@ -589,24 +508,24 @@ func sendDeleteStream(s *Session, dStreamId float64) error { } enc := pkt.body - enc = C_AMF_EncodeString(enc, avDeletestream) - if enc == nil { - return errEncoding + enc, err := amf.EncodeString(enc, avDeletestream) + if err != nil { + return err } s.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) - if enc == nil { - return errEncoding + enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + if err != nil { + return err } - enc[0] = AMF_NULL + enc[0] = amf.TypeNull enc = enc[1:] - enc = C_AMF_EncodeNumber(enc, dStreamId) - if enc == nil { - return errEncoding + enc, err = amf.EncodeNumber(enc, dStreamId) + if err != nil { + return err } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.write(s, false) + return pkt.writeTo(s, false) } // sendBytesReceived tells the server how many bytes the client has received. @@ -622,13 +541,14 @@ func sendBytesReceived(s *Session) error { enc := pkt.body s.nBytesInSent = s.nBytesIn - enc = C_AMF_EncodeInt32(enc, s.nBytesIn) - if enc == nil { - return errEncoding + + enc, err := amf.EncodeInt32(enc, s.nBytesIn) + if err != nil { + return err } pkt.bodySize = 4 - return pkt.write(s, false) + return pkt.writeTo(s, false) } func sendCheckBW(s *Session) error { @@ -642,21 +562,21 @@ func sendCheckBW(s *Session) error { } enc := pkt.body - enc = C_AMF_EncodeString(enc, av_checkbw) - if enc == nil { - return errEncoding + enc, err := amf.EncodeString(enc, av_checkbw) + if err != nil { + return err } s.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) - if enc == nil { - return errEncoding + enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + if err != nil { + return err } - enc[0] = AMF_NULL + enc[0] = amf.TypeNull enc = enc[1:] pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.write(s, false) + return pkt.writeTo(s, false) } func eraseMethod(m []method, i int) []method { @@ -671,18 +591,27 @@ func handleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody } - var obj C_AMFObject - nRes := C_AMF_Decode(&obj, body, 0) - if nRes < 0 { - return errDecoding + var obj amf.Object + _, err := amf.Decode(&obj, body, false) + if err != nil { + return err } - meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) - s.log(DebugLevel, pkg+"invoking method "+meth) - txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) + meth, err := obj.StringProperty("", 0) + if err != nil { + return err + } + txn, err := obj.NumberProperty("", 1) + if err != nil { + return err + } + s.log(DebugLevel, pkg+"invoking method "+meth) switch meth { case av_result: + if (s.link.protocol & featureWrite) == 0 { + return errNotWritable + } var methodInvoked string for i, m := range s.methodCalls { if float64(m.num) == txn { @@ -693,18 +622,12 @@ func handleInvoke(s *Session, body []byte) error { } if methodInvoked == "" { s.log(WarnLevel, pkg+"received result without matching request", "id", txn) - goto leave + return nil } s.log(DebugLevel, pkg+"received result for "+methodInvoked) switch methodInvoked { case avConnect: - if s.link.token != "" { - s.log(FatalLevel, pkg+"no support for link token") - } - if (s.link.protocol & featureWrite) == 0 { - return errNotWritable - } err := sendReleaseStream(s) if err != nil { return err @@ -719,86 +642,56 @@ func handleInvoke(s *Session, body []byte) error { } case avCreatestream: - s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) - if s.link.protocol&featureWrite == 0 { - return errNotWritable + n, err := obj.NumberProperty("", 3) + if err != nil { + return err } - err := sendPublish(s) + s.streamID = int32(n) + err = sendPublish(s) if err != nil { return err } - case avPlay, avPublish: - s.log(FatalLevel, pkg+"unsupported method avPlay/avPublish") + default: + s.log(FatalLevel, pkg+"unexpected method invoked"+methodInvoked) } case avOnBWDone: - if s.checkCounter == 0 { // ToDo: why is this always zero? - err := sendCheckBW(s) - if err != nil { - return err - } + err := sendCheckBW(s) + if err != nil { + return err } - case avOnFCUnsubscribe, avOnFCSubscribe: - s.log(FatalLevel, pkg+"unsupported method avOnFCUnsubscribe/avOonfcsubscribe") - - case avPing: - s.log(FatalLevel, pkg+"unsupported method avPing") - - case av_onbwcheck: - s.log(FatalLevel, pkg+"unsupported method av_onbwcheck") - - case av_onbwdone: - s.log(FatalLevel, pkg+"unsupported method av_onbwdone") - - case avClose: - s.log(FatalLevel, pkg+"unsupported method avClose") - case avOnStatus: - var obj2 C_AMFObject - C_AMFProp_GetObject(C_AMF_GetProp(&obj, "", 3), &obj2) - code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avCode, -1)) - level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avLevel, -1)) + obj2, err := obj.ObjectProperty("", 3) + if err != nil { + return err + } + code, err := obj2.StringProperty(avCode, -1) + if err != nil { + return err + } + level, err := obj2.StringProperty(avLevel, -1) + if err != nil { + return err + } s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) - switch code { - case avNetStreamFailed, avNetStreamPlayFailed, - avNetStreamPlayStreamNotFound, avNetConnectionConnectInvalidApp: - s.log(FatalLevel, pkg+"unsupported method avNetStream/avNetStreamPlayFailed/avNetstream_play_streamnotfound/av_netConnection_Connect_invalidApp") - - case avNetStreamPlayStart, avNetStreamPlayPublishNotify: - s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayStart/avNetStreamPlayPublishNotify") - - case avNetStreamPublish_Start: - s.log(DebugLevel, pkg+"playing") - s.isPlaying = true - for i, m := range s.methodCalls { - if m.name == avPublish { - s.methodCalls = eraseMethod(s.methodCalls, i) - break - } + if code != avNetStreamPublish_Start { + s.log(ErrorLevel, pkg+"unexpected response "+code) + return errUnimplemented + } + s.log(DebugLevel, pkg+"playing") + s.isPlaying = true + for i, m := range s.methodCalls { + if m.name == avPublish { + s.methodCalls = eraseMethod(s.methodCalls, i) } - // ToDo: handle case when avPublish method not found - - case avNetStreamPlayComplete, avNetStreamPlayStop, avNetStreamPlayUnpublishNotify: - s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayComplete/avNetStreamPlayStop/avNetStreamPlayUnpublishNotify") - - case avNetStreamSeekNotify: - s.log(FatalLevel, pkg+"unsupported method avNetstream_seek_notify") - - case avNetStreamPauseNotify: - s.log(FatalLevel, pkg+"unsupported method avNetStreamPauseNotify") } - case avPlaylist_ready: - s.log(FatalLevel, pkg+"unsupported method avPlaylist_ready") - default: - s.log(FatalLevel, pkg+"unknown method "+meth) + s.log(FatalLevel, pkg+"unsuppoted method "+meth) } -leave: - C_AMF_Reset(&obj) return nil } diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index aeb86263..74e4db71 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -116,42 +116,74 @@ func TestKey(t *testing.T) { testLog(0, "Testing against URL "+testBaseURL+testKey) } -// TestSetupURL tests URL parsing. -func TestSetupURL(t *testing.T) { - testLog(0, "TestSetupURL") +// TestInit tests session construction and link initialization. +func TestInit(t *testing.T) { + testLog(0, "TestInit") // test with just the base URL s := NewSession(testBaseURL, testTimeout, testLog) if s.url != testBaseURL && s.link.timeout != testTimeout { t.Errorf("NewSession failed") } - err := setupURL(s) + err := s.init() if err != nil { - t.Errorf("setupURL(testBaseURL) failed with error: %v", err) + t.Errorf("setupURL: failed with error: %v", err) } // test the parts are as expected - if rtmpProtocolStrings[s.link.protocol] != rtmpProtocol { - t.Errorf("setupURL returned wrong protocol: %v", s.link.protocol) + if s.link.protocol&featureWrite == 0 { + t.Errorf("setupURL: link not writable") + } + if rtmpProtocolStrings[s.link.protocol&^featureWrite] != rtmpProtocol { + t.Errorf("setupURL: wrong protocol: %v", s.link.protocol) } if s.link.host != testHost { - t.Errorf("setupURL returned wrong host: %v", s.link.host) + t.Errorf("setupURL: wrong host: %v", s.link.host) } if s.link.app != testApp { - t.Errorf("setupURL returned wrong app: %v", s.link.app) + t.Errorf("setupURL: wrong app: %v", s.link.app) } } -// TestOpenClose tests opening an closing an RTMP connection. -func TestOpenClose(t *testing.T) { - testLog(0, "TestOpenClose") +// TestErrorHandling tests error handling +func TestErorHandling(t *testing.T) { + testLog(0, "TestErrorHandling") if testKey == "" { - t.Skip("Skipping TestOpenClose since no RTMP_TEST_KEY") + t.Skip("Skipping TestErrorHandling since no RTMP_TEST_KEY") } s := NewSession(testBaseURL+testKey, testTimeout, testLog) - err := s.Open() + + // test errNotConnected + var buf [1024]byte + tag := buf[:0] + _, err := s.Write(tag) + if err == nil { + t.Errorf("Write did not return errNotConnected") + } + + err = s.Open() if err != nil { t.Errorf("Open failed with error: %v", err) - return } + + // test errInvalidFlvTag + _, err = s.Write(tag) + if err == nil { + t.Errorf("Write did not return errInvalidFlvTag") + } + + // test errUnimplemented + copy(tag, []byte("FLV")) + _, err = s.Write(tag) + if err == nil { + t.Errorf("Write did not return errUnimplemented") + } + + // test errInvalidBody + tag = buf[:11] + _, err = s.Write(tag) + if err == nil { + t.Errorf("Write did not return errInvalidBody") + } + err = s.Close() if err != nil { t.Errorf("Close failed with error: %v", err) diff --git a/rtmp/session.go b/rtmp/session.go index eb97c938..8da2626b 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -37,22 +37,22 @@ import ( "io" "net" "time" + + "bitbucket.org/ausocean/av/rtmp/amf" ) // Session holds the state for an RTMP session. type Session struct { url string - inChunkSize int32 - outChunkSize int32 - checkCounter int32 - nBytesIn int32 - nBytesInSent int32 + inChunkSize uint32 + outChunkSize uint32 + nBytesIn uint32 + nBytesInSent uint32 streamID int32 - serverBW int32 - clientBW int32 + serverBW uint32 + clientBW uint32 clientBW2 uint8 isPlaying bool - sendEncoding bool numInvokes int32 methodCalls []method channelsAllocatedIn int32 @@ -62,7 +62,6 @@ type Session struct { channelTimestamp []int32 audioCodecs float64 videoCodecs float64 - encoding float64 deferred []byte link link log Log @@ -72,16 +71,10 @@ type Session struct { type link struct { host string playpath string - tcUrl string - swfUrl string - pageUrl string + url string app string auth string - flashVer string - token string - extras C_AMFObject flags int32 - swfAge int32 protocol int32 timeout uint port uint16 @@ -106,6 +99,10 @@ const ( FatalLevel int8 = 5 ) +// flvTagheaderSize is the FLV header size we expect. +// NB: We don't accept extended headers. +const flvTagheaderSize = 11 + // NewSession returns a new Session. func NewSession(url string, timeout uint, log Log) *Session { return &Session{ @@ -120,7 +117,6 @@ func NewSession(url string, timeout uint, log Log) *Session { log: log, link: link{ timeout: timeout, - swfAge: 30, }, } } @@ -131,12 +127,10 @@ func (s *Session) Open() error { if s.isConnected() { return errConnected } - err := setupURL(s) + err := s.init() if err != nil { return err } - - s.enableWrite() err = connect(s) if err != nil { s.Close() @@ -174,8 +168,8 @@ func (s *Session) Write(data []byte) (int, error) { if !s.isConnected() { return 0, errNotConnected } - if len(data) < minDataSize { - return 0, errTinyPacket + if len(data) < flvTagheaderSize { + return 0, errInvalidFlvTag } if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { return 0, errUnimplemented @@ -183,15 +177,15 @@ func (s *Session) Write(data []byte) (int, error) { pkt := packet{ packetType: data[0], - bodySize: C_AMF_DecodeInt24(data[1:4]), - timestamp: C_AMF_DecodeInt24(data[4:7]) | uint32(data[7])<<24, + bodySize: amf.DecodeInt24(data[1:4]), + timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, channel: chanSource, info: s.streamID, } pkt.resize(pkt.bodySize, headerSizeAuto) - copy(pkt.body, data[minDataSize:minDataSize+pkt.bodySize]) - err := pkt.write(s, false) + copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize]) + err := pkt.writeTo(s, false) if err != nil { return 0, err } @@ -213,7 +207,7 @@ func (s *Session) read(buf []byte) (int, error) { s.log(DebugLevel, pkg+"read failed", "error", err.Error()) return 0, err } - s.nBytesIn += int32(n) + s.nBytesIn += uint32(n) if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { err := sendBytesReceived(s) if err != nil { @@ -243,7 +237,3 @@ func (s *Session) isConnected() bool { return s.link.conn != nil } -// enableWrite enables the current session for writing. -func (s *Session) enableWrite() { - s.link.protocol |= featureWrite -}