Merged in revid-cleanup (pull request #134)

revid: revid cleaning and implementation of burst mode

Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
Saxon Milton 2019-02-05 00:34:04 +00:00
commit f96a04ece7
3 changed files with 224 additions and 182 deletions

View File

@ -50,6 +50,13 @@ const (
defaultLogVerbosity = logger.Debug
)
// Revid modes
const (
normal = "Normal"
paused = "Paused"
burst = "Burst"
)
// Other misc consts
const (
netSendRetryTime = 5 * time.Second
@ -72,23 +79,22 @@ func main() {
cfg := handleFlags()
if !*useNetsender {
// run revid for the specified duration
rv, _, err := startRevid(nil, cfg)
rv, err := revid.New(cfg, nil)
if err != nil {
cfg.Logger.Log(logger.Fatal, pkg+"failed to initialiase revid", "error", err.Error())
}
if err = rv.Start(); err != nil {
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error())
}
time.Sleep(*runDurationPtr)
err = stopRevid(rv)
if err != nil {
if err = rv.Stop(); err != nil {
cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error())
}
return
}
err := run(nil, cfg)
if err != nil {
if err := run(cfg); err != nil {
log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
os.Exit(1)
}
}
@ -244,66 +250,97 @@ func handleFlags() revid.Config {
}
// initialize then run the main NetSender client
func run(rv *revid.Revid, cfg revid.Config) error {
// initialize NetSender and use NetSender's logger
//config.Logger = netsender.Logger()
func run(cfg revid.Config) error {
log.Log(logger.Info, pkg+"running in NetSender mode")
var vars map[string]string
// initialize NetSender and use NetSender's logger
var ns netsender.Sender
err := ns.Init(log, nil, nil, nil)
if err != nil {
return err
}
vars, _ := ns.Vars()
vars, _ = ns.Vars()
vs := ns.VarSum()
paused := false
if vars["mode"] == "Paused" {
paused = true
rv, err := revid.New(cfg, &ns)
if err != nil {
log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error())
}
if !paused {
rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false)
// Update revid to get latest config settings from netreceiver.
err = rv.Update(vars)
if err != nil {
return err
}
// If mode on netreceiver isn't paused then we can start revid.
if ns.Mode() != paused && ns.Mode() != burst {
err = rv.Start()
if err != nil {
return err
}
}
if ns.Mode() == burst {
ns.SetMode(paused, &vs)
}
for {
if err := send(&ns, rv); err != nil {
log.Log(logger.Error, pkg+"polling failed", "error", err.Error())
// TODO(saxon): replace this call with call to ns.Run().
err = send(&ns, rv)
if err != nil {
log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error())
time.Sleep(netSendRetryTime)
continue
}
if vs != ns.VarSum() {
// vars changed
vars, err := ns.Vars()
if err != nil {
log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
time.Sleep(netSendRetryTime)
continue
}
vs = ns.VarSum()
if vars["mode"] == "Paused" {
if !paused {
log.Log(logger.Info, pkg+"pausing revid")
err = stopRevid(rv)
if err != nil {
log.Log(logger.Error, pkg+"failed to stop revide", "error", err.Error())
continue
}
paused = true
}
} else {
rv, cfg, err = updateRevid(&ns, rv, cfg, vars, !paused)
if err != nil {
return err
}
if paused {
paused = false
}
}
// If var sum hasn't change we continue
if vs == ns.VarSum() {
goto sleep
}
vars, err = ns.Vars()
if err != nil {
log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
time.Sleep(netSendRetryTime)
continue
}
vs = ns.VarSum()
err = rv.Update(vars)
if err != nil {
return err
}
switch ns.Mode() {
case paused:
case normal:
err = rv.Start()
if err != nil {
return err
}
case burst:
log.Log(logger.Info, pkg+"Starting burst...")
err = rv.Start()
if err != nil {
return err
}
time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second)
log.Log(logger.Info, pkg+"Stopping burst...")
err = rv.Stop()
if err != nil {
return err
}
ns.SetMode(paused, &vs)
}
sleep:
sleepTime, err := strconv.Atoi(ns.Param("mp"))
if err != nil {
return err
}
sleepTime, _ := strconv.Atoi(ns.Param("mp"))
time.Sleep(time.Duration(sleepTime) * time.Second)
}
}
@ -331,139 +368,6 @@ func send(ns *netsender.Sender, rv *revid.Revid) error {
return nil
}
// wrappers for stopping and starting revid
func startRevid(ns *netsender.Sender, cfg revid.Config) (*revid.Revid, revid.Config, error) {
rv, err := revid.New(cfg, ns)
if err != nil {
return nil, cfg, err
}
err = rv.Start()
return rv, cfg, err
}
func stopRevid(rv *revid.Revid) error {
err := rv.Stop()
if err != nil {
return err
}
// FIXME(kortschak): Is this waiting on completion of work?
// Use a wait group and Wait method if it is.
time.Sleep(revidStopTime)
return nil
}
func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) {
if stop {
err := stopRevid(rv)
if err != nil {
return nil, cfg, err
}
}
//look through the vars and update revid where needed
for key, value := range vars {
switch key {
case "Output":
// FIXME(kortschak): There can be only one!
// How do we specify outputs after the first?
//
// Maybe we shouldn't be doing this!
switch value {
case "File":
cfg.Outputs[0] = revid.File
case "Http":
cfg.Outputs[0] = revid.Http
case "Rtmp":
cfg.Outputs[0] = revid.Rtmp
case "FfmpegRtmp":
cfg.Outputs[0] = revid.FfmpegRtmp
default:
log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
continue
}
case "FramesPerClip":
f, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
break
}
cfg.FramesPerClip = uint(f)
case "RtmpUrl":
cfg.RtmpUrl = value
case "Bitrate":
r, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break
}
cfg.Bitrate = uint(r)
case "OutputFileName":
cfg.OutputFileName = value
case "InputFileName":
cfg.InputFileName = value
case "Height":
h, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid height param", "value", value)
break
}
cfg.Height = uint(h)
case "Width":
w, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid width param", "value", value)
break
}
cfg.Width = uint(w)
case "FrameRate":
r, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break
}
cfg.FrameRate = uint(r)
case "HttpAddress":
cfg.HttpAddress = value
case "Quantization":
q, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
break
}
cfg.Quantization = uint(q)
case "IntraRefreshPeriod":
p, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
break
}
cfg.IntraRefreshPeriod = uint(p)
case "HorizontalFlip":
switch strings.ToLower(value) {
case "true":
cfg.FlipHorizontal = true
case "false":
cfg.FlipHorizontal = false
default:
log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
}
case "VerticalFlip":
switch strings.ToLower(value) {
case "true":
cfg.FlipVertical = true
case "false":
cfg.FlipVertical = false
default:
log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
}
default:
}
}
return startRevid(ns, cfg)
}
// flagStrings implements an appending string set flag.
type flagStrings []string

