Merged in start-in-paused-state (pull request #155)

Start in paused state and more robustness

Approved-by: Alan Noble <anoble@gmail.com>
Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Saxon Milton 2019-03-03 20:21:34 +00:00 committed by Alan Noble
commit 05c2c77f31
7 changed files with 64 additions and 69 deletions

View File

@ -45,14 +45,6 @@ import (
"bitbucket.org/ausocean/utils/logger"
)
const (
// progName is the program name for logging purposes.
progName = "revid-cli"
// Logging is set to INFO level.
defaultLogVerbosity = logger.Info
)
// Revid modes
const (
normal = "Normal"
@ -62,17 +54,19 @@ const (
// Other misc consts
const (
netSendRetryTime = 5 * time.Second
defaultRunDuration = 24 * time.Hour
revidStopTime = 5 * time.Second
defaultLogPath = "/var/log/netsender"
pkg = "revid-cli:"
netSendRetryTime = 5 * time.Second
defaultRunDuration = 24 * time.Hour
revidStopTime = 5 * time.Second
defaultLogPath = "/var/log/netsender"
pkg = "revid-cli:"
defaultLogVerbosity = logger.Info
defaultSleepTime = 60 // Seconds
)
// canProfile is set to false with revid-cli is built with "-tags profile".
var canProfile = true
// The logger that will be used throughout
// The logger that will be used throughout.
var log *logger.Logger
const (
@ -100,9 +94,7 @@ func main() {
return
}
if err := run(cfg); err != nil {
log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
}
run(cfg)
}
// handleFlags parses command line flags and returns a revid configuration
@ -267,16 +259,17 @@ func handleFlags() revid.Config {
}
// initialize then run the main NetSender client
func run(cfg revid.Config) error {
func run(cfg revid.Config) {
log.Log(logger.Info, pkg+"running in NetSender mode")
var vars map[string]string
var rv *revid.Revid
readPin := func(pin *netsender.Pin) error {
switch {
case pin.Name == "X23":
if rv == nil {
pin.Value = -1
}
pin.Value = rv.Bitrate()
case pin.Name[0] == 'X':
return sds.ReadSystem(pin)
@ -288,35 +281,10 @@ func run(cfg revid.Config) error {
ns, err := netsender.New(log, nil, readPin, nil)
if err != nil {
return err
}
rv, err = revid.New(cfg, ns)
if err != nil {
log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error())
}
vars, _ = ns.Vars()
vs := ns.VarSum()
// Update revid to get latest config settings from netreceiver.
err = rv.Update(vars)
if err != nil {
return err
}
// If mode on netreceiver isn't paused then we can start revid.
if ns.Mode() != paused && ns.Mode() != burst {
err = rv.Start()
if err != nil {
return err
}
}
if ns.Mode() == burst {
ns.SetMode(paused, &vs)
log.Log(logger.Fatal, pkg+"could not initialise netsender client")
}
var vs int
for {
err = ns.Run()
if err != nil {
@ -325,10 +293,13 @@ func run(cfg revid.Config) error {
continue
}
// If var sum hasn't change we continue
if vs == ns.VarSum() {
// If var sum hasn't changed we continue.
var vars map[string]string
newVs := ns.VarSum()
if vs == newVs {
goto sleep
}
vs = newVs
vars, err = ns.Vars()
if err != nil {
@ -336,35 +307,52 @@ func run(cfg revid.Config) error {
time.Sleep(netSendRetryTime)
continue
}
vs = ns.VarSum()
if rv == nil {
rv, err = revid.New(cfg, ns)
if err != nil {
log.Log(logger.Warning, pkg+"could not initialise revid", "error", err.Error())
goto sleep
}
}
err = rv.Update(vars)
if err != nil {
return err
log.Log(logger.Warning, pkg+"Couldn't update revid", "error", err.Error())
goto sleep
}
switch ns.Mode() {
case paused:
rv.Stop()
case normal:
err = rv.Start()
if err != nil {
return err
log.Log(logger.Warning, pkg+"could not start revid", "error", err.Error())
ns.SetMode(paused, &vs)
goto sleep
}
case burst:
log.Log(logger.Info, pkg+"Starting burst...")
err = rv.Start()
if err != nil {
return err
log.Log(logger.Warning, pkg+"could not start burst", "error", err.Error())
ns.SetMode(paused, &vs)
goto sleep
}
time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second)
log.Log(logger.Info, pkg+"Stopping burst...")
rv.Stop()
ns.SetMode(paused, &vs)
}
sleep:
sleepTime, err := strconv.Atoi(ns.Param("mp"))
if err != nil {
return err
log.Log(logger.Error, pkg+"could not get sleep time, using default")
sleepTime = defaultSleepTime
}
time.Sleep(time.Duration(sleepTime) * time.Second)
}

View File

@ -57,8 +57,8 @@ type Config struct {
FramesPerClip uint
RtmpUrl string
Bitrate uint
OutputPath string
InputPath string
OutputPath string
InputPath string
Height uint
Width uint
FrameRate uint
@ -187,6 +187,7 @@ func (c *Config) Validate(r *Revid) error {
"framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip
c.Packetization = Flv
c.SendRetry = true
case NothingDefined:
c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output",
defaultOutput)

View File

@ -54,7 +54,7 @@ const (
mtsRbSize = 100
mtsRbElementSize = 150000
flvRbSize = 1000
flvRbElementSize = 10000
flvRbElementSize = 100000
writeTimeout = 10 * time.Millisecond
readTimeout = 10 * time.Millisecond
)
@ -355,6 +355,9 @@ func (r *Revid) Start() error {
go r.outputClips()
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
err := r.setupInput()
if err != nil {
r.Stop()
}
return err
}
@ -537,7 +540,7 @@ loop:
err = dest.send()
if err == nil {
r.config.Logger.Log(logger.Debug, pkg+"sent clip to output "+strconv.Itoa(i))
} else if r.config.SendRetry == false {
} else if !r.config.SendRetry {
r.config.Logger.Log(logger.Warning, pkg+"send to output "+strconv.Itoa(i)+" failed", "error", err.Error())
} else {
r.config.Logger.Log(logger.Error, pkg+"send to output "+strconv.Itoa(i)+
@ -546,19 +549,16 @@ loop:
if err != nil && chunk.Len() > 11 {
r.config.Logger.Log(logger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error())
for err != nil {
time.Sleep(sendFailedDelay)
if rs, ok := dest.(restarter); ok {
r.config.Logger.Log(logger.Debug, pkg+"restarting session", "session", rs)
err = rs.restart()
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
r.setIsRunning(false)
return
time.Sleep(sendFailedDelay)
continue
}
r.config.Logger.Log(logger.Info, pkg+"restarted rtmp session")
r.config.Logger.Log(logger.Info, pkg+"restarted rtmp session, sending again")
}
err = dest.send()
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"send failed again, with error", "error", err.Error())

View File

@ -298,6 +298,9 @@ func (s *rtmpSender) load(c *ring.Chunk) error {
func (s *rtmpSender) send() error {
_, err := s.chunk.WriteTo(s.conn)
if err == rtmp.ErrInvalidFlvTag {
return nil
}
return err
}
@ -307,7 +310,7 @@ func (s *rtmpSender) release() {
}
func (s *rtmpSender) restart() error {
s.conn.Close()
s.close()
var err error
for n := 0; n < s.retries; n++ {
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
@ -323,7 +326,10 @@ func (s *rtmpSender) restart() error {
}
func (s *rtmpSender) close() error {
return s.conn.Close()
if s.conn != nil {
return s.conn.Close()
}
return nil
}
// udpSender implements loadSender for a native udp destination.

View File

@ -155,7 +155,7 @@ func (c *Conn) Write(data []byte) (int, error) {
return 0, errNotConnected
}
if len(data) < flvTagheaderSize {
return 0, errInvalidFlvTag
return 0, ErrInvalidFlvTag
}
if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') {
return 0, errUnimplemented

View File

@ -158,7 +158,7 @@ var (
errNotWritable = errors.New("rtmp: connection not writable")
errInvalidHeader = errors.New("rtmp: invalid header")
errInvalidBody = errors.New("rtmp: invalid body")
errInvalidFlvTag = errors.New("rtmp: invalid FLV tag")
ErrInvalidFlvTag = errors.New("rtmp: invalid FLV tag")
errUnimplemented = errors.New("rtmp: unimplemented feature")
)

View File

@ -216,7 +216,7 @@ type rtmpSender struct {
func (rs *rtmpSender) Write(p []byte) (int, error) {
n, err := rs.conn.Write(p)
if err != errInvalidFlvTag && err != nil {
if err != ErrInvalidFlvTag && err != nil {
return 0, err
}
return n, nil