Merged in rtmp-bw-modifiable (pull request #412)

protocol/rtmp: added options parameter to rtmp.Dial and provided options for bandwidths and link timeout

Approved-by: Trek Hopton <trek.hopton@gmail.com>
This commit is contained in:
Saxon Milton 2020-05-19 03:07:34 +00:00
parent 4d3bbd680c
commit a286d6c140
5 changed files with 110 additions and 30 deletions

View File

@ -37,6 +37,7 @@ LICENSE
package rtmp
import (
"fmt"
"io"
"net"
"strconv"
@ -45,6 +46,22 @@ import (
"bitbucket.org/ausocean/av/protocol/rtmp/amf"
)
// Log levels used by Log.
const (
DebugLevel int8 = -1
InfoLevel int8 = 0
WarnLevel int8 = 1
ErrorLevel int8 = 2
FatalLevel int8 = 5
)
// Configuration defaults.
const (
defaultTimeout = 10
defaultClientBandwidth = 2500000
defaultServerBandwidth = 2500000
)
// Conn represents an RTMP connection.
type Conn struct {
inChunkSize uint32
@ -91,34 +108,33 @@ type method struct {
// Log defines the RTMP logging function.
type Log func(level int8, message string, params ...interface{})
// Log levels used by Log.
const (
DebugLevel int8 = -1
InfoLevel int8 = 0
WarnLevel int8 = 1
ErrorLevel int8 = 2
FatalLevel int8 = 5
)
// flvTagheaderSize is the FLV header size we expect.
// NB: We don't accept extended headers.
const flvTagheaderSize = 11
// Dial connects to RTMP server specified by the given URL and returns the connection.
func Dial(url string, timeout uint, log Log) (*Conn, error) {
func Dial(url string, log Log, options ...func(*Conn) error) (*Conn, error) {
log(DebugLevel, pkg+"rtmp.Dial")
c := Conn{
inChunkSize: 128,
outChunkSize: 128,
clientBW: 2500000,
clientBW: defaultClientBandwidth,
clientBW2: 2,
serverBW: 2500000,
serverBW: defaultServerBandwidth,
log: log,
link: link{
timeout: timeout,
timeout: defaultTimeout,
},
}
// Apply any options that have been provided.
for _, option := range options {
err := option(&c)
if err != nil {
return nil, fmt.Errorf("error from option: %w", err)
}
}
var err error
c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url)
if err != nil {

71
protocol/rtmp/options.go Normal file
View File

@ -0,0 +1,71 @@
/*
DESCRIPTION
options.go provides RTMP connection option functions used to change
configuration parameters such as timeouts and bandwidths.
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2020 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
import "errors"
// Option parameter errors.
var (
ErrClientBandwidth = errors.New("bad client bandwidth")
ErrServerBandwidth = errors.New("bad server bandwidth")
ErrLinkTimeout = errors.New("bad link timeout")
)
// ClientBandwidth changes the Conn's clientBW parameter to the given value.
// See default value under conn.go.
func ClientBandwidth(b int) func(*Conn) error {
return func(c *Conn) error {
if b <= 0 {
return ErrClientBandwidth
}
c.clientBW = uint32(b)
return nil
}
}
// ServerBandwidth changes the Conn's serverBW parameter to the given value.
// See default value under conn.go.
func ServerBandwidth(b int) func(*Conn) error {
return func(c *Conn) error {
if b <= 0 {
return ErrServerBandwidth
}
c.serverBW = uint32(b)
return nil
}
}
// LinkTimeout changes the Conn.link's timeout parameter to the given value.
// See default value under conn.go.
func LinkTimeout(t uint) func(*Conn) error {
return func(c *Conn) error {
if t <= 0 {
return ErrLinkTimeout
}
c.link.timeout = t
return nil
}
}

View File

@ -122,7 +122,7 @@ func TestErorHandling(t *testing.T) {
if testKey == "" {
t.Skip("Skipping TestErrorHandling since no RTMP_TEST_KEY")
}
c, err := Dial(testBaseURL+testKey, testTimeout, testLog)
c, err := Dial(testBaseURL+testKey, testLog, LinkTimeout(testTimeout))
if err != nil {
t.Errorf("Dial failed with error: %v", err)
}
@ -180,7 +180,7 @@ func TestFromFrame(t *testing.T) {
if testKey == "" {
t.Skip("Skipping TestFromFrame since no RTMP_TEST_KEY")
}
c, err := Dial(testBaseURL+testKey, testTimeout, testLog)
c, err := Dial(testBaseURL+testKey, testLog, LinkTimeout(testTimeout))
if err != nil {
t.Errorf("Dial failed with error: %v", err)
}
@ -238,7 +238,7 @@ func TestFromFile(t *testing.T) {
if testKey == "" {
t.Skip("Skipping TestFromFile since no RTMP_TEST_KEY")
}
c, err := Dial(testBaseURL+testKey, testTimeout, testLog)
c, err := Dial(testBaseURL+testKey, testLog, LinkTimeout(testTimeout))
if err != nil {
t.Errorf("Dial failed with error: %v", err)
}

View File

@ -58,15 +58,10 @@ import (
"bitbucket.org/ausocean/utils/ring"
)
// Ring buffer parameters.
// Misc consts.
const (
rbStartingElementSize = 10000 // Bytes.
)
// RTMP connection properties.
const (
rtmpConnectionMaxTries = 5
rtmpConnectionTimeout = 10
)
type Logger interface {
@ -304,7 +299,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
case config.OutputRTMP:
r.cfg.Logger.Log(logger.Debug, "using RTMP output")
rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout)
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report)
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report)
if err != nil {
r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error())
}

View File

@ -299,7 +299,6 @@ func (s *mtsSender) Close() error {
type rtmpSender struct {
conn *rtmp.Conn
url string
timeout uint
retries int
log func(lvl int8, msg string, args ...interface{})
ring *ring.Buffer
@ -308,11 +307,11 @@ type rtmpSender struct {
report func(sent int)
}
func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) {
func newRtmpSender(url string, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) {
var conn *rtmp.Conn
var err error
for n := 0; n < retries; n++ {
conn, err = rtmp.Dial(url, timeout, log)
conn, err = rtmp.Dial(url, log)
if err == nil {
break
}
@ -324,7 +323,6 @@ func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log f
s := &rtmpSender{
conn: conn,
url: url,
timeout: timeout,
retries: retries,
log: log,
ring: rb,
@ -412,7 +410,7 @@ func (s *rtmpSender) restart() error {
var err error
for n := 0; n < s.retries; n++ {
s.log(logger.Debug, "dialing", "dials", n)
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
s.conn, err = rtmp.Dial(s.url, s.log)
if err == nil {
break
}