From 9095044e234e730c19ed6a4f468bb46f2ba5c944 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 21:55:40 +1030 Subject: [PATCH] revid: using waitgroups so that revid.Stop() is safer - we can wait until the input and output routines are done before we do anything, like touch the revid config. Also started modifying revid.Update() to remove errors introduced after the copy of updateRevid from revid-cli to revid.go in the previous commit. --- revid/revid.go | 59 ++++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 81546ead..33d847ea 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -1,6 +1,6 @@ /* NAME - revid.go + r.go DESCRIPTION See Readme.md @@ -40,7 +40,6 @@ import ( "sync" "time" - "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" @@ -120,10 +119,11 @@ type Revid struct { // bitrate hold the last send bitrate calculation result. bitrate int - // isRunning is a loaded and cocked foot-gun. mu sync.Mutex isRunning bool + wg sync.WaitGroup + err chan error } @@ -334,7 +334,7 @@ func (r *Revid) Config() Config { return ret } -// setIsRunning sets revid.isRunning using b. +// setIsRunning sets r.isRunning using b. func (r *Revid) setIsRunning(b bool) { r.mu.Lock() r.isRunning = b @@ -348,9 +348,11 @@ func (r *Revid) Start() error { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") + // TODO: this doesn't need to be here r.config.Logger.Log(logger.Debug, pkg+"setting up output") r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") + r.wg.Add(1) go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") err := r.setupInput() @@ -371,6 +373,7 @@ func (r *Revid) Stop() error { if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } + r.wg.Wait() return nil } @@ -388,89 +391,89 @@ func (r *Revid) Update(vars map[string]string) error { // Maybe we shouldn't be doing this! switch value { case "File": - cfg.Outputs[0] = revid.File + r.config.Outputs[0] = File case "Http": - cfg.Outputs[0] = revid.Http + r.config.Outputs[0] = Http case "Rtmp": - cfg.Outputs[0] = revid.Rtmp + r.config.Outputs[0] = Rtmp case "FfmpegRtmp": - cfg.Outputs[0] = revid.FfmpegRtmp + r.config.Outputs[0] = FfmpegRtmp default: - log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) continue } case "FramesPerClip": f, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + .Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) break } - cfg.FramesPerClip = uint(f) + r.config.FramesPerClip = uint(f) case "RtmpUrl": - cfg.RtmpUrl = value + r.config.RtmpUrl = value case "Bitrate": r, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - cfg.Bitrate = uint(r) + r.config.Bitrate = uint(r) case "OutputFileName": - cfg.OutputFileName = value + r.config.OutputFileName = value case "InputFileName": - cfg.InputFileName = value + r.config.InputFileName = value case "Height": h, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid height param", "value", value) break } - cfg.Height = uint(h) + r.config.Height = uint(h) case "Width": w, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid width param", "value", value) break } - cfg.Width = uint(w) + r.config.Width = uint(w) case "FrameRate": r, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - cfg.FrameRate = uint(r) + r.config.FrameRate = uint(r) case "HttpAddress": - cfg.HttpAddress = value + r.config.HttpAddress = value case "Quantization": q, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) break } - cfg.Quantization = uint(q) + r.config.Quantization = uint(q) case "IntraRefreshPeriod": p, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) break } - cfg.IntraRefreshPeriod = uint(p) + r.config.IntraRefreshPeriod = uint(p) case "HorizontalFlip": switch strings.ToLower(value) { case "true": - cfg.FlipHorizontal = true + r.config.FlipHorizontal = true case "false": - cfg.FlipHorizontal = false + r.config.FlipHorizontal = false default: log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { case "true": - cfg.FlipVertical = true + r.config.FlipVertical = true case "false": - cfg.FlipVertical = false + r.config.FlipVertical = false default: log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } @@ -478,7 +481,7 @@ func (r *Revid) Update(vars map[string]string) error { } } - return startRevid(ns, cfg) + return r.Start() } // outputClips takes the clips produced in the packClips method and outputs them @@ -622,6 +625,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } + r.wg.Add(1) go r.processFrom(stdout, 0) return nil } @@ -670,6 +674,7 @@ func (r *Revid) startV4L() error { return err } + r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) return nil } @@ -685,6 +690,7 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. + r.wg.Add(1) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } @@ -693,4 +699,5 @@ func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") + r.wg.Done() }