/*
NAME
  session.go

DESCRIPTION
  RTMP session functionality.

AUTHORS
  Saxon Nelson-Milton
  Dan Kortschak
  Alan Noble

LICENSE
  session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)

  It is free software: you can redistribute it and/or modify them
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.

  Derived from librtmp under the GNU Lesser General Public License 2.1
  Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
  Copyright (C) 2008-2009 Andrej Stepanchuk
  Copyright (C) 2009-2010 Howard Chu
*/
package rtmp

import (
	"io"
	"net"
	"time"
)

// Session holds the state for an RTMP session.
// A Session is created with NewSession and is treated as connected while
// link.conn is non-nil (see isConnected).
type Session struct {
	// url is the RTMP URL supplied to NewSession.
	url string

	// inChunkSize and outChunkSize both start at 128 (see NewSession).
	inChunkSize  int32
	outChunkSize int32

	checkCounter int32

	// nBytesIn counts the bytes read from the connection. read compares it
	// with nBytesInSent and clientBW to decide when to call
	// sendBytesReceived.
	nBytesIn     int32
	nBytesInSent int32

	// streamID is placed in the info field of packets built by Write and,
	// when greater than zero, is passed to sendDeleteStream on close.
	streamID int32

	// serverBW and clientBW both start at 2500000 and clientBW2 at 2
	// (see NewSession).
	serverBW  int32
	clientBW  int32
	clientBW2 uint8

	isPlaying            bool
	sendEncoding         bool
	numInvokes           int32
	methodCalls          []method
	channelsAllocatedIn  int32
	channelsAllocatedOut int32
	channelsIn           []*packet
	channelsOut          []*packet
	channelTimestamp     []int32

	// audioCodecs and videoCodecs start at 3191.0 and 252.0 respectively
	// (see NewSession).
	audioCodecs float64
	videoCodecs float64
	encoding    float64

	deferred []byte

	// link holds the URL components and the underlying connection.
	link link

	// log is the logging function supplied to NewSession.
	log Log
}

// link represents RTMP URL and connection information.
type link struct {
	host     string
	playpath string
	tcUrl    string
	swfUrl   string
	pageUrl  string
	app      string
	auth     string
	flashVer string
	token    string
	extras   C_AMFObject
	lFlags   int32

	// swfAge starts at 30 (see NewSession).
	swfAge int32

	// protocol is a set of bit flags; enableWrite sets RTMP_FEATURE_WRITE
	// and close tests for it.
	protocol int32

	// timeout is the number of seconds used for the read and write
	// deadlines set by Session.read and Session.write.
	timeout uint

	port uint16

	// conn is the TCP connection; it is nil while the session is not
	// connected (see Session.isConnected).
	conn *net.TCPConn
}

// method represents an RTMP method.
type method struct {
	name string
	num  int32
}

// Log defines the RTMP logging function.
type Log func(level int8, message string, params ...interface{}) // Log levels used by Log. const ( DebugLevel int8 = -1 InfoLevel int8 = 0 WarnLevel int8 = 1 ErrorLevel int8 = 2 FatalLevel int8 = 5 ) // NewSession returns a new Session. func NewSession(url string, timeout uint, log Log) *Session { return &Session{ url: url, inChunkSize: 128, outChunkSize: 128, clientBW: 2500000, clientBW2: 2, serverBW: 2500000, audioCodecs: 3191.0, videoCodecs: 252.0, log: log, link: link{ timeout: timeout, swfAge: 30, }, } } // Open establishes an rtmp connection with the url passed into the constructor. func (s *Session) Open() error { if s.isConnected() { return errConnected } err := s.start() if err != nil { return err } return nil } // start does the heavylifting for Open(). func (s *Session) start() error { s.log(DebugLevel, pkg+"Session.start") err := setupURL(s) if err != nil { s.close() return err } s.enableWrite() err = connect(s) if err != nil { s.close() return err } err = connectStream(s) if err != nil { s.close() return err } return nil } // Close terminates the rtmp connection, func (s *Session) Close() error { if !s.isConnected() { return errNotConnected } s.close() return nil } // close does the heavylifting for Close(). // Any errors are ignored as it is often called in response to an earlier error. func (s *Session) close() { s.log(DebugLevel, pkg+"Session.close") if s.isConnected() { if s.streamID > 0 { if s.link.protocol&RTMP_FEATURE_WRITE != 0 { sendFCUnpublish(s) } sendDeleteStream(s, float64(s.streamID)) } s.link.conn.Close() } *s = Session{} } // Write writes a frame (flv tag) to the rtmp connection. 
func (s *Session) Write(data []byte) (int, error) { if !s.isConnected() { return 0, errNotConnected } if data[0] == RTMP_PACKET_TYPE_INFO || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { return 0, errUnimplemented } if len(data) < minDataSize { return 0, errTinyPacket } pkt := packet{ packetType: data[0], bodySize: C_AMF_DecodeInt24(data[1:4]), timestamp: C_AMF_DecodeInt24(data[4:7]) | uint32(data[7])<<24, channel: RTMP_CHANNEL_SOURCE, info: s.streamID, } pkt.resize(pkt.bodySize, RTMP_PACKET_SIZE_AUTO) copy(pkt.body, data[minDataSize:minDataSize+pkt.bodySize]) err := pkt.write(s, false) if err != nil { return 0, err } return len(data), nil } // I/O functions // read from an RTMP connection. func (s *Session) read(buf []byte) error { err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return err } n, err := io.ReadFull(s.link.conn, buf) if err != nil { s.log(DebugLevel, pkg+"read failed", "error", err.Error()) return err } s.nBytesIn += int32(n) if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { err := sendBytesReceived(s) if err != nil { return err } } return nil } // write to an RTMP connection. func (s *Session) write(buf []byte) error { //ToDo: consider using a different timeout for writes than for reads err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return err } _, err = s.link.conn.Write(buf) if err != nil { s.log(WarnLevel, pkg+"write failed", "error", err.Error()) return err } return nil } // isConnected returns true if the RTMP connection is up. func (s *Session) isConnected() bool { return s.link.conn != nil } // enableWrite enables the current session for writing. func (s *Session) enableWrite() { s.link.protocol |= RTMP_FEATURE_WRITE }