Added ns to Revid struct, so it could be exposed as a httpSender, and refactored revid-cli accordingly.

This commit is contained in:
Alan Noble 2018-06-26 15:53:55 +09:30
parent 62147e160d
commit 1645b759f1
3 changed files with 85 additions and 93 deletions

View File

@ -85,8 +85,8 @@ const (
// Globals
var (
revidInst *revid.Revid
config revid.Config
rv *revid.Revid
config revid.Config
)
func main() {
@ -241,35 +241,43 @@ func main() {
config.Timeout = *configFlags[timeoutPtr]
config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr]
var ns netsender.Sender
var vs int
paused := false
if *useNetsender {
// initialize NetSender and use NetSender's logger
config.Logger = netsender.Logger()
config.Logger.Log(progName, "Info", "Running in NetSender mode")
err := ns.Init(nil, nil, nil)
if err != nil {
config.Logger.Log(progName, "Error", err.Error()) // TODO(kortschak): Make this "Fatal" when that exists.
os.Exit(1)
}
vars, _ := ns.Vars()
if vars["mode"] == "Paused" {
paused = true
}
} else {
// alternatively, instantiate our own logger
if !*useNetsender {
// instantiate our own logger
config.Logger = smartlogger.New(loggerVerbosity, smartlogger.File, "/var/log/netsender/")
// run revid for the specified duration
startRevid(nil)
time.Sleep(*runDurationPtr)
stopRevid()
return
}
if !paused {
startRevid()
err = run()
if err != nil {
config.Logger.Log(progName, "Error", err.Error()) // TODO(kortschak): Make this "Fatal" when that exists.
os.Exit(1)
}
}
// initialize then run the main NetSender client
func run() error {
// initialize NetSender and use NetSender's logger
config.Logger = netsender.Logger()
config.Logger.Log(progName, "Info", "Running in NetSender mode")
var ns netsender.Sender
err := ns.Init(nil, nil, nil)
if err != nil {
return err
}
vars, _ := ns.Vars()
paused := false
if vars["mode"] == "Paused" {
paused = true
}
// loop in NetSender mode
for *useNetsender {
if err := sendTo(&ns); err != nil {
var vs int
for {
if err := send(&ns); err != nil {
config.Logger.Log(progName, "Warning", err.Error())
time.Sleep(netSendRetryTime)
continue
@ -291,7 +299,10 @@ func main() {
paused = true
}
} else {
updateRevid(vars, !paused)
err = updateRevid(&ns, vars, !paused)
if err != nil {
return err
}
if paused {
paused = false
}
@ -300,21 +311,16 @@ func main() {
sleepTime, _ := strconv.Atoi(ns.Param("mp"))
time.Sleep(time.Duration(sleepTime) * time.Second)
}
// If we're not running a netsender session then we run revid for the amount
// of time the user defined or the default 2 days
time.Sleep(*runDurationPtr)
stopRevid()
}
// sendTo implements our main NetSender client and handles NetReceiver configuration
// send implements our main NetSender client and handles NetReceiver configuration
// (as distinct from httpSender which just sends video data).
func sendTo(ns *netsender.Sender) error {
func send(ns *netsender.Sender) error {
// populate input values, if any
inputs := netsender.MakePins(ns.Param("ip"), "X")
for i, pin := range inputs {
if pin.Name == "X23" {
inputs[i].Value = revidInst.Bitrate()
inputs[i].Value = rv.Bitrate()
}
}
@ -329,34 +335,24 @@ func sendTo(ns *netsender.Sender) error {
}
// wrappers for stopping and starting revid
func startRevid() {
createRevidInstance()
revidInst.Start()
}
func createRevidInstance() {
// Try to create the revid instance with the given config
var err error
for revidInst, err = revid.New(config); err != nil; {
// If the config does have a logger, use it to output error, otherwise
// just output to std output
if config.Logger != nil {
config.Logger.Log(progName, "FATAL ERROR", err.Error())
} else {
fmt.Printf("FATAL ERROR: %v", err.Error())
}
func startRevid(ns *netsender.Sender) (err error) {
rv, err = revid.New(config, ns)
if err != nil {
return err
}
rv.Start()
return nil
}
func stopRevid() {
revidInst.Stop()
rv.Stop()
// FIXME(kortschak): Is this waiting on completion of work?
// Use a wait group and Wait method if it is.
time.Sleep(revidStopTime)
}
func updateRevid(vars map[string]string, stop bool) {
func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error {
if stop {
stopRevid()
}
@ -375,7 +371,7 @@ func updateRevid(vars map[string]string, stop bool) {
case "FfmpegRtmp":
config.Output = revid.FfmpegRtmp
default:
revidInst.Log(revid.Warning, "Invalid Output param: "+value)
rv.Log(revid.Warning, "Invalid Output param: "+value)
continue
}
case "FramesPerClip":
@ -383,7 +379,7 @@ func updateRevid(vars map[string]string, stop bool) {
if fpc > 0 && err == nil {
config.FramesPerClip = fpc
} else {
revidInst.Log(revid.Warning, "Invalid FramesPerClip param: "+value)
rv.Log(revid.Warning, "Invalid FramesPerClip param: "+value)
}
case "RtmpUrl":
config.RtmpUrl = value
@ -392,7 +388,7 @@ func updateRevid(vars map[string]string, stop bool) {
if asInt > 0 && err == nil {
config.Bitrate = value
} else {
revidInst.Log(revid.Warning, "Invalid Bitrate param: "+value)
rv.Log(revid.Warning, "Invalid Bitrate param: "+value)
}
case "OutputFileName":
config.OutputFileName = value
@ -403,21 +399,21 @@ func updateRevid(vars map[string]string, stop bool) {
if asInt > 0 && err == nil {
config.Height = value
} else {
revidInst.Log(revid.Warning, "Invalid Height param: "+value)
rv.Log(revid.Warning, "Invalid Height param: "+value)
}
case "Width":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
config.Width = value
} else {
revidInst.Log(revid.Warning, "Invalid Width param: "+value)
rv.Log(revid.Warning, "Invalid Width param: "+value)
}
case "FrameRate":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
config.FrameRate = value
} else {
revidInst.Log(revid.Warning, "Invalid FrameRate param: "+value)
rv.Log(revid.Warning, "Invalid FrameRate param: "+value)
}
case "HttpAddress":
config.HttpAddress = value
@ -426,7 +422,7 @@ func updateRevid(vars map[string]string, stop bool) {
if asInt > 0 && err == nil {
config.Quantization = value
} else {
revidInst.Log(revid.Warning, "Invalid Quantization param: "+value)
rv.Log(revid.Warning, "Invalid Quantization param: "+value)
}
case "Timeout":
asInt, err := strconv.Atoi(value)
@ -445,7 +441,7 @@ func updateRevid(vars map[string]string, stop bool) {
case "No":
config.HorizontalFlip = revid.No
default:
revidInst.Log(revid.Warning, "Invalid HorizontalFlip param: "+value)
rv.Log(revid.Warning, "Invalid HorizontalFlip param: "+value)
}
case "VerticalFlip":
switch value {
@ -454,14 +450,14 @@ func updateRevid(vars map[string]string, stop bool) {
case "No":
config.VerticalFlip = revid.No
default:
revidInst.Log(revid.Warning, "Invalid VerticalFlip param: "+value)
rv.Log(revid.Warning, "Invalid VerticalFlip param: "+value)
}
default:
if unicode.IsUpper(rune(key[0])) {
revidInst.Log(revid.Warning, "Unexpected param: "+key)
rv.Log(revid.Warning, "Unexpected param: "+key)
} // else system params are lower case
}
}
startRevid()
return startRevid(ns)
}

View File

@ -43,6 +43,7 @@ import (
"bitbucket.org/ausocean/av/parser"
"bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/iot/pi/netsender"
)
// Misc constants
@ -98,12 +99,13 @@ type Revid struct {
destination loadSender
rtmpInst rtmp.Session
bitrate int
ns *netsender.Sender
}
// NewRevid returns a pointer to a new Revid with the desired
// configuration, and/or an error if construction of the new instant was not
// successful.
func New(c Config) (*Revid, error) {
func New(c Config, ns *netsender.Sender) (*Revid, error) {
var r Revid
err := r.reset(c)
if err != nil {
@ -111,10 +113,11 @@ func New(c Config) (*Revid, error) {
}
r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
r.outputChan = make(chan []byte, outputChanSize)
r.ns = ns
return &r, nil
}
// Bitrate returns the result of the most recent bitrate check.
// Bitrate returns the resbult of the most recent bitrate check.
func (r *Revid) Bitrate() int {
return r.bitrate
}
@ -165,7 +168,7 @@ func (r *Revid) reset(config Config) error {
}
r.destination = s
case Http:
r.destination = newHttpSender(config.HttpAddress, httpTimeout, r.Log)
r.destination = newHttpSender(r.ns, r.Log)
}
switch r.config.Input {
@ -206,21 +209,6 @@ func (r *Revid) reset(config Config) error {
return nil
}
// SetConfig changes the current configuration of the receiver.
func (r *Revid) SetConfig(c Config) error {
// FIXME(kortschak): This is reimplemented in cmd/revid-cli/main.go.
// The implementation in the command is used and this is not.
// Decide on one or the other.
r.Stop()
r, err := New(c)
if err != nil {
return err
}
r.Start()
return nil
}
// Log takes a logtype and message and tries to send this information to the
// logger provided in the revid config - if there is one, otherwise the message
// is sent to stdout

View File

@ -32,7 +32,6 @@ import (
"io"
"os"
"os/exec"
"time"
"bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/iot/pi/netsender"
@ -108,11 +107,9 @@ type httpSender struct {
chunk *ring.Chunk
}
func newHttpSender(_ string, _ time.Duration, log func(lvl, msg string)) *httpSender {
var client netsender.Sender
client.Init(nil, nil, nil)
func newHttpSender(ns *netsender.Sender, log func(lvl, msg string)) *httpSender {
return &httpSender{
client: &client,
client: ns,
log: log,
}
}
@ -122,18 +119,29 @@ func (s *httpSender) load(c *ring.Chunk) error {
return nil
}
func (s *httpSender) send() error {
func (s *httpSender) send() (err error) {
if s.chunk == nil {
// Do not retry with httpSender,
// so just return without error
// if the chunk has been cleared.
return nil
}
pins := netsender.MakePins("V0", "")
pins[0].Value = s.chunk.Len()
pins[0].Data = s.chunk.Bytes()
pins[0].MimeType = "video/mp2t"
_, _, err := s.client.Send(netsender.RequestPoll, pins)
// Only send if "V0" is configured as an input.
send := false
ip := s.client.Param("ip")
pins := netsender.MakePins(ip, "V")
for i, pin := range pins {
if pin.Name == "V0" {
send = true
pins[i].Value = s.chunk.Len()
pins[i].Data = s.chunk.Bytes()
pins[i].MimeType = "video/mp2t"
break
}
}
if send {
_, _, err = s.client.Send(netsender.RequestPoll, pins)
}
// We will not retry, so release
// the chunk and clear it now.
s.chunk.Close()