revid: senders are now io.Writers

Added a Write method to senders such that they implement io.Writer. The multiSender now takes a slice of io.writers.
Also modified revid code and tests to account for this chance.
This commit is contained in:
Saxon 2019-03-29 12:49:26 +10:30
parent 7c31f6fd6c
commit 3896a5e804
4 changed files with 55 additions and 15 deletions

View File

@ -184,12 +184,12 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func
// mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders
// will hold senders that require FLV encoding.
var mtsSenders, flvSenders []loadSender
var mtsSenders, flvSenders []io.Writer
// We will go through our outputs and create the corresponding senders to add
// to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the
// output requires FLV encoding.
var sender loadSender
var sender io.Writer
for _, out := range r.config.Outputs {
switch out {
case Http:

View File

@ -278,7 +278,7 @@ func TestResetEncoderSenderSetup(t *testing.T) {
ok := false
for _, dst := range senders {
// Get type of sender.
senderType, err := typeOfSender(dst)
senderType, err := typeOfSender(dst.(loadSender))
if err != nil {
t.Fatalf("could not get encoders type for test %v, failed with err: %v", testNum, err)
}

View File

@ -31,6 +31,7 @@ package revid
import (
"errors"
"fmt"
"io"
"net"
"os"
"strconv"
@ -58,12 +59,12 @@ type Log func(level int8, message string, params ...interface{})
// multiSender implements io.Writer. It provides the capacity to send to multiple
// senders from a single Write call.
type multiSender struct {
senders []loadSender
senders []io.Writer
log Log
}
// newMultiSender returns a pointer to a new multiSender.
func newMultiSender(senders []loadSender, log Log) *multiSender {
func newMultiSender(senders []io.Writer, log Log) *multiSender {
return &multiSender{
senders: senders,
log: log,
@ -74,14 +75,17 @@ func newMultiSender(senders []loadSender, log Log) *multiSender {
// and release on all senders of multiSender.
func (s *multiSender) Write(d []byte) (int, error) {
for i, sender := range s.senders {
sender.load(d)
err := sender.send()
_, err := sender.Write(d)
if err != nil {
s.log(logger.Warning, pkg+"send failed", "sender", i, "error", err)
}
}
for _, sender := range s.senders {
sender.release()
s, ok := sender.(loadSender)
if !ok {
panic("sender is not a loadSender")
}
s.release()
}
return len(d), nil
}
@ -109,13 +113,8 @@ func (s *minimalHttpSender) send(d []byte) error {
// When a loadSender has finished using the *ring.Chunk
// it must be Closed.
type loadSender interface {
// load assigns the *ring.Chunk to the loadSender.
// The load call may fail, but must not mutate the
// the chunk.
load(d []byte) error
// send performs a destination-specific send
// operation. It must not mutate the chunk.
send() error
// release releases the *ring.Chunk.
@ -137,7 +136,19 @@ type fileSender struct {
data []byte
}
func newFileSender(path string) (*fileSender, error) {
func (s *fileSender) Write(d []byte) (int, error) {
err := s.load(d)
if err != nil {
return 0, err
}
err = s.send()
if err != nil {
return len(d), err
}
return len(d), nil
}
func newFileSender(path string) (io.Writer, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
@ -174,6 +185,10 @@ type mtsSender struct {
curPid int
}
func (s *mtsSender) Write(d []byte) (int, error) {
return write(s, d)
}
// newMtsSender returns a new mtsSender.
func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{
@ -357,6 +372,10 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
return s, err
}
func (s *rtmpSender) Write(d []byte) (int, error) {
return write(s, d)
}
func (s *rtmpSender) load(d []byte) error {
s.data = d
return nil
@ -406,6 +425,10 @@ type rtpSender struct {
data []byte
}
func (s *rtpSender) Write(d []byte) (int, error) {
return write(s, d)
}
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
conn, err := net.Dial("udp", addr)
if err != nil {
@ -432,3 +455,15 @@ func (s *rtpSender) send() error {
_, err := s.encoder.Write(s.data)
return err
}
func write(s loadSender, d []byte) (int, error) {
err := s.load(d)
if err != nil {
return 0, err
}
err = s.send()
if err != nil {
return len(d), err
}
return len(d), nil
}

View File

@ -31,6 +31,7 @@ package revid
import (
"errors"
"fmt"
"io"
"sync"
"testing"
"time"
@ -271,6 +272,10 @@ func newDummyLoadSender(fail bool, retry bool) *dummyLoadSender {
return &dummyLoadSender{failOnSend: fail, failHandled: true, retry: retry}
}
func (s *dummyLoadSender) Write(d []byte) (int, error) {
return write(s, d)
}
// load takes a byte slice and assigns it to the dummyLoadSenders data slice.
func (s *dummyLoadSender) load(d []byte) error {
s.data = d
@ -315,7 +320,7 @@ func (s *dummyLoadSender) retrySend() bool { return s.retry }
// TestMultiSenderWrite checks that we can do basic writing to multiple senders
// using the multiSender.
func TestMultiSenderWrite(t *testing.T) {
senders := []loadSender{
senders := []io.Writer{
newDummyLoadSender(false, false),
newDummyLoadSender(false, false),
newDummyLoadSender(false, false),