Merged in start-funcs-return-cleanup-closure (pull request #194)

revid: setupInput function for revid now returns closure that is used to do any clean up
This commit is contained in:
Saxon Milton 2019-05-06 05:47:56 +00:00
commit 4c080e9288
2 changed files with 28 additions and 20 deletions

View File

@ -26,7 +26,6 @@ LICENSE
package mts package mts
import ( import (
"fmt"
"io" "io"
"time" "time"
@ -205,9 +204,6 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
// Write implements io.Writer. Write takes raw h264 and encodes into mpegts, // Write implements io.Writer. Write takes raw h264 and encodes into mpegts,
// then sending it to the encoder's io.Writer destination. // then sending it to the encoder's io.Writer destination.
func (e *Encoder) Write(data []byte) (int, error) { func (e *Encoder) Write(data []byte) (int, error) {
if len(data) > pes.MaxPesSize {
return 0, fmt.Errorf("data size too large (Max is %v): %v", pes.MaxPesSize, len(data))
}
now := time.Now() now := time.Now()
if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) {
e.pktCount = 0 e.pktCount = 0

View File

@ -80,8 +80,12 @@ type Revid struct {
ns *netsender.Sender ns *netsender.Sender
// setupInput holds the current approach to setting up // setupInput holds the current approach to setting up
// the input stream. // the input stream. I will return a function used for cleaning up.
setupInput func() error setupInput func() (func() error, error)
// closeInput holds the cleanup function return from setupInput and is called
// in Revid.Stop().
closeInput func() error
// cmd is the exec'd process that may be used to produce // cmd is the exec'd process that may be used to produce
// the input stream. // the input stream.
@ -286,11 +290,13 @@ func (r *Revid) Start() error {
} }
r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Info, pkg+"starting Revid")
r.isRunning = true r.isRunning = true
err := r.setupInput() var err error
r.closeInput, err = r.setupInput()
if err != nil { if err != nil {
r.Stop() r.Stop()
return fmt.Errorf("could not setup input, failed with err: %v", err)
} }
return err return nil
} }
// Stop closes down the pipeline. This closes encoders and sender output routines, // Stop closes down the pipeline. This closes encoders and sender output routines,
@ -303,6 +309,13 @@ func (r *Revid) Stop() {
return return
} }
if r.closeInput != nil {
err := r.closeInput()
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"could not close input", "error", err.Error())
}
}
r.config.Logger.Log(logger.Info, pkg+"closing pipeline") r.config.Logger.Log(logger.Info, pkg+"closing pipeline")
err := r.encoders.Close() err := r.encoders.Close()
if err != nil { if err != nil {
@ -473,7 +486,7 @@ func (r *Revid) Update(vars map[string]string) error {
// startRaspivid sets up things for input from raspivid i.e. starts // startRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output. // a raspivid process and pipes it's data output.
func (r *Revid) startRaspivid() error { func (r *Revid) startRaspivid() (func() error, error) {
r.config.Logger.Log(logger.Info, pkg+"starting raspivid") r.config.Logger.Log(logger.Info, pkg+"starting raspivid")
const disabled = "0" const disabled = "0"
@ -505,7 +518,7 @@ func (r *Revid) startRaspivid() error {
switch r.config.InputCodec { switch r.config.InputCodec {
default: default:
return fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) return nil, fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec)
case H264: case H264:
args = append(args, args = append(args,
"--codec", "H264", "--codec", "H264",
@ -523,7 +536,7 @@ func (r *Revid) startRaspivid() error {
stdout, err := r.cmd.StdoutPipe() stdout, err := r.cmd.StdoutPipe()
if err != nil { if err != nil {
return err return nil, err
} }
err = r.cmd.Start() err = r.cmd.Start()
if err != nil { if err != nil {
@ -532,10 +545,10 @@ func (r *Revid) startRaspivid() error {
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(stdout, 0) go r.processFrom(stdout, 0)
return nil return nil, nil
} }
func (r *Revid) startV4L() error { func (r *Revid) startV4L() (func() error, error) {
const defaultVideo = "/dev/video0" const defaultVideo = "/dev/video0"
r.config.Logger.Log(logger.Info, pkg+"starting webcam") r.config.Logger.Log(logger.Info, pkg+"starting webcam")
@ -563,34 +576,33 @@ func (r *Revid) startV4L() error {
stdout, err := r.cmd.StdoutPipe() stdout, err := r.cmd.StdoutPipe()
if err != nil { if err != nil {
return err return nil, nil
} }
err = r.cmd.Start() err = r.cmd.Start()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error()) r.config.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error())
return err return nil, nil
} }
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(stdout, time.Duration(0)) go r.processFrom(stdout, time.Duration(0))
return nil return nil, nil
} }
// setupInputForFile sets things up for getting input from a file // setupInputForFile sets things up for getting input from a file
func (r *Revid) setupInputForFile() error { func (r *Revid) setupInputForFile() (func() error, error) {
f, err := os.Open(r.config.InputPath) f, err := os.Open(r.config.InputPath)
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, err.Error()) r.config.Logger.Log(logger.Error, err.Error())
r.Stop() r.Stop()
return err return nil, err
} }
defer f.Close()
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
return nil return func() error { return f.Close() }, nil
} }
func (r *Revid) processFrom(read io.Reader, delay time.Duration) { func (r *Revid) processFrom(read io.Reader, delay time.Duration) {