/* NAME session.go DESCRIPTION RTMP session functionality. AUTHORS Saxon Nelson-Milton Dan Kortschak Alan Noble LICENSE session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. Derived from librtmp under the GNU Lesser General Public License 2.1 Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org Copyright (C) 2008-2009 Andrej Stepanchuk Copyright (C) 2009-2010 Howard Chu */ package rtmp import ( "io" "net" "time" "bitbucket.org/ausocean/av/rtmp/amf" ) // Session holds the state for an RTMP session. type Session struct { url string inChunkSize int32 outChunkSize int32 checkCounter int32 nBytesIn int32 nBytesInSent int32 streamID int32 serverBW int32 clientBW int32 clientBW2 uint8 isPlaying bool sendEncoding bool numInvokes int32 methodCalls []method channelsAllocatedIn int32 channelsAllocatedOut int32 channelsIn []*packet channelsOut []*packet channelTimestamp []int32 audioCodecs float64 videoCodecs float64 encoding float64 deferred []byte link link log Log } // link represents RTMP URL and connection information. type link struct { host string playpath string tcUrl string swfUrl string pageUrl string app string auth string flashVer string token string extras amf.Object flags int32 swfAge int32 protocol int32 timeout uint port uint16 conn *net.TCPConn } // method represents an RTMP method. type method struct { name string num int32 } // Log defines the RTMP logging function. type Log func(level int8, message string, params ...interface{}) // Log levels used by Log. const ( DebugLevel int8 = -1 InfoLevel int8 = 0 WarnLevel int8 = 1 ErrorLevel int8 = 2 FatalLevel int8 = 5 ) // flvTagheaderSize is the FLV header size we expect. // NB: We don't accept extended headers. const flvTagheaderSize = 11 // NewSession returns a new Session. func NewSession(url string, timeout uint, log Log) *Session { return &Session{ url: url, inChunkSize: 128, outChunkSize: 128, clientBW: 2500000, clientBW2: 2, serverBW: 2500000, audioCodecs: 3191.0, videoCodecs: 252.0, log: log, link: link{ timeout: timeout, swfAge: 30, }, } } // Open establishes an rtmp connection with the url passed into the constructor. func (s *Session) Open() error { s.log(DebugLevel, pkg+"Session.Open") if s.isConnected() { return errConnected } err := setupURL(s) if err != nil { return err } s.enableWrite() err = connect(s) if err != nil { s.Close() return err } err = connectStream(s) if err != nil { s.Close() return err } return nil } // Close terminates the rtmp connection. // NB: Close is idempotent and the session value is cleared completely. func (s *Session) Close() error { s.log(DebugLevel, pkg+"Session.Close") if !s.isConnected() { return errNotConnected } if s.streamID > 0 { if s.link.protocol&featureWrite != 0 { sendFCUnpublish(s) } sendDeleteStream(s, float64(s.streamID)) } s.link.conn.Close() *s = Session{} return nil } // Write writes a frame (flv tag) to the rtmp connection. func (s *Session) Write(data []byte) (int, error) { if !s.isConnected() { return 0, errNotConnected } if len(data) < flvTagheaderSize { return 0, errInvalidFlvTag } if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { return 0, errUnimplemented } pkt := packet{ packetType: data[0], bodySize: amf.DecodeInt24(data[1:4]), timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, channel: chanSource, info: s.streamID, } pkt.resize(pkt.bodySize, headerSizeAuto) copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize]) err := pkt.write(s, false) if err != nil { return 0, err } return len(data), nil } // I/O functions // read from an RTMP connection. Sends a bytes received message if the // number of bytes received (nBytesIn) is greater than the number sent // (nBytesInSent) by 10% of the bandwidth. func (s *Session) read(buf []byte) (int, error) { err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return 0, err } n, err := io.ReadFull(s.link.conn, buf) if err != nil { s.log(DebugLevel, pkg+"read failed", "error", err.Error()) return 0, err } s.nBytesIn += int32(n) if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { err := sendBytesReceived(s) if err != nil { return n, err // NB: we still read n bytes, even though send bytes failed } } return n, nil } // write to an RTMP connection. func (s *Session) write(buf []byte) (int, error) { //ToDo: consider using a different timeout for writes than for reads err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return 0, err } n, err := s.link.conn.Write(buf) if err != nil { s.log(WarnLevel, pkg+"write failed", "error", err.Error()) return 0, err } return n, nil } // isConnected returns true if the RTMP connection is up. func (s *Session) isConnected() bool { return s.link.conn != nil } // enableWrite enables the current session for writing. func (s *Session) enableWrite() { s.link.protocol |= featureWrite }