From f59879b51ddff25c511bc8109f9459190c5d89db Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 15 Apr 2019 08:42:56 +0930 Subject: [PATCH 01/28] revid: removed ringBuffer after lexer Now that we want buffered senders (as required), the ringBuffer that was after the lexer has been removed. Instead, we now have an ioext.multiWriterCloser to which the lexer writes to. This then writes to the encoders, and then encoders write to each of their own multiWriteClosers, which write to the appropriate senders. We now call close on the first multiWriteCloser to close down the entired pipeline, as this close call propogates through each level. We have removed the outputClips routine as it's not required anymore to get data from the revid ringBuffer, and have removed other things that were used by this, like the IsRunning function. We have also updated tests to work with these changes - they are passing. --- container/flv/encoder.go | 10 +- container/mts/audio_test.go | 12 +- container/mts/encoder.go | 8 +- container/mts/metaEncode_test.go | 14 +- revid/revid.go | 246 +++++++++---------------------- revid/revid_test.go | 56 +++---- revid/senders.go | 2 +- revid/senders_test.go | 14 +- 8 files changed, 125 insertions(+), 237 deletions(-) diff --git a/container/flv/encoder.go b/container/flv/encoder.go index 0fe794d2..306d4b66 100644 --- a/container/flv/encoder.go +++ b/container/flv/encoder.go @@ -55,8 +55,7 @@ var ( // Encoder provides properties required for the generation of flv video // from raw video data type Encoder struct { - dst io.Writer - + dst io.WriteCloser fps int audio bool video bool @@ -64,7 +63,7 @@ type Encoder struct { } // NewEncoder retuns a new FLV encoder. -func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) { +func NewEncoder(dst io.WriteCloser, audio, video bool, fps int) (*Encoder, error) { e := Encoder{ dst: dst, fps: fps, @@ -261,3 +260,8 @@ func (e *Encoder) Write(frame []byte) (int, error) { return len(frame), nil } + +// Close will close the encoder destination. +func (e *Encoder) Close() error { + return e.dst.Close() +} diff --git a/container/mts/audio_test.go b/container/mts/audio_test.go index 23ba16e6..9c1b8f03 100644 --- a/container/mts/audio_test.go +++ b/container/mts/audio_test.go @@ -35,12 +35,20 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" ) +type buffer bytes.Buffer + +func (b *buffer) Write(d []byte) (int, error) { + return b.Write(d) +} + +func (b *buffer) Close() error { return nil } + // TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data. // It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm. func TestEncodePcm(t *testing.T) { Meta = meta.New() - var buf bytes.Buffer + var buf buffer sampleRate := 48000 sampleSize := 2 blockSize := 16000 @@ -69,7 +77,7 @@ func TestEncodePcm(t *testing.T) { } } } - clip := buf.Bytes() + clip := (*bytes.Buffer)(&buf).Bytes() // Get the first MTS packet to check var pkt packet.Packet diff --git a/container/mts/encoder.go b/container/mts/encoder.go index e72cfebf..552000e0 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -127,7 +127,7 @@ const ( // Encoder encapsulates properties of an mpegts generator. type Encoder struct { - dst io.Writer + dst io.WriteCloser clock time.Duration lastTime time.Time @@ -149,7 +149,7 @@ type Encoder struct { // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // calls write for every frame, the rate will be the frame rate of the video. -func NewEncoder(dst io.Writer, rate float64, mediaType int) *Encoder { +func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { var mPid int var sid byte switch mediaType { @@ -327,3 +327,7 @@ func updateMeta(b []byte) ([]byte, error) { err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) return []byte(p), err } + +func (e *Encoder) Close() error { + return e.dst.Close() +} diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index 2bf94d64..e29b0639 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -47,14 +47,13 @@ const fps = 25 // write this to psi. func TestMetaEncode1(t *testing.T) { Meta = meta.New() - var b []byte - buf := bytes.NewBuffer(b) - e := NewEncoder(buf, fps, Video) + var buf buffer + e := NewEncoder(&buf, fps, Video) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) } - out := buf.Bytes() + out := (*bytes.Buffer)(&buf).Bytes() got := out[PacketSize+4:] want := []byte{ @@ -76,15 +75,14 @@ func TestMetaEncode1(t *testing.T) { // into psi. func TestMetaEncode2(t *testing.T) { Meta = meta.New() - var b []byte - buf := bytes.NewBuffer(b) - e := NewEncoder(buf, fps, Video) + var buf buffer + e := NewEncoder(&buf, fps, Video) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) } - out := buf.Bytes() + out := (*bytes.Buffer)(&buf).Bytes() got := out[PacketSize+4:] want := []byte{ 0x00, 0x02, 0xb0, 0x36, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x24, diff --git a/revid/revid.go b/revid/revid.go index d8068115..61af584e 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -8,6 +8,7 @@ DESCRIPTION AUTHORS Saxon A. Nelson-Milton Alan Noble + Dan Kortschak LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) @@ -26,7 +27,6 @@ LICENSE along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ -// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package revid import ( @@ -46,15 +46,14 @@ import ( "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) -// Ring buffer sizes and read/write timeouts. +// mtsSender ringBuffer sizes. const ( - ringBufferSize = 1000 - ringBufferElementSize = 100000 - writeTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond + rbSize = 1000 + rbElementSize = 100000 + wTimeout = 1 * time.Second + rTimeout = 1 * time.Second ) // RTMP connection properties. @@ -63,17 +62,6 @@ const ( rtmpConnectionTimeout = 10 ) -// Duration of video for each clip sent out. -const clipDuration = 1 * time.Second - -// Time duration between bitrate checks. -const bitrateTime = 1 * time.Minute - -// After a send fail, this is the delay before another send. -const sendFailedDelay = 5 * time.Millisecond - -const ffmpegPath = "/usr/local/bin/ffmpeg" - const pkg = "revid:" type Logger interface { @@ -89,6 +77,7 @@ type Revid struct { // FIXME(kortschak): The relationship of concerns // in config/ns is weird. config Config + // ns holds the netsender.Sender responsible for HTTP. ns *netsender.Sender @@ -105,40 +94,19 @@ type Revid struct { // lexTo, encoder and packer handle transcoding the input stream. lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error - // buffer handles passing frames from the transcoder - // to the target destination. - buffer *buffer - - // encoder holds the required encoders, which then write to destinations. - encoder []io.Writer - - // writeClosers holds the senders that the encoders will write to. - writeClosers []io.WriteCloser - - // bitrate hold the last send bitrate calculation result. - bitrate int - - mu sync.Mutex - isRunning bool + // mwc will hold the multiWriteCloser that writes to encoders from the lexer. + mwc io.WriteCloser + // wg will be used to wait for any processing routines to finish. wg sync.WaitGroup + // isRunning is used to keep track of revid's running state between methods. + isRunning bool + + // err will channel errors from revid routines to the handle errors routine. err chan error } -// buffer is a wrapper for a ring.Buffer and provides function to write and -// flush in one Write call. -type buffer ring.Buffer - -// Write implements the io.Writer interface. It will write to the underlying -// ring.Buffer and then flush to indicate a complete ring.Buffer write. -func (b *buffer) Write(d []byte) (int, error) { - r := (*ring.Buffer)(b) - n, err := r.Write(d) - r.Flush() - return n, err -} - // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { @@ -167,10 +135,40 @@ func (r *Revid) handleErrors() { } // Bitrate returns the result of the most recent bitrate check. +// +// TODO: get this working again. func (r *Revid) Bitrate() int { - return r.bitrate + return -1 } +// reset swaps the current config of a Revid with the passed +// configuration; checking validity and returning errors if not valid. +func (r *Revid) reset(config Config) error { + err := r.setConfig(config) + if err != nil { + return err + } + + err = r.setupPipeline( + func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { + e := mts.NewEncoder(dst, float64(fps), mts.Video) + return e, nil + }, + func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { + return flv.NewEncoder(dst, true, true, fps) + }, + ioext.MultiWriteCloser, + ) + + if err != nil { + return err + } + + return nil +} + +// setConfig takes a config, checks it's validity and then replaces the current +// revid config. func (r *Revid) setConfig(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) @@ -187,10 +185,10 @@ func (r *Revid) setConfig(config Config) error { // mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder // respectively. multiWriter will be used to create an ioext.multiWriteCloser // so that encoders can write to multiple senders. -func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { - r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) - - r.encoder = r.encoder[:0] +func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { + // encoders will hold the encoders that are required for revid's current + // configuration. + var encoders []io.WriteCloser // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. @@ -203,7 +201,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, wTimeout) mtsSenders = append(mtsSenders, w) case Rtp: w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) @@ -232,7 +230,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W if len(mtsSenders) != 0 { mw := multiWriter(mtsSenders...) e, _ := mtsEnc(mw, int(r.config.FrameRate)) - r.encoder = append(r.encoder, e) + encoders = append(encoders, e) } // If we have some senders that require FLV encoding then add an FLV @@ -244,9 +242,11 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W if err != nil { return err } - r.encoder = append(r.encoder, e) + encoders = append(encoders, e) } + r.mwc = multiWriter(encoders...) + switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid @@ -267,72 +267,15 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W return nil } -func newMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { - e := mts.NewEncoder(dst, float64(fps), mts.Video) - return e, nil -} - -func newFlvEncoder(dst io.Writer, fps int) (io.Writer, error) { - e, err := flv.NewEncoder(dst, true, true, fps) - if err != nil { - return nil, err - } - return e, nil -} - -// reset swaps the current config of a Revid with the passed -// configuration; checking validity and returning errors if not valid. -func (r *Revid) reset(config Config) error { - err := r.setConfig(config) - if err != nil { - return err - } - - err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser) - if err != nil { - return err - } - - return nil -} - -// IsRunning returns true if revid is running. -func (r *Revid) IsRunning() bool { - r.mu.Lock() - ret := r.isRunning - r.mu.Unlock() - return ret -} - -func (r *Revid) Config() Config { - r.mu.Lock() - cfg := r.config - r.mu.Unlock() - return cfg -} - -// setIsRunning sets r.isRunning using b. -func (r *Revid) setIsRunning(b bool) { - r.mu.Lock() - r.isRunning = b - r.mu.Unlock() -} - // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { - if r.IsRunning() { + if r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running") return nil } r.config.Logger.Log(logger.Info, pkg+"starting Revid") - // TODO: this doesn't need to be here - r.config.Logger.Log(logger.Debug, pkg+"setting up output") - r.setIsRunning(true) - r.config.Logger.Log(logger.Info, pkg+"starting output routine") - r.wg.Add(1) - go r.outputClips() - r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") + r.isRunning = true err := r.setupInput() if err != nil { r.Stop() @@ -340,33 +283,35 @@ func (r *Revid) Start() error { return err } -// Stop halts any processing of video data from a camera or file +// Stop closes down the pipeline. This closes encoders and sender output routines, +// connections, and/or files. func (r *Revid) Stop() { - if !r.IsRunning() { + if !r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") return } - for _, w := range r.writeClosers { - err := w.Close() - if err != nil { - r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) - } + r.config.Logger.Log(logger.Info, pkg+"closing pipeline") + err := r.mwc.Close() + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"got error while closing pipeline", "error", err.Error()) } - r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.config.Logger.Log(logger.Info, pkg+"killing input proccess") - // If a cmd process is running, we kill! + if r.cmd != nil && r.cmd.Process != nil { + r.config.Logger.Log(logger.Info, pkg+"killing input proccess") r.cmd.Process.Kill() } - r.setIsRunning(false) r.wg.Wait() + r.isRunning = false } +// Update takes a map of variables and their values and edits the current config +// if the variables are recognised as valid parameters. func (r *Revid) Update(vars map[string]string) error { - if r.IsRunning() { + if r.isRunning { r.Stop() } + //look through the vars and update revid where needed for key, value := range vars { switch key { @@ -497,55 +442,6 @@ func (r *Revid) Update(vars map[string]string) error { return r.reset(r.config) } -// outputClips takes the clips produced in the packClips method and outputs them -// to the desired output defined in the revid config -func (r *Revid) outputClips() { - defer r.wg.Done() - lastTime := time.Now() - var count int -loop: - for r.IsRunning() { - // If the ring buffer has something we can read and send off - chunk, err := (*ring.Buffer)(r.buffer).Next(readTimeout) - switch err { - case nil: - // Do nothing. - case ring.ErrTimeout: - r.config.Logger.Log(logger.Debug, pkg+"ring buffer read timeout") - continue - default: - r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error()) - fallthrough - case io.EOF: - break loop - } - - // Loop over encoders and hand bytes over to each one. - for _, e := range r.encoder { - _, err := chunk.WriteTo(e) - if err != nil { - r.err <- err - } - } - - // Release the chunk back to the ring buffer. - chunk.Close() - - // FIXME(saxon): this doesn't work anymore. - now := time.Now() - deltaTime := now.Sub(lastTime) - if deltaTime > bitrateTime { - // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. - r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second)) - r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate) - r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", (*ring.Buffer)(r.buffer).Len()) - lastTime = now - count = 0 - } - } - r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore") -} - // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. func (r *Revid) startRaspivid() error { @@ -670,7 +566,7 @@ func (r *Revid) setupInputForFile() error { func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") - r.err <- r.lexTo(r.buffer, read, delay) + r.err <- r.lexTo(r.mwc, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.wg.Done() } diff --git a/revid/revid_test.go b/revid/revid_test.go index ccf0de3a..68520bbe 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -47,11 +47,8 @@ func TestRaspivid(t *testing.T) { // testLogger implements a netsender.Logger. type testLogger struct{} -// SetLevel normally sets the logging level, but it is a no-op in our case. -func (tl *testLogger) SetLevel(level int8) { -} +func (tl *testLogger) SetLevel(level int8) {} -// Log requests the Logger to write a message at the given level. func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"} if level < -1 || level > 5 { @@ -70,45 +67,24 @@ func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { // tstMtsEncoder emulates the mts.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. -type tstMtsEncoder struct { - dst io.Writer -} +type tstMtsEncoder struct{ dst io.WriteCloser } -// newTstMtsEncoder returns a pointer to a newTsMtsEncoder. -func newTstMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { - return &tstMtsEncoder{dst: dst}, nil -} - -func (e *tstMtsEncoder) Write(d []byte) (int, error) { return 0, nil } +func (e *tstMtsEncoder) Write(d []byte) (int, error) { return len(d), nil } +func (e *tstMtsEncoder) Close() error { return nil } // tstFlvEncoder emulates the flv.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. -type tstFlvEncoder struct { - dst io.Writer -} - -// newTstFlvEncoder returns a pointer to a new tstFlvEncoder. -func newTstFlvEncoder(dst io.Writer, fps int) (io.Writer, error) { - return &tstFlvEncoder{dst: dst}, nil -} +type tstFlvEncoder struct{ dst io.WriteCloser } func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil } +func (e *tstFlvEncoder) Close() error { return nil } // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // can access the destinations. -type dummyMultiWriter struct { - dst []io.WriteCloser -} - -func newDummyMultiWriter(dst ...io.WriteCloser) io.WriteCloser { - return &dummyMultiWriter{ - dst: dst, - } -} +type dummyMultiWriter struct{ dst []io.WriteCloser } func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } - -func (w *dummyMultiWriter) Close() error { return nil } +func (w *dummyMultiWriter) Close() error { return nil } // TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the // revid.encoder slice and the senders the encoders write to. @@ -216,20 +192,30 @@ func TestResetEncoderSenderSetup(t *testing.T) { } // This logic is what we want to check. - err = rv.setupPipeline(newTstMtsEncoder, newTstFlvEncoder, newDummyMultiWriter) + err = rv.setupPipeline( + func(dst io.WriteCloser, rate int) (io.WriteCloser, error) { + return &tstMtsEncoder{dst: dst}, nil + }, + func(dst io.WriteCloser, rate int) (io.WriteCloser, error) { + return &tstFlvEncoder{dst: dst}, nil + }, + func(writers ...io.WriteCloser) io.WriteCloser { + return &dummyMultiWriter{dst: writers} + }, + ) if err != nil { t.Fatalf("unexpected error: %v for test %v", err, testNum) } // First check that we have the correct number of encoders. - got := len(rv.encoder) + got := len(rv.mwc.(*dummyMultiWriter).dst) want := len(test.encoders) if got != want { t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want) } // Now check the correctness of encoders and their destinations. - for _, e := range rv.encoder { + for _, e := range rv.mwc.(*dummyMultiWriter).dst { // Get e's type. encoderType := fmt.Sprintf("%T", e) diff --git a/revid/senders.go b/revid/senders.go index 01aa208e..a0777803 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -193,7 +193,7 @@ func (s *mtsSender) output() { // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ringBuf.Next(readTimeout) + chunk, err = s.ringBuf.Next(rTimeout) switch err { case nil: continue diff --git a/revid/senders_test.go b/revid/senders_test.go index 0a09662a..3b32e0e4 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -41,14 +41,6 @@ import ( "bitbucket.org/ausocean/utils/logger" ) -// Ring buffer sizes and read/write timeouts. -const ( - rbSize = 100 - rbElementSize = 150000 - wTimeout = 10 * time.Millisecond - rTimeout = 10 * time.Millisecond -) - var ( errSendFailed = errors.New("send failed") ) @@ -119,7 +111,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. tstDst := &destination{t: t} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -197,7 +189,7 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder and send PSI every 10 packets. @@ -277,7 +269,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, ringBufferElementSize, writeTimeout) + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. From d75ea20137537166845223a94f4531b9b4c22f9f Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 15 Apr 2019 10:25:35 +0930 Subject: [PATCH 02/28] revid: applying some feedback from last PR --- revid/revid.go | 3 ++- revid/senders.go | 14 ++++++------- revid/senders_test.go | 46 ++++++++++++++++++++++++------------------- 3 files changed, 34 insertions(+), 29 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 61af584e..865bc342 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -142,7 +142,8 @@ func (r *Revid) Bitrate() int { } // reset swaps the current config of a Revid with the passed -// configuration; checking validity and returning errors if not valid. +// configuration; checking validity and returning errors if not valid. It then +// sets up the data pipeline accordinging to this configuration. func (r *Revid) reset(config Config) error { err := r.setConfig(config) if err != nil { diff --git a/revid/senders.go b/revid/senders.go index a0777803..4da8cdd5 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -156,7 +156,7 @@ func (s *fileSender) Close() error { return s.file.Close() } type mtsSender struct { dst io.WriteCloser buf []byte - ringBuf *ring.Buffer + rb *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer @@ -172,7 +172,7 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int dst: dst, repairer: mts.NewDiscontinuityRepairer(), log: log, - ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), + rb: ring.NewBuffer(rbSize, rbElementSize, wTimeout), quit: make(chan struct{}), } s.wg.Add(1) @@ -193,17 +193,15 @@ func (s *mtsSender) output() { // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ringBuf.Next(rTimeout) + chunk, err = s.rb.Next(rTimeout) switch err { - case nil: + case nil, io.EOF: continue case ring.ErrTimeout: s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") continue default: s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) - fallthrough - case io.EOF: continue } } @@ -235,11 +233,11 @@ func (s *mtsSender) Write(d []byte) (int, error) { copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() if s.curPid == mts.PatPid && len(s.buf) > 0 { - _, err := s.ringBuf.Write(s.buf) + _, err := s.rb.Write(s.buf) if err != nil { s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } - s.ringBuf.Flush() + s.rb.Flush() s.buf = s.buf[:0] } return len(d), nil diff --git a/revid/senders_test.go b/revid/senders_test.go index 3b32e0e4..f6a2e8cf 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -48,29 +48,34 @@ var ( // destination simulates a destination for the mtsSender. It allows for the // emulation of failed and delayed sends. type destination struct { - buf [][]byte - testFails bool - failAt int - currentPkt int - t *testing.T - sendDelay time.Duration - delayAt int + buf [][]byte + testFails bool + failAt int + currentClip int + t *testing.T + sendDelay time.Duration + delayAt int + done chan struct{} + doneAt int } func (ts *destination) Write(d []byte) (int, error) { ts.t.Log("writing clip to destination") - if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { + if ts.delayAt != 0 && ts.currentClip == ts.delayAt { time.Sleep(ts.sendDelay) } - if ts.testFails && ts.currentPkt == ts.failAt { + if ts.testFails && ts.currentClip == ts.failAt { ts.t.Log("failed send") - ts.currentPkt++ + ts.currentClip++ return 0, errSendFailed } cpy := make([]byte, len(d)) copy(cpy, d) ts.buf = append(ts.buf, cpy) - ts.currentPkt++ + if ts.currentClip == ts.doneAt { + close(ts.done) + } + ts.currentClip++ return len(d), nil } @@ -110,7 +115,8 @@ func TestMtsSenderSegment(t *testing.T) { mts.Meta = meta.New() // Create ringBuffer, sender, sender and the MPEGTS encoder. - tstDst := &destination{t: t} + const numberOfClips = 11 + tstDst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) @@ -126,8 +132,8 @@ func TestMtsSenderSegment(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give the mtsSender some time to finish up and then Close it. - time.Sleep(10 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-tstDst.done sender.Close() // Check the data. @@ -188,7 +194,7 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 - tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} + tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) @@ -204,8 +210,8 @@ func TestMtsSenderFailedSend(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give the mtsSender some time to finish up and then Close it. - time.Sleep(10 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-tstDst.done sender.Close() // Check that we have data as expected. @@ -268,7 +274,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 - tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} + tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) @@ -283,8 +289,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give mtsSender time to finish up then Close. - time.Sleep(100 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-tstDst.done sender.Close() // Check the data. From d18373908bd7a40511293edbaf60e887b09acf73 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 15 Apr 2019 10:48:12 +0930 Subject: [PATCH 03/28] revid: added ringBuffer to rtmpSender --- revid/revid.go | 9 ++++-- revid/senders.go | 71 ++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 865bc342..48b4228d 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -52,8 +52,8 @@ import ( const ( rbSize = 1000 rbElementSize = 100000 - wTimeout = 1 * time.Second - rTimeout = 1 * time.Second + wTimeout = 0 * time.Second + rTimeout = 0 * time.Second ) // RTMP connection properties. @@ -119,6 +119,11 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } +// Config returns a copy of revids current config. +func (r *Revid) Config() Config { + return r.config +} + // TODO(Saxon): put more thought into error severity. func (r *Revid) handleErrors() { for { diff --git a/revid/senders.go b/revid/senders.go index 4da8cdd5..5f290fa2 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,7 +29,6 @@ LICENSE package revid import ( - "errors" "fmt" "io" "net" @@ -252,14 +251,14 @@ func (s *mtsSender) Close() error { // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { - conn *rtmp.Conn - + conn *rtmp.Conn url string timeout uint retries int log func(lvl int8, msg string, args ...interface{}) - - data []byte + rb *ring.Buffer + quit chan struct{} + wg sync.WaitGroup } func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { @@ -281,20 +280,70 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg timeout: timeout, retries: retries, log: log, + rb: ring.NewBuffer(10, rbElementSize, 0), + quit: make(chan struct{}), } + s.wg.Add(1) + go s.output() return s, err } +// output starts an mtsSender's data handling routine. +func (s *rtmpSender) output() { + var chunk *ring.Chunk + for { + select { + case <-s.quit: + s.log(logger.Info, pkg+"rtmpSender: got quit signal, terminating output routine") + defer s.wg.Done() + return + default: + // If chunk is nil then we're ready to get another from the ringBuffer. + if chunk == nil { + var err error + chunk, err = s.rb.Next(rTimeout) + switch err { + case nil, io.EOF: + continue + case ring.ErrTimeout: + s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout") + continue + default: + s.log(logger.Error, pkg+"rtmpSender: unexpected error", "error", err.Error()) + continue + } + } + if s.conn == nil { + s.log(logger.Warning, pkg+"rtmpSender: no rtmp connection, going to restart...") + err := s.restart() + if err != nil { + s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) + continue + } + } + _, err := s.conn.Write(chunk.Bytes()) + if err != nil { + s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error()) + err = s.restart() + if err != nil { + s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) + } + continue + } + chunk.Close() + chunk = nil + } + } +} + // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { - if s.conn == nil { - return 0, errors.New("no rtmp connection, cannot write") - } - _, err := s.conn.Write(d) + _, err := s.rb.Write(d) if err != nil { - err = s.restart() + s.log(logger.Warning, pkg+"rtmpSender: ringBuffer write error", "error", err.Error()) } - return len(d), err + s.rb.Flush() + return len(d), nil } func (s *rtmpSender) restart() error { From 88431b1357cddf13bed9f0164e873dd7440b9a9c Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 15 Apr 2019 11:20:36 +0930 Subject: [PATCH 04/28] revid: made rtmpSender smarter with write error handling --- revid/senders.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/revid/senders.go b/revid/senders.go index 5f290fa2..a4aec8b3 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -322,7 +322,9 @@ func (s *rtmpSender) output() { } } _, err := s.conn.Write(chunk.Bytes()) - if err != nil { + switch err { + case nil, rtmp.ErrInvalidFlvTag: + default: s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error()) err = s.restart() if err != nil { @@ -347,7 +349,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) { } func (s *rtmpSender) restart() error { - s.Close() + s.close() var err error for n := 0; n < s.retries; n++ { s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) @@ -363,6 +365,14 @@ func (s *rtmpSender) restart() error { } func (s *rtmpSender) Close() error { + if s.quit != nil { + close(s.quit) + } + s.wg.Wait() + return s.close() +} + +func (s *rtmpSender) close() error { if s.conn == nil { return nil } From 3534d9031ac495a15de248e8e848df19e33d8074 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 15 Apr 2019 11:26:40 +0930 Subject: [PATCH 05/28] av: updated go mod --- go.mod | 5 +++++ go.sum | 14 -------------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 1280e1a3..675ff5c5 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,15 @@ go 1.12 require ( bitbucket.org/ausocean/iot v1.2.4 bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e + github.com/BurntSushi/toml v0.3.1 // indirect github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 + github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect + github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 github.com/mewkiz/flac v1.0.5 + github.com/sergi/go-diff v1.0.0 // indirect github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/yaml.v2 v2.2.2 // indirect ) diff --git a/go.sum b/go.sum index a23bf607..0fc7b5a3 100644 --- a/go.sum +++ b/go.sum @@ -1,34 +1,23 @@ bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4= bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= -bitbucket.org/ausocean/iot v1.2.5 h1:udD5X4oXUuKwdjO7bcq4StcDdjP8fJa2L0FnJJwF+6Q= -bitbucket.org/ausocean/iot v1.2.5/go.mod h1:dOclxXkdxAQGWO7Y5KcP1wpNfxg9oKUA2VqjJ3Le4RA= bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e h1:rn7Z1vE6m1NSH+mrPJPgquEfBDsqzBEH4Y6fxzyB6kA= bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= -bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20= -bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw= -bitbucket.org/ausocean/utils v1.2.5 h1:70lkWnoN1SUxiIBIKDiDzaYZ2bjczdNYSAsKj7DUpl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/adrianmo/go-nmea v1.1.1-0.20190109062325-c448653979f7/go.mod h1:HHPxPAm2kmev+61qmkZh7xgZF/7qHtSpsWppip2Ipv8= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c= github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4/go.mod h1:2RvX5ZjVtsznNZPEt4xwJXNJrM3VTZoQf7V6gk0ysvs= -github.com/kidoman/embd v0.0.0-20170508013040-d3d8c0c5c68d/go.mod h1:ACKj9jnzOzj1lw2ETilpFGK7L9dtJhAzT7T1OhAGtRQ= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s= github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc= @@ -40,19 +29,16 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e h1:3NIzz7weXhh3NToPgbtlQtKiVgerEaG4/nY2skGoGG0= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e/go.mod h1:CaowXBWOiSGWEpBBV8LoVnQTVPV4ycyviC9IBLj8dRw= -github.com/yryz/ds18b20 v0.0.0-20180211073435-3cf383a40624/go.mod h1:MqFju5qeLDFh+S9PqxYT7TEla8xeW7bgGr/69q3oki0= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -golang.org/x/sys v0.0.0-20190305064518-30e92a19ae4a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= From 56a9b7d6ef1b8494f9141f181904f258da142166 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 15 Apr 2019 11:43:46 +0930 Subject: [PATCH 06/28] av: fixed broken tests --- container/mts/audio_test.go | 2 +- protocol/rtmp/rtmp_test.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/container/mts/audio_test.go b/container/mts/audio_test.go index 9c1b8f03..8f71bbb7 100644 --- a/container/mts/audio_test.go +++ b/container/mts/audio_test.go @@ -38,7 +38,7 @@ import ( type buffer bytes.Buffer func (b *buffer) Write(d []byte) (int, error) { - return b.Write(d) + return (*bytes.Buffer)(b).Write(d) } func (b *buffer) Close() error { return nil } diff --git a/protocol/rtmp/rtmp_test.go b/protocol/rtmp/rtmp_test.go index f2d661c5..1cf056cb 100644 --- a/protocol/rtmp/rtmp_test.go +++ b/protocol/rtmp/rtmp_test.go @@ -222,6 +222,8 @@ func (rs *rtmpSender) Write(p []byte) (int, error) { return n, nil } +func (rs *rtmpSender) Close() error { return nil } + // TestFromFile tests streaming from an video file comprising raw H.264. // The test file is supplied via the RTMP_TEST_FILE environment variable. func TestFromFile(t *testing.T) { From cea3a5958ac09149ac78aa1df21c47098c9d442c Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 15 Apr 2019 13:09:56 +0930 Subject: [PATCH 07/28] revid: changed no location in reply log message to level debug --- revid/senders.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/senders.go b/revid/senders.go index a4aec8b3..e9d9a92d 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -118,7 +118,7 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) // Extract location from reply g, err := dec.String("ll") if err != nil { - log(logger.Warning, pkg+"No location in reply") + log(logger.Debug, pkg+"No location in reply") } else { log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) mts.Meta.Add("loc", g) From e3c711d1f6e406b8efd48aadc4328ac1d238556b Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 15 Apr 2019 13:13:01 +0930 Subject: [PATCH 08/28] cmd/revid-cli: run failed log message to warning level --- cmd/revid-cli/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index c7e3fffd..39872022 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -282,7 +282,7 @@ func run(cfg revid.Config) { for { err = ns.Run() if err != nil { - log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) + log.Log(logger.Warning, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue } From 1c5d3997bb596c331cb379fbeed08262deb54451 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 18 Apr 2019 16:21:18 +0930 Subject: [PATCH 09/28] revid: fixed indentation on Dan's name under authors --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 48b4228d..29514fa6 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -8,7 +8,7 @@ DESCRIPTION AUTHORS Saxon A. Nelson-Milton Alan Noble - Dan Kortschak + Dan Kortschak LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) From 74c995d452dfa09242b7a7dd3561ca36efe0553c Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 18 Apr 2019 16:55:48 +0930 Subject: [PATCH 10/28] revid: addressing PR feedback --- revid/revid.go | 16 ++++++++-------- revid/revid_test.go | 4 ++-- revid/senders.go | 44 +++++++++++++++++++++---------------------- revid/senders_test.go | 40 +++++++++++++++++++++++++++------------ 4 files changed, 60 insertions(+), 44 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 29514fa6..ea85038a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -95,16 +95,16 @@ type Revid struct { lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error // mwc will hold the multiWriteCloser that writes to encoders from the lexer. - mwc io.WriteCloser - - // wg will be used to wait for any processing routines to finish. - wg sync.WaitGroup + encoders io.WriteCloser // isRunning is used to keep track of revid's running state between methods. isRunning bool // err will channel errors from revid routines to the handle errors routine. err chan error + + // wg will be used to wait for any processing routines to finish. + wg sync.WaitGroup } // New returns a pointer to a new Revid with the desired configuration, and/or @@ -148,7 +148,7 @@ func (r *Revid) Bitrate() int { // reset swaps the current config of a Revid with the passed // configuration; checking validity and returning errors if not valid. It then -// sets up the data pipeline accordinging to this configuration. +// sets up the data pipeline accordingly to this configuration. func (r *Revid) reset(config Config) error { err := r.setConfig(config) if err != nil { @@ -251,7 +251,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) encoders = append(encoders, e) } - r.mwc = multiWriter(encoders...) + r.encoders = multiWriter(encoders...) switch r.config.Input { case Raspivid: @@ -298,7 +298,7 @@ func (r *Revid) Stop() { } r.config.Logger.Log(logger.Info, pkg+"closing pipeline") - err := r.mwc.Close() + err := r.encoders.Close() if err != nil { r.config.Logger.Log(logger.Error, pkg+"got error while closing pipeline", "error", err.Error()) } @@ -572,7 +572,7 @@ func (r *Revid) setupInputForFile() error { func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") - r.err <- r.lexTo(r.mwc, read, delay) + r.err <- r.lexTo(r.encoders, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.wg.Done() } diff --git a/revid/revid_test.go b/revid/revid_test.go index 68520bbe..2ee53151 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -208,14 +208,14 @@ func TestResetEncoderSenderSetup(t *testing.T) { } // First check that we have the correct number of encoders. - got := len(rv.mwc.(*dummyMultiWriter).dst) + got := len(rv.encoders.(*dummyMultiWriter).dst) want := len(test.encoders) if got != want { t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want) } // Now check the correctness of encoders and their destinations. - for _, e := range rv.mwc.(*dummyMultiWriter).dst { + for _, e := range rv.encoders.(*dummyMultiWriter).dst { // Get e's type. encoderType := fmt.Sprintf("%T", e) diff --git a/revid/senders.go b/revid/senders.go index e9d9a92d..eb0c6ca4 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -155,24 +155,24 @@ func (s *fileSender) Close() error { return s.file.Close() } type mtsSender struct { dst io.WriteCloser buf []byte - rb *ring.Buffer + ring *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int - quit chan struct{} + done chan struct{} log func(lvl int8, msg string, args ...interface{}) wg sync.WaitGroup } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), ringSize int, ringElementSize int, wTimeout time.Duration) *mtsSender { s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), log: log, - rb: ring.NewBuffer(rbSize, rbElementSize, wTimeout), - quit: make(chan struct{}), + ring: ring.NewBuffer(ringSize, ringElementSize, wTimeout), + done: make(chan struct{}), } s.wg.Add(1) go s.output() @@ -184,15 +184,15 @@ func (s *mtsSender) output() { var chunk *ring.Chunk for { select { - case <-s.quit: - s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") + case <-s.done: + s.log(logger.Info, pkg+"mtsSender: got done signal, terminating output routine") defer s.wg.Done() return default: // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.rb.Next(rTimeout) + chunk, err = s.ring.Next(rTimeout) switch err { case nil, io.EOF: continue @@ -232,11 +232,11 @@ func (s *mtsSender) Write(d []byte) (int, error) { copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() if s.curPid == mts.PatPid && len(s.buf) > 0 { - _, err := s.rb.Write(s.buf) + _, err := s.ring.Write(s.buf) if err != nil { s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } - s.rb.Flush() + s.ring.Flush() s.buf = s.buf[:0] } return len(d), nil @@ -244,7 +244,7 @@ func (s *mtsSender) Write(d []byte) (int, error) { // Close implements io.Closer. func (s *mtsSender) Close() error { - close(s.quit) + close(s.done) s.wg.Wait() return nil } @@ -256,8 +256,8 @@ type rtmpSender struct { timeout uint retries int log func(lvl int8, msg string, args ...interface{}) - rb *ring.Buffer - quit chan struct{} + ring *ring.Buffer + done chan struct{} wg sync.WaitGroup } @@ -280,8 +280,8 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg timeout: timeout, retries: retries, log: log, - rb: ring.NewBuffer(10, rbElementSize, 0), - quit: make(chan struct{}), + ring: ring.NewBuffer(10, rbElementSize, 0), + done: make(chan struct{}), } s.wg.Add(1) go s.output() @@ -293,15 +293,15 @@ func (s *rtmpSender) output() { var chunk *ring.Chunk for { select { - case <-s.quit: - s.log(logger.Info, pkg+"rtmpSender: got quit signal, terminating output routine") + case <-s.done: + s.log(logger.Info, pkg+"rtmpSender: got done signal, terminating output routine") defer s.wg.Done() return default: // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.rb.Next(rTimeout) + chunk, err = s.ring.Next(rTimeout) switch err { case nil, io.EOF: continue @@ -340,11 +340,11 @@ func (s *rtmpSender) output() { // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { - _, err := s.rb.Write(d) + _, err := s.ring.Write(d) if err != nil { s.log(logger.Warning, pkg+"rtmpSender: ringBuffer write error", "error", err.Error()) } - s.rb.Flush() + s.ring.Flush() return len(d), nil } @@ -365,8 +365,8 @@ func (s *rtmpSender) restart() error { } func (s *rtmpSender) Close() error { - if s.quit != nil { - close(s.quit) + if s.done != nil { + close(s.done) } s.wg.Wait() return s.close() diff --git a/revid/senders_test.go b/revid/senders_test.go index f6a2e8cf..36c3f41a 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -48,15 +48,31 @@ var ( // destination simulates a destination for the mtsSender. It allows for the // emulation of failed and delayed sends. type destination struct { - buf [][]byte - testFails bool - failAt int + // Holds the clips written to this destination using Write. + buf [][]byte + + // testFails is set to true if we would like a write to fail at a particular + // clip as determined by failAt. + testFails bool + failAt int + + // Holds the current clip number. currentClip int - t *testing.T - sendDelay time.Duration - delayAt int - done chan struct{} - doneAt int + + // Pointer to the testing.T of a test where this struct is being used. This + // is used so that logging can be done through the testing log utilities. + t *testing.T + + // sendDelay is the amount of time we would like a Write to be delayed when + // we hit the clip number indicated by delayAt. + sendDelay time.Duration + delayAt int + + // done will be used to send a signal to the main routine to indicate that + // the destination has received all clips. doneAt indicates the final clip + // number. + done chan struct{} + doneAt int } func (ts *destination) Write(d []byte) (int, error) { @@ -116,8 +132,8 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 - tstDst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) + dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} + sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -133,11 +149,11 @@ func TestMtsSenderSegment(t *testing.T) { } // Wait until the destination has all the data, then close the sender. - <-tstDst.done + <-dst.done sender.Close() // Check the data. - result := tstDst.buf + result := dst.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) From d76b60a5151bae8c18bbb70cd699592721d14397 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 18 Apr 2019 18:31:49 +0930 Subject: [PATCH 11/28] revid: addressing PR feedback --- container/mts/audio_test.go | 13 +++++-------- container/mts/metaEncode_test.go | 8 ++++---- revid/revid.go | 16 +++++++++++++--- revid/senders.go | 4 ++-- revid/senders_test.go | 18 +++++++++--------- 5 files changed, 33 insertions(+), 26 deletions(-) diff --git a/container/mts/audio_test.go b/container/mts/audio_test.go index 8f71bbb7..053bb0a9 100644 --- a/container/mts/audio_test.go +++ b/container/mts/audio_test.go @@ -26,6 +26,7 @@ package mts import ( "bytes" + "io" "io/ioutil" "testing" @@ -35,25 +36,21 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" ) -type buffer bytes.Buffer +type nopCloser struct{ io.Writer } -func (b *buffer) Write(d []byte) (int, error) { - return (*bytes.Buffer)(b).Write(d) -} - -func (b *buffer) Close() error { return nil } +func (nopCloser) Close() error { return nil } // TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data. // It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm. func TestEncodePcm(t *testing.T) { Meta = meta.New() - var buf buffer + var buf bytes.Buffer sampleRate := 48000 sampleSize := 2 blockSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) - e := NewEncoder(&buf, writeFreq, Audio) + e := NewEncoder(&nopCloser{&buf}, writeFreq, Audio) inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPcm, err := ioutil.ReadFile(inPath) diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index e29b0639..314e235a 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -47,8 +47,8 @@ const fps = 25 // write this to psi. func TestMetaEncode1(t *testing.T) { Meta = meta.New() - var buf buffer - e := NewEncoder(&buf, fps, Video) + var buf bytes.Buffer + e := NewEncoder(&nopCloser{&buf}, fps, Video) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) @@ -75,8 +75,8 @@ func TestMetaEncode1(t *testing.T) { // into psi. func TestMetaEncode2(t *testing.T) { Meta = meta.New() - var buf buffer - e := NewEncoder(&buf, fps, Video) + var buf bytes.Buffer + e := NewEncoder(&nopCloser{&buf}, fps, Video) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { diff --git a/revid/revid.go b/revid/revid.go index ea85038a..c9c2bb51 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -52,8 +52,6 @@ import ( const ( rbSize = 1000 rbElementSize = 100000 - wTimeout = 0 * time.Second - rTimeout = 0 * time.Second ) // RTMP connection properties. @@ -120,6 +118,9 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { } // Config returns a copy of revids current config. +// +// This is not intended, nor is it safe, to be used concurrently with any other +// exported functionalilty from this file. func (r *Revid) Config() Config { return r.config } @@ -207,7 +208,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, wTimeout) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, 0) mtsSenders = append(mtsSenders, w) case Rtp: w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) @@ -275,6 +276,9 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. +// +// This is not intended, nor is it safe, to be used concurrently with any other +// exported functionalilty from this file. func (r *Revid) Start() error { if r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running") @@ -291,6 +295,9 @@ func (r *Revid) Start() error { // Stop closes down the pipeline. This closes encoders and sender output routines, // connections, and/or files. +// +// This is not intended, nor is it safe, to be used concurrently with any other +// exported functionalilty from this file. func (r *Revid) Stop() { if !r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") @@ -313,6 +320,9 @@ func (r *Revid) Stop() { // Update takes a map of variables and their values and edits the current config // if the variables are recognised as valid parameters. +// +// This is not intended, nor is it safe, to be used concurrently with any other +// exported functionalilty from this file. func (r *Revid) Update(vars map[string]string) error { if r.isRunning { r.Stop() diff --git a/revid/senders.go b/revid/senders.go index eb0c6ca4..947f5be1 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -192,7 +192,7 @@ func (s *mtsSender) output() { // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ring.Next(rTimeout) + chunk, err = s.ring.Next(0) switch err { case nil, io.EOF: continue @@ -301,7 +301,7 @@ func (s *rtmpSender) output() { // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ring.Next(rTimeout) + chunk, err = s.ring.Next(0) switch err { case nil, io.EOF: continue diff --git a/revid/senders_test.go b/revid/senders_test.go index 36c3f41a..80293759 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -133,7 +133,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) + sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -210,8 +210,8 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 - tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) + dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} + sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder and send PSI every 10 packets. @@ -227,11 +227,11 @@ func TestMtsSenderFailedSend(t *testing.T) { } // Wait until the destination has all the data, then close the sender. - <-tstDst.done + <-dst.done sender.Close() // Check that we have data as expected. - result := tstDst.buf + result := dst.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) @@ -290,8 +290,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 - tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, rbElementSize, wTimeout) + dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} + sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -306,11 +306,11 @@ func TestMtsSenderDiscontinuity(t *testing.T) { } // Wait until the destination has all the data, then close the sender. - <-tstDst.done + <-dst.done sender.Close() // Check the data. - result := tstDst.buf + result := dst.buf expectedCC := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) From 81e3038b9ba66584e9cb0820452acdc960ac7f53 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 20 Apr 2019 18:19:25 +0930 Subject: [PATCH 12/28] go.mod: use ausocean/utils v1.2.6 --- go.mod | 7 ++++++- go.sum | 16 ++-------------- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 1280e1a3..c3d766c5 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,16 @@ go 1.12 require ( bitbucket.org/ausocean/iot v1.2.4 - bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e + bitbucket.org/ausocean/utils v1.2.6 + github.com/BurntSushi/toml v0.3.1 // indirect github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 + github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect + github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 github.com/mewkiz/flac v1.0.5 + github.com/sergi/go-diff v1.0.0 // indirect github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/yaml.v2 v2.2.2 // indirect ) diff --git a/go.sum b/go.sum index a23bf607..7ffe0a32 100644 --- a/go.sum +++ b/go.sum @@ -1,34 +1,25 @@ bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4= bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= -bitbucket.org/ausocean/iot v1.2.5 h1:udD5X4oXUuKwdjO7bcq4StcDdjP8fJa2L0FnJJwF+6Q= -bitbucket.org/ausocean/iot v1.2.5/go.mod h1:dOclxXkdxAQGWO7Y5KcP1wpNfxg9oKUA2VqjJ3Le4RA= bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e h1:rn7Z1vE6m1NSH+mrPJPgquEfBDsqzBEH4Y6fxzyB6kA= bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= -bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20= -bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw= -bitbucket.org/ausocean/utils v1.2.5 h1:70lkWnoN1SUxiIBIKDiDzaYZ2bjczdNYSAsKj7DUpl8= +bitbucket.org/ausocean/utils v1.2.6 h1:JN66APCV+hu6GebIHSu2KSywhLym4vigjSz5+fB0zXc= +bitbucket.org/ausocean/utils v1.2.6/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/adrianmo/go-nmea v1.1.1-0.20190109062325-c448653979f7/go.mod h1:HHPxPAm2kmev+61qmkZh7xgZF/7qHtSpsWppip2Ipv8= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c= github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4/go.mod h1:2RvX5ZjVtsznNZPEt4xwJXNJrM3VTZoQf7V6gk0ysvs= -github.com/kidoman/embd v0.0.0-20170508013040-d3d8c0c5c68d/go.mod h1:ACKj9jnzOzj1lw2ETilpFGK7L9dtJhAzT7T1OhAGtRQ= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s= github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc= @@ -40,19 +31,16 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e h1:3NIzz7weXhh3NToPgbtlQtKiVgerEaG4/nY2skGoGG0= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e/go.mod h1:CaowXBWOiSGWEpBBV8LoVnQTVPV4ycyviC9IBLj8dRw= -github.com/yryz/ds18b20 v0.0.0-20180211073435-3cf383a40624/go.mod h1:MqFju5qeLDFh+S9PqxYT7TEla8xeW7bgGr/69q3oki0= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -golang.org/x/sys v0.0.0-20190305064518-30e92a19ae4a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= From 9ba72fac629ce7c2a2ab947cdae1698ef69fd804 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 22 Apr 2019 15:14:08 +0930 Subject: [PATCH 13/28] av: addressing PR feedback --- revid/revid.go | 14 +++++++------- revid/revid_test.go | 28 ++++++++++++++++++++++++++++ revid/senders.go | 4 ++-- 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index c9c2bb51..ced432b6 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -21,10 +21,10 @@ LICENSE It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. + for more details. You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + in gpl.txt. If not, see http://www.gnu.org/licenses. */ package revid @@ -92,17 +92,17 @@ type Revid struct { // lexTo, encoder and packer handle transcoding the input stream. lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error - // mwc will hold the multiWriteCloser that writes to encoders from the lexer. + // encoders will hold the multiWriteCloser that writes to encoders from the lexer. encoders io.WriteCloser // isRunning is used to keep track of revid's running state between methods. isRunning bool - // err will channel errors from revid routines to the handle errors routine. - err chan error - // wg will be used to wait for any processing routines to finish. wg sync.WaitGroup + + // err will channel errors from revid routines to the handle errors routine. + err chan error } // New returns a pointer to a new Revid with the desired configuration, and/or @@ -307,7 +307,7 @@ func (r *Revid) Stop() { r.config.Logger.Log(logger.Info, pkg+"closing pipeline") err := r.encoders.Close() if err != nil { - r.config.Logger.Log(logger.Error, pkg+"got error while closing pipeline", "error", err.Error()) + r.config.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error()) } if r.cmd != nil && r.cmd.Process != nil { diff --git a/revid/revid_test.go b/revid/revid_test.go index 2ee53151..145e4825 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -1,3 +1,31 @@ +/* +NAME + revid_test.go + +DESCRIPTION + See Readme.md + +AUTHORS + Saxon A. Nelson-Milton + Alan Noble + +LICENSE + This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean). + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + package revid import ( diff --git a/revid/senders.go b/revid/senders.go index 947f5be1..17fa7dc1 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -298,7 +298,7 @@ func (s *rtmpSender) output() { defer s.wg.Done() return default: - // If chunk is nil then we're ready to get another from the ringBuffer. + // If chunk is nil then we're ready to get another from the ring buffer. if chunk == nil { var err error chunk, err = s.ring.Next(0) @@ -342,7 +342,7 @@ func (s *rtmpSender) output() { func (s *rtmpSender) Write(d []byte) (int, error) { _, err := s.ring.Write(d) if err != nil { - s.log(logger.Warning, pkg+"rtmpSender: ringBuffer write error", "error", err.Error()) + s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error()) } s.ring.Flush() return len(d), nil From e5f95d1ea0977b78a576a41f272d98137a776b78 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 23 Apr 2019 13:18:41 +0930 Subject: [PATCH 14/28] revid: addressing PR feedback --- container/mts/audio_test.go | 2 +- container/mts/metaEncode_test.go | 4 ++-- revid/revid_test.go | 20 ++++++++++++++++---- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/container/mts/audio_test.go b/container/mts/audio_test.go index 053bb0a9..9ef1f883 100644 --- a/container/mts/audio_test.go +++ b/container/mts/audio_test.go @@ -74,7 +74,7 @@ func TestEncodePcm(t *testing.T) { } } } - clip := (*bytes.Buffer)(&buf).Bytes() + clip := buf.Bytes() // Get the first MTS packet to check var pkt packet.Packet diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index 314e235a..7fc23252 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -53,7 +53,7 @@ func TestMetaEncode1(t *testing.T) { if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) } - out := (*bytes.Buffer)(&buf).Bytes() + out := buf.Bytes() got := out[PacketSize+4:] want := []byte{ @@ -82,7 +82,7 @@ func TestMetaEncode2(t *testing.T) { if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) } - out := (*bytes.Buffer)(&buf).Bytes() + out := buf.Bytes() got := out[PacketSize+4:] want := []byte{ 0x00, 0x02, 0xb0, 0x36, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x24, diff --git a/revid/revid_test.go b/revid/revid_test.go index 145e4825..5134b47d 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -95,21 +95,33 @@ func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { // tstMtsEncoder emulates the mts.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. -type tstMtsEncoder struct{ dst io.WriteCloser } +type tstMtsEncoder struct { + // dst is here soley to detect the type stored in the encoder. + // No data is written to dst. + dst io.WriteCloser +} func (e *tstMtsEncoder) Write(d []byte) (int, error) { return len(d), nil } func (e *tstMtsEncoder) Close() error { return nil } // tstFlvEncoder emulates the flv.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. -type tstFlvEncoder struct{ dst io.WriteCloser } +type tstFlvEncoder struct { + // dst is here soley to detect the type stored in the encoder. + // No data is written to dst. + dst io.WriteCloser +} func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil } func (e *tstFlvEncoder) Close() error { return nil } // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // can access the destinations. -type dummyMultiWriter struct{ dst []io.WriteCloser } +type dummyMultiWriter struct { + // dst is here soley to detect the types stored in the multiWriter. + // No data is written to dst. + dst []io.WriteCloser +} func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } func (w *dummyMultiWriter) Close() error { return nil } @@ -259,7 +271,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { } // Now check that this encoder has correct number of destinations (senders). - var ms io.Writer + var ms io.WriteCloser switch encoderType { case mtsEncoderStr: ms = e.(*tstMtsEncoder).dst From 1b06dae07863036ce0fbf327c63e41fedd89d086 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 24 Apr 2019 12:57:04 +0930 Subject: [PATCH 15/28] container/mts: not taking address of nopCloser --- container/mts/audio_test.go | 2 +- container/mts/metaEncode_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/container/mts/audio_test.go b/container/mts/audio_test.go index 9ef1f883..f785930d 100644 --- a/container/mts/audio_test.go +++ b/container/mts/audio_test.go @@ -50,7 +50,7 @@ func TestEncodePcm(t *testing.T) { sampleSize := 2 blockSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) - e := NewEncoder(&nopCloser{&buf}, writeFreq, Audio) + e := NewEncoder(nopCloser{&buf}, writeFreq, Audio) inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPcm, err := ioutil.ReadFile(inPath) diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index 7fc23252..939de5b7 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -48,7 +48,7 @@ const fps = 25 func TestMetaEncode1(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(&nopCloser{&buf}, fps, Video) + e := NewEncoder(nopCloser{&buf}, fps, Video) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) @@ -76,7 +76,7 @@ func TestMetaEncode1(t *testing.T) { func TestMetaEncode2(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(&nopCloser{&buf}, fps, Video) + e := NewEncoder(nopCloser{&buf}, fps, Video) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { From 74379ea04706c30e10e32d1153551f679f9634e6 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 24 Apr 2019 13:00:20 +0930 Subject: [PATCH 16/28] revid: simplified comments for Start, Stop and Update. --- revid/revid.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index ced432b6..ee258769 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -277,8 +277,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. // -// This is not intended, nor is it safe, to be used concurrently with any other -// exported functionalilty from this file. +// Start is not safe for concurrent use. func (r *Revid) Start() error { if r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running") @@ -296,8 +295,7 @@ func (r *Revid) Start() error { // Stop closes down the pipeline. This closes encoders and sender output routines, // connections, and/or files. // -// This is not intended, nor is it safe, to be used concurrently with any other -// exported functionalilty from this file. +// Stop is not safe for concurrent use. func (r *Revid) Stop() { if !r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") @@ -321,8 +319,7 @@ func (r *Revid) Stop() { // Update takes a map of variables and their values and edits the current config // if the variables are recognised as valid parameters. // -// This is not intended, nor is it safe, to be used concurrently with any other -// exported functionalilty from this file. +// Update is not safe for concurrent use. func (r *Revid) Update(vars map[string]string) error { if r.isRunning { r.Stop() From 3ab0be4a7a95430fa46c44d9042bd76aa5af7647 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 24 Apr 2019 13:01:29 +0930 Subject: [PATCH 17/28] revid: fixed typos in revid_test.go --- revid/revid_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/revid/revid_test.go b/revid/revid_test.go index 5134b47d..08c8521a 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -96,7 +96,7 @@ func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { // tstMtsEncoder emulates the mts.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. type tstMtsEncoder struct { - // dst is here soley to detect the type stored in the encoder. + // dst is here solely to detect the type stored in the encoder. // No data is written to dst. dst io.WriteCloser } @@ -107,7 +107,7 @@ func (e *tstMtsEncoder) Close() error { return nil } // tstFlvEncoder emulates the flv.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. type tstFlvEncoder struct { - // dst is here soley to detect the type stored in the encoder. + // dst is here solely to detect the type stored in the encoder. // No data is written to dst. dst io.WriteCloser } @@ -118,7 +118,7 @@ func (e *tstFlvEncoder) Close() error { return nil } // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // can access the destinations. type dummyMultiWriter struct { - // dst is here soley to detect the types stored in the multiWriter. + // dst is here solely to detect the types stored in the multiWriter. // No data is written to dst. dst []io.WriteCloser } From 0331bc49c7ee20fa007cabb36475f6306831c4b7 Mon Sep 17 00:00:00 2001 From: Alan Noble Date: Thu, 25 Apr 2019 00:54:27 +0000 Subject: [PATCH 18/28] Added contributors. --- contributors.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/contributors.txt b/contributors.txt index 98f4eac8..0675f323 100644 --- a/contributors.txt +++ b/contributors.txt @@ -1,3 +1,5 @@ Alan Noble Saxon Nelson-Milton Jack Richardson +Dan Kortschak +Trek Hopton \ No newline at end of file From a365424f5ad2575167cbe56d46f5bbeba60f1010 Mon Sep 17 00:00:00 2001 From: Alan Noble Date: Thu, 25 Apr 2019 01:26:50 +0000 Subject: [PATCH 19/28] Updated description and copyright. --- Readme.md | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/Readme.md b/Readme.md index 1c3489d4..7009a48d 100644 --- a/Readme.md +++ b/Readme.md @@ -2,21 +2,15 @@ av is a collection of tools and packages written in Go for audio-video processing. -# Authors -Alan Noble -Saxon A. Nelson-Milton -Trek Hopton +Codecs, containers and protocols are organized according to directories named accordingly. -# Description - -* revid: a tool for re-muxing and re-directing video streams. -* RingBuffer: a package that implements a ring buffer with concurrency control. +cmd/revid-cli is a command-line program for reading, transcoding, and writing audio/video streams and files. # License -Copyright (C) 2017 the Australian Ocean Lab (AusOcean). +Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean). -It is free software: you can redistribute it and/or modify them +This is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. @@ -27,4 +21,4 @@ FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License or more details. You should have received a copy of the GNU General Public License -along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses/). +along with revid in gpl.txt. If not, see http://www.gnu.org/licenses/. From b42278cfdeac84a28a389e2603654a5079edd12b Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 25 Apr 2019 15:44:34 +0930 Subject: [PATCH 20/28] revid: improved comment for Config() --- revid/revid.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index ee258769..72eb52de 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -119,8 +119,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { // Config returns a copy of revids current config. // -// This is not intended, nor is it safe, to be used concurrently with any other -// exported functionalilty from this file. +// Config is not safe for concurrent use. func (r *Revid) Config() Config { return r.config } From 149a2fa6b98a148a79f6a64fc51ad8f33e1f28b7 Mon Sep 17 00:00:00 2001 From: Alan Noble Date: Fri, 26 Apr 2019 00:36:23 +0000 Subject: [PATCH 21/28] Fix package-level doc comment. --- protocol/rtmp/conn.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/protocol/rtmp/conn.go b/protocol/rtmp/conn.go index 9b453849..26420683 100644 --- a/protocol/rtmp/conn.go +++ b/protocol/rtmp/conn.go @@ -31,6 +31,9 @@ LICENSE Copyright (C) 2008-2009 Andrej Stepanchuk Copyright (C) 2009-2010 Howard Chu */ + +// Package rtmp provides an RTMP client implementation. +// The package currently supports live streaming to YouTube. package rtmp import ( From ec290f1058755b52b6013622af02ff4683a7bf61 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 5 May 2019 19:39:56 +0930 Subject: [PATCH 22/28] revid: logging is now checked as a valid device so that logging level may be changed remotely --- revid/config.go | 17 +++++++++-------- revid/revid.go | 17 +++++++++++++++++ 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/revid/config.go b/revid/config.go index 6f5a0c8a..d9bd4877 100644 --- a/revid/config.go +++ b/revid/config.go @@ -148,7 +148,7 @@ const ( defaultFramesPerClip = 1 httpFramesPerClip = 560 defaultInputCodec = H264 - defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15. + defaultVerbosity = logger.Error defaultRtpAddr = "localhost:6970" defaultBurstPeriod = 10 // Seconds defaultRotation = 0 // Degrees @@ -161,14 +161,15 @@ const ( // if particular parameters have not been defined. func (c *Config) Validate(r *Revid) error { switch c.LogLevel { - case Yes: - case No: - case NothingDefined: - c.LogLevel = defaultVerbosity - c.Logger.Log(logger.Info, pkg+"no LogLevel mode defined, defaulting", - "LogLevel", defaultVerbosity) + case logger.Debug: + case logger.Info: + case logger.Warning: + case logger.Error: + case logger.Fatal: default: - return errors.New("bad LogLevel defined in config") + c.LogLevel = defaultVerbosity + c.Logger.Log(logger.Info, pkg+"bad LogLevel mode defined, defaulting", + "LogLevel", defaultVerbosity) } switch c.Input { diff --git a/revid/revid.go b/revid/revid.go index 72eb52de..3a85df69 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -155,6 +155,8 @@ func (r *Revid) reset(config Config) error { return err } + r.config.Logger.SetLevel(config.LogLevel) + err = r.setupPipeline( func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { e := mts.NewEncoder(dst, float64(fps), mts.Video) @@ -448,6 +450,21 @@ func (r *Revid) Update(vars map[string]string) error { break } r.config.BurstPeriod = uint(v) + case "Logging": + switch value { + case "Debug": + r.config.LogLevel = logger.Debug + case "Info": + r.config.LogLevel = logger.Info + case "Warning": + r.config.LogLevel = logger.Warning + case "Error": + r.config.LogLevel = logger.Error + case "Fatal": + r.config.LogLevel = logger.Fatal + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value) + } } } r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config)) From 39745c98403071efd6006488956474b536638998 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 5 May 2019 19:51:20 +0930 Subject: [PATCH 23/28] av: updated go mod --- go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.sum b/go.sum index 7ffe0a32..cd09945f 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4= bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= -bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e h1:rn7Z1vE6m1NSH+mrPJPgquEfBDsqzBEH4Y6fxzyB6kA= -bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.6 h1:JN66APCV+hu6GebIHSu2KSywhLym4vigjSz5+fB0zXc= bitbucket.org/ausocean/utils v1.2.6/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= From 8f74cd4ced0b1d1432cedc6bdc21d6eed24cc2f9 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 5 May 2019 22:50:59 +0930 Subject: [PATCH 24/28] revid: does not panic when not outputs are defined in a config. --- revid/config.go | 59 +++++++++++++++++++++++---------------------- revid/revid_test.go | 1 - 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/revid/config.go b/revid/config.go index d9bd4877..9c317e9b 100644 --- a/revid/config.go +++ b/revid/config.go @@ -211,36 +211,37 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad input codec defined in config") } - for i, o := range c.Outputs { - switch o { - case File: - case Udp: - case Rtmp, FfmpegRtmp: - if c.RtmpUrl == "" { - c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Outputs[i] = Http - // FIXME(kortschak): Does this want the same line as below? - // c.FramesPerClip = httpFramesPerClip - break + if c.Outputs == nil { + c.Logger.Log(logger.Info, pkg+"no output defined, defaulting", "output", + defaultOutput) + c.Outputs = append(c.Outputs, defaultOutput) + c.Packetization = defaultPacketization + } else { + for i, o := range c.Outputs { + switch o { + case File: + case Udp: + case Rtmp, FfmpegRtmp: + if c.RtmpUrl == "" { + c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") + c.Outputs[i] = Http + // FIXME(kortschak): Does this want the same line as below? + // c.FramesPerClip = httpFramesPerClip + break + } + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", + "framesPerClip", defaultFramesPerClip) + c.FramesPerClip = defaultFramesPerClip + c.Packetization = Flv + c.SendRetry = true + case Http, Rtp: + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", + "framesPerClip", httpFramesPerClip) + c.FramesPerClip = httpFramesPerClip + c.Packetization = Mpegts + default: + return errors.New("bad output type defined in config") } - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", - "framesPerClip", defaultFramesPerClip) - c.FramesPerClip = defaultFramesPerClip - c.Packetization = Flv - c.SendRetry = true - case NothingDefined: - c.Logger.Log(logger.Info, pkg+"no output defined, defaulting", "output", - defaultOutput) - c.Outputs[i] = defaultOutput - c.Packetization = defaultPacketization - fallthrough - case Http, Rtp: - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", - "framesPerClip", httpFramesPerClip) - c.FramesPerClip = httpFramesPerClip - c.Packetization = Mpegts - default: - return errors.New("bad output type defined in config") } } diff --git a/revid/revid_test.go b/revid/revid_test.go index 08c8521a..18086912 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -59,7 +59,6 @@ func TestRaspivid(t *testing.T) { var c Config c.Logger = &logger c.Input = Raspivid - c.Outputs = make([]uint8, 1) rv, err := New(c, ns) if err != nil { From 77ff88392fc0cce863db1b3fcd5c16eeb4916c10 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 6 May 2019 15:12:05 +0930 Subject: [PATCH 25/28] revid: setupInput function for revid now returns closure that is used to do any clean up --- container/mts/encoder.go | 4 ---- revid/revid.go | 44 +++++++++++++++++++++++++--------------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 552000e0..e9efbd97 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -26,7 +26,6 @@ LICENSE package mts import ( - "fmt" "io" "time" @@ -205,9 +204,6 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { // Write implements io.Writer. Write takes raw h264 and encodes into mpegts, // then sending it to the encoder's io.Writer destination. func (e *Encoder) Write(data []byte) (int, error) { - if len(data) > pes.MaxPesSize { - return 0, fmt.Errorf("data size too large (Max is %v): %v", pes.MaxPesSize, len(data)) - } now := time.Now() if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { e.pktCount = 0 diff --git a/revid/revid.go b/revid/revid.go index 3a85df69..e97746d1 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -80,8 +80,12 @@ type Revid struct { ns *netsender.Sender // setupInput holds the current approach to setting up - // the input stream. - setupInput func() error + // the input stream. I will return a function used for cleaning up. + setupInput func() (func() error, error) + + // closeInput holds the cleanup function return from setupInput and is called + // in Revid.Stop(). + closeInput func() error // cmd is the exec'd process that may be used to produce // the input stream. @@ -286,11 +290,13 @@ func (r *Revid) Start() error { } r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.isRunning = true - err := r.setupInput() + var err error + r.closeInput, err = r.setupInput() if err != nil { r.Stop() + return fmt.Errorf("could not setup input, failed with err: %v", err) } - return err + return nil } // Stop closes down the pipeline. This closes encoders and sender output routines, @@ -303,6 +309,13 @@ func (r *Revid) Stop() { return } + if r.closeInput != nil { + err := r.closeInput() + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"could not close input", "error", err.Error()) + } + } + r.config.Logger.Log(logger.Info, pkg+"closing pipeline") err := r.encoders.Close() if err != nil { @@ -473,7 +486,7 @@ func (r *Revid) Update(vars map[string]string) error { // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. -func (r *Revid) startRaspivid() error { +func (r *Revid) startRaspivid() (func() error, error) { r.config.Logger.Log(logger.Info, pkg+"starting raspivid") const disabled = "0" @@ -505,7 +518,7 @@ func (r *Revid) startRaspivid() error { switch r.config.InputCodec { default: - return fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) + return nil, fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) case H264: args = append(args, "--codec", "H264", @@ -523,7 +536,7 @@ func (r *Revid) startRaspivid() error { stdout, err := r.cmd.StdoutPipe() if err != nil { - return err + return nil, err } err = r.cmd.Start() if err != nil { @@ -532,10 +545,10 @@ func (r *Revid) startRaspivid() error { r.wg.Add(1) go r.processFrom(stdout, 0) - return nil + return nil, nil } -func (r *Revid) startV4L() error { +func (r *Revid) startV4L() (func() error, error) { const defaultVideo = "/dev/video0" r.config.Logger.Log(logger.Info, pkg+"starting webcam") @@ -563,34 +576,33 @@ func (r *Revid) startV4L() error { stdout, err := r.cmd.StdoutPipe() if err != nil { - return err + return nil, nil } err = r.cmd.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error()) - return err + return nil, nil } r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) - return nil + return nil, nil } // setupInputForFile sets things up for getting input from a file -func (r *Revid) setupInputForFile() error { +func (r *Revid) setupInputForFile() (func() error, error) { f, err := os.Open(r.config.InputPath) if err != nil { r.config.Logger.Log(logger.Error, err.Error()) r.Stop() - return err + return nil, err } - defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. r.wg.Add(1) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) - return nil + return func() error { return f.Close() }, nil } func (r *Revid) processFrom(read io.Reader, delay time.Duration) { From 5c40c48e9783017571fa9fa63202af875d7c620c Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 6 May 2019 15:54:33 +0930 Subject: [PATCH 26/28] revid/revid.go: in Revid.Start() if errors occurs on call to Revid.setupInput() fall through error check and return err --- revid/revid.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index e97746d1..ef421b05 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -294,9 +294,8 @@ func (r *Revid) Start() error { r.closeInput, err = r.setupInput() if err != nil { r.Stop() - return fmt.Errorf("could not setup input, failed with err: %v", err) } - return nil + return err } // Stop closes down the pipeline. This closes encoders and sender output routines, From b3775265e63cce42de86a8e689256c3c979ce61c Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 6 May 2019 15:59:41 +0930 Subject: [PATCH 27/28] revid/config.go: not breaking logging lines --- revid/config.go | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/revid/config.go b/revid/config.go index 9c317e9b..010af3fd 100644 --- a/revid/config.go +++ b/revid/config.go @@ -168,15 +168,13 @@ func (c *Config) Validate(r *Revid) error { case logger.Fatal: default: c.LogLevel = defaultVerbosity - c.Logger.Log(logger.Info, pkg+"bad LogLevel mode defined, defaulting", - "LogLevel", defaultVerbosity) + c.Logger.Log(logger.Info, pkg+"bad LogLevel mode defined, defaulting", "LogLevel", defaultVerbosity) } switch c.Input { case Raspivid, V4L, File: case NothingDefined: - c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", - defaultInput) + c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Input = defaultInput default: return errors.New("bad input type defined in config") @@ -200,20 +198,16 @@ func (c *Config) Validate(r *Revid) error { } case NothingDefined: - c.Logger.Log(logger.Info, pkg+"no input codec defined, defaulting", - "inputCodec", defaultInputCodec) + c.Logger.Log(logger.Info, pkg+"no input codec defined, defaulting", "inputCodec", defaultInputCodec) c.InputCodec = defaultInputCodec - c.Logger.Log(logger.Info, pkg+"defaulting quantization", "quantization", - defaultQuantization) + c.Logger.Log(logger.Info, pkg+"defaulting quantization", "quantization", defaultQuantization) c.Quantization = defaultQuantization - default: return errors.New("bad input codec defined in config") } if c.Outputs == nil { - c.Logger.Log(logger.Info, pkg+"no output defined, defaulting", "output", - defaultOutput) + c.Logger.Log(logger.Info, pkg+"no output defined, defaulting", "output", defaultOutput) c.Outputs = append(c.Outputs, defaultOutput) c.Packetization = defaultPacketization } else { @@ -229,14 +223,12 @@ func (c *Config) Validate(r *Revid) error { // c.FramesPerClip = httpFramesPerClip break } - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", - "framesPerClip", defaultFramesPerClip) + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", "framesPerClip", defaultFramesPerClip) c.FramesPerClip = defaultFramesPerClip c.Packetization = Flv c.SendRetry = true case Http, Rtp: - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", - "framesPerClip", httpFramesPerClip) + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", "framesPerClip", httpFramesPerClip) c.FramesPerClip = httpFramesPerClip c.Packetization = Mpegts default: @@ -251,8 +243,7 @@ func (c *Config) Validate(r *Revid) error { } if c.FramesPerClip < 1 { - c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting", - "framesPerClip", defaultFramesPerClip) + c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip) c.FramesPerClip = defaultFramesPerClip } From 5e568f277bca0b2ce628c12c96840b53efae20f6 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 6 May 2019 16:04:56 +0930 Subject: [PATCH 28/28] revid/revid.go: fixed comment for Revid.setupInput field --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index ef421b05..43c4f982 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -80,7 +80,7 @@ type Revid struct { ns *netsender.Sender // setupInput holds the current approach to setting up - // the input stream. I will return a function used for cleaning up. + // the input stream. It returns a function used for cleaning up, and any errors. setupInput func() (func() error, error) // closeInput holds the cleanup function return from setupInput and is called