revid: made minimalHttpSender implement io.Writer

This commit is contained in:
Saxon 2019-04-01 12:02:15 +10:30
parent 24e484c07f
commit 5a67e71fe4
2 changed files with 16 additions and 24 deletions

View File

@ -45,14 +45,6 @@ import (
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
// Sender is intended to provided functionality for the sending of a byte slice
// to an implemented destination.
type Sender interface {
// send takes the bytes slice d and sends to a particular destination as
// implemented.
send(d []byte) error
}
// Log is used by the multiSender. // Log is used by the multiSender.
type Log func(level int8, message string, params ...interface{}) type Log func(level int8, message string, params ...interface{})
@ -105,8 +97,8 @@ func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, a
} }
// send takes a bytes slice d and sends to http using s' http client. // send takes a bytes slice d and sends to http using s' http client.
func (s *minimalHttpSender) send(d []byte) error { func (s *minimalHttpSender) Write(d []byte) (int, error) {
return httpSend(d, s.client, s.log) return len(d), httpSend(d, s.client, s.log)
} }
// loadSender is a destination to send a *ring.Chunk to. // loadSender is a destination to send a *ring.Chunk to.
@ -181,7 +173,7 @@ func (s *fileSender) close() error { return s.file.Close() }
// lengthed clips based on PSI. It also fixes accounts for discontinuities by // lengthed clips based on PSI. It also fixes accounts for discontinuities by
// setting the discontinuity indicator for the first packet of a clip. // setting the discontinuity indicator for the first packet of a clip.
type mtsSender struct { type mtsSender struct {
sender Sender dst io.Writer
buf []byte buf []byte
next []byte next []byte
pkt packet.Packet pkt packet.Packet
@ -190,9 +182,9 @@ type mtsSender struct {
} }
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { func newMtsSender(dst io.Writer, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{ return &mtsSender{
sender: s, dst: dst,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
} }
} }
@ -210,7 +202,7 @@ func (s *mtsSender) Write(d []byte) (int, error) {
if s.curPid == mts.PatPid && len(s.buf) > 0 { if s.curPid == mts.PatPid && len(s.buf) > 0 {
err := s.repairer.Repair(s.buf) err := s.repairer.Repair(s.buf)
if err == nil { if err == nil {
err = s.sender.send(s.buf) _, err = s.dst.Write(s.buf)
if err == nil { if err == nil {
goto done goto done
} }

View File

@ -59,7 +59,7 @@ var (
// sender simulates sending of video data, creating discontinuities if // sender simulates sending of video data, creating discontinuities if
// testDiscontinuities is set to true. // testDiscontinuities is set to true.
type sender struct { type destination struct {
buf [][]byte buf [][]byte
testDiscontinuities bool testDiscontinuities bool
discontinuityAt int discontinuityAt int
@ -68,17 +68,17 @@ type sender struct {
// send takes d and neglects if testDiscontinuities is true, returning an error, // send takes d and neglects if testDiscontinuities is true, returning an error,
// otherwise d is appended to senders buf. // otherwise d is appended to senders buf.
func (ts *sender) send(d []byte) error { func (ts *destination) Write(d []byte) (int, error) {
//fmt.Println("sending") //fmt.Println("sending")
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
ts.currentPkt++ ts.currentPkt++
return errSendFailed return 0, errSendFailed
} }
cpy := make([]byte, len(d)) cpy := make([]byte, len(d))
copy(cpy, d) copy(cpy, d)
ts.buf = append(ts.buf, cpy) ts.buf = append(ts.buf, cpy)
ts.currentPkt++ ts.currentPkt++
return nil return len(d), nil
} }
// log implements the required logging func for some of the structs in use // log implements the required logging func for some of the structs in use
@ -110,8 +110,8 @@ func TestMtsSenderSegment(t *testing.T) {
mts.Meta = meta.New() mts.Meta = meta.New()
// Create ringBuffer, sender, loadsender and the MPEGTS encoder. // Create ringBuffer, sender, loadsender and the MPEGTS encoder.
tstSender := &sender{} tstDst := &destination{}
loadSender := newMtsSender(tstSender, log) loadSender := newMtsSender(tstDst, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25) encoder := mts.NewEncoder((*buffer)(rb), 25)
@ -141,7 +141,7 @@ func TestMtsSenderSegment(t *testing.T) {
} }
} }
result := tstSender.buf result := tstDst.buf
expectData := 0 expectData := 0
for clipNo, clip := range result { for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo) t.Logf("Checking clip: %v\n", clipNo)
@ -194,8 +194,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create ringBuffer sender, loadSender and the MPEGTS encoder. // Create ringBuffer sender, loadSender and the MPEGTS encoder.
const clipWithDiscontinuity = 3 const clipWithDiscontinuity = 3
tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
loadSender := newMtsSender(tstSender, log) loadSender := newMtsSender(tstDst, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25) encoder := mts.NewEncoder((*buffer)(rb), 25)
@ -224,7 +224,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
} }
} }
result := tstSender.buf result := tstDst.buf
// First check that we have less clips as expected. // First check that we have less clips as expected.
expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1