diff --git a/revid/revid.go b/revid/revid.go index 81546ead..33d847ea 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -1,6 +1,6 @@ /* NAME - revid.go + r.go DESCRIPTION See Readme.md @@ -40,7 +40,6 @@ import ( "sync" "time" - "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" @@ -120,10 +119,11 @@ type Revid struct { // bitrate hold the last send bitrate calculation result. bitrate int - // isRunning is a loaded and cocked foot-gun. mu sync.Mutex isRunning bool + wg sync.WaitGroup + err chan error } @@ -334,7 +334,7 @@ func (r *Revid) Config() Config { return ret } -// setIsRunning sets revid.isRunning using b. +// setIsRunning sets r.isRunning using b. func (r *Revid) setIsRunning(b bool) { r.mu.Lock() r.isRunning = b @@ -348,9 +348,11 @@ func (r *Revid) Start() error { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") + // TODO: this doesn't need to be here r.config.Logger.Log(logger.Debug, pkg+"setting up output") r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") + r.wg.Add(1) go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") err := r.setupInput() @@ -371,6 +373,7 @@ func (r *Revid) Stop() error { if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } + r.wg.Wait() return nil } @@ -388,89 +391,89 @@ func (r *Revid) Update(vars map[string]string) error { // Maybe we shouldn't be doing this! switch value { case "File": - cfg.Outputs[0] = revid.File + r.config.Outputs[0] = File case "Http": - cfg.Outputs[0] = revid.Http + r.config.Outputs[0] = Http case "Rtmp": - cfg.Outputs[0] = revid.Rtmp + r.config.Outputs[0] = Rtmp case "FfmpegRtmp": - cfg.Outputs[0] = revid.FfmpegRtmp + r.config.Outputs[0] = FfmpegRtmp default: - log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) continue } case "FramesPerClip": f, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + .Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) break } - cfg.FramesPerClip = uint(f) + r.config.FramesPerClip = uint(f) case "RtmpUrl": - cfg.RtmpUrl = value + r.config.RtmpUrl = value case "Bitrate": r, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - cfg.Bitrate = uint(r) + r.config.Bitrate = uint(r) case "OutputFileName": - cfg.OutputFileName = value + r.config.OutputFileName = value case "InputFileName": - cfg.InputFileName = value + r.config.InputFileName = value case "Height": h, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid height param", "value", value) break } - cfg.Height = uint(h) + r.config.Height = uint(h) case "Width": w, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid width param", "value", value) break } - cfg.Width = uint(w) + r.config.Width = uint(w) case "FrameRate": r, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - cfg.FrameRate = uint(r) + r.config.FrameRate = uint(r) case "HttpAddress": - cfg.HttpAddress = value + r.config.HttpAddress = value case "Quantization": q, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) break } - cfg.Quantization = uint(q) + r.config.Quantization = uint(q) case "IntraRefreshPeriod": p, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) break } - cfg.IntraRefreshPeriod = uint(p) + r.config.IntraRefreshPeriod = uint(p) case "HorizontalFlip": switch strings.ToLower(value) { case "true": - cfg.FlipHorizontal = true + r.config.FlipHorizontal = true case "false": - cfg.FlipHorizontal = false + r.config.FlipHorizontal = false default: log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { case "true": - cfg.FlipVertical = true + r.config.FlipVertical = true case "false": - cfg.FlipVertical = false + r.config.FlipVertical = false default: log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } @@ -478,7 +481,7 @@ func (r *Revid) Update(vars map[string]string) error { } } - return startRevid(ns, cfg) + return r.Start() } // outputClips takes the clips produced in the packClips method and outputs them @@ -622,6 +625,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } + r.wg.Add(1) go r.processFrom(stdout, 0) return nil } @@ -670,6 +674,7 @@ func (r *Revid) startV4L() error { return err } + r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) return nil } @@ -685,6 +690,7 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. + r.wg.Add(1) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } @@ -693,4 +699,5 @@ func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") + r.wg.Done() }