View File

@ -68,6 +68,7 @@ type Config struct {
RtpAddress string
Logger Logger
SendRetry bool
BurstPeriod uint
}
// Enums for config struct
@ -114,6 +115,7 @@ const (
defaultInputCodec = H264
defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15.
defaultRtpAddr = "localhost:6970"
defaultBurstPeriod = 10 // Seconds
)
// Validate checks for any errors in the config fields and defaults settings
@ -200,6 +202,11 @@ func (c *Config) Validate(r *Revid) error {
}
}
if c.BurstPeriod == 0 {
c.Logger.Log(logger.Warning, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod)
c.BurstPeriod = defaultBurstPeriod
}
if c.FramesPerClip < 1 {
c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting",
"framesPerClip", defaultFramesPerClip)

View File

@ -119,10 +119,11 @@ type Revid struct {
// bitrate hold the last send bitrate calculation result.
bitrate int
// isRunning is a loaded and cocked foot-gun.
mu sync.Mutex
isRunning bool
wg sync.WaitGroup
err chan error
}
@ -326,7 +327,14 @@ func (r *Revid) IsRunning() bool {
return ret
}
// setIsRunning sets revid.isRunning using b.
func (r *Revid) Config() Config {
r.mu.Lock()
cfg := r.config
r.mu.Unlock()
return cfg
}
// setIsRunning sets r.isRunning using b.
func (r *Revid) setIsRunning(b bool) {
r.mu.Lock()
r.isRunning = b
@ -340,9 +348,11 @@ func (r *Revid) Start() error {
return errors.New(pkg + "start called but revid is already running")
}
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
// TODO: this doesn't need to be here
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
r.setIsRunning(true)
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
r.wg.Add(1)
go r.outputClips()
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
err := r.setupInput()
@ -363,12 +373,129 @@ func (r *Revid) Stop() error {
if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill()
}
r.wg.Wait()
return nil
}
func (r *Revid) Update(vars map[string]string) error {
if r.IsRunning() {
if err := r.Stop(); err != nil {
return err
}
}
//look through the vars and update revid where needed
for key, value := range vars {
switch key {
case "Output":
// FIXME(kortschak): There can be only one!
// How do we specify outputs after the first?
//
// Maybe we shouldn't be doing this!
switch value {
case "File":
r.config.Outputs[0] = File
case "Http":
r.config.Outputs[0] = Http
case "Rtmp":
r.config.Outputs[0] = Rtmp
case "FfmpegRtmp":
r.config.Outputs[0] = FfmpegRtmp
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
continue
}
case "FramesPerClip":
f, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
break
}
r.config.FramesPerClip = uint(f)
case "RtmpUrl":
r.config.RtmpUrl = value
case "Bitrate":
v, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break
}
r.config.Bitrate = uint(v)
case "OutputFileName":
r.config.OutputFileName = value
case "InputFileName":
r.config.InputFileName = value
case "Height":
h, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value)
break
}
r.config.Height = uint(h)
case "Width":
w, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value)
break
}
r.config.Width = uint(w)
case "FrameRate":
v, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break
}
r.config.FrameRate = uint(v)
case "HttpAddress":
r.config.HttpAddress = value
case "Quantization":
q, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
break
}
r.config.Quantization = uint(q)
case "IntraRefreshPeriod":
p, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
break
}
r.config.IntraRefreshPeriod = uint(p)
case "HorizontalFlip":
switch strings.ToLower(value) {
case "true":
r.config.FlipHorizontal = true
case "false":
r.config.FlipHorizontal = false
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
}
case "VerticalFlip":
switch strings.ToLower(value) {
case "true":
r.config.FlipVertical = true
case "false":
r.config.FlipVertical = false
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
}
case "BurstPeriod":
v, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value)
break
}
r.config.BurstPeriod = uint(v)
}
}
return nil
}
// outputClips takes the clips produced in the packClips method and outputs them
// to the desired output defined in the revid config
func (r *Revid) outputClips() {
defer r.wg.Done()
lastTime := time.Now()
var count int
loop:
@ -507,6 +634,7 @@ func (r *Revid) startRaspivid() error {
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
}
r.wg.Add(1)
go r.processFrom(stdout, 0)
return nil
}
@ -555,6 +683,7 @@ func (r *Revid) startV4L() error {
return err
}
r.wg.Add(1)
go r.processFrom(stdout, time.Duration(0))
return nil
}
@ -570,6 +699,7 @@ func (r *Revid) setupInputForFile() error {
defer f.Close()
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
r.wg.Add(1)
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
return nil
}
@ -578,4 +708,5 @@ func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.config.Logger.Log(logger.Info, pkg+"reading input data")
r.err <- r.lexTo(r.encoder, read, delay)
r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
r.wg.Done()
}