Merged in bitrate-calc (pull request #336)

Bitrate calculations in revid.

Approved-by: Saxon Milton <saxon.milton@gmail.com>
This commit is contained in:
Scott Barnard 2020-01-27 00:16:26 +00:00 committed by Saxon Milton
commit 2084753985
4 changed files with 28 additions and 10 deletions

2
go.mod
View File

@ -4,7 +4,7 @@ go 1.13
require ( require (
bitbucket.org/ausocean/iot v1.2.13 bitbucket.org/ausocean/iot v1.2.13
bitbucket.org/ausocean/utils v1.2.12 bitbucket.org/ausocean/utils v1.2.13
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884

2
go.sum
View File

@ -12,6 +12,8 @@ bitbucket.org/ausocean/utils v1.2.11 h1:zA0FOaPjN960ryp8PKCkV5y50uWBYrIxCVnXjwbv
bitbucket.org/ausocean/utils v1.2.11/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.11/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.2.12 h1:VnskjWTDM475TnQRhBQE0cNp9D6Y6OELrd4UkD2VVIQ= bitbucket.org/ausocean/utils v1.2.12 h1:VnskjWTDM475TnQRhBQE0cNp9D6Y6OELrd4UkD2VVIQ=
bitbucket.org/ausocean/utils v1.2.12/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.12/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.2.13 h1:tUaIywtoMc1+zl1GCVQokX4mL5X7LNHX5O51AgAPrWA=
bitbucket.org/ausocean/utils v1.2.13/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw=

View File

@ -7,6 +7,7 @@ AUTHORS
Alan Noble <alan@ausocean.org> Alan Noble <alan@ausocean.org>
Dan Kortschak <dan@ausocean.org> Dan Kortschak <dan@ausocean.org>
Trek Hopton <trek@ausocean.org> Trek Hopton <trek@ausocean.org>
Scott Barnard <scott@ausocean.org>
LICENSE LICENSE
revid is Copyright (C) 2017-2020 the Australian Ocean Lab (AusOcean) revid is Copyright (C) 2017-2020 the Australian Ocean Lab (AusOcean)
@ -52,6 +53,7 @@ import (
"bitbucket.org/ausocean/av/filter" "bitbucket.org/ausocean/av/filter"
"bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/bitrate"
"bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/ioext"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/vring" "bitbucket.org/ausocean/utils/vring"
@ -115,6 +117,9 @@ type Revid struct {
// err will channel errors from revid routines to the handle errors routine. // err will channel errors from revid routines to the handle errors routine.
err chan error err chan error
// bitrate is used for bitrate calculations.
bitrate bitrate.Calculator
} }
// New returns a pointer to a new Revid with the desired configuration, and/or // New returns a pointer to a new Revid with the desired configuration, and/or
@ -148,10 +153,8 @@ func (r *Revid) handleErrors() {
} }
// Bitrate returns the result of the most recent bitrate check. // Bitrate returns the result of the most recent bitrate check.
//
// TODO: get this working again.
func (r *Revid) Bitrate() int { func (r *Revid) Bitrate() int {
return -1 return r.bitrate.Bitrate()
} }
// reset swaps the current config of a Revid with the passed // reset swaps the current config of a Revid with the passed
@ -266,14 +269,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
return fmt.Errorf("could not initialise MTS ring buffer: %w", err) return fmt.Errorf("could not initialise MTS ring buffer: %w", err)
} }
w = newMtsSender( w = newMtsSender(
newHttpSender(r.ns, r.cfg.Logger.Log), newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report),
r.cfg.Logger.Log, r.cfg.Logger.Log,
rb, rb,
r.cfg.ClipDuration, r.cfg.ClipDuration,
) )
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
case config.OutputRTP: case config.OutputRTP:
w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate) w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report)
if err != nil { if err != nil {
r.cfg.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) r.cfg.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error())
} }
@ -295,6 +298,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
rtmpConnectionMaxTries, rtmpConnectionMaxTries,
rb, rb,
r.cfg.Logger.Log, r.cfg.Logger.Log,
r.bitrate.Report,
) )
if err != nil { if err != nil {
r.cfg.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) r.cfg.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error())

View File

@ -61,19 +61,25 @@ const (
type httpSender struct { type httpSender struct {
client *netsender.Sender client *netsender.Sender
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
report func(sent int)
} }
// newHttpSender returns a pointer to a new httpSender. // newHttpSender returns a pointer to a new httpSender.
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) *httpSender {
return &httpSender{ return &httpSender{
client: ns, client: ns,
log: log, log: log,
report: report,
} }
} }
// Write implements io.Writer. // Write implements io.Writer.
func (s *httpSender) Write(d []byte) (int, error) { func (s *httpSender) Write(d []byte) (int, error) {
return len(d), httpSend(d, s.client, s.log) err := httpSend(d, s.client, s.log)
if err == nil {
s.report(len(d))
}
return len(d), err
} }
func (s *httpSender) Close() error { return nil } func (s *httpSender) Close() error { return nil }
@ -276,9 +282,10 @@ type rtmpSender struct {
ring *vring.Buffer ring *vring.Buffer
done chan struct{} done chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
report func(sent int)
} }
func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) {
var conn *rtmp.Conn var conn *rtmp.Conn
var err error var err error
for n := 0; n < retries; n++ { for n := 0; n < retries; n++ {
@ -299,6 +306,7 @@ func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log
log: log, log: log,
ring: rb, ring: rb,
done: make(chan struct{}), done: make(chan struct{}),
report: report,
} }
s.wg.Add(1) s.wg.Add(1)
go s.output() go s.output()
@ -364,6 +372,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) {
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error()) s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error())
} }
s.report(len(d))
return len(d), nil return len(d), nil
} }
@ -404,9 +413,10 @@ type rtpSender struct {
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
encoder *rtp.Encoder encoder *rtp.Encoder
data []byte data []byte
report func(sent int)
} }
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint, report func(sent int)) (*rtpSender, error) {
conn, err := net.Dial("udp", addr) conn, err := net.Dial("udp", addr)
if err != nil { if err != nil {
return nil, err return nil, err
@ -414,6 +424,7 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{
s := &rtpSender{ s := &rtpSender{
log: log, log: log,
encoder: rtp.NewEncoder(conn, int(fps)), encoder: rtp.NewEncoder(conn, int(fps)),
report: report,
} }
return s, nil return s, nil
} }
@ -426,6 +437,7 @@ func (s *rtpSender) Write(d []byte) (int, error) {
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"rtpSender: write error", err.Error()) s.log(logger.Warning, pkg+"rtpSender: write error", err.Error())
} }
s.report(len(d))
return len(d), nil return len(d), nil
} }