From d75ea20137537166845223a94f4531b9b4c22f9f Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 15 Apr 2019 10:25:35 +0930 Subject: [PATCH] revid: applying some feedback from last PR --- revid/revid.go | 3 ++- revid/senders.go | 14 ++++++------- revid/senders_test.go | 46 ++++++++++++++++++++++++------------------- 3 files changed, 34 insertions(+), 29 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 61af584e..865bc342 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -142,7 +142,8 @@ func (r *Revid) Bitrate() int { } // reset swaps the current config of a Revid with the passed -// configuration; checking validity and returning errors if not valid. +// configuration; checking validity and returning errors if not valid. It then +// sets up the data pipeline accordinging to this configuration. func (r *Revid) reset(config Config) error { err := r.setConfig(config) if err != nil { diff --git a/revid/senders.go b/revid/senders.go index a0777803..4da8cdd5 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -156,7 +156,7 @@ func (s *fileSender) Close() error { return s.file.Close() } type mtsSender struct { dst io.WriteCloser buf []byte - ringBuf *ring.Buffer + rb *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer @@ -172,7 +172,7 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int dst: dst, repairer: mts.NewDiscontinuityRepairer(), log: log, - ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), + rb: ring.NewBuffer(rbSize, rbElementSize, wTimeout), quit: make(chan struct{}), } s.wg.Add(1) @@ -193,17 +193,15 @@ func (s *mtsSender) output() { // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ringBuf.Next(rTimeout) + chunk, err = s.rb.Next(rTimeout) switch err { - case nil: + case nil, io.EOF: continue case ring.ErrTimeout: s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") continue default: s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) - fallthrough - case io.EOF: continue } } @@ -235,11 +233,11 @@ func (s *mtsSender) Write(d []byte) (int, error) { copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() if s.curPid == mts.PatPid && len(s.buf) > 0 { - _, err := s.ringBuf.Write(s.buf) + _, err := s.rb.Write(s.buf) if err != nil { s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } - s.ringBuf.Flush() + s.rb.Flush() s.buf = s.buf[:0] } return len(d), nil diff --git a/revid/senders_test.go b/revid/senders_test.go index 3b32e0e4..f6a2e8cf 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -48,29 +48,34 @@ var ( // destination simulates a destination for the mtsSender. It allows for the // emulation of failed and delayed sends. type destination struct { - buf [][]byte - testFails bool - failAt int - currentPkt int - t *testing.T - sendDelay time.Duration - delayAt int + buf [][]byte + testFails bool + failAt int + currentClip int + t *testing.T + sendDelay time.Duration + delayAt int + done chan struct{} + doneAt int } func (ts *destination) Write(d []byte) (int, error) { ts.t.Log("writing clip to destination") - if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { + if ts.delayAt != 0 && ts.currentClip == ts.delayAt { time.Sleep(ts.sendDelay) } - if ts.testFails && ts.currentPkt == ts.failAt { + if ts.testFails && ts.currentClip == ts.failAt { ts.t.Log("failed send") - ts.currentPkt++ + ts.currentClip++ return 0, errSendFailed } cpy := make([]byte, len(d)) copy(cpy, d) ts.buf = append(ts.buf, cpy) - ts.currentPkt++ + if ts.currentClip == ts.doneAt { + close(ts.done) + } + ts.currentClip++ return len(d), nil } @@ -110,7 +115,8 @@ func TestMtsSenderSegment(t *testing.T) { mts.Meta = meta.New() // Create ringBuffer, sender, sender and the MPEGTS encoder. - tstDst := &destination{t: t} + const numberOfClips = 11 + tstDst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) @@ -126,8 +132,8 @@ func TestMtsSenderSegment(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give the mtsSender some time to finish up and then Close it. - time.Sleep(10 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-tstDst.done sender.Close() // Check the data. @@ -188,7 +194,7 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 - tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} + tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) @@ -204,8 +210,8 @@ func TestMtsSenderFailedSend(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give the mtsSender some time to finish up and then Close it. - time.Sleep(10 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-tstDst.done sender.Close() // Check that we have data as expected. @@ -268,7 +274,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 - tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} + tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) @@ -283,8 +289,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give mtsSender time to finish up then Close. - time.Sleep(100 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-tstDst.done sender.Close() // Check the data.