Merged in rtmp-refactoring-2 (pull request #107)

Second pass at RTMP refactoring.

Approved-by: Saxon Milton <saxon.milton@gmail.com>
Approved-by: kortschak <dan@kortschak.io>
Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
Alan Noble 2019-01-15 04:31:26 +00:00
commit b9d99cc78a
8 changed files with 1090 additions and 1057 deletions

View File

@ -1,579 +0,0 @@
/*
NAME
amf.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Jake Lane <jake@ausocean.org>
LICENSE
amf.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import (
"encoding/binary"
"log"
"math"
)
var (
AMFObj_Invalid C_AMFObject
AMFProp_Invalid = C_AMFObjectProperty{p_type: AMF_INVALID}
)
const (
AMF3_INTEGER_MAX = 268435455
AMF3_INTEGER_MIN = -268435456
)
// unsigned short AMF_DecodeInt16(const char* data);
// amf.c +41
func C_AMF_DecodeInt16(data []byte) uint16 {
return uint16(data[0])<<8 | uint16(data[1])
}
// unsigned int AMF_DecodeInt24(const char* data);
// amf.c +50
func C_AMF_DecodeInt24(data []byte) uint32 {
return uint32(data[0])<<16 | uint32(data[1])<<8 | uint32(data[2])
}
// unsigned int AMF_DeocdeInt32(const char* data);
// amf.c +59
func C_AMF_DecodeInt32(data []byte) uint32 {
return uint32(data[0])<<24 | uint32(data[1])<<16 | uint32(data[2])<<8 | uint32(data[3])
}
// void AMF_DecodeString(const char* data, C_AVal* bv);
// amf.c +68
func C_AMF_DecodeString(data []byte) string {
n := C_AMF_DecodeInt16(data)
return string(data[2 : 2+n])
}
// void AMF_DecodeLongString(const char *data, AVal *bv);
// amf.c +75
func C_AMF_DecodeLongString(data []byte) string {
n := C_AMF_DecodeInt32(data)
return string(data[2 : 2+n])
}
// double AMF_DecodeNumber(const char* data);
// amf.c +82
func C_AMF_DecodeNumber(data []byte) float64 {
return math.Float64frombits(binary.BigEndian.Uint64(data))
}
// int AMF_DecodeBoolean(const char *data);
// amf.c +132
func C_AMF_DecodeBoolean(data []byte) bool {
return data[0] != 0
}
// char* AMF_EncodeInt24(char* output, char* outend, int nVal);
// amf.c +149
func C_AMF_EncodeInt24(dst []byte, val int32) []byte {
if len(dst) < 3 {
return nil
}
_ = dst[2]
dst[0] = byte(val >> 16)
dst[1] = byte(val >> 8)
dst[2] = byte(val)
if len(dst) == 3 {
return nil
}
return dst[3:]
}
// char* AMF_EncodeInt32(char* output, char* outend, int nVal);
// amf.c +160
func C_AMF_EncodeInt32(dst []byte, val int32) []byte {
if len(dst) < 4 {
return nil
}
binary.BigEndian.PutUint32(dst, uint32(val))
if len(dst) == 4 {
return nil
}
return dst[4:]
}
// char* AMF_EncodeString(char* output, char* outend, const C_AVal* bv);
// amf.c +173
func C_AMF_EncodeString(dst []byte, val string) []byte {
const typeSize = 1
if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(dst) {
return nil
}
if len(val)+typeSize+binary.Size(int32(0)) > len(dst) {
return nil
}
if len(val) < 65536 {
dst[0] = AMF_STRING
dst = dst[1:]
binary.BigEndian.PutUint16(dst[:2], uint16(len(val)))
dst = dst[2:]
copy(dst, val)
if len(dst) == len(val) {
return nil
}
return dst[len(val):]
}
dst[0] = AMF_LONG_STRING
dst = dst[1:]
binary.BigEndian.PutUint32(dst[:4], uint32(len(val)))
dst = dst[4:]
copy(dst, val)
if len(dst) == len(val) {
return nil
}
return dst[len(val):]
}
// char* AMF_EncodeNumber(char* output, char* outend, double dVal);
// amf.c +199
func C_AMF_EncodeNumber(dst []byte, val float64) []byte {
if len(dst) < 9 {
return nil
}
dst[0] = AMF_NUMBER
dst = dst[1:]
binary.BigEndian.PutUint64(dst, math.Float64bits(val))
return dst[8:]
}
// char* AMF_EncodeBoolean(char* output, char* outend, int bVal);
// amf.c +260
func C_AMF_EncodeBoolean(dst []byte, val bool) []byte {
if len(dst) < 2 {
return nil
}
dst[0] = AMF_BOOLEAN
if val {
dst[1] = 1
}
if len(dst) == 2 {
return nil
}
return dst[2:]
}
// char* AMF_EncodeNamedString(char* output, char* outend, const C_AVal* strName, const C_AVal* strValue);
// amf.c +273
func C_AMF_EncodeNamedString(dst []byte, key, val string) []byte {
if 2+len(key) > len(dst) {
return nil
}
binary.BigEndian.PutUint16(dst[:2], uint16(len(key)))
dst = dst[2:]
copy(dst, key)
if len(key) == len(dst) {
return nil
}
return C_AMF_EncodeString(dst[len(key):], val)
}
// char* AMF_EncodeNamedNumber(char* output, char* outend, const C_AVal* strName, double dVal);
// amf.c +286
func C_AMF_EncodeNamedNumber(dst []byte, key string, val float64) []byte {
if 2+len(key) > len(dst) {
return nil
}
binary.BigEndian.PutUint16(dst[:2], uint16(len(key)))
dst = dst[2:]
copy(dst, key)
if len(key) == len(dst) {
return nil
}
return C_AMF_EncodeNumber(dst[len(key):], val)
}
// char* AMF_EncodeNamedBoolean(char* output, char* outend, const C_AVal* strname, int bVal);
// amf.c +299
func C_AMF_EncodeNamedBoolean(dst []byte, key string, val bool) []byte {
if 2+len(key) > len(dst) {
return nil
}
binary.BigEndian.PutUint16(dst[:2], uint16(len(key)))
dst = dst[2:]
copy(dst, key)
if len(key) == len(dst) {
return nil
}
return C_AMF_EncodeBoolean(dst[len(key):], val)
}
// void AMFProp_SetName(AMFObjectProperty *prop, AVal *name);
// amf.c +318
func C_AMFProp_SetName(prop *C_AMFObjectProperty, name string) {
prop.p_name = name
}
// double AMFProp_GetNumber(AMFObjectProperty* prop);
// amf.c +330
func C_AMFProp_GetNumber(prop *C_AMFObjectProperty) float64 {
return prop.p_vu.p_number
}
// void AMFProp_GetString(AMFObjectProperty* prop, AVal* str);
// amf.c +341
func C_AMFProp_GetString(prop *C_AMFObjectProperty) string {
if prop.p_type == AMF_STRING {
return prop.p_vu.p_aval
}
return ""
}
// void AMFProp_GetObject(AMFObjectProperty *prop, AMFObject *obj);
// amf.c +351
func C_AMFProp_GetObject(prop *C_AMFObjectProperty, obj *C_AMFObject) {
if prop.p_type == AMF_OBJECT {
*obj = prop.p_vu.p_object
} else {
*obj = AMFObj_Invalid
}
}
// char* AMFPropEncode(AMFOBjectProperty* prop, char* pBufer, char* pBufEnd);
// amf.c +366
func C_AMF_PropEncode(p *C_AMFObjectProperty, dst []byte) []byte {
if p.p_type == AMF_INVALID {
return nil
}
if p.p_type != AMF_NULL && len(p.p_name)+2+1 >= len(dst) {
return nil
}
if p.p_type != AMF_NULL && len(p.p_name) != 0 {
binary.BigEndian.PutUint16(dst[:2], uint16(len(p.p_name)))
dst = dst[2:]
copy(dst, p.p_name)
dst = dst[len(p.p_name):]
}
switch p.p_type {
case AMF_NUMBER:
dst = C_AMF_EncodeNumber(dst, p.p_vu.p_number)
case AMF_BOOLEAN:
dst = C_AMF_EncodeBoolean(dst, p.p_vu.p_number != 0)
case AMF_STRING:
dst = C_AMF_EncodeString(dst, p.p_vu.p_aval)
case AMF_NULL:
if len(dst) < 2 {
return nil
}
dst[0] = AMF_NULL
dst = dst[1:]
case AMF_OBJECT:
dst = C_AMF_Encode(&p.p_vu.p_object, dst)
case AMF_ECMA_ARRAY:
dst = C_AMF_EncodeEcmaArray(&p.p_vu.p_object, dst)
case AMF_STRICT_ARRAY:
dst = C_AMF_EncodeArray(&p.p_vu.p_object, dst)
default:
log.Println("C_AMF_PropEncode: invalid type!")
dst = nil
}
return dst
}
// int AMFProp_Decode(C_AMFObjectProperty* prop, const char* pBuffer, int nSize, int bDecodeName);
// amf.c +619
func C_AMFProp_Decode(prop *C_AMFObjectProperty, data []byte, bDecodeName int32) int32 {
prop.p_name = ""
nOriginalSize := len(data)
if len(data) == 0 {
// TODO use new logger here
// RTMP_Log(RTMP_LOGDEBUG, "%s: Empty buffer/no buffer pointer!", __FUNCTION__);
return -1
}
if bDecodeName != 0 && len(data) < 4 {
// at least name (length + at least 1 byte) and 1 byte of data
// TODO use new logger here
// RTMP_Log(RTMP_LOGDEBUG, "%s: Not enough data for decoding with name, less than 4 bytes!",__FUNCTION__);
return -1
}
if bDecodeName != 0 {
nNameSize := C_AMF_DecodeInt16(data[:2])
if int(nNameSize) > len(data)-2 {
// TODO use new logger here
//RTMP_Log(RTMP_LOGDEBUG, "%s: Name size out of range: namesize (%d) > len (%d) - 2",__FUNCTION__, nNameSize, nSize);
return -1
}
prop.p_name = C_AMF_DecodeString(data)
data = data[2+nNameSize:]
}
if len(data) == 0 {
return -1
}
prop.p_type = C_AMFDataType(data[0])
data = data[1:]
var nRes int32
switch prop.p_type {
case AMF_NUMBER:
if len(data) < 8 {
return -1
}
prop.p_vu.p_number = C_AMF_DecodeNumber(data[:8])
data = data[8:]
case AMF_BOOLEAN:
panic("AMF_BOOLEAN not supported")
case AMF_STRING:
nStringSize := C_AMF_DecodeInt16(data[:2])
if len(data) < int(nStringSize+2) {
return -1
}
prop.p_vu.p_aval = C_AMF_DecodeString(data)
data = data[2+nStringSize:]
case AMF_OBJECT:
nRes := C_AMF_Decode(&prop.p_vu.p_object, data, 1)
if nRes == -1 {
return -1
}
data = data[nRes:]
case AMF_MOVIECLIP:
// TODO use new logger here
log.Println("AMFProp_Decode: MAF_MOVIECLIP reserved!")
//RTMP_Log(RTMP_LOGERROR, "AMF_MOVIECLIP reserved!");
return -1
case AMF_NULL, AMF_UNDEFINED, AMF_UNSUPPORTED:
prop.p_type = AMF_NULL
case AMF_REFERENCE:
// TODO use new logger here
log.Println("AMFProp_Decode: AMF_REFERENCE not supported!")
//RTMP_Log(RTMP_LOGERROR, "AMF_REFERENCE not supported!");
return -1
case AMF_ECMA_ARRAY:
// next comes the rest, mixed array has a final 0x000009 mark and names, so its an object
data = data[4:]
nRes = C_AMF_Decode(&prop.p_vu.p_object, data, 1)
if nRes == -1 {
return -1
}
data = data[nRes:]
case AMF_OBJECT_END:
return -1
case AMF_STRICT_ARRAY:
panic("AMF_STRICT_ARRAY not supported")
case AMF_DATE:
panic("AMF_DATE not supported")
case AMF_LONG_STRING, AMF_XML_DOC:
panic("AMF_LONG_STRING, AMF_XML_DOC not supported")
case AMF_RECORDSET:
// TODO use new logger here
log.Println("AMFProp_Decode: AMF_RECORDSET reserved!")
//RTMP_Log(RTMP_LOGERROR, "AMF_RECORDSET reserved!");
return -1
case AMF_TYPED_OBJECT:
// TODO use new logger here
// RTMP_Log(RTMP_LOGERROR, "AMF_TYPED_OBJECT not supported!")
return -1
case AMF_AVMPLUS:
panic("AMF_AVMPLUS not supported")
default:
// TODO use new logger here
//RTMP_Log(RTMP_LOGDEBUG, "%s - unknown datatype 0x%02x, @%p", __FUNCTION__,
//prop.p_type, pBuffer - 1);
return -1
}
return int32(nOriginalSize - len(data))
}
// void AMFProp_Reset(AMFObjectProperty* prop);
// amf.c +875
func C_AMFProp_Reset(prop *C_AMFObjectProperty) {
if prop.p_type == AMF_OBJECT || prop.p_type == AMF_ECMA_ARRAY ||
prop.p_type == AMF_STRICT_ARRAY {
C_AMF_Reset(&prop.p_vu.p_object)
} else {
prop.p_vu.p_aval = ""
}
prop.p_type = AMF_INVALID
}
// char* AMF_Encode(AMFObject* obj, char* pBuffer, char* pBufEnd);
// amf.c +891
func C_AMF_Encode(obj *C_AMFObject, dst []byte) []byte {
if len(dst) < 5 {
return nil
}
dst[0] = AMF_OBJECT
dst = dst[1:]
for i := 0; i < len(obj.o_props); i++ {
dst = C_AMF_PropEncode(&obj.o_props[i], dst)
if dst == nil {
log.Println("C_AMF_Encode: failed to encode property in index")
break
}
}
if len(dst) < 4 {
return nil
}
return C_AMF_EncodeInt24(dst, AMF_OBJECT_END)
}
// char* AMF_EncodeEcmaArray(AMFObject* obj, char* pBuffer, char* pBufEnd);
// amf.c +924
func C_AMF_EncodeEcmaArray(obj *C_AMFObject, dst []byte) []byte {
if len(dst) < 5 {
return nil
}
dst[0] = AMF_ECMA_ARRAY
dst = dst[1:]
binary.BigEndian.PutUint32(dst[:4], uint32(len(obj.o_props)))
dst = dst[4:]
for i := 0; i < len(obj.o_props); i++ {
dst = C_AMF_PropEncode(&obj.o_props[i], dst)
if dst == nil {
log.Println("C_AMF_EncodeEcmaArray: failed to encode property!")
break
}
}
if len(dst) < 4 {
return nil
}
return C_AMF_EncodeInt24(dst, AMF_OBJECT_END)
}
// char* AMF_EncodeArray(AMFObject* obj, char* pBuffer, char* pBufEnd);
// amf.c +959
func C_AMF_EncodeArray(obj *C_AMFObject, dst []byte) []byte {
if len(dst) < 5 {
return nil
}
dst[0] = AMF_STRICT_ARRAY
dst = dst[1:]
binary.BigEndian.PutUint32(dst[:4], uint32(len(obj.o_props)))
dst = dst[4:]
for i := 0; i < len(obj.o_props); i++ {
dst = C_AMF_PropEncode(&obj.o_props[i], dst)
if dst == nil {
log.Println("C_AMF_EncodeArray: failed to encode property!")
break
}
}
return dst
}
// int AMF_Decode(AMFObject *obj, const char* pBuffer, int nSize, int bDecodeName);
// amf.c +1180
func C_AMF_Decode(obj *C_AMFObject, data []byte, bDecodeName int32) int32 {
nOriginalSize := len(data)
obj.o_props = obj.o_props[:0]
for len(data) != 0 {
if len(data) >= 3 && C_AMF_DecodeInt24(data[:3]) == AMF_OBJECT_END {
data = data[3:]
break
}
var prop C_AMFObjectProperty
nRes := C_AMFProp_Decode(&prop, data, bDecodeName)
// nRes = int32(C.AMFProp_Decode(&prop, (*byte)(unsafe.Pointer(pBuffer)),
// int32(nSize), int32(bDecodeName)))
if nRes == -1 {
return -1
}
data = data[nRes:]
obj.o_props = append(obj.o_props, prop)
}
return int32(nOriginalSize - len(data))
}
// AMFObjectProperty* AMF_GetProp(AMFObject *obj, const AVal* name, int nIndex);
// amf.c + 1249
func C_AMF_GetProp(obj *C_AMFObject, name string, idx int32) *C_AMFObjectProperty {
if idx >= 0 {
if idx < int32(len(obj.o_props)) {
return &obj.o_props[idx]
}
} else {
for i, p := range obj.o_props {
if p.p_name == name {
return &obj.o_props[i]
}
}
}
return &AMFProp_Invalid
}
// void AMF_Reset(AMFObject* obj);
// amf.c +1282
func C_AMF_Reset(obj *C_AMFObject) {
for i := range obj.o_props {
C_AMFProp_Reset(&obj.o_props[i])
}
obj.o_props = obj.o_props[:0]
}
/*
// void AMF3CD_AddProp(AMF3ClassDef *cd, AVal *prop);
// amf.c +1298
func AMF3CD_AddProp(cd *C.AMF3ClassDef, prop *C_AVal) {
if cd.cd_num&0x0f == 0 {
cd.cd_props = (*C_AVal)(realloc(unsafe.Pointer(cd.cd_props), int(uintptr(cd.cd_num+16)*unsafe.Sizeof(C_AVal{}))))
}
*(*C_AVal)(incPtr(unsafe.Pointer(cd.cd_props), int(cd.cd_num), int(unsafe.Sizeof(C_AVal{})))) = *prop
cd.cd_num++
}
*/

498
rtmp/amf/amf.go Normal file
View File

@ -0,0 +1,498 @@
/*
NAME
amf.go
DESCRIPTION
Action Message Format (AMF) encoding/decoding functions.
See https://en.wikipedia.org/wiki/Action_Message_Format.
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Jake Lane <jake@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
amf.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
// Package amf implements Action Message Format (AMF) encoding and decoding.
// In AMF, encoding of numbers is big endian by default, unless specified otherwise,
// and numbers are all unsigned.
// See https://en.wikipedia.org/wiki/Action_Message_Format.
package amf
import (
"encoding/binary"
"errors"
"math"
)
// AMF data types, as defined by the AMF specification.
// NB: we export these sparingly.
const (
typeNumber = 0x00
typeBoolean = 0x01
typeString = 0x02
TypeObject = 0x03
typeMovieClip = 0x04
TypeNull = 0x05
typeUndefined = 0x06
typeReference = 0x07
typeEcmaArray = 0x08
TypeObjectEnd = 0x09
typeStrictArray = 0x0A
typeDate = 0x0B
typeLongString = 0x0C
typeUnsupported = 0x0D
typeRecordset = 0x0E
typeXmlDoc = 0x0F
typeTypedObject = 0x10
typeAvmplus = 0x11
typeInvalid = 0xff
)
// AMF represents an AMF object, which is simply a collection of properties.
type Object struct {
Properties []Property
}
// Property represents an AMF property, which is effectively a
// union. The Type is the AMF data type (uint8 per the specification),
// and specifies which member holds a value. Numeric types use
// Number, string types use String and arrays and objects use
// Object. The Name is optional.
type Property struct {
Type uint8
Name string
Number float64
String string
Object Object
}
// AMF errors:
var (
ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short.
ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder.
ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding.
ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found.
)
// DecodeInt16 decodes a 16-bit integer.
func DecodeInt16(buf []byte) uint16 {
return binary.BigEndian.Uint16(buf)
}
// DecodeInt24 decodes a 24-bit integer.
func DecodeInt24(buf []byte) uint32 {
return uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])
}
// DecodeInt32 decodes a 32-bit integer.
func DecodeInt32(buf []byte) uint32 {
return binary.BigEndian.Uint32(buf)
}
// DecodeInt32LE decodes a 32-bit little-endian integer.
func DecodeInt32LE(buf []byte) uint32 {
return binary.LittleEndian.Uint32(buf)
}
// DecodeString decodes a string that is less than 2^16 bytes long.
func DecodeString(buf []byte) string {
n := DecodeInt16(buf)
return string(buf[2 : 2+n])
}
// DecodeLongString decodes a long string.
func DecodeLongString(buf []byte) string {
n := DecodeInt32(buf)
return string(buf[2 : 2+n])
}
// DecodeNumber decodes a 64-bit floating-point number.
func DecodeNumber(buf []byte) float64 {
return math.Float64frombits(binary.BigEndian.Uint64(buf))
}
// DecodeBoolean decodes a boolean.
func DecodeBoolean(buf []byte) bool {
return buf[0] != 0
}
// EncodeInt24 encodes a 24-bit integer.
func EncodeInt24(buf []byte, val uint32) ([]byte, error) {
if len(buf) < 3 {
return nil, ErrShortBuffer
}
buf[0] = byte(val >> 16)
buf[1] = byte(val >> 8)
buf[2] = byte(val)
return buf[3:], nil
}
// EncodeInt32 encodes a 32-bit integer.
func EncodeInt32(buf []byte, val uint32) ([]byte, error) {
if len(buf) < 4 {
return nil, ErrShortBuffer
}
binary.BigEndian.PutUint32(buf, val)
return buf[4:], nil
}
// EncodeString encodes a string.
func EncodeString(buf []byte, val string) ([]byte, error) {
const typeSize = 1
if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) {
return nil, ErrShortBuffer
}
if len(val)+typeSize+binary.Size(uint32(0)) > len(buf) {
return nil, ErrShortBuffer
}
if len(val) < 65536 {
buf[0] = typeString
buf = buf[1:]
binary.BigEndian.PutUint16(buf[:2], uint16(len(val)))
buf = buf[2:]
copy(buf, val)
return buf[len(val):], nil
}
buf[0] = typeLongString
buf = buf[1:]
binary.BigEndian.PutUint32(buf[:4], uint32(len(val)))
buf = buf[4:]
copy(buf, val)
return buf[len(val):], nil
}
// EncodeNumber encodes a 64-bit floating-point number.
func EncodeNumber(buf []byte, val float64) ([]byte, error) {
if len(buf) < 9 {
return nil, ErrShortBuffer
}
buf[0] = typeNumber
buf = buf[1:]
binary.BigEndian.PutUint64(buf, math.Float64bits(val))
return buf[8:], nil
}
// EncodeBoolean encodes a boolean.
func EncodeBoolean(buf []byte, val bool) ([]byte, error) {
if len(buf) < 2 {
return nil, ErrShortBuffer
}
buf[0] = typeBoolean
if val {
buf[1] = 1
} else {
buf[1] = 0
}
return buf[2:], nil
}
// EncodeNamedString encodes a named string, where key is the name and val is the string value.
func EncodeNamedString(buf []byte, key, val string) ([]byte, error) {
if 2+len(key) > len(buf) {
return nil, ErrShortBuffer
}
binary.BigEndian.PutUint16(buf[:2], uint16(len(key)))
buf = buf[2:]
copy(buf, key)
return EncodeString(buf[len(key):], val)
}
// EncodeNamedNumber encodes a named number, where key is the name and val is the number value.
func EncodeNamedNumber(buf []byte, key string, val float64) ([]byte, error) {
if 2+len(key) > len(buf) {
return nil, ErrShortBuffer
}
binary.BigEndian.PutUint16(buf[:2], uint16(len(key)))
buf = buf[2:]
copy(buf, key)
return EncodeNumber(buf[len(key):], val)
}
// EncodeNamedNumber encodes a named boolean, where key is the name and val is the boolean value.
func EncodeNamedBoolean(buf []byte, key string, val bool) ([]byte, error) {
if 2+len(key) > len(buf) {
return nil, ErrShortBuffer
}
binary.BigEndian.PutUint16(buf[:2], uint16(len(key)))
buf = buf[2:]
copy(buf, key)
return EncodeBoolean(buf[len(key):], val)
}
// EncodeProperty encodes a property.
func EncodeProperty(prop *Property, buf []byte) ([]byte, error) {
if prop.Type != TypeNull && prop.Name != "" {
if len(buf) < 2+len(prop.Name) {
return nil, ErrShortBuffer
}
binary.BigEndian.PutUint16(buf[:2], uint16(len(prop.Name)))
buf = buf[2:]
copy(buf, prop.Name)
buf = buf[len(prop.Name):]
}
switch prop.Type {
case typeNumber:
return EncodeNumber(buf, prop.Number)
case typeBoolean:
return EncodeBoolean(buf, prop.Number != 0)
case typeString:
return EncodeString(buf, prop.String)
case TypeNull:
if len(buf) < 2 {
return nil, ErrShortBuffer
}
buf[0] = TypeNull
buf = buf[1:]
case TypeObject:
return Encode(&prop.Object, buf)
case typeEcmaArray:
return EncodeEcmaArray(&prop.Object, buf)
case typeStrictArray:
return EncodeArray(&prop.Object, buf)
default:
return nil, ErrInvalidType
}
return buf, nil
}
// DecodeProperty decodes a property, returning the number of bytes consumed from the supplied buffer.
func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) {
sz := len(buf)
if decodeName {
if len(buf) < 4 {
return 0, ErrShortBuffer
}
n := DecodeInt16(buf[:2])
if int(n) > len(buf)-2 {
return 0, ErrShortBuffer
}
prop.Name = DecodeString(buf)
buf = buf[2+n:]
} else {
prop.Name = ""
}
prop.Type = buf[0]
buf = buf[1:]
switch prop.Type {
case typeNumber:
if len(buf) < 8 {
return 0, ErrShortBuffer
}
prop.Number = DecodeNumber(buf[:8])
buf = buf[8:]
case typeBoolean:
if len(buf) < 1 {
return 0, ErrShortBuffer
}
prop.Number = float64(buf[0])
buf = buf[1:]
case typeString:
n := DecodeInt16(buf[:2])
if len(buf) < int(n+2) {
return 0, ErrShortBuffer
}
prop.String = DecodeString(buf)
buf = buf[2+n:]
case TypeObject:
n, err := Decode(&prop.Object, buf, true)
if err != nil {
return 0, err
}
buf = buf[n:]
case TypeNull, typeUndefined, typeUnsupported:
prop.Type = TypeNull
case typeEcmaArray:
buf = buf[4:]
n, err := Decode(&prop.Object, buf, true)
if err != nil {
return 0, err
}
buf = buf[n:]
default:
return 0, ErrUnexpectedType
}
return sz - len(buf), nil
}
// Encode encodes an Object into its AMF representation.
// This is the top-level encoding function and is typically the only function callers will need to use.
func Encode(obj *Object, buf []byte) ([]byte, error) {
if len(buf) < 5 {
return nil, ErrShortBuffer
}
buf[0] = TypeObject
buf = buf[1:]
for i := 0; i < len(obj.Properties); i++ {
var err error
buf, err = EncodeProperty(&obj.Properties[i], buf)
if err != nil {
return nil, err
}
}
if len(buf) < 3 {
return nil, ErrShortBuffer
}
return EncodeInt24(buf, TypeObjectEnd)
}
// EncodeEcmaArray encodes an ECMA array.
func EncodeEcmaArray(obj *Object, buf []byte) ([]byte, error) {
if len(buf) < 5 {
return nil, ErrShortBuffer
}
buf[0] = typeEcmaArray
buf = buf[1:]
binary.BigEndian.PutUint32(buf[:4], uint32(len(obj.Properties)))
buf = buf[4:]
for i := 0; i < len(obj.Properties); i++ {
var err error
buf, err = EncodeProperty(&obj.Properties[i], buf)
if err != nil {
return nil, err
}
}
if len(buf) < 3 {
return nil, ErrShortBuffer
}
return EncodeInt24(buf, TypeObjectEnd)
}
// EncodeArray encodes an array.
func EncodeArray(obj *Object, buf []byte) ([]byte, error) {
if len(buf) < 5 {
return nil, ErrShortBuffer
}
buf[0] = typeStrictArray
buf = buf[1:]
binary.BigEndian.PutUint32(buf[:4], uint32(len(obj.Properties)))
buf = buf[4:]
for i := 0; i < len(obj.Properties); i++ {
var err error
buf, err = EncodeProperty(&obj.Properties[i], buf)
if err != nil {
return nil, err
}
}
return buf, nil
}
// Decode decodes an object. Property names are only decoded if decodeName is true.
func Decode(obj *Object, buf []byte, decodeName bool) (int, error) {
sz := len(buf)
obj.Properties = obj.Properties[:0]
for len(buf) != 0 {
if len(buf) >= 3 && DecodeInt24(buf[:3]) == TypeObjectEnd {
buf = buf[3:]
break
}
var prop Property
n, err := DecodeProperty(&prop, buf, decodeName)
if err != nil {
return 0, err
}
buf = buf[n:]
obj.Properties = append(obj.Properties, prop)
}
return sz - len(buf), nil
}
// Object methods:
// Property returns a property, either by its index when idx is non-negative, or by its name otherwise.
// If the requested property is not found or the type does not match, an ErrPropertyNotFound error is returned.
func (obj *Object) Property(name string, idx int, typ uint8) (*Property, error) {
var prop *Property
if idx >= 0 {
if idx < len(obj.Properties) {
prop = &obj.Properties[idx]
}
} else {
for i, p := range obj.Properties {
if p.Name == name {
prop = &obj.Properties[i]
break
}
}
}
if prop == nil || prop.Type != typ {
return nil, ErrPropertyNotFound
}
return prop, nil
}
// NumberProperty is a wrapper for Property that returns a Number property's value, if any.
func (obj *Object) NumberProperty(name string, idx int) (float64, error) {
prop, err := obj.Property(name, idx, typeNumber)
if err != nil {
return 0, err
}
return prop.Number, nil
}
// StringProperty is a wrapper for Property that returns a String property's value, if any.
func (obj *Object) StringProperty(name string, idx int) (string, error) {
prop, err := obj.Property(name, idx, typeString)
if err != nil {
return "", err
}
return prop.String, nil
}
// ObjectProperty is a wrapper for Property that returns an Object property's value, if any.
func (obj *Object) ObjectProperty(name string, idx int) (*Object, error) {
prop, err := obj.Property(name, idx, TypeObject)
if err != nil {
return nil, err
}
return &prop.Object, nil
}

308
rtmp/amf/amf_test.go Normal file
View File

@ -0,0 +1,308 @@
/*
NAME
amf_test.go
DESCRIPTION
AMF test suite.
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
amf_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package amf
import (
"testing"
)
// Test data.
var testStrings = [...]string{
"",
"foo",
"bar",
"bazz",
}
var testNumbers = [...]uint32{
0,
1,
0xababab,
0xffffff,
}
// TestSanity checks that we haven't accidentally changed constants.
func TestSanity(t *testing.T) {
if TypeObjectEnd != 0x09 {
t.Errorf("TypeObjectEnd has wrong value; got %d, expected %d", TypeObjectEnd, 0x09)
}
}
// TestStrings tests string encoding and decoding.
func TestStrings(t *testing.T) {
// Short string encoding is as follows:
// enc[0] = data type (typeString)
// end[1:3] = size
// enc[3:] = data
for _, s := range testStrings {
buf := make([]byte, len(s)+7)
_, err := EncodeString(buf, s)
if err != nil {
t.Errorf("EncodeString failed")
}
if buf[0] != typeString {
t.Errorf("Expected typeString, got %v", buf[0])
}
ds := DecodeString(buf[1:])
if s != ds {
t.Errorf("DecodeString did not produce original string, got %v", ds)
}
}
// Long string encoding is as follows:
// enc[0] = data type (typeString)
// end[1:5] = size
// enc[5:] = data
s := string(make([]byte, 65536))
buf := make([]byte, len(s)+7)
_, err := EncodeString(buf, s)
if err != nil {
t.Errorf("EncodeString failed")
}
if buf[0] != typeLongString {
t.Errorf("Expected typeLongString, got %v", buf[0])
}
ds := DecodeLongString(buf[1:])
if s != ds {
t.Errorf("DecodeLongString did not produce original string, got %v", ds)
}
}
// TestNumbers tests number encoding and encoding.
func TestNumbers(t *testing.T) {
for _, n := range testNumbers {
buf := make([]byte, 4) // NB: encoder requires an extra byte for some reason
_, err := EncodeInt24(buf, n)
if err != nil {
t.Errorf("EncodeInt24 failed")
}
dn := DecodeInt24(buf)
if n != dn {
t.Errorf("DecodeInt24 did not produce original Number, got %v", dn)
}
_, err = EncodeInt32(buf, n)
if err != nil {
t.Errorf("EncodeInt32 failed")
}
dn = DecodeInt32(buf)
if n != dn {
t.Errorf("DecodeInt32 did not produce original Number, got %v", dn)
}
}
}
// TestProperties tests encoding and decoding of properties.
func TestProperties(t *testing.T) {
var buf [1024]byte
// Encode/decode Number properties.
enc := buf[:]
var err error
for i := range testNumbers {
enc, err = EncodeProperty(&Property{Type: typeNumber, Number: float64(testNumbers[i])}, enc)
if err != nil {
t.Errorf("EncodeProperty of Number failed")
}
}
prop := Property{}
dec := buf[:]
for i := range testNumbers {
n, err := DecodeProperty(&prop, dec, false)
if err != nil {
t.Errorf("DecodeProperty of Number failed")
}
if prop.Number != float64(testNumbers[i]) {
t.Errorf("EncodeProperty/DecodeProperty returned wrong Number; got %v, expected %v", int32(prop.Number), testNumbers[i])
}
dec = dec[n:]
}
// Encode/decode string properties.
enc = buf[:]
for i := range testStrings {
enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc)
if err != nil {
t.Errorf("EncodeProperty of string failed")
}
}
prop = Property{}
dec = buf[:]
for i := range testStrings {
n, err := DecodeProperty(&prop, dec, false)
if err != nil {
t.Errorf("DecodeProperty of string failed")
}
if prop.String != testStrings[i] {
t.Errorf("EncodeProperty/DecodeProperty returned wrong string; got %s, expected %s", prop.String, testStrings[i])
}
dec = dec[n:]
}
}
// TestObject tests encoding and decoding of objects.
func TestObject(t *testing.T) {
var buf [1024]byte
// Construct a simple object that has one property, the Number 42.
prop1 := Property{Type: typeNumber, Number: 42}
obj1 := Object{}
obj1.Properties = append(obj1.Properties, prop1)
// Encode it
enc := buf[:]
var err error
enc, err = Encode(&obj1, enc)
if err != nil {
t.Errorf("Encode of object failed")
}
// Check the encoding
if buf[0] != TypeObject {
t.Errorf("Encoded wrong type; expected %d, got %v", TypeObject, buf[0])
}
if buf[1] != typeNumber {
t.Errorf("Encoded wrong type; expected %d, got %v", typeNumber, buf[0])
}
num := DecodeNumber(buf[2:10])
if num != 42 {
t.Errorf("Encoded wrong Number")
}
end := int32(DecodeInt24(buf[10:13]))
if end != TypeObjectEnd {
t.Errorf("Did not encode TypeObjectEnd")
}
// Decode it
dec := buf[1:]
var dobj1 Object
_, err = Decode(&dobj1, dec, false)
if err != nil {
t.Errorf("Decode of object failed")
}
// Change the object's property to a named boolean.
obj1.Properties[0].Name = "on"
obj1.Properties[0].Type = typeBoolean
obj1.Properties[0].Number = 1
// Re-encode it
enc = buf[:]
enc, err = Encode(&obj1, enc)
if err != nil {
t.Errorf("Encode of object failed")
}
// Decode it, this time with named set to true.
dec = buf[1:]
_, err = Decode(&dobj1, dec, true)
if err != nil {
t.Errorf("Decode of object failed with error: %v", err)
}
if dobj1.Properties[0].Number != 1 {
t.Errorf("Decoded wrong boolean value")
}
// Construct a more complicated object that includes a nested object.
var obj2 Object
for i := range testStrings {
obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]})
obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])})
}
obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1})
obj2.Properties = append(obj2.Properties, Property{Type: typeBoolean, Number: 1})
// Retrieve nested object
obj, err := obj2.ObjectProperty("", 8)
if err != err {
t.Errorf("Failed to retrieve object")
}
if len(obj.Properties) < 1 {
t.Errorf("Properties missing for nested object")
}
if obj.Properties[0].Type != typeBoolean {
t.Errorf("Wrong property type for nested object")
}
// Encode it.
enc = buf[:]
enc, err = Encode(&obj2, enc)
if err != nil {
t.Errorf("Encode of object failed")
}
// Decode it.
dec = buf[1:]
var dobj2 Object
_, err = Decode(&dobj2, dec, false)
if err != nil {
t.Errorf("Decode of object failed with error: %v", err)
}
// Find some properties that exist.
s, err := obj2.StringProperty("", 2)
if err != nil {
t.Errorf("StringProperty failed")
}
if s != "foo" {
t.Errorf("StringProperty returned wrong String")
}
n, err := obj2.NumberProperty("", 3)
if err != nil {
t.Errorf("NumberProperty failed")
}
if n != 1 {
t.Errorf("NumberProperty returned wrong Number")
}
obj, err = obj2.ObjectProperty("", 8)
if err != nil {
t.Errorf("ObjectProperty failed")
return
}
if obj.Properties[0].Type != typeBoolean {
t.Errorf("ObjectProperty returned object with wrong property")
}
prop, err := obj2.Property("", 9, typeBoolean)
if err != nil {
t.Errorf("Property failed")
return
}
if prop.Number != 1 {
t.Errorf("Property returned wrong Property")
}
// Try to find one that doesn't exist.
prop, err = obj2.Property("", 10, TypeObject)
if err != ErrPropertyNotFound {
t.Errorf("Property(10) failed")
}
}

View File

@ -1,81 +0,0 @@
/*
NAME
amf_headers.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
amf_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
const (
AMF_NUMBER = iota
AMF_BOOLEAN
AMF_STRING
AMF_OBJECT
AMF_MOVIECLIP /* reserved, not used */
AMF_NULL
AMF_UNDEFINED
AMF_REFERENCE
AMF_ECMA_ARRAY
AMF_OBJECT_END
AMF_STRICT_ARRAY
AMF_DATE
AMF_LONG_STRING
AMF_UNSUPPORTED
AMF_RECORDSET /* reserved, not used */
AMF_XML_DOC
AMF_TYPED_OBJECT
AMF_AVMPLUS /* switch to AMF3 */
AMF_INVALID = 0xff
)
// typedef enum
// amf.h +40
type C_AMFDataType int32
// typedef struct AMF_Object
// amf.h +67
type C_AMFObject struct {
o_props []C_AMFObjectProperty
}
// typedef struct P_vu
// amf.h +73
type P_vu struct {
p_number float64
p_aval string
p_object C_AMFObject
}
// typedef struct AMFObjectProperty
// amf.h +79
type C_AMFObjectProperty struct {
p_name string
p_type C_AMFDataType
p_vu P_vu
p_UTCoffset int16
}

View File

@ -37,6 +37,8 @@ package rtmp
import ( import (
"encoding/binary" "encoding/binary"
"io" "io"
"bitbucket.org/ausocean/av/rtmp/amf"
) )
// Packet types. // Packet types.
@ -89,20 +91,12 @@ type packet struct {
info int32 info int32
bodySize uint32 bodySize uint32
bytesRead uint32 bytesRead uint32
chunk *chunk
header []byte header []byte
body []byte body []byte
} }
// chunk defines an RTMP packet chunk. // readFrom reads a packet from the RTMP connection.
type chunk struct { func (pkt *packet) readFrom(s *Session) error {
headerSize int32
data []byte
header [fullHeaderSize]byte
}
// read reads a packet.
func (pkt *packet) read(s *Session) error {
var hbuf [fullHeaderSize]byte var hbuf [fullHeaderSize]byte
header := hbuf[:] header := hbuf[:]
@ -182,16 +176,15 @@ func (pkt *packet) read(s *Session) error {
hSize := len(hbuf) - len(header) + size hSize := len(hbuf) - len(header) + size
if size >= 3 { if size >= 3 {
pkt.timestamp = C_AMF_DecodeInt24(header[:3]) pkt.timestamp = amf.DecodeInt24(header[:3])
if size >= 6 { if size >= 6 {
pkt.bodySize = C_AMF_DecodeInt24(header[3:6]) pkt.bodySize = amf.DecodeInt24(header[3:6])
pkt.bytesRead = 0 pkt.bytesRead = 0
if size > 6 { if size > 6 {
pkt.packetType = header[6] pkt.packetType = header[6]
if size == 11 { if size == 11 {
pkt.info = decodeInt32LE(header[7:11]) pkt.info = int32(amf.DecodeInt32LE(header[7:11]))
} }
} }
} }
@ -204,8 +197,7 @@ func (pkt *packet) read(s *Session) error {
s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error()) s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error())
return err return err
} }
// TODO: port this pkt.timestamp = amf.DecodeInt32(header[size : size+4])
pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4])
hSize += 4 hSize += 4
} }
@ -213,20 +205,13 @@ func (pkt *packet) read(s *Session) error {
pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6)
} }
toRead := int32(pkt.bodySize - pkt.bytesRead) toRead := pkt.bodySize - pkt.bytesRead
chunkSize := s.inChunkSize chunkSize := s.inChunkSize
if toRead < chunkSize { if toRead < chunkSize {
chunkSize = toRead chunkSize = toRead
} }
if pkt.chunk != nil {
panic("non-nil chunk")
pkt.chunk.headerSize = int32(hSize)
copy(pkt.chunk.header[:], hbuf[:hSize])
pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)]
}
_, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize]) _, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize])
if err != nil { if err != nil {
s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
@ -235,7 +220,7 @@ func (pkt *packet) read(s *Session) error {
pkt.bytesRead += uint32(chunkSize) pkt.bytesRead += uint32(chunkSize)
// keep the packet as ref for other packets on this channel // Keep the packet as a reference for other packets on this channel.
if s.channelsIn[pkt.channel] == nil { if s.channelsIn[pkt.channel] == nil {
s.channelsIn[pkt.channel] = &packet{} s.channelsIn[pkt.channel] = &packet{}
} }
@ -245,12 +230,8 @@ func (pkt *packet) read(s *Session) error {
s.channelsIn[pkt.channel].timestamp = 0xffffff s.channelsIn[pkt.channel].timestamp = 0xffffff
} }
if pkt.bytesRead != pkt.bodySize {
panic("readPacket: bytesRead != bodySize")
}
if !pkt.hasAbsTimestamp { if !pkt.hasAbsTimestamp {
// timestamps seem to always be relative // Timestamps seem to always be relative.
pkt.timestamp += uint32(s.channelTimestamp[pkt.channel]) pkt.timestamp += uint32(s.channelTimestamp[pkt.channel])
} }
s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) s.channelTimestamp[pkt.channel] = int32(pkt.timestamp)
@ -262,6 +243,7 @@ func (pkt *packet) read(s *Session) error {
} }
// resize adjusts the packet's storage to accommodate a body of the given size and header type. // resize adjusts the packet's storage to accommodate a body of the given size and header type.
// When headerSizeAuto is specified, the header type is computed based on packet type.
func (pkt *packet) resize(size uint32, ht uint8) { func (pkt *packet) resize(size uint32, ht uint8) {
buf := make([]byte, fullHeaderSize+size) buf := make([]byte, fullHeaderSize+size)
pkt.header = buf pkt.header = buf
@ -285,10 +267,12 @@ func (pkt *packet) resize(size uint32, ht uint8) {
} }
} }
// write sends a packet. // writeTo writes a packet to the RTMP connection.
// Packets are written in chunks which are Session.chunkSize in length (128 bytes in length).
// We defer sending small audio packets and combine consecutive small audio packets where possible to reduce I/O.
// When queue is true, we expect a response to this request and cache the method on s.methodCalls. // When queue is true, we expect a response to this request and cache the method on s.methodCalls.
func (pkt *packet) write(s *Session, queue bool) error { func (pkt *packet) writeTo(s *Session, queue bool) error {
if pkt.body == nil { if pkt.body == nil || pkt.bodySize == 0 {
return errInvalidBody return errInvalidBody
} }
@ -314,7 +298,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
prevPkt := s.channelsOut[pkt.channel] prevPkt := s.channelsOut[pkt.channel]
var last int var last int
if prevPkt != nil && pkt.headerType != headerSizeLarge { if prevPkt != nil && pkt.headerType != headerSizeLarge {
// compress a bit by using the prev packet's attributes // Compress header by using the previous packet's attributes.
if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium { if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium {
pkt.headerType = headerSizeSmall pkt.headerType = headerSizeSmall
} }
@ -337,7 +321,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
hSize := headerSizes[pkt.headerType] hSize := headerSizes[pkt.headerType]
origIdx := fullHeaderSize - hSize origIdx := fullHeaderSize - hSize
// adjust 1 or 2 bytes for the channel // Adjust 1 or 2 bytes depending on the channel.
cSize := 0 cSize := 0
switch { switch {
case pkt.channel > 319: case pkt.channel > 319:
@ -351,7 +335,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
hSize += cSize hSize += cSize
} }
// adjust 4 bytes for the timestamp // Adjust 4 bytes for the timestamp.
var ts uint32 var ts uint32
if prevPkt != nil { if prevPkt != nil {
ts = uint32(int(pkt.timestamp) - last) ts = uint32(int(pkt.timestamp) - last)
@ -388,16 +372,16 @@ func (pkt *packet) write(s *Session, queue bool) error {
} }
if headerSizes[pkt.headerType] > 1 { if headerSizes[pkt.headerType] > 1 {
res := ts tmp := ts
if ts > 0xffffff { if ts > 0xffffff {
res = 0xffffff tmp = 0xffffff
} }
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res)) amf.EncodeInt24(headBytes[headerIdx:], tmp)
headerIdx += 3 // 24bits headerIdx += 3 // 24bits
} }
if headerSizes[pkt.headerType] > 4 { if headerSizes[pkt.headerType] > 4 {
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) amf.EncodeInt24(headBytes[headerIdx:], pkt.bodySize)
headerIdx += 3 // 24bits headerIdx += 3 // 24bits
headBytes[headerIdx] = pkt.packetType headBytes[headerIdx] = pkt.packetType
headerIdx++ headerIdx++
@ -409,7 +393,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
} }
if ts >= 0xffffff { if ts >= 0xffffff {
C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts)) amf.EncodeInt32(headBytes[headerIdx:], ts)
headerIdx += 4 // 32bits headerIdx += 4 // 32bits
} }
@ -436,7 +420,6 @@ func (pkt *packet) write(s *Session, queue bool) error {
} }
// TODO(kortschak): Rewrite this horrific peice of premature optimisation. // TODO(kortschak): Rewrite this horrific peice of premature optimisation.
// NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size.
s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size) s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size)
for size+hSize != 0 { for size+hSize != 0 {
if chunkSize > size { if chunkSize > size {
@ -459,6 +442,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
hSize = 0 hSize = 0
if size > 0 { if size > 0 {
// We are writing the 2nd or subsequent chunk.
origIdx -= 1 + cSize origIdx -= 1 + cSize
hSize = 1 + cSize hSize = 1 + cSize
@ -479,23 +463,20 @@ func (pkt *packet) write(s *Session, queue bool) error {
} }
if ts >= 0xffffff { if ts >= 0xffffff {
extendedTimestamp := headBytes[origIdx+1+cSize:] extendedTimestamp := headBytes[origIdx+1+cSize:]
C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts)) amf.EncodeInt32(extendedTimestamp[:4], ts)
} }
} }
} }
// We invoked a remote method // If we invoked a remote method and queue is true, we queue the method until the result arrives.
if pkt.packetType == packetTypeInvoke { if pkt.packetType == packetTypeInvoke && queue {
buf := pkt.body[1:] buf := pkt.body[1:]
meth := C_AMF_DecodeString(buf) meth := amf.DecodeString(buf)
s.log(DebugLevel, pkg+"invoking method "+meth) s.log(DebugLevel, pkg+"queuing method "+meth)
// keep it in call queue till result arrives
if queue {
buf = buf[3+len(meth):] buf = buf[3+len(meth):]
txn := int32(C_AMF_DecodeNumber(buf[:8])) txn := int32(amf.DecodeNumber(buf[:8]))
s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})
} }
}
if s.channelsOut[pkt.channel] == nil { if s.channelsOut[pkt.channel] == nil {
s.channelsOut[pkt.channel] = &packet{} s.channelsOut[pkt.channel] = &packet{}
@ -504,12 +485,3 @@ func (pkt *packet) write(s *Session, queue bool) error {
return nil return nil
} }
func decodeInt32LE(data []byte) int32 {
return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0])
}
func encodeInt32LE(dst []byte, v int32) int32 {
binary.LittleEndian.PutUint32(dst, uint32(v))
return 4
}

