revid: added ringBuffer to rtmpSender

This commit is contained in:
Saxon 2019-04-15 10:48:12 +09:30
parent d75ea20137
commit d18373908b
2 changed files with 67 additions and 13 deletions

View File

@ -52,8 +52,8 @@ import (
const (
rbSize = 1000
rbElementSize = 100000
wTimeout = 1 * time.Second
rTimeout = 1 * time.Second
wTimeout = 0 * time.Second
rTimeout = 0 * time.Second
)
// RTMP connection properties.
@ -119,6 +119,11 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) {
return &r, nil
}
// Config returns a copy of revids current config.
func (r *Revid) Config() Config {
return r.config
}
// TODO(Saxon): put more thought into error severity.
func (r *Revid) handleErrors() {
for {

View File

@ -29,7 +29,6 @@ LICENSE
package revid
import (
"errors"
"fmt"
"io"
"net"
@ -252,14 +251,14 @@ func (s *mtsSender) Close() error {
// rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct {
conn *rtmp.Conn
conn *rtmp.Conn
url string
timeout uint
retries int
log func(lvl int8, msg string, args ...interface{})
data []byte
rb *ring.Buffer
quit chan struct{}
wg sync.WaitGroup
}
func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
@ -281,20 +280,70 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
timeout: timeout,
retries: retries,
log: log,
rb: ring.NewBuffer(10, rbElementSize, 0),
quit: make(chan struct{}),
}
s.wg.Add(1)
go s.output()
return s, err
}
// output starts an mtsSender's data handling routine.
func (s *rtmpSender) output() {
var chunk *ring.Chunk
for {
select {
case <-s.quit:
s.log(logger.Info, pkg+"rtmpSender: got quit signal, terminating output routine")
defer s.wg.Done()
return
default:
// If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil {
var err error
chunk, err = s.rb.Next(rTimeout)
switch err {
case nil, io.EOF:
continue
case ring.ErrTimeout:
s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout")
continue
default:
s.log(logger.Error, pkg+"rtmpSender: unexpected error", "error", err.Error())
continue
}
}
if s.conn == nil {
s.log(logger.Warning, pkg+"rtmpSender: no rtmp connection, going to restart...")
err := s.restart()
if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error())
continue
}
}
_, err := s.conn.Write(chunk.Bytes())
if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error())
err = s.restart()
if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error())
}
continue
}
chunk.Close()
chunk = nil
}
}
}
// Write implements io.Writer.
func (s *rtmpSender) Write(d []byte) (int, error) {
if s.conn == nil {
return 0, errors.New("no rtmp connection, cannot write")
}
_, err := s.conn.Write(d)
_, err := s.rb.Write(d)
if err != nil {
err = s.restart()
s.log(logger.Warning, pkg+"rtmpSender: ringBuffer write error", "error", err.Error())
}
return len(d), err
s.rb.Flush()
return len(d), nil
}
func (s *rtmpSender) restart() error {