revid: only using one set of ring buffer parameters rather than MTS and RTMP

This commit is contained in:
Saxon 2020-01-09 13:58:44 +10:30
parent 7086960e66
commit d90d2203bd
3 changed files with 73 additions and 107 deletions

View File

@ -87,16 +87,9 @@ const (
defaultPSITime = 2 defaultPSITime = 2
// Ring buffer defaults. // Ring buffer defaults.
defaultRBMaxElements = 10000
// MTS ring buffer defaults. defaultRBCapacity = 200000000 // bytes (200MB)
defaultMTSRBMaxElements = 10000 defaultRBWriteTimeout = 5
defaultMTSRBCapacity = 200000000 // bytes
defaultMTSRBWriteTimeout = 5
// RTMP ring buffer defaults.
defaultRTMPRBMaxElements = 10000
defaultRTMPRBCapacity = 200000000 // bytes
defaultRTMPRBWriteTimeout = 5
// Motion filter parameter defaults. // Motion filter parameter defaults.
defaultMinFPS = 1.0 defaultMinFPS = 1.0
@ -284,15 +277,10 @@ type Config struct {
Filters []int // Defines the methods of filtering to be used in between lexing and encoding. Filters []int // Defines the methods of filtering to be used in between lexing and encoding.
PSITime int // Sets the time between a packet being sent. PSITime int // Sets the time between a packet being sent.
// RTMP ring buffer parameters. // Ring buffer parameters.
RTMPRBMaxElements int // The maximum possible number of elements in ring buffer. RBMaxElements int // The maximum possible number of elements in ring buffer.
RTMPRBCapacity int // The total number of bytes available for the ring buffer. RBCapacity int // The total number of bytes available for the ring buffer.
RTMPRBWriteTimeout int // The ringbuffer write timeout in seconds. RBWriteTimeout int // The ringbuffer write timeout in seconds.
// MTS ring buffer parameters.
MTSRBMaxElements int // The maximum possible number of elements in ring buffer.
MTSRBCapacity int // The total number of bytes available for the ring buffer.
MTSRBWriteTimeout int // The ringbuffer write timeout in seconds.
// Motion filter parameters. // Motion filter parameters.
MinFPS float64 // The reduced framerate of the video when there is no motion. MinFPS float64 // The reduced framerate of the video when there is no motion.
@ -338,17 +326,14 @@ var TypeData = map[string]string{
"MOGHistory": "uint", "MOGHistory": "uint",
"MOGMinArea": "float", "MOGMinArea": "float",
"MOGThreshold": "float", "MOGThreshold": "float",
"MTSRBCapacity": "uint", "RBCapacity": "uint",
"MTSRBMaxElements": "uint", "RBMaxElements": "uint",
"MTSRBWriteTimeout": "uint", "RBWriteTimeout": "uint",
"Output": "enum:File,Http,Rtmp,Rtp", "Output": "enum:File,Http,Rtmp,Rtp",
"OutputPath": "string", "OutputPath": "string",
"Outputs": "enums:File,Http,Rtmp,Rtp", "Outputs": "enums:File,Http,Rtmp,Rtp",
"Quantization": "uint", "Quantization": "uint",
"Rotation": "uint", "Rotation": "uint",
"RTMPRBCapacity": "uint",
"RTMPRBMaxElements": "uint",
"RTMPRBWriteTimeout": "uint",
"RTMPURL": "string", "RTMPURL": "string",
"RTPAddress": "string", "RTPAddress": "string",
"Saturation": "int", "Saturation": "int",
@ -456,34 +441,19 @@ func (c *Config) Validate() error {
c.RTPAddress = defaultRtpAddr c.RTPAddress = defaultRtpAddr
} }
if c.RTMPRBMaxElements <= 0 { if c.RBMaxElements <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBMaxElements bad or unset, defaulting", "RTMPRBMaxElements", defaultRTMPRBMaxElements) c.Logger.Log(logger.Info, pkg+"RBMaxElements bad or unset, defaulting", "RBMaxElements", defaultRBMaxElements)
c.RTMPRBMaxElements = defaultRTMPRBMaxElements c.RBMaxElements = defaultRBMaxElements
} }
if c.RTMPRBCapacity <= 0 { if c.RBCapacity <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBCapacity bad or unset, defaulting", "RTMPRBCapacity", defaultRTMPRBCapacity) c.Logger.Log(logger.Info, pkg+"RBCapacity bad or unset, defaulting", "RBCapacity", defaultRBCapacity)
c.RTMPRBCapacity = defaultRTMPRBCapacity c.RBCapacity = defaultRBCapacity
} }
if c.RTMPRBWriteTimeout <= 0 { if c.RBWriteTimeout <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBWriteTimeout bad or unset, defaulting", "RTMPRBWriteTimeout", defaultRTMPRBWriteTimeout) c.Logger.Log(logger.Info, pkg+"RBWriteTimeout bad or unset, defaulting", "RBWriteTimeout", defaultRBWriteTimeout)
c.RTMPRBWriteTimeout = defaultRTMPRBWriteTimeout c.RBWriteTimeout = defaultRBWriteTimeout
}
if c.MTSRBMaxElements <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBMaxElements)
c.MTSRBMaxElements = defaultMTSRBMaxElements
}
if c.MTSRBCapacity <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBCapacity)
c.MTSRBCapacity = defaultMTSRBCapacity
}
if c.MTSRBWriteTimeout <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBWriteTimeout bad or unset, defaulting", "MTSRBWriteTimeout", defaultMTSRBWriteTimeout)
c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout
} }
if c.PSITime <= 0 { if c.PSITime <= 0 {

View File

@ -261,7 +261,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
for _, out := range r.cfg.Outputs { for _, out := range r.cfg.Outputs {
switch out { switch out {
case config.OutputHTTP: case config.OutputHTTP:
rb, err := vring.NewBuffer(r.cfg.MTSRBMaxElements, r.cfg.MTSRBCapacity, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second) rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second)
if err != nil { if err != nil {
return fmt.Errorf("could not initialise MTS ring buffer: %w", err) return fmt.Errorf("could not initialise MTS ring buffer: %w", err)
} }
@ -285,7 +285,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
} }
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
case config.OutputRTMP: case config.OutputRTMP:
rb, err := vring.NewBuffer(r.cfg.RTMPRBMaxElements, r.cfg.RTMPRBCapacity, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second) rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second)
if err != nil { if err != nil {
return fmt.Errorf("could not initialise RTMP ring buffer: %w", err) return fmt.Errorf("could not initialise RTMP ring buffer: %w", err)
} }
@ -706,42 +706,42 @@ func (r *Revid) Update(vars map[string]string) error {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBMaxElements var", "value", value) r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBMaxElements var", "value", value)
break break
} }
r.cfg.RTMPRBMaxElements = v r.cfg.RBMaxElements = v
case "RTMPRBCapacity": case "RTMPRBCapacity":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v < 0 { if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBCapacity var", "value", value) r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBCapacity var", "value", value)
break break
} }
r.cfg.RTMPRBCapacity = v r.cfg.RBCapacity = v
case "RTMPRBWriteTimeout": case "RTMPRBWriteTimeout":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v <= 0 { if err != nil || v <= 0 {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBWriteTimeout var", "value", value) r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBWriteTimeout var", "value", value)
break break
} }
r.cfg.RTMPRBWriteTimeout = v r.cfg.RBWriteTimeout = v
case "MTSRBMaxElements": case "MTSRBMaxElements":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v < 0 { if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBMaxElements var", "value", value) r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBMaxElements var", "value", value)
break break
} }
r.cfg.MTSRBMaxElements = v r.cfg.RBMaxElements = v
case "MTSRBCapacity": case "MTSRBCapacity":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v < 0 { if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBCapacity var", "value", value) r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBCapacity var", "value", value)
break break
} }
r.cfg.MTSRBCapacity = v r.cfg.RBCapacity = v
case "MTSRBWriteTimeout": case "MTSRBWriteTimeout":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v <= 0 { if err != nil || v <= 0 {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBWriteTimeout var", "value", value) r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBWriteTimeout var", "value", value)
break break
} }
r.cfg.MTSRBWriteTimeout = v r.cfg.RBWriteTimeout = v
case "CBR": case "CBR":
v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)] v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)]
if !ok { if !ok {

View File

@ -46,16 +46,12 @@ var (
errSendFailed = errors.New("send failed") errSendFailed = errors.New("send failed")
) )
// Ring buffer parameters.
const ( const (
// MTS ring buffer defaults. // MTS ring buffer defaults.
mtsRBMaxElements = 10000 rbMaxElements = 10000
mtsRBCapacity = 200000000 // bytes rbCapacity = 200000000 // bytes
mtsRBWriteTimeout = 5 rbWriteTimeout = 5
// RTMP ring buffer defaults.
rtmpRBMaxElements = 10000
rtmpRBCapacity = 200000000 // bytes
rtmpRBWriteTimeout = 5
) )
// destination simulates a destination for the mtsSender. It allows for the // destination simulates a destination for the mtsSender. It allows for the
@ -146,7 +142,7 @@ func TestMtsSenderSegment(t *testing.T) {
// Create ringBuffer, sender, sender and the MPEGTS encoder. // Create ringBuffer, sender, sender and the MPEGTS encoder.
const numberOfClips = 11 const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
rb, err := vring.NewBuffer(mtsRBMaxElements, mtsRBCapacity, 0) rb, err := vring.NewBuffer(rbMaxElements, rbCapacity, 0)
if err != nil { if err != nil {
t.Fatalf("could not initialise ring buffer: %v", err) t.Fatalf("could not initialise ring buffer: %v", err)
} }
@ -229,7 +225,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder // Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3 const clipToFailAt = 3
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
rb, err := vring.NewBuffer(mtsRBMaxElements, mtsRBCapacity, 0) rb, err := vring.NewBuffer(rbMaxElements, rbCapacity, 0)
if err != nil { if err != nil {
t.Fatalf("could not initialise ring buffer: %v", err) t.Fatalf("could not initialise ring buffer: %v", err)
} }
@ -314,7 +310,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder. // Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3 const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
rb, err := vring.NewBuffer(1, mtsRBCapacity, 0) rb, err := vring.NewBuffer(1, rbCapacity, 0)
if err != nil { if err != nil {
t.Fatalf("could not initialise ring buffer: %v", err) t.Fatalf("could not initialise ring buffer: %v", err)
} }