diff --git a/revid/revid.go b/revid/revid.go index 3a3e28f7..acdcbbba 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -192,17 +192,13 @@ func (r *Revid) reset(config Config) error { for _, out := range r.config.Outputs { switch out { case Http: - retry := false - if len(r.config.Outputs) == 1 { - retry = true - } - sender = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), retry, nil) + sender = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil) case Rtp: sender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) case File: sender, err = newFileSender(r.config.OutputPath) case Rtmp: - sender, _ = newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, false, r.config.Logger.Log) + sender, _ = newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) flvSenders = append(flvSenders, sender) continue } @@ -216,7 +212,7 @@ func (r *Revid) reset(config Config) error { // encoder to revid's encoder slice, and give this encoder the mtsSenders // as a destination. if len(mtsSenders) != 0 { - ms := newMultiSender(mtsSenders, r.IsRunning) + ms := newMultiSender(mtsSenders) if err != nil { return err } @@ -228,7 +224,7 @@ func (r *Revid) reset(config Config) error { // encoder to revid's encoder slice, and give this encoder the flvSenders // as a destination. if len(flvSenders) != 0 { - ms := newMultiSender(flvSenders, r.IsRunning) + ms := newMultiSender(flvSenders) e, err := flv.NewEncoder(ms, true, true, int(r.config.FrameRate)) if err != nil { return err diff --git a/revid/senders.go b/revid/senders.go index 47ab02f5..1466fda2 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -55,21 +55,16 @@ type Sender interface { // multiSender allows for the sending through multi loadSenders using a single // call to multiSender.Write. type multiSender struct { - isActive func() bool - senders []loadSender - retry bool + senders []loadSender + retry bool } // newMultiSender returns a pointer to a new multiSender. active is a function // to indicate the state of the multiSenders owner i.e. whether it is running // or not. -func newMultiSender(senders []loadSender, active func() bool) *multiSender { - if active == nil { - panic("multi sender requires that active func is provided") - } +func newMultiSender(senders []loadSender) *multiSender { s := &multiSender{ - senders: senders, - isActive: active, + senders: senders, } return s } @@ -80,14 +75,9 @@ func newMultiSender(senders []loadSender, active func() bool) *multiSender { func (s *multiSender) Write(d []byte) (int, error) { for _, sender := range s.senders { sender.load(d) - for s.isActive() { - err := sender.send() - if err != nil { - sender.handleSendFail(err) - } - if err == nil || !sender.retrySend() { - break - } + err := sender.send() + if err != nil { + sender.handleSendFail(err) } } @@ -138,9 +128,6 @@ type loadSender interface { // handleSendFail performs any actions necessary in response to a failed send. handleSendFail(err error) error - - // retry returns true if this sender has been set for send retry. - retrySend() bool } // restart is an optional interface for loadSenders that @@ -198,11 +185,10 @@ type mtsSender struct { } // newMtsSender returns a new mtsSender. -func newMtsSender(s Sender, retry bool, log func(lvl int8, msg string, args ...interface{})) *mtsSender { +func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ sender: s, repairer: mts.NewDiscontinuityRepairer(), - retry: retry, } } @@ -262,10 +248,6 @@ func (s *mtsSender) release() { func (s *mtsSender) handleSendFail(err error) error { return nil } -func (s *mtsSender) retrySend() bool { - return s.retry -} - // httpSender implements loadSender for posting HTTP to NetReceiver type httpSender struct { client *netsender.Sender @@ -273,11 +255,9 @@ type httpSender struct { log func(lvl int8, msg string, args ...interface{}) data []byte - - retry bool } -func newHttpSender(ns *netsender.Sender, retry bool, log func(lvl int8, msg string, args ...interface{})) *httpSender { +func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { return &httpSender{ client: ns, log: log, @@ -354,8 +334,6 @@ func (s *httpSender) close() error { return nil } func (s *httpSender) handleSendFail(err error) error { return nil } -func (s *httpSender) retrySend() bool { return s.retry } - // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn @@ -365,13 +343,12 @@ type rtmpSender struct { retries int log func(lvl int8, msg string, args ...interface{}) - data []byte - retry bool + data []byte } var _ restarter = (*rtmpSender)(nil) -func newRtmpSender(url string, timeout uint, retries int, retry bool, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { +func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { @@ -390,7 +367,6 @@ func newRtmpSender(url string, timeout uint, retries int, retry bool, log func(l timeout: timeout, retries: retries, log: log, - retry: retry, } return s, err } @@ -435,8 +411,6 @@ func (s *rtmpSender) close() error { func (s *rtmpSender) handleSendFail(err error) error { return s.restart() } -func (s *rtmpSender) retrySend() bool { return s.retry } - // TODO: Write restart func for rtpSender // rtpSender implements loadSender for a native udp destination with rtp packetization. type rtpSender struct { @@ -473,5 +447,3 @@ func (s *rtpSender) send() error { } func (s *rtpSender) handleSendFail(err error) error { return nil } - -func (s *rtpSender) retrySend() bool { return false } diff --git a/revid/senders_test.go b/revid/senders_test.go index 82991a3f..0ae222d5 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -110,7 +110,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, loadsender and the MPEGTS encoder. tstSender := &sender{} - loadSender := newMtsSender(tstSender, false, log) + loadSender := newMtsSender(tstSender, log) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder((*buffer)(rb), 25) @@ -200,7 +200,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create ringBuffer sender, loadSender and the MPEGTS encoder. const clipWithDiscontinuity = 3 tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} - loadSender := newMtsSender(tstSender, false, log) + loadSender := newMtsSender(tstSender, log) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder((*buffer)(rb), 25) @@ -321,7 +321,7 @@ func TestMultiSenderWrite(t *testing.T) { newDummyLoadSender(false, false), newDummyLoadSender(false, false), } - ms := newMultiSender(senders, func() bool { return true }) + ms := newMultiSender(senders) // Perform some multiSender writes. const noOfWrites = 5 @@ -340,85 +340,6 @@ func TestMultiSenderWrite(t *testing.T) { } } -// TestMultiSenderNotActiveNoRetry checks that if the active func passed to -// newMultiSender returns false before a write, or in the middle of write with -// retries, then we return from Write as expected. -func TestMultiSenderNotActiveNoRetry(t *testing.T) { - senders := []loadSender{ - newDummyLoadSender(false, false), - newDummyLoadSender(false, false), - newDummyLoadSender(false, false), - } - - // This will allow us to simulate a change in running state of - // multiSender's 'owner'. - active := true - activeFunc := func() bool { - return active - } - - ms := newMultiSender(senders, activeFunc) - - // We will perform two writes. We expect the second write not to be complete, - // i.e. the senders should not send anything on this write. - ms.Write([]byte{0x00}) - active = false - ms.Write([]byte{0x01}) - - // Check that the senders only sent data once. - for _, dest := range ms.senders { - if len(dest.(*dummyLoadSender).buf) != 1 { - t.Errorf("length of sender buf is not 1 as expected") - } - } -} - -// TestMultiSenderNotActiveRetry checks that we correctly returns from a call to -// multiSender.Write when the active callback func return false during repeated -// send retries. -func TestMultiSenderNotActiveRetry(t *testing.T) { - senders := []loadSender{ - newDummyLoadSender(false, false), - } - - // Active will simulate the running state of the multiSender's 'owner'. - active := true - - // We will run the ms.Write as routine so we need some sync. - var mu sync.Mutex - - // Once we use setActive to change the state of the fake owner, this will - // return false and we expect the ms.Write method to return from the continous - // send retry state. - activeFunc := func() bool { - mu.Lock() - defer mu.Unlock() - return active - } - - ms := newMultiSender(senders, activeFunc) - - // We run this in background so that we can change running state during the - // the write. We then expect done to be true after some period of time. - done := false - go func() { - ms.Write([]byte{0x00}) - done = true - }() - - // Wait for half a second and then change the active state. - time.Sleep(500 * time.Millisecond) - mu.Lock() - active = false - mu.Unlock() - - // Wait half a second for the routine to return and check that done is true. - time.Sleep(500 * time.Millisecond) - if !done { - t.Fatal("multiSender.Write did not return as expected with active=false") - } -} - // TestMultiSenderFailNoRetry checks that behaviour is as expected when a sender // fails at a send and does not retry. func TestMultiSenderFailNoRetry(t *testing.T) { @@ -428,7 +349,7 @@ func TestMultiSenderFailNoRetry(t *testing.T) { newDummyLoadSender(false, false), } - ms := newMultiSender(senders, func() bool { return true }) + ms := newMultiSender(senders) // We will perform two writes. We expect the second write not to be complete, // i.e. the senders should not send anything on this write. @@ -473,47 +394,3 @@ func TestMultiSenderFailNoRetry(t *testing.T) { } } } - -// TestMultiSenderFailRetry checks that a if a sender is set to retry on failed -// sends, that it does so repeatedly until it can successfully send. -func TestMultiSenderFailRetry(t *testing.T) { - // NB: This is only being tested with one sender - this is AusOcean's use case. - senders := []loadSender{newDummyLoadSender(false, true)} - ms := newMultiSender(senders, func() bool { return true }) - - // Perform one write with successful send. - ms.Write([]byte{0x00}) - - // Now cause sender to fail on next write. - failedSender := ms.senders[0].(*dummyLoadSender) - failedSender.failOnSend = true - - // Wrap next write in a routine. It will keep trying to send until we set - // failOnSend to false. - done := false - go func() { - ms.Write([]byte{0x01}) - done = true - }() - - // Now set failOnSend to false. - failedSender.mu.Lock() - failedSender.failOnSend = false - failedSender.mu.Unlock() - - // Sleep and then check that we've successfully returned from the write. - time.Sleep(10 * time.Millisecond) - if done != true { - t.Fatal("did not exit write when send was successful") - } - - // Write on last time. - ms.Write([]byte{0x02}) - - // Check that all the data is there. - got := failedSender.buf - want := [][]byte{{0x00}, {0x01}, {0x02}} - if !reflect.DeepEqual(got, want) { - t.Errorf("sender did not send expected data. \nGot: %v\nWant: %v\n", got, want) - } -}