diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 8c7f8980..a228312e 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -1,6 +1,6 @@ /* NAME - RevidCLI.go + revid-cli - command line interface for Revid. DESCRIPTION See Readme.md @@ -10,7 +10,7 @@ AUTHORS Jack Richardson LICENSE - RevidCLI.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) + revid-cli is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -80,13 +80,12 @@ const ( netSendRetryTime = 5 * time.Second defaultRunDuration = 24 * time.Hour revidStopTime = 5 * time.Second - prepTime = 20 * time.Second ) // Globals var ( - revidInst *revid.Revid - config revid.Config + rv *revid.Revid + config revid.Config ) func main() { @@ -241,35 +240,43 @@ func main() { config.Timeout = *configFlags[timeoutPtr] config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr] - var ns netsender.Sender - var vs int - paused := false - - if *useNetsender { - // initialize NetSender and use NetSender's logger - config.Logger = netsender.Logger() - config.Logger.Log(progName, "Info", "Running in NetSender mode") - err := ns.Init(nil, nil, nil) - if err != nil { - config.Logger.Log(progName, "Error", err.Error()) // TODO(kortschak): Make this "Fatal" when that exists. - os.Exit(1) - } - vars, _ := ns.Vars() - if vars["mode"] == "Paused" { - paused = true - } - } else { - // alternatively, instantiate our own logger + if !*useNetsender { + // instantiate our own logger config.Logger = smartlogger.New(loggerVerbosity, smartlogger.File, "/var/log/netsender/") + // run revid for the specified duration + startRevid(nil) + time.Sleep(*runDurationPtr) + stopRevid() + return } - if !paused { - startRevid() + err = run() + if err != nil { + config.Logger.Log(progName, "Error", err.Error()) // TODO(kortschak): Make this "Fatal" when that exists. + os.Exit(1) + } +} + +// initialize then run the main NetSender client +func run() error { + // initialize NetSender and use NetSender's logger + config.Logger = netsender.Logger() + config.Logger.Log(progName, "Info", "Running in NetSender mode") + + var ns netsender.Sender + err := ns.Init(nil, nil, nil) + if err != nil { + return err + } + vars, _ := ns.Vars() + paused := false + if vars["mode"] == "Paused" { + paused = true } - // loop in NetSender mode - for *useNetsender { - if err := sendTo(&ns); err != nil { + var vs int + for { + if err := send(&ns); err != nil { config.Logger.Log(progName, "Warning", err.Error()) time.Sleep(netSendRetryTime) continue @@ -291,7 +298,10 @@ func main() { paused = true } } else { - updateRevid(vars, !paused) + err = updateRevid(&ns, vars, !paused) + if err != nil { + return err + } if paused { paused = false } @@ -300,21 +310,16 @@ func main() { sleepTime, _ := strconv.Atoi(ns.Param("mp")) time.Sleep(time.Duration(sleepTime) * time.Second) } - - // If we're not running a netsender session then we run revid for the amount - // of time the user defined or the default 2 days - time.Sleep(*runDurationPtr) - stopRevid() } -// sendTo implements our main NetSender client and handles NetReceiver configuration +// send implements our main NetSender client and handles NetReceiver configuration // (as distinct from httpSender which just sends video data). -func sendTo(ns *netsender.Sender) error { +func send(ns *netsender.Sender) error { // populate input values, if any inputs := netsender.MakePins(ns.Param("ip"), "X") for i, pin := range inputs { if pin.Name == "X23" { - inputs[i].Value = revidInst.Bitrate() + inputs[i].Value = rv.Bitrate() } } @@ -329,34 +334,24 @@ func sendTo(ns *netsender.Sender) error { } // wrappers for stopping and starting revid -func startRevid() { - createRevidInstance() - revidInst.Start() -} - -func createRevidInstance() { - // Try to create the revid instance with the given config - var err error - for revidInst, err = revid.New(config); err != nil; { - // If the config does have a logger, use it to output error, otherwise - // just output to std output - if config.Logger != nil { - config.Logger.Log(progName, "FATAL ERROR", err.Error()) - } else { - fmt.Printf("FATAL ERROR: %v", err.Error()) - } +func startRevid(ns *netsender.Sender) (err error) { + rv, err = revid.New(config, ns) + if err != nil { + return err } + rv.Start() + return nil } func stopRevid() { - revidInst.Stop() + rv.Stop() // FIXME(kortschak): Is this waiting on completion of work? // Use a wait group and Wait method if it is. time.Sleep(revidStopTime) } -func updateRevid(vars map[string]string, stop bool) { +func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error { if stop { stopRevid() } @@ -375,7 +370,7 @@ func updateRevid(vars map[string]string, stop bool) { case "FfmpegRtmp": config.Output = revid.FfmpegRtmp default: - revidInst.Log(revid.Warning, "Invalid Output param: "+value) + rv.Log(revid.Warning, "Invalid Output param: "+value) continue } case "FramesPerClip": @@ -383,7 +378,7 @@ func updateRevid(vars map[string]string, stop bool) { if fpc > 0 && err == nil { config.FramesPerClip = fpc } else { - revidInst.Log(revid.Warning, "Invalid FramesPerClip param: "+value) + rv.Log(revid.Warning, "Invalid FramesPerClip param: "+value) } case "RtmpUrl": config.RtmpUrl = value @@ -392,7 +387,7 @@ func updateRevid(vars map[string]string, stop bool) { if asInt > 0 && err == nil { config.Bitrate = value } else { - revidInst.Log(revid.Warning, "Invalid Bitrate param: "+value) + rv.Log(revid.Warning, "Invalid Bitrate param: "+value) } case "OutputFileName": config.OutputFileName = value @@ -403,21 +398,21 @@ func updateRevid(vars map[string]string, stop bool) { if asInt > 0 && err == nil { config.Height = value } else { - revidInst.Log(revid.Warning, "Invalid Height param: "+value) + rv.Log(revid.Warning, "Invalid Height param: "+value) } case "Width": asInt, err := strconv.Atoi(value) if asInt > 0 && err == nil { config.Width = value } else { - revidInst.Log(revid.Warning, "Invalid Width param: "+value) + rv.Log(revid.Warning, "Invalid Width param: "+value) } case "FrameRate": asInt, err := strconv.Atoi(value) if asInt > 0 && err == nil { config.FrameRate = value } else { - revidInst.Log(revid.Warning, "Invalid FrameRate param: "+value) + rv.Log(revid.Warning, "Invalid FrameRate param: "+value) } case "HttpAddress": config.HttpAddress = value @@ -426,7 +421,7 @@ func updateRevid(vars map[string]string, stop bool) { if asInt > 0 && err == nil { config.Quantization = value } else { - revidInst.Log(revid.Warning, "Invalid Quantization param: "+value) + rv.Log(revid.Warning, "Invalid Quantization param: "+value) } case "Timeout": asInt, err := strconv.Atoi(value) @@ -445,7 +440,7 @@ func updateRevid(vars map[string]string, stop bool) { case "No": config.HorizontalFlip = revid.No default: - revidInst.Log(revid.Warning, "Invalid HorizontalFlip param: "+value) + rv.Log(revid.Warning, "Invalid HorizontalFlip param: "+value) } case "VerticalFlip": switch value { @@ -454,14 +449,14 @@ func updateRevid(vars map[string]string, stop bool) { case "No": config.VerticalFlip = revid.No default: - revidInst.Log(revid.Warning, "Invalid VerticalFlip param: "+value) + rv.Log(revid.Warning, "Invalid VerticalFlip param: "+value) } default: if unicode.IsUpper(rune(key[0])) { - revidInst.Log(revid.Warning, "Unexpected param: "+key) + rv.Log(revid.Warning, "Unexpected param: "+key) } // else system params are lower case } } - startRevid() + return startRevid(ns) } diff --git a/revid/revid.go b/revid/revid.go index 18f5111a..8ae5388b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -42,6 +42,7 @@ import ( "bitbucket.org/ausocean/av/generator" "bitbucket.org/ausocean/av/parser" "bitbucket.org/ausocean/av/rtmp" + "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ring" ) @@ -98,12 +99,13 @@ type Revid struct { destination loadSender rtmpInst rtmp.Session bitrate int + ns *netsender.Sender } // NewRevid returns a pointer to a new Revid with the desired // configuration, and/or an error if construction of the new instant was not // successful. -func New(c Config) (*Revid, error) { +func New(c Config, ns *netsender.Sender) (*Revid, error) { var r Revid err := r.reset(c) if err != nil { @@ -111,6 +113,7 @@ func New(c Config) (*Revid, error) { } r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.outputChan = make(chan []byte, outputChanSize) + r.ns = ns return &r, nil } @@ -165,7 +168,7 @@ func (r *Revid) reset(config Config) error { } r.destination = s case Http: - r.destination = newHttpSender(config.HttpAddress, httpTimeout, r.Log) + r.destination = newHttpSender(r.ns, r.Log) } switch r.config.Input { @@ -206,21 +209,6 @@ func (r *Revid) reset(config Config) error { return nil } -// SetConfig changes the current configuration of the receiver. -func (r *Revid) SetConfig(c Config) error { - // FIXME(kortschak): This is reimplemented in cmd/revid-cli/main.go. - // The implementation in the command is used and this is not. - // Decide on one or the other. - - r.Stop() - r, err := New(c) - if err != nil { - return err - } - r.Start() - return nil -} - // Log takes a logtype and message and tries to send this information to the // logger provided in the revid config - if there is one, otherwise the message // is sent to stdout @@ -247,18 +235,18 @@ func (r *Revid) Start() { r.Log(Warning, "Revid.Start() called but revid already running!") return } - r.Log(Info, "Starting Revid!") - r.Log(Debug, "Setting up output!") + r.Log(Info, "Starting Revid") + r.Log(Debug, "Setting up output") r.isRunning = true - r.Log(Info, "Starting output routine!") + r.Log(Info, "Starting output routine") go r.outputClips() - r.Log(Info, "Starting clip packing routine!") + r.Log(Info, "Starting clip packing routine") go r.packClips() - r.Log(Info, "Starting packetisation generator!") + r.Log(Info, "Starting packetisation generator") r.generator.Start() - r.Log(Info, "Starting parser!") + r.Log(Info, "Starting parser") r.parser.Start() - r.Log(Info, "Setting up input and receiving content!") + r.Log(Info, "Setting up input and receiving content") go r.setupInput() } diff --git a/revid/senders.go b/revid/senders.go index f11e4588..eb1dd371 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -32,7 +32,6 @@ import ( "io" "os" "os/exec" - "time" "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/iot/pi/netsender" @@ -108,11 +107,9 @@ type httpSender struct { chunk *ring.Chunk } -func newHttpSender(_ string, _ time.Duration, log func(lvl, msg string)) *httpSender { - var client netsender.Sender - client.Init(nil, nil, nil) +func newHttpSender(ns *netsender.Sender, log func(lvl, msg string)) *httpSender { return &httpSender{ - client: &client, + client: ns, log: log, } } @@ -129,11 +126,23 @@ func (s *httpSender) send() error { // if the chunk has been cleared. return nil } - pins := netsender.MakePins("V0", "") - pins[0].Value = s.chunk.Len() - pins[0].Data = s.chunk.Bytes() - pins[0].MimeType = "video/mp2t" - _, _, err := s.client.Send(netsender.RequestPoll, pins) + // Only send if "V0" is configured as an input. + send := false + ip := s.client.Param("ip") + pins := netsender.MakePins(ip, "V") + for i, pin := range pins { + if pin.Name == "V0" { + send = true + pins[i].Value = s.chunk.Len() + pins[i].Data = s.chunk.Bytes() + pins[i].MimeType = "video/mp2t" + break + } + } + var err error + if send { + _, _, err = s.client.Send(netsender.RequestPoll, pins) + } // We will not retry, so release // the chunk and clear it now. s.chunk.Close()