From 5ecf06e09341797f7838744929d4fee9622a5418 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 8 Apr 2019 19:02:42 +0930 Subject: [PATCH] revid: Buffered MtsSender The mtsSender now has a ringBuffer and tests have been updated accordingly. The mtsSender now uses an output routine to get data from it's ringBuffer to send. Revid now uses ioext.multiWriteClosers for encoders to write to so that senders can be closed and therefore any output routines. --- revid/revid.go | 37 ++++--- revid/senders.go | 101 ++++++++++++++--- revid/senders_test.go | 244 +++++++++++++++++++++++++++++------------- 3 files changed, 274 insertions(+), 108 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 0828d7da..03c30696 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -44,6 +44,7 @@ import ( "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/iot/pi/netsender" + "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) @@ -111,6 +112,8 @@ type Revid struct { // encoder holds the required encoders, which then write to destinations. encoder []io.Writer + writeClosers []io.WriteCloser + // bitrate hold the last send bitrate calculation result. bitrate int @@ -178,42 +181,42 @@ func (r *Revid) setConfig(config Config) error { } // setupPipeline constructs a data pipeline. -func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.Writer) io.Writer) error { +func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) r.encoder = r.encoder[:0] // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. - var mtsSenders, flvSenders []io.Writer + var mtsSenders, flvSenders []io.WriteCloser // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. - var w io.Writer + var wc io.WriteCloser for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil) - mtsSenders = append(mtsSenders, w) + wc = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) + mtsSenders = append(mtsSenders, wc) case Rtp: - w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) + wc, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) } - mtsSenders = append(mtsSenders, w) + mtsSenders = append(mtsSenders, wc) case File: - w, err := newFileSender(r.config.OutputPath) + wc, err := newFileSender(r.config.OutputPath) if err != nil { return err } - mtsSenders = append(mtsSenders, w) + mtsSenders = append(mtsSenders, wc) case Rtmp: - w, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) + wc, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) } - flvSenders = append(flvSenders, w) + flvSenders = append(flvSenders, wc) } } @@ -279,7 +282,7 @@ func (r *Revid) reset(config Config) error { return err } - err = r.setupPipeline(newMtsEncoder, newFlvEncoder, io.MultiWriter) + err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser) if err != nil { return err } @@ -338,15 +341,21 @@ func (r *Revid) Stop() { return } + for _, wc := range r.writeClosers { + err := wc.Close() + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) + return + } + } r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.setIsRunning(false) - r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } r.wg.Wait() + r.setIsRunning(false) } func (r *Revid) Update(vars map[string]string) error { diff --git a/revid/senders.go b/revid/senders.go index b8e73a21..4f779ccf 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -35,6 +35,8 @@ import ( "net" "os" "strconv" + "sync" + "time" "github.com/Comcast/gots/packet" @@ -43,6 +45,7 @@ import ( "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" ) // Log is used by the multiSender. @@ -68,6 +71,8 @@ func (s *httpSender) Write(d []byte) (int, error) { return len(d), httpSend(d, s.client, s.log) } +func (s *httpSender) Close() error { return nil } + func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { // Only send if "V0" is configured as an input. send := false @@ -129,7 +134,7 @@ type fileSender struct { data []byte } -func newFileSender(path string) (io.Writer, error) { +func newFileSender(path string) (*fileSender, error) { f, err := os.Create(path) if err != nil { return nil, err @@ -142,27 +147,37 @@ func (s *fileSender) Write(d []byte) (int, error) { return s.file.Write(d) } -func (s *fileSender) close() error { return s.file.Close() } +func (s *fileSender) Close() error { return s.file.Close() } -// mtsSender implements loadSender and provides sending capability specifically +// mtsSender implements io.WriteCloser and provides sending capability specifically // for use with MPEGTS packetization. It handles the construction of appropriately -// lengthed clips based on PSI. It also fixes accounts for discontinuities by +// lengthed clips based on PSI. It also accounts for discontinuities by // setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { - dst io.Writer + dst io.WriteCloser buf []byte + ringBuf *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int + quit chan struct{} + log func(lvl int8, msg string, args ...interface{}) + wg sync.WaitGroup } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.Writer, log func(lvl int8, msg string, args ...interface{})) *mtsSender { - return &mtsSender{ +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { + s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), + log: log, + ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), + quit: make(chan struct{}), } + s.wg.Add(1) + go s.output() + return s } // Write implements io.Writer. @@ -176,20 +191,70 @@ func (s *mtsSender) Write(d []byte) (int, error) { copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() if s.curPid == mts.PatPid && len(s.buf) > 0 { - err := s.repairer.Repair(s.buf) - if err == nil { - _, err = s.dst.Write(s.buf) - if err == nil { - goto done - } + _, err := s.ringBuf.Write(s.buf) + if err != nil { + s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } - s.repairer.Failed() - done: + s.ringBuf.Flush() s.buf = s.buf[:0] } return len(d), nil } +// Close implements io.Closer. +func (s *mtsSender) Close() error { + close(s.quit) + s.wg.Wait() + return nil +} + +// output is a routine start at creation of the mtsSender. It will get data +// from the mtsSenders ringBuffer and attempt to send. +func (s *mtsSender) output() { + var chunk *ring.Chunk +loop: + for { + select { + case <-s.quit: + s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") + defer s.wg.Done() + return + default: + // If chunk is nil then we're ready to get another from the ringBuffer. + if chunk == nil { + var err error + chunk, err = s.ringBuf.Next(readTimeout) + switch err { + case nil: + case ring.ErrTimeout: + s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") + continue + default: + s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) + fallthrough + case io.EOF: + goto loop + } + // If chunk is not nil, then we need to try sending it off. + } else { + err := s.repairer.Repair(chunk.Bytes()) + if err != nil { + chunk.Close() + chunk = nil + continue + } + _, err = s.dst.Write(chunk.Bytes()) + if err != nil { + s.repairer.Failed() + continue + } + chunk.Close() + chunk = nil + } + } + } +} + // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn @@ -238,7 +303,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) { } func (s *rtmpSender) restart() error { - s.close() + s.Close() var err error for n := 0; n < s.retries; n++ { s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) @@ -253,7 +318,7 @@ func (s *rtmpSender) restart() error { return err } -func (s *rtmpSender) close() error { +func (s *rtmpSender) Close() error { if s.conn != nil { return s.conn.Close() } @@ -284,3 +349,5 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{ func (s *rtpSender) Write(d []byte) (int, error) { return s.encoder.Write(s.data) } + +func (s *rtpSender) Close() error { return nil } diff --git a/revid/senders_test.go b/revid/senders_test.go index 95474922..e7fac859 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -30,7 +30,6 @@ package revid import ( "errors" - "fmt" "testing" "time" @@ -40,7 +39,6 @@ import ( "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) // Ring buffer sizes and read/write timeouts. @@ -55,20 +53,25 @@ var ( errSendFailed = errors.New("send failed") ) -// sender simulates sending of video data, creating discontinuities if -// testDiscontinuities is set to true. +// destination simulates a destination for the mtsSender. It allows for the +// emulation of failed and delayed sends. type destination struct { - buf [][]byte - testDiscontinuities bool - discontinuityAt int - currentPkt int + buf [][]byte + testFails bool + failAt int + currentPkt int + t *testing.T + sendDelay time.Duration + delayAt int } -// Write implements io.Writer. -// Write takes d and neglects if testDiscontinuities is true, returning an error, -// otherwise d is appended to senders buf. func (ts *destination) Write(d []byte) (int, error) { - if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { + ts.t.Log("writing clip to destination") + if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { + time.Sleep(ts.sendDelay) + } + if ts.testFails && ts.currentPkt == ts.failAt { + ts.t.Log("failed send") ts.currentPkt++ return 0, errSendFailed } @@ -79,9 +82,12 @@ func (ts *destination) Write(d []byte) (int, error) { return len(d), nil } -// log implements the required logging func for some of the structs in use -// within tests. -func log(lvl int8, msg string, args ...interface{}) { +func (ts *destination) Close() error { return nil } + +// dummyLogger will allow logging to be done by the testing pkg. +type dummyLogger testing.T + +func (dl dummyLogger) log(lvl int8, msg string, args ...interface{}) { var l string switch lvl { case logger.Warning: @@ -99,7 +105,11 @@ func log(lvl int8, msg string, args ...interface{}) { for i := 0; i < len(args); i++ { msg += " %v" } - fmt.Printf(msg, args) + if len(args) == 0 { + dl.Log(msg + "\n") + return + } + dl.Logf(msg+"\n", args) } // TestSegment ensures that the mtsSender correctly segments data into clips @@ -107,38 +117,28 @@ func log(lvl int8, msg string, args ...interface{}) { func TestMtsSenderSegment(t *testing.T) { mts.Meta = meta.New() - // Create ringBuffer, sender, loadsender and the MPEGTS encoder. - tstDst := &destination{} - loadSender := newMtsSender(tstDst, log) - rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder((*buffer)(rb), 25) + // Create ringBuffer, sender, sender and the MPEGTS encoder. + tstDst := &destination{t: t} + sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25) // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. + t.Log("writing packets") const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Insert a payload so that we check that the segmentation works correctly - // in this regard. Packet number will be used. encoder.Write([]byte{byte(i)}) - rb.Flush() - - for { - next, err := rb.Next(rTimeout) - if err != nil { - break - } - - _, err = loadSender.Write(next.Bytes()) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - next.Close() - next = nil - } } + // Give the mtsSender some time to finish up and then Close it. + time.Sleep(10 * time.Millisecond) + sender.Close() + + // Check the data. result := tstDst.buf expectData := 0 for clipNo, clip := range result { @@ -160,9 +160,11 @@ func TestMtsSenderSegment(t *testing.T) { } // Check that the clip data is okay. + t.Log("checking clip data") for i := 0; i < len(clip); i += mts.PacketSize { copy(pkt[:], clip[i:i+mts.PacketSize]) if pkt.PID() == mts.VideoPid { + t.Log("got video PID") payload, err := pkt.Payload() if err != nil { t.Fatalf("Unexpected err: %v\n", err) @@ -187,61 +189,149 @@ func TestMtsSenderSegment(t *testing.T) { } } +// TestMtsSenderFailedSend checks that a failed send is correctly handled by +// the mtsSender. The mtsSender should try to send the same clip again. +func TestMtsSenderFailedSend(t *testing.T) { + mts.Meta = meta.New() + + // Create destination, the mtsSender and the mtsEncoder + const clipToFailAt = 3 + tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} + sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25) + + // Turn time based PSI writing off for encoder and send PSI every 10 packets. + const psiSendCount = 10 + encoder.TimeBasedPsi(false, psiSendCount) + + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. + t.Log("writing packets") + const noOfPacketsToWrite = 100 + for i := 0; i < noOfPacketsToWrite; i++ { + encoder.Write([]byte{byte(i)}) + } + + // Give the mtsSender some time to finish up and then Close it. + time.Sleep(10 * time.Millisecond) + sender.Close() + + // Check that we have data as expected. + result := tstDst.buf + expectData := 0 + for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) + + // Check that the clip is of expected length. + clipLen := len(clip) + if clipLen != psiSendCount*mts.PacketSize { + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) + } + + // Also check that the first packet is a PAT. + firstPkt := clip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + pid := pkt.PID() + if pid != mts.PatPid { + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } + + // Check that the clip data is okay. + t.Log("checking clip data") + for i := 0; i < len(clip); i += mts.PacketSize { + copy(pkt[:], clip[i:i+mts.PacketSize]) + if pkt.PID() == mts.VideoPid { + t.Log("got video PID") + payload, err := pkt.Payload() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Parse PES from the MTS payload. + pes, err := pes.NewPESHeader(payload) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Get the data from the PES packet and convert to an int. + data := int8(pes.Data()[0]) + + // Calc expected data in the PES and then check. + if data != int8(expectData) { + t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) + } + expectData++ + } + } + } +} + +// TestMtsSenderDiscontinuity checks that a discontinuity in a stream is +// correctly handled by the mtsSender. A discontinuity is caused by overflowing +// the mtsSender's ringBuffer. It is expected that the next clip seen has the +// disconinuity indicator applied. func TestMtsSenderDiscontinuity(t *testing.T) { mts.Meta = meta.New() - // Create ringBuffer sender, loadSender and the MPEGTS encoder. - const clipWithDiscontinuity = 3 - tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} - loadSender := newMtsSender(tstDst, log) - rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder((*buffer)(rb), 25) + // Create destination, the mtsSender and the mtsEncoder. + const clipToDelay = 3 + tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} + sender := newMtsSender(tstDst, dummyLogger(*t).log, 1, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25) // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Our payload will just be packet number. encoder.Write([]byte{byte(i)}) - rb.Flush() - - for { - next, err := rb.Next(rTimeout) - if err != nil { - break - } - - _, err = loadSender.Write(next.Bytes()) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - next.Close() - next = nil - } } + // Give mtsSender time to finish up then Close. + time.Sleep(100 * time.Millisecond) + sender.Close() + + // Check the data. result := tstDst.buf + expectedCC := 0 + for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) - // First check that we have less clips as expected. - expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 - gotLen := len(result) - if gotLen != expectedLen { - t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) - } + // Check that the clip is of expected length. + clipLen := len(clip) + if clipLen != psiSendCount*mts.PacketSize { + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) + } - // Now check that the discontinuity indicator is set at the discontinuityClip PAT. - disconClip := result[clipWithDiscontinuity] - firstPkt := disconClip[:mts.PacketSize] - var pkt packet.Packet - copy(pkt[:], firstPkt) - discon, err := (*packet.AdaptationField)(&pkt).Discontinuity() - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + // Also check that the first packet is a PAT. + firstPkt := clip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + pid := pkt.PID() + if pid != mts.PatPid { + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } - if !discon { - t.Fatalf("Did not get discontinuity indicator for PAT") + // Get the discontinuity indicator + discon, _ := (*packet.AdaptationField)(&pkt).Discontinuity() + + // Check the continuity counter. + cc := pkt.ContinuityCounter() + if cc != expectedCC { + t.Log("discontinuity found") + expectedCC = cc + if !discon { + t.Errorf("discontinuity indicator not set where expected for clip: %v", clipNo) + } + } else { + if discon && clipNo != 0 { + t.Errorf("did not expect discontinuity indicator to be set for clip: %v", clipNo) + } + } + expectedCC = (expectedCC + 1) & 0xf } }