From 74c995d452dfa09242b7a7dd3561ca36efe0553c Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 18 Apr 2019 16:55:48 +0930 Subject: [PATCH] revid: addressing PR feedback --- revid/revid.go | 16 ++++++++-------- revid/revid_test.go | 4 ++-- revid/senders.go | 44 +++++++++++++++++++++---------------------- revid/senders_test.go | 40 +++++++++++++++++++++++++++------------ 4 files changed, 60 insertions(+), 44 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 29514fa6..ea85038a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -95,16 +95,16 @@ type Revid struct { lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error // mwc will hold the multiWriteCloser that writes to encoders from the lexer. - mwc io.WriteCloser - - // wg will be used to wait for any processing routines to finish. - wg sync.WaitGroup + encoders io.WriteCloser // isRunning is used to keep track of revid's running state between methods. isRunning bool // err will channel errors from revid routines to the handle errors routine. err chan error + + // wg will be used to wait for any processing routines to finish. + wg sync.WaitGroup } // New returns a pointer to a new Revid with the desired configuration, and/or @@ -148,7 +148,7 @@ func (r *Revid) Bitrate() int { // reset swaps the current config of a Revid with the passed // configuration; checking validity and returning errors if not valid. It then -// sets up the data pipeline accordinging to this configuration. +// sets up the data pipeline accordingly to this configuration. func (r *Revid) reset(config Config) error { err := r.setConfig(config) if err != nil { @@ -251,7 +251,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) encoders = append(encoders, e) } - r.mwc = multiWriter(encoders...) + r.encoders = multiWriter(encoders...) switch r.config.Input { case Raspivid: @@ -298,7 +298,7 @@ func (r *Revid) Stop() { } r.config.Logger.Log(logger.Info, pkg+"closing pipeline") - err := r.mwc.Close() + err := r.encoders.Close() if err != nil { r.config.Logger.Log(logger.Error, pkg+"got error while closing pipeline", "error", err.Error()) } @@ -572,7 +572,7 @@ func (r *Revid) setupInputForFile() error { func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") - r.err <- r.lexTo(r.mwc, read, delay) + r.err <- r.lexTo(r.encoders, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.wg.Done() } diff --git a/revid/revid_test.go b/revid/revid_test.go index 68520bbe..2ee53151 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -208,14 +208,14 @@ func TestResetEncoderSenderSetup(t *testing.T) { } // First check that we have the correct number of encoders. - got := len(rv.mwc.(*dummyMultiWriter).dst) + got := len(rv.encoders.(*dummyMultiWriter).dst) want := len(test.encoders) if got != want { t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want) } // Now check the correctness of encoders and their destinations. - for _, e := range rv.mwc.(*dummyMultiWriter).dst { + for _, e := range rv.encoders.(*dummyMultiWriter).dst { // Get e's type. encoderType := fmt.Sprintf("%T", e) diff --git a/revid/senders.go b/revid/senders.go index e9d9a92d..eb0c6ca4 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -155,24 +155,24 @@ func (s *fileSender) Close() error { return s.file.Close() } type mtsSender struct { dst io.WriteCloser buf []byte - rb *ring.Buffer + ring *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int - quit chan struct{} + done chan struct{} log func(lvl int8, msg string, args ...interface{}) wg sync.WaitGroup } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), ringSize int, ringElementSize int, wTimeout time.Duration) *mtsSender { s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), log: log, - rb: ring.NewBuffer(rbSize, rbElementSize, wTimeout), - quit: make(chan struct{}), + ring: ring.NewBuffer(ringSize, ringElementSize, wTimeout), + done: make(chan struct{}), } s.wg.Add(1) go s.output() @@ -184,15 +184,15 @@ func (s *mtsSender) output() { var chunk *ring.Chunk for { select { - case <-s.quit: - s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") + case <-s.done: + s.log(logger.Info, pkg+"mtsSender: got done signal, terminating output routine") defer s.wg.Done() return default: // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.rb.Next(rTimeout) + chunk, err = s.ring.Next(rTimeout) switch err { case nil, io.EOF: continue @@ -232,11 +232,11 @@ func (s *mtsSender) Write(d []byte) (int, error) { copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() if s.curPid == mts.PatPid && len(s.buf) > 0 { - _, err := s.rb.Write(s.buf) + _, err := s.ring.Write(s.buf) if err != nil { s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } - s.rb.Flush() + s.ring.Flush() s.buf = s.buf[:0] } return len(d), nil @@ -244,7 +244,7 @@ func (s *mtsSender) Write(d []byte) (int, error) { // Close implements io.Closer. func (s *mtsSender) Close() error { - close(s.quit) + close(s.done) s.wg.Wait() return nil } @@ -256,8 +256,8 @@ type rtmpSender struct { timeout uint retries int log func(lvl int8, msg string, args ...interface{}) - rb *ring.Buffer - quit chan struct{} + ring *ring.Buffer + done chan struct{} wg sync.WaitGroup } @@ -280,8 +280,8 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg timeout: timeout, retries: retries, log: log, - rb: ring.NewBuffer(10, rbElementSize, 0), - quit: make(chan struct{}), + ring: ring.NewBuffer(10, rbElementSize, 0), + done: make(chan struct{}), } s.wg.Add(1) go s.output() @@ -293,15 +293,15 @@ func (s *rtmpSender) output() { var chunk *ring.Chunk for { select { - case <-s.quit: - s.log(logger.Info, pkg+"rtmpSender: got quit signal, terminating output routine") + case <-s.done: + s.log(logger.Info, pkg+"rtmpSender: got done signal, terminating output routine") defer s.wg.Done() return default: // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.rb.Next(rTimeout) + chunk, err = s.ring.Next(rTimeout) switch err { case nil, io.EOF: continue @@ -340,11 +340,11 @@ func (s *rtmpSender) output() { // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { - _, err := s.rb.Write(d) + _, err := s.ring.Write(d) if err != nil { s.log(logger.Warning, pkg+"rtmpSender: ringBuffer write error", "error", err.Error()) } - s.rb.Flush() + s.ring.Flush() return len(d), nil } @@ -365,8 +365,8 @@ func (s *rtmpSender) restart() error { } func (s *rtmpSender) Close() error { - if s.quit != nil { - close(s.quit) + if s.done != nil { + close(s.done) } s.wg.Wait() return s.close() diff --git a/revid/senders_test.go b/revid/senders_test.go index f6a2e8cf..36c3f41a 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -48,15 +48,31 @@ var ( // destination simulates a destination for the mtsSender. It allows for the // emulation of failed and delayed sends. type destination struct { - buf [][]byte - testFails bool - failAt int + // Holds the clips written to this destination using Write. + buf [][]byte + + // testFails is set to true if we would like a write to fail at a particular + // clip as determined by failAt. + testFails bool + failAt int + + // Holds the current clip number. currentClip int - t *testing.T - sendDelay time.Duration - delayAt int - done chan struct{} - doneAt int + + // Pointer to the testing.T of a test where this struct is being used. This + // is used so that logging can be done through the testing log utilities. + t *testing.T + + // sendDelay is the amount of time we would like a Write to be delayed when + // we hit the clip number indicated by delayAt. + sendDelay time.Duration + delayAt int + + // done will be used to send a signal to the main routine to indicate that + // the destination has received all clips. doneAt indicates the final clip + // number. + done chan struct{} + doneAt int } func (ts *destination) Write(d []byte) (int, error) { @@ -116,8 +132,8 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 - tstDst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) + dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} + sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -133,11 +149,11 @@ func TestMtsSenderSegment(t *testing.T) { } // Wait until the destination has all the data, then close the sender. - <-tstDst.done + <-dst.done sender.Close() // Check the data. - result := tstDst.buf + result := dst.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo)