Merged in rtp-ringbuff-bypass (pull request #88)

Rtp ringbuff bypass

Approved-by: Alan Noble <anoble@gmail.com>
Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Saxon Milton 2019-01-16 23:22:18 +00:00
commit ddb8fc1e1c
2 changed files with 21 additions and 18 deletions

View File

@ -111,6 +111,10 @@ type Revid struct {
// destination is the target endpoint. // destination is the target endpoint.
destination []loadSender destination []loadSender
// rtpSender is an unbuffered sender.
// It is used to isolate RTP from ring buffer-induced delays.
rtpSender *rtpSender
// bitrate hold the last send bitrate calculation result. // bitrate hold the last send bitrate calculation result.
bitrate int bitrate int
@ -137,6 +141,13 @@ func (p *packer) Write(frame []byte) (int, error) {
return len(frame), nil return len(frame), nil
} }
n, err := p.owner.buffer.Write(frame) n, err := p.owner.buffer.Write(frame)
// If we have an rtp sender bypass ringbuffer and give straight to sender
if p.owner.rtpSender != nil {
err = p.owner.rtpSender.send(frame)
if err != nil {
p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error())
}
}
if err != nil { if err != nil {
if err == ring.ErrDropped { if err == ring.ErrDropped {
p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame))
@ -193,7 +204,7 @@ func (r *Revid) reset(config Config) error {
} }
n := 1 n := 1
if r.config.Output2 != 0 { if r.config.Output2 != 0 && r.config.Output2 != Rtp {
n = 2 n = 2
} }
r.destination = make([]loadSender, n) r.destination = make([]loadSender, n)
@ -227,11 +238,10 @@ func (r *Revid) reset(config Config) error {
} }
r.destination[outNo] = s r.destination[outNo] = s
case Rtp: case Rtp:
s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
if err != nil { if err != nil {
return err return err
} }
r.destination[outNo] = s
} }
} }

View File

@ -29,6 +29,7 @@ LICENSE
package revid package revid
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -369,7 +370,6 @@ func (s *udpSender) close() error { return nil }
// rtpSender implements loadSender for a native udp destination with rtp packetization. // rtpSender implements loadSender for a native udp destination with rtp packetization.
type rtpSender struct { type rtpSender struct {
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
chunk *ring.Chunk
encoder *rtp.Encoder encoder *rtp.Encoder
} }
@ -385,19 +385,12 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{
return s, nil return s, nil
} }
func (s *rtpSender) load(c *ring.Chunk) error { func (s *rtpSender) send(d []byte) error {
s.chunk = c var err error
return nil if d != nil {
_, err = s.encoder.Write(d)
} else {
err = errors.New("no data to send provided")
} }
func (s *rtpSender) send() error {
_, err := s.chunk.WriteTo(s.encoder)
return err return err
} }
func (s *rtpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *rtpSender) close() error { return nil }