From 3896a5e80405b08091113882146c3d132894062b Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 29 Mar 2019 12:49:26 +1030 Subject: [PATCH] revid: senders are now io.Writers Added a Write method to senders such that they implement io.Writer. The multiSender now takes a slice of io.writers. Also modified revid code and tests to account for this chance. --- revid/revid.go | 4 +-- revid/revid_test.go | 2 +- revid/senders.go | 57 ++++++++++++++++++++++++++++++++++--------- revid/senders_test.go | 7 +++++- 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index adb60a92..bcf0d759 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -184,12 +184,12 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. - var mtsSenders, flvSenders []loadSender + var mtsSenders, flvSenders []io.Writer // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. - var sender loadSender + var sender io.Writer for _, out := range r.config.Outputs { switch out { case Http: diff --git a/revid/revid_test.go b/revid/revid_test.go index 4380a2bf..269718a7 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -278,7 +278,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { ok := false for _, dst := range senders { // Get type of sender. - senderType, err := typeOfSender(dst) + senderType, err := typeOfSender(dst.(loadSender)) if err != nil { t.Fatalf("could not get encoders type for test %v, failed with err: %v", testNum, err) } diff --git a/revid/senders.go b/revid/senders.go index c8ee91f5..44e2db34 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -31,6 +31,7 @@ package revid import ( "errors" "fmt" + "io" "net" "os" "strconv" @@ -58,12 +59,12 @@ type Log func(level int8, message string, params ...interface{}) // multiSender implements io.Writer. It provides the capacity to send to multiple // senders from a single Write call. type multiSender struct { - senders []loadSender + senders []io.Writer log Log } // newMultiSender returns a pointer to a new multiSender. -func newMultiSender(senders []loadSender, log Log) *multiSender { +func newMultiSender(senders []io.Writer, log Log) *multiSender { return &multiSender{ senders: senders, log: log, @@ -74,14 +75,17 @@ func newMultiSender(senders []loadSender, log Log) *multiSender { // and release on all senders of multiSender. func (s *multiSender) Write(d []byte) (int, error) { for i, sender := range s.senders { - sender.load(d) - err := sender.send() + _, err := sender.Write(d) if err != nil { s.log(logger.Warning, pkg+"send failed", "sender", i, "error", err) } } for _, sender := range s.senders { - sender.release() + s, ok := sender.(loadSender) + if !ok { + panic("sender is not a loadSender") + } + s.release() } return len(d), nil } @@ -109,13 +113,8 @@ func (s *minimalHttpSender) send(d []byte) error { // When a loadSender has finished using the *ring.Chunk // it must be Closed. type loadSender interface { - // load assigns the *ring.Chunk to the loadSender. - // The load call may fail, but must not mutate the - // the chunk. load(d []byte) error - // send performs a destination-specific send - // operation. It must not mutate the chunk. send() error // release releases the *ring.Chunk. @@ -137,7 +136,19 @@ type fileSender struct { data []byte } -func newFileSender(path string) (*fileSender, error) { +func (s *fileSender) Write(d []byte) (int, error) { + err := s.load(d) + if err != nil { + return 0, err + } + err = s.send() + if err != nil { + return len(d), err + } + return len(d), nil +} + +func newFileSender(path string) (io.Writer, error) { f, err := os.Create(path) if err != nil { return nil, err @@ -174,6 +185,10 @@ type mtsSender struct { curPid int } +func (s *mtsSender) Write(d []byte) (int, error) { + return write(s, d) +} + // newMtsSender returns a new mtsSender. func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ @@ -357,6 +372,10 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg return s, err } +func (s *rtmpSender) Write(d []byte) (int, error) { + return write(s, d) +} + func (s *rtmpSender) load(d []byte) error { s.data = d return nil @@ -406,6 +425,10 @@ type rtpSender struct { data []byte } +func (s *rtpSender) Write(d []byte) (int, error) { + return write(s, d) +} + func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { @@ -432,3 +455,15 @@ func (s *rtpSender) send() error { _, err := s.encoder.Write(s.data) return err } + +func write(s loadSender, d []byte) (int, error) { + err := s.load(d) + if err != nil { + return 0, err + } + err = s.send() + if err != nil { + return len(d), err + } + return len(d), nil +} diff --git a/revid/senders_test.go b/revid/senders_test.go index 2f464de0..d327077e 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -31,6 +31,7 @@ package revid import ( "errors" "fmt" + "io" "sync" "testing" "time" @@ -271,6 +272,10 @@ func newDummyLoadSender(fail bool, retry bool) *dummyLoadSender { return &dummyLoadSender{failOnSend: fail, failHandled: true, retry: retry} } +func (s *dummyLoadSender) Write(d []byte) (int, error) { + return write(s, d) +} + // load takes a byte slice and assigns it to the dummyLoadSenders data slice. func (s *dummyLoadSender) load(d []byte) error { s.data = d @@ -315,7 +320,7 @@ func (s *dummyLoadSender) retrySend() bool { return s.retry } // TestMultiSenderWrite checks that we can do basic writing to multiple senders // using the multiSender. func TestMultiSenderWrite(t *testing.T) { - senders := []loadSender{ + senders := []io.Writer{ newDummyLoadSender(false, false), newDummyLoadSender(false, false), newDummyLoadSender(false, false),