revid: retry is now an attribute of senders and can be set at initialisation

This commit is contained in:
Saxon 2019-03-12 20:53:27 +10:30
parent da1532b9d1
commit 500edc05aa
3 changed files with 56 additions and 38 deletions

View File

@ -188,18 +188,20 @@ func (r *Revid) reset(config Config) error {
// to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the
// output requires FLV encoding. // output requires FLV encoding.
var sender loadSender var sender loadSender
var retry bool
for _, out := range r.config.Outputs { for _, out := range r.config.Outputs {
switch out { switch out {
case Http: case Http:
retry := false
if len(r.config.Outputs) == 1 {
retry = true retry = true
sender = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil) }
sender = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), retry, nil)
case Rtp: case Rtp:
sender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) sender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
case File: case File:
sender, err = newFileSender(r.config.OutputPath) sender, err = newFileSender(r.config.OutputPath)
case Rtmp: case Rtmp:
sender, err = newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) sender, err = newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, false, r.config.Logger.Log)
if err != nil { if err != nil {
return err return err
} }
@ -216,10 +218,7 @@ func (r *Revid) reset(config Config) error {
// encoder to revid's encoder slice, and give this encoder the mtsSenders // encoder to revid's encoder slice, and give this encoder the mtsSenders
// as a destination. // as a destination.
if len(mtsSenders) != 0 { if len(mtsSenders) != 0 {
if len(mtsSenders) != 1 && len(flvSenders) != 0 { ms, _ := newMultiSender(mtsSenders, r.IsRunning)
retry = false
}
ms, _ := newMultiSender(mtsSenders, retry, r.IsRunning)
if err != nil { if err != nil {
return err return err
} }
@ -231,7 +230,7 @@ func (r *Revid) reset(config Config) error {
// encoder to revid's encoder slice, and give this encoder the flvSenders // encoder to revid's encoder slice, and give this encoder the flvSenders
// as a destination. // as a destination.
if len(flvSenders) != 0 { if len(flvSenders) != 0 {
ms, _ := newMultiSender(flvSenders, false, r.IsRunning) ms, _ := newMultiSender(flvSenders, r.IsRunning)
e, err := flv.NewEncoder(ms, true, true, int(r.config.FrameRate)) e, err := flv.NewEncoder(ms, true, true, int(r.config.FrameRate))
if err != nil { if err != nil {
return err return err

View File

@ -63,13 +63,12 @@ type multiSender struct {
// newMultiSender returns a pointer to a new multiSender. active is a function // newMultiSender returns a pointer to a new multiSender. active is a function
// to indicate the state of the multiSenders owner i.e. whether it is running // to indicate the state of the multiSenders owner i.e. whether it is running
// or not. // or not.
func newMultiSender(senders []loadSender, retry bool, active func() bool) (*multiSender, error) { func newMultiSender(senders []loadSender, active func() bool) (*multiSender, error) {
if active == nil { if active == nil {
return nil, errors.New("multi sender requires that active func is provided") return nil, errors.New("multi sender requires that active func is provided")
} }
s := &multiSender{ s := &multiSender{
senders: senders, senders: senders,
retry: retry,
active: active, active: active,
} }
return s, nil return s, nil
@ -86,7 +85,7 @@ func (s *multiSender) Write(d []byte) (int, error) {
if err != nil { if err != nil {
sender.handleSendFail(err) sender.handleSendFail(err)
} }
if err == nil || !s.retry { if err == nil || !sender.retrySend() {
break break
} }
} }
@ -139,6 +138,9 @@ type loadSender interface {
// handleSendFail performs any actions necessary in response to a failed send. // handleSendFail performs any actions necessary in response to a failed send.
handleSendFail(err error) error handleSendFail(err error) error
// retry returns true if this sender has been set for send retry.
retrySend() bool
} }
// restart is an optional interface for loadSenders that // restart is an optional interface for loadSenders that
@ -173,12 +175,12 @@ func (s *fileSender) send() error {
func (s *fileSender) release() {} func (s *fileSender) release() {}
func (s *fileSender) close() error { func (s *fileSender) close() error { return s.file.Close() }
return s.file.Close()
}
func (s *fileSender) handleSendFail(err error) error { return nil } func (s *fileSender) handleSendFail(err error) error { return nil }
func (s *fileSender) retrySend() bool { return false }
// mtsSender implemented loadSender and provides sending capability specifically // mtsSender implemented loadSender and provides sending capability specifically
// for use with MPEGTS packetization. It handles the construction of appropriately // for use with MPEGTS packetization. It handles the construction of appropriately
// lengthed clips based on PSI. It also fixes accounts for discontinuities by // lengthed clips based on PSI. It also fixes accounts for discontinuities by
@ -192,13 +194,15 @@ type mtsSender struct {
discarded bool discarded bool
repairer *mts.DiscontinuityRepairer repairer *mts.DiscontinuityRepairer
curPid int curPid int
retry bool
} }
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { func newMtsSender(s Sender, retry bool, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{ return &mtsSender{
sender: s, sender: s,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
retry: retry,
} }
} }
@ -258,6 +262,10 @@ func (s *mtsSender) release() {
func (s *mtsSender) handleSendFail(err error) error { return nil } func (s *mtsSender) handleSendFail(err error) error { return nil }
func (s *mtsSender) retrySend() bool {
return s.retry
}
// httpSender implements loadSender for posting HTTP to NetReceiver // httpSender implements loadSender for posting HTTP to NetReceiver
type httpSender struct { type httpSender struct {
client *netsender.Sender client *netsender.Sender
@ -265,9 +273,11 @@ type httpSender struct {
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
data []byte data []byte
retry bool
} }
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { func newHttpSender(ns *netsender.Sender, retry bool, log func(lvl int8, msg string, args ...interface{})) *httpSender {
return &httpSender{ return &httpSender{
client: ns, client: ns,
log: log, log: log,
@ -344,6 +354,8 @@ func (s *httpSender) close() error { return nil }
func (s *httpSender) handleSendFail(err error) error { return nil } func (s *httpSender) handleSendFail(err error) error { return nil }
func (s *httpSender) retrySend() bool { return s.retry }
// rtmpSender implements loadSender for a native RTMP destination. // rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct { type rtmpSender struct {
conn *rtmp.Conn conn *rtmp.Conn
@ -354,11 +366,12 @@ type rtmpSender struct {
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
data []byte data []byte
retry bool
} }
var _ restarter = (*rtmpSender)(nil) var _ restarter = (*rtmpSender)(nil)
func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { func newRtmpSender(url string, timeout uint, retries int, retry bool, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
var conn *rtmp.Conn var conn *rtmp.Conn
var err error var err error
for n := 0; n < retries; n++ { for n := 0; n < retries; n++ {
@ -381,6 +394,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
timeout: timeout, timeout: timeout,
retries: retries, retries: retries,
log: log, log: log,
retry: retry,
} }
return s, nil return s, nil
} }
@ -424,9 +438,9 @@ func (s *rtmpSender) close() error {
return nil return nil
} }
func (s *rtmpSender) handleSendFail(err error) error { func (s *rtmpSender) handleSendFail(err error) error { return s.restart() }
return s.restart()
} func (s *rtmpSender) retrySend() bool { return s.retry }
// TODO: Write restart func for rtpSender // TODO: Write restart func for rtpSender
// rtpSender implements loadSender for a native udp destination with rtp packetization. // rtpSender implements loadSender for a native udp destination with rtp packetization.
@ -464,3 +478,5 @@ func (s *rtpSender) send() error {
} }
func (s *rtpSender) handleSendFail(err error) error { return nil } func (s *rtpSender) handleSendFail(err error) error { return nil }
func (s *rtpSender) retrySend() bool { return false }

View File

@ -109,7 +109,7 @@ func TestMtsSenderSegment(t *testing.T) {
// Create ringBuffer, sender, loadsender and the MPEGTS encoder. // Create ringBuffer, sender, loadsender and the MPEGTS encoder.
tstSender := &sender{} tstSender := &sender{}
loadSender := newMtsSender(tstSender, log) loadSender := newMtsSender(tstSender, false, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25) encoder := mts.NewEncoder((*buffer)(rb), 25)
@ -199,7 +199,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create ringBuffer sender, loadSender and the MPEGTS encoder. // Create ringBuffer sender, loadSender and the MPEGTS encoder.
const clipWithDiscontinuity = 3 const clipWithDiscontinuity = 3
tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
loadSender := newMtsSender(tstSender, log) loadSender := newMtsSender(tstSender, false, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25) encoder := mts.NewEncoder((*buffer)(rb), 25)
@ -259,13 +259,13 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// active function is not provided, and when an active function is provided. // active function is not provided, and when an active function is provided.
func TestNewMultiSender(t *testing.T) { func TestNewMultiSender(t *testing.T) {
// First test without giving an 'active' function. // First test without giving an 'active' function.
_, err := newMultiSender(nil, false, nil) _, err := newMultiSender(nil, nil)
if err == nil { if err == nil {
t.Fatal("did not get expected error") t.Fatal("did not get expected error")
} }
// Now test with providing an active function. // Now test with providing an active function.
_, err = newMultiSender(nil, false, func() bool { return true }) _, err = newMultiSender(nil, func() bool { return true })
if err != nil { if err != nil {
t.Fatalf("unespected error: %v", err) t.Fatalf("unespected error: %v", err)
} }
@ -278,11 +278,12 @@ type dummyLoadSender struct {
buf [][]byte buf [][]byte
failOnSend bool failOnSend bool
failHandled bool failHandled bool
retry bool
} }
// newDummyLoadSender returns a pointer to a new dummyLoadSender. // newDummyLoadSender returns a pointer to a new dummyLoadSender.
func newDummyLoadSender(fail bool) *dummyLoadSender { func newDummyLoadSender(fail bool, retry bool) *dummyLoadSender {
return &dummyLoadSender{failOnSend: fail, failHandled: true} return &dummyLoadSender{failOnSend: fail, failHandled: true, retry: retry}
} }
// load takes a byte slice and assigns it to the dummyLoadSenders data slice. // load takes a byte slice and assigns it to the dummyLoadSenders data slice.
@ -317,15 +318,17 @@ func (s *dummyLoadSender) handleSendFail(err error) error {
return nil return nil
} }
func (s *dummyLoadSender) retrySend() bool { return s.retry }
// TestMultiSenderWrite checks that we can do basic writing to multiple senders // TestMultiSenderWrite checks that we can do basic writing to multiple senders
// using the multiSender. // using the multiSender.
func TestMultiSenderWrite(t *testing.T) { func TestMultiSenderWrite(t *testing.T) {
senders := []loadSender{ senders := []loadSender{
newDummyLoadSender(false), newDummyLoadSender(false, false),
newDummyLoadSender(false), newDummyLoadSender(false, false),
newDummyLoadSender(false), newDummyLoadSender(false, false),
} }
ms, err := newMultiSender(senders, false, func() bool { return true }) ms, err := newMultiSender(senders, func() bool { return true })
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -352,9 +355,9 @@ func TestMultiSenderWrite(t *testing.T) {
// retries, then we return from Write as expected. // retries, then we return from Write as expected.
func TestMultiSenderNotActiveNoRetry(t *testing.T) { func TestMultiSenderNotActiveNoRetry(t *testing.T) {
senders := []loadSender{ senders := []loadSender{
newDummyLoadSender(false), newDummyLoadSender(false, false),
newDummyLoadSender(false), newDummyLoadSender(false, false),
newDummyLoadSender(false), newDummyLoadSender(false, false),
} }
// This will allow us to simulate a change in running state of // This will allow us to simulate a change in running state of
@ -364,7 +367,7 @@ func TestMultiSenderNotActiveNoRetry(t *testing.T) {
return active return active
} }
ms, err := newMultiSender(senders, false, activeFunc) ms, err := newMultiSender(senders, activeFunc)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -388,7 +391,7 @@ func TestMultiSenderNotActiveNoRetry(t *testing.T) {
// send retries. // send retries.
func TestMultiSenderNotActiveRetry(t *testing.T) { func TestMultiSenderNotActiveRetry(t *testing.T) {
senders := []loadSender{ senders := []loadSender{
newDummyLoadSender(false), newDummyLoadSender(false, false),
} }
// Active will simulate the running state of the multiSender's 'owner'. // Active will simulate the running state of the multiSender's 'owner'.
@ -414,7 +417,7 @@ func TestMultiSenderNotActiveRetry(t *testing.T) {
return active return active
} }
ms, err := newMultiSender(senders, false, activeFunc) ms, err := newMultiSender(senders, activeFunc)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }