From 7541015730e03eb8729501de6a14a1112623d9ef Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 17 May 2021 11:03:41 +0930 Subject: [PATCH] cmd/audio-netsender,device/alsa,revid: swap out ring buffer for pool buffer Names of config fields have been left unchanged since they appear to be lexically coupled to behaviour and so require greater care. --- cmd/audio-netsender/main.go | 22 ++++++------- device/alsa/alsa.go | 18 +++++------ revid/config/config.go | 6 ++-- revid/pipeline.go | 20 ++++++------ revid/senders.go | 62 ++++++++++++++++++------------------- revid/senders_test.go | 8 ++--- 6 files changed, 68 insertions(+), 68 deletions(-) diff --git a/cmd/audio-netsender/main.go b/cmd/audio-netsender/main.go index fbae6d11..ba74ca72 100644 --- a/cmd/audio-netsender/main.go +++ b/cmd/audio-netsender/main.go @@ -53,7 +53,7 @@ import ( "bitbucket.org/ausocean/iot/pi/sds" "bitbucket.org/ausocean/iot/pi/smartlogger" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" + "bitbucket.org/ausocean/utils/pool" ) const ( @@ -80,7 +80,7 @@ type audioClient struct { // internals dev *yalsa.Device // audio input device pb pcm.Buffer // Buffer to contain the direct audio from ALSA. - rb *ring.Buffer // Ring buffer to contain processed audio ready to be read. + buf *pool.Buffer // Ring buffer to contain processed audio ready to be read. ns *netsender.Sender // our NetSender vs int // our "var sum" to track var changes } @@ -158,7 +158,7 @@ func main() { float64(ac.parameters.period), ) rbLen := rbDuration / ac.period - ac.rb = ring.NewBuffer(int(rbLen), cs, rbTimeout) + ac.buf = pool.NewBuffer(int(rbLen), cs, rbTimeout) go ac.input() @@ -367,11 +367,11 @@ func (ac *audioClient) input() { log.Log(logger.Debug, "audio format conversion has been performed where needed") var n int - n, err = ac.rb.Write(toWrite.Data) + n, err = ac.buf.Write(toWrite.Data) switch err { case nil: log.Log(logger.Debug, "wrote audio to ringbuffer", "length", n) - case ring.ErrDropped: + case pool.ErrDropped: log.Log(logger.Warning, "dropped audio") default: log.Log(logger.Error, "unexpected ringbuffer error", "error", err.Error()) @@ -417,7 +417,7 @@ func (ac *audioClient) output() { pins = netsender.MakePins(ip, "X") } } else { - n, err := read(ac.rb, buf) + n, err := read(ac.buf, buf) if err != nil { return } @@ -503,24 +503,24 @@ func (ac *audioClient) output() { // read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success. // Any errors returned are unexpected and should be considered fatal. -func read(rb *ring.Buffer, buf []byte) (int, error) { +func read(rb *pool.Buffer, buf []byte) (int, error) { chunk, err := rb.Next(rbNextTimeout) switch err { case nil: // Do nothing. - case ring.ErrTimeout: + case pool.ErrTimeout: return 0, nil case io.EOF: - log.Log(logger.Error, "unexpected EOF from ring.Next") + log.Log(logger.Error, "unexpected EOF from pool.Next") return 0, io.ErrUnexpectedEOF default: - log.Log(logger.Error, "unexpected error from ring.Next", "error", err.Error()) + log.Log(logger.Error, "unexpected error from pool.Next", "error", err.Error()) return 0, err } n, err := io.ReadFull(rb, buf[:chunk.Len()]) if err != nil { - log.Log(logger.Error, "unexpected error from ring.Read", "error", err.Error()) + log.Log(logger.Error, "unexpected error from pool.Read", "error", err.Error()) return n, err } diff --git a/device/alsa/alsa.go b/device/alsa/alsa.go index eb35d1b7..9a6801ac 100644 --- a/device/alsa/alsa.go +++ b/device/alsa/alsa.go @@ -41,7 +41,7 @@ import ( "bitbucket.org/ausocean/av/device" "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" + "bitbucket.org/ausocean/utils/pool" ) const ( @@ -85,7 +85,7 @@ type ALSA struct { title string // Name of audio title, or empty for the default title. dev *yalsa.Device // ALSA device's Audio input device. pb pcm.Buffer // Buffer to contain the direct audio from ALSA. - rb *ring.Buffer // Ring buffer to contain processed audio ready to be read. + buf *pool.Buffer // Ring buffer to contain processed audio ready to be read. Config // Configuration parameters for this device. } @@ -169,9 +169,9 @@ func (d *ALSA) Setup(c config.Config) error { Data: ab.Data, } - // Create ring buffer with appropriate chunk size. + // Create pool buffer with appropriate chunk size. cs := d.DataSize() - d.rb = ring.NewBuffer(rbLen, cs, rbTimeout) + d.buf = pool.NewBuffer(rbLen, cs, rbTimeout) // Start device in paused mode. d.mode = paused @@ -397,11 +397,11 @@ func (d *ALSA) input() { toWrite := d.formatBuffer() // Write audio to ringbuffer. - n, err := d.rb.Write(toWrite.Data) + n, err := d.buf.Write(toWrite.Data) switch err { case nil: d.l.Log(logger.Debug, "wrote audio to ringbuffer", "length", n) - case ring.ErrDropped: + case pool.ErrDropped: d.l.Log(logger.Warning, "old audio data overwritten") default: d.l.Log(logger.Error, "unexpected ringbuffer error", "error", err.Error()) @@ -413,13 +413,13 @@ func (d *ALSA) input() { // Read reads from the ringbuffer, returning the number of bytes read upon success. func (d *ALSA) Read(p []byte) (int, error) { // Ready ringbuffer for read. - _, err := d.rb.Next(rbNextTimeout) + _, err := d.buf.Next(rbNextTimeout) if err != nil { return 0, err } - // Read from ring buffer. - return d.rb.Read(p) + // Read from pool buffer. + return d.buf.Read(p) } // formatBuffer returns audio that has been converted to the desired format. diff --git a/revid/config/config.go b/revid/config/config.go index 2e9c8c07..c6eed123 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -236,9 +236,9 @@ type Config struct { PSITime uint // Sets the time between a packet being sent. Quantization uint // Quantization defines the quantization level, which will determine variable bitrate quality in the case of input from the Pi Camera. - RBCapacity uint // The number of bytes the ring buffer will occupy. - RBStartElementSize uint // The starting element size of the ring buffer from which element size will increase to accomodate frames. - RBWriteTimeout uint // The ringbuffer write timeout in seconds. + RBCapacity uint // The number of bytes the pool buffer will occupy. + RBStartElementSize uint // The starting element size of the pool buffer from which element size will increase to accomodate frames. + RBWriteTimeout uint // The pool buffer write timeout in seconds. RecPeriod float64 // How many seconds to record at a time. Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input. RTMPURL string // RTMPURL specifies the Rtmp output destination URL. This must be defined if RTMP is to be used as an output. diff --git a/revid/pipeline.go b/revid/pipeline.go index 575516c5..5618a402 100644 --- a/revid/pipeline.go +++ b/revid/pipeline.go @@ -50,7 +50,7 @@ import ( "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" + "bitbucket.org/ausocean/utils/pool" ) // TODO(Saxon): put more thought into error severity and how to handle these. @@ -169,9 +169,9 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. // will hold senders that require FLV encoding. var mtsSenders, flvSenders []io.WriteCloser - // Calculate no. of ring buffer elements based on starting element size - // const and config directed max ring buffer size, then create buffer. - // This is only used if the selected output uses a ring buffer. + // Calculate no. of pool buffer elements based on starting element size + // const and config directed max pool buffer size, then create buffer. + // This is only used if the selected output uses a pool buffer. nElements := r.cfg.RBCapacity / r.cfg.RBStartElementSize writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second @@ -183,9 +183,9 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. switch out { case config.OutputHTTP: r.cfg.Logger.Log(logger.Debug, "using HTTP output") - rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) + pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report) - w = newMTSSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration) + w = newMTSSender(hs, r.cfg.Logger.Log, pb, r.cfg.ClipDuration) mtsSenders = append(mtsSenders, w) case config.OutputRTP: @@ -204,17 +204,17 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. mtsSenders = append(mtsSenders, w) case config.OutputFiles: r.cfg.Logger.Log(logger.Debug, "using Files output") - rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) + pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) fs, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, true) if err != nil { return err } - w = newMTSSender(fs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration) + w = newMTSSender(fs, r.cfg.Logger.Log, pb, r.cfg.ClipDuration) mtsSenders = append(mtsSenders, w) case config.OutputRTMP: r.cfg.Logger.Log(logger.Debug, "using RTMP output") - rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) - w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report) + pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) + w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, pb, r.cfg.Logger.Log, r.bitrate.Report) if err != nil { r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error()) } diff --git a/revid/senders.go b/revid/senders.go index 36310389..4a227d22 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -45,13 +45,13 @@ import ( "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" + "bitbucket.org/ausocean/utils/pool" ) // Log is used by the multiSender. type Log func(level int8, message string, params ...interface{}) -// Sender ring buffer read timeouts. +// Sender pool buffer read timeouts. const ( rtmpRBReadTimeout = 1 * time.Second mtsRBReadTimeout = 1 * time.Second @@ -205,7 +205,7 @@ func (s *fileSender) Close() error { return s.file.Close() } type mtsSender struct { dst io.WriteCloser buf []byte - ring *ring.Buffer + pool *pool.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer @@ -218,13 +218,13 @@ type mtsSender struct { } // newMtsSender returns a new mtsSender. -func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender { +func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *pool.Buffer, clipDur time.Duration) *mtsSender { log(logger.Debug, "setting up mtsSender", "clip duration", int(clipDur)) s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), log: log, - ring: rb, + pool: rb, done: make(chan struct{}), clipDur: clipDur, } @@ -235,7 +235,7 @@ func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int // output starts an mtsSender's data handling routine. func (s *mtsSender) output() { - var chunk *ring.Chunk + var chunk *pool.Chunk for { select { case <-s.done: @@ -246,12 +246,12 @@ func (s *mtsSender) output() { // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ring.Next(mtsRBReadTimeout) + chunk, err = s.pool.Next(mtsRBReadTimeout) switch err { case nil, io.EOF: continue - case ring.ErrTimeout: - s.log(logger.Debug, "mtsSender: ring buffer read timeout") + case pool.ErrTimeout: + s.log(logger.Debug, "mtsSender: pool buffer read timeout") continue default: s.log(logger.Error, "unexpected error", "error", err.Error()) @@ -297,19 +297,19 @@ func (s *mtsSender) Write(d []byte) (int, error) { curDur := time.Now().Sub(s.prev) s.log(logger.Debug, "checking send conditions", "curDuration", int(curDur), "sendDur", int(s.clipDur), "curPID", s.curPid, "len", len(s.buf)) if curDur >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 { - s.log(logger.Debug, "writing clip to ring buffer for sending", "size", len(s.buf)) + s.log(logger.Debug, "writing clip to pool buffer for sending", "size", len(s.buf)) s.prev = time.Now() - n, err := s.ring.Write(s.buf) + n, err := s.pool.Write(s.buf) if err == nil { - s.ring.Flush() + s.pool.Flush() } if err != nil { s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSRBElementSize) - if err == ring.ErrTooLong { + if err == pool.ErrTooLong { adjustedMTSRBElementSize = len(s.buf) * 2 numElements := maxBuffLen / adjustedMTSRBElementSize - s.ring = ring.NewBuffer(maxBuffLen/adjustedMTSRBElementSize, adjustedMTSRBElementSize, 5*time.Second) - s.log(logger.Info, "adjusted MTS ring buffer element size", "new size", adjustedMTSRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSRBElementSize) + s.pool = pool.NewBuffer(maxBuffLen/adjustedMTSRBElementSize, adjustedMTSRBElementSize, 5*time.Second) + s.log(logger.Info, "adjusted MTS pool buffer element size", "new size", adjustedMTSRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSRBElementSize) } } s.buf = s.buf[:0] @@ -332,13 +332,13 @@ type rtmpSender struct { url string retries int log func(lvl int8, msg string, args ...interface{}) - ring *ring.Buffer + pool *pool.Buffer done chan struct{} wg sync.WaitGroup report func(sent int) } -func newRtmpSender(url string, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) { +func newRtmpSender(url string, retries int, rb *pool.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { @@ -356,7 +356,7 @@ func newRtmpSender(url string, retries int, rb *ring.Buffer, log func(lvl int8, url: url, retries: retries, log: log, - ring: rb, + pool: rb, done: make(chan struct{}), report: report, } @@ -367,7 +367,7 @@ func newRtmpSender(url string, retries int, rb *ring.Buffer, log func(lvl int8, // output starts an mtsSender's data handling routine. func (s *rtmpSender) output() { - var chunk *ring.Chunk + var chunk *pool.Chunk for { select { case <-s.done: @@ -375,15 +375,15 @@ func (s *rtmpSender) output() { defer s.wg.Done() return default: - // If chunk is nil then we're ready to get another from the ring buffer. + // If chunk is nil then we're ready to get another from the pool buffer. if chunk == nil { var err error - chunk, err = s.ring.Next(rtmpRBReadTimeout) + chunk, err = s.pool.Next(rtmpRBReadTimeout) switch err { case nil, io.EOF: continue - case ring.ErrTimeout: - s.log(logger.Debug, "rtmpSender: ring buffer read timeout") + case pool.ErrTimeout: + s.log(logger.Debug, "rtmpSender: pool buffer read timeout") continue default: s.log(logger.Error, "unexpected error", "error", err.Error()) @@ -418,18 +418,18 @@ func (s *rtmpSender) output() { // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { - s.log(logger.Debug, "writing to ring buffer") - _, err := s.ring.Write(d) + s.log(logger.Debug, "writing to pool buffer") + _, err := s.pool.Write(d) if err == nil { - s.ring.Flush() - s.log(logger.Debug, "good ring buffer write", "len", len(d)) + s.pool.Flush() + s.log(logger.Debug, "good pool buffer write", "len", len(d)) } else { - s.log(logger.Warning, "ring buffer write error", "error", err.Error()) - if err == ring.ErrTooLong { + s.log(logger.Warning, "pool buffer write error", "error", err.Error()) + if err == pool.ErrTooLong { adjustedRTMPRBElementSize = len(d) * 2 numElements := maxBuffLen / adjustedRTMPRBElementSize - s.ring = ring.NewBuffer(numElements, adjustedRTMPRBElementSize, 5*time.Second) - s.log(logger.Info, "adjusted RTMP ring buffer element size", "new size", adjustedRTMPRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPRBElementSize) + s.pool = pool.NewBuffer(numElements, adjustedRTMPRBElementSize, 5*time.Second) + s.log(logger.Info, "adjusted RTMP pool buffer element size", "new size", adjustedRTMPRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPRBElementSize) } } s.report(len(d)) diff --git a/revid/senders_test.go b/revid/senders_test.go index 5125ad56..e50ecaf9 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -38,7 +38,7 @@ import ( "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts/meta" - "bitbucket.org/ausocean/utils/ring" + "bitbucket.org/ausocean/utils/pool" ) var ( @@ -107,7 +107,7 @@ func TestMTSSenderSegment(t *testing.T) { dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} const testRBCapacity = 50000000 nElements := testRBCapacity / rbStartingElementSize - sender := newMTSSender(dst, (*testLogger)(t).Log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0) + sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, rbStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264)) @@ -188,7 +188,7 @@ func TestMtsSenderFailedSend(t *testing.T) { dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} const testRBCapacity = 50000000 // 50MB nElements := testRBCapacity / rbStartingElementSize - sender := newMTSSender(dst, (*testLogger)(t).Log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0) + sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, rbStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264)) @@ -269,7 +269,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMTSSender(dst, (*testLogger)(t).Log, ring.NewBuffer(1, rbStartingElementSize, 0), 0) + sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(1, rbStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))