revid: corrected conflict

This commit is contained in:
Saxon 2019-03-02 20:41:19 +10:30
commit e0c7f3074d
3 changed files with 27 additions and 29 deletions

View File

@ -96,9 +96,7 @@ func main() {
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error()) cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error())
} }
time.Sleep(*runDurationPtr) time.Sleep(*runDurationPtr)
if err = rv.Stop(); err != nil { rv.Stop()
cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error())
}
return return
} }
@ -360,10 +358,7 @@ func run(cfg revid.Config) error {
} }
time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second)
log.Log(logger.Info, pkg+"Stopping burst...") log.Log(logger.Info, pkg+"Stopping burst...")
err = rv.Stop() rv.Stop()
if err != nil {
return err
}
ns.SetMode(paused, &vs) ns.SetMode(paused, &vs)
} }
sleep: sleep:

View File

@ -186,15 +186,18 @@ func (c *Config) Validate(r *Revid) error {
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out",
"framesPerClip", defaultFramesPerClip) "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip c.FramesPerClip = defaultFramesPerClip
c.Packetization = Flv
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output",
defaultOutput) defaultOutput)
c.Outputs[i] = defaultOutput c.Outputs[i] = defaultOutput
c.Packetization = defaultPacketization
fallthrough fallthrough
case Http, Rtp: case Http, Rtp:
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out",
"framesPerClip", httpFramesPerClip) "framesPerClip", httpFramesPerClip)
c.FramesPerClip = httpFramesPerClip c.FramesPerClip = httpFramesPerClip
c.Packetization = Mpegts
default: default:
return errors.New("bad output type defined in config") return errors.New("bad output type defined in config")
} }

View File

@ -51,10 +51,12 @@ import (
// Ring buffer sizes and read/write timeouts. // Ring buffer sizes and read/write timeouts.
const ( const (
ringBufferSize = 1000 mtsRbSize = 100
ringBufferElementSize = 150000 mtsRbElementSize = 150000
writeTimeout = 10 * time.Millisecond flvRbSize = 1000
readTimeout = 10 * time.Millisecond flvRbElementSize = 10000
writeTimeout = 10 * time.Millisecond
readTimeout = 10 * time.Millisecond
) )
// RTMP connection properties. // RTMP connection properties.
@ -137,11 +139,6 @@ type packer struct {
// are deemed to be successful, although a successful // are deemed to be successful, although a successful
// write may include a dropped frame. // write may include a dropped frame.
func (p *packer) Write(frame []byte) (int, error) { func (p *packer) Write(frame []byte) (int, error) {
if len(frame) > ringBufferElementSize {
p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame))
return len(frame), nil
}
if len(p.owner.destination) == 0 { if len(p.owner.destination) == 0 {
panic("must have at least 1 destination") panic("must have at least 1 destination")
} }
@ -180,13 +177,10 @@ func (r *Revid) handleErrors() {
err := <-r.err err := <-r.err
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error())
err = r.Stop() r.Stop()
if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error())
}
err = r.Start() err = r.Start()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to restart revid", "error", err.Error())
} }
} }
} }
@ -207,7 +201,14 @@ func (r *Revid) reset(config Config) error {
} }
r.config = config r.config = config
r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) // NB: currently we use two outputs that require the same packetization method
// so we only need to check first output, but this may change later.
switch r.config.Outputs[0] {
case Rtmp, FfmpegRtmp:
r.buffer = ring.NewBuffer(flvRbSize, flvRbElementSize, writeTimeout)
case Http, Rtp:
r.buffer = ring.NewBuffer(mtsRbSize, mtsRbElementSize, writeTimeout)
}
r.destination = make([]loadSender, 0, len(r.config.Outputs)) r.destination = make([]loadSender, 0, len(r.config.Outputs))
for _, typ := range r.config.Outputs { for _, typ := range r.config.Outputs {
@ -326,7 +327,8 @@ func (r *Revid) setIsRunning(b bool) {
// and packetising (if theres packetization) to a defined output. // and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() error { func (r *Revid) Start() error {
if r.IsRunning() { if r.IsRunning() {
return errors.New(pkg + "start called but revid is already running") r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running")
return nil
} }
r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Info, pkg+"starting Revid")
// TODO: this doesn't need to be here // TODO: this doesn't need to be here
@ -341,9 +343,10 @@ func (r *Revid) Start() error {
} }
// Stop halts any processing of video data from a camera or file // Stop halts any processing of video data from a camera or file
func (r *Revid) Stop() error { func (r *Revid) Stop() {
if !r.IsRunning() { if !r.IsRunning() {
return errors.New(pkg + "stop called but revid is already stopped") r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running")
return
} }
r.config.Logger.Log(logger.Info, pkg+"stopping revid") r.config.Logger.Log(logger.Info, pkg+"stopping revid")
@ -355,14 +358,11 @@ func (r *Revid) Stop() error {
r.cmd.Process.Kill() r.cmd.Process.Kill()
} }
r.wg.Wait() r.wg.Wait()
return nil
} }
func (r *Revid) Update(vars map[string]string) error { func (r *Revid) Update(vars map[string]string) error {
if r.IsRunning() { if r.IsRunning() {
if err := r.Stop(); err != nil { r.Stop()
return err
}
} }
//look through the vars and update revid where needed //look through the vars and update revid where needed
for key, value := range vars { for key, value := range vars {