av/rtmp/rtmp.go

1985 lines
53 KiB
Go

/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Jake Lane <jake@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
/*
#cgo CFLAGS: -I/usr/local/include/librtmp
#cgo LDFLAGS: -L/usr/local/lib -lrtmp
#include <stdlib.h>
#include <string.h>
#include <rtmp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
typedef enum {
RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE
} RTMPTCmd;
typedef struct sockaddr_in sockaddr_in;
typedef struct sockaddr sockaddr;
int add_addr_info(struct sockaddr_in *service, AVal *host, int port);
RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout);
int write_frame(RTMP* rtmp, char* data, uint data_length);
int end_session(RTMP* rtmp);
void AV_queue(RTMP_METHOD **vals, int *num, AVal *av, int txn);
int writeN(RTMP *r, const char *buffer, int n);
int EncodeInt32LE(char *output, int nVal);
int HTTP_Post(RTMP *r, RTMPTCmd cmd, const char *buf, int len);
*/
import "C"
import (
"errors"
"fmt"
"log"
"math"
"math/rand"
"reflect"
"strconv"
"unsafe"
"github.com/chamaken/cgolmnl/inet"
)
const (
RTMPT_OPEN = iota
RTMPT_SEND
RTMPT_IDLE
RTMPT_CLOSE
)
const (
AMF_NUMBER = iota
AMF_BOOLEAN
AMF_STRING
AMF_OBJECT
AMF_MOVIECLIP /* reserved, not used */
AMF_NULL
AMF_UNDEFINED
AMF_REFERENCE
AMF_ECMA_ARRAY
AMF_OBJECT_END
AMF_STRICT_ARRAY
AMF_DATE
AMF_LONG_STRING
AMF_UNSUPPORTED
AMF_RECORDSET /* reserved, not used */
AMF_XML_DOC
AMF_TYPED_OBJECT
AMF_AVMPLUS /* switch to AMF3 */
)
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_SHARED_OBJECT = 0x13
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
RTMP_MAX_HEADER_SIZE = 18
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_SIZE_MINIMUM = 3
RTMP_READ_HEADER = 0x01
RTMP_READ_RESUME = 0x02
RTMP_READ_NO_IGNORE = 0x04
RTMP_READ_GOTKF = 0x08
RTMP_READ_GOTFLVK = 0x10
RTMP_READ_SEEKING = 0x20
RTMP_READ_COMPLETE = -3
RTMP_READ_ERROR = -2
RTMP_READ_EOF = -1
RTMP_READ_IGNORE = 0
RTMP_LF_AUTH = 0x0001 /* using auth param */
RTMP_LF_LIVE = 0x0002 /* stream is live */
RTMP_LF_SWFV = 0x0004 /* do SWF verification */
RTMP_LF_PLST = 0x0008 /* send playlist before play */
RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */
RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */
RTMP_LF_FAPU = 0x0040 /* free app on close */
RTMP_FEATURE_HTTP = 0x01
RTMP_FEATURE_ENC = 0x02
RTMP_FEATURE_SSL = 0x04
RTMP_FEATURE_MFP = 0x08 /* not yet supported */
RTMP_FEATURE_WRITE = 0x10 /* publish, not play */
RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */
RTMP_PROTOCOL_UNDEFINED = -1
RTMP_PROTOCOL_RTMP = 0
RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC
RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP
RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL
RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC)
RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL)
RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP
RTMP_DEFAULT_CHUNKSIZE = 128
RTMP_BUFFER_CACHE_SIZE = (16 * 1024)
RTMP_CHANNELS = 65600
RTMP_SWF_HASHLEN = 32
RTMP_SIG_SIZE = 1536
RTMP_LARGE_HEADER_SIZE = 12
AMF_INVALID = 0xff
)
const (
minDataSize = 11
debugMode = false
)
const (
byteSize = 1
int32Size = 4
int64Size = 8
)
// av_setDataFrame is a static const global in rtmp.c
var (
setDataFrame = AVC("@setDataFrame")
av_connect = AVC("connect")
av_app = AVC("app")
av_type = AVC("type")
av_nonprivate = AVC("nonprivate")
av_flashVer = AVC("flashVer")
av_swfUrl = AVC("swfUrl")
av_tcUrl = AVC("tcUrl")
av_fpad = AVC("fpad")
av_capabilities = AVC("capabilities")
av_audioCodecs = AVC("audioCodecs")
av_videoCodecs = AVC("videoCodecs")
av_videoFunction = AVC("videoFunction")
av_pageUrl = AVC("pageUrl")
av_objectEncoding = AVC("objectEncoding")
)
var packetSize = [...]int{12, 8, 4, 1}
var RTMPProtocolStringsLower = [...]string{
"rtmp",
"rtmpt",
"rtmpe",
"rtmpte",
"rtmps",
"rtmpts",
"",
"",
"rtmfp",
}
// Session provides an interface for sending flv tags over rtmp.
type Session interface {
Open() error
Write([]byte) (int, error)
Close() error
}
// session provides parameters required for an rtmp communication session.
type session struct {
rtmp *C.RTMP
url string
timeout uint
}
type RTMP struct {
m_inChunkSize int
m_outChunkSize int
m_nBWCheckCounter int
m_nBytesIn int
m_nBytesInSent int
m_nBufferMS int
m_stream_id int
m_mediaChannel int
m_mediaStamp uint32
m_pauseStamp uint32
m_pausing int
m_nServerBw int
m_nClientBw int
m_nClientBw2 uint8
m_bPlaying uint8
m_bSendEncoding uint8
m_bSendCounter uint8
m_numInvokes int
m_numCalls int
m_methodCalls *RTMP_METHOD
m_channelsAllocatedIn int
m_channelsAllocatedOut int
m_vecChannelsIn **RTMPPacket
m_vecChannelsOut **RTMPPacket
m_channelTimestamp *int
m_fAudioCodecs float64
m_fVideoCodecs float64
m_fEncoding float64
m_fDuration float64
m_msgCounter int
m_polling int
m_resplen int
m_unackd int
m_clientID AVal
m_read RTMP_READ
m_write RTMPPacket
m_sb RTMPSockBuf
Link RTMP_LNK
}
type RTMPPacket struct {
m_headerType uint8
m_packetType uint8
m_hasAbsTimestamp uint8
m_nChannel int
m_nTimeStamp uint32
m_nInfoField2 int32
m_nBodySize uint32
m_nBytesRead uint32
m_chunk *RTMPChunk
m_body *byte
}
type RTMP_METHOD struct {
name AVal
num int
}
type AVal struct {
av_val *byte
av_len int
}
type RTMP_READ struct {
buf *byte
bufpos *byte
buflen uint
timestamp uint32
dataType uint8
flags uint8
status int8
initialFrameType uint8
nResumeTS uint32
metaHeader *byte
initialFrame *byte
nMetaHeaderSize uint32
nInitialFrameSize uint32
nIgnoredFrameCounter uint32
nIgnoredFlvFrameCounter uint32
}
type RTMPSockBuf struct {
sb_socket int
sb_size int
sb_start *byte
sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const
sb_timedout int
sb_ssl uintptr
}
type RTMPChunk struct {
c_headerSize int
c_chunkSize int
c_chunk *byte
c_header [RTMP_MAX_HEADER_SIZE]byte
}
type ushort [2]byte
type RTMP_LNK struct {
hostname AVal
sockshost AVal
playpath0 AVal
playpath AVal
tcUrl AVal
swfUrl AVal
pageUrl AVal
app AVal
auth AVal
flashVer AVal
subscribepath AVal
usherToken AVal
token AVal
pubUser AVal
pubPasswd AVal
extras AMFObject
edepth int
seekTime int
stopTime int
lFlags int
swfAge int
protocol int
timeout int
pFlags int
socksport ushort
port ushort
}
type AMFObject struct {
o_num int
o_props *C.AMFObjectProperty
}
var _ Session = (*session)(nil)
// NewSession returns a new session.
func NewSession(url string, connectTimeout uint) Session {
return &session{
url: url,
timeout: connectTimeout,
}
}
// Open establishes an rtmp connection with the url passed into the
// constructor
func (s *session) Open() error {
if s.rtmp != nil {
return errors.New("rtmp: attempt to start already running session")
}
var err error
s.rtmp, err = startSession(s.rtmp, s.url, uint32(s.timeout))
if s.rtmp == nil {
return err
}
return nil
}
// Close terminates the rtmp connection
func (s *session) Close() error {
if s.rtmp == nil {
return Err(3)
}
ret := endSession(s.rtmp)
s.rtmp = nil
if ret != 0 {
return Err(ret)
}
return nil
}
// Write writes a frame (flv tag) to the rtmp connection
func (s *session) Write(data []byte) (int, error) {
if s.rtmp == nil {
return 0, Err(3)
}
// if rtmpIsConnected(s.rtmp) == 0 {
if C.RTMP_IsConnected(s.rtmp) == 0 {
return 0, Err(1)
}
// if rtmpWrite(s.rtmp, data) == 0 {
if C.RTMP_Write(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.int(len(data))) == 0 {
return 0, Err(2)
}
return len(data), nil
}
func rtmpIsConnected(r *C.RTMP) int {
if r.m_sb.sb_socket != -1 {
return 1
}
return 0
}
func startSession(rtmp *C.RTMP, u string, timeout uint32) (*C.RTMP, error) {
connect_timeout := C.int(timeout)
rtmp = rtmpAlloc()
//rtmp = C.RTMP_Alloc()
rtmpInit(rtmp)
//C.RTMP_Init(rtmp)
rtmp.Link.timeout = connect_timeout
if rtmpSetupUrl(rtmp, u) == 0 {
// if C.RTMP_SetupURL(rtmp, C.CString(u)) == 0 {
//C.RTMP_Close(rtmp)
//C.RTMP_Free(rtmp)
return nil, errors.New("rtmp startSession: Failed to setup URL!")
}
rtmpEnableWrite(rtmp)
//C.RTMP_EnableWrite(rtmp)
rtmpSetBufferMS(rtmp, 3600*1000)
//C.RTMP_SetBufferMS(rtmp, 3600*1000)
if rtmpConnect(rtmp, nil) == 0 {
//if C.RTMP_Connect(rtmp, nil) == 0 {
//C.RTMP_Close(rtmp)
//C.RTMP_Free(rtmp)
return nil, errors.New("rtmp startSession: Failed to connect!")
}
// TODO: port this
// if rtmpConnectStream(rtmp, 0) == 0 {
if C.RTMP_ConnectStream(rtmp, 0) == 0 {
//C.RTMP_Close(rtmp)
//C.RTMP_Free(rtmp)
return nil, errors.New("rtmp startSession: Failed to connect stream!")
}
return rtmp, nil
}
func rtmpAlloc() *C.RTMP {
var r C.RTMP
//return (*C.RTMP)(C.malloc(C.size_t(unsafe.Sizeof(r))))
return (*C.RTMP)(allocate(unsafe.Sizeof(r)))
}
func rtmpInit(r *C.RTMP) {
r.m_sb.sb_socket = -1
r.m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE
r.m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE
r.m_nBufferMS = 30000
r.m_nClientBW = 2500000
r.m_nClientBW2 = 2
r.m_nServerBW = 2500000
r.m_fAudioCodecs = 3191.0
r.m_fVideoCodecs = 252.0
r.Link.timeout = 30
r.Link.swfAge = 30
}
func rtmpSetupUrl(r *C.RTMP, u string) int32 {
url := goStrToCStr(u)
var ret, len int32
var port uint32
port = 0
len = strlen(url)
// TODO: port this
ret = int32(C.RTMP_ParseURL((*C.char)(unsafe.Pointer(url)), &r.Link.protocol, &r.Link.hostname,
(*C.uint)(&port), &r.Link.playpath0, &r.Link.app))
if ret == 0 {
return ret
}
r.Link.port = C.ushort(port)
r.Link.playpath = r.Link.playpath0
if r.Link.tcUrl.av_len == 0 {
r.Link.tcUrl.av_val = (*C.char)(unsafe.Pointer(url))
if r.Link.app.av_len != 0 {
if int(uintptr(unsafe.Pointer(r.Link.app.av_val))) <
int(uintptr(incBytePtr(unsafe.Pointer(url), int(len)))) {
r.Link.tcUrl.av_len = C.int(int(r.Link.app.av_len) +
int(uintptr(decBytePtr(unsafe.Pointer(r.Link.app.av_val),
int(uintptr(unsafe.Pointer(url)))))))
} else {
len = int32(r.Link.hostname.av_len) + int32(r.Link.app.av_len) +
int32(unsafe.Sizeof("rtmpte://:65535/"))
r.Link.tcUrl.av_val = (*C.char)(allocate(uintptr(len)))
hostname := string(ptrToSlice(unsafe.Pointer(r.Link.hostname.av_val),
int(r.Link.hostname.av_len)))
app := string(ptrToSlice(unsafe.Pointer(r.Link.app.av_val),
int(r.Link.app.av_len)))
fString := fmt.Sprintf("%v://%v:%v/%v",
RTMPProtocolStringsLower[r.Link.protocol], hostname, r.Link.port, app)
r.Link.tcUrl.av_val = (*C.char)(bToUP(goStrToCStr(fString)))
r.Link.tcUrl.av_len = C.int(strLen(RTMPProtocolStringsLower[r.Link.protocol]) +
strLen(string("://")) + strLen(hostname) + strLen(string(":")) +
strLen(strconv.Itoa(int(r.Link.port))) + strLen(string("/")) + strLen(app))
r.Link.lFlags |= RTMP_LF_FTCU
}
} else {
r.Link.tcUrl.av_len = C.int(strlen(url))
}
}
socksSetup(r, &r.Link.sockshost)
if r.Link.port == 0 {
switch {
case (r.Link.protocol & RTMP_FEATURE_SSL) != 0:
r.Link.port = 433
case (r.Link.protocol & RTMP_FEATURE_HTTP) != 0:
r.Link.port = 80
default:
r.Link.port = 1935
}
}
return 1
}
func socksSetup(r *C.RTMP, sockshost *C.AVal) {
if sockshost.av_len != 0 {
socksport := strchr((*byte)(unsafe.Pointer(sockshost.av_val)), ':')
hostname := strdup((*byte)(unsafe.Pointer(sockshost.av_val)))
if uintptr(unsafe.Pointer(socksport)) != 0 {
*indxBytePtr(unsafe.Pointer(hostname),
int(uintptr(decBytePtr(unsafe.Pointer(socksport),
int(uintptr(unsafe.Pointer(sockshost.av_val))))))) = '\000'
r.Link.sockshost.av_val = (*C.char)(unsafe.Pointer(hostname))
r.Link.sockshost.av_len = C.int(strlen(hostname))
value, err := strconv.Atoi(string(ptrToSlice(unsafe.Pointer(uintptr(
unsafe.Pointer(socksport))+uintptr(1)), int(strlen((*byte)(unsafe.Pointer(
uintptr(unsafe.Pointer(socksport))+uintptr(1)))))+1)))
if err != nil {
log.Println("socksSetup: bad string conversion!")
}
if uintptr(unsafe.Pointer(socksport)) == 0 {
value = 1080
}
r.Link.socksport = C.ushort(value)
}
}
}
/*
func rtmpClose(r *C.RTMP) {
closeInternal(r, 0)
}
func closeInternal(r *C.RTMP, reconnect int32) {
var i int32
if rtmpIsConnected(r) != 0 {
if r.m_stream_id > 0 {
i = int32(r.m_stream_id)
if r.Link.protocol&RTMP_FEATURE_WRITE != 0 {
C.SendFCUnpublish(r)
}
C.SendDeleteStream(r, C.double(i))
}
C.RTMPSockBuf_Close(&r.m_sb)
}
r.m_stream_id = -1
r.m_sb.sb_socket = -1
r.m_nBWCheckCounter = 0
r.m_nBytesIn = 0
r.m_nBytesInSent = 0
if r.m_read.flags&RTMP_READ_HEADER != 0 {
C.free(unsafe.Pointer(r.m_read.buf))
r.m_read.buf = nil
}
r.m_read.dataType = 0
r.m_read.flags = 0
r.m_read.status = 0
r.m_read.nResumeTS = 0
r.m_read.nIgnoredFrameCounter = 0
r.m_read.nIgnoredFlvFrameCounter = 0
r.m_write.m_nBytesRead = 0
C.RTMPPacket_Free(&r.m_write)
for i := 0; i < int(r.m_channelsAllocatedIn); i++ {
if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), i,
int(unsafe.Sizeof(&r.m_write)))) != nil {
C.RTMPPacket_Free(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), i,
int(unsafe.Sizeof(&r.m_write)))))
C.free(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn),
i, int(unsafe.Sizeof(&r.m_write))))))
*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn),
i, int(unsafe.Sizeof(&r.m_write)))) = nil
}
}
C.free(unsafe.Pointer(r.m_vecChannelsOut))
r.m_vecChannelsOut = nil
r.m_channelsAllocatedOut = 0
C.AV_clear(r.m_methodCalls, r.m_numCalls)
r.m_methodCalls = nil
r.m_numCalls = 0
r.m_numInvokes = 0
r.m_bPlaying = C.uchar(0)
r.m_sb.sb_size = 0
r.m_msgCounter = 0
r.m_resplen = 0
r.m_unackd = 0
if ((r.Link.lFlags & RTMP_LF_FTCU) != 0) && (reconnect == 0) {
C.free(unsafe.Pointer(r.Link.app.av_val))
r.Link.app.av_val = nil
r.Link.lFlags ^= RTMP_LF_FAPU
}
if reconnect == 0 {
C.free(unsafe.Pointer(r.Link.playpath0.av_val))
r.Link.playpath0.av_val = nil
}
}
*/
func rtmpEnableWrite(r *C.RTMP) {
r.Link.protocol |= RTMP_FEATURE_WRITE
}
func rtmpSetBufferMS(r *C.RTMP, size int32) {
r.m_nBufferMS = C.int(size)
}
func rtmpConnect(r *C.RTMP, cp *C.RTMPPacket) int {
// TODO: port this
var service C.sockaddr_in
if r.Link.hostname.av_len == 0 {
return 0
}
var tmp C.sockaddr_in
memset((*byte)(unsafe.Pointer(&service)), 0, int(unsafe.Sizeof(tmp)))
// TODO: port this
service.sin_family = C.AF_INET
if r.Link.socksport != 0 {
// TODO: port this
if C.add_addr_info(&service, &r.Link.sockshost, C.int(r.Link.socksport)) == 0 {
return 0
}
} else {
// connect directly
if C.add_addr_info(&service, (*C.AVal)(unsafe.Pointer(&r.Link.hostname)),
C.int(r.Link.port)) == 0 {
return 0
}
}
//if C.RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) == 0 {
if rtmpConnect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) == 0 {
return 0
}
r.m_bSendCounter = 1
return int(rtmpConnect1(r, cp))
//return int(C.RTMP_Connect1(r, cp))
}
func rtmpConnect0(r *C.RTMP, service *C.sockaddr) int {
on := 1
r.m_sb.sb_timedout = 0
r.m_pausing = 0
r.m_fDuration = 0.0
r.m_sb.sb_socket = C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP)
if r.m_sb.sb_socket != -1 {
if C.connect(r.m_sb.sb_socket, service, C.socklen_t(unsafe.Sizeof(*service))) < 0 {
log.Println("rtmpConnect0, failed to connect socket.")
}
if r.Link.socksport != 0 {
if debugMode {
log.Println("rtmpConnect0: socks negotiation.")
}
if C.SocksNegotiate(r) == 0 {
log.Println("rtmpConnect0: SOCKS negotiation failed.")
return 0
}
}
} else {
log.Println("rtmpConnect0: failed to create socket.")
return 0
}
{
tv := C.int(r.Link.timeout * 1000)
if C.setsockopt(r.m_sb.sb_socket, C.SOL_SOCKET, C.SO_RCVTIMEO,
unsafe.Pointer(&tv), C.socklen_t(unsafe.Sizeof(tv))) != 0 {
log.Println("rtmpConnect0: Setting socket timeout failed")
}
}
C.setsockopt(r.m_sb.sb_socket, C.IPPROTO_TCP, C.TCP_NODELAY,
unsafe.Pointer(&on), C.socklen_t(unsafe.Sizeof(on)))
return 1
}
func rtmpConnect1(r *C.RTMP, cp *C.RTMPPacket) int {
if debugMode {
log.Println("... connected, handshaking...")
}
//if C.HandShake(r, 1) == 0 {
if handShake(r, 1) == 0 {
log.Println("rtmpConnect1: handshake failed!")
return 0
}
if debugMode {
log.Println("... handshaked...")
}
//if C.SendConnectPacket(r, cp) == 0 {
if sendConnectPacket(r, cp) == 0 {
log.Println("RTMP connect failed!")
return 0
}
return 1
}
func handShake(r *C.RTMP, FP9HandShake int32) int {
var bMatch int
//uptime := uint32(0)
//suptime := uint32(0)
//typ := byte(0)
var uptime, suptime uint32
var typ byte
//clientbuf := make([]byte, RTMP_SIG_SIZE+1)
var clientbuf [RTMP_SIG_SIZE + 1]byte
clientsig := (*byte)(incBytePtr(unsafe.Pointer(&clientbuf[0]), 1))
//serversig := make([]byte, RTMP_SIG_SIZE)
var serversig [RTMP_SIG_SIZE]byte
clientbuf[0] = 0x03 // not encrypted
// TODO: port rtmp_getTime
uptime = inet.Htonl(uint32(C.RTMP_GetTime()))
memmove(unsafe.Pointer(clientsig), unsafe.Pointer(&uptime), 4)
memset(indxBytePtr(unsafe.Pointer(clientsig), 4), 0, 4)
for i := 8; i < RTMP_SIG_SIZE; i++ {
*indxBytePtr(unsafe.Pointer(clientsig), i) = byte(rand.Intn(256))
}
if writeN(r, unsafe.Pointer(&clientbuf[0]), RTMP_SIG_SIZE+1) == 0 {
return 0
}
if C.ReadN(r, (*C.char)(unsafe.Pointer(&typ)), 1) != 1 {
//if readN(r, (*byte)(unsafe.Pointer(&typ)), 1) != 1 {
return 0
}
if debugMode {
log.Println("handShake: Type answer: %v", typ)
}
if typ != clientbuf[0] {
log.Println("handShake: type mismatch: client sent %v, server sent: %v",
clientbuf[0], typ)
}
if readN(r, (*byte)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE {
//if C.ReadN(r, (*C.char)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE {
return 0
}
// decode server response
memmove(unsafe.Pointer(&suptime), unsafe.Pointer(&serversig[0]), 4)
suptime = inet.Ntohl(suptime)
// 2nd part of handshake
if writeN(r, unsafe.Pointer(&serversig[0]), RTMP_SIG_SIZE) == 0 {
return 0
}
//if readN(r, (*byte)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE {
if C.ReadN(r, (*C.char)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE {
return 0
}
// TODO: find golang memcmp
bMatch = 0
if memcmp(unsafe.Pointer(&serversig[0]), unsafe.Pointer(clientsig),
RTMP_SIG_SIZE) == 0 {
bMatch = 1
}
if bMatch == 0 {
log.Println("Client signature does not match!")
}
return 1
}
func readN(r *C.RTMP, buffer *byte, n int) int {
nOriginalSize := n
var avail int
var ptr *byte
r.m_sb.sb_timedout = 0
ptr = buffer
for n > 0 {
nBytes := 0
var nRead int
avail = int(r.m_sb.sb_size)
if avail == 0 {
if rtmpSockBufFill(&r.m_sb) < 1 {
// if C.RTMPSockBuf_Fill(&r.m_sb) < 1 {
if r.m_sb.sb_timedout == 0 {
return 0
}
}
avail = int(r.m_sb.sb_size)
}
if n < avail {
nRead = n
} else {
nRead = avail
}
if nRead > 0 {
memmove(unsafe.Pointer(ptr), unsafe.Pointer(r.m_sb.sb_start), uintptr(nRead))
r.m_sb.sb_start = (*C.char)(incBytePtr(unsafe.Pointer(r.m_sb.sb_start),
nRead))
r.m_sb.sb_size -= C.int(nRead)
nBytes = nRead
r.m_nBytesIn += C.int(nRead)
if r.m_bSendCounter != 0 && r.m_nBytesIn > (r.m_nBytesInSent+
r.m_nClientBW/10) {
//if C.SendBytesReceived(r) == 0 {
if sendBytesReceived(r) == 0 {
return 0
}
}
}
if nBytes == 0 {
log.Println("RTMP socket closed by peer")
// RTMP_Close(r)
break
}
n -= nBytes
ptr = (*byte)(incBytePtr(unsafe.Pointer(ptr), nBytes))
}
return nOriginalSize - n
}
func rtmpSockBufFill(sb *C.RTMPSockBuf) int {
var nBytes int
if sb.sb_size == 0 {
sb.sb_start = &sb.sb_buf[0]
}
for {
nBytes = int(unsafe.Sizeof(sb.sb_buf)) - 1 - int(sb.sb_size) -
int(uintptr(unsafe.Pointer(sb.sb_start))-uintptr(unsafe.Pointer(
&sb.sb_buf[0])))
// TODO: figure out what to do with recv
nBytes = int(C.recv(sb.sb_socket, unsafe.Pointer(uintptr(unsafe.Pointer(
sb.sb_start))+uintptr(int(sb.sb_size))), C.size_t(nBytes), 0))
if nBytes != -1 {
sb.sb_size += C.int(nBytes)
} else {
log.Println("rtmpSockBufFill: recv error!")
}
break
}
return nBytes
}
func sendBytesReceived(r *C.RTMP) int {
var packet C.RTMPPacket
var pbuf [256]byte
pend := (*byte)(incBytePtr(unsafe.Pointer(&pbuf[0]), 256))
packet.m_nChannel = 0x02 /* control channel (invoke) */
packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM
packet.m_packetType = RTMP_PACKET_TYPE_BYTES_READ_REPORT
packet.m_nTimeStamp = 0
packet.m_nInfoField2 = 0
packet.m_hasAbsTimestamp = 0
packet.m_body = (*C.char)(incBytePtr(unsafe.Pointer(&pbuf[0]),
RTMP_MAX_HEADER_SIZE))
packet.m_nBodySize = 4
amfEncodeInt32((*byte)(unsafe.Pointer(packet.m_body)), pend, int32(r.m_nBytesIn))
// C.AMF_EncodeInt32(packet.m_body, (*C.char)(unsafe.Pointer(pend)), r.m_nBytesIn)
r.m_nBytesInSent = r.m_nBytesIn
return int(C.RTMP_SendPacket(r, &packet, 0))
}
func sendConnectPacket(r *C.RTMP, cp *C.RTMPPacket) int {
var packet C.RTMPPacket
var pbuf [4096]byte
pend := (*byte)(unsafe.Pointer(incBytePtr(unsafe.Pointer(&pbuf[0]),
int(unsafe.Sizeof(pbuf)))))
var enc *byte
if cp != nil {
//return rtmpSendPacket(r, cp, 1)
return int(C.RTMP_SendPacket(r, cp, 1))
}
packet.m_nChannel = 0x03
packet.m_headerType = RTMP_PACKET_SIZE_LARGE
packet.m_packetType = RTMP_PACKET_TYPE_INVOKE
packet.m_nTimeStamp = 0
packet.m_nInfoField2 = 0
packet.m_hasAbsTimestamp = 0
packet.m_body = (*C.char)(incBytePtr(unsafe.Pointer(&pbuf[0]),
RTMP_MAX_HEADER_SIZE))
enc = (*byte)(unsafe.Pointer(packet.m_body))
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeString((*C.char)(unsafe.Pointer(enc)),
//(*C.char)(unsafe.Pointer(pend)), &av_connect)))
enc = amfEncodeString(enc, pend, &av_connect)
r.m_numInvokes += 1
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNumber((*C.char)(unsafe.Pointer(enc)),
//(*C.char)(unsafe.Pointer(pend)), C.double(r.m_numInvokes))))
enc = amfEncodeNumber(enc, pend, float64(r.m_numInvokes))
*indxBytePtr(unsafe.Pointer(enc), 0) = AMF_OBJECT
enc = (*byte)(unsafe.Pointer(incBytePtr(unsafe.Pointer(enc), 1)))
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)(
//unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_app, &r.Link.app)))
enc = amfEncodeNamedString(enc, pend, &av_app, &r.Link.app)
if enc == nil {
return 0
}
if r.Link.protocol&RTMP_FEATURE_WRITE != 0 {
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)(
//unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_type, &av_nonprivate)))
enc = amfEncodeNamedString(enc, pend, &av_type, &av_nonprivate)
if enc == nil {
return 0
}
}
if r.Link.flashVer.av_len != 0 {
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)(
//unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_flashVer, &r.Link.flashVer)))
enc = amfEncodeNamedString(enc, pend, &av_flashVer, &r.Link.flashVer)
if enc == nil {
return 0
}
}
if r.Link.swfUrl.av_len != 0 {
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)(
// unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_swfUrl, &r.Link.swfUrl)))
enc = amfEncodeNamedString(enc, pend, &av_swfUrl, &r.Link.swfUrl)
if enc == nil {
return 0
}
}
if r.Link.tcUrl.av_len != 0 {
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)(
//unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_tcUrl, &r.Link.tcUrl)))
enc = amfEncodeNamedString(enc, pend, &av_tcUrl, &r.Link.tcUrl)
if enc == nil {
return 0
}
}
if r.Link.protocol&RTMP_FEATURE_WRITE == 0 {
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedBoolean((*C.char)(
//unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_fpad, 0)))
enc = amfEncodeNamedBoolean(enc, pend, &av_fpad, 0)
if enc == nil {
return 0
}
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedNumber((*C.char)(
//unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_capabilities, 15.0)))
enc = amfEncodeNamedNumber(enc, pend, &av_capabilities, 15.0)
if enc == nil {
return 0
}
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedNumber((*C.char)(
// unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_audioCodecs, r.m_fAudioCodecs)))
enc = amfEncodeNamedNumber(enc, pend, &av_audioCodecs, float64(r.m_fAudioCodecs))
if enc == nil {
return 0
}
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedNumber((*C.char)(
//unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_videoCodecs, r.m_fVideoCodecs)))
enc = amfEncodeNamedNumber(enc, pend, &av_videoCodecs, float64(r.m_fVideoCodecs))
if enc == nil {
return 0
}
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedNumber((*C.char)(
// unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_videoFunction, 1.0)))
enc = amfEncodeNamedNumber(enc, pend, &av_videoFunction, 1.0)
if enc == nil {
return 0
}
if r.Link.pageUrl.av_len != 0 {
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)(
//unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_pageUrl, &r.Link.pageUrl)))
enc = amfEncodeNamedString(enc, pend, &av_pageUrl, &r.Link.pageUrl)
if enc == nil {
return 0
}
}
}
if r.m_fEncoding != 0.0 || r.m_bSendEncoding != 0 {
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedNumber((*C.char)(
//unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_objectEncoding, r.m_fEncoding)))
enc = amfEncodeNamedNumber(enc, pend, &av_objectEncoding, float64(r.m_fEncoding))
if enc == nil {
return 0
}
}
if int(uintptr(incBytePtr(unsafe.Pointer(enc), 3))) >= int(uintptr(
unsafe.Pointer(pend))) {
return 0
}
*indxBytePtr(unsafe.Pointer(enc), 0) = 0
enc = (*byte)(incBytePtr(unsafe.Pointer(enc), 1))
*indxBytePtr(unsafe.Pointer(enc), 0) = 0
enc = (*byte)(incBytePtr(unsafe.Pointer(enc), 1))
*indxBytePtr(unsafe.Pointer(enc), 0) = C.AMF_OBJECT_END
enc = (*byte)(incBytePtr(unsafe.Pointer(enc), 1))
/* add auth string */
if r.Link.auth.av_len != 0 {
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeBoolean((*C.char)(
// unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), r.Link.lFlags&RTMP_LF_AUTH)))
enc = amfEncodeBoolean(enc, pend, int(r.Link.lFlags&RTMP_LF_AUTH))
if enc == nil {
return 0
}
//enc = (*byte)(unsafe.Pointer(C.AMF_EncodeString((*C.char)(unsafe.Pointer(enc)),
//(*C.char)(unsafe.Pointer(pend)), &r.Link.auth)))
enc = amfEncodeString(enc, (*byte)(pend), &r.Link.auth)
if enc == nil {
return 0
}
}
if r.Link.extras.o_num != 0 {
for i := 0; i < int(r.Link.extras.o_num); i++ {
enc = (*byte)(unsafe.Pointer(C.AMFProp_Encode((*C.AMFObjectProperty)(
incPtr(unsafe.Pointer(&r.Link.extras.o_props), int(unsafe.Sizeof(
r.Link.extras.o_props)), i)), (*C.char)(unsafe.Pointer(enc)), (*C.char)(
unsafe.Pointer(pend)))))
if enc == nil {
return 0
}
}
}
packet.m_nBodySize = C.uint32_t(int(uintptr(decBytePtr(unsafe.Pointer(enc),
int(uintptr(unsafe.Pointer(packet.m_body)))))))
return int(C.RTMP_SendPacket(r, &packet, 1))
//return rtmpSendPacket(r, &packet, 1)
}
func rtmpConnectStream(r *C.RTMP, seekTime int32) int {
var packet C.RTMPPacket
memset((*byte)(unsafe.Pointer(&packet)), 0, int(unsafe.Sizeof(packet)))
if seekTime > 0 {
r.Link.seekTime = C.int(seekTime)
}
r.m_mediaChannel = 0
// TODO: port is connected and read packet
for r.m_bPlaying == 0 && rtmpIsConnected(r) != 0 &&
C.RTMP_ReadPacket(r, &packet) != 0 {
// TODO: port is ready
if rtmpPacketIsReady(&packet) != 0 {
if packet.m_nBodySize == 0 {
continue
}
if packet.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
packet.m_packetType == RTMP_PACKET_TYPE_VIDEO ||
packet.m_packetType == RTMP_PACKET_TYPE_INFO {
log.Println("rtmpConnectStream: got packet before play()! Ignoring.")
C.RTMPPacket_Free(&packet)
continue
}
// TODO: port this
C.RTMP_ClientPacket(r, &packet)
C.RTMPPacket_Free(&packet)
}
}
return int(r.m_bPlaying)
}
func rtmpPacketIsReady(p *C.RTMPPacket) int {
if p.m_nBytesRead == p.m_nBodySize {
return 1
}
return 0
}
func endSession(rtmp *C.RTMP) uint32 {
if rtmp == nil {
return 3
}
//C.RTMP_Close(rtmp)
//C.RTMP_Free(rtmp)
return 0
}
// rtmpWrite writes data to the current rtmp connection encapsulated by r
func rtmpWrite(r *C.RTMP, data []byte) int {
buf := sliceToPtr(data)
// TODO: port RTMPPacket
var pkt = &r.m_write
var pend, enc unsafe.Pointer
size := len(data)
s2 := size
var ret, num int
pkt.m_nChannel = 0x04
pkt.m_nInfoField2 = C.int32_t(r.m_stream_id)
for s2 != 0 {
if pkt.m_nBytesRead == 0 {
if size < minDataSize {
log.Printf("size: %d\n", size)
log.Printf("too small \n")
return 0
}
if *indxBytePtr(buf, 0) == 'F' && *indxBytePtr(buf, 1) == 'L' && *indxBytePtr(buf, 2) == 'V' {
buf = unsafe.Pointer(uintptr(buf) + uintptr(13))
s2 -= 13
}
pkt.m_packetType = C.uint8_t(*indxBytePtr(buf, 0))
buf = incBytePtr(buf, 1)
pkt.m_nBodySize = C.uint32_t(amfDecodeInt24((*byte)(buf)))
buf = incBytePtr(buf, 3)
pkt.m_nTimeStamp = C.uint32_t(amfDecodeInt24((*byte)(buf)))
buf = incBytePtr(buf, 3)
pkt.m_nTimeStamp |= C.uint32_t(*indxBytePtr(buf, 0)) << 24
buf = incBytePtr(buf, 4)
s2 -= 11
if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) &&
pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
pkt.m_headerType = RTMP_PACKET_SIZE_LARGE
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
pkt.m_nBodySize += 16
}
} else {
pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM
}
// TODO: Port this
if int(C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize)) == 0 {
log.Println("Failed to allocate packet")
return 0
}
enc = unsafe.Pointer(pkt.m_body)
pend = incBytePtr(enc, int(pkt.m_nBodySize))
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
enc = unsafe.Pointer(amfEncodeString((*byte)(enc), (*byte)(pend), &setDataFrame))
pkt.m_nBytesRead = C.uint32_t(math.Abs(float64(uintptr(enc) -
uintptr(unsafe.Pointer(pkt.m_body)))))
}
} else {
enc = incBytePtr(unsafe.Pointer(pkt.m_body), int(pkt.m_nBytesRead))
}
num = int(pkt.m_nBodySize - pkt.m_nBytesRead)
if num > s2 {
num = s2
}
memmove(enc, buf, uintptr(num))
pkt.m_nBytesRead += C.uint32_t(num)
s2 -= num
buf = incBytePtr(buf, num)
if pkt.m_nBytesRead == pkt.m_nBodySize {
// TODO: Port this
ret = rtmpSendPacket(r, pkt, 0)
// TODO: Port this
C.RTMPPacket_Free(pkt)
pkt.m_nBytesRead = 0
if ret == 0 {
return -1
}
buf = incBytePtr(buf, 4)
s2 -= 4
if s2 < 0 {
break
}
}
}
return size + s2
}
// send packet version 1 - less C stuff
func rtmpSendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
var prevPacket *C.RTMPPacket
last := 0
var nSize, hSize, cSize, nChunkSize int
var header, hptr, hend, buffer, tbuf, toff unsafe.Pointer
var goHbuf [RTMP_MAX_HEADER_SIZE]byte
var hbuf = unsafe.Pointer(&goHbuf[0])
var c byte
var t int32
var packets unsafe.Pointer
if packet.m_nChannel >= r.m_channelsAllocatedOut {
n := int(packet.m_nChannel + 10)
packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut), C.size_t(
unsafe.Sizeof(packet)*uintptr(n)))
//packets = realloc(unsafe.Pointer(r.m_vecChannelsOut),
//int(unsafe.Sizeof(packet)*uintptr(n)))
if uintptr(packets) == uintptr(0) {
C.free(unsafe.Pointer(r.m_vecChannelsOut))
r.m_vecChannelsOut = nil
r.m_channelsAllocatedOut = 0
return 0
}
r.m_vecChannelsOut = (**C.RTMPPacket)(packets)
C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut),
int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet)*
uintptr(n-int(r.m_channelsAllocatedOut))))
//memset((*byte)(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(
// r.m_channelsAllocatedOut), int(unsafe.Sizeof(packet)))), 0, int(
// unsafe.Sizeof(packet)*uintptr(n-int(r.m_channelsAllocatedOut))))
r.m_channelsAllocatedOut = C.int(n)
}
prevPacket = *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))
if prevPacket != nil && packet.m_headerType != RTMP_PACKET_SIZE_LARGE {
// compress a bit by using the prev packet's attributes
if prevPacket.m_nBodySize == packet.m_nBodySize &&
prevPacket.m_packetType == packet.m_packetType &&
packet.m_headerType == RTMP_PACKET_SIZE_MEDIUM {
packet.m_headerType = RTMP_PACKET_SIZE_SMALL
}
if prevPacket.m_nTimeStamp == packet.m_nTimeStamp &&
packet.m_headerType == RTMP_PACKET_SIZE_SMALL {
packet.m_headerType = RTMP_PACKET_SIZE_MINIMUM
}
last = int(prevPacket.m_nTimeStamp)
}
if packet.m_headerType > 3 {
log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
packet.m_headerType)
return 0
}
nSize = packetSize[int(packet.m_headerType)]
hSize = nSize
cSize = 0
t = int32(int(packet.m_nTimeStamp) - last)
if packet.m_body != nil {
header = decBytePtr(unsafe.Pointer(packet.m_body), nSize)
hend = unsafe.Pointer(packet.m_body)
} else {
header = incBytePtr(hbuf, 6)
// TODO: be cautious about this sizeof - make sure it works how you think it
// does. C code used sizeof(hbuf) where hbuf is a *char
hend = incBytePtr(hbuf, RTMP_MAX_HEADER_SIZE)
}
switch {
case packet.m_nChannel > 319:
cSize = 2
case packet.m_nChannel > 63:
cSize = 1
}
if cSize != 0 {
header = decBytePtr(header, cSize)
hSize += cSize
}
if t >= 0xffffff {
header = decBytePtr(header, 4)
hSize += 4
log.Printf("Larger timestamp than 24-bit: 0x%v", t)
}
hptr = header
c = byte(packet.m_headerType) << 6
switch cSize {
case 0:
c |= byte(packet.m_nChannel)
case 1:
case 2:
c |= byte(1)
}
*(*byte)(hptr) = c
hptr = incBytePtr(hptr, 1)
if cSize != 0 {
tmp := packet.m_nChannel - 64
*(*byte)(hptr) = byte(tmp & 0xff)
hptr = incBytePtr(hptr, 1)
if cSize == 2 {
*(*byte)(hptr) = byte(tmp >> 8)
hptr = incBytePtr(hptr, 1)
}
}
if nSize > 1 {
res := t
if t > 0xffffff {
res = 0xffffff
}
hptr = unsafe.Pointer(amfEncodeInt24((*byte)(hptr), (*byte)(hend), res))
}
if nSize > 4 {
hptr = unsafe.Pointer(amfEncodeInt24((*byte)(hptr), (*byte)(hend), (int32(packet.m_nBodySize))))
*(*byte)(hptr) = byte(packet.m_packetType)
hptr = incBytePtr(hptr, 1)
}
if nSize > 8 {
hptr = incBytePtr(hptr, int(C.EncodeInt32LE((*C.char)(hptr),
C.int(packet.m_nInfoField2))))
}
if t >= 0xffffff {
hptr = unsafe.Pointer(amfEncodeInt32((*byte)(hptr), (*byte)(hend), (int32)(t)))
}
nSize = int(packet.m_nBodySize)
buffer = unsafe.Pointer(packet.m_body)
nChunkSize = int(r.m_outChunkSize)
if debugMode {
log.Printf("rtmpSendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize)
}
for (nSize + hSize) != 0 {
var wrote int
if nSize < nChunkSize {
nChunkSize = nSize
}
if tbuf != nil {
//memmove(toff, header, uintptr(nChunkSize + hSize))
copy(ptrToSlice(toff, int(nChunkSize+hSize)), ptrToSlice(header,
int(nChunkSize+hSize)))
toff = incBytePtr(toff, nChunkSize+hSize)
} else {
// TODO: port this
wrote = int(writeN(r, header, nChunkSize+hSize))
if wrote == 0 {
return 0
}
}
nSize -= nChunkSize
buffer = incBytePtr(buffer, nChunkSize)
hSize = 0
if nSize > 0 {
header = decBytePtr(buffer, 1)
hSize = 1
if cSize != 0 {
header = decBytePtr(header, cSize)
hSize += cSize
}
if t >= 0xffffff {
header = decBytePtr(header, 4)
hSize += 4
}
*(*byte)(header) = byte(0xc0 | c)
if cSize != 0 {
tmp := int(packet.m_nChannel) - 64
*indxBytePtr(header, 1) = byte(tmp & 0xff)
if cSize == 2 {
*indxBytePtr(header, 2) = byte(tmp >> 8)
}
}
if t >= 0xffffff {
extendedTimestamp := incBytePtr(header, 1+cSize)
amfEncodeInt32((*byte)(extendedTimestamp),
(*byte)(incBytePtr(extendedTimestamp, 4)), (int32)(t))
}
}
}
if tbuf != nil {
wrote := int(writeN(r, tbuf, int(uintptr(decBytePtr(toff,
int(uintptr(unsafe.Pointer(tbuf))))))))
//C.free(tbuf)
tbuf = nil
if wrote == 0 {
return 0
}
}
// We invoked a remote method
// TODO: port the const
if packet.m_packetType == RTMP_PACKET_TYPE_INVOKE {
// TODO: port C.AVal
var method C.AVal
var ptr unsafe.Pointer
ptr = incBytePtr(unsafe.Pointer(packet.m_body), 1)
C.AMF_DecodeString((*C.char)(ptr), &method)
//amfDecodeString((*byte)(ptr), &method)
if debugMode {
log.Printf("Invoking %v", method.av_val)
}
// keep it in call queue till result arrives
if queue != 0 {
var txn int
ptr = incBytePtr(ptr, 3+int(method.av_len))
txn = int(C.AMF_DecodeNumber((*C.char)(ptr)))
//txn = int(amfDecodeNumber((*byte)(ptr)))
// TODO: port this
C.AV_queue(&r.m_methodCalls, (*C.int)(unsafe.Pointer(&r.m_numCalls)), &method,
C.int(txn))
}
}
if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) == nil {
*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) =
(*C.RTMPPacket)(allocate(unsafe.Sizeof(*packet)))
}
memmove(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))), unsafe.Pointer(packet),
uintptr(unsafe.Sizeof(*packet)))
return 1
}
func writeN(r *C.RTMP, buffer unsafe.Pointer, n int) int {
ptr := buffer
for n > 0 {
var nBytes int
// TODO: port this if necessary
nBytes = int(sockBufSend(&r.m_sb, (*byte)(ptr), int32(n)))
if nBytes < 0 {
if debugMode {
log.Println("writeN, RTMP send error")
}
// TODO: port this
C.RTMP_Close(r)
n = 1
break
}
if nBytes == 0 {
break
}
n -= nBytes
ptr = incBytePtr(ptr, nBytes)
}
if n == 0 {
return 1
}
return 0
}
const length = 512
var RTMPT_cmds = []string{
"open",
"send",
"idle",
"close",
}
func sockBufSend(sb *C.RTMPSockBuf, buf *byte, l int32) int32 {
return int32(C.send(sb.sb_socket, unsafe.Pointer(buf), C.size_t(l), 0))
}
// TODO: port RTMP_METHOD
func avQueue(vals **C.RTMP_METHOD, num *int, av *C.AVal, txn int) {
if (*num & 0x0f) == 0 {
// TODO: work out what to do with the realloc
//*vals = (*C.RTMP_METHOD)(realloc(unsafe.Pointer(*vals), int((*num+16)*
//int(unsafe.Sizeof(*(*vals))))))
*vals = (*C.RTMP_METHOD)(C.realloc(unsafe.Pointer(*vals), C.size_t((*num+16)*int(
unsafe.Sizeof(*(*vals))))))
}
//tmp := C.malloc(C.size_t(av.av_len + 1))
tmp := allocate(uintptr(av.av_len + 1))
memmove(tmp, unsafe.Pointer(av.av_val), uintptr(av.av_len))
*indxBytePtr(tmp, int(av.av_len)) = '\000'
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
int(unsafe.Sizeof(*(*vals)))))).num = C.int(txn)
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
int(unsafe.Sizeof(*(*vals)))))).name.av_len = av.av_len
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
int(unsafe.Sizeof(*(*vals)))))).name.av_val = (*C.char)(tmp)
*num++
}
func amfEncodeNamedNumber(output *byte, outend *byte, strName *C.AVal, dVal float64) *byte {
if int(uintptr(unsafe.Pointer(output)))+2+int(strName.av_len) > int(uintptr(unsafe.Pointer(outend))) {
return nil
}
output = amfEncodeInt16(output, outend, int16(strName.av_len))
memmove(unsafe.Pointer(output), unsafe.Pointer(strName.av_val), uintptr(strName.av_len))
output = (*byte)(incBytePtr(unsafe.Pointer(output), int(strName.av_len)))
return amfEncodeNumber(output, outend, dVal)
}
func amfEncodeNamedBoolean(output *byte, outend *byte, strName *C.AVal, bVal int) *byte {
if int(uintptr(unsafe.Pointer(output)))+2+int(strName.av_len) > int(uintptr(unsafe.Pointer(outend))) {
return nil
}
output = amfEncodeInt16(output, outend, int16(strName.av_len))
memmove(unsafe.Pointer(output), unsafe.Pointer(strName.av_val), uintptr(strName.av_len))
output = (*byte)(incBytePtr(unsafe.Pointer(output), int(strName.av_len)))
return amfEncodeBoolean(output, outend, bVal)
}
func amfEncodeBoolean(output *byte, outend *byte, bVal int) *byte {
if int(uintptr(unsafe.Pointer(output)))+2 > int(uintptr(unsafe.Pointer(outend))) {
return nil
}
*(*byte)(unsafe.Pointer(output)) = AMF_BOOLEAN
output = (*byte)(incBytePtr(unsafe.Pointer(output), 1))
val := byte(0x01)
if bVal == 0 {
val = byte(0x00)
}
*(*byte)(unsafe.Pointer(output)) = val
output = (*byte)(incBytePtr(unsafe.Pointer(output), 1))
return output
}
func amfEncodeNumber(output *byte, outend *byte, dVal float64) *byte {
if int(uintptr(unsafe.Pointer(output)))+1+8 > int(uintptr(unsafe.Pointer(outend))) {
return nil
}
// TODO: port this
*(*byte)(unsafe.Pointer(output)) = C.AMF_NUMBER
output = (*byte)(incBytePtr(unsafe.Pointer(output), 1))
// NOTE: here we are assuming little endian for both byte order and float
// word order
var ci, co *uint8
ci = (*uint8)(unsafe.Pointer(&dVal))
co = (*uint8)(unsafe.Pointer(output))
for i := 0; i < 8; i++ {
*indxBytePtr(unsafe.Pointer(co), i) = *indxBytePtr(unsafe.Pointer(ci), 7-i)
}
return (*byte)(incBytePtr(unsafe.Pointer(output), 8))
}
func amfDecodeNumber(data *byte) float64 {
var dVal float64
var ci, co *uint8
ci = (*uint8)(unsafe.Pointer(data))
co = (*uint8)(unsafe.Pointer(&dVal))
for i := 0; i < 8; i++ {
*indxBytePtr(unsafe.Pointer(co), i) = *indxBytePtr(unsafe.Pointer(ci), 7-i)
}
return dVal
}
func amfEncodeNamedString(output *byte, outend *byte, strName *C.AVal, strValue *C.AVal) *byte {
if int(uintptr(unsafe.Pointer(output)))+2+int(strName.av_len) > int(uintptr(unsafe.Pointer(outend))) {
return nil
}
output = amfEncodeInt16(output, outend, int16(strName.av_len))
memmove(unsafe.Pointer(output), unsafe.Pointer(strName.av_val), uintptr(strName.av_len))
output = (*byte)(incBytePtr(unsafe.Pointer(output), int(strName.av_len)))
return amfEncodeString(output, outend, strValue)
}
// amfDecodeString decodes data into a string inside a AVal
func amfDecodeString(data *byte, bv *C.AVal) {
dataPtr := unsafe.Pointer(data)
bv.av_len = C.int(amfDecodeInt16((*byte)(dataPtr)))
if bv.av_len > 0 {
bv.av_val = (*C.char)(incBytePtr(dataPtr, 2))
} else {
bv.av_val = nil
}
}
// amfDecodeInt16 decodes data into a 16 bit number
func amfDecodeInt16(data *byte) uint16 {
c := unsafe.Pointer(data)
return (uint16(*(*uint8)(c)) << 8) | *(*uint16)(incBytePtr(c, 1))
}
// amfEncodeInt24 encodes a int24 into data
func amfEncodeInt24(output *byte, outend *byte, nVal int32) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if uintptr(outputPtr)+3 > uintptr(outendPtr) {
// length < 3
return nil
}
// Assign output[2]
third := (*byte)(incBytePtr(outputPtr, 2))
*third = (byte)(nVal & 0xff)
// Assign output[1]
second := (*byte)(incBytePtr(outputPtr, 1))
*second = (byte)(nVal >> 8)
// Assign output[0]
*output = (byte)(nVal >> 16)
return (*byte)(incBytePtr(outputPtr, 3))
}
// amfDecodeInt24 decodes data into an unsigned int
func amfDecodeInt24(data *byte) uint32 {
// TODO Understand logic and simplify
c := (*uint8)(unsafe.Pointer(data))
dst := uint32(int32(*c) << 16)
dst |= uint32(int32(*((*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(c)) +
(uintptr)(int32(1))*unsafe.Sizeof(*c))))) << 8)
dst |= uint32(int32(*((*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(c)) +
(uintptr)(int32(2))*unsafe.Sizeof(*c))))))
return dst
}
func amfEncodeString(output *byte, outend *byte, bv *C.AVal) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if (bv.av_len < 65536 && uintptr(incBytePtr(outputPtr, 1+2+int(bv.av_len))) >
uintptr(outendPtr)) || uintptr(incBytePtr(outputPtr, 1+4+int(bv.av_len))) >
uintptr(outendPtr) {
return nil
}
if bv.av_len < 65536 {
*(*byte)(outputPtr) = AMF_STRING
outputPtr = incBytePtr(outputPtr, 1)
outputPtr = unsafe.Pointer(C.AMF_EncodeInt16((*C.char)(outputPtr), (*C.char)(
outendPtr), C.short(bv.av_len)))
//outputPtr = unsafe.Pointer(amfEncodeInt16((*byte)(outputPtr),
//(*byte)(outendPtr), (int16)(bv.av_len)))
} else {
*(*byte)(outputPtr) = AMF_LONG_STRING
outputPtr = incBytePtr(outputPtr, 1)
outputPtr = unsafe.Pointer(C.AMF_EncodeInt32((*C.char)(outputPtr), (*C.char)(
outendPtr), C.int(bv.av_len)))
//outputPtr = unsafe.Pointer(amfEncodeInt32((*byte)(outputPtr),
//(*byte)(outendPtr), (int32)(bv.av_len)))
}
memmove(unsafe.Pointer(outputPtr), unsafe.Pointer(bv.av_val), uintptr(bv.av_len))
//C.memcpy(unsafe.Pointer(outputPtr), unsafe.Pointer(bv.av_val), (C.size_t)(bv.av_len))
outputPtr = incBytePtr(outputPtr, int(bv.av_len))
return (*byte)(outputPtr)
}
// amfEncodeInt16 encodes a int16 into data
func amfEncodeInt16(output *byte, outend *byte, nVal int16) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if uintptr(outputPtr)+2 > uintptr(outendPtr) {
// length < 2
return nil
}
// Assign output[1]
second := (*byte)(incBytePtr(outputPtr, 1))
*second = (byte)(nVal & 0xff)
// Assign output[0]
*output = (byte)(nVal >> 8)
return (*byte)(incBytePtr(outputPtr, 2))
}
// amfEncodeInt32 encodes a int32 into data
func amfEncodeInt32(output *byte, outend *byte, nVal int32) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if uintptr(outputPtr)+4 > uintptr(outendPtr) {
// length < 4
return nil
}
// Assign output[3]
forth := (*byte)(incBytePtr(outputPtr, 3))
*forth = (byte)(nVal & 0xff)
// Assign output[2]
third := (*byte)(incBytePtr(outputPtr, 2))
*third = (byte)(nVal >> 8)
// Assign output[1]
second := (*byte)(incBytePtr(outputPtr, 1))
*second = (byte)(nVal >> 16)
// Assign output[0]
*output = (byte)(nVal >> 24)
return (*byte)(incBytePtr(outputPtr, 4))
}
func realloc(ptr unsafe.Pointer, size int) unsafe.Pointer {
dest := allocate(uintptr(size))
if ptr != nil {
memmove(dest, ptr, uintptr(size))
}
return dest
}
func memmove(to, from unsafe.Pointer, n uintptr) {
copy(ptrToSlice(to, int(n)), ptrToSlice(from, int(n)))
}
// TODO: write test for this func
func memcmp(a, b unsafe.Pointer, size int) int {
for i := 0; i < size; i++ {
aValue := *indxBytePtr(a, i)
bValue := *indxBytePtr(b, i)
if aValue != bValue {
if aValue < bValue {
return -1
} else {
return 1
}
}
}
return 0
}
func memset(ptr *byte, val int, num int) {
for i := 0; i < num; i++ {
*indxBytePtr(unsafe.Pointer(ptr), int(i)) = byte(uint8(val))
}
}
func strLen(str string) int {
return len(str)
}
// wrapper for converting byte pointer to unsafe.Pointer
func bToUP(b *byte) unsafe.Pointer {
return unsafe.Pointer(b)
}
// wrapper for converting slice to unsafe.pointer
func sToUP(b []byte) unsafe.Pointer {
return unsafe.Pointer(&b[0])
}
// Creates a new C style string from a go string
func goStrToCStr(str string) *byte {
l := len(str)
slice := make([]byte, l+1)
ptr := unsafe.Pointer(&[]byte(str)[0])
for i := 0; i < l; i++ {
slice[i] = *indxBytePtr(ptr, i)
}
slice[l] = '\000'
return &slice[0]
}
// Duplicates a string given as a byte pointer
func strdup(str *byte) *byte {
length := strlen(str)
newMem := make([]byte, length+1)
oldMem := ptrToSlice(unsafe.Pointer(str), int(length+1))
copy(newMem, oldMem)
return &newMem[0]
}
// Gets the length of the string found at str - length is number of chars
// between start and terminating null char. Returns -1 if a null char is not
// found before a count of 1000
func strlen(str *byte) int32 {
var ptr *byte
for i := 0; i < 1000; i++ {
ptr = indxBytePtr(unsafe.Pointer(str), i)
if *ptr == '\000' {
return int32(i)
}
}
return int32(-1)
}
// Returns the pointer where the first occurance of val is located in a string
// which is terminated by a null char. Returns nil if null char is not found
// before a count of 10000
func strchr(str *byte, val byte) *byte {
var ptr *byte
for i := 0; i < 1000; i++ {
ptr = indxBytePtr(unsafe.Pointer(str), i)
if *ptr == val {
return ptr
}
if *ptr == '\000' {
break
}
}
return nil
}
// Creates mem of the size noOfBytes. returns as unsafe pointer
func allocate(nOfBytes uintptr) unsafe.Pointer {
mem := make([]byte, int(nOfBytes))
return unsafe.Pointer(&mem[0])
}
// indxBytePtr returns a byte at the indx inc give a ptr
func indxBytePtr(ptr unsafe.Pointer, inc int) *byte {
return (*byte)(incPtr(ptr, inc, byteSize))
}
// indxInt32Ptr returns an int32 at the indx inc given a ptr
func indxInt32Ptr(ptr unsafe.Pointer, inc int) *int32 {
return (*int32)(incPtr(ptr, inc, int32Size))
}
// indxInt64Ptr returns an int64 at the indx inc given a ptr
func indxInt64Ptr(ptr unsafe.Pointer, inc int) *int64 {
return (*int64)(incPtr(ptr, inc, int64Size))
}
// incBytePtr returns an unsafe.Pointer to a byte that is inc positive positions
// from the passed ptr
func incBytePtr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr, inc, byteSize)
}
// incInt32Ptr returns an unsafe.Pointer to an int32 that is inc positive
// positions from the passed ptr
func incInt32Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr, inc, int32Size)
}
// incInt64Ptr returns an unsafe.Pointer to an int64 that is inc positive
// positions from the passed ptr
func incInt64Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
return incPtr(ptr, inc, int64Size)
}
// incPtr attempts to replicate C like pointer arithmatic functionality
func incPtr(ptr unsafe.Pointer, inc, typeSize int) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) + uintptr(inc*typeSize))
}
// incPtr attempts to replicate C like pointer arithmatic functionality
func decPtr(ptr unsafe.Pointer, dec, typeSize int) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) - uintptr(dec*typeSize))
}
// decBytePtr returns an unsafe.Pointer to a byte that is dec negative positions
// from ptr
func decBytePtr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr, dec, byteSize)
}
// decBytePtr returns an unsafe.Pointer to a int32 that is dec negative positions
// from ptr
func decInt32Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr, dec, int32Size)
}
// decBytePtr returns an unsafe.Pointer to a int64 that is dec negative positions
// from ptr
func decInt64Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
return decPtr(ptr, dec, int64Size)
}
// sliceToPtr get's the address of the first data element and returns as unsafe
// pointer
func sliceToPtr(data []byte) unsafe.Pointer {
return unsafe.Pointer(&data[0])
}
// ptrToSlice returns a slice given unsafe pointer and size - no allocation and
// copying is required - same data is used.
func ptrToSlice(data unsafe.Pointer, size int) []byte {
var ret []byte
shDest := (*reflect.SliceHeader)(unsafe.Pointer(&ret))
shDest.Data = uintptr(data)
shDest.Len = size
shDest.Cap = size
return ret
}
// C.AVal is in amf.h
// See AVC(str) {str, sizeof(str)-1} in amf.h
func AVC(str string) C.AVal {
var aval C.AVal
aval.av_val = C.CString(str)
aval.av_len = C.int(len(str))
return aval
}
var rtmpErrs = [...]string{
1: "rtmp: not connected",
2: "rtmp: write error",
3: "rtmp: not started",
}
type Err uint
func (e Err) Error() string {
if 0 <= int(e) && int(e) < len(rtmpErrs) {
s := rtmpErrs[e]
if s != "" {
return s
}
}
return "rtmp: " + strconv.Itoa(int(e))
}