revid: using waitgroups so that revid.Stop() is safer - we can wait until the input and output routines are done before we do anything, like touch the revid config. Also started modifying revid.Update() to remove errors introduced after the copy of updateRevid from revid-cli to revid.go in the previous commit.

This commit is contained in:
saxon 2019-02-03 21:55:40 +10:30
parent 1af4b25030
commit 9095044e23
1 changed files with 33 additions and 26 deletions

View File

@ -1,6 +1,6 @@
/* /*
NAME NAME
revid.go r.go
DESCRIPTION DESCRIPTION
See Readme.md See Readme.md
@ -40,7 +40,6 @@ import (
"sync" "sync"
"time" "time"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream"
"bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/flv"
"bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/lex"
@ -120,10 +119,11 @@ type Revid struct {
// bitrate hold the last send bitrate calculation result. // bitrate hold the last send bitrate calculation result.
bitrate int bitrate int
// isRunning is a loaded and cocked foot-gun.
mu sync.Mutex mu sync.Mutex
isRunning bool isRunning bool
wg sync.WaitGroup
err chan error err chan error
} }
@ -334,7 +334,7 @@ func (r *Revid) Config() Config {
return ret return ret
} }
// setIsRunning sets revid.isRunning using b. // setIsRunning sets r.isRunning using b.
func (r *Revid) setIsRunning(b bool) { func (r *Revid) setIsRunning(b bool) {
r.mu.Lock() r.mu.Lock()
r.isRunning = b r.isRunning = b
@ -348,9 +348,11 @@ func (r *Revid) Start() error {
return errors.New(pkg + "start called but revid is already running") return errors.New(pkg + "start called but revid is already running")
} }
r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Info, pkg+"starting Revid")
// TODO: this doesn't need to be here
r.config.Logger.Log(logger.Debug, pkg+"setting up output") r.config.Logger.Log(logger.Debug, pkg+"setting up output")
r.setIsRunning(true) r.setIsRunning(true)
r.config.Logger.Log(logger.Info, pkg+"starting output routine") r.config.Logger.Log(logger.Info, pkg+"starting output routine")
r.wg.Add(1)
go r.outputClips() go r.outputClips()
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
err := r.setupInput() err := r.setupInput()
@ -371,6 +373,7 @@ func (r *Revid) Stop() error {
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill() r.cmd.Process.Kill()
} }
r.wg.Wait()
return nil return nil
} }
@ -388,89 +391,89 @@ func (r *Revid) Update(vars map[string]string) error {
// Maybe we shouldn't be doing this! // Maybe we shouldn't be doing this!
switch value { switch value {
case "File": case "File":
cfg.Outputs[0] = revid.File r.config.Outputs[0] = File
case "Http": case "Http":
cfg.Outputs[0] = revid.Http r.config.Outputs[0] = Http
case "Rtmp": case "Rtmp":
cfg.Outputs[0] = revid.Rtmp r.config.Outputs[0] = Rtmp
case "FfmpegRtmp": case "FfmpegRtmp":
cfg.Outputs[0] = revid.FfmpegRtmp r.config.Outputs[0] = FfmpegRtmp
default: default:
log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
continue continue
} }
case "FramesPerClip": case "FramesPerClip":
f, err := strconv.ParseUint(value, 10, 0) f, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) .Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
break break
} }
cfg.FramesPerClip = uint(f) r.config.FramesPerClip = uint(f)
case "RtmpUrl": case "RtmpUrl":
cfg.RtmpUrl = value r.config.RtmpUrl = value
case "Bitrate": case "Bitrate":
r, err := strconv.ParseUint(value, 10, 0) r, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break break
} }
cfg.Bitrate = uint(r) r.config.Bitrate = uint(r)
case "OutputFileName": case "OutputFileName":
cfg.OutputFileName = value r.config.OutputFileName = value
case "InputFileName": case "InputFileName":
cfg.InputFileName = value r.config.InputFileName = value
case "Height": case "Height":
h, err := strconv.ParseUint(value, 10, 0) h, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
log.Log(logger.Warning, pkg+"invalid height param", "value", value) log.Log(logger.Warning, pkg+"invalid height param", "value", value)
break break
} }
cfg.Height = uint(h) r.config.Height = uint(h)
case "Width": case "Width":
w, err := strconv.ParseUint(value, 10, 0) w, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
log.Log(logger.Warning, pkg+"invalid width param", "value", value) log.Log(logger.Warning, pkg+"invalid width param", "value", value)
break break
} }
cfg.Width = uint(w) r.config.Width = uint(w)
case "FrameRate": case "FrameRate":
r, err := strconv.ParseUint(value, 10, 0) r, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break break
} }
cfg.FrameRate = uint(r) r.config.FrameRate = uint(r)
case "HttpAddress": case "HttpAddress":
cfg.HttpAddress = value r.config.HttpAddress = value
case "Quantization": case "Quantization":
q, err := strconv.ParseUint(value, 10, 0) q, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) log.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
break break
} }
cfg.Quantization = uint(q) r.config.Quantization = uint(q)
case "IntraRefreshPeriod": case "IntraRefreshPeriod":
p, err := strconv.ParseUint(value, 10, 0) p, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
break break
} }
cfg.IntraRefreshPeriod = uint(p) r.config.IntraRefreshPeriod = uint(p)
case "HorizontalFlip": case "HorizontalFlip":
switch strings.ToLower(value) { switch strings.ToLower(value) {
case "true": case "true":
cfg.FlipHorizontal = true r.config.FlipHorizontal = true
case "false": case "false":
cfg.FlipHorizontal = false r.config.FlipHorizontal = false
default: default:
log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
} }
case "VerticalFlip": case "VerticalFlip":
switch strings.ToLower(value) { switch strings.ToLower(value) {
case "true": case "true":
cfg.FlipVertical = true r.config.FlipVertical = true
case "false": case "false":
cfg.FlipVertical = false r.config.FlipVertical = false
default: default:
log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
} }
@ -478,7 +481,7 @@ func (r *Revid) Update(vars map[string]string) error {
} }
} }
return startRevid(ns, cfg) return r.Start()
} }
// outputClips takes the clips produced in the packClips method and outputs them // outputClips takes the clips produced in the packClips method and outputs them
@ -622,6 +625,7 @@ func (r *Revid) startRaspivid() error {
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
} }
r.wg.Add(1)
go r.processFrom(stdout, 0) go r.processFrom(stdout, 0)
return nil return nil
} }
@ -670,6 +674,7 @@ func (r *Revid) startV4L() error {
return err return err
} }
r.wg.Add(1)
go r.processFrom(stdout, time.Duration(0)) go r.processFrom(stdout, time.Duration(0))
return nil return nil
} }
@ -685,6 +690,7 @@ func (r *Revid) setupInputForFile() error {
defer f.Close() defer f.Close()
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
r.wg.Add(1)
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
return nil return nil
} }
@ -693,4 +699,5 @@ func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.config.Logger.Log(logger.Info, pkg+"reading input data") r.config.Logger.Log(logger.Info, pkg+"reading input data")
r.err <- r.lexTo(r.encoder, read, delay) r.err <- r.lexTo(r.encoder, read, delay)
r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
r.wg.Done()
} }