av/rtmp/rtmp.go

698 lines
17 KiB
Go

/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
/*
#cgo CFLAGS: -I/usr/local/include/librtmp
#cgo LDFLAGS: -lrtmp -Wl,-rpath=/usr/local/lib
#include <stdlib.h>
#include <string.h>
#include <rtmp.h>
typedef enum {
RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE
} RTMPTCmd;
RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout);
int write_frame(RTMP* rtmp, char* data, uint data_length);
int end_session(RTMP* rtmp);
void AV_queue(RTMP_METHOD **vals, int *num, AVal *av, int txn);
int WriteN(RTMP *r, const char *buffer, int n);
int EncodeInt32LE(char *output, int nVal);
int HTTP_Post(RTMP *r, RTMPTCmd cmd, const char *buf, int len);
*/
import "C"
import (
"errors"
_ "fmt"
"log"
"math"
"reflect"
"strconv"
"unsafe"
)
// General package constants.
const (
// minDataSize is the smallest write that rtmpWrite accepts when it is at the
// start of a new packet; rtmpWrite consumes this many bytes of tag header
// before it reaches the body.
minDataSize = 11
// debugMode enables additional log output in sendPacket and writeN.
debugMode = false
// NOTE(review): nullChar is not referenced anywhere in the visible source —
// confirm whether it is still required.
nullChar = "golang\000"
)
// Element sizes in bytes. They are passed to incPtr and decPtr as the stride
// for pointer arithmetic by the typed helper functions (incBytePtr,
// incInt32Ptr, incInt64Ptr and their dec/indx counterparts).
const (
byteSize = 1
int32Size = 4
int64Size = 8
)
// Go-side values matching, in order, the RTMPTCmd enum declared in the cgo
// preamble (RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE). writeN passes
// RTMPT_SEND to C.HTTP_Post.
const (
RTMPT_OPEN = iota
RTMPT_SEND
RTMPT_IDLE
RTMPT_CLOSE
)
// Packet header types and packet types used by rtmpWrite and sendPacket.
const (
// RTMP_PACKET_SIZE_* are the values stored in a packet's m_headerType;
// sendPacket uses the header type as an index into packetSize.
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
// RTMP_PACKET_TYPE_* are compared against the first byte of each tag that
// rtmpWrite reads from its input.
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
)
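// memmove copies n bytes from "from" to "to".
// It is currently disabled: callers use copy over ptrToSlice views instead.
// To restore it, re-enable both of the following lines together (the
// linkname directive must not be active without the declaration it names):
//   go:linkname memmove runtime.memmove
//   func memmove(to, from unsafe.Pointer, n uintptr)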
// AVC builds a C.AVal (declared in amf.h) from a Go string, mirroring the C
// macro
//
//	#define AVC(str) {str, sizeof(str)-1}
//
// The string is copied into C memory with C.CString and av_len is set to
// len(str). The C allocation is not released here; the returned AVal keeps
// pointing at it.
func AVC(str string) C.AVal {
	return C.AVal{
		av_val: C.CString(str),
		av_len: C.int(len(str)),
	}
}
// av_setDataFrame is a static const global in rtmp.c
// setDataFrame is its Go-side equivalent: rtmpWrite encodes it at the start of
// the body of every RTMP_PACKET_TYPE_INFO packet.
var setDataFrame = AVC("@setDataFrame")
// packetSize holds the header length in bytes for each header type;
// sendPacket indexes it with packet.m_headerType (0 to 3).
var packetSize = [...]int{ 12, 8, 4, 1}
// Session provides an interface for sending flv tags over rtmp.
type Session interface {
// Open establishes the rtmp connection.
Open() error
// Write writes a frame (flv tag) to the rtmp connection and returns the
// number of bytes accepted.
Write([]byte) (int, error)
// Close terminates the rtmp connection.
Close() error
}
// session provides parameters required for an rtmp communication session.
type session struct {
// rtmp is the handle returned by C.start_session. It is nil while the
// session is not open; Open, Write and Close all test it for nil.
rtmp *C.RTMP
// url is the address handed to C.start_session by Open.
url string
// timeout is the connect timeout handed to C.start_session by Open.
// NOTE(review): the unit is not visible from this file — confirm.
timeout uint
}
// Compile-time check that *session satisfies Session.
var _ Session = (*session)(nil)
// NewSession returns a Session that will connect to url, using connectTimeout
// as the connect timeout. No connection is made until Open is called.
func NewSession(url string, connectTimeout uint) Session {
	s := new(session)
	s.url = url
	s.timeout = connectTimeout
	return s
}
// Open establishes an rtmp connection with the url passed into the
// constructor.
//
// It returns an error if the session is already open or if the connection
// could not be started. When the C call fails without setting errno, a generic
// error is returned instead of nil so that a failed Open is never reported as
// a success.
//
// NOTE(review): the C copy of the url made here is never freed. It is left
// alone because the C side may keep pointers into it for the life of the
// connection — confirm before adding a C.free.
func (s *session) Open() error {
	if s.rtmp != nil {
		return errors.New("rtmp: attempt to start already running session")
	}
	// The second result is derived from errno, which may be zero even though
	// the call failed, so success is judged by the returned handle only.
	rtmp, err := C.start_session(s.rtmp, C.CString(s.url), C.uint(s.timeout))
	s.rtmp = rtmp
	if s.rtmp == nil {
		if err != nil {
			return err
		}
		return errors.New("rtmp: failed to start session")
	}
	return nil
}
// Close terminates the rtmp connection.
//
// It returns Err(3) if the session is not open. Otherwise it ends the session,
// forgets the handle regardless of the outcome, and returns the non-zero
// status from C.end_session wrapped in Err, or nil on success.
func (s *session) Close() error {
	if s.rtmp == nil {
		return Err(3)
	}
	status := C.end_session(s.rtmp)
	s.rtmp = nil
	switch status {
	case 0:
		return nil
	default:
		return Err(status)
	}
}
// Write writes a frame (flv tag) to the rtmp connection.
//
// On success it returns len(data) and a nil error. On failure it returns 0 and
// one of:
//   - Err(3) if the session has not been opened,
//   - Err(1) if the connection is no longer connected,
//   - Err(2) if data is empty or the write fails.
func (s *session) Write(data []byte) (int, error) {
	if s.rtmp == nil {
		return 0, Err(3)
	}
	if C.RTMP_IsConnected(s.rtmp) == 0 {
		return 0, Err(1)
	}
	// An empty slice carries no tag, and rtmpWrite takes the address of
	// data[0], so reject it here rather than panicking.
	if len(data) == 0 {
		return 0, Err(2)
	}
	// rtmpWrite signals failure with 0 (input too small or allocation failure)
	// and with -1 (sendPacket failed); both are write errors.
	if rtmpWrite(s.rtmp, data) <= 0 {
		return 0, Err(2)
	}
	return len(data), nil
}
// rtmpWrite writes data to the current rtmp connection encapsulated by r.
//
// data is read as a sequence of tags. For each tag the header fields are
// parsed into r.m_write, the body is copied into a packet body allocated with
// C.RTMPPacket_Alloc, and once the whole body is present the packet is passed
// to sendPacket. A body that is only partly available is left in r.m_write
// (progress is kept in m_nBytesRead) and continued by the next call.
//
// It returns 0 if data is shorter than minDataSize at the start of a packet or
// if the packet body cannot be allocated, -1 if sendPacket fails, and
// otherwise len(data) plus the final value of s2 (zero, or negative if the
// trailing 4-byte skip ran past the end of data).
//
// data must not be empty: sliceToPtr takes the address of data[0].
func rtmpWrite(r *C.RTMP, data []byte) int {
// buf walks through data; s2 counts the bytes of data not yet consumed.
buf := sliceToPtr(data)
// TODO: port RTMPPacket
var pkt = &r.m_write
var pend, enc unsafe.Pointer
size := len(data)
s2 := size
var ret, num int
// Everything written here goes out on channel 4 with the connection's
// stream id.
pkt.m_nChannel = 0x04
pkt.m_nInfoField2 = C.int32_t(r.m_stream_id)
for s2 != 0 {
// m_nBytesRead == 0 means no body is in progress, so buf is expected to be
// positioned at the start of a tag header.
if pkt.m_nBytesRead == 0 {
if size < minDataSize {
log.Printf("size: %d\n", size)
log.Printf("too small \n")
return 0
}
// Skip 13 bytes when the data begins with the "FLV" signature.
// NOTE(review): presumably the FLV file header plus the first
// previous-tag-size field — confirm against the FLV format.
if *indxBytePtr(buf,0) == 'F' && *indxBytePtr(buf,1) == 'L' && *indxBytePtr(buf,2) == 'V' {
buf = unsafe.Pointer(uintptr(buf) + uintptr(13))
s2 -= 13
}
// Tag header: 1 byte packet type, 3 bytes body size, 3 bytes timestamp,
// 1 byte that supplies the top 8 bits of the timestamp, and 3 further
// bytes that are skipped. That is the 11 bytes deducted from s2 below.
pkt.m_packetType = C.uint8_t(*indxBytePtr(buf,0))
buf = incBytePtr(buf, 1)
// TODO: port this
pkt.m_nBodySize = C.uint32_t(C.AMF_DecodeInt24((*C.char)(buf)))
buf = incBytePtr(buf, 3)
// TODO: replace with ported version
pkt.m_nTimeStamp = C.uint32_t(C.AMF_DecodeInt24((*C.char)(buf)))
buf = incBytePtr(buf, 3)
pkt.m_nTimeStamp |= C.uint32_t(*indxBytePtr(buf,0)) << 24
buf = incBytePtr(buf, 4)
s2 -= 11
// Audio/video packets with a zero timestamp and all info packets use the
// large header; everything else starts as a medium header.
if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) &&
pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
pkt.m_headerType = RTMP_PACKET_SIZE_LARGE
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
// Extra room for the setDataFrame string encoded at the front of the body
// below. NOTE(review): 16 is assumed to match the encoded size — confirm.
pkt.m_nBodySize += 16
}
} else {
pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM
}
// TODO: Port this
if int(C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize)) == 0 {
log.Println("Failed to allocate packet")
return 0
}
// enc is the write position inside the packet body; pend is its end.
enc = unsafe.Pointer(pkt.m_body)
pend = incBytePtr(enc, int(pkt.m_nBodySize))
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
// TODO: Port this
enc = unsafe.Pointer(C.AMF_EncodeString((*C.char)(enc), (*C.char)(pend), &setDataFrame))
// Count the bytes just encoded (distance from the body start to enc) as
// already read.
pkt.m_nBytesRead = C.uint32_t(math.Abs(float64(uintptr(enc) -
uintptr(unsafe.Pointer(pkt.m_body)))))
}
} else {
// A body is already in progress: resume writing where the last call stopped.
enc = incBytePtr(unsafe.Pointer(pkt.m_body), int(pkt.m_nBytesRead))
}
// Copy as much of the outstanding body as the remaining input provides.
num = int(pkt.m_nBodySize - pkt.m_nBytesRead)
if num > s2 {
num = s2
}
//memmove(enc,buf,uintptr(num))
copy(ptrToSlice(enc, num), ptrToSlice(buf, num))
pkt.m_nBytesRead += C.uint32_t(num)
s2 -= num
buf = incBytePtr(buf, num)
// The body is complete: send it, release the body and reset the progress
// counter so the next iteration starts a new packet.
if pkt.m_nBytesRead == pkt.m_nBodySize {
// TODO: Port this
ret = sendPacket(r, pkt, 0)
// TODO: Port this
C.RTMPPacket_Free(pkt)
pkt.m_nBytesRead = 0
if ret == 0 {
return -1
}
// Skip the 4 bytes that follow the body.
// NOTE(review): presumably the previous-tag-size field — confirm.
buf = incBytePtr(buf, 4)
s2 -= 4
// The skip may run past the end of data; stop rather than loop on a
// negative count.
if s2 < 0 {
break
}
}
}
return size + s2
}
// send packet version 1 - less C stuff
//
// sendPacket writes packet to the connection r as a header followed by the
// body, split into chunks of at most r.m_outChunkSize bytes.
//
// The header type may be reduced using the previous packet sent on the same
// channel (kept in r.m_vecChannelsOut). For connections with the
// RTMP_FEATURE_HTTP protocol bit set, multi-chunk packets are assembled in a
// temporary buffer and written with a single writeN call; otherwise each chunk
// is written as it is produced. Afterwards a copy of packet is stored as the
// channel's previous packet.
//
// If queue is non-zero and the packet type is RTMP_PACKET_TYPE_INVOKE, the
// invoked method name and transaction number are recorded with C.AV_queue.
//
// It returns 1 on success and 0 on failure (channel table growth failed,
// invalid header type, buffer allocation failed, or writeN failed).
//
// NOTE(review): this appears to be a port of librtmp's RTMP_SendPacket —
// confirm against the C source when changing it.
func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
var prevPacket *C.RTMPPacket
last := 0
var nSize, hSize, cSize, nChunkSize, tlen int
var header, hptr, hend, buffer, tbuf, toff unsafe.Pointer
var goHbuf [C.RTMP_MAX_HEADER_SIZE]byte
var hbuf = unsafe.Pointer(&goHbuf[0])
var c byte
var t int32
var packets unsafe.Pointer
// Grow the per-channel table of previous packets when this channel lies
// outside it. The table holds pointers, hence Sizeof(packet) (a pointer) as
// the element size. New slots are zeroed.
if packet.m_nChannel >= r.m_channelsAllocatedOut {
log.Println("Resize")
n := int(packet.m_nChannel+10)
packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut),
C.size_t(unsafe.Sizeof(packet) * uintptr(n)))
if uintptr(packets) == uintptr(0) {
C.free(unsafe.Pointer(r.m_vecChannelsOut))
r.m_vecChannelsOut = nil
r.m_channelsAllocatedOut = 0
return 0
}
r.m_vecChannelsOut = (**C.RTMPPacket)(packets)
C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut),
int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet) *
uintptr(n-int(r.m_channelsAllocatedOut))))
r.m_channelsAllocatedOut = C.int(n)
}
// prevPacket is the table entry for this channel; nil if nothing has been
// stored for it yet.
prevPacket = *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))
if prevPacket != nil && packet.m_headerType != RTMP_PACKET_SIZE_LARGE {
// compress a bit by using the prev packet's attributes
if prevPacket.m_nBodySize == packet.m_nBodySize &&
prevPacket.m_packetType == packet.m_packetType &&
packet.m_headerType == RTMP_PACKET_SIZE_MEDIUM {
packet.m_headerType = RTMP_PACKET_SIZE_SMALL
}
if prevPacket.m_nTimeStamp == packet.m_nTimeStamp &&
packet.m_headerType == RTMP_PACKET_SIZE_SMALL {
// TODO: port this constant
packet.m_headerType = C.RTMP_PACKET_SIZE_MINIMUM
}
last = int(prevPacket.m_nTimeStamp)
}
// packetSize has four entries, so any header type above 3 is invalid.
if packet.m_headerType > 3 {
log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
packet.m_headerType)
return 0
}
// nSize is the header length for the chosen header type; hSize accumulates
// the total number of header bytes that precede the first chunk.
nSize = packetSize[int(packet.m_headerType)]
hSize = nSize
cSize = 0
// t is the timestamp relative to the previous packet; last stays 0 unless the
// compression branch above ran.
t = int32(int(packet.m_nTimeStamp) - last)
// With a body present, the header is written into the bytes directly in front
// of the body so that header and first chunk are contiguous in memory.
// NOTE(review): this assumes the body allocation leaves enough headroom before
// m_body — confirm against RTMPPacket_Alloc.
if packet.m_body != nil {
header = decBytePtr(unsafe.Pointer(packet.m_body), nSize)
hend = unsafe.Pointer(packet.m_body)
} else {
header = incBytePtr(hbuf,6)
// TODO: be cautious about this sizeof - make sure it works how you think it
// does. C code used sizeof(hbuf) where hbuf is a *char
hend = incBytePtr(hbuf,C.RTMP_MAX_HEADER_SIZE)
}
// cSize is the number of extra channel-id bytes: channels above 63 need one,
// channels above 319 need two.
switch {
case packet.m_nChannel > 319:
cSize = 2
case packet.m_nChannel > 63:
cSize = 1
}
if cSize != 0 {
header = decBytePtr(header,cSize)
hSize += cSize
}
// A timestamp of 0xffffff or more does not fit the 3-byte field and gets a
// 4-byte extended timestamp.
if t >= 0xffffff {
header = decBytePtr(header,4)
hSize += 4
log.Printf("Larger timestamp than 24-bit: 0x%v", t)
}
// First header byte: header type in the top two bits; the low bits hold the
// channel itself (cSize 0), 0 (cSize 1) or 1 (cSize 2).
hptr = header
c = byte(packet.m_headerType) << 6
switch cSize {
case 0:
c |= byte(packet.m_nChannel)
case 1:
case 2:
c |= byte(1)
}
*(*byte)(hptr) = c
hptr = incBytePtr(hptr,1)
// Extra channel-id bytes carry the channel minus 64, low byte first.
if cSize != 0 {
tmp := packet.m_nChannel - 64
*(*byte)(hptr) = byte(tmp & 0xff)
hptr = incBytePtr(hptr,1)
if cSize == 2 {
*(*byte)(hptr) = byte(tmp >> 8)
hptr = incBytePtr(hptr,1)
}
}
// Headers longer than 1 byte carry the 3-byte timestamp, capped at 0xffffff.
if nSize > 1 {
res := t
if t > 0xffffff {
res = 0xffffff
}
hptr = unsafe.Pointer(C.AMF_EncodeInt24((*C.char)(hptr),(*C.char)(hend), C.int(res)))
}
// Headers longer than 4 bytes also carry the 3-byte body size and the packet
// type.
if nSize > 4 {
hptr = unsafe.Pointer(C.AMF_EncodeInt24((*C.char)(hptr), (*C.char)(hend),
C.int(packet.m_nBodySize)))
*(*byte)(hptr) = byte(packet.m_packetType)
hptr = incBytePtr(hptr,1)
}
// Headers longer than 8 bytes also carry m_nInfoField2, little-endian.
if nSize > 8 {
hptr = incBytePtr(hptr, int(C.EncodeInt32LE((*C.char)(hptr),
C.int(packet.m_nInfoField2))))
}
// The extended timestamp, when required, follows the rest of the header.
if t >= 0xffffff {
hptr = unsafe.Pointer(C.AMF_EncodeInt32((*C.char)(hptr), (*C.char)(hend), C.int(t)))
}
// From here on nSize counts the body bytes still to be sent and buffer points
// at the next unsent body byte.
nSize = int(packet.m_nBodySize)
buffer = unsafe.Pointer(packet.m_body)
nChunkSize = int(r.m_outChunkSize)
if debugMode {
log.Printf("sendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize)
}
// send all chunks in one HTTP request
// TODO: port RTMP_FEATURE_HTTP
if int(r.Link.protocol & C.RTMP_FEATURE_HTTP) != 0 {
chunks := (nSize+nChunkSize-1)/nChunkSize
if chunks > 1 {
// tbuf receives every chunk with its header; toff is the write position
// within it.
tlen = chunks *(cSize+1) +nSize +hSize
// TODO: figure out how to do this in go
tbuf = C.malloc(C.size_t(tlen))
if tbuf == nil {
return 0
}
toff = tbuf
}
}
// Each iteration emits hSize header bytes followed by up to nChunkSize body
// bytes, either into tbuf or straight to the connection.
for (nSize + hSize) != 0 {
var wrote int
if nSize < nChunkSize {
nChunkSize = nSize
}
if tbuf != nil {
//memmove(toff, header, uintptr(nChunkSize + hSize))
copy(ptrToSlice(toff, int(nChunkSize+hSize)), ptrToSlice(header,
int(nChunkSize+hSize)))
toff = incBytePtr(toff, nChunkSize + hSize)
} else {
// TODO: port this
wrote = int(writeN(r, header, nChunkSize+hSize))
if wrote == 0 {
return 0
}
}
nSize -= nChunkSize
buffer = incBytePtr(buffer,nChunkSize)
hSize = 0
// More body remains: build the continuation header in place, directly in
// front of the next chunk, overwriting body bytes that have already been
// emitted.
if nSize > 0 {
header = decBytePtr(buffer, 1)
hSize = 1
if cSize != 0 {
header = decBytePtr(header,cSize)
hSize += cSize
}
if t >= 0xffffff {
header = decBytePtr(header,4)
hSize += 4
}
// A continuation header is the first header byte with both top bits set,
// followed by the same channel-id bytes and extended timestamp, if any.
*(*byte)(header) = byte(0xc0 | c)
if cSize != 0 {
tmp := int(packet.m_nChannel) - 64
*indxBytePtr(header,1) = byte(tmp & 0xff)
if cSize == 2 {
*indxBytePtr(header,2) = byte(tmp >> 8)
}
}
if t >= 0xffffff {
extendedTimestamp := incBytePtr(header,1+cSize)
// TODO: port this
C.AMF_EncodeInt32((*C.char)(extendedTimestamp),
(*C.char)(incBytePtr(extendedTimestamp,4)), C.int(t))
}
}
}
// HTTP path: write the assembled buffer in one call; the length is the
// distance from tbuf to toff.
if tbuf != nil {
// TODO: port C.writeN
wrote := int(writeN(r, tbuf, int(uintptr(decBytePtr(toff,
int(uintptr(unsafe.Pointer(tbuf))))))))
C.free(tbuf)
tbuf = nil
if wrote == 0 {
return 0
}
}
// We invoked a remote method
// TODO: port the const
if packet.m_packetType == C.RTMP_PACKET_TYPE_INVOKE {
// TODO: port C.AVal
var method C.AVal
var ptr unsafe.Pointer
// The method name is decoded starting at the second byte of the body.
ptr = incBytePtr(unsafe.Pointer(packet.m_body),1)
// TODO: port this
C.AMF_DecodeString((*C.char)(ptr), &method)
if debugMode {
log.Printf("Invoking %v", method.av_val)
}
// keep it in call queue till result arrives
if queue != 0 {
var txn int
// The transaction number is decoded 3 + av_len bytes further on.
ptr = incBytePtr(ptr, 3 + int(method.av_len))
// TODO: port this
txn = int(C.AMF_DecodeNumber((*C.char)(ptr)))
// TODO: port this
C.AV_queue(&r.m_methodCalls, &r.m_numCalls, &method, C.int(txn))
}
}
// Remember this packet as the channel's previous packet: allocate the slot's
// storage on first use, then copy the packet struct into it.
if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) == nil {
*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) =
(*C.RTMPPacket)(C.malloc(C.size_t(unsafe.Sizeof(*packet))))
}
//memmove(incPtr(unsafe.Pointer(r.m_vecChannelsOut),int(packet.m_nChannel),
//int(unsafe.Sizeof(packet))),unsafe.Pointer(packet), unsafe.Sizeof(packet))
C.memcpy(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))), unsafe.Pointer(packet),
C.size_t(uintptr(unsafe.Sizeof(*packet))))
return 1
}
// writeN writes n bytes starting at buffer to the connection r, repeating the
// send until everything has been accepted.
//
// Connections with the RTMP_FEATURE_HTTP protocol bit set send through
// C.HTTP_Post; all others send through C.RTMPSockBuf_Send. A negative send
// result closes the connection with C.RTMP_Close. It returns 1 when exactly n
// bytes were written and 0 otherwise (send error, a send that accepted no
// bytes, or a negative n).
func writeN(r *C.RTMP, buffer unsafe.Pointer, n int) int {
	remaining := n
	for cur := buffer; remaining > 0; {
		var sent int
		if (r.Link.protocol & C.RTMP_FEATURE_HTTP) == 0 {
			// TODO: port this if necessary
			sent = int(C.RTMPSockBuf_Send(&r.m_sb, (*C.char)(cur), C.int(remaining)))
		} else {
			// TODO: port HTTP_POST
			sent = int(C.HTTP_Post(r, RTMPT_SEND, (*C.char)(cur), C.int(remaining)))
		}

		switch {
		case sent < 0:
			if debugMode {
				log.Println("WriteN, RTMP send error")
			}
			// TODO: port this
			C.RTMP_Close(r)
			return 0
		case sent == 0:
			// Nothing was accepted while bytes are still outstanding.
			return 0
		}

		remaining -= sent
		cur = incBytePtr(cur, sent)
	}
	if remaining != 0 {
		return 0
	}
	return 1
}
// TODO: port RTMP_METHOD
//
// avQueue appends a record of an invoked method to the array *vals, which
// currently holds *num entries, and increments *num.
//
// The array is grown with C.realloc in steps of 16 entries whenever *num is a
// multiple of 16. The method name in av is copied into freshly allocated,
// NUL-terminated C memory, so the entry does not alias av. If the array cannot
// be grown the call is logged and dropped, leaving *vals and *num unchanged.
//
// NOTE(review): nothing in the visible source calls avQueue; sendPacket still
// calls C.AV_queue.
func avQueue(vals **C.RTMP_METHOD, num *int, av *C.AVal, txn int) {
	// Size and stride are those of the struct itself, not of a pointer to it.
	methodSize := int(unsafe.Sizeof(C.RTMP_METHOD{}))

	if (*num & 0x0f) == 0 {
		// TODO: work out what to do with the realloc
		grown := C.realloc(unsafe.Pointer(*vals), C.size_t((*num+16)*methodSize))
		if grown == nil {
			log.Println("avQueue: failed to grow method call queue")
			return
		}
		*vals = (*C.RTMP_METHOD)(grown)
	}

	// Private, NUL-terminated copy of the method name.
	name := C.malloc(C.size_t(av.av_len + 1))
	C.memcpy(name, unsafe.Pointer(av.av_val), C.size_t(av.av_len))
	*indxBytePtr(name, int(av.av_len)) = 0

	entry := (*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num, methodSize))
	entry.num = C.int(txn)
	entry.name.av_len = av.av_len
	entry.name.av_val = (*C.char)(name)
	(*num)++
}
// indxBytePtr returns a pointer to the byte located inc bytes after ptr.
func indxBytePtr(ptr unsafe.Pointer, inc int) *byte {
	elem := incPtr(ptr, inc, byteSize)
	return (*byte)(elem)
}
// indxInt32Ptr returns a pointer to the int32 located inc int32-sized
// elements after ptr.
func indxInt32Ptr(ptr unsafe.Pointer, inc int) *int32 {
	elem := incPtr(ptr, inc, int32Size)
	return (*int32)(elem)
}
// indxInt64Ptr returns a pointer to the int64 located inc int64-sized
// elements after ptr.
func indxInt64Ptr(ptr unsafe.Pointer, inc int) *int64 {
	elem := incPtr(ptr, inc, int64Size)
	return (*int64)(elem)
}
// incBytePtr returns an unsafe.Pointer to a byte that is inc positive positions
// from the passed ptr
func incBytePtr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr,inc,byteSize)
}
// incInt32Ptr returns an unsafe.Pointer to an int32 that is inc positive
// positions from the passed ptr
func incInt32Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr,inc,int32Size)
}
// incInt64Ptr returns an unsafe.Pointer to an int64 that is inc positive
// positions from the passed ptr
func incInt64Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr,inc,int64Size)
}
// incPtr replicates C-style pointer addition: it returns ptr moved forward by
// inc elements of typeSize bytes each, i.e. by inc*typeSize bytes.
func incPtr(ptr unsafe.Pointer, inc, typeSize int) unsafe.Pointer {
	offset := uintptr(inc * typeSize)
	return unsafe.Pointer(uintptr(ptr) + offset)
}
// decPtr replicates C-style pointer subtraction: it returns ptr moved
// backwards by dec elements of typeSize bytes each, i.e. by dec*typeSize
// bytes.
func decPtr(ptr unsafe.Pointer, dec, typeSize int) unsafe.Pointer {
	offset := uintptr(dec * typeSize)
	return unsafe.Pointer(uintptr(ptr) - offset)
}
// decBytePtr returns an unsafe.Pointer to a byte that is dec negative positions
// from ptr
func decBytePtr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr,dec,byteSize)
}
// decBytePtr returns an unsafe.Pointer to a int32 that is dec negative positions
// from ptr
func decInt32Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr,dec,int32Size)
}
// decBytePtr returns an unsafe.Pointer to a int64 that is dec negative positions
// from ptr
func decInt64Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr,dec,int64Size)
}
// sliceToPtr returns the address of the first element of data as an
// unsafe.Pointer. No copy is made; the pointer refers to the slice's own
// backing storage.
//
// An empty or nil slice has no first element, so nil is returned for it
// instead of panicking on data[0].
func sliceToPtr(data []byte) unsafe.Pointer {
	if len(data) == 0 {
		return nil
	}
	return unsafe.Pointer(&data[0])
}
// ptrToSlice returns a byte slice of length and capacity size whose storage
// begins at data. Nothing is allocated or copied: the slice is a view over
// the memory data points to, so writes through the slice modify that memory.
func ptrToSlice(data unsafe.Pointer, size int) []byte {
	var view []byte
	// Fill in the slice header of view directly.
	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&view))
	hdr.Data = uintptr(data)
	hdr.Len, hdr.Cap = size, size
	return view
}
// rtmpErrs maps known error codes to their messages. Index 0 is deliberately
// left empty so that it falls through to the numeric form.
var rtmpErrs = [...]string{
	1: "rtmp: not connected",
	2: "rtmp: write error",
	3: "rtmp: not started",
}

// Err is a numeric rtmp error code. Codes 1 to 3 are the errors raised by
// this package; Close also wraps the non-zero status of C.end_session in it.
type Err uint

// Error returns the message registered for e in rtmpErrs, or "rtmp: "
// followed by the code in decimal when no message is registered.
func (e Err) Error() string {
	if code := int(e); code >= 0 && code < len(rtmpErrs) && rtmpErrs[code] != "" {
		return rtmpErrs[code]
	}
	return "rtmp: " + strconv.Itoa(int(e))
}