revid: addressing PR feedback

This commit is contained in:
Saxon 2019-04-18 16:55:48 +09:30
parent 1c5d3997bb
commit 74c995d452
4 changed files with 60 additions and 44 deletions

View File

@ -95,16 +95,16 @@ type Revid struct {
lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error
// mwc will hold the multiWriteCloser that writes to encoders from the lexer. // mwc will hold the multiWriteCloser that writes to encoders from the lexer.
mwc io.WriteCloser encoders io.WriteCloser
// wg will be used to wait for any processing routines to finish.
wg sync.WaitGroup
// isRunning is used to keep track of revid's running state between methods. // isRunning is used to keep track of revid's running state between methods.
isRunning bool isRunning bool
// err will channel errors from revid routines to the handle errors routine. // err will channel errors from revid routines to the handle errors routine.
err chan error err chan error
// wg will be used to wait for any processing routines to finish.
wg sync.WaitGroup
} }
// New returns a pointer to a new Revid with the desired configuration, and/or // New returns a pointer to a new Revid with the desired configuration, and/or
@ -148,7 +148,7 @@ func (r *Revid) Bitrate() int {
// reset swaps the current config of a Revid with the passed // reset swaps the current config of a Revid with the passed
// configuration; checking validity and returning errors if not valid. It then // configuration; checking validity and returning errors if not valid. It then
// sets up the data pipeline accordinging to this configuration. // sets up the data pipeline accordingly to this configuration.
func (r *Revid) reset(config Config) error { func (r *Revid) reset(config Config) error {
err := r.setConfig(config) err := r.setConfig(config)
if err != nil { if err != nil {
@ -251,7 +251,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int)
encoders = append(encoders, e) encoders = append(encoders, e)
} }
r.mwc = multiWriter(encoders...) r.encoders = multiWriter(encoders...)
switch r.config.Input { switch r.config.Input {
case Raspivid: case Raspivid:
@ -298,7 +298,7 @@ func (r *Revid) Stop() {
} }
r.config.Logger.Log(logger.Info, pkg+"closing pipeline") r.config.Logger.Log(logger.Info, pkg+"closing pipeline")
err := r.mwc.Close() err := r.encoders.Close()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"got error while closing pipeline", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"got error while closing pipeline", "error", err.Error())
} }
@ -572,7 +572,7 @@ func (r *Revid) setupInputForFile() error {
func (r *Revid) processFrom(read io.Reader, delay time.Duration) { func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.config.Logger.Log(logger.Info, pkg+"reading input data") r.config.Logger.Log(logger.Info, pkg+"reading input data")
r.err <- r.lexTo(r.mwc, read, delay) r.err <- r.lexTo(r.encoders, read, delay)
r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
r.wg.Done() r.wg.Done()
} }

View File

@ -208,14 +208,14 @@ func TestResetEncoderSenderSetup(t *testing.T) {
} }
// First check that we have the correct number of encoders. // First check that we have the correct number of encoders.
got := len(rv.mwc.(*dummyMultiWriter).dst) got := len(rv.encoders.(*dummyMultiWriter).dst)
want := len(test.encoders) want := len(test.encoders)
if got != want { if got != want {
t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want) t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want)
} }
// Now check the correctness of encoders and their destinations. // Now check the correctness of encoders and their destinations.
for _, e := range rv.mwc.(*dummyMultiWriter).dst { for _, e := range rv.encoders.(*dummyMultiWriter).dst {
// Get e's type. // Get e's type.
encoderType := fmt.Sprintf("%T", e) encoderType := fmt.Sprintf("%T", e)

View File

@ -155,24 +155,24 @@ func (s *fileSender) Close() error { return s.file.Close() }
type mtsSender struct { type mtsSender struct {
dst io.WriteCloser dst io.WriteCloser
buf []byte buf []byte
rb *ring.Buffer ring *ring.Buffer
next []byte next []byte
pkt packet.Packet pkt packet.Packet
repairer *mts.DiscontinuityRepairer repairer *mts.DiscontinuityRepairer
curPid int curPid int
quit chan struct{} done chan struct{}
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
wg sync.WaitGroup wg sync.WaitGroup
} }
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), ringSize int, ringElementSize int, wTimeout time.Duration) *mtsSender {
s := &mtsSender{ s := &mtsSender{
dst: dst, dst: dst,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
log: log, log: log,
rb: ring.NewBuffer(rbSize, rbElementSize, wTimeout), ring: ring.NewBuffer(ringSize, ringElementSize, wTimeout),
quit: make(chan struct{}), done: make(chan struct{}),
} }
s.wg.Add(1) s.wg.Add(1)
go s.output() go s.output()
@ -184,15 +184,15 @@ func (s *mtsSender) output() {
var chunk *ring.Chunk var chunk *ring.Chunk
for { for {
select { select {
case <-s.quit: case <-s.done:
s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") s.log(logger.Info, pkg+"mtsSender: got done signal, terminating output routine")
defer s.wg.Done() defer s.wg.Done()
return return
default: default:
// If chunk is nil then we're ready to get another from the ringBuffer. // If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil { if chunk == nil {
var err error var err error
chunk, err = s.rb.Next(rTimeout) chunk, err = s.ring.Next(rTimeout)
switch err { switch err {
case nil, io.EOF: case nil, io.EOF:
continue continue
@ -232,11 +232,11 @@ func (s *mtsSender) Write(d []byte) (int, error) {
copy(s.pkt[:], bytes) copy(s.pkt[:], bytes)
s.curPid = s.pkt.PID() s.curPid = s.pkt.PID()
if s.curPid == mts.PatPid && len(s.buf) > 0 { if s.curPid == mts.PatPid && len(s.buf) > 0 {
_, err := s.rb.Write(s.buf) _, err := s.ring.Write(s.buf)
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error())
} }
s.rb.Flush() s.ring.Flush()
s.buf = s.buf[:0] s.buf = s.buf[:0]
} }
return len(d), nil return len(d), nil
@ -244,7 +244,7 @@ func (s *mtsSender) Write(d []byte) (int, error) {
// Close implements io.Closer. // Close implements io.Closer.
func (s *mtsSender) Close() error { func (s *mtsSender) Close() error {
close(s.quit) close(s.done)
s.wg.Wait() s.wg.Wait()
return nil return nil
} }
@ -256,8 +256,8 @@ type rtmpSender struct {
timeout uint timeout uint
retries int retries int
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
rb *ring.Buffer ring *ring.Buffer
quit chan struct{} done chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
} }
@ -280,8 +280,8 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
timeout: timeout, timeout: timeout,
retries: retries, retries: retries,
log: log, log: log,
rb: ring.NewBuffer(10, rbElementSize, 0), ring: ring.NewBuffer(10, rbElementSize, 0),
quit: make(chan struct{}), done: make(chan struct{}),
} }
s.wg.Add(1) s.wg.Add(1)
go s.output() go s.output()
@ -293,15 +293,15 @@ func (s *rtmpSender) output() {
var chunk *ring.Chunk var chunk *ring.Chunk
for { for {
select { select {
case <-s.quit: case <-s.done:
s.log(logger.Info, pkg+"rtmpSender: got quit signal, terminating output routine") s.log(logger.Info, pkg+"rtmpSender: got done signal, terminating output routine")
defer s.wg.Done() defer s.wg.Done()
return return
default: default:
// If chunk is nil then we're ready to get another from the ringBuffer. // If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil { if chunk == nil {
var err error var err error
chunk, err = s.rb.Next(rTimeout) chunk, err = s.ring.Next(rTimeout)
switch err { switch err {
case nil, io.EOF: case nil, io.EOF:
continue continue
@ -340,11 +340,11 @@ func (s *rtmpSender) output() {
// Write implements io.Writer. // Write implements io.Writer.
func (s *rtmpSender) Write(d []byte) (int, error) { func (s *rtmpSender) Write(d []byte) (int, error) {
_, err := s.rb.Write(d) _, err := s.ring.Write(d)
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: ringBuffer write error", "error", err.Error()) s.log(logger.Warning, pkg+"rtmpSender: ringBuffer write error", "error", err.Error())
} }
s.rb.Flush() s.ring.Flush()
return len(d), nil return len(d), nil
} }
@ -365,8 +365,8 @@ func (s *rtmpSender) restart() error {
} }
func (s *rtmpSender) Close() error { func (s *rtmpSender) Close() error {
if s.quit != nil { if s.done != nil {
close(s.quit) close(s.done)
} }
s.wg.Wait() s.wg.Wait()
return s.close() return s.close()

View File

@ -48,15 +48,31 @@ var (
// destination simulates a destination for the mtsSender. It allows for the // destination simulates a destination for the mtsSender. It allows for the
// emulation of failed and delayed sends. // emulation of failed and delayed sends.
type destination struct { type destination struct {
buf [][]byte // Holds the clips written to this destination using Write.
testFails bool buf [][]byte
failAt int
// testFails is set to true if we would like a write to fail at a particular
// clip as determined by failAt.
testFails bool
failAt int
// Holds the current clip number.
currentClip int currentClip int
t *testing.T
sendDelay time.Duration // Pointer to the testing.T of a test where this struct is being used. This
delayAt int // is used so that logging can be done through the testing log utilities.
done chan struct{} t *testing.T
doneAt int
// sendDelay is the amount of time we would like a Write to be delayed when
// we hit the clip number indicated by delayAt.
sendDelay time.Duration
delayAt int
// done will be used to send a signal to the main routine to indicate that
// the destination has received all clips. doneAt indicates the final clip
// number.
done chan struct{}
doneAt int
} }
func (ts *destination) Write(d []byte) (int, error) { func (ts *destination) Write(d []byte) (int, error) {
@ -116,8 +132,8 @@ func TestMtsSenderSegment(t *testing.T) {
// Create ringBuffer, sender, sender and the MPEGTS encoder. // Create ringBuffer, sender, sender and the MPEGTS encoder.
const numberOfClips = 11 const numberOfClips = 11
tstDst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
@ -133,11 +149,11 @@ func TestMtsSenderSegment(t *testing.T) {
} }
// Wait until the destination has all the data, then close the sender. // Wait until the destination has all the data, then close the sender.
<-tstDst.done <-dst.done
sender.Close() sender.Close()
// Check the data. // Check the data.
result := tstDst.buf result := dst.buf
expectData := 0 expectData := 0
for clipNo, clip := range result { for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo) t.Logf("Checking clip: %v\n", clipNo)