revid: removed concept of send retry for now

Send retry has been removed from the multiSender. This also means there is not need for the active func, because we simply wait until the send is complete or failed to exit the output clips
routine. Tests pertinent to retrying or the active function have been removed.
This commit is contained in:
Saxon 2019-03-14 10:35:09 +10:30
parent 4881e179cc
commit 99a4010c79
3 changed files with 19 additions and 174 deletions

View File

@ -192,17 +192,13 @@ func (r *Revid) reset(config Config) error {
for _, out := range r.config.Outputs { for _, out := range r.config.Outputs {
switch out { switch out {
case Http: case Http:
retry := false sender = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil)
if len(r.config.Outputs) == 1 {
retry = true
}
sender = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), retry, nil)
case Rtp: case Rtp:
sender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) sender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
case File: case File:
sender, err = newFileSender(r.config.OutputPath) sender, err = newFileSender(r.config.OutputPath)
case Rtmp: case Rtmp:
sender, _ = newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, false, r.config.Logger.Log) sender, _ = newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
flvSenders = append(flvSenders, sender) flvSenders = append(flvSenders, sender)
continue continue
} }
@ -216,7 +212,7 @@ func (r *Revid) reset(config Config) error {
// encoder to revid's encoder slice, and give this encoder the mtsSenders // encoder to revid's encoder slice, and give this encoder the mtsSenders
// as a destination. // as a destination.
if len(mtsSenders) != 0 { if len(mtsSenders) != 0 {
ms := newMultiSender(mtsSenders, r.IsRunning) ms := newMultiSender(mtsSenders)
if err != nil { if err != nil {
return err return err
} }
@ -228,7 +224,7 @@ func (r *Revid) reset(config Config) error {
// encoder to revid's encoder slice, and give this encoder the flvSenders // encoder to revid's encoder slice, and give this encoder the flvSenders
// as a destination. // as a destination.
if len(flvSenders) != 0 { if len(flvSenders) != 0 {
ms := newMultiSender(flvSenders, r.IsRunning) ms := newMultiSender(flvSenders)
e, err := flv.NewEncoder(ms, true, true, int(r.config.FrameRate)) e, err := flv.NewEncoder(ms, true, true, int(r.config.FrameRate))
if err != nil { if err != nil {
return err return err

View File

@ -55,7 +55,6 @@ type Sender interface {
// multiSender allows for the sending through multi loadSenders using a single // multiSender allows for the sending through multi loadSenders using a single
// call to multiSender.Write. // call to multiSender.Write.
type multiSender struct { type multiSender struct {
isActive func() bool
senders []loadSender senders []loadSender
retry bool retry bool
} }
@ -63,13 +62,9 @@ type multiSender struct {
// newMultiSender returns a pointer to a new multiSender. active is a function // newMultiSender returns a pointer to a new multiSender. active is a function
// to indicate the state of the multiSenders owner i.e. whether it is running // to indicate the state of the multiSenders owner i.e. whether it is running
// or not. // or not.
func newMultiSender(senders []loadSender, active func() bool) *multiSender { func newMultiSender(senders []loadSender) *multiSender {
if active == nil {
panic("multi sender requires that active func is provided")
}
s := &multiSender{ s := &multiSender{
senders: senders, senders: senders,
isActive: active,
} }
return s return s
} }
@ -80,15 +75,10 @@ func newMultiSender(senders []loadSender, active func() bool) *multiSender {
func (s *multiSender) Write(d []byte) (int, error) { func (s *multiSender) Write(d []byte) (int, error) {
for _, sender := range s.senders { for _, sender := range s.senders {
sender.load(d) sender.load(d)
for s.isActive() {
err := sender.send() err := sender.send()
if err != nil { if err != nil {
sender.handleSendFail(err) sender.handleSendFail(err)
} }
if err == nil || !sender.retrySend() {
break
}
}
} }
for _, sender := range s.senders { for _, sender := range s.senders {
@ -138,9 +128,6 @@ type loadSender interface {
// handleSendFail performs any actions necessary in response to a failed send. // handleSendFail performs any actions necessary in response to a failed send.
handleSendFail(err error) error handleSendFail(err error) error
// retry returns true if this sender has been set for send retry.
retrySend() bool
} }
// restart is an optional interface for loadSenders that // restart is an optional interface for loadSenders that
@ -198,11 +185,10 @@ type mtsSender struct {
} }
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(s Sender, retry bool, log func(lvl int8, msg string, args ...interface{})) *mtsSender { func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{ return &mtsSender{
sender: s, sender: s,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
retry: retry,
} }
} }
@ -262,10 +248,6 @@ func (s *mtsSender) release() {
func (s *mtsSender) handleSendFail(err error) error { return nil } func (s *mtsSender) handleSendFail(err error) error { return nil }
func (s *mtsSender) retrySend() bool {
return s.retry
}
// httpSender implements loadSender for posting HTTP to NetReceiver // httpSender implements loadSender for posting HTTP to NetReceiver
type httpSender struct { type httpSender struct {
client *netsender.Sender client *netsender.Sender
@ -273,11 +255,9 @@ type httpSender struct {
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
data []byte data []byte
retry bool
} }
func newHttpSender(ns *netsender.Sender, retry bool, log func(lvl int8, msg string, args ...interface{})) *httpSender { func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender {
return &httpSender{ return &httpSender{
client: ns, client: ns,
log: log, log: log,
@ -354,8 +334,6 @@ func (s *httpSender) close() error { return nil }
func (s *httpSender) handleSendFail(err error) error { return nil } func (s *httpSender) handleSendFail(err error) error { return nil }
func (s *httpSender) retrySend() bool { return s.retry }
// rtmpSender implements loadSender for a native RTMP destination. // rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct { type rtmpSender struct {
conn *rtmp.Conn conn *rtmp.Conn
@ -366,12 +344,11 @@ type rtmpSender struct {
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
data []byte data []byte
retry bool
} }
var _ restarter = (*rtmpSender)(nil) var _ restarter = (*rtmpSender)(nil)
func newRtmpSender(url string, timeout uint, retries int, retry bool, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
var conn *rtmp.Conn var conn *rtmp.Conn
var err error var err error
for n := 0; n < retries; n++ { for n := 0; n < retries; n++ {
@ -390,7 +367,6 @@ func newRtmpSender(url string, timeout uint, retries int, retry bool, log func(l
timeout: timeout, timeout: timeout,
retries: retries, retries: retries,
log: log, log: log,
retry: retry,
} }
return s, err return s, err
} }
@ -435,8 +411,6 @@ func (s *rtmpSender) close() error {
func (s *rtmpSender) handleSendFail(err error) error { return s.restart() } func (s *rtmpSender) handleSendFail(err error) error { return s.restart() }
func (s *rtmpSender) retrySend() bool { return s.retry }
// TODO: Write restart func for rtpSender // TODO: Write restart func for rtpSender
// rtpSender implements loadSender for a native udp destination with rtp packetization. // rtpSender implements loadSender for a native udp destination with rtp packetization.
type rtpSender struct { type rtpSender struct {
@ -473,5 +447,3 @@ func (s *rtpSender) send() error {
} }
func (s *rtpSender) handleSendFail(err error) error { return nil } func (s *rtpSender) handleSendFail(err error) error { return nil }
func (s *rtpSender) retrySend() bool { return false }

View File

@ -110,7 +110,7 @@ func TestMtsSenderSegment(t *testing.T) {
// Create ringBuffer, sender, loadsender and the MPEGTS encoder. // Create ringBuffer, sender, loadsender and the MPEGTS encoder.
tstSender := &sender{} tstSender := &sender{}
loadSender := newMtsSender(tstSender, false, log) loadSender := newMtsSender(tstSender, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25) encoder := mts.NewEncoder((*buffer)(rb), 25)
@ -200,7 +200,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create ringBuffer sender, loadSender and the MPEGTS encoder. // Create ringBuffer sender, loadSender and the MPEGTS encoder.
const clipWithDiscontinuity = 3 const clipWithDiscontinuity = 3
tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
loadSender := newMtsSender(tstSender, false, log) loadSender := newMtsSender(tstSender, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25) encoder := mts.NewEncoder((*buffer)(rb), 25)
@ -321,7 +321,7 @@ func TestMultiSenderWrite(t *testing.T) {
newDummyLoadSender(false, false), newDummyLoadSender(false, false),
newDummyLoadSender(false, false), newDummyLoadSender(false, false),
} }
ms := newMultiSender(senders, func() bool { return true }) ms := newMultiSender(senders)
// Perform some multiSender writes. // Perform some multiSender writes.
const noOfWrites = 5 const noOfWrites = 5
@ -340,85 +340,6 @@ func TestMultiSenderWrite(t *testing.T) {
} }
} }
// TestMultiSenderNotActiveNoRetry checks that if the active func passed to
// newMultiSender returns false before a write, or in the middle of write with
// retries, then we return from Write as expected.
func TestMultiSenderNotActiveNoRetry(t *testing.T) {
senders := []loadSender{
newDummyLoadSender(false, false),
newDummyLoadSender(false, false),
newDummyLoadSender(false, false),
}
// This will allow us to simulate a change in running state of
// multiSender's 'owner'.
active := true
activeFunc := func() bool {
return active
}
ms := newMultiSender(senders, activeFunc)
// We will perform two writes. We expect the second write not to be complete,
// i.e. the senders should not send anything on this write.
ms.Write([]byte{0x00})
active = false
ms.Write([]byte{0x01})
// Check that the senders only sent data once.
for _, dest := range ms.senders {
if len(dest.(*dummyLoadSender).buf) != 1 {
t.Errorf("length of sender buf is not 1 as expected")
}
}
}
// TestMultiSenderNotActiveRetry checks that we correctly returns from a call to
// multiSender.Write when the active callback func return false during repeated
// send retries.
func TestMultiSenderNotActiveRetry(t *testing.T) {
senders := []loadSender{
newDummyLoadSender(false, false),
}
// Active will simulate the running state of the multiSender's 'owner'.
active := true
// We will run the ms.Write as routine so we need some sync.
var mu sync.Mutex
// Once we use setActive to change the state of the fake owner, this will
// return false and we expect the ms.Write method to return from the continous
// send retry state.
activeFunc := func() bool {
mu.Lock()
defer mu.Unlock()
return active
}
ms := newMultiSender(senders, activeFunc)
// We run this in background so that we can change running state during the
// the write. We then expect done to be true after some period of time.
done := false
go func() {
ms.Write([]byte{0x00})
done = true
}()
// Wait for half a second and then change the active state.
time.Sleep(500 * time.Millisecond)
mu.Lock()
active = false
mu.Unlock()
// Wait half a second for the routine to return and check that done is true.
time.Sleep(500 * time.Millisecond)
if !done {
t.Fatal("multiSender.Write did not return as expected with active=false")
}
}
// TestMultiSenderFailNoRetry checks that behaviour is as expected when a sender // TestMultiSenderFailNoRetry checks that behaviour is as expected when a sender
// fails at a send and does not retry. // fails at a send and does not retry.
func TestMultiSenderFailNoRetry(t *testing.T) { func TestMultiSenderFailNoRetry(t *testing.T) {
@ -428,7 +349,7 @@ func TestMultiSenderFailNoRetry(t *testing.T) {
newDummyLoadSender(false, false), newDummyLoadSender(false, false),
} }
ms := newMultiSender(senders, func() bool { return true }) ms := newMultiSender(senders)
// We will perform two writes. We expect the second write not to be complete, // We will perform two writes. We expect the second write not to be complete,
// i.e. the senders should not send anything on this write. // i.e. the senders should not send anything on this write.
@ -473,47 +394,3 @@ func TestMultiSenderFailNoRetry(t *testing.T) {
} }
} }
} }
// TestMultiSenderFailRetry checks that a if a sender is set to retry on failed
// sends, that it does so repeatedly until it can successfully send.
func TestMultiSenderFailRetry(t *testing.T) {
// NB: This is only being tested with one sender - this is AusOcean's use case.
senders := []loadSender{newDummyLoadSender(false, true)}
ms := newMultiSender(senders, func() bool { return true })
// Perform one write with successful send.
ms.Write([]byte{0x00})
// Now cause sender to fail on next write.
failedSender := ms.senders[0].(*dummyLoadSender)
failedSender.failOnSend = true
// Wrap next write in a routine. It will keep trying to send until we set
// failOnSend to false.
done := false
go func() {
ms.Write([]byte{0x01})
done = true
}()
// Now set failOnSend to false.
failedSender.mu.Lock()
failedSender.failOnSend = false
failedSender.mu.Unlock()
// Sleep and then check that we've successfully returned from the write.
time.Sleep(10 * time.Millisecond)
if done != true {
t.Fatal("did not exit write when send was successful")
}
// Write on last time.
ms.Write([]byte{0x02})
// Check that all the data is there.
got := failedSender.buf
want := [][]byte{{0x00}, {0x01}, {0x02}}
if !reflect.DeepEqual(got, want) {
t.Errorf("sender did not send expected data. \nGot: %v\nWant: %v\n", got, want)
}
}