revid/revid.go: fixed conflict

This commit is contained in:
saxon 2019-01-16 17:47:57 +10:30
commit 606c1a5885
14 changed files with 1139 additions and 1094 deletions

View File

@ -75,10 +75,13 @@ func main() {
// run revid for the specified duration
rv, _, err := startRevid(nil, cfg)
if err != nil {
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", err.Error())
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error())
}
time.Sleep(*runDurationPtr)
stopRevid(rv)
err = stopRevid(rv)
if err != nil {
cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error())
}
return
}
@ -296,7 +299,11 @@ func run(rv *revid.Revid, cfg revid.Config) error {
if vars["mode"] == "Paused" {
if !paused {
log.Log(logger.Info, pkg+"pausing revid")
stopRevid(rv)
err = stopRevid(rv)
if err != nil {
log.Log(logger.Error, pkg+"failed to stop revide", "error", err.Error())
continue
}
paused = true
}
} else {
@ -343,21 +350,28 @@ func startRevid(ns *netsender.Sender, cfg revid.Config) (*revid.Revid, revid.Con
if err != nil {
return nil, cfg, err
}
rv.Start()
return rv, cfg, nil
err = rv.Start()
return rv, cfg, err
}
func stopRevid(rv *revid.Revid) {
rv.Stop()
func stopRevid(rv *revid.Revid) error {
err := rv.Stop()
if err != nil {
return err
}
// FIXME(kortschak): Is this waiting on completion of work?
// Use a wait group and Wait method if it is.
time.Sleep(revidStopTime)
return nil
}
func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) {
if stop {
stopRevid(rv)
err := stopRevid(rv)
if err != nil {
return nil, cfg, err
}
}
//look through the vars and update revid where needed

View File

@ -56,6 +56,7 @@ const (
readTimeout = 10 * time.Millisecond
)
<<<<<<< HEAD
// RTMP connection properties.
const (
rtmpConnectionMaxTries = 5
@ -84,6 +85,8 @@ const (
Detail = "Detail"
)
=======
>>>>>>> master
type Logger interface {
SetLevel(int8)
Log(level int8, message string, params ...interface{})
@ -182,16 +185,6 @@ func (r *Revid) Bitrate() int {
return r.bitrate
}
// Config returns the Revid's config.
func (r *Revid) Config() *Config {
// FIXME(kortschak): This is a massive footgun and should not exist.
// Since the config's fields are accessed in running goroutines, any
// mutation is a data race. With bad luck a data race is possible by
// reading the returned value since it is possible for the running
// Ravid to mutate the config it holds.
return &r.config
}
// reset swaps the current config of a Revid with the passed
// configuration; checking validity and returning errors if not valid.
func (r *Revid) reset(config Config) error {
@ -307,10 +300,9 @@ func (r *Revid) IsRunning() bool {
// Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() {
func (r *Revid) Start() error {
if r.isRunning {
r.config.Logger.Log(logger.Warning, pkg+"revid.Start() called but revid already running")
return
return errors.New(pkg + "start called but revid is already running")
}
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
@ -318,14 +310,14 @@ func (r *Revid) Start() {
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
go r.outputClips()
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
go r.setupInput()
err := r.setupInput()
return err
}
// Stop halts any processing of video data from a camera or file
func (r *Revid) Stop() {
func (r *Revid) Stop() error {
if !r.isRunning {
r.config.Logger.Log(logger.Warning, pkg+"revid.Stop() called but revid not running")
return
return errors.New(pkg + "stop called but revid is already stopped")
}
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
@ -336,6 +328,7 @@ func (r *Revid) Stop() {
if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill()
}
return nil
}
// outputClips takes the clips produced in the packClips method and outputs them

View File

@ -1,579 +0,0 @@
/*
NAME
amf.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Jake Lane <jake@ausocean.org>
LICENSE
amf.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import (
"encoding/binary"
"log"
"math"
)
var (
AMFObj_Invalid C_AMFObject
AMFProp_Invalid = C_AMFObjectProperty{p_type: AMF_INVALID}
)
const (
AMF3_INTEGER_MAX = 268435455
AMF3_INTEGER_MIN = -268435456
)
// unsigned short AMF_DecodeInt16(const char* data);
// amf.c +41
func C_AMF_DecodeInt16(data []byte) uint16 {
return uint16(data[0])<<8 | uint16(data[1])
}
// unsigned int AMF_DecodeInt24(const char* data);
// amf.c +50
func C_AMF_DecodeInt24(data []byte) uint32 {
return uint32(data[0])<<16 | uint32(data[1])<<8 | uint32(data[2])
}
// unsigned int AMF_DeocdeInt32(const char* data);
// amf.c +59
func C_AMF_DecodeInt32(data []byte) uint32 {
return uint32(data[0])<<24 | uint32(data[1])<<16 | uint32(data[2])<<8 | uint32(data[3])
}
// void AMF_DecodeString(const char* data, C_AVal* bv);
// amf.c +68
func C_AMF_DecodeString(data []byte) string {
n := C_AMF_DecodeInt16(data)
return string(data[2 : 2+n])
}
// void AMF_DecodeLongString(const char *data, AVal *bv);
// amf.c +75
func C_AMF_DecodeLongString(data []byte) string {
n := C_AMF_DecodeInt32(data)
return string(data[2 : 2+n])
}
// double AMF_DecodeNumber(const char* data);
// amf.c +82
func C_AMF_DecodeNumber(data []byte) float64 {
return math.Float64frombits(binary.BigEndian.Uint64(data))
}
// int AMF_DecodeBoolean(const char *data);
// amf.c +132
func C_AMF_DecodeBoolean(data []byte) bool {
return data[0] != 0
}
// char* AMF_EncodeInt24(char* output, char* outend, int nVal);
// amf.c +149
func C_AMF_EncodeInt24(dst []byte, val int32) []byte {
if len(dst) < 3 {
return nil
}
_ = dst[2]
dst[0] = byte(val >> 16)
dst[1] = byte(val >> 8)
dst[2] = byte(val)
if len(dst) == 3 {
return nil
}
return dst[3:]
}
// char* AMF_EncodeInt32(char* output, char* outend, int nVal);
// amf.c +160
func C_AMF_EncodeInt32(dst []byte, val int32) []byte {
if len(dst) < 4 {
return nil
}
binary.BigEndian.PutUint32(dst, uint32(val))
if len(dst) == 4 {
return nil
}
return dst[4:]
}
// char* AMF_EncodeString(char* output, char* outend, const C_AVal* bv);
// amf.c +173
func C_AMF_EncodeString(dst []byte, val string) []byte {
const typeSize = 1
if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(dst) {
return nil
}
if len(val)+typeSize+binary.Size(int32(0)) > len(dst) {
return nil
}
if len(val) < 65536 {
dst[0] = AMF_STRING
dst = dst[1:]
binary.BigEndian.PutUint16(dst[:2], uint16(len(val)))
dst = dst[2:]
copy(dst, val)
if len(dst) == len(val) {
return nil
}
return dst[len(val):]
}
dst[0] = AMF_LONG_STRING
dst = dst[1:]
binary.BigEndian.PutUint32(dst[:4], uint32(len(val)))
dst = dst[4:]
copy(dst, val)
if len(dst) == len(val) {
return nil
}
return dst[len(val):]
}
// char* AMF_EncodeNumber(char* output, char* outend, double dVal);
// amf.c +199
func C_AMF_EncodeNumber(dst []byte, val float64) []byte {
if len(dst) < 9 {
return nil
}
dst[0] = AMF_NUMBER
dst = dst[1:]
binary.BigEndian.PutUint64(dst, math.Float64bits(val))
return dst[8:]
}
// char* AMF_EncodeBoolean(char* output, char* outend, int bVal);
// amf.c +260
func C_AMF_EncodeBoolean(dst []byte, val bool) []byte {
if len(dst) < 2 {
return nil
}
dst[0] = AMF_BOOLEAN
if val {
dst[1] = 1
}
if len(dst) == 2 {
return nil
}
return dst[2:]
}
// char* AMF_EncodeNamedString(char* output, char* outend, const C_AVal* strName, const C_AVal* strValue);
// amf.c +273
func C_AMF_EncodeNamedString(dst []byte, key, val string) []byte {
if 2+len(key) > len(dst) {
return nil
}
binary.BigEndian.PutUint16(dst[:2], uint16(len(key)))
dst = dst[2:]
copy(dst, key)
if len(key) == len(dst) {
return nil
}
return C_AMF_EncodeString(dst[len(key):], val)
}
// char* AMF_EncodeNamedNumber(char* output, char* outend, const C_AVal* strName, double dVal);
// amf.c +286
func C_AMF_EncodeNamedNumber(dst []byte, key string, val float64) []byte {
if 2+len(key) > len(dst) {
return nil
}
binary.BigEndian.PutUint16(dst[:2], uint16(len(key)))
dst = dst[2:]
copy(dst, key)
if len(key) == len(dst) {
return nil
}
return C_AMF_EncodeNumber(dst[len(key):], val)
}
// char* AMF_EncodeNamedBoolean(char* output, char* outend, const C_AVal* strname, int bVal);
// amf.c +299
func C_AMF_EncodeNamedBoolean(dst []byte, key string, val bool) []byte {
if 2+len(key) > len(dst) {
return nil
}
binary.BigEndian.PutUint16(dst[:2], uint16(len(key)))
dst = dst[2:]
copy(dst, key)
if len(key) == len(dst) {
return nil
}
return C_AMF_EncodeBoolean(dst[len(key):], val)
}
// void AMFProp_SetName(AMFObjectProperty *prop, AVal *name);
// amf.c +318
func C_AMFProp_SetName(prop *C_AMFObjectProperty, name string) {
prop.p_name = name
}
// double AMFProp_GetNumber(AMFObjectProperty* prop);
// amf.c +330
func C_AMFProp_GetNumber(prop *C_AMFObjectProperty) float64 {
return prop.p_vu.p_number
}
// void AMFProp_GetString(AMFObjectProperty* prop, AVal* str);
// amf.c +341
func C_AMFProp_GetString(prop *C_AMFObjectProperty) string {
if prop.p_type == AMF_STRING {
return prop.p_vu.p_aval
}
return ""
}
// void AMFProp_GetObject(AMFObjectProperty *prop, AMFObject *obj);
// amf.c +351
func C_AMFProp_GetObject(prop *C_AMFObjectProperty, obj *C_AMFObject) {
if prop.p_type == AMF_OBJECT {
*obj = prop.p_vu.p_object
} else {
*obj = AMFObj_Invalid
}
}
// char* AMFPropEncode(AMFOBjectProperty* prop, char* pBufer, char* pBufEnd);
// amf.c +366
func C_AMF_PropEncode(p *C_AMFObjectProperty, dst []byte) []byte {
if p.p_type == AMF_INVALID {
return nil
}
if p.p_type != AMF_NULL && len(p.p_name)+2+1 >= len(dst) {
return nil
}
if p.p_type != AMF_NULL && len(p.p_name) != 0 {
binary.BigEndian.PutUint16(dst[:2], uint16(len(p.p_name)))
dst = dst[2:]
copy(dst, p.p_name)
dst = dst[len(p.p_name):]
}
switch p.p_type {
case AMF_NUMBER:
dst = C_AMF_EncodeNumber(dst, p.p_vu.p_number)
case AMF_BOOLEAN:
dst = C_AMF_EncodeBoolean(dst, p.p_vu.p_number != 0)
case AMF_STRING:
dst = C_AMF_EncodeString(dst, p.p_vu.p_aval)
case AMF_NULL:
if len(dst) < 2 {
return nil
}
dst[0] = AMF_NULL
dst = dst[1:]
case AMF_OBJECT:
dst = C_AMF_Encode(&p.p_vu.p_object, dst)
case AMF_ECMA_ARRAY:
dst = C_AMF_EncodeEcmaArray(&p.p_vu.p_object, dst)
case AMF_STRICT_ARRAY:
dst = C_AMF_EncodeArray(&p.p_vu.p_object, dst)
default:
log.Println("C_AMF_PropEncode: invalid type!")
dst = nil
}
return dst
}
// int AMFProp_Decode(C_AMFObjectProperty* prop, const char* pBuffer, int nSize, int bDecodeName);
// amf.c +619
func C_AMFProp_Decode(prop *C_AMFObjectProperty, data []byte, bDecodeName int32) int32 {
prop.p_name = ""
nOriginalSize := len(data)
if len(data) == 0 {
// TODO use new logger here
// RTMP_Log(RTMP_LOGDEBUG, "%s: Empty buffer/no buffer pointer!", __FUNCTION__);
return -1
}
if bDecodeName != 0 && len(data) < 4 {
// at least name (length + at least 1 byte) and 1 byte of data
// TODO use new logger here
// RTMP_Log(RTMP_LOGDEBUG, "%s: Not enough data for decoding with name, less than 4 bytes!",__FUNCTION__);
return -1
}
if bDecodeName != 0 {
nNameSize := C_AMF_DecodeInt16(data[:2])
if int(nNameSize) > len(data)-2 {
// TODO use new logger here
//RTMP_Log(RTMP_LOGDEBUG, "%s: Name size out of range: namesize (%d) > len (%d) - 2",__FUNCTION__, nNameSize, nSize);
return -1
}
prop.p_name = C_AMF_DecodeString(data)
data = data[2+nNameSize:]
}
if len(data) == 0 {
return -1
}
prop.p_type = C_AMFDataType(data[0])
data = data[1:]
var nRes int32
switch prop.p_type {
case AMF_NUMBER:
if len(data) < 8 {
return -1
}
prop.p_vu.p_number = C_AMF_DecodeNumber(data[:8])
data = data[8:]
case AMF_BOOLEAN:
panic("AMF_BOOLEAN not supported")
case AMF_STRING:
nStringSize := C_AMF_DecodeInt16(data[:2])
if len(data) < int(nStringSize+2) {
return -1
}
prop.p_vu.p_aval = C_AMF_DecodeString(data)
data = data[2+nStringSize:]
case AMF_OBJECT:
nRes := C_AMF_Decode(&prop.p_vu.p_object, data, 1)
if nRes == -1 {
return -1
}
data = data[nRes:]
case AMF_MOVIECLIP:
// TODO use new logger here
log.Println("AMFProp_Decode: MAF_MOVIECLIP reserved!")
//RTMP_Log(RTMP_LOGERROR, "AMF_MOVIECLIP reserved!");
return -1
case AMF_NULL, AMF_UNDEFINED, AMF_UNSUPPORTED:
prop.p_type = AMF_NULL
case AMF_REFERENCE:
// TODO use new logger here
log.Println("AMFProp_Decode: AMF_REFERENCE not supported!")
//RTMP_Log(RTMP_LOGERROR, "AMF_REFERENCE not supported!");
return -1
case AMF_ECMA_ARRAY:
// next comes the rest, mixed array has a final 0x000009 mark and names, so its an object
data = data[4:]
nRes = C_AMF_Decode(&prop.p_vu.p_object, data, 1)
if nRes == -1 {
return -1
}
data = data[nRes:]
case AMF_OBJECT_END:
return -1
case AMF_STRICT_ARRAY:
panic("AMF_STRICT_ARRAY not supported")
case AMF_DATE:
panic("AMF_DATE not supported")
case AMF_LONG_STRING, AMF_XML_DOC:
panic("AMF_LONG_STRING, AMF_XML_DOC not supported")
case AMF_RECORDSET:
// TODO use new logger here
log.Println("AMFProp_Decode: AMF_RECORDSET reserved!")
//RTMP_Log(RTMP_LOGERROR, "AMF_RECORDSET reserved!");
return -1
case AMF_TYPED_OBJECT:
// TODO use new logger here
// RTMP_Log(RTMP_LOGERROR, "AMF_TYPED_OBJECT not supported!")
return -1
case AMF_AVMPLUS:
panic("AMF_AVMPLUS not supported")
default:
// TODO use new logger here
//RTMP_Log(RTMP_LOGDEBUG, "%s - unknown datatype 0x%02x, @%p", __FUNCTION__,
//prop.p_type, pBuffer - 1);
return -1
}
return int32(nOriginalSize - len(data))
}
// void AMFProp_Reset(AMFObjectProperty* prop);
// amf.c +875
func C_AMFProp_Reset(prop *C_AMFObjectProperty) {
if prop.p_type == AMF_OBJECT || prop.p_type == AMF_ECMA_ARRAY ||
prop.p_type == AMF_STRICT_ARRAY {
C_AMF_Reset(&prop.p_vu.p_object)
} else {
prop.p_vu.p_aval = ""
}
prop.p_type = AMF_INVALID
}
// char* AMF_Encode(AMFObject* obj, char* pBuffer, char* pBufEnd);
// amf.c +891
func C_AMF_Encode(obj *C_AMFObject, dst []byte) []byte {
if len(dst) < 5 {
return nil
}
dst[0] = AMF_OBJECT
dst = dst[1:]
for i := 0; i < len(obj.o_props); i++ {
dst = C_AMF_PropEncode(&obj.o_props[i], dst)
if dst == nil {
log.Println("C_AMF_Encode: failed to encode property in index")
break
}
}
if len(dst) < 4 {
return nil
}
return C_AMF_EncodeInt24(dst, AMF_OBJECT_END)
}
// char* AMF_EncodeEcmaArray(AMFObject* obj, char* pBuffer, char* pBufEnd);
// amf.c +924
func C_AMF_EncodeEcmaArray(obj *C_AMFObject, dst []byte) []byte {
if len(dst) < 5 {
return nil
}
dst[0] = AMF_ECMA_ARRAY
dst = dst[1:]
binary.BigEndian.PutUint32(dst[:4], uint32(len(obj.o_props)))
dst = dst[4:]
for i := 0; i < len(obj.o_props); i++ {
dst = C_AMF_PropEncode(&obj.o_props[i], dst)
if dst == nil {
log.Println("C_AMF_EncodeEcmaArray: failed to encode property!")
break
}
}
if len(dst) < 4 {
return nil
}
return C_AMF_EncodeInt24(dst, AMF_OBJECT_END)
}
// char* AMF_EncodeArray(AMFObject* obj, char* pBuffer, char* pBufEnd);
// amf.c +959
func C_AMF_EncodeArray(obj *C_AMFObject, dst []byte) []byte {
if len(dst) < 5 {
return nil
}
dst[0] = AMF_STRICT_ARRAY
dst = dst[1:]
binary.BigEndian.PutUint32(dst[:4], uint32(len(obj.o_props)))
dst = dst[4:]
for i := 0; i < len(obj.o_props); i++ {
dst = C_AMF_PropEncode(&obj.o_props[i], dst)
if dst == nil {
log.Println("C_AMF_EncodeArray: failed to encode property!")
break
}
}
return dst
}
// int AMF_Decode(AMFObject *obj, const char* pBuffer, int nSize, int bDecodeName);
// amf.c +1180
func C_AMF_Decode(obj *C_AMFObject, data []byte, bDecodeName int32) int32 {
nOriginalSize := len(data)
obj.o_props = obj.o_props[:0]
for len(data) != 0 {
if len(data) >= 3 && C_AMF_DecodeInt24(data[:3]) == AMF_OBJECT_END {
data = data[3:]
break
}
var prop C_AMFObjectProperty
nRes := C_AMFProp_Decode(&prop, data, bDecodeName)
// nRes = int32(C.AMFProp_Decode(&prop, (*byte)(unsafe.Pointer(pBuffer)),
// int32(nSize), int32(bDecodeName)))
if nRes == -1 {
return -1
}
data = data[nRes:]
obj.o_props = append(obj.o_props, prop)
}
return int32(nOriginalSize - len(data))
}
// AMFObjectProperty* AMF_GetProp(AMFObject *obj, const AVal* name, int nIndex);
// amf.c + 1249
func C_AMF_GetProp(obj *C_AMFObject, name string, idx int32) *C_AMFObjectProperty {
if idx >= 0 {
if idx < int32(len(obj.o_props)) {
return &obj.o_props[idx]
}
} else {
for i, p := range obj.o_props {
if p.p_name == name {
return &obj.o_props[i]
}
}
}
return &AMFProp_Invalid
}
// void AMF_Reset(AMFObject* obj);
// amf.c +1282
func C_AMF_Reset(obj *C_AMFObject) {
for i := range obj.o_props {
C_AMFProp_Reset(&obj.o_props[i])
}
obj.o_props = obj.o_props[:0]
}
/*
// void AMF3CD_AddProp(AMF3ClassDef *cd, AVal *prop);
// amf.c +1298
func AMF3CD_AddProp(cd *C.AMF3ClassDef, prop *C_AVal) {
if cd.cd_num&0x0f == 0 {
cd.cd_props = (*C_AVal)(realloc(unsafe.Pointer(cd.cd_props), int(uintptr(cd.cd_num+16)*unsafe.Sizeof(C_AVal{}))))
}
*(*C_AVal)(incPtr(unsafe.Pointer(cd.cd_props), int(cd.cd_num), int(unsafe.Sizeof(C_AVal{})))) = *prop
cd.cd_num++
}
*/

498
rtmp/amf/amf.go Normal file
View File

@ -0,0 +1,498 @@
/*
NAME
amf.go
DESCRIPTION
Action Message Format (AMF) encoding/decoding functions.
See https://en.wikipedia.org/wiki/Action_Message_Format.
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Jake Lane <jake@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
amf.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
// Package amf implements Action Message Format (AMF) encoding and decoding.
// In AMF, encoding of numbers is big endian by default, unless specified otherwise,
// and numbers are all unsigned.
// See https://en.wikipedia.org/wiki/Action_Message_Format.
package amf
import (
"encoding/binary"
"errors"
"math"
)
// AMF data types, as defined by the AMF specification.
// NB: we export these sparingly.
const (
typeNumber = 0x00
typeBoolean = 0x01
typeString = 0x02
TypeObject = 0x03
typeMovieClip = 0x04
TypeNull = 0x05
typeUndefined = 0x06
typeReference = 0x07
typeEcmaArray = 0x08
TypeObjectEnd = 0x09
typeStrictArray = 0x0A
typeDate = 0x0B
typeLongString = 0x0C
typeUnsupported = 0x0D
typeRecordset = 0x0E
typeXmlDoc = 0x0F
typeTypedObject = 0x10
typeAvmplus = 0x11
typeInvalid = 0xff
)
// AMF represents an AMF object, which is simply a collection of properties.
type Object struct {
Properties []Property
}
// Property represents an AMF property, which is effectively a
// union. The Type is the AMF data type (uint8 per the specification),
// and specifies which member holds a value. Numeric types use
// Number, string types use String and arrays and objects use
// Object. The Name is optional.
type Property struct {
Type uint8
Name string
Number float64
String string
Object Object
}
// AMF errors:
var (
ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short.
ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder.
ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding.
ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found.
)
// DecodeInt16 decodes a 16-bit integer.
func DecodeInt16(buf []byte) uint16 {
return binary.BigEndian.Uint16(buf)
}
// DecodeInt24 decodes a 24-bit integer.
func DecodeInt24(buf []byte) uint32 {
return uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])
}
// DecodeInt32 decodes a 32-bit integer.
func DecodeInt32(buf []byte) uint32 {
return binary.BigEndian.Uint32(buf)
}
// DecodeInt32LE decodes a 32-bit little-endian integer.
func DecodeInt32LE(buf []byte) uint32 {
return binary.LittleEndian.Uint32(buf)
}
// DecodeString decodes a string that is less than 2^16 bytes long.
func DecodeString(buf []byte) string {
n := DecodeInt16(buf)
return string(buf[2 : 2+n])
}
// DecodeLongString decodes a long string.
func DecodeLongString(buf []byte) string {
n := DecodeInt32(buf)
return string(buf[2 : 2+n])
}
// DecodeNumber decodes a 64-bit floating-point number.
func DecodeNumber(buf []byte) float64 {
return math.Float64frombits(binary.BigEndian.Uint64(buf))
}
// DecodeBoolean decodes a boolean.
func DecodeBoolean(buf []byte) bool {
return buf[0] != 0
}
// EncodeInt24 encodes a 24-bit integer.
func EncodeInt24(buf []byte, val uint32) ([]byte, error) {
if len(buf) < 3 {
return nil, ErrShortBuffer
}
buf[0] = byte(val >> 16)
buf[1] = byte(val >> 8)
buf[2] = byte(val)
return buf[3:], nil
}
// EncodeInt32 encodes a 32-bit integer.
func EncodeInt32(buf []byte, val uint32) ([]byte, error) {
if len(buf) < 4 {
return nil, ErrShortBuffer
}
binary.BigEndian.PutUint32(buf, val)
return buf[4:], nil
}
// EncodeString encodes a string.
func EncodeString(buf []byte, val string) ([]byte, error) {
const typeSize = 1
if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) {
return nil, ErrShortBuffer
}
if len(val)+typeSize+binary.Size(uint32(0)) > len(buf) {
return nil, ErrShortBuffer
}
if len(val) < 65536 {
buf[0] = typeString
buf = buf[1:]
binary.BigEndian.PutUint16(buf[:2], uint16(len(val)))
buf = buf[2:]
copy(buf, val)
return buf[len(val):], nil
}
buf[0] = typeLongString
buf = buf[1:]
binary.BigEndian.PutUint32(buf[:4], uint32(len(val)))
buf = buf[4:]
copy(buf, val)
return buf[len(val):], nil
}
// EncodeNumber encodes a 64-bit floating-point number.
func EncodeNumber(buf []byte, val float64) ([]byte, error) {
if len(buf) < 9 {
return nil, ErrShortBuffer
}
buf[0] = typeNumber
buf = buf[1:]
binary.BigEndian.PutUint64(buf, math.Float64bits(val))
return buf[8:], nil
}
// EncodeBoolean encodes a boolean.
func EncodeBoolean(buf []byte, val bool) ([]byte, error) {
if len(buf) < 2 {
return nil, ErrShortBuffer
}
buf[0] = typeBoolean
if val {
buf[1] = 1
} else {
buf[1] = 0
}
return buf[2:], nil
}
// EncodeNamedString encodes a named string, where key is the name and val is the string value.
func EncodeNamedString(buf []byte, key, val string) ([]byte, error) {
if 2+len(key) > len(buf) {
return nil, ErrShortBuffer
}
binary.BigEndian.PutUint16(buf[:2], uint16(len(key)))
buf = buf[2:]
copy(buf, key)
return EncodeString(buf[len(key):], val)
}
// EncodeNamedNumber encodes a named number, where key is the name and val is the number value.
func EncodeNamedNumber(buf []byte, key string, val float64) ([]byte, error) {
if 2+len(key) > len(buf) {
return nil, ErrShortBuffer
}
binary.BigEndian.PutUint16(buf[:2], uint16(len(key)))
buf = buf[2:]
copy(buf, key)
return EncodeNumber(buf[len(key):], val)
}
// EncodeNamedNumber encodes a named boolean, where key is the name and val is the boolean value.
func EncodeNamedBoolean(buf []byte, key string, val bool) ([]byte, error) {
if 2+len(key) > len(buf) {
return nil, ErrShortBuffer
}
binary.BigEndian.PutUint16(buf[:2], uint16(len(key)))
buf = buf[2:]
copy(buf, key)
return EncodeBoolean(buf[len(key):], val)
}
// EncodeProperty encodes a property.
func EncodeProperty(prop *Property, buf []byte) ([]byte, error) {
if prop.Type != TypeNull && prop.Name != "" {
if len(buf) < 2+len(prop.Name) {
return nil, ErrShortBuffer
}
binary.BigEndian.PutUint16(buf[:2], uint16(len(prop.Name)))
buf = buf[2:]
copy(buf, prop.Name)
buf = buf[len(prop.Name):]
}
switch prop.Type {
case typeNumber:
return EncodeNumber(buf, prop.Number)
case typeBoolean:
return EncodeBoolean(buf, prop.Number != 0)
case typeString:
return EncodeString(buf, prop.String)
case TypeNull:
if len(buf) < 2 {
return nil, ErrShortBuffer
}
buf[0] = TypeNull
buf = buf[1:]
case TypeObject:
return Encode(&prop.Object, buf)
case typeEcmaArray:
return EncodeEcmaArray(&prop.Object, buf)
case typeStrictArray:
return EncodeArray(&prop.Object, buf)
default:
return nil, ErrInvalidType
}
return buf, nil
}
// DecodeProperty decodes a property, returning the number of bytes consumed from the supplied buffer.
func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) {
sz := len(buf)
if decodeName {
if len(buf) < 4 {
return 0, ErrShortBuffer
}
n := DecodeInt16(buf[:2])
if int(n) > len(buf)-2 {
return 0, ErrShortBuffer
}
prop.Name = DecodeString(buf)
buf = buf[2+n:]
} else {
prop.Name = ""
}
prop.Type = buf[0]
buf = buf[1:]
switch prop.Type {
case typeNumber:
if len(buf) < 8 {
return 0, ErrShortBuffer
}
prop.Number = DecodeNumber(buf[:8])
buf = buf[8:]
case typeBoolean:
if len(buf) < 1 {
return 0, ErrShortBuffer
}
prop.Number = float64(buf[0])
buf = buf[1:]
case typeString:
n := DecodeInt16(buf[:2])
if len(buf) < int(n+2) {
return 0, ErrShortBuffer
}
prop.String = DecodeString(buf)
buf = buf[2+n:]
case TypeObject:
n, err := Decode(&prop.Object, buf, true)
if err != nil {
return 0, err
}
buf = buf[n:]
case TypeNull, typeUndefined, typeUnsupported:
prop.Type = TypeNull
case typeEcmaArray:
buf = buf[4:]
n, err := Decode(&prop.Object, buf, true)
if err != nil {
return 0, err
}
buf = buf[n:]
default:
return 0, ErrUnexpectedType
}
return sz - len(buf), nil
}
// Encode encodes an Object into its AMF representation.
// This is the top-level encoding function and is typically the only function callers will need to use.
func Encode(obj *Object, buf []byte) ([]byte, error) {
if len(buf) < 5 {
return nil, ErrShortBuffer
}
buf[0] = TypeObject
buf = buf[1:]
for i := 0; i < len(obj.Properties); i++ {
var err error
buf, err = EncodeProperty(&obj.Properties[i], buf)
if err != nil {
return nil, err
}
}
if len(buf) < 3 {
return nil, ErrShortBuffer
}
return EncodeInt24(buf, TypeObjectEnd)
}
// EncodeEcmaArray encodes an ECMA array.
func EncodeEcmaArray(obj *Object, buf []byte) ([]byte, error) {
if len(buf) < 5 {
return nil, ErrShortBuffer
}
buf[0] = typeEcmaArray
buf = buf[1:]
binary.BigEndian.PutUint32(buf[:4], uint32(len(obj.Properties)))
buf = buf[4:]
for i := 0; i < len(obj.Properties); i++ {
var err error
buf, err = EncodeProperty(&obj.Properties[i], buf)
if err != nil {
return nil, err
}
}
if len(buf) < 3 {
return nil, ErrShortBuffer
}
return EncodeInt24(buf, TypeObjectEnd)
}
// EncodeArray encodes an array.
func EncodeArray(obj *Object, buf []byte) ([]byte, error) {
if len(buf) < 5 {
return nil, ErrShortBuffer
}
buf[0] = typeStrictArray
buf = buf[1:]
binary.BigEndian.PutUint32(buf[:4], uint32(len(obj.Properties)))
buf = buf[4:]
for i := 0; i < len(obj.Properties); i++ {
var err error
buf, err = EncodeProperty(&obj.Properties[i], buf)
if err != nil {
return nil, err
}
}
return buf, nil
}
// Decode decodes an object. Property names are only decoded if decodeName is true.
func Decode(obj *Object, buf []byte, decodeName bool) (int, error) {
sz := len(buf)
obj.Properties = obj.Properties[:0]
for len(buf) != 0 {
if len(buf) >= 3 && DecodeInt24(buf[:3]) == TypeObjectEnd {
buf = buf[3:]
break
}
var prop Property
n, err := DecodeProperty(&prop, buf, decodeName)
if err != nil {
return 0, err
}
buf = buf[n:]
obj.Properties = append(obj.Properties, prop)
}
return sz - len(buf), nil
}
// Object methods:
// Property returns a property, either by its index when idx is non-negative, or by its name otherwise.
// If the requested property is not found or the type does not match, an ErrPropertyNotFound error is returned.
func (obj *Object) Property(name string, idx int, typ uint8) (*Property, error) {
var prop *Property
if idx >= 0 {
if idx < len(obj.Properties) {
prop = &obj.Properties[idx]
}
} else {
for i, p := range obj.Properties {
if p.Name == name {
prop = &obj.Properties[i]
break
}
}
}
if prop == nil || prop.Type != typ {
return nil, ErrPropertyNotFound
}
return prop, nil
}
// NumberProperty is a wrapper for Property that returns a Number property's value, if any.
func (obj *Object) NumberProperty(name string, idx int) (float64, error) {
prop, err := obj.Property(name, idx, typeNumber)
if err != nil {
return 0, err
}
return prop.Number, nil
}
// StringProperty is a wrapper for Property that returns a String property's value, if any.
func (obj *Object) StringProperty(name string, idx int) (string, error) {
prop, err := obj.Property(name, idx, typeString)
if err != nil {
return "", err
}
return prop.String, nil
}
// ObjectProperty is a wrapper for Property that returns an Object property's value, if any.
func (obj *Object) ObjectProperty(name string, idx int) (*Object, error) {
prop, err := obj.Property(name, idx, TypeObject)
if err != nil {
return nil, err
}
return &prop.Object, nil
}

308
rtmp/amf/amf_test.go Normal file
View File

@ -0,0 +1,308 @@
/*
NAME
amf_test.go
DESCRIPTION
AMF test suite.
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
amf_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package amf
import (
"testing"
)
// Test data.
var testStrings = [...]string{
"",
"foo",
"bar",
"bazz",
}
var testNumbers = [...]uint32{
0,
1,
0xababab,
0xffffff,
}
// TestSanity checks that we haven't accidentally changed constants.
func TestSanity(t *testing.T) {
if TypeObjectEnd != 0x09 {
t.Errorf("TypeObjectEnd has wrong value; got %d, expected %d", TypeObjectEnd, 0x09)
}
}
// TestStrings tests string encoding and decoding.
func TestStrings(t *testing.T) {
// Short string encoding is as follows:
// enc[0] = data type (typeString)
// end[1:3] = size
// enc[3:] = data
for _, s := range testStrings {
buf := make([]byte, len(s)+7)
_, err := EncodeString(buf, s)
if err != nil {
t.Errorf("EncodeString failed")
}
if buf[0] != typeString {
t.Errorf("Expected typeString, got %v", buf[0])
}
ds := DecodeString(buf[1:])
if s != ds {
t.Errorf("DecodeString did not produce original string, got %v", ds)
}
}
// Long string encoding is as follows:
// enc[0] = data type (typeString)
// end[1:5] = size
// enc[5:] = data
s := string(make([]byte, 65536))
buf := make([]byte, len(s)+7)
_, err := EncodeString(buf, s)
if err != nil {
t.Errorf("EncodeString failed")
}
if buf[0] != typeLongString {
t.Errorf("Expected typeLongString, got %v", buf[0])
}
ds := DecodeLongString(buf[1:])
if s != ds {
t.Errorf("DecodeLongString did not produce original string, got %v", ds)
}
}
// TestNumbers tests number encoding and encoding.
func TestNumbers(t *testing.T) {
for _, n := range testNumbers {
buf := make([]byte, 4) // NB: encoder requires an extra byte for some reason
_, err := EncodeInt24(buf, n)
if err != nil {
t.Errorf("EncodeInt24 failed")
}
dn := DecodeInt24(buf)
if n != dn {
t.Errorf("DecodeInt24 did not produce original Number, got %v", dn)
}
_, err = EncodeInt32(buf, n)
if err != nil {
t.Errorf("EncodeInt32 failed")
}
dn = DecodeInt32(buf)
if n != dn {
t.Errorf("DecodeInt32 did not produce original Number, got %v", dn)
}
}
}
// TestProperties tests encoding and decoding of properties.
func TestProperties(t *testing.T) {
var buf [1024]byte
// Encode/decode Number properties.
enc := buf[:]
var err error
for i := range testNumbers {
enc, err = EncodeProperty(&Property{Type: typeNumber, Number: float64(testNumbers[i])}, enc)
if err != nil {
t.Errorf("EncodeProperty of Number failed")
}
}
prop := Property{}
dec := buf[:]
for i := range testNumbers {
n, err := DecodeProperty(&prop, dec, false)
if err != nil {
t.Errorf("DecodeProperty of Number failed")
}
if prop.Number != float64(testNumbers[i]) {
t.Errorf("EncodeProperty/DecodeProperty returned wrong Number; got %v, expected %v", int32(prop.Number), testNumbers[i])
}
dec = dec[n:]
}
// Encode/decode string properties.
enc = buf[:]
for i := range testStrings {
enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc)
if err != nil {
t.Errorf("EncodeProperty of string failed")
}
}
prop = Property{}
dec = buf[:]
for i := range testStrings {
n, err := DecodeProperty(&prop, dec, false)
if err != nil {
t.Errorf("DecodeProperty of string failed")
}
if prop.String != testStrings[i] {
t.Errorf("EncodeProperty/DecodeProperty returned wrong string; got %s, expected %s", prop.String, testStrings[i])
}
dec = dec[n:]
}
}
// TestObject tests encoding and decoding of objects.
func TestObject(t *testing.T) {
var buf [1024]byte
// Construct a simple object that has one property, the Number 42.
prop1 := Property{Type: typeNumber, Number: 42}
obj1 := Object{}
obj1.Properties = append(obj1.Properties, prop1)
// Encode it
enc := buf[:]
var err error
enc, err = Encode(&obj1, enc)
if err != nil {
t.Errorf("Encode of object failed")
}
// Check the encoding
if buf[0] != TypeObject {
t.Errorf("Encoded wrong type; expected %d, got %v", TypeObject, buf[0])
}
if buf[1] != typeNumber {
t.Errorf("Encoded wrong type; expected %d, got %v", typeNumber, buf[0])
}
num := DecodeNumber(buf[2:10])
if num != 42 {
t.Errorf("Encoded wrong Number")
}
end := int32(DecodeInt24(buf[10:13]))
if end != TypeObjectEnd {
t.Errorf("Did not encode TypeObjectEnd")
}
// Decode it
dec := buf[1:]
var dobj1 Object
_, err = Decode(&dobj1, dec, false)
if err != nil {
t.Errorf("Decode of object failed")
}
// Change the object's property to a named boolean.
obj1.Properties[0].Name = "on"
obj1.Properties[0].Type = typeBoolean
obj1.Properties[0].Number = 1
// Re-encode it
enc = buf[:]
enc, err = Encode(&obj1, enc)
if err != nil {
t.Errorf("Encode of object failed")
}
// Decode it, this time with named set to true.
dec = buf[1:]
_, err = Decode(&dobj1, dec, true)
if err != nil {
t.Errorf("Decode of object failed with error: %v", err)
}
if dobj1.Properties[0].Number != 1 {
t.Errorf("Decoded wrong boolean value")
}
// Construct a more complicated object that includes a nested object.
var obj2 Object
for i := range testStrings {
obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]})
obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])})
}
obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1})
obj2.Properties = append(obj2.Properties, Property{Type: typeBoolean, Number: 1})
// Retrieve nested object
obj, err := obj2.ObjectProperty("", 8)
if err != err {
t.Errorf("Failed to retrieve object")
}
if len(obj.Properties) < 1 {
t.Errorf("Properties missing for nested object")
}
if obj.Properties[0].Type != typeBoolean {
t.Errorf("Wrong property type for nested object")
}
// Encode it.
enc = buf[:]
enc, err = Encode(&obj2, enc)
if err != nil {
t.Errorf("Encode of object failed")
}
// Decode it.
dec = buf[1:]
var dobj2 Object
_, err = Decode(&dobj2, dec, false)
if err != nil {
t.Errorf("Decode of object failed with error: %v", err)
}
// Find some properties that exist.
s, err := obj2.StringProperty("", 2)
if err != nil {
t.Errorf("StringProperty failed")
}
if s != "foo" {
t.Errorf("StringProperty returned wrong String")
}
n, err := obj2.NumberProperty("", 3)
if err != nil {
t.Errorf("NumberProperty failed")
}
if n != 1 {
t.Errorf("NumberProperty returned wrong Number")
}
obj, err = obj2.ObjectProperty("", 8)
if err != nil {
t.Errorf("ObjectProperty failed")
return
}
if obj.Properties[0].Type != typeBoolean {
t.Errorf("ObjectProperty returned object with wrong property")
}
prop, err := obj2.Property("", 9, typeBoolean)
if err != nil {
t.Errorf("Property failed")
return
}
if prop.Number != 1 {
t.Errorf("Property returned wrong Property")
}
// Try to find one that doesn't exist.
prop, err = obj2.Property("", 10, TypeObject)
if err != ErrPropertyNotFound {
t.Errorf("Property(10) failed")
}
}

View File

@ -1,81 +0,0 @@
/*
NAME
amf_headers.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
amf_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
const (
AMF_NUMBER = iota
AMF_BOOLEAN
AMF_STRING
AMF_OBJECT
AMF_MOVIECLIP /* reserved, not used */
AMF_NULL
AMF_UNDEFINED
AMF_REFERENCE
AMF_ECMA_ARRAY
AMF_OBJECT_END
AMF_STRICT_ARRAY
AMF_DATE
AMF_LONG_STRING
AMF_UNSUPPORTED
AMF_RECORDSET /* reserved, not used */
AMF_XML_DOC
AMF_TYPED_OBJECT
AMF_AVMPLUS /* switch to AMF3 */
AMF_INVALID = 0xff
)
// typedef enum
// amf.h +40
type C_AMFDataType int32
// typedef struct AMF_Object
// amf.h +67
type C_AMFObject struct {
o_props []C_AMFObjectProperty
}
// typedef struct P_vu
// amf.h +73
type P_vu struct {
p_number float64
p_aval string
p_object C_AMFObject
}
// typedef struct AMFObjectProperty
// amf.h +79
type C_AMFObjectProperty struct {
p_name string
p_type C_AMFDataType
p_vu P_vu
p_UTCoffset int16
}

View File

@ -37,6 +37,8 @@ package rtmp
import (
"encoding/binary"
"io"
"bitbucket.org/ausocean/av/rtmp/amf"
)
// Packet types.
@ -89,20 +91,12 @@ type packet struct {
info int32
bodySize uint32
bytesRead uint32
chunk *chunk
header []byte
body []byte
}
// chunk defines an RTMP packet chunk.
type chunk struct {
headerSize int32
data []byte
header [fullHeaderSize]byte
}
// read reads a packet.
func (pkt *packet) read(s *Session) error {
// readFrom reads a packet from the RTMP connection.
func (pkt *packet) readFrom(s *Session) error {
var hbuf [fullHeaderSize]byte
header := hbuf[:]
@ -182,16 +176,15 @@ func (pkt *packet) read(s *Session) error {
hSize := len(hbuf) - len(header) + size
if size >= 3 {
pkt.timestamp = C_AMF_DecodeInt24(header[:3])
pkt.timestamp = amf.DecodeInt24(header[:3])
if size >= 6 {
pkt.bodySize = C_AMF_DecodeInt24(header[3:6])
pkt.bodySize = amf.DecodeInt24(header[3:6])
pkt.bytesRead = 0
if size > 6 {
pkt.packetType = header[6]
if size == 11 {
pkt.info = decodeInt32LE(header[7:11])
pkt.info = int32(amf.DecodeInt32LE(header[7:11]))
}
}
}
@ -204,8 +197,7 @@ func (pkt *packet) read(s *Session) error {
s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error())
return err
}
// TODO: port this
pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4])
pkt.timestamp = amf.DecodeInt32(header[size : size+4])
hSize += 4
}
@ -213,20 +205,13 @@ func (pkt *packet) read(s *Session) error {
pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6)
}
toRead := int32(pkt.bodySize - pkt.bytesRead)
toRead := pkt.bodySize - pkt.bytesRead
chunkSize := s.inChunkSize
if toRead < chunkSize {
chunkSize = toRead
}
if pkt.chunk != nil {
panic("non-nil chunk")
pkt.chunk.headerSize = int32(hSize)
copy(pkt.chunk.header[:], hbuf[:hSize])
pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)]
}
_, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
@ -235,7 +220,7 @@ func (pkt *packet) read(s *Session) error {
pkt.bytesRead += uint32(chunkSize)
// keep the packet as ref for other packets on this channel
// Keep the packet as a reference for other packets on this channel.
if s.channelsIn[pkt.channel] == nil {
s.channelsIn[pkt.channel] = &packet{}
}
@ -245,12 +230,8 @@ func (pkt *packet) read(s *Session) error {
s.channelsIn[pkt.channel].timestamp = 0xffffff
}
if pkt.bytesRead != pkt.bodySize {
panic("readPacket: bytesRead != bodySize")
}
if !pkt.hasAbsTimestamp {
// timestamps seem to always be relative
// Timestamps seem to always be relative.
pkt.timestamp += uint32(s.channelTimestamp[pkt.channel])
}
s.channelTimestamp[pkt.channel] = int32(pkt.timestamp)
@ -262,6 +243,7 @@ func (pkt *packet) read(s *Session) error {
}
// resize adjusts the packet's storage to accommodate a body of the given size and header type.
// When headerSizeAuto is specified, the header type is computed based on packet type.
func (pkt *packet) resize(size uint32, ht uint8) {
buf := make([]byte, fullHeaderSize+size)
pkt.header = buf
@ -285,10 +267,12 @@ func (pkt *packet) resize(size uint32, ht uint8) {
}
}
// write sends a packet.
// writeTo writes a packet to the RTMP connection.
// Packets are written in chunks which are Session.chunkSize in length (128 bytes in length).
// We defer sending small audio packets and combine consecutive small audio packets where possible to reduce I/O.
// When queue is true, we expect a response to this request and cache the method on s.methodCalls.
func (pkt *packet) write(s *Session, queue bool) error {
if pkt.body == nil {
func (pkt *packet) writeTo(s *Session, queue bool) error {
if pkt.body == nil || pkt.bodySize == 0 {
return errInvalidBody
}
@ -314,7 +298,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
prevPkt := s.channelsOut[pkt.channel]
var last int
if prevPkt != nil && pkt.headerType != headerSizeLarge {
// compress a bit by using the prev packet's attributes
// Compress header by using the previous packet's attributes.
if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium {
pkt.headerType = headerSizeSmall
}
@ -337,7 +321,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
hSize := headerSizes[pkt.headerType]
origIdx := fullHeaderSize - hSize
// adjust 1 or 2 bytes for the channel
// Adjust 1 or 2 bytes depending on the channel.
cSize := 0
switch {
case pkt.channel > 319:
@ -351,7 +335,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
hSize += cSize
}
// adjust 4 bytes for the timestamp
// Adjust 4 bytes for the timestamp.
var ts uint32
if prevPkt != nil {
ts = uint32(int(pkt.timestamp) - last)
@ -388,16 +372,16 @@ func (pkt *packet) write(s *Session, queue bool) error {
}
if headerSizes[pkt.headerType] > 1 {
res := ts
tmp := ts
if ts > 0xffffff {
res = 0xffffff
tmp = 0xffffff
}
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res))
amf.EncodeInt24(headBytes[headerIdx:], tmp)
headerIdx += 3 // 24bits
}
if headerSizes[pkt.headerType] > 4 {
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize))
amf.EncodeInt24(headBytes[headerIdx:], pkt.bodySize)
headerIdx += 3 // 24bits
headBytes[headerIdx] = pkt.packetType
headerIdx++
@ -409,7 +393,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
}
if ts >= 0xffffff {
C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts))
amf.EncodeInt32(headBytes[headerIdx:], ts)
headerIdx += 4 // 32bits
}
@ -436,7 +420,6 @@ func (pkt *packet) write(s *Session, queue bool) error {
}
// TODO(kortschak): Rewrite this horrific peice of premature optimisation.
// NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size.
s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size)
for size+hSize != 0 {
if chunkSize > size {
@ -459,6 +442,7 @@ func (pkt *packet) write(s *Session, queue bool) error {
hSize = 0
if size > 0 {
// We are writing the 2nd or subsequent chunk.
origIdx -= 1 + cSize
hSize = 1 + cSize
@ -479,22 +463,19 @@ func (pkt *packet) write(s *Session, queue bool) error {
}
if ts >= 0xffffff {
extendedTimestamp := headBytes[origIdx+1+cSize:]
C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts))
amf.EncodeInt32(extendedTimestamp[:4], ts)
}
}
}
// We invoked a remote method
if pkt.packetType == packetTypeInvoke {
// If we invoked a remote method and queue is true, we queue the method until the result arrives.
if pkt.packetType == packetTypeInvoke && queue {
buf := pkt.body[1:]
meth := C_AMF_DecodeString(buf)
s.log(DebugLevel, pkg+"invoking method "+meth)
// keep it in call queue till result arrives
if queue {
buf = buf[3+len(meth):]
txn := int32(C_AMF_DecodeNumber(buf[:8]))
s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})
}
meth := amf.DecodeString(buf)
s.log(DebugLevel, pkg+"queuing method "+meth)
buf = buf[3+len(meth):]
txn := int32(amf.DecodeNumber(buf[:8]))
s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})
}
if s.channelsOut[pkt.channel] == nil {
@ -504,12 +485,3 @@ func (pkt *packet) write(s *Session, queue bool) error {
return nil
}
func decodeInt32LE(data []byte) int32 {
return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0])
}
func encodeInt32LE(dst []byte, v int32) int32 {
binary.LittleEndian.PutUint32(dst, uint32(v))
return 4
}

View File

@ -42,11 +42,12 @@ import (
"net"
"strconv"
"time"
"bitbucket.org/ausocean/av/rtmp/amf"
)
const (
pkg = "rtmp:"
minDataSize = 11 // ToDo: this should be the same as fullHeaderSize
signatureSize = 1536
fullHeaderSize = 12
)
@ -151,35 +152,25 @@ var rtmpProtocolStrings = [...]string{
// RTMP errors.
var (
errUnknownScheme = errors.New("rtmp: unknown scheme")
errInvalidURL = errors.New("rtmp: invalid URL")
errConnected = errors.New("rtmp: already connected")
errNotConnected = errors.New("rtmp: not connected")
errNotWritable = errors.New("rtmp: connection not writable")
errHandshake = errors.New("rtmp: handshake failed")
errConnSend = errors.New("rtmp: connection send error")
errConnStream = errors.New("rtmp: connection stream error")
errInvalidHeader = errors.New("rtmp: invalid header")
errInvalidBody = errors.New("rtmp: invalid body")
errTinyPacket = errors.New("rtmp: packet too small")
errEncoding = errors.New("rtmp: encoding error")
errDecoding = errors.New("rtmp: decoding error")
errInvalidFlvTag = errors.New("rtmp: invalid FLV tag")
errUnimplemented = errors.New("rtmp: unimplemented feature")
)
// setupURL parses the RTMP URL.
func setupURL(s *Session) (err error) {
// init initialises the Session link
func (s *Session) init() (err error) {
s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(s.url)
if err != nil {
return err
}
if s.link.tcUrl == "" {
if s.link.app != "" {
s.link.tcUrl = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app
} else {
s.link.tcUrl = s.url
}
if s.link.app == "" {
return errInvalidURL
}
if s.link.port == 0 {
switch {
case (s.link.protocol & featureSSL) != 0:
@ -191,6 +182,8 @@ func setupURL(s *Session) (err error) {
s.link.port = 1935
}
}
s.link.url = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app
s.link.protocol |= featureWrite
return nil
}
@ -209,13 +202,13 @@ func connect(s *Session) error {
err = handshake(s)
if err != nil {
s.log(WarnLevel, pkg+"handshake failed", "error", err.Error())
return errHandshake
return err
}
s.log(DebugLevel, pkg+"handshaked")
err = sendConnectPacket(s)
if err != nil {
s.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error())
return errConnSend
return err
}
return nil
}
@ -225,7 +218,7 @@ func connectStream(s *Session) error {
var err error
for !s.isPlaying {
pkt := packet{}
err = pkt.read(s)
err = pkt.readFrom(s)
if err != nil {
break
}
@ -248,53 +241,39 @@ func connectStream(s *Session) error {
}
// handlePacket handles a packet that the client has received.
// NB: cases have been commented out that are not currently used by AusOcean
// NB: Unsupported packet types are logged fatally.
func handlePacket(s *Session, pkt *packet) error {
if pkt.bodySize < 4 {
return errInvalidBody
}
switch pkt.packetType {
case packetTypeChunkSize:
if pkt.bodySize >= 4 {
s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4]))
}
s.inChunkSize = amf.DecodeInt32(pkt.body[:4])
case packetTypeBytesReadReport:
s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4]))
case packetTypeControl:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeControl")
s.serverBW = amf.DecodeInt32(pkt.body[:4])
case packetTypeServerBW:
s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4]))
s.serverBW = amf.DecodeInt32(pkt.body[:4])
case packetTypeClientBW:
s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4]))
s.clientBW = amf.DecodeInt32(pkt.body[:4])
if pkt.bodySize > 4 {
s.clientBW2 = pkt.body[4]
} else {
s.clientBW2 = 0xff
}
case packetTypeAudio:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeAudio")
case packetTypeVideo:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeVideo")
case packetTypeFlexMessage:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeFlexMessage")
case packetTypeInfo:
s.log(FatalLevel, pkg+"unsupported packet type packetTypeInfo")
case packetTypeInvoke:
err := handleInvoke(s, pkt.body[:pkt.bodySize])
if err != nil {
// This will never happen with the methods we implement.
s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error())
return err
}
case packetTypeFlashVideo:
s.log(FatalLevel, pkg+"unsupported packet type packetType_FLASHVideo")
case packetTypeControl, packetTypeAudio, packetTypeVideo, packetTypeFlashVideo, packetTypeFlexMessage, packetTypeInfo:
s.log(FatalLevel, pkg+"unsupported packet type "+strconv.Itoa(int(pkt.packetType)))
default:
s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType)
@ -313,110 +292,50 @@ func sendConnectPacket(s *Session) error {
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, avConnect)
if enc == nil {
return errEncoding
enc, err := amf.EncodeString(enc, avConnect)
if err != nil {
return err
}
s.numInvokes += 1
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes))
if enc == nil {
return errEncoding
enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if err != nil {
return err
}
enc[0] = AMF_OBJECT
enc[0] = amf.TypeObject
enc = enc[1:]
enc = C_AMF_EncodeNamedString(enc, avApp, s.link.app)
if enc == nil {
return errEncoding
enc, err = amf.EncodeNamedString(enc, avApp, s.link.app)
if err != nil {
return err
}
if s.link.protocol&featureWrite != 0 {
enc = C_AMF_EncodeNamedString(enc, avType, avNonprivate)
if enc == nil {
return errEncoding
}
enc, err = amf.EncodeNamedString(enc, avType, avNonprivate)
if err != nil {
return err
}
enc, err = amf.EncodeNamedString(enc, avTcUrl, s.link.url)
if err != nil {
return err
}
enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd)
if err != nil {
return err
}
if s.link.flashVer != "" {
enc = C_AMF_EncodeNamedString(enc, avFlashver, s.link.flashVer)
if enc == nil {
return errEncoding
}
}
if s.link.swfUrl != "" {
enc = C_AMF_EncodeNamedString(enc, avSwfUrl, s.link.swfUrl)
if enc == nil {
return errEncoding
}
}
if s.link.tcUrl != "" {
enc = C_AMF_EncodeNamedString(enc, avTcUrl, s.link.tcUrl)
if enc == nil {
return errEncoding
}
}
if s.link.protocol&featureWrite == 0 {
enc = C_AMF_EncodeNamedBoolean(enc, avFpad, false)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, avCapabilities, 15)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, avAudioCodecs, s.audioCodecs)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, avVideoCodecs, s.videoCodecs)
if enc == nil {
return errEncoding
}
enc = C_AMF_EncodeNamedNumber(enc, avVideoFunction, 1)
if enc == nil {
return errEncoding
}
if s.link.pageUrl != "" {
enc = C_AMF_EncodeNamedString(enc, avPageUrl, s.link.pageUrl)
if enc == nil {
return errEncoding
}
}
}
if s.encoding != 0.0 || s.sendEncoding {
enc = C_AMF_EncodeNamedNumber(enc, avObjectEncoding, s.encoding)
if enc == nil {
return errEncoding
}
}
copy(enc, []byte{0, 0, AMF_OBJECT_END})
enc = enc[3:]
// add auth string
// add auth string, if any
if s.link.auth != "" {
enc = C_AMF_EncodeBoolean(enc, s.link.flags&linkAuth != 0)
if enc == nil {
return errEncoding
enc, err = amf.EncodeBoolean(enc, s.link.flags&linkAuth != 0)
if err != nil {
return err
}
enc = C_AMF_EncodeString(enc, s.link.auth)
if enc == nil {
return errEncoding
}
}
for i := range s.link.extras.o_props {
enc = C_AMF_PropEncode(&s.link.extras.o_props[i], enc)
if enc == nil {
return errEncoding
enc, err = amf.EncodeString(enc, s.link.auth)
if err != nil {
return err
}
}
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, true) // response expected
return pkt.writeTo(s, true) // response expected
}
func sendCreateStream(s *Session) error {
@ -430,21 +349,21 @@ func sendCreateStream(s *Session) error {
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, avCreatestream)
if enc == nil {
return errEncoding
enc, err := amf.EncodeString(enc, avCreatestream)
if err != nil {
return err
}
s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes))
if enc == nil {
return errEncoding
enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if err != nil {
return err
}
enc[0] = AMF_NULL
enc[0] = amf.TypeNull
enc = enc[1:]
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, true) // response expected
return pkt.writeTo(s, true) // response expected
}
func sendReleaseStream(s *Session) error {
@ -458,24 +377,24 @@ func sendReleaseStream(s *Session) error {
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, avReleasestream)
if enc == nil {
return errEncoding
enc, err := amf.EncodeString(enc, avReleasestream)
if err != nil {
return err
}
s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes))
if enc == nil {
return errEncoding
enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if err != nil {
return err
}
enc[0] = AMF_NULL
enc[0] = amf.TypeNull
enc = enc[1:]
enc = C_AMF_EncodeString(enc, s.link.playpath)
if enc == nil {
return errEncoding
enc, err = amf.EncodeString(enc, s.link.playpath)
if err != nil {
return err
}
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false)
return pkt.writeTo(s, false)
}
func sendFCPublish(s *Session) error {
@ -489,25 +408,25 @@ func sendFCPublish(s *Session) error {
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, avFCPublish)
if enc == nil {
return errEncoding
enc, err := amf.EncodeString(enc, avFCPublish)
if err != nil {
return err
}
s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes))
if enc == nil {
return errEncoding
enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if err != nil {
return err
}
enc[0] = AMF_NULL
enc[0] = amf.TypeNull
enc = enc[1:]
enc = C_AMF_EncodeString(enc, s.link.playpath)
if enc == nil {
return errEncoding
enc, err = amf.EncodeString(enc, s.link.playpath)
if err != nil {
return err
}
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false)
return pkt.writeTo(s, false)
}
func sendFCUnpublish(s *Session) error {
@ -521,25 +440,25 @@ func sendFCUnpublish(s *Session) error {
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, avFCUnpublish)
if enc == nil {
return errEncoding
enc, err := amf.EncodeString(enc, avFCUnpublish)
if err != nil {
return err
}
s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes))
if enc == nil {
return errEncoding
enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if err != nil {
return err
}
enc[0] = AMF_NULL
enc[0] = amf.TypeNull
enc = enc[1:]
enc = C_AMF_EncodeString(enc, s.link.playpath)
if enc == nil {
return errEncoding
enc, err = amf.EncodeString(enc, s.link.playpath)
if err != nil {
return err
}
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false)
return pkt.writeTo(s, false)
}
func sendPublish(s *Session) error {
@ -553,29 +472,29 @@ func sendPublish(s *Session) error {
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, avPublish)
if enc == nil {
return errEncoding
enc, err := amf.EncodeString(enc, avPublish)
if err != nil {
return err
}
s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes))
if enc == nil {
return errEncoding
enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if err != nil {
return err
}
enc[0] = AMF_NULL
enc[0] = amf.TypeNull
enc = enc[1:]
enc = C_AMF_EncodeString(enc, s.link.playpath)
if enc == nil {
return errEncoding
enc, err = amf.EncodeString(enc, s.link.playpath)
if err != nil {
return err
}
enc = C_AMF_EncodeString(enc, avLive)
if enc == nil {
return errEncoding
enc, err = amf.EncodeString(enc, avLive)
if err != nil {
return err
}
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, true) // response expected
return pkt.writeTo(s, true) // response expected
}
func sendDeleteStream(s *Session, dStreamId float64) error {
@ -589,24 +508,24 @@ func sendDeleteStream(s *Session, dStreamId float64) error {
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, avDeletestream)
if enc == nil {
return errEncoding
enc, err := amf.EncodeString(enc, avDeletestream)
if err != nil {
return err
}
s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes))
if enc == nil {
return errEncoding
enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if err != nil {
return err
}
enc[0] = AMF_NULL
enc[0] = amf.TypeNull
enc = enc[1:]
enc = C_AMF_EncodeNumber(enc, dStreamId)
if enc == nil {
return errEncoding
enc, err = amf.EncodeNumber(enc, dStreamId)
if err != nil {
return err
}
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false)
return pkt.writeTo(s, false)
}
// sendBytesReceived tells the server how many bytes the client has received.
@ -622,13 +541,14 @@ func sendBytesReceived(s *Session) error {
enc := pkt.body
s.nBytesInSent = s.nBytesIn
enc = C_AMF_EncodeInt32(enc, s.nBytesIn)
if enc == nil {
return errEncoding
enc, err := amf.EncodeInt32(enc, s.nBytesIn)
if err != nil {
return err
}
pkt.bodySize = 4
return pkt.write(s, false)
return pkt.writeTo(s, false)
}
func sendCheckBW(s *Session) error {
@ -642,21 +562,21 @@ func sendCheckBW(s *Session) error {
}
enc := pkt.body
enc = C_AMF_EncodeString(enc, av_checkbw)
if enc == nil {
return errEncoding
enc, err := amf.EncodeString(enc, av_checkbw)
if err != nil {
return err
}
s.numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes))
if enc == nil {
return errEncoding
enc, err = amf.EncodeNumber(enc, float64(s.numInvokes))
if err != nil {
return err
}
enc[0] = AMF_NULL
enc[0] = amf.TypeNull
enc = enc[1:]
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
return pkt.write(s, false)
return pkt.writeTo(s, false)
}
func eraseMethod(m []method, i int) []method {
@ -671,18 +591,27 @@ func handleInvoke(s *Session, body []byte) error {
if body[0] != 0x02 {
return errInvalidBody
}
var obj C_AMFObject
nRes := C_AMF_Decode(&obj, body, 0)
if nRes < 0 {
return errDecoding
var obj amf.Object
_, err := amf.Decode(&obj, body, false)
if err != nil {
return err
}
meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0))
s.log(DebugLevel, pkg+"invoking method "+meth)
txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1))
meth, err := obj.StringProperty("", 0)
if err != nil {
return err
}
txn, err := obj.NumberProperty("", 1)
if err != nil {
return err
}
s.log(DebugLevel, pkg+"invoking method "+meth)
switch meth {
case av_result:
if (s.link.protocol & featureWrite) == 0 {
return errNotWritable
}
var methodInvoked string
for i, m := range s.methodCalls {
if float64(m.num) == txn {
@ -693,18 +622,12 @@ func handleInvoke(s *Session, body []byte) error {
}
if methodInvoked == "" {
s.log(WarnLevel, pkg+"received result without matching request", "id", txn)
goto leave
return nil
}
s.log(DebugLevel, pkg+"received result for "+methodInvoked)
switch methodInvoked {
case avConnect:
if s.link.token != "" {
s.log(FatalLevel, pkg+"no support for link token")
}
if (s.link.protocol & featureWrite) == 0 {
return errNotWritable
}
err := sendReleaseStream(s)
if err != nil {
return err
@ -719,86 +642,56 @@ func handleInvoke(s *Session, body []byte) error {
}
case avCreatestream:
s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3)))
if s.link.protocol&featureWrite == 0 {
return errNotWritable
n, err := obj.NumberProperty("", 3)
if err != nil {
return err
}
err := sendPublish(s)
s.streamID = int32(n)
err = sendPublish(s)
if err != nil {
return err
}
case avPlay, avPublish:
s.log(FatalLevel, pkg+"unsupported method avPlay/avPublish")
default:
s.log(FatalLevel, pkg+"unexpected method invoked"+methodInvoked)
}
case avOnBWDone:
if s.checkCounter == 0 { // ToDo: why is this always zero?
err := sendCheckBW(s)
if err != nil {
return err
}
err := sendCheckBW(s)
if err != nil {
return err
}
case avOnFCUnsubscribe, avOnFCSubscribe:
s.log(FatalLevel, pkg+"unsupported method avOnFCUnsubscribe/avOonfcsubscribe")
case avPing:
s.log(FatalLevel, pkg+"unsupported method avPing")
case av_onbwcheck:
s.log(FatalLevel, pkg+"unsupported method av_onbwcheck")
case av_onbwdone:
s.log(FatalLevel, pkg+"unsupported method av_onbwdone")
case avClose:
s.log(FatalLevel, pkg+"unsupported method avClose")
case avOnStatus:
var obj2 C_AMFObject
C_AMFProp_GetObject(C_AMF_GetProp(&obj, "", 3), &obj2)
code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avCode, -1))
level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, avLevel, -1))
obj2, err := obj.ObjectProperty("", 3)
if err != nil {
return err
}
code, err := obj2.StringProperty(avCode, -1)
if err != nil {
return err
}
level, err := obj2.StringProperty(avLevel, -1)
if err != nil {
return err
}
s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level)
switch code {
case avNetStreamFailed, avNetStreamPlayFailed,
avNetStreamPlayStreamNotFound, avNetConnectionConnectInvalidApp:
s.log(FatalLevel, pkg+"unsupported method avNetStream/avNetStreamPlayFailed/avNetstream_play_streamnotfound/av_netConnection_Connect_invalidApp")
case avNetStreamPlayStart, avNetStreamPlayPublishNotify:
s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayStart/avNetStreamPlayPublishNotify")
case avNetStreamPublish_Start:
s.log(DebugLevel, pkg+"playing")
s.isPlaying = true
for i, m := range s.methodCalls {
if m.name == avPublish {
s.methodCalls = eraseMethod(s.methodCalls, i)
break
}
if code != avNetStreamPublish_Start {
s.log(ErrorLevel, pkg+"unexpected response "+code)
return errUnimplemented
}
s.log(DebugLevel, pkg+"playing")
s.isPlaying = true
for i, m := range s.methodCalls {
if m.name == avPublish {
s.methodCalls = eraseMethod(s.methodCalls, i)
}
// ToDo: handle case when avPublish method not found
case avNetStreamPlayComplete, avNetStreamPlayStop, avNetStreamPlayUnpublishNotify:
s.log(FatalLevel, pkg+"unsupported method avNetStreamPlayComplete/avNetStreamPlayStop/avNetStreamPlayUnpublishNotify")
case avNetStreamSeekNotify:
s.log(FatalLevel, pkg+"unsupported method avNetstream_seek_notify")
case avNetStreamPauseNotify:
s.log(FatalLevel, pkg+"unsupported method avNetStreamPauseNotify")
}
case avPlaylist_ready:
s.log(FatalLevel, pkg+"unsupported method avPlaylist_ready")
default:
s.log(FatalLevel, pkg+"unknown method "+meth)
s.log(FatalLevel, pkg+"unsuppoted method "+meth)
}
leave:
C_AMF_Reset(&obj)
return nil
}

