av/rtmp/session.go

254 lines
6.0 KiB
Go

/*
NAME
session.go
DESCRIPTION
RTMP session functionality.
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import (
"io"
"net"
"time"
)
// Session holds the state for an RTMP session.
type Session struct {
url string
inChunkSize int32
outChunkSize int32
checkCounter int32
nBytesIn int32
nBytesInSent int32
streamID int32
serverBW int32
clientBW int32
clientBW2 uint8
isPlaying bool
sendEncoding bool
numInvokes int32
methodCalls []method
channelsAllocatedIn int32
channelsAllocatedOut int32
channelsIn []*packet
channelsOut []*packet
channelTimestamp []int32
audioCodecs float64
videoCodecs float64
encoding float64
deferred []byte
link link
log Log
}
// link represents RTMP URL and connection information.
type link struct {
host string
playpath string
tcUrl string
swfUrl string
pageUrl string
app string
auth string
flashVer string
token string
extras AMF
flags int32
swfAge int32
protocol int32
timeout uint
port uint16
conn *net.TCPConn
}
// method represents an RTMP method.
type method struct {
name string
num int32
}
// Log defines the RTMP logging function.
type Log func(level int8, message string, params ...interface{})
// Log levels used by Log.
const (
DebugLevel int8 = -1
InfoLevel int8 = 0
WarnLevel int8 = 1
ErrorLevel int8 = 2
FatalLevel int8 = 5
)
// flvTagheaderSize is the FLV header size we expect.
// NB: We don't accept extended headers.
const flvTagheaderSize = 11
// NewSession returns a new Session.
func NewSession(url string, timeout uint, log Log) *Session {
return &Session{
url: url,
inChunkSize: 128,
outChunkSize: 128,
clientBW: 2500000,
clientBW2: 2,
serverBW: 2500000,
audioCodecs: 3191.0,
videoCodecs: 252.0,
log: log,
link: link{
timeout: timeout,
swfAge: 30,
},
}
}
// Open establishes an rtmp connection with the url passed into the constructor.
func (s *Session) Open() error {
s.log(DebugLevel, pkg+"Session.Open")
if s.isConnected() {
return errConnected
}
err := setupURL(s)
if err != nil {
return err
}
s.enableWrite()
err = connect(s)
if err != nil {
s.Close()
return err
}
err = connectStream(s)
if err != nil {
s.Close()
return err
}
return nil
}
// Close terminates the rtmp connection.
// NB: Close is idempotent and the session value is cleared completely.
func (s *Session) Close() error {
s.log(DebugLevel, pkg+"Session.Close")
if !s.isConnected() {
return errNotConnected
}
if s.streamID > 0 {
if s.link.protocol&featureWrite != 0 {
sendFCUnpublish(s)
}
sendDeleteStream(s, float64(s.streamID))
}
s.link.conn.Close()
*s = Session{}
return nil
}
// Write writes a frame (flv tag) to the rtmp connection.
func (s *Session) Write(data []byte) (int, error) {
if !s.isConnected() {
return 0, errNotConnected
}
if len(data) < flvTagheaderSize {
return 0, errInvalidFlvTag
}
if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') {
return 0, errUnimplemented
}
pkt := packet{
packetType: data[0],
bodySize: amfDecodeInt24(data[1:4]),
timestamp: amfDecodeInt24(data[4:7]) | uint32(data[7])<<24,
channel: chanSource,
info: s.streamID,
}
pkt.resize(pkt.bodySize, headerSizeAuto)
copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize])
err := pkt.write(s, false)
if err != nil {
return 0, err
}
return len(data), nil
}
// I/O functions
// read from an RTMP connection. Sends a bytes received message if the
// number of bytes received (nBytesIn) is greater than the number sent
// (nBytesInSent) by 10% of the bandwidth.
func (s *Session) read(buf []byte) (int, error) {
err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout)))
if err != nil {
return 0, err
}
n, err := io.ReadFull(s.link.conn, buf)
if err != nil {
s.log(DebugLevel, pkg+"read failed", "error", err.Error())
return 0, err
}
s.nBytesIn += int32(n)
if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) {
err := sendBytesReceived(s)
if err != nil {
return n, err // NB: we still read n bytes, even though send bytes failed
}
}
return n, nil
}
// write to an RTMP connection.
func (s *Session) write(buf []byte) (int, error) {
//ToDo: consider using a different timeout for writes than for reads
err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout)))
if err != nil {
return 0, err
}
n, err := s.link.conn.Write(buf)
if err != nil {
s.log(WarnLevel, pkg+"write failed", "error", err.Error())
return 0, err
}
return n, nil
}
// isConnected returns true if the RTMP connection is up.
func (s *Session) isConnected() bool {
return s.link.conn != nil
}
// enableWrite enables the current session for writing.
func (s *Session) enableWrite() {
s.link.protocol |= featureWrite
}