diff --git a/go.mod b/go.mod index 453a2f38..1280e1a3 100644 --- a/go.mod +++ b/go.mod @@ -4,21 +4,11 @@ go 1.12 require ( bitbucket.org/ausocean/iot v1.2.4 - bitbucket.org/ausocean/utils v1.2.4 - github.com/BurntSushi/toml v0.3.1 // indirect + bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 - github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect - github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 github.com/mewkiz/flac v1.0.5 - github.com/pkg/errors v0.8.1 // indirect - github.com/sergi/go-diff v1.0.0 // indirect - github.com/stretchr/testify v1.3.0 // indirect github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e - go.uber.org/atomic v1.3.2 // indirect - go.uber.org/multierr v1.1.0 // indirect - go.uber.org/zap v1.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect - gopkg.in/yaml.v2 v2.2.2 // indirect ) diff --git a/go.sum b/go.sum index 7a21da57..a23bf607 100644 --- a/go.sum +++ b/go.sum @@ -1,23 +1,34 @@ bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4= bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= +bitbucket.org/ausocean/iot v1.2.5 h1:udD5X4oXUuKwdjO7bcq4StcDdjP8fJa2L0FnJJwF+6Q= +bitbucket.org/ausocean/iot v1.2.5/go.mod h1:dOclxXkdxAQGWO7Y5KcP1wpNfxg9oKUA2VqjJ3Le4RA= +bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e h1:rn7Z1vE6m1NSH+mrPJPgquEfBDsqzBEH4Y6fxzyB6kA= +bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20= bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw= +bitbucket.org/ausocean/utils v1.2.5 h1:70lkWnoN1SUxiIBIKDiDzaYZ2bjczdNYSAsKj7DUpl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/adrianmo/go-nmea v1.1.1-0.20190109062325-c448653979f7/go.mod h1:HHPxPAm2kmev+61qmkZh7xgZF/7qHtSpsWppip2Ipv8= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c= github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4/go.mod h1:2RvX5ZjVtsznNZPEt4xwJXNJrM3VTZoQf7V6gk0ysvs= +github.com/kidoman/embd v0.0.0-20170508013040-d3d8c0c5c68d/go.mod h1:ACKj9jnzOzj1lw2ETilpFGK7L9dtJhAzT7T1OhAGtRQ= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s= github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc= @@ -29,16 +40,19 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e h1:3NIzz7weXhh3NToPgbtlQtKiVgerEaG4/nY2skGoGG0= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e/go.mod h1:CaowXBWOiSGWEpBBV8LoVnQTVPV4ycyviC9IBLj8dRw= +github.com/yryz/ds18b20 v0.0.0-20180211073435-3cf383a40624/go.mod h1:MqFju5qeLDFh+S9PqxYT7TEla8xeW7bgGr/69q3oki0= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/sys v0.0.0-20190305064518-30e92a19ae4a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= diff --git a/revid/revid.go b/revid/revid.go index 8636dba8..d8068115 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -44,6 +44,7 @@ import ( "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/iot/pi/netsender" + "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) @@ -111,6 +112,9 @@ type Revid struct { // encoder holds the required encoders, which then write to destinations. encoder []io.Writer + // writeClosers holds the senders that the encoders will write to. + writeClosers []io.WriteCloser + // bitrate hold the last send bitrate calculation result. bitrate int @@ -177,24 +181,29 @@ func (r *Revid) setConfig(config Config) error { return nil } -// setupPipeline constructs a data pipeline. -func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.Writer) io.Writer) error { +// setupPipeline constructs the revid dataPipeline. Inputs, encoders and +// senders are created and linked based on the current revid config. +// +// mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder +// respectively. multiWriter will be used to create an ioext.multiWriteCloser +// so that encoders can write to multiple senders. +func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) r.encoder = r.encoder[:0] // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. - var mtsSenders, flvSenders []io.Writer + var mtsSenders, flvSenders []io.WriteCloser // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. - var w io.Writer + var w io.WriteCloser for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) mtsSenders = append(mtsSenders, w) case Rtp: w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) @@ -279,7 +288,7 @@ func (r *Revid) reset(config Config) error { return err } - err = r.setupPipeline(newMtsEncoder, newFlvEncoder, io.MultiWriter) + err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser) if err != nil { return err } @@ -338,14 +347,19 @@ func (r *Revid) Stop() { return } + for _, w := range r.writeClosers { + err := w.Close() + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) + } + } r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.setIsRunning(false) - r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } + r.setIsRunning(false) r.wg.Wait() } diff --git a/revid/revid_test.go b/revid/revid_test.go index 867ded78..ccf0de3a 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -97,10 +97,10 @@ func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil } // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // can access the destinations. type dummyMultiWriter struct { - dst []io.Writer + dst []io.WriteCloser } -func newDummyMultiWriter(dst ...io.Writer) io.Writer { +func newDummyMultiWriter(dst ...io.WriteCloser) io.WriteCloser { return &dummyMultiWriter{ dst: dst, } @@ -108,6 +108,8 @@ func newDummyMultiWriter(dst ...io.Writer) io.Writer { func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } +func (w *dummyMultiWriter) Close() error { return nil } + // TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the // revid.encoder slice and the senders the encoders write to. func TestResetEncoderSenderSetup(t *testing.T) { diff --git a/revid/senders.go b/revid/senders.go index a678b5b1..01aa208e 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -35,6 +35,8 @@ import ( "net" "os" "strconv" + "sync" + "time" "github.com/Comcast/gots/packet" @@ -43,6 +45,7 @@ import ( "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" ) // Log is used by the multiSender. @@ -68,6 +71,8 @@ func (s *httpSender) Write(d []byte) (int, error) { return len(d), httpSend(d, s.client, s.log) } +func (s *httpSender) Close() error { return nil } + func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { // Only send if "V0" is configured as an input. send := false @@ -129,7 +134,7 @@ type fileSender struct { data []byte } -func newFileSender(path string) (io.Writer, error) { +func newFileSender(path string) (*fileSender, error) { f, err := os.Create(path) if err != nil { return nil, err @@ -142,26 +147,80 @@ func (s *fileSender) Write(d []byte) (int, error) { return s.file.Write(d) } -func (s *fileSender) close() error { return s.file.Close() } +func (s *fileSender) Close() error { return s.file.Close() } -// mtsSender implements loadSender and provides sending capability specifically +// mtsSender implements io.WriteCloser and provides sending capability specifically // for use with MPEGTS packetization. It handles the construction of appropriately -// lengthed clips based on PSI. It also fixes accounts for discontinuities by +// lengthed clips based on PSI. It also accounts for discontinuities by // setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { - dst io.Writer + dst io.WriteCloser buf []byte + ringBuf *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int + quit chan struct{} + log func(lvl int8, msg string, args ...interface{}) + wg sync.WaitGroup } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.Writer, log func(lvl int8, msg string, args ...interface{})) *mtsSender { - return &mtsSender{ +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { + s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), + log: log, + ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), + quit: make(chan struct{}), + } + s.wg.Add(1) + go s.output() + return s +} + +// output starts an mtsSender's data handling routine. +func (s *mtsSender) output() { + var chunk *ring.Chunk + for { + select { + case <-s.quit: + s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") + defer s.wg.Done() + return + default: + // If chunk is nil then we're ready to get another from the ringBuffer. + if chunk == nil { + var err error + chunk, err = s.ringBuf.Next(readTimeout) + switch err { + case nil: + continue + case ring.ErrTimeout: + s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") + continue + default: + s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) + fallthrough + case io.EOF: + continue + } + } + err := s.repairer.Repair(chunk.Bytes()) + if err != nil { + chunk.Close() + chunk = nil + continue + } + _, err = s.dst.Write(chunk.Bytes()) + if err != nil { + s.repairer.Failed() + continue + } + chunk.Close() + chunk = nil + } } } @@ -176,20 +235,23 @@ func (s *mtsSender) Write(d []byte) (int, error) { copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() if s.curPid == mts.PatPid && len(s.buf) > 0 { - err := s.repairer.Repair(s.buf) - if err == nil { - _, err = s.dst.Write(s.buf) - if err == nil { - goto done - } + _, err := s.ringBuf.Write(s.buf) + if err != nil { + s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } - s.repairer.Failed() - done: + s.ringBuf.Flush() s.buf = s.buf[:0] } return len(d), nil } +// Close implements io.Closer. +func (s *mtsSender) Close() error { + close(s.quit) + s.wg.Wait() + return nil +} + // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn @@ -238,7 +300,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) { } func (s *rtmpSender) restart() error { - s.close() + s.Close() var err error for n := 0; n < s.retries; n++ { s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) @@ -253,11 +315,11 @@ func (s *rtmpSender) restart() error { return err } -func (s *rtmpSender) close() error { - if s.conn != nil { - return s.conn.Close() +func (s *rtmpSender) Close() error { + if s.conn == nil { + return nil } - return nil + return s.conn.Close() } // TODO: Write restart func for rtpSender @@ -290,3 +352,5 @@ func (s *rtpSender) Write(d []byte) (int, error) { } return len(d), nil } + +func (s *rtpSender) Close() error { return nil } diff --git a/revid/senders_test.go b/revid/senders_test.go index aba06455..0a09662a 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -30,7 +30,6 @@ package revid import ( "errors" - "fmt" "testing" "time" @@ -40,7 +39,6 @@ import ( "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) // Ring buffer sizes and read/write timeouts. @@ -55,20 +53,25 @@ var ( errSendFailed = errors.New("send failed") ) -// sender simulates sending of video data, creating discontinuities if -// testDiscontinuities is set to true. +// destination simulates a destination for the mtsSender. It allows for the +// emulation of failed and delayed sends. type destination struct { - buf [][]byte - testDiscontinuities bool - discontinuityAt int - currentPkt int + buf [][]byte + testFails bool + failAt int + currentPkt int + t *testing.T + sendDelay time.Duration + delayAt int } -// Write implements io.Writer. -// Write takes d and neglects if testDiscontinuities is true, returning an error, -// otherwise d is appended to senders buf. func (ts *destination) Write(d []byte) (int, error) { - if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { + ts.t.Log("writing clip to destination") + if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { + time.Sleep(ts.sendDelay) + } + if ts.testFails && ts.currentPkt == ts.failAt { + ts.t.Log("failed send") ts.currentPkt++ return 0, errSendFailed } @@ -79,9 +82,12 @@ func (ts *destination) Write(d []byte) (int, error) { return len(d), nil } -// log implements the required logging func for some of the structs in use -// within tests. -func log(lvl int8, msg string, args ...interface{}) { +func (ts *destination) Close() error { return nil } + +// dummyLogger will allow logging to be done by the testing pkg. +type dummyLogger testing.T + +func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { var l string switch lvl { case logger.Warning: @@ -99,7 +105,11 @@ func log(lvl int8, msg string, args ...interface{}) { for i := 0; i < len(args); i++ { msg += " %v" } - fmt.Printf(msg, args) + if len(args) == 0 { + dl.Log(msg + "\n") + return + } + dl.Logf(msg+"\n", args) } // TestSegment ensures that the mtsSender correctly segments data into clips @@ -107,38 +117,28 @@ func log(lvl int8, msg string, args ...interface{}) { func TestMtsSenderSegment(t *testing.T) { mts.Meta = meta.New() - // Create ringBuffer, sender, loadsender and the MPEGTS encoder. - tstDst := &destination{} - loadSender := newMtsSender(tstDst, log) - rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder((*buffer)(rb), 25, mts.Video) + // Create ringBuffer, sender, sender and the MPEGTS encoder. + tstDst := &destination{t: t} + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. + t.Log("writing packets") const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Insert a payload so that we check that the segmentation works correctly - // in this regard. Packet number will be used. encoder.Write([]byte{byte(i)}) - rb.Flush() - - for { - next, err := rb.Next(rTimeout) - if err != nil { - break - } - - _, err = loadSender.Write(next.Bytes()) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - next.Close() - next = nil - } } + // Give the mtsSender some time to finish up and then Close it. + time.Sleep(10 * time.Millisecond) + sender.Close() + + // Check the data. result := tstDst.buf expectData := 0 for clipNo, clip := range result { @@ -160,9 +160,11 @@ func TestMtsSenderSegment(t *testing.T) { } // Check that the clip data is okay. + t.Log("checking clip data") for i := 0; i < len(clip); i += mts.PacketSize { copy(pkt[:], clip[i:i+mts.PacketSize]) if pkt.PID() == mts.VideoPid { + t.Log("got video PID") payload, err := pkt.Payload() if err != nil { t.Fatalf("Unexpected err: %v\n", err) @@ -187,61 +189,149 @@ func TestMtsSenderSegment(t *testing.T) { } } +// TestMtsSenderFailedSend checks that a failed send is correctly handled by +// the mtsSender. The mtsSender should try to send the same clip again. +func TestMtsSenderFailedSend(t *testing.T) { + mts.Meta = meta.New() + + // Create destination, the mtsSender and the mtsEncoder + const clipToFailAt = 3 + tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25, mts.Video) + + // Turn time based PSI writing off for encoder and send PSI every 10 packets. + const psiSendCount = 10 + encoder.TimeBasedPsi(false, psiSendCount) + + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. + t.Log("writing packets") + const noOfPacketsToWrite = 100 + for i := 0; i < noOfPacketsToWrite; i++ { + encoder.Write([]byte{byte(i)}) + } + + // Give the mtsSender some time to finish up and then Close it. + time.Sleep(10 * time.Millisecond) + sender.Close() + + // Check that we have data as expected. + result := tstDst.buf + expectData := 0 + for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) + + // Check that the clip is of expected length. + clipLen := len(clip) + if clipLen != psiSendCount*mts.PacketSize { + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) + } + + // Also check that the first packet is a PAT. + firstPkt := clip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + pid := pkt.PID() + if pid != mts.PatPid { + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } + + // Check that the clip data is okay. + t.Log("checking clip data") + for i := 0; i < len(clip); i += mts.PacketSize { + copy(pkt[:], clip[i:i+mts.PacketSize]) + if pkt.PID() == mts.VideoPid { + t.Log("got video PID") + payload, err := pkt.Payload() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Parse PES from the MTS payload. + pes, err := pes.NewPESHeader(payload) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Get the data from the PES packet and convert to an int. + data := int8(pes.Data()[0]) + + // Calc expected data in the PES and then check. + if data != int8(expectData) { + t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) + } + expectData++ + } + } + } +} + +// TestMtsSenderDiscontinuity checks that a discontinuity in a stream is +// correctly handled by the mtsSender. A discontinuity is caused by overflowing +// the mtsSender's ringBuffer. It is expected that the next clip seen has the +// disconinuity indicator applied. func TestMtsSenderDiscontinuity(t *testing.T) { mts.Meta = meta.New() - // Create ringBuffer sender, loadSender and the MPEGTS encoder. - const clipWithDiscontinuity = 3 - tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} - loadSender := newMtsSender(tstDst, log) - rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder((*buffer)(rb), 25, mts.Video) + // Create destination, the mtsSender and the mtsEncoder. + const clipToDelay = 3 + tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, ringBufferElementSize, writeTimeout) + encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) + // Write the packets to the encoder, which will in turn write to the mtsSender. + // Payload will just be packet number. const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Our payload will just be packet number. encoder.Write([]byte{byte(i)}) - rb.Flush() - - for { - next, err := rb.Next(rTimeout) - if err != nil { - break - } - - _, err = loadSender.Write(next.Bytes()) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - next.Close() - next = nil - } } + // Give mtsSender time to finish up then Close. + time.Sleep(100 * time.Millisecond) + sender.Close() + + // Check the data. result := tstDst.buf + expectedCC := 0 + for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) - // First check that we have less clips as expected. - expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 - gotLen := len(result) - if gotLen != expectedLen { - t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) - } + // Check that the clip is of expected length. + clipLen := len(clip) + if clipLen != psiSendCount*mts.PacketSize { + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) + } - // Now check that the discontinuity indicator is set at the discontinuityClip PAT. - disconClip := result[clipWithDiscontinuity] - firstPkt := disconClip[:mts.PacketSize] - var pkt packet.Packet - copy(pkt[:], firstPkt) - discon, err := (*packet.AdaptationField)(&pkt).Discontinuity() - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + // Also check that the first packet is a PAT. + firstPkt := clip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + pid := pkt.PID() + if pid != mts.PatPid { + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } - if !discon { - t.Fatalf("Did not get discontinuity indicator for PAT") + // Get the discontinuity indicator + discon, _ := (*packet.AdaptationField)(&pkt).Discontinuity() + + // Check the continuity counter. + cc := pkt.ContinuityCounter() + if cc != expectedCC { + t.Log("discontinuity found") + expectedCC = cc + if !discon { + t.Errorf("discontinuity indicator not set where expected for clip: %v", clipNo) + } + } else { + if discon && clipNo != 0 { + t.Errorf("did not expect discontinuity indicator to be set for clip: %v", clipNo) + } + } + expectedCC = (expectedCC + 1) & 0xf } }