View File

@ -116,42 +116,74 @@ func TestKey(t *testing.T) {
testLog(0, "Testing against URL "+testBaseURL+testKey)
}
// TestSetupURL tests URL parsing.
func TestSetupURL(t *testing.T) {
testLog(0, "TestSetupURL")
// TestInit tests session construction and link initialization.
func TestInit(t *testing.T) {
testLog(0, "TestInit")
// test with just the base URL
s := NewSession(testBaseURL, testTimeout, testLog)
if s.url != testBaseURL && s.link.timeout != testTimeout {
t.Errorf("NewSession failed")
}
err := setupURL(s)
err := s.init()
if err != nil {
t.Errorf("setupURL(testBaseURL) failed with error: %v", err)
t.Errorf("setupURL: failed with error: %v", err)
}
// test the parts are as expected
if rtmpProtocolStrings[s.link.protocol] != rtmpProtocol {
t.Errorf("setupURL returned wrong protocol: %v", s.link.protocol)
if s.link.protocol&featureWrite == 0 {
t.Errorf("setupURL: link not writable")
}
if rtmpProtocolStrings[s.link.protocol&^featureWrite] != rtmpProtocol {
t.Errorf("setupURL: wrong protocol: %v", s.link.protocol)
}
if s.link.host != testHost {
t.Errorf("setupURL returned wrong host: %v", s.link.host)
t.Errorf("setupURL: wrong host: %v", s.link.host)
}
if s.link.app != testApp {
t.Errorf("setupURL returned wrong app: %v", s.link.app)
t.Errorf("setupURL: wrong app: %v", s.link.app)
}
}
// TestOpenClose tests opening an closing an RTMP connection.
func TestOpenClose(t *testing.T) {
testLog(0, "TestOpenClose")
// TestErrorHandling tests error handling
func TestErorHandling(t *testing.T) {
testLog(0, "TestErrorHandling")
if testKey == "" {
t.Skip("Skipping TestOpenClose since no RTMP_TEST_KEY")
t.Skip("Skipping TestErrorHandling since no RTMP_TEST_KEY")
}
s := NewSession(testBaseURL+testKey, testTimeout, testLog)
err := s.Open()
// test errNotConnected
var buf [1024]byte
tag := buf[:0]
_, err := s.Write(tag)
if err == nil {
t.Errorf("Write did not return errNotConnected")
}
err = s.Open()
if err != nil {
t.Errorf("Open failed with error: %v", err)
return
}
// test errInvalidFlvTag
_, err = s.Write(tag)
if err == nil {
t.Errorf("Write did not return errInvalidFlvTag")
}
// test errUnimplemented
copy(tag, []byte("FLV"))
_, err = s.Write(tag)
if err == nil {
t.Errorf("Write did not return errUnimplemented")
}
// test errInvalidBody
tag = buf[:11]
_, err = s.Write(tag)
if err == nil {
t.Errorf("Write did not return errInvalidBody")
}
err = s.Close()
if err != nil {
t.Errorf("Close failed with error: %v", err)

View File

@ -37,22 +37,22 @@ import (
"io"
"net"
"time"
"bitbucket.org/ausocean/av/rtmp/amf"
)
// Session holds the state for an RTMP session.
type Session struct {
url string
inChunkSize int32
outChunkSize int32
checkCounter int32
nBytesIn int32
nBytesInSent int32
inChunkSize uint32
outChunkSize uint32
nBytesIn uint32
nBytesInSent uint32
streamID int32
serverBW int32
clientBW int32
serverBW uint32
clientBW uint32
clientBW2 uint8
isPlaying bool
sendEncoding bool
numInvokes int32
methodCalls []method
channelsAllocatedIn int32
@ -62,7 +62,6 @@ type Session struct {
channelTimestamp []int32
audioCodecs float64
videoCodecs float64
encoding float64
deferred []byte
link link
log Log
@ -72,16 +71,10 @@ type Session struct {
type link struct {
host string
playpath string
tcUrl string
swfUrl string
pageUrl string
url string
app string
auth string
flashVer string
token string
extras C_AMFObject
flags int32
swfAge int32
protocol int32
timeout uint
port uint16
@ -106,6 +99,10 @@ const (
FatalLevel int8 = 5
)
// flvTagheaderSize is the FLV header size we expect.
// NB: We don't accept extended headers.
const flvTagheaderSize = 11
// NewSession returns a new Session.
func NewSession(url string, timeout uint, log Log) *Session {
return &Session{
@ -120,7 +117,6 @@ func NewSession(url string, timeout uint, log Log) *Session {
log: log,
link: link{
timeout: timeout,
swfAge: 30,
},
}
}
@ -131,12 +127,10 @@ func (s *Session) Open() error {
if s.isConnected() {
return errConnected
}
err := setupURL(s)
err := s.init()
if err != nil {
return err
}
s.enableWrite()
err = connect(s)
if err != nil {
s.Close()
@ -174,8 +168,8 @@ func (s *Session) Write(data []byte) (int, error) {
if !s.isConnected() {
return 0, errNotConnected
}
if len(data) < minDataSize {
return 0, errTinyPacket
if len(data) < flvTagheaderSize {
return 0, errInvalidFlvTag
}
if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') {
return 0, errUnimplemented
@ -183,15 +177,15 @@ func (s *Session) Write(data []byte) (int, error) {
pkt := packet{
packetType: data[0],
bodySize: C_AMF_DecodeInt24(data[1:4]),
timestamp: C_AMF_DecodeInt24(data[4:7]) | uint32(data[7])<<24,
bodySize: amf.DecodeInt24(data[1:4]),
timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24,
channel: chanSource,
info: s.streamID,
}
pkt.resize(pkt.bodySize, headerSizeAuto)
copy(pkt.body, data[minDataSize:minDataSize+pkt.bodySize])
err := pkt.write(s, false)
copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize])
err := pkt.writeTo(s, false)
if err != nil {
return 0, err
}
@ -213,7 +207,7 @@ func (s *Session) read(buf []byte) (int, error) {
s.log(DebugLevel, pkg+"read failed", "error", err.Error())
return 0, err
}
s.nBytesIn += int32(n)
s.nBytesIn += uint32(n)
if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) {
err := sendBytesReceived(s)
if err != nil {
@ -243,7 +237,3 @@ func (s *Session) isConnected() bool {
return s.link.conn != nil
}
// enableWrite enables the current session for writing.
func (s *Session) enableWrite() {
s.link.protocol |= featureWrite
}

View File

@ -138,5 +138,5 @@ func addPadding(d []byte) []byte {
for i := range padding {
padding[i] = 0xff
}
return d
return t
}

View File

@ -40,7 +40,7 @@ const (
timestampFreq = 90000 // Hz
mtsSize = 188
bufferSize = 1000
sendLength = 7 * 188
sendLen = 7 * 188
)
// Encoder implements io writer and provides functionality to wrap data into
@ -53,6 +53,7 @@ type Encoder struct {
frameInterval time.Duration
fps int
buffer []byte
pktSpace [defPktSize]byte
}
// NewEncoder returns a new Encoder type given an io.Writer - the destination
@ -71,9 +72,9 @@ func NewEncoder(dst io.Writer, fps int) *Encoder {
// so that multiple layers of packetization can occur.
func (e *Encoder) Write(data []byte) (int, error) {
e.buffer = append(e.buffer, data...)
for len(e.buffer) >= sendLength {
e.Encode(e.buffer[:sendLength])
e.buffer = e.buffer[sendLength:]
for len(e.buffer) >= sendLen {
e.Encode(e.buffer[:sendLen])
e.buffer = e.buffer[sendLen:]
}
return len(data), nil
}
@ -93,7 +94,7 @@ func (e *Encoder) Encode(payload []byte) error {
Payload: payload,
Padding: no,
}
_, err := e.dst.Write(pkt.Bytes())
_, err := e.dst.Write(pkt.Bytes(e.pktSpace[:defPktSize]))
if err != nil {
return err
}

View File

@ -33,7 +33,10 @@ for rtp-h264 and rtp standards.
package rtp
const (
rtpVer = 2
rtpVer = 2
headSize = 3 * 4 // Header size of an rtp packet.
defPayloadSize = sendLen // Default payload size for the rtp packet.
defPktSize = headSize + defPayloadSize // Default packet size is header size + payload size.
)
// Pkt provides fields consistent with RFC3550 definition of an rtp packet
@ -53,7 +56,11 @@ type Pkt struct {
}
// Bytes provides a byte slice of the packet
func (p *Pkt) Bytes() []byte {
func (p *Pkt) Bytes(buf []byte) []byte {
if buf == nil || len(buf) != defPktSize {
buf = make([]byte, headSize, defPktSize)
}
buf = buf[:headSize]
if p.V == 0 {
p.V = rtpVer
}
@ -74,9 +81,6 @@ func (p *Pkt) Bytes() []byte {
panic("rtp: CC (CSRC count) not 0, but CSRC headers not yet supported.")
}
const headSize = 3 * 4 // bytes
buf := make([]byte, headSize, headSize+len(p.Payload)+int(p.Padding))
buf[0] = p.V<<6 | p.p<<5 | p.CC
buf[1] = p.M<<7 | p.PT
buf[2] = byte(p.SN >> 8)

View File

@ -65,7 +65,7 @@ var rtpTests = []struct {
func TestRtpPktToByteSlice(t *testing.T) {
for _, test := range rtpTests {
got := test.pkt.Bytes()
got := test.pkt.Bytes(nil)
if !reflect.DeepEqual(got, test.want) {
t.Errorf("unexpected error for test %v: got:%v want:%v", test.num, got,
test.want)