revid: setupInput function for revid now returns closure that is used to do any clean up

This commit is contained in:
Saxon 2019-05-06 15:12:05 +09:30
parent 408492ae1a
commit 77ff88392f
2 changed files with 28 additions and 20 deletions

View File

@ -26,7 +26,6 @@ LICENSE
package mts
import (
"fmt"
"io"
"time"
@ -205,9 +204,6 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
// Write implements io.Writer. Write takes raw h264 and encodes into mpegts,
// then sending it to the encoder's io.Writer destination.
func (e *Encoder) Write(data []byte) (int, error) {
if len(data) > pes.MaxPesSize {
return 0, fmt.Errorf("data size too large (Max is %v): %v", pes.MaxPesSize, len(data))
}
now := time.Now()
if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) {
e.pktCount = 0

View File

@ -80,8 +80,12 @@ type Revid struct {
ns *netsender.Sender
// setupInput holds the current approach to setting up
// the input stream.
setupInput func() error
// the input stream. I will return a function used for cleaning up.
setupInput func() (func() error, error)
// closeInput holds the cleanup function return from setupInput and is called
// in Revid.Stop().
closeInput func() error
// cmd is the exec'd process that may be used to produce
// the input stream.
@ -286,11 +290,13 @@ func (r *Revid) Start() error {
}
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
r.isRunning = true
err := r.setupInput()
var err error
r.closeInput, err = r.setupInput()
if err != nil {
r.Stop()
return fmt.Errorf("could not setup input, failed with err: %v", err)
}
return err
return nil
}
// Stop closes down the pipeline. This closes encoders and sender output routines,
@ -303,6 +309,13 @@ func (r *Revid) Stop() {
return
}
if r.closeInput != nil {
err := r.closeInput()
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"could not close input", "error", err.Error())
}
}
r.config.Logger.Log(logger.Info, pkg+"closing pipeline")
err := r.encoders.Close()
if err != nil {
@ -473,7 +486,7 @@ func (r *Revid) Update(vars map[string]string) error {
// startRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output.
func (r *Revid) startRaspivid() error {
func (r *Revid) startRaspivid() (func() error, error) {
r.config.Logger.Log(logger.Info, pkg+"starting raspivid")
const disabled = "0"
@ -505,7 +518,7 @@ func (r *Revid) startRaspivid() error {
switch r.config.InputCodec {
default:
return fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec)
return nil, fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec)
case H264:
args = append(args,
"--codec", "H264",
@ -523,7 +536,7 @@ func (r *Revid) startRaspivid() error {
stdout, err := r.cmd.StdoutPipe()
if err != nil {
return err
return nil, err
}
err = r.cmd.Start()
if err != nil {
@ -532,10 +545,10 @@ func (r *Revid) startRaspivid() error {
r.wg.Add(1)
go r.processFrom(stdout, 0)
return nil
return nil, nil
}
func (r *Revid) startV4L() error {
func (r *Revid) startV4L() (func() error, error) {
const defaultVideo = "/dev/video0"
r.config.Logger.Log(logger.Info, pkg+"starting webcam")
@ -563,34 +576,33 @@ func (r *Revid) startV4L() error {
stdout, err := r.cmd.StdoutPipe()
if err != nil {
return err
return nil, nil
}
err = r.cmd.Start()
if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error())
return err
return nil, nil
}
r.wg.Add(1)
go r.processFrom(stdout, time.Duration(0))
return nil
return nil, nil
}
// setupInputForFile sets things up for getting input from a file
func (r *Revid) setupInputForFile() error {
func (r *Revid) setupInputForFile() (func() error, error) {
f, err := os.Open(r.config.InputPath)
if err != nil {
r.config.Logger.Log(logger.Error, err.Error())
r.Stop()
return err
return nil, err
}
defer f.Close()
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
r.wg.Add(1)
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
return nil
return func() error { return f.Close() }, nil
}
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {