Merged in senders-are-writers (pull request #176)

revid: senders are now io.Writers

Approved-by: kortschak <dan@kortschak.io>
Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
Saxon Milton 2019-03-30 05:57:08 +00:00
commit ec3e0df977
4 changed files with 81 additions and 63 deletions

View File

@ -184,35 +184,35 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func
// mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders
// will hold senders that require FLV encoding. // will hold senders that require FLV encoding.
var mtsSenders, flvSenders []loadSender var mtsSenders, flvSenders []io.Writer
// We will go through our outputs and create the corresponding senders to add // We will go through our outputs and create the corresponding senders to add
// to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the
// output requires FLV encoding. // output requires FLV encoding.
var sender loadSender var w io.Writer
for _, out := range r.config.Outputs { for _, out := range r.config.Outputs {
switch out { switch out {
case Http: case Http:
sender = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil) w = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil)
mtsSenders = append(mtsSenders, sender) mtsSenders = append(mtsSenders, w)
case Rtp: case Rtp:
sender, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
if err != nil { if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error())
} }
mtsSenders = append(mtsSenders, sender) mtsSenders = append(mtsSenders, w)
case File: case File:
sender, err := newFileSender(r.config.OutputPath) w, err := newFileSender(r.config.OutputPath)
if err != nil { if err != nil {
return err return err
} }
mtsSenders = append(mtsSenders, sender) mtsSenders = append(mtsSenders, w)
case Rtmp: case Rtmp:
sender, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) w, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil { if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error())
} }
flvSenders = append(flvSenders, sender) flvSenders = append(flvSenders, w)
} }
} }

View File

@ -1,7 +1,6 @@
package revid package revid
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -100,11 +99,11 @@ func (e *tstFlvEncoder) Write(d []byte) (int, error) { return 0, nil }
func TestResetEncoderSenderSetup(t *testing.T) { func TestResetEncoderSenderSetup(t *testing.T) {
// We will use these to indicate types after assertion. // We will use these to indicate types after assertion.
const ( const (
mtsSenderStr = "revid.mtsSender" mtsSenderStr = "*revid.mtsSender"
rtpSenderStr = "revid.rtpSender" rtpSenderStr = "*revid.rtpSender"
rtmpSenderStr = "revid.RtmpSender" rtmpSenderStr = "*revid.rtmpSender"
mtsEncoderStr = "mts.Encoder" mtsEncoderStr = "*revid.tstMtsEncoder"
flvEncoderStr = "flv.Encoder" flvEncoderStr = "*revid.tstFlvEncoder"
) )
// Struct that will be used to format test cases nicely below. // Struct that will be used to format test cases nicely below.
@ -185,31 +184,6 @@ func TestResetEncoderSenderSetup(t *testing.T) {
}, },
} }
// typeOfEncoder will return the type of encoder implementing stream.Encoder.
typeOfEncoder := func(i io.Writer) (string, error) {
if _, ok := i.(*tstMtsEncoder); ok {
return mtsEncoderStr, nil
}
if _, ok := i.(*tstFlvEncoder); ok {
return flvEncoderStr, nil
}
return "", errors.New("unknown Encoder type")
}
// typeOfSender will return the type of sender implementing loadSender.
typeOfSender := func(s loadSender) (string, error) {
if _, ok := s.(*mtsSender); ok {
return mtsSenderStr, nil
}
if _, ok := s.(*rtpSender); ok {
return rtpSenderStr, nil
}
if _, ok := s.(*rtmpSender); ok {
return rtmpSenderStr, nil
}
return "", errors.New("unknown loadSender type")
}
rv, err := New(Config{Logger: &testLogger{}}, nil) rv, err := New(Config{Logger: &testLogger{}}, nil)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
@ -241,10 +215,7 @@ func TestResetEncoderSenderSetup(t *testing.T) {
// Now check the correctness of encoders and their destinations. // Now check the correctness of encoders and their destinations.
for _, e := range rv.encoder { for _, e := range rv.encoder {
// Get e's type. // Get e's type.
encoderType, err := typeOfEncoder(e) encoderType := fmt.Sprintf("%T", e)
if err != nil {
t.Fatalf("could not get encoders type for test %v, failed with err: %v", testNum, err)
}
// Check that we expect this encoder to be here. // Check that we expect this encoder to be here.
idx := -1 idx := -1
@ -266,7 +237,7 @@ func TestResetEncoderSenderSetup(t *testing.T) {
ms = e.(*tstFlvEncoder).dst ms = e.(*tstFlvEncoder).dst
} }
senders := ms.(*multiSender).senders senders := ms.(*multiSender).dst
got = len(senders) got = len(senders)
want = len(test.encoders[idx].destinations) want = len(test.encoders[idx].destinations)
if got != want { if got != want {
@ -278,10 +249,7 @@ func TestResetEncoderSenderSetup(t *testing.T) {
ok := false ok := false
for _, dst := range senders { for _, dst := range senders {
// Get type of sender. // Get type of sender.
senderType, err := typeOfSender(dst) senderType := fmt.Sprintf("%T", dst)
if err != nil {
t.Fatalf("could not get encoders type for test %v, failed with err: %v", testNum, err)
}
// If it's one we want, indicate. // If it's one we want, indicate.
if senderType == expectDst { if senderType == expectDst {

View File

@ -31,6 +31,7 @@ package revid
import ( import (
"errors" "errors"
"fmt" "fmt"
"io"
"net" "net"
"os" "os"
"strconv" "strconv"
@ -58,14 +59,14 @@ type Log func(level int8, message string, params ...interface{})
// multiSender implements io.Writer. It provides the capacity to send to multiple // multiSender implements io.Writer. It provides the capacity to send to multiple
// senders from a single Write call. // senders from a single Write call.
type multiSender struct { type multiSender struct {
senders []loadSender dst []io.Writer
log Log log Log
} }
// newMultiSender returns a pointer to a new multiSender. // newMultiSender returns a pointer to a new multiSender.
func newMultiSender(senders []loadSender, log Log) *multiSender { func newMultiSender(senders []io.Writer, log Log) *multiSender {
return &multiSender{ return &multiSender{
senders: senders, dst: senders,
log: log, log: log,
} }
} }
@ -73,15 +74,18 @@ func newMultiSender(senders []loadSender, log Log) *multiSender {
// Write implements io.Writer. This will call load (with the passed slice), send // Write implements io.Writer. This will call load (with the passed slice), send
// and release on all senders of multiSender. // and release on all senders of multiSender.
func (s *multiSender) Write(d []byte) (int, error) { func (s *multiSender) Write(d []byte) (int, error) {
for i, sender := range s.senders { for i, sender := range s.dst {
sender.load(d) _, err := sender.Write(d)
err := sender.send()
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"send failed", "sender", i, "error", err) s.log(logger.Warning, pkg+"send failed", "sender", i, "error", err)
} }
} }
for _, sender := range s.senders { for _, sender := range s.dst {
sender.release() s, ok := sender.(loadSender)
if !ok {
panic("sender is not a loadSender")
}
s.release()
} }
return len(d), nil return len(d), nil
} }
@ -137,7 +141,20 @@ type fileSender struct {
data []byte data []byte
} }
func newFileSender(path string) (*fileSender, error) { // Write implements io.Writer.
func (s *fileSender) Write(d []byte) (int, error) {
err := s.load(d)
if err != nil {
return 0, err
}
err = s.send()
if err != nil {
return len(d), err
}
return len(d), nil
}
func newFileSender(path string) (io.Writer, error) {
f, err := os.Create(path) f, err := os.Create(path)
if err != nil { if err != nil {
return nil, err return nil, err
@ -174,6 +191,11 @@ type mtsSender struct {
curPid int curPid int
} }
// Write implements io.Writer.
func (s *mtsSender) Write(d []byte) (int, error) {
return write(s, d)
}
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{ return &mtsSender{
@ -357,6 +379,11 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
return s, err return s, err
} }
// Write implements io.Writer.
func (s *rtmpSender) Write(d []byte) (int, error) {
return write(s, d)
}
func (s *rtmpSender) load(d []byte) error { func (s *rtmpSender) load(d []byte) error {
s.data = d s.data = d
return nil return nil
@ -406,6 +433,11 @@ type rtpSender struct {
data []byte data []byte
} }
// Write implements io.Writer.
func (s *rtpSender) Write(d []byte) (int, error) {
return write(s, d)
}
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
conn, err := net.Dial("udp", addr) conn, err := net.Dial("udp", addr)
if err != nil { if err != nil {
@ -432,3 +464,16 @@ func (s *rtpSender) send() error {
_, err := s.encoder.Write(s.data) _, err := s.encoder.Write(s.data)
return err return err
} }
// write wraps the load and send method for loadSenders.
func write(s loadSender, d []byte) (int, error) {
err := s.load(d)
if err != nil {
return 0, err
}
err = s.send()
if err != nil {
return len(d), err
}
return len(d), nil
}

View File

@ -31,6 +31,7 @@ package revid
import ( import (
"errors" "errors"
"fmt" "fmt"
"io"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -271,6 +272,10 @@ func newDummyLoadSender(fail bool, retry bool) *dummyLoadSender {
return &dummyLoadSender{failOnSend: fail, failHandled: true, retry: retry} return &dummyLoadSender{failOnSend: fail, failHandled: true, retry: retry}
} }
func (s *dummyLoadSender) Write(d []byte) (int, error) {
return write(s, d)
}
// load takes a byte slice and assigns it to the dummyLoadSenders data slice. // load takes a byte slice and assigns it to the dummyLoadSenders data slice.
func (s *dummyLoadSender) load(d []byte) error { func (s *dummyLoadSender) load(d []byte) error {
s.data = d s.data = d
@ -315,7 +320,7 @@ func (s *dummyLoadSender) retrySend() bool { return s.retry }
// TestMultiSenderWrite checks that we can do basic writing to multiple senders // TestMultiSenderWrite checks that we can do basic writing to multiple senders
// using the multiSender. // using the multiSender.
func TestMultiSenderWrite(t *testing.T) { func TestMultiSenderWrite(t *testing.T) {
senders := []loadSender{ senders := []io.Writer{
newDummyLoadSender(false, false), newDummyLoadSender(false, false),
newDummyLoadSender(false, false), newDummyLoadSender(false, false),
newDummyLoadSender(false, false), newDummyLoadSender(false, false),
@ -330,7 +335,7 @@ func TestMultiSenderWrite(t *testing.T) {
// Check that the senders got the data correctly from the writes. // Check that the senders got the data correctly from the writes.
for i := byte(0); i < noOfWrites; i++ { for i := byte(0); i < noOfWrites; i++ {
for j, dest := range ms.senders { for j, dest := range ms.dst {
got := dest.(*dummyLoadSender).buf[i][0] got := dest.(*dummyLoadSender).buf[i][0]
if got != i { if got != i {
t.Errorf("Did not get expected result for sender: %v. \nGot: %v\nWant: %v\n", j, got, i) t.Errorf("Did not get expected result for sender: %v. \nGot: %v\nWant: %v\n", j, got, i)