diff --git a/container/flv/encoder.go b/container/flv/encoder.go index 0fe794d2..306d4b66 100644 --- a/container/flv/encoder.go +++ b/container/flv/encoder.go @@ -55,8 +55,7 @@ var ( // Encoder provides properties required for the generation of flv video // from raw video data type Encoder struct { - dst io.Writer - + dst io.WriteCloser fps int audio bool video bool @@ -64,7 +63,7 @@ type Encoder struct { } // NewEncoder retuns a new FLV encoder. -func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) { +func NewEncoder(dst io.WriteCloser, audio, video bool, fps int) (*Encoder, error) { e := Encoder{ dst: dst, fps: fps, @@ -261,3 +260,8 @@ func (e *Encoder) Write(frame []byte) (int, error) { return len(frame), nil } + +// Close will close the encoder destination. +func (e *Encoder) Close() error { + return e.dst.Close() +} diff --git a/container/mts/audio_test.go b/container/mts/audio_test.go index 23ba16e6..9c1b8f03 100644 --- a/container/mts/audio_test.go +++ b/container/mts/audio_test.go @@ -35,12 +35,20 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" ) +type buffer bytes.Buffer + +func (b *buffer) Write(d []byte) (int, error) { + return b.Write(d) +} + +func (b *buffer) Close() error { return nil } + // TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data. // It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm. func TestEncodePcm(t *testing.T) { Meta = meta.New() - var buf bytes.Buffer + var buf buffer sampleRate := 48000 sampleSize := 2 blockSize := 16000 @@ -69,7 +77,7 @@ func TestEncodePcm(t *testing.T) { } } } - clip := buf.Bytes() + clip := (*bytes.Buffer)(&buf).Bytes() // Get the first MTS packet to check var pkt packet.Packet diff --git a/container/mts/encoder.go b/container/mts/encoder.go index e72cfebf..552000e0 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -127,7 +127,7 @@ const ( // Encoder encapsulates properties of an mpegts generator. type Encoder struct { - dst io.Writer + dst io.WriteCloser clock time.Duration lastTime time.Time @@ -149,7 +149,7 @@ type Encoder struct { // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // calls write for every frame, the rate will be the frame rate of the video. -func NewEncoder(dst io.Writer, rate float64, mediaType int) *Encoder { +func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { var mPid int var sid byte switch mediaType { @@ -327,3 +327,7 @@ func updateMeta(b []byte) ([]byte, error) { err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) return []byte(p), err } + +func (e *Encoder) Close() error { + return e.dst.Close() +} diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index 2bf94d64..e29b0639 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -47,14 +47,13 @@ const fps = 25 // write this to psi. func TestMetaEncode1(t *testing.T) { Meta = meta.New() - var b []byte - buf := bytes.NewBuffer(b) - e := NewEncoder(buf, fps, Video) + var buf buffer + e := NewEncoder(&buf, fps, Video) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) } - out := buf.Bytes() + out := (*bytes.Buffer)(&buf).Bytes() got := out[PacketSize+4:] want := []byte{ @@ -76,15 +75,14 @@ func TestMetaEncode1(t *testing.T) { // into psi. func TestMetaEncode2(t *testing.T) { Meta = meta.New() - var b []byte - buf := bytes.NewBuffer(b) - e := NewEncoder(buf, fps, Video) + var buf buffer + e := NewEncoder(&buf, fps, Video) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) } - out := buf.Bytes() + out := (*bytes.Buffer)(&buf).Bytes() got := out[PacketSize+4:] want := []byte{ 0x00, 0x02, 0xb0, 0x36, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x24, diff --git a/revid/revid.go b/revid/revid.go index d8068115..61af584e 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -8,6 +8,7 @@ DESCRIPTION AUTHORS Saxon A. Nelson-Milton Alan Noble + Dan Kortschak LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) @@ -26,7 +27,6 @@ LICENSE along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ -// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package revid import ( @@ -46,15 +46,14 @@ import ( "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) -// Ring buffer sizes and read/write timeouts. +// mtsSender ringBuffer sizes. const ( - ringBufferSize = 1000 - ringBufferElementSize = 100000 - writeTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond + rbSize = 1000 + rbElementSize = 100000 + wTimeout = 1 * time.Second + rTimeout = 1 * time.Second ) // RTMP connection properties. @@ -63,17 +62,6 @@ const ( rtmpConnectionTimeout = 10 ) -// Duration of video for each clip sent out. -const clipDuration = 1 * time.Second - -// Time duration between bitrate checks. -const bitrateTime = 1 * time.Minute - -// After a send fail, this is the delay before another send. -const sendFailedDelay = 5 * time.Millisecond - -const ffmpegPath = "/usr/local/bin/ffmpeg" - const pkg = "revid:" type Logger interface { @@ -89,6 +77,7 @@ type Revid struct { // FIXME(kortschak): The relationship of concerns // in config/ns is weird. config Config + // ns holds the netsender.Sender responsible for HTTP. ns *netsender.Sender @@ -105,40 +94,19 @@ type Revid struct { // lexTo, encoder and packer handle transcoding the input stream. lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error - // buffer handles passing frames from the transcoder - // to the target destination. - buffer *buffer - - // encoder holds the required encoders, which then write to destinations. - encoder []io.Writer - - // writeClosers holds the senders that the encoders will write to. - writeClosers []io.WriteCloser - - // bitrate hold the last send bitrate calculation result. - bitrate int - - mu sync.Mutex - isRunning bool + // mwc will hold the multiWriteCloser that writes to encoders from the lexer. + mwc io.WriteCloser + // wg will be used to wait for any processing routines to finish. wg sync.WaitGroup + // isRunning is used to keep track of revid's running state between methods. + isRunning bool + + // err will channel errors from revid routines to the handle errors routine. err chan error } -// buffer is a wrapper for a ring.Buffer and provides function to write and -// flush in one Write call. -type buffer ring.Buffer - -// Write implements the io.Writer interface. It will write to the underlying -// ring.Buffer and then flush to indicate a complete ring.Buffer write. -func (b *buffer) Write(d []byte) (int, error) { - r := (*ring.Buffer)(b) - n, err := r.Write(d) - r.Flush() - return n, err -} - // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { @@ -167,10 +135,40 @@ func (r *Revid) handleErrors() { } // Bitrate returns the result of the most recent bitrate check. +// +// TODO: get this working again. func (r *Revid) Bitrate() int { - return r.bitrate + return -1 } +// reset swaps the current config of a Revid with the passed +// configuration; checking validity and returning errors if not valid. +func (r *Revid) reset(config Config) error { + err := r.setConfig(config) + if err != nil { + return err + } + + err = r.setupPipeline( + func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { + e := mts.NewEncoder(dst, float64(fps), mts.Video) + return e, nil + }, + func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { + return flv.NewEncoder(dst, true, true, fps) + }, + ioext.MultiWriteCloser, + ) + + if err != nil { + return err + } + + return nil +} + +// setConfig takes a config, checks it's validity and then replaces the current +// revid config. func (r *Revid) setConfig(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) @@ -187,10 +185,10 @@ func (r *Revid) setConfig(config Config) error { // mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder // respectively. multiWriter will be used to create an ioext.multiWriteCloser // so that encoders can write to multiple senders. -func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { - r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) - - r.encoder = r.encoder[:0] +func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { + // encoders will hold the encoders that are required for revid's current + // configuration. + var encoders []io.WriteCloser // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. @@ -203,7 +201,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, wTimeout) mtsSenders = append(mtsSenders, w) case Rtp: w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) @@ -232,7 +230,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W if len(mtsSenders) != 0 { mw := multiWriter(mtsSenders...) e, _ := mtsEnc(mw, int(r.config.FrameRate)) - r.encoder = append(r.encoder, e) + encoders = append(encoders, e) } // If we have some senders that require FLV encoding then add an FLV @@ -244,9 +242,11 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W if err != nil { return err } - r.encoder = append(r.encoder, e) + encoders = append(encoders, e) } + r.mwc = multiWriter(encoders...) + switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid @@ -267,72 +267,15 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W return nil } -func newMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { - e := mts.NewEncoder(dst, float64(fps), mts.Video) - return e, nil -} - -func newFlvEncoder(dst io.Writer, fps int) (io.Writer, error) { - e, err := flv.NewEncoder(dst, true, true, fps) - if err != nil { - return nil, err - } - return e, nil -} - -// reset swaps the current config of a Revid with the passed -// configuration; checking validity and returning errors if not valid. -func (r *Revid) reset(config Config) error { - err := r.setConfig(config) - if err != nil { - return err - } - - err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser) - if err != nil { - return err - } - - return nil -} - -// IsRunning returns true if revid is running. -func (r *Revid) IsRunning() bool { - r.mu.Lock() - ret := r.isRunning - r.mu.Unlock() - return ret -} - -func (r *Revid) Config() Config { - r.mu.Lock() - cfg := r.config - r.mu.Unlock() - return cfg -} - -// setIsRunning sets r.isRunning using b. -func (r *Revid) setIsRunning(b bool) { - r.mu.Lock() - r.isRunning = b - r.mu.Unlock() -} - // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { - if r.IsRunning() { + if r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running") return nil } r.config.Logger.Log(logger.Info, pkg+"starting Revid") - // TODO: this doesn't need to be here - r.config.Logger.Log(logger.Debug, pkg+"setting up output") - r.setIsRunning(true) - r.config.Logger.Log(logger.Info, pkg+"starting output routine") - r.wg.Add(1) - go r.outputClips() - r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") + r.isRunning = true err := r.setupInput() if err != nil { r.Stop() @@ -340,33 +283,35 @@ func (r *Revid) Start() error { return err } -// Stop halts any processing of video data from a camera or file +// Stop closes down the pipeline. This closes encoders and sender output routines, +// connections, and/or files. func (r *Revid) Stop() { - if !r.IsRunning() { + if !r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") return } - for _, w := range r.writeClosers { - err := w.Close() - if err != nil { - r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) - } + r.config.Logger.Log(logger.Info, pkg+"closing pipeline") + err := r.mwc.Close() + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"got error while closing pipeline", "error", err.Error()) } - r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.config.Logger.Log(logger.Info, pkg+"killing input proccess") - // If a cmd process is running, we kill! + if r.cmd != nil && r.cmd.Process != nil { + r.config.Logger.Log(logger.Info, pkg+"killing input proccess") r.cmd.Process.Kill() } - r.setIsRunning(false) r.wg.Wait() + r.isRunning = false } +// Update takes a map of variables and their values and edits the current config +// if the variables are recognised as valid parameters. func (r *Revid) Update(vars map[string]string) error { - if r.IsRunning() { + if r.isRunning { r.Stop() } + //look through the vars and update revid where needed for key, value := range vars { switch key { @@ -497,55 +442,6 @@ func (r *Revid) Update(vars map[string]string) error { return r.reset(r.config) } -// outputClips takes the clips produced in the packClips method and outputs them -// to the desired output defined in the revid config -func (r *Revid) outputClips() { - defer r.wg.Done() - lastTime := time.Now() - var count int -loop: - for r.IsRunning() { - // If the ring buffer has something we can read and send off - chunk, err := (*ring.Buffer)(r.buffer).Next(readTimeout) - switch err { - case nil: - // Do nothing. - case ring.ErrTimeout: - r.config.Logger.Log(logger.Debug, pkg+"ring buffer read timeout") - continue - default: - r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error()) - fallthrough - case io.EOF: - break loop - } - - // Loop over encoders and hand bytes over to each one. - for _, e := range r.encoder { - _, err := chunk.WriteTo(e) - if err != nil { - r.err <- err - } - } - - // Release the chunk back to the ring buffer. - chunk.Close() - - // FIXME(saxon): this doesn't work anymore. - now := time.Now() - deltaTime := now.Sub(lastTime) - if deltaTime > bitrateTime { - // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. - r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second)) - r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate) - r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", (*ring.Buffer)(r.buffer).Len()) - lastTime = now - count = 0 - } - } - r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore") -} - // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. func (r *Revid) startRaspivid() error { @@ -670,7 +566,7 @@ func (r *Revid) setupInputForFile() error { func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") - r.err <- r.lexTo(r.buffer, read, delay) + r.err <- r.lexTo(r.mwc, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.wg.Done() } diff --git a/revid/revid_test.go b/revid/revid_test.go index ccf0de3a..68520bbe 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -47,11 +47,8 @@ func TestRaspivid(t *testing.T) { // testLogger implements a netsender.Logger. type testLogger struct{} -// SetLevel normally sets the logging level, but it is a no-op in our case. -func (tl *testLogger) SetLevel(level int8) { -} +func (tl *testLogger) SetLevel(level int8) {} -// Log requests the Logger to write a message at the given level. func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"} if level < -1 || level > 5 { @@ -70,45 +67,24 @@ func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { // tstMtsEncoder emulates the mts.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. -type tstMtsEncoder struct { - dst io.Writer -} +type tstMtsEncoder struct{ dst io.WriteCloser } -// newTstMtsEncoder returns a pointer to a newTsMtsEncoder. -func newTstMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { - return &tstMtsEncoder{dst: dst}, nil -} - -func (e *tstMtsEncoder) Write(d []byte) (int, error) { return 0, nil } +func (e *tstMtsEncoder) Write(d []byte) (int, error) { return len(d), nil } +func (e *tstMtsEncoder) Close() error { return nil } // tstFlvEncoder emulates the flv.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. -type tstFlvEncoder struct { - dst io.Writer -} - -// newTstFlvEncoder returns a pointer to a new tstFlvEncoder. -func newTstFlvEncoder(dst io.Writer, fps int) (io.Writer, error) { - return &tstFlvEncoder{dst: dst}, nil -} +type tstFlvEncoder struct{ dst io.WriteCloser } func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil } +func (e *tstFlvEncoder) Close() error { return nil } // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // can access the destinations. -type dummyMultiWriter struct { - dst []io.WriteCloser -} - -func newDummyMultiWriter(dst ...io.WriteCloser) io.WriteCloser { - return &dummyMultiWriter{ - dst: dst, - } -} +type dummyMultiWriter struct{ dst []io.WriteCloser } func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } - -func (w *dummyMultiWriter) Close() error { return nil } +func (w *dummyMultiWriter) Close() error { return nil } // TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the // revid.encoder slice and the senders the encoders write to. @@ -216,20 +192,30 @@ func TestResetEncoderSenderSetup(t *testing.T) { } // This logic is what we want to check. - err = rv.setupPipeline(newTstMtsEncoder, newTstFlvEncoder, newDummyMultiWriter) + err = rv.setupPipeline( + func(dst io.WriteCloser, rate int) (io.WriteCloser, error) { + return &tstMtsEncoder{dst: dst}, nil + }, + func(dst io.WriteCloser, rate int) (io.WriteCloser, error) { + return &tstFlvEncoder{dst: dst}, nil + }, + func(writers ...io.WriteCloser) io.WriteCloser { + return &dummyMultiWriter{dst: writers} + }, + ) if err != nil { t.Fatalf("unexpected error: %v for test %v", err, testNum) } // First check that we have the correct number of encoders. - got := len(rv.encoder) + got := len(rv.mwc.(*dummyMultiWriter).dst) want := len(test.encoders) if got != want { t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want) } // Now check the correctness of encoders and their destinations. - for _, e := range rv.encoder { + for _, e := range rv.mwc.(*dummyMultiWriter).dst { // Get e's type. encoderType := fmt.Sprintf("%T", e) diff --git a/revid/senders.go b/revid/senders.go index 01aa208e..a0777803 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -193,7 +193,7 @@ func (s *mtsSender) output() { // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ringBuf.Next(readTimeout) + chunk, err = s.ringBuf.Next(rTimeout) switch err { case nil: continue diff --git a/revid/senders_test.go b/revid/senders_test.go index 0a09662a..3b32e0e4 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -41,14 +41,6 @@ import ( "bitbucket.org/ausocean/utils/logger" ) -// Ring buffer sizes and read/write timeouts. -const ( - rbSize = 100 - rbElementSize = 150000 - wTimeout = 10 * time.Millisecond - rTimeout = 10 * time.Millisecond -) - var ( errSendFailed = errors.New("send failed") ) @@ -119,7 +111,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. tstDst := &destination{t: t} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -197,7 +189,7 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder and send PSI every 10 packets. @@ -277,7 +269,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, ringBufferElementSize, writeTimeout) + sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder.