av/rtmp/rtmp.go

1237 lines
32 KiB
Go

/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Jake Lane <jake@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
/*
#cgo CFLAGS: -I/usr/local/include/librtmp
#cgo LDFLAGS: -L/usr/local/lib -lrtmp
#include <stdlib.h>
#include <string.h>
#include <rtmp.h>
#include <sys/socket.h>
typedef enum {
RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE
} RTMPTCmd;
RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout);
int write_frame(RTMP* rtmp, char* data, uint data_length);
int end_session(RTMP* rtmp);
void AV_queue(RTMP_METHOD **vals, int *num, AVal *av, int txn);
int WriteN(RTMP *r, const char *buffer, int n);
int EncodeInt32LE(char *output, int nVal);
int HTTP_Post(RTMP *r, RTMPTCmd cmd, const char *buf, int len);
*/
import "C"
import (
"errors"
"fmt"
"log"
"math"
"reflect"
"strconv"
"unsafe"
)
const (
RTMPT_OPEN = iota
RTMPT_SEND
RTMPT_IDLE
RTMPT_CLOSE
)
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_SHARED_OBJECT = 0x13
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
RTMP_MAX_HEADER_SIZE = 18
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_SIZE_MINIMUM = 3
RTMP_READ_HEADER = 0x01
RTMP_READ_RESUME = 0x02
RTMP_READ_NO_IGNORE = 0x04
RTMP_READ_GOTKF = 0x08
RTMP_READ_GOTFLVK = 0x10
RTMP_READ_SEEKING = 0x20
RTMP_READ_COMPLETE = -3
RTMP_READ_ERROR = -2
RTMP_READ_EOF = -1
RTMP_READ_IGNORE = 0
RTMP_LF_AUTH = 0x0001 /* using auth param */
RTMP_LF_LIVE = 0x0002 /* stream is live */
RTMP_LF_SWFV = 0x0004 /* do SWF verification */
RTMP_LF_PLST = 0x0008 /* send playlist before play */
RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */
RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */
RTMP_LF_FAPU = 0x0040 /* free app on close */
RTMP_FEATURE_HTTP = 0x01
RTMP_FEATURE_ENC = 0x02
RTMP_FEATURE_SSL = 0x04
RTMP_FEATURE_MFP = 0x08 /* not yet supported */
RTMP_FEATURE_WRITE = 0x10 /* publish, not play */
RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */
RTMP_PROTOCOL_UNDEFINED = -1
RTMP_PROTOCOL_RTMP = 0
RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC
RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP
RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL
RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC)
RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL)
RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP
RTMP_DEFAULT_CHUNKSIZE = 128
RTMP_BUFFER_CACHE_SIZE = (16 * 1024)
RTMP_CHANNELS = 65600
RTMP_SWF_HASHLEN = 32
)
const (
minDataSize = 11
debugMode = false
)
const (
byteSize = 1
int32Size = 4
int64Size = 8
)
// memmove copies n bytes from "from" to "to".
//go:linkname memmove runtime.memmove
func memmove(to, from unsafe.Pointer, n uintptr)
// av_setDataFrame is a static const global in rtmp.c
var setDataFrame = AVC("@setDataFrame")
var packetSize = [...]int{12, 8, 4, 1}
var RTMPProtocolStringsLower = [...]string{
"rtmp",
"rtmpt",
"rtmpe",
"rtmpte",
"rtmps",
"rtmpts",
"",
"",
"rtmfp",
}
// Session provides an interface for sending flv tags over rtmp.
type Session interface {
Open() error
Write([]byte) (int, error)
Close() error
}
// session provides parameters required for an rtmp communication session.
type session struct {
rtmp *C.RTMP
url string
timeout uint
}
type RTMP struct {
m_inChunkSize int
m_outChunkSize int
m_nBWCheckCounter int
m_nBytesIn int
m_nBytesInSent int
m_nBufferMS int
m_stream_id int
m_mediaChannel int
m_mediaStamp uint32
m_pauseStamp uint32
m_pausing int
m_nServerBw int
m_nClientBw int
m_nClientBw2 uint8
m_bPlaying uint8
m_bSendEncoding uint8
m_bSendCounter uint8
m_numInvokes int
m_numCalls int
m_methodCalls *RTMP_METHOD
m_channelsAllocatedIn int
m_channelsAllocatedOut int
m_vecChannelsIn **RTMPPacket
m_vecChannelsOut **RTMPPacket
m_channelTimestamp *int
m_fAudioCodecs float64
m_fVideoCodecs float64
m_fEncoding float64
m_fDuration float64
m_msgCounter int
m_polling int
m_resplen int
m_unackd int
m_clientID AVal
m_read RTMP_READ
m_write RTMPPacket
m_sb RTMPSockBuf
Link RTMP_LNK
}
type RTMPPacket struct {
m_headerType uint8
m_packetType uint8
m_hasAbsTimestamp uint8
m_nChannel int
m_nTimeStamp uint32
m_nInfoField2 int32
m_nBodySize uint32
m_nBytesRead uint32
m_chunk *RTMPChunk
m_body *byte
}
type RTMP_METHOD struct {
name AVal
num int
}
type AVal struct {
av_val *byte
av_len int
}
type RTMP_READ struct {
buf *byte
bufpos *byte
buflen uint
timestamp uint32
dataType uint8
flags uint8
status int8
initialFrameType uint8
nResumeTS uint32
metaHeader *byte
initialFrame *byte
nMetaHeaderSize uint32
nInitialFrameSize uint32
nIgnoredFrameCounter uint32
nIgnoredFlvFrameCounter uint32
}
type RTMPSockBuf struct {
sb_socket int
sb_size int
sb_start *byte
sb_buf [C.RTMP_BUFFER_CACHE_SIZE]byte // port const
sb_timedout int
sb_ssl uintptr
}
type RTMPChunk struct {
c_headerSize int
c_chunkSize int
c_chunk *byte
c_header [RTMP_MAX_HEADER_SIZE]byte
}
type ushort [2]byte
type RTMP_LNK struct {
hostname AVal
sockshost AVal
playpath0 AVal
playpath AVal
tcUrl AVal
swfUrl AVal
pageUrl AVal
app AVal
auth AVal
flashVer AVal
subscribepath AVal
usherToken AVal
token AVal
pubUser AVal
pubPasswd AVal
extras AMFObject
edepth int
seekTime int
stopTime int
lFlags int
swfAge int
protocol int
timeout int
pFlags int
socksport ushort
port ushort
}
type AMFObject struct {
o_num int
o_props *C.AMFObjectProperty
}
var _ Session = (*session)(nil)
// NewSession returns a new session.
func NewSession(url string, connectTimeout uint) Session {
return &session{
url: url,
timeout: connectTimeout,
}
}
// Open establishes an rtmp connection with the url passed into the
// constructor
func (s *session) Open() error {
if s.rtmp != nil {
return errors.New("rtmp: attempt to start already running session")
}
var err error
s.rtmp, err = startSession(s.rtmp, s.url, uint32(s.timeout))
if s.rtmp == nil {
return err
}
return nil
}
// Close terminates the rtmp connection
func (s *session) Close() error {
if s.rtmp == nil {
return Err(3)
}
ret := endSession(s.rtmp)
s.rtmp = nil
if ret != 0 {
return Err(ret)
}
return nil
}
// Write writes a frame (flv tag) to the rtmp connection
func (s *session) Write(data []byte) (int, error) {
if s.rtmp == nil {
return 0, Err(3)
}
if C.RTMP_IsConnected(s.rtmp) == 0 {
return 0, Err(1)
}
//if C.RTMP_Write(s.rtmp,(*C.char)(unsafe.Pointer(&data[0])),C.int(len(data))) == 0 {
if rtmpWrite(s.rtmp, data) == 0 {
return 0, Err(2)
}
return len(data), nil
}
func startSession(rtmp *C.RTMP, u string, timeout uint32) (*C.RTMP, error) {
connect_timeout := C.int(timeout)
rtmp = rtmpAlloc()
rtmpInit(rtmp)
rtmp.Link.timeout = connect_timeout
if rtmpSetupUrl(rtmp, u) == 0 {
C.RTMP_Close(rtmp)
C.RTMP_Free(rtmp)
return nil, errors.New("rtmp startSession: Failed to setup URL!")
}
C.RTMP_EnableWrite(rtmp)
C.RTMP_SetBufferMS(rtmp, 3600*1000)
if C.RTMP_Connect(rtmp, nil) == 0 {
C.RTMP_Close(rtmp)
C.RTMP_Free(rtmp)
return nil, errors.New("rtmp startSession: Failed to connect!")
}
if C.RTMP_ConnectStream(rtmp, 0) == 0 {
C.RTMP_Close(rtmp)
C.RTMP_Free(rtmp)
return nil, errors.New("rtmp startSession: Failed to connect stream!")
}
return rtmp, nil
}
func rtmpAlloc() *C.RTMP {
var r C.RTMP
return (*C.RTMP)(allocate(unsafe.Sizeof(r)))
}
func rtmpInit(r *C.RTMP) {
r.m_sb.sb_socket = -1
r.m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE
r.m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE
r.m_nBufferMS = 30000
r.m_nClientBW = 2500000
r.m_nClientBW2 = 2
r.m_nServerBW = 2500000
r.m_fAudioCodecs = 3191.0
r.m_fVideoCodecs = 252.0
r.Link.timeout = 30
r.Link.swfAge = 30
}
func rtmpSetupUrl(r *C.RTMP, u string) int32 {
length := len(u)
url := goStrToCStr(u)
var opt, arg AVal
var p1, p2 *byte
ptr := strchr(url, byte(' '))
var ret, len int32
var port uint32
port = 0
len = strlen(url)
ret = int32(C.RTMP_ParseURL((*C.char)(unsafe.Pointer(url)), &r.Link.protocol, &r.Link.hostname,
(*C.uint)(&port), &r.Link.playpath0, &r.Link.app))
if ret == 0 {
return ret
}
r.Link.port = C.ushort(port)
r.Link.playpath = r.Link.playpath0
if r.Link.tcUrl.av_len == 0 {
r.Link.tcUrl.av_val = (*C.char)(unsafe.Pointer(url))
if r.Link.app.av_len != 0 {
if int(uintptr(unsafe.Pointer(r.Link.app.av_val))) <
int(uintptr(incBytePtr(unsafe.Pointer(url), len))) {
r.Link.tcUrl.av_len = r.Link.app.av_len +
int(uintptr(decBytePtr(r.Link.app.av_val, int(uintptr(unsafe.Pointer(url))))))
} else {
len = r.Link.hostname.av_len + r.Link.app.av_len +
int(unsafe.Sizeof("rtmpte://:65535/"))
r.Link.tcUrl.av_val = (*C.char)(allocate(uintptr(len)))
hostname := string(ptrToSlice(unsafe.Pointer(r.Link.hostname.av_val),
int(r.Link.hostname.av_len)))
app := string(ptrToSlice(unsafe.Pointer(r.Link.app.av_val),
int(r.Link.app.av_len)))
fString := fmt.Sprintf("%v://%v:%v/%v",
RTMPProtocolStringsLower[r.Link.protocol], hostname, r.Link.port, app)
r.Link.tcUrl.av_val = (*C.char)(goStrToCStr(fString))
r.Link.tcUrl.av_len = len(RTMPProtocolStringsLower[r.Link.protocol]) +
len(string("://")) + len(hostname) + len(string(":")) +
len(strconv.Itoa(int(r.Link.port))) + len(string("/")) + len(app)
r.Link.lFlags |= RTMP_LF_FTCU
}
} else {
r.Link.tcUrl.av_len = strlen(url)
}
}
C.SocksSetup(r, &r.Link.sockshost)
if r.Link.port == 0 {
switch {
case r.Link.Protocol & RRTMP_FEATURE_SSL:
r.Link.port = 433
case r.Link.protocol & RTMP_FEATURE_HTTP:
r.Link.port = 80
default:
r.Link.port = 1935
}
}
return 1
}
func socksSetup(r *RTMP, sockshost *AVal) {
if sockshost.av_len != 0 {
socksport := strchr((*byte)(unsafe.Pointer(sockshost.av_val)), ':')
hostname := strdup((*byte)(sockshost.av_val))
}
}
func endSession(rtmp *C.RTMP) uint32 {
if rtmp == nil {
return 3
}
C.RTMP_Close(rtmp)
C.RTMP_Free(rtmp)
return 0
}
// rtmpWrite writes data to the current rtmp connection encapsulated by r
func rtmpWrite(r *C.RTMP, data []byte) int {
buf := sliceToPtr(data)
// TODO: port RTMPPacket
var pkt = &r.m_write
var pend, enc unsafe.Pointer
size := len(data)
s2 := size
var ret, num int
pkt.m_nChannel = 0x04
pkt.m_nInfoField2 = C.int32_t(r.m_stream_id)
for s2 != 0 {
if pkt.m_nBytesRead == 0 {
if size < minDataSize {
log.Printf("size: %d\n", size)
log.Printf("too small \n")
return 0
}
if *indxBytePtr(buf, 0) == 'F' && *indxBytePtr(buf, 1) == 'L' && *indxBytePtr(buf, 2) == 'V' {
buf = unsafe.Pointer(uintptr(buf) + uintptr(13))
s2 -= 13
}
pkt.m_packetType = C.uint8_t(*indxBytePtr(buf, 0))
buf = incBytePtr(buf, 1)
pkt.m_nBodySize = C.uint32_t(afmDecodeInt24((*byte)(buf)))
buf = incBytePtr(buf, 3)
pkt.m_nTimeStamp = C.uint32_t(afmDecodeInt24((*byte)(buf)))
buf = incBytePtr(buf, 3)
pkt.m_nTimeStamp |= C.uint32_t(*indxBytePtr(buf, 0)) << 24
buf = incBytePtr(buf, 4)
s2 -= 11
if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) &&
pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
pkt.m_headerType = RTMP_PACKET_SIZE_LARGE
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
pkt.m_nBodySize += 16
}
} else {
pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM
}
// TODO: Port this
if int(C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize)) == 0 {
log.Println("Failed to allocate packet")
return 0
}
enc = unsafe.Pointer(pkt.m_body)
pend = incBytePtr(enc, int(pkt.m_nBodySize))
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
enc = unsafe.Pointer(afmEncodeString((*byte)(enc), (*byte)(pend), &setDataFrame))
pkt.m_nBytesRead = C.uint32_t(math.Abs(float64(uintptr(enc) -
uintptr(unsafe.Pointer(pkt.m_body)))))
}
} else {
enc = incBytePtr(unsafe.Pointer(pkt.m_body), int(pkt.m_nBytesRead))
}
num = int(pkt.m_nBodySize - pkt.m_nBytesRead)
if num > s2 {
num = s2
}
//memmove(enc,buf,uintptr(num))
copy(ptrToSlice(enc, num), ptrToSlice(buf, num))
pkt.m_nBytesRead += C.uint32_t(num)
s2 -= num
buf = incBytePtr(buf, num)
if pkt.m_nBytesRead == pkt.m_nBodySize {
// TODO: Port this
ret = sendPacket(r, pkt, 0)
// TODO: Port this
C.RTMPPacket_Free(pkt)
pkt.m_nBytesRead = 0
if ret == 0 {
return -1
}
buf = incBytePtr(buf, 4)
s2 -= 4
if s2 < 0 {
break
}
}
}
return size + s2
}
// afmDecodeInt24 decodes data into an unsigned int
func afmDecodeInt24(data *byte) uint32 {
// TODO Understand logic and simplify
c := (*uint8)(unsafe.Pointer(data))
dst := uint32(int32(*c) << 16)
dst |= uint32(int32(*((*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(c)) +
(uintptr)(int32(1))*unsafe.Sizeof(*c))))) << 8)
dst |= uint32(int32(*((*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(c)) +
(uintptr)(int32(2))*unsafe.Sizeof(*c))))))
return dst
}
func afmEncodeString(output *byte, outend *byte, bv *C.AVal) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if (bv.av_len < 65536 && uintptr(incBytePtr(outputPtr, 1+2+int(bv.av_len))) >
uintptr(outendPtr)) || uintptr(incBytePtr(outputPtr, 1+4+int(bv.av_len))) >
uintptr(outendPtr) {
return nil
}
if bv.av_len < 65536 {
*(*C.char)(outputPtr) = C.AMF_STRING
incBytePtr(outputPtr, 1)
outputPtr = unsafe.Pointer(afmEncodeInt16((*byte)(outputPtr),
(*byte)(outendPtr), (int16)(bv.av_len)))
} else {
*(*C.char)(outputPtr) = C.AMF_LONG_STRING
incBytePtr(outputPtr, 1)
outputPtr = unsafe.Pointer(afmEncodeInt32((*byte)(outputPtr),
(*byte)(outendPtr), (int32)(bv.av_len)))
}
memmove(unsafe.Pointer(outputPtr), unsafe.Pointer(bv.av_val), uintptr(bv.av_len))
//C.memcpy(unsafe.Pointer(outputPtr), unsafe.Pointer(bv.av_val), (C.size_t)(bv.av_len))
incBytePtr(outputPtr, int(bv.av_len))
return (*byte)(outputPtr)
}
// afmEncodeInt16 encodes a int16 into data
func afmEncodeInt16(output *byte, outend *byte, nVal int16) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if uintptr(outputPtr)+2 > uintptr(outendPtr) {
// length < 2
return nil
}
// Assign output[1]
second := (*byte)(incBytePtr(outputPtr, 1))
*second = (byte)(nVal & 0xff)
// Assign output[0]
*output = (byte)(nVal >> 8)
return (*byte)(incBytePtr(outputPtr, 2))
}
// afmEncodeInt32 encodes a int32 into data
func afmEncodeInt32(output *byte, outend *byte, nVal int32) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if uintptr(outputPtr)+4 > uintptr(outendPtr) {
// length < 4
return nil
}
// Assign output[3]
forth := (*byte)(incBytePtr(outputPtr, 3))
*forth = (byte)(nVal & 0xff)
// Assign output[2]
third := (*byte)(incBytePtr(outputPtr, 2))
*third = (byte)(nVal >> 8)
// Assign output[1]
second := (*byte)(incBytePtr(outputPtr, 1))
*second = (byte)(nVal >> 16)
// Assign output[0]
*output = (byte)(nVal >> 24)
return (*byte)(incBytePtr(outputPtr, 4))
}
// send packet version 1 - less C stuff
func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
var prevPacket *C.RTMPPacket
last := 0
var nSize, hSize, cSize, nChunkSize, tlen int
var header, hptr, hend, buffer, tbuf, toff unsafe.Pointer
var goHbuf [RTMP_MAX_HEADER_SIZE]byte
var hbuf = unsafe.Pointer(&goHbuf[0])
var c byte
var t int32
var packets unsafe.Pointer
if packet.m_nChannel >= r.m_channelsAllocatedOut {
log.Println("Resize")
n := int(packet.m_nChannel + 10)
packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut),
C.size_t(unsafe.Sizeof(packet)*uintptr(n)))
if uintptr(packets) == uintptr(0) {
C.free(unsafe.Pointer(r.m_vecChannelsOut))
r.m_vecChannelsOut = nil
r.m_channelsAllocatedOut = 0
return 0
}
r.m_vecChannelsOut = (**C.RTMPPacket)(packets)
C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut),
int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet)*
uintptr(n-int(r.m_channelsAllocatedOut))))
r.m_channelsAllocatedOut = C.int(n)
}
prevPacket = *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))
if prevPacket != nil && packet.m_headerType != RTMP_PACKET_SIZE_LARGE {
// compress a bit by using the prev packet's attributes
if prevPacket.m_nBodySize == packet.m_nBodySize &&
prevPacket.m_packetType == packet.m_packetType &&
packet.m_headerType == RTMP_PACKET_SIZE_MEDIUM {
packet.m_headerType = RTMP_PACKET_SIZE_SMALL
}
if prevPacket.m_nTimeStamp == packet.m_nTimeStamp &&
packet.m_headerType == RTMP_PACKET_SIZE_SMALL {
// TODO: port this constant
packet.m_headerType = RTMP_PACKET_SIZE_MINIMUM
}
last = int(prevPacket.m_nTimeStamp)
}
if packet.m_headerType > 3 {
log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
packet.m_headerType)
return 0
}
nSize = packetSize[int(packet.m_headerType)]
hSize = nSize
cSize = 0
t = int32(int(packet.m_nTimeStamp) - last)
if packet.m_body != nil {
header = decBytePtr(unsafe.Pointer(packet.m_body), nSize)
hend = unsafe.Pointer(packet.m_body)
} else {
header = incBytePtr(hbuf, 6)
// TODO: be cautious about this sizeof - make sure it works how you think it
// does. C code used sizeof(hbuf) where hbuf is a *char
hend = incBytePtr(hbuf, RTMP_MAX_HEADER_SIZE)
}
switch {
case packet.m_nChannel > 319:
cSize = 2
case packet.m_nChannel > 63:
cSize = 1
}
if cSize != 0 {
header = decBytePtr(header, cSize)
hSize += cSize
}
if t >= 0xffffff {
header = decBytePtr(header, 4)
hSize += 4
log.Printf("Larger timestamp than 24-bit: 0x%v", t)
}
hptr = header
c = byte(packet.m_headerType) << 6
switch cSize {
case 0:
c |= byte(packet.m_nChannel)
case 1:
case 2:
c |= byte(1)
}
*(*byte)(hptr) = c
hptr = incBytePtr(hptr, 1)
if cSize != 0 {
tmp := packet.m_nChannel - 64
*(*byte)(hptr) = byte(tmp & 0xff)
hptr = incBytePtr(hptr, 1)
if cSize == 2 {
*(*byte)(hptr) = byte(tmp >> 8)
hptr = incBytePtr(hptr, 1)
}
}
if nSize > 1 {
res := t
if t > 0xffffff {
res = 0xffffff
}
hptr = unsafe.Pointer(afmEncodeInt24((*byte)(hptr), (*byte)(hend), res))
}
if nSize > 4 {
hptr = unsafe.Pointer(afmEncodeInt24((*byte)(hptr), (*byte)(hend), (int32(packet.m_nBodySize))))
*(*byte)(hptr) = byte(packet.m_packetType)
hptr = incBytePtr(hptr, 1)
}
if nSize > 8 {
hptr = incBytePtr(hptr, int(C.EncodeInt32LE((*C.char)(hptr),
C.int(packet.m_nInfoField2))))
}
if t >= 0xffffff {
hptr = unsafe.Pointer(afmEncodeInt32((*byte)(hptr), (*byte)(hend), (int32)(t)))
}
nSize = int(packet.m_nBodySize)
buffer = unsafe.Pointer(packet.m_body)
nChunkSize = int(r.m_outChunkSize)
if debugMode {
log.Printf("sendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize)
}
// send all chunks in one HTTP request
// TODO: port RTMP_FEATURE_HTTP
if int(r.Link.protocol&RTMP_FEATURE_HTTP) != 0 {
chunks := (nSize + nChunkSize - 1) / nChunkSize
if chunks > 1 {
tlen = chunks*(cSize+1) + nSize + hSize
// TODO: figure out how to do this in go
tbuf = C.malloc(C.size_t(tlen))
if tbuf == nil {
return 0
}
toff = tbuf
}
}
for (nSize + hSize) != 0 {
var wrote int
if nSize < nChunkSize {
nChunkSize = nSize
}
if tbuf != nil {
//memmove(toff, header, uintptr(nChunkSize + hSize))
copy(ptrToSlice(toff, int(nChunkSize+hSize)), ptrToSlice(header,
int(nChunkSize+hSize)))
toff = incBytePtr(toff, nChunkSize+hSize)
} else {
// TODO: port this
wrote = int(writeN(r, header, nChunkSize+hSize))
if wrote == 0 {
return 0
}
}
nSize -= nChunkSize
buffer = incBytePtr(buffer, nChunkSize)
hSize = 0
if nSize > 0 {
header = decBytePtr(buffer, 1)
hSize = 1
if cSize != 0 {
header = decBytePtr(header, cSize)
hSize += cSize
}
if t >= 0xffffff {
header = decBytePtr(header, 4)
hSize += 4
}
*(*byte)(header) = byte(0xc0 | c)
if cSize != 0 {
tmp := int(packet.m_nChannel) - 64
*indxBytePtr(header, 1) = byte(tmp & 0xff)
if cSize == 2 {
*indxBytePtr(header, 2) = byte(tmp >> 8)
}
}
if t >= 0xffffff {
extendedTimestamp := incBytePtr(header, 1+cSize)
afmEncodeInt32((*byte)(extendedTimestamp),
(*byte)(incBytePtr(extendedTimestamp, 4)), (int32)(t))
}
}
}
if tbuf != nil {
// TODO: port C.writeN
wrote := int(writeN(r, tbuf, int(uintptr(decBytePtr(toff,
int(uintptr(unsafe.Pointer(tbuf))))))))
C.free(tbuf)
tbuf = nil
if wrote == 0 {
return 0
}
}
// We invoked a remote method
// TODO: port the const
if packet.m_packetType == RTMP_PACKET_TYPE_INVOKE {
// TODO: port C.AVal
var method C.AVal
var ptr unsafe.Pointer
ptr = incBytePtr(unsafe.Pointer(packet.m_body), 1)
afmDecodeString((*byte)(ptr), &method)
if debugMode {
log.Printf("Invoking %v", method.av_val)
}
// keep it in call queue till result arrives
if queue != 0 {
var txn int
ptr = incBytePtr(ptr, 3+int(method.av_len))
// TODO: port this
txn = int(C.AMF_DecodeNumber((*C.char)(ptr)))
// TODO: port this
C.AV_queue(&r.m_methodCalls, &r.m_numCalls, &method, C.int(txn))
}
}
if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) == nil {
*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) =
(*C.RTMPPacket)(C.malloc(C.size_t(unsafe.Sizeof(*packet))))
}
memmove(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))), unsafe.Pointer(packet),
uintptr(unsafe.Sizeof(*packet)))
//C.memcpy(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
//int(packet.m_nChannel), int(unsafe.Sizeof(packet))))), unsafe.Pointer(packet),
//C.size_t(uintptr(unsafe.Sizeof(*packet))))
return 1
}
// afmDecodeString decodes data into a string inside a AVal
func afmDecodeString(data *byte, bv *C.AVal) {
dataPtr := unsafe.Pointer(data)
bv.av_len = C.int(afmDecodeInt16((*byte)(dataPtr)))
if bv.av_len > 0 {
bv.av_val = (*C.char)(incBytePtr(dataPtr, 2))
} else {
bv.av_val = nil
}
}
// afmDecodeInt16 decodes data into a 16 bit number
func afmDecodeInt16(data *byte) uint16 {
c := unsafe.Pointer(data)
return (uint16(*(*uint8)(c)) << 8) | *(*uint16)(incBytePtr(c, 1))
}
// afmEncodeInt24 encodes a int24 into data
func afmEncodeInt24(output *byte, outend *byte, nVal int32) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if uintptr(outputPtr)+3 > uintptr(outendPtr) {
// length < 3
return nil
}
// Assign output[2]
third := (*byte)(incBytePtr(outputPtr, 2))
*third = (byte)(nVal & 0xff)
// Assign output[1]
second := (*byte)(incBytePtr(outputPtr, 1))
*second = (byte)(nVal >> 8)
// Assign output[0]
*output = (byte)(nVal >> 16)
return (*byte)(incBytePtr(outputPtr, 3))
}
func writeN(r *C.RTMP, buffer unsafe.Pointer, n int) int {
ptr := buffer
for n > 0 {
var nBytes int
if (r.Link.protocol & RTMP_FEATURE_HTTP) != 0 {
// TODO: port HTTP_POST
nBytes = httpPost(r, RTMPT_SEND, (*byte)(ptr), n)
} else {
// TODO: port this if necessary
nBytes = int(sockBufSend(&r.m_sb, (*byte)(ptr), int32(n)))
}
if nBytes < 0 {
if debugMode {
log.Println("WriteN, RTMP send error")
}
// TODO: port this
C.RTMP_Close(r)
n = 1
break
}
if nBytes == 0 {
break
}
n -= nBytes
ptr = incBytePtr(ptr, nBytes)
}
if n == 0 {
return 1
}
return 0
}
const length = 512
var RTMPT_cmds = []string{
"open",
"send",
"idle",
"close",
}
func httpPost(r *C.RTMP, cmd C.RTMPTCmd, buf *byte, l int) int {
res := ""
if r.m_clientID.av_val != nil {
res = string(ptrToSlice(unsafe.Pointer(r.m_clientID.av_val),
int(r.m_clientID.av_len)))
}
fString := fmt.Sprintf("POST /%s%s/%d HTTP/1.1\r\n"+
"Host: %v:%d\r\n"+
"Accept: */*\r\n"+
"User-Agent: Shockwave Flash\r\n"+
"Connection: Keep-Alive\r\n"+
"Cache-Control: no-cache\r\n"+
"Content-type: application/x-fcs\r\n"+
"Content-length: %d\r\n\r\n", RTMPT_cmds[cmd], res, r.m_msgCounter,
r.Link.hostname.av_len, r.Link.hostname.av_val, r.Link.port, l)
hlen := len(fString)
hbuf := (*byte)(unsafe.Pointer(&(([]byte(fString))[0])))
// TODO: port this
sockBufSend(&r.m_sb, (*byte)(unsafe.Pointer(hbuf)), int32(hlen))
hlen = int(sockBufSend(&r.m_sb, (*byte)(unsafe.Pointer(buf)), int32(l)))
r.m_msgCounter++
r.m_unackd++
return hlen
}
func sockBufSend(sb *C.RTMPSockBuf, buf *byte, l int32) int32 {
return int32(C.send(sb.sb_socket, unsafe.Pointer(buf), C.size_t(l), 0))
}
// TODO: port RTMP_METHOD
func avQueue(vals **C.RTMP_METHOD, num *int, av *C.AVal, txn int) {
var rtmpMethodPtr *C.RTMP_METHOD
if (*num & 0x0f) == 0 {
// TODO: work out what to do with the realloc
*vals = (*C.RTMP_METHOD)(C.realloc(unsafe.Pointer(*vals), C.size_t((*num+16)*int(unsafe.Sizeof(*(*vals))))))
}
tmp := unsafe.Pointer(C.malloc(C.size_t(av.av_len + 1)))
memmove(tmp, unsafe.Pointer(av.av_val), uintptr(av.av_len))
//C.memcpy(tmp, unsafe.Pointer(av.av_val), C.size_t(av.av_len))
*indxBytePtr(tmp, int(av.av_len)) = *(*byte)(unsafe.Pointer(C.CString("")))
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
int(unsafe.Sizeof(rtmpMethodPtr))))).num = C.int(txn)
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
int(unsafe.Sizeof(rtmpMethodPtr))))).name.av_len = av.av_len
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
int(unsafe.Sizeof(rtmpMethodPtr))))).name.av_val = (*C.char)(tmp)
}
// wrapper for converting byte pointer to unsafe.Pointer
func uBP(b *byte) unsafe.Pointer {
return unsafe.Pointer(b)
}
// wrapper for converting slice to unsafe.pointer
func uBSP(b []byte) unsafe.Pointer {
return unsafe.Pointer(&b[0])
}
func goStrToCStr(str string) *byte {
l := len(str)
slice := make([]byte, l+1)
ptr := unsafe.Pointer(&[]byte(str)[0])
for i := 0; i < l; i++ {
slice[i] = *indxBytePtr(ptr, i)
}
slice[l] = '\000'
return slice
}
func strlen(str *byte) int32 {
for i := 0; true; i++ {
ptr = indxBytePtr(unsafe.Pointer(str), i)
if *ptr == '\000' {
return i
}
}
return 0
}
func strchr(str *byte, val byte) *byte {
var ptr *byte
for i := 0; true; i++ {
ptr = indxBytePtr(unsafe.Pointer(str), i)
if *ptr == val {
return ptr
}
if *ptr == '\000' {
break
}
}
return nil
}
func allocate(nOfBytes uintptr) unsafe.Pointer {
mem := make([]byte, int(nOfBytes))
return unsafe.Pointer(&mem[0])
}
// indxBytePtr returns a byte at the indx inc give a ptr
func indxBytePtr(ptr unsafe.Pointer, inc int) *byte {
return (*byte)(incPtr(ptr, inc, byteSize))
}
// indxInt32Ptr returns an int32 at the indx inc given a ptr
func indxInt32Ptr(ptr unsafe.Pointer, inc int) *int32 {
return (*int32)(incPtr(ptr, inc, int32Size))
}
// indxInt64Ptr returns an int64 at the indx inc given a ptr
func indxInt64Ptr(ptr unsafe.Pointer, inc int) *int64 {
return (*int64)(incPtr(ptr, inc, int64Size))
}
// incBytePtr returns an unsafe.Pointer to a byte that is inc positive positions
// from the passed ptr
func incBytePtr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr, inc, byteSize)
}
// incInt32Ptr returns an unsafe.Pointer to an int32 that is inc positive
// positions from the passed ptr
func incInt32Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr, inc, int32Size)
}
// incInt64Ptr returns an unsafe.Pointer to an int64 that is inc positive
// positions from the passed ptr
func incInt64Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr, inc, int64Size)
}
// incPtr attempts to replicate C like pointer arithmatic functionality
func incPtr(ptr unsafe.Pointer, inc, typeSize int) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) + uintptr(inc*typeSize))
}
// incPtr attempts to replicate C like pointer arithmatic functionality
func decPtr(ptr unsafe.Pointer, dec, typeSize int) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) - uintptr(dec*typeSize))
}
// decBytePtr returns an unsafe.Pointer to a byte that is dec negative positions
// from ptr
func decBytePtr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr, dec, byteSize)
}
// decBytePtr returns an unsafe.Pointer to a int32 that is dec negative positions
// from ptr
func decInt32Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr, dec, int32Size)
}
// decBytePtr returns an unsafe.Pointer to a int64 that is dec negative positions
// from ptr
func decInt64Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr, dec, int64Size)
}
// sliceToPtr get's the address of the first data element and returns as unsafe
// pointer
func sliceToPtr(data []byte) unsafe.Pointer {
return unsafe.Pointer(&data[0])
}
// ptrToSlice returns a slice given unsafe pointer and size - no allocation and
// copying is required - same data is used.
func ptrToSlice(data unsafe.Pointer, size int) []byte {
var ret []byte
shDest := (*reflect.SliceHeader)(unsafe.Pointer(&ret))
shDest.Data = uintptr(data)
shDest.Len = size
shDest.Cap = size
return ret
}
// C.AVal is in amf.h
// See AVC(str) {str, sizeof(str)-1} in amf.h
func AVC(str string) C.AVal {
var aval C.AVal
aval.av_val = C.CString(str)
aval.av_len = C.int(len(str))
return aval
}
var rtmpErrs = [...]string{
1: "rtmp: not connected",
2: "rtmp: write error",
3: "rtmp: not started",
}
type Err uint
func (e Err) Error() string {
if 0 <= int(e) && int(e) < len(rtmpErrs) {
s := rtmpErrs[e]
if s != "" {
return s
}
}
return "rtmp: " + strconv.Itoa(int(e))
}