diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 552000e0..e9efbd97 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -26,7 +26,6 @@ LICENSE package mts import ( - "fmt" "io" "time" @@ -205,9 +204,6 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { // Write implements io.Writer. Write takes raw h264 and encodes into mpegts, // then sending it to the encoder's io.Writer destination. func (e *Encoder) Write(data []byte) (int, error) { - if len(data) > pes.MaxPesSize { - return 0, fmt.Errorf("data size too large (Max is %v): %v", pes.MaxPesSize, len(data)) - } now := time.Now() if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { e.pktCount = 0 diff --git a/revid/revid.go b/revid/revid.go index 3a85df69..e97746d1 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -80,8 +80,12 @@ type Revid struct { ns *netsender.Sender // setupInput holds the current approach to setting up - // the input stream. - setupInput func() error + // the input stream. I will return a function used for cleaning up. + setupInput func() (func() error, error) + + // closeInput holds the cleanup function return from setupInput and is called + // in Revid.Stop(). + closeInput func() error // cmd is the exec'd process that may be used to produce // the input stream. @@ -286,11 +290,13 @@ func (r *Revid) Start() error { } r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.isRunning = true - err := r.setupInput() + var err error + r.closeInput, err = r.setupInput() if err != nil { r.Stop() + return fmt.Errorf("could not setup input, failed with err: %v", err) } - return err + return nil } // Stop closes down the pipeline. This closes encoders and sender output routines, @@ -303,6 +309,13 @@ func (r *Revid) Stop() { return } + if r.closeInput != nil { + err := r.closeInput() + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"could not close input", "error", err.Error()) + } + } + r.config.Logger.Log(logger.Info, pkg+"closing pipeline") err := r.encoders.Close() if err != nil { @@ -473,7 +486,7 @@ func (r *Revid) Update(vars map[string]string) error { // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. -func (r *Revid) startRaspivid() error { +func (r *Revid) startRaspivid() (func() error, error) { r.config.Logger.Log(logger.Info, pkg+"starting raspivid") const disabled = "0" @@ -505,7 +518,7 @@ func (r *Revid) startRaspivid() error { switch r.config.InputCodec { default: - return fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) + return nil, fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) case H264: args = append(args, "--codec", "H264", @@ -523,7 +536,7 @@ func (r *Revid) startRaspivid() error { stdout, err := r.cmd.StdoutPipe() if err != nil { - return err + return nil, err } err = r.cmd.Start() if err != nil { @@ -532,10 +545,10 @@ func (r *Revid) startRaspivid() error { r.wg.Add(1) go r.processFrom(stdout, 0) - return nil + return nil, nil } -func (r *Revid) startV4L() error { +func (r *Revid) startV4L() (func() error, error) { const defaultVideo = "/dev/video0" r.config.Logger.Log(logger.Info, pkg+"starting webcam") @@ -563,34 +576,33 @@ func (r *Revid) startV4L() error { stdout, err := r.cmd.StdoutPipe() if err != nil { - return err + return nil, nil } err = r.cmd.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error()) - return err + return nil, nil } r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) - return nil + return nil, nil } // setupInputForFile sets things up for getting input from a file -func (r *Revid) setupInputForFile() error { +func (r *Revid) setupInputForFile() (func() error, error) { f, err := os.Open(r.config.InputPath) if err != nil { r.config.Logger.Log(logger.Error, err.Error()) r.Stop() - return err + return nil, err } - defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. r.wg.Add(1) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) - return nil + return func() error { return f.Close() }, nil } func (r *Revid) processFrom(read io.Reader, delay time.Duration) {