av/rtmp/rtmp.go

660 lines
16 KiB
Go

/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
/*
#cgo CFLAGS: -I/usr/local/include/librtmp
#cgo LDFLAGS: -lrtmp -Wl,-rpath=/usr/local/lib
#include <stdlib.h>
#include <string.h>
#include <rtmp.h>
RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout);
int write_frame(RTMP* rtmp, char* data, uint data_length);
int end_session(RTMP* rtmp);
void AV_queue(RTMP_METHOD **vals, int *num, AVal *av, int txn);
int WriteN(RTMP *r, const char *buffer, int n);
*/
import "C"
import (
"errors"
_ "fmt"
"log"
"math"
"reflect"
"strconv"
"unsafe"
)
// Tunables used by the Go side of the rtmp port.
const (
// minDataSize is the smallest buffer rtmpWrite accepts when it starts a new
// packet. rtmpWrite consumes 11 bytes of tag header (1 type + 3 body size +
// 3 timestamp + 4 skipped) before copying any body bytes.
minDataSize = 11
// debugMode gates the extra log output in sendPacket.
debugMode = false
// nullChar is only referenced from the commented-out avQueue port in the
// visible part of this file.
nullChar = "golang\000"
)
// Element sizes in bytes, handed to incPtr/decPtr by the typed pointer helpers.
const (
byteSize = 1
int32Size = 4
int64Size = 8
)
// Values assigned to and compared against RTMPPacket.m_headerType
// (RTMP_PACKET_SIZE_*) and RTMPPacket.m_packetType (RTMP_PACKET_TYPE_*).
// NOTE(review): these presumably mirror the identically named macros in
// librtmp's rtmp.h — confirm against the header.
const (
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
)
// AVC builds a C.AVal (declared in amf.h) that describes str. It mirrors the
// C macro
//
//	#define AVC(str) {str, sizeof(str)-1}
//
// av_val is a C copy of str created with C.CString and av_len is len(str).
// The copy is not released here, so ownership passes to the caller.
func AVC(str string) C.AVal {
	return C.AVal{
		av_val: C.CString(str),
		av_len: C.int(len(str)),
	}
}
// setDataFrame holds the "@setDataFrame" string that rtmpWrite encodes at the
// start of RTMP_PACKET_TYPE_INFO packet bodies. It stands in for
// av_setDataFrame, which is a static const global in rtmp.c.
var setDataFrame = AVC("@setDataFrame")
// packetSize is looked up by RTMPPacket.m_headerType in sendPacket to obtain
// the size associated with that header type.
var packetSize = [...]int32{ 12, 8, 4, 1}
// Session provides an interface for sending flv tags over rtmp.
type Session interface {
// Open establishes the rtmp connection.
Open() error
// Write sends one frame (flv tag) and reports how many bytes were accepted.
Write([]byte) (int, error)
// Close terminates the rtmp connection.
Close() error
}
// session provides parameters required for an rtmp communication session.
type session struct {
// rtmp is the librtmp handle. It is nil until Open succeeds and is reset to
// nil by Close.
rtmp *C.RTMP
// url is the destination handed to start_session by Open.
url string
// timeout is handed to start_session as its connect_timeout argument.
// NOTE(review): the unit is not visible from this file — confirm in the C
// wrapper.
timeout uint
}
// Compile-time check that *session implements Session.
var _ Session = (*session)(nil)
// NewSession returns a Session configured with url and connectTimeout. No
// connection is attempted until Open is called on the result.
func NewSession(url string, connectTimeout uint) Session {
	s := new(session)
	s.url = url
	s.timeout = connectTimeout
	return s
}
// Open establishes an rtmp connection with the url passed into the
// constructor.
//
// It returns an error if the session is already open or if start_session
// fails to produce a connection. On success the session holds the librtmp
// handle until Close is called.
func (s *session) Open() error {
	if s.rtmp != nil {
		return errors.New("rtmp: attempt to start already running session")
	}

	// NOTE(review): the C copy of the url is intentionally not freed here;
	// librtmp's URL setup presumably keeps pointers into this buffer for the
	// lifetime of the connection — confirm before adding a C.free.
	rtmp, err := C.start_session(s.rtmp, C.CString(s.url), C.uint(s.timeout))
	if rtmp == nil {
		// The second cgo result is derived from errno and is nil when the C
		// code failed without setting it. Never report success in that case.
		if err == nil {
			err = errors.New("rtmp: failed to start session")
		}
		return err
	}
	s.rtmp = rtmp
	return nil
}
// Close terminates the rtmp connection.
//
// It returns Err(3) when the session was never opened. Otherwise the handle
// is dropped regardless of the outcome, and a non-zero status from
// end_session is returned as an Err.
func (s *session) Close() error {
	if s.rtmp == nil {
		return Err(3)
	}
	handle := s.rtmp
	s.rtmp = nil
	if status := C.end_session(handle); status != 0 {
		return Err(status)
	}
	return nil
}
// Write writes a frame (flv tag) to the rtmp connection.
//
// It returns len(data) and a nil error on success. It returns Err(3) when the
// session has not been opened, Err(1) when the connection is no longer up,
// and Err(2) when data is empty or the frame could not be written.
func (s *session) Write(data []byte) (int, error) {
	if s.rtmp == nil {
		return 0, Err(3)
	}
	if C.RTMP_IsConnected(s.rtmp) == 0 {
		return 0, Err(1)
	}
	// rtmpWrite takes the address of data[0], which panics for an empty
	// slice; an empty frame can never be written, so report a write error.
	if len(data) == 0 {
		return 0, Err(2)
	}
	// rtmpWrite signals failure with 0 (input too small, allocation failure)
	// or -1 (the packet could not be sent); both must surface as an error.
	if rtmpWrite(s.rtmp, data) <= 0 {
		return 0, Err(2)
	}
	return len(data), nil
}
// rtmpWrite writes data to the current rtmp connection encapsulated by r.
//
// data is consumed as a sequence of flv tags. For each tag the 11 byte tag
// header is decoded into r.m_write, the tag body is copied into a packet body
// allocated with RTMPPacket_Alloc, and the packet is sent with
// RTMP_SendPacket once its body is complete. A partially filled packet is
// kept in r.m_write (m_nBytesRead != 0) and continued by the next call.
//
// It returns 0 if a new packet is started with fewer than minDataSize bytes
// or the packet body cannot be allocated, -1 if RTMP_SendPacket fails, and
// len(data) plus the final (possibly negative) remaining count otherwise.
//
// NOTE(review): data must be non-empty; sliceToPtr takes &data[0].
// NOTE(review): apart from the minDataSize check, no bounds checks are made
// while walking buf — truncated input presumably reads past data; verify.
func rtmpWrite(r *C.RTMP, data []byte) int {
buf := sliceToPtr(data)
// TODO: port RTMPPacket
var pkt = &r.m_write
var pend, enc unsafe.Pointer
size := len(data)
// s2 counts the bytes of data not yet consumed.
s2 := size
var ret, num int
pkt.m_nChannel = 0x04
pkt.m_nInfoField2 = r.m_stream_id
for s2 != 0 {
// m_nBytesRead == 0 means no packet is in progress: parse a new tag header.
if pkt.m_nBytesRead == 0 {
if size < minDataSize {
log.Printf("size: %d\n", size)
log.Printf("too small \n")
return 0
}
// Data starting with "FLV" has its first 13 bytes skipped.
// NOTE(review): presumably the 9 byte flv file header plus the 4 byte
// previous-tag-size field — confirm against the flv specification.
if indxBytePtr(buf,0) == 'F' && indxBytePtr(buf,1) == 'L' && indxBytePtr(buf,2) == 'V' {
buf = unsafe.Pointer(uintptr(buf) + uintptr(13))
s2 -= 13
}
// Tag header: 1 byte type, 3 byte body size, 3 byte timestamp, 1 byte
// timestamp extension (bits 24-31), then 3 more bytes that are skipped.
pkt.m_packetType = C.uchar(indxBytePtr(buf,0))
buf = incBytePtr(buf, 1)
// TODO: port this
pkt.m_nBodySize = C.AMF_DecodeInt24((*C.char)(buf))
buf = incBytePtr(buf, 3)
// TODO: replace with ported version
pkt.m_nTimeStamp = C.AMF_DecodeInt24((*C.char)(buf))
buf = incBytePtr(buf, 3)
pkt.m_nTimeStamp |= C.uint(indxBytePtr(buf,0)) << 24
buf = incBytePtr(buf, 4)
s2 -= 11
// Audio/video packets with a zero timestamp and all info packets use the
// large header type; everything else uses the medium header type.
if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) &&
pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
pkt.m_headerType = RTMP_PACKET_SIZE_LARGE
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
// Room for the setDataFrame string encoded below.
// NOTE(review): 16 is presumably 1 type byte + 2 length bytes + the 13
// characters of "@setDataFrame" — confirm against AMF_EncodeString.
pkt.m_nBodySize += 16
}
} else {
pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM
}
// TODO: Port this
if int(C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize)) == 0 {
log.Println("Failed to allocate packet")
return 0
}
// enc is the write cursor inside the packet body, pend its end.
enc = unsafe.Pointer(pkt.m_body)
pend = incBytePtr(enc, int(pkt.m_nBodySize))
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
// TODO: Port this
enc = unsafe.Pointer(C.AMF_EncodeString((*C.char)(enc), (*C.char)(pend), &setDataFrame))
// Account for the bytes AMF_EncodeString wrote at the start of the body.
pkt.m_nBytesRead = C.uint(math.Abs(float64(uintptr(enc) -
uintptr(unsafe.Pointer(pkt.m_body)))))
}
} else {
// Continue filling the packet left unfinished by a previous call.
enc = incBytePtr(unsafe.Pointer(pkt.m_body), int(pkt.m_nBytesRead))
}
// Copy as much of the body as is both still missing and available.
num = int(pkt.m_nBodySize - pkt.m_nBytesRead)
if num > s2 {
num = s2
}
memmove(enc,buf,uintptr(num))
//copy(ptrToSlice(enc, num), ptrToSlice(buf, num))
pkt.m_nBytesRead += C.uint(num)
s2 -= num
buf = incBytePtr(buf, num)
// Body complete: send the packet and reset for the next tag.
if pkt.m_nBytesRead == pkt.m_nBodySize {
// TODO: Port this
ret = int(C.RTMP_SendPacket(r, pkt, 0))
// TODO: Port this
C.RTMPPacket_Free(pkt)
pkt.m_nBytesRead = 0
if ret == 0 {
return -1
}
// Skip the 4 bytes that follow the tag body.
// NOTE(review): presumably the flv previous-tag-size field. s2 goes
// negative when those bytes are absent, which ends the loop.
buf = incBytePtr(buf, 4)
s2 -= 4
if s2 < 0 {
break
}
}
}
return size + s2
}
// sendPacket builds the chunk header for packet and writes the packet to r in
// chunks of at most r.m_outChunkSize body bytes, returning 1 on success and 0
// on failure. When queue is non-zero and the packet is an invoke, the invoked
// method is recorded with AV_queue.
//
// NOTE(review): this appears to be an unfinished Go port of librtmp's
// RTMP_SendPacket (see the TODOs); rtmpWrite still calls C.RTMP_SendPacket
// and nothing in the visible code calls this function. The body does not
// compile as written — the NOTE(review) comments below mark the problems
// visible in this file. Compare each against RTMP_SendPacket in rtmp.c
// before completing the port.
func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
var prevPacket *C.RTMPPacket
last := 0
var nSize, hSize, cSize, nChunkSize, tlen int
var header, hptr, hend, buffer, tbuf, toff unsafe.Pointer
var goHbuf [C.RTMP_MAX_HEADER_SIZE]byte
var hbuf = unsafe.Pointer(&goHbuf[0])
var c byte
var t int32
// Grow the per-channel table of previously sent packets so that
// packet.m_nChannel is a valid index, zeroing the newly added entries.
if packet.m_nChannel >= r.m_channelsAllocatedOut {
n := int(packet.m_nChannel+10)
var tmp *C.RTMPPacket
packets := C.realloc(unsafe.Pointer(r.m_vecChannelsOut),
C.ulong(unsafe.Sizeof(tmp) * uintptr(n)))
if uintptr(packets) == uintptr(0) {
C.free(unsafe.Pointer(r.m_vecChannelsOut))
r.m_vecChannelsOut = nil
r.m_channelsAllocatedOut = 0
return 0
}
r.m_vecChannelsOut = (**C.RTMPPacket)(packets)
C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut),
int(unsafe.Sizeof(tmp))), 0, C.ulong(unsafe.Sizeof(tmp) *
uintptr(n-int(r.m_channelsAllocatedOut))))
r.m_channelsAllocatedOut = C.int(n)
}
// Fetch the packet previously sent on this channel, if any.
prevPacket = *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))
if prevPacket != nil && packet.m_headerType != RTMP_PACKET_SIZE_LARGE {
// compress a bit by using the prev packet's attributes
if prevPacket.m_nBodySize == packet.m_nBodySize &&
prevPacket.m_packetType == packet.m_packetType &&
packet.m_headerType == RTMP_PACKET_SIZE_MEDIUM {
packet.m_headerType = RTMP_PACKET_SIZE_SMALL
}
if prevPacket.m_nTimeStamp == packet.m_nTimeStamp &&
packet.m_headerType == RTMP_PACKET_SIZE_SMALL {
// TODO: port this constant
packet.m_headerType = C.RTMP_PACKET_SIZE_MINIMUM
}
last = int(prevPacket.m_nTimeStamp)
}
if packet.m_headerType > 3 {
log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
packet.m_headerType)
return 0
}
// NOTE(review): packetSize holds int32 elements but the entry is read
// through *int, which is wider than int32 on 64-bit targets.
nSize = int(*(*int)(incInt32Ptr(unsafe.Pointer(&packetSize[0]),int(packet.m_headerType))))
hSize = nSize
cSize = 0
// t is the timestamp relative to the previous packet on this channel.
t = int32(int(packet.m_nTimeStamp) - last)
if packet.m_body != nil {
// NOTE(review): the decPtr call below lacks its typeSize argument and its
// closing parenthesis, and the next line assigns packet.m_body to an
// unsafe.Pointer without a conversion.
header = decPtr(unsafe.Pointer(packet.m_body), nSize,
hend = packet.m_body
} else {
// NOTE(review): incPtr is declared with three parameters in this file but
// is called with two here and in most calls below.
header = incPtr(hbuf,6)
// TODO: be cautious about this sizeof - make sure it works how you think it
// does. C code used sizeof(hbuf) where hbuf is a *char
// NOTE(review): unsafe.Sizeof of a *byte yields the pointer size, not
// len(goHbuf).
hend = incPtr(hbuf,unsafe.Sizeof((*byte)(hbuf)))
}
// cSize is the number of extra header bytes needed for the channel id.
switch {
case packet.m_nChannel > 319:
cSize = 2
case packet.m_nChannel > 63:
cSize = 1
}
// NOTE(review): this block is guarded by cSize yet moves header by 4 and logs
// about the timestamp, and hSize (an int) is assigned the result of incPtr.
// It looks as if the channel-size adjustment and the extended-timestamp
// adjustment were merged — confirm against rtmp.c.
if cSize != 0 {
header = decPtr(header,4)
hSize = incPtr(hSize,4)
log.Printf("Larger timsetamp than 24-bit: 0x%x", t)
}
hptr = header
// c is the first header byte: header type in the top two bits, channel
// information in the remaining bits.
c = byte(packet.m_headerType) << 6
switch cSize {
case 0:
c |= byte(packet.m_nChannel)
case 1:
// Go cases do not fall through, so cSize == 1 leaves c unchanged.
case 2:
c |= byte(1)
}
// NOTE(review): tmp is not declared in this scope, and c computed above is
// never stored into the header before the loop below.
*(*byte)(hptr) = byte(tmp >> 8)
hptr = incPtr(hptr,1)
if nSize > 1 {
// The 24-bit timestamp field saturates at 0xffffff.
res := t
if t > 0xffffff {
res = 0xffffff
}
// NOTE(review): AMF_EncodeInt24, AMF_EncodeInt32 and EncodeInt32LE are used
// without the C. prefix and no Go definitions are visible in this file.
hptr = unsafe.Pointer(AMF_EncodeInt24((*C.char)(hptr),(*C.char)(hend), res))
}
if nSize > 4 {
// NOTE(review): hptr is not advanced after the packet type byte is stored,
// and m_nbodySize is spelled differently from m_nBodySize used elsewhere.
*(*byte)(hptr) = byte(packet.m_packetType)
hptr = unsafe.Pointer(AMF_EncodeInt24((*C.char)(hptr), (*C.char)(hend),
packet.m_nbodySize))
}
if nSize > 8 {
hptr = incPtr(hptr, int(EncodeInt32LE((*C.char)(hptr), packet.m_nInfoField2)))
}
if t >= 0xffffff {
hptr = unsafe.Pointer(AMF_EncodeInt32((*C.char)(hptr), (*C.char)(hend), t))
}
// From here on nSize is the number of body bytes still to be sent.
nSize = int(packet.m_nBodySize)
// NOTE(review): m_body is referenced without the packet. qualifier.
buffer = unsafe.Pointer(m_body)
nChunkSize = int(r.m_outChunkSize)
if debugMode {
log.Printf("sendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize)
}
// send all chunks in one HTTP request
// TODO: port RTMP_FEATURE_HTTP
// NOTE(review): the condition below is an integer expression, not a bool.
if r.Link.protocol & C.RTMP_FEATURE_HTTP {
chunks := (nSize+nChunkSize-1)/nChunkSize
if chunks > 1 {
tlen = chunks *(cSize+1) +nSize +hSize
// TODO: figure out how to do this in go
// NOTE(review): C.malloc is passed a Go int, and tbuf (an unsafe.Pointer)
// is compared with 0 rather than nil here and after the loop.
tbuf = C.malloc(tlen)
if tbuf == 0 {
return 0
}
toff = tbuf
}
}
// Emit header + body chunk pairs until both are exhausted. Chunks are either
// staged in tbuf or written immediately with WriteN.
for (nSize + hSize) != 0 {
var wrote int
if nSize < nChunkSize {
nChunkSize = nSize
}
// TODO: figure out what's happening here:
// RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)header, hSize);
// RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize);
if tbuf != nil {
// NOTE(review): nChunksize is spelled differently from the declared
// nChunkSize, and memmove expects a uintptr length.
memmove(toff, header, nChunksize + hSize)
toff = incPtr(toff, nChunkSize + hSize)
} else {
// TODO: port this
// NOTE(review): the C.WriteN result is assigned to a Go int without a
// conversion.
wrote = C.WriteN(r, (*C.char)(header), C.int(nChunkSize+hSize))
if wrote == 0 {
return 0
}
}
nSize -= nChunkSize
buffer = incPtr(buffer,nChunkSize)
hSize = 0
// More body remains: build the continuation header directly in front of the
// next chunk.
if nSize > 0 {
header = decPtr(buffer, 1)
hSize = 1
if cSize != 0 {
header = decPtr(header,1)
hSize += cSize
}
if t >= 0xffffff {
header = decPtr(header,4)
hSize += 4
}
*(*byte)(header) = byte(0xc0 | c)
if cSize != 0 {
tmp := int(packet.m_nChannel) - 64
// NOTE(review): indxBytePtr returns a byte value, so its call cannot be
// assigned to.
indxBytePtr(header,1) = byte(tmp & 0xff)
if cSize == 2 {
indxBytePtr(header,2) = byte(tmp >> 8)
}
}
if t >= 0xffffff {
extendedTimestamp := incPtr(header,1+cSize)
// TODO: port this
C.AMF_EncodeInt32((*C.char)(extendedTimestamp),
(*C.char)(incPtr(extendedTimestamp,4)), t)
}
}
}
// Flush the staged chunks with a single write.
if tbuf != 0 {
// TODO: port C.writeN
// NOTE(review): decPtr is given a pointer as its second argument and the
// result is converted to *C.char, while WriteN is declared to take an int
// length.
wrote := int(C.WriteN(r, (*C.char)(tbuf), (*C.char)(decPtr(toff,tbuf))))
C.free((*C.char)(tbuf))
tbuf = nil
if wrote == 0 {
return 0
}
}
// We invoked a remote method
// TODO: port the const
if packet.m_packetType == C.RTMP_PACKET_TYPE_INVOKE {
// TODO: port C.AVal
var method C.AVal
var ptr unsafe.Pointer
ptr = incPtr(unsafe.Pointer(packet.m_body),1)
// TODO: port this
C.AMF_DecodeString((*C.char)(ptr), &method)
if debugMode {
log.Printf("Invoking %v", method.av_val)
}
// keep it in call queue till result arrives
if queue != 0 {
var txn int
ptr = incPtr(ptr, 3 + int(method.av_len))
// TODO: port this
txn = int(C.AMF_DecodeNumber((*C.char)(ptr)))
// TODO: port this
C.AV_queue(&r.m_methodCalls, &r.m_numCalls, &method, C.int(txn))
}
}
// Remember this packet as the previous packet for its channel.
// NOTE(review): indxPtr is not defined in the visible part of this file,
// unsafe.Sizof is misspelled, unsafe.Sizeof is given a type rather than an
// expression, and the left-hand side below is a conversion, which is not
// assignable.
if indxPtr(unsafe.Pointer(r.m_vecChannelsOut),packet.m_nChannel) == 0 {
(*C.char)(indxPtr(r.m_vecChannelsOut)) = C.malloc(unsafe.Sizof(C.RTMPPacket))
}
memmove(indxPtr(unsafe.Pointer(r.m_vecChannelsOut),packet.m_nChannel),
unsafe.Pointer(packet), unsafe.Sizeof(C.RTMPPacket))
return 1
}
/*
func writeN(r *C.RTMP, buffer unsafe.Pointer, n int) int {
ptr := buffer
for n > 0 {
var nBytes int
if r.Link.protocol & C.RTMP_FEATURE_HTTP {
// TODO: port HTTP_POST
nbytes = int(C.HTTP_POST(r, C.RTMPT_SEND, (*C.char)(ptr), C.int(n)))
} else {
// TODO: port this if necessary
nBytes = RTMPSockBuf_Send(&r.m_sb, (*C.char)(ptr), C.int(n))
}
if nBytes < 0 {
// TODO: port this
sockerr := int(C.GetSockError())
if debugMode {
log.Printf("WriteN, RTMP send error %v (%v bytes)",sockerr, n)
}
if sockerr == C.EINTR && RTMP_ctrlC == 0 {
continue
}
// TODO: port this
C.RTMP_Close(r)
n = 1
break
}
if nBytes == 0 {
break
}
n -= nBytes
ptr = incPtr(ptr, nBytes)
}
return n == 0
}
// TODO: port RTMP_METHOD
func avQueue(vals **C.RTMP_METHOD, num *int, av *AVal, txn int ) {
var tmp unsafe.Pointer
if (*num & 0x0f) == 0 {
// TODO: work out what to do with the realloc
*vals = C.realloc(*vals, (*num+16) * C.int(int(unsafe.sizeof(RTMP_METHOD))))
}
tmp := C.malloc(av.av_len + 1 )
memmove(tmp, unsafe.Pointer(av.av_val), int(av.av_len))
indxPtr(tmp,av.av_len) = byte(nullChar)
}
*/
// memmove copies n bytes from "from" to "to".
// It is declared without a Go body; the go:linkname directive below binds it
// to runtime.memmove.
//go:linkname memmove runtime.memmove
func memmove(to, from unsafe.Pointer, n uintptr)
// indxBytePtr returns the byte stored inc bytes past ptr, the equivalent of
// indexing a C byte pointer with inc.
func indxBytePtr(ptr unsafe.Pointer, inc int) byte {
	elem := (*byte)(incPtr(ptr, inc, byteSize))
	return *elem
}
// indxInt32Ptr returns the int32 stored inc elements (of int32Size bytes each)
// past ptr.
func indxInt32Ptr(ptr unsafe.Pointer, inc int) int32 {
	elem := (*int32)(incPtr(ptr, inc, int32Size))
	return *elem
}
// indxInt64Ptr returns the int64 stored inc elements (of int64Size bytes each)
// past ptr.
func indxInt64Ptr(ptr unsafe.Pointer, inc int) int64 {
	elem := (*int64)(incPtr(ptr, inc, int64Size))
	return *elem
}
// incBytePtr returns an unsafe.Pointer to a byte that is inc positive positions
// from the passed ptr
func incBytePtr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr,inc,byteSize)
}
// incInt32Ptr returns an unsafe.Pointer to an int32 that is inc positive
// positions from the passed ptr
func incInt32Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr,inc,int32Size)
}
// incInt64Ptr returns an unsafe.Pointer to an int64 that is inc positive
// positions from the passed ptr
func incInt64Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr,inc,int64Size)
}
// incPtr replicates C pointer arithmetic: it returns ptr moved forward by inc
// elements of typeSize bytes each. The offset is computed with wrapping
// unsigned arithmetic, so a negative inc moves the pointer backwards.
func incPtr(ptr unsafe.Pointer, inc, typeSize int) unsafe.Pointer {
	offset := uintptr(inc * typeSize)
	return unsafe.Pointer(uintptr(ptr) + offset)
}
// decPtr replicates C pointer arithmetic: it returns ptr moved backwards by
// dec elements of typeSize bytes each. The offset is computed with wrapping
// unsigned arithmetic, so a negative dec moves the pointer forwards.
func decPtr(ptr unsafe.Pointer, dec, typeSize int) unsafe.Pointer {
	offset := uintptr(dec * typeSize)
	return unsafe.Pointer(uintptr(ptr) - offset)
}
// decBytePtr returns an unsafe.Pointer to a byte that is dec negative positions
// from ptr
func decBytePtr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr,dec,byteSize)
}
// decBytePtr returns an unsafe.Pointer to a int32 that is dec negative positions
// from ptr
func decInt32Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr,dec,int32Size)
}
// decBytePtr returns an unsafe.Pointer to a int64 that is dec negative positions
// from ptr
func decInt64Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPTr(ptr,dec,int64Size)
}
// sliceToPtr returns the address of the first element of data as an
// unsafe.Pointer. The pointer aliases the slice's backing array; nothing is
// copied.
//
// It returns nil when data is empty (or nil), because such a slice has no
// first element to take the address of.
func sliceToPtr(data []byte) unsafe.Pointer {
	if len(data) == 0 {
		return nil
	}
	return unsafe.Pointer(&data[0])
}
// ptrToSlice returns a []byte of length and capacity size whose backing
// storage starts at data. Nothing is allocated or copied: the slice is a view
// onto the same memory, so writes through it are visible via data.
func ptrToSlice(data unsafe.Pointer, size int) []byte {
	var view []byte
	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&view))
	hdr.Data = uintptr(data)
	hdr.Cap = size
	hdr.Len = size
	return view
}
// rtmpErrs maps known Err codes to their messages. Index 0 is left empty and
// therefore has no dedicated message.
var rtmpErrs = [...]string{
	1: "rtmp: not connected",
	2: "rtmp: write error",
	3: "rtmp: not started",
}

// Err is a numeric rtmp error code that implements the error interface.
type Err uint

// Error returns the message registered for e in rtmpErrs. Codes without a
// message are rendered as "rtmp: " followed by the code as a signed decimal
// integer.
func (e Err) Error() string {
	if code := int(e); code >= 0 && code < len(rtmpErrs) && rtmpErrs[code] != "" {
		return rtmpErrs[code]
	}
	return "rtmp: " + strconv.Itoa(int(e))
}