diff --git a/revid/config.go b/revid/config.go index cf108db6..a995d238 100644 --- a/revid/config.go +++ b/revid/config.go @@ -90,32 +90,40 @@ const ( // Default config settings const ( - defaultInput = Raspivid - defaultOutput = HTTP - defaultFrameRate = 25 - defaultWriteRate = 25 - defaultWidth = 1280 - defaultHeight = 720 - defaultIntraRefreshPeriod = 100 - defaultTimeout = 0 - defaultQuantization = 40 - defaultBitrate = 400000 - defaultFramesPerClip = 1 - httpFramesPerClip = 560 - defaultInputCodec = codecutil.H264 - defaultVerbosity = logger.Error - defaultRtpAddr = "localhost:6970" - defaultBurstPeriod = 10 // Seconds - defaultRotation = 0 // Degrees + // General revid defaults. + defaultInput = Raspivid + defaultOutput = HTTP + defaultFrameRate = 25 + defaultWriteRate = 25 + defaultTimeout = 0 + defaultInputCodec = codecutil.H264 + defaultVerbosity = logger.Error + defaultRtpAddr = "localhost:6970" + defaultBurstPeriod = 10 // Seconds + + // Raspivid video defaults. defaultBrightness = 50 defaultExposure = "auto" defaultAutoWhiteBalance = "auto" + defaultRotation = 0 // Degrees + defaultWidth = 1280 + defaultHeight = 720 + defaultIntraRefreshPeriod = 100 + defaultQuantization = 40 + defaultBitrate = 400000 + // Audio defaults. defaultAudioInputCodec = codecutil.ADPCM defaultSampleRate = 48000 defaultBitDepth = 16 defaultChannels = 1 defaultRecPeriod = 1.0 + + // Ringbuffer defaults. + defaultMTSRBSize = 1000 + defaultMTSRBElementSize = 100000 + defaultRTMPRBSize = 500 + defaultRTMPRBElementSize = 200000 ) // Config provides parameters relevant to a revid instance. A new config must @@ -166,10 +174,6 @@ type Config struct { // bitrate, if configurable with the chosen input. Raspivid supports quantization. Quantize bool - // FramesPerClip defines the number of packetization units to pack into a clip - // per HTTP send. - FramesPerClip uint - // RTMPURL specifies the Rtmp output destination URL. This must be defined if // RTMP is to be used as an output. RTMPURL string @@ -236,10 +240,16 @@ type Config struct { BurstPeriod uint // BurstPeriod defines the revid burst period in seconds. Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input. Height uint // Height defines the input video height Raspivid input. - Width uint // Width defines the input video width Raspivid input. + Width uint // Width defines the input video width Raspivid input. Bitrate uint // Bitrate specifies the input bitrate for Raspivid input. FlipHorizontal bool // FlipHorizontal flips video horizontally for Raspivid input. FlipVertical bool // FlipVertial flips video vertically for Raspivid input. + + // Ring buffer sizes. + RTMPRBSize int // The number of elements in the RTMP sender ringbuffer. + RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer. + MTSRBSize int // The number of elements in the MTS sender ringbuffer. + MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer. } // Validate checks for any errors in the config fields and defaults settings @@ -310,11 +320,7 @@ func (c *Config) Validate(r *Revid) error { // c.FramesPerClip = httpFramesPerClip break } - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", "framesPerClip", defaultFramesPerClip) - c.FramesPerClip = defaultFramesPerClip case HTTP, RTP: - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", "framesPerClip", httpFramesPerClip) - c.FramesPerClip = httpFramesPerClip default: return errors.New("bad output type defined in config") } @@ -326,11 +332,6 @@ func (c *Config) Validate(r *Revid) error { c.BurstPeriod = defaultBurstPeriod } - if c.FramesPerClip < 1 { - c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip) - c.FramesPerClip = defaultFramesPerClip - } - if c.Rotation > 359 { c.Logger.Log(logger.Warning, pkg+"bad rotate angle, defaulting", "angle", defaultRotation) c.Rotation = defaultRotation @@ -425,6 +426,26 @@ func (c *Config) Validate(r *Revid) error { return errors.New(pkg + "bad auto white balance setting in config") } + if c.RTMPRBSize <= 0 { + c.Logger.Log(logger.Info, pkg+"RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize) + c.RTMPRBSize = defaultRTMPRBSize + } + + if c.RTMPRBElementSize <= 0 { + c.Logger.Log(logger.Info, pkg+"RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize) + c.RTMPRBElementSize = defaultRTMPRBElementSize + } + + if c.MTSRBSize <= 0 { + c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize) + c.MTSRBSize = defaultMTSRBSize + } + + if c.MTSRBElementSize <= 0 { + c.Logger.Log(logger.Info, pkg+"MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize) + c.MTSRBElementSize = defaultMTSRBElementSize + } + return nil } diff --git a/revid/revid.go b/revid/revid.go index c3664e4a..7075ce8f 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -54,18 +54,6 @@ import ( "bitbucket.org/ausocean/utils/logger" ) -// mtsSender ringBuffer sizes. -const ( - mtsRBSize = 1000 - mtsRBElementSize = 100000 -) - -// rtmpSender ringBuffer sizes. -const ( - rtmpRBSize = 500 - rtmpRBElementSize = 200000 -) - // RTMP connection properties. const ( rtmpConnectionMaxTries = 5 @@ -240,7 +228,13 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. for _, out := range r.config.Outputs { switch out { case HTTP: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, mtsRBSize, mtsRBElementSize, 0) + w = newMtsSender( + newHttpSender(r.ns, r.config.Logger.Log), + r.config.Logger.Log, + r.config.MTSRBSize, + r.config.MTSRBElementSize, + 0, + ) mtsSenders = append(mtsSenders, w) case RTP: w, err := newRtpSender(r.config.RTPAddress, r.config.Logger.Log, r.config.FrameRate) @@ -255,7 +249,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. } mtsSenders = append(mtsSenders, w) case RTMP: - w, err := newRtmpSender(r.config.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) + w, err := newRtmpSender( + r.config.RTMPURL, + rtmpConnectionTimeout, + rtmpConnectionMaxTries, + r.config.RTMPRBSize, + r.config.RTMPRBElementSize, + r.config.Logger.Log, + ) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) } @@ -504,6 +505,34 @@ func (r *Revid) Update(vars map[string]string) error { default: r.config.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value) } + case "RTMPRBSize": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.config.Logger.Log(logger.Warning, pkg+"invalid RTMPRBSize var", "value", value) + break + } + r.config.RTMPRBSize = v + case "RTMPRBElementSize": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.config.Logger.Log(logger.Warning, pkg+"invalid RTMPRBElementSize var", "value", value) + break + } + r.config.RTMPRBElementSize = v + case "MTSRBSize": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.config.Logger.Log(logger.Warning, pkg+"invalid MTSRBSize var", "value", value) + break + } + r.config.MTSRBElementSize = v + case "MTSRBElementSize": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.config.Logger.Log(logger.Warning, pkg+"invalid MTSRBElementSize var", "value", value) + break + } + r.config.MTSRBElementSize = v } } r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config)) diff --git a/revid/senders.go b/revid/senders.go index ea1f8447..7de06342 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -261,7 +261,7 @@ type rtmpSender struct { wg sync.WaitGroup } -func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { +func newRtmpSender(url string, timeout uint, retries, rbSize, rbElementSize int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { @@ -280,7 +280,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg timeout: timeout, retries: retries, log: log, - ring: ring.NewBuffer(rtmpRBSize, rtmpRBElementSize, 0), + ring: ring.NewBuffer(rbSize, rbElementSize, 0), done: make(chan struct{}), } s.wg.Add(1) diff --git a/revid/senders_test.go b/revid/senders_test.go index eeba6a2b..d92f19f4 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -133,7 +133,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - sender := newMtsSender(dst, (*dummyLogger)(t).log, mtsRBSize, mtsRBElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder. @@ -211,7 +211,7 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, mtsRBSize, mtsRBElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder and send PSI every 10 packets. @@ -291,7 +291,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, mtsRBElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, defaultMTSRBElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder.