diff --git a/go.mod b/go.mod index 2ed4c734..dfa6f093 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( bitbucket.org/ausocean/iot v1.2.13 - bitbucket.org/ausocean/utils v1.2.12 + bitbucket.org/ausocean/utils v1.2.13 github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 diff --git a/go.sum b/go.sum index ef1fb9ff..d74fe617 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ bitbucket.org/ausocean/utils v1.2.11 h1:zA0FOaPjN960ryp8PKCkV5y50uWBYrIxCVnXjwbv bitbucket.org/ausocean/utils v1.2.11/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.12 h1:VnskjWTDM475TnQRhBQE0cNp9D6Y6OELrd4UkD2VVIQ= bitbucket.org/ausocean/utils v1.2.12/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= +bitbucket.org/ausocean/utils v1.2.13 h1:tUaIywtoMc1+zl1GCVQokX4mL5X7LNHX5O51AgAPrWA= +bitbucket.org/ausocean/utils v1.2.13/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= diff --git a/revid/revid.go b/revid/revid.go index 94bd4a6f..dec4ca6d 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -7,6 +7,7 @@ AUTHORS Alan Noble Dan Kortschak Trek Hopton + Scott Barnard LICENSE revid is Copyright (C) 2017-2020 the Australian Ocean Lab (AusOcean) @@ -52,6 +53,7 @@ import ( "bitbucket.org/ausocean/av/filter" "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/iot/pi/netsender" + "bitbucket.org/ausocean/utils/bitrate" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/vring" @@ -115,6 +117,9 @@ type Revid struct { // err will channel errors from revid routines to the handle errors routine. err chan error + + // bitrate is used for bitrate calculations. + bitrate bitrate.Calculator } // New returns a pointer to a new Revid with the desired configuration, and/or @@ -148,10 +153,8 @@ func (r *Revid) handleErrors() { } // Bitrate returns the result of the most recent bitrate check. -// -// TODO: get this working again. func (r *Revid) Bitrate() int { - return -1 + return r.bitrate.Bitrate() } // reset swaps the current config of a Revid with the passed @@ -266,14 +269,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. return fmt.Errorf("could not initialise MTS ring buffer: %w", err) } w = newMtsSender( - newHttpSender(r.ns, r.cfg.Logger.Log), + newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report), r.cfg.Logger.Log, rb, r.cfg.ClipDuration, ) mtsSenders = append(mtsSenders, w) case config.OutputRTP: - w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate) + w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) } @@ -295,6 +298,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, + r.bitrate.Report, ) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) diff --git a/revid/senders.go b/revid/senders.go index 4871ddb5..4929bd40 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -61,19 +61,25 @@ const ( type httpSender struct { client *netsender.Sender log func(lvl int8, msg string, args ...interface{}) + report func(sent int) } // newHttpSender returns a pointer to a new httpSender. -func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { +func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) *httpSender { return &httpSender{ client: ns, log: log, + report: report, } } // Write implements io.Writer. func (s *httpSender) Write(d []byte) (int, error) { - return len(d), httpSend(d, s.client, s.log) + err := httpSend(d, s.client, s.log) + if err == nil { + s.report(len(d)) + } + return len(d), err } func (s *httpSender) Close() error { return nil } @@ -276,9 +282,10 @@ type rtmpSender struct { ring *vring.Buffer done chan struct{} wg sync.WaitGroup + report func(sent int) } -func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { +func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { @@ -299,6 +306,7 @@ func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log log: log, ring: rb, done: make(chan struct{}), + report: report, } s.wg.Add(1) go s.output() @@ -364,6 +372,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) { if err != nil { s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error()) } + s.report(len(d)) return len(d), nil } @@ -404,9 +413,10 @@ type rtpSender struct { log func(lvl int8, msg string, args ...interface{}) encoder *rtp.Encoder data []byte + report func(sent int) } -func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { +func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint, report func(sent int)) (*rtpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { return nil, err @@ -414,6 +424,7 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{ s := &rtpSender{ log: log, encoder: rtp.NewEncoder(conn, int(fps)), + report: report, } return s, nil } @@ -426,6 +437,7 @@ func (s *rtpSender) Write(d []byte) (int, error) { if err != nil { s.log(logger.Warning, pkg+"rtpSender: write error", err.Error()) } + s.report(len(d)) return len(d), nil }