Merged in netsender-sharing (pull request #36)

Added ns to Revid struct, so it could be exposed as a httpSender, and refactored revid-cli accordingly.

Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Alan Noble 2018-06-27 02:05:17 +00:00
commit 5ff305e71a
3 changed files with 93 additions and 101 deletions

View File

@ -1,6 +1,6 @@
/* /*
NAME NAME
RevidCLI.go revid-cli - command line interface for Revid.
DESCRIPTION DESCRIPTION
See Readme.md See Readme.md
@ -10,7 +10,7 @@ AUTHORS
Jack Richardson <jack@ausocean.org> Jack Richardson <jack@ausocean.org>
LICENSE LICENSE
RevidCLI.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) revid-cli is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -80,12 +80,11 @@ const (
netSendRetryTime = 5 * time.Second netSendRetryTime = 5 * time.Second
defaultRunDuration = 24 * time.Hour defaultRunDuration = 24 * time.Hour
revidStopTime = 5 * time.Second revidStopTime = 5 * time.Second
prepTime = 20 * time.Second
) )
// Globals // Globals
var ( var (
revidInst *revid.Revid rv *revid.Revid
config revid.Config config revid.Config
) )
@ -241,35 +240,43 @@ func main() {
config.Timeout = *configFlags[timeoutPtr] config.Timeout = *configFlags[timeoutPtr]
config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr] config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr]
var ns netsender.Sender if !*useNetsender {
var vs int // instantiate our own logger
paused := false config.Logger = smartlogger.New(loggerVerbosity, smartlogger.File, "/var/log/netsender/")
// run revid for the specified duration
startRevid(nil)
time.Sleep(*runDurationPtr)
stopRevid()
return
}
if *useNetsender { err = run()
// initialize NetSender and use NetSender's logger
config.Logger = netsender.Logger()
config.Logger.Log(progName, "Info", "Running in NetSender mode")
err := ns.Init(nil, nil, nil)
if err != nil { if err != nil {
config.Logger.Log(progName, "Error", err.Error()) // TODO(kortschak): Make this "Fatal" when that exists. config.Logger.Log(progName, "Error", err.Error()) // TODO(kortschak): Make this "Fatal" when that exists.
os.Exit(1) os.Exit(1)
} }
}
// initialize then run the main NetSender client
func run() error {
// initialize NetSender and use NetSender's logger
config.Logger = netsender.Logger()
config.Logger.Log(progName, "Info", "Running in NetSender mode")
var ns netsender.Sender
err := ns.Init(nil, nil, nil)
if err != nil {
return err
}
vars, _ := ns.Vars() vars, _ := ns.Vars()
paused := false
if vars["mode"] == "Paused" { if vars["mode"] == "Paused" {
paused = true paused = true
} }
} else {
// alternatively, instantiate our own logger
config.Logger = smartlogger.New(loggerVerbosity, smartlogger.File, "/var/log/netsender/")
}
if !paused { var vs int
startRevid() for {
} if err := send(&ns); err != nil {
// loop in NetSender mode
for *useNetsender {
if err := sendTo(&ns); err != nil {
config.Logger.Log(progName, "Warning", err.Error()) config.Logger.Log(progName, "Warning", err.Error())
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
@ -291,7 +298,10 @@ func main() {
paused = true paused = true
} }
} else { } else {
updateRevid(vars, !paused) err = updateRevid(&ns, vars, !paused)
if err != nil {
return err
}
if paused { if paused {
paused = false paused = false
} }
@ -300,21 +310,16 @@ func main() {
sleepTime, _ := strconv.Atoi(ns.Param("mp")) sleepTime, _ := strconv.Atoi(ns.Param("mp"))
time.Sleep(time.Duration(sleepTime) * time.Second) time.Sleep(time.Duration(sleepTime) * time.Second)
} }
// If we're not running a netsender session then we run revid for the amount
// of time the user defined or the default 2 days
time.Sleep(*runDurationPtr)
stopRevid()
} }
// sendTo implements our main NetSender client and handles NetReceiver configuration // send implements our main NetSender client and handles NetReceiver configuration
// (as distinct from httpSender which just sends video data). // (as distinct from httpSender which just sends video data).
func sendTo(ns *netsender.Sender) error { func send(ns *netsender.Sender) error {
// populate input values, if any // populate input values, if any
inputs := netsender.MakePins(ns.Param("ip"), "X") inputs := netsender.MakePins(ns.Param("ip"), "X")
for i, pin := range inputs { for i, pin := range inputs {
if pin.Name == "X23" { if pin.Name == "X23" {
inputs[i].Value = revidInst.Bitrate() inputs[i].Value = rv.Bitrate()
} }
} }
@ -329,34 +334,24 @@ func sendTo(ns *netsender.Sender) error {
} }
// wrappers for stopping and starting revid // wrappers for stopping and starting revid
func startRevid() { func startRevid(ns *netsender.Sender) (err error) {
createRevidInstance() rv, err = revid.New(config, ns)
revidInst.Start() if err != nil {
} return err
func createRevidInstance() {
// Try to create the revid instance with the given config
var err error
for revidInst, err = revid.New(config); err != nil; {
// If the config does have a logger, use it to output error, otherwise
// just output to std output
if config.Logger != nil {
config.Logger.Log(progName, "FATAL ERROR", err.Error())
} else {
fmt.Printf("FATAL ERROR: %v", err.Error())
}
} }
rv.Start()
return nil
} }
func stopRevid() { func stopRevid() {
revidInst.Stop() rv.Stop()
// FIXME(kortschak): Is this waiting on completion of work? // FIXME(kortschak): Is this waiting on completion of work?
// Use a wait group and Wait method if it is. // Use a wait group and Wait method if it is.
time.Sleep(revidStopTime) time.Sleep(revidStopTime)
} }
func updateRevid(vars map[string]string, stop bool) { func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error {
if stop { if stop {
stopRevid() stopRevid()
} }
@ -375,7 +370,7 @@ func updateRevid(vars map[string]string, stop bool) {
case "FfmpegRtmp": case "FfmpegRtmp":
config.Output = revid.FfmpegRtmp config.Output = revid.FfmpegRtmp
default: default:
revidInst.Log(revid.Warning, "Invalid Output param: "+value) rv.Log(revid.Warning, "Invalid Output param: "+value)
continue continue
} }
case "FramesPerClip": case "FramesPerClip":
@ -383,7 +378,7 @@ func updateRevid(vars map[string]string, stop bool) {
if fpc > 0 && err == nil { if fpc > 0 && err == nil {
config.FramesPerClip = fpc config.FramesPerClip = fpc
} else { } else {
revidInst.Log(revid.Warning, "Invalid FramesPerClip param: "+value) rv.Log(revid.Warning, "Invalid FramesPerClip param: "+value)
} }
case "RtmpUrl": case "RtmpUrl":
config.RtmpUrl = value config.RtmpUrl = value
@ -392,7 +387,7 @@ func updateRevid(vars map[string]string, stop bool) {
if asInt > 0 && err == nil { if asInt > 0 && err == nil {
config.Bitrate = value config.Bitrate = value
} else { } else {
revidInst.Log(revid.Warning, "Invalid Bitrate param: "+value) rv.Log(revid.Warning, "Invalid Bitrate param: "+value)
} }
case "OutputFileName": case "OutputFileName":
config.OutputFileName = value config.OutputFileName = value
@ -403,21 +398,21 @@ func updateRevid(vars map[string]string, stop bool) {
if asInt > 0 && err == nil { if asInt > 0 && err == nil {
config.Height = value config.Height = value
} else { } else {
revidInst.Log(revid.Warning, "Invalid Height param: "+value) rv.Log(revid.Warning, "Invalid Height param: "+value)
} }
case "Width": case "Width":
asInt, err := strconv.Atoi(value) asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil { if asInt > 0 && err == nil {
config.Width = value config.Width = value
} else { } else {
revidInst.Log(revid.Warning, "Invalid Width param: "+value) rv.Log(revid.Warning, "Invalid Width param: "+value)
} }
case "FrameRate": case "FrameRate":
asInt, err := strconv.Atoi(value) asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil { if asInt > 0 && err == nil {
config.FrameRate = value config.FrameRate = value
} else { } else {
revidInst.Log(revid.Warning, "Invalid FrameRate param: "+value) rv.Log(revid.Warning, "Invalid FrameRate param: "+value)
} }
case "HttpAddress": case "HttpAddress":
config.HttpAddress = value config.HttpAddress = value
@ -426,7 +421,7 @@ func updateRevid(vars map[string]string, stop bool) {
if asInt > 0 && err == nil { if asInt > 0 && err == nil {
config.Quantization = value config.Quantization = value
} else { } else {
revidInst.Log(revid.Warning, "Invalid Quantization param: "+value) rv.Log(revid.Warning, "Invalid Quantization param: "+value)
} }
case "Timeout": case "Timeout":
asInt, err := strconv.Atoi(value) asInt, err := strconv.Atoi(value)
@ -445,7 +440,7 @@ func updateRevid(vars map[string]string, stop bool) {
case "No": case "No":
config.HorizontalFlip = revid.No config.HorizontalFlip = revid.No
default: default:
revidInst.Log(revid.Warning, "Invalid HorizontalFlip param: "+value) rv.Log(revid.Warning, "Invalid HorizontalFlip param: "+value)
} }
case "VerticalFlip": case "VerticalFlip":
switch value { switch value {
@ -454,14 +449,14 @@ func updateRevid(vars map[string]string, stop bool) {
case "No": case "No":
config.VerticalFlip = revid.No config.VerticalFlip = revid.No
default: default:
revidInst.Log(revid.Warning, "Invalid VerticalFlip param: "+value) rv.Log(revid.Warning, "Invalid VerticalFlip param: "+value)
} }
default: default:
if unicode.IsUpper(rune(key[0])) { if unicode.IsUpper(rune(key[0])) {
revidInst.Log(revid.Warning, "Unexpected param: "+key) rv.Log(revid.Warning, "Unexpected param: "+key)
} // else system params are lower case } // else system params are lower case
} }
} }
startRevid() return startRevid(ns)
} }

View File

@ -42,6 +42,7 @@ import (
"bitbucket.org/ausocean/av/generator" "bitbucket.org/ausocean/av/generator"
"bitbucket.org/ausocean/av/parser" "bitbucket.org/ausocean/av/parser"
"bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
) )
@ -98,12 +99,13 @@ type Revid struct {
destination loadSender destination loadSender
rtmpInst rtmp.Session rtmpInst rtmp.Session
bitrate int bitrate int
ns *netsender.Sender
} }
// NewRevid returns a pointer to a new Revid with the desired // NewRevid returns a pointer to a new Revid with the desired
// configuration, and/or an error if construction of the new instant was not // configuration, and/or an error if construction of the new instant was not
// successful. // successful.
func New(c Config) (*Revid, error) { func New(c Config, ns *netsender.Sender) (*Revid, error) {
var r Revid var r Revid
err := r.reset(c) err := r.reset(c)
if err != nil { if err != nil {
@ -111,6 +113,7 @@ func New(c Config) (*Revid, error) {
} }
r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
r.outputChan = make(chan []byte, outputChanSize) r.outputChan = make(chan []byte, outputChanSize)
r.ns = ns
return &r, nil return &r, nil
} }
@ -165,7 +168,7 @@ func (r *Revid) reset(config Config) error {
} }
r.destination = s r.destination = s
case Http: case Http:
r.destination = newHttpSender(config.HttpAddress, httpTimeout, r.Log) r.destination = newHttpSender(r.ns, r.Log)
} }
switch r.config.Input { switch r.config.Input {
@ -206,21 +209,6 @@ func (r *Revid) reset(config Config) error {
return nil return nil
} }
// SetConfig changes the current configuration of the receiver.
func (r *Revid) SetConfig(c Config) error {
// FIXME(kortschak): This is reimplemented in cmd/revid-cli/main.go.
// The implementation in the command is used and this is not.
// Decide on one or the other.
r.Stop()
r, err := New(c)
if err != nil {
return err
}
r.Start()
return nil
}
// Log takes a logtype and message and tries to send this information to the // Log takes a logtype and message and tries to send this information to the
// logger provided in the revid config - if there is one, otherwise the message // logger provided in the revid config - if there is one, otherwise the message
// is sent to stdout // is sent to stdout
@ -247,18 +235,18 @@ func (r *Revid) Start() {
r.Log(Warning, "Revid.Start() called but revid already running!") r.Log(Warning, "Revid.Start() called but revid already running!")
return return
} }
r.Log(Info, "Starting Revid!") r.Log(Info, "Starting Revid")
r.Log(Debug, "Setting up output!") r.Log(Debug, "Setting up output")
r.isRunning = true r.isRunning = true
r.Log(Info, "Starting output routine!") r.Log(Info, "Starting output routine")
go r.outputClips() go r.outputClips()
r.Log(Info, "Starting clip packing routine!") r.Log(Info, "Starting clip packing routine")
go r.packClips() go r.packClips()
r.Log(Info, "Starting packetisation generator!") r.Log(Info, "Starting packetisation generator")
r.generator.Start() r.generator.Start()
r.Log(Info, "Starting parser!") r.Log(Info, "Starting parser")
r.parser.Start() r.parser.Start()
r.Log(Info, "Setting up input and receiving content!") r.Log(Info, "Setting up input and receiving content")
go r.setupInput() go r.setupInput()
} }

View File

@ -32,7 +32,6 @@ import (
"io" "io"
"os" "os"
"os/exec" "os/exec"
"time"
"bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
@ -108,11 +107,9 @@ type httpSender struct {
chunk *ring.Chunk chunk *ring.Chunk
} }
func newHttpSender(_ string, _ time.Duration, log func(lvl, msg string)) *httpSender { func newHttpSender(ns *netsender.Sender, log func(lvl, msg string)) *httpSender {
var client netsender.Sender
client.Init(nil, nil, nil)
return &httpSender{ return &httpSender{
client: &client, client: ns,
log: log, log: log,
} }
} }
@ -129,11 +126,23 @@ func (s *httpSender) send() error {
// if the chunk has been cleared. // if the chunk has been cleared.
return nil return nil
} }
pins := netsender.MakePins("V0", "") // Only send if "V0" is configured as an input.
pins[0].Value = s.chunk.Len() send := false
pins[0].Data = s.chunk.Bytes() ip := s.client.Param("ip")
pins[0].MimeType = "video/mp2t" pins := netsender.MakePins(ip, "V")
_, _, err := s.client.Send(netsender.RequestPoll, pins) for i, pin := range pins {
if pin.Name == "V0" {
send = true
pins[i].Value = s.chunk.Len()
pins[i].Data = s.chunk.Bytes()
pins[i].MimeType = "video/mp2t"
break
}
}
var err error
if send {
_, _, err = s.client.Send(netsender.RequestPoll, pins)
}
// We will not retry, so release // We will not retry, so release
// the chunk and clear it now. // the chunk and clear it now.
s.chunk.Close() s.chunk.Close()