revid: made RTMP and MTS sender ring buffer write timeouts configurable

This commit is contained in:
Saxon 2019-10-12 21:10:48 +10:30
parent 8489d615d8
commit 874b9639cf
3 changed files with 42 additions and 11 deletions

View File

@ -122,11 +122,15 @@ const (
defaultChannels = 1 defaultChannels = 1
defaultRecPeriod = 1.0 defaultRecPeriod = 1.0
// Ringbuffer defaults. // MTS ring buffer defaults.
defaultMTSRBSize = 1000 defaultMTSRBSize = 1000
defaultMTSRBElementSize = 100000 defaultMTSRBElementSize = 100000
defaultMTSRBWriteTimeout = 5 * time.Second
// RTMP ring buffer defaults.
defaultRTMPRBSize = 1000 defaultRTMPRBSize = 1000
defaultRTMPRBElementSize = 300000 defaultRTMPRBElementSize = 300000
defaultRTMPRBWriteTimeout = 5 * time.Second
) )
// Config provides parameters relevant to a revid instance. A new config must // Config provides parameters relevant to a revid instance. A new config must
@ -251,11 +255,15 @@ type Config struct {
FlipHorizontal bool // FlipHorizontal flips video horizontally for Raspivid input. FlipHorizontal bool // FlipHorizontal flips video horizontally for Raspivid input.
FlipVertical bool // FlipVertial flips video vertically for Raspivid input. FlipVertical bool // FlipVertial flips video vertically for Raspivid input.
// Ring buffer sizes. // RTMP ring buffer parameters.
RTMPRBSize int // The number of elements in the RTMP sender ringbuffer. RTMPRBSize int // The number of elements in the RTMP sender ringbuffer.
RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer. RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer.
RTMPRBWriteTimeout time.Duration
// MTS ring buffer parameters.
MTSRBSize int // The number of elements in the MTS sender ringbuffer. MTSRBSize int // The number of elements in the MTS sender ringbuffer.
MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer. MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer.
MTSRBWriteTimeout time.Duration
} }
// Validation errors. // Validation errors.
@ -453,6 +461,11 @@ func (c *Config) Validate() error {
c.RTMPRBElementSize = defaultRTMPRBElementSize c.RTMPRBElementSize = defaultRTMPRBElementSize
} }
if c.RTMPRBWriteTimeout <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBWriteTimeout bad or unset, defaulting", "RTMPRBWriteTimeout", defaultRTMPRBWriteTimeout)
c.RTMPRBWriteTimeout = defaultRTMPRBWriteTimeout
}
if c.MTSRBSize <= 0 { if c.MTSRBSize <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize) c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize)
c.MTSRBSize = defaultMTSRBSize c.MTSRBSize = defaultMTSRBSize
@ -463,6 +476,11 @@ func (c *Config) Validate() error {
c.MTSRBElementSize = defaultMTSRBElementSize c.MTSRBElementSize = defaultMTSRBElementSize
} }
if c.MTSRBWriteTimeout <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBWriteTimeout bad or unset, defaulting", "MTSRBWriteTimeout", defaultMTSRBWriteTimeout)
c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout
}
return nil return nil
} }

View File

@ -252,7 +252,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
w = newMtsSender( w = newMtsSender(
newHttpSender(r.ns, r.config.Logger.Log), newHttpSender(r.ns, r.config.Logger.Log),
r.config.Logger.Log, r.config.Logger.Log,
ring.NewBuffer(r.config.MTSRBSize, r.config.MTSRBElementSize, 0), ring.NewBuffer(r.config.MTSRBSize, r.config.MTSRBElementSize, r.config.MTSRBWriteTimeout),
r.config.ClipDuration, r.config.ClipDuration,
) )
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
@ -273,8 +273,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
r.config.RTMPURL, r.config.RTMPURL,
rtmpConnectionTimeout, rtmpConnectionTimeout,
rtmpConnectionMaxTries, rtmpConnectionMaxTries,
r.config.RTMPRBSize, ring.NewBuffer(r.config.RTMPRBSize, r.config.RTMPRBElementSize, r.config.RTMPRBWriteTimeout),
r.config.RTMPRBElementSize,
r.config.Logger.Log, r.config.Logger.Log,
) )
if err != nil { if err != nil {
@ -573,6 +572,13 @@ func (r *Revid) Update(vars map[string]string) error {
break break
} }
r.config.RTMPRBElementSize = v r.config.RTMPRBElementSize = v
case "RTMPRBWriteTimeout":
v, err := strconv.Atoi(value)
if err != nil || v <= 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid RTMPRBWriteTimeout var", "value", value)
break
}
r.config.RTMPRBWriteTimeout = time.Duration(v) * time.Second
case "MTSRBSize": case "MTSRBSize":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v < 0 { if err != nil || v < 0 {
@ -587,6 +593,13 @@ func (r *Revid) Update(vars map[string]string) error {
break break
} }
r.config.MTSRBElementSize = v r.config.MTSRBElementSize = v
case "MTSRBWriteTimeout":
v, err := strconv.Atoi(value)
if err != nil || v <= 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid MTSRBWriteTimeout var", "value", value)
break
}
r.config.MTSRBWriteTimeout = time.Duration(v) * time.Second
} }
} }
r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config)) r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config))

View File

@ -275,7 +275,7 @@ type rtmpSender struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
func newRtmpSender(url string, timeout uint, retries, rbSize, rbElementSize int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
var conn *rtmp.Conn var conn *rtmp.Conn
var err error var err error
for n := 0; n < retries; n++ { for n := 0; n < retries; n++ {
@ -294,7 +294,7 @@ func newRtmpSender(url string, timeout uint, retries, rbSize, rbElementSize int,
timeout: timeout, timeout: timeout,
retries: retries, retries: retries,
log: log, log: log,
ring: ring.NewBuffer(rbSize, rbElementSize, 5*time.Second), ring: rb,
done: make(chan struct{}), done: make(chan struct{}),
} }
s.wg.Add(1) s.wg.Add(1)