View File

@ -42,11 +42,12 @@ import (
"net" "net"
"strconv" "strconv"
"time" "time"
"bitbucket.org/ausocean/av/rtmp/amf"
) )
const ( const (
pkg = "rtmp:" pkg = "rtmp:"
minDataSize = 11 // ToDo: this should be the same as fullHeaderSize
signatureSize = 1536 signatureSize = 1536
fullHeaderSize = 12 fullHeaderSize = 12
) )
@ -151,35 +152,25 @@ var rtmpProtocolStrings = [...]string{
// RTMP errors. // RTMP errors.
var ( var (
errUnknownScheme = errors.New("rtmp: unknown scheme") errUnknownScheme = errors.New("rtmp: unknown scheme")
errInvalidURL = errors.New("rtmp: invalid URL")
errConnected = errors.New("rtmp: already connected") errConnected = errors.New("rtmp: already connected")
errNotConnected = errors.New("rtmp: not connected") errNotConnected = errors.New("rtmp: not connected")
errNotWritable = errors.New("rtmp: connection not writable") errNotWritable = errors.New("rtmp: connection not writable")
errHandshake = errors.New("rtmp: handshake failed")
errConnSend = errors.New("rtmp: connection send error")
errConnStream = errors.New("rtmp: connection stream error")
errInvalidHeader = errors.New("rtmp: invalid header") errInvalidHeader = errors.New("rtmp: invalid header")
errInvalidBody = errors.New("rtmp: invalid body") errInvalidBody = errors.New("rtmp: invalid body")
errTinyPacket = errors.New("rtmp: packet too small") errInvalidFlvTag = errors.New("rtmp: invalid FLV tag")
errEncoding = errors.New("rtmp: encoding error")
errDecoding = errors.New("rtmp: decoding error")
errUnimplemented = errors.New("rtmp: unimplemented feature") errUnimplemented = errors.New("rtmp: unimplemented feature")
) )
// setupURL parses the RTMP URL. // init initialises the Session link
func setupURL(s *Session) (err error) { func (s *Session) init() (err error) {
s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(s.url) s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(s.url)
if err != nil { if err != nil {
return err return err
} }
if s.link.app == "" {
if s.link.tcUrl == "" { return errInvalidURL
if s.link.app != "" {
s.link.tcUrl = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app
} else {
s.link.tcUrl = s.url
} }
}
if s.link.port == 0 { if s.link.port == 0 {
switch { switch {
case (s.link.protocol & featureSSL) != 0: case (s.link.protocol & featureSSL) != 0:
@ -191,6 +182,8 @@ func setupURL(s *Session) (err error) {
s.link.port = 1935 s.link.port = 1935
} }
} }
s.link.url = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app
s.link.protocol |= featureWrite
return nil return nil
} }
@ -209,13 +202,13 @@ func connect(s *Session) error {
err = handshake(s) err = handshake(s)
if err != nil { if err != nil {
s.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) s.log(WarnLevel, pkg+"handshake failed", "error", err.Error())
return errHandshake return err
} }
s.log(DebugLevel, pkg+"handshaked") s.log(DebugLevel, pkg+"handshaked")
err = sendConnectPacket(s) err = sendConnectPacket(s)
if err != nil { if err != nil {
s.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) s.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error())
return errConnSend return err
} }
return nil return nil
} }
@ -225,7 +218,7 @@ func connectStream(s *Session) error {
var err error var err error
for !s.isPlaying { for !s.isPlaying {
pkt := packet{} pkt := packet{}
err = pkt.read(s) err = pkt.readFrom(s)
if err != nil { if err != nil {
break break
} }
@ -248,53 +241,39 @@ func connectStream(s *Session) error {
} }
// handlePacket handles a packet that the client has received. // handlePacket handles a packet that the client has received.
// NB: cases have been commented out that are not currently used by AusOcean // NB: Unsupported packet types are logged fatally.
func handlePacket(s *Session, pkt *packet) error { func handlePacket(s *Session, pkt *packet) error {
switch pkt.packetType { if pkt.bodySize < 4 {
case packetTypeChunkSize: return errInvalidBody
if pkt.bodySize >= 4 {
s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4]))
} }
case packetTypeBytesReadReport: switch pkt.packetType {
s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case packetTypeChunkSize:
s.inChunkSize = amf.DecodeInt32(pkt.body[:4])
case packetTypeControl: case packetTypeBytesReadReport:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl") s.serverBW = amf.DecodeInt32(pkt.body[:4])
case packetTypeServerBW: case packetTypeServerBW:
s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) s.serverBW = amf.DecodeInt32(pkt.body[:4])
case packetTypeClientBW: case packetTypeClientBW:
s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) s.clientBW = amf.DecodeInt32(pkt.body[:4])
if pkt.bodySize > 4 { if pkt.bodySize > 4 {
s.clientBW2 = pkt.body[4] s.clientBW2 = pkt.body[4]
} else { } else {
s.clientBW2 = 0xff s.clientBW2 = 0xff
} }
case packetTypeAudio:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeAudio")
case packetTypeVideo:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeVideo")
case packetTypeFlexMessage:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeFlexMessage")
case packetTypeInfo:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeInfo")
case packetTypeInvoke: case packetTypeInvoke:
err := handleInvoke(s, pkt.body[:pkt.bodySize]) err := handleInvoke(s, pkt.body[:pkt.bodySize])
if err != nil { if err != nil {
// This will never happen with the methods we implement.
s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error())
return err return err
} }
case packetTypeFlashVideo: case packetTypeControl, packetTypeAudio, packetTypeVideo, packetTypeFlashVideo, packetTypeFlexMessage, packetTypeInfo:
s.log(FatalLevel, pkg+"unsupported packet type packetType_FLASHVideo") s.log(FatalLevel, pkg+"unsupported packet type "+strconv.Itoa(int(pkt.packetType)))
default: default:
s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType)
@ -313,110 +292,50 @@ func sendConnectPacket(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = C_AMF_EncodeString(enc, avConnect) enc, err := amf.EncodeString(enc, avConnect)
if enc == nil { if err != nil {
return errEncoding return err
} }
s.numInvokes += 1 s.numInvokes += 1
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if err != nil {
return errEncoding return err
} }
enc[0] = AMF_OBJECT
enc[0] = amf.TypeObject
enc = enc[1:] enc = enc[1:]
enc, err = amf.EncodeNamedString(enc, avApp, s.link.app)
enc = C_AMF_EncodeNamedString(enc, avApp, s.link.app) if err != nil {
if enc == nil { return err
return errEncoding
} }
if s.link.protocol&featureWrite != 0 { enc, err = amf.EncodeNamedString(enc, avType, avNonprivate)
enc = C_AMF_EncodeNamedString(enc, avType, avNonprivate) if err != nil {
if enc == nil { return err
return errEncoding
} }
enc, err = amf.EncodeNamedString(enc, avTcUrl, s.link.url)
if err != nil {
return err
}
enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd)
if err != nil {
return err
} }
if s.link.flashVer != "" { // add auth string, if any
enc = C_AMF_EncodeNamedString(enc, avFlashver, s.link.flashVer)
if enc == nil {
return errEncoding
}
}
if s.link.swfUrl != "" {
enc = C_AMF_EncodeNamedString(enc, avSwfUrl, s.link.swfUrl)
if enc == nil {
return errEncoding
}
}
if s.link.tcUrl != "" {
enc = C_AMF_EncodeNamedString(enc, avTcUrl, s.link.tcUrl)
if enc == nil {
return errEncoding
}
}
if s.link.protocol&featureWrite == 0 {
enc = C_AMF_EncodeNamedBoolean(enc, avFpad, false)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, avCapabilities, 15)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, avVideoFunction, 1)
if enc == nil {
return errEncoding
}
if s.link.pageUrl != "" {
enc = C_AMF_EncodeNamedString(enc, avPageUrl, s.link.pageUrl)
if enc == nil {
return errEncoding
}
}
}
if s.encoding != 0.0 || s.sendEncoding {
enc = C_AMF_EncodeNamedNumber(enc, avObjectEncoding, s.encoding)
if enc == nil {
return errEncoding
}
}
copy(enc, []byte{0, 0, AMF_OBJECT_END})
enc = enc[3:]
// add auth string
if s.link.auth != "" { if s.link.auth != "" {
enc = C_AMF_EncodeBoolean(enc, s.link.flags&linkAuth != 0) enc, err = amf.EncodeBoolean(enc, s.link.flags&linkAuth != 0)
if enc == nil { if err != nil {
return errEncoding return err
} }
enc = C_AMF_EncodeString(enc, s.link.auth) enc, err = amf.EncodeString(enc, s.link.auth)
if enc == nil { if err != nil {
return errEncoding return err
}
}
for i := range s.link.extras.o_props {
enc = C_AMF_PropEncode(&s.link.extras.o_props[i], enc)
if enc == nil {
return errEncoding
} }
} }
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, true) // response expected return pkt.writeTo(s, true) // response expected
} }
func sendCreateStream(s *Session) error { func sendCreateStream(s *Session) error {
@ -430,21 +349,21 @@ func sendCreateStream(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = C_AMF_EncodeString(enc, avCreatestream) enc, err := amf.EncodeString(enc, avCreatestream)
if enc == nil { if err != nil {
return errEncoding return err
} }
s.numInvokes++ s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if err != nil {
return errEncoding return err
} }
enc[0] = AMF_NULL enc[0] = amf.TypeNull
enc = enc[1:] enc = enc[1:]
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, true) // response expected return pkt.writeTo(s, true) // response expected
} }
func sendReleaseStream(s *Session) error { func sendReleaseStream(s *Session) error {
@ -458,24 +377,24 @@ func sendReleaseStream(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = C_AMF_EncodeString(enc, avReleasestream) enc, err := amf.EncodeString(enc, avReleasestream)
if enc == nil { if err != nil {
return errEncoding return err
} }
s.numInvokes++ s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if err != nil {
return errEncoding return err
} }
enc[0] = AMF_NULL enc[0] = amf.TypeNull
enc = enc[1:] enc = enc[1:]
enc = C_AMF_EncodeString(enc, s.link.playpath) enc, err = amf.EncodeString(enc, s.link.playpath)
if enc == nil { if err != nil {
return errEncoding return err
} }
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false) return pkt.writeTo(s, false)
} }
func sendFCPublish(s *Session) error { func sendFCPublish(s *Session) error {
@ -489,25 +408,25 @@ func sendFCPublish(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = C_AMF_EncodeString(enc, avFCPublish) enc, err := amf.EncodeString(enc, avFCPublish)
if enc == nil { if err != nil {
return errEncoding return err
} }
s.numInvokes++ s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if err != nil {
return errEncoding return err
} }
enc[0] = AMF_NULL enc[0] = amf.TypeNull
enc = enc[1:] enc = enc[1:]
enc = C_AMF_EncodeString(enc, s.link.playpath) enc, err = amf.EncodeString(enc, s.link.playpath)
if enc == nil { if err != nil {
return errEncoding return err
} }
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false) return pkt.writeTo(s, false)
} }
func sendFCUnpublish(s *Session) error { func sendFCUnpublish(s *Session) error {
@ -521,25 +440,25 @@ func sendFCUnpublish(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = C_AMF_EncodeString(enc, avFCUnpublish) enc, err := amf.EncodeString(enc, avFCUnpublish)
if enc == nil { if err != nil {
return errEncoding return err
} }
s.numInvokes++ s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if err != nil {
return errEncoding return err
} }
enc[0] = AMF_NULL enc[0] = amf.TypeNull
enc = enc[1:] enc = enc[1:]
enc = C_AMF_EncodeString(enc, s.link.playpath) enc, err = amf.EncodeString(enc, s.link.playpath)
if enc == nil { if err != nil {
return errEncoding return err
} }
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false) return pkt.writeTo(s, false)
} }
func sendPublish(s *Session) error { func sendPublish(s *Session) error {
@ -553,29 +472,29 @@ func sendPublish(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = C_AMF_EncodeString(enc, avPublish) enc, err := amf.EncodeString(enc, avPublish)
if enc == nil { if err != nil {
return errEncoding return err
} }
s.numInvokes++ s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if err != nil {
return errEncoding return err
} }
enc[0] = AMF_NULL enc[0] = amf.TypeNull
enc = enc[1:] enc = enc[1:]
enc = C_AMF_EncodeString(enc, s.link.playpath) enc, err = amf.EncodeString(enc, s.link.playpath)
if enc == nil { if err != nil {
return errEncoding return err
} }
enc = C_AMF_EncodeString(enc, avLive) enc, err = amf.EncodeString(enc, avLive)
if enc == nil { if err != nil {
return errEncoding return err
} }
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, true) // response expected return pkt.writeTo(s, true) // response expected
} }
func sendDeleteStream(s *Session, dStreamId float64) error { func sendDeleteStream(s *Session, dStreamId float64) error {
@ -589,24 +508,24 @@ func sendDeleteStream(s *Session, dStreamId float64) error {
} }
enc := pkt.body enc := pkt.body
enc = C_AMF_EncodeString(enc, avDeletestream) enc, err := amf.EncodeString(enc, avDeletestream)
if enc == nil { if err != nil {
return errEncoding return err
} }
s.numInvokes++ s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if err != nil {
return errEncoding return err
} }
enc[0] = AMF_NULL enc[0] = amf.TypeNull
enc = enc[1:] enc = enc[1:]
enc = C_AMF_EncodeNumber(enc, dStreamId) enc, err = amf.EncodeNumber(enc, dStreamId)
if enc == nil { if err != nil {
return errEncoding return err
} }
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false) return pkt.writeTo(s, false)
} }
// sendBytesReceived tells the server how many bytes the client has received. // sendBytesReceived tells the server how many bytes the client has received.
@ -622,13 +541,14 @@ func sendBytesReceived(s *Session) error {
enc := pkt.body enc := pkt.body
s.nBytesInSent = s.nBytesIn s.nBytesInSent = s.nBytesIn
enc = C_AMF_EncodeInt32(enc, s.nBytesIn)
if enc == nil { enc, err := amf.EncodeInt32(enc, s.nBytesIn)
return errEncoding if err != nil {
return err
} }
pkt.bodySize = 4 pkt.bodySize = 4
return pkt.write(s, false) return pkt.writeTo(s, false)
} }
func sendCheckBW(s *Session) error { func sendCheckBW(s *Session) error {
@ -642,21 +562,21 @@ func sendCheckBW(s *Session) error {
} }
enc := pkt.body enc := pkt.body
enc = C_AMF_EncodeString(enc, av_checkbw) enc, err := amf.EncodeString(enc, av_checkbw)
if enc == nil { if err != nil {
return errEncoding return err
} }
s.numInvokes++ s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if enc == nil { if err != nil {
return errEncoding return err
} }
enc[0] = AMF_NULL enc[0] = amf.TypeNull
enc = enc[1:] enc = enc[1:]
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false) return pkt.writeTo(s, false)
} }
func eraseMethod(m []method, i int) []method { func eraseMethod(m []method, i int) []method {
@ -671,18 +591,27 @@ func handleInvoke(s *Session, body []byte) error {
if body[0] != 0x02 { if body[0] != 0x02 {
return errInvalidBody return errInvalidBody
} }
var obj C_AMFObject var obj amf.Object
nRes := C_AMF_Decode(&obj, body, 0) _, err := amf.Decode(&obj, body, false)
if nRes < 0 { if err != nil {
return errDecoding return err
} }
meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) meth, err := obj.StringProperty("", 0)
s.log(DebugLevel, pkg+"invoking method "+meth) if err != nil {
txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) return err
}
txn, err := obj.NumberProperty("", 1)
if err != nil {
return err
}
s.log(DebugLevel, pkg+"invoking method "+meth)
switch meth { switch meth {
case av_result: case av_result:
if (s.link.protocol & featureWrite) == 0 {
return errNotWritable
}
var methodInvoked string var methodInvoked string
for i, m := range s.methodCalls { for i, m := range s.methodCalls {
if float64(m.num) == txn { if float64(m.num) == txn {
@ -693,18 +622,12 @@ func handleInvoke(s *Session, body []byte) error {
} }
if methodInvoked == "" { if methodInvoked == "" {
s.log(WarnLevel, pkg+"received result without matching request", "id", txn) s.log(WarnLevel, pkg+"received result without matching request", "id", txn)
goto leave return nil
} }
s.log(DebugLevel, pkg+"received result for "+methodInvoked) s.log(DebugLevel, pkg+"received result for "+methodInvoked)
switch methodInvoked { switch methodInvoked {
case avConnect: case avConnect:
if s.link.token != "" {
s.log(FatalLevel, pkg+"no support for link token")
}
if (s.link.protocol & featureWrite) == 0 {
return errNotWritable
}
err := sendReleaseStream(s) err := sendReleaseStream(s)
if err != nil { if err != nil {
return err return err
@ -719,86 +642,56 @@ func handleInvoke(s *Session, body []byte) error {
} }
case avCreatestream: case avCreatestream:
s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) n, err := obj.NumberProperty("", 3)
if s.link.protocol&featureWrite == 0 { if err != nil {
return errNotWritable return err
} }
err := sendPublish(s) s.streamID = int32(n)
err = sendPublish(s)
if err != nil { if err != nil {
return err return err
} }
case avPlay, avPublish: default:
s.log(FatalLevel, pkg+"unsupported method avPlay/avPublish") s.log(FatalLevel, pkg+"unexpected method invoked"+methodInvoked)
} }
case avOnBWDone: case avOnBWDone:
if s.checkCounter == 0 { // ToDo: why is this always zero?
err := sendCheckBW(s) err := sendCheckBW(s)
if err != nil { if err != nil {
return err return err
} }
}
case avOnFCUnsubscribe, avOnFCSubscribe:
s.log(FatalLevel, pkg+"unsupported method avOnFCUnsubscribe/avOonfcsubscribe")
case avPing:
s.log(FatalLevel, pkg+"unsupported method avPing")
case av_onbwcheck:
s.log(FatalLevel, pkg+"unsupported method av_onbwcheck")
case av_onbwdone:
s.log(FatalLevel, pkg+"unsupported method av_onbwdone")
case avClose:
s.log(FatalLevel, pkg+"unsupported method avClose")
case avOnStatus: case avOnStatus:
var obj2 C_AMFObject obj2, err := obj.ObjectProperty("", 3)
C_AMFProp_GetObject(C_AMF_GetProp(&obj, "", 3), &obj2) if err != nil {
code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avCode, -1)) return err
level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avLevel, -1)) }
code, err := obj2.StringProperty(avCode, -1)
if err != nil {
return err
}
level, err := obj2.StringProperty(avLevel, -1)
if err != nil {
return err
}
s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level)
switch code { if code != avNetStreamPublish_Start {
case avNetStreamFailed, avNetStreamPlayFailed, s.log(ErrorLevel, pkg+"unexpected response "+code)
avNetStreamPlayStreamNotFound, avNetConnectionConnectInvalidApp: return errUnimplemented
s.log(FatalLevel, pkg+"unsupported method avNetStream/avNetStreamPlayFailed/avNetstream_play_streamnotfound/av_netConnection_Connect_invalidApp") }
case avNetStreamPlayStart, avNetStreamPlayPublishNotify:
s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayStart/avNetStreamPlayPublishNotify")
case avNetStreamPublish_Start:
s.log(DebugLevel, pkg+"playing") s.log(DebugLevel, pkg+"playing")
s.isPlaying = true s.isPlaying = true
for i, m := range s.methodCalls { for i, m := range s.methodCalls {
if m.name == avPublish { if m.name == avPublish {
s.methodCalls = eraseMethod(s.methodCalls, i) s.methodCalls = eraseMethod(s.methodCalls, i)
break
} }
} }
// ToDo: handle case when avPublish method not found
case avNetStreamPlayComplete, avNetStreamPlayStop, avNetStreamPlayUnpublishNotify:
s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayComplete/avNetStreamPlayStop/avNetStreamPlayUnpublishNotify")
case avNetStreamSeekNotify:
s.log(FatalLevel, pkg+"unsupported method avNetstream_seek_notify")
case avNetStreamPauseNotify:
s.log(FatalLevel, pkg+"unsupported method avNetStreamPauseNotify")
}
case avPlaylist_ready:
s.log(FatalLevel, pkg+"unsupported method avPlaylist_ready")
default: default:
s.log(FatalLevel, pkg+"unknown method "+meth) s.log(FatalLevel, pkg+"unsuppoted method "+meth)
} }
leave:
C_AMF_Reset(&obj)
return nil return nil
} }

