diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index c7e3fffd..39872022 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -282,7 +282,7 @@ func run(cfg revid.Config) { for { err = ns.Run() if err != nil { - log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) + log.Log(logger.Warning, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue } diff --git a/container/flv/encoder.go b/container/flv/encoder.go index 0fe794d2..306d4b66 100644 --- a/container/flv/encoder.go +++ b/container/flv/encoder.go @@ -55,8 +55,7 @@ var ( // Encoder provides properties required for the generation of flv video // from raw video data type Encoder struct { - dst io.Writer - + dst io.WriteCloser fps int audio bool video bool @@ -64,7 +63,7 @@ type Encoder struct { } // NewEncoder retuns a new FLV encoder. -func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) { +func NewEncoder(dst io.WriteCloser, audio, video bool, fps int) (*Encoder, error) { e := Encoder{ dst: dst, fps: fps, @@ -261,3 +260,8 @@ func (e *Encoder) Write(frame []byte) (int, error) { return len(frame), nil } + +// Close will close the encoder destination. +func (e *Encoder) Close() error { + return e.dst.Close() +} diff --git a/container/mts/audio_test.go b/container/mts/audio_test.go index 23ba16e6..f785930d 100644 --- a/container/mts/audio_test.go +++ b/container/mts/audio_test.go @@ -26,6 +26,7 @@ package mts import ( "bytes" + "io" "io/ioutil" "testing" @@ -35,6 +36,10 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" ) +type nopCloser struct{ io.Writer } + +func (nopCloser) Close() error { return nil } + // TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data. // It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm. func TestEncodePcm(t *testing.T) { @@ -45,7 +50,7 @@ func TestEncodePcm(t *testing.T) { sampleSize := 2 blockSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) - e := NewEncoder(&buf, writeFreq, Audio) + e := NewEncoder(nopCloser{&buf}, writeFreq, Audio) inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPcm, err := ioutil.ReadFile(inPath) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index e72cfebf..552000e0 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -127,7 +127,7 @@ const ( // Encoder encapsulates properties of an mpegts generator. type Encoder struct { - dst io.Writer + dst io.WriteCloser clock time.Duration lastTime time.Time @@ -149,7 +149,7 @@ type Encoder struct { // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // calls write for every frame, the rate will be the frame rate of the video. -func NewEncoder(dst io.Writer, rate float64, mediaType int) *Encoder { +func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { var mPid int var sid byte switch mediaType { @@ -327,3 +327,7 @@ func updateMeta(b []byte) ([]byte, error) { err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) return []byte(p), err } + +func (e *Encoder) Close() error { + return e.dst.Close() +} diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index 2bf94d64..939de5b7 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -47,9 +47,8 @@ const fps = 25 // write this to psi. func TestMetaEncode1(t *testing.T) { Meta = meta.New() - var b []byte - buf := bytes.NewBuffer(b) - e := NewEncoder(buf, fps, Video) + var buf bytes.Buffer + e := NewEncoder(nopCloser{&buf}, fps, Video) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) @@ -76,9 +75,8 @@ func TestMetaEncode1(t *testing.T) { // into psi. func TestMetaEncode2(t *testing.T) { Meta = meta.New() - var b []byte - buf := bytes.NewBuffer(b) - e := NewEncoder(buf, fps, Video) + var buf bytes.Buffer + e := NewEncoder(nopCloser{&buf}, fps, Video) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { diff --git a/protocol/rtmp/rtmp_test.go b/protocol/rtmp/rtmp_test.go index f2d661c5..1cf056cb 100644 --- a/protocol/rtmp/rtmp_test.go +++ b/protocol/rtmp/rtmp_test.go @@ -222,6 +222,8 @@ func (rs *rtmpSender) Write(p []byte) (int, error) { return n, nil } +func (rs *rtmpSender) Close() error { return nil } + // TestFromFile tests streaming from an video file comprising raw H.264. // The test file is supplied via the RTMP_TEST_FILE environment variable. func TestFromFile(t *testing.T) { diff --git a/revid/revid.go b/revid/revid.go index d8068115..72eb52de 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -8,6 +8,7 @@ DESCRIPTION AUTHORS Saxon A. Nelson-Milton Alan Noble + Dan Kortschak LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) @@ -20,13 +21,12 @@ LICENSE It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. + for more details. You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + in gpl.txt. If not, see http://www.gnu.org/licenses. */ -// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package revid import ( @@ -46,15 +46,12 @@ import ( "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) -// Ring buffer sizes and read/write timeouts. +// mtsSender ringBuffer sizes. const ( - ringBufferSize = 1000 - ringBufferElementSize = 100000 - writeTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond + rbSize = 1000 + rbElementSize = 100000 ) // RTMP connection properties. @@ -63,17 +60,6 @@ const ( rtmpConnectionTimeout = 10 ) -// Duration of video for each clip sent out. -const clipDuration = 1 * time.Second - -// Time duration between bitrate checks. -const bitrateTime = 1 * time.Minute - -// After a send fail, this is the delay before another send. -const sendFailedDelay = 5 * time.Millisecond - -const ffmpegPath = "/usr/local/bin/ffmpeg" - const pkg = "revid:" type Logger interface { @@ -89,6 +75,7 @@ type Revid struct { // FIXME(kortschak): The relationship of concerns // in config/ns is weird. config Config + // ns holds the netsender.Sender responsible for HTTP. ns *netsender.Sender @@ -105,40 +92,19 @@ type Revid struct { // lexTo, encoder and packer handle transcoding the input stream. lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error - // buffer handles passing frames from the transcoder - // to the target destination. - buffer *buffer + // encoders will hold the multiWriteCloser that writes to encoders from the lexer. + encoders io.WriteCloser - // encoder holds the required encoders, which then write to destinations. - encoder []io.Writer - - // writeClosers holds the senders that the encoders will write to. - writeClosers []io.WriteCloser - - // bitrate hold the last send bitrate calculation result. - bitrate int - - mu sync.Mutex + // isRunning is used to keep track of revid's running state between methods. isRunning bool + // wg will be used to wait for any processing routines to finish. wg sync.WaitGroup + // err will channel errors from revid routines to the handle errors routine. err chan error } -// buffer is a wrapper for a ring.Buffer and provides function to write and -// flush in one Write call. -type buffer ring.Buffer - -// Write implements the io.Writer interface. It will write to the underlying -// ring.Buffer and then flush to indicate a complete ring.Buffer write. -func (b *buffer) Write(d []byte) (int, error) { - r := (*ring.Buffer)(b) - n, err := r.Write(d) - r.Flush() - return n, err -} - // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { @@ -151,6 +117,13 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } +// Config returns a copy of revids current config. +// +// Config is not safe for concurrent use. +func (r *Revid) Config() Config { + return r.config +} + // TODO(Saxon): put more thought into error severity. func (r *Revid) handleErrors() { for { @@ -167,10 +140,41 @@ func (r *Revid) handleErrors() { } // Bitrate returns the result of the most recent bitrate check. +// +// TODO: get this working again. func (r *Revid) Bitrate() int { - return r.bitrate + return -1 } +// reset swaps the current config of a Revid with the passed +// configuration; checking validity and returning errors if not valid. It then +// sets up the data pipeline accordingly to this configuration. +func (r *Revid) reset(config Config) error { + err := r.setConfig(config) + if err != nil { + return err + } + + err = r.setupPipeline( + func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { + e := mts.NewEncoder(dst, float64(fps), mts.Video) + return e, nil + }, + func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { + return flv.NewEncoder(dst, true, true, fps) + }, + ioext.MultiWriteCloser, + ) + + if err != nil { + return err + } + + return nil +} + +// setConfig takes a config, checks it's validity and then replaces the current +// revid config. func (r *Revid) setConfig(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) @@ -187,10 +191,10 @@ func (r *Revid) setConfig(config Config) error { // mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder // respectively. multiWriter will be used to create an ioext.multiWriteCloser // so that encoders can write to multiple senders. -func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { - r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) - - r.encoder = r.encoder[:0] +func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { + // encoders will hold the encoders that are required for revid's current + // configuration. + var encoders []io.WriteCloser // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. @@ -203,7 +207,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, 0) mtsSenders = append(mtsSenders, w) case Rtp: w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) @@ -232,7 +236,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W if len(mtsSenders) != 0 { mw := multiWriter(mtsSenders...) e, _ := mtsEnc(mw, int(r.config.FrameRate)) - r.encoder = append(r.encoder, e) + encoders = append(encoders, e) } // If we have some senders that require FLV encoding then add an FLV @@ -244,9 +248,11 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W if err != nil { return err } - r.encoder = append(r.encoder, e) + encoders = append(encoders, e) } + r.encoders = multiWriter(encoders...) + switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid @@ -267,72 +273,17 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W return nil } -func newMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { - e := mts.NewEncoder(dst, float64(fps), mts.Video) - return e, nil -} - -func newFlvEncoder(dst io.Writer, fps int) (io.Writer, error) { - e, err := flv.NewEncoder(dst, true, true, fps) - if err != nil { - return nil, err - } - return e, nil -} - -// reset swaps the current config of a Revid with the passed -// configuration; checking validity and returning errors if not valid. -func (r *Revid) reset(config Config) error { - err := r.setConfig(config) - if err != nil { - return err - } - - err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser) - if err != nil { - return err - } - - return nil -} - -// IsRunning returns true if revid is running. -func (r *Revid) IsRunning() bool { - r.mu.Lock() - ret := r.isRunning - r.mu.Unlock() - return ret -} - -func (r *Revid) Config() Config { - r.mu.Lock() - cfg := r.config - r.mu.Unlock() - return cfg -} - -// setIsRunning sets r.isRunning using b. -func (r *Revid) setIsRunning(b bool) { - r.mu.Lock() - r.isRunning = b - r.mu.Unlock() -} - // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. +// +// Start is not safe for concurrent use. func (r *Revid) Start() error { - if r.IsRunning() { + if r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running") return nil } r.config.Logger.Log(logger.Info, pkg+"starting Revid") - // TODO: this doesn't need to be here - r.config.Logger.Log(logger.Debug, pkg+"setting up output") - r.setIsRunning(true) - r.config.Logger.Log(logger.Info, pkg+"starting output routine") - r.wg.Add(1) - go r.outputClips() - r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") + r.isRunning = true err := r.setupInput() if err != nil { r.Stop() @@ -340,33 +291,39 @@ func (r *Revid) Start() error { return err } -// Stop halts any processing of video data from a camera or file +// Stop closes down the pipeline. This closes encoders and sender output routines, +// connections, and/or files. +// +// Stop is not safe for concurrent use. func (r *Revid) Stop() { - if !r.IsRunning() { + if !r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") return } - for _, w := range r.writeClosers { - err := w.Close() - if err != nil { - r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) - } + r.config.Logger.Log(logger.Info, pkg+"closing pipeline") + err := r.encoders.Close() + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error()) } - r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.config.Logger.Log(logger.Info, pkg+"killing input proccess") - // If a cmd process is running, we kill! + if r.cmd != nil && r.cmd.Process != nil { + r.config.Logger.Log(logger.Info, pkg+"killing input proccess") r.cmd.Process.Kill() } - r.setIsRunning(false) r.wg.Wait() + r.isRunning = false } +// Update takes a map of variables and their values and edits the current config +// if the variables are recognised as valid parameters. +// +// Update is not safe for concurrent use. func (r *Revid) Update(vars map[string]string) error { - if r.IsRunning() { + if r.isRunning { r.Stop() } + //look through the vars and update revid where needed for key, value := range vars { switch key { @@ -497,55 +454,6 @@ func (r *Revid) Update(vars map[string]string) error { return r.reset(r.config) } -// outputClips takes the clips produced in the packClips method and outputs them -// to the desired output defined in the revid config -func (r *Revid) outputClips() { - defer r.wg.Done() - lastTime := time.Now() - var count int -loop: - for r.IsRunning() { - // If the ring buffer has something we can read and send off - chunk, err := (*ring.Buffer)(r.buffer).Next(readTimeout) - switch err { - case nil: - // Do nothing. - case ring.ErrTimeout: - r.config.Logger.Log(logger.Debug, pkg+"ring buffer read timeout") - continue - default: - r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error()) - fallthrough - case io.EOF: - break loop - } - - // Loop over encoders and hand bytes over to each one. - for _, e := range r.encoder { - _, err := chunk.WriteTo(e) - if err != nil { - r.err <- err - } - } - - // Release the chunk back to the ring buffer. - chunk.Close() - - // FIXME(saxon): this doesn't work anymore. - now := time.Now() - deltaTime := now.Sub(lastTime) - if deltaTime > bitrateTime { - // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. - r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second)) - r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate) - r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", (*ring.Buffer)(r.buffer).Len()) - lastTime = now - count = 0 - } - } - r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore") -} - // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. func (r *Revid) startRaspivid() error { @@ -670,7 +578,7 @@ func (r *Revid) setupInputForFile() error { func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") - r.err <- r.lexTo(r.buffer, read, delay) + r.err <- r.lexTo(r.encoders, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.wg.Done() } diff --git a/revid/revid_test.go b/revid/revid_test.go index ccf0de3a..08c8521a 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -1,3 +1,31 @@ +/* +NAME + revid_test.go + +DESCRIPTION + See Readme.md + +AUTHORS + Saxon A. Nelson-Milton + Alan Noble + +LICENSE + This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean). + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + package revid import ( @@ -47,11 +75,8 @@ func TestRaspivid(t *testing.T) { // testLogger implements a netsender.Logger. type testLogger struct{} -// SetLevel normally sets the logging level, but it is a no-op in our case. -func (tl *testLogger) SetLevel(level int8) { -} +func (tl *testLogger) SetLevel(level int8) {} -// Log requests the Logger to write a message at the given level. func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"} if level < -1 || level > 5 { @@ -71,44 +96,35 @@ func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { // tstMtsEncoder emulates the mts.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. type tstMtsEncoder struct { - dst io.Writer + // dst is here solely to detect the type stored in the encoder. + // No data is written to dst. + dst io.WriteCloser } -// newTstMtsEncoder returns a pointer to a newTsMtsEncoder. -func newTstMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { - return &tstMtsEncoder{dst: dst}, nil -} - -func (e *tstMtsEncoder) Write(d []byte) (int, error) { return 0, nil } +func (e *tstMtsEncoder) Write(d []byte) (int, error) { return len(d), nil } +func (e *tstMtsEncoder) Close() error { return nil } // tstFlvEncoder emulates the flv.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. type tstFlvEncoder struct { - dst io.Writer -} - -// newTstFlvEncoder returns a pointer to a new tstFlvEncoder. -func newTstFlvEncoder(dst io.Writer, fps int) (io.Writer, error) { - return &tstFlvEncoder{dst: dst}, nil + // dst is here solely to detect the type stored in the encoder. + // No data is written to dst. + dst io.WriteCloser } func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil } +func (e *tstFlvEncoder) Close() error { return nil } // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // can access the destinations. type dummyMultiWriter struct { + // dst is here solely to detect the types stored in the multiWriter. + // No data is written to dst. dst []io.WriteCloser } -func newDummyMultiWriter(dst ...io.WriteCloser) io.WriteCloser { - return &dummyMultiWriter{ - dst: dst, - } -} - func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } - -func (w *dummyMultiWriter) Close() error { return nil } +func (w *dummyMultiWriter) Close() error { return nil } // TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the // revid.encoder slice and the senders the encoders write to. @@ -216,20 +232,30 @@ func TestResetEncoderSenderSetup(t *testing.T) { } // This logic is what we want to check. - err = rv.setupPipeline(newTstMtsEncoder, newTstFlvEncoder, newDummyMultiWriter) + err = rv.setupPipeline( + func(dst io.WriteCloser, rate int) (io.WriteCloser, error) { + return &tstMtsEncoder{dst: dst}, nil + }, + func(dst io.WriteCloser, rate int) (io.WriteCloser, error) { + return &tstFlvEncoder{dst: dst}, nil + }, + func(writers ...io.WriteCloser) io.WriteCloser { + return &dummyMultiWriter{dst: writers} + }, + ) if err != nil { t.Fatalf("unexpected error: %v for test %v", err, testNum) } // First check that we have the correct number of encoders. - got := len(rv.encoder) + got := len(rv.encoders.(*dummyMultiWriter).dst) want := len(test.encoders) if got != want { t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want) } // Now check the correctness of encoders and their destinations. - for _, e := range rv.encoder { + for _, e := range rv.encoders.(*dummyMultiWriter).dst { // Get e's type. encoderType := fmt.Sprintf("%T", e) @@ -245,7 +271,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { } // Now check that this encoder has correct number of destinations (senders). - var ms io.Writer + var ms io.WriteCloser switch encoderType { case mtsEncoderStr: ms = e.(*tstMtsEncoder).dst diff --git a/revid/senders.go b/revid/senders.go index 01aa208e..17fa7dc1 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,7 +29,6 @@ LICENSE package revid import ( - "errors" "fmt" "io" "net" @@ -119,7 +118,7 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) // Extract location from reply g, err := dec.String("ll") if err != nil { - log(logger.Warning, pkg+"No location in reply") + log(logger.Debug, pkg+"No location in reply") } else { log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) mts.Meta.Add("loc", g) @@ -156,24 +155,24 @@ func (s *fileSender) Close() error { return s.file.Close() } type mtsSender struct { dst io.WriteCloser buf []byte - ringBuf *ring.Buffer + ring *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int - quit chan struct{} + done chan struct{} log func(lvl int8, msg string, args ...interface{}) wg sync.WaitGroup } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), ringSize int, ringElementSize int, wTimeout time.Duration) *mtsSender { s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), log: log, - ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), - quit: make(chan struct{}), + ring: ring.NewBuffer(ringSize, ringElementSize, wTimeout), + done: make(chan struct{}), } s.wg.Add(1) go s.output() @@ -185,25 +184,23 @@ func (s *mtsSender) output() { var chunk *ring.Chunk for { select { - case <-s.quit: - s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") + case <-s.done: + s.log(logger.Info, pkg+"mtsSender: got done signal, terminating output routine") defer s.wg.Done() return default: // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ringBuf.Next(readTimeout) + chunk, err = s.ring.Next(0) switch err { - case nil: + case nil, io.EOF: continue case ring.ErrTimeout: s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") continue default: s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) - fallthrough - case io.EOF: continue } } @@ -235,11 +232,11 @@ func (s *mtsSender) Write(d []byte) (int, error) { copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() if s.curPid == mts.PatPid && len(s.buf) > 0 { - _, err := s.ringBuf.Write(s.buf) + _, err := s.ring.Write(s.buf) if err != nil { s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } - s.ringBuf.Flush() + s.ring.Flush() s.buf = s.buf[:0] } return len(d), nil @@ -247,21 +244,21 @@ func (s *mtsSender) Write(d []byte) (int, error) { // Close implements io.Closer. func (s *mtsSender) Close() error { - close(s.quit) + close(s.done) s.wg.Wait() return nil } // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { - conn *rtmp.Conn - + conn *rtmp.Conn url string timeout uint retries int log func(lvl int8, msg string, args ...interface{}) - - data []byte + ring *ring.Buffer + done chan struct{} + wg sync.WaitGroup } func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { @@ -283,24 +280,76 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg timeout: timeout, retries: retries, log: log, + ring: ring.NewBuffer(10, rbElementSize, 0), + done: make(chan struct{}), } + s.wg.Add(1) + go s.output() return s, err } +// output starts an mtsSender's data handling routine. +func (s *rtmpSender) output() { + var chunk *ring.Chunk + for { + select { + case <-s.done: + s.log(logger.Info, pkg+"rtmpSender: got done signal, terminating output routine") + defer s.wg.Done() + return + default: + // If chunk is nil then we're ready to get another from the ring buffer. + if chunk == nil { + var err error + chunk, err = s.ring.Next(0) + switch err { + case nil, io.EOF: + continue + case ring.ErrTimeout: + s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout") + continue + default: + s.log(logger.Error, pkg+"rtmpSender: unexpected error", "error", err.Error()) + continue + } + } + if s.conn == nil { + s.log(logger.Warning, pkg+"rtmpSender: no rtmp connection, going to restart...") + err := s.restart() + if err != nil { + s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) + continue + } + } + _, err := s.conn.Write(chunk.Bytes()) + switch err { + case nil, rtmp.ErrInvalidFlvTag: + default: + s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error()) + err = s.restart() + if err != nil { + s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) + } + continue + } + chunk.Close() + chunk = nil + } + } +} + // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { - if s.conn == nil { - return 0, errors.New("no rtmp connection, cannot write") - } - _, err := s.conn.Write(d) + _, err := s.ring.Write(d) if err != nil { - err = s.restart() + s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error()) } - return len(d), err + s.ring.Flush() + return len(d), nil } func (s *rtmpSender) restart() error { - s.Close() + s.close() var err error for n := 0; n < s.retries; n++ { s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) @@ -316,6 +365,14 @@ func (s *rtmpSender) restart() error { } func (s *rtmpSender) Close() error { + if s.done != nil { + close(s.done) + } + s.wg.Wait() + return s.close() +} + +func (s *rtmpSender) close() error { if s.conn == nil { return nil } diff --git a/revid/senders_test.go b/revid/senders_test.go index 0a09662a..80293759 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -41,14 +41,6 @@ import ( "bitbucket.org/ausocean/utils/logger" ) -// Ring buffer sizes and read/write timeouts. -const ( - rbSize = 100 - rbElementSize = 150000 - wTimeout = 10 * time.Millisecond - rTimeout = 10 * time.Millisecond -) - var ( errSendFailed = errors.New("send failed") ) @@ -56,29 +48,50 @@ var ( // destination simulates a destination for the mtsSender. It allows for the // emulation of failed and delayed sends. type destination struct { - buf [][]byte - testFails bool - failAt int - currentPkt int - t *testing.T - sendDelay time.Duration - delayAt int + // Holds the clips written to this destination using Write. + buf [][]byte + + // testFails is set to true if we would like a write to fail at a particular + // clip as determined by failAt. + testFails bool + failAt int + + // Holds the current clip number. + currentClip int + + // Pointer to the testing.T of a test where this struct is being used. This + // is used so that logging can be done through the testing log utilities. + t *testing.T + + // sendDelay is the amount of time we would like a Write to be delayed when + // we hit the clip number indicated by delayAt. + sendDelay time.Duration + delayAt int + + // done will be used to send a signal to the main routine to indicate that + // the destination has received all clips. doneAt indicates the final clip + // number. + done chan struct{} + doneAt int } func (ts *destination) Write(d []byte) (int, error) { ts.t.Log("writing clip to destination") - if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { + if ts.delayAt != 0 && ts.currentClip == ts.delayAt { time.Sleep(ts.sendDelay) } - if ts.testFails && ts.currentPkt == ts.failAt { + if ts.testFails && ts.currentClip == ts.failAt { ts.t.Log("failed send") - ts.currentPkt++ + ts.currentClip++ return 0, errSendFailed } cpy := make([]byte, len(d)) copy(cpy, d) ts.buf = append(ts.buf, cpy) - ts.currentPkt++ + if ts.currentClip == ts.doneAt { + close(ts.done) + } + ts.currentClip++ return len(d), nil } @@ -118,8 +131,9 @@ func TestMtsSenderSegment(t *testing.T) { mts.Meta = meta.New() // Create ringBuffer, sender, sender and the MPEGTS encoder. - tstDst := &destination{t: t} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + const numberOfClips = 11 + dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} + sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -134,12 +148,12 @@ func TestMtsSenderSegment(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give the mtsSender some time to finish up and then Close it. - time.Sleep(10 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-dst.done sender.Close() // Check the data. - result := tstDst.buf + result := dst.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) @@ -196,8 +210,8 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 - tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} + sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder and send PSI every 10 packets. @@ -212,12 +226,12 @@ func TestMtsSenderFailedSend(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give the mtsSender some time to finish up and then Close it. - time.Sleep(10 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-dst.done sender.Close() // Check that we have data as expected. - result := tstDst.buf + result := dst.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) @@ -276,8 +290,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 - tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, ringBufferElementSize, writeTimeout) + dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} + sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -291,12 +305,12 @@ func TestMtsSenderDiscontinuity(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give mtsSender time to finish up then Close. - time.Sleep(100 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-dst.done sender.Close() // Check the data. - result := tstDst.buf + result := dst.buf expectedCC := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo)