diff --git a/rtmp/conn.go b/rtmp/conn.go new file mode 100644 index 00000000..1d43c2a4 --- /dev/null +++ b/rtmp/conn.go @@ -0,0 +1,238 @@ +/* +NAME + conn.go + +DESCRIPTION + RTMP connection functionality. + +AUTHORS + Saxon Nelson-Milton + Dan Kortschak + Alan Noble + +LICENSE + conn.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + + Derived from librtmp under the GNU Lesser General Public License 2.1 + Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org + Copyright (C) 2008-2009 Andrej Stepanchuk + Copyright (C) 2009-2010 Howard Chu +*/ +package rtmp + +import ( + "io" + "net" + "strconv" + "time" + + "bitbucket.org/ausocean/av/rtmp/amf" +) + +// Conn represents an RTMP connection. +type Conn struct { + inChunkSize uint32 + outChunkSize uint32 + nBytesIn uint32 + nBytesInSent uint32 + streamID int32 + serverBW uint32 + clientBW uint32 + clientBW2 uint8 + isPlaying bool + numInvokes int32 + methodCalls []method + channelsAllocatedIn int32 + channelsAllocatedOut int32 + channelsIn []*packet + channelsOut []*packet + channelTimestamp []int32 + deferred []byte + link link + log Log +} + +// link represents RTMP URL and connection information. +type link struct { + host string + playpath string + url string + app string + auth string + flags int32 + protocol int32 + timeout uint + port uint16 + conn *net.TCPConn +} + +// method represents an RTMP method. +type method struct { + name string + num int32 +} + +// Log defines the RTMP logging function. +type Log func(level int8, message string, params ...interface{}) + +// Log levels used by Log. +const ( + DebugLevel int8 = -1 + InfoLevel int8 = 0 + WarnLevel int8 = 1 + ErrorLevel int8 = 2 + FatalLevel int8 = 5 +) + +// flvTagheaderSize is the FLV header size we expect. +// NB: We don't accept extended headers. +const flvTagheaderSize = 11 + +// Dial connects to RTMP server specified by the given URL and returns the connection. +func Dial(url string, timeout uint, log Log) (*Conn, error) { + log(DebugLevel, pkg+"rtmp.Dial") + c := Conn{ + inChunkSize: 128, + outChunkSize: 128, + clientBW: 2500000, + clientBW2: 2, + serverBW: 2500000, + log: log, + link: link{ + timeout: timeout, + }, + } + + var err error + c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url) + if err != nil { + return nil, err + } + if c.link.app == "" { + return nil, errInvalidURL + } + if c.link.port == 0 { + switch { + case (c.link.protocol & featureSSL) != 0: + c.link.port = 433 + c.log(FatalLevel, pkg+"SSL not supported") + case (c.link.protocol & featureHTTP) != 0: + c.link.port = 80 + default: + c.link.port = 1935 + } + } + c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app + c.link.protocol |= featureWrite + + err = connect(&c) + if err != nil { + return nil, err + } + return &c, nil +} + +// Close terminates the RTMP connection. +// NB: Close is idempotent and the connection value is cleared completely. +func (c *Conn) Close() error { + c.log(DebugLevel, pkg+"Conn.Close") + if !c.isConnected() { + return errNotConnected + } + if c.streamID > 0 { + if c.link.protocol&featureWrite != 0 { + sendFCUnpublish(c) + } + sendDeleteStream(c, float64(c.streamID)) + } + c.link.conn.Close() + *c = Conn{} + return nil +} + +// Write writes a frame (flv tag) to the rtmp connection. +func (c *Conn) Write(data []byte) (int, error) { + if !c.isConnected() { + return 0, errNotConnected + } + if len(data) < flvTagheaderSize { + return 0, errInvalidFlvTag + } + if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { + return 0, errUnimplemented + } + + pkt := packet{ + packetType: data[0], + bodySize: amf.DecodeInt24(data[1:4]), + timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, + channel: chanSource, + info: c.streamID, + } + + pkt.resize(pkt.bodySize, headerSizeAuto) + copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize]) + err := pkt.writeTo(c, false) + if err != nil { + return 0, err + } + return len(data), nil +} + +// I/O functions + +// read from an RTMP connection. Sends a bytes received message if the +// number of bytes received (nBytesIn) is greater than the number sent +// (nBytesInSent) by 10% of the bandwidth. +func (c *Conn) read(buf []byte) (int, error) { + err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) + if err != nil { + return 0, err + } + n, err := io.ReadFull(c.link.conn, buf) + if err != nil { + c.log(DebugLevel, pkg+"read failed", "error", err.Error()) + return 0, err + } + c.nBytesIn += uint32(n) + if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) { + err := sendBytesReceived(c) + if err != nil { + return n, err // NB: we still read n bytes, even though send bytes failed + } + } + return n, nil +} + +// write to an RTMP connection. +func (c *Conn) write(buf []byte) (int, error) { + //ToDo: consider using a different timeout for writes than for reads + err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) + if err != nil { + return 0, err + } + n, err := c.link.conn.Write(buf) + if err != nil { + c.log(WarnLevel, pkg+"write failed", "error", err.Error()) + return 0, err + } + return n, nil +} + +// isConnected returns true if the RTMP connection is up. +func (c *Conn) isConnected() bool { + return c.link.conn != nil +}