mirror of https://bitbucket.org/ausocean/av.git
revid: added function type called active to multiSender
We wish to have a way to check that the 'owner' of the multi sender is still active while it may be doing continual send retries - therefore a function with bool return called active has been added as a field to multiSender so that we can call this and check whether the owner of the multiSender is 'active' or not.
This commit is contained in:
parent
0ca75538d2
commit
e2a6d9f4bd
|
@ -219,7 +219,7 @@ func (r *Revid) reset(config Config) error {
|
||||||
if len(mtsSenders) != 1 && len(flvSenders) != 0 {
|
if len(mtsSenders) != 1 && len(flvSenders) != 0 {
|
||||||
retry = false
|
retry = false
|
||||||
}
|
}
|
||||||
ms := newMultiSender(r, mtsSenders, retry)
|
ms := newMultiSender(mtsSenders, retry, r.IsRunning)
|
||||||
e := mts.NewEncoder(ms, float64(r.config.FrameRate))
|
e := mts.NewEncoder(ms, float64(r.config.FrameRate))
|
||||||
r.encoder = append(r.encoder, e)
|
r.encoder = append(r.encoder, e)
|
||||||
}
|
}
|
||||||
|
@ -228,7 +228,7 @@ func (r *Revid) reset(config Config) error {
|
||||||
// encoder to revid's encoder slice, and give this encoder the flvSenders
|
// encoder to revid's encoder slice, and give this encoder the flvSenders
|
||||||
// as a destination.
|
// as a destination.
|
||||||
if len(flvSenders) != 0 {
|
if len(flvSenders) != 0 {
|
||||||
ms := newMultiSender(r, flvSenders, false)
|
ms := newMultiSender(flvSenders, false, r.IsRunning)
|
||||||
e, err := flv.NewEncoder(ms, true, true, int(r.config.FrameRate))
|
e, err := flv.NewEncoder(ms, true, true, int(r.config.FrameRate))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -55,27 +55,32 @@ type Sender interface {
|
||||||
// multiSender allows for the sending through multi loadSenders using a single
|
// multiSender allows for the sending through multi loadSenders using a single
|
||||||
// call to multiSender.Write.
|
// call to multiSender.Write.
|
||||||
type multiSender struct {
|
type multiSender struct {
|
||||||
owner *Revid
|
active func() bool
|
||||||
senders []loadSender
|
senders []loadSender
|
||||||
retry bool
|
retry bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// newMultiSender returns a pointer to a new multiSender.
|
// newMultiSender returns a pointer to a new multiSender. active is a function
|
||||||
func newMultiSender(owner *Revid, senders []loadSender, retry bool) *multiSender {
|
// to indicate the state of the multiSenders owner i.e. whether it is running
|
||||||
return &multiSender{owner: owner, senders: senders, retry: retry}
|
// or not.
|
||||||
|
func newMultiSender(senders []loadSender, retry bool, active func() bool) *multiSender {
|
||||||
|
return &multiSender{
|
||||||
|
senders: senders,
|
||||||
|
retry: retry,
|
||||||
|
active: active,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write implements io.Writer. The written slice will be sent to each loadSender
|
// Write implements io.Writer. The written slice will be sent to each loadSender
|
||||||
// in multiSender.senders. If s.owner.config.SendRetry is true then on failed
|
// in multiSender.senders as long as s.active() is true. If a send fails, and
|
||||||
// sends we notify the current sender to take any required actions and then try
|
// s.retry is true, the send will be tried again.
|
||||||
// the send again.
|
|
||||||
func (s *multiSender) Write(d []byte) (int, error) {
|
func (s *multiSender) Write(d []byte) (int, error) {
|
||||||
for _, sender := range s.senders {
|
for _, sender := range s.senders {
|
||||||
sender.load(d)
|
sender.load(d)
|
||||||
for s.owner.IsRunning() {
|
for s.active() {
|
||||||
err := sender.send()
|
err := sender.send()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.handleFail(sender, err)
|
sender.handleSendFail(err)
|
||||||
}
|
}
|
||||||
if err == nil || !s.retry {
|
if err == nil || !s.retry {
|
||||||
break
|
break
|
||||||
|
@ -90,15 +95,6 @@ func (s *multiSender) Write(d []byte) (int, error) {
|
||||||
return len(d), nil
|
return len(d), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleFail calls the passed sender's handleSendFail method and then logs
|
|
||||||
// error if this was not successful.
|
|
||||||
func (s *multiSender) handleFail(sender loadSender, e error) {
|
|
||||||
err := sender.handleSendFail(e)
|
|
||||||
if err != nil {
|
|
||||||
s.owner.config.Logger.Log(logger.Warning, "could not currenty handle send fail", "error", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind.
|
// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind.
|
||||||
type minimalHttpSender struct {
|
type minimalHttpSender struct {
|
||||||
client *netsender.Sender
|
client *netsender.Sender
|
||||||
|
|
Loading…
Reference in New Issue