From 1af4b250306d9240d097845672da2fcbc0c2413e Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 21:17:44 +1030 Subject: [PATCH 01/14] cmd/revid-cli & revid: removed startRevid and stopRevid as shouldn't be required when we have revid.Start() and revid.Stop(). Created revid.Config() which returns copy of config safely using mutex. removed updateRevid in revid-cli and move to fun revid.Update() - as there's no reason why it can't just be a receiver func - even better considering we want to start moving alot of stuff from revid-cli to the revid-api anyways. --- cmd/revid-cli/main.go | 54 +++++--------------- revid/revid.go | 115 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 42 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 5b826b91..826cb2a4 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -72,23 +72,22 @@ func main() { cfg := handleFlags() if !*useNetsender { - // run revid for the specified duration - rv, _, err := startRevid(nil, cfg) + rv, err := revid.New(cfg, nil) if err != nil { + cfg.Logger.Log(logger.Fatal, pkg+"failed to initialiase revid", "error", err.Error()) + } + if err = rv.Start(); err != nil { cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error()) } time.Sleep(*runDurationPtr) - err = stopRevid(rv) - if err != nil { + if err = rv.Stop(); err != nil { cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error()) } return } - err := run(nil, cfg) - if err != nil { + if err := run(cfg); err != nil { log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error()) - os.Exit(1) } } @@ -244,27 +243,20 @@ func handleFlags() revid.Config { } // initialize then run the main NetSender client -func run(rv *revid.Revid, cfg revid.Config) error { +func run(cfg revid.Config) error { // initialize NetSender and use NetSender's logger - //config.Logger = netsender.Logger() log.Log(logger.Info, pkg+"running in NetSender mode") var ns netsender.Sender - err := ns.Init(log, nil, nil, nil) - if err != nil { + if err := ns.Init(log, nil, nil, nil); err != nil { return err } vars, _ := ns.Vars() vs := ns.VarSum() - paused := false - if vars["mode"] == "Paused" { - paused = true - } - if !paused { - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) - if err != nil { - return err - } + + rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) + if err != nil { + return err } for { @@ -331,28 +323,6 @@ func send(ns *netsender.Sender, rv *revid.Revid) error { return nil } -// wrappers for stopping and starting revid -func startRevid(ns *netsender.Sender, cfg revid.Config) (*revid.Revid, revid.Config, error) { - rv, err := revid.New(cfg, ns) - if err != nil { - return nil, cfg, err - } - err = rv.Start() - return rv, cfg, err -} - -func stopRevid(rv *revid.Revid) error { - err := rv.Stop() - if err != nil { - return err - } - - // FIXME(kortschak): Is this waiting on completion of work? - // Use a wait group and Wait method if it is. - time.Sleep(revidStopTime) - return nil -} - func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) { if stop { err := stopRevid(rv) diff --git a/revid/revid.go b/revid/revid.go index 6833ec2b..81546ead 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -40,6 +40,7 @@ import ( "sync" "time" + "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" @@ -326,6 +327,13 @@ func (r *Revid) IsRunning() bool { return ret } +func (r *Revid) Config() Config { + r.mu.Lock() + ret := r.config + r.mu.Unlock() + return ret +} + // setIsRunning sets revid.isRunning using b. func (r *Revid) setIsRunning(b bool) { r.mu.Lock() @@ -366,6 +374,113 @@ func (r *Revid) Stop() error { return nil } +func (r *Revid) Update(vars map[string]string) error { + if r.IsRunning() { + r.Stop() + } + //look through the vars and update revid where needed + for key, value := range vars { + switch key { + case "Output": + // FIXME(kortschak): There can be only one! + // How do we specify outputs after the first? + // + // Maybe we shouldn't be doing this! + switch value { + case "File": + cfg.Outputs[0] = revid.File + case "Http": + cfg.Outputs[0] = revid.Http + case "Rtmp": + cfg.Outputs[0] = revid.Rtmp + case "FfmpegRtmp": + cfg.Outputs[0] = revid.FfmpegRtmp + default: + log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) + continue + } + case "FramesPerClip": + f, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + break + } + cfg.FramesPerClip = uint(f) + case "RtmpUrl": + cfg.RtmpUrl = value + case "Bitrate": + r, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + break + } + cfg.Bitrate = uint(r) + case "OutputFileName": + cfg.OutputFileName = value + case "InputFileName": + cfg.InputFileName = value + case "Height": + h, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid height param", "value", value) + break + } + cfg.Height = uint(h) + case "Width": + w, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid width param", "value", value) + break + } + cfg.Width = uint(w) + case "FrameRate": + r, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + break + } + cfg.FrameRate = uint(r) + case "HttpAddress": + cfg.HttpAddress = value + case "Quantization": + q, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) + break + } + cfg.Quantization = uint(q) + case "IntraRefreshPeriod": + p, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) + break + } + cfg.IntraRefreshPeriod = uint(p) + case "HorizontalFlip": + switch strings.ToLower(value) { + case "true": + cfg.FlipHorizontal = true + case "false": + cfg.FlipHorizontal = false + default: + log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) + } + case "VerticalFlip": + switch strings.ToLower(value) { + case "true": + cfg.FlipVertical = true + case "false": + cfg.FlipVertical = false + default: + log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) + } + default: + } + } + + return startRevid(ns, cfg) +} + // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { From 9095044e234e730c19ed6a4f468bb46f2ba5c944 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 21:55:40 +1030 Subject: [PATCH 02/14] revid: using waitgroups so that revid.Stop() is safer - we can wait until the input and output routines are done before we do anything, like touch the revid config. Also started modifying revid.Update() to remove errors introduced after the copy of updateRevid from revid-cli to revid.go in the previous commit. --- revid/revid.go | 59 ++++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 81546ead..33d847ea 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -1,6 +1,6 @@ /* NAME - revid.go + r.go DESCRIPTION See Readme.md @@ -40,7 +40,6 @@ import ( "sync" "time" - "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" @@ -120,10 +119,11 @@ type Revid struct { // bitrate hold the last send bitrate calculation result. bitrate int - // isRunning is a loaded and cocked foot-gun. mu sync.Mutex isRunning bool + wg sync.WaitGroup + err chan error } @@ -334,7 +334,7 @@ func (r *Revid) Config() Config { return ret } -// setIsRunning sets revid.isRunning using b. +// setIsRunning sets r.isRunning using b. func (r *Revid) setIsRunning(b bool) { r.mu.Lock() r.isRunning = b @@ -348,9 +348,11 @@ func (r *Revid) Start() error { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") + // TODO: this doesn't need to be here r.config.Logger.Log(logger.Debug, pkg+"setting up output") r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") + r.wg.Add(1) go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") err := r.setupInput() @@ -371,6 +373,7 @@ func (r *Revid) Stop() error { if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } + r.wg.Wait() return nil } @@ -388,89 +391,89 @@ func (r *Revid) Update(vars map[string]string) error { // Maybe we shouldn't be doing this! switch value { case "File": - cfg.Outputs[0] = revid.File + r.config.Outputs[0] = File case "Http": - cfg.Outputs[0] = revid.Http + r.config.Outputs[0] = Http case "Rtmp": - cfg.Outputs[0] = revid.Rtmp + r.config.Outputs[0] = Rtmp case "FfmpegRtmp": - cfg.Outputs[0] = revid.FfmpegRtmp + r.config.Outputs[0] = FfmpegRtmp default: - log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) continue } case "FramesPerClip": f, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + .Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) break } - cfg.FramesPerClip = uint(f) + r.config.FramesPerClip = uint(f) case "RtmpUrl": - cfg.RtmpUrl = value + r.config.RtmpUrl = value case "Bitrate": r, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - cfg.Bitrate = uint(r) + r.config.Bitrate = uint(r) case "OutputFileName": - cfg.OutputFileName = value + r.config.OutputFileName = value case "InputFileName": - cfg.InputFileName = value + r.config.InputFileName = value case "Height": h, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid height param", "value", value) break } - cfg.Height = uint(h) + r.config.Height = uint(h) case "Width": w, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid width param", "value", value) break } - cfg.Width = uint(w) + r.config.Width = uint(w) case "FrameRate": r, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - cfg.FrameRate = uint(r) + r.config.FrameRate = uint(r) case "HttpAddress": - cfg.HttpAddress = value + r.config.HttpAddress = value case "Quantization": q, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) break } - cfg.Quantization = uint(q) + r.config.Quantization = uint(q) case "IntraRefreshPeriod": p, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) break } - cfg.IntraRefreshPeriod = uint(p) + r.config.IntraRefreshPeriod = uint(p) case "HorizontalFlip": switch strings.ToLower(value) { case "true": - cfg.FlipHorizontal = true + r.config.FlipHorizontal = true case "false": - cfg.FlipHorizontal = false + r.config.FlipHorizontal = false default: log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { case "true": - cfg.FlipVertical = true + r.config.FlipVertical = true case "false": - cfg.FlipVertical = false + r.config.FlipVertical = false default: log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } @@ -478,7 +481,7 @@ func (r *Revid) Update(vars map[string]string) error { } } - return startRevid(ns, cfg) + return r.Start() } // outputClips takes the clips produced in the packClips method and outputs them @@ -622,6 +625,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } + r.wg.Add(1) go r.processFrom(stdout, 0) return nil } @@ -670,6 +674,7 @@ func (r *Revid) startV4L() error { return err } + r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) return nil } @@ -685,6 +690,7 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. + r.wg.Add(1) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } @@ -693,4 +699,5 @@ func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") + r.wg.Done() } From 1010721dd070882058660fff25f3e84db4ad446d Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 23:37:38 +1030 Subject: [PATCH 03/14] cmd/revid-cli & revid: Checking revid mode differently - now using ns.Mode(), which should soon be an available feature. Also now using ns.SetMode() - which tells netreceiver that we've changed mode. --- cmd/revid-cli/main.go | 157 +++++++++--------------------------------- revid/revid.go | 26 +++---- 2 files changed, 45 insertions(+), 138 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 826cb2a4..58f8e507 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -50,6 +50,13 @@ const ( defaultLogVerbosity = logger.Debug ) +// Revid modes +const ( + normal = "Normal" + paused = "Paused" + burst = "Burst" +) + // Other misc consts const ( netSendRetryTime = 5 * time.Second @@ -244,9 +251,9 @@ func handleFlags() revid.Config { // initialize then run the main NetSender client func run(cfg revid.Config) error { - // initialize NetSender and use NetSender's logger log.Log(logger.Info, pkg+"running in NetSender mode") + // initialize NetSender and use NetSender's logger var ns netsender.Sender if err := ns.Init(log, nil, nil, nil); err != nil { return err @@ -254,8 +261,12 @@ func run(cfg revid.Config) error { vars, _ := ns.Vars() vs := ns.VarSum() - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) + rv, err := revid.New(cfg, &ns) if err != nil { + log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) + } + + if err = rv.Update(vars); err != nil { return err } @@ -267,7 +278,6 @@ func run(cfg revid.Config) error { } if vs != ns.VarSum() { - // vars changed vars, err := ns.Vars() if err != nil { log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) @@ -275,24 +285,32 @@ func run(cfg revid.Config) error { continue } vs = ns.VarSum() - if vars["mode"] == "Paused" { - if !paused { + + switch ns.Mode() { + case paused: + if rv.IsRunning() { log.Log(logger.Info, pkg+"pausing revid") - err = stopRevid(rv) - if err != nil { - log.Log(logger.Error, pkg+"failed to stop revide", "error", err.Error()) + if err = rv.Stop(); err != nil { + log.Log(logger.Error, pkg+"failed to stop revid", "error", err.Error()) continue } - paused = true + ns.SetMode(paused) } - } else { - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, !paused) - if err != nil { + case normal: + if err = rv.Update(vars); err != nil { return err } - if paused { - paused = false + ns.SetMode(normal) + case burst: + if err = rv.Start(); err != nil { + return err } + ns.SetMode(burst) + time.Sleep(rv.Config().BurstPeriod) + if err = rv.Stop(); err != nil { + return err + } + ns.SetMode(paused) } } sleepTime, _ := strconv.Atoi(ns.Param("mp")) @@ -323,117 +341,6 @@ func send(ns *netsender.Sender, rv *revid.Revid) error { return nil } -func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) { - if stop { - err := stopRevid(rv) - if err != nil { - return nil, cfg, err - } - } - - //look through the vars and update revid where needed - for key, value := range vars { - switch key { - case "Output": - // FIXME(kortschak): There can be only one! - // How do we specify outputs after the first? - // - // Maybe we shouldn't be doing this! - switch value { - case "File": - cfg.Outputs[0] = revid.File - case "Http": - cfg.Outputs[0] = revid.Http - case "Rtmp": - cfg.Outputs[0] = revid.Rtmp - case "FfmpegRtmp": - cfg.Outputs[0] = revid.FfmpegRtmp - default: - log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) - continue - } - case "FramesPerClip": - f, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) - break - } - cfg.FramesPerClip = uint(f) - case "RtmpUrl": - cfg.RtmpUrl = value - case "Bitrate": - r, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) - break - } - cfg.Bitrate = uint(r) - case "OutputFileName": - cfg.OutputFileName = value - case "InputFileName": - cfg.InputFileName = value - case "Height": - h, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid height param", "value", value) - break - } - cfg.Height = uint(h) - case "Width": - w, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid width param", "value", value) - break - } - cfg.Width = uint(w) - case "FrameRate": - r, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) - break - } - cfg.FrameRate = uint(r) - case "HttpAddress": - cfg.HttpAddress = value - case "Quantization": - q, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) - break - } - cfg.Quantization = uint(q) - case "IntraRefreshPeriod": - p, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) - break - } - cfg.IntraRefreshPeriod = uint(p) - case "HorizontalFlip": - switch strings.ToLower(value) { - case "true": - cfg.FlipHorizontal = true - case "false": - cfg.FlipHorizontal = false - default: - log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) - } - case "VerticalFlip": - switch strings.ToLower(value) { - case "true": - cfg.FlipVertical = true - case "false": - cfg.FlipVertical = false - default: - log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) - } - default: - } - } - - return startRevid(ns, cfg) -} - // flagStrings implements an appending string set flag. type flagStrings []string diff --git a/revid/revid.go b/revid/revid.go index 33d847ea..c90ba8da 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -405,19 +405,19 @@ func (r *Revid) Update(vars map[string]string) error { case "FramesPerClip": f, err := strconv.ParseUint(value, 10, 0) if err != nil { - .Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) break } r.config.FramesPerClip = uint(f) case "RtmpUrl": r.config.RtmpUrl = value case "Bitrate": - r, err := strconv.ParseUint(value, 10, 0) + v, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - r.config.Bitrate = uint(r) + r.config.Bitrate = uint(v) case "OutputFileName": r.config.OutputFileName = value case "InputFileName": @@ -425,37 +425,37 @@ func (r *Revid) Update(vars map[string]string) error { case "Height": h, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid height param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value) break } r.config.Height = uint(h) case "Width": w, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid width param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value) break } r.config.Width = uint(w) case "FrameRate": - r, err := strconv.ParseUint(value, 10, 0) + v, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - r.config.FrameRate = uint(r) + r.config.FrameRate = uint(v) case "HttpAddress": r.config.HttpAddress = value case "Quantization": q, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value) break } r.config.Quantization = uint(q) case "IntraRefreshPeriod": p, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) break } r.config.IntraRefreshPeriod = uint(p) @@ -466,7 +466,7 @@ func (r *Revid) Update(vars map[string]string) error { case "false": r.config.FlipHorizontal = false default: - log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { @@ -475,7 +475,7 @@ func (r *Revid) Update(vars map[string]string) error { case "false": r.config.FlipVertical = false default: - log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } default: } From 6171c4e9994a87722733249a11e0d607e8e788b5 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 23:43:51 +1030 Subject: [PATCH 04/14] revid: added handling of burstPeriod to config --- revid/config.go | 7 +++++++ revid/revid.go | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/revid/config.go b/revid/config.go index dc9c5a8d..b0ba2bc4 100644 --- a/revid/config.go +++ b/revid/config.go @@ -68,6 +68,7 @@ type Config struct { RtpAddress string Logger Logger SendRetry bool + BurstPeriod uint } // Enums for config struct @@ -114,6 +115,7 @@ const ( defaultInputCodec = H264 defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15. defaultRtpAddr = "localhost:6970" + defaultBurstPeriod = 10 // Seconds ) // Validate checks for any errors in the config fields and defaults settings @@ -200,6 +202,11 @@ func (c *Config) Validate(r *Revid) error { } } + if c.BurstPeriod == 0 { + c.Logger.Log(logger.Warning, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod) + c.BurstPeriod = defaultBurstPeriod + } + if c.FramesPerClip < 1 { c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip) diff --git a/revid/revid.go b/revid/revid.go index c90ba8da..9f1cc12c 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -477,6 +477,13 @@ func (r *Revid) Update(vars map[string]string) error { default: r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } + case "BurstPeriod": + v, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value) + break + } + r.config.BurstPeriod = uint(v) default: } } From ee7eb84d26e29b3d9cab7005fddd48d1847a8211 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 13:25:37 +1030 Subject: [PATCH 05/14] revid-cli: correctly using ns.Mode() and ns.SetMode() --- cmd/revid-cli/main.go | 75 +++++++++++++++++++++++-------------------- revid/revid.go | 2 +- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 58f8e507..ac8fff12 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -266,52 +266,57 @@ func run(cfg revid.Config) error { log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) } + // Update revid to get latest config settings from netreceiver. if err = rv.Update(vars); err != nil { return err } + // If mode on netreceiver isn't paused then we can start revid. + if ns.Mode() != paused { + if err = rv.Start(); err != nil { + return err + } + } + for { - if err := send(&ns, rv); err != nil { - log.Log(logger.Error, pkg+"polling failed", "error", err.Error()) + if err := ns.Run(); err != nil { + log.Log(logger.Error, pkg+"Run Failed. Retrying...") time.Sleep(netSendRetryTime) continue } - if vs != ns.VarSum() { - vars, err := ns.Vars() - if err != nil { - log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) - time.Sleep(netSendRetryTime) - continue - } - vs = ns.VarSum() + // If var sum hasn't change we continue + if vs == ns.VarSum() { + continue + } - switch ns.Mode() { - case paused: - if rv.IsRunning() { - log.Log(logger.Info, pkg+"pausing revid") - if err = rv.Stop(); err != nil { - log.Log(logger.Error, pkg+"failed to stop revid", "error", err.Error()) - continue - } - ns.SetMode(paused) - } - case normal: - if err = rv.Update(vars); err != nil { - return err - } - ns.SetMode(normal) - case burst: - if err = rv.Start(); err != nil { - return err - } - ns.SetMode(burst) - time.Sleep(rv.Config().BurstPeriod) - if err = rv.Stop(); err != nil { - return err - } - ns.SetMode(paused) + vars, err := ns.Vars() + if err != nil { + log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) + time.Sleep(netSendRetryTime) + continue + } + vs = ns.VarSum() + + if err = rv.Update(vars); err != nil { + return err + } + + switch ns.Mode() { + case paused: + case normal: + if err = rv.Start(); err != nil { + return err } + case burst: + if err = rv.Start(); err != nil { + return err + } + time.Sleep(time.Duration(rv.Config().BurstPeriod)) + if err = rv.Stop(); err != nil { + return err + } + ns.SetMode(paused, &vs) } sleepTime, _ := strconv.Atoi(ns.Param("mp")) time.Sleep(time.Duration(sleepTime) * time.Second) diff --git a/revid/revid.go b/revid/revid.go index 9f1cc12c..e4c7dbbe 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -488,7 +488,7 @@ func (r *Revid) Update(vars map[string]string) error { } } - return r.Start() + return nil } // outputClips takes the clips produced in the packClips method and outputs them From 93e3899725b8e6029c1cc80265f0ba1405eb4b7e Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 17:04:49 +1030 Subject: [PATCH 06/14] cmd/revid-cli: using ns.Send() rather than ns.Run() to poll --- cmd/revid-cli/main.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index ac8fff12..bcabf40d 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -272,15 +272,19 @@ func run(cfg revid.Config) error { } // If mode on netreceiver isn't paused then we can start revid. - if ns.Mode() != paused { + if ns.Mode() != paused && ns.Mode() != burst { if err = rv.Start(); err != nil { return err } } + if ns.Mode() == burst { + ns.SetMode(normal, &vs) + } + for { - if err := ns.Run(); err != nil { - log.Log(logger.Error, pkg+"Run Failed. Retrying...") + if err := send(&ns, rv); err != nil { + log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue } @@ -309,10 +313,12 @@ func run(cfg revid.Config) error { return err } case burst: + log.Log(logger.Info, pkg+"Starting burst...") if err = rv.Start(); err != nil { return err } - time.Sleep(time.Duration(rv.Config().BurstPeriod)) + time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) + log.Log(logger.Info, pkg+"Stopping burst...") if err = rv.Stop(); err != nil { return err } From 8978f9edc59d050a1ee590edfd58f59f6cf2ace5 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 17:12:30 +1030 Subject: [PATCH 07/14] cmd/revid-cli & revid: using goto to sleep for monitor period, and using wg.Done() at the end of output routine. --- cmd/revid-cli/main.go | 10 +++++++--- revid/revid.go | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index bcabf40d..1eafaae9 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -253,12 +253,15 @@ func handleFlags() revid.Config { func run(cfg revid.Config) error { log.Log(logger.Info, pkg+"running in NetSender mode") + var err error + var vars map[string]string + // initialize NetSender and use NetSender's logger var ns netsender.Sender if err := ns.Init(log, nil, nil, nil); err != nil { return err } - vars, _ := ns.Vars() + vars, _ = ns.Vars() vs := ns.VarSum() rv, err := revid.New(cfg, &ns) @@ -291,10 +294,10 @@ func run(cfg revid.Config) error { // If var sum hasn't change we continue if vs == ns.VarSum() { - continue + goto sleep } - vars, err := ns.Vars() + vars, err = ns.Vars() if err != nil { log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) time.Sleep(netSendRetryTime) @@ -324,6 +327,7 @@ func run(cfg revid.Config) error { } ns.SetMode(paused, &vs) } + sleep: sleepTime, _ := strconv.Atoi(ns.Param("mp")) time.Sleep(time.Duration(sleepTime) * time.Second) } diff --git a/revid/revid.go b/revid/revid.go index e4c7dbbe..d0f3abf6 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -582,6 +582,7 @@ loop: r.config.Logger.Log(logger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) } } + r.wg.Done() } // startRaspivid sets up things for input from raspivid i.e. starts From 1cdbfa2c66d2ccae05cf86ed9e81e8d6a4bae9c1 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 17:14:19 +1030 Subject: [PATCH 08/14] cmd/revid-cli: setting mode to paused if ns is in burst mode. --- cmd/revid-cli/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 1eafaae9..b33ef124 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -282,7 +282,7 @@ func run(cfg revid.Config) error { } if ns.Mode() == burst { - ns.SetMode(normal, &vs) + ns.SetMode(paused, &vs) } for { From bd2958ba4e0ccc62fe8c5af4d97cf7951bfcaa59 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 19:14:02 +1030 Subject: [PATCH 09/14] cmd/revid-cli & revid: added TODO for the use of Run() instead of send in cmd/revid-cli/main.go. Fixed filename in revid/revid.go file header. Renamed ret to cfg in revid.Config(). Catching error from call to revid.Stop() in revid.Udate() --- cmd/revid-cli/main.go | 1 + revid/revid.go | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index b33ef124..29191d3a 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -286,6 +286,7 @@ func run(cfg revid.Config) error { } for { + // TODO(saxon): replace this call with call to ns.Run(). if err := send(&ns, rv); err != nil { log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) diff --git a/revid/revid.go b/revid/revid.go index d0f3abf6..a3c10b66 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -1,6 +1,6 @@ /* NAME - r.go + revid.go DESCRIPTION See Readme.md @@ -329,9 +329,9 @@ func (r *Revid) IsRunning() bool { func (r *Revid) Config() Config { r.mu.Lock() - ret := r.config + cfg := r.config r.mu.Unlock() - return ret + return cfg } // setIsRunning sets r.isRunning using b. @@ -379,7 +379,9 @@ func (r *Revid) Stop() error { func (r *Revid) Update(vars map[string]string) error { if r.IsRunning() { - r.Stop() + if err := r.Stop(); err != nil { + return err + } } //look through the vars and update revid where needed for key, value := range vars { From 35344402b848ec34a81260002e0394a7452fd225 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:32:16 +1030 Subject: [PATCH 10/14] cmd/revid-cli/main.go: not using closed scope conditions anymore --- cmd/revid-cli/main.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 29191d3a..52bb283b 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -253,14 +253,15 @@ func handleFlags() revid.Config { func run(cfg revid.Config) error { log.Log(logger.Info, pkg+"running in NetSender mode") - var err error var vars map[string]string // initialize NetSender and use NetSender's logger var ns netsender.Sender - if err := ns.Init(log, nil, nil, nil); err != nil { + err := ns.Init(log, nil, nil, nil) + if err != nil { return err } + vars, _ = ns.Vars() vs := ns.VarSum() @@ -270,7 +271,8 @@ func run(cfg revid.Config) error { } // Update revid to get latest config settings from netreceiver. - if err = rv.Update(vars); err != nil { + err = rv.Update(vars) + if err != nil { return err } @@ -287,7 +289,8 @@ func run(cfg revid.Config) error { for { // TODO(saxon): replace this call with call to ns.Run(). - if err := send(&ns, rv); err != nil { + err = send(&ns, rv) + if err != nil { log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue @@ -306,24 +309,28 @@ func run(cfg revid.Config) error { } vs = ns.VarSum() - if err = rv.Update(vars); err != nil { + err = rv.Update(vars) + if err != nil { return err } switch ns.Mode() { case paused: case normal: - if err = rv.Start(); err != nil { + err = rv.Start() + if err != nil { return err } case burst: log.Log(logger.Info, pkg+"Starting burst...") - if err = rv.Start(); err != nil { + err = rv.Start() + if err != nil { return err } time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) log.Log(logger.Info, pkg+"Stopping burst...") - if err = rv.Stop(); err != nil { + err = rv.Stop() + if err != nil { return err } ns.SetMode(paused, &vs) From 4dcbd904499c4b51516cba0c90f3eebeef94df53 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:40:08 +1030 Subject: [PATCH 11/14] cmd/revid-cli: removed another closed scope condition --- cmd/revid-cli/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 52bb283b..f37144a2 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -278,7 +278,8 @@ func run(cfg revid.Config) error { // If mode on netreceiver isn't paused then we can start revid. if ns.Mode() != paused && ns.Mode() != burst { - if err = rv.Start(); err != nil { + err = rv.Start() + if err != nil { return err } } From ea8572a777a90b90bb3e19564402426337b73de5 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:45:15 +1030 Subject: [PATCH 12/14] cmd/revid-cli: catching error in conversion of mp --- cmd/revid-cli/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index f37144a2..1c2b9c90 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -337,7 +337,10 @@ func run(cfg revid.Config) error { ns.SetMode(paused, &vs) } sleep: - sleepTime, _ := strconv.Atoi(ns.Param("mp")) + sleepTime, err := strconv.Atoi(ns.Param("mp")) + if err != nil { + return err + } time.Sleep(time.Duration(sleepTime) * time.Second) } } From a4d179039b4086facc14cb5eb6b719d09bc0873c Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:49:05 +1030 Subject: [PATCH 13/14] revid/revid.go: removed default case in switch with revid.Update() --- revid/revid.go | 1 - 1 file changed, 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index a3c10b66..8c4b57ed 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -486,7 +486,6 @@ func (r *Revid) Update(vars map[string]string) error { break } r.config.BurstPeriod = uint(v) - default: } } From de4f471201bc370186924b21e955884066e1c1d3 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:50:21 +1030 Subject: [PATCH 14/14] revid/revid.go: defer r.wg.Done() in revid.outputClips routine --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 8c4b57ed..02656d03 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -495,6 +495,7 @@ func (r *Revid) Update(vars map[string]string) error { // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { + defer r.wg.Done() lastTime := time.Now() var count int loop: @@ -583,7 +584,6 @@ loop: r.config.Logger.Log(logger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) } } - r.wg.Done() } // startRaspivid sets up things for input from raspivid i.e. starts