Merged in rb-vars (pull request #230)

revid: ring buffer sizes configurable via vars

Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Saxon Milton 2019-08-09 06:06:20 +00:00
commit 5bd2528bb0
5 changed files with 100 additions and 52 deletions

View File

@ -122,7 +122,6 @@ func handleFlags() revid.Config {
httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts") httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts")
verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No") verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No")
horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No") horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No")
framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent")
bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video") bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video")
heightPtr = flag.Uint("Height", 0, "Height in pixels") heightPtr = flag.Uint("Height", 0, "Height in pixels")
widthPtr = flag.Uint("Width", 0, "Width in pixels") widthPtr = flag.Uint("Width", 0, "Width in pixels")
@ -241,7 +240,6 @@ func handleFlags() revid.Config {
cfg.Rotation = *rotationPtr cfg.Rotation = *rotationPtr
cfg.FlipHorizontal = *horizontalFlipPtr cfg.FlipHorizontal = *horizontalFlipPtr
cfg.FlipVertical = *verticalFlipPtr cfg.FlipVertical = *verticalFlipPtr
cfg.FramesPerClip = *framesPerClipPtr
cfg.RTMPURL = *rtmpUrlPtr cfg.RTMPURL = *rtmpUrlPtr
cfg.Bitrate = *bitratePtr cfg.Bitrate = *bitratePtr
cfg.OutputPath = *outputPathPtr cfg.OutputPath = *outputPathPtr

View File

@ -90,32 +90,40 @@ const (
// Default config settings // Default config settings
const ( const (
// General revid defaults.
defaultInput = Raspivid defaultInput = Raspivid
defaultOutput = HTTP defaultOutput = HTTP
defaultFrameRate = 25 defaultFrameRate = 25
defaultWriteRate = 25 defaultWriteRate = 25
defaultWidth = 1280
defaultHeight = 720
defaultIntraRefreshPeriod = 100
defaultTimeout = 0 defaultTimeout = 0
defaultQuantization = 40
defaultBitrate = 400000
defaultFramesPerClip = 1
httpFramesPerClip = 560
defaultInputCodec = codecutil.H264 defaultInputCodec = codecutil.H264
defaultVerbosity = logger.Error defaultVerbosity = logger.Error
defaultRtpAddr = "localhost:6970" defaultRtpAddr = "localhost:6970"
defaultBurstPeriod = 10 // Seconds defaultBurstPeriod = 10 // Seconds
defaultRotation = 0 // Degrees
// Raspivid video defaults.
defaultBrightness = 50 defaultBrightness = 50
defaultExposure = "auto" defaultExposure = "auto"
defaultAutoWhiteBalance = "auto" defaultAutoWhiteBalance = "auto"
defaultRotation = 0 // Degrees
defaultWidth = 1280
defaultHeight = 720
defaultIntraRefreshPeriod = 100
defaultQuantization = 40
defaultBitrate = 400000
// Audio defaults.
defaultAudioInputCodec = codecutil.ADPCM defaultAudioInputCodec = codecutil.ADPCM
defaultSampleRate = 48000 defaultSampleRate = 48000
defaultBitDepth = 16 defaultBitDepth = 16
defaultChannels = 1 defaultChannels = 1
defaultRecPeriod = 1.0 defaultRecPeriod = 1.0
// Ringbuffer defaults.
defaultMTSRBSize = 1000
defaultMTSRBElementSize = 100000
defaultRTMPRBSize = 500
defaultRTMPRBElementSize = 200000
) )
// Config provides parameters relevant to a revid instance. A new config must // Config provides parameters relevant to a revid instance. A new config must
@ -166,10 +174,6 @@ type Config struct {
// bitrate, if configurable with the chosen input. Raspivid supports quantization. // bitrate, if configurable with the chosen input. Raspivid supports quantization.
Quantize bool Quantize bool
// FramesPerClip defines the number of packetization units to pack into a clip
// per HTTP send.
FramesPerClip uint
// RTMPURL specifies the Rtmp output destination URL. This must be defined if // RTMPURL specifies the Rtmp output destination URL. This must be defined if
// RTMP is to be used as an output. // RTMP is to be used as an output.
RTMPURL string RTMPURL string
@ -240,6 +244,12 @@ type Config struct {
Bitrate uint // Bitrate specifies the input bitrate for Raspivid input. Bitrate uint // Bitrate specifies the input bitrate for Raspivid input.
FlipHorizontal bool // FlipHorizontal flips video horizontally for Raspivid input. FlipHorizontal bool // FlipHorizontal flips video horizontally for Raspivid input.
FlipVertical bool // FlipVertial flips video vertically for Raspivid input. FlipVertical bool // FlipVertial flips video vertically for Raspivid input.
// Ring buffer sizes.
RTMPRBSize int // The number of elements in the RTMP sender ringbuffer.
RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer.
MTSRBSize int // The number of elements in the MTS sender ringbuffer.
MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer.
} }
// Validate checks for any errors in the config fields and defaults settings // Validate checks for any errors in the config fields and defaults settings
@ -310,11 +320,7 @@ func (c *Config) Validate(r *Revid) error {
// c.FramesPerClip = httpFramesPerClip // c.FramesPerClip = httpFramesPerClip
break break
} }
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip
case HTTP, RTP: case HTTP, RTP:
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", "framesPerClip", httpFramesPerClip)
c.FramesPerClip = httpFramesPerClip
default: default:
return errors.New("bad output type defined in config") return errors.New("bad output type defined in config")
} }
@ -326,11 +332,6 @@ func (c *Config) Validate(r *Revid) error {
c.BurstPeriod = defaultBurstPeriod c.BurstPeriod = defaultBurstPeriod
} }
if c.FramesPerClip < 1 {
c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip
}
if c.Rotation > 359 { if c.Rotation > 359 {
c.Logger.Log(logger.Warning, pkg+"bad rotate angle, defaulting", "angle", defaultRotation) c.Logger.Log(logger.Warning, pkg+"bad rotate angle, defaulting", "angle", defaultRotation)
c.Rotation = defaultRotation c.Rotation = defaultRotation
@ -425,6 +426,26 @@ func (c *Config) Validate(r *Revid) error {
return errors.New(pkg + "bad auto white balance setting in config") return errors.New(pkg + "bad auto white balance setting in config")
} }
if c.RTMPRBSize <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize)
c.RTMPRBSize = defaultRTMPRBSize
}
if c.RTMPRBElementSize <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize)
c.RTMPRBElementSize = defaultRTMPRBElementSize
}
if c.MTSRBSize <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize)
c.MTSRBSize = defaultMTSRBSize
}
if c.MTSRBElementSize <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize)
c.MTSRBElementSize = defaultMTSRBElementSize
}
return nil return nil
} }

