From 83407004edd3e828e448af2d99fff07691f5f012 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 7 Aug 2019 17:02:06 +0930 Subject: [PATCH] revid: gave mtsSender and rtmpSender separate ringBuffer sizes, and also increase rtmpSender ringBuffer element size --- revid/revid.go | 12 +++++++++--- revid/senders.go | 2 +- revid/senders_test.go | 6 +++--- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 410db7a2..c3664e4a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -56,8 +56,14 @@ import ( // mtsSender ringBuffer sizes. const ( - rbSize = 1000 - rbElementSize = 100000 + mtsRBSize = 1000 + mtsRBElementSize = 100000 +) + +// rtmpSender ringBuffer sizes. +const ( + rtmpRBSize = 500 + rtmpRBElementSize = 200000 ) // RTMP connection properties. @@ -234,7 +240,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. for _, out := range r.config.Outputs { switch out { case HTTP: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, 0) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, mtsRBSize, mtsRBElementSize, 0) mtsSenders = append(mtsSenders, w) case RTP: w, err := newRtpSender(r.config.RTPAddress, r.config.Logger.Log, r.config.FrameRate) diff --git a/revid/senders.go b/revid/senders.go index c9c980c7..ea1f8447 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -280,7 +280,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg timeout: timeout, retries: retries, log: log, - ring: ring.NewBuffer(100, rbElementSize, 0), + ring: ring.NewBuffer(rtmpRBSize, rtmpRBElementSize, 0), done: make(chan struct{}), } s.wg.Add(1) diff --git a/revid/senders_test.go b/revid/senders_test.go index b7f67959..eeba6a2b 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -133,7 +133,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, mtsRBSize, mtsRBElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder. @@ -211,7 +211,7 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, mtsRBSize, mtsRBElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder and send PSI every 10 packets. @@ -291,7 +291,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, mtsRBElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder.