From c0614c3456c60a269bf52f9a4c21c1459ce2cc75 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 9 Apr 2020 15:55:01 +0930 Subject: [PATCH] revid: reverted to fixed element size ring buffer --- revid/config/config.go | 59 +++++++++++++++++++++++++++------------ revid/revid.go | 55 +++++++++++++++++++----------------- revid/senders.go | 63 +++++++++++++++++++++++------------------- revid/senders_test.go | 27 +++--------------- 4 files changed, 110 insertions(+), 94 deletions(-) diff --git a/revid/config/config.go b/revid/config/config.go index fbe51e5d..b235940c 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -84,10 +84,15 @@ const ( defaultPSITime = 2 defaultFileFPS = 0 - // Ring buffer defaults. - defaultRBMaxElements = 10000 - defaultRBCapacity = 200000000 // bytes (200MB) - defaultRBWriteTimeout = 5 + // MTS ring buffer defaults. + defaultMTSRBSize = 100 + defaultMTSRBElementSize = 10000 + defaultMTSRBWriteTimeout = 5 + + // RTMP ring buffer defaults. + defaultRTMPRBSize = 100 + defaultRTMPRBElementSize = 10000 + defaultRTMPRBWriteTimeout = 5 // Motion filter parameter defaults. defaultMinFPS = 1.0 @@ -259,10 +264,15 @@ type Config struct { Filters []int // Defines the methods of filtering to be used in between lexing and encoding. PSITime int // Sets the time between a packet being sent. - // Ring buffer parameters. - RBMaxElements int // The maximum possible number of elements in ring buffer. - RBCapacity int // The total number of bytes available for the ring buffer. - RBWriteTimeout int // The ringbuffer write timeout in seconds. + // RTMP ring buffer parameters. + RTMPRBSize int // The number of elements in the RTMP sender ringbuffer. + RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer. + RTMPRBWriteTimeout int // The ringbuffer write timeout in seconds. + + // MTS ring buffer parameters. + MTSRBSize int // The number of elements in the MTS sender ringbuffer. + MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer. + MTSRBWriteTimeout int // The ringbuffer write timeout in seconds. // Motion filter parameters. // Some parameters can be used with any filter, while others can only be used by a few. @@ -434,19 +444,34 @@ func (c *Config) Validate() error { c.RTPAddress = defaultRtpAddr } - if c.RBMaxElements <= 0 { - c.LogInvalidField("RBMaxElements", defaultRBMaxElements) - c.RBMaxElements = defaultRBMaxElements + if c.RTMPRBSize <= 0 { + c.Logger.Log(logger.Info, "RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize) + c.RTMPRBSize = defaultRTMPRBSize } - if c.RBCapacity <= 0 { - c.LogInvalidField("RBCapacity", defaultRBCapacity) - c.RBCapacity = defaultRBCapacity + if c.RTMPRBElementSize <= 0 { + c.Logger.Log(logger.Info, "RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize) + c.RTMPRBElementSize = defaultRTMPRBElementSize } - if c.RBWriteTimeout <= 0 { - c.LogInvalidField("RBWriteTimeout", defaultRBWriteTimeout) - c.RBWriteTimeout = defaultRBWriteTimeout + if c.RTMPRBWriteTimeout <= 0 { + c.Logger.Log(logger.Info, "RTMPRBWriteTimeout bad or unset, defaulting", "RTMPRBWriteTimeout", defaultRTMPRBWriteTimeout) + c.RTMPRBWriteTimeout = defaultRTMPRBWriteTimeout + } + + if c.MTSRBSize <= 0 { + c.Logger.Log(logger.Info, "MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize) + c.MTSRBSize = defaultMTSRBSize + } + + if c.MTSRBElementSize <= 0 { + c.Logger.Log(logger.Info, "MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize) + c.MTSRBElementSize = defaultMTSRBElementSize + } + + if c.MTSRBWriteTimeout <= 0 { + c.Logger.Log(logger.Info, "MTSRBWriteTimeout bad or unset, defaulting", "MTSRBWriteTimeout", defaultMTSRBWriteTimeout) + c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout } if c.PSITime <= 0 { diff --git a/revid/revid.go b/revid/revid.go index 934a772c..fb5eded2 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -55,7 +55,20 @@ import ( "bitbucket.org/ausocean/utils/bitrate" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/vring" + "bitbucket.org/ausocean/utils/ring" +) + +// Ring buffer defaults. +const ( + // MTS ring buffer defaults. + defaultMTSRBSize = 1000 + defaultMTSRBElementSize = 100000 + defaultMTSRBWriteTimeout = 5 + + // RTMP ring buffer defaults. + defaultRTMPRBSize = 1000 + defaultRTMPRBElementSize = 300000 + defaultRTMPRBWriteTimeout = 5 ) // RTMP connection properties. @@ -260,14 +273,10 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. switch out { case config.OutputHTTP: r.cfg.Logger.Log(logger.Debug, "using HTTP output") - rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second) - if err != nil { - return fmt.Errorf("could not initialise MTS ring buffer: %w", err) - } w = newMtsSender( newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report), r.cfg.Logger.Log, - rb, + ring.NewBuffer(r.cfg.MTSRBSize, r.cfg.MTSRBElementSize, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second), r.cfg.ClipDuration, ) mtsSenders = append(mtsSenders, w) @@ -287,15 +296,11 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. mtsSenders = append(mtsSenders, w) case config.OutputRTMP: r.cfg.Logger.Log(logger.Debug, "using RTMP output") - rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second) - if err != nil { - return fmt.Errorf("could not initialise RTMP ring buffer: %w", err) - } w, err := newRtmpSender( r.cfg.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, - rb, + ring.NewBuffer(r.cfg.RTMPRBSize, r.cfg.RTMPRBElementSize, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second), r.cfg.Logger.Log, r.bitrate.Report, ) @@ -749,48 +754,48 @@ func (r *Revid) Update(vars map[string]string) error { default: r.cfg.Logger.Log(logger.Warning, "invalid Logging param", "value", value) } - case "RTMPRBMaxElements": + case "RTMPRBSize": v, err := strconv.Atoi(value) if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBMaxElements var", "value", value) + r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBSize var", "value", value) break } - r.cfg.RBMaxElements = v - case "RTMPRBCapacity": + r.cfg.RTMPRBSize = v + case "RTMPRBElementSize": v, err := strconv.Atoi(value) if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBCapacity var", "value", value) + r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBElementSize var", "value", value) break } - r.cfg.RBCapacity = v + r.cfg.RTMPRBElementSize = v case "RTMPRBWriteTimeout": v, err := strconv.Atoi(value) if err != nil || v <= 0 { r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBWriteTimeout var", "value", value) break } - r.cfg.RBWriteTimeout = v - case "MTSRBMaxElements": + r.cfg.RTMPRBWriteTimeout = v + case "MTSRBSize": v, err := strconv.Atoi(value) if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, "invalid MTSRBMaxElements var", "value", value) + r.cfg.Logger.Log(logger.Warning, "invalid MTSRBSize var", "value", value) break } - r.cfg.RBMaxElements = v - case "MTSRBCapacity": + r.cfg.MTSRBSize = v + case "MTSRBElementSize": v, err := strconv.Atoi(value) if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, "invalid MTSRBCapacity var", "value", value) + r.cfg.Logger.Log(logger.Warning, "invalid MTSRBElementSize var", "value", value) break } - r.cfg.RBCapacity = v + r.cfg.MTSRBElementSize = v case "MTSRBWriteTimeout": v, err := strconv.Atoi(value) if err != nil || v <= 0 { r.cfg.Logger.Log(logger.Warning, "invalid MTSRBWriteTimeout var", "value", value) break } - r.cfg.RBWriteTimeout = v + r.cfg.MTSRBWriteTimeout = v case "CBR": v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)] if !ok { diff --git a/revid/senders.go b/revid/senders.go index e6bcaeca..0d874f57 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -44,7 +44,7 @@ import ( "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/vring" + "bitbucket.org/ausocean/utils/ring" ) // Log is used by the multiSender. @@ -54,6 +54,7 @@ type Log func(level int8, message string, params ...interface{}) const ( rtmpRBReadTimeout = 1 * time.Second mtsRBReadTimeout = 1 * time.Second + maxBuffLen = 100000000 ) // httpSender provides an implemntation of io.Writer to perform sends to a http @@ -173,7 +174,7 @@ func (s *fileSender) Close() error { return s.file.Close() } type mtsSender struct { dst io.WriteCloser buf []byte - ring *vring.Buffer + ring *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer @@ -186,7 +187,7 @@ type mtsSender struct { } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *vring.Buffer, clipDur time.Duration) *mtsSender { +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender { s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), @@ -202,7 +203,7 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int // output starts an mtsSender's data handling routine. func (s *mtsSender) output() { - var elem *vring.Element + var chunk *ring.Chunk for { select { case <-s.done: @@ -210,29 +211,29 @@ func (s *mtsSender) output() { defer s.wg.Done() return default: - // If elem is nil then we're ready to get another from the ringBuffer. - if elem == nil { + // If chunk is nil then we're ready to get another from the ringBuffer. + if chunk == nil { var err error - elem, err = s.ring.Next(mtsRBReadTimeout) + chunk, err = s.ring.Next(mtsRBReadTimeout) switch err { case nil, io.EOF: continue - case vring.ErrNextTimeout: - s.log(logger.Debug, "ring buffer read timeout") + case ring.ErrTimeout: + s.log(logger.Debug, "mtsSender: ring buffer read timeout") continue default: s.log(logger.Error, "unexpected error", "error", err.Error()) continue } } - err := s.repairer.Repair(elem.Bytes()) + err := s.repairer.Repair(chunk.Bytes()) if err != nil { - elem.Close() - elem = nil + chunk.Close() + chunk = nil continue } - s.log(logger.Debug, "writing") - _, err = s.dst.Write(elem.Bytes()) + s.log(logger.Debug, "mtsSender: writing") + _, err = s.dst.Write(chunk.Bytes()) if err != nil { s.log(logger.Debug, "failed write, repairing MTS", "error", err) s.repairer.Failed() @@ -240,8 +241,8 @@ func (s *mtsSender) output() { } else { s.log(logger.Debug, "good write") } - elem.Close() - elem = nil + chunk.Close() + chunk = nil } } } @@ -268,6 +269,9 @@ func (s *mtsSender) Write(d []byte) (int, error) { } if err != nil { s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf)) + if err == ring.ErrTooLong { + s.ring = ring.NewBuffer(maxBuffLen/len(d), len(d), 5*time.Second) + } } s.buf = s.buf[:0] } @@ -290,13 +294,13 @@ type rtmpSender struct { timeout uint retries int log func(lvl int8, msg string, args ...interface{}) - ring *vring.Buffer + ring *ring.Buffer done chan struct{} wg sync.WaitGroup report func(sent int) } -func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) { +func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { @@ -326,7 +330,7 @@ func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log // output starts an mtsSender's data handling routine. func (s *rtmpSender) output() { - var elem *vring.Element + var chunk *ring.Chunk for { select { case <-s.done: @@ -334,15 +338,15 @@ func (s *rtmpSender) output() { defer s.wg.Done() return default: - // If elem is nil then we're ready to get another from the ring buffer. - if elem == nil { + // If chunk is nil then we're ready to get another from the ring buffer. + if chunk == nil { var err error - elem, err = s.ring.Next(rtmpRBReadTimeout) + chunk, err = s.ring.Next(rtmpRBReadTimeout) switch err { case nil, io.EOF: continue - case vring.ErrNextTimeout: - s.log(logger.Debug, "ring buffer read timeout") + case ring.ErrTimeout: + s.log(logger.Debug, "rtmpSender: ring buffer read timeout") continue default: s.log(logger.Error, "unexpected error", "error", err.Error()) @@ -357,9 +361,7 @@ func (s *rtmpSender) output() { continue } } - b := elem.Bytes() - s.log(logger.Debug, "writing to conn", "len", len(b)) - _, err := s.conn.Write(b) + _, err := s.conn.Write(chunk.Bytes()) switch err { case nil, rtmp.ErrInvalidFlvTag: s.log(logger.Debug, "good write to conn") @@ -371,8 +373,8 @@ func (s *rtmpSender) output() { } continue } - elem.Close() - elem = nil + chunk.Close() + chunk = nil } } } @@ -386,6 +388,9 @@ func (s *rtmpSender) Write(d []byte) (int, error) { s.log(logger.Debug, "good ring buffer write", "len", len(d)) } else { s.log(logger.Warning, "ring buffer write error", "error", err.Error()) + if err == ring.ErrTooLong { + s.ring = ring.NewBuffer(maxBuffLen/len(d), len(d), 5*time.Second) + } } s.report(len(d)) return len(d), nil diff --git a/revid/senders_test.go b/revid/senders_test.go index 80b18746..0475dfba 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -39,20 +39,13 @@ import ( "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/vring" + "bitbucket.org/ausocean/utils/ring" ) var ( errSendFailed = errors.New("send failed") ) -// Ring buffer parameters. -const ( - rbMaxElements = 10000 - rbCapacity = 200000000 // bytes - rbWriteTimeout = 5 -) - // destination simulates a destination for the mtsSender. It allows for the // emulation of failed and delayed sends. type destination struct { @@ -141,11 +134,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - rb, err := vring.NewBuffer(rbMaxElements, rbCapacity, 0) - if err != nil { - t.Fatalf("could not initialise ring buffer: %v", err) - } - sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) @@ -224,11 +213,7 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} - rb, err := vring.NewBuffer(rbMaxElements, rbCapacity, 10*time.Millisecond) - if err != nil { - t.Fatalf("could not initialise ring buffer: %v", err) - } - sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) @@ -309,11 +294,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - rb, err := vring.NewBuffer(1, rbCapacity, 10*time.Millisecond) - if err != nil { - t.Fatalf("could not initialise ring buffer: %v", err) - } - sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))