From 75b7a2946f625bcfe9066c8eec87aef4fffbc30b Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 8 Aug 2019 13:20:02 +0930 Subject: [PATCH 1/3] revid: made MTS and RTMP ringbuffer sizes configurable via vars and added to revid config --- revid/config.go | 83 +++++++++++++++++++++++++++---------------- revid/revid.go | 57 +++++++++++++++++++++-------- revid/senders.go | 4 +-- revid/senders_test.go | 6 ++-- 4 files changed, 100 insertions(+), 50 deletions(-) diff --git a/revid/config.go b/revid/config.go index cf108db6..a995d238 100644 --- a/revid/config.go +++ b/revid/config.go @@ -90,32 +90,40 @@ const ( // Default config settings const ( - defaultInput = Raspivid - defaultOutput = HTTP - defaultFrameRate = 25 - defaultWriteRate = 25 - defaultWidth = 1280 - defaultHeight = 720 - defaultIntraRefreshPeriod = 100 - defaultTimeout = 0 - defaultQuantization = 40 - defaultBitrate = 400000 - defaultFramesPerClip = 1 - httpFramesPerClip = 560 - defaultInputCodec = codecutil.H264 - defaultVerbosity = logger.Error - defaultRtpAddr = "localhost:6970" - defaultBurstPeriod = 10 // Seconds - defaultRotation = 0 // Degrees + // General revid defaults. + defaultInput = Raspivid + defaultOutput = HTTP + defaultFrameRate = 25 + defaultWriteRate = 25 + defaultTimeout = 0 + defaultInputCodec = codecutil.H264 + defaultVerbosity = logger.Error + defaultRtpAddr = "localhost:6970" + defaultBurstPeriod = 10 // Seconds + + // Raspivid video defaults. defaultBrightness = 50 defaultExposure = "auto" defaultAutoWhiteBalance = "auto" + defaultRotation = 0 // Degrees + defaultWidth = 1280 + defaultHeight = 720 + defaultIntraRefreshPeriod = 100 + defaultQuantization = 40 + defaultBitrate = 400000 + // Audio defaults. defaultAudioInputCodec = codecutil.ADPCM defaultSampleRate = 48000 defaultBitDepth = 16 defaultChannels = 1 defaultRecPeriod = 1.0 + + // Ringbuffer defaults. + defaultMTSRBSize = 1000 + defaultMTSRBElementSize = 100000 + defaultRTMPRBSize = 500 + defaultRTMPRBElementSize = 200000 ) // Config provides parameters relevant to a revid instance. A new config must @@ -166,10 +174,6 @@ type Config struct { // bitrate, if configurable with the chosen input. Raspivid supports quantization. Quantize bool - // FramesPerClip defines the number of packetization units to pack into a clip - // per HTTP send. - FramesPerClip uint - // RTMPURL specifies the Rtmp output destination URL. This must be defined if // RTMP is to be used as an output. RTMPURL string @@ -236,10 +240,16 @@ type Config struct { BurstPeriod uint // BurstPeriod defines the revid burst period in seconds. Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input. Height uint // Height defines the input video height Raspivid input. - Width uint // Width defines the input video width Raspivid input. + Width uint // Width defines the input video width Raspivid input. Bitrate uint // Bitrate specifies the input bitrate for Raspivid input. FlipHorizontal bool // FlipHorizontal flips video horizontally for Raspivid input. FlipVertical bool // FlipVertial flips video vertically for Raspivid input. + + // Ring buffer sizes. + RTMPRBSize int // The number of elements in the RTMP sender ringbuffer. + RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer. + MTSRBSize int // The number of elements in the MTS sender ringbuffer. + MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer. } // Validate checks for any errors in the config fields and defaults settings @@ -310,11 +320,7 @@ func (c *Config) Validate(r *Revid) error { // c.FramesPerClip = httpFramesPerClip break } - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", "framesPerClip", defaultFramesPerClip) - c.FramesPerClip = defaultFramesPerClip case HTTP, RTP: - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", "framesPerClip", httpFramesPerClip) - c.FramesPerClip = httpFramesPerClip default: return errors.New("bad output type defined in config") } @@ -326,11 +332,6 @@ func (c *Config) Validate(r *Revid) error { c.BurstPeriod = defaultBurstPeriod } - if c.FramesPerClip < 1 { - c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip) - c.FramesPerClip = defaultFramesPerClip - } - if c.Rotation > 359 { c.Logger.Log(logger.Warning, pkg+"bad rotate angle, defaulting", "angle", defaultRotation) c.Rotation = defaultRotation @@ -425,6 +426,26 @@ func (c *Config) Validate(r *Revid) error { return errors.New(pkg + "bad auto white balance setting in config") } + if c.RTMPRBSize <= 0 { + c.Logger.Log(logger.Info, pkg+"RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize) + c.RTMPRBSize = defaultRTMPRBSize + } + + if c.RTMPRBElementSize <= 0 { + c.Logger.Log(logger.Info, pkg+"RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize) + c.RTMPRBElementSize = defaultRTMPRBElementSize + } + + if c.MTSRBSize <= 0 { + c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize) + c.MTSRBSize = defaultMTSRBSize + } + + if c.MTSRBElementSize <= 0 { + c.Logger.Log(logger.Info, pkg+"MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize) + c.MTSRBElementSize = defaultMTSRBElementSize + } + return nil } diff --git a/revid/revid.go b/revid/revid.go index c3664e4a..7075ce8f 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -54,18 +54,6 @@ import ( "bitbucket.org/ausocean/utils/logger" ) -// mtsSender ringBuffer sizes. -const ( - mtsRBSize = 1000 - mtsRBElementSize = 100000 -) - -// rtmpSender ringBuffer sizes. -const ( - rtmpRBSize = 500 - rtmpRBElementSize = 200000 -) - // RTMP connection properties. const ( rtmpConnectionMaxTries = 5 @@ -240,7 +228,13 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. for _, out := range r.config.Outputs { switch out { case HTTP: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, mtsRBSize, mtsRBElementSize, 0) + w = newMtsSender( + newHttpSender(r.ns, r.config.Logger.Log), + r.config.Logger.Log, + r.config.MTSRBSize, + r.config.MTSRBElementSize, + 0, + ) mtsSenders = append(mtsSenders, w) case RTP: w, err := newRtpSender(r.config.RTPAddress, r.config.Logger.Log, r.config.FrameRate) @@ -255,7 +249,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. } mtsSenders = append(mtsSenders, w) case RTMP: - w, err := newRtmpSender(r.config.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) + w, err := newRtmpSender( + r.config.RTMPURL, + rtmpConnectionTimeout, + rtmpConnectionMaxTries, + r.config.RTMPRBSize, + r.config.RTMPRBElementSize, + r.config.Logger.Log, + ) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) } @@ -504,6 +505,34 @@ func (r *Revid) Update(vars map[string]string) error { default: r.config.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value) } + case "RTMPRBSize": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.config.Logger.Log(logger.Warning, pkg+"invalid RTMPRBSize var", "value", value) + break + } + r.config.RTMPRBSize = v + case "RTMPRBElementSize": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.config.Logger.Log(logger.Warning, pkg+"invalid RTMPRBElementSize var", "value", value) + break + } + r.config.RTMPRBElementSize = v + case "MTSRBSize": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.config.Logger.Log(logger.Warning, pkg+"invalid MTSRBSize var", "value", value) + break + } + r.config.MTSRBElementSize = v + case "MTSRBElementSize": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.config.Logger.Log(logger.Warning, pkg+"invalid MTSRBElementSize var", "value", value) + break + } + r.config.MTSRBElementSize = v } } r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config)) diff --git a/revid/senders.go b/revid/senders.go index ea1f8447..7de06342 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -261,7 +261,7 @@ type rtmpSender struct { wg sync.WaitGroup } -func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { +func newRtmpSender(url string, timeout uint, retries, rbSize, rbElementSize int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { @@ -280,7 +280,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg timeout: timeout, retries: retries, log: log, - ring: ring.NewBuffer(rtmpRBSize, rtmpRBElementSize, 0), + ring: ring.NewBuffer(rbSize, rbElementSize, 0), done: make(chan struct{}), } s.wg.Add(1) diff --git a/revid/senders_test.go b/revid/senders_test.go index eeba6a2b..d92f19f4 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -133,7 +133,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - sender := newMtsSender(dst, (*dummyLogger)(t).log, mtsRBSize, mtsRBElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder. @@ -211,7 +211,7 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, mtsRBSize, mtsRBElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder and send PSI every 10 packets. @@ -291,7 +291,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, mtsRBElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, defaultMTSRBElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder. From 61274a18d512eeb01b746e5741b020ac15f1b9d0 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 8 Aug 2019 13:25:24 +0930 Subject: [PATCH 2/3] revid-cli: removed reference to framesPerClip which doesn't exist anymore --- cmd/revid-cli/main.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 0c8a6af9..ff53a176 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -122,7 +122,6 @@ func handleFlags() revid.Config { httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts") verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No") horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No") - framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent") bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video") heightPtr = flag.Uint("Height", 0, "Height in pixels") widthPtr = flag.Uint("Width", 0, "Width in pixels") @@ -241,7 +240,6 @@ func handleFlags() revid.Config { cfg.Rotation = *rotationPtr cfg.FlipHorizontal = *horizontalFlipPtr cfg.FlipVertical = *verticalFlipPtr - cfg.FramesPerClip = *framesPerClipPtr cfg.RTMPURL = *rtmpUrlPtr cfg.Bitrate = *bitratePtr cfg.OutputPath = *outputPathPtr From 67e50295c92b623da855dbac14800e922972c8e8 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 9 Aug 2019 15:34:29 +0930 Subject: [PATCH 3/3] revid: fixed assignment of MTSRBSize --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 7075ce8f..8da1854a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -525,7 +525,7 @@ func (r *Revid) Update(vars map[string]string) error { r.config.Logger.Log(logger.Warning, pkg+"invalid MTSRBSize var", "value", value) break } - r.config.MTSRBElementSize = v + r.config.MTSRBSize = v case "MTSRBElementSize": v, err := strconv.Atoi(value) if err != nil || v < 0 {