View File

@ -53,18 +53,6 @@ import (
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
// mtsSender ringBuffer sizes.
const (
mtsRBSize = 1000
mtsRBElementSize = 100000
)
// rtmpSender ringBuffer sizes.
const (
rtmpRBSize = 500
rtmpRBElementSize = 200000
)
// RTMP connection properties. // RTMP connection properties.
const ( const (
rtmpConnectionMaxTries = 5 rtmpConnectionMaxTries = 5
@ -239,7 +227,13 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
for _, out := range r.config.Outputs { for _, out := range r.config.Outputs {
switch out { switch out {
case HTTP: case HTTP:
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, mtsRBSize, mtsRBElementSize, 0) w = newMtsSender(
newHttpSender(r.ns, r.config.Logger.Log),
r.config.Logger.Log,
r.config.MTSRBSize,
r.config.MTSRBElementSize,
0,
)
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
case RTP: case RTP:
w, err := newRtpSender(r.config.RTPAddress, r.config.Logger.Log, r.config.FrameRate) w, err := newRtpSender(r.config.RTPAddress, r.config.Logger.Log, r.config.FrameRate)
@ -254,7 +248,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
} }
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
case RTMP: case RTMP:
w, err := newRtmpSender(r.config.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) w, err := newRtmpSender(
r.config.RTMPURL,
rtmpConnectionTimeout,
rtmpConnectionMaxTries,
r.config.RTMPRBSize,
r.config.RTMPRBElementSize,
r.config.Logger.Log,
)
if err != nil { if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error())
} }
@ -503,6 +504,34 @@ func (r *Revid) Update(vars map[string]string) error {
default: default:
r.config.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value) r.config.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value)
} }
case "RTMPRBSize":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid RTMPRBSize var", "value", value)
break
}
r.config.RTMPRBSize = v
case "RTMPRBElementSize":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid RTMPRBElementSize var", "value", value)
break
}
r.config.RTMPRBElementSize = v
case "MTSRBSize":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid MTSRBSize var", "value", value)
break
}
r.config.MTSRBSize = v
case "MTSRBElementSize":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid MTSRBElementSize var", "value", value)
break
}
r.config.MTSRBElementSize = v
} }
} }
r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config)) r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config))

View File

@ -261,7 +261,7 @@ type rtmpSender struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { func newRtmpSender(url string, timeout uint, retries, rbSize, rbElementSize int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
var conn *rtmp.Conn var conn *rtmp.Conn
var err error var err error
for n := 0; n < retries; n++ { for n := 0; n < retries; n++ {
@ -280,7 +280,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
timeout: timeout, timeout: timeout,
retries: retries, retries: retries,
log: log, log: log,
ring: ring.NewBuffer(rtmpRBSize, rtmpRBElementSize, 0), ring: ring.NewBuffer(rbSize, rbElementSize, 0),
done: make(chan struct{}), done: make(chan struct{}),
} }
s.wg.Add(1) s.wg.Add(1)

View File

@ -133,7 +133,7 @@ func TestMtsSenderSegment(t *testing.T) {
// Create ringBuffer, sender, sender and the MPEGTS encoder. // Create ringBuffer, sender, sender and the MPEGTS encoder.
const numberOfClips = 11 const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(dst, (*dummyLogger)(t).log, mtsRBSize, mtsRBElementSize, 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
@ -211,7 +211,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder // Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3 const clipToFailAt = 3
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, mtsRBSize, mtsRBElementSize, 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder and send PSI every 10 packets. // Turn time based PSI writing off for encoder and send PSI every 10 packets.
@ -291,7 +291,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder. // Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3 const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, mtsRBElementSize, 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, defaultMTSRBElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.