From 5ecf06e09341797f7838744929d4fee9622a5418 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 8 Apr 2019 19:02:42 +0930 Subject: [PATCH 01/15] revid: Buffered MtsSender The mtsSender now has a ringBuffer and tests have been updated accordingly. The mtsSender now uses an output routine to get data from it's ringBuffer to send. Revid now uses ioext.multiWriteClosers for encoders to write to so that senders can be closed and therefore any output routines. --- revid/revid.go | 37 ++++--- revid/senders.go | 101 ++++++++++++++--- revid/senders_test.go | 244 +++++++++++++++++++++++++++++------------- 3 files changed, 274 insertions(+), 108 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 0828d7da..03c30696 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -44,6 +44,7 @@ import ( "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/iot/pi/netsender" + "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) @@ -111,6 +112,8 @@ type Revid struct { // encoder holds the required encoders, which then write to destinations. encoder []io.Writer + writeClosers []io.WriteCloser + // bitrate hold the last send bitrate calculation result. bitrate int @@ -178,42 +181,42 @@ func (r *Revid) setConfig(config Config) error { } // setupPipeline constructs a data pipeline. -func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.Writer) io.Writer) error { +func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) r.encoder = r.encoder[:0] // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. - var mtsSenders, flvSenders []io.Writer + var mtsSenders, flvSenders []io.WriteCloser // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. - var w io.Writer + var wc io.WriteCloser for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil) - mtsSenders = append(mtsSenders, w) + wc = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) + mtsSenders = append(mtsSenders, wc) case Rtp: - w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) + wc, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) } - mtsSenders = append(mtsSenders, w) + mtsSenders = append(mtsSenders, wc) case File: - w, err := newFileSender(r.config.OutputPath) + wc, err := newFileSender(r.config.OutputPath) if err != nil { return err } - mtsSenders = append(mtsSenders, w) + mtsSenders = append(mtsSenders, wc) case Rtmp: - w, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) + wc, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) } - flvSenders = append(flvSenders, w) + flvSenders = append(flvSenders, wc) } } @@ -279,7 +282,7 @@ func (r *Revid) reset(config Config) error { return err } - err = r.setupPipeline(newMtsEncoder, newFlvEncoder, io.MultiWriter) + err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser) if err != nil { return err } @@ -338,15 +341,21 @@ func (r *Revid) Stop() { return } + for _, wc := range r.writeClosers { + err := wc.Close() + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) + return + } + } r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.setIsRunning(false) - r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } r.wg.Wait() + r.setIsRunning(false) } func (r *Revid) Update(vars map[string]string) error { diff --git a/revid/senders.go b/revid/senders.go index b8e73a21..4f779ccf 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -35,6 +35,8 @@ import ( "net" "os" "strconv" + "sync" + "time" "github.com/Comcast/gots/packet" @@ -43,6 +45,7 @@ import ( "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" ) // Log is used by the multiSender. @@ -68,6 +71,8 @@ func (s *httpSender) Write(d []byte) (int, error) { return len(d), httpSend(d, s.client, s.log) } +func (s *httpSender) Close() error { return nil } + func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { // Only send if "V0" is configured as an input. send := false @@ -129,7 +134,7 @@ type fileSender struct { data []byte } -func newFileSender(path string) (io.Writer, error) { +func newFileSender(path string) (*fileSender, error) { f, err := os.Create(path) if err != nil { return nil, err @@ -142,27 +147,37 @@ func (s *fileSender) Write(d []byte) (int, error) { return s.file.Write(d) } -func (s *fileSender) close() error { return s.file.Close() } +func (s *fileSender) Close() error { return s.file.Close() } -// mtsSender implements loadSender and provides sending capability specifically +// mtsSender implements io.WriteCloser and provides sending capability specifically // for use with MPEGTS packetization. It handles the construction of appropriately -// lengthed clips based on PSI. It also fixes accounts for discontinuities by +// lengthed clips based on PSI. It also accounts for discontinuities by // setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { - dst io.Writer + dst io.WriteCloser buf []byte + ringBuf *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int + quit chan struct{} + log func(lvl int8, msg string, args ...interface{}) + wg sync.WaitGroup } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.Writer, log func(lvl int8, msg string, args ...interface{})) *mtsSender { - return &mtsSender{ +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { + s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), + log: log, + ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), + quit: make(chan struct{}), } + s.wg.Add(1) + go s.output() + return s } // Write implements io.Writer. @@ -176,20 +191,70 @@ func (s *mtsSender) Write(d []byte) (int, error) { copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() if s.curPid == mts.PatPid && len(s.buf) > 0 { - err := s.repairer.Repair(s.buf) - if err == nil { - _, err = s.dst.Write(s.buf) - if err == nil { - goto done - } + _, err := s.ringBuf.Write(s.buf) + if err != nil { + s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } - s.repairer.Failed() - done: + s.ringBuf.Flush() s.buf = s.buf[:0] } return len(d), nil } +// Close implements io.Closer. +func (s *mtsSender) Close() error { + close(s.quit) + s.wg.Wait() + return nil +} + +// output is a routine start at creation of the mtsSender. It will get data +// from the mtsSenders ringBuffer and attempt to send. +func (s *mtsSender) output() { + var chunk *ring.Chunk +loop: + for { + select { + case <-s.quit: + s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") + defer s.wg.Done() + return + default: + // If chunk is nil then we're ready to get another from the ringBuffer. + if chunk == nil { + var err error + chunk, err = s.ringBuf.Next(readTimeout) + switch err { + case nil: + case ring.ErrTimeout: + s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") + continue + default: + s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) + fallthrough + case io.EOF: + goto loop + } + // If chunk is not nil, then we need to try sending it off. + } else { + err := s.repairer.Repair(chunk.Bytes()) + if err != nil { + chunk.Close() + chunk = nil + continue + } + _, err = s.dst.Write(chunk.Bytes()) + if err != nil { + s.repairer.Failed() + continue + } + chunk.Close() + chunk = nil + } + } + } +} + // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn @@ -238,7 +303,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) { } func (s *rtmpSender) restart() error { - s.close() + s.Close() var err error for n := 0; n < s.retries; n++ { s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) @@ -253,7 +318,7 @@ func (s *rtmpSender) restart() error { return err } -func (s *rtmpSender) close() error { +func (s *rtmpSender) Close() error { if s.conn != nil { return s.conn.Close() } @@ -284,3 +349,5 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{ func (s *rtpSender) Write(d []byte) (int, error) { return s.encoder.Write(s.data) } + +func (s *rtpSender) Close() error { return nil } diff --git a/revid/senders_test.go b/revid/senders_test.go index 95474922..e7fac859 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -30,7 +30,6 @@ package revid import ( "errors" - "fmt" "testing" "time" @@ -40,7 +39,6 @@ import ( "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) // Ring buffer sizes and read/write timeouts. @@ -55,20 +53,25 @@ var ( errSendFailed = errors.New("send failed") ) -// sender simulates sending of video data, creating discontinuities if -// testDiscontinuities is set to true. +// destination simulates a destination for the mtsSender. It allows for the +// emulation of failed and delayed sends. type destination struct { - buf [][]byte - testDiscontinuities bool - discontinuityAt int - currentPkt int + buf [][]byte + testFails bool + failAt int + currentPkt int + t *testing.T + sendDelay time.Duration + delayAt int } -// Write implements io.Writer. -// Write takes d and neglects if testDiscontinuities is true, returning an error, -// otherwise d is appended to senders buf. func (ts *destination) Write(d []byte) (int, error) { - if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { + ts.t.Log("writing clip to destination") + if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { + time.Sleep(ts.sendDelay) + } + if ts.testFails && ts.currentPkt == ts.failAt { + ts.t.Log("failed send") ts.currentPkt++ return 0, errSendFailed } @@ -79,9 +82,12 @@ func (ts *destination) Write(d []byte) (int, error) { return len(d), nil } -// log implements the required logging func for some of the structs in use -// within tests. -func log(lvl int8, msg string, args ...interface{}) { +func (ts *destination) Close() error { return nil } + +// dummyLogger will allow logging to be done by the testing pkg. +type dummyLogger testing.T + +func (dl dummyLogger) log(lvl int8, msg string, args ...interface{}) { var l string switch lvl { case logger.Warning: @@ -99,7 +105,11 @@ func log(lvl int8, msg string, args ...interface{}) { for i := 0; i < len(args); i++ { msg += " %v" } - fmt.Printf(msg, args) + if len(args) == 0 { + dl.Log(msg + "\n") + return + } + dl.Logf(msg+"\n", args) } // TestSegment ensures that the mtsSender correctly segments data into clips @@ -107,38 +117,28 @@ func log(lvl int8, msg string, args ...interface{}) { func TestMtsSenderSegment(t *testing.T) { mts.Meta = meta.New() - // Create ringBuffer, sender, loadsender and the MPEGTS encoder. - tstDst := &destination{} - loadSender := newMtsSender(tstDst, log) - rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder((*buffer)(rb), 25) + // Create ringBuffer, sender, sender and the MPEGTS encoder. + tstDst := &destination{t: t} + sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25) // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. + t.Log("writing packets") const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Insert a payload so that we check that the segmentation works correctly - // in this regard. Packet number will be used. encoder.Write([]byte{byte(i)}) - rb.Flush() - - for { - next, err := rb.Next(rTimeout) - if err != nil { - break - } - - _, err = loadSender.Write(next.Bytes()) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - next.Close() - next = nil - } } + // Give the mtsSender some time to finish up and then Close it. + time.Sleep(10 * time.Millisecond) + sender.Close() + + // Check the data. result := tstDst.buf expectData := 0 for clipNo, clip := range result { @@ -160,9 +160,11 @@ func TestMtsSenderSegment(t *testing.T) { } // Check that the clip data is okay. + t.Log("checking clip data") for i := 0; i < len(clip); i += mts.PacketSize { copy(pkt[:], clip[i:i+mts.PacketSize]) if pkt.PID() == mts.VideoPid { + t.Log("got video PID") payload, err := pkt.Payload() if err != nil { t.Fatalf("Unexpected err: %v\n", err) @@ -187,61 +189,149 @@ func TestMtsSenderSegment(t *testing.T) { } } +// TestMtsSenderFailedSend checks that a failed send is correctly handled by +// the mtsSender. The mtsSender should try to send the same clip again. +func TestMtsSenderFailedSend(t *testing.T) { + mts.Meta = meta.New() + + // Create destination, the mtsSender and the mtsEncoder + const clipToFailAt = 3 + tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} + sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25) + + // Turn time based PSI writing off for encoder and send PSI every 10 packets. + const psiSendCount = 10 + encoder.TimeBasedPsi(false, psiSendCount) + + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. + t.Log("writing packets") + const noOfPacketsToWrite = 100 + for i := 0; i < noOfPacketsToWrite; i++ { + encoder.Write([]byte{byte(i)}) + } + + // Give the mtsSender some time to finish up and then Close it. + time.Sleep(10 * time.Millisecond) + sender.Close() + + // Check that we have data as expected. + result := tstDst.buf + expectData := 0 + for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) + + // Check that the clip is of expected length. + clipLen := len(clip) + if clipLen != psiSendCount*mts.PacketSize { + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) + } + + // Also check that the first packet is a PAT. + firstPkt := clip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + pid := pkt.PID() + if pid != mts.PatPid { + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } + + // Check that the clip data is okay. + t.Log("checking clip data") + for i := 0; i < len(clip); i += mts.PacketSize { + copy(pkt[:], clip[i:i+mts.PacketSize]) + if pkt.PID() == mts.VideoPid { + t.Log("got video PID") + payload, err := pkt.Payload() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Parse PES from the MTS payload. + pes, err := pes.NewPESHeader(payload) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Get the data from the PES packet and convert to an int. + data := int8(pes.Data()[0]) + + // Calc expected data in the PES and then check. + if data != int8(expectData) { + t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) + } + expectData++ + } + } + } +} + +// TestMtsSenderDiscontinuity checks that a discontinuity in a stream is +// correctly handled by the mtsSender. A discontinuity is caused by overflowing +// the mtsSender's ringBuffer. It is expected that the next clip seen has the +// disconinuity indicator applied. func TestMtsSenderDiscontinuity(t *testing.T) { mts.Meta = meta.New() - // Create ringBuffer sender, loadSender and the MPEGTS encoder. - const clipWithDiscontinuity = 3 - tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} - loadSender := newMtsSender(tstDst, log) - rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder((*buffer)(rb), 25) + // Create destination, the mtsSender and the mtsEncoder. + const clipToDelay = 3 + tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} + sender := newMtsSender(tstDst, dummyLogger(*t).log, 1, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25) // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Our payload will just be packet number. encoder.Write([]byte{byte(i)}) - rb.Flush() - - for { - next, err := rb.Next(rTimeout) - if err != nil { - break - } - - _, err = loadSender.Write(next.Bytes()) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - next.Close() - next = nil - } } + // Give mtsSender time to finish up then Close. + time.Sleep(100 * time.Millisecond) + sender.Close() + + // Check the data. result := tstDst.buf + expectedCC := 0 + for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) - // First check that we have less clips as expected. - expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 - gotLen := len(result) - if gotLen != expectedLen { - t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) - } + // Check that the clip is of expected length. + clipLen := len(clip) + if clipLen != psiSendCount*mts.PacketSize { + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) + } - // Now check that the discontinuity indicator is set at the discontinuityClip PAT. - disconClip := result[clipWithDiscontinuity] - firstPkt := disconClip[:mts.PacketSize] - var pkt packet.Packet - copy(pkt[:], firstPkt) - discon, err := (*packet.AdaptationField)(&pkt).Discontinuity() - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + // Also check that the first packet is a PAT. + firstPkt := clip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + pid := pkt.PID() + if pid != mts.PatPid { + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } - if !discon { - t.Fatalf("Did not get discontinuity indicator for PAT") + // Get the discontinuity indicator + discon, _ := (*packet.AdaptationField)(&pkt).Discontinuity() + + // Check the continuity counter. + cc := pkt.ContinuityCounter() + if cc != expectedCC { + t.Log("discontinuity found") + expectedCC = cc + if !discon { + t.Errorf("discontinuity indicator not set where expected for clip: %v", clipNo) + } + } else { + if discon && clipNo != 0 { + t.Errorf("did not expect discontinuity indicator to be set for clip: %v", clipNo) + } + } + expectedCC = (expectedCC + 1) & 0xf } } From 66622920d523818acf4b7f7ffb22ebeec922f108 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 8 Apr 2019 19:02:42 +0930 Subject: [PATCH 02/15] revid: Buffered MtsSender The mtsSender now has a ringBuffer and tests have been updated accordingly. The mtsSender now uses an output routine to get data from it's ringBuffer to send. Revid now uses ioext.multiWriteClosers for encoders to write to so that senders can be closed and therefore any output routines. --- revid/revid.go | 37 ++++--- revid/senders.go | 101 ++++++++++++++--- revid/senders_test.go | 244 +++++++++++++++++++++++++++++------------- 3 files changed, 274 insertions(+), 108 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 0828d7da..03c30696 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -44,6 +44,7 @@ import ( "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/iot/pi/netsender" + "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) @@ -111,6 +112,8 @@ type Revid struct { // encoder holds the required encoders, which then write to destinations. encoder []io.Writer + writeClosers []io.WriteCloser + // bitrate hold the last send bitrate calculation result. bitrate int @@ -178,42 +181,42 @@ func (r *Revid) setConfig(config Config) error { } // setupPipeline constructs a data pipeline. -func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.Writer) io.Writer) error { +func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) r.encoder = r.encoder[:0] // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. - var mtsSenders, flvSenders []io.Writer + var mtsSenders, flvSenders []io.WriteCloser // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. - var w io.Writer + var wc io.WriteCloser for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil) - mtsSenders = append(mtsSenders, w) + wc = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) + mtsSenders = append(mtsSenders, wc) case Rtp: - w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) + wc, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) } - mtsSenders = append(mtsSenders, w) + mtsSenders = append(mtsSenders, wc) case File: - w, err := newFileSender(r.config.OutputPath) + wc, err := newFileSender(r.config.OutputPath) if err != nil { return err } - mtsSenders = append(mtsSenders, w) + mtsSenders = append(mtsSenders, wc) case Rtmp: - w, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) + wc, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) } - flvSenders = append(flvSenders, w) + flvSenders = append(flvSenders, wc) } } @@ -279,7 +282,7 @@ func (r *Revid) reset(config Config) error { return err } - err = r.setupPipeline(newMtsEncoder, newFlvEncoder, io.MultiWriter) + err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser) if err != nil { return err } @@ -338,15 +341,21 @@ func (r *Revid) Stop() { return } + for _, wc := range r.writeClosers { + err := wc.Close() + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) + return + } + } r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.setIsRunning(false) - r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } r.wg.Wait() + r.setIsRunning(false) } func (r *Revid) Update(vars map[string]string) error { diff --git a/revid/senders.go b/revid/senders.go index a678b5b1..d1268e85 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -35,6 +35,8 @@ import ( "net" "os" "strconv" + "sync" + "time" "github.com/Comcast/gots/packet" @@ -43,6 +45,7 @@ import ( "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" ) // Log is used by the multiSender. @@ -68,6 +71,8 @@ func (s *httpSender) Write(d []byte) (int, error) { return len(d), httpSend(d, s.client, s.log) } +func (s *httpSender) Close() error { return nil } + func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { // Only send if "V0" is configured as an input. send := false @@ -129,7 +134,7 @@ type fileSender struct { data []byte } -func newFileSender(path string) (io.Writer, error) { +func newFileSender(path string) (*fileSender, error) { f, err := os.Create(path) if err != nil { return nil, err @@ -142,27 +147,37 @@ func (s *fileSender) Write(d []byte) (int, error) { return s.file.Write(d) } -func (s *fileSender) close() error { return s.file.Close() } +func (s *fileSender) Close() error { return s.file.Close() } -// mtsSender implements loadSender and provides sending capability specifically +// mtsSender implements io.WriteCloser and provides sending capability specifically // for use with MPEGTS packetization. It handles the construction of appropriately -// lengthed clips based on PSI. It also fixes accounts for discontinuities by +// lengthed clips based on PSI. It also accounts for discontinuities by // setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { - dst io.Writer + dst io.WriteCloser buf []byte + ringBuf *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int + quit chan struct{} + log func(lvl int8, msg string, args ...interface{}) + wg sync.WaitGroup } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.Writer, log func(lvl int8, msg string, args ...interface{})) *mtsSender { - return &mtsSender{ +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { + s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), + log: log, + ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), + quit: make(chan struct{}), } + s.wg.Add(1) + go s.output() + return s } // Write implements io.Writer. @@ -176,20 +191,70 @@ func (s *mtsSender) Write(d []byte) (int, error) { copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() if s.curPid == mts.PatPid && len(s.buf) > 0 { - err := s.repairer.Repair(s.buf) - if err == nil { - _, err = s.dst.Write(s.buf) - if err == nil { - goto done - } + _, err := s.ringBuf.Write(s.buf) + if err != nil { + s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } - s.repairer.Failed() - done: + s.ringBuf.Flush() s.buf = s.buf[:0] } return len(d), nil } +// Close implements io.Closer. +func (s *mtsSender) Close() error { + close(s.quit) + s.wg.Wait() + return nil +} + +// output is a routine start at creation of the mtsSender. It will get data +// from the mtsSenders ringBuffer and attempt to send. +func (s *mtsSender) output() { + var chunk *ring.Chunk +loop: + for { + select { + case <-s.quit: + s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") + defer s.wg.Done() + return + default: + // If chunk is nil then we're ready to get another from the ringBuffer. + if chunk == nil { + var err error + chunk, err = s.ringBuf.Next(readTimeout) + switch err { + case nil: + case ring.ErrTimeout: + s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") + continue + default: + s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) + fallthrough + case io.EOF: + goto loop + } + // If chunk is not nil, then we need to try sending it off. + } else { + err := s.repairer.Repair(chunk.Bytes()) + if err != nil { + chunk.Close() + chunk = nil + continue + } + _, err = s.dst.Write(chunk.Bytes()) + if err != nil { + s.repairer.Failed() + continue + } + chunk.Close() + chunk = nil + } + } + } +} + // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn @@ -238,7 +303,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) { } func (s *rtmpSender) restart() error { - s.close() + s.Close() var err error for n := 0; n < s.retries; n++ { s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) @@ -253,7 +318,7 @@ func (s *rtmpSender) restart() error { return err } -func (s *rtmpSender) close() error { +func (s *rtmpSender) Close() error { if s.conn != nil { return s.conn.Close() } @@ -290,3 +355,5 @@ func (s *rtpSender) Write(d []byte) (int, error) { } return len(d), nil } + +func (s *rtpSender) Close() error { return nil } diff --git a/revid/senders_test.go b/revid/senders_test.go index 95474922..e7fac859 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -30,7 +30,6 @@ package revid import ( "errors" - "fmt" "testing" "time" @@ -40,7 +39,6 @@ import ( "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) // Ring buffer sizes and read/write timeouts. @@ -55,20 +53,25 @@ var ( errSendFailed = errors.New("send failed") ) -// sender simulates sending of video data, creating discontinuities if -// testDiscontinuities is set to true. +// destination simulates a destination for the mtsSender. It allows for the +// emulation of failed and delayed sends. type destination struct { - buf [][]byte - testDiscontinuities bool - discontinuityAt int - currentPkt int + buf [][]byte + testFails bool + failAt int + currentPkt int + t *testing.T + sendDelay time.Duration + delayAt int } -// Write implements io.Writer. -// Write takes d and neglects if testDiscontinuities is true, returning an error, -// otherwise d is appended to senders buf. func (ts *destination) Write(d []byte) (int, error) { - if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { + ts.t.Log("writing clip to destination") + if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { + time.Sleep(ts.sendDelay) + } + if ts.testFails && ts.currentPkt == ts.failAt { + ts.t.Log("failed send") ts.currentPkt++ return 0, errSendFailed } @@ -79,9 +82,12 @@ func (ts *destination) Write(d []byte) (int, error) { return len(d), nil } -// log implements the required logging func for some of the structs in use -// within tests. -func log(lvl int8, msg string, args ...interface{}) { +func (ts *destination) Close() error { return nil } + +// dummyLogger will allow logging to be done by the testing pkg. +type dummyLogger testing.T + +func (dl dummyLogger) log(lvl int8, msg string, args ...interface{}) { var l string switch lvl { case logger.Warning: @@ -99,7 +105,11 @@ func log(lvl int8, msg string, args ...interface{}) { for i := 0; i < len(args); i++ { msg += " %v" } - fmt.Printf(msg, args) + if len(args) == 0 { + dl.Log(msg + "\n") + return + } + dl.Logf(msg+"\n", args) } // TestSegment ensures that the mtsSender correctly segments data into clips @@ -107,38 +117,28 @@ func log(lvl int8, msg string, args ...interface{}) { func TestMtsSenderSegment(t *testing.T) { mts.Meta = meta.New() - // Create ringBuffer, sender, loadsender and the MPEGTS encoder. - tstDst := &destination{} - loadSender := newMtsSender(tstDst, log) - rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder((*buffer)(rb), 25) + // Create ringBuffer, sender, sender and the MPEGTS encoder. + tstDst := &destination{t: t} + sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25) // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. + t.Log("writing packets") const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Insert a payload so that we check that the segmentation works correctly - // in this regard. Packet number will be used. encoder.Write([]byte{byte(i)}) - rb.Flush() - - for { - next, err := rb.Next(rTimeout) - if err != nil { - break - } - - _, err = loadSender.Write(next.Bytes()) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - next.Close() - next = nil - } } + // Give the mtsSender some time to finish up and then Close it. + time.Sleep(10 * time.Millisecond) + sender.Close() + + // Check the data. result := tstDst.buf expectData := 0 for clipNo, clip := range result { @@ -160,9 +160,11 @@ func TestMtsSenderSegment(t *testing.T) { } // Check that the clip data is okay. + t.Log("checking clip data") for i := 0; i < len(clip); i += mts.PacketSize { copy(pkt[:], clip[i:i+mts.PacketSize]) if pkt.PID() == mts.VideoPid { + t.Log("got video PID") payload, err := pkt.Payload() if err != nil { t.Fatalf("Unexpected err: %v\n", err) @@ -187,61 +189,149 @@ func TestMtsSenderSegment(t *testing.T) { } } +// TestMtsSenderFailedSend checks that a failed send is correctly handled by +// the mtsSender. The mtsSender should try to send the same clip again. +func TestMtsSenderFailedSend(t *testing.T) { + mts.Meta = meta.New() + + // Create destination, the mtsSender and the mtsEncoder + const clipToFailAt = 3 + tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} + sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25) + + // Turn time based PSI writing off for encoder and send PSI every 10 packets. + const psiSendCount = 10 + encoder.TimeBasedPsi(false, psiSendCount) + + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. + t.Log("writing packets") + const noOfPacketsToWrite = 100 + for i := 0; i < noOfPacketsToWrite; i++ { + encoder.Write([]byte{byte(i)}) + } + + // Give the mtsSender some time to finish up and then Close it. + time.Sleep(10 * time.Millisecond) + sender.Close() + + // Check that we have data as expected. + result := tstDst.buf + expectData := 0 + for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) + + // Check that the clip is of expected length. + clipLen := len(clip) + if clipLen != psiSendCount*mts.PacketSize { + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) + } + + // Also check that the first packet is a PAT. + firstPkt := clip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + pid := pkt.PID() + if pid != mts.PatPid { + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } + + // Check that the clip data is okay. + t.Log("checking clip data") + for i := 0; i < len(clip); i += mts.PacketSize { + copy(pkt[:], clip[i:i+mts.PacketSize]) + if pkt.PID() == mts.VideoPid { + t.Log("got video PID") + payload, err := pkt.Payload() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Parse PES from the MTS payload. + pes, err := pes.NewPESHeader(payload) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Get the data from the PES packet and convert to an int. + data := int8(pes.Data()[0]) + + // Calc expected data in the PES and then check. + if data != int8(expectData) { + t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) + } + expectData++ + } + } + } +} + +// TestMtsSenderDiscontinuity checks that a discontinuity in a stream is +// correctly handled by the mtsSender. A discontinuity is caused by overflowing +// the mtsSender's ringBuffer. It is expected that the next clip seen has the +// disconinuity indicator applied. func TestMtsSenderDiscontinuity(t *testing.T) { mts.Meta = meta.New() - // Create ringBuffer sender, loadSender and the MPEGTS encoder. - const clipWithDiscontinuity = 3 - tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} - loadSender := newMtsSender(tstDst, log) - rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder((*buffer)(rb), 25) + // Create destination, the mtsSender and the mtsEncoder. + const clipToDelay = 3 + tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} + sender := newMtsSender(tstDst, dummyLogger(*t).log, 1, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25) // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Our payload will just be packet number. encoder.Write([]byte{byte(i)}) - rb.Flush() - - for { - next, err := rb.Next(rTimeout) - if err != nil { - break - } - - _, err = loadSender.Write(next.Bytes()) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - next.Close() - next = nil - } } + // Give mtsSender time to finish up then Close. + time.Sleep(100 * time.Millisecond) + sender.Close() + + // Check the data. result := tstDst.buf + expectedCC := 0 + for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) - // First check that we have less clips as expected. - expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 - gotLen := len(result) - if gotLen != expectedLen { - t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) - } + // Check that the clip is of expected length. + clipLen := len(clip) + if clipLen != psiSendCount*mts.PacketSize { + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) + } - // Now check that the discontinuity indicator is set at the discontinuityClip PAT. - disconClip := result[clipWithDiscontinuity] - firstPkt := disconClip[:mts.PacketSize] - var pkt packet.Packet - copy(pkt[:], firstPkt) - discon, err := (*packet.AdaptationField)(&pkt).Discontinuity() - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + // Also check that the first packet is a PAT. + firstPkt := clip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + pid := pkt.PID() + if pid != mts.PatPid { + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } - if !discon { - t.Fatalf("Did not get discontinuity indicator for PAT") + // Get the discontinuity indicator + discon, _ := (*packet.AdaptationField)(&pkt).Discontinuity() + + // Check the continuity counter. + cc := pkt.ContinuityCounter() + if cc != expectedCC { + t.Log("discontinuity found") + expectedCC = cc + if !discon { + t.Errorf("discontinuity indicator not set where expected for clip: %v", clipNo) + } + } else { + if discon && clipNo != 0 { + t.Errorf("did not expect discontinuity indicator to be set for clip: %v", clipNo) + } + } + expectedCC = (expectedCC + 1) & 0xf } } From d3e3904e75ff36b5f4eefaa7c507245b7344ec30 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 10 Apr 2019 12:11:45 +0930 Subject: [PATCH 03/15] revid: commented Revid.writeClosers --- revid/revid.go | 1 + 1 file changed, 1 insertion(+) diff --git a/revid/revid.go b/revid/revid.go index 03c30696..b5abf168 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -112,6 +112,7 @@ type Revid struct { // encoder holds the required encoders, which then write to destinations. encoder []io.Writer + // writeClosers holds the senders that the encoders will write to. writeClosers []io.WriteCloser // bitrate hold the last send bitrate calculation result. From dd833afe2e7dca1045fd432cdfa3a60a193702ea Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 10 Apr 2019 12:13:08 +0930 Subject: [PATCH 04/15] revid: updated comment for setupPipeline --- revid/revid.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index b5abf168..ed2d78be 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -181,7 +181,8 @@ func (r *Revid) setConfig(config Config) error { return nil } -// setupPipeline constructs a data pipeline. +// setupPipeline constructs the revid dataPipeline. Inputs, encoders and +// senders are create and linked based on the current revid config. func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) From f546b9daedce83f369016923ea126945c16a0b11 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 10 Apr 2019 12:15:46 +0930 Subject: [PATCH 05/15] revid: improved mtsSender's output comment and moved closer to call --- revid/senders.go | 59 ++++++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/revid/senders.go b/revid/senders.go index d1268e85..2b65b0e9 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -180,36 +180,7 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int return s } -// Write implements io.Writer. -func (s *mtsSender) Write(d []byte) (int, error) { - if s.next != nil { - s.buf = append(s.buf, s.next...) - } - bytes := make([]byte, len(d)) - copy(bytes, d) - s.next = bytes - copy(s.pkt[:], bytes) - s.curPid = s.pkt.PID() - if s.curPid == mts.PatPid && len(s.buf) > 0 { - _, err := s.ringBuf.Write(s.buf) - if err != nil { - s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) - } - s.ringBuf.Flush() - s.buf = s.buf[:0] - } - return len(d), nil -} - -// Close implements io.Closer. -func (s *mtsSender) Close() error { - close(s.quit) - s.wg.Wait() - return nil -} - -// output is a routine start at creation of the mtsSender. It will get data -// from the mtsSenders ringBuffer and attempt to send. +// output starts an mtsSender's data handling routine. func (s *mtsSender) output() { var chunk *ring.Chunk loop: @@ -255,6 +226,34 @@ loop: } } +// Write implements io.Writer. +func (s *mtsSender) Write(d []byte) (int, error) { + if s.next != nil { + s.buf = append(s.buf, s.next...) + } + bytes := make([]byte, len(d)) + copy(bytes, d) + s.next = bytes + copy(s.pkt[:], bytes) + s.curPid = s.pkt.PID() + if s.curPid == mts.PatPid && len(s.buf) > 0 { + _, err := s.ringBuf.Write(s.buf) + if err != nil { + s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) + } + s.ringBuf.Flush() + s.buf = s.buf[:0] + } + return len(d), nil +} + +// Close implements io.Closer. +func (s *mtsSender) Close() error { + close(s.quit) + s.wg.Wait() + return nil +} + // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn From 3841b8cb5b2342561b02865239e29247650483b9 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 10 Apr 2019 12:16:51 +0930 Subject: [PATCH 06/15] revid: fixed build error in revid_test.go --- revid/revid_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/revid/revid_test.go b/revid/revid_test.go index 867ded78..ccf0de3a 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -97,10 +97,10 @@ func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil } // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // can access the destinations. type dummyMultiWriter struct { - dst []io.Writer + dst []io.WriteCloser } -func newDummyMultiWriter(dst ...io.Writer) io.Writer { +func newDummyMultiWriter(dst ...io.WriteCloser) io.WriteCloser { return &dummyMultiWriter{ dst: dst, } @@ -108,6 +108,8 @@ func newDummyMultiWriter(dst ...io.Writer) io.Writer { func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } +func (w *dummyMultiWriter) Close() error { return nil } + // TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the // revid.encoder slice and the senders the encoders write to. func TestResetEncoderSenderSetup(t *testing.T) { From 9a52f19e3d3c4d2051ea95c4690548ad8b1f295d Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 10 Apr 2019 12:19:28 +0930 Subject: [PATCH 07/15] revid: fixed mtsSender's output routine's logic --- revid/senders.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/revid/senders.go b/revid/senders.go index 2b65b0e9..d50fc7d8 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -183,7 +183,6 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int // output starts an mtsSender's data handling routine. func (s *mtsSender) output() { var chunk *ring.Chunk -loop: for { select { case <-s.quit: @@ -197,6 +196,7 @@ loop: chunk, err = s.ringBuf.Next(readTimeout) switch err { case nil: + continue case ring.ErrTimeout: s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") continue @@ -204,24 +204,22 @@ loop: s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) fallthrough case io.EOF: - goto loop - } - // If chunk is not nil, then we need to try sending it off. - } else { - err := s.repairer.Repair(chunk.Bytes()) - if err != nil { - chunk.Close() - chunk = nil - continue - } - _, err = s.dst.Write(chunk.Bytes()) - if err != nil { - s.repairer.Failed() continue } + } + err := s.repairer.Repair(chunk.Bytes()) + if err != nil { chunk.Close() chunk = nil + continue } + _, err = s.dst.Write(chunk.Bytes()) + if err != nil { + s.repairer.Failed() + continue + } + chunk.Close() + chunk = nil } } } From eb866ada5eb674a04f49bf52119c88f164deef84 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 10 Apr 2019 12:20:39 +0930 Subject: [PATCH 08/15] revid: restructure rtmpSender's Close method --- revid/senders.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/revid/senders.go b/revid/senders.go index d50fc7d8..01aa208e 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -316,10 +316,10 @@ func (s *rtmpSender) restart() error { } func (s *rtmpSender) Close() error { - if s.conn != nil { - return s.conn.Close() + if s.conn == nil { + return nil } - return nil + return s.conn.Close() } // TODO: Write restart func for rtpSender From 850b45d791a6c698d564ecf55de4fd3dd1dce6d4 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 10 Apr 2019 12:22:41 +0930 Subject: [PATCH 09/15] revid: temp writeCloser wc in setupPipeline renamed to w --- revid/revid.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index ed2d78be..5ebf64cb 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -195,30 +195,30 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. - var wc io.WriteCloser + var w io.WriteCloser for _, out := range r.config.Outputs { switch out { case Http: - wc = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) - mtsSenders = append(mtsSenders, wc) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) + mtsSenders = append(mtsSenders, w) case Rtp: - wc, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) + w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) } - mtsSenders = append(mtsSenders, wc) + mtsSenders = append(mtsSenders, w) case File: - wc, err := newFileSender(r.config.OutputPath) + w, err := newFileSender(r.config.OutputPath) if err != nil { return err } - mtsSenders = append(mtsSenders, wc) + mtsSenders = append(mtsSenders, w) case Rtmp: - wc, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) + w, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) } - flvSenders = append(flvSenders, wc) + flvSenders = append(flvSenders, w) } } @@ -343,8 +343,8 @@ func (r *Revid) Stop() { return } - for _, wc := range r.writeClosers { - err := wc.Close() + for _, w := range r.writeClosers { + err := w.Close() if err != nil { r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) return From bab1e62798ebf927b143bdf318998a49ff600db3 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 10 Apr 2019 12:57:28 +0930 Subject: [PATCH 10/15] revid: don't return from revid.Stop() if a close of one of the senders fails --- revid/revid.go | 1 - 1 file changed, 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 5ebf64cb..38cececc 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -347,7 +347,6 @@ func (r *Revid) Stop() { err := w.Close() if err != nil { r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) - return } } r.config.Logger.Log(logger.Info, pkg+"stopping revid") From 02287bfd80dc2eb619ffd269e35d90d2e44876f7 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 10 Apr 2019 14:05:53 +0930 Subject: [PATCH 11/15] revid: added more to setupPipeline comment regarding parameters: --- revid/revid.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 38cececc..ebcef5b0 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -182,7 +182,11 @@ func (r *Revid) setConfig(config Config) error { } // setupPipeline constructs the revid dataPipeline. Inputs, encoders and -// senders are create and linked based on the current revid config. +// senders are created and linked based on the current revid config. +// +// mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder +// respectively. multiWriter will be used to create an ioext.multiWriteCloser +// so that encoders can write to multiple senders. func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) From 90a059b37dce4809cf6f813422d006efc50556f3 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 13 Apr 2019 20:02:50 +0930 Subject: [PATCH 12/15] av: now builds --- go.mod | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 453a2f38..1280e1a3 100644 --- a/go.mod +++ b/go.mod @@ -4,21 +4,11 @@ go 1.12 require ( bitbucket.org/ausocean/iot v1.2.4 - bitbucket.org/ausocean/utils v1.2.4 - github.com/BurntSushi/toml v0.3.1 // indirect + bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 - github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect - github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 github.com/mewkiz/flac v1.0.5 - github.com/pkg/errors v0.8.1 // indirect - github.com/sergi/go-diff v1.0.0 // indirect - github.com/stretchr/testify v1.3.0 // indirect github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e - go.uber.org/atomic v1.3.2 // indirect - go.uber.org/multierr v1.1.0 // indirect - go.uber.org/zap v1.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect - gopkg.in/yaml.v2 v2.2.2 // indirect ) From b6199988a3781250dd546bd616b999337121f3f8 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 13 Apr 2019 20:06:16 +0930 Subject: [PATCH 13/15] av: updating go.sum --- go.sum | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/go.sum b/go.sum index 7a21da57..a23bf607 100644 --- a/go.sum +++ b/go.sum @@ -1,23 +1,34 @@ bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4= bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= +bitbucket.org/ausocean/iot v1.2.5 h1:udD5X4oXUuKwdjO7bcq4StcDdjP8fJa2L0FnJJwF+6Q= +bitbucket.org/ausocean/iot v1.2.5/go.mod h1:dOclxXkdxAQGWO7Y5KcP1wpNfxg9oKUA2VqjJ3Le4RA= +bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e h1:rn7Z1vE6m1NSH+mrPJPgquEfBDsqzBEH4Y6fxzyB6kA= +bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20= bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw= +bitbucket.org/ausocean/utils v1.2.5 h1:70lkWnoN1SUxiIBIKDiDzaYZ2bjczdNYSAsKj7DUpl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/adrianmo/go-nmea v1.1.1-0.20190109062325-c448653979f7/go.mod h1:HHPxPAm2kmev+61qmkZh7xgZF/7qHtSpsWppip2Ipv8= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c= github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4/go.mod h1:2RvX5ZjVtsznNZPEt4xwJXNJrM3VTZoQf7V6gk0ysvs= +github.com/kidoman/embd v0.0.0-20170508013040-d3d8c0c5c68d/go.mod h1:ACKj9jnzOzj1lw2ETilpFGK7L9dtJhAzT7T1OhAGtRQ= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s= github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc= @@ -29,16 +40,19 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e h1:3NIzz7weXhh3NToPgbtlQtKiVgerEaG4/nY2skGoGG0= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e/go.mod h1:CaowXBWOiSGWEpBBV8LoVnQTVPV4ycyviC9IBLj8dRw= +github.com/yryz/ds18b20 v0.0.0-20180211073435-3cf383a40624/go.mod h1:MqFju5qeLDFh+S9PqxYT7TEla8xeW7bgGr/69q3oki0= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/sys v0.0.0-20190305064518-30e92a19ae4a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= From 5cd12bff8a0ac6fba59c9a938ab1f00426bd8539 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 13 Apr 2019 20:41:47 +0930 Subject: [PATCH 14/15] revid: dummyLogger in senders_test.go now uses pointer receiver for log --- revid/senders_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/revid/senders_test.go b/revid/senders_test.go index 76dc2d78..0a09662a 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -87,7 +87,7 @@ func (ts *destination) Close() error { return nil } // dummyLogger will allow logging to be done by the testing pkg. type dummyLogger testing.T -func (dl dummyLogger) log(lvl int8, msg string, args ...interface{}) { +func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { var l string switch lvl { case logger.Warning: @@ -119,7 +119,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. tstDst := &destination{t: t} - sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -197,7 +197,7 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} - sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder and send PSI every 10 packets. @@ -277,7 +277,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} - sender := newMtsSender(tstDst, dummyLogger(*t).log, 1, ringBufferElementSize, writeTimeout) + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, ringBufferElementSize, writeTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. From b347f2e9d1026b842f264c6960a8bcf3f7849501 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 14 Apr 2019 11:13:17 +0930 Subject: [PATCH 15/15] revid: set revid.isRunning to false before waiting, because this is what triggers output routine to be killed. --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index f3d4cf99..d8068115 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -359,8 +359,8 @@ func (r *Revid) Stop() { if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } - r.wg.Wait() r.setIsRunning(false) + r.wg.Wait() } func (r *Revid) Update(vars map[string]string) error {