av/rtmp/rtmp.go

877 lines
22 KiB
Go
Raw Normal View History

/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Jake Lane <jake@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
/*
#cgo CFLAGS: -I/usr/local/include/librtmp
#cgo LDFLAGS: -lrtmp -Wl,-rpath=/usr/local/lib
#include <stdlib.h>
#include <string.h>
#include <rtmp.h>
2018-07-17 13:05:25 +03:00
typedef enum {
RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE
} RTMPTCmd;
2018-05-30 09:22:33 +03:00
RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout);
int write_frame(RTMP* rtmp, char* data, uint data_length);
int end_session(RTMP* rtmp);
2018-07-16 06:46:40 +03:00
void AV_queue(RTMP_METHOD **vals, int *num, AVal *av, int txn);
int WriteN(RTMP *r, const char *buffer, int n);
int EncodeInt32LE(char *output, int nVal);
2018-07-17 13:05:25 +03:00
int HTTP_Post(RTMP *r, RTMPTCmd cmd, const char *buf, int len);
*/
import "C"
import (
"errors"
"fmt"
"log"
"math"
2018-07-14 08:19:21 +03:00
"reflect"
"strconv"
"unsafe"
)
2018-07-17 13:05:25 +03:00
const (
RTMPT_OPEN = iota
RTMPT_SEND
RTMPT_IDLE
RTMPT_CLOSE
)
const (
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
2018-07-18 18:24:36 +03:00
RTMP_READ_HEADER = 0x01
RTMP_READ_RESUME = 0x02
RTMP_READ_NO_IGNORE = 0x04
RTMP_READ_GOTKF = 0x08
RTMP_READ_GOTFLVK = 0x10
RTMP_READ_SEEKING = 0x20
RTMP_READ_COMPLETE = -3
RTMP_READ_ERROR = -2
RTMP_READ_EOF = -1
RTMP_READ_IGNORE = 0
)
const (
minDataSize = 11
debugMode = false
)
const (
byteSize = 1
int32Size = 4
int64Size = 8
)
// memmove copies n bytes from "from" to "to".
//go:linkname memmove runtime.memmove
//func memmove(to, from unsafe.Pointer, n uintptr)
// av_setDataFrame is a static const global in rtmp.c
var setDataFrame = AVC("@setDataFrame")
2018-07-18 05:26:32 +03:00
var packetSize = [...]int{12, 8, 4, 1}
// Session provides an interface for sending flv tags over rtmp.
type Session interface {
2018-06-17 14:15:48 +03:00
Open() error
Write([]byte) (int, error)
Close() error
}
// session provides parameters required for an rtmp communication session.
type session struct {
2018-07-11 18:20:18 +03:00
rtmp *C.RTMP
url string
timeout uint
}
2018-07-18 18:10:36 +03:00
type RTMP struct {
m_inChunkSize int
m_outChunkSize int
m_nBWCheckCounter int
m_nBytesIn int
m_nBytesInSent int
m_nBufferMS int
m_stream_id int
m_mediaChannel int
m_mediaStamp uint32
m_pauseStamp uint32
m_pausing int
m_nServerBw int
m_nClientBw int
m_nClientBw2 uint8
m_bPlaying uint8
m_bSendEncoding uint8
m_bSendCounter uint8
m_numInvokes int
m_numCalls int
2018-07-18 18:24:36 +03:00
m_methodCalls *RTMP_METHOD
2018-07-18 18:10:36 +03:00
m_channelsAllocatedIn int
m_channelsAllocatedOut int
2018-07-18 18:15:01 +03:00
m_vecChannelsIn **RTMPPacket
m_vecChannelsOut **RTMPPacket
2018-07-18 18:10:36 +03:00
m_channelTimestamp *int
m_fAudioCodecs float64
m_fVideoCodecs float64
m_fEncoding float64
m_fDuration float64
m_msgCounter int
m_polling int
m_resplen int
m_unackd int
2018-07-18 18:24:36 +03:00
m_clientID AVal
2018-07-18 18:33:59 +03:00
m_read RTMP_READ
m_write RTMPPacket
2018-07-18 18:10:36 +03:00
m_sb C.RTMPSockBuf
Link C.RTMP_LNK
}
2018-07-18 18:13:59 +03:00
type RTMPPacket struct {
m_headerType uint8
m_packetType uint8
m_hasAbsTimestamp uint8
m_nChannel int
m_nTimeStamp uint32
m_nInfoField2 int32
m_nBodySize uint32
m_nBytesRead uint32
2018-07-18 18:33:59 +03:00
m_chunk *C.RTMPChunk
2018-07-18 18:13:59 +03:00
m_body *byte
}
2018-07-18 18:15:01 +03:00
type RTMP_METHOD struct {
2018-07-18 18:24:36 +03:00
name AVal
2018-07-18 18:15:01 +03:00
num int
}
2018-07-18 18:16:52 +03:00
type AVal struct {
av_val *byte
av_len int
}
2018-07-18 18:24:36 +03:00
type RTMP_READ struct {
buf *byte
bufpos *byte
buflen uint
timestamp uint32
dataType uint8
flags uint8
status int8
initialFrameType uint8
nResumeTS uint32
metaHeader *byte
initialFrame *byte
nMetaHeaderSize uint32
nInitialFrameSize uint32
nIgnoredFrameCounter uint32
nIgnoredFlvFrameCounter uint32
}
2018-07-18 18:33:59 +03:00
type RTMPSockBuf struct {
sb_socket int
sb_size int
sb_start *byte
sb_buf [C.RTMP_BUFFER_CACHE_SIZE]byte // port const
sb_timedout int
sb_ssl uintptr
}
2018-07-18 18:41:55 +03:00
type RTMPChunk struct {
c_headerSize int
c_chunkSize int
c_chunk *byte
c_header [RTMP_MAX_HEADER_SIZE]byte
}
var _ Session = (*session)(nil)
// NewSession returns a new session.
func NewSession(url string, connectTimeout uint) Session {
return &session{
url: url,
timeout: connectTimeout,
}
}
2018-06-17 14:15:48 +03:00
// Open establishes an rtmp connection with the url passed into the
// constructor
2018-06-17 14:15:48 +03:00
func (s *session) Open() error {
2018-05-30 09:22:33 +03:00
if s.rtmp != nil {
return errors.New("rtmp: attempt to start already running session")
}
2018-06-20 07:26:40 +03:00
var err error
s.rtmp, err = C.start_session(s.rtmp, C.CString(s.url), C.uint(s.timeout))
2018-05-30 09:22:33 +03:00
if s.rtmp == nil {
2018-06-20 07:26:40 +03:00
return err
}
return nil
}
2018-07-15 21:09:36 +03:00
// Close terminates the rtmp connection
func (s *session) Close() error {
if s.rtmp == nil {
return Err(3)
}
ret := C.end_session(s.rtmp)
s.rtmp = nil
if ret != 0 {
return Err(ret)
}
return nil
}
2018-07-11 20:20:00 +03:00
// Write writes a frame (flv tag) to the rtmp connection
func (s *session) Write(data []byte) (int, error) {
if s.rtmp == nil {
return 0, Err(3)
}
2018-07-13 14:00:52 +03:00
if C.RTMP_IsConnected(s.rtmp) == 0 {
2018-07-11 20:20:00 +03:00
return 0, Err(1)
}
2018-07-13 14:00:52 +03:00
//if C.RTMP_Write(s.rtmp,(*C.char)(unsafe.Pointer(&data[0])),C.int(len(data))) == 0 {
if rtmpWrite(s.rtmp, data) == 0 {
2018-07-11 20:20:00 +03:00
return 0, Err(2)
}
return len(data), nil
}
2018-07-05 09:55:46 +03:00
2018-07-14 08:51:01 +03:00
// rtmpWrite writes data to the current rtmp connection encapsulated by r
func rtmpWrite(r *C.RTMP, data []byte) int {
2018-07-14 08:19:21 +03:00
buf := sliceToPtr(data)
2018-07-14 08:22:54 +03:00
// TODO: port RTMPPacket
2018-07-12 20:03:06 +03:00
var pkt = &r.m_write
var pend, enc unsafe.Pointer
size := len(data)
2018-07-10 12:15:34 +03:00
s2 := size
var ret, num int
2018-07-05 09:55:46 +03:00
pkt.m_nChannel = 0x04
2018-07-16 18:38:31 +03:00
pkt.m_nInfoField2 = C.int32_t(r.m_stream_id)
for s2 != 0 {
2018-07-10 12:15:34 +03:00
if pkt.m_nBytesRead == 0 {
if size < minDataSize {
2018-07-05 09:55:46 +03:00
log.Printf("size: %d\n", size)
log.Printf("too small \n")
return 0
}
2018-07-14 08:31:06 +03:00
2018-07-18 05:26:32 +03:00
if *indxBytePtr(buf, 0) == 'F' && *indxBytePtr(buf, 1) == 'L' && *indxBytePtr(buf, 2) == 'V' {
buf = unsafe.Pointer(uintptr(buf) + uintptr(13))
2018-07-05 09:55:46 +03:00
s2 -= 13
}
2018-07-18 05:26:32 +03:00
pkt.m_packetType = C.uint8_t(*indxBytePtr(buf, 0))
buf = incBytePtr(buf, 1)
2018-07-18 18:10:36 +03:00
pkt.m_nBodySize = C.uint32_t(C.AMF_DecodeInt24((*C.char)(buf)))
buf = incBytePtr(buf, 3)
2018-07-18 18:10:36 +03:00
pkt.m_nTimeStamp = C.uint32_t(C.AMF_DecodeInt24((*C.char)(buf)))
buf = incBytePtr(buf, 3)
2018-07-18 05:26:32 +03:00
pkt.m_nTimeStamp |= C.uint32_t(*indxBytePtr(buf, 0)) << 24
buf = incBytePtr(buf, 4)
2018-07-05 09:55:46 +03:00
s2 -= 11
2018-07-12 21:07:57 +03:00
if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) &&
pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
2018-07-05 09:55:46 +03:00
2018-07-12 21:07:57 +03:00
pkt.m_headerType = RTMP_PACKET_SIZE_LARGE
2018-07-11 20:20:00 +03:00
2018-07-12 21:07:57 +03:00
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
2018-07-11 18:20:18 +03:00
pkt.m_nBodySize += 16
}
2018-07-05 09:55:46 +03:00
} else {
2018-07-12 21:07:57 +03:00
pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM
2018-07-05 09:55:46 +03:00
}
2018-07-14 08:22:54 +03:00
// TODO: Port this
2018-07-11 18:20:18 +03:00
if int(C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize)) == 0 {
log.Println("Failed to allocate packet")
2018-07-05 09:55:46 +03:00
return 0
}
2018-06-27 21:02:16 +03:00
enc = unsafe.Pointer(pkt.m_body)
pend = incBytePtr(enc, int(pkt.m_nBodySize))
2018-07-12 21:07:57 +03:00
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
2018-07-18 05:26:32 +03:00
enc = unsafe.Pointer(afmEncodeString((*byte)(enc), (*byte)(pend), &setDataFrame))
2018-07-16 18:38:31 +03:00
pkt.m_nBytesRead = C.uint32_t(math.Abs(float64(uintptr(enc) -
2018-07-12 21:07:57 +03:00
uintptr(unsafe.Pointer(pkt.m_body)))))
2018-07-05 09:55:46 +03:00
}
2018-07-12 22:51:17 +03:00
2018-07-05 09:55:46 +03:00
} else {
enc = incBytePtr(unsafe.Pointer(pkt.m_body), int(pkt.m_nBytesRead))
2018-07-05 09:55:46 +03:00
}
num = int(pkt.m_nBodySize - pkt.m_nBytesRead)
2018-07-05 09:55:46 +03:00
if num > s2 {
num = s2
}
2018-07-14 15:40:01 +03:00
//memmove(enc,buf,uintptr(num))
copy(ptrToSlice(enc, num), ptrToSlice(buf, num))
2018-07-16 18:38:31 +03:00
pkt.m_nBytesRead += C.uint32_t(num)
2018-07-05 09:55:46 +03:00
s2 -= num
buf = incBytePtr(buf, num)
2018-07-05 09:55:46 +03:00
if pkt.m_nBytesRead == pkt.m_nBodySize {
2018-07-14 08:22:54 +03:00
// TODO: Port this
ret = sendPacket(r, pkt, 0)
// TODO: Port this
C.RTMPPacket_Free(pkt)
2018-07-05 09:55:46 +03:00
pkt.m_nBytesRead = 0
2018-07-11 07:56:07 +03:00
if ret == 0 {
2018-07-05 09:55:46 +03:00
return -1
}
buf = incBytePtr(buf, 4)
2018-07-05 09:55:46 +03:00
s2 -= 4
if s2 < 0 {
break
}
}
}
2018-07-11 18:20:18 +03:00
return size + s2
2018-06-27 21:02:16 +03:00
}
2018-07-05 09:55:46 +03:00
// afmDecodeInt24 decodes data into an unsigned int
func afmDecodeInt24(data *byte) uint32 {
dataPtr := unsafe.Pointer(data)
return (uint32)(*data)<<16 | *(*uint32)(incBytePtr(dataPtr, 1))<<8 | *(*uint32)(incBytePtr(dataPtr, 2))
}
2018-07-18 05:32:02 +03:00
func afmEncodeString(output *byte, outend *byte, bv *C.AVal) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if (bv.av_len < 65536 && uintptr(incBytePtr(outputPtr, 1+2+int(bv.av_len))) > uintptr(outendPtr)) || uintptr(incBytePtr(outputPtr, 1+4+int(bv.av_len))) > uintptr(outendPtr) {
return nil
}
if bv.av_len < 65536 {
*(*C.char)(outputPtr) = C.AMF_STRING
incBytePtr(outputPtr, 1)
// TODO Encode Int16
outputPtr = unsafe.Pointer(C.AMF_EncodeInt16((*C.char)(outputPtr), (*C.char)(outendPtr), (C.short)(bv.av_len)))
} else {
*(*C.char)(outputPtr) = C.AMF_LONG_STRING
incBytePtr(outputPtr, 1)
// TODO Encode Int16
outputPtr = unsafe.Pointer(C.AMF_EncodeInt32((*C.char)(outputPtr), (*C.char)(outendPtr), bv.av_len))
}
2018-07-18 06:56:23 +03:00
C.memcpy(unsafe.Pointer(outputPtr), unsafe.Pointer(bv.av_val), (C.size_t)(bv.av_len))
2018-07-18 05:32:02 +03:00
incBytePtr(outputPtr, int(bv.av_len))
return (*byte)(outputPtr)
}
// send packet version 1 - less C stuff
2018-07-16 06:46:40 +03:00
func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
2018-07-15 10:48:24 +03:00
var prevPacket *C.RTMPPacket
2018-07-11 20:20:00 +03:00
last := 0
2018-07-13 19:38:59 +03:00
var nSize, hSize, cSize, nChunkSize, tlen int
var header, hptr, hend, buffer, tbuf, toff unsafe.Pointer
2018-07-16 06:46:40 +03:00
var goHbuf [C.RTMP_MAX_HEADER_SIZE]byte
var hbuf = unsafe.Pointer(&goHbuf[0])
var c byte
2018-07-13 19:38:59 +03:00
var t int32
var packets unsafe.Pointer
2018-07-14 09:06:58 +03:00
if packet.m_nChannel >= r.m_channelsAllocatedOut {
log.Println("Resize")
2018-07-18 05:26:32 +03:00
n := int(packet.m_nChannel + 10)
packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut),
2018-07-18 05:26:32 +03:00
C.size_t(unsafe.Sizeof(packet)*uintptr(n)))
2018-07-14 09:06:58 +03:00
if uintptr(packets) == uintptr(0) {
C.free(unsafe.Pointer(r.m_vecChannelsOut))
2018-07-14 09:06:58 +03:00
r.m_vecChannelsOut = nil
r.m_channelsAllocatedOut = 0
return 0
}
r.m_vecChannelsOut = (**C.RTMPPacket)(packets)
C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut),
2018-07-18 05:26:32 +03:00
int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet)*
uintptr(n-int(r.m_channelsAllocatedOut))))
r.m_channelsAllocatedOut = C.int(n)
2018-07-14 09:06:58 +03:00
}
prevPacket = *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))
if prevPacket != nil && packet.m_headerType != RTMP_PACKET_SIZE_LARGE {
// compress a bit by using the prev packet's attributes
if prevPacket.m_nBodySize == packet.m_nBodySize &&
prevPacket.m_packetType == packet.m_packetType &&
packet.m_headerType == RTMP_PACKET_SIZE_MEDIUM {
packet.m_headerType = RTMP_PACKET_SIZE_SMALL
}
if prevPacket.m_nTimeStamp == packet.m_nTimeStamp &&
packet.m_headerType == RTMP_PACKET_SIZE_SMALL {
// TODO: port this constant
packet.m_headerType = C.RTMP_PACKET_SIZE_MINIMUM
}
last = int(prevPacket.m_nTimeStamp)
}
if packet.m_headerType > 3 {
log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
packet.m_headerType)
return 0
}
nSize = packetSize[int(packet.m_headerType)]
hSize = nSize
cSize = 0
t = int32(int(packet.m_nTimeStamp) - last)
if packet.m_body != nil {
header = decBytePtr(unsafe.Pointer(packet.m_body), nSize)
hend = unsafe.Pointer(packet.m_body)
} else {
2018-07-18 05:26:32 +03:00
header = incBytePtr(hbuf, 6)
// TODO: be cautious about this sizeof - make sure it works how you think it
// does. C code used sizeof(hbuf) where hbuf is a *char
2018-07-18 05:26:32 +03:00
hend = incBytePtr(hbuf, C.RTMP_MAX_HEADER_SIZE)
}
switch {
case packet.m_nChannel > 319:
cSize = 2
case packet.m_nChannel > 63:
cSize = 1
}
if cSize != 0 {
2018-07-18 05:26:32 +03:00
header = decBytePtr(header, cSize)
hSize += cSize
}
if t >= 0xffffff {
2018-07-18 05:26:32 +03:00
header = decBytePtr(header, 4)
hSize += 4
log.Printf("Larger timestamp than 24-bit: 0x%v", t)
}
hptr = header
c = byte(packet.m_headerType) << 6
switch cSize {
case 0:
c |= byte(packet.m_nChannel)
case 1:
case 2:
c |= byte(1)
}
*(*byte)(hptr) = c
2018-07-18 05:26:32 +03:00
hptr = incBytePtr(hptr, 1)
if cSize != 0 {
tmp := packet.m_nChannel - 64
2018-07-18 05:26:32 +03:00
*(*byte)(hptr) = byte(tmp & 0xff)
hptr = incBytePtr(hptr, 1)
if cSize == 2 {
*(*byte)(hptr) = byte(tmp >> 8)
2018-07-18 05:26:32 +03:00
hptr = incBytePtr(hptr, 1)
}
}
if nSize > 1 {
res := t
if t > 0xffffff {
res = 0xffffff
}
2018-07-18 05:26:32 +03:00
hptr = unsafe.Pointer(C.AMF_EncodeInt24((*C.char)(hptr), (*C.char)(hend), C.int(res)))
}
if nSize > 4 {
2018-07-18 05:26:32 +03:00
hptr = unsafe.Pointer(C.AMF_EncodeInt24((*C.char)(hptr), (*C.char)(hend),
C.int(packet.m_nBodySize)))
*(*byte)(hptr) = byte(packet.m_packetType)
hptr = incBytePtr(hptr, 1)
}
if nSize > 8 {
2018-07-16 18:38:31 +03:00
hptr = incBytePtr(hptr, int(C.EncodeInt32LE((*C.char)(hptr),
C.int(packet.m_nInfoField2))))
}
if t >= 0xffffff {
hptr = unsafe.Pointer(C.AMF_EncodeInt32((*C.char)(hptr), (*C.char)(hend), C.int(t)))
}
nSize = int(packet.m_nBodySize)
buffer = unsafe.Pointer(packet.m_body)
nChunkSize = int(r.m_outChunkSize)
if debugMode {
log.Printf("sendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize)
}
// send all chunks in one HTTP request
// TODO: port RTMP_FEATURE_HTTP
2018-07-18 05:26:32 +03:00
if int(r.Link.protocol&C.RTMP_FEATURE_HTTP) != 0 {
chunks := (nSize + nChunkSize - 1) / nChunkSize
if chunks > 1 {
2018-07-18 05:26:32 +03:00
tlen = chunks*(cSize+1) + nSize + hSize
// TODO: figure out how to do this in go
2018-07-16 18:38:31 +03:00
tbuf = C.malloc(C.size_t(tlen))
if tbuf == nil {
return 0
}
toff = tbuf
}
}
for (nSize + hSize) != 0 {
var wrote int
if nSize < nChunkSize {
nChunkSize = nSize
}
if tbuf != nil {
//memmove(toff, header, uintptr(nChunkSize + hSize))
copy(ptrToSlice(toff, int(nChunkSize+hSize)), ptrToSlice(header,
int(nChunkSize+hSize)))
2018-07-18 05:26:32 +03:00
toff = incBytePtr(toff, nChunkSize+hSize)
} else {
// TODO: port this
2018-07-17 13:05:25 +03:00
wrote = int(writeN(r, header, nChunkSize+hSize))
if wrote == 0 {
return 0
}
}
nSize -= nChunkSize
2018-07-18 05:26:32 +03:00
buffer = incBytePtr(buffer, nChunkSize)
hSize = 0
if nSize > 0 {
header = decBytePtr(buffer, 1)
hSize = 1
if cSize != 0 {
2018-07-18 05:26:32 +03:00
header = decBytePtr(header, cSize)
hSize += cSize
}
if t >= 0xffffff {
2018-07-18 05:26:32 +03:00
header = decBytePtr(header, 4)
hSize += 4
}
*(*byte)(header) = byte(0xc0 | c)
if cSize != 0 {
tmp := int(packet.m_nChannel) - 64
2018-07-18 05:26:32 +03:00
*indxBytePtr(header, 1) = byte(tmp & 0xff)
if cSize == 2 {
2018-07-18 05:26:32 +03:00
*indxBytePtr(header, 2) = byte(tmp >> 8)
}
}
if t >= 0xffffff {
2018-07-18 05:26:32 +03:00
extendedTimestamp := incBytePtr(header, 1+cSize)
// TODO: port this
C.AMF_EncodeInt32((*C.char)(extendedTimestamp),
2018-07-18 05:26:32 +03:00
(*C.char)(incBytePtr(extendedTimestamp, 4)), C.int(t))
}
}
}
if tbuf != nil {
// TODO: port C.writeN
2018-07-17 13:05:25 +03:00
wrote := int(writeN(r, tbuf, int(uintptr(decBytePtr(toff,
int(uintptr(unsafe.Pointer(tbuf))))))))
C.free(tbuf)
tbuf = nil
if wrote == 0 {
return 0
}
}
// We invoked a remote method
// TODO: port the const
if packet.m_packetType == C.RTMP_PACKET_TYPE_INVOKE {
// TODO: port C.AVal
2018-07-15 10:48:24 +03:00
var method C.AVal
var ptr unsafe.Pointer
2018-07-18 05:26:32 +03:00
ptr = incBytePtr(unsafe.Pointer(packet.m_body), 1)
afmDecodeString((*byte)(ptr), &method)
if debugMode {
log.Printf("Invoking %v", method.av_val)
}
// keep it in call queue till result arrives
if queue != 0 {
var txn int
2018-07-18 05:26:32 +03:00
ptr = incBytePtr(ptr, 3+int(method.av_len))
// TODO: port this
txn = int(C.AMF_DecodeNumber((*C.char)(ptr)))
// TODO: port this
C.AV_queue(&r.m_methodCalls, &r.m_numCalls, &method, C.int(txn))
}
}
if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) == nil {
2018-07-18 05:26:32 +03:00
*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) =
(*C.RTMPPacket)(C.malloc(C.size_t(unsafe.Sizeof(*packet))))
}
2018-07-14 09:06:58 +03:00
//memmove(incPtr(unsafe.Pointer(r.m_vecChannelsOut),int(packet.m_nChannel),
2018-07-18 05:26:32 +03:00
//int(unsafe.Sizeof(packet))),unsafe.Pointer(packet), unsafe.Sizeof(packet))
C.memcpy(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))), unsafe.Pointer(packet),
C.size_t(uintptr(unsafe.Sizeof(*packet))))
return 1
}
// afmDecodeString decodes data into a string inside a AVal
func afmDecodeString(data *byte, bv *C.AVal) {
dataPtr := unsafe.Pointer(data)
bv.av_len = C.int(C.AMF_DecodeInt16((*C.char)(dataPtr)))
if bv.av_len > 0 {
bv.av_val = (*C.char)(incBytePtr(dataPtr, 2))
} else {
bv.av_val = nil
}
}
2018-07-15 10:48:24 +03:00
func writeN(r *C.RTMP, buffer unsafe.Pointer, n int) int {
ptr := buffer
for n > 0 {
var nBytes int
2018-07-17 13:05:25 +03:00
if (r.Link.protocol & C.RTMP_FEATURE_HTTP) != 0 {
2018-07-15 10:48:24 +03:00
// TODO: port HTTP_POST
2018-07-18 06:56:23 +03:00
nBytes = httpPost(r, RTMPT_SEND, (*byte)(ptr), n)
2018-07-15 10:48:24 +03:00
} else {
// TODO: port this if necessary
2018-07-17 13:05:25 +03:00
nBytes = int(C.RTMPSockBuf_Send(&r.m_sb, (*C.char)(ptr), C.int(n)))
2018-07-15 10:48:24 +03:00
}
if nBytes < 0 {
if debugMode {
2018-07-17 13:05:25 +03:00
log.Println("WriteN, RTMP send error")
2018-07-15 10:48:24 +03:00
}
// TODO: port this
C.RTMP_Close(r)
n = 1
break
}
if nBytes == 0 {
break
}
n -= nBytes
2018-07-17 13:05:25 +03:00
ptr = incBytePtr(ptr, nBytes)
2018-07-15 10:48:24 +03:00
}
2018-07-17 13:05:25 +03:00
if n == 0 {
return 1
}
return 0
2018-07-15 10:48:24 +03:00
}
2018-07-18 06:56:23 +03:00
const length = 512
var RTMPT_cmds = []string{
"open",
"send",
"idle",
"close",
}
func httpPost(r *C.RTMP, cmd C.RTMPTCmd, buf *byte, l int) int {
res := ""
if r.m_clientID.av_val != nil {
res = string(ptrToSlice(unsafe.Pointer(r.m_clientID.av_val),
int(r.m_clientID.av_len)))
}
fString := fmt.Sprintf("POST /%s%s/%d HTTP/1.1\r\n"+
"Host: %v:%d\r\n"+
"Accept: */*\r\n"+
"User-Agent: Shockwave Flash\r\n"+
"Connection: Keep-Alive\r\n"+
"Cache-Control: no-cache\r\n"+
"Content-type: application/x-fcs\r\n"+
"Content-length: %d\r\n\r\n", RTMPT_cmds[cmd], res, r.m_msgCounter,
r.Link.hostname.av_len, r.Link.hostname.av_val, r.Link.port, l)
hlen := len(fString)
2018-07-18 06:56:23 +03:00
hbuf := (*byte)(unsafe.Pointer(&(([]byte(fString))[0])))
// TODO: port this
2018-07-18 06:56:23 +03:00
C.RTMPSockBuf_Send(&r.m_sb, (*C.char)(unsafe.Pointer(hbuf)), C.int(hlen))
hlen = int(C.RTMPSockBuf_Send(&r.m_sb, (*C.char)(unsafe.Pointer(buf)), C.int(l)))
r.m_msgCounter++
r.m_unackd++
return hlen
}
2018-07-18 05:32:02 +03:00
// TODO: port RTMP_METHOD
2018-07-18 06:56:23 +03:00
func avQueue(vals **C.RTMP_METHOD, num *int, av *C.AVal, txn int) {
2018-07-18 05:11:44 +03:00
var rtmpMethodPtr *C.RTMP_METHOD
if (*num & 0x0f) == 0 {
// TODO: work out what to do with the realloc
2018-07-18 06:56:23 +03:00
*vals = (*C.RTMP_METHOD)(C.realloc(unsafe.Pointer(*vals), C.size_t((*num+16)*int(unsafe.Sizeof(*(*vals))))))
}
2018-07-18 05:11:44 +03:00
tmp := unsafe.Pointer(C.malloc(C.size_t(av.av_len + 1)))
C.memcpy(tmp, unsafe.Pointer(av.av_val), C.size_t(av.av_len))
2018-07-18 06:56:23 +03:00
*indxBytePtr(tmp, int(av.av_len)) = *(*byte)(unsafe.Pointer(C.CString("")))
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
2018-07-18 05:11:44 +03:00
int(unsafe.Sizeof(rtmpMethodPtr))))).num = C.int(txn)
2018-07-18 06:56:23 +03:00
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
2018-07-18 05:11:44 +03:00
int(unsafe.Sizeof(rtmpMethodPtr))))).name.av_len = av.av_len
2018-07-18 06:56:23 +03:00
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
2018-07-18 05:11:44 +03:00
int(unsafe.Sizeof(rtmpMethodPtr))))).name.av_val = (*C.char)(tmp)
}
2018-07-18 05:32:02 +03:00
// indxBytePtr returns a byte at the indx inc give a ptr
func indxBytePtr(ptr unsafe.Pointer, inc int) *byte {
return (*byte)(incPtr(ptr, inc, byteSize))
}
// indxInt32Ptr returns an int32 at the indx inc given a ptr
func indxInt32Ptr(ptr unsafe.Pointer, inc int) *int32 {
return (*int32)(incPtr(ptr, inc, int32Size))
}
// indxInt64Ptr returns an int64 at the indx inc given a ptr
func indxInt64Ptr(ptr unsafe.Pointer, inc int) *int64 {
return (*int64)(incPtr(ptr, inc, int64Size))
}
// incBytePtr returns an unsafe.Pointer to a byte that is inc positive positions
// from the passed ptr
func incBytePtr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return incPtr(ptr, inc, byteSize)
}
// incInt32Ptr returns an unsafe.Pointer to an int32 that is inc positive
// positions from the passed ptr
func incInt32Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return incPtr(ptr, inc, int32Size)
}
// incInt64Ptr returns an unsafe.Pointer to an int64 that is inc positive
// positions from the passed ptr
func incInt64Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return incPtr(ptr, inc, int64Size)
}
// incPtr attempts to replicate C like pointer arithmatic functionality
func incPtr(ptr unsafe.Pointer, inc, typeSize int) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) + uintptr(inc*typeSize))
}
// incPtr attempts to replicate C like pointer arithmatic functionality
func decPtr(ptr unsafe.Pointer, dec, typeSize int) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) - uintptr(dec*typeSize))
}
// decBytePtr returns an unsafe.Pointer to a byte that is dec negative positions
// from ptr
func decBytePtr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return decPtr(ptr, dec, byteSize)
}
// decBytePtr returns an unsafe.Pointer to a int32 that is dec negative positions
// from ptr
func decInt32Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return decPtr(ptr, dec, int32Size)
}
// decBytePtr returns an unsafe.Pointer to a int64 that is dec negative positions
// from ptr
func decInt64Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return decPtr(ptr, dec, int64Size)
}
// sliceToPtr get's the address of the first data element and returns as unsafe
// pointer
func sliceToPtr(data []byte) unsafe.Pointer {
return unsafe.Pointer(&data[0])
}
// ptrToSlice returns a slice given unsafe pointer and size - no allocation and
// copying is required - same data is used.
func ptrToSlice(data unsafe.Pointer, size int) []byte {
var ret []byte
shDest := (*reflect.SliceHeader)(unsafe.Pointer(&ret))
shDest.Data = uintptr(data)
shDest.Len = size
shDest.Cap = size
return ret
}
2018-07-18 18:24:36 +03:00
// C.AVal is in amf.h
// See AVC(str) {str, sizeof(str)-1} in amf.h
func AVC(str string) C.AVal {
var aval C.AVal
aval.av_val = C.CString(str)
aval.av_len = C.int(len(str))
return aval
}
2018-06-20 07:26:40 +03:00
var rtmpErrs = [...]string{
1: "rtmp: not connected",
2: "rtmp: write error",
3: "rtmp: not started",
}
type Err uint
func (e Err) Error() string {
if 0 <= int(e) && int(e) < len(rtmpErrs) {
s := rtmpErrs[e]
if s != "" {
return s
}
}
return "rtmp: " + strconv.Itoa(int(e))
}