This commit is contained in:
scruzin 2019-03-05 14:28:06 +10:30
commit 53934ebb8c
8 changed files with 87 additions and 88 deletions

View File

@ -45,14 +45,6 @@ import (
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
const (
// progName is the program name for logging purposes.
progName = "revid-cli"
// Logging is set to INFO level.
defaultLogVerbosity = logger.Info
)
// Revid modes // Revid modes
const ( const (
normal = "Normal" normal = "Normal"
@ -62,17 +54,19 @@ const (
// Other misc consts // Other misc consts
const ( const (
netSendRetryTime = 5 * time.Second netSendRetryTime = 5 * time.Second
defaultRunDuration = 24 * time.Hour defaultRunDuration = 24 * time.Hour
revidStopTime = 5 * time.Second revidStopTime = 5 * time.Second
defaultLogPath = "/var/log/netsender" defaultLogPath = "/var/log/netsender"
pkg = "revid-cli:" pkg = "revid-cli:"
defaultLogVerbosity = logger.Info
defaultSleepTime = 60 // Seconds
) )
// canProfile is set to false with revid-cli is built with "-tags profile". // canProfile is set to false with revid-cli is built with "-tags profile".
var canProfile = true var canProfile = true
// The logger that will be used throughout // The logger that will be used throughout.
var log *logger.Logger var log *logger.Logger
const ( const (
@ -100,9 +94,7 @@ func main() {
return return
} }
if err := run(cfg); err != nil { run(cfg)
log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
}
} }
// handleFlags parses command line flags and returns a revid configuration // handleFlags parses command line flags and returns a revid configuration
@ -267,16 +259,17 @@ func handleFlags() revid.Config {
} }
// initialize then run the main NetSender client // initialize then run the main NetSender client
func run(cfg revid.Config) error { func run(cfg revid.Config) {
log.Log(logger.Info, pkg+"running in NetSender mode") log.Log(logger.Info, pkg+"running in NetSender mode")
var vars map[string]string
var rv *revid.Revid var rv *revid.Revid
readPin := func(pin *netsender.Pin) error { readPin := func(pin *netsender.Pin) error {
switch { switch {
case pin.Name == "X23": case pin.Name == "X23":
if rv == nil {
pin.Value = -1
}
pin.Value = rv.Bitrate() pin.Value = rv.Bitrate()
case pin.Name[0] == 'X': case pin.Name[0] == 'X':
return sds.ReadSystem(pin) return sds.ReadSystem(pin)
@ -288,35 +281,10 @@ func run(cfg revid.Config) error {
ns, err := netsender.New(log, nil, readPin, nil) ns, err := netsender.New(log, nil, readPin, nil)
if err != nil { if err != nil {
return err log.Log(logger.Fatal, pkg+"could not initialise netsender client")
}
rv, err = revid.New(cfg, ns)
if err != nil {
log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error())
}
vars, _ = ns.Vars()
vs := ns.VarSum()
// Update revid to get latest config settings from netreceiver.
err = rv.Update(vars)
if err != nil {
return err
}
// If mode on netreceiver isn't paused then we can start revid.
if ns.Mode() != paused && ns.Mode() != burst {
err = rv.Start()
if err != nil {
return err
}
}
if ns.Mode() == burst {
ns.SetMode(paused, &vs)
} }
var vs int
for { for {
err = ns.Run() err = ns.Run()
if err != nil { if err != nil {
@ -325,10 +293,13 @@ func run(cfg revid.Config) error {
continue continue
} }
// If var sum hasn't change we continue // If var sum hasn't changed we continue.
if vs == ns.VarSum() { var vars map[string]string
newVs := ns.VarSum()
if vs == newVs {
goto sleep goto sleep
} }
vs = newVs
vars, err = ns.Vars() vars, err = ns.Vars()
if err != nil { if err != nil {
@ -336,35 +307,52 @@ func run(cfg revid.Config) error {
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }
vs = ns.VarSum()
if rv == nil {
rv, err = revid.New(cfg, ns)
if err != nil {
log.Log(logger.Warning, pkg+"could not initialise revid", "error", err.Error())
goto sleep
}
}
err = rv.Update(vars) err = rv.Update(vars)
if err != nil { if err != nil {
return err log.Log(logger.Warning, pkg+"Couldn't update revid", "error", err.Error())
goto sleep
} }
switch ns.Mode() { switch ns.Mode() {
case paused: case paused:
rv.Stop()
case normal: case normal:
err = rv.Start() err = rv.Start()
if err != nil { if err != nil {
return err log.Log(logger.Warning, pkg+"could not start revid", "error", err.Error())
ns.SetMode(paused, &vs)
goto sleep
} }
case burst: case burst:
log.Log(logger.Info, pkg+"Starting burst...") log.Log(logger.Info, pkg+"Starting burst...")
err = rv.Start() err = rv.Start()
if err != nil { if err != nil {
return err log.Log(logger.Warning, pkg+"could not start burst", "error", err.Error())
ns.SetMode(paused, &vs)
goto sleep
} }
time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second)
log.Log(logger.Info, pkg+"Stopping burst...") log.Log(logger.Info, pkg+"Stopping burst...")
rv.Stop() rv.Stop()
ns.SetMode(paused, &vs) ns.SetMode(paused, &vs)
} }
sleep: sleep:
sleepTime, err := strconv.Atoi(ns.Param("mp")) sleepTime, err := strconv.Atoi(ns.Param("mp"))
if err != nil { if err != nil {
return err log.Log(logger.Error, pkg+"could not get sleep time, using default")
sleepTime = defaultSleepTime
} }
time.Sleep(time.Duration(sleepTime) * time.Second) time.Sleep(time.Duration(sleepTime) * time.Second)
} }

View File

@ -1,16 +1,18 @@
# install files and directories required by NetSender clients (such as gpio-netsender, revid-cli, etc.) # install files and directories required by NetSender clients (such as gpio-netsender, revid-cli, etc.)
# NB: the default (soft) install does not override conf files # NB: the default (soft) install does not override conf files
USER := $(shell whoami) USER := $(shell whoami)
PATH := /usr/local/go/bin:$(PATH)
.SILENT:make_dirs .SILENT:make_dirs
.SILENT:soft_copy_files .SILENT:soft_copy_files
.SILENT:hard_copy_files .SILENT:hard_copy_files
.SILENT:syncreboot
.SILENT:clean .SILENT:clean
install: as_root make_dirs soft_copy_files install: as_root make_dirs soft_copy_files syncreboot
@echo "Install complete" @echo "Install complete"
install_hard: as_root make_dirs hard_copy_files install_hard: as_root make_dirs hard_copy_files syncreboot
@echo "Hard install complete" @echo "Hard install complete"
as_root: as_root:
@ -38,6 +40,7 @@ soft_copy_files:
echo "/etc/netsender.conf left unmodified" ; \ echo "/etc/netsender.conf left unmodified" ; \
else \ else \
cp netsender.conf /etc; \ cp netsender.conf /etc; \
chown pi /etc/netsender.conf; \
fi fi
hard_copy_files: hard_copy_files:
@ -51,6 +54,10 @@ hard_copy_files:
cp /etc/netsender.conf /etc/netsender.conf.bak ; \ cp /etc/netsender.conf /etc/netsender.conf.bak ; \
fi fi
cp -f netsender.conf /etc cp -f netsender.conf /etc
chown pi /etc/netsender.conf
syncreboot:
cd ../../utils/cmd/syncreboot; make; make install
clean: as_root clean: as_root
rm -rf /var/netsender rm -rf /var/netsender

View File

@ -57,8 +57,8 @@ type Config struct {
FramesPerClip uint FramesPerClip uint
RtmpUrl string RtmpUrl string
Bitrate uint Bitrate uint
OutputPath string OutputPath string
InputPath string InputPath string
Height uint Height uint
Width uint Width uint
FrameRate uint FrameRate uint
@ -126,7 +126,7 @@ func (c *Config) Validate(r *Revid) error {
case No: case No:
case NothingDefined: case NothingDefined:
c.LogLevel = defaultVerbosity c.LogLevel = defaultVerbosity
c.Logger.Log(logger.Warning, pkg+"no LogLevel mode defined, defaulting", c.Logger.Log(logger.Info, pkg+"no LogLevel mode defined, defaulting",
"LogLevel", defaultVerbosity) "LogLevel", defaultVerbosity)
default: default:
return errors.New("bad LogLevel defined in config") return errors.New("bad LogLevel defined in config")
@ -135,7 +135,7 @@ func (c *Config) Validate(r *Revid) error {
switch c.Input { switch c.Input {
case Raspivid, V4L, File: case Raspivid, V4L, File:
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Warning, pkg+"no input type defined, defaulting", "input", c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input",
defaultInput) defaultInput)
c.Input = defaultInput c.Input = defaultInput
default: default:
@ -160,10 +160,10 @@ func (c *Config) Validate(r *Revid) error {
} }
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Warning, pkg+"no input codec defined, defaulting", c.Logger.Log(logger.Info, pkg+"no input codec defined, defaulting",
"inputCodec", defaultInputCodec) "inputCodec", defaultInputCodec)
c.InputCodec = defaultInputCodec c.InputCodec = defaultInputCodec
c.Logger.Log(logger.Warning, pkg+"defaulting quantization", "quantization", c.Logger.Log(logger.Info, pkg+"defaulting quantization", "quantization",
defaultQuantization) defaultQuantization)
c.Quantization = defaultQuantization c.Quantization = defaultQuantization
@ -187,8 +187,9 @@ func (c *Config) Validate(r *Revid) error {
"framesPerClip", defaultFramesPerClip) "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip c.FramesPerClip = defaultFramesPerClip
c.Packetization = Flv c.Packetization = Flv
c.SendRetry = true
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", c.Logger.Log(logger.Info, pkg+"no output defined, defaulting", "output",
defaultOutput) defaultOutput)
c.Outputs[i] = defaultOutput c.Outputs[i] = defaultOutput
c.Packetization = defaultPacketization c.Packetization = defaultPacketization
@ -204,43 +205,43 @@ func (c *Config) Validate(r *Revid) error {
} }
if c.BurstPeriod == 0 { if c.BurstPeriod == 0 {
c.Logger.Log(logger.Warning, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod) c.Logger.Log(logger.Info, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod)
c.BurstPeriod = defaultBurstPeriod c.BurstPeriod = defaultBurstPeriod
} }
if c.FramesPerClip < 1 { if c.FramesPerClip < 1 {
c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting", c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting",
"framesPerClip", defaultFramesPerClip) "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip c.FramesPerClip = defaultFramesPerClip
} }
if c.Width == 0 { if c.Width == 0 {
c.Logger.Log(logger.Warning, pkg+"no width defined, defaulting", "width", defaultWidth) c.Logger.Log(logger.Info, pkg+"no width defined, defaulting", "width", defaultWidth)
c.Width = defaultWidth c.Width = defaultWidth
} }
if c.Height == 0 { if c.Height == 0 {
c.Logger.Log(logger.Warning, pkg+"no height defined, defaulting", "height", defaultHeight) c.Logger.Log(logger.Info, pkg+"no height defined, defaulting", "height", defaultHeight)
c.Height = defaultHeight c.Height = defaultHeight
} }
if c.FrameRate == 0 { if c.FrameRate == 0 {
c.Logger.Log(logger.Warning, pkg+"no frame rate defined, defaulting", "fps", defaultFrameRate) c.Logger.Log(logger.Info, pkg+"no frame rate defined, defaulting", "fps", defaultFrameRate)
c.FrameRate = defaultFrameRate c.FrameRate = defaultFrameRate
} }
if c.Bitrate == 0 { if c.Bitrate == 0 {
c.Logger.Log(logger.Warning, pkg+"no bitrate defined, defaulting", "bitrate", defaultBitrate) c.Logger.Log(logger.Info, pkg+"no bitrate defined, defaulting", "bitrate", defaultBitrate)
c.Bitrate = defaultBitrate c.Bitrate = defaultBitrate
} }
if c.IntraRefreshPeriod == 0 { if c.IntraRefreshPeriod == 0 {
c.Logger.Log(logger.Warning, pkg+"no intra refresh defined, defaulting", "intraRefresh", defaultIntraRefreshPeriod) c.Logger.Log(logger.Info, pkg+"no intra refresh defined, defaulting", "intraRefresh", defaultIntraRefreshPeriod)
c.IntraRefreshPeriod = defaultIntraRefreshPeriod c.IntraRefreshPeriod = defaultIntraRefreshPeriod
} }
if c.Quantization == 0 { if c.Quantization == 0 {
c.Logger.Log(logger.Warning, pkg+"no quantization defined, defaulting", "quantization", defaultQuantization) c.Logger.Log(logger.Info, pkg+"no quantization defined, defaulting", "quantization", defaultQuantization)
c.Quantization = defaultQuantization c.Quantization = defaultQuantization
} else if c.Quantization > 51 { } else if c.Quantization > 51 {
return errors.New("quantisation is over threshold") return errors.New("quantisation is over threshold")

View File

@ -54,7 +54,7 @@ const (
mtsRbSize = 100 mtsRbSize = 100
mtsRbElementSize = 150000 mtsRbElementSize = 150000
flvRbSize = 1000 flvRbSize = 1000
flvRbElementSize = 10000 flvRbElementSize = 100000
writeTimeout = 10 * time.Millisecond writeTimeout = 10 * time.Millisecond
readTimeout = 10 * time.Millisecond readTimeout = 10 * time.Millisecond
) )
@ -355,6 +355,9 @@ func (r *Revid) Start() error {
go r.outputClips() go r.outputClips()
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
err := r.setupInput() err := r.setupInput()
if err != nil {
r.Stop()
}
return err return err
} }
@ -537,7 +540,7 @@ loop:
err = dest.send() err = dest.send()
if err == nil { if err == nil {
r.config.Logger.Log(logger.Debug, pkg+"sent clip to output "+strconv.Itoa(i)) r.config.Logger.Log(logger.Debug, pkg+"sent clip to output "+strconv.Itoa(i))
} else if r.config.SendRetry == false { } else if !r.config.SendRetry {
r.config.Logger.Log(logger.Warning, pkg+"send to output "+strconv.Itoa(i)+" failed", "error", err.Error()) r.config.Logger.Log(logger.Warning, pkg+"send to output "+strconv.Itoa(i)+" failed", "error", err.Error())
} else { } else {
r.config.Logger.Log(logger.Error, pkg+"send to output "+strconv.Itoa(i)+ r.config.Logger.Log(logger.Error, pkg+"send to output "+strconv.Itoa(i)+
@ -546,19 +549,16 @@ loop:
if err != nil && chunk.Len() > 11 { if err != nil && chunk.Len() > 11 {
r.config.Logger.Log(logger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error())
for err != nil { for err != nil {
time.Sleep(sendFailedDelay)
if rs, ok := dest.(restarter); ok { if rs, ok := dest.(restarter); ok {
r.config.Logger.Log(logger.Debug, pkg+"restarting session", "session", rs) r.config.Logger.Log(logger.Debug, pkg+"restarting session", "session", rs)
err = rs.restart() err = rs.restart()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
r.setIsRunning(false) time.Sleep(sendFailedDelay)
return continue
} }
r.config.Logger.Log(logger.Info, pkg+"restarted rtmp session, sending again")
r.config.Logger.Log(logger.Info, pkg+"restarted rtmp session")
} }
err = dest.send() err = dest.send()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"send failed again, with error", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"send failed again, with error", "error", err.Error())

View File

@ -298,6 +298,9 @@ func (s *rtmpSender) load(c *ring.Chunk) error {
func (s *rtmpSender) send() error { func (s *rtmpSender) send() error {
_, err := s.chunk.WriteTo(s.conn) _, err := s.chunk.WriteTo(s.conn)
if err == rtmp.ErrInvalidFlvTag {
return nil
}
return err return err
} }
@ -307,17 +310,14 @@ func (s *rtmpSender) release() {
} }
func (s *rtmpSender) restart() error { func (s *rtmpSender) restart() error {
err := s.conn.Close() s.close()
if err != nil { var err error
return err
}
for n := 0; n < s.retries; n++ { for n := 0; n < s.retries; n++ {
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
if err == nil { if err == nil {
break break
} }
s.log(logger.Error, err.Error()) s.log(logger.Error, err.Error())
s.conn.Close()
if n < s.retries-1 { if n < s.retries-1 {
s.log(logger.Info, pkg+"retry rtmp connection") s.log(logger.Info, pkg+"retry rtmp connection")
} }
@ -326,7 +326,10 @@ func (s *rtmpSender) restart() error {
} }
func (s *rtmpSender) close() error { func (s *rtmpSender) close() error {
return s.conn.Close() if s.conn != nil {
return s.conn.Close()
}
return nil
} }
// udpSender implements loadSender for a native udp destination. // udpSender implements loadSender for a native udp destination.

View File

@ -155,7 +155,7 @@ func (c *Conn) Write(data []byte) (int, error) {
return 0, errNotConnected return 0, errNotConnected
} }
if len(data) < flvTagheaderSize { if len(data) < flvTagheaderSize {
return 0, errInvalidFlvTag return 0, ErrInvalidFlvTag
} }
if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') {
return 0, errUnimplemented return 0, errUnimplemented

View File

@ -158,7 +158,7 @@ var (
errNotWritable = errors.New("rtmp: connection not writable") errNotWritable = errors.New("rtmp: connection not writable")
errInvalidHeader = errors.New("rtmp: invalid header") errInvalidHeader = errors.New("rtmp: invalid header")
errInvalidBody = errors.New("rtmp: invalid body") errInvalidBody = errors.New("rtmp: invalid body")
errInvalidFlvTag = errors.New("rtmp: invalid FLV tag") ErrInvalidFlvTag = errors.New("rtmp: invalid FLV tag")
errUnimplemented = errors.New("rtmp: unimplemented feature") errUnimplemented = errors.New("rtmp: unimplemented feature")
) )

View File

@ -216,7 +216,7 @@ type rtmpSender struct {
func (rs *rtmpSender) Write(p []byte) (int, error) { func (rs *rtmpSender) Write(p []byte) (int, error) {
n, err := rs.conn.Write(p) n, err := rs.conn.Write(p)
if err != errInvalidFlvTag && err != nil { if err != ErrInvalidFlvTag && err != nil {
return 0, err return 0, err
} }
return n, nil return n, nil