diff --git a/container/mts/audio_test.go b/container/mts/audio_test.go index 8f71bbb7..053bb0a9 100644 --- a/container/mts/audio_test.go +++ b/container/mts/audio_test.go @@ -26,6 +26,7 @@ package mts import ( "bytes" + "io" "io/ioutil" "testing" @@ -35,25 +36,21 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" ) -type buffer bytes.Buffer +type nopCloser struct{ io.Writer } -func (b *buffer) Write(d []byte) (int, error) { - return (*bytes.Buffer)(b).Write(d) -} - -func (b *buffer) Close() error { return nil } +func (nopCloser) Close() error { return nil } // TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data. // It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm. func TestEncodePcm(t *testing.T) { Meta = meta.New() - var buf buffer + var buf bytes.Buffer sampleRate := 48000 sampleSize := 2 blockSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) - e := NewEncoder(&buf, writeFreq, Audio) + e := NewEncoder(&nopCloser{&buf}, writeFreq, Audio) inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPcm, err := ioutil.ReadFile(inPath) diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index e29b0639..314e235a 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -47,8 +47,8 @@ const fps = 25 // write this to psi. func TestMetaEncode1(t *testing.T) { Meta = meta.New() - var buf buffer - e := NewEncoder(&buf, fps, Video) + var buf bytes.Buffer + e := NewEncoder(&nopCloser{&buf}, fps, Video) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) @@ -75,8 +75,8 @@ func TestMetaEncode1(t *testing.T) { // into psi. func TestMetaEncode2(t *testing.T) { Meta = meta.New() - var buf buffer - e := NewEncoder(&buf, fps, Video) + var buf bytes.Buffer + e := NewEncoder(&nopCloser{&buf}, fps, Video) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { diff --git a/revid/revid.go b/revid/revid.go index ea85038a..c9c2bb51 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -52,8 +52,6 @@ import ( const ( rbSize = 1000 rbElementSize = 100000 - wTimeout = 0 * time.Second - rTimeout = 0 * time.Second ) // RTMP connection properties. @@ -120,6 +118,9 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { } // Config returns a copy of revids current config. +// +// This is not intended, nor is it safe, to be used concurrently with any other +// exported functionalilty from this file. func (r *Revid) Config() Config { return r.config } @@ -207,7 +208,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, wTimeout) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, 0) mtsSenders = append(mtsSenders, w) case Rtp: w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) @@ -275,6 +276,9 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. +// +// This is not intended, nor is it safe, to be used concurrently with any other +// exported functionalilty from this file. func (r *Revid) Start() error { if r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running") @@ -291,6 +295,9 @@ func (r *Revid) Start() error { // Stop closes down the pipeline. This closes encoders and sender output routines, // connections, and/or files. +// +// This is not intended, nor is it safe, to be used concurrently with any other +// exported functionalilty from this file. func (r *Revid) Stop() { if !r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") @@ -313,6 +320,9 @@ func (r *Revid) Stop() { // Update takes a map of variables and their values and edits the current config // if the variables are recognised as valid parameters. +// +// This is not intended, nor is it safe, to be used concurrently with any other +// exported functionalilty from this file. func (r *Revid) Update(vars map[string]string) error { if r.isRunning { r.Stop() diff --git a/revid/senders.go b/revid/senders.go index eb0c6ca4..947f5be1 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -192,7 +192,7 @@ func (s *mtsSender) output() { // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ring.Next(rTimeout) + chunk, err = s.ring.Next(0) switch err { case nil, io.EOF: continue @@ -301,7 +301,7 @@ func (s *rtmpSender) output() { // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ring.Next(rTimeout) + chunk, err = s.ring.Next(0) switch err { case nil, io.EOF: continue diff --git a/revid/senders_test.go b/revid/senders_test.go index 36c3f41a..80293759 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -133,7 +133,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) + sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -210,8 +210,8 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 - tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) + dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} + sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder and send PSI every 10 packets. @@ -227,11 +227,11 @@ func TestMtsSenderFailedSend(t *testing.T) { } // Wait until the destination has all the data, then close the sender. - <-tstDst.done + <-dst.done sender.Close() // Check that we have data as expected. - result := tstDst.buf + result := dst.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) @@ -290,8 +290,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 - tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, rbElementSize, wTimeout) + dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} + sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -306,11 +306,11 @@ func TestMtsSenderDiscontinuity(t *testing.T) { } // Wait until the destination has all the data, then close the sender. - <-tstDst.done + <-dst.done sender.Close() // Check the data. - result := tstDst.buf + result := dst.buf expectedCC := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo)