diff --git a/revid/revid.go b/revid/revid.go index 43782a71..3d8994d0 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -188,18 +188,20 @@ func (r *Revid) reset(config Config) error { // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. var sender loadSender - var retry bool for _, out := range r.config.Outputs { switch out { case Http: - retry = true - sender = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil) + retry := false + if len(r.config.Outputs) == 1 { + retry = true + } + sender = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), retry, nil) case Rtp: sender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) case File: sender, err = newFileSender(r.config.OutputPath) case Rtmp: - sender, err = newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) + sender, err = newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, false, r.config.Logger.Log) if err != nil { return err } @@ -216,10 +218,7 @@ func (r *Revid) reset(config Config) error { // encoder to revid's encoder slice, and give this encoder the mtsSenders // as a destination. if len(mtsSenders) != 0 { - if len(mtsSenders) != 1 && len(flvSenders) != 0 { - retry = false - } - ms, _ := newMultiSender(mtsSenders, retry, r.IsRunning) + ms, _ := newMultiSender(mtsSenders, r.IsRunning) if err != nil { return err } @@ -231,7 +230,7 @@ func (r *Revid) reset(config Config) error { // encoder to revid's encoder slice, and give this encoder the flvSenders // as a destination. if len(flvSenders) != 0 { - ms, _ := newMultiSender(flvSenders, false, r.IsRunning) + ms, _ := newMultiSender(flvSenders, r.IsRunning) e, err := flv.NewEncoder(ms, true, true, int(r.config.FrameRate)) if err != nil { return err diff --git a/revid/senders.go b/revid/senders.go index 0dc40e9c..68b97963 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -63,13 +63,12 @@ type multiSender struct { // newMultiSender returns a pointer to a new multiSender. active is a function // to indicate the state of the multiSenders owner i.e. whether it is running // or not. -func newMultiSender(senders []loadSender, retry bool, active func() bool) (*multiSender, error) { +func newMultiSender(senders []loadSender, active func() bool) (*multiSender, error) { if active == nil { return nil, errors.New("multi sender requires that active func is provided") } s := &multiSender{ senders: senders, - retry: retry, active: active, } return s, nil @@ -86,7 +85,7 @@ func (s *multiSender) Write(d []byte) (int, error) { if err != nil { sender.handleSendFail(err) } - if err == nil || !s.retry { + if err == nil || !sender.retrySend() { break } } @@ -139,6 +138,9 @@ type loadSender interface { // handleSendFail performs any actions necessary in response to a failed send. handleSendFail(err error) error + + // retry returns true if this sender has been set for send retry. + retrySend() bool } // restart is an optional interface for loadSenders that @@ -173,12 +175,12 @@ func (s *fileSender) send() error { func (s *fileSender) release() {} -func (s *fileSender) close() error { - return s.file.Close() -} +func (s *fileSender) close() error { return s.file.Close() } func (s *fileSender) handleSendFail(err error) error { return nil } +func (s *fileSender) retrySend() bool { return false } + // mtsSender implemented loadSender and provides sending capability specifically // for use with MPEGTS packetization. It handles the construction of appropriately // lengthed clips based on PSI. It also fixes accounts for discontinuities by @@ -192,13 +194,15 @@ type mtsSender struct { discarded bool repairer *mts.DiscontinuityRepairer curPid int + retry bool } // newMtsSender returns a new mtsSender. -func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { +func newMtsSender(s Sender, retry bool, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ sender: s, repairer: mts.NewDiscontinuityRepairer(), + retry: retry, } } @@ -258,6 +262,10 @@ func (s *mtsSender) release() { func (s *mtsSender) handleSendFail(err error) error { return nil } +func (s *mtsSender) retrySend() bool { + return s.retry +} + // httpSender implements loadSender for posting HTTP to NetReceiver type httpSender struct { client *netsender.Sender @@ -265,9 +273,11 @@ type httpSender struct { log func(lvl int8, msg string, args ...interface{}) data []byte + + retry bool } -func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { +func newHttpSender(ns *netsender.Sender, retry bool, log func(lvl int8, msg string, args ...interface{})) *httpSender { return &httpSender{ client: ns, log: log, @@ -344,6 +354,8 @@ func (s *httpSender) close() error { return nil } func (s *httpSender) handleSendFail(err error) error { return nil } +func (s *httpSender) retrySend() bool { return s.retry } + // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn @@ -353,12 +365,13 @@ type rtmpSender struct { retries int log func(lvl int8, msg string, args ...interface{}) - data []byte + data []byte + retry bool } var _ restarter = (*rtmpSender)(nil) -func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { +func newRtmpSender(url string, timeout uint, retries int, retry bool, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { @@ -381,6 +394,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg timeout: timeout, retries: retries, log: log, + retry: retry, } return s, nil } @@ -424,9 +438,9 @@ func (s *rtmpSender) close() error { return nil } -func (s *rtmpSender) handleSendFail(err error) error { - return s.restart() -} +func (s *rtmpSender) handleSendFail(err error) error { return s.restart() } + +func (s *rtmpSender) retrySend() bool { return s.retry } // TODO: Write restart func for rtpSender // rtpSender implements loadSender for a native udp destination with rtp packetization. @@ -464,3 +478,5 @@ func (s *rtpSender) send() error { } func (s *rtpSender) handleSendFail(err error) error { return nil } + +func (s *rtpSender) retrySend() bool { return false } diff --git a/revid/senders_test.go b/revid/senders_test.go index 291f2c1b..f5df1e76 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -109,7 +109,7 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, loadsender and the MPEGTS encoder. tstSender := &sender{} - loadSender := newMtsSender(tstSender, log) + loadSender := newMtsSender(tstSender, false, log) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder((*buffer)(rb), 25) @@ -199,7 +199,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create ringBuffer sender, loadSender and the MPEGTS encoder. const clipWithDiscontinuity = 3 tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} - loadSender := newMtsSender(tstSender, log) + loadSender := newMtsSender(tstSender, false, log) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder((*buffer)(rb), 25) @@ -259,13 +259,13 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // active function is not provided, and when an active function is provided. func TestNewMultiSender(t *testing.T) { // First test without giving an 'active' function. - _, err := newMultiSender(nil, false, nil) + _, err := newMultiSender(nil, nil) if err == nil { t.Fatal("did not get expected error") } // Now test with providing an active function. - _, err = newMultiSender(nil, false, func() bool { return true }) + _, err = newMultiSender(nil, func() bool { return true }) if err != nil { t.Fatalf("unespected error: %v", err) } @@ -278,11 +278,12 @@ type dummyLoadSender struct { buf [][]byte failOnSend bool failHandled bool + retry bool } // newDummyLoadSender returns a pointer to a new dummyLoadSender. -func newDummyLoadSender(fail bool) *dummyLoadSender { - return &dummyLoadSender{failOnSend: fail, failHandled: true} +func newDummyLoadSender(fail bool, retry bool) *dummyLoadSender { + return &dummyLoadSender{failOnSend: fail, failHandled: true, retry: retry} } // load takes a byte slice and assigns it to the dummyLoadSenders data slice. @@ -317,15 +318,17 @@ func (s *dummyLoadSender) handleSendFail(err error) error { return nil } +func (s *dummyLoadSender) retrySend() bool { return s.retry } + // TestMultiSenderWrite checks that we can do basic writing to multiple senders // using the multiSender. func TestMultiSenderWrite(t *testing.T) { senders := []loadSender{ - newDummyLoadSender(false), - newDummyLoadSender(false), - newDummyLoadSender(false), + newDummyLoadSender(false, false), + newDummyLoadSender(false, false), + newDummyLoadSender(false, false), } - ms, err := newMultiSender(senders, false, func() bool { return true }) + ms, err := newMultiSender(senders, func() bool { return true }) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -352,9 +355,9 @@ func TestMultiSenderWrite(t *testing.T) { // retries, then we return from Write as expected. func TestMultiSenderNotActiveNoRetry(t *testing.T) { senders := []loadSender{ - newDummyLoadSender(false), - newDummyLoadSender(false), - newDummyLoadSender(false), + newDummyLoadSender(false, false), + newDummyLoadSender(false, false), + newDummyLoadSender(false, false), } // This will allow us to simulate a change in running state of @@ -364,7 +367,7 @@ func TestMultiSenderNotActiveNoRetry(t *testing.T) { return active } - ms, err := newMultiSender(senders, false, activeFunc) + ms, err := newMultiSender(senders, activeFunc) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -388,7 +391,7 @@ func TestMultiSenderNotActiveNoRetry(t *testing.T) { // send retries. func TestMultiSenderNotActiveRetry(t *testing.T) { senders := []loadSender{ - newDummyLoadSender(false), + newDummyLoadSender(false, false), } // Active will simulate the running state of the multiSender's 'owner'. @@ -414,7 +417,7 @@ func TestMultiSenderNotActiveRetry(t *testing.T) { return active } - ms, err := newMultiSender(senders, false, activeFunc) + ms, err := newMultiSender(senders, activeFunc) if err != nil { t.Fatalf("Unexpected error: %v", err) }