diff --git a/revid/revid.go b/revid/revid.go index b3380dd5..b8793545 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -111,6 +111,10 @@ type Revid struct { // destination is the target endpoint. destination []loadSender + // rtpSender is an unbuffered sender. + // It is used to isolate RTP from ring buffer-induced delays. + rtpSender *rtpSender + // bitrate hold the last send bitrate calculation result. bitrate int @@ -137,6 +141,13 @@ func (p *packer) Write(frame []byte) (int, error) { return len(frame), nil } n, err := p.owner.buffer.Write(frame) + // If we have an rtp sender bypass ringbuffer and give straight to sender + if p.owner.rtpSender != nil { + err = p.owner.rtpSender.send(frame) + if err != nil { + p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error()) + } + } if err != nil { if err == ring.ErrDropped { p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) @@ -193,7 +204,7 @@ func (r *Revid) reset(config Config) error { } n := 1 - if r.config.Output2 != 0 { + if r.config.Output2 != 0 && r.config.Output2 != Rtp { n = 2 } r.destination = make([]loadSender, n) @@ -227,11 +238,10 @@ func (r *Revid) reset(config Config) error { } r.destination[outNo] = s case Rtp: - s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) + r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { return err } - r.destination[outNo] = s } } diff --git a/revid/senders.go b/revid/senders.go index 145362f5..d1689396 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,6 +29,7 @@ LICENSE package revid import ( + "errors" "fmt" "io" "net" @@ -369,7 +370,6 @@ func (s *udpSender) close() error { return nil } // rtpSender implements loadSender for a native udp destination with rtp packetization. type rtpSender struct { log func(lvl int8, msg string, args ...interface{}) - chunk *ring.Chunk encoder *rtp.Encoder } @@ -385,19 +385,12 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{ return s, nil } -func (s *rtpSender) load(c *ring.Chunk) error { - s.chunk = c - return nil -} - -func (s *rtpSender) send() error { - _, err := s.chunk.WriteTo(s.encoder) +func (s *rtpSender) send(d []byte) error { + var err error + if d != nil { + _, err = s.encoder.Write(d) + } else { + err = errors.New("no data to send provided") + } return err } - -func (s *rtpSender) release() { - s.chunk.Close() - s.chunk = nil -} - -func (s *rtpSender) close() error { return nil }