cmd/audio-netsender,device/alsa,revid: swap out ring buffer for pool buffer

Names of config fields have been left unchanged since they appear to be lexically
coupled to behaviour and so require greater care.
This commit is contained in:
Dan Kortschak 2021-05-17 11:03:41 +09:30
parent f27e4abdf8
commit 7541015730
No known key found for this signature in database
GPG Key ID: 45817BAD958E3F0C
6 changed files with 68 additions and 68 deletions

View File

@ -53,7 +53,7 @@ import (
"bitbucket.org/ausocean/iot/pi/sds"
"bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/pool"
)
const (
@ -80,7 +80,7 @@ type audioClient struct {
// internals
dev *yalsa.Device // audio input device
pb pcm.Buffer // Buffer to contain the direct audio from ALSA.
rb *ring.Buffer // Ring buffer to contain processed audio ready to be read.
buf *pool.Buffer // Ring buffer to contain processed audio ready to be read.
ns *netsender.Sender // our NetSender
vs int // our "var sum" to track var changes
}
@ -158,7 +158,7 @@ func main() {
float64(ac.parameters.period),
)
rbLen := rbDuration / ac.period
ac.rb = ring.NewBuffer(int(rbLen), cs, rbTimeout)
ac.buf = pool.NewBuffer(int(rbLen), cs, rbTimeout)
go ac.input()
@ -367,11 +367,11 @@ func (ac *audioClient) input() {
log.Log(logger.Debug, "audio format conversion has been performed where needed")
var n int
n, err = ac.rb.Write(toWrite.Data)
n, err = ac.buf.Write(toWrite.Data)
switch err {
case nil:
log.Log(logger.Debug, "wrote audio to ringbuffer", "length", n)
case ring.ErrDropped:
case pool.ErrDropped:
log.Log(logger.Warning, "dropped audio")
default:
log.Log(logger.Error, "unexpected ringbuffer error", "error", err.Error())
@ -417,7 +417,7 @@ func (ac *audioClient) output() {
pins = netsender.MakePins(ip, "X")
}
} else {
n, err := read(ac.rb, buf)
n, err := read(ac.buf, buf)
if err != nil {
return
}
@ -503,24 +503,24 @@ func (ac *audioClient) output() {
// read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success.
// Any errors returned are unexpected and should be considered fatal.
func read(rb *ring.Buffer, buf []byte) (int, error) {
func read(rb *pool.Buffer, buf []byte) (int, error) {
chunk, err := rb.Next(rbNextTimeout)
switch err {
case nil:
// Do nothing.
case ring.ErrTimeout:
case pool.ErrTimeout:
return 0, nil
case io.EOF:
log.Log(logger.Error, "unexpected EOF from ring.Next")
log.Log(logger.Error, "unexpected EOF from pool.Next")
return 0, io.ErrUnexpectedEOF
default:
log.Log(logger.Error, "unexpected error from ring.Next", "error", err.Error())
log.Log(logger.Error, "unexpected error from pool.Next", "error", err.Error())
return 0, err
}
n, err := io.ReadFull(rb, buf[:chunk.Len()])
if err != nil {
log.Log(logger.Error, "unexpected error from ring.Read", "error", err.Error())
log.Log(logger.Error, "unexpected error from pool.Read", "error", err.Error())
return n, err
}

View File

@ -41,7 +41,7 @@ import (
"bitbucket.org/ausocean/av/device"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/pool"
)
const (
@ -85,7 +85,7 @@ type ALSA struct {
title string // Name of audio title, or empty for the default title.
dev *yalsa.Device // ALSA device's Audio input device.
pb pcm.Buffer // Buffer to contain the direct audio from ALSA.
rb *ring.Buffer // Ring buffer to contain processed audio ready to be read.
buf *pool.Buffer // Ring buffer to contain processed audio ready to be read.
Config // Configuration parameters for this device.
}
@ -169,9 +169,9 @@ func (d *ALSA) Setup(c config.Config) error {
Data: ab.Data,
}
// Create ring buffer with appropriate chunk size.
// Create pool buffer with appropriate chunk size.
cs := d.DataSize()
d.rb = ring.NewBuffer(rbLen, cs, rbTimeout)
d.buf = pool.NewBuffer(rbLen, cs, rbTimeout)
// Start device in paused mode.
d.mode = paused
@ -397,11 +397,11 @@ func (d *ALSA) input() {
toWrite := d.formatBuffer()
// Write audio to ringbuffer.
n, err := d.rb.Write(toWrite.Data)
n, err := d.buf.Write(toWrite.Data)
switch err {
case nil:
d.l.Log(logger.Debug, "wrote audio to ringbuffer", "length", n)
case ring.ErrDropped:
case pool.ErrDropped:
d.l.Log(logger.Warning, "old audio data overwritten")
default:
d.l.Log(logger.Error, "unexpected ringbuffer error", "error", err.Error())
@ -413,13 +413,13 @@ func (d *ALSA) input() {
// Read reads from the ringbuffer, returning the number of bytes read upon success.
func (d *ALSA) Read(p []byte) (int, error) {
// Ready ringbuffer for read.
_, err := d.rb.Next(rbNextTimeout)
_, err := d.buf.Next(rbNextTimeout)
if err != nil {
return 0, err
}
// Read from ring buffer.
return d.rb.Read(p)
// Read from pool buffer.
return d.buf.Read(p)
}
// formatBuffer returns audio that has been converted to the desired format.

View File

@ -236,9 +236,9 @@ type Config struct {
PSITime uint // Sets the time between a packet being sent.
Quantization uint // Quantization defines the quantization level, which will determine variable bitrate quality in the case of input from the Pi Camera.
RBCapacity uint // The number of bytes the ring buffer will occupy.
RBStartElementSize uint // The starting element size of the ring buffer from which element size will increase to accomodate frames.
RBWriteTimeout uint // The ringbuffer write timeout in seconds.
RBCapacity uint // The number of bytes the pool buffer will occupy.
RBStartElementSize uint // The starting element size of the pool buffer from which element size will increase to accomodate frames.
RBWriteTimeout uint // The pool buffer write timeout in seconds.
RecPeriod float64 // How many seconds to record at a time.
Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input.
RTMPURL string // RTMPURL specifies the Rtmp output destination URL. This must be defined if RTMP is to be used as an output.

View File

@ -50,7 +50,7 @@ import (
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/utils/ioext"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/pool"
)
// TODO(Saxon): put more thought into error severity and how to handle these.
@ -169,9 +169,9 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
// will hold senders that require FLV encoding.
var mtsSenders, flvSenders []io.WriteCloser
// Calculate no. of ring buffer elements based on starting element size
// const and config directed max ring buffer size, then create buffer.
// This is only used if the selected output uses a ring buffer.
// Calculate no. of pool buffer elements based on starting element size
// const and config directed max pool buffer size, then create buffer.
// This is only used if the selected output uses a pool buffer.
nElements := r.cfg.RBCapacity / r.cfg.RBStartElementSize
writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second
@ -183,9 +183,9 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
switch out {
case config.OutputHTTP:
r.cfg.Logger.Log(logger.Debug, "using HTTP output")
rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report)
w = newMTSSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration)
w = newMTSSender(hs, r.cfg.Logger.Log, pb, r.cfg.ClipDuration)
mtsSenders = append(mtsSenders, w)
case config.OutputRTP:
@ -204,17 +204,17 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
mtsSenders = append(mtsSenders, w)
case config.OutputFiles:
r.cfg.Logger.Log(logger.Debug, "using Files output")
rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
fs, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, true)
if err != nil {
return err
}
w = newMTSSender(fs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration)
w = newMTSSender(fs, r.cfg.Logger.Log, pb, r.cfg.ClipDuration)
mtsSenders = append(mtsSenders, w)
case config.OutputRTMP:
r.cfg.Logger.Log(logger.Debug, "using RTMP output")
rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report)
pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, pb, r.cfg.Logger.Log, r.bitrate.Report)
if err != nil {
r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error())
}

View File

@ -45,13 +45,13 @@ import (
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/pool"
)
// Log is used by the multiSender.
type Log func(level int8, message string, params ...interface{})
// Sender ring buffer read timeouts.
// Sender pool buffer read timeouts.
const (
rtmpRBReadTimeout = 1 * time.Second
mtsRBReadTimeout = 1 * time.Second
@ -205,7 +205,7 @@ func (s *fileSender) Close() error { return s.file.Close() }
type mtsSender struct {
dst io.WriteCloser
buf []byte
ring *ring.Buffer
pool *pool.Buffer
next []byte
pkt packet.Packet
repairer *mts.DiscontinuityRepairer
@ -218,13 +218,13 @@ type mtsSender struct {
}
// newMtsSender returns a new mtsSender.
func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender {
func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *pool.Buffer, clipDur time.Duration) *mtsSender {
log(logger.Debug, "setting up mtsSender", "clip duration", int(clipDur))
s := &mtsSender{
dst: dst,
repairer: mts.NewDiscontinuityRepairer(),
log: log,
ring: rb,
pool: rb,
done: make(chan struct{}),
clipDur: clipDur,
}
@ -235,7 +235,7 @@ func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int
// output starts an mtsSender's data handling routine.
func (s *mtsSender) output() {
var chunk *ring.Chunk
var chunk *pool.Chunk
for {
select {
case <-s.done:
@ -246,12 +246,12 @@ func (s *mtsSender) output() {
// If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil {
var err error
chunk, err = s.ring.Next(mtsRBReadTimeout)
chunk, err = s.pool.Next(mtsRBReadTimeout)
switch err {
case nil, io.EOF:
continue
case ring.ErrTimeout:
s.log(logger.Debug, "mtsSender: ring buffer read timeout")
case pool.ErrTimeout:
s.log(logger.Debug, "mtsSender: pool buffer read timeout")
continue
default:
s.log(logger.Error, "unexpected error", "error", err.Error())
@ -297,19 +297,19 @@ func (s *mtsSender) Write(d []byte) (int, error) {
curDur := time.Now().Sub(s.prev)
s.log(logger.Debug, "checking send conditions", "curDuration", int(curDur), "sendDur", int(s.clipDur), "curPID", s.curPid, "len", len(s.buf))
if curDur >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 {
s.log(logger.Debug, "writing clip to ring buffer for sending", "size", len(s.buf))
s.log(logger.Debug, "writing clip to pool buffer for sending", "size", len(s.buf))
s.prev = time.Now()
n, err := s.ring.Write(s.buf)
n, err := s.pool.Write(s.buf)
if err == nil {
s.ring.Flush()
s.pool.Flush()
}
if err != nil {
s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSRBElementSize)
if err == ring.ErrTooLong {
if err == pool.ErrTooLong {
adjustedMTSRBElementSize = len(s.buf) * 2
numElements := maxBuffLen / adjustedMTSRBElementSize
s.ring = ring.NewBuffer(maxBuffLen/adjustedMTSRBElementSize, adjustedMTSRBElementSize, 5*time.Second)
s.log(logger.Info, "adjusted MTS ring buffer element size", "new size", adjustedMTSRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSRBElementSize)
s.pool = pool.NewBuffer(maxBuffLen/adjustedMTSRBElementSize, adjustedMTSRBElementSize, 5*time.Second)
s.log(logger.Info, "adjusted MTS pool buffer element size", "new size", adjustedMTSRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSRBElementSize)
}
}
s.buf = s.buf[:0]
@ -332,13 +332,13 @@ type rtmpSender struct {
url string
retries int
log func(lvl int8, msg string, args ...interface{})
ring *ring.Buffer
pool *pool.Buffer
done chan struct{}
wg sync.WaitGroup
report func(sent int)
}
func newRtmpSender(url string, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) {
func newRtmpSender(url string, retries int, rb *pool.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) {
var conn *rtmp.Conn
var err error
for n := 0; n < retries; n++ {
@ -356,7 +356,7 @@ func newRtmpSender(url string, retries int, rb *ring.Buffer, log func(lvl int8,
url: url,
retries: retries,
log: log,
ring: rb,
pool: rb,
done: make(chan struct{}),
report: report,
}
@ -367,7 +367,7 @@ func newRtmpSender(url string, retries int, rb *ring.Buffer, log func(lvl int8,
// output starts an mtsSender's data handling routine.
func (s *rtmpSender) output() {
var chunk *ring.Chunk
var chunk *pool.Chunk
for {
select {
case <-s.done:
@ -375,15 +375,15 @@ func (s *rtmpSender) output() {
defer s.wg.Done()
return
default:
// If chunk is nil then we're ready to get another from the ring buffer.
// If chunk is nil then we're ready to get another from the pool buffer.
if chunk == nil {
var err error
chunk, err = s.ring.Next(rtmpRBReadTimeout)
chunk, err = s.pool.Next(rtmpRBReadTimeout)
switch err {
case nil, io.EOF:
continue
case ring.ErrTimeout:
s.log(logger.Debug, "rtmpSender: ring buffer read timeout")
case pool.ErrTimeout:
s.log(logger.Debug, "rtmpSender: pool buffer read timeout")
continue
default:
s.log(logger.Error, "unexpected error", "error", err.Error())
@ -418,18 +418,18 @@ func (s *rtmpSender) output() {
// Write implements io.Writer.
func (s *rtmpSender) Write(d []byte) (int, error) {
s.log(logger.Debug, "writing to ring buffer")
_, err := s.ring.Write(d)
s.log(logger.Debug, "writing to pool buffer")
_, err := s.pool.Write(d)
if err == nil {
s.ring.Flush()
s.log(logger.Debug, "good ring buffer write", "len", len(d))
s.pool.Flush()
s.log(logger.Debug, "good pool buffer write", "len", len(d))
} else {
s.log(logger.Warning, "ring buffer write error", "error", err.Error())
if err == ring.ErrTooLong {
s.log(logger.Warning, "pool buffer write error", "error", err.Error())
if err == pool.ErrTooLong {
adjustedRTMPRBElementSize = len(d) * 2
numElements := maxBuffLen / adjustedRTMPRBElementSize
s.ring = ring.NewBuffer(numElements, adjustedRTMPRBElementSize, 5*time.Second)
s.log(logger.Info, "adjusted RTMP ring buffer element size", "new size", adjustedRTMPRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPRBElementSize)
s.pool = pool.NewBuffer(numElements, adjustedRTMPRBElementSize, 5*time.Second)
s.log(logger.Info, "adjusted RTMP pool buffer element size", "new size", adjustedRTMPRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPRBElementSize)
}
}
s.report(len(d))

View File

@ -38,7 +38,7 @@ import (
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/pool"
)
var (
@ -107,7 +107,7 @@ func TestMTSSenderSegment(t *testing.T) {
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
const testRBCapacity = 50000000
nElements := testRBCapacity / rbStartingElementSize
sender := newMTSSender(dst, (*testLogger)(t).Log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0)
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, rbStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))
@ -188,7 +188,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
const testRBCapacity = 50000000 // 50MB
nElements := testRBCapacity / rbStartingElementSize
sender := newMTSSender(dst, (*testLogger)(t).Log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0)
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, rbStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))
@ -269,7 +269,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMTSSender(dst, (*testLogger)(t).Log, ring.NewBuffer(1, rbStartingElementSize, 0), 0)
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(1, rbStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))