diff --git a/rtmp/packet.go b/rtmp/packet.go index a949cbbc..23671eac 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -91,7 +91,7 @@ type packet struct { info int32 bodySize uint32 bytesRead uint32 - header []byte + buf []byte body []byte } @@ -245,9 +245,8 @@ func (pkt *packet) readFrom(c *Conn) error { // resize adjusts the packet's storage to accommodate a body of the given size and header type. // When headerSizeAuto is specified, the header type is computed based on packet type. func (pkt *packet) resize(size uint32, ht uint8) { - buf := make([]byte, fullHeaderSize+size) - pkt.header = buf - pkt.body = buf[fullHeaderSize:] + pkt.buf = make([]byte, fullHeaderSize+size) + pkt.body = pkt.buf[fullHeaderSize:] if ht != headerSizeAuto { pkt.headerType = ht return @@ -317,7 +316,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { // The complete packet starts from headerSize _before_ the start the body. // origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header. - headBytes := pkt.header + headBytes := pkt.buf hSize := headerSizes[pkt.headerType] origIdx := fullHeaderSize - hSize diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index fe4c04d6..86c1f09e 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -261,7 +261,7 @@ func sendConnectPacket(c *Conn) error { channel: chanControl, headerType: headerSizeLarge, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -318,7 +318,7 @@ func sendCreateStream(c *Conn) error { channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -346,7 +346,7 @@ func sendReleaseStream(c *Conn) error { channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -377,7 +377,7 @@ func sendFCPublish(c *Conn) error { channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -409,7 +409,7 @@ func sendFCUnpublish(c *Conn) error { channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -441,7 +441,7 @@ func sendPublish(c *Conn) error { channel: chanSource, headerType: headerSizeLarge, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -477,7 +477,7 @@ func sendDeleteStream(c *Conn, dStreamId float64) error { channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -509,7 +509,7 @@ func sendBytesReceived(c *Conn) error { channel: chanBytesRead, headerType: headerSizeMedium, packetType: packetTypeBytesReadReport, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -531,7 +531,7 @@ func sendCheckBW(c *Conn) error { channel: chanControl, headerType: headerSizeLarge, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body diff --git a/rtmp/session.go b/rtmp/session.go deleted file mode 100644 index 1d43c2a4..00000000 --- a/rtmp/session.go +++ /dev/null @@ -1,238 +0,0 @@ -/* -NAME - conn.go - -DESCRIPTION - RTMP connection functionality. - -AUTHORS - Saxon Nelson-Milton - Dan Kortschak - Alan Noble - -LICENSE - conn.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. - - Derived from librtmp under the GNU Lesser General Public License 2.1 - Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org - Copyright (C) 2008-2009 Andrej Stepanchuk - Copyright (C) 2009-2010 Howard Chu -*/ -package rtmp - -import ( - "io" - "net" - "strconv" - "time" - - "bitbucket.org/ausocean/av/rtmp/amf" -) - -// Conn represents an RTMP connection. -type Conn struct { - inChunkSize uint32 - outChunkSize uint32 - nBytesIn uint32 - nBytesInSent uint32 - streamID int32 - serverBW uint32 - clientBW uint32 - clientBW2 uint8 - isPlaying bool - numInvokes int32 - methodCalls []method - channelsAllocatedIn int32 - channelsAllocatedOut int32 - channelsIn []*packet - channelsOut []*packet - channelTimestamp []int32 - deferred []byte - link link - log Log -} - -// link represents RTMP URL and connection information. -type link struct { - host string - playpath string - url string - app string - auth string - flags int32 - protocol int32 - timeout uint - port uint16 - conn *net.TCPConn -} - -// method represents an RTMP method. -type method struct { - name string - num int32 -} - -// Log defines the RTMP logging function. -type Log func(level int8, message string, params ...interface{}) - -// Log levels used by Log. -const ( - DebugLevel int8 = -1 - InfoLevel int8 = 0 - WarnLevel int8 = 1 - ErrorLevel int8 = 2 - FatalLevel int8 = 5 -) - -// flvTagheaderSize is the FLV header size we expect. -// NB: We don't accept extended headers. -const flvTagheaderSize = 11 - -// Dial connects to RTMP server specified by the given URL and returns the connection. -func Dial(url string, timeout uint, log Log) (*Conn, error) { - log(DebugLevel, pkg+"rtmp.Dial") - c := Conn{ - inChunkSize: 128, - outChunkSize: 128, - clientBW: 2500000, - clientBW2: 2, - serverBW: 2500000, - log: log, - link: link{ - timeout: timeout, - }, - } - - var err error - c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url) - if err != nil { - return nil, err - } - if c.link.app == "" { - return nil, errInvalidURL - } - if c.link.port == 0 { - switch { - case (c.link.protocol & featureSSL) != 0: - c.link.port = 433 - c.log(FatalLevel, pkg+"SSL not supported") - case (c.link.protocol & featureHTTP) != 0: - c.link.port = 80 - default: - c.link.port = 1935 - } - } - c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app - c.link.protocol |= featureWrite - - err = connect(&c) - if err != nil { - return nil, err - } - return &c, nil -} - -// Close terminates the RTMP connection. -// NB: Close is idempotent and the connection value is cleared completely. -func (c *Conn) Close() error { - c.log(DebugLevel, pkg+"Conn.Close") - if !c.isConnected() { - return errNotConnected - } - if c.streamID > 0 { - if c.link.protocol&featureWrite != 0 { - sendFCUnpublish(c) - } - sendDeleteStream(c, float64(c.streamID)) - } - c.link.conn.Close() - *c = Conn{} - return nil -} - -// Write writes a frame (flv tag) to the rtmp connection. -func (c *Conn) Write(data []byte) (int, error) { - if !c.isConnected() { - return 0, errNotConnected - } - if len(data) < flvTagheaderSize { - return 0, errInvalidFlvTag - } - if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { - return 0, errUnimplemented - } - - pkt := packet{ - packetType: data[0], - bodySize: amf.DecodeInt24(data[1:4]), - timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, - channel: chanSource, - info: c.streamID, - } - - pkt.resize(pkt.bodySize, headerSizeAuto) - copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize]) - err := pkt.writeTo(c, false) - if err != nil { - return 0, err - } - return len(data), nil -} - -// I/O functions - -// read from an RTMP connection. Sends a bytes received message if the -// number of bytes received (nBytesIn) is greater than the number sent -// (nBytesInSent) by 10% of the bandwidth. -func (c *Conn) read(buf []byte) (int, error) { - err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) - if err != nil { - return 0, err - } - n, err := io.ReadFull(c.link.conn, buf) - if err != nil { - c.log(DebugLevel, pkg+"read failed", "error", err.Error()) - return 0, err - } - c.nBytesIn += uint32(n) - if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) { - err := sendBytesReceived(c) - if err != nil { - return n, err // NB: we still read n bytes, even though send bytes failed - } - } - return n, nil -} - -// write to an RTMP connection. -func (c *Conn) write(buf []byte) (int, error) { - //ToDo: consider using a different timeout for writes than for reads - err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) - if err != nil { - return 0, err - } - n, err := c.link.conn.Write(buf) - if err != nil { - c.log(WarnLevel, pkg+"write failed", "error", err.Error()) - return 0, err - } - return n, nil -} - -// isConnected returns true if the RTMP connection is up. -func (c *Conn) isConnected() bool { - return c.link.conn != nil -}