Merged in add-fixed-element-rb (pull request #396)

revid: reverted to fixed element size ring buffer
This commit is contained in:
Saxon Milton 2020-04-09 06:31:03 +00:00
commit 40133eaad9
4 changed files with 110 additions and 94 deletions

View File

@ -84,10 +84,15 @@ const (
defaultPSITime = 2 defaultPSITime = 2
defaultFileFPS = 0 defaultFileFPS = 0
// Ring buffer defaults. // MTS ring buffer defaults.
defaultRBMaxElements = 10000 defaultMTSRBSize = 100
defaultRBCapacity = 200000000 // bytes (200MB) defaultMTSRBElementSize = 10000
defaultRBWriteTimeout = 5 defaultMTSRBWriteTimeout = 5
// RTMP ring buffer defaults.
defaultRTMPRBSize = 100
defaultRTMPRBElementSize = 10000
defaultRTMPRBWriteTimeout = 5
// Motion filter parameter defaults. // Motion filter parameter defaults.
defaultMinFPS = 1.0 defaultMinFPS = 1.0
@ -259,10 +264,15 @@ type Config struct {
Filters []int // Defines the methods of filtering to be used in between lexing and encoding. Filters []int // Defines the methods of filtering to be used in between lexing and encoding.
PSITime int // Sets the time between a packet being sent. PSITime int // Sets the time between a packet being sent.
// Ring buffer parameters. // RTMP ring buffer parameters.
RBMaxElements int // The maximum possible number of elements in ring buffer. RTMPRBSize int // The number of elements in the RTMP sender ringbuffer.
RBCapacity int // The total number of bytes available for the ring buffer. RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer.
RBWriteTimeout int // The ringbuffer write timeout in seconds. RTMPRBWriteTimeout int // The ringbuffer write timeout in seconds.
// MTS ring buffer parameters.
MTSRBSize int // The number of elements in the MTS sender ringbuffer.
MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer.
MTSRBWriteTimeout int // The ringbuffer write timeout in seconds.
// Motion filter parameters. // Motion filter parameters.
// Some parameters can be used with any filter, while others can only be used by a few. // Some parameters can be used with any filter, while others can only be used by a few.
@ -434,19 +444,34 @@ func (c *Config) Validate() error {
c.RTPAddress = defaultRtpAddr c.RTPAddress = defaultRtpAddr
} }
if c.RBMaxElements <= 0 { if c.RTMPRBSize <= 0 {
c.LogInvalidField("RBMaxElements", defaultRBMaxElements) c.Logger.Log(logger.Info, "RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize)
c.RBMaxElements = defaultRBMaxElements c.RTMPRBSize = defaultRTMPRBSize
} }
if c.RBCapacity <= 0 { if c.RTMPRBElementSize <= 0 {
c.LogInvalidField("RBCapacity", defaultRBCapacity) c.Logger.Log(logger.Info, "RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize)
c.RBCapacity = defaultRBCapacity c.RTMPRBElementSize = defaultRTMPRBElementSize
} }
if c.RBWriteTimeout <= 0 { if c.RTMPRBWriteTimeout <= 0 {
c.LogInvalidField("RBWriteTimeout", defaultRBWriteTimeout) c.Logger.Log(logger.Info, "RTMPRBWriteTimeout bad or unset, defaulting", "RTMPRBWriteTimeout", defaultRTMPRBWriteTimeout)
c.RBWriteTimeout = defaultRBWriteTimeout c.RTMPRBWriteTimeout = defaultRTMPRBWriteTimeout
}
if c.MTSRBSize <= 0 {
c.Logger.Log(logger.Info, "MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize)
c.MTSRBSize = defaultMTSRBSize
}
if c.MTSRBElementSize <= 0 {
c.Logger.Log(logger.Info, "MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize)
c.MTSRBElementSize = defaultMTSRBElementSize
}
if c.MTSRBWriteTimeout <= 0 {
c.Logger.Log(logger.Info, "MTSRBWriteTimeout bad or unset, defaulting", "MTSRBWriteTimeout", defaultMTSRBWriteTimeout)
c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout
} }
if c.PSITime <= 0 { if c.PSITime <= 0 {

View File

@ -55,7 +55,20 @@ import (
"bitbucket.org/ausocean/utils/bitrate" "bitbucket.org/ausocean/utils/bitrate"
"bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/ioext"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/vring" "bitbucket.org/ausocean/utils/ring"
)
// Ring buffer defaults.
const (
// MTS ring buffer defaults.
defaultMTSRBSize = 1000
defaultMTSRBElementSize = 100000
defaultMTSRBWriteTimeout = 5
// RTMP ring buffer defaults.
defaultRTMPRBSize = 1000
defaultRTMPRBElementSize = 300000
defaultRTMPRBWriteTimeout = 5
) )
// RTMP connection properties. // RTMP connection properties.
@ -260,14 +273,10 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
switch out { switch out {
case config.OutputHTTP: case config.OutputHTTP:
r.cfg.Logger.Log(logger.Debug, "using HTTP output") r.cfg.Logger.Log(logger.Debug, "using HTTP output")
rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second)
if err != nil {
return fmt.Errorf("could not initialise MTS ring buffer: %w", err)
}
w = newMtsSender( w = newMtsSender(
newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report), newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report),
r.cfg.Logger.Log, r.cfg.Logger.Log,
rb, ring.NewBuffer(r.cfg.MTSRBSize, r.cfg.MTSRBElementSize, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second),
r.cfg.ClipDuration, r.cfg.ClipDuration,
) )
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
@ -287,15 +296,11 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
case config.OutputRTMP: case config.OutputRTMP:
r.cfg.Logger.Log(logger.Debug, "using RTMP output") r.cfg.Logger.Log(logger.Debug, "using RTMP output")
rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second)
if err != nil {
return fmt.Errorf("could not initialise RTMP ring buffer: %w", err)
}
w, err := newRtmpSender( w, err := newRtmpSender(
r.cfg.RTMPURL, r.cfg.RTMPURL,
rtmpConnectionTimeout, rtmpConnectionTimeout,
rtmpConnectionMaxTries, rtmpConnectionMaxTries,
rb, ring.NewBuffer(r.cfg.RTMPRBSize, r.cfg.RTMPRBElementSize, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second),
r.cfg.Logger.Log, r.cfg.Logger.Log,
r.bitrate.Report, r.bitrate.Report,
) )
@ -749,48 +754,48 @@ func (r *Revid) Update(vars map[string]string) error {
default: default:
r.cfg.Logger.Log(logger.Warning, "invalid Logging param", "value", value) r.cfg.Logger.Log(logger.Warning, "invalid Logging param", "value", value)
} }
case "RTMPRBMaxElements": case "RTMPRBSize":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v < 0 { if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBMaxElements var", "value", value) r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBSize var", "value", value)
break break
} }
r.cfg.RBMaxElements = v r.cfg.RTMPRBSize = v
case "RTMPRBCapacity": case "RTMPRBElementSize":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v < 0 { if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBCapacity var", "value", value) r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBElementSize var", "value", value)
break break
} }
r.cfg.RBCapacity = v r.cfg.RTMPRBElementSize = v
case "RTMPRBWriteTimeout": case "RTMPRBWriteTimeout":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v <= 0 { if err != nil || v <= 0 {
r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBWriteTimeout var", "value", value) r.cfg.Logger.Log(logger.Warning, "invalid RTMPRBWriteTimeout var", "value", value)
break break
} }
r.cfg.RBWriteTimeout = v r.cfg.RTMPRBWriteTimeout = v
case "MTSRBMaxElements": case "MTSRBSize":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v < 0 { if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, "invalid MTSRBMaxElements var", "value", value) r.cfg.Logger.Log(logger.Warning, "invalid MTSRBSize var", "value", value)
break break
} }
r.cfg.RBMaxElements = v r.cfg.MTSRBSize = v
case "MTSRBCapacity": case "MTSRBElementSize":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v < 0 { if err != nil || v < 0 {
r.cfg.Logger.Log(logger.Warning, "invalid MTSRBCapacity var", "value", value) r.cfg.Logger.Log(logger.Warning, "invalid MTSRBElementSize var", "value", value)
break break
} }
r.cfg.RBCapacity = v r.cfg.MTSRBElementSize = v
case "MTSRBWriteTimeout": case "MTSRBWriteTimeout":
v, err := strconv.Atoi(value) v, err := strconv.Atoi(value)
if err != nil || v <= 0 { if err != nil || v <= 0 {
r.cfg.Logger.Log(logger.Warning, "invalid MTSRBWriteTimeout var", "value", value) r.cfg.Logger.Log(logger.Warning, "invalid MTSRBWriteTimeout var", "value", value)
break break
} }
r.cfg.RBWriteTimeout = v r.cfg.MTSRBWriteTimeout = v
case "CBR": case "CBR":
v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)] v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)]
if !ok { if !ok {

View File

@ -44,7 +44,7 @@ import (
"bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/vring" "bitbucket.org/ausocean/utils/ring"
) )
// Log is used by the multiSender. // Log is used by the multiSender.
@ -54,6 +54,7 @@ type Log func(level int8, message string, params ...interface{})
const ( const (
rtmpRBReadTimeout = 1 * time.Second rtmpRBReadTimeout = 1 * time.Second
mtsRBReadTimeout = 1 * time.Second mtsRBReadTimeout = 1 * time.Second
maxBuffLen = 100000000
) )
// httpSender provides an implemntation of io.Writer to perform sends to a http // httpSender provides an implemntation of io.Writer to perform sends to a http
@ -173,7 +174,7 @@ func (s *fileSender) Close() error { return s.file.Close() }
type mtsSender struct { type mtsSender struct {
dst io.WriteCloser dst io.WriteCloser
buf []byte buf []byte
ring *vring.Buffer ring *ring.Buffer
next []byte next []byte
pkt packet.Packet pkt packet.Packet
repairer *mts.DiscontinuityRepairer repairer *mts.DiscontinuityRepairer
@ -186,7 +187,7 @@ type mtsSender struct {
} }
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *vring.Buffer, clipDur time.Duration) *mtsSender { func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender {
s := &mtsSender{ s := &mtsSender{
dst: dst, dst: dst,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
@ -202,7 +203,7 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int
// output starts an mtsSender's data handling routine. // output starts an mtsSender's data handling routine.
func (s *mtsSender) output() { func (s *mtsSender) output() {
var elem *vring.Element var chunk *ring.Chunk
for { for {
select { select {
case <-s.done: case <-s.done:
@ -210,29 +211,29 @@ func (s *mtsSender) output() {
defer s.wg.Done() defer s.wg.Done()
return return
default: default:
// If elem is nil then we're ready to get another from the ringBuffer. // If chunk is nil then we're ready to get another from the ringBuffer.
if elem == nil { if chunk == nil {
var err error var err error
elem, err = s.ring.Next(mtsRBReadTimeout) chunk, err = s.ring.Next(mtsRBReadTimeout)
switch err { switch err {
case nil, io.EOF: case nil, io.EOF:
continue continue
case vring.ErrNextTimeout: case ring.ErrTimeout:
s.log(logger.Debug, "ring buffer read timeout") s.log(logger.Debug, "mtsSender: ring buffer read timeout")
continue continue
default: default:
s.log(logger.Error, "unexpected error", "error", err.Error()) s.log(logger.Error, "unexpected error", "error", err.Error())
continue continue
} }
} }
err := s.repairer.Repair(elem.Bytes()) err := s.repairer.Repair(chunk.Bytes())
if err != nil { if err != nil {
elem.Close() chunk.Close()
elem = nil chunk = nil
continue continue
} }
s.log(logger.Debug, "writing") s.log(logger.Debug, "mtsSender: writing")
_, err = s.dst.Write(elem.Bytes()) _, err = s.dst.Write(chunk.Bytes())
if err != nil { if err != nil {
s.log(logger.Debug, "failed write, repairing MTS", "error", err) s.log(logger.Debug, "failed write, repairing MTS", "error", err)
s.repairer.Failed() s.repairer.Failed()
@ -240,8 +241,8 @@ func (s *mtsSender) output() {
} else { } else {
s.log(logger.Debug, "good write") s.log(logger.Debug, "good write")
} }
elem.Close() chunk.Close()
elem = nil chunk = nil
} }
} }
} }
@ -268,6 +269,9 @@ func (s *mtsSender) Write(d []byte) (int, error) {
} }
if err != nil { if err != nil {
s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf)) s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf))
if err == ring.ErrTooLong {
s.ring = ring.NewBuffer(maxBuffLen/len(d), len(d), 5*time.Second)
}
} }
s.buf = s.buf[:0] s.buf = s.buf[:0]
} }
@ -290,13 +294,13 @@ type rtmpSender struct {
timeout uint timeout uint
retries int retries int
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
ring *vring.Buffer ring *ring.Buffer
done chan struct{} done chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
report func(sent int) report func(sent int)
} }
func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) { func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) {
var conn *rtmp.Conn var conn *rtmp.Conn
var err error var err error
for n := 0; n < retries; n++ { for n := 0; n < retries; n++ {
@ -326,7 +330,7 @@ func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log
// output starts an mtsSender's data handling routine. // output starts an mtsSender's data handling routine.
func (s *rtmpSender) output() { func (s *rtmpSender) output() {
var elem *vring.Element var chunk *ring.Chunk
for { for {
select { select {
case <-s.done: case <-s.done:
@ -334,15 +338,15 @@ func (s *rtmpSender) output() {
defer s.wg.Done() defer s.wg.Done()
return return
default: default:
// If elem is nil then we're ready to get another from the ring buffer. // If chunk is nil then we're ready to get another from the ring buffer.
if elem == nil { if chunk == nil {
var err error var err error
elem, err = s.ring.Next(rtmpRBReadTimeout) chunk, err = s.ring.Next(rtmpRBReadTimeout)
switch err { switch err {
case nil, io.EOF: case nil, io.EOF:
continue continue
case vring.ErrNextTimeout: case ring.ErrTimeout:
s.log(logger.Debug, "ring buffer read timeout") s.log(logger.Debug, "rtmpSender: ring buffer read timeout")
continue continue
default: default:
s.log(logger.Error, "unexpected error", "error", err.Error()) s.log(logger.Error, "unexpected error", "error", err.Error())
@ -357,9 +361,7 @@ func (s *rtmpSender) output() {
continue continue
} }
} }
b := elem.Bytes() _, err := s.conn.Write(chunk.Bytes())
s.log(logger.Debug, "writing to conn", "len", len(b))
_, err := s.conn.Write(b)
switch err { switch err {
case nil, rtmp.ErrInvalidFlvTag: case nil, rtmp.ErrInvalidFlvTag:
s.log(logger.Debug, "good write to conn") s.log(logger.Debug, "good write to conn")
@ -371,8 +373,8 @@ func (s *rtmpSender) output() {
} }
continue continue
} }
elem.Close() chunk.Close()
elem = nil chunk = nil
} }
} }
} }
@ -386,6 +388,9 @@ func (s *rtmpSender) Write(d []byte) (int, error) {
s.log(logger.Debug, "good ring buffer write", "len", len(d)) s.log(logger.Debug, "good ring buffer write", "len", len(d))
} else { } else {
s.log(logger.Warning, "ring buffer write error", "error", err.Error()) s.log(logger.Warning, "ring buffer write error", "error", err.Error())
if err == ring.ErrTooLong {
s.ring = ring.NewBuffer(maxBuffLen/len(d), len(d), 5*time.Second)
}
} }
s.report(len(d)) s.report(len(d))
return len(d), nil return len(d), nil

View File

@ -39,20 +39,13 @@ import (
"bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/vring" "bitbucket.org/ausocean/utils/ring"
) )
var ( var (
errSendFailed = errors.New("send failed") errSendFailed = errors.New("send failed")
) )
// Ring buffer parameters.
const (
rbMaxElements = 10000
rbCapacity = 200000000 // bytes
rbWriteTimeout = 5
)
// destination simulates a destination for the mtsSender. It allows for the // destination simulates a destination for the mtsSender. It allows for the
// emulation of failed and delayed sends. // emulation of failed and delayed sends.
type destination struct { type destination struct {
@ -141,11 +134,7 @@ func TestMtsSenderSegment(t *testing.T) {
// Create ringBuffer, sender, sender and the MPEGTS encoder. // Create ringBuffer, sender, sender and the MPEGTS encoder.
const numberOfClips = 11 const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
rb, err := vring.NewBuffer(rbMaxElements, rbCapacity, 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
if err != nil {
t.Fatalf("could not initialise ring buffer: %v", err)
}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0)
const psiSendCount = 10 const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
@ -224,11 +213,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder // Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3 const clipToFailAt = 3
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
rb, err := vring.NewBuffer(rbMaxElements, rbCapacity, 10*time.Millisecond) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
if err != nil {
t.Fatalf("could not initialise ring buffer: %v", err)
}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0)
const psiSendCount = 10 const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
@ -309,11 +294,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder. // Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3 const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
rb, err := vring.NewBuffer(1, rbCapacity, 10*time.Millisecond) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0)
if err != nil {
t.Fatalf("could not initialise ring buffer: %v", err)
}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0)
const psiSendCount = 10 const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))