av/rtmp/rtmp.go

1010 lines
26 KiB
Go
Raw Normal View History

/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Jake Lane <jake@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
/*
#cgo CFLAGS: -I/usr/local/include/librtmp
2018-07-19 10:13:37 +03:00
#cgo LDFLAGS: -L/usr/local/lib -lrtmp
#include <stdlib.h>
#include <string.h>
#include <rtmp.h>
2018-07-19 08:53:06 +03:00
#include <sys/socket.h>
2018-07-17 13:05:25 +03:00
typedef enum {
RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE
} RTMPTCmd;
2018-05-30 09:22:33 +03:00
RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout);
int write_frame(RTMP* rtmp, char* data, uint data_length);
int end_session(RTMP* rtmp);
2018-07-16 06:46:40 +03:00
void AV_queue(RTMP_METHOD **vals, int *num, AVal *av, int txn);
int WriteN(RTMP *r, const char *buffer, int n);
int EncodeInt32LE(char *output, int nVal);
2018-07-17 13:05:25 +03:00
int HTTP_Post(RTMP *r, RTMPTCmd cmd, const char *buf, int len);
*/
import "C"
import (
"errors"
"fmt"
"log"
"math"
2018-07-14 08:19:21 +03:00
"reflect"
"strconv"
"unsafe"
)
2018-07-17 13:05:25 +03:00
const (
RTMPT_OPEN = iota
RTMPT_SEND
RTMPT_IDLE
RTMPT_CLOSE
)
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_SHARED_OBJECT = 0x13
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
RTMP_MAX_HEADER_SIZE = 18
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_SIZE_MINIMUM = 3
RTMP_READ_HEADER = 0x01
RTMP_READ_RESUME = 0x02
RTMP_READ_NO_IGNORE = 0x04
RTMP_READ_GOTKF = 0x08
RTMP_READ_GOTFLVK = 0x10
RTMP_READ_SEEKING = 0x20
RTMP_READ_COMPLETE = -3
RTMP_READ_ERROR = -2
RTMP_READ_EOF = -1
RTMP_READ_IGNORE = 0
RTMP_LF_AUTH = 0x0001 /* using auth param */
RTMP_LF_LIVE = 0x0002 /* stream is live */
RTMP_LF_SWFV = 0x0004 /* do SWF verification */
RTMP_LF_PLST = 0x0008 /* send playlist before play */
RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */
RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */
RTMP_LF_FAPU = 0x0040 /* free app on close */
RTMP_FEATURE_HTTP = 0x01
RTMP_FEATURE_ENC = 0x02
RTMP_FEATURE_SSL = 0x04
RTMP_FEATURE_MFP = 0x08 /* not yet supported */
RTMP_FEATURE_WRITE = 0x10 /* publish, not play */
RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */
RTMP_PROTOCOL_UNDEFINED = -1
RTMP_PROTOCOL_RTMP = 0
RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC
RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP
RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL
RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC)
RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL)
RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP
RTMP_DEFAULT_CHUNKSIZE = 128
RTMP_BUFFER_CACHE_SIZE = (16 * 1024)
RTMP_CHANNELS = 65600
RTMP_SWF_HASHLEN = 32
2018-07-18 18:24:36 +03:00
)
const (
minDataSize = 11
debugMode = false
)
const (
byteSize = 1
int32Size = 4
int64Size = 8
)
// memmove copies n bytes from "from" to "to".
//go:linkname memmove runtime.memmove
//func memmove(to, from unsafe.Pointer, n uintptr)
// av_setDataFrame is a static const global in rtmp.c
var setDataFrame = AVC("@setDataFrame")
2018-07-18 05:26:32 +03:00
var packetSize = [...]int{12, 8, 4, 1}
// Session provides an interface for sending flv tags over rtmp.
type Session interface {
2018-06-17 14:15:48 +03:00
Open() error
Write([]byte) (int, error)
Close() error
}
// session provides parameters required for an rtmp communication session.
type session struct {
2018-07-11 18:20:18 +03:00
rtmp *C.RTMP
url string
timeout uint
}
2018-07-18 18:10:36 +03:00
type RTMP struct {
m_inChunkSize int
m_outChunkSize int
m_nBWCheckCounter int
m_nBytesIn int
m_nBytesInSent int
m_nBufferMS int
m_stream_id int
m_mediaChannel int
m_mediaStamp uint32
m_pauseStamp uint32
m_pausing int
m_nServerBw int
m_nClientBw int
m_nClientBw2 uint8
m_bPlaying uint8
m_bSendEncoding uint8
m_bSendCounter uint8
m_numInvokes int
m_numCalls int
2018-07-18 18:24:36 +03:00
m_methodCalls *RTMP_METHOD
2018-07-18 18:10:36 +03:00
m_channelsAllocatedIn int
m_channelsAllocatedOut int
2018-07-18 18:15:01 +03:00
m_vecChannelsIn **RTMPPacket
m_vecChannelsOut **RTMPPacket
2018-07-18 18:10:36 +03:00
m_channelTimestamp *int
m_fAudioCodecs float64
m_fVideoCodecs float64
m_fEncoding float64
m_fDuration float64
m_msgCounter int
m_polling int
m_resplen int
m_unackd int
2018-07-18 18:24:36 +03:00
m_clientID AVal
2018-07-18 18:33:59 +03:00
m_read RTMP_READ
m_write RTMPPacket
2018-07-18 18:54:47 +03:00
m_sb RTMPSockBuf
2018-07-18 18:57:56 +03:00
Link RTMP_LNK
2018-07-18 18:10:36 +03:00
}
2018-07-18 18:13:59 +03:00
type RTMPPacket struct {
m_headerType uint8
m_packetType uint8
m_hasAbsTimestamp uint8
m_nChannel int
m_nTimeStamp uint32
m_nInfoField2 int32
m_nBodySize uint32
m_nBytesRead uint32
2018-07-18 18:54:47 +03:00
m_chunk *RTMPChunk
2018-07-18 18:13:59 +03:00
m_body *byte
}
2018-07-18 18:15:01 +03:00
type RTMP_METHOD struct {
2018-07-18 18:24:36 +03:00
name AVal
2018-07-18 18:15:01 +03:00
num int
}
2018-07-18 18:16:52 +03:00
type AVal struct {
av_val *byte
av_len int
}
2018-07-18 18:24:36 +03:00
type RTMP_READ struct {
buf *byte
bufpos *byte
buflen uint
timestamp uint32
dataType uint8
flags uint8
status int8
initialFrameType uint8
nResumeTS uint32
metaHeader *byte
initialFrame *byte
nMetaHeaderSize uint32
nInitialFrameSize uint32
nIgnoredFrameCounter uint32
nIgnoredFlvFrameCounter uint32
}
2018-07-18 18:33:59 +03:00
type RTMPSockBuf struct {
sb_socket int
sb_size int
sb_start *byte
sb_buf [C.RTMP_BUFFER_CACHE_SIZE]byte // port const
sb_timedout int
sb_ssl uintptr
}
2018-07-18 18:41:55 +03:00
type RTMPChunk struct {
c_headerSize int
c_chunkSize int
c_chunk *byte
2018-07-19 09:08:10 +03:00
c_header [RTMP_MAX_HEADER_SIZE]byte
2018-07-18 18:41:55 +03:00
}
2018-07-18 22:18:31 +03:00
type ushort [2]byte
2018-07-18 18:54:47 +03:00
type RTMP_LNK struct {
hostname AVal
sockshost AVal
playpath0 AVal
playpath AVal
tcUrl AVal
swfUrl AVal
pageUrl AVal
app AVal
auth AVal
flashVer AVal
subscribepath AVal
usherToken AVal
token AVal
pubUser AVal
pubPasswd AVal
2018-07-18 18:57:56 +03:00
extras AMFObject
2018-07-18 18:54:47 +03:00
edepth int
seekTime int
stopTime int
lFlags int
swfAge int
protocol int
timeout int
pFlags int
2018-07-18 22:18:31 +03:00
socksport ushort
port ushort
2018-07-18 18:54:47 +03:00
}
2018-07-18 18:57:56 +03:00
type AMFObject struct {
o_num int
o_props *C.AMFObjectProperty
}
var _ Session = (*session)(nil)
// NewSession returns a new session.
func NewSession(url string, connectTimeout uint) Session {
return &session{
url: url,
timeout: connectTimeout,
}
}
2018-06-17 14:15:48 +03:00
// Open establishes an rtmp connection with the url passed into the
// constructor
2018-06-17 14:15:48 +03:00
func (s *session) Open() error {
2018-05-30 09:22:33 +03:00
if s.rtmp != nil {
return errors.New("rtmp: attempt to start already running session")
}
2018-06-20 07:26:40 +03:00
var err error
2018-07-19 09:35:14 +03:00
s.rtmp, err = startSession(s.rtmp, s.url, uint32(s.timeout))
2018-05-30 09:22:33 +03:00
if s.rtmp == nil {
2018-06-20 07:26:40 +03:00
return err
}
return nil
}
2018-07-19 09:35:14 +03:00
func startSession(rtmp *C.RTMP, u string, timeout uint32) (*C.RTMP, error) {
url := C.CString(u)
connect_timeout := C.int(timeout)
rtmp = C.RTMP_Alloc()
C.RTMP_Init(rtmp)
rtmp.Link.timeout = connect_timeout
if C.RTMP_SetupURL(rtmp, url) == 0 {
C.RTMP_Close(rtmp)
C.RTMP_Free(rtmp)
return nil, errors.New("rtmp startSession: Failed to setup URL!")
}
C.RTMP_EnableWrite(rtmp)
C.RTMP_SetBufferMS(rtmp, 3600*1000)
if C.RTMP_Connect(rtmp, nil) == 0 {
C.RTMP_Close(rtmp)
C.RTMP_Free(rtmp)
return nil, errors.New("rtmp startSession: Failed to connect!")
}
if C.RTMP_ConnectStream(rtmp, 0) == 0 {
C.RTMP_Close(rtmp)
C.RTMP_Free(rtmp)
return nil, errors.New("rtmp startSession: Failed to connect stream!")
}
return rtmp, nil
}
2018-07-15 21:09:36 +03:00
// Close terminates the rtmp connection
func (s *session) Close() error {
if s.rtmp == nil {
return Err(3)
}
2018-07-19 09:37:31 +03:00
ret := endSession(s.rtmp)
2018-07-15 21:09:36 +03:00
s.rtmp = nil
if ret != 0 {
return Err(ret)
}
return nil
}
2018-07-19 09:37:31 +03:00
func endSession(rtmp *C.RTMP) uint32 {
if rtmp == nil {
return 3
}
C.RTMP_Close(rtmp)
C.RTMP_Free(rtmp)
return 0
}
2018-07-11 20:20:00 +03:00
// Write writes a frame (flv tag) to the rtmp connection
func (s *session) Write(data []byte) (int, error) {
if s.rtmp == nil {
return 0, Err(3)
}
2018-07-13 14:00:52 +03:00
if C.RTMP_IsConnected(s.rtmp) == 0 {
2018-07-11 20:20:00 +03:00
return 0, Err(1)
}
2018-07-13 14:00:52 +03:00
//if C.RTMP_Write(s.rtmp,(*C.char)(unsafe.Pointer(&data[0])),C.int(len(data))) == 0 {
if rtmpWrite(s.rtmp, data) == 0 {
2018-07-11 20:20:00 +03:00
return 0, Err(2)
}
return len(data), nil
}
2018-07-05 09:55:46 +03:00
2018-07-14 08:51:01 +03:00
// rtmpWrite writes data to the current rtmp connection encapsulated by r
func rtmpWrite(r *C.RTMP, data []byte) int {
2018-07-14 08:19:21 +03:00
buf := sliceToPtr(data)
2018-07-14 08:22:54 +03:00
// TODO: port RTMPPacket
2018-07-12 20:03:06 +03:00
var pkt = &r.m_write
var pend, enc unsafe.Pointer
size := len(data)
2018-07-10 12:15:34 +03:00
s2 := size
var ret, num int
2018-07-05 09:55:46 +03:00
pkt.m_nChannel = 0x04
2018-07-16 18:38:31 +03:00
pkt.m_nInfoField2 = C.int32_t(r.m_stream_id)
for s2 != 0 {
2018-07-10 12:15:34 +03:00
if pkt.m_nBytesRead == 0 {
if size < minDataSize {
2018-07-05 09:55:46 +03:00
log.Printf("size: %d\n", size)
log.Printf("too small \n")
return 0
}
2018-07-14 08:31:06 +03:00
2018-07-18 05:26:32 +03:00
if *indxBytePtr(buf, 0) == 'F' && *indxBytePtr(buf, 1) == 'L' && *indxBytePtr(buf, 2) == 'V' {
buf = unsafe.Pointer(uintptr(buf) + uintptr(13))
2018-07-05 09:55:46 +03:00
s2 -= 13
}
2018-07-18 05:26:32 +03:00
pkt.m_packetType = C.uint8_t(*indxBytePtr(buf, 0))
buf = incBytePtr(buf, 1)
pkt.m_nBodySize = C.uint32_t(afmDecodeInt24((*byte)(buf)))
buf = incBytePtr(buf, 3)
pkt.m_nTimeStamp = C.uint32_t(afmDecodeInt24((*byte)(buf)))
buf = incBytePtr(buf, 3)
2018-07-18 05:26:32 +03:00
pkt.m_nTimeStamp |= C.uint32_t(*indxBytePtr(buf, 0)) << 24
buf = incBytePtr(buf, 4)
2018-07-05 09:55:46 +03:00
s2 -= 11
2018-07-12 21:07:57 +03:00
if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) &&
pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
2018-07-05 09:55:46 +03:00
2018-07-12 21:07:57 +03:00
pkt.m_headerType = RTMP_PACKET_SIZE_LARGE
2018-07-11 20:20:00 +03:00
2018-07-12 21:07:57 +03:00
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
2018-07-11 18:20:18 +03:00
pkt.m_nBodySize += 16
}
2018-07-05 09:55:46 +03:00
} else {
2018-07-12 21:07:57 +03:00
pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM
2018-07-05 09:55:46 +03:00
}
2018-07-14 08:22:54 +03:00
// TODO: Port this
2018-07-11 18:20:18 +03:00
if int(C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize)) == 0 {
log.Println("Failed to allocate packet")
2018-07-05 09:55:46 +03:00
return 0
}
2018-06-27 21:02:16 +03:00
enc = unsafe.Pointer(pkt.m_body)
pend = incBytePtr(enc, int(pkt.m_nBodySize))
2018-07-12 21:07:57 +03:00
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
2018-07-18 05:26:32 +03:00
enc = unsafe.Pointer(afmEncodeString((*byte)(enc), (*byte)(pend), &setDataFrame))
2018-07-16 18:38:31 +03:00
pkt.m_nBytesRead = C.uint32_t(math.Abs(float64(uintptr(enc) -
2018-07-12 21:07:57 +03:00
uintptr(unsafe.Pointer(pkt.m_body)))))
2018-07-05 09:55:46 +03:00
}
2018-07-12 22:51:17 +03:00
2018-07-05 09:55:46 +03:00
} else {
enc = incBytePtr(unsafe.Pointer(pkt.m_body), int(pkt.m_nBytesRead))
2018-07-05 09:55:46 +03:00
}
num = int(pkt.m_nBodySize - pkt.m_nBytesRead)
2018-07-05 09:55:46 +03:00
if num > s2 {
num = s2
}
2018-07-14 15:40:01 +03:00
//memmove(enc,buf,uintptr(num))
copy(ptrToSlice(enc, num), ptrToSlice(buf, num))
2018-07-16 18:38:31 +03:00
pkt.m_nBytesRead += C.uint32_t(num)
2018-07-05 09:55:46 +03:00
s2 -= num
buf = incBytePtr(buf, num)
2018-07-05 09:55:46 +03:00
if pkt.m_nBytesRead == pkt.m_nBodySize {
2018-07-14 08:22:54 +03:00
// TODO: Port this
ret = sendPacket(r, pkt, 0)
// TODO: Port this
C.RTMPPacket_Free(pkt)
2018-07-05 09:55:46 +03:00
pkt.m_nBytesRead = 0
2018-07-11 07:56:07 +03:00
if ret == 0 {
2018-07-05 09:55:46 +03:00
return -1
}
buf = incBytePtr(buf, 4)
2018-07-05 09:55:46 +03:00
s2 -= 4
if s2 < 0 {
break
}
}
}
2018-07-11 18:20:18 +03:00
return size + s2
2018-06-27 21:02:16 +03:00
}
2018-07-05 09:55:46 +03:00
// afmDecodeInt24 decodes data into an unsigned int
func afmDecodeInt24(data *byte) uint32 {
// TODO Understand logic and simplify
c := (*uint8)(unsafe.Pointer(data))
dst := uint32(int32(*c) << 16)
dst |= uint32(int32(*((*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(c)) +
(uintptr)(int32(1))*unsafe.Sizeof(*c))))) << 8)
dst |= uint32(int32(*((*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(c)) +
(uintptr)(int32(2))*unsafe.Sizeof(*c))))))
return dst
}
2018-07-18 05:32:02 +03:00
func afmEncodeString(output *byte, outend *byte, bv *C.AVal) *byte {
outputPtr := unsafe.Pointer(output)
outendPtr := unsafe.Pointer(outend)
if (bv.av_len < 65536 && uintptr(incBytePtr(outputPtr, 1+2+int(bv.av_len))) >
uintptr(outendPtr)) || uintptr(incBytePtr(outputPtr, 1+4+int(bv.av_len))) >
uintptr(outendPtr) {
2018-07-18 05:32:02 +03:00
return nil
}
if bv.av_len < 65536 {
*(*C.char)(outputPtr) = C.AMF_STRING
incBytePtr(outputPtr, 1)
// TODO Encode Int16
outputPtr = unsafe.Pointer(C.AMF_EncodeInt16((*C.char)(outputPtr),
(*C.char)(outendPtr), (C.short)(bv.av_len)))
2018-07-18 05:32:02 +03:00
} else {
*(*C.char)(outputPtr) = C.AMF_LONG_STRING
incBytePtr(outputPtr, 1)
// TODO Encode Int16
outputPtr = unsafe.Pointer(C.AMF_EncodeInt32((*C.char)(outputPtr),
(*C.char)(outendPtr), bv.av_len))
2018-07-18 05:32:02 +03:00
}
2018-07-18 06:56:23 +03:00
C.memcpy(unsafe.Pointer(outputPtr), unsafe.Pointer(bv.av_val), (C.size_t)(bv.av_len))
2018-07-18 05:32:02 +03:00
incBytePtr(outputPtr, int(bv.av_len))
return (*byte)(outputPtr)
}
// send packet version 1 - less C stuff
2018-07-16 06:46:40 +03:00
func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
2018-07-15 10:48:24 +03:00
var prevPacket *C.RTMPPacket
2018-07-11 20:20:00 +03:00
last := 0
2018-07-13 19:38:59 +03:00
var nSize, hSize, cSize, nChunkSize, tlen int
var header, hptr, hend, buffer, tbuf, toff unsafe.Pointer
2018-07-19 09:08:10 +03:00
var goHbuf [RTMP_MAX_HEADER_SIZE]byte
var hbuf = unsafe.Pointer(&goHbuf[0])
var c byte
2018-07-13 19:38:59 +03:00
var t int32
var packets unsafe.Pointer
2018-07-14 09:06:58 +03:00
if packet.m_nChannel >= r.m_channelsAllocatedOut {
log.Println("Resize")
2018-07-18 05:26:32 +03:00
n := int(packet.m_nChannel + 10)
packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut),
2018-07-18 05:26:32 +03:00
C.size_t(unsafe.Sizeof(packet)*uintptr(n)))
2018-07-14 09:06:58 +03:00
if uintptr(packets) == uintptr(0) {
C.free(unsafe.Pointer(r.m_vecChannelsOut))
2018-07-14 09:06:58 +03:00
r.m_vecChannelsOut = nil
r.m_channelsAllocatedOut = 0
return 0
}
r.m_vecChannelsOut = (**C.RTMPPacket)(packets)
C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut),
2018-07-18 05:26:32 +03:00
int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet)*
uintptr(n-int(r.m_channelsAllocatedOut))))
r.m_channelsAllocatedOut = C.int(n)
2018-07-14 09:06:58 +03:00
}
prevPacket = *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))
if prevPacket != nil && packet.m_headerType != RTMP_PACKET_SIZE_LARGE {
// compress a bit by using the prev packet's attributes
if prevPacket.m_nBodySize == packet.m_nBodySize &&
prevPacket.m_packetType == packet.m_packetType &&
packet.m_headerType == RTMP_PACKET_SIZE_MEDIUM {
packet.m_headerType = RTMP_PACKET_SIZE_SMALL
}
if prevPacket.m_nTimeStamp == packet.m_nTimeStamp &&
packet.m_headerType == RTMP_PACKET_SIZE_SMALL {
// TODO: port this constant
2018-07-19 09:08:10 +03:00
packet.m_headerType = RTMP_PACKET_SIZE_MINIMUM
}
last = int(prevPacket.m_nTimeStamp)
}
if packet.m_headerType > 3 {
log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
packet.m_headerType)
return 0
}
nSize = packetSize[int(packet.m_headerType)]
hSize = nSize
cSize = 0
t = int32(int(packet.m_nTimeStamp) - last)
if packet.m_body != nil {
header = decBytePtr(unsafe.Pointer(packet.m_body), nSize)
hend = unsafe.Pointer(packet.m_body)
} else {
2018-07-18 05:26:32 +03:00
header = incBytePtr(hbuf, 6)
// TODO: be cautious about this sizeof - make sure it works how you think it
// does. C code used sizeof(hbuf) where hbuf is a *char
2018-07-19 09:08:10 +03:00
hend = incBytePtr(hbuf, RTMP_MAX_HEADER_SIZE)
}
switch {
case packet.m_nChannel > 319:
cSize = 2
case packet.m_nChannel > 63:
cSize = 1
}
if cSize != 0 {
2018-07-18 05:26:32 +03:00
header = decBytePtr(header, cSize)
hSize += cSize
}
if t >= 0xffffff {
2018-07-18 05:26:32 +03:00
header = decBytePtr(header, 4)
hSize += 4
log.Printf("Larger timestamp than 24-bit: 0x%v", t)
}
hptr = header
c = byte(packet.m_headerType) << 6
switch cSize {
case 0:
c |= byte(packet.m_nChannel)
case 1:
case 2:
c |= byte(1)
}
*(*byte)(hptr) = c
2018-07-18 05:26:32 +03:00
hptr = incBytePtr(hptr, 1)
if cSize != 0 {
tmp := packet.m_nChannel - 64
2018-07-18 05:26:32 +03:00
*(*byte)(hptr) = byte(tmp & 0xff)
hptr = incBytePtr(hptr, 1)
if cSize == 2 {
*(*byte)(hptr) = byte(tmp >> 8)
2018-07-18 05:26:32 +03:00
hptr = incBytePtr(hptr, 1)
}
}
if nSize > 1 {
res := t
if t > 0xffffff {
res = 0xffffff
}
2018-07-18 05:26:32 +03:00
hptr = unsafe.Pointer(C.AMF_EncodeInt24((*C.char)(hptr), (*C.char)(hend), C.int(res)))
}
if nSize > 4 {
2018-07-18 05:26:32 +03:00
hptr = unsafe.Pointer(C.AMF_EncodeInt24((*C.char)(hptr), (*C.char)(hend),
C.int(packet.m_nBodySize)))
*(*byte)(hptr) = byte(packet.m_packetType)
hptr = incBytePtr(hptr, 1)
}
if nSize > 8 {
2018-07-16 18:38:31 +03:00
hptr = incBytePtr(hptr, int(C.EncodeInt32LE((*C.char)(hptr),
C.int(packet.m_nInfoField2))))
}
if t >= 0xffffff {
hptr = unsafe.Pointer(C.AMF_EncodeInt32((*C.char)(hptr), (*C.char)(hend), C.int(t)))
}
nSize = int(packet.m_nBodySize)
buffer = unsafe.Pointer(packet.m_body)
nChunkSize = int(r.m_outChunkSize)
if debugMode {
log.Printf("sendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize)
}
// send all chunks in one HTTP request
// TODO: port RTMP_FEATURE_HTTP
2018-07-19 09:08:10 +03:00
if int(r.Link.protocol&RTMP_FEATURE_HTTP) != 0 {
2018-07-18 05:26:32 +03:00
chunks := (nSize + nChunkSize - 1) / nChunkSize
if chunks > 1 {
2018-07-18 05:26:32 +03:00
tlen = chunks*(cSize+1) + nSize + hSize
// TODO: figure out how to do this in go
2018-07-16 18:38:31 +03:00
tbuf = C.malloc(C.size_t(tlen))
if tbuf == nil {
return 0
}
toff = tbuf
}
}
for (nSize + hSize) != 0 {
var wrote int
if nSize < nChunkSize {
nChunkSize = nSize
}
if tbuf != nil {
//memmove(toff, header, uintptr(nChunkSize + hSize))
copy(ptrToSlice(toff, int(nChunkSize+hSize)), ptrToSlice(header,
int(nChunkSize+hSize)))
2018-07-18 05:26:32 +03:00
toff = incBytePtr(toff, nChunkSize+hSize)
} else {
// TODO: port this
2018-07-17 13:05:25 +03:00
wrote = int(writeN(r, header, nChunkSize+hSize))
if wrote == 0 {
return 0
}
}
nSize -= nChunkSize
2018-07-18 05:26:32 +03:00
buffer = incBytePtr(buffer, nChunkSize)
hSize = 0
if nSize > 0 {
header = decBytePtr(buffer, 1)
hSize = 1
if cSize != 0 {
2018-07-18 05:26:32 +03:00
header = decBytePtr(header, cSize)
hSize += cSize
}
if t >= 0xffffff {
2018-07-18 05:26:32 +03:00
header = decBytePtr(header, 4)
hSize += 4
}
*(*byte)(header) = byte(0xc0 | c)
if cSize != 0 {
tmp := int(packet.m_nChannel) - 64
2018-07-18 05:26:32 +03:00
*indxBytePtr(header, 1) = byte(tmp & 0xff)
if cSize == 2 {
2018-07-18 05:26:32 +03:00
*indxBytePtr(header, 2) = byte(tmp >> 8)
}
}
if t >= 0xffffff {
2018-07-18 05:26:32 +03:00
extendedTimestamp := incBytePtr(header, 1+cSize)
// TODO: port this
C.AMF_EncodeInt32((*C.char)(extendedTimestamp),
2018-07-18 05:26:32 +03:00
(*C.char)(incBytePtr(extendedTimestamp, 4)), C.int(t))
}
}
}
if tbuf != nil {
// TODO: port C.writeN
2018-07-17 13:05:25 +03:00
wrote := int(writeN(r, tbuf, int(uintptr(decBytePtr(toff,
int(uintptr(unsafe.Pointer(tbuf))))))))
C.free(tbuf)
tbuf = nil
if wrote == 0 {
return 0
}
}
// We invoked a remote method
// TODO: port the const
2018-07-19 09:08:10 +03:00
if packet.m_packetType == RTMP_PACKET_TYPE_INVOKE {
// TODO: port C.AVal
2018-07-15 10:48:24 +03:00
var method C.AVal
var ptr unsafe.Pointer
2018-07-18 05:26:32 +03:00
ptr = incBytePtr(unsafe.Pointer(packet.m_body), 1)
afmDecodeString((*byte)(ptr), &method)
if debugMode {
log.Printf("Invoking %v", method.av_val)
}
// keep it in call queue till result arrives
if queue != 0 {
var txn int
2018-07-18 05:26:32 +03:00
ptr = incBytePtr(ptr, 3+int(method.av_len))
// TODO: port this
txn = int(C.AMF_DecodeNumber((*C.char)(ptr)))
// TODO: port this
C.AV_queue(&r.m_methodCalls, &r.m_numCalls, &method, C.int(txn))
}
}
if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) == nil {
2018-07-18 05:26:32 +03:00
*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) =
(*C.RTMPPacket)(C.malloc(C.size_t(unsafe.Sizeof(*packet))))
}
2018-07-14 09:06:58 +03:00
//memmove(incPtr(unsafe.Pointer(r.m_vecChannelsOut),int(packet.m_nChannel),
2018-07-18 05:26:32 +03:00
//int(unsafe.Sizeof(packet))),unsafe.Pointer(packet), unsafe.Sizeof(packet))
C.memcpy(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
int(packet.m_nChannel), int(unsafe.Sizeof(packet))))), unsafe.Pointer(packet),
C.size_t(uintptr(unsafe.Sizeof(*packet))))
return 1
}
// afmDecodeString decodes data into a string inside a AVal
func afmDecodeString(data *byte, bv *C.AVal) {
dataPtr := unsafe.Pointer(data)
2018-07-19 09:52:35 +03:00
bv.av_len = C.int(afmDecodeInt16((*byte)(dataPtr)))
if bv.av_len > 0 {
bv.av_val = (*C.char)(incBytePtr(dataPtr, 2))
} else {
bv.av_val = nil
}
}
2018-07-19 09:52:35 +03:00
// afmDecodeInt16 decodes data into a 16 bit number
func afmDecodeInt16(data *byte) uint16 {
c := unsafe.Pointer(data)
return (uint16(*(*uint8)(c)) << 8) | *(*uint16)(incBytePtr(c, 1))
}
2018-07-15 10:48:24 +03:00
func writeN(r *C.RTMP, buffer unsafe.Pointer, n int) int {
ptr := buffer
for n > 0 {
var nBytes int
2018-07-19 09:08:10 +03:00
if (r.Link.protocol & RTMP_FEATURE_HTTP) != 0 {
2018-07-15 10:48:24 +03:00
// TODO: port HTTP_POST
2018-07-18 06:56:23 +03:00
nBytes = httpPost(r, RTMPT_SEND, (*byte)(ptr), n)
2018-07-15 10:48:24 +03:00
} else {
// TODO: port this if necessary
nBytes = int(sockBufSend(&r.m_sb, (*byte)(ptr), int32(n)))
2018-07-15 10:48:24 +03:00
}
if nBytes < 0 {
if debugMode {
2018-07-17 13:05:25 +03:00
log.Println("WriteN, RTMP send error")
2018-07-15 10:48:24 +03:00
}
// TODO: port this
C.RTMP_Close(r)
n = 1
break
}
if nBytes == 0 {
break
}
n -= nBytes
2018-07-17 13:05:25 +03:00
ptr = incBytePtr(ptr, nBytes)
2018-07-15 10:48:24 +03:00
}
2018-07-17 13:05:25 +03:00
if n == 0 {
return 1
}
return 0
2018-07-15 10:48:24 +03:00
}
2018-07-18 06:56:23 +03:00
const length = 512
var RTMPT_cmds = []string{
"open",
"send",
"idle",
"close",
}
func httpPost(r *C.RTMP, cmd C.RTMPTCmd, buf *byte, l int) int {
res := ""
if r.m_clientID.av_val != nil {
res = string(ptrToSlice(unsafe.Pointer(r.m_clientID.av_val),
int(r.m_clientID.av_len)))
}
fString := fmt.Sprintf("POST /%s%s/%d HTTP/1.1\r\n"+
"Host: %v:%d\r\n"+
"Accept: */*\r\n"+
"User-Agent: Shockwave Flash\r\n"+
"Connection: Keep-Alive\r\n"+
"Cache-Control: no-cache\r\n"+
"Content-type: application/x-fcs\r\n"+
"Content-length: %d\r\n\r\n", RTMPT_cmds[cmd], res, r.m_msgCounter,
r.Link.hostname.av_len, r.Link.hostname.av_val, r.Link.port, l)
hlen := len(fString)
2018-07-18 06:56:23 +03:00
hbuf := (*byte)(unsafe.Pointer(&(([]byte(fString))[0])))
// TODO: port this
sockBufSend(&r.m_sb, (*byte)(unsafe.Pointer(hbuf)), int32(hlen))
2018-07-19 08:55:10 +03:00
hlen = int(sockBufSend(&r.m_sb, (*byte)(unsafe.Pointer(buf)), int32(l)))
r.m_msgCounter++
r.m_unackd++
return hlen
}
2018-07-18 05:32:02 +03:00
2018-07-19 08:53:06 +03:00
func sockBufSend(sb *C.RTMPSockBuf, buf *byte, l int32) int32 {
return int32(C.send(sb.sb_socket, unsafe.Pointer(buf), C.size_t(l), 0))
}
// TODO: port RTMP_METHOD
2018-07-18 06:56:23 +03:00
func avQueue(vals **C.RTMP_METHOD, num *int, av *C.AVal, txn int) {
2018-07-18 05:11:44 +03:00
var rtmpMethodPtr *C.RTMP_METHOD
if (*num & 0x0f) == 0 {
// TODO: work out what to do with the realloc
2018-07-18 06:56:23 +03:00
*vals = (*C.RTMP_METHOD)(C.realloc(unsafe.Pointer(*vals), C.size_t((*num+16)*int(unsafe.Sizeof(*(*vals))))))
}
2018-07-18 05:11:44 +03:00
tmp := unsafe.Pointer(C.malloc(C.size_t(av.av_len + 1)))
C.memcpy(tmp, unsafe.Pointer(av.av_val), C.size_t(av.av_len))
2018-07-18 06:56:23 +03:00
*indxBytePtr(tmp, int(av.av_len)) = *(*byte)(unsafe.Pointer(C.CString("")))
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
2018-07-18 05:11:44 +03:00
int(unsafe.Sizeof(rtmpMethodPtr))))).num = C.int(txn)
2018-07-18 06:56:23 +03:00
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
2018-07-18 05:11:44 +03:00
int(unsafe.Sizeof(rtmpMethodPtr))))).name.av_len = av.av_len
2018-07-18 06:56:23 +03:00
(*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num,
2018-07-18 05:11:44 +03:00
int(unsafe.Sizeof(rtmpMethodPtr))))).name.av_val = (*C.char)(tmp)
}
2018-07-18 05:32:02 +03:00
// indxBytePtr returns a byte at the indx inc give a ptr
func indxBytePtr(ptr unsafe.Pointer, inc int) *byte {
return (*byte)(incPtr(ptr, inc, byteSize))
}
// indxInt32Ptr returns an int32 at the indx inc given a ptr
func indxInt32Ptr(ptr unsafe.Pointer, inc int) *int32 {
return (*int32)(incPtr(ptr, inc, int32Size))
}
// indxInt64Ptr returns an int64 at the indx inc given a ptr
func indxInt64Ptr(ptr unsafe.Pointer, inc int) *int64 {
return (*int64)(incPtr(ptr, inc, int64Size))
}
// incBytePtr returns an unsafe.Pointer to a byte that is inc positive positions
// from the passed ptr
func incBytePtr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return incPtr(ptr, inc, byteSize)
}
// incInt32Ptr returns an unsafe.Pointer to an int32 that is inc positive
// positions from the passed ptr
func incInt32Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return incPtr(ptr, inc, int32Size)
}
// incInt64Ptr returns an unsafe.Pointer to an int64 that is inc positive
// positions from the passed ptr
func incInt64Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return incPtr(ptr, inc, int64Size)
}
// incPtr attempts to replicate C like pointer arithmatic functionality
func incPtr(ptr unsafe.Pointer, inc, typeSize int) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) + uintptr(inc*typeSize))
}
// incPtr attempts to replicate C like pointer arithmatic functionality
func decPtr(ptr unsafe.Pointer, dec, typeSize int) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) - uintptr(dec*typeSize))
}
// decBytePtr returns an unsafe.Pointer to a byte that is dec negative positions
// from ptr
func decBytePtr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return decPtr(ptr, dec, byteSize)
}
// decBytePtr returns an unsafe.Pointer to a int32 that is dec negative positions
// from ptr
func decInt32Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return decPtr(ptr, dec, int32Size)
}
// decBytePtr returns an unsafe.Pointer to a int64 that is dec negative positions
// from ptr
func decInt64Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer {
2018-07-18 05:26:32 +03:00
return decPtr(ptr, dec, int64Size)
}
// sliceToPtr get's the address of the first data element and returns as unsafe
// pointer
func sliceToPtr(data []byte) unsafe.Pointer {
return unsafe.Pointer(&data[0])
}
// ptrToSlice returns a slice given unsafe pointer and size - no allocation and
// copying is required - same data is used.
func ptrToSlice(data unsafe.Pointer, size int) []byte {
var ret []byte
shDest := (*reflect.SliceHeader)(unsafe.Pointer(&ret))
shDest.Data = uintptr(data)
shDest.Len = size
shDest.Cap = size
return ret
}
2018-07-18 18:24:36 +03:00
// C.AVal is in amf.h
// See AVC(str) {str, sizeof(str)-1} in amf.h
func AVC(str string) C.AVal {
var aval C.AVal
aval.av_val = C.CString(str)
aval.av_len = C.int(len(str))
return aval
}
2018-06-20 07:26:40 +03:00
var rtmpErrs = [...]string{
1: "rtmp: not connected",
2: "rtmp: write error",
3: "rtmp: not started",
}
type Err uint
func (e Err) Error() string {
if 0 <= int(e) && int(e) < len(rtmpErrs) {
s := rtmpErrs[e]
if s != "" {
return s
}
}
return "rtmp: " + strconv.Itoa(int(e))
}