From 05a6b4435a17ffd9deaceaf3ded8d973002f9f8f Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 17 May 2021 11:18:44 +0930 Subject: [PATCH] revid: rename KeyRB* and RB* to refer to pool This also changes lexically significant strings in configuration text. --- exp/rvcl/config.json | 2 +- revid/config/config.go | 22 +-- revid/config/config_test.go | 40 ++--- revid/config/parameter/generate_parameters.go | 6 +- revid/config/parameter/parameters.go | 24 +-- revid/config/variables.go | 152 +++++++++--------- revid/pipeline.go | 10 +- revid/revid.go | 4 +- revid/revid_test.go | 36 ++--- revid/senders.go | 32 ++-- revid/senders_test.go | 14 +- 11 files changed, 172 insertions(+), 170 deletions(-) diff --git a/exp/rvcl/config.json b/exp/rvcl/config.json index 40b7cc25..5c711083 100644 --- a/exp/rvcl/config.json +++ b/exp/rvcl/config.json @@ -7,5 +7,5 @@ "TimelapseInterval":"10", "logging":"Debug", "Suppress":"false", - "RBStartElementSize":"1000000" + "PoolStartElementSize":"1000000" } diff --git a/revid/config/config.go b/revid/config/config.go index c6eed123..7cf9864a 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -234,17 +234,17 @@ type Config struct { // localhost:6970. MPEGT-TS packetization is used. Outputs []uint8 - PSITime uint // Sets the time between a packet being sent. - Quantization uint // Quantization defines the quantization level, which will determine variable bitrate quality in the case of input from the Pi Camera. - RBCapacity uint // The number of bytes the pool buffer will occupy. - RBStartElementSize uint // The starting element size of the pool buffer from which element size will increase to accomodate frames. - RBWriteTimeout uint // The pool buffer write timeout in seconds. - RecPeriod float64 // How many seconds to record at a time. - Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input. - RTMPURL string // RTMPURL specifies the Rtmp output destination URL. This must be defined if RTMP is to be used as an output. - RTPAddress string // RTPAddress defines the RTP output destination. - SampleRate uint // Samples a second (Hz). - Saturation int + PSITime uint // Sets the time between a packet being sent. + Quantization uint // Quantization defines the quantization level, which will determine variable bitrate quality in the case of input from the Pi Camera. + PoolCapacity uint // The number of bytes the pool buffer will occupy. + PoolStartElementSize uint // The starting element size of the pool buffer from which element size will increase to accomodate frames. + PoolWriteTimeout uint // The pool buffer write timeout in seconds. + RecPeriod float64 // How many seconds to record at a time. + Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input. + RTMPURL string // RTMPURL specifies the Rtmp output destination URL. This must be defined if RTMP is to be used as an output. + RTPAddress string // RTPAddress defines the RTP output destination. + SampleRate uint // Samples a second (Hz). + Saturation int // Sharpness is the sharpness of capture image/video from a capture device. Sharpness int diff --git a/revid/config/config_test.go b/revid/config/config_test.go index 4130f14b..4983d2bd 100644 --- a/revid/config/config_test.go +++ b/revid/config/config_test.go @@ -47,22 +47,22 @@ func TestValidate(t *testing.T) { dl := &dumbLogger{} want := Config{ - Logger: dl, - Input: defaultInput, - Outputs: []uint8{defaultOutput}, - InputCodec: defaultInputCodec, - RTPAddress: defaultRTPAddr, - CameraIP: defaultCameraIP, - BurstPeriod: defaultBurstPeriod, - MinFrames: defaultMinFrames, - FrameRate: defaultFrameRate, - ClipDuration: defaultClipDuration, - PSITime: defaultPSITime, - FileFPS: defaultFileFPS, - RBCapacity: defaultRBCapacity, - RBStartElementSize: defaultRBStartElementSize, - RBWriteTimeout: defaultRBWriteTimeout, - MinFPS: defaultMinFPS, + Logger: dl, + Input: defaultInput, + Outputs: []uint8{defaultOutput}, + InputCodec: defaultInputCodec, + RTPAddress: defaultRTPAddr, + CameraIP: defaultCameraIP, + BurstPeriod: defaultBurstPeriod, + MinFrames: defaultMinFrames, + FrameRate: defaultFrameRate, + ClipDuration: defaultClipDuration, + PSITime: defaultPSITime, + FileFPS: defaultFileFPS, + PoolCapacity: defaultPoolCapacity, + PoolStartElementSize: defaultPoolStartElementSize, + PoolWriteTimeout: defaultPoolWriteTimeout, + MinFPS: defaultMinFPS, } got := Config{Logger: dl} @@ -112,8 +112,8 @@ func TestUpdate(t *testing.T) { "OutputPath": "/outputpath", "Outputs": "Rtmp,Rtp", "Quantization": "30", - "RBCapacity": "100000", - "RBWriteTimeout": "50", + "PoolCapacity": "100000", + "PoolWriteTimeout": "50", "Rotation": "180", "RTMPURL": "rtmp://url", "RTPAddress": "ip:port", @@ -162,8 +162,8 @@ func TestUpdate(t *testing.T) { OutputPath: "/outputpath", Outputs: []uint8{OutputRTMP, OutputRTP}, Quantization: 30, - RBCapacity: 100000, - RBWriteTimeout: 50, + PoolCapacity: 100000, + PoolWriteTimeout: 50, Rotation: 180, RTMPURL: "rtmp://url", RTPAddress: "ip:port", diff --git a/revid/config/parameter/generate_parameters.go b/revid/config/parameter/generate_parameters.go index ad2cff36..f95a57a1 100644 --- a/revid/config/parameter/generate_parameters.go +++ b/revid/config/parameter/generate_parameters.go @@ -116,9 +116,9 @@ var params = []Param{ {N: "Outputs", BT: "[]Output", M: "Output", E: []string{"HTTP", "RTMP", "RTP", "File"}}, {N: "PSITime", BT: "time.Duration"}, {N: "Quantization", BT: "uint"}, - {N: "RBCapacity", BT: "uint", Min: 1000000, Max: 100000000}, - {N: "RBMaxElements", BT: "uint", Min: 0, Max: math.MaxUint32}, - {N: "RBWriteTimeout", BT: "time.Duration"}, + {N: "PoolCapacity", BT: "uint", Min: 1000000, Max: 100000000}, + {N: "PoolMaxElements", BT: "uint", Min: 0, Max: math.MaxUint32}, + {N: "PoolWriteTimeout", BT: "time.Duration"}, {N: "RTMPURL", BT: "string"}, {N: "RTPAddress", BT: "string"}, {N: "RecPeriod", BT: "float64"}, // TODO(Trek): bounds. diff --git a/revid/config/parameter/parameters.go b/revid/config/parameter/parameters.go index bd127ed7..7ab1e9dd 100644 --- a/revid/config/parameter/parameters.go +++ b/revid/config/parameter/parameters.go @@ -701,10 +701,10 @@ func (q *Quantization) Set(val string) error { return nil } -type RBCapacity uint +type PoolCapacity uint -func (r *RBCapacity) Type() string { return "uint" } -func (r *RBCapacity) Set(val string) error { +func (r *PoolCapacity) Type() string { return "uint" } +func (r *PoolCapacity) Set(val string) error { _v, err := strconv.Atoi(val) if err != nil { return fmt.Errorf("could not convert set string to int: %w", err) @@ -714,14 +714,14 @@ func (r *RBCapacity) Set(val string) error { return fmt.Errorf("invalid value %v", _v) } - *r = RBCapacity(_v) + *r = PoolCapacity(_v) return nil } -type RBMaxElements uint +type PoolMaxElements uint -func (r *RBMaxElements) Type() string { return "uint" } -func (r *RBMaxElements) Set(val string) error { +func (r *PoolMaxElements) Type() string { return "uint" } +func (r *PoolMaxElements) Set(val string) error { _v, err := strconv.Atoi(val) if err != nil { return fmt.Errorf("could not convert set string to int: %w", err) @@ -731,19 +731,19 @@ func (r *RBMaxElements) Set(val string) error { return fmt.Errorf("invalid value %v", _v) } - *r = RBMaxElements(_v) + *r = PoolMaxElements(_v) return nil } -type RBWriteTimeout time.Duration +type PoolWriteTimeout time.Duration -func (r *RBWriteTimeout) Type() string { return "time.Duration" } -func (r *RBWriteTimeout) Set(val string) error { +func (r *PoolWriteTimeout) Type() string { return "time.Duration" } +func (r *PoolWriteTimeout) Set(val string) error { _v, err := strconv.Atoi(val) if err != nil { return fmt.Errorf("could not convert set string to int: %w", err) } - *r = RBWriteTimeout(time.Duration(_v) * time.Second) + *r = PoolWriteTimeout(time.Duration(_v) * time.Second) return nil } diff --git a/revid/config/variables.go b/revid/config/variables.go index e5e87dcf..79b74d89 100644 --- a/revid/config/variables.go +++ b/revid/config/variables.go @@ -39,66 +39,66 @@ import ( // Config map Keys. const ( - KeyAutoWhiteBalance = "AutoWhiteBalance" - KeyAWBGains = "AWBGains" - KeyBitDepth = "BitDepth" - KeyBitrate = "Bitrate" - KeyBrightness = "Brightness" - KeyBurstPeriod = "BurstPeriod" - KeyCameraChan = "CameraChan" - KeyCameraIP = "CameraIP" - KeyCBR = "CBR" - KeyClipDuration = "ClipDuration" - KeyChannels = "Channels" - KeyContrast = "Contrast" - KeyExposure = "Exposure" - KeyEV = "EV" - KeyFileFPS = "FileFPS" - KeyFilters = "Filters" - KeyFrameRate = "FrameRate" - KeyHeight = "Height" - KeyHorizontalFlip = "HorizontalFlip" - KeyHTTPAddress = "HTTPAddress" - KeyInput = "Input" - KeyInputCodec = "InputCodec" - KeyInputPath = "InputPath" - KeyISO = "ISO" - KeyLogging = "logging" - KeyLoop = "Loop" - KeyMinFPS = "MinFPS" - KeyMinFrames = "MinFrames" - KeyMode = "mode" - KeyMotionDownscaling = "MotionDownscaling" - KeyMotionHistory = "MotionHistory" - KeyMotionInterval = "MotionInterval" - KeyMotionKernel = "MotionKernel" - KeyMotionMinArea = "MotionMinArea" - KeyMotionPadding = "MotionPadding" - KeyMotionPixels = "MotionPixels" - KeyMotionThreshold = "MotionThreshold" - KeyOutput = "Output" - KeyOutputPath = "OutputPath" - KeyOutputs = "Outputs" - KeyPSITime = "PSITime" - KeyQuantization = "Quantization" - KeyRBCapacity = "RBCapacity" - KeyRBStartElementSize = "RBStartElementSize" - KeyRBWriteTimeout = "RBWriteTimeout" - KeyRecPeriod = "RecPeriod" - KeyRotation = "Rotation" - KeyRTMPURL = "RTMPURL" - KeyRTPAddress = "RTPAddress" - KeySampleRate = "SampleRate" - KeySaturation = "Saturation" - KeySharpness = "Sharpness" - KeyJPEGQuality = "JPEGQuality" - KeySuppress = "Suppress" - KeyTimelapseDuration = "TimelapseDuration" - KeyTimelapseInterval = "TimelapseInterval" - KeyVBRBitrate = "VBRBitrate" - KeyVBRQuality = "VBRQuality" - KeyVerticalFlip = "VerticalFlip" - KeyWidth = "Width" + KeyAutoWhiteBalance = "AutoWhiteBalance" + KeyAWBGains = "AWBGains" + KeyBitDepth = "BitDepth" + KeyBitrate = "Bitrate" + KeyBrightness = "Brightness" + KeyBurstPeriod = "BurstPeriod" + KeyCameraChan = "CameraChan" + KeyCameraIP = "CameraIP" + KeyCBR = "CBR" + KeyClipDuration = "ClipDuration" + KeyChannels = "Channels" + KeyContrast = "Contrast" + KeyExposure = "Exposure" + KeyEV = "EV" + KeyFileFPS = "FileFPS" + KeyFilters = "Filters" + KeyFrameRate = "FrameRate" + KeyHeight = "Height" + KeyHorizontalFlip = "HorizontalFlip" + KeyHTTPAddress = "HTTPAddress" + KeyInput = "Input" + KeyInputCodec = "InputCodec" + KeyInputPath = "InputPath" + KeyISO = "ISO" + KeyLogging = "logging" + KeyLoop = "Loop" + KeyMinFPS = "MinFPS" + KeyMinFrames = "MinFrames" + KeyMode = "mode" + KeyMotionDownscaling = "MotionDownscaling" + KeyMotionHistory = "MotionHistory" + KeyMotionInterval = "MotionInterval" + KeyMotionKernel = "MotionKernel" + KeyMotionMinArea = "MotionMinArea" + KeyMotionPadding = "MotionPadding" + KeyMotionPixels = "MotionPixels" + KeyMotionThreshold = "MotionThreshold" + KeyOutput = "Output" + KeyOutputPath = "OutputPath" + KeyOutputs = "Outputs" + KeyPSITime = "PSITime" + KeyQuantization = "Quantization" + KeyPoolCapacity = "PoolCapacity" + KeyPoolStartElementSize = "PoolStartElementSize" + KeyPoolWriteTimeout = "PoolWriteTimeout" + KeyRecPeriod = "RecPeriod" + KeyRotation = "Rotation" + KeyRTMPURL = "RTMPURL" + KeyRTPAddress = "RTPAddress" + KeySampleRate = "SampleRate" + KeySaturation = "Saturation" + KeySharpness = "Sharpness" + KeyJPEGQuality = "JPEGQuality" + KeySuppress = "Suppress" + KeyTimelapseDuration = "TimelapseDuration" + KeyTimelapseInterval = "TimelapseInterval" + KeyVBRBitrate = "VBRBitrate" + KeyVBRQuality = "VBRQuality" + KeyVerticalFlip = "VerticalFlip" + KeyWidth = "Width" ) // Config map parameter types. @@ -127,9 +127,9 @@ const ( defaultFileFPS = 0 // Ring buffer defaults. - defaultRBCapacity = 50000000 // => 50MB - defaultRBStartElementSize = 1000 // bytes - defaultRBWriteTimeout = 5 // Seconds. + defaultPoolCapacity = 50000000 // => 50MB + defaultPoolStartElementSize = 1000 // bytes + defaultPoolWriteTimeout = 5 // Seconds. // Motion filter parameter defaults. defaultMinFPS = 1.0 @@ -519,25 +519,27 @@ var Variables = []struct { Update: func(c *Config, v string) { c.Quantization = parseUint(KeyQuantization, v, c) }, }, { - Name: KeyRBCapacity, - Type_: typeUint, - Update: func(c *Config, v string) { c.RBCapacity = parseUint(KeyRBCapacity, v, c) }, - Validate: func(c *Config) { c.RBCapacity = lessThanOrEqual(KeyRBCapacity, c.RBCapacity, 0, c, defaultRBCapacity) }, - }, - { - Name: KeyRBStartElementSize, + Name: KeyPoolCapacity, Type_: typeUint, - Update: func(c *Config, v string) { c.RBStartElementSize = parseUint("RBStartElementSize", v, c) }, + Update: func(c *Config, v string) { c.PoolCapacity = parseUint(KeyPoolCapacity, v, c) }, Validate: func(c *Config) { - c.RBStartElementSize = lessThanOrEqual("RBStartElementSize", c.RBStartElementSize, 0, c, defaultRBStartElementSize) + c.PoolCapacity = lessThanOrEqual(KeyPoolCapacity, c.PoolCapacity, 0, c, defaultPoolCapacity) }, }, { - Name: KeyRBWriteTimeout, + Name: KeyPoolStartElementSize, Type_: typeUint, - Update: func(c *Config, v string) { c.RBWriteTimeout = parseUint(KeyRBWriteTimeout, v, c) }, + Update: func(c *Config, v string) { c.PoolStartElementSize = parseUint("PoolStartElementSize", v, c) }, Validate: func(c *Config) { - c.RBWriteTimeout = lessThanOrEqual(KeyRBWriteTimeout, c.RBWriteTimeout, 0, c, defaultRBWriteTimeout) + c.PoolStartElementSize = lessThanOrEqual("PoolStartElementSize", c.PoolStartElementSize, 0, c, defaultPoolStartElementSize) + }, + }, + { + Name: KeyPoolWriteTimeout, + Type_: typeUint, + Update: func(c *Config, v string) { c.PoolWriteTimeout = parseUint(KeyPoolWriteTimeout, v, c) }, + Validate: func(c *Config) { + c.PoolWriteTimeout = lessThanOrEqual(KeyPoolWriteTimeout, c.PoolWriteTimeout, 0, c, defaultPoolWriteTimeout) }, }, { diff --git a/revid/pipeline.go b/revid/pipeline.go index 5618a402..11fee2fa 100644 --- a/revid/pipeline.go +++ b/revid/pipeline.go @@ -172,8 +172,8 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. // Calculate no. of pool buffer elements based on starting element size // const and config directed max pool buffer size, then create buffer. // This is only used if the selected output uses a pool buffer. - nElements := r.cfg.RBCapacity / r.cfg.RBStartElementSize - writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second + nElements := r.cfg.PoolCapacity / r.cfg.PoolStartElementSize + writeTimeout := time.Duration(r.cfg.PoolWriteTimeout) * time.Second // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the @@ -183,7 +183,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. switch out { case config.OutputHTTP: r.cfg.Logger.Log(logger.Debug, "using HTTP output") - pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) + pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout) hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report) w = newMTSSender(hs, r.cfg.Logger.Log, pb, r.cfg.ClipDuration) mtsSenders = append(mtsSenders, w) @@ -204,7 +204,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. mtsSenders = append(mtsSenders, w) case config.OutputFiles: r.cfg.Logger.Log(logger.Debug, "using Files output") - pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) + pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout) fs, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, true) if err != nil { return err @@ -213,7 +213,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. mtsSenders = append(mtsSenders, w) case config.OutputRTMP: r.cfg.Logger.Log(logger.Debug, "using RTMP output") - pb := pool.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) + pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout) w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, pb, r.cfg.Logger.Log, r.bitrate.Report) if err != nil { r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error()) diff --git a/revid/revid.go b/revid/revid.go index f6e0cb0a..1516b6ea 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -45,8 +45,8 @@ import ( // Misc consts. const ( - rbStartingElementSize = 10000 // Bytes. - rtmpConnectionMaxTries = 5 + poolStartingElementSize = 10000 // Bytes. + rtmpConnectionMaxTries = 5 ) type Logger interface { diff --git a/revid/revid_test.go b/revid/revid_test.go index 194e27c8..01d2b760 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -49,16 +49,16 @@ func TestRaspistill(t *testing.T) { // Configuration parameters. const ( - timelapseInterval = "4" - timelapseDuration = "25" - rbStartElementSize = "1000000" - input = "Raspistill" - codec = "JPEG" - output = "Files" - outDir = "out" - outputPath = outDir + "/" - logging = "Debug" - testImgDir = "../../../test/test-data/av/input/jpeg/" + timelapseInterval = "4" + timelapseDuration = "25" + poolStartElementSize = "1000000" + input = "Raspistill" + codec = "JPEG" + output = "Files" + outDir = "out" + outputPath = outDir + "/" + logging = "Debug" + testImgDir = "../../../test/test-data/av/input/jpeg/" ) const runTime = 40 * time.Second @@ -84,14 +84,14 @@ func TestRaspistill(t *testing.T) { err = rv.Update( map[string]string{ - config.KeyInput: input, - config.KeyInputCodec: codec, - config.KeyOutput: output, - config.KeyOutputPath: outputPath, - config.KeyTimelapseInterval: timelapseInterval, - config.KeyTimelapseDuration: timelapseDuration, - config.KeyLogging: logging, - config.KeyRBStartElementSize: rbStartElementSize, + config.KeyInput: input, + config.KeyInputCodec: codec, + config.KeyOutput: output, + config.KeyOutputPath: outputPath, + config.KeyTimelapseInterval: timelapseInterval, + config.KeyTimelapseDuration: timelapseDuration, + config.KeyLogging: logging, + config.KeyPoolStartElementSize: poolStartElementSize, }, ) if err != nil { diff --git a/revid/senders.go b/revid/senders.go index 4a227d22..92fd6d79 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -53,14 +53,14 @@ type Log func(level int8, message string, params ...interface{}) // Sender pool buffer read timeouts. const ( - rtmpRBReadTimeout = 1 * time.Second - mtsRBReadTimeout = 1 * time.Second - maxBuffLen = 50000000 + rtmpPoolReadTimeout = 1 * time.Second + mtsPoolReadTimeout = 1 * time.Second + maxBuffLen = 50000000 ) var ( - adjustedRTMPRBElementSize int - adjustedMTSRBElementSize int + adjustedRTMPPoolElementSize int + adjustedMTSPoolElementSize int ) // httpSender provides an implemntation of io.Writer to perform sends to a http @@ -246,7 +246,7 @@ func (s *mtsSender) output() { // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.pool.Next(mtsRBReadTimeout) + chunk, err = s.pool.Next(mtsPoolReadTimeout) switch err { case nil, io.EOF: continue @@ -304,12 +304,12 @@ func (s *mtsSender) Write(d []byte) (int, error) { s.pool.Flush() } if err != nil { - s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSRBElementSize) + s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSPoolElementSize) if err == pool.ErrTooLong { - adjustedMTSRBElementSize = len(s.buf) * 2 - numElements := maxBuffLen / adjustedMTSRBElementSize - s.pool = pool.NewBuffer(maxBuffLen/adjustedMTSRBElementSize, adjustedMTSRBElementSize, 5*time.Second) - s.log(logger.Info, "adjusted MTS pool buffer element size", "new size", adjustedMTSRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSRBElementSize) + adjustedMTSPoolElementSize = len(s.buf) * 2 + numElements := maxBuffLen / adjustedMTSPoolElementSize + s.pool = pool.NewBuffer(maxBuffLen/adjustedMTSPoolElementSize, adjustedMTSPoolElementSize, 5*time.Second) + s.log(logger.Info, "adjusted MTS pool buffer element size", "new size", adjustedMTSPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSPoolElementSize) } } s.buf = s.buf[:0] @@ -378,7 +378,7 @@ func (s *rtmpSender) output() { // If chunk is nil then we're ready to get another from the pool buffer. if chunk == nil { var err error - chunk, err = s.pool.Next(rtmpRBReadTimeout) + chunk, err = s.pool.Next(rtmpPoolReadTimeout) switch err { case nil, io.EOF: continue @@ -426,10 +426,10 @@ func (s *rtmpSender) Write(d []byte) (int, error) { } else { s.log(logger.Warning, "pool buffer write error", "error", err.Error()) if err == pool.ErrTooLong { - adjustedRTMPRBElementSize = len(d) * 2 - numElements := maxBuffLen / adjustedRTMPRBElementSize - s.pool = pool.NewBuffer(numElements, adjustedRTMPRBElementSize, 5*time.Second) - s.log(logger.Info, "adjusted RTMP pool buffer element size", "new size", adjustedRTMPRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPRBElementSize) + adjustedRTMPPoolElementSize = len(d) * 2 + numElements := maxBuffLen / adjustedRTMPPoolElementSize + s.pool = pool.NewBuffer(numElements, adjustedRTMPPoolElementSize, 5*time.Second) + s.log(logger.Info, "adjusted RTMP pool buffer element size", "new size", adjustedRTMPPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPPoolElementSize) } } s.report(len(d)) diff --git a/revid/senders_test.go b/revid/senders_test.go index e50ecaf9..357d6188 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -105,9 +105,9 @@ func TestMTSSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - const testRBCapacity = 50000000 - nElements := testRBCapacity / rbStartingElementSize - sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, rbStartingElementSize, 0), 0) + const testPoolCapacity = 50000000 + nElements := testPoolCapacity / poolStartingElementSize + sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, poolStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264)) @@ -186,9 +186,9 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} - const testRBCapacity = 50000000 // 50MB - nElements := testRBCapacity / rbStartingElementSize - sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, rbStartingElementSize, 0), 0) + const testPoolCapacity = 50000000 // 50MB + nElements := testPoolCapacity / poolStartingElementSize + sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, poolStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264)) @@ -269,7 +269,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(1, rbStartingElementSize, 0), 0) + sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(1, poolStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))