Merged in rtmp-renaming (pull request #100)

Rtmp renaming

Approved-by: Saxon Milton <saxon.milton@gmail.com>
Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Alan Noble 2019-01-07 11:44:46 +00:00
commit f564a4927d
8 changed files with 919 additions and 1161 deletions

View File

@ -9,7 +9,7 @@ AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE LICENSE
amf_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) amf_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the

494
rtmp/packet.go Normal file
View File

@ -0,0 +1,494 @@
/*
NAME
packet.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
packet.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import (
"encoding/binary"
"log"
)
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
)
const (
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_SIZE_MINIMUM = 3
)
const (
RTMP_CHANNEL_BYTES_READ = 0x02
RTMP_CHANNEL_CONTROL = 0x03
RTMP_CHANNEL_SOURCE = 0x04
)
// packetSize defines valid packet sizes.
var packetSize = [...]int{12, 8, 4, 1}
// packet defines an RTMP packet.
type packet struct {
headerType uint8
packetType uint8
channel int32
hasAbsTimestamp bool
timestamp uint32
info int32
bodySize uint32
bytesRead uint32
chunk *chunk
header []byte
body []byte
}
// chunk defines an RTMP packet chunk.
type chunk struct {
headerSize int32
data []byte
header [RTMP_MAX_HEADER_SIZE]byte
}
// ToDo: Consider making the following functions into methods.
// readPacket reads a packet.
func readPacket(s *Session, pkt *packet) error {
var hbuf [RTMP_MAX_HEADER_SIZE]byte
header := hbuf[:]
err := readN(s, header[:1])
if err != nil {
log.Println("readPacket: failed to read RTMP packet header!")
return err
}
pkt.headerType = (header[0] & 0xc0) >> 6
pkt.channel = int32(header[0] & 0x3f)
header = header[1:]
switch {
case pkt.channel == 0:
err = readN(s, header[:1])
if err != nil {
log.Println("readPacket: failed to read rtmp packet header 2nd byte.")
return err
}
header = header[1:]
pkt.channel = int32(header[0]) + 64
case pkt.channel == 1:
err = readN(s, header[:2])
if err != nil {
log.Println("readPacket: failed to read RTMP packet 3rd byte")
return err
}
header = header[2:]
pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64
}
if pkt.channel >= s.channelsAllocatedIn {
n := pkt.channel + 10
timestamp := append(s.channelTimestamp, make([]int32, 10)...)
var pkts []*packet
if s.vecChannelsIn == nil {
pkts = make([]*packet, n)
} else {
pkts = append(s.vecChannelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...)
}
s.channelTimestamp = timestamp
s.vecChannelsIn = pkts
for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ {
s.channelTimestamp[i] = 0
}
for i := int(s.channelsAllocatedIn); i < int(n); i++ {
s.vecChannelsIn[i] = nil
}
s.channelsAllocatedIn = n
}
size := packetSize[pkt.headerType]
switch {
case size == RTMP_LARGE_HEADER_SIZE:
pkt.hasAbsTimestamp = true
case size < RTMP_LARGE_HEADER_SIZE:
if s.vecChannelsIn[pkt.channel] != nil {
*pkt = *(s.vecChannelsIn[pkt.channel])
}
}
size--
if size > 0 {
err = readN(s, header[:size])
if err != nil {
log.Println("readPacket: failed to read rtmp packet heades.")
return err
}
}
hSize := len(hbuf) - len(header) + size
if size >= 3 {
pkt.timestamp = C_AMF_DecodeInt24(header[:3])
if size >= 6 {
pkt.bodySize = C_AMF_DecodeInt24(header[3:6])
pkt.bytesRead = 0
if size > 6 {
pkt.packetType = header[6]
if size == 11 {
pkt.info = decodeInt32LE(header[7:11])
}
}
}
}
extendedTimestamp := pkt.timestamp == 0xffffff
if extendedTimestamp {
err = readN(s, header[size:size+4])
if err != nil {
log.Println("readPacket: Failed to read extended timestamp")
return err
}
// TODO: port this
pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4])
hSize += 4
}
if pkt.bodySize > 0 && pkt.body == nil {
resizePacket(pkt, pkt.bodySize, (hbuf[0]&0xc0)>>6)
}
toRead := int32(pkt.bodySize - pkt.bytesRead)
chunkSize := s.inChunkSize
if toRead < chunkSize {
chunkSize = toRead
}
if pkt.chunk != nil {
pkt.chunk.headerSize = int32(hSize)
copy(pkt.chunk.header[:], hbuf[:hSize])
pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)]
}
err = readN(s, pkt.body[pkt.bytesRead:][:chunkSize])
if err != nil {
log.Println("readPacket: failed to read RTMP packet body")
return err
}
pkt.bytesRead += uint32(chunkSize)
// keep the packet as ref for other packets on this channel
if s.vecChannelsIn[pkt.channel] == nil {
s.vecChannelsIn[pkt.channel] = &packet{}
}
*(s.vecChannelsIn[pkt.channel]) = *pkt
if extendedTimestamp {
s.vecChannelsIn[pkt.channel].timestamp = 0xffffff
}
if pkt.bytesRead != pkt.bodySize {
panic("readPacket: bytesRead != bodySize")
}
if !pkt.hasAbsTimestamp {
// timestamps seem to always be relative
pkt.timestamp += uint32(s.channelTimestamp[pkt.channel])
}
s.channelTimestamp[pkt.channel] = int32(pkt.timestamp)
s.vecChannelsIn[pkt.channel].body = nil
s.vecChannelsIn[pkt.channel].bytesRead = 0
s.vecChannelsIn[pkt.channel].hasAbsTimestamp = false
return nil
}
// resizePacket adjust the packet's storage to accommodate a body of the given size.
func resizePacket(pkt *packet, size uint32, ht uint8) {
buf := make([]byte, RTMP_MAX_HEADER_SIZE+size)
pkt.headerType = ht
pkt.header = buf
pkt.body = buf[RTMP_MAX_HEADER_SIZE:]
}
// sendPacket sends a packet.
func sendPacket(s *Session, pkt *packet, queue bool) error {
var prevPkt *packet
var last int
if pkt.channel >= s.channelsAllocatedOut {
n := int(pkt.channel + 10)
var pkts []*packet
if s.vecChannelsOut == nil {
pkts = make([]*packet, n)
} else {
pkts = append(s.vecChannelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...)
}
s.vecChannelsOut = pkts
for i := int(s.channelsAllocatedOut); i < n; i++ {
s.vecChannelsOut[i] = nil
}
s.channelsAllocatedOut = int32(n)
}
prevPkt = s.vecChannelsOut[pkt.channel]
if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE {
// compress a bit by using the prev packet's attributes
if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM {
pkt.headerType = RTMP_PACKET_SIZE_SMALL
}
if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL {
pkt.headerType = RTMP_PACKET_SIZE_MINIMUM
}
last = int(prevPkt.timestamp)
}
if pkt.headerType > 3 {
log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
pkt.headerType)
return errInvalidHeader
}
var headBytes []byte
var origIdx int
if pkt.body != nil {
// Span from -packetsize for the type to the start of the body.
headBytes = pkt.header
origIdx = RTMP_MAX_HEADER_SIZE - packetSize[pkt.headerType]
} else {
// Allocate a new header and allow 6 bytes of movement backward.
var hbuf [RTMP_MAX_HEADER_SIZE]byte
headBytes = hbuf[:]
origIdx = 6
}
var cSize int
switch {
case pkt.channel > 319:
cSize = 2
case pkt.channel > 63:
cSize = 1
}
hSize := packetSize[pkt.headerType]
if cSize != 0 {
origIdx -= cSize
hSize += cSize
}
var ts uint32
if prevPkt != nil {
ts = uint32(int(pkt.timestamp) - last)
}
if ts >= 0xffffff {
origIdx -= 4
hSize += 4
log.Printf("Larger timestamp than 24-bit: 0x%v", ts)
}
headerIdx := origIdx
c := pkt.headerType << 6
switch cSize {
case 0:
c |= byte(pkt.channel)
case 1:
// Do nothing.
case 2:
c |= 1
}
headBytes[headerIdx] = c
headerIdx++
if cSize != 0 {
tmp := pkt.channel - 64
headBytes[headerIdx] = byte(tmp & 0xff)
headerIdx++
if cSize == 2 {
headBytes[headerIdx] = byte(tmp >> 8)
headerIdx++
}
}
if packetSize[pkt.headerType] > 1 {
res := ts
if ts > 0xffffff {
res = 0xffffff
}
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res))
headerIdx += 3 // 24bits
}
if packetSize[pkt.headerType] > 4 {
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize))
headerIdx += 3 // 24bits
headBytes[headerIdx] = pkt.packetType
headerIdx++
}
if packetSize[pkt.headerType] > 8 {
n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info))
headerIdx += n
}
if ts >= 0xffffff {
C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts))
headerIdx += 4 // 32bits
}
size := int(pkt.bodySize)
chunkSize := int(s.outChunkSize)
if debugMode {
log.Printf("sendPacket: %v->%v, size=%v", s.link.conn.LocalAddr(), s.link.conn.RemoteAddr(), size)
}
// Send the previously deferred packet if combining it with the next packet would exceed the chunk size.
if s.defered != nil && len(s.defered)+size+hSize > chunkSize {
err := writeN(s, s.defered)
if err != nil {
return err
}
s.defered = nil
}
// TODO(kortschak): Rewrite this horrific peice of premature optimisation.
// NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size.
for size+hSize != 0 {
if s.defered == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize {
s.defered = headBytes[origIdx:][:size+hSize]
break
}
if chunkSize > size {
chunkSize = size
}
bytes := headBytes[origIdx:][:chunkSize+hSize]
if s.defered != nil {
// Prepend the previously deferred packet and write it with the current one.
bytes = append(s.defered, bytes...)
}
err := writeN(s, bytes)
if err != nil {
return err
}
s.defered = nil
size -= chunkSize
origIdx += chunkSize + hSize
hSize = 0
if size > 0 {
origIdx -= 1 + cSize
hSize = 1 + cSize
if ts >= 0xffffff {
origIdx -= 4
hSize += 4
}
headBytes[origIdx] = 0xc0 | c
if cSize != 0 {
tmp := int(pkt.channel) - 64
headBytes[origIdx+1] = byte(tmp)
if cSize == 2 {
headBytes[origIdx+2] = byte(tmp >> 8)
}
}
if ts >= 0xffffff {
extendedTimestamp := headBytes[origIdx+1+cSize:]
C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts))
}
}
}
// We invoked a remote method
if pkt.packetType == RTMP_PACKET_TYPE_INVOKE {
buf := pkt.body[1:]
meth := C_AMF_DecodeString(buf)
if debugMode {
log.Printf("invoking %v", meth)
}
// keep it in call queue till result arrives
if queue {
buf = buf[3+len(meth):]
txn := int32(C_AMF_DecodeNumber(buf[:8]))
s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})
}
}
if s.vecChannelsOut[pkt.channel] == nil {
s.vecChannelsOut[pkt.channel] = &packet{}
}
*(s.vecChannelsOut[pkt.channel]) = *pkt
return nil
}
func decodeInt32LE(data []byte) int32 {
return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0])
}
func encodeInt32LE(dst []byte, v int32) int32 {
binary.LittleEndian.PutUint32(dst, uint32(v))
return 4
}

View File

@ -40,9 +40,8 @@ import (
"strings" "strings"
) )
// int RTMP_ParseURL(const char *url, int *protocol, AVal *host, unsigned int *port, AVal *playpath, AVal *app); // parseURL parses an RTMP URL (ok, technically it is lexing).
// parseurl.c +33 func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
func C_RTMP_ParseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
u, err := url.Parse(addr) u, err := url.Parse(addr)
if err != nil { if err != nil {
log.Printf("failed to parse addr: %v", err) log.Printf("failed to parse addr: %v", err)

File diff suppressed because it is too large Load Diff

View File

@ -7,9 +7,11 @@ DESCRIPTION
AUTHORS AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE LICENSE
rtmp_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) rtmp_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -31,7 +33,9 @@ LICENSE
*/ */
package rtmp package rtmp
import "net" import (
"net"
)
const ( const (
RTMPT_OPEN = iota RTMPT_OPEN = iota
@ -40,28 +44,6 @@ const (
RTMPT_CLOSE RTMPT_CLOSE
) )
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
)
const (
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_SIZE_MINIMUM = 3
)
const ( const (
RTMP_READ_HEADER = 0x01 RTMP_READ_HEADER = 0x01
RTMP_READ_RESUME = 0x02 RTMP_READ_RESUME = 0x02
@ -112,45 +94,8 @@ const (
RTMP_MAX_HEADER_SIZE = 18 RTMP_MAX_HEADER_SIZE = 18
) )
// typedef struct RTMPChunk type link struct {
// rtmp.h +105
type C_RTMPChunk struct {
c_headerSize int32
c_chunk []byte
c_header [RTMP_MAX_HEADER_SIZE]byte
}
// typedef struct RTMPPacket
// rtmp.h +113
type C_RTMPPacket struct {
m_headerType uint8
m_packetType uint8
m_hasAbsTimestamp bool
m_nChannel int32
m_nTimeStamp uint32
m_nInfoField2 int32
m_nBodySize uint32
m_nBytesRead uint32
m_chunk *C_RTMPChunk
m_header []byte
m_body []byte
}
// typedef struct RTMPSockBuf
// rtmp.h +127
// DELETED: subsumed by C_RTMP_LNK
// RTMPPacket_IsReady(a)
// rtmp.h +142
func C_RTMPPacket_IsReady(p *C_RTMPPacket) bool {
return p.m_nBytesRead == p.m_nBodySize
}
// typedef struct RTMP_LNK
// rtmp.h +144
type C_RTMP_LNK struct {
host string host string
playpath0 string
playpath string playpath string
tcUrl string tcUrl string
swfUrl string swfUrl string
@ -160,7 +105,6 @@ type C_RTMP_LNK struct {
flashVer string flashVer string
token string token string
extras C_AMFObject extras C_AMFObject
seekTime int32
lFlags int32 lFlags int32
swfAge int32 swfAge int32
protocol int32 protocol int32
@ -169,44 +113,7 @@ type C_RTMP_LNK struct {
conn *net.TCPConn conn *net.TCPConn
} }
// typedef struct RTMPMethod type method struct {
// rtmp.h +231
type C_RTMP_METHOD struct {
name string name string
num int32 num int32
} }
// typedef struct RTMP
// rtmp.h +237
type C_RTMP struct {
m_inChunkSize int32
m_outChunkSize int32
m_nBWCheckCounter int32
m_nBytesIn int32
m_nBytesInSent int32
m_nBufferMS int32
m_stream_id int32
m_mediaChannel int32
m_pausing int32
m_nServerBW int32
m_nClientBW int32
m_nClientBW2 uint8
m_bPlaying bool
m_bSendEncoding bool
m_numInvokes int32
m_methodCalls []C_RTMP_METHOD
m_channelsAllocatedIn int32
m_channelsAllocatedOut int32
m_vecChannelsIn []*C_RTMPPacket
m_vecChannelsOut []*C_RTMPPacket
m_channelTimestamp []int32
m_fAudioCodecs float64
m_fVideoCodecs float64
m_fEncoding float64
m_fDuration float64
m_msgCounter int32
m_resplen int32
m_unackd int32
m_write C_RTMPPacket
Link C_RTMP_LNK
}

View File

@ -1,7 +0,0 @@
package rtmp
// #define SET_RCVTIMEO(tv,s) int tv = s*1000
// rtmp_sys.h +43
func SET_RCVTIMEO(tv *int32, s int32) {
*tv = s * 1000
}

View File

@ -8,9 +8,10 @@ DESCRIPTION
AUTHORS AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org> Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE LICENSE
session.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -36,14 +37,36 @@ import (
"errors" "errors"
) )
// session provides parameters required for an rtmp communication session. // Session holds the state for an RTMP session.
type Session struct { type Session struct {
rtmp *C_RTMP
url string url string
timeout uint timeout uint
inChunkSize int32
outChunkSize int32
bwCheckCounter int32
nBytesIn int32
nBytesInSent int32
streamID int32
serverBW int32
clientBW int32
clientBW2 uint8
isPlaying bool
sendEncoding bool
numInvokes int32
methodCalls []method
channelsAllocatedIn int32
channelsAllocatedOut int32
vecChannelsIn []*packet
vecChannelsOut []*packet
channelTimestamp []int32
audioCodecs float64
videoCodecs float64
encoding float64
defered []byte
link link
} }
// NewSession returns a new session. // NewSession returns a new Session.
func NewSession(url string, connectTimeout uint) *Session { func NewSession(url string, connectTimeout uint) *Session {
return &Session{ return &Session{
url: url, url: url,
@ -51,48 +74,98 @@ func NewSession(url string, connectTimeout uint) *Session {
} }
} }
// Open establishes an rtmp connection with the url passed into the // Open establishes an rtmp connection with the url passed into the constructor.
// constructor
func (s *Session) Open() error { func (s *Session) Open() error {
if s.rtmp != nil { if s.isConnected() {
return errors.New("rtmp: attempt to start already running session") return errors.New("rtmp: attempt to start already running session")
} }
var err error err := s.start()
s.rtmp, err = startSession(s.rtmp, s.url, s.timeout) if err != nil {
if s.rtmp == nil {
return err return err
} }
return nil return nil
} }
// Close terminates the rtmp connection // start does the heavylifting for Open().
func (s *Session) Close() error { func (s *Session) start() error {
if s.rtmp == nil { s.init()
return Err(3) err := setupURL(s, s.url)
if err != nil {
s.close()
return err
} }
ret := endSession(s.rtmp)
s.rtmp = nil s.enableWrite()
if ret != 0 { err = connect(s)
return Err(ret) if err != nil {
s.close()
return err
}
err = connectStream(s)
if err != nil {
s.close()
return err
} }
return nil return nil
} }
// Write writes a frame (flv tag) to the rtmp connection // init initializes various RTMP defauls.
// ToDo: define consts for the magic numbers.
func (s *Session) init() {
s.inChunkSize = RTMP_DEFAULT_CHUNKSIZE
s.outChunkSize = RTMP_DEFAULT_CHUNKSIZE
s.clientBW = 2500000
s.clientBW2 = 2
s.serverBW = 2500000
s.audioCodecs = 3191.0
s.videoCodecs = 252.0
s.link.timeout = s.timeout
s.link.swfAge = 30
}
// Close terminates the rtmp connection,
func (s *Session) Close() error {
if !s.isConnected() {
return errNotConnected
}
s.close()
return nil
}
// close does the heavylifting for Close().
// Any errors are ignored as it is often called in response to an earlier error.
func (s *Session) close() {
if s.isConnected() {
if s.streamID > 0 {
if s.link.protocol&RTMP_FEATURE_WRITE != 0 {
sendFCUnpublish(s)
}
sendDeleteStream(s, float64(s.streamID))
}
s.link.conn.Close()
}
*s = Session{}
}
// Write writes a frame (flv tag) to the rtmp connection.
func (s *Session) Write(data []byte) (int, error) { func (s *Session) Write(data []byte) (int, error) {
if s.rtmp == nil { if !s.isConnected() {
return 0, Err(3) return 0, errNotConnected
} }
err := s.write(data)
if !C_RTMP_IsConnected(s.rtmp) {
return 0, Err(1)
}
err := C_RTMP_Write(s.rtmp, data)
if err != nil { if err != nil {
//if C.RTMP_Write(s.rtmp, (*byte)(unsafe.Pointer(&data[0])), int32(len(data))) == 0 { return 0, err
// TODO: propagate err
return 0, Err(2)
} }
return len(data), nil return len(data), nil
} }
// isConnected returns true if the RTMP connection is up.
func (s *Session) isConnected() bool {
return s.link.conn != nil
}
// enableWrite enables the current session for writing.
func (s *Session) enableWrite() {
s.link.protocol |= RTMP_FEATURE_WRITE
}

View File

@ -1,7 +0,0 @@
package rtmp
import "golang.org/x/sys/unix"
func setTimeval(sec int) unix.Timeval {
return unix.Timeval{Sec: int32(sec)}
}