From 3896a5e80405b08091113882146c3d132894062b Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 29 Mar 2019 12:49:26 +1030 Subject: [PATCH 1/6] revid: senders are now io.Writers Added a Write method to senders such that they implement io.Writer. The multiSender now takes a slice of io.writers. Also modified revid code and tests to account for this chance. --- revid/revid.go | 4 +-- revid/revid_test.go | 2 +- revid/senders.go | 57 ++++++++++++++++++++++++++++++++++--------- revid/senders_test.go | 7 +++++- 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index adb60a92..bcf0d759 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -184,12 +184,12 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. - var mtsSenders, flvSenders []loadSender + var mtsSenders, flvSenders []io.Writer // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. - var sender loadSender + var sender io.Writer for _, out := range r.config.Outputs { switch out { case Http: diff --git a/revid/revid_test.go b/revid/revid_test.go index 4380a2bf..269718a7 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -278,7 +278,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { ok := false for _, dst := range senders { // Get type of sender. - senderType, err := typeOfSender(dst) + senderType, err := typeOfSender(dst.(loadSender)) if err != nil { t.Fatalf("could not get encoders type for test %v, failed with err: %v", testNum, err) } diff --git a/revid/senders.go b/revid/senders.go index c8ee91f5..44e2db34 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -31,6 +31,7 @@ package revid import ( "errors" "fmt" + "io" "net" "os" "strconv" @@ -58,12 +59,12 @@ type Log func(level int8, message string, params ...interface{}) // multiSender implements io.Writer. It provides the capacity to send to multiple // senders from a single Write call. type multiSender struct { - senders []loadSender + senders []io.Writer log Log } // newMultiSender returns a pointer to a new multiSender. -func newMultiSender(senders []loadSender, log Log) *multiSender { +func newMultiSender(senders []io.Writer, log Log) *multiSender { return &multiSender{ senders: senders, log: log, @@ -74,14 +75,17 @@ func newMultiSender(senders []loadSender, log Log) *multiSender { // and release on all senders of multiSender. func (s *multiSender) Write(d []byte) (int, error) { for i, sender := range s.senders { - sender.load(d) - err := sender.send() + _, err := sender.Write(d) if err != nil { s.log(logger.Warning, pkg+"send failed", "sender", i, "error", err) } } for _, sender := range s.senders { - sender.release() + s, ok := sender.(loadSender) + if !ok { + panic("sender is not a loadSender") + } + s.release() } return len(d), nil } @@ -109,13 +113,8 @@ func (s *minimalHttpSender) send(d []byte) error { // When a loadSender has finished using the *ring.Chunk // it must be Closed. type loadSender interface { - // load assigns the *ring.Chunk to the loadSender. - // The load call may fail, but must not mutate the - // the chunk. load(d []byte) error - // send performs a destination-specific send - // operation. It must not mutate the chunk. send() error // release releases the *ring.Chunk. @@ -137,7 +136,19 @@ type fileSender struct { data []byte } -func newFileSender(path string) (*fileSender, error) { +func (s *fileSender) Write(d []byte) (int, error) { + err := s.load(d) + if err != nil { + return 0, err + } + err = s.send() + if err != nil { + return len(d), err + } + return len(d), nil +} + +func newFileSender(path string) (io.Writer, error) { f, err := os.Create(path) if err != nil { return nil, err @@ -174,6 +185,10 @@ type mtsSender struct { curPid int } +func (s *mtsSender) Write(d []byte) (int, error) { + return write(s, d) +} + // newMtsSender returns a new mtsSender. func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ @@ -357,6 +372,10 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg return s, err } +func (s *rtmpSender) Write(d []byte) (int, error) { + return write(s, d) +} + func (s *rtmpSender) load(d []byte) error { s.data = d return nil @@ -406,6 +425,10 @@ type rtpSender struct { data []byte } +func (s *rtpSender) Write(d []byte) (int, error) { + return write(s, d) +} + func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { @@ -432,3 +455,15 @@ func (s *rtpSender) send() error { _, err := s.encoder.Write(s.data) return err } + +func write(s loadSender, d []byte) (int, error) { + err := s.load(d) + if err != nil { + return 0, err + } + err = s.send() + if err != nil { + return len(d), err + } + return len(d), nil +} diff --git a/revid/senders_test.go b/revid/senders_test.go index 2f464de0..d327077e 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -31,6 +31,7 @@ package revid import ( "errors" "fmt" + "io" "sync" "testing" "time" @@ -271,6 +272,10 @@ func newDummyLoadSender(fail bool, retry bool) *dummyLoadSender { return &dummyLoadSender{failOnSend: fail, failHandled: true, retry: retry} } +func (s *dummyLoadSender) Write(d []byte) (int, error) { + return write(s, d) +} + // load takes a byte slice and assigns it to the dummyLoadSenders data slice. func (s *dummyLoadSender) load(d []byte) error { s.data = d @@ -315,7 +320,7 @@ func (s *dummyLoadSender) retrySend() bool { return s.retry } // TestMultiSenderWrite checks that we can do basic writing to multiple senders // using the multiSender. func TestMultiSenderWrite(t *testing.T) { - senders := []loadSender{ + senders := []io.Writer{ newDummyLoadSender(false, false), newDummyLoadSender(false, false), newDummyLoadSender(false, false), From 648b43c50a43b2195918894e7853f204b8b1fe0d Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 29 Mar 2019 13:29:31 +1030 Subject: [PATCH 2/6] revid: added some commentary --- revid/senders.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/revid/senders.go b/revid/senders.go index 44e2db34..959fba21 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -113,8 +113,13 @@ func (s *minimalHttpSender) send(d []byte) error { // When a loadSender has finished using the *ring.Chunk // it must be Closed. type loadSender interface { + // load assigns the *ring.Chunk to the loadSender. + // The load call may fail, but must not mutate the + // the chunk. load(d []byte) error + // send performs a destination-specific send + // operation. It must not mutate the chunk. send() error // release releases the *ring.Chunk. @@ -136,6 +141,7 @@ type fileSender struct { data []byte } +// Write implements io.Writer. This will attempt to load and send the data given. func (s *fileSender) Write(d []byte) (int, error) { err := s.load(d) if err != nil { @@ -185,6 +191,7 @@ type mtsSender struct { curPid int } +// Write implements io.Writer. This will attempt to load and send the data given. func (s *mtsSender) Write(d []byte) (int, error) { return write(s, d) } @@ -372,6 +379,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg return s, err } +// Write implements io.Writer. This will attempt to load and send the data given. func (s *rtmpSender) Write(d []byte) (int, error) { return write(s, d) } @@ -425,6 +433,7 @@ type rtpSender struct { data []byte } +// Write implements io.Writer. This will attempt to load and send the data given. func (s *rtpSender) Write(d []byte) (int, error) { return write(s, d) } @@ -456,6 +465,7 @@ func (s *rtpSender) send() error { return err } +// write wraps the load and send method for loadSenders. func write(s loadSender, d []byte) (int, error) { err := s.load(d) if err != nil { From 7c724c9fc31b9062544f769e8f288f22b2ada312 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 29 Mar 2019 16:17:11 +1030 Subject: [PATCH 3/6] revid: simplified determination of sender types --- revid/revid_test.go | 46 +++++++-------------------------------------- 1 file changed, 7 insertions(+), 39 deletions(-) diff --git a/revid/revid_test.go b/revid/revid_test.go index 269718a7..14519fd8 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -1,7 +1,6 @@ package revid import ( - "errors" "fmt" "io" "os" @@ -100,11 +99,11 @@ func (e *tstFlvEncoder) Write(d []byte) (int, error) { return 0, nil } func TestResetEncoderSenderSetup(t *testing.T) { // We will use these to indicate types after assertion. const ( - mtsSenderStr = "revid.mtsSender" - rtpSenderStr = "revid.rtpSender" - rtmpSenderStr = "revid.RtmpSender" - mtsEncoderStr = "mts.Encoder" - flvEncoderStr = "flv.Encoder" + mtsSenderStr = "*revid.mtsSender" + rtpSenderStr = "*revid.rtpSender" + rtmpSenderStr = "*revid.rtmpSender" + mtsEncoderStr = "*revid.tstMtsEncoder" + flvEncoderStr = "*revid.tstFlvEncoder" ) // Struct that will be used to format test cases nicely below. @@ -185,31 +184,6 @@ func TestResetEncoderSenderSetup(t *testing.T) { }, } - // typeOfEncoder will return the type of encoder implementing stream.Encoder. - typeOfEncoder := func(i io.Writer) (string, error) { - if _, ok := i.(*tstMtsEncoder); ok { - return mtsEncoderStr, nil - } - if _, ok := i.(*tstFlvEncoder); ok { - return flvEncoderStr, nil - } - return "", errors.New("unknown Encoder type") - } - - // typeOfSender will return the type of sender implementing loadSender. - typeOfSender := func(s loadSender) (string, error) { - if _, ok := s.(*mtsSender); ok { - return mtsSenderStr, nil - } - if _, ok := s.(*rtpSender); ok { - return rtpSenderStr, nil - } - if _, ok := s.(*rtmpSender); ok { - return rtmpSenderStr, nil - } - return "", errors.New("unknown loadSender type") - } - rv, err := New(Config{Logger: &testLogger{}}, nil) if err != nil { t.Fatalf("unexpected err: %v", err) @@ -241,10 +215,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { // Now check the correctness of encoders and their destinations. for _, e := range rv.encoder { // Get e's type. - encoderType, err := typeOfEncoder(e) - if err != nil { - t.Fatalf("could not get encoders type for test %v, failed with err: %v", testNum, err) - } + encoderType := fmt.Sprintf("%T", e) // Check that we expect this encoder to be here. idx := -1 @@ -278,10 +249,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { ok := false for _, dst := range senders { // Get type of sender. - senderType, err := typeOfSender(dst.(loadSender)) - if err != nil { - t.Fatalf("could not get encoders type for test %v, failed with err: %v", testNum, err) - } + senderType := fmt.Sprintf("%T", dst.(loadSender)) // If it's one we want, indicate. if senderType == expectDst { From 8cc7f6e500afb3c3502f19c58af1582ff79ede57 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 29 Mar 2019 16:22:51 +1030 Subject: [PATCH 4/6] revid: improved some naming --- revid/revid.go | 18 +++++++++--------- revid/revid_test.go | 2 +- revid/senders.go | 12 ++++++------ revid/senders_test.go | 2 +- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index bcf0d759..e6bc2f3a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -189,30 +189,30 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. - var sender io.Writer + var w io.Writer for _, out := range r.config.Outputs { switch out { case Http: - sender = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil) - mtsSenders = append(mtsSenders, sender) + w = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil) + mtsSenders = append(mtsSenders, w) case Rtp: - sender, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) + w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) } - mtsSenders = append(mtsSenders, sender) + mtsSenders = append(mtsSenders, w) case File: - sender, err := newFileSender(r.config.OutputPath) + w, err := newFileSender(r.config.OutputPath) if err != nil { return err } - mtsSenders = append(mtsSenders, sender) + mtsSenders = append(mtsSenders, w) case Rtmp: - sender, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) + w, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) } - flvSenders = append(flvSenders, sender) + flvSenders = append(flvSenders, w) } } diff --git a/revid/revid_test.go b/revid/revid_test.go index 14519fd8..91f5d329 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -237,7 +237,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { ms = e.(*tstFlvEncoder).dst } - senders := ms.(*multiSender).senders + senders := ms.(*multiSender).dst got = len(senders) want = len(test.encoders[idx].destinations) if got != want { diff --git a/revid/senders.go b/revid/senders.go index 959fba21..a8b77c35 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -59,28 +59,28 @@ type Log func(level int8, message string, params ...interface{}) // multiSender implements io.Writer. It provides the capacity to send to multiple // senders from a single Write call. type multiSender struct { - senders []io.Writer - log Log + dst []io.Writer + log Log } // newMultiSender returns a pointer to a new multiSender. func newMultiSender(senders []io.Writer, log Log) *multiSender { return &multiSender{ - senders: senders, - log: log, + dst: senders, + log: log, } } // Write implements io.Writer. This will call load (with the passed slice), send // and release on all senders of multiSender. func (s *multiSender) Write(d []byte) (int, error) { - for i, sender := range s.senders { + for i, sender := range s.dst { _, err := sender.Write(d) if err != nil { s.log(logger.Warning, pkg+"send failed", "sender", i, "error", err) } } - for _, sender := range s.senders { + for _, sender := range s.dst { s, ok := sender.(loadSender) if !ok { panic("sender is not a loadSender") diff --git a/revid/senders_test.go b/revid/senders_test.go index d327077e..a435e512 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -335,7 +335,7 @@ func TestMultiSenderWrite(t *testing.T) { // Check that the senders got the data correctly from the writes. for i := byte(0); i < noOfWrites; i++ { - for j, dest := range ms.senders { + for j, dest := range ms.dst { got := dest.(*dummyLoadSender).buf[i][0] if got != i { t.Errorf("Did not get expected result for sender: %v. \nGot: %v\nWant: %v\n", j, got, i) From 86971ca0557e38244534a2a9408e3d81ed2448d3 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 29 Mar 2019 16:24:47 +1030 Subject: [PATCH 5/6] revid: simplified comments for sender Write methods --- revid/senders.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/revid/senders.go b/revid/senders.go index a8b77c35..53ae7e45 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -141,7 +141,7 @@ type fileSender struct { data []byte } -// Write implements io.Writer. This will attempt to load and send the data given. +// Write implements io.Writer. func (s *fileSender) Write(d []byte) (int, error) { err := s.load(d) if err != nil { @@ -191,7 +191,7 @@ type mtsSender struct { curPid int } -// Write implements io.Writer. This will attempt to load and send the data given. +// Write implements io.Writer. func (s *mtsSender) Write(d []byte) (int, error) { return write(s, d) } @@ -379,7 +379,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg return s, err } -// Write implements io.Writer. This will attempt to load and send the data given. +// Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { return write(s, d) } @@ -433,7 +433,7 @@ type rtpSender struct { data []byte } -// Write implements io.Writer. This will attempt to load and send the data given. +// Write implements io.Writer. func (s *rtpSender) Write(d []byte) (int, error) { return write(s, d) } From afe2948cf78df5c019453f7e9f27505fdf7d7b97 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 30 Mar 2019 10:08:27 +1030 Subject: [PATCH 6/6] revid: removed unnecessary assertion --- revid/revid_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid_test.go b/revid/revid_test.go index 91f5d329..fbd13e72 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -249,7 +249,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { ok := false for _, dst := range senders { // Get type of sender. - senderType := fmt.Sprintf("%T", dst.(loadSender)) + senderType := fmt.Sprintf("%T", dst) // If it's one we want, indicate. if senderType == expectDst {