View File

@ -116,42 +116,74 @@ func TestKey(t *testing.T) {
testLog(0, "Testing against URL "+testBaseURL+testKey) testLog(0, "Testing against URL "+testBaseURL+testKey)
} }
// TestSetupURL tests URL parsing. // TestInit tests session construction and link initialization.
func TestSetupURL(t *testing.T) { func TestInit(t *testing.T) {
testLog(0, "TestSetupURL") testLog(0, "TestInit")
// test with just the base URL // test with just the base URL
s := NewSession(testBaseURL, testTimeout, testLog) s := NewSession(testBaseURL, testTimeout, testLog)
if s.url != testBaseURL && s.link.timeout != testTimeout { if s.url != testBaseURL && s.link.timeout != testTimeout {
t.Errorf("NewSession failed") t.Errorf("NewSession failed")
} }
err := setupURL(s) err := s.init()
if err != nil { if err != nil {
t.Errorf("setupURL(testBaseURL) failed with error: %v", err) t.Errorf("setupURL: failed with error: %v", err)
} }
// test the parts are as expected // test the parts are as expected
if rtmpProtocolStrings[s.link.protocol] != rtmpProtocol { if s.link.protocol&featureWrite == 0 {
t.Errorf("setupURL returned wrong protocol: %v", s.link.protocol) t.Errorf("setupURL: link not writable")
}
if rtmpProtocolStrings[s.link.protocol&^featureWrite] != rtmpProtocol {
t.Errorf("setupURL: wrong protocol: %v", s.link.protocol)
} }
if s.link.host != testHost { if s.link.host != testHost {
t.Errorf("setupURL returned wrong host: %v", s.link.host) t.Errorf("setupURL: wrong host: %v", s.link.host)
} }
if s.link.app != testApp { if s.link.app != testApp {
t.Errorf("setupURL returned wrong app: %v", s.link.app) t.Errorf("setupURL: wrong app: %v", s.link.app)
} }
} }
// TestOpenClose tests opening an closing an RTMP connection. // TestErrorHandling tests error handling
func TestOpenClose(t *testing.T) { func TestErorHandling(t *testing.T) {
testLog(0, "TestOpenClose") testLog(0, "TestErrorHandling")
if testKey == "" { if testKey == "" {
t.Skip("Skipping TestOpenClose since no RTMP_TEST_KEY") t.Skip("Skipping TestErrorHandling since no RTMP_TEST_KEY")
} }
s := NewSession(testBaseURL+testKey, testTimeout, testLog) s := NewSession(testBaseURL+testKey, testTimeout, testLog)
err := s.Open()
// test errNotConnected
var buf [1024]byte
tag := buf[:0]
_, err := s.Write(tag)
if err == nil {
t.Errorf("Write did not return errNotConnected")
}
err = s.Open()
if err != nil { if err != nil {
t.Errorf("Open failed with error: %v", err) t.Errorf("Open failed with error: %v", err)
return
} }
// test errInvalidFlvTag
_, err = s.Write(tag)
if err == nil {
t.Errorf("Write did not return errInvalidFlvTag")
}
// test errUnimplemented
copy(tag, []byte("FLV"))
_, err = s.Write(tag)
if err == nil {
t.Errorf("Write did not return errUnimplemented")
}
// test errInvalidBody
tag = buf[:11]
_, err = s.Write(tag)
if err == nil {
t.Errorf("Write did not return errInvalidBody")
}
err = s.Close() err = s.Close()
if err != nil { if err != nil {
t.Errorf("Close failed with error: %v", err) t.Errorf("Close failed with error: %v", err)

View File

@ -37,22 +37,22 @@ import (
"io" "io"
"net" "net"
"time" "time"
"bitbucket.org/ausocean/av/rtmp/amf"
) )
// Session holds the state for an RTMP session. // Session holds the state for an RTMP session.
type Session struct { type Session struct {
url string url string
inChunkSize int32 inChunkSize uint32
outChunkSize int32 outChunkSize uint32
checkCounter int32 nBytesIn uint32
nBytesIn int32 nBytesInSent uint32
nBytesInSent int32
streamID int32 streamID int32
serverBW int32 serverBW uint32
clientBW int32 clientBW uint32
clientBW2 uint8 clientBW2 uint8
isPlaying bool isPlaying bool
sendEncoding bool
numInvokes int32 numInvokes int32
methodCalls []method methodCalls []method
channelsAllocatedIn int32 channelsAllocatedIn int32
@ -62,7 +62,6 @@ type Session struct {
channelTimestamp []int32 channelTimestamp []int32
audioCodecs float64 audioCodecs float64
videoCodecs float64 videoCodecs float64
encoding float64
deferred []byte deferred []byte
link link link link
log Log log Log
@ -72,16 +71,10 @@ type Session struct {
type link struct { type link struct {
host string host string
playpath string playpath string
tcUrl string url string
swfUrl string
pageUrl string
app string app string
auth string auth string
flashVer string
token string
extras C_AMFObject
flags int32 flags int32
swfAge int32
protocol int32 protocol int32
timeout uint timeout uint
port uint16 port uint16
@ -106,6 +99,10 @@ const (
FatalLevel int8 = 5 FatalLevel int8 = 5
) )
// flvTagheaderSize is the FLV header size we expect.
// NB: We don't accept extended headers.
const flvTagheaderSize = 11
// NewSession returns a new Session. // NewSession returns a new Session.
func NewSession(url string, timeout uint, log Log) *Session { func NewSession(url string, timeout uint, log Log) *Session {
return &Session{ return &Session{
@ -120,7 +117,6 @@ func NewSession(url string, timeout uint, log Log) *Session {
log: log, log: log,
link: link{ link: link{
timeout: timeout, timeout: timeout,
swfAge: 30,
}, },
} }
} }
@ -131,12 +127,10 @@ func (s *Session) Open() error {
if s.isConnected() { if s.isConnected() {
return errConnected return errConnected
} }
err := setupURL(s) err := s.init()
if err != nil { if err != nil {
return err return err
} }
s.enableWrite()
err = connect(s) err = connect(s)
if err != nil { if err != nil {
s.Close() s.Close()
@ -174,8 +168,8 @@ func (s *Session) Write(data []byte) (int, error) {
if !s.isConnected() { if !s.isConnected() {
return 0, errNotConnected return 0, errNotConnected
} }
if len(data) < minDataSize { if len(data) < flvTagheaderSize {
return 0, errTinyPacket return 0, errInvalidFlvTag
} }
if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') {
return 0, errUnimplemented return 0, errUnimplemented
@ -183,15 +177,15 @@ func (s *Session) Write(data []byte) (int, error) {
pkt := packet{ pkt := packet{
packetType: data[0], packetType: data[0],
bodySize: C_AMF_DecodeInt24(data[1:4]), bodySize: amf.DecodeInt24(data[1:4]),
timestamp: C_AMF_DecodeInt24(data[4:7]) | uint32(data[7])<<24, timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24,
channel: chanSource, channel: chanSource,
info: s.streamID, info: s.streamID,
} }
pkt.resize(pkt.bodySize, headerSizeAuto) pkt.resize(pkt.bodySize, headerSizeAuto)
copy(pkt.body, data[minDataSize:minDataSize+pkt.bodySize]) copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize])
err := pkt.write(s, false) err := pkt.writeTo(s, false)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -213,7 +207,7 @@ func (s *Session) read(buf []byte) (int, error) {
s.log(DebugLevel, pkg+"read failed", "error", err.Error()) s.log(DebugLevel, pkg+"read failed", "error", err.Error())
return 0, err return 0, err
} }
s.nBytesIn += int32(n) s.nBytesIn += uint32(n)
if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) {
err := sendBytesReceived(s) err := sendBytesReceived(s)
if err != nil { if err != nil {
@ -243,7 +237,3 @@ func (s *Session) isConnected() bool {
return s.link.conn != nil return s.link.conn != nil
} }
// enableWrite enables the current session for writing.
func (s *Session) enableWrite() {
s.link.protocol |= featureWrite
}