From ce95901b6697ff19e50e2921a3751cb33d198ee6 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 13:16:46 +1030 Subject: [PATCH] Session renamed Conn and NewSession() and Open() replaced with Dial(). --- rtmp/session.go | 133 ++++++++++++++++++++++++------------------------ 1 file changed, 66 insertions(+), 67 deletions(-) diff --git a/rtmp/session.go b/rtmp/session.go index 8da2626b..1d43c2a4 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -1,9 +1,9 @@ /* NAME - session.go + conn.go DESCRIPTION - RTMP session functionality. + RTMP connection functionality. AUTHORS Saxon Nelson-Milton @@ -11,7 +11,7 @@ AUTHORS Alan Noble LICENSE - session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + conn.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -36,14 +36,14 @@ package rtmp import ( "io" "net" + "strconv" "time" "bitbucket.org/ausocean/av/rtmp/amf" ) -// Session holds the state for an RTMP session. -type Session struct { - url string +// Conn represents an RTMP connection. +type Conn struct { inChunkSize uint32 outChunkSize uint32 nBytesIn uint32 @@ -60,8 +60,6 @@ type Session struct { channelsIn []*packet channelsOut []*packet channelTimestamp []int32 - audioCodecs float64 - videoCodecs float64 deferred []byte link link log Log @@ -103,69 +101,71 @@ const ( // NB: We don't accept extended headers. const flvTagheaderSize = 11 -// NewSession returns a new Session. -func NewSession(url string, timeout uint, log Log) *Session { - return &Session{ - url: url, +// Dial connects to RTMP server specified by the given URL and returns the connection. +func Dial(url string, timeout uint, log Log) (*Conn, error) { + log(DebugLevel, pkg+"rtmp.Dial") + c := Conn{ inChunkSize: 128, outChunkSize: 128, clientBW: 2500000, clientBW2: 2, serverBW: 2500000, - audioCodecs: 3191.0, - videoCodecs: 252.0, log: log, link: link{ timeout: timeout, }, } + + var err error + c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url) + if err != nil { + return nil, err + } + if c.link.app == "" { + return nil, errInvalidURL + } + if c.link.port == 0 { + switch { + case (c.link.protocol & featureSSL) != 0: + c.link.port = 433 + c.log(FatalLevel, pkg+"SSL not supported") + case (c.link.protocol & featureHTTP) != 0: + c.link.port = 80 + default: + c.link.port = 1935 + } + } + c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app + c.link.protocol |= featureWrite + + err = connect(&c) + if err != nil { + return nil, err + } + return &c, nil } -// Open establishes an rtmp connection with the url passed into the constructor. -func (s *Session) Open() error { - s.log(DebugLevel, pkg+"Session.Open") - if s.isConnected() { - return errConnected - } - err := s.init() - if err != nil { - return err - } - err = connect(s) - if err != nil { - s.Close() - return err - } - - err = connectStream(s) - if err != nil { - s.Close() - return err - } - return nil -} - -// Close terminates the rtmp connection. -// NB: Close is idempotent and the session value is cleared completely. -func (s *Session) Close() error { - s.log(DebugLevel, pkg+"Session.Close") - if !s.isConnected() { +// Close terminates the RTMP connection. +// NB: Close is idempotent and the connection value is cleared completely. +func (c *Conn) Close() error { + c.log(DebugLevel, pkg+"Conn.Close") + if !c.isConnected() { return errNotConnected } - if s.streamID > 0 { - if s.link.protocol&featureWrite != 0 { - sendFCUnpublish(s) + if c.streamID > 0 { + if c.link.protocol&featureWrite != 0 { + sendFCUnpublish(c) } - sendDeleteStream(s, float64(s.streamID)) + sendDeleteStream(c, float64(c.streamID)) } - s.link.conn.Close() - *s = Session{} + c.link.conn.Close() + *c = Conn{} return nil } // Write writes a frame (flv tag) to the rtmp connection. -func (s *Session) Write(data []byte) (int, error) { - if !s.isConnected() { +func (c *Conn) Write(data []byte) (int, error) { + if !c.isConnected() { return 0, errNotConnected } if len(data) < flvTagheaderSize { @@ -180,12 +180,12 @@ func (s *Session) Write(data []byte) (int, error) { bodySize: amf.DecodeInt24(data[1:4]), timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, channel: chanSource, - info: s.streamID, + info: c.streamID, } pkt.resize(pkt.bodySize, headerSizeAuto) copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize]) - err := pkt.writeTo(s, false) + err := pkt.writeTo(c, false) if err != nil { return 0, err } @@ -197,19 +197,19 @@ func (s *Session) Write(data []byte) (int, error) { // read from an RTMP connection. Sends a bytes received message if the // number of bytes received (nBytesIn) is greater than the number sent // (nBytesInSent) by 10% of the bandwidth. -func (s *Session) read(buf []byte) (int, error) { - err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) +func (c *Conn) read(buf []byte) (int, error) { + err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) if err != nil { return 0, err } - n, err := io.ReadFull(s.link.conn, buf) + n, err := io.ReadFull(c.link.conn, buf) if err != nil { - s.log(DebugLevel, pkg+"read failed", "error", err.Error()) + c.log(DebugLevel, pkg+"read failed", "error", err.Error()) return 0, err } - s.nBytesIn += uint32(n) - if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { - err := sendBytesReceived(s) + c.nBytesIn += uint32(n) + if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) { + err := sendBytesReceived(c) if err != nil { return n, err // NB: we still read n bytes, even though send bytes failed } @@ -218,22 +218,21 @@ func (s *Session) read(buf []byte) (int, error) { } // write to an RTMP connection. -func (s *Session) write(buf []byte) (int, error) { +func (c *Conn) write(buf []byte) (int, error) { //ToDo: consider using a different timeout for writes than for reads - err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) + err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) if err != nil { return 0, err } - n, err := s.link.conn.Write(buf) + n, err := c.link.conn.Write(buf) if err != nil { - s.log(WarnLevel, pkg+"write failed", "error", err.Error()) + c.log(WarnLevel, pkg+"write failed", "error", err.Error()) return 0, err } return n, nil } // isConnected returns true if the RTMP connection is up. -func (s *Session) isConnected() bool { - return s.link.conn != nil +func (c *Conn) isConnected() bool { + return c.link.conn != nil } -