mirror of https://bitbucket.org/ausocean/av.git
revid/senders.go,revid.go: Added bitrate calculations to revid.
This commit is contained in:
parent
27681e773f
commit
9a0fa09879
|
@ -7,6 +7,7 @@ AUTHORS
|
|||
Alan Noble <alan@ausocean.org>
|
||||
Dan Kortschak <dan@ausocean.org>
|
||||
Trek Hopton <trek@ausocean.org>
|
||||
Scott Barnard <scott@ausocean.org>
|
||||
|
||||
LICENSE
|
||||
revid is Copyright (C) 2017-2020 the Australian Ocean Lab (AusOcean)
|
||||
|
@ -52,6 +53,7 @@ import (
|
|||
"bitbucket.org/ausocean/av/filter"
|
||||
"bitbucket.org/ausocean/av/revid/config"
|
||||
"bitbucket.org/ausocean/iot/pi/netsender"
|
||||
"bitbucket.org/ausocean/utils/bitrate"
|
||||
"bitbucket.org/ausocean/utils/ioext"
|
||||
"bitbucket.org/ausocean/utils/logger"
|
||||
"bitbucket.org/ausocean/utils/vring"
|
||||
|
@ -115,6 +117,9 @@ type Revid struct {
|
|||
|
||||
// err will channel errors from revid routines to the handle errors routine.
|
||||
err chan error
|
||||
|
||||
// bitrate is used for bitrate calculations.
|
||||
bitrate bitrate.Calculator
|
||||
}
|
||||
|
||||
// New returns a pointer to a new Revid with the desired configuration, and/or
|
||||
|
@ -148,10 +153,8 @@ func (r *Revid) handleErrors() {
|
|||
}
|
||||
|
||||
// Bitrate returns the result of the most recent bitrate check.
|
||||
//
|
||||
// TODO: get this working again.
|
||||
func (r *Revid) Bitrate() int {
|
||||
return -1
|
||||
return r.bitrate.Bitrate()
|
||||
}
|
||||
|
||||
// reset swaps the current config of a Revid with the passed
|
||||
|
@ -266,7 +269,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
|||
return fmt.Errorf("could not initialise MTS ring buffer: %w", err)
|
||||
}
|
||||
w = newMtsSender(
|
||||
newHttpSender(r.ns, r.cfg.Logger.Log),
|
||||
newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report),
|
||||
r.cfg.Logger.Log,
|
||||
rb,
|
||||
r.cfg.ClipDuration,
|
||||
|
@ -295,6 +298,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
|||
rtmpConnectionMaxTries,
|
||||
rb,
|
||||
r.cfg.Logger.Log,
|
||||
r.bitrate.Report,
|
||||
)
|
||||
if err != nil {
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error())
|
||||
|
|
|
@ -59,21 +59,27 @@ const (
|
|||
// httpSender provides an implemntation of io.Writer to perform sends to a http
|
||||
// destination.
|
||||
type httpSender struct {
|
||||
client *netsender.Sender
|
||||
log func(lvl int8, msg string, args ...interface{})
|
||||
client *netsender.Sender
|
||||
log func(lvl int8, msg string, args ...interface{})
|
||||
reportSent func(sent int)
|
||||
}
|
||||
|
||||
// newHttpSender returns a pointer to a new httpSender.
|
||||
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender {
|
||||
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{}), reportSent func(sent int)) *httpSender {
|
||||
return &httpSender{
|
||||
client: ns,
|
||||
log: log,
|
||||
client: ns,
|
||||
log: log,
|
||||
reportSent: reportSent,
|
||||
}
|
||||
}
|
||||
|
||||
// Write implements io.Writer.
|
||||
func (s *httpSender) Write(d []byte) (int, error) {
|
||||
return len(d), httpSend(d, s.client, s.log)
|
||||
err := httpSend(d, s.client, s.log)
|
||||
if err == nil {
|
||||
s.reportSent(len(d))
|
||||
}
|
||||
return len(d), err
|
||||
}
|
||||
|
||||
func (s *httpSender) Close() error { return nil }
|
||||
|
@ -268,17 +274,18 @@ func (s *mtsSender) Close() error {
|
|||
|
||||
// rtmpSender implements loadSender for a native RTMP destination.
|
||||
type rtmpSender struct {
|
||||
conn *rtmp.Conn
|
||||
url string
|
||||
timeout uint
|
||||
retries int
|
||||
log func(lvl int8, msg string, args ...interface{})
|
||||
ring *vring.Buffer
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
conn *rtmp.Conn
|
||||
url string
|
||||
timeout uint
|
||||
retries int
|
||||
log func(lvl int8, msg string, args ...interface{})
|
||||
ring *vring.Buffer
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
reportSent func(sent int)
|
||||
}
|
||||
|
||||
func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
|
||||
func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{}), reportSent func(sent int)) (*rtmpSender, error) {
|
||||
var conn *rtmp.Conn
|
||||
var err error
|
||||
for n := 0; n < retries; n++ {
|
||||
|
@ -292,13 +299,14 @@ func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log
|
|||
}
|
||||
}
|
||||
s := &rtmpSender{
|
||||
conn: conn,
|
||||
url: url,
|
||||
timeout: timeout,
|
||||
retries: retries,
|
||||
log: log,
|
||||
ring: rb,
|
||||
done: make(chan struct{}),
|
||||
conn: conn,
|
||||
url: url,
|
||||
timeout: timeout,
|
||||
retries: retries,
|
||||
log: log,
|
||||
ring: rb,
|
||||
done: make(chan struct{}),
|
||||
reportSent: reportSent,
|
||||
}
|
||||
s.wg.Add(1)
|
||||
go s.output()
|
||||
|
@ -364,6 +372,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) {
|
|||
if err != nil {
|
||||
s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error())
|
||||
}
|
||||
s.reportSent(len(d))
|
||||
return len(d), nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue