revid: made MTS and RTMP ringbuffer sizes configurable via vars and added to revid config

This commit is contained in:
Saxon 2019-08-08 13:20:02 +09:30
parent 662462a2ae
commit 75b7a2946f
4 changed files with 100 additions and 50 deletions

View File

@ -90,32 +90,40 @@ const (
// Default config settings
const (
// General revid defaults.
defaultInput = Raspivid
defaultOutput = HTTP
defaultFrameRate = 25
defaultWriteRate = 25
defaultWidth = 1280
defaultHeight = 720
defaultIntraRefreshPeriod = 100
defaultTimeout = 0
defaultQuantization = 40
defaultBitrate = 400000
defaultFramesPerClip = 1
httpFramesPerClip = 560
defaultInputCodec = codecutil.H264
defaultVerbosity = logger.Error
defaultRtpAddr = "localhost:6970"
defaultBurstPeriod = 10 // Seconds
defaultRotation = 0 // Degrees
// Raspivid video defaults.
defaultBrightness = 50
defaultExposure = "auto"
defaultAutoWhiteBalance = "auto"
defaultRotation = 0 // Degrees
defaultWidth = 1280
defaultHeight = 720
defaultIntraRefreshPeriod = 100
defaultQuantization = 40
defaultBitrate = 400000
// Audio defaults.
defaultAudioInputCodec = codecutil.ADPCM
defaultSampleRate = 48000
defaultBitDepth = 16
defaultChannels = 1
defaultRecPeriod = 1.0
// Ringbuffer defaults.
defaultMTSRBSize = 1000
defaultMTSRBElementSize = 100000
defaultRTMPRBSize = 500
defaultRTMPRBElementSize = 200000
)
// Config provides parameters relevant to a revid instance. A new config must
@ -166,10 +174,6 @@ type Config struct {
// bitrate, if configurable with the chosen input. Raspivid supports quantization.
Quantize bool
// FramesPerClip defines the number of packetization units to pack into a clip
// per HTTP send.
FramesPerClip uint
// RTMPURL specifies the Rtmp output destination URL. This must be defined if
// RTMP is to be used as an output.
RTMPURL string
@ -240,6 +244,12 @@ type Config struct {
Bitrate uint // Bitrate specifies the input bitrate for Raspivid input.
FlipHorizontal bool // FlipHorizontal flips video horizontally for Raspivid input.
FlipVertical bool // FlipVertial flips video vertically for Raspivid input.
// Ring buffer sizes.
RTMPRBSize int // The number of elements in the RTMP sender ringbuffer.
RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer.
MTSRBSize int // The number of elements in the MTS sender ringbuffer.
MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer.
}
// Validate checks for any errors in the config fields and defaults settings
@ -310,11 +320,7 @@ func (c *Config) Validate(r *Revid) error {
// c.FramesPerClip = httpFramesPerClip
break
}
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip
case HTTP, RTP:
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", "framesPerClip", httpFramesPerClip)
c.FramesPerClip = httpFramesPerClip
default:
return errors.New("bad output type defined in config")
}
@ -326,11 +332,6 @@ func (c *Config) Validate(r *Revid) error {
c.BurstPeriod = defaultBurstPeriod
}
if c.FramesPerClip < 1 {
c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip
}
if c.Rotation > 359 {
c.Logger.Log(logger.Warning, pkg+"bad rotate angle, defaulting", "angle", defaultRotation)
c.Rotation = defaultRotation
@ -425,6 +426,26 @@ func (c *Config) Validate(r *Revid) error {
return errors.New(pkg + "bad auto white balance setting in config")
}
if c.RTMPRBSize <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize)
c.RTMPRBSize = defaultRTMPRBSize
}
if c.RTMPRBElementSize <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize)
c.RTMPRBElementSize = defaultRTMPRBElementSize
}
if c.MTSRBSize <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize)
c.MTSRBSize = defaultMTSRBSize
}
if c.MTSRBElementSize <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize)
c.MTSRBElementSize = defaultMTSRBElementSize
}
return nil
}

View File

@ -54,18 +54,6 @@ import (
"bitbucket.org/ausocean/utils/logger"
)
// mtsSender ringBuffer sizes.
const (
mtsRBSize = 1000
mtsRBElementSize = 100000
)
// rtmpSender ringBuffer sizes.
const (
rtmpRBSize = 500
rtmpRBElementSize = 200000
)
// RTMP connection properties.
const (
rtmpConnectionMaxTries = 5
@ -240,7 +228,13 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
for _, out := range r.config.Outputs {
switch out {
case HTTP:
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, mtsRBSize, mtsRBElementSize, 0)
w = newMtsSender(
newHttpSender(r.ns, r.config.Logger.Log),
r.config.Logger.Log,
r.config.MTSRBSize,
r.config.MTSRBElementSize,
0,
)
mtsSenders = append(mtsSenders, w)
case RTP:
w, err := newRtpSender(r.config.RTPAddress, r.config.Logger.Log, r.config.FrameRate)
@ -255,7 +249,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
}
mtsSenders = append(mtsSenders, w)
case RTMP:
w, err := newRtmpSender(r.config.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
w, err := newRtmpSender(
r.config.RTMPURL,
rtmpConnectionTimeout,
rtmpConnectionMaxTries,
r.config.RTMPRBSize,
r.config.RTMPRBElementSize,
r.config.Logger.Log,
)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error())
}
@ -504,6 +505,34 @@ func (r *Revid) Update(vars map[string]string) error {
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value)
}
case "RTMPRBSize":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid RTMPRBSize var", "value", value)
break
}
r.config.RTMPRBSize = v
case "RTMPRBElementSize":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid RTMPRBElementSize var", "value", value)
break
}
r.config.RTMPRBElementSize = v
case "MTSRBSize":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid MTSRBSize var", "value", value)
break
}
r.config.MTSRBElementSize = v
case "MTSRBElementSize":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid MTSRBElementSize var", "value", value)
break
}
r.config.MTSRBElementSize = v
}
}
r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config))

View File

@ -261,7 +261,7 @@ type rtmpSender struct {
wg sync.WaitGroup
}
func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
func newRtmpSender(url string, timeout uint, retries, rbSize, rbElementSize int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
var conn *rtmp.Conn
var err error
for n := 0; n < retries; n++ {
@ -280,7 +280,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
timeout: timeout,
retries: retries,
log: log,
ring: ring.NewBuffer(rtmpRBSize, rtmpRBElementSize, 0),
ring: ring.NewBuffer(rbSize, rbElementSize, 0),
done: make(chan struct{}),
}
s.wg.Add(1)

View File

@ -133,7 +133,7 @@ func TestMtsSenderSegment(t *testing.T) {
// Create ringBuffer, sender, sender and the MPEGTS encoder.
const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(dst, (*dummyLogger)(t).log, mtsRBSize, mtsRBElementSize, 0)
sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder.
@ -211,7 +211,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, mtsRBSize, mtsRBElementSize, 0)
sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder and send PSI every 10 packets.
@ -291,7 +291,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, mtsRBElementSize, 0)
sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, defaultMTSRBElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder.