revid: now using variable ring buffer

This commit is contained in:
Saxon 2020-01-08 22:50:29 +10:30
parent f5b8ea10fc
commit e9bd2fc0d1
4 changed files with 99 additions and 78 deletions

View File

@ -86,14 +86,16 @@ const (
defaultAudioInputCodec = codecutil.ADPCM
defaultPSITime = 2
// Ring buffer defaults.
// MTS ring buffer defaults.
defaultMTSRBSize = 100
defaultMTSRBElementSize = 300000
defaultMTSRBMaxElements = 10000
defaultMTSRBCapacity = 200000000 // bytes
defaultMTSRBWriteTimeout = 5
// RTMP ring buffer defaults.
defaultRTMPRBSize = 100
defaultRTMPRBElementSize = 300000
defaultRTMPRBMaxElements = 10000
defaultRTMPRBCapacity = 200000000 // bytes
defaultRTMPRBWriteTimeout = 5
// Motion filter parameter defaults.
@ -283,13 +285,13 @@ type Config struct {
PSITime int // Sets the time between a packet being sent.
// RTMP ring buffer parameters.
RTMPRBSize int // The number of elements in the RTMP sender ringbuffer.
RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer.
RTMPRBMaxElements int // The maximum possible number of elements in ring buffer.
RTMPRBCapacity int // The total number of bytes available for the ring buffer.
RTMPRBWriteTimeout int // The ringbuffer write timeout in seconds.
// MTS ring buffer parameters.
MTSRBSize int // The number of elements in the MTS sender ringbuffer.
MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer.
MTSRBMaxElements int // The maximum possible number of elements in ring buffer.
MTSRBCapacity int // The total number of bytes available for the ring buffer.
MTSRBWriteTimeout int // The ringbuffer write timeout in seconds.
// Motion filter parameters.
@ -454,14 +456,14 @@ func (c *Config) Validate() error {
c.RTPAddress = defaultRtpAddr
}
if c.RTMPRBSize <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize)
c.RTMPRBSize = defaultRTMPRBSize
if c.RTMPRBMaxElements <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBMaxElements bad or unset, defaulting", "RTMPRBMaxElements", defaultRTMPRBMaxElements)
c.RTMPRBMaxElements = defaultRTMPRBMaxElements
}
if c.RTMPRBElementSize <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize)
c.RTMPRBElementSize = defaultRTMPRBElementSize
if c.RTMPRBCapacity <= 0 {
c.Logger.Log(logger.Info, pkg+"RTMPRBCapacity bad or unset, defaulting", "RTMPRBCapacity", defaultRTMPRBCapacity)
c.RTMPRBCapacity = defaultRTMPRBCapacity
}
if c.RTMPRBWriteTimeout <= 0 {
@ -469,14 +471,14 @@ func (c *Config) Validate() error {
c.RTMPRBWriteTimeout = defaultRTMPRBWriteTimeout
}
if c.MTSRBSize <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize)
c.MTSRBSize = defaultMTSRBSize
if c.MTSRBMaxElements <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBMaxElements)
c.MTSRBMaxElements = defaultMTSRBMaxElements
}
if c.MTSRBElementSize <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize)
c.MTSRBElementSize = defaultMTSRBElementSize
if c.MTSRBCapacity <= 0 {
c.Logger.Log(logger.Info, pkg+"MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBCapacity)
c.MTSRBCapacity = defaultMTSRBCapacity
}
if c.MTSRBWriteTimeout <= 0 {

View File

@ -54,20 +54,7 @@ import (
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ioext"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
// Ring buffer defaults.
const (
// MTS ring buffer defaults.
defaultMTSRBSize = 1000
defaultMTSRBElementSize = 100000
defaultMTSRBWriteTimeout = 5
// RTMP ring buffer defaults.
defaultRTMPRBSize = 1000
defaultRTMPRBElementSize = 300000
defaultRTMPRBWriteTimeout = 5
"bitbucket.org/ausocean/utils/vring"
)
// RTMP connection properties.
@ -274,10 +261,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
for _, out := range r.cfg.Outputs {
switch out {
case config.OutputHTTP:
rb, err := vring.NewBuffer(r.cfg.MTSRBMaxElements, r.cfg.MTSRBCapacity, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second)
if err != nil {
return fmt.Errorf("could not initialise MTS ring buffer: %w", err)
}
w = newMtsSender(
newHttpSender(r.ns, r.cfg.Logger.Log),
r.cfg.Logger.Log,
ring.NewBuffer(r.cfg.MTSRBSize, r.cfg.MTSRBElementSize, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second),
rb,
r.cfg.ClipDuration,
)
mtsSenders = append(mtsSenders, w)
@ -294,11 +285,15 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
}
mtsSenders = append(mtsSenders, w)
case config.OutputRTMP:
rb, err := vring.NewBuffer(r.cfg.RTMPRBMaxElements, r.cfg.RTMPRBCapacity, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second)
if err != nil {
return fmt.Errorf("could not initialise RTMP ring buffer: %w", err)
}
w, err := newRtmpSender(
r.cfg.RTMPURL,
rtmpConnectionTimeout,
rtmpConnectionMaxTries,
ring.NewBuffer(r.cfg.RTMPRBSize, r.cfg.RTMPRBElementSize, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second),
rb,
r.cfg.Logger.Log,
)
if err != nil {
@ -705,20 +700,20 @@ func (r *Revid) Update(vars map[string]string) error {
default:
r.cfg.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value)
}
case "RTMPRBSize":
case "RTMPRBMaxElements":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBSize var", "value", value)
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBMaxElements var", "value", value)
break
}
r.cfg.RTMPRBSize = v
case "RTMPRBElementSize":
r.cfg.RTMPRBMaxElements = v
case "RTMPRBCapacity":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBElementSize var", "value", value)
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBCapacity var", "value", value)
break
}
r.cfg.RTMPRBElementSize = v
r.cfg.RTMPRBCapacity = v
case "RTMPRBWriteTimeout":
v, err := strconv.Atoi(value)
if err != nil || v <= 0 {
@ -726,20 +721,20 @@ func (r *Revid) Update(vars map[string]string) error {
break
}
r.cfg.RTMPRBWriteTimeout = v
case "MTSRBSize":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBSize var", "value", value)
break
}
r.cfg.MTSRBSize = v
case "MTSRBElementSize":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBElementSize var", "value", value)
break
}
r.cfg.MTSRBElementSize = v
r.cfg.MTSRBMaxElements = v
case "MTSRBCapacity":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBCapacity var", "value", value)
break
}
r.cfg.MTSRBCapacity = v
case "MTSRBWriteTimeout":
v, err := strconv.Atoi(value)
if err != nil || v <= 0 {

View File

@ -44,7 +44,7 @@ import (
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/vring"
)
// Log is used by the multiSender.
@ -161,7 +161,7 @@ func (s *fileSender) Close() error { return s.file.Close() }
type mtsSender struct {
dst io.WriteCloser
buf []byte
ring *ring.Buffer
ring *vring.Buffer
next []byte
pkt packet.Packet
repairer *mts.DiscontinuityRepairer
@ -174,7 +174,7 @@ type mtsSender struct {
}
// newMtsSender returns a new mtsSender.
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender {
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *vring.Buffer, clipDur time.Duration) *mtsSender {
s := &mtsSender{
dst: dst,
repairer: mts.NewDiscontinuityRepairer(),
@ -190,7 +190,7 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int
// output starts an mtsSender's data handling routine.
func (s *mtsSender) output() {
var chunk *ring.Chunk
var elem *vring.Element
for {
select {
case <-s.done:
@ -198,14 +198,14 @@ func (s *mtsSender) output() {
defer s.wg.Done()
return
default:
// If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil {
// If elem is nil then we're ready to get another from the ringBuffer.
if elem == nil {
var err error
chunk, err = s.ring.Next(mtsRBReadTimeout)
elem, err = s.ring.Next(mtsRBReadTimeout)
switch err {
case nil, io.EOF:
continue
case ring.ErrTimeout:
case vring.ErrNextTimeout:
s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout")
continue
default:
@ -213,20 +213,20 @@ func (s *mtsSender) output() {
continue
}
}
err := s.repairer.Repair(chunk.Bytes())
err := s.repairer.Repair(elem.Bytes())
if err != nil {
chunk.Close()
chunk = nil
elem.Close()
elem = nil
continue
}
s.log(logger.Debug, pkg+"mtsSender: writing")
_, err = s.dst.Write(chunk.Bytes())
_, err = s.dst.Write(elem.Bytes())
if err != nil {
s.repairer.Failed()
continue
}
chunk.Close()
chunk = nil
elem.Close()
elem = nil
}
}
}
@ -273,12 +273,12 @@ type rtmpSender struct {
timeout uint
retries int
log func(lvl int8, msg string, args ...interface{})
ring *ring.Buffer
ring *vring.Buffer
done chan struct{}
wg sync.WaitGroup
}
func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
var conn *rtmp.Conn
var err error
for n := 0; n < retries; n++ {
@ -307,7 +307,7 @@ func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log f
// output starts an mtsSender's data handling routine.
func (s *rtmpSender) output() {
var chunk *ring.Chunk
var elem *vring.Element
for {
select {
case <-s.done:
@ -315,14 +315,14 @@ func (s *rtmpSender) output() {
defer s.wg.Done()
return
default:
// If chunk is nil then we're ready to get another from the ring buffer.
if chunk == nil {
// If elem is nil then we're ready to get another from the ring buffer.
if elem == nil {
var err error
chunk, err = s.ring.Next(rtmpRBReadTimeout)
elem, err = s.ring.Next(rtmpRBReadTimeout)
switch err {
case nil, io.EOF:
continue
case ring.ErrTimeout:
case vring.ErrNextTimeout:
s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout")
continue
default:
@ -338,7 +338,7 @@ func (s *rtmpSender) output() {
continue
}
}
_, err := s.conn.Write(chunk.Bytes())
_, err := s.conn.Write(elem.Bytes())
switch err {
case nil, rtmp.ErrInvalidFlvTag:
default:
@ -349,8 +349,8 @@ func (s *rtmpSender) output() {
}
continue
}
chunk.Close()
chunk = nil
elem.Close()
elem = nil
}
}
}

View File

@ -39,13 +39,25 @@ import (
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/vring"
)
var (
errSendFailed = errors.New("send failed")
)
const (
// MTS ring buffer defaults.
mtsRBMaxElements = 10000
mtsRBCapacity = 200000000 // bytes
mtsRBWriteTimeout = 5
// RTMP ring buffer defaults.
rtmpRBMaxElements = 10000
rtmpRBCapacity = 200000000 // bytes
rtmpRBWriteTimeout = 5
)
// destination simulates a destination for the mtsSender. It allows for the
// emulation of failed and delayed sends.
type destination struct {
@ -134,7 +146,11 @@ func TestMtsSenderSegment(t *testing.T) {
// Create ringBuffer, sender, sender and the MPEGTS encoder.
const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
rb, err := vring.NewBuffer(mtsRBMaxElements, mtsRBCapacity, 0)
if err != nil {
t.Fatalf("could not initialise ring buffer: %v", err)
}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
@ -213,7 +229,11 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
rb, err := vring.NewBuffer(mtsRBMaxElements, mtsRBCapacity, 0)
if err != nil {
t.Fatalf("could not initialise ring buffer: %v", err)
}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
@ -294,7 +314,11 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0)
rb, err := vring.NewBuffer(1, mtsRBCapacity, 0)
if err != nil {
t.Fatalf("could not initialise ring buffer: %v", err)
}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))