diff --git a/revid/revid.go b/revid/revid.go index 25aeda69..3b2bc530 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -220,7 +220,7 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func // encoder to revid's encoder slice, and give this encoder the mtsSenders // as a destination. if len(mtsSenders) != 0 { - ms := multiSender(mtsSenders) + ms := newMultiSender(mtsSenders, r.config.Logger.Log) e := mtsEnc(ms, int(r.config.FrameRate)) r.encoder = append(r.encoder, e) } @@ -229,7 +229,7 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func // encoder to revid's encoder slice, and give this encoder the flvSenders // as a destination. if len(flvSenders) != 0 { - ms := multiSender(flvSenders) + ms := newMultiSender(flvSenders, r.config.Logger.Log) e, err := flvEnc(ms, int(r.config.FrameRate)) if err != nil { return err diff --git a/revid/revid_test.go b/revid/revid_test.go index b2dc87b1..4380a2bf 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -265,7 +265,8 @@ func TestResetEncoderSenderSetup(t *testing.T) { case flvEncoderStr: ms = e.(*tstFlvEncoder).dst } - senders := []loadSender(ms.(multiSender)) + + senders := ms.(*multiSender).senders got = len(senders) want = len(test.encoders[idx].destinations) if got != want { diff --git a/revid/senders.go b/revid/senders.go index 5f88c345..cd219712 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -52,21 +52,33 @@ type Sender interface { send(d []byte) error } +type Log func(level int8, message string, params ...interface{}) + // multiSender implements io.Writer. It provides the capacity to send to multiple // senders from a single Write call. -type multiSender []loadSender +type multiSender struct { + senders []loadSender + log Log +} + +func newMultiSender(senders []loadSender, log Log) *multiSender { + return &multiSender{ + senders: senders, + log: log, + } +} // Write implements io.Writer. This will call load (with the passed slice), send // and release on all senders of multiSender. -func (s multiSender) Write(d []byte) (int, error) { - for _, sender := range s { +func (s *multiSender) Write(d []byte) (int, error) { + for i, sender := range s.senders { sender.load(d) err := sender.send() if err != nil { - sender.handleSendFail(err) + s.log(logger.Warning, pkg+"send failed", "sender", i, "error", err) } } - for _, sender := range s { + for _, sender := range s.senders { sender.release() } return len(d), nil @@ -109,9 +121,6 @@ type loadSender interface { // close cleans up after use of the loadSender. close() error - - // handleSendFail performs any actions necessary in response to a failed send. - handleSendFail(err error) error } // restart is an optional interface for loadSenders that @@ -148,8 +157,6 @@ func (s *fileSender) release() {} func (s *fileSender) close() error { return s.file.Close() } -func (s *fileSender) handleSendFail(err error) error { return nil } - // mtsSender implements loadSender and provides sending capability specifically // for use with MPEGTS packetization. It handles the construction of appropriately // lengthed clips based on PSI. It also fixes accounts for discontinuities by @@ -227,8 +234,6 @@ func (s *mtsSender) release() { } } -func (s *mtsSender) handleSendFail(err error) error { return nil } - // httpSender implements loadSender for posting HTTP to NetReceiver type httpSender struct { client *netsender.Sender @@ -313,8 +318,6 @@ func (s *httpSender) release() {} func (s *httpSender) close() error { return nil } -func (s *httpSender) handleSendFail(err error) error { return nil } - // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn @@ -362,6 +365,9 @@ func (s *rtmpSender) send() error { return errors.New("no rtmp connection, cannot write") } _, err := s.conn.Write(s.data) + if err != nil { + err = s.restart() + } return err } @@ -390,8 +396,6 @@ func (s *rtmpSender) close() error { return nil } -func (s *rtmpSender) handleSendFail(err error) error { return s.restart() } - // TODO: Write restart func for rtpSender // rtpSender implements loadSender for a native udp destination with rtp packetization. type rtpSender struct { @@ -426,5 +430,3 @@ func (s *rtpSender) send() error { _, err := s.encoder.Write(s.data) return err } - -func (s *rtpSender) handleSendFail(err error) error { return nil } diff --git a/revid/senders_test.go b/revid/senders_test.go index c3432975..b78fee5e 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -31,7 +31,6 @@ package revid import ( "errors" "fmt" - "reflect" "sync" "testing" "time" @@ -321,7 +320,7 @@ func TestMultiSenderWrite(t *testing.T) { newDummyLoadSender(false, false), newDummyLoadSender(false, false), } - ms := multiSender(senders) + ms := newMultiSender(senders, log) // Perform some multiSender writes. const noOfWrites = 5 @@ -331,7 +330,7 @@ func TestMultiSenderWrite(t *testing.T) { // Check that the senders got the data correctly from the writes. for i := byte(0); i < noOfWrites; i++ { - for j, dest := range []loadSender(ms) { + for j, dest := range ms.senders { got := dest.(*dummyLoadSender).buf[i][0] if got != i { t.Errorf("Did not get expected result for sender: %v. \nGot: %v\nWant: %v\n", j, got, i) @@ -339,58 +338,3 @@ func TestMultiSenderWrite(t *testing.T) { } } } - -// TestMultiSenderFailNoRetry checks that behaviour is as expected when a sender -// fails at a send and does not retry. -func TestMultiSenderFailNoRetry(t *testing.T) { - senders := []loadSender{ - newDummyLoadSender(false, false), - newDummyLoadSender(false, false), - newDummyLoadSender(false, false), - } - - ms := multiSender(senders) - - // We will perform two writes. We expect the second write not to be complete, - // i.e. the senders should not send anything on this write. - ms.Write([]byte{0x00}) - - // Make second sender fail a send. - const failedSenderIdx = 1 - failedSender := []loadSender(ms)[failedSenderIdx].(*dummyLoadSender) - failedSender.failOnSend = true - ms.Write([]byte{0x01}) - - // Check that handleSendFail was called. - if !failedSender.failHandled { - t.Fatal("the failed send was not handled") - } - - // Now for next send we don't want to fail. - failedSender.failOnSend = false - ms.Write([]byte{0x02}) - - // Check number of slices sent for each sender and also check data. - for i, sender := range []loadSender(ms) { - // First check number of slices sent for each sender. - wantLen := 3 - if i == failedSenderIdx { - wantLen = 2 - } - curSender := sender.(*dummyLoadSender) - gotLen := len(curSender.buf) - if gotLen != wantLen { - t.Errorf("len of sender that failed is not expected: \nGot: %v\nWant: %v\n", gotLen, wantLen) - } - - // Now check the quality of the data. - wantData := [][]byte{{0x00}, {0x01}, {0x02}} - if i == failedSenderIdx { - wantData = [][]byte{{0x00}, {0x02}} - } - gotData := curSender.buf - if !reflect.DeepEqual(gotData, wantData) { - t.Errorf("unexpect data sent through sender idx: %v. \nGot: %v\nWant: %v\n", i, gotData, wantData) - } - } -}