mirror of https://bitbucket.org/ausocean/av.git
revid: sender any logic that is performed on a failed send is now done inside loadSender.send()
This commit is contained in:
parent
72d0683162
commit
e7c6b7319b
|
@ -220,7 +220,7 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func
|
||||||
// encoder to revid's encoder slice, and give this encoder the mtsSenders
|
// encoder to revid's encoder slice, and give this encoder the mtsSenders
|
||||||
// as a destination.
|
// as a destination.
|
||||||
if len(mtsSenders) != 0 {
|
if len(mtsSenders) != 0 {
|
||||||
ms := multiSender(mtsSenders)
|
ms := newMultiSender(mtsSenders, r.config.Logger.Log)
|
||||||
e := mtsEnc(ms, int(r.config.FrameRate))
|
e := mtsEnc(ms, int(r.config.FrameRate))
|
||||||
r.encoder = append(r.encoder, e)
|
r.encoder = append(r.encoder, e)
|
||||||
}
|
}
|
||||||
|
@ -229,7 +229,7 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func
|
||||||
// encoder to revid's encoder slice, and give this encoder the flvSenders
|
// encoder to revid's encoder slice, and give this encoder the flvSenders
|
||||||
// as a destination.
|
// as a destination.
|
||||||
if len(flvSenders) != 0 {
|
if len(flvSenders) != 0 {
|
||||||
ms := multiSender(flvSenders)
|
ms := newMultiSender(flvSenders, r.config.Logger.Log)
|
||||||
e, err := flvEnc(ms, int(r.config.FrameRate))
|
e, err := flvEnc(ms, int(r.config.FrameRate))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -265,7 +265,8 @@ func TestResetEncoderSenderSetup(t *testing.T) {
|
||||||
case flvEncoderStr:
|
case flvEncoderStr:
|
||||||
ms = e.(*tstFlvEncoder).dst
|
ms = e.(*tstFlvEncoder).dst
|
||||||
}
|
}
|
||||||
senders := []loadSender(ms.(multiSender))
|
|
||||||
|
senders := ms.(*multiSender).senders
|
||||||
got = len(senders)
|
got = len(senders)
|
||||||
want = len(test.encoders[idx].destinations)
|
want = len(test.encoders[idx].destinations)
|
||||||
if got != want {
|
if got != want {
|
||||||
|
|
|
@ -52,21 +52,33 @@ type Sender interface {
|
||||||
send(d []byte) error
|
send(d []byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Log func(level int8, message string, params ...interface{})
|
||||||
|
|
||||||
// multiSender implements io.Writer. It provides the capacity to send to multiple
|
// multiSender implements io.Writer. It provides the capacity to send to multiple
|
||||||
// senders from a single Write call.
|
// senders from a single Write call.
|
||||||
type multiSender []loadSender
|
type multiSender struct {
|
||||||
|
senders []loadSender
|
||||||
|
log Log
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMultiSender(senders []loadSender, log Log) *multiSender {
|
||||||
|
return &multiSender{
|
||||||
|
senders: senders,
|
||||||
|
log: log,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Write implements io.Writer. This will call load (with the passed slice), send
|
// Write implements io.Writer. This will call load (with the passed slice), send
|
||||||
// and release on all senders of multiSender.
|
// and release on all senders of multiSender.
|
||||||
func (s multiSender) Write(d []byte) (int, error) {
|
func (s *multiSender) Write(d []byte) (int, error) {
|
||||||
for _, sender := range s {
|
for i, sender := range s.senders {
|
||||||
sender.load(d)
|
sender.load(d)
|
||||||
err := sender.send()
|
err := sender.send()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sender.handleSendFail(err)
|
s.log(logger.Warning, pkg+"send failed", "sender", i, "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, sender := range s {
|
for _, sender := range s.senders {
|
||||||
sender.release()
|
sender.release()
|
||||||
}
|
}
|
||||||
return len(d), nil
|
return len(d), nil
|
||||||
|
@ -109,9 +121,6 @@ type loadSender interface {
|
||||||
|
|
||||||
// close cleans up after use of the loadSender.
|
// close cleans up after use of the loadSender.
|
||||||
close() error
|
close() error
|
||||||
|
|
||||||
// handleSendFail performs any actions necessary in response to a failed send.
|
|
||||||
handleSendFail(err error) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// restart is an optional interface for loadSenders that
|
// restart is an optional interface for loadSenders that
|
||||||
|
@ -148,8 +157,6 @@ func (s *fileSender) release() {}
|
||||||
|
|
||||||
func (s *fileSender) close() error { return s.file.Close() }
|
func (s *fileSender) close() error { return s.file.Close() }
|
||||||
|
|
||||||
func (s *fileSender) handleSendFail(err error) error { return nil }
|
|
||||||
|
|
||||||
// mtsSender implements loadSender and provides sending capability specifically
|
// mtsSender implements loadSender and provides sending capability specifically
|
||||||
// for use with MPEGTS packetization. It handles the construction of appropriately
|
// for use with MPEGTS packetization. It handles the construction of appropriately
|
||||||
// lengthed clips based on PSI. It also fixes accounts for discontinuities by
|
// lengthed clips based on PSI. It also fixes accounts for discontinuities by
|
||||||
|
@ -227,8 +234,6 @@ func (s *mtsSender) release() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *mtsSender) handleSendFail(err error) error { return nil }
|
|
||||||
|
|
||||||
// httpSender implements loadSender for posting HTTP to NetReceiver
|
// httpSender implements loadSender for posting HTTP to NetReceiver
|
||||||
type httpSender struct {
|
type httpSender struct {
|
||||||
client *netsender.Sender
|
client *netsender.Sender
|
||||||
|
@ -313,8 +318,6 @@ func (s *httpSender) release() {}
|
||||||
|
|
||||||
func (s *httpSender) close() error { return nil }
|
func (s *httpSender) close() error { return nil }
|
||||||
|
|
||||||
func (s *httpSender) handleSendFail(err error) error { return nil }
|
|
||||||
|
|
||||||
// rtmpSender implements loadSender for a native RTMP destination.
|
// rtmpSender implements loadSender for a native RTMP destination.
|
||||||
type rtmpSender struct {
|
type rtmpSender struct {
|
||||||
conn *rtmp.Conn
|
conn *rtmp.Conn
|
||||||
|
@ -362,6 +365,9 @@ func (s *rtmpSender) send() error {
|
||||||
return errors.New("no rtmp connection, cannot write")
|
return errors.New("no rtmp connection, cannot write")
|
||||||
}
|
}
|
||||||
_, err := s.conn.Write(s.data)
|
_, err := s.conn.Write(s.data)
|
||||||
|
if err != nil {
|
||||||
|
err = s.restart()
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -390,8 +396,6 @@ func (s *rtmpSender) close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *rtmpSender) handleSendFail(err error) error { return s.restart() }
|
|
||||||
|
|
||||||
// TODO: Write restart func for rtpSender
|
// TODO: Write restart func for rtpSender
|
||||||
// rtpSender implements loadSender for a native udp destination with rtp packetization.
|
// rtpSender implements loadSender for a native udp destination with rtp packetization.
|
||||||
type rtpSender struct {
|
type rtpSender struct {
|
||||||
|
@ -426,5 +430,3 @@ func (s *rtpSender) send() error {
|
||||||
_, err := s.encoder.Write(s.data)
|
_, err := s.encoder.Write(s.data)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *rtpSender) handleSendFail(err error) error { return nil }
|
|
||||||
|
|
|
@ -31,7 +31,6 @@ package revid
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -321,7 +320,7 @@ func TestMultiSenderWrite(t *testing.T) {
|
||||||
newDummyLoadSender(false, false),
|
newDummyLoadSender(false, false),
|
||||||
newDummyLoadSender(false, false),
|
newDummyLoadSender(false, false),
|
||||||
}
|
}
|
||||||
ms := multiSender(senders)
|
ms := newMultiSender(senders, log)
|
||||||
|
|
||||||
// Perform some multiSender writes.
|
// Perform some multiSender writes.
|
||||||
const noOfWrites = 5
|
const noOfWrites = 5
|
||||||
|
@ -331,7 +330,7 @@ func TestMultiSenderWrite(t *testing.T) {
|
||||||
|
|
||||||
// Check that the senders got the data correctly from the writes.
|
// Check that the senders got the data correctly from the writes.
|
||||||
for i := byte(0); i < noOfWrites; i++ {
|
for i := byte(0); i < noOfWrites; i++ {
|
||||||
for j, dest := range []loadSender(ms) {
|
for j, dest := range ms.senders {
|
||||||
got := dest.(*dummyLoadSender).buf[i][0]
|
got := dest.(*dummyLoadSender).buf[i][0]
|
||||||
if got != i {
|
if got != i {
|
||||||
t.Errorf("Did not get expected result for sender: %v. \nGot: %v\nWant: %v\n", j, got, i)
|
t.Errorf("Did not get expected result for sender: %v. \nGot: %v\nWant: %v\n", j, got, i)
|
||||||
|
@ -339,58 +338,3 @@ func TestMultiSenderWrite(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestMultiSenderFailNoRetry checks that behaviour is as expected when a sender
|
|
||||||
// fails at a send and does not retry.
|
|
||||||
func TestMultiSenderFailNoRetry(t *testing.T) {
|
|
||||||
senders := []loadSender{
|
|
||||||
newDummyLoadSender(false, false),
|
|
||||||
newDummyLoadSender(false, false),
|
|
||||||
newDummyLoadSender(false, false),
|
|
||||||
}
|
|
||||||
|
|
||||||
ms := multiSender(senders)
|
|
||||||
|
|
||||||
// We will perform two writes. We expect the second write not to be complete,
|
|
||||||
// i.e. the senders should not send anything on this write.
|
|
||||||
ms.Write([]byte{0x00})
|
|
||||||
|
|
||||||
// Make second sender fail a send.
|
|
||||||
const failedSenderIdx = 1
|
|
||||||
failedSender := []loadSender(ms)[failedSenderIdx].(*dummyLoadSender)
|
|
||||||
failedSender.failOnSend = true
|
|
||||||
ms.Write([]byte{0x01})
|
|
||||||
|
|
||||||
// Check that handleSendFail was called.
|
|
||||||
if !failedSender.failHandled {
|
|
||||||
t.Fatal("the failed send was not handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now for next send we don't want to fail.
|
|
||||||
failedSender.failOnSend = false
|
|
||||||
ms.Write([]byte{0x02})
|
|
||||||
|
|
||||||
// Check number of slices sent for each sender and also check data.
|
|
||||||
for i, sender := range []loadSender(ms) {
|
|
||||||
// First check number of slices sent for each sender.
|
|
||||||
wantLen := 3
|
|
||||||
if i == failedSenderIdx {
|
|
||||||
wantLen = 2
|
|
||||||
}
|
|
||||||
curSender := sender.(*dummyLoadSender)
|
|
||||||
gotLen := len(curSender.buf)
|
|
||||||
if gotLen != wantLen {
|
|
||||||
t.Errorf("len of sender that failed is not expected: \nGot: %v\nWant: %v\n", gotLen, wantLen)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now check the quality of the data.
|
|
||||||
wantData := [][]byte{{0x00}, {0x01}, {0x02}}
|
|
||||||
if i == failedSenderIdx {
|
|
||||||
wantData = [][]byte{{0x00}, {0x02}}
|
|
||||||
}
|
|
||||||
gotData := curSender.buf
|
|
||||||
if !reflect.DeepEqual(gotData, wantData) {
|
|
||||||
t.Errorf("unexpect data sent through sender idx: %v. \nGot: %v\nWant: %v\n", i, gotData, wantData)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue