From 3a076cd9c02bd9e835e2744346dc8e2a0452bb76 Mon Sep 17 00:00:00 2001 From: Saxon Milton Date: Mon, 27 Apr 2020 03:49:23 +0000 Subject: [PATCH] Merged in sort-out-rb-capacity (pull request #397) revid: get rid of output specific ring buffer parameters and use only RBCapacity and RBWriteTimeout Approved-by: Trek Hopton --- revid/config/config.go | 57 +++++++----------------------- revid/revid.go | 78 +++++++++++------------------------------- revid/senders_test.go | 10 ++++-- 3 files changed, 39 insertions(+), 106 deletions(-) diff --git a/revid/config/config.go b/revid/config/config.go index b235940c..67d0127a 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -84,15 +84,9 @@ const ( defaultPSITime = 2 defaultFileFPS = 0 - // MTS ring buffer defaults. - defaultMTSRBSize = 100 - defaultMTSRBElementSize = 10000 - defaultMTSRBWriteTimeout = 5 - - // RTMP ring buffer defaults. - defaultRTMPRBSize = 100 - defaultRTMPRBElementSize = 10000 - defaultRTMPRBWriteTimeout = 5 + // Ring buffer defaults. + defaultRBCapacity = 50000000 // => 50MB + defaultRBWriteTimeout = 5 // Seconds. // Motion filter parameter defaults. defaultMinFPS = 1.0 @@ -264,15 +258,9 @@ type Config struct { Filters []int // Defines the methods of filtering to be used in between lexing and encoding. PSITime int // Sets the time between a packet being sent. - // RTMP ring buffer parameters. - RTMPRBSize int // The number of elements in the RTMP sender ringbuffer. - RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer. - RTMPRBWriteTimeout int // The ringbuffer write timeout in seconds. - - // MTS ring buffer parameters. - MTSRBSize int // The number of elements in the MTS sender ringbuffer. - MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer. - MTSRBWriteTimeout int // The ringbuffer write timeout in seconds. + // Ring buffer parameters. + RBCapacity uint // The number of bytes the ring buffer will occupy. + RBWriteTimeout uint // The ringbuffer write timeout in seconds. // Motion filter parameters. // Some parameters can be used with any filter, while others can only be used by a few. @@ -334,7 +322,6 @@ var TypeData = map[string]string{ "Outputs": "enums:File,Http,Rtmp,Rtp", "Quantization": "uint", "RBCapacity": "uint", - "RBMaxElements": "uint", "RBWriteTimeout": "uint", "Rotation": "uint", "RTMPURL": "string", @@ -444,34 +431,14 @@ func (c *Config) Validate() error { c.RTPAddress = defaultRtpAddr } - if c.RTMPRBSize <= 0 { - c.Logger.Log(logger.Info, "RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize) - c.RTMPRBSize = defaultRTMPRBSize + if c.RBWriteTimeout <= 0 { + c.Logger.Log(logger.Info, "RBWriteTimeout bad or unset, defaulting", "RBWriteTimeout", defaultRBWriteTimeout) + c.RBWriteTimeout = defaultRBWriteTimeout } - if c.RTMPRBElementSize <= 0 { - c.Logger.Log(logger.Info, "RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize) - c.RTMPRBElementSize = defaultRTMPRBElementSize - } - - if c.RTMPRBWriteTimeout <= 0 { - c.Logger.Log(logger.Info, "RTMPRBWriteTimeout bad or unset, defaulting", "RTMPRBWriteTimeout", defaultRTMPRBWriteTimeout) - c.RTMPRBWriteTimeout = defaultRTMPRBWriteTimeout - } - - if c.MTSRBSize <= 0 { - c.Logger.Log(logger.Info, "MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize) - c.MTSRBSize = defaultMTSRBSize - } - - if c.MTSRBElementSize <= 0 { - c.Logger.Log(logger.Info, "MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize) - c.MTSRBElementSize = defaultMTSRBElementSize - } - - if c.MTSRBWriteTimeout <= 0 { - c.Logger.Log(logger.Info, "MTSRBWriteTimeout bad or unset, defaulting", "MTSRBWriteTimeout", defaultMTSRBWriteTimeout) - c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout + if c.RBCapacity <= 0 { + c.Logger.Log(logger.Info, "RBCapacity bad or unset, defaulting", "RBCapacity", defaultRBCapacity) + c.RBCapacity = defaultRBCapacity } if c.PSITime <= 0 { diff --git a/revid/revid.go b/revid/revid.go index c5814607..35beadb7 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -58,17 +58,9 @@ import ( "bitbucket.org/ausocean/utils/ring" ) -// Ring buffer defaults. +// Ring buffer parameters. const ( - // MTS ring buffer defaults. - defaultMTSRBSize = 1000 - defaultMTSRBElementSize = 100000 - defaultMTSRBWriteTimeout = 5 - - // RTMP ring buffer defaults. - defaultRTMPRBSize = 1000 - defaultRTMPRBElementSize = 300000 - defaultRTMPRBWriteTimeout = 5 + rbStartingElementSize = 10000 // Bytes. ) // RTMP connection properties. @@ -265,6 +257,12 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. // will hold senders that require FLV encoding. var mtsSenders, flvSenders []io.WriteCloser + // Calculate no. of ring buffer elements based on starting element size + // const and config directed max ring buffer size, then create buffer. + // This is only used if the selected output uses a ring buffer. + nElements := r.cfg.RBCapacity / rbStartingElementSize + writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second + // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. @@ -273,13 +271,11 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. switch out { case config.OutputHTTP: r.cfg.Logger.Log(logger.Debug, "using HTTP output") - w = newMtsSender( - newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report), - r.cfg.Logger.Log, - ring.NewBuffer(r.cfg.MTSRBSize, r.cfg.MTSRBElementSize, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second), - r.cfg.ClipDuration, - ) + rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout) + hs := newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report) + w = newMtsSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration) mtsSenders = append(mtsSenders, w) + case config.OutputRTP: r.cfg.Logger.Log(logger.Debug, "using RTP output") w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report) @@ -296,14 +292,8 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. mtsSenders = append(mtsSenders, w) case config.OutputRTMP: r.cfg.Logger.Log(logger.Debug, "using RTMP output") - w, err := newRtmpSender( - r.cfg.RTMPURL, - rtmpConnectionTimeout, - rtmpConnectionMaxTries, - ring.NewBuffer(r.cfg.RTMPRBSize, r.cfg.RTMPRBElementSize, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second), - r.cfg.Logger.Log, - r.bitrate.Report, - ) + rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout) + w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report) if err != nil { r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error()) } @@ -754,48 +744,20 @@ func (r *Revid) Update(vars map[string]string) error { default: r.cfg.Logger.Log(logger.Warning, "invalid Logging param", "value", value) } - case "RTMPRBSize": + case "RBCapacity": v, err := strconv.Atoi(value) if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBSize var", "value", value) + r.cfg.Logger.Log(logger.Warning, "invalid RBCapacity var", "value", value) break } - r.cfg.RTMPRBSize = v - case "RTMPRBElementSize": - v, err := strconv.Atoi(value) - if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBElementSize var", "value", value) - break - } - r.cfg.RTMPRBElementSize = v - case "RTMPRBWriteTimeout": + r.cfg.RBCapacity = uint(v) + case "RBWriteTimeout": v, err := strconv.Atoi(value) if err != nil || v <= 0 { - r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBWriteTimeout var", "value", value) + r.cfg.Logger.Log(logger.Warning, "invalid RBWriteTimeout var", "value", value) break } - r.cfg.RTMPRBWriteTimeout = v - case "MTSRBSize": - v, err := strconv.Atoi(value) - if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, "invalid MTSRBSize var", "value", value) - break - } - r.cfg.MTSRBSize = v - case "MTSRBElementSize": - v, err := strconv.Atoi(value) - if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, "invalid MTSRBElementSize var", "value", value) - break - } - r.cfg.MTSRBElementSize = v - case "MTSRBWriteTimeout": - v, err := strconv.Atoi(value) - if err != nil || v <= 0 { - r.cfg.Logger.Log(logger.Warning, "invalid MTSRBWriteTimeout var", "value", value) - break - } - r.cfg.MTSRBWriteTimeout = v + r.cfg.RBWriteTimeout = uint(v) case "CBR": v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)] if !ok { diff --git a/revid/senders_test.go b/revid/senders_test.go index 0475dfba..b00cb6f4 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -134,7 +134,9 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) + const testRBCapacity = 50000000 + nElements := testRBCapacity / rbStartingElementSize + sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) @@ -213,7 +215,9 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) + const testRBCapacity = 50000000 // 50MB + nElements := testRBCapacity / rbStartingElementSize + sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) @@ -294,7 +298,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, rbStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))