From d18373908bd7a40511293edbaf60e887b09acf73 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 15 Apr 2019 10:48:12 +0930 Subject: [PATCH] revid: added ringBuffer to rtmpSender --- revid/revid.go | 9 ++++-- revid/senders.go | 71 ++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 865bc342..48b4228d 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -52,8 +52,8 @@ import ( const ( rbSize = 1000 rbElementSize = 100000 - wTimeout = 1 * time.Second - rTimeout = 1 * time.Second + wTimeout = 0 * time.Second + rTimeout = 0 * time.Second ) // RTMP connection properties. @@ -119,6 +119,11 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } +// Config returns a copy of revids current config. +func (r *Revid) Config() Config { + return r.config +} + // TODO(Saxon): put more thought into error severity. func (r *Revid) handleErrors() { for { diff --git a/revid/senders.go b/revid/senders.go index 4da8cdd5..5f290fa2 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,7 +29,6 @@ LICENSE package revid import ( - "errors" "fmt" "io" "net" @@ -252,14 +251,14 @@ func (s *mtsSender) Close() error { // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { - conn *rtmp.Conn - + conn *rtmp.Conn url string timeout uint retries int log func(lvl int8, msg string, args ...interface{}) - - data []byte + rb *ring.Buffer + quit chan struct{} + wg sync.WaitGroup } func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { @@ -281,20 +280,70 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg timeout: timeout, retries: retries, log: log, + rb: ring.NewBuffer(10, rbElementSize, 0), + quit: make(chan struct{}), } + s.wg.Add(1) + go s.output() return s, err } +// output starts an mtsSender's data handling routine. +func (s *rtmpSender) output() { + var chunk *ring.Chunk + for { + select { + case <-s.quit: + s.log(logger.Info, pkg+"rtmpSender: got quit signal, terminating output routine") + defer s.wg.Done() + return + default: + // If chunk is nil then we're ready to get another from the ringBuffer. + if chunk == nil { + var err error + chunk, err = s.rb.Next(rTimeout) + switch err { + case nil, io.EOF: + continue + case ring.ErrTimeout: + s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout") + continue + default: + s.log(logger.Error, pkg+"rtmpSender: unexpected error", "error", err.Error()) + continue + } + } + if s.conn == nil { + s.log(logger.Warning, pkg+"rtmpSender: no rtmp connection, going to restart...") + err := s.restart() + if err != nil { + s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) + continue + } + } + _, err := s.conn.Write(chunk.Bytes()) + if err != nil { + s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error()) + err = s.restart() + if err != nil { + s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) + } + continue + } + chunk.Close() + chunk = nil + } + } +} + // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { - if s.conn == nil { - return 0, errors.New("no rtmp connection, cannot write") - } - _, err := s.conn.Write(d) + _, err := s.rb.Write(d) if err != nil { - err = s.restart() + s.log(logger.Warning, pkg+"rtmpSender: ringBuffer write error", "error", err.Error()) } - return len(d), err + s.rb.Flush() + return len(d), nil } func (s *rtmpSender) restart() error {