From 1645b759f1c74a72ccb189ced7c66b24a247f221 Mon Sep 17 00:00:00 2001 From: Alan Noble Date: Tue, 26 Jun 2018 15:53:55 +0930 Subject: [PATCH 1/2] Added ns to Revid struct, so it could be exposed as a httpSender, and refactored revid-cli accordingly. --- cmd/revid-cli/main.go | 124 ++++++++++++++++++++---------------------- revid/revid.go | 24 ++------ revid/senders.go | 30 ++++++---- 3 files changed, 85 insertions(+), 93 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 8c7f8980..fe7baaaa 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -85,8 +85,8 @@ const ( // Globals var ( - revidInst *revid.Revid - config revid.Config + rv *revid.Revid + config revid.Config ) func main() { @@ -241,35 +241,43 @@ func main() { config.Timeout = *configFlags[timeoutPtr] config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr] - var ns netsender.Sender - var vs int - paused := false - - if *useNetsender { - // initialize NetSender and use NetSender's logger - config.Logger = netsender.Logger() - config.Logger.Log(progName, "Info", "Running in NetSender mode") - err := ns.Init(nil, nil, nil) - if err != nil { - config.Logger.Log(progName, "Error", err.Error()) // TODO(kortschak): Make this "Fatal" when that exists. - os.Exit(1) - } - vars, _ := ns.Vars() - if vars["mode"] == "Paused" { - paused = true - } - } else { - // alternatively, instantiate our own logger + if !*useNetsender { + // instantiate our own logger config.Logger = smartlogger.New(loggerVerbosity, smartlogger.File, "/var/log/netsender/") + // run revid for the specified duration + startRevid(nil) + time.Sleep(*runDurationPtr) + stopRevid() + return } - if !paused { - startRevid() + err = run() + if err != nil { + config.Logger.Log(progName, "Error", err.Error()) // TODO(kortschak): Make this "Fatal" when that exists. + os.Exit(1) + } +} + +// initialize then run the main NetSender client +func run() error { + // initialize NetSender and use NetSender's logger + config.Logger = netsender.Logger() + config.Logger.Log(progName, "Info", "Running in NetSender mode") + + var ns netsender.Sender + err := ns.Init(nil, nil, nil) + if err != nil { + return err + } + vars, _ := ns.Vars() + paused := false + if vars["mode"] == "Paused" { + paused = true } - // loop in NetSender mode - for *useNetsender { - if err := sendTo(&ns); err != nil { + var vs int + for { + if err := send(&ns); err != nil { config.Logger.Log(progName, "Warning", err.Error()) time.Sleep(netSendRetryTime) continue @@ -291,7 +299,10 @@ func main() { paused = true } } else { - updateRevid(vars, !paused) + err = updateRevid(&ns, vars, !paused) + if err != nil { + return err + } if paused { paused = false } @@ -300,21 +311,16 @@ func main() { sleepTime, _ := strconv.Atoi(ns.Param("mp")) time.Sleep(time.Duration(sleepTime) * time.Second) } - - // If we're not running a netsender session then we run revid for the amount - // of time the user defined or the default 2 days - time.Sleep(*runDurationPtr) - stopRevid() } -// sendTo implements our main NetSender client and handles NetReceiver configuration +// send implements our main NetSender client and handles NetReceiver configuration // (as distinct from httpSender which just sends video data). -func sendTo(ns *netsender.Sender) error { +func send(ns *netsender.Sender) error { // populate input values, if any inputs := netsender.MakePins(ns.Param("ip"), "X") for i, pin := range inputs { if pin.Name == "X23" { - inputs[i].Value = revidInst.Bitrate() + inputs[i].Value = rv.Bitrate() } } @@ -329,34 +335,24 @@ func sendTo(ns *netsender.Sender) error { } // wrappers for stopping and starting revid -func startRevid() { - createRevidInstance() - revidInst.Start() -} - -func createRevidInstance() { - // Try to create the revid instance with the given config - var err error - for revidInst, err = revid.New(config); err != nil; { - // If the config does have a logger, use it to output error, otherwise - // just output to std output - if config.Logger != nil { - config.Logger.Log(progName, "FATAL ERROR", err.Error()) - } else { - fmt.Printf("FATAL ERROR: %v", err.Error()) - } +func startRevid(ns *netsender.Sender) (err error) { + rv, err = revid.New(config, ns) + if err != nil { + return err } + rv.Start() + return nil } func stopRevid() { - revidInst.Stop() + rv.Stop() // FIXME(kortschak): Is this waiting on completion of work? // Use a wait group and Wait method if it is. time.Sleep(revidStopTime) } -func updateRevid(vars map[string]string, stop bool) { +func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error { if stop { stopRevid() } @@ -375,7 +371,7 @@ func updateRevid(vars map[string]string, stop bool) { case "FfmpegRtmp": config.Output = revid.FfmpegRtmp default: - revidInst.Log(revid.Warning, "Invalid Output param: "+value) + rv.Log(revid.Warning, "Invalid Output param: "+value) continue } case "FramesPerClip": @@ -383,7 +379,7 @@ func updateRevid(vars map[string]string, stop bool) { if fpc > 0 && err == nil { config.FramesPerClip = fpc } else { - revidInst.Log(revid.Warning, "Invalid FramesPerClip param: "+value) + rv.Log(revid.Warning, "Invalid FramesPerClip param: "+value) } case "RtmpUrl": config.RtmpUrl = value @@ -392,7 +388,7 @@ func updateRevid(vars map[string]string, stop bool) { if asInt > 0 && err == nil { config.Bitrate = value } else { - revidInst.Log(revid.Warning, "Invalid Bitrate param: "+value) + rv.Log(revid.Warning, "Invalid Bitrate param: "+value) } case "OutputFileName": config.OutputFileName = value @@ -403,21 +399,21 @@ func updateRevid(vars map[string]string, stop bool) { if asInt > 0 && err == nil { config.Height = value } else { - revidInst.Log(revid.Warning, "Invalid Height param: "+value) + rv.Log(revid.Warning, "Invalid Height param: "+value) } case "Width": asInt, err := strconv.Atoi(value) if asInt > 0 && err == nil { config.Width = value } else { - revidInst.Log(revid.Warning, "Invalid Width param: "+value) + rv.Log(revid.Warning, "Invalid Width param: "+value) } case "FrameRate": asInt, err := strconv.Atoi(value) if asInt > 0 && err == nil { config.FrameRate = value } else { - revidInst.Log(revid.Warning, "Invalid FrameRate param: "+value) + rv.Log(revid.Warning, "Invalid FrameRate param: "+value) } case "HttpAddress": config.HttpAddress = value @@ -426,7 +422,7 @@ func updateRevid(vars map[string]string, stop bool) { if asInt > 0 && err == nil { config.Quantization = value } else { - revidInst.Log(revid.Warning, "Invalid Quantization param: "+value) + rv.Log(revid.Warning, "Invalid Quantization param: "+value) } case "Timeout": asInt, err := strconv.Atoi(value) @@ -445,7 +441,7 @@ func updateRevid(vars map[string]string, stop bool) { case "No": config.HorizontalFlip = revid.No default: - revidInst.Log(revid.Warning, "Invalid HorizontalFlip param: "+value) + rv.Log(revid.Warning, "Invalid HorizontalFlip param: "+value) } case "VerticalFlip": switch value { @@ -454,14 +450,14 @@ func updateRevid(vars map[string]string, stop bool) { case "No": config.VerticalFlip = revid.No default: - revidInst.Log(revid.Warning, "Invalid VerticalFlip param: "+value) + rv.Log(revid.Warning, "Invalid VerticalFlip param: "+value) } default: if unicode.IsUpper(rune(key[0])) { - revidInst.Log(revid.Warning, "Unexpected param: "+key) + rv.Log(revid.Warning, "Unexpected param: "+key) } // else system params are lower case } } - startRevid() + return startRevid(ns) } diff --git a/revid/revid.go b/revid/revid.go index 18f5111a..86b42afa 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -43,6 +43,7 @@ import ( "bitbucket.org/ausocean/av/parser" "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/utils/ring" + "bitbucket.org/ausocean/iot/pi/netsender" ) // Misc constants @@ -98,12 +99,13 @@ type Revid struct { destination loadSender rtmpInst rtmp.Session bitrate int + ns *netsender.Sender } // NewRevid returns a pointer to a new Revid with the desired // configuration, and/or an error if construction of the new instant was not // successful. -func New(c Config) (*Revid, error) { +func New(c Config, ns *netsender.Sender) (*Revid, error) { var r Revid err := r.reset(c) if err != nil { @@ -111,10 +113,11 @@ func New(c Config) (*Revid, error) { } r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.outputChan = make(chan []byte, outputChanSize) + r.ns = ns return &r, nil } -// Bitrate returns the result of the most recent bitrate check. +// Bitrate returns the resbult of the most recent bitrate check. func (r *Revid) Bitrate() int { return r.bitrate } @@ -165,7 +168,7 @@ func (r *Revid) reset(config Config) error { } r.destination = s case Http: - r.destination = newHttpSender(config.HttpAddress, httpTimeout, r.Log) + r.destination = newHttpSender(r.ns, r.Log) } switch r.config.Input { @@ -206,21 +209,6 @@ func (r *Revid) reset(config Config) error { return nil } -// SetConfig changes the current configuration of the receiver. -func (r *Revid) SetConfig(c Config) error { - // FIXME(kortschak): This is reimplemented in cmd/revid-cli/main.go. - // The implementation in the command is used and this is not. - // Decide on one or the other. - - r.Stop() - r, err := New(c) - if err != nil { - return err - } - r.Start() - return nil -} - // Log takes a logtype and message and tries to send this information to the // logger provided in the revid config - if there is one, otherwise the message // is sent to stdout diff --git a/revid/senders.go b/revid/senders.go index f11e4588..68b5e940 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -32,7 +32,6 @@ import ( "io" "os" "os/exec" - "time" "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/iot/pi/netsender" @@ -108,11 +107,9 @@ type httpSender struct { chunk *ring.Chunk } -func newHttpSender(_ string, _ time.Duration, log func(lvl, msg string)) *httpSender { - var client netsender.Sender - client.Init(nil, nil, nil) +func newHttpSender(ns *netsender.Sender, log func(lvl, msg string)) *httpSender { return &httpSender{ - client: &client, + client: ns, log: log, } } @@ -122,18 +119,29 @@ func (s *httpSender) load(c *ring.Chunk) error { return nil } -func (s *httpSender) send() error { +func (s *httpSender) send() (err error) { if s.chunk == nil { // Do not retry with httpSender, // so just return without error // if the chunk has been cleared. return nil } - pins := netsender.MakePins("V0", "") - pins[0].Value = s.chunk.Len() - pins[0].Data = s.chunk.Bytes() - pins[0].MimeType = "video/mp2t" - _, _, err := s.client.Send(netsender.RequestPoll, pins) + // Only send if "V0" is configured as an input. + send := false + ip := s.client.Param("ip") + pins := netsender.MakePins(ip, "V") + for i, pin := range pins { + if pin.Name == "V0" { + send = true + pins[i].Value = s.chunk.Len() + pins[i].Data = s.chunk.Bytes() + pins[i].MimeType = "video/mp2t" + break + } + } + if send { + _, _, err = s.client.Send(netsender.RequestPoll, pins) + } // We will not retry, so release // the chunk and clear it now. s.chunk.Close() From 99ef1d0727ac6246c9707017c23c8682519dee87 Mon Sep 17 00:00:00 2001 From: Alan Noble Date: Wed, 27 Jun 2018 07:50:05 +0930 Subject: [PATCH 2/2] PR feedback and some other clean-up. --- cmd/revid-cli/main.go | 5 ++--- revid/revid.go | 18 +++++++++--------- revid/senders.go | 3 ++- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index fe7baaaa..a228312e 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -1,6 +1,6 @@ /* NAME - RevidCLI.go + revid-cli - command line interface for Revid. DESCRIPTION See Readme.md @@ -10,7 +10,7 @@ AUTHORS Jack Richardson LICENSE - RevidCLI.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) + revid-cli is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -80,7 +80,6 @@ const ( netSendRetryTime = 5 * time.Second defaultRunDuration = 24 * time.Hour revidStopTime = 5 * time.Second - prepTime = 20 * time.Second ) // Globals diff --git a/revid/revid.go b/revid/revid.go index 86b42afa..8ae5388b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -42,8 +42,8 @@ import ( "bitbucket.org/ausocean/av/generator" "bitbucket.org/ausocean/av/parser" "bitbucket.org/ausocean/av/rtmp" - "bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/iot/pi/netsender" + "bitbucket.org/ausocean/utils/ring" ) // Misc constants @@ -117,7 +117,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } -// Bitrate returns the resbult of the most recent bitrate check. +// Bitrate returns the result of the most recent bitrate check. func (r *Revid) Bitrate() int { return r.bitrate } @@ -235,18 +235,18 @@ func (r *Revid) Start() { r.Log(Warning, "Revid.Start() called but revid already running!") return } - r.Log(Info, "Starting Revid!") - r.Log(Debug, "Setting up output!") + r.Log(Info, "Starting Revid") + r.Log(Debug, "Setting up output") r.isRunning = true - r.Log(Info, "Starting output routine!") + r.Log(Info, "Starting output routine") go r.outputClips() - r.Log(Info, "Starting clip packing routine!") + r.Log(Info, "Starting clip packing routine") go r.packClips() - r.Log(Info, "Starting packetisation generator!") + r.Log(Info, "Starting packetisation generator") r.generator.Start() - r.Log(Info, "Starting parser!") + r.Log(Info, "Starting parser") r.parser.Start() - r.Log(Info, "Setting up input and receiving content!") + r.Log(Info, "Setting up input and receiving content") go r.setupInput() } diff --git a/revid/senders.go b/revid/senders.go index 68b5e940..eb1dd371 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -119,7 +119,7 @@ func (s *httpSender) load(c *ring.Chunk) error { return nil } -func (s *httpSender) send() (err error) { +func (s *httpSender) send() error { if s.chunk == nil { // Do not retry with httpSender, // so just return without error @@ -139,6 +139,7 @@ func (s *httpSender) send() (err error) { break } } + var err error if send { _, _, err = s.client.Send(netsender.RequestPoll, pins) }