revid: rename KeyRB* and RB* to refer to pool

This also changes lexically significant strings in configuration text.
This commit is contained in:
Dan Kortschak 2021-05-17 11:18:44 +09:30
parent 7541015730
commit 05a6b4435a
No known key found for this signature in database
GPG Key ID: 45817BAD958E3F0C
11 changed files with 172 additions and 170 deletions

View File

@ -7,5 +7,5 @@
"TimelapseInterval":"10",
"logging":"Debug",
"Suppress":"false",
"RBStartElementSize":"1000000"
"PoolStartElementSize":"1000000"
}

View File

@ -236,9 +236,9 @@ type Config struct {
PSITime uint // Sets the time between a packet being sent.
Quantization uint // Quantization defines the quantization level, which will determine variable bitrate quality in the case of input from the Pi Camera.
RBCapacity uint // The number of bytes the pool buffer will occupy.
RBStartElementSize uint // The starting element size of the pool buffer from which element size will increase to accomodate frames.
RBWriteTimeout uint // The pool buffer write timeout in seconds.
PoolCapacity uint // The number of bytes the pool buffer will occupy.
PoolStartElementSize uint // The starting element size of the pool buffer from which element size will increase to accomodate frames.
PoolWriteTimeout uint // The pool buffer write timeout in seconds.
RecPeriod float64 // How many seconds to record at a time.
Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input.
RTMPURL string // RTMPURL specifies the Rtmp output destination URL. This must be defined if RTMP is to be used as an output.

View File

@ -59,9 +59,9 @@ func TestValidate(t *testing.T) {
ClipDuration: defaultClipDuration,
PSITime: defaultPSITime,
FileFPS: defaultFileFPS,
RBCapacity: defaultRBCapacity,
RBStartElementSize: defaultRBStartElementSize,
RBWriteTimeout: defaultRBWriteTimeout,
PoolCapacity: defaultPoolCapacity,
PoolStartElementSize: defaultPoolStartElementSize,
PoolWriteTimeout: defaultPoolWriteTimeout,
MinFPS: defaultMinFPS,
}
@ -112,8 +112,8 @@ func TestUpdate(t *testing.T) {
"OutputPath": "/outputpath",
"Outputs": "Rtmp,Rtp",
"Quantization": "30",
"RBCapacity": "100000",
"RBWriteTimeout": "50",
"PoolCapacity": "100000",
"PoolWriteTimeout": "50",
"Rotation": "180",
"RTMPURL": "rtmp://url",
"RTPAddress": "ip:port",
@ -162,8 +162,8 @@ func TestUpdate(t *testing.T) {
OutputPath: "/outputpath",
Outputs: []uint8{OutputRTMP, OutputRTP},
Quantization: 30,
RBCapacity: 100000,
RBWriteTimeout: 50,
PoolCapacity: 100000,
PoolWriteTimeout: 50,
Rotation: 180,
RTMPURL: "rtmp://url",
RTPAddress: "ip:port",

View File

@ -116,9 +116,9 @@ var params = []Param{
{N: "Outputs", BT: "[]Output", M: "Output", E: []string{"HTTP", "RTMP", "RTP", "File"}},
{N: "PSITime", BT: "time.Duration"},
{N: "Quantization", BT: "uint"},
{N: "RBCapacity", BT: "uint", Min: 1000000, Max: 100000000},
{N: "RBMaxElements", BT: "uint", Min: 0, Max: math.MaxUint32},
{N: "RBWriteTimeout", BT: "time.Duration"},
{N: "PoolCapacity", BT: "uint", Min: 1000000, Max: 100000000},
{N: "PoolMaxElements", BT: "uint", Min: 0, Max: math.MaxUint32},
{N: "PoolWriteTimeout", BT: "time.Duration"},
{N: "RTMPURL", BT: "string"},
{N: "RTPAddress", BT: "string"},
{N: "RecPeriod", BT: "float64"}, // TODO(Trek): bounds.

View File

@ -701,10 +701,10 @@ func (q *Quantization) Set(val string) error {
return nil
}
type RBCapacity uint
type PoolCapacity uint
func (r *RBCapacity) Type() string { return "uint" }
func (r *RBCapacity) Set(val string) error {
func (r *PoolCapacity) Type() string { return "uint" }
func (r *PoolCapacity) Set(val string) error {
_v, err := strconv.Atoi(val)
if err != nil {
return fmt.Errorf("could not convert set string to int: %w", err)
@ -714,14 +714,14 @@ func (r *RBCapacity) Set(val string) error {
return fmt.Errorf("invalid value %v", _v)
}
*r = RBCapacity(_v)
*r = PoolCapacity(_v)
return nil
}
type RBMaxElements uint
type PoolMaxElements uint
func (r *RBMaxElements) Type() string { return "uint" }
func (r *RBMaxElements) Set(val string) error {
func (r *PoolMaxElements) Type() string { return "uint" }
func (r *PoolMaxElements) Set(val string) error {
_v, err := strconv.Atoi(val)
if err != nil {
return fmt.Errorf("could not convert set string to int: %w", err)
@ -731,19 +731,19 @@ func (r *RBMaxElements) Set(val string) error {
return fmt.Errorf("invalid value %v", _v)
}
*r = RBMaxElements(_v)
*r = PoolMaxElements(_v)
return nil
}
type RBWriteTimeout time.Duration
type PoolWriteTimeout time.Duration
func (r *RBWriteTimeout) Type() string { return "time.Duration" }
func (r *RBWriteTimeout) Set(val string) error {
func (r *PoolWriteTimeout) Type() string { return "time.Duration" }
func (r *PoolWriteTimeout) Set(val string) error {
_v, err := strconv.Atoi(val)
if err != nil {
return fmt.Errorf("could not convert set string to int: %w", err)
}
*r = RBWriteTimeout(time.Duration(_v) * time.Second)
*r = PoolWriteTimeout(time.Duration(_v) * time.Second)
return nil
}

View File

@ -81,9 +81,9 @@ const (
KeyOutputs = "Outputs"
KeyPSITime = "PSITime"
KeyQuantization = "Quantization"
KeyRBCapacity = "RBCapacity"
KeyRBStartElementSize = "RBStartElementSize"
KeyRBWriteTimeout = "RBWriteTimeout"
KeyPoolCapacity = "PoolCapacity"
KeyPoolStartElementSize = "PoolStartElementSize"
KeyPoolWriteTimeout = "PoolWriteTimeout"
KeyRecPeriod = "RecPeriod"
KeyRotation = "Rotation"
KeyRTMPURL = "RTMPURL"
@ -127,9 +127,9 @@ const (
defaultFileFPS = 0
// Ring buffer defaults.
defaultRBCapacity = 50000000 // => 50MB
defaultRBStartElementSize = 1000 // bytes
defaultRBWriteTimeout = 5 // Seconds.
defaultPoolCapacity = 50000000 // => 50MB
defaultPoolStartElementSize = 1000 // bytes
defaultPoolWriteTimeout = 5 // Seconds.
// Motion filter parameter defaults.
defaultMinFPS = 1.0
@ -519,25 +519,27 @@ var Variables = []struct {
Update: func(c *Config, v string) { c.Quantization = parseUint(KeyQuantization, v, c) },
},
{
Name: KeyRBCapacity,
Name: KeyPoolCapacity,
Type_: typeUint,
Update: func(c *Config, v string) { c.RBCapacity = parseUint(KeyRBCapacity, v, c) },
Validate: func(c *Config) { c.RBCapacity = lessThanOrEqual(KeyRBCapacity, c.RBCapacity, 0, c, defaultRBCapacity) },
},
{
Name: KeyRBStartElementSize,
Type_: typeUint,
Update: func(c *Config, v string) { c.RBStartElementSize = parseUint("RBStartElementSize", v, c) },
Update: func(c *Config, v string) { c.PoolCapacity = parseUint(KeyPoolCapacity, v, c) },
Validate: func(c *Config) {
c.RBStartElementSize = lessThanOrEqual("RBStartElementSize", c.RBStartElementSize, 0, c, defaultRBStartElementSize)
c.PoolCapacity = lessThanOrEqual(KeyPoolCapacity, c.PoolCapacity, 0, c, defaultPoolCapacity)
},
},
{
Name: KeyRBWriteTimeout,
Name: KeyPoolStartElementSize,
Type_: typeUint,
Update: func(c *Config, v string) { c.RBWriteTimeout = parseUint(KeyRBWriteTimeout, v, c) },
Update: func(c *Config, v string) { c.PoolStartElementSize = parseUint("PoolStartElementSize", v, c) },
Validate: func(c *Config) {
c.RBWriteTimeout = lessThanOrEqual(KeyRBWriteTimeout, c.RBWriteTimeout, 0, c, defaultRBWriteTimeout)
c.PoolStartElementSize = lessThanOrEqual("PoolStartElementSize", c.PoolStartElementSize, 0, c, defaultPoolStartElementSize)
},
},
{
Name: KeyPoolWriteTimeout,
Type_: typeUint,
Update: func(c *Config, v string) { c.PoolWriteTimeout = parseUint(KeyPoolWriteTimeout, v, c) },
Validate: func(c *Config) {
c.PoolWriteTimeout = lessThanOrEqual(KeyPoolWriteTimeout, c.PoolWriteTimeout, 0, c, defaultPoolWriteTimeout)
},
},
{

View File

@ -172,8 +172,8 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
// Calculate no. of pool buffer elements based on starting element size
// const and config directed max pool buffer size, then create buffer.
// This is only used if the selected output uses a pool buffer.
nElements := r.cfg.RBCapacity / r.cfg.RBStartElementSize
writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second
nElements := r.cfg.PoolCapacity / r.cfg.PoolStartElementSize
writeTimeout := time.Duration(r.cfg.PoolWriteTimeout) * time.Second
// We will go through our outputs and create the corresponding senders to add
// to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the
@ -183,7 +183,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
switch out {
case config.OutputHTTP:
r.cfg.Logger.Log(logger.Debug, "using HTTP output")
pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout)
hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report)
w = newMTSSender(hs, r.cfg.Logger.Log, pb, r.cfg.ClipDuration)
mtsSenders = append(mtsSenders, w)
@ -204,7 +204,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
mtsSenders = append(mtsSenders, w)
case config.OutputFiles:
r.cfg.Logger.Log(logger.Debug, "using Files output")
pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout)
fs, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, true)
if err != nil {
return err
@ -213,7 +213,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
mtsSenders = append(mtsSenders, w)
case config.OutputRTMP:
r.cfg.Logger.Log(logger.Debug, "using RTMP output")
pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout)
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, pb, r.cfg.Logger.Log, r.bitrate.Report)
if err != nil {
r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error())

View File

@ -45,7 +45,7 @@ import (
// Misc consts.
const (
rbStartingElementSize = 10000 // Bytes.
poolStartingElementSize = 10000 // Bytes.
rtmpConnectionMaxTries = 5
)

View File

@ -51,7 +51,7 @@ func TestRaspistill(t *testing.T) {
const (
timelapseInterval = "4"
timelapseDuration = "25"
rbStartElementSize = "1000000"
poolStartElementSize = "1000000"
input = "Raspistill"
codec = "JPEG"
output = "Files"
@ -91,7 +91,7 @@ func TestRaspistill(t *testing.T) {
config.KeyTimelapseInterval: timelapseInterval,
config.KeyTimelapseDuration: timelapseDuration,
config.KeyLogging: logging,
config.KeyRBStartElementSize: rbStartElementSize,
config.KeyPoolStartElementSize: poolStartElementSize,
},
)
if err != nil {

View File

@ -53,14 +53,14 @@ type Log func(level int8, message string, params ...interface{})
// Sender pool buffer read timeouts.
const (
rtmpRBReadTimeout = 1 * time.Second
mtsRBReadTimeout = 1 * time.Second
rtmpPoolReadTimeout = 1 * time.Second
mtsPoolReadTimeout = 1 * time.Second
maxBuffLen = 50000000
)
var (
adjustedRTMPRBElementSize int
adjustedMTSRBElementSize int
adjustedRTMPPoolElementSize int
adjustedMTSPoolElementSize int
)
// httpSender provides an implemntation of io.Writer to perform sends to a http
@ -246,7 +246,7 @@ func (s *mtsSender) output() {
// If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil {
var err error
chunk, err = s.pool.Next(mtsRBReadTimeout)
chunk, err = s.pool.Next(mtsPoolReadTimeout)
switch err {
case nil, io.EOF:
continue
@ -304,12 +304,12 @@ func (s *mtsSender) Write(d []byte) (int, error) {
s.pool.Flush()
}
if err != nil {
s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSRBElementSize)
s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSPoolElementSize)
if err == pool.ErrTooLong {
adjustedMTSRBElementSize = len(s.buf) * 2
numElements := maxBuffLen / adjustedMTSRBElementSize
s.pool = pool.NewBuffer(maxBuffLen/adjustedMTSRBElementSize, adjustedMTSRBElementSize, 5*time.Second)
s.log(logger.Info, "adjusted MTS pool buffer element size", "new size", adjustedMTSRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSRBElementSize)
adjustedMTSPoolElementSize = len(s.buf) * 2
numElements := maxBuffLen / adjustedMTSPoolElementSize
s.pool = pool.NewBuffer(maxBuffLen/adjustedMTSPoolElementSize, adjustedMTSPoolElementSize, 5*time.Second)
s.log(logger.Info, "adjusted MTS pool buffer element size", "new size", adjustedMTSPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSPoolElementSize)
}
}
s.buf = s.buf[:0]
@ -378,7 +378,7 @@ func (s *rtmpSender) output() {
// If chunk is nil then we're ready to get another from the pool buffer.
if chunk == nil {
var err error
chunk, err = s.pool.Next(rtmpRBReadTimeout)
chunk, err = s.pool.Next(rtmpPoolReadTimeout)
switch err {
case nil, io.EOF:
continue
@ -426,10 +426,10 @@ func (s *rtmpSender) Write(d []byte) (int, error) {
} else {
s.log(logger.Warning, "pool buffer write error", "error", err.Error())
if err == pool.ErrTooLong {
adjustedRTMPRBElementSize = len(d) * 2
numElements := maxBuffLen / adjustedRTMPRBElementSize
s.pool = pool.NewBuffer(numElements, adjustedRTMPRBElementSize, 5*time.Second)
s.log(logger.Info, "adjusted RTMP pool buffer element size", "new size", adjustedRTMPRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPRBElementSize)
adjustedRTMPPoolElementSize = len(d) * 2
numElements := maxBuffLen / adjustedRTMPPoolElementSize
s.pool = pool.NewBuffer(numElements, adjustedRTMPPoolElementSize, 5*time.Second)
s.log(logger.Info, "adjusted RTMP pool buffer element size", "new size", adjustedRTMPPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPPoolElementSize)
}
}
s.report(len(d))

View File

@ -105,9 +105,9 @@ func TestMTSSenderSegment(t *testing.T) {
// Create ringBuffer, sender, sender and the MPEGTS encoder.
const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
const testRBCapacity = 50000000
nElements := testRBCapacity / rbStartingElementSize
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, rbStartingElementSize, 0), 0)
const testPoolCapacity = 50000000
nElements := testPoolCapacity / poolStartingElementSize
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, poolStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))
@ -186,9 +186,9 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
const testRBCapacity = 50000000 // 50MB
nElements := testRBCapacity / rbStartingElementSize
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, rbStartingElementSize, 0), 0)
const testPoolCapacity = 50000000 // 50MB
nElements := testPoolCapacity / poolStartingElementSize
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, poolStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))
@ -269,7 +269,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(1, rbStartingElementSize, 0), 0)
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(1, poolStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))