mirror of https://bitbucket.org/ausocean/av.git
Merged in sort-out-rb-capacity (pull request #397)
revid: get rid of output specific ring buffer parameters and use only RBCapacity and RBWriteTimeout Approved-by: Trek Hopton <trek.hopton@gmail.com>
This commit is contained in:
parent
890f142fb9
commit
3a076cd9c0
|
@ -84,15 +84,9 @@ const (
|
|||
defaultPSITime = 2
|
||||
defaultFileFPS = 0
|
||||
|
||||
// MTS ring buffer defaults.
|
||||
defaultMTSRBSize = 100
|
||||
defaultMTSRBElementSize = 10000
|
||||
defaultMTSRBWriteTimeout = 5
|
||||
|
||||
// RTMP ring buffer defaults.
|
||||
defaultRTMPRBSize = 100
|
||||
defaultRTMPRBElementSize = 10000
|
||||
defaultRTMPRBWriteTimeout = 5
|
||||
// Ring buffer defaults.
|
||||
defaultRBCapacity = 50000000 // => 50MB
|
||||
defaultRBWriteTimeout = 5 // Seconds.
|
||||
|
||||
// Motion filter parameter defaults.
|
||||
defaultMinFPS = 1.0
|
||||
|
@ -264,15 +258,9 @@ type Config struct {
|
|||
Filters []int // Defines the methods of filtering to be used in between lexing and encoding.
|
||||
PSITime int // Sets the time between a packet being sent.
|
||||
|
||||
// RTMP ring buffer parameters.
|
||||
RTMPRBSize int // The number of elements in the RTMP sender ringbuffer.
|
||||
RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer.
|
||||
RTMPRBWriteTimeout int // The ringbuffer write timeout in seconds.
|
||||
|
||||
// MTS ring buffer parameters.
|
||||
MTSRBSize int // The number of elements in the MTS sender ringbuffer.
|
||||
MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer.
|
||||
MTSRBWriteTimeout int // The ringbuffer write timeout in seconds.
|
||||
// Ring buffer parameters.
|
||||
RBCapacity uint // The number of bytes the ring buffer will occupy.
|
||||
RBWriteTimeout uint // The ringbuffer write timeout in seconds.
|
||||
|
||||
// Motion filter parameters.
|
||||
// Some parameters can be used with any filter, while others can only be used by a few.
|
||||
|
@ -334,7 +322,6 @@ var TypeData = map[string]string{
|
|||
"Outputs": "enums:File,Http,Rtmp,Rtp",
|
||||
"Quantization": "uint",
|
||||
"RBCapacity": "uint",
|
||||
"RBMaxElements": "uint",
|
||||
"RBWriteTimeout": "uint",
|
||||
"Rotation": "uint",
|
||||
"RTMPURL": "string",
|
||||
|
@ -444,34 +431,14 @@ func (c *Config) Validate() error {
|
|||
c.RTPAddress = defaultRtpAddr
|
||||
}
|
||||
|
||||
if c.RTMPRBSize <= 0 {
|
||||
c.Logger.Log(logger.Info, "RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize)
|
||||
c.RTMPRBSize = defaultRTMPRBSize
|
||||
if c.RBWriteTimeout <= 0 {
|
||||
c.Logger.Log(logger.Info, "RBWriteTimeout bad or unset, defaulting", "RBWriteTimeout", defaultRBWriteTimeout)
|
||||
c.RBWriteTimeout = defaultRBWriteTimeout
|
||||
}
|
||||
|
||||
if c.RTMPRBElementSize <= 0 {
|
||||
c.Logger.Log(logger.Info, "RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize)
|
||||
c.RTMPRBElementSize = defaultRTMPRBElementSize
|
||||
}
|
||||
|
||||
if c.RTMPRBWriteTimeout <= 0 {
|
||||
c.Logger.Log(logger.Info, "RTMPRBWriteTimeout bad or unset, defaulting", "RTMPRBWriteTimeout", defaultRTMPRBWriteTimeout)
|
||||
c.RTMPRBWriteTimeout = defaultRTMPRBWriteTimeout
|
||||
}
|
||||
|
||||
if c.MTSRBSize <= 0 {
|
||||
c.Logger.Log(logger.Info, "MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize)
|
||||
c.MTSRBSize = defaultMTSRBSize
|
||||
}
|
||||
|
||||
if c.MTSRBElementSize <= 0 {
|
||||
c.Logger.Log(logger.Info, "MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize)
|
||||
c.MTSRBElementSize = defaultMTSRBElementSize
|
||||
}
|
||||
|
||||
if c.MTSRBWriteTimeout <= 0 {
|
||||
c.Logger.Log(logger.Info, "MTSRBWriteTimeout bad or unset, defaulting", "MTSRBWriteTimeout", defaultMTSRBWriteTimeout)
|
||||
c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout
|
||||
if c.RBCapacity <= 0 {
|
||||
c.Logger.Log(logger.Info, "RBCapacity bad or unset, defaulting", "RBCapacity", defaultRBCapacity)
|
||||
c.RBCapacity = defaultRBCapacity
|
||||
}
|
||||
|
||||
if c.PSITime <= 0 {
|
||||
|
|
|
@ -58,17 +58,9 @@ import (
|
|||
"bitbucket.org/ausocean/utils/ring"
|
||||
)
|
||||
|
||||
// Ring buffer defaults.
|
||||
// Ring buffer parameters.
|
||||
const (
|
||||
// MTS ring buffer defaults.
|
||||
defaultMTSRBSize = 1000
|
||||
defaultMTSRBElementSize = 100000
|
||||
defaultMTSRBWriteTimeout = 5
|
||||
|
||||
// RTMP ring buffer defaults.
|
||||
defaultRTMPRBSize = 1000
|
||||
defaultRTMPRBElementSize = 300000
|
||||
defaultRTMPRBWriteTimeout = 5
|
||||
rbStartingElementSize = 10000 // Bytes.
|
||||
)
|
||||
|
||||
// RTMP connection properties.
|
||||
|
@ -265,6 +257,12 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
|||
// will hold senders that require FLV encoding.
|
||||
var mtsSenders, flvSenders []io.WriteCloser
|
||||
|
||||
// Calculate no. of ring buffer elements based on starting element size
|
||||
// const and config directed max ring buffer size, then create buffer.
|
||||
// This is only used if the selected output uses a ring buffer.
|
||||
nElements := r.cfg.RBCapacity / rbStartingElementSize
|
||||
writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second
|
||||
|
||||
// We will go through our outputs and create the corresponding senders to add
|
||||
// to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the
|
||||
// output requires FLV encoding.
|
||||
|
@ -273,13 +271,11 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
|||
switch out {
|
||||
case config.OutputHTTP:
|
||||
r.cfg.Logger.Log(logger.Debug, "using HTTP output")
|
||||
w = newMtsSender(
|
||||
newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report),
|
||||
r.cfg.Logger.Log,
|
||||
ring.NewBuffer(r.cfg.MTSRBSize, r.cfg.MTSRBElementSize, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second),
|
||||
r.cfg.ClipDuration,
|
||||
)
|
||||
rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout)
|
||||
hs := newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report)
|
||||
w = newMtsSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration)
|
||||
mtsSenders = append(mtsSenders, w)
|
||||
|
||||
case config.OutputRTP:
|
||||
r.cfg.Logger.Log(logger.Debug, "using RTP output")
|
||||
w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report)
|
||||
|
@ -296,14 +292,8 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
|||
mtsSenders = append(mtsSenders, w)
|
||||
case config.OutputRTMP:
|
||||
r.cfg.Logger.Log(logger.Debug, "using RTMP output")
|
||||
w, err := newRtmpSender(
|
||||
r.cfg.RTMPURL,
|
||||
rtmpConnectionTimeout,
|
||||
rtmpConnectionMaxTries,
|
||||
ring.NewBuffer(r.cfg.RTMPRBSize, r.cfg.RTMPRBElementSize, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second),
|
||||
r.cfg.Logger.Log,
|
||||
r.bitrate.Report,
|
||||
)
|
||||
rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout)
|
||||
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report)
|
||||
if err != nil {
|
||||
r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error())
|
||||
}
|
||||
|
@ -754,48 +744,20 @@ func (r *Revid) Update(vars map[string]string) error {
|
|||
default:
|
||||
r.cfg.Logger.Log(logger.Warning, "invalid Logging param", "value", value)
|
||||
}
|
||||
case "RTMPRBSize":
|
||||
case "RBCapacity":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v < 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBSize var", "value", value)
|
||||
r.cfg.Logger.Log(logger.Warning, "invalid RBCapacity var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.RTMPRBSize = v
|
||||
case "RTMPRBElementSize":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v < 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBElementSize var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.RTMPRBElementSize = v
|
||||
case "RTMPRBWriteTimeout":
|
||||
r.cfg.RBCapacity = uint(v)
|
||||
case "RBWriteTimeout":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v <= 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBWriteTimeout var", "value", value)
|
||||
r.cfg.Logger.Log(logger.Warning, "invalid RBWriteTimeout var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.RTMPRBWriteTimeout = v
|
||||
case "MTSRBSize":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v < 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, "invalid MTSRBSize var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.MTSRBSize = v
|
||||
case "MTSRBElementSize":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v < 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, "invalid MTSRBElementSize var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.MTSRBElementSize = v
|
||||
case "MTSRBWriteTimeout":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v <= 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, "invalid MTSRBWriteTimeout var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.MTSRBWriteTimeout = v
|
||||
r.cfg.RBWriteTimeout = uint(v)
|
||||
case "CBR":
|
||||
v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)]
|
||||
if !ok {
|
||||
|
|
|
@ -134,7 +134,9 @@ func TestMtsSenderSegment(t *testing.T) {
|
|||
// Create ringBuffer, sender, sender and the MPEGTS encoder.
|
||||
const numberOfClips = 11
|
||||
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
|
||||
const testRBCapacity = 50000000
|
||||
nElements := testRBCapacity / rbStartingElementSize
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0)
|
||||
|
||||
const psiSendCount = 10
|
||||
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
||||
|
@ -213,7 +215,9 @@ func TestMtsSenderFailedSend(t *testing.T) {
|
|||
// Create destination, the mtsSender and the mtsEncoder
|
||||
const clipToFailAt = 3
|
||||
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
|
||||
const testRBCapacity = 50000000 // 50MB
|
||||
nElements := testRBCapacity / rbStartingElementSize
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0)
|
||||
|
||||
const psiSendCount = 10
|
||||
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
||||
|
@ -294,7 +298,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
|
|||
// Create destination, the mtsSender and the mtsEncoder.
|
||||
const clipToDelay = 3
|
||||
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0)
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, rbStartingElementSize, 0), 0)
|
||||
|
||||
const psiSendCount = 10
|
||||
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
||||
|
|
Loading…
Reference in New Issue