diff --git a/revid/config/config.go b/revid/config/config.go index 936e51ce..667faa88 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -86,14 +86,16 @@ const ( defaultAudioInputCodec = codecutil.ADPCM defaultPSITime = 2 + // Ring buffer defaults. + // MTS ring buffer defaults. - defaultMTSRBSize = 100 - defaultMTSRBElementSize = 300000 + defaultMTSRBMaxElements = 10000 + defaultMTSRBCapacity = 200000000 // bytes defaultMTSRBWriteTimeout = 5 // RTMP ring buffer defaults. - defaultRTMPRBSize = 100 - defaultRTMPRBElementSize = 300000 + defaultRTMPRBMaxElements = 10000 + defaultRTMPRBCapacity = 200000000 // bytes defaultRTMPRBWriteTimeout = 5 // Motion filter parameter defaults. @@ -283,13 +285,13 @@ type Config struct { PSITime int // Sets the time between a packet being sent. // RTMP ring buffer parameters. - RTMPRBSize int // The number of elements in the RTMP sender ringbuffer. - RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer. + RTMPRBMaxElements int // The maximum possible number of elements in ring buffer. + RTMPRBCapacity int // The total number of bytes available for the ring buffer. RTMPRBWriteTimeout int // The ringbuffer write timeout in seconds. // MTS ring buffer parameters. - MTSRBSize int // The number of elements in the MTS sender ringbuffer. - MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer. + MTSRBMaxElements int // The maximum possible number of elements in ring buffer. + MTSRBCapacity int // The total number of bytes available for the ring buffer. MTSRBWriteTimeout int // The ringbuffer write timeout in seconds. // Motion filter parameters. @@ -454,14 +456,14 @@ func (c *Config) Validate() error { c.RTPAddress = defaultRtpAddr } - if c.RTMPRBSize <= 0 { - c.Logger.Log(logger.Info, pkg+"RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize) - c.RTMPRBSize = defaultRTMPRBSize + if c.RTMPRBMaxElements <= 0 { + c.Logger.Log(logger.Info, pkg+"RTMPRBMaxElements bad or unset, defaulting", "RTMPRBMaxElements", defaultRTMPRBMaxElements) + c.RTMPRBMaxElements = defaultRTMPRBMaxElements } - if c.RTMPRBElementSize <= 0 { - c.Logger.Log(logger.Info, pkg+"RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize) - c.RTMPRBElementSize = defaultRTMPRBElementSize + if c.RTMPRBCapacity <= 0 { + c.Logger.Log(logger.Info, pkg+"RTMPRBCapacity bad or unset, defaulting", "RTMPRBCapacity", defaultRTMPRBCapacity) + c.RTMPRBCapacity = defaultRTMPRBCapacity } if c.RTMPRBWriteTimeout <= 0 { @@ -469,14 +471,14 @@ func (c *Config) Validate() error { c.RTMPRBWriteTimeout = defaultRTMPRBWriteTimeout } - if c.MTSRBSize <= 0 { - c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize) - c.MTSRBSize = defaultMTSRBSize + if c.MTSRBMaxElements <= 0 { + c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBMaxElements) + c.MTSRBMaxElements = defaultMTSRBMaxElements } - if c.MTSRBElementSize <= 0 { - c.Logger.Log(logger.Info, pkg+"MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize) - c.MTSRBElementSize = defaultMTSRBElementSize + if c.MTSRBCapacity <= 0 { + c.Logger.Log(logger.Info, pkg+"MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBCapacity) + c.MTSRBCapacity = defaultMTSRBCapacity } if c.MTSRBWriteTimeout <= 0 { diff --git a/revid/revid.go b/revid/revid.go index ebfdbe8b..12f88f71 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -54,20 +54,7 @@ import ( "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" -) - -// Ring buffer defaults. -const ( - // MTS ring buffer defaults. - defaultMTSRBSize = 1000 - defaultMTSRBElementSize = 100000 - defaultMTSRBWriteTimeout = 5 - - // RTMP ring buffer defaults. - defaultRTMPRBSize = 1000 - defaultRTMPRBElementSize = 300000 - defaultRTMPRBWriteTimeout = 5 + "bitbucket.org/ausocean/utils/vring" ) // RTMP connection properties. @@ -274,10 +261,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. for _, out := range r.cfg.Outputs { switch out { case config.OutputHTTP: + rb, err := vring.NewBuffer(r.cfg.MTSRBMaxElements, r.cfg.MTSRBCapacity, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second) + if err != nil { + return fmt.Errorf("could not initialise MTS ring buffer: %w", err) + } w = newMtsSender( newHttpSender(r.ns, r.cfg.Logger.Log), r.cfg.Logger.Log, - ring.NewBuffer(r.cfg.MTSRBSize, r.cfg.MTSRBElementSize, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second), + rb, r.cfg.ClipDuration, ) mtsSenders = append(mtsSenders, w) @@ -294,11 +285,15 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. } mtsSenders = append(mtsSenders, w) case config.OutputRTMP: + rb, err := vring.NewBuffer(r.cfg.RTMPRBMaxElements, r.cfg.RTMPRBCapacity, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second) + if err != nil { + return fmt.Errorf("could not initialise RTMP ring buffer: %w", err) + } w, err := newRtmpSender( r.cfg.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, - ring.NewBuffer(r.cfg.RTMPRBSize, r.cfg.RTMPRBElementSize, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second), + rb, r.cfg.Logger.Log, ) if err != nil { @@ -705,20 +700,20 @@ func (r *Revid) Update(vars map[string]string) error { default: r.cfg.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value) } - case "RTMPRBSize": + case "RTMPRBMaxElements": v, err := strconv.Atoi(value) if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBSize var", "value", value) + r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBMaxElements var", "value", value) break } - r.cfg.RTMPRBSize = v - case "RTMPRBElementSize": + r.cfg.RTMPRBMaxElements = v + case "RTMPRBCapacity": v, err := strconv.Atoi(value) if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBElementSize var", "value", value) + r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBCapacity var", "value", value) break } - r.cfg.RTMPRBElementSize = v + r.cfg.RTMPRBCapacity = v case "RTMPRBWriteTimeout": v, err := strconv.Atoi(value) if err != nil || v <= 0 { @@ -726,20 +721,20 @@ func (r *Revid) Update(vars map[string]string) error { break } r.cfg.RTMPRBWriteTimeout = v - case "MTSRBSize": - v, err := strconv.Atoi(value) - if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBSize var", "value", value) - break - } - r.cfg.MTSRBSize = v case "MTSRBElementSize": v, err := strconv.Atoi(value) if err != nil || v < 0 { r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBElementSize var", "value", value) break } - r.cfg.MTSRBElementSize = v + r.cfg.MTSRBMaxElements = v + case "MTSRBCapacity": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBCapacity var", "value", value) + break + } + r.cfg.MTSRBCapacity = v case "MTSRBWriteTimeout": v, err := strconv.Atoi(value) if err != nil || v <= 0 { diff --git a/revid/senders.go b/revid/senders.go index 1517a900..4871ddb5 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -44,7 +44,7 @@ import ( "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" + "bitbucket.org/ausocean/utils/vring" ) // Log is used by the multiSender. @@ -161,7 +161,7 @@ func (s *fileSender) Close() error { return s.file.Close() } type mtsSender struct { dst io.WriteCloser buf []byte - ring *ring.Buffer + ring *vring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer @@ -174,7 +174,7 @@ type mtsSender struct { } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender { +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *vring.Buffer, clipDur time.Duration) *mtsSender { s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), @@ -190,7 +190,7 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int // output starts an mtsSender's data handling routine. func (s *mtsSender) output() { - var chunk *ring.Chunk + var elem *vring.Element for { select { case <-s.done: @@ -198,14 +198,14 @@ func (s *mtsSender) output() { defer s.wg.Done() return default: - // If chunk is nil then we're ready to get another from the ringBuffer. - if chunk == nil { + // If elem is nil then we're ready to get another from the ringBuffer. + if elem == nil { var err error - chunk, err = s.ring.Next(mtsRBReadTimeout) + elem, err = s.ring.Next(mtsRBReadTimeout) switch err { case nil, io.EOF: continue - case ring.ErrTimeout: + case vring.ErrNextTimeout: s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") continue default: @@ -213,20 +213,20 @@ func (s *mtsSender) output() { continue } } - err := s.repairer.Repair(chunk.Bytes()) + err := s.repairer.Repair(elem.Bytes()) if err != nil { - chunk.Close() - chunk = nil + elem.Close() + elem = nil continue } s.log(logger.Debug, pkg+"mtsSender: writing") - _, err = s.dst.Write(chunk.Bytes()) + _, err = s.dst.Write(elem.Bytes()) if err != nil { s.repairer.Failed() continue } - chunk.Close() - chunk = nil + elem.Close() + elem = nil } } } @@ -273,12 +273,12 @@ type rtmpSender struct { timeout uint retries int log func(lvl int8, msg string, args ...interface{}) - ring *ring.Buffer + ring *vring.Buffer done chan struct{} wg sync.WaitGroup } -func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { +func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { @@ -307,7 +307,7 @@ func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log f // output starts an mtsSender's data handling routine. func (s *rtmpSender) output() { - var chunk *ring.Chunk + var elem *vring.Element for { select { case <-s.done: @@ -315,14 +315,14 @@ func (s *rtmpSender) output() { defer s.wg.Done() return default: - // If chunk is nil then we're ready to get another from the ring buffer. - if chunk == nil { + // If elem is nil then we're ready to get another from the ring buffer. + if elem == nil { var err error - chunk, err = s.ring.Next(rtmpRBReadTimeout) + elem, err = s.ring.Next(rtmpRBReadTimeout) switch err { case nil, io.EOF: continue - case ring.ErrTimeout: + case vring.ErrNextTimeout: s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout") continue default: @@ -338,7 +338,7 @@ func (s *rtmpSender) output() { continue } } - _, err := s.conn.Write(chunk.Bytes()) + _, err := s.conn.Write(elem.Bytes()) switch err { case nil, rtmp.ErrInvalidFlvTag: default: @@ -349,8 +349,8 @@ func (s *rtmpSender) output() { } continue } - chunk.Close() - chunk = nil + elem.Close() + elem = nil } } } diff --git a/revid/senders_test.go b/revid/senders_test.go index 0475dfba..9c972659 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -39,13 +39,25 @@ import ( "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" + "bitbucket.org/ausocean/utils/vring" ) var ( errSendFailed = errors.New("send failed") ) +const ( + // MTS ring buffer defaults. + mtsRBMaxElements = 10000 + mtsRBCapacity = 200000000 // bytes + mtsRBWriteTimeout = 5 + + // RTMP ring buffer defaults. + rtmpRBMaxElements = 10000 + rtmpRBCapacity = 200000000 // bytes + rtmpRBWriteTimeout = 5 +) + // destination simulates a destination for the mtsSender. It allows for the // emulation of failed and delayed sends. type destination struct { @@ -134,7 +146,11 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) + rb, err := vring.NewBuffer(mtsRBMaxElements, mtsRBCapacity, 0) + if err != nil { + t.Fatalf("could not initialise ring buffer: %v", err) + } + sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) @@ -213,7 +229,11 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) + rb, err := vring.NewBuffer(mtsRBMaxElements, mtsRBCapacity, 0) + if err != nil { + t.Fatalf("could not initialise ring buffer: %v", err) + } + sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) @@ -294,7 +314,11 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0) + rb, err := vring.NewBuffer(1, mtsRBCapacity, 0) + if err != nil { + t.Fatalf("could not initialise ring buffer: %v", err) + } + sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))