updating this branch with master

Merge branch 'master' into psi-pr-cleanup
This commit is contained in:
saxon 2019-01-11 13:15:23 +10:30
commit 4e62606622
11 changed files with 1080 additions and 1459 deletions

View File

@ -289,10 +289,7 @@ func (r *Revid) reset(config Config) error {
r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate))
case Flv:
r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation")
r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate))
if err != nil {
return err
}
r.encoder = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate))
}
return nil

View File

@ -9,7 +9,7 @@ AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
amf_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
amf_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the

494
rtmp/packet.go Normal file
View File

@ -0,0 +1,494 @@
/*
NAME
packet.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
packet.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import (
"encoding/binary"
"log"
)
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
)
const (
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_SIZE_MINIMUM = 3
)
const (
RTMP_CHANNEL_BYTES_READ = 0x02
RTMP_CHANNEL_CONTROL = 0x03
RTMP_CHANNEL_SOURCE = 0x04
)
// packetSize defines valid packet sizes.
var packetSize = [...]int{12, 8, 4, 1}
// packet defines an RTMP packet.
type packet struct {
headerType uint8
packetType uint8
channel int32
hasAbsTimestamp bool
timestamp uint32
info int32
bodySize uint32
bytesRead uint32
chunk *chunk
header []byte
body []byte
}
// chunk defines an RTMP packet chunk.
type chunk struct {
headerSize int32
data []byte
header [RTMP_MAX_HEADER_SIZE]byte
}
// ToDo: Consider making the following functions into methods.
// readPacket reads a packet.
func readPacket(s *Session, pkt *packet) error {
var hbuf [RTMP_MAX_HEADER_SIZE]byte
header := hbuf[:]
err := readN(s, header[:1])
if err != nil {
log.Println("readPacket: failed to read RTMP packet header!")
return err
}
pkt.headerType = (header[0] & 0xc0) >> 6
pkt.channel = int32(header[0] & 0x3f)
header = header[1:]
switch {
case pkt.channel == 0:
err = readN(s, header[:1])
if err != nil {
log.Println("readPacket: failed to read rtmp packet header 2nd byte.")
return err
}
header = header[1:]
pkt.channel = int32(header[0]) + 64
case pkt.channel == 1:
err = readN(s, header[:2])
if err != nil {
log.Println("readPacket: failed to read RTMP packet 3rd byte")
return err
}
header = header[2:]
pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64
}
if pkt.channel >= s.channelsAllocatedIn {
n := pkt.channel + 10
timestamp := append(s.channelTimestamp, make([]int32, 10)...)
var pkts []*packet
if s.vecChannelsIn == nil {
pkts = make([]*packet, n)
} else {
pkts = append(s.vecChannelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...)
}
s.channelTimestamp = timestamp
s.vecChannelsIn = pkts
for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ {
s.channelTimestamp[i] = 0
}
for i := int(s.channelsAllocatedIn); i < int(n); i++ {
s.vecChannelsIn[i] = nil
}
s.channelsAllocatedIn = n
}
size := packetSize[pkt.headerType]
switch {
case size == RTMP_LARGE_HEADER_SIZE:
pkt.hasAbsTimestamp = true
case size < RTMP_LARGE_HEADER_SIZE:
if s.vecChannelsIn[pkt.channel] != nil {
*pkt = *(s.vecChannelsIn[pkt.channel])
}
}
size--
if size > 0 {
err = readN(s, header[:size])
if err != nil {
log.Println("readPacket: failed to read rtmp packet heades.")
return err
}
}
hSize := len(hbuf) - len(header) + size
if size >= 3 {
pkt.timestamp = C_AMF_DecodeInt24(header[:3])
if size >= 6 {
pkt.bodySize = C_AMF_DecodeInt24(header[3:6])
pkt.bytesRead = 0
if size > 6 {
pkt.packetType = header[6]
if size == 11 {
pkt.info = decodeInt32LE(header[7:11])
}
}
}
}
extendedTimestamp := pkt.timestamp == 0xffffff
if extendedTimestamp {
err = readN(s, header[size:size+4])
if err != nil {
log.Println("readPacket: Failed to read extended timestamp")
return err
}
// TODO: port this
pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4])
hSize += 4
}
if pkt.bodySize > 0 && pkt.body == nil {
resizePacket(pkt, pkt.bodySize, (hbuf[0]&0xc0)>>6)
}
toRead := int32(pkt.bodySize - pkt.bytesRead)
chunkSize := s.inChunkSize
if toRead < chunkSize {
chunkSize = toRead
}
if pkt.chunk != nil {
pkt.chunk.headerSize = int32(hSize)
copy(pkt.chunk.header[:], hbuf[:hSize])
pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)]
}
err = readN(s, pkt.body[pkt.bytesRead:][:chunkSize])
if err != nil {
log.Println("readPacket: failed to read RTMP packet body")
return err
}
pkt.bytesRead += uint32(chunkSize)
// keep the packet as ref for other packets on this channel
if s.vecChannelsIn[pkt.channel] == nil {
s.vecChannelsIn[pkt.channel] = &packet{}
}
*(s.vecChannelsIn[pkt.channel]) = *pkt
if extendedTimestamp {
s.vecChannelsIn[pkt.channel].timestamp = 0xffffff
}
if pkt.bytesRead != pkt.bodySize {
panic("readPacket: bytesRead != bodySize")
}
if !pkt.hasAbsTimestamp {
// timestamps seem to always be relative
pkt.timestamp += uint32(s.channelTimestamp[pkt.channel])
}
s.channelTimestamp[pkt.channel] = int32(pkt.timestamp)
s.vecChannelsIn[pkt.channel].body = nil
s.vecChannelsIn[pkt.channel].bytesRead = 0
s.vecChannelsIn[pkt.channel].hasAbsTimestamp = false
return nil
}
// resizePacket adjust the packet's storage to accommodate a body of the given size.
func resizePacket(pkt *packet, size uint32, ht uint8) {
buf := make([]byte, RTMP_MAX_HEADER_SIZE+size)
pkt.headerType = ht
pkt.header = buf
pkt.body = buf[RTMP_MAX_HEADER_SIZE:]
}
// sendPacket sends a packet.
func sendPacket(s *Session, pkt *packet, queue bool) error {
var prevPkt *packet
var last int
if pkt.channel >= s.channelsAllocatedOut {
n := int(pkt.channel + 10)
var pkts []*packet
if s.vecChannelsOut == nil {
pkts = make([]*packet, n)
} else {
pkts = append(s.vecChannelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...)
}
s.vecChannelsOut = pkts
for i := int(s.channelsAllocatedOut); i < n; i++ {
s.vecChannelsOut[i] = nil
}
s.channelsAllocatedOut = int32(n)
}
prevPkt = s.vecChannelsOut[pkt.channel]
if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE {
// compress a bit by using the prev packet's attributes
if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM {
pkt.headerType = RTMP_PACKET_SIZE_SMALL
}
if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL {
pkt.headerType = RTMP_PACKET_SIZE_MINIMUM
}
last = int(prevPkt.timestamp)
}
if pkt.headerType > 3 {
log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
pkt.headerType)
return errInvalidHeader
}
var headBytes []byte
var origIdx int
if pkt.body != nil {
// Span from -packetsize for the type to the start of the body.
headBytes = pkt.header
origIdx = RTMP_MAX_HEADER_SIZE - packetSize[pkt.headerType]
} else {
// Allocate a new header and allow 6 bytes of movement backward.
var hbuf [RTMP_MAX_HEADER_SIZE]byte
headBytes = hbuf[:]
origIdx = 6
}
var cSize int
switch {
case pkt.channel > 319:
cSize = 2
case pkt.channel > 63:
cSize = 1
}
hSize := packetSize[pkt.headerType]
if cSize != 0 {
origIdx -= cSize
hSize += cSize
}
var ts uint32
if prevPkt != nil {
ts = uint32(int(pkt.timestamp) - last)
}
if ts >= 0xffffff {
origIdx -= 4
hSize += 4
log.Printf("Larger timestamp than 24-bit: 0x%v", ts)
}
headerIdx := origIdx
c := pkt.headerType << 6
switch cSize {
case 0:
c |= byte(pkt.channel)
case 1:
// Do nothing.
case 2:
c |= 1
}
headBytes[headerIdx] = c
headerIdx++
if cSize != 0 {
tmp := pkt.channel - 64
headBytes[headerIdx] = byte(tmp & 0xff)
headerIdx++
if cSize == 2 {
headBytes[headerIdx] = byte(tmp >> 8)
headerIdx++
}
}
if packetSize[pkt.headerType] > 1 {
res := ts
if ts > 0xffffff {
res = 0xffffff
}
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res))
headerIdx += 3 // 24bits
}
if packetSize[pkt.headerType] > 4 {
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize))
headerIdx += 3 // 24bits
headBytes[headerIdx] = pkt.packetType
headerIdx++
}
if packetSize[pkt.headerType] > 8 {
n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info))
headerIdx += n
}
if ts >= 0xffffff {
C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts))
headerIdx += 4 // 32bits
}
size := int(pkt.bodySize)
chunkSize := int(s.outChunkSize)
if debugMode {
log.Printf("sendPacket: %v->%v, size=%v", s.link.conn.LocalAddr(), s.link.conn.RemoteAddr(), size)
}
// Send the previously deferred packet if combining it with the next packet would exceed the chunk size.
if s.defered != nil && len(s.defered)+size+hSize > chunkSize {
err := writeN(s, s.defered)
if err != nil {
return err
}
s.defered = nil
}
// TODO(kortschak): Rewrite this horrific peice of premature optimisation.
// NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size.
for size+hSize != 0 {
if s.defered == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize {
s.defered = headBytes[origIdx:][:size+hSize]
break
}
if chunkSize > size {
chunkSize = size
}
bytes := headBytes[origIdx:][:chunkSize+hSize]
if s.defered != nil {
// Prepend the previously deferred packet and write it with the current one.
bytes = append(s.defered, bytes...)
}
err := writeN(s, bytes)
if err != nil {
return err
}
s.defered = nil
size -= chunkSize
origIdx += chunkSize + hSize
hSize = 0
if size > 0 {
origIdx -= 1 + cSize
hSize = 1 + cSize
if ts >= 0xffffff {
origIdx -= 4
hSize += 4
}
headBytes[origIdx] = 0xc0 | c
if cSize != 0 {
tmp := int(pkt.channel) - 64
headBytes[origIdx+1] = byte(tmp)
if cSize == 2 {
headBytes[origIdx+2] = byte(tmp >> 8)
}
}
if ts >= 0xffffff {
extendedTimestamp := headBytes[origIdx+1+cSize:]
C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts))
}
}
}
// We invoked a remote method
if pkt.packetType == RTMP_PACKET_TYPE_INVOKE {
buf := pkt.body[1:]
meth := C_AMF_DecodeString(buf)
if debugMode {
log.Printf("invoking %v", meth)
}
// keep it in call queue till result arrives
if queue {
buf = buf[3+len(meth):]
txn := int32(C_AMF_DecodeNumber(buf[:8]))
s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})
}
}
if s.vecChannelsOut[pkt.channel] == nil {
s.vecChannelsOut[pkt.channel] = &packet{}
}
*(s.vecChannelsOut[pkt.channel]) = *pkt
return nil
}
func decodeInt32LE(data []byte) int32 {
return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0])
}
func encodeInt32LE(dst []byte, v int32) int32 {
binary.LittleEndian.PutUint32(dst, uint32(v))
return 4
}

View File

@ -40,13 +40,12 @@ import (
"strings"
)
// int RTMP_ParseURL(const char *url, int *protocol, AVal *host, unsigned int *port, AVal *playpath, AVal *app);
// parseurl.c +33
func C_RTMP_ParseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, ok bool) {
// parseURL parses an RTMP URL (ok, technically it is lexing).
func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
u, err := url.Parse(addr)
if err != nil {
log.Printf("failed to parse addr: %v", err)
return protocol, host, port, app, playpath, false
return protocol, host, port, app, playpath, err
}
switch u.Scheme {
@ -66,20 +65,20 @@ func C_RTMP_ParseURL(addr string) (protocol int32, host string, port uint16, app
protocol = RTMP_PROTOCOL_RTMPTS
default:
log.Printf("unknown scheme: %q", u.Scheme)
return protocol, host, port, app, playpath, false
return protocol, host, port, app, playpath, errUnknownScheme
}
host = u.Host
if p := u.Port(); p != "" {
pi, err := strconv.Atoi(p)
if err != nil {
return protocol, host, port, app, playpath, false
return protocol, host, port, app, playpath, err
}
port = uint16(pi)
}
if !path.IsAbs(u.Path) {
return protocol, host, port, app, playpath, true
return protocol, host, port, app, playpath, nil
}
elems := strings.SplitN(u.Path[1:], "/", 3)
app = elems[0]
@ -99,5 +98,5 @@ func C_RTMP_ParseURL(addr string) (protocol int32, host string, port uint16, app
}
}
return protocol, host, port, app, playpath, true
return protocol, host, port, app, playpath, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -7,9 +7,11 @@ DESCRIPTION
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
rtmp_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
rtmp_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
@ -31,7 +33,9 @@ LICENSE
*/
package rtmp
import "net"
import (
"net"
)
const (
RTMPT_OPEN = iota
@ -40,28 +44,6 @@ const (
RTMPT_CLOSE
)
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
)
const (
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_SIZE_MINIMUM = 3
)
const (
RTMP_READ_HEADER = 0x01
RTMP_READ_RESUME = 0x02
@ -112,111 +94,26 @@ const (
RTMP_MAX_HEADER_SIZE = 18
)
// typedef struct RTMPChunk
// rtmp.h +105
type C_RTMPChunk struct {
c_headerSize int32
c_chunk []byte
c_header [RTMP_MAX_HEADER_SIZE]byte
type link struct {
host string
playpath string
tcUrl string
swfUrl string
pageUrl string
app string
auth string
flashVer string
token string
extras C_AMFObject
lFlags int32
swfAge int32
protocol int32
timeout uint
port uint16
conn *net.TCPConn
}
// typedef struct RTMPPacket
// rtmp.h +113
type C_RTMPPacket struct {
m_headerType uint8
m_packetType uint8
m_hasAbsTimestamp bool
m_nChannel int32
m_nTimeStamp uint32
m_nInfoField2 int32
m_nBodySize uint32
m_nBytesRead uint32
m_chunk *C_RTMPChunk
m_header []byte
m_body []byte
}
// typedef struct RTMPSockBuf
// rtmp.h +127
type C_RTMPSockBuf struct {
conn *net.TCPConn
timeout int32
sb_size int
sb_start int
sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const
sb_timedout bool
}
// RTMPPacket_IsReady(a)
// rtmp.h +142
func C_RTMPPacket_IsReady(p *C_RTMPPacket) bool {
return p.m_nBytesRead == p.m_nBodySize
}
// typedef struct RTMP_LNK
// rtmp.h +144
type C_RTMP_LNK struct {
hostname string
sockshost string
playpath0 string
playpath string
tcUrl string
swfUrl string
pageUrl string
app string
auth string
flashVer string
token string
extras C_AMFObject
seekTime int32
lFlags int32
swfAge int32
protocol int32
timeout int32
socksport uint16
port uint16
}
// typedef struct RTMPMethod
// rtmp.h +231
type C_RTMP_METHOD struct {
type method struct {
name string
num int32
}
// typedef struct RTMP
// rtmp.h +237
type C_RTMP struct {
m_inChunkSize int32
m_outChunkSize int32
m_nBWCheckCounter int32
m_nBytesIn int32
m_nBytesInSent int32
m_nBufferMS int32
m_stream_id int32
m_mediaChannel int32
m_pausing int32
m_nServerBW int32
m_nClientBW int32
m_nClientBW2 uint8
m_bPlaying bool
m_bSendEncoding bool
m_bSendCounter bool
m_numInvokes int32
m_methodCalls []C_RTMP_METHOD
m_channelsAllocatedIn int32
m_channelsAllocatedOut int32
m_vecChannelsIn []*C_RTMPPacket
m_vecChannelsOut []*C_RTMPPacket
m_channelTimestamp []int32
m_fAudioCodecs float64
m_fVideoCodecs float64
m_fEncoding float64
m_fDuration float64
m_msgCounter int32
m_resplen int32
m_unackd int32
m_write C_RTMPPacket
m_sb C_RTMPSockBuf
Link C_RTMP_LNK
}

View File

@ -1,7 +0,0 @@
package rtmp
// #define SET_RCVTIMEO(tv,s) int tv = s*1000
// rtmp_sys.h +43
func SET_RCVTIMEO(tv *int32, s int32) {
*tv = s * 1000
}

View File

@ -8,9 +8,10 @@ DESCRIPTION
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
session.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
@ -36,14 +37,36 @@ import (
"errors"
)
// session provides parameters required for an rtmp communication session.
// Session holds the state for an RTMP session.
type Session struct {
rtmp *C_RTMP
url string
timeout uint
url string
timeout uint
inChunkSize int32
outChunkSize int32
bwCheckCounter int32
nBytesIn int32
nBytesInSent int32
streamID int32
serverBW int32
clientBW int32
clientBW2 uint8
isPlaying bool
sendEncoding bool
numInvokes int32
methodCalls []method
channelsAllocatedIn int32
channelsAllocatedOut int32
vecChannelsIn []*packet
vecChannelsOut []*packet
channelTimestamp []int32
audioCodecs float64
videoCodecs float64
encoding float64
defered []byte
link link
}
// NewSession returns a new session.
// NewSession returns a new Session.
func NewSession(url string, connectTimeout uint) *Session {
return &Session{
url: url,
@ -51,47 +74,98 @@ func NewSession(url string, connectTimeout uint) *Session {
}
}
// Open establishes an rtmp connection with the url passed into the
// constructor
// Open establishes an rtmp connection with the url passed into the constructor.
func (s *Session) Open() error {
if s.rtmp != nil {
if s.isConnected() {
return errors.New("rtmp: attempt to start already running session")
}
var err error
s.rtmp, err = startSession(s.rtmp, s.url, uint32(s.timeout))
if s.rtmp == nil {
err := s.start()
if err != nil {
return err
}
return nil
}
// Close terminates the rtmp connection
func (s *Session) Close() error {
if s.rtmp == nil {
return Err(3)
// start does the heavylifting for Open().
func (s *Session) start() error {
s.init()
err := setupURL(s, s.url)
if err != nil {
s.close()
return err
}
ret := endSession(s.rtmp)
s.rtmp = nil
if ret != 0 {
return Err(ret)
s.enableWrite()
err = connect(s)
if err != nil {
s.close()
return err
}
err = connectStream(s)
if err != nil {
s.close()
return err
}
return nil
}
// Write writes a frame (flv tag) to the rtmp connection
// init initializes various RTMP defauls.
// ToDo: define consts for the magic numbers.
func (s *Session) init() {
s.inChunkSize = RTMP_DEFAULT_CHUNKSIZE
s.outChunkSize = RTMP_DEFAULT_CHUNKSIZE
s.clientBW = 2500000
s.clientBW2 = 2
s.serverBW = 2500000
s.audioCodecs = 3191.0
s.videoCodecs = 252.0
s.link.timeout = s.timeout
s.link.swfAge = 30
}
// Close terminates the rtmp connection,
func (s *Session) Close() error {
if !s.isConnected() {
return errNotConnected
}
s.close()
return nil
}
// close does the heavylifting for Close().
// Any errors are ignored as it is often called in response to an earlier error.
func (s *Session) close() {
if s.isConnected() {
if s.streamID > 0 {
if s.link.protocol&RTMP_FEATURE_WRITE != 0 {
sendFCUnpublish(s)
}
sendDeleteStream(s, float64(s.streamID))
}
s.link.conn.Close()
}
*s = Session{}
}
// Write writes a frame (flv tag) to the rtmp connection.
func (s *Session) Write(data []byte) (int, error) {
if s.rtmp == nil {
return 0, Err(3)
if !s.isConnected() {
return 0, errNotConnected
}
if C_RTMP_IsConnected(s.rtmp) == 0 {
//if C.RTMP_IsConnected(s.rtmp) == 0 {
return 0, Err(1)
}
if C_RTMP_Write(s.rtmp, data) == 0 {
//if C.RTMP_Write(s.rtmp, (*byte)(unsafe.Pointer(&data[0])), int32(len(data))) == 0 {
return 0, Err(2)
err := s.write(data)
if err != nil {
return 0, err
}
return len(data), nil
}
// isConnected returns true if the RTMP connection is up.
func (s *Session) isConnected() bool {
return s.link.conn != nil
}
// enableWrite enables the current session for writing.
func (s *Session) enableWrite() {
s.link.protocol |= RTMP_FEATURE_WRITE
}

View File

@ -1,146 +0,0 @@
/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import (
"fmt"
"log"
"net"
"time"
)
// int RTMP_Connect(RTMP *r, RTMPPacket* cp);
// rtmp.c +1032
func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
if r.Link.hostname == "" {
return false
}
var hostname string
if r.Link.socksport != 0 {
hostname = fmt.Sprintf("%s:%d", r.Link.sockshost, r.Link.socksport)
} else {
hostname = fmt.Sprintf("%s:%d", r.Link.hostname, r.Link.port)
}
addr, err := net.ResolveTCPAddr("tcp4", hostname)
if err != nil {
return false
}
r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr)
if err != nil {
return false
}
if r.Link.socksport != 0 {
if !C_SocksNegotiate(r, addr) {
return false
}
}
r.m_sb.timeout = r.Link.timeout
r.m_bSendCounter = true
return C_RTMP_Connect1(r, cp)
}
// int SocksNegotiate(RTMP* r);
// rtmp.c +1062
func C_SocksNegotiate(r *C_RTMP, addr *net.TCPAddr) (ok bool) {
ip := addr.IP.To4()
packet := []byte{
0x4, // SOCKS version
0x1, // Establish a TCP/IP stream connection
byte(r.Link.port >> 8), byte(r.Link.port),
ip[0], ip[1], ip[2], ip[3],
0x0, // Empty user ID string
}
C_WriteN(r, packet)
if C_ReadN(r, packet[:8]) != 8 {
return false
}
if packet[0] == 0x0 && packet[1] == 0x5a {
return true
}
// TODO: use new logger here
log.Println("C_SocksNegotitate: SOCKS returned error code!")
return false
}
// int RTMPSockBuf_Fill(RTMPSockBuf* sb);
// rtmp.c +4253
func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int {
if sb.sb_size == 0 {
sb.sb_start = 0
}
err := sb.conn.SetReadDeadline(time.Now().Local().Add(time.Second * time.Duration(sb.timeout)))
if err != nil {
return -1
}
n, err := sb.conn.Read(sb.sb_buf[sb.sb_start+sb.sb_size:])
if err != nil {
return 0
}
sb.sb_size += n
return n
}
// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len);
// rtmp.c +4297
// TODO replace send with golang net connection send
func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 {
err := sb.conn.SetWriteDeadline(time.Now().Local().Add(time.Second * time.Duration(sb.timeout)))
if err != nil {
return -1
}
n, err := sb.conn.Write(buf)
if err != nil {
return -1
}
return int32(n)
}
// int
// RTMPSockBuf_Close(RTMPSockBuf *sb)
// rtmp.c +4369
func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 {
if sb.conn != nil {
err := sb.conn.Close()
sb.conn = nil
if err == nil {
return 1
}
}
return 0
}

View File

@ -1,7 +0,0 @@
package rtmp
import "golang.org/x/sys/unix"
func setTimeval(sec int) unix.Timeval {
return unix.Timeval{Sec: int32(sec)}
}

View File

@ -66,19 +66,13 @@ type Encoder struct {
}
// NewEncoder retuns a new FLV encoder.
func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) {
e := Encoder{
func NewEncoder(dst io.Writer, audio, video bool, fps int) *Encoder {
return &Encoder{
dst: dst,
fps: fps,
audio: audio,
video: video,
}
// TODO(kortschak): Do this lazily.
_, err := e.dst.Write(e.HeaderBytes())
if err != nil {
return nil, err
}
return &e, nil
}
// HeaderBytes returns the a