av/rtmp/rtmp.go

2448 lines
67 KiB
Go

/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Jake Lane <jake@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
/*
#cgo CFLAGS: -I/usr/local/include/librtmp
#cgo LDFLAGS: -L/usr/local/lib -lrtmp
#include <stdlib.h>
#include <string.h>
#include <rtmp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
typedef enum {
RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE
} RTMPTCmd;
typedef struct sockaddr_in sockaddr_in;
typedef struct sockaddr sockaddr;
int add_addr_info(struct sockaddr_in *service, AVal *host, int port);
RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout);
int write_frame(RTMP* rtmp, char* data, uint data_length);
int end_session(RTMP* rtmp);
void AV_queue(RTMP_METHOD **vals, int *num, AVal *av, int txn);
int C_WriteN(RTMP *r, const char *buffer, int n);
int EncodeInt32LE(char *output, int nVal);
int HTTP_Post(RTMP *r, RTMPTCmd cmd, const char *buf, int len);
*/
import "C"
import (
	"errors"
	"fmt"
	"log"
	"math"
	"math/rand"
	"reflect"
	"strconv"
	"syscall"
	"unsafe"

	"github.com/chamaken/cgolmnl/inet"
)
// RTMPT (RTMP over HTTP) command codes. The values match the RTMPTCmd enum
// declared in the cgo preamble of this file.
const (
	RTMPT_OPEN  = 0
	RTMPT_SEND  = 1
	RTMPT_IDLE  = 2
	RTMPT_CLOSE = 3
)
// AMF data type markers. Each value is the single type byte written in front
// of an encoded AMF value (see C_AMFPropEncode and C_AMFEncode).
const (
	AMF_NUMBER       = 0x00
	AMF_BOOLEAN      = 0x01
	AMF_STRING       = 0x02
	AMF_OBJECT       = 0x03
	AMF_MOVIECLIP    = 0x04 // reserved, not used
	AMF_NULL         = 0x05
	AMF_UNDEFINED    = 0x06
	AMF_REFERENCE    = 0x07
	AMF_ECMA_ARRAY   = 0x08
	AMF_OBJECT_END   = 0x09
	AMF_STRICT_ARRAY = 0x0a
	AMF_DATE         = 0x0b
	AMF_LONG_STRING  = 0x0c
	AMF_UNSUPPORTED  = 0x0d
	AMF_RECORDSET    = 0x0e // reserved, not used
	AMF_XML_DOC      = 0x0f
	AMF_TYPED_OBJECT = 0x10
	AMF_AVMPLUS      = 0x11 // switch to AMF3
)
// RTMP message type identifiers, stored in a packet's m_packetType field.
// In this part of the file C_SendBytesReceived sends a
// RTMP_PACKET_TYPE_BYTES_READ_REPORT packet and C_SendConnectPacket sends a
// RTMP_PACKET_TYPE_INVOKE packet.
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_SHARED_OBJECT = 0x13
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
)
// Chunk header formats, stored in a packet's m_headerType field. Each value
// is also an index into the packetSize table.
const (
	RTMP_PACKET_SIZE_LARGE = iota
	RTMP_PACKET_SIZE_MEDIUM
	RTMP_PACKET_SIZE_SMALL
	RTMP_PACKET_SIZE_MINIMUM
)
// Reader state constants. The first six are single-bit flags that can be
// OR-ed together; the remaining four are plain status codes.
const (
	RTMP_READ_HEADER = 1 << iota
	RTMP_READ_RESUME
	RTMP_READ_NO_IGNORE
	RTMP_READ_GOTKF
	RTMP_READ_GOTFLVK
	RTMP_READ_SEEKING

	RTMP_READ_COMPLETE = -3
	RTMP_READ_ERROR    = -2
	RTMP_READ_EOF      = -1
	RTMP_READ_IGNORE   = 0
)
// Link flags, combined bitwise in the lFlags field of the link structure.
const (
	RTMP_LF_AUTH = 1 << iota // using auth param
	RTMP_LF_LIVE             // stream is live
	RTMP_LF_SWFV             // do SWF verification
	RTMP_LF_PLST             // send playlist before play
	RTMP_LF_BUFX             // toggle stream on BufferEmpty msg
	RTMP_LF_FTCU             // free tcUrl on close
	RTMP_LF_FAPU             // free app on close
)
// Protocol feature bits, combined bitwise in the protocol field of the link
// structure.
const (
	RTMP_FEATURE_HTTP  = 1 << iota
	RTMP_FEATURE_ENC
	RTMP_FEATURE_SSL
	RTMP_FEATURE_MFP   // not yet supported
	RTMP_FEATURE_WRITE // publish, not play
	RTMP_FEATURE_HTTP2 // server-side rtmpt
)
// Protocol identifiers, each expressed as a combination of RTMP_FEATURE_*
// bits. C_RTMP_SetupURL uses the link's protocol value as an index into
// RTMPProtocolStringsLower to obtain the matching URL scheme.
const (
RTMP_PROTOCOL_UNDEFINED = -1
RTMP_PROTOCOL_RTMP = 0
RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC
RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP
RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL
RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC)
RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL)
RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP
)
// Miscellaneous protocol sizes and sentinels.
const (
// Initial in/out chunk size assigned by C_RTMP_Init.
RTMP_DEFAULT_CHUNKSIZE = 128
// Size of the socket receive buffer (sb_buf in C_RTMPSockBuf).
RTMP_BUFFER_CACHE_SIZE = (16 * 1024)
RTMP_CHANNELS = 65600
RTMP_SWF_HASHLEN = 32
// Size in bytes of each handshake signature exchanged by C_HandShake.
RTMP_SIG_SIZE = 1536
RTMP_LARGE_HEADER_SIZE = 12
// Property type that C_AMFPropEncode refuses to encode.
AMF_INVALID = 0xff
// Space reserved in front of a packet body for the chunk header; packet
// builders place m_body this many bytes into their scratch buffer.
RTMP_MAX_HEADER_SIZE = 18
)
const (
// minDataSize is not referenced in this part of the file.
// NOTE(review): presumably the minimum length of an FLV tag accepted for
// writing - confirm against its users.
minDataSize = 11
// debugMode enables the extra progress logging in the connect and
// handshake functions.
debugMode = false
)
// av_setDataFrame is a static const global in rtmp.c
//
// The values below are AMF string constants built with AVC (defined
// elsewhere in this package; presumably it wraps a Go string as an AVal).
// Most of them are passed by address to the AMF encoders in
// C_SendConnectPacket as the command name and the property names of the
// "connect" command object.
var (
setDataFrame = AVC("@setDataFrame")
av_connect = AVC("connect")
av_app = AVC("app")
av_type = AVC("type")
av_nonprivate = AVC("nonprivate")
av_flashVer = AVC("flashVer")
av_swfUrl = AVC("swfUrl")
av_tcUrl = AVC("tcUrl")
av_fpad = AVC("fpad")
av_capabilities = AVC("capabilities")
av_audioCodecs = AVC("audioCodecs")
av_videoCodecs = AVC("videoCodecs")
av_videoFunction = AVC("videoFunction")
av_pageUrl = AVC("pageUrl")
av_objectEncoding = AVC("objectEncoding")
)
// packetSize holds one size per chunk header format, indexed by the
// RTMP_PACKET_SIZE_* constants (large, medium, small, minimum).
var packetSize = [4]int{
	0: 12,
	1: 8,
	2: 4,
	3: 1,
}
// RTMPProtocolStringsLower maps a protocol identifier (RTMP_PROTOCOL_*) to
// its lower-case URL scheme. Indices 6 and 7 have no scheme and are left as
// empty strings.
var RTMPProtocolStringsLower = [9]string{
	0: "rtmp",
	1: "rtmpt",
	2: "rtmpe",
	3: "rtmpte",
	4: "rtmps",
	5: "rtmpts",
	8: "rtmfp",
}
// Session is the public handle for an rtmp publishing session. It is
// implemented by *session and obtained from NewSession.
type Session interface {
// Open establishes the connection; it fails if the session is already open.
Open() error
// Write sends one frame and reports the number of bytes accepted.
Write([]byte) (int, error)
// Close terminates the connection.
Close() error
}
// session provides parameters required for an rtmp communication session.
type session struct {
// rtmp is the underlying connection state; nil while the session is closed.
rtmp *C.RTMP
// url is the destination handed to startSession by Open.
url string
// timeout is the connect timeout handed to startSession by Open.
timeout uint
}
// typedef struct RTMP
// rtmp.h +237
//
// C_RTMP is a Go-side mirror of the C RTMP struct. The functions visible in
// this part of the file operate on the cgo type C.RTMP directly rather than
// on this type. The field order follows the C declaration and should not be
// rearranged.
type C_RTMP struct {
m_inChunkSize int32
m_outChunkSize int32
m_nBWCheckCounter int32
m_nBytesIn int32
m_nBytesInSent int32
m_nBufferMS int32
m_stream_id int32
m_mediaChannel int32
m_mediaStamp uint32
m_pauseStamp uint32
m_pausing int32
m_nServerBw int32
m_nClientBw int32
m_nClientBw2 uint8
m_bPlaying uint8
m_bSendEncoding uint8
m_bSendCounter uint8
m_numInvokes int32
m_numCalls int32
m_methodCalls *C.RTMP_METHOD
m_channelsAllocatedIn int32
m_channelsAllocatedOut int32
m_vecChannelsIn **C.RTMPPacket
m_vecChannelsOut **C.RTMPPacket
m_channelTimestamp *int32
m_fAudioCodecs float64
m_fVideoCodecs float64
m_fEncoding float64
m_fDuration float64
m_msgCounter int32
m_polling int32
m_resplen int32
m_unackd int32
m_clientID C.AVal
m_read C.RTMP_READ
m_write C.RTMPPacket
m_sb C.RTMPSockBuf
Link C.RTMP_LNK
}
// typedef struct RTMPPacket
// rtmp.h +113
//
// C_RTMPPacket is a Go-side mirror of the C RTMPPacket struct. The packet
// builders visible in this part of the file fill in the cgo type
// C.RTMPPacket instead (header type, packet type, channel, timestamp, body
// pointer and body size).
type C_RTMPPacket struct {
m_headerType uint8
m_packetType uint8
m_hasAbsTimestamp uint8
m_nChannel int32
m_nTimeStamp uint32
m_nInfoField2 int32
m_nBodySize uint32
m_nBytesRead uint32
m_chunk *C.RTMPChunk
m_body *byte
}
// typedef struct RTMP_METHOD
// rtmp.h +231
//
// C_RTMP_METHOD is a Go-side mirror of the C RTMP_METHOD struct: a method
// name paired with a number.
type C_RTMP_METHOD struct {
name C.AVal
num int32
}
// typedef struct AVal
// amf.h +57
//
// C_AVal is a Go-side mirror of the C AVal struct: a pointer to the first
// byte of a string plus its length in bytes.
type C_AVal struct {
av_val *byte
av_len int32
}
// typedef struct RTMP_READ
// rtmp.h +200
//
// C_RTMP_READ is a Go-side mirror of the C RTMP_READ struct. The flags and
// status fields take the RTMP_READ_* constants declared above.
type C_RTMP_READ struct {
buf *byte
bufpos *byte
buflen uint
timestamp uint32
dataType uint8
flags uint8
status int8
initialFrameType uint8
nResumeTS uint32
metaHeader *byte
initialFrame *byte
nMetaHeaderSize uint32
nInitialFrameSize uint32
nIgnoredFrameCounter uint32
nIgnoredFlvFrameCounter uint32
}
// typedef struct RTMPSockBuf
// NOTE(review): the original comment repeated the RTMP_READ reference
// ("rtmp.h +200"); the line number for RTMPSockBuf needs confirming.
//
// C_RTMPSockBuf is a Go-side mirror of the C RTMPSockBuf struct: the socket
// descriptor together with its receive buffer.
type C_RTMPSockBuf struct {
sb_socket int32 // socket descriptor; -1 means not connected (see C_RTMP_IsConnected)
sb_size int32 // number of unread bytes currently buffered
sb_start *byte // next unread byte inside sb_buf
sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const
sb_timedout int32
sb_ssl uintptr
}
// typedef struct RTMPChunk
// rtmp.h +105
//
// C_RTMPChunk is a Go-side mirror of the C RTMPChunk struct: a chunk header
// of up to RTMP_MAX_HEADER_SIZE bytes plus a pointer to the chunk data.
type C_RTMPChunk struct {
c_headerSize int32
c_chunkSize int32
c_chunk *byte
c_header [RTMP_MAX_HEADER_SIZE]byte
}
// typedef struct RTMP_LNK
// rtmp.h +144
//
// C_RTMP_LNK is a Go-side mirror of the C RTMP_LNK struct, which holds the
// connection parameters parsed from the URL (host, app, play path, ports,
// protocol) together with the optional connect-command properties.
type C_RTMP_LNK struct {
hostname C.AVal
sockshost C.AVal
playpath0 C.AVal
playpath C.AVal
tcUrl C.AVal
swfUrl C.AVal
pageUrl C.AVal
app C.AVal
auth C.AVal
flashVer C.AVal
subscribepath C.AVal
usherToken C.AVal
token C.AVal
pubUser C.AVal
pubPasswd C.AVal
extras C.AMFObject
edepth int32
seekTime int32
stopTime int32
lFlags int32 // RTMP_LF_* bits
swfAge int32
protocol int32 // RTMP_PROTOCOL_* value, optionally OR-ed with RTMP_FEATURE_WRITE
timeout int32
pFlags int32
socksport uint16
port uint16
}
// typedef struct AMF_Object
// amf.h +67
//
// C_AMFObject is a Go-side mirror of the C AMFObject struct: a count of
// properties and a pointer to the first element of the property array.
type C_AMFObject struct {
o_num int32
o_props *C.AMFObjectProperty
}
// NewSession creates a closed session for the given url. connectTimeout is
// stored and passed on to the connection setup when Open is called.
func NewSession(url string, connectTimeout uint) Session {
	s := new(session)
	s.url = url
	s.timeout = connectTimeout
	return s
}
// Open connects to the url given to NewSession. It returns an error if the
// session is already open or if the connection could not be set up.
func (s *session) Open() error {
	if s.rtmp != nil {
		return errors.New("rtmp: attempt to start already running session")
	}
	conn, err := startSession(s.rtmp, s.url, uint32(s.timeout))
	s.rtmp = conn
	if conn != nil {
		return nil
	}
	return err
}
// Close ends the rtmp session. It returns Err(3) when the session is not
// open, and the non-zero status of endSession wrapped in Err otherwise.
func (s *session) Close() error {
	if s.rtmp == nil {
		return Err(3)
	}
	code := endSession(s.rtmp)
	// The handle is dropped whether or not the shutdown reported an error.
	s.rtmp = nil
	switch code {
	case 0:
		return nil
	default:
		return Err(code)
	}
}
// Write writes a frame (flv tag) to the rtmp connection.
//
// It returns Err(3) if the session has not been opened, Err(1) if the
// underlying socket is not connected and Err(2) if librtmp rejects the
// data. On success the full length of data is reported. Writing an empty
// slice is a no-op that succeeds on an open session.
func (s *session) Write(data []byte) (int, error) {
	if s.rtmp == nil {
		return 0, Err(3)
	}
	if C_RTMP_IsConnected(s.rtmp) == 0 {
		return 0, Err(1)
	}
	// Taking &data[0] on an empty slice panics, so handle that case before
	// handing the buffer to C.
	if len(data) == 0 {
		return 0, nil
	}
	if C.RTMP_Write(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.int(len(data))) == 0 {
		return 0, Err(2)
	}
	return len(data), nil
}
// int RTMP_IsConnected(RTMP *r);
// rtmp.c +363
//
// C_RTMP_IsConnected reports 1 when r holds a socket descriptor (anything
// other than -1) and 0 otherwise.
func C_RTMP_IsConnected(r *C.RTMP) int32 {
	var connected int32
	if r.m_sb.sb_socket != -1 {
		connected = 1
	}
	return connected
}
// startSession allocates and initialises an RTMP handle, parses u, enables
// publishing, connects to the server and connects the stream. timeout is
// stored as the link timeout. The incoming rtmp argument is ignored; a fresh
// handle is always allocated. On any failure it returns nil and an error
// naming the step that failed.
func startSession(rtmp *C.RTMP, u string, timeout uint32) (*C.RTMP, error) {
	fail := func(msg string) (*C.RTMP, error) {
		return nil, errors.New(msg)
	}

	rtmp = C_RTMP_Alloc()
	C_RTMP_Init(rtmp)
	// Override the default timeout set by C_RTMP_Init.
	rtmp.Link.timeout = C.int(timeout)

	if C_RTMP_SetupURL(rtmp, u) == 0 {
		return fail("rtmp startSession: Failed to setup URL!")
	}

	C_RTMP_EnableWrite(rtmp)
	C_RTMP_SetBufferMS(rtmp, 3600*1000)

	if C_RTMP_Connect(rtmp, nil) == 0 {
		return fail("rtmp startSession: Failed to connect!")
	}

	// TODO: port this
	if C.RTMP_ConnectStream(rtmp, 0) == 0 {
		return fail("rtmp startSession: Failed to connect stream!")
	}
	return rtmp, nil
}
// RTMP* RTMP_Alloc();
// rtmp.c +317
//
// C_RTMP_Alloc returns storage for one RTMP struct obtained from allocate.
// The contents are not initialised here; call C_RTMP_Init before use.
func C_RTMP_Alloc() *C.RTMP {
	size := unsafe.Sizeof(C.RTMP{})
	return (*C.RTMP)(allocate(size))
}
// void RTMP_Init(RTMP *r);
// rtmp.c +329
//
// C_RTMP_Init resets r to its default state: every field is zeroed, the
// socket is marked as closed (-1) and the default chunk sizes, buffer
// length, bandwidth values, codec masks and timeouts are filled in.
func C_RTMP_Init(r *C.RTMP) {
	// Like the C original, clear the whole struct first so that every field
	// not assigned below starts from zero regardless of what the allocator
	// returned.
	memset((*byte)(unsafe.Pointer(r)), 0, int(unsafe.Sizeof(*r)))

	r.m_sb.sb_socket = -1
	r.m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE
	r.m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE
	r.m_nBufferMS = 30000
	r.m_nClientBW = 2500000
	r.m_nClientBW2 = 2
	r.m_nServerBW = 2500000
	r.m_fAudioCodecs = 3191.0
	r.m_fVideoCodecs = 252.0
	r.Link.timeout = 30
	r.Link.swfAge = 30
}
// int RTMP_SetupURL(RTMP *r, char* url);
// rtmp.c +757
// NOTE: code dealing with rtmp over http has been disregarded
//
// C_RTMP_SetupURL parses u into the link fields of r (protocol, host, port,
// play path, app), derives tcUrl when it has not been set, configures the
// SOCKS proxy fields and fills in the default port for the protocol.
// It returns 0 if the URL cannot be parsed and 1 otherwise.
func C_RTMP_SetupURL(r *C.RTMP, u string) int32 {
	url := goStrToCStr(u)
	urlLen := int(strlen(url))
	var port uint32

	// TODO: port this
	ret := int32(C.RTMP_ParseURL((*C.char)(unsafe.Pointer(url)), &r.Link.protocol, &r.Link.hostname,
		(*C.uint)(&port), &r.Link.playpath0, &r.Link.app))
	if ret == 0 {
		return ret
	}
	r.Link.port = C.ushort(port)
	r.Link.playpath = r.Link.playpath0

	if r.Link.tcUrl.av_len == 0 {
		r.Link.tcUrl.av_val = (*C.char)(unsafe.Pointer(url))
		switch {
		case r.Link.app.av_len == 0:
			r.Link.tcUrl.av_len = C.int(strlen(url))

		case int(uintptr(unsafe.Pointer(r.Link.app.av_val))) <
			int(uintptr(unsafe.Pointer(url)))+urlLen:
			// app points into the original url: tcUrl is the url prefix that
			// ends where app ends.
			appOffset := int(uintptr(unsafe.Pointer(r.Link.app.av_val)) -
				uintptr(unsafe.Pointer(url)))
			r.Link.tcUrl.av_len = C.int(int(r.Link.app.av_len) + appOffset)

		default:
			// app lives elsewhere: build "scheme://host:port/app" from parts.
			// No separate buffer is allocated up front; the string returned
			// by goStrToCStr is used directly (the previous pre-allocation
			// was overwritten immediately and leaked).
			hostname := string(ptrToSlice(unsafe.Pointer(r.Link.hostname.av_val),
				int(r.Link.hostname.av_len)))
			app := string(ptrToSlice(unsafe.Pointer(r.Link.app.av_val),
				int(r.Link.app.av_len)))
			fString := fmt.Sprintf("%v://%v:%v/%v",
				RTMPProtocolStringsLower[r.Link.protocol], hostname, r.Link.port, app)
			r.Link.tcUrl.av_val = (*C.char)(bToUP(goStrToCStr(fString)))
			r.Link.tcUrl.av_len = C.int(strLen(RTMPProtocolStringsLower[r.Link.protocol]) +
				strLen("://") + strLen(hostname) + strLen(":") +
				strLen(strconv.Itoa(int(r.Link.port))) + strLen("/") + strLen(app))
			r.Link.lFlags |= RTMP_LF_FTCU
		}
	}

	C_SocksSetup(r, &r.Link.sockshost)

	if r.Link.port == 0 {
		switch {
		case (r.Link.protocol & RTMP_FEATURE_SSL) != 0:
			// Default TLS port (the port previously used here, 433, was a typo).
			r.Link.port = 443
		case (r.Link.protocol & RTMP_FEATURE_HTTP) != 0:
			r.Link.port = 80
		default:
			r.Link.port = 1935
		}
	}
	return 1
}
// void SocksSetup(RTMP *r, C.AVal* sockshost);
// rtmp.c +410
//
// C_SocksSetup splits sockshost ("host" or "host:port") into the link's
// SOCKS host and port. The host is stored as a private copy; the port
// defaults to 1080 when none is given. An empty sockshost clears the SOCKS
// settings. sockshost may alias r.Link.sockshost, so it is fully read
// before the link fields are overwritten.
func C_SocksSetup(r *C.RTMP, sockshost *C.AVal) {
	if sockshost.av_len == 0 {
		r.Link.sockshost.av_val = nil
		r.Link.sockshost.av_len = 0
		r.Link.socksport = 0
		return
	}

	src := unsafe.Pointer(sockshost.av_val)
	socksport := strchr((*byte)(src), ':')
	hostname := strdup((*byte)(src))

	port := 1080
	if unsafe.Pointer(socksport) != nil {
		// Terminate the copied host at the position of the ':' separator.
		sepOffset := int(uintptr(unsafe.Pointer(socksport)) - uintptr(src))
		*indxBytePtr(unsafe.Pointer(hostname), sepOffset) = '\000'

		// Parse the digits after ':'. The terminating NUL is deliberately
		// left out of the string, since strconv.Atoi rejects it.
		digits := (*byte)(incBytePtr(unsafe.Pointer(socksport), 1))
		value, err := strconv.Atoi(string(ptrToSlice(unsafe.Pointer(digits),
			int(strlen(digits)))))
		if err != nil {
			log.Println("C_SocksSetup: bad string conversion!")
		}
		port = value
	}

	r.Link.sockshost.av_val = (*C.char)(unsafe.Pointer(hostname))
	r.Link.sockshost.av_len = C.int(strlen(hostname))
	r.Link.socksport = C.ushort(port)
}
/*
func rtmpClose(r *C.RTMP) {
closeInternal(r, 0)
}
func closeInternal(r *C.RTMP, reconnect int32) {
var i int32
if C_RTMP_IsConnected(r) != 0 {
if r.m_stream_id > 0 {
i = int32(r.m_stream_id)
if r.Link.protocol&RTMP_FEATURE_WRITE != 0 {
C.SendFCUnpublish(r)
}
C.SendDeleteStream(r, C.double(i))
}
C.RTMPSockBuf_Close(&r.m_sb)
}
r.m_stream_id = -1
r.m_sb.sb_socket = -1
r.m_nBWCheckCounter = 0
r.m_nBytesIn = 0
r.m_nBytesInSent = 0
if r.m_read.flags&RTMP_READ_HEADER != 0 {
C.free(unsafe.Pointer(r.m_read.buf))
r.m_read.buf = nil
}
r.m_read.dataType = 0
r.m_read.flags = 0
r.m_read.status = 0
r.m_read.nResumeTS = 0
r.m_read.nIgnoredFrameCounter = 0
r.m_read.nIgnoredFlvFrameCounter = 0
r.m_write.m_nBytesRead = 0
C.RTMPPacket_Free(&r.m_write)
for i := 0; i < int(r.m_channelsAllocatedIn); i++ {
if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), i,
int(unsafe.Sizeof(&r.m_write)))) != nil {
C.RTMPPacket_Free(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), i,
int(unsafe.Sizeof(&r.m_write)))))
C.free(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn),
i, int(unsafe.Sizeof(&r.m_write))))))
*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn),
i, int(unsafe.Sizeof(&r.m_write)))) = nil
}
}
C.free(unsafe.Pointer(r.m_vecChannelsOut))
r.m_vecChannelsOut = nil
r.m_channelsAllocatedOut = 0
C.AV_clear(r.m_methodCalls, r.m_numCalls)
r.m_methodCalls = nil
r.m_numCalls = 0
r.m_numInvokes = 0
r.m_bPlaying = C.uchar(0)
r.m_sb.sb_size = 0
r.m_msgCounter = 0
r.m_resplen = 0
r.m_unackd = 0
if ((r.Link.lFlags & RTMP_LF_FTCU) != 0) && (reconnect == 0) {
C.free(unsafe.Pointer(r.Link.app.av_val))
r.Link.app.av_val = nil
r.Link.lFlags ^= RTMP_LF_FAPU
}
if reconnect == 0 {
C.free(unsafe.Pointer(r.Link.playpath0.av_val))
r.Link.playpath0.av_val = nil
}
}
*/
// void RTMP_EnableWrite(RTMP *r);
// rtmp.c +351
//
// C_RTMP_EnableWrite marks the link as a publishing (write) connection by
// setting the RTMP_FEATURE_WRITE bit in its protocol field.
func C_RTMP_EnableWrite(r *C.RTMP) {
	r.Link.protocol = r.Link.protocol | RTMP_FEATURE_WRITE
}
// void RTMP_SetBufferMS(RTMP *r, int size);
// rtmp.c +381
//
// C_RTMP_SetBufferMS stores size as the buffer length (m_nBufferMS) of r.
func C_RTMP_SetBufferMS(r *C.RTMP, size int32) {
	ms := C.int(size)
	r.m_nBufferMS = ms
}
// int RTMP_Connect(RTMP *r, RTMPPacket* cp);
// rtmp.c +1032
//
// C_RTMP_Connect resolves the address to dial (the SOCKS proxy when one is
// configured, otherwise the RTMP host itself), opens the socket with
// C_RTMP_Connect0 and then performs the handshake and connect command via
// C_RTMP_Connect1. It returns 0 on any failure.
func C_RTMP_Connect(r *C.RTMP, cp *C.RTMPPacket) int {
	if r.Link.hostname.av_len == 0 {
		return 0
	}

	// TODO: port this
	var service C.sockaddr_in
	memset((*byte)(unsafe.Pointer(&service)), 0, int(unsafe.Sizeof(service)))
	service.sin_family = C.AF_INET

	// Connect directly unless a SOCKS port has been configured.
	host := (*C.AVal)(unsafe.Pointer(&r.Link.hostname))
	port := C.int(r.Link.port)
	if r.Link.socksport != 0 {
		host = &r.Link.sockshost
		port = C.int(r.Link.socksport)
	}
	// TODO: port this
	if C.add_addr_info(&service, host, port) == 0 {
		return 0
	}

	if C_RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) == 0 {
		return 0
	}
	r.m_bSendCounter = 1
	return int(C_RTMP_Connect1(r, cp))
}
// int RTMP_Connect0(RTMP *r, struct sockaddr* service);
// rtmp.c +906
//
// C_RTMP_Connect0 creates a TCP socket, connects it to service, performs
// SOCKS negotiation when a SOCKS port is configured and applies the receive
// timeout (r.Link.timeout, in seconds) and TCP_NODELAY.
//
// It returns 1 on success and 0 on failure. On failure after the socket was
// created, the socket is closed and r.m_sb.sb_socket is reset to -1 so the
// handle does not look connected.
func C_RTMP_Connect0(r *C.RTMP, service *C.sockaddr) int {
	r.m_sb.sb_timedout = 0
	r.m_pausing = 0
	r.m_fDuration = 0

	r.m_sb.sb_socket = C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP)
	if r.m_sb.sb_socket == -1 {
		log.Println("C_RTMP_Connect0: failed to create socket.")
		return 0
	}

	// abort logs msg, releases the socket and reports failure.
	abort := func(msg string) int {
		log.Println(msg)
		// Best effort: nothing useful can be done if close itself fails.
		_ = syscall.Close(int(r.m_sb.sb_socket))
		r.m_sb.sb_socket = -1
		return 0
	}

	if C.connect(r.m_sb.sb_socket, service, C.socklen_t(unsafe.Sizeof(*service))) < 0 {
		// The socket is unusable; carrying on would only fail later in the
		// handshake.
		return abort("C_RTMP_Connect0, failed to connect socket.")
	}

	if r.Link.socksport != 0 {
		if debugMode {
			log.Println("C_RTMP_Connect0: socks negotiation.")
		}
		if C.SocksNegotiate(r) == 0 {
			return abort("C_RTMP_Connect0: SOCKS negotiation failed.")
		}
	}

	// SO_RCVTIMEO takes a struct timeval on POSIX systems; a bare integer is
	// rejected by the kernel and leaves the socket without a timeout.
	tv := syscall.NsecToTimeval(int64(r.Link.timeout) * 1e9)
	if C.setsockopt(r.m_sb.sb_socket, C.SOL_SOCKET, C.SO_RCVTIMEO,
		unsafe.Pointer(&tv), C.socklen_t(unsafe.Sizeof(tv))) != 0 {
		log.Println("C_RTMP_Connect0: Setting socket timeout failed")
	}

	// TCP_NODELAY expects a C int; the result is ignored as in the C original.
	on := C.int(1)
	C.setsockopt(r.m_sb.sb_socket, C.IPPROTO_TCP, C.TCP_NODELAY,
		unsafe.Pointer(&on), C.socklen_t(unsafe.Sizeof(on)))
	return 1
}
// SET_RCVTIMEO stores the timeout s, given in seconds, into *tv expressed in
// milliseconds.
func SET_RCVTIMEO(tv *int32, s int32) {
	const msPerSecond = 1000
	*tv = msPerSecond * s
}
// int RTMP_Connect1(RTMP* r, RTMPPacket* cp);
// rtmp.c +978
//
// C_RTMP_Connect1 runs the RTMP handshake on an already connected socket and
// then sends the connect command (or cp, when one is supplied). It returns
// 1 on success and 0 if either step fails.
func C_RTMP_Connect1(r *C.RTMP, cp *C.RTMPPacket) int {
	// trace prints progress messages only in debug mode.
	trace := func(msg string) {
		if debugMode {
			log.Println(msg)
		}
	}

	trace("... connected, handshaking...")
	if C_HandShake(r, 1) == 0 {
		log.Println("C_RTMP_Connect1: handshake failed!")
		return 0
	}
	trace("... handshaked...")

	if C_SendConnectPacket(r, cp) == 0 {
		log.Println("RTMP connect failed!")
		return 0
	}
	return 1
}
// int HandShake(RTMP* r, int FP9HandShake);
// rtmp.c +3744
//
// C_HandShake performs the plain (unencrypted) RTMP handshake:
//
//  1. send a type byte (0x03) followed by a RTMP_SIG_SIZE-byte client
//     signature made of a timestamp, four zero bytes and random filler;
//  2. read the server's type byte and signature;
//  3. echo the server signature back;
//  4. read the server's echo of the client signature.
//
// A type or signature mismatch is only logged. The function returns 0 when
// any read or write fails and 1 otherwise. FP9HandShake is unused.
func C_HandShake(r *C.RTMP, FP9HandShake int32) int {
	var uptime, suptime uint32
	var typ byte
	var clientbuf [RTMP_SIG_SIZE + 1]byte
	var serversig [RTMP_SIG_SIZE]byte

	// The signature starts right after the leading type byte.
	clientsig := (*byte)(incBytePtr(unsafe.Pointer(&clientbuf[0]), 1))

	clientbuf[0] = 0x03 // not encrypted

	// TODO: port rtmp_getTime
	uptime = inet.Htonl(uint32(C.RTMP_GetTime()))
	memmove(unsafe.Pointer(clientsig), unsafe.Pointer(&uptime), 4)
	memset(indxBytePtr(unsafe.Pointer(clientsig), 4), 0, 4)
	for i := 8; i < RTMP_SIG_SIZE; i++ {
		*indxBytePtr(unsafe.Pointer(clientsig), i) = byte(rand.Intn(256))
	}

	if C_WriteN(r, unsafe.Pointer(&clientbuf[0]), RTMP_SIG_SIZE+1) == 0 {
		return 0
	}

	if C_ReadN(r, &typ, 1) != 1 {
		return 0
	}
	if debugMode {
		// Printf, not Println: the message contains a format verb.
		log.Printf("C_HandShake: Type answer: %v", typ)
	}
	if typ != clientbuf[0] {
		log.Printf("C_HandShake: type mismatch: client sent %v, server sent: %v",
			clientbuf[0], typ)
	}

	if C_ReadN(r, &serversig[0], RTMP_SIG_SIZE) != RTMP_SIG_SIZE {
		return 0
	}

	// decode server response
	memmove(unsafe.Pointer(&suptime), unsafe.Pointer(&serversig[0]), 4)
	suptime = inet.Ntohl(suptime)

	// 2nd part of handshake
	if C_WriteN(r, unsafe.Pointer(&serversig[0]), RTMP_SIG_SIZE) == 0 {
		return 0
	}
	if C_ReadN(r, &serversig[0], RTMP_SIG_SIZE) != RTMP_SIG_SIZE {
		return 0
	}

	// TODO: find golang memcmp
	if memcmp(unsafe.Pointer(&serversig[0]), unsafe.Pointer(clientsig),
		RTMP_SIG_SIZE) != 0 {
		log.Println("Client signature does not match!")
	}
	return 1
}
// int ReadN(RTMP* r, char* buffer, int n);
// rtmp.c +1390
//
// C_ReadN copies up to n bytes from the connection's receive buffer into
// buffer, refilling the receive buffer from the socket whenever it is
// empty. It keeps the received-bytes counter up to date and sends a
// bytes-received report once enough unreported data has accumulated.
//
// It returns the number of bytes copied, or 0 when refilling fails without
// a timeout or when the report cannot be sent.
func C_ReadN(r *C.RTMP, buffer *byte, n int) int {
	requested := n
	dst := buffer
	r.m_sb.sb_timedout = 0

	for n > 0 {
		if r.m_sb.sb_size == 0 {
			if C_RTMPSockBuf_Fill(&r.m_sb) < 1 && r.m_sb.sb_timedout == 0 {
				return 0
			}
		}

		// Take whatever is available, but never more than still wanted.
		chunk := int(r.m_sb.sb_size)
		if n < chunk {
			chunk = n
		}
		if chunk <= 0 {
			log.Println("RTMP socket closed by peer")
			// RTMP_Close(r)
			break
		}

		memmove(unsafe.Pointer(dst), unsafe.Pointer(r.m_sb.sb_start), uintptr(chunk))
		r.m_sb.sb_start = (*C.char)(incBytePtr(unsafe.Pointer(r.m_sb.sb_start), chunk))
		r.m_sb.sb_size -= C.int(chunk)
		r.m_nBytesIn += C.int(chunk)

		// Report once more than a tenth of the client bandwidth is unreported.
		if r.m_bSendCounter != 0 &&
			r.m_nBytesIn > (r.m_nBytesInSent+r.m_nClientBW/10) {
			if C_SendBytesReceived(r) == 0 {
				return 0
			}
		}

		n -= chunk
		dst = (*byte)(incBytePtr(unsafe.Pointer(dst), chunk))
	}
	return requested - n
}
// int RTMPSockBuf_Fill(RTMPSockBuf* sb);
// rtmp.c +4253
//
// C_RTMPSockBuf_Fill performs a single recv into the free space behind the
// buffered data of sb and adds the received byte count to sb_size. When the
// buffer is empty the read position is first rewound to the start of the
// storage. It returns the result of recv; -1 indicates an error, which is
// logged.
func C_RTMPSockBuf_Fill(sb *C.RTMPSockBuf) int {
	if sb.sb_size == 0 {
		sb.sb_start = &sb.sb_buf[0]
	}

	// Free space: total size, minus one spare byte, minus what is buffered,
	// minus the already consumed prefix.
	consumed := int(uintptr(unsafe.Pointer(sb.sb_start)) - uintptr(unsafe.Pointer(&sb.sb_buf[0])))
	room := int(unsafe.Sizeof(sb.sb_buf)) - 1 - int(sb.sb_size) - consumed

	// TODO: figure out what to do with recv
	tail := unsafe.Pointer(uintptr(unsafe.Pointer(sb.sb_start)) + uintptr(int(sb.sb_size)))
	received := int(C.recv(sb.sb_socket, tail, C.size_t(room), 0))
	if received == -1 {
		log.Println("C_RTMPSockBuf_Fill: recv error!")
		return received
	}
	sb.sb_size += C.int(received)
	return received
}
// int SendBytesReceived(RTMP* r);
// rtmp.c +2080
//
// C_SendBytesReceived sends a bytes-read report carrying the current
// received-bytes counter on the control channel and records that value as
// reported. It returns the result of C_RTMP_SendPacket.
func C_SendBytesReceived(r *C.RTMP) int {
	var scratch [256]byte
	scratchEnd := (*byte)(incBytePtr(unsafe.Pointer(&scratch[0]), len(scratch)))

	var pkt C.RTMPPacket
	pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM
	pkt.m_packetType = RTMP_PACKET_TYPE_BYTES_READ_REPORT
	pkt.m_hasAbsTimestamp = 0
	pkt.m_nChannel = 0x02 // control channel (invoke)
	pkt.m_nTimeStamp = 0
	pkt.m_nInfoField2 = 0
	// The body starts after the room reserved for the chunk header.
	pkt.m_body = (*C.char)(incBytePtr(unsafe.Pointer(&scratch[0]), RTMP_MAX_HEADER_SIZE))
	pkt.m_nBodySize = 4

	C_AMF_EncodeInt32((*byte)(unsafe.Pointer(pkt.m_body)), scratchEnd, int32(r.m_nBytesIn))
	r.m_nBytesInSent = r.m_nBytesIn
	return C_RTMP_SendPacket(r, &pkt, 0)
}
// int SendConnectPacket(RTMP* r, RTMPPacket* cp);
// rtmp.c +1579
//
// C_SendConnectPacket sends the RTMP "connect" command. When cp is non-nil
// it is sent as is. Otherwise the command is built in a local buffer: the
// command name, the invoke number, a command object holding the link
// properties (app, type, flashVer, swfUrl, tcUrl and, for play-only links,
// the capability fields), optionally followed by the auth string and any
// extra AMF properties of the link.
//
// It returns 0 if the buffer is too small for any element and the result of
// C_RTMP_SendPacket otherwise.
func C_SendConnectPacket(r *C.RTMP, cp *C.RTMPPacket) int {
	if cp != nil {
		return C_RTMP_SendPacket(r, cp, 1)
	}

	var packet C.RTMPPacket
	var pbuf [4096]byte
	pend := (*byte)(incBytePtr(unsafe.Pointer(&pbuf[0]), int(unsafe.Sizeof(pbuf))))

	packet.m_nChannel = 0x03
	packet.m_headerType = RTMP_PACKET_SIZE_LARGE
	packet.m_packetType = RTMP_PACKET_TYPE_INVOKE
	packet.m_nTimeStamp = 0
	packet.m_nInfoField2 = 0
	packet.m_hasAbsTimestamp = 0
	// The body starts after the room reserved for the chunk header.
	packet.m_body = (*C.char)(incBytePtr(unsafe.Pointer(&pbuf[0]), RTMP_MAX_HEADER_SIZE))

	enc := (*byte)(unsafe.Pointer(packet.m_body))
	enc = C_AMF_EncodeString(enc, pend, &av_connect)
	if enc == nil {
		return 0
	}
	r.m_numInvokes += 1
	enc = C_AMF_EncodeNumber(enc, pend, float64(r.m_numInvokes))
	if enc == nil {
		return 0
	}

	// Open the command object.
	*enc = AMF_OBJECT
	enc = (*byte)(incBytePtr(unsafe.Pointer(enc), 1))

	enc = C_AMF_EncodeNamedString(enc, pend, &av_app, &r.Link.app)
	if enc == nil {
		return 0
	}
	if r.Link.protocol&RTMP_FEATURE_WRITE != 0 {
		enc = C_AMF_EncodeNamedString(enc, pend, &av_type, &av_nonprivate)
		if enc == nil {
			return 0
		}
	}
	if r.Link.flashVer.av_len != 0 {
		enc = C_AMF_EncodeNamedString(enc, pend, &av_flashVer, &r.Link.flashVer)
		if enc == nil {
			return 0
		}
	}
	if r.Link.swfUrl.av_len != 0 {
		enc = C_AMF_EncodeNamedString(enc, pend, &av_swfUrl, &r.Link.swfUrl)
		if enc == nil {
			return 0
		}
	}
	if r.Link.tcUrl.av_len != 0 {
		enc = C_AMF_EncodeNamedString(enc, pend, &av_tcUrl, &r.Link.tcUrl)
		if enc == nil {
			return 0
		}
	}

	// The remaining properties are only sent when playing, not publishing.
	if r.Link.protocol&RTMP_FEATURE_WRITE == 0 {
		enc = C_AMF_EncodeNamedBoolean(enc, pend, &av_fpad, 0)
		if enc == nil {
			return 0
		}
		enc = C_AMF_EncodeNamedNumber(enc, pend, &av_capabilities, 15.0)
		if enc == nil {
			return 0
		}
		enc = C_AMF_EncodeNamedNumber(enc, pend, &av_audioCodecs, float64(r.m_fAudioCodecs))
		if enc == nil {
			return 0
		}
		enc = C_AMF_EncodeNamedNumber(enc, pend, &av_videoCodecs, float64(r.m_fVideoCodecs))
		if enc == nil {
			return 0
		}
		enc = C_AMF_EncodeNamedNumber(enc, pend, &av_videoFunction, 1.0)
		if enc == nil {
			return 0
		}
		if r.Link.pageUrl.av_len != 0 {
			enc = C_AMF_EncodeNamedString(enc, pend, &av_pageUrl, &r.Link.pageUrl)
			if enc == nil {
				return 0
			}
		}
	}

	if r.m_fEncoding != 0.0 || r.m_bSendEncoding != 0 {
		enc = C_AMF_EncodeNamedNumber(enc, pend, &av_objectEncoding, float64(r.m_fEncoding))
		if enc == nil {
			return 0
		}
	}

	// Close the command object: empty property name (two zero bytes)
	// followed by the object-end marker.
	if int(uintptr(incBytePtr(unsafe.Pointer(enc), 3))) >= int(uintptr(unsafe.Pointer(pend))) {
		return 0
	}
	for _, b := range [3]byte{0, 0, AMF_OBJECT_END} {
		*enc = b
		enc = (*byte)(incBytePtr(unsafe.Pointer(enc), 1))
	}

	/* add auth string */
	if r.Link.auth.av_len != 0 {
		enc = C_AMF_EncodeBoolean(enc, pend, int(r.Link.lFlags&RTMP_LF_AUTH))
		if enc == nil {
			return 0
		}
		enc = C_AMF_EncodeString(enc, pend, &r.Link.auth)
		if enc == nil {
			return 0
		}
	}

	// Extra properties: index into the o_props array itself, one
	// AMFObjectProperty at a time (same addressing as C_AMFEncode).
	for i := 0; i < int(r.Link.extras.o_num); i++ {
		prop := (*C.AMFObjectProperty)(incPtr(unsafe.Pointer(r.Link.extras.o_props), i,
			int(unsafe.Sizeof(*r.Link.extras.o_props))))
		enc = C_AMFPropEncode(prop, enc, pend)
		if enc == nil {
			return 0
		}
	}

	packet.m_nBodySize = C.uint32_t(uintptr(unsafe.Pointer(enc)) -
		uintptr(unsafe.Pointer(packet.m_body)))
	return C_RTMP_SendPacket(r, &packet, 1)
}
// char* AMFPropEncode(AMFOBjectProperty* prop, char* pBufer, char* pBufEnd);
// amf.c +366
//
// C_AMFPropEncode writes property p at pBuffer: for every type except
// AMF_NULL a non-empty name is written first as a 16-bit big-endian length
// followed by the name bytes, then the value is encoded according to
// p.p_type. It returns the position after the encoded property, or nil if
// the type is invalid or unsupported or the data does not fit before
// pBufEnd.
func C_AMFPropEncode(p *C.AMFObjectProperty, pBuffer *byte, pBufEnd *byte) *byte {
	if p.p_type == AMF_INVALID {
		return nil
	}

	if p.p_type != AMF_NULL && int(uintptr(unsafe.Pointer(pBuffer)))+
		int(p.p_name.av_len)+2+1 >= int(uintptr(unsafe.Pointer(pBufEnd))) {
		return nil
	}

	if p.p_type != AMF_NULL && p.p_name.av_len != 0 {
		*pBuffer = byte(p.p_name.av_len >> 8)
		pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), 1))
		*pBuffer = byte(p.p_name.av_len & 0xff)
		pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), 1))
		memmove(unsafe.Pointer(pBuffer), unsafe.Pointer(p.p_name.av_val),
			uintptr(p.p_name.av_len))
		pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), int(p.p_name.av_len)))
	}

	// p_vu is a C union, which cgo exposes as an opaque byte array. Its
	// members all start at offset zero, so each one is reached by viewing
	// the start of the array as the member's type.
	vu := unsafe.Pointer(&p.p_vu)

	switch p.p_type {
	case AMF_NUMBER:
		pBuffer = C_AMF_EncodeNumber(pBuffer, pBufEnd, float64(*(*C.double)(vu)))
	case AMF_BOOLEAN:
		val := 0
		if *(*C.double)(vu) != 0 {
			val = 1
		}
		pBuffer = C_AMF_EncodeBoolean(pBuffer, pBufEnd, val)
	case AMF_STRING:
		pBuffer = C_AMF_EncodeString(pBuffer, pBufEnd, (*C.AVal)(vu))
	case AMF_NULL:
		if uintptr(incBytePtr(unsafe.Pointer(pBuffer), 1)) >= uintptr(unsafe.Pointer(pBufEnd)) {
			return nil
		}
		*pBuffer = AMF_NULL
		pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), 1))
	case AMF_OBJECT:
		pBuffer = C_AMFEncode((*C.AMFObject)(vu), pBuffer, pBufEnd)
	case AMF_ECMA_ARRAY:
		pBuffer = C_AMFEncodeEcmaArray((*C.AMFObject)(vu), pBuffer, pBufEnd)
	case AMF_STRICT_ARRAY:
		pBuffer = C_AMFEncodeArray((*C.AMFObject)(vu), pBuffer, pBufEnd)
	default:
		log.Println("C_AMFPropEncode: invalid type!")
		pBuffer = nil
	}
	return pBuffer
}
// char* AMF_ENCODE(AMFObject* obj, char* pBuffer, char* pBufEnd);
// amf.c +891
//
// C_AMFEncode writes obj as an AMF object: the AMF_OBJECT marker, each
// property in order, and a 24-bit AMF_OBJECT_END terminator. Encoding of
// properties stops at the first one that fails (which is logged); the
// terminator is still written. It returns the position after the encoded
// object, or nil when the marker or terminator does not fit before pBufEnd.
func C_AMFEncode(obj *C.AMFObject, pBuffer *byte, pBufEnd *byte) *byte {
	limit := uintptr(unsafe.Pointer(pBufEnd))
	if uintptr(unsafe.Pointer(pBuffer))+4 >= limit {
		return nil
	}

	*pBuffer = AMF_OBJECT
	pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), 1))

	propSize := int(unsafe.Sizeof(*obj.o_props))
	for i := 0; i < int(obj.o_num); i++ {
		prop := (*C.AMFObjectProperty)(incPtr(unsafe.Pointer(obj.o_props), i, propSize))
		next := C_AMFPropEncode(prop, pBuffer, pBufEnd)
		if next == nil {
			log.Println("C_AMFEncode: failed to encode property in index")
			break
		}
		pBuffer = next
	}

	if uintptr(incBytePtr(unsafe.Pointer(pBuffer), 3)) >= limit {
		return nil
	}
	return C_AMFEncodeInt24(pBuffer, pBufEnd, int32(AMF_OBJECT_END))
}
// char* AMF_EncodeEcmaArray(AMFObject* obj, char* pBuffer, char* pBufEnd);
// amf.c +924
//
// C_AMFEncodeEcmaArray writes obj as an AMF ECMA array: the AMF_ECMA_ARRAY
// marker, the 32-bit property count, each property in order, and a 24-bit
// AMF_OBJECT_END terminator. Encoding of properties stops at the first one
// that fails (which is logged). It returns the position after the encoded
// array, or nil when the header or terminator does not fit before pBufEnd.
func C_AMFEncodeEcmaArray(obj *C.AMFObject, pBuffer *byte, pBufEnd *byte) *byte {
	limit := int(uintptr(unsafe.Pointer(pBufEnd)))
	if int(uintptr(unsafe.Pointer(pBuffer)))+4 >= limit {
		return nil
	}

	*pBuffer = AMF_ECMA_ARRAY
	pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), 1))
	pBuffer = C_AMF_EncodeInt32(pBuffer, pBufEnd, int32(obj.o_num))

	propSize := int(unsafe.Sizeof(*obj.o_props))
	for i := 0; i < int(obj.o_num); i++ {
		prop := (*C.AMFObjectProperty)(incPtr(unsafe.Pointer(obj.o_props), i, propSize))
		next := C_AMFPropEncode(prop, pBuffer, pBufEnd)
		if next == nil {
			log.Println("C_AMFEncodeEcmaArray: failed to encode property!")
			break
		}
		pBuffer = next
	}

	if int(uintptr(unsafe.Pointer(pBuffer)))+3 >= limit {
		return nil
	}
	return C_AMFEncodeInt24(pBuffer, pBufEnd, AMF_OBJECT_END)
}
// C_AMFEncodeArray writes obj into the buffer [pBuffer, pBufEnd) as an AMF
// strict array: the AMF_STRICT_ARRAY marker byte, the element count as a 32 bit
// integer and then every property of obj (via C_AMFPropEncode). Unlike the
// object and ECMA array encoders no end marker is written. Encoding stops at
// the first property that fails to encode. It returns the position just past
// the last byte written, or nil when the buffer cannot hold the header.
//
// char* AMF_EncodeArray(AMFObject* obj, char* pBuffer, char* pBufEnd);
// amf.c +959
func C_AMFEncodeArray(obj *C.AMFObject, pBuffer *byte, pBufEnd *byte) *byte {
	if int(uintptr(unsafe.Pointer(pBuffer)))+4 >= int(uintptr(unsafe.Pointer(pBufEnd))) {
		return nil
	}

	*pBuffer = AMF_STRICT_ARRAY
	pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), 1))
	pBuffer = C_AMF_EncodeInt32(pBuffer, pBufEnd, int32(obj.o_num))

	for i := 0; i < int(obj.o_num); i++ {
		res := C_AMFPropEncode((*C.AMFObjectProperty)(incPtr(unsafe.Pointer(
			obj.o_props), i, int(unsafe.Sizeof(*obj.o_props)))), pBuffer, pBufEnd)
		if res == nil {
			// The message previously named C_AMFEncodeEcmaArray, which made
			// failures here look like they came from the ECMA array encoder.
			log.Println("C_AMFEncodeArray: failed to encode property!")
			break
		}
		pBuffer = res
	}
	return pBuffer
}
// C_RTMP_ConnectStream reads and dispatches incoming packets until the session
// reports that it is playing, the connection drops or a packet read fails.
// A positive seekTime is stored in r.Link.seekTime first. Audio, video and info
// packets that arrive during this phase are logged and discarded; every other
// complete, non-empty packet is handed to C.RTMP_ClientPacket. The return
// value is r.m_bPlaying converted to int.
//
// int RTMP_ConnectStream(RTMP* r, int seekTime);
// rtmp.c +1099
func C_RTMP_ConnectStream(r *C.RTMP, seekTime int32) int {
	var packet C.RTMPPacket
	memset((*byte)(unsafe.Pointer(&packet)), 0, int(unsafe.Sizeof(packet)))

	if seekTime > 0 {
		r.Link.seekTime = C.int(seekTime)
	}
	r.m_mediaChannel = 0

	// TODO: read packet
	for r.m_bPlaying == 0 && C_RTMP_IsConnected(r) != 0 &&
		C.RTMP_ReadPacket(r, &packet) != 0 {
		// TODO: port is ready
		if C_RTMPPacket_IsReady(&packet) == 0 {
			// Only part of the packet has arrived so far; keep reading.
			continue
		}
		if packet.m_nBodySize == 0 {
			continue
		}

		isMedia := packet.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
			packet.m_packetType == RTMP_PACKET_TYPE_VIDEO ||
			packet.m_packetType == RTMP_PACKET_TYPE_INFO
		if isMedia {
			log.Println("C_RTMP_ConnectStream: got packet before play()! Ignoring.")
		} else {
			// TODO: port this
			C.RTMP_ClientPacket(r, &packet)
		}
		C.RTMPPacket_Free(&packet)
	}
	return int(r.m_bPlaying)
}
// C_RTMP_ReadPacket reads one chunk of an RTMP packet from the connection r
// into packet. It parses the chunk basic header (header type and channel id),
// grows the per-channel bookkeeping arrays when a new channel id is seen,
// parses the message header and optional extended timestamp, reads up to one
// chunk (r.m_inChunkSize bytes) of body, and stores a copy of the packet as the
// reference for later chunks on the same channel.
//
// It returns 1 when a chunk was read and 0 on any read or allocation failure.
// A return of 1 does not mean the packet is complete; callers check that with
// C_RTMPPacket_IsReady.
//
// int RTMP_ReadPacket(RTMP* r, RTMPPacket* packet);
// rtmp.c +3550
func C_RTMP_ReadPacket(r *C.RTMP, packet *C.RTMPPacket) int32 {
	var hbuf [RTMP_MAX_HEADER_SIZE]uint8
	memset((*byte)(&hbuf[0]), 0, RTMP_MAX_HEADER_SIZE)
	// header walks through hbuf as the chunk header is consumed.
	var header *byte
	header = (*byte)(unsafe.Pointer(&hbuf[0]))
	var nSize, hSize, nToRead, nChunk int32
	var extendedTimestamp int32

	if C_ReadN(r, (*byte)(&hbuf[0]), 1) == 0 {
		log.Println("C_RTMP_ReadPacket: failed to read RTMP packet header!")
		return 0
	}
	// First byte: top two bits are the header type, low six bits the channel.
	packet.m_headerType = C.uint8_t((hbuf[0] & 0xc0) >> 6)
	packet.m_nChannel = C.int(hbuf[0] & 0x3f)
	header = (*byte)(incBytePtr(unsafe.Pointer(header), 1))

	switch {
	case packet.m_nChannel == 0:
		// Channel value 0: the real id is the next byte plus 64.
		if C_ReadN(r, (*byte)(&hbuf[1]), 1) != 1 {
			log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header 2nd byte.")
			return 0
		}
		packet.m_nChannel = C.int(hbuf[1])
		packet.m_nChannel += 64
		header = (*byte)(incBytePtr(unsafe.Pointer(header), 1))
	case packet.m_nChannel == 1:
		// Channel value 1: the real id is a little endian 16 bit value in the
		// next two bytes plus 64.
		var tmp int32
		if C_ReadN(r, (*byte)(&hbuf[1]), 2) != 2 {
			log.Println("C_RTMP_ReadPacket: failed to read RTMP packet 3rd byte")
			return 0
		}
		// Widen before shifting: shifting the uint8 itself by 8 always gives 0.
		tmp = int32(hbuf[2])<<8 | int32(hbuf[1])
		packet.m_nChannel = C.int(tmp + 64)
		header = (*byte)(incBytePtr(unsafe.Pointer(header), 2))
	}

	nSize = int32(packetSize[packet.m_headerType])

	// Grow the per-channel timestamp and packet arrays if this channel id is
	// beyond what has been allocated so far.
	if packet.m_nChannel >= r.m_channelsAllocatedIn {
		var n int32 = int32(packet.m_nChannel + 10)
		timestamp := (*int32)(C.realloc(unsafe.Pointer(r.m_channelTimestamp),
			C.size_t(int32(unsafe.Sizeof(n))*n)))
		var packetPtr *C.RTMPPacket
		packets := (**C.RTMPPacket)(C.realloc(unsafe.Pointer(r.m_vecChannelsIn),
			C.size_t(int32(unsafe.Sizeof(packetPtr))*n)))
		if timestamp == nil {
			C.free(unsafe.Pointer(r.m_channelTimestamp))
		}
		if packets == nil {
			C.free(unsafe.Pointer(r.m_vecChannelsIn))
		}
		r.m_channelTimestamp = (*C.int)(timestamp)
		r.m_vecChannelsIn = packets
		if timestamp == nil || packets == nil {
			r.m_channelsAllocatedIn = 0
			return 0
		}
		// Zero only the newly added tail of each array.
		memset((*byte)(incPtr(unsafe.Pointer(r.m_channelTimestamp),
			int(r.m_channelsAllocatedIn), int(unsafe.Sizeof(*r.m_channelTimestamp)))),
			0, int(4*int32((n-int32(r.m_channelsAllocatedIn)))))
		memset((*byte)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), int(
			r.m_channelsAllocatedIn), int(unsafe.Sizeof(*r.m_vecChannelsIn)))), 0,
			int(int32(unsafe.Sizeof(*packets))*(n-int32(r.m_channelsAllocatedIn))))
		r.m_channelsAllocatedIn = C.int(n)
	}

	switch {
	case nSize == RTMP_LARGE_HEADER_SIZE:
		packet.m_hasAbsTimestamp = 1
	case nSize < RTMP_LARGE_HEADER_SIZE:
		// Compressed header: start from the packet previously stored for this
		// channel, if there is one.
		var tmpPacketPtr *C.RTMPPacket
		if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn),
			int(packet.m_nChannel), int(unsafe.Sizeof(tmpPacketPtr)))) != nil {
			var tmpPacket C.RTMPPacket
			memmove(unsafe.Pointer(packet), unsafe.Pointer(
				*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn),
					int(packet.m_nChannel), int(unsafe.Sizeof(tmpPacketPtr))))),
				unsafe.Sizeof(tmpPacket))
		}
	}

	// The first header byte has already been consumed.
	nSize--
	if nSize > 0 && C_ReadN(r, header, int(nSize)) != int(nSize) {
		log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header.")
		return 0
	}
	// hSize = nSize + (header - &hbuf[0]), i.e. total header bytes read so far.
	hSize = int32(uintptr(incBytePtr(decBytePtr(unsafe.Pointer(header), int(uintptr(
		unsafe.Pointer(&hbuf[0])))), int(nSize))))

	if nSize >= 3 {
		packet.m_nTimeStamp = C.uint32_t(C_AMF_DecodeInt24(header))
		if nSize >= 6 {
			packet.m_nBodySize = C.uint32_t(C_AMF_DecodeInt24((*byte)(incBytePtr(
				unsafe.Pointer(header), 3))))
			packet.m_nBytesRead = 0
			if nSize > 6 {
				packet.m_packetType = C.uint8_t(*indxBytePtr(unsafe.Pointer(header), 6))
				if nSize == 11 {
					// TODO: port this
					packet.m_nInfoField2 = C.int32_t(C.DecodeInt32LE((*C.char)(incBytePtr(
						unsafe.Pointer(header), 7))))
				}
			}
		}
	}

	// A 24 bit timestamp of 0xffffff signals that a 32 bit extended timestamp
	// follows the header.
	extendedTimestamp = 0
	if packet.m_nTimeStamp == 0xffffff {
		extendedTimestamp = 1
	}
	if extendedTimestamp != 0 {
		if C_ReadN(r, (*byte)(incBytePtr(unsafe.Pointer(header), int(nSize))), 4) != 4 {
			log.Println("RTMPRead_Packet: Failed to read extended timestamp")
			return 0
		}
		// TODO: port this
		packet.m_nTimeStamp = C.uint32_t(C.AMF_DecodeInt32((*C.char)(incBytePtr(
			unsafe.Pointer(header), int(nSize)))))
		hSize += 4
	}

	if packet.m_nBodySize > 0 && packet.m_body == nil {
		// TODO: port this
		if C.RTMPPacket_Alloc(packet, packet.m_nBodySize) == 0 {
			log.Println("RTMPRead_Packet: failed to allocate packet")
			return 0
		}
		packet.m_headerType = C.uint8_t((hbuf[0] & 0xc0) >> 6)
	}

	// Read at most one chunk of the remaining body.
	nToRead = int32(packet.m_nBodySize - packet.m_nBytesRead)
	nChunk = int32(r.m_inChunkSize)
	if nToRead < nChunk {
		nChunk = nToRead
	}

	if packet.m_chunk != nil {
		packet.m_chunk.c_headerSize = C.int(hSize)
		memmove(unsafe.Pointer(&packet.m_chunk.c_header[0]), unsafe.Pointer(&hbuf[0]),
			uintptr(hSize))
		packet.m_chunk.c_chunk = (*C.char)(incBytePtr(unsafe.Pointer(packet.m_body),
			int(packet.m_nBytesRead)))
		packet.m_chunk.c_chunkSize = C.int(nChunk)
	}

	if C_ReadN(r, (*byte)(incBytePtr(unsafe.Pointer(packet.m_body), int(packet.m_nBytesRead))),
		int(nChunk)) != int(nChunk) {
		log.Println("C_RTMP_ReadPacket: failed to read RTMP packet body")
		return 0
	}
	packet.m_nBytesRead += C.uint32_t(nChunk)

	var tmpPktPtr *C.RTMPPacket
	// keep the packet as ref for other packets on this channel
	if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn),
		int(packet.m_nChannel), int(unsafe.Sizeof(tmpPktPtr)))) == nil {
		var tmpPkt C.RTMPPacket
		*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), int(packet.m_nChannel),
			int(unsafe.Sizeof(tmpPktPtr)))) = (*C.RTMPPacket)(C.malloc(C.size_t(unsafe.Sizeof(tmpPkt))))
	}
	// Copy the whole packet struct. The size must be that of the struct, not of
	// a pointer to it, otherwise only its first few bytes are saved.
	memmove(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(
		r.m_vecChannelsIn), int(packet.m_nChannel), int(unsafe.Sizeof(tmpPktPtr))))),
		unsafe.Pointer(packet), unsafe.Sizeof(*packet))
	if extendedTimestamp != 0 {
		(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(
			r.m_vecChannelsIn), int(packet.m_nChannel),
			int(unsafe.Sizeof(tmpPktPtr))))).m_nTimeStamp = 0xffffff
	}

	// TODO: port this
	if C_RTMPPacket_IsReady(packet) != 0 {
		if packet.m_hasAbsTimestamp == 0 {
			// timestamps seem to always be relative
			packet.m_nTimeStamp += *(*C.uint32_t)(incPtr(unsafe.Pointer(r.m_channelTimestamp),
				int(packet.m_nChannel), 4))
		}
		*(*C.uint32_t)(incPtr(unsafe.Pointer(r.m_channelTimestamp), int(packet.m_nChannel),
			4)) = packet.m_nTimeStamp

		// The packet is complete: reset the stored reference so the next
		// packet on this channel starts with no body.
		(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), int(packet.m_nChannel),
			int(unsafe.Sizeof(*r.m_vecChannelsIn))))).m_body = nil
		(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), int(packet.m_nChannel),
			int(unsafe.Sizeof(*r.m_vecChannelsIn))))).m_nBytesRead = 0
		(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), int(packet.m_nChannel),
			int(unsafe.Sizeof(*r.m_vecChannelsIn))))).m_hasAbsTimestamp = 0
	} else {
		packet.m_body = nil /* so it won't be erased on free */
	}
	return 1
}
// C_RTMPPacket_IsReady reports, as 1 or 0, whether the number of body bytes
// read for p equals its declared body size.
//
// #define RTMPPacket_IsReady(a)
// rtmp.h +142
func C_RTMPPacket_IsReady(p *C.RTMPPacket) int {
	ready := 0
	if p.m_nBytesRead == p.m_nBodySize {
		ready = 1
	}
	return ready
}
// endSession returns 3 when rtmp is nil and 0 otherwise. The calls that would
// close and free the C session are currently commented out, so no resources
// are released here.
// NOTE(review): 3 presumably maps to "rtmp: not started" in rtmpErrs - confirm
// against the caller.
func endSession(rtmp *C.RTMP) uint32 {
	var status uint32
	if rtmp == nil {
		status = 3
	}
	//C.RTMP_Close(rtmp)
	//C.RTMP_Free(rtmp)
	return status
}
// C_RTMP_Write parses data as a sequence of tagged packets and sends each one
// through C_RTMP_SendPacket using the connection's reusable write packet
// (r.m_write). A packet whose body is only partly contained in data is kept in
// r.m_write and continued on the next call.
//
// It returns 0 when a new packet starts but data is shorter than minDataSize
// or the packet body cannot be allocated, -1 when sending a packet fails, and
// size + s2 otherwise, where s2 is the running count of unconsumed bytes.
//
// int RTMP_Write(RTMP* r, const char* buf, int size);
// rtmp.c +5095
func C_RTMP_Write(r *C.RTMP, data []byte) int {
// buf is the read cursor into data.
buf := sliceToPtr(data)
// TODO: port RTMPPacket
var pkt = &r.m_write
// enc is the write cursor into the packet body, pend the end of that body.
var pend, enc unsafe.Pointer
size := len(data)
// s2 counts the bytes of data that are still to be consumed.
s2 := size
var ret, num int
pkt.m_nChannel = 0x04
pkt.m_nInfoField2 = C.int32_t(r.m_stream_id)
for s2 != 0 {
// m_nBytesRead == 0 means no packet is in progress, so a new tag header
// is parsed from buf.
if pkt.m_nBytesRead == 0 {
if size < minDataSize {
log.Printf("size: %d\n", size)
log.Printf("too small \n")
return 0
}
// Skip 13 bytes when the data starts with the signature "FLV".
// NOTE(review): presumably the FLV file header plus the first
// previous-tag-size field - confirm.
if *indxBytePtr(buf, 0) == 'F' && *indxBytePtr(buf, 1) == 'L' && *indxBytePtr(buf, 2) == 'V' {
buf = unsafe.Pointer(uintptr(buf) + uintptr(13))
s2 -= 13
}
// 11 byte tag header: 1 byte type, 3 byte body size, 3 byte timestamp,
// 1 byte that becomes the top 8 bits of the timestamp, and 3 further
// bytes that are skipped.
pkt.m_packetType = C.uint8_t(*indxBytePtr(buf, 0))
buf = incBytePtr(buf, 1)
pkt.m_nBodySize = C.uint32_t(C_AMF_DecodeInt24((*byte)(buf)))
buf = incBytePtr(buf, 3)
pkt.m_nTimeStamp = C.uint32_t(C_AMF_DecodeInt24((*byte)(buf)))
buf = incBytePtr(buf, 3)
pkt.m_nTimeStamp |= C.uint32_t(*indxBytePtr(buf, 0)) << 24
buf = incBytePtr(buf, 4)
s2 -= 11
// Audio/video packets with timestamp 0 and all info packets get a large
// header; everything else gets a medium header.
if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) &&
pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
pkt.m_headerType = RTMP_PACKET_SIZE_LARGE
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
// Extra room for the encoded setDataFrame string written below.
pkt.m_nBodySize += 16
}
} else {
pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM
}
// TODO: Port this
if int(C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize)) == 0 {
log.Println("Failed to allocate packet")
return 0
}
enc = unsafe.Pointer(pkt.m_body)
pend = incBytePtr(enc, int(pkt.m_nBodySize))
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
// Info packets are prefixed with the encoded setDataFrame string; the
// bytes it occupies count as already read.
enc = unsafe.Pointer(C_AMF_EncodeString((*byte)(enc), (*byte)(pend), &setDataFrame))
pkt.m_nBytesRead = C.uint32_t(math.Abs(float64(uintptr(enc) -
uintptr(unsafe.Pointer(pkt.m_body)))))
}
} else {
// Continue filling the packet that is already in progress.
enc = incBytePtr(unsafe.Pointer(pkt.m_body), int(pkt.m_nBytesRead))
}
// Copy as much of the body as is both still missing and available.
num = int(pkt.m_nBodySize - pkt.m_nBytesRead)
if num > s2 {
num = s2
}
memmove(enc, buf, uintptr(num))
pkt.m_nBytesRead += C.uint32_t(num)
s2 -= num
buf = incBytePtr(buf, num)
// Once the body is complete the packet is sent and released.
if pkt.m_nBytesRead == pkt.m_nBodySize {
// TODO: Port this
ret = C_RTMP_SendPacket(r, pkt, 0)
// TODO: Port this
C.RTMPPacket_Free(pkt)
pkt.m_nBytesRead = 0
if ret == 0 {
return -1
}
// Skip the 4 bytes that follow each tag body.
// NOTE(review): presumably the FLV previous-tag-size field - confirm.
buf = incBytePtr(buf, 4)
s2 -= 4
if s2 < 0 {
break
}
}
}
return size + s2
}
// C_RTMP_SendPacket writes packet to the connection r, split into chunks of at
// most r.m_outChunkSize body bytes. The chunk header is written into the bytes
// directly before packet.m_body (or into a local buffer when there is no
// body), and is compressed against the previous packet sent on the same
// channel when the header type allows it. For invoke packets with queue != 0
// the method name and transaction id are appended to r.m_methodCalls. A copy
// of the packet is kept per channel for header compression of later packets.
//
// It returns 1 on success and 0 if allocation, the header type sanity check or
// a socket write fails.
//
// int RTMP_SendPacket(RTMP* r, RTMPPacket* packet, int queue);
// rtmp.c +3896
func C_RTMP_SendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
	var prevPacket *C.RTMPPacket
	last := 0
	var nSize, hSize, cSize, nChunkSize int
	var header, hptr, hend, buffer, tbuf, toff unsafe.Pointer
	var goHbuf [RTMP_MAX_HEADER_SIZE]byte
	var hbuf = unsafe.Pointer(&goHbuf[0])
	var c byte
	var t int32
	var packets unsafe.Pointer

	// Grow the per-channel array of previously sent packets if needed.
	if packet.m_nChannel >= r.m_channelsAllocatedOut {
		n := int(packet.m_nChannel + 10)
		packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut), C.size_t(
			unsafe.Sizeof(packet)*uintptr(n)))
		//packets = realloc(unsafe.Pointer(r.m_vecChannelsOut),
		//int(unsafe.Sizeof(packet)*uintptr(n)))
		if uintptr(packets) == uintptr(0) {
			C.free(unsafe.Pointer(r.m_vecChannelsOut))
			r.m_vecChannelsOut = nil
			r.m_channelsAllocatedOut = 0
			return 0
		}
		r.m_vecChannelsOut = (**C.RTMPPacket)(packets)
		// Zero only the newly added slots.
		C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut),
			int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet)*
			uintptr(n-int(r.m_channelsAllocatedOut))))
		//memset((*byte)(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(
		//	r.m_channelsAllocatedOut), int(unsafe.Sizeof(packet)))), 0, int(
		//	unsafe.Sizeof(packet)*uintptr(n-int(r.m_channelsAllocatedOut))))
		r.m_channelsAllocatedOut = C.int(n)
	}

	prevPacket = *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
		int(packet.m_nChannel), int(unsafe.Sizeof(packet))))
	if prevPacket != nil && packet.m_headerType != RTMP_PACKET_SIZE_LARGE {
		// compress a bit by using the prev packet's attributes
		if prevPacket.m_nBodySize == packet.m_nBodySize &&
			prevPacket.m_packetType == packet.m_packetType &&
			packet.m_headerType == RTMP_PACKET_SIZE_MEDIUM {
			packet.m_headerType = RTMP_PACKET_SIZE_SMALL
		}
		if prevPacket.m_nTimeStamp == packet.m_nTimeStamp &&
			packet.m_headerType == RTMP_PACKET_SIZE_SMALL {
			packet.m_headerType = RTMP_PACKET_SIZE_MINIMUM
		}
		last = int(prevPacket.m_nTimeStamp)
	}

	if packet.m_headerType > 3 {
		log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
			packet.m_headerType)
		return 0
	}

	nSize = packetSize[int(packet.m_headerType)]
	hSize = nSize
	cSize = 0
	// t is the timestamp delta relative to the previous packet on the channel.
	t = int32(int(packet.m_nTimeStamp) - last)

	if packet.m_body != nil {
		// The header is written into the bytes immediately before the body.
		header = decBytePtr(unsafe.Pointer(packet.m_body), nSize)
		hend = unsafe.Pointer(packet.m_body)
	} else {
		header = incBytePtr(hbuf, 6)
		// TODO: be cautious about this sizeof - make sure it works how you think it
		// does. C code used sizeof(hbuf) where hbuf is a *char
		hend = incBytePtr(hbuf, RTMP_MAX_HEADER_SIZE)
	}

	// Channel ids above 63 need one extra header byte, above 319 two.
	switch {
	case packet.m_nChannel > 319:
		cSize = 2
	case packet.m_nChannel > 63:
		cSize = 1
	}
	if cSize != 0 {
		header = decBytePtr(header, cSize)
		hSize += cSize
	}

	// Timestamps that do not fit in 24 bits need a 4 byte extended timestamp.
	if t >= 0xffffff {
		header = decBytePtr(header, 4)
		hSize += 4
		// %x so that the value really is hexadecimal as the 0x prefix claims.
		log.Printf("Larger timestamp than 24-bit: 0x%x", t)
	}

	hptr = header
	c = byte(packet.m_headerType) << 6
	switch cSize {
	case 0:
		c |= byte(packet.m_nChannel)
	case 1:
	case 2:
		c |= byte(1)
	}
	*(*byte)(hptr) = c
	hptr = incBytePtr(hptr, 1)

	if cSize != 0 {
		tmp := packet.m_nChannel - 64
		*(*byte)(hptr) = byte(tmp & 0xff)
		hptr = incBytePtr(hptr, 1)
		if cSize == 2 {
			*(*byte)(hptr) = byte(tmp >> 8)
			hptr = incBytePtr(hptr, 1)
		}
	}

	if nSize > 1 {
		res := t
		if t > 0xffffff {
			res = 0xffffff
		}
		hptr = unsafe.Pointer(C_AMFEncodeInt24((*byte)(hptr), (*byte)(hend), res))
	}

	if nSize > 4 {
		hptr = unsafe.Pointer(C_AMFEncodeInt24((*byte)(hptr), (*byte)(hend), (int32(packet.m_nBodySize))))
		*(*byte)(hptr) = byte(packet.m_packetType)
		hptr = incBytePtr(hptr, 1)
	}

	if nSize > 8 {
		// TODO: port this
		hptr = incBytePtr(hptr, int(C.EncodeInt32LE((*C.char)(hptr),
			C.int(packet.m_nInfoField2))))
	}

	if t >= 0xffffff {
		hptr = unsafe.Pointer(C_AMF_EncodeInt32((*byte)(hptr), (*byte)(hend), (int32)(t)))
	}

	// From here on nSize is the number of body bytes left to send.
	nSize = int(packet.m_nBodySize)
	buffer = unsafe.Pointer(packet.m_body)
	nChunkSize = int(r.m_outChunkSize)

	if debugMode {
		log.Printf("C_RTMP_SendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize)
	}

	for (nSize + hSize) != 0 {
		var wrote int
		if nSize < nChunkSize {
			nChunkSize = nSize
		}

		// tbuf is never assigned in this function, so this branch only runs
		// if that changes; the chunk is normally written straight out.
		if tbuf != nil {
			//memmove(toff, header, uintptr(nChunkSize + hSize))
			copy(ptrToSlice(toff, int(nChunkSize+hSize)), ptrToSlice(header,
				int(nChunkSize+hSize)))
			toff = incBytePtr(toff, nChunkSize+hSize)
		} else {
			// TODO: port this
			wrote = int(C_WriteN(r, header, nChunkSize+hSize))
			if wrote == 0 {
				return 0
			}
		}

		nSize -= nChunkSize
		buffer = incBytePtr(buffer, nChunkSize)
		hSize = 0

		if nSize > 0 {
			// Build the continuation header in the bytes directly before the
			// next chunk of the body.
			header = decBytePtr(buffer, 1)
			hSize = 1
			if cSize != 0 {
				header = decBytePtr(header, cSize)
				hSize += cSize
			}
			if t >= 0xffffff {
				header = decBytePtr(header, 4)
				hSize += 4
			}

			*(*byte)(header) = byte(0xc0 | c)

			if cSize != 0 {
				tmp := int(packet.m_nChannel) - 64
				*indxBytePtr(header, 1) = byte(tmp & 0xff)
				if cSize == 2 {
					*indxBytePtr(header, 2) = byte(tmp >> 8)
				}
			}
			if t >= 0xffffff {
				extendedTimestamp := incBytePtr(header, 1+cSize)
				C_AMF_EncodeInt32((*byte)(extendedTimestamp),
					(*byte)(incBytePtr(extendedTimestamp, 4)), (int32)(t))
			}
		}
	}

	if tbuf != nil {
		wrote := int(C_WriteN(r, tbuf, int(uintptr(decBytePtr(toff,
			int(uintptr(unsafe.Pointer(tbuf))))))))
		//C.free(tbuf)
		tbuf = nil
		if wrote == 0 {
			return 0
		}
	}

	// We invoked a remote method
	// TODO: port the const
	if packet.m_packetType == RTMP_PACKET_TYPE_INVOKE {
		// TODO: port C.AVal
		var method C.AVal
		var ptr unsafe.Pointer
		ptr = incBytePtr(unsafe.Pointer(packet.m_body), 1)
		//C.AMF_DecodeString((*C.char)(ptr), &method)
		C_AMF_DecodeString((*byte)(ptr), &method)

		if debugMode {
			log.Printf("Invoking %v", method.av_val)
		}
		// keep it in call queue till result arrives
		if queue != 0 {
			var txn int
			ptr = incBytePtr(ptr, 3+int(method.av_len))
			//txn = int(C.AMF_DecodeNumber((*C.char)(ptr)))
			txn = int(C_AMF_DecodeNumber((*byte)(ptr)))
			C_AV_queue(&r.m_methodCalls, (*int32)(unsafe.Pointer(&r.m_numCalls)),
				&method, int32(txn))
			//C.AV_queue(&r.m_methodCalls, (*C.int)(unsafe.Pointer(&r.m_numCalls)), &method,
			//C.int(txn))
		}
	}

	// Remember this packet for header compression of the next one on this
	// channel. The slot lives in C memory (it is realloc'd above), so the
	// packet copy must be C-allocated as well: Go-allocated memory referenced
	// only from C memory is invisible to the garbage collector and must never
	// be handed to C free().
	if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
		int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) == nil {
		*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
			int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) =
			(*C.RTMPPacket)(C.malloc(C.size_t(unsafe.Sizeof(*packet))))
	}
	memmove(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
		int(packet.m_nChannel), int(unsafe.Sizeof(packet))))), unsafe.Pointer(packet),
		uintptr(unsafe.Sizeof(*packet)))
	return 1
}
// C_WriteN sends n bytes starting at buffer over the socket of r, calling
// C_RTMPSockBuf_Send repeatedly until everything has been sent. It returns 1
// when all n bytes were sent and 0 otherwise. A negative result from the send
// closes the connection with C.RTMP_Close; a result of zero just stops the
// loop.
//
// int WriteN(RTMP* r, const char* buffer, int n);
// rtmp.c +1502
func C_WriteN(r *C.RTMP, buffer unsafe.Pointer, n int) int {
	cursor := buffer
	remaining := n
	for remaining > 0 {
		// TODO: port this if necessary
		sent := int(C_RTMPSockBuf_Send(&r.m_sb, (*byte)(cursor), int32(remaining)))
		if sent < 0 {
			if debugMode {
				log.Println("C_WriteN, RTMP send error")
			}
			// TODO: port this
			C.RTMP_Close(r)
			return 0
		}
		if sent == 0 {
			// Nothing was sent although bytes remain: report failure.
			return 0
		}
		remaining -= sent
		cursor = incBytePtr(cursor, sent)
	}
	if remaining != 0 {
		return 0
	}
	return 1
}
// length is a package level constant with the value 512.
// NOTE(review): it is not referenced in this part of the file - confirm what
// it sizes.
const length = 512
// RTMPT_cmds lists the RTMPT command names. The entries are in the same order
// as the RTMPT_OPEN, RTMPT_SEND, RTMPT_IDLE and RTMPT_CLOSE constants, so it is
// presumably indexed by them - confirm against the caller.
var RTMPT_cmds = []string{
"open",
"send",
"idle",
"close",
}
// C_RTMPSockBuf_Send passes l bytes starting at buf to the C send function on
// the socket held by sb, with flags set to 0, and returns the result of that
// call converted to int32.
//
// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len);
// rtmp.c +4297
func C_RTMPSockBuf_Send(sb *C.RTMPSockBuf, buf *byte, l int32) int32 {
	sent := C.send(sb.sb_socket, unsafe.Pointer(buf), C.size_t(l), 0)
	return int32(sent)
}
// C_AV_queue appends a method call record to the C array *vals, which holds
// *num entries. The array is grown by 16 entries with C.realloc whenever *num
// is a multiple of 16. The method name in av is copied into freshly
// C-allocated, NUL terminated memory, stored together with the transaction id
// txn in the new entry, and *num is incremented.
//
// void AV_queue(RTMP_METHOD** vals, int* num, C.AVal* av, int txn);
// rtmp.c +2414
func C_AV_queue(vals **C.RTMP_METHOD, num *int32, av *C.AVal, txn int32) {
	// Size of one RTMP_METHOD element.
	entrySize := int(unsafe.Sizeof(*(*vals)))

	if (*num & 0x0f) == 0 {
		// TODO: work out what to do with the realloc
		grown := *num + 16
		*vals = (*C.RTMP_METHOD)(C.realloc(unsafe.Pointer(*vals),
			C.size_t(grown*int32(entrySize))))
	}

	// Private, NUL terminated copy of the method name.
	name := C.malloc(C.size_t(av.av_len + 1))
	memmove(name, unsafe.Pointer(av.av_val), uintptr(av.av_len))
	*indxBytePtr(name, int(av.av_len)) = '\000'

	entry := (*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), int(*num), entrySize))
	entry.num = C.int(txn)
	entry.name.av_len = av.av_len
	entry.name.av_val = (*C.char)(name)

	(*num)++
}
// C_AMF_EncodeNamedNumber writes a named number property into the buffer
// [output, outend): the name length as a 16 bit integer, the name bytes, and
// then dVal encoded by C_AMF_EncodeNumber. It returns nil when the name does
// not fit, otherwise whatever C_AMF_EncodeNumber returns.
//
// char* AMF_EncodeNamedNumber(char* output, char* outend, const C.AVal* strName, double dVal);
// amf.c +286
func C_AMF_EncodeNamedNumber(output *byte, outend *byte, strName *C.AVal, dVal float64) *byte {
	nameLen := int(strName.av_len)
	if int(uintptr(unsafe.Pointer(output)))+2+nameLen > int(uintptr(unsafe.Pointer(outend))) {
		return nil
	}
	cur := C_AMF_EncodeInt16(output, outend, int16(strName.av_len))
	memmove(unsafe.Pointer(cur), unsafe.Pointer(strName.av_val), uintptr(strName.av_len))
	cur = (*byte)(incBytePtr(unsafe.Pointer(cur), nameLen))
	return C_AMF_EncodeNumber(cur, outend, dVal)
}
// C_AMF_EncodeNamedBoolean writes a named boolean property into the buffer
// [output, outend): the name length as a 16 bit integer, the name bytes, and
// then bVal encoded by C_AMF_EncodeBoolean. It returns nil when the name does
// not fit, otherwise whatever C_AMF_EncodeBoolean returns.
//
// char* AMF_EncodeNamedBoolean(char* output, char* outend, const C.AVal* strname, int bVal);
// amf.c +299
func C_AMF_EncodeNamedBoolean(output *byte, outend *byte, strName *C.AVal, bVal int) *byte {
	nameLen := int(strName.av_len)
	if int(uintptr(unsafe.Pointer(output)))+2+nameLen > int(uintptr(unsafe.Pointer(outend))) {
		return nil
	}
	cur := C_AMF_EncodeInt16(output, outend, int16(strName.av_len))
	memmove(unsafe.Pointer(cur), unsafe.Pointer(strName.av_val), uintptr(strName.av_len))
	cur = (*byte)(incBytePtr(unsafe.Pointer(cur), nameLen))
	return C_AMF_EncodeBoolean(cur, outend, bVal)
}
// char* AMF_EncodeBoolean(char* output, char* outend, int bVal);
// amf.c +260
func C_AMF_EncodeBoolean(output *byte, outend *byte, bVal int) *byte {
if int(uintptr(unsafe.Pointer(output)))+2 > int(uintptr(unsafe.Pointer(outend))) {
return nil
}
*(*byte)(unsafe.Pointer(output)) = AMF_BOOLEAN
output = (*byte)(incBytePtr(unsafe.Pointer(output), 1))
val := byte(0x01)
if bVal == 0 {
val = byte(0x00)
}
*(*byte)(unsafe.Pointer(output)) = val
output = (*byte)(incBytePtr(unsafe.Pointer(output), 1))
return output
}
// C_AMF_EncodeNumber writes the AMF number marker followed by the 8 bytes of
// the IEEE 754 representation of dVal, most significant byte first, into the
// buffer [output, outend). It returns the position after the nine bytes
// written, or nil when they do not fit.
//
// The bytes are derived from math.Float64bits, so the output does not depend
// on the byte order of the host.
//
// char* AMF_EncodeNumber(char* output, char* outend, double dVal);
// amf.c +199
func C_AMF_EncodeNumber(output *byte, outend *byte, dVal float64) *byte {
	if int(uintptr(unsafe.Pointer(output)))+1+8 > int(uintptr(unsafe.Pointer(outend))) {
		return nil
	}
	// TODO: port this
	*(*byte)(unsafe.Pointer(output)) = C.AMF_NUMBER
	payload := incBytePtr(unsafe.Pointer(output), 1)

	bits := math.Float64bits(dVal)
	for i := 0; i < 8; i++ {
		// Byte 0 receives the top 8 bits, byte 7 the lowest 8 bits.
		*indxBytePtr(payload, i) = byte(bits >> uint(56-8*i))
	}
	return (*byte)(incBytePtr(payload, 8))
}
// C_AMF_DecodeNumber reads the 8 bytes starting at data as an IEEE 754 double
// stored most significant byte first and returns its value. The caller must
// make sure at least 8 bytes are readable at data.
//
// The value is assembled with math.Float64frombits, so the result does not
// depend on the byte order of the host.
//
// double AMF_DecodeNumber(const char* data);
// amf.c +82
func C_AMF_DecodeNumber(data *byte) float64 {
	src := (*[8]byte)(unsafe.Pointer(data))
	var bits uint64
	for i := 0; i < len(src); i++ {
		bits = bits<<8 | uint64(src[i])
	}
	return math.Float64frombits(bits)
}
// C_AMF_EncodeNamedString writes a named string property into the buffer
// [output, outend): the name length as a 16 bit integer, the name bytes, and
// then strValue encoded by C_AMF_EncodeString. It returns nil when the name
// does not fit, otherwise whatever C_AMF_EncodeString returns.
//
// char* AMF_EncodeNamedString(char* output, char* outend, const C.AVal* strName, const C.AVal* strValue);
// amf.c +273
func C_AMF_EncodeNamedString(output *byte, outend *byte, strName *C.AVal, strValue *C.AVal) *byte {
	nameLen := int(strName.av_len)
	if int(uintptr(unsafe.Pointer(output)))+2+nameLen > int(uintptr(unsafe.Pointer(outend))) {
		return nil
	}
	cur := C_AMF_EncodeInt16(output, outend, int16(strName.av_len))
	memmove(unsafe.Pointer(cur), unsafe.Pointer(strName.av_val), uintptr(strName.av_len))
	cur = (*byte)(incBytePtr(unsafe.Pointer(cur), nameLen))
	return C_AMF_EncodeString(cur, outend, strValue)
}
// C_AMF_DecodeString reads a 16 bit length from data and fills bv with it.
// When the length is positive bv.av_val points at the bytes that follow the
// length inside the original buffer (nothing is copied); otherwise it is nil.
//
// void AMF_DecodeString(const char* data, C.AVal* bv);
// amf.c +68
func C_AMF_DecodeString(data *byte, bv *C.AVal) {
	//bv.av_len = C.int(C.AMF_DecodeInt16((*C.char)(dataPtr)))
	bv.av_len = C.int(C_AMF_DecodeInt16(data))
	bv.av_val = nil
	if bv.av_len > 0 {
		bv.av_val = (*C.char)(incBytePtr(unsafe.Pointer(data), 2))
	}
}
// C_AMF_DecodeInt16 reads the two bytes starting at data as an unsigned 16 bit
// integer stored most significant byte first. The caller must make sure at
// least 2 bytes are readable at data.
//
// unsigned short AMF_DecodeInt16(const char* data);
// amf.c +41
func C_AMF_DecodeInt16(data *byte) uint16 {
	b := (*[2]byte)(unsafe.Pointer(data))
	// Widen each byte before shifting: shifting a uint8 left by 8 always
	// yields 0, which would silently drop the high byte.
	return uint16(b[0])<<8 | uint16(b[1])
}
// char* AMF_EncodeInt24(char* output, char* outend, int nVal);
// amf.c +149
func C_AMFEncodeInt24(output *byte, outend *byte, nVal int32) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if uintptr(outputPtr)+3 > uintptr(outendPtr) {
// length < 3
return nil
}
// Assign output[2]
third := (*byte)(incBytePtr(outputPtr, 2))
*third = (byte)(nVal & 0xff)
// Assign output[1]
second := (*byte)(incBytePtr(outputPtr, 1))
*second = (byte)(nVal >> 8)
// Assign output[0]
*output = (byte)(nVal >> 16)
return (*byte)(incBytePtr(outputPtr, 3))
}
// C_AMF_DecodeInt24 reads the three bytes starting at data as an unsigned 24
// bit integer stored most significant byte first. The caller must make sure at
// least 3 bytes are readable at data.
//
// unsigned int AMF_DecodeInt24(const char* data);
// amf.c +50
func C_AMF_DecodeInt24(data *byte) uint32 {
	b := (*[3]byte)(unsafe.Pointer(data))
	hi, mid, lo := uint32(b[0]), uint32(b[1]), uint32(b[2])
	return hi<<16 | mid<<8 | lo
}
// C_AMF_EncodeString writes bv into the buffer [output, outend) as an AMF
// string. Strings shorter than 65536 bytes use the AMF_STRING marker with a 16
// bit length; longer ones use AMF_LONG_STRING with a 32 bit length. The string
// bytes follow the length. It returns the position after the last byte
// written, or nil when the space check fails. Note that the space check always
// requires room for the long form, even for short strings.
//
// char* AMF_EncodeString(char* output, char* outend, const C.AVal* bv);
// amf.c +174
func C_AMF_EncodeString(output *byte, outend *byte, bv *C.AVal) *byte {
	cur := unsafe.Pointer(output)
	end := unsafe.Pointer(outend)
	n := int(bv.av_len)
	short := bv.av_len < 65536

	if short && uintptr(incBytePtr(cur, 1+2+n)) > uintptr(end) {
		return nil
	}
	if uintptr(incBytePtr(cur, 1+4+n)) > uintptr(end) {
		return nil
	}

	if short {
		*(*byte)(cur) = AMF_STRING
		cur = incBytePtr(cur, 1)
		cur = unsafe.Pointer(C.AMF_EncodeInt16((*C.char)(cur), (*C.char)(end),
			C.short(bv.av_len)))
		//cur = unsafe.Pointer(C_AMF_EncodeInt16((*byte)(cur),
		//(*byte)(end), (int16)(bv.av_len)))
	} else {
		*(*byte)(cur) = AMF_LONG_STRING
		cur = incBytePtr(cur, 1)
		cur = unsafe.Pointer(C.AMF_EncodeInt32((*C.char)(cur), (*C.char)(end),
			C.int(bv.av_len)))
		//cur = unsafe.Pointer(C_AMF_EncodeInt32((*byte)(cur),
		//(*byte)(end), (int32)(bv.av_len)))
	}

	memmove(cur, unsafe.Pointer(bv.av_val), uintptr(bv.av_len))
	//C.memcpy(cur, unsafe.Pointer(bv.av_val), (C.size_t)(bv.av_len))
	return (*byte)(incBytePtr(cur, n))
}
// char* AMF_EncodeInt16(char* output, char* outend, short nVal);
// amf.c +138
func C_AMF_EncodeInt16(output *byte, outend *byte, nVal int16) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if uintptr(outputPtr)+2 > uintptr(outendPtr) {
// length < 2
return nil
}
// Assign output[1]
second := (*byte)(incBytePtr(outputPtr, 1))
*second = (byte)(nVal & 0xff)
// Assign output[0]
*output = (byte)(nVal >> 8)
return (*byte)(incBytePtr(outputPtr, 2))
}
// char* AMF_EncodeInt32(char* output, char* outend, int nVal);
// amf.c +161
func C_AMF_EncodeInt32(output *byte, outend *byte, nVal int32) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if uintptr(outputPtr)+4 > uintptr(outendPtr) {
// length < 4
return nil
}
// Assign output[3]
forth := (*byte)(incBytePtr(outputPtr, 3))
*forth = (byte)(nVal & 0xff)
// Assign output[2]
third := (*byte)(incBytePtr(outputPtr, 2))
*third = (byte)(nVal >> 8)
// Assign output[1]
second := (*byte)(incBytePtr(outputPtr, 1))
*second = (byte)(nVal >> 16)
// Assign output[0]
*output = (byte)(nVal >> 24)
return (*byte)(incBytePtr(outputPtr, 4))
}
// realloc obtains a new block of size bytes from allocate and, when ptr is
// not nil, copies size bytes from ptr into it. The old block is not released
// here. Because the size of the old block is unknown, size bytes are read from
// ptr; the caller must ensure that many bytes are readable there.
func realloc(ptr unsafe.Pointer, size int) unsafe.Pointer {
	fresh := allocate(uintptr(size))
	if ptr == nil {
		return fresh
	}
	memmove(fresh, ptr, uintptr(size))
	return fresh
}
// memmove copies n bytes from the memory at from to the memory at to by
// viewing both regions as byte slices and using the built-in copy.
func memmove(to, from unsafe.Pointer, n uintptr) {
	count := int(n)
	dst := ptrToSlice(to, count)
	src := ptrToSlice(from, count)
	copy(dst, src)
}
// memcmp compares the first size bytes at a and b. It returns -1 or 1 at the
// first position where the bytes differ, depending on whether the byte at a is
// smaller or larger than the byte at b, and 0 when all size bytes are equal.
// TODO: write test for this func
func memcmp(a, b unsafe.Pointer, size int) int {
	for off := 0; off < size; off++ {
		x, y := *indxBytePtr(a, off), *indxBytePtr(b, off)
		switch {
		case x < y:
			return -1
		case x > y:
			return 1
		}
	}
	return 0
}
// memset sets each of the num bytes starting at ptr to val truncated to a
// byte. Nothing is written when num is zero or negative.
func memset(ptr *byte, val int, num int) {
	base := unsafe.Pointer(ptr)
	fill := byte(uint8(val))
	for off := 0; off < num; off++ {
		*indxBytePtr(base, off) = fill
	}
}
// strLen returns the length of str in bytes.
func strLen(str string) int {
	n := len(str)
	return n
}
// bToUP converts a byte pointer to an unsafe.Pointer to the same address.
func bToUP(b *byte) unsafe.Pointer {
	p := unsafe.Pointer(b)
	return p
}
// sToUP returns an unsafe.Pointer to the first element of b. For an empty or
// nil slice it returns nil instead of panicking, which matches the behaviour
// of sliceToPtr.
func sToUP(b []byte) unsafe.Pointer {
	if len(b) == 0 {
		return nil
	}
	return unsafe.Pointer(&b[0])
}
// goStrToCStr copies str into a new Go-allocated buffer followed by a
// terminating NUL byte and returns a pointer to the first byte. The empty
// string yields a pointer to a single NUL byte.
func goStrToCStr(str string) *byte {
	// make zero-fills the buffer, so the extra final byte is the terminator.
	buf := make([]byte, len(str)+1)
	copy(buf, str)
	return &buf[0]
}
// strdup copies the NUL terminated string at str, terminator included, into a
// new Go-allocated buffer and returns a pointer to its first byte. It returns
// nil when strlen cannot find a terminator and reports -1, instead of
// panicking on an empty buffer.
func strdup(str *byte) *byte {
	length := strlen(str)
	if length < 0 {
		return nil
	}
	newMem := make([]byte, length+1)
	oldMem := ptrToSlice(unsafe.Pointer(str), int(length+1))
	copy(newMem, oldMem)
	return &newMem[0]
}
// strlen returns the number of bytes before the first NUL byte at str. Only
// the first 1000 bytes are examined; -1 is returned when none of them is NUL.
func strlen(str *byte) int32 {
	base := unsafe.Pointer(str)
	for n := 0; n < 1000; n++ {
		if *indxBytePtr(base, n) == '\000' {
			return int32(n)
		}
	}
	return int32(-1)
}
// strchr returns a pointer to the first byte equal to val in the NUL
// terminated string at str. The comparison with val is made before the
// terminator check, so searching for NUL returns the terminator itself. It
// returns nil when the terminator is reached first or when val is not found
// within the first 1000 bytes.
func strchr(str *byte, val byte) *byte {
	base := unsafe.Pointer(str)
	for i := 0; i < 1000; i++ {
		cur := indxBytePtr(base, i)
		switch *cur {
		case val:
			return cur
		case '\000':
			return nil
		}
	}
	return nil
}
// allocate returns a pointer to nOfBytes bytes of zeroed, Go-allocated memory.
// A request for zero bytes returns nil rather than panicking on the empty
// backing slice.
func allocate(nOfBytes uintptr) unsafe.Pointer {
	if nOfBytes == 0 {
		return nil
	}
	mem := make([]byte, int(nOfBytes))
	return unsafe.Pointer(&mem[0])
}
// indxBytePtr returns a pointer to the byte located inc bytes after ptr.
func indxBytePtr(ptr unsafe.Pointer, inc int) *byte {
	elem := incPtr(ptr, inc, 1)
	return (*byte)(elem)
}
// indxInt32Ptr returns a pointer to the int32 located inc elements of 4 bytes
// after ptr.
func indxInt32Ptr(ptr unsafe.Pointer, inc int) *int32 {
	elem := incPtr(ptr, inc, 4)
	return (*int32)(elem)
}
// indxInt64Ptr returns a pointer to the int64 located inc elements of 8 bytes
// after ptr.
func indxInt64Ptr(ptr unsafe.Pointer, inc int) *int64 {
	elem := incPtr(ptr, inc, 8)
	return (*int64)(elem)
}
// incBytePtr returns ptr advanced by inc bytes.
func incBytePtr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
	const byteSize = 1
	return incPtr(ptr, inc, byteSize)
}
// incInt32Ptr returns ptr advanced by inc elements of 4 bytes each.
func incInt32Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
	const int32Size = 4
	return incPtr(ptr, inc, int32Size)
}
// incInt64Ptr returns ptr advanced by inc elements of 8 bytes each.
func incInt64Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
	const int64Size = 8
	return incPtr(ptr, inc, int64Size)
}
// incPtr mimics C pointer arithmetic: it returns ptr advanced by inc elements
// of typeSize bytes each.
func incPtr(ptr unsafe.Pointer, inc, typeSize int) unsafe.Pointer {
	offset := uintptr(inc * typeSize)
	return unsafe.Pointer(uintptr(ptr) + offset)
}
// decPtr mimics C pointer arithmetic: it returns ptr moved back by dec
// elements of typeSize bytes each.
func decPtr(ptr unsafe.Pointer, dec, typeSize int) unsafe.Pointer {
	offset := uintptr(dec * typeSize)
	return unsafe.Pointer(uintptr(ptr) - offset)
}
// decBytePtr returns ptr moved back by dec bytes.
func decBytePtr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
	const byteSize = 1
	return decPtr(ptr, dec, byteSize)
}
// decInt32Ptr returns ptr moved back by dec elements of 4 bytes each.
func decInt32Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
	const int32Size = 4
	return decPtr(ptr, dec, int32Size)
}
// decInt64Ptr returns ptr moved back by dec elements of 8 bytes each.
func decInt64Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
	const int64Size = 8
	return decPtr(ptr, dec, int64Size)
}
// sliceToPtr returns the address of the first element of data as an
// unsafe.Pointer, or nil when data has no elements.
func sliceToPtr(data []byte) unsafe.Pointer {
	if len(data) > 0 {
		return unsafe.Pointer(&data[0])
	}
	return nil
}
// ptrToSlice returns a byte slice of length and capacity size whose backing
// memory starts at data. Nothing is allocated or copied: the slice aliases the
// memory at data, so writes through the slice change that memory.
func ptrToSlice(data unsafe.Pointer, size int) []byte {
	var view []byte
	// Fill in the header of a real slice variable so that the result is an
	// ordinary []byte value.
	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&view))
	hdr.Data, hdr.Len, hdr.Cap = uintptr(data), size, size
	return view
}
// AVC builds a C.AVal from a Go string. The bytes are copied into C memory
// with C.CString and av_len is set to the length of str in bytes. The copy is
// not released here; memory obtained from C.CString has to be freed by the
// caller once it is no longer needed.
//
// C.AVal is in amf.h
// See AVC(str) {str, sizeof(str)-1} in amf.h
func AVC(str string) C.AVal {
	return C.AVal{
		av_val: C.CString(str),
		av_len: C.int(len(str)),
	}
}
// rtmpErrs maps known error codes to their messages. Index 0 is left empty.
var rtmpErrs = [...]string{
	1: "rtmp: not connected",
	2: "rtmp: write error",
	3: "rtmp: not started",
}

// Err is a numeric rtmp error code.
type Err uint

// Error returns the message registered for e in rtmpErrs, or "rtmp: " followed
// by the decimal code when no message is registered for it.
func (e Err) Error() string {
	if idx := int(e); idx >= 0 && idx < len(rtmpErrs) && rtmpErrs[idx] != "" {
		return rtmpErrs[idx]
	}
	return "rtmp: " + strconv.Itoa(int(e))
}