revid: mtsSender_test.go passing segmenting and discontinuity tests

This commit is contained in:
saxon 2019-02-17 05:32:44 +10:30
parent 3f3d587eeb
commit 819c9a784c
2 changed files with 41 additions and 56 deletions

View File

@ -59,6 +59,8 @@ type testSender struct {
func (ts *testSender) send(d []byte) error { func (ts *testSender) send(d []byte) error {
if ts.tstDiscon && ts.curPktNo == ts.disconAt { if ts.tstDiscon && ts.curPktNo == ts.disconAt {
fmt.Println("SendFailed")
ts.curPktNo++
return errors.New("could not send") return errors.New("could not send")
} }
cpy := make([]byte, len(d)) cpy := make([]byte, len(d))
@ -103,7 +105,7 @@ func TestSegment(t *testing.T) {
mts.Meta = meta.New() mts.Meta = meta.New()
// Create ringbuffer tst sender, loadsender and the mpegts encoder // Create ringbuffer tst sender, loadsender and the mpegts encoder
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
tstSender := &testSender{Buf: make([][]byte, 0)} tstSender := &testSender{}
loadSender := newMtsSender(tstSender, log) loadSender := newMtsSender(tstSender, log)
packer := &tstPacker{rb: rb} packer := &tstPacker{rb: rb}
encoder := mts.NewEncoder(packer, 25) encoder := mts.NewEncoder(packer, 25)
@ -141,6 +143,7 @@ func TestSegment(t *testing.T) {
result := tstSender.Buf result := tstSender.Buf
expectData := 0 expectData := 0
for clipNo, clip := range result { for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo)
// Check that the clip is the right length // Check that the clip is the right length
clipLen := len(clip) clipLen := len(clip)
if clipLen != psiSendCount*mts.PacketSize { if clipLen != psiSendCount*mts.PacketSize {
@ -208,25 +211,27 @@ func TestSendFailDiscontinuity(t *testing.T) {
encoder.Encode([]byte{byte(i)}) encoder.Encode([]byte{byte(i)})
rb.Flush() rb.Flush()
next, err := rb.Next(rTimeout) for {
if err != nil { next, err := rb.Next(rTimeout)
t.Fatalf("Unexpected err: %v\n", err) if err != nil {
break
}
err = loadSender.load(next)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
_ = loadSender.send()
loadSender.release()
} }
err = loadSender.load(next)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
_ = loadSender.send()
loadSender.release()
} }
result := tstSender.Buf result := tstSender.Buf
// First check that we have less clips // First check that we have less clips
expectedLen := ((noOfPacketsToWrite / psiSendCount) - 1) expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1
gotLen := len(result) gotLen := len(result)
if gotLen != expectedLen { if gotLen != expectedLen {
t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)

View File

@ -132,13 +132,14 @@ func (s *fileSender) close() error {
// clips based on PSI. It also fixes accounts for discontinuities by setting // clips based on PSI. It also fixes accounts for discontinuities by setting
// the discontinuity indicator for the first packet of a clip. // the discontinuity indicator for the first packet of a clip.
type mtsSender struct { type mtsSender struct {
sender sender sender sender
buf []byte buf []byte
pkt [mts.PacketSize]byte next []byte
fail bool pkt [mts.PacketSize]byte
discard bool failed bool
repairer *mts.DiscontinuityRepairer discarded bool
chunk *ring.Chunk repairer *mts.DiscontinuityRepairer
chunk *ring.Chunk
} }
// newmtsSender returns a new mtsSender. // newmtsSender returns a new mtsSender.
@ -158,50 +159,29 @@ func (s *mtsSender) load(c *ring.Chunk) error {
// send checks the most recently loaded packet and if it is a PAT then the clip // send checks the most recently loaded packet and if it is a PAT then the clip
// in s.buf is sent, otherwise the packet is added to s.buf. // in s.buf is sent, otherwise the packet is added to s.buf.
func (s *mtsSender) send() error { func (s *mtsSender) send() error {
copy(s.pkt[:], s.chunk.Bytes()) if s.next != nil {
s.buf = append(s.buf, s.next...)
}
bytes := s.chunk.Bytes()
cpy := make([]byte, len(bytes))
copy(cpy, bytes)
s.next = cpy
copy(s.pkt[:], cpy)
p := (*packet.Packet)(&s.pkt) p := (*packet.Packet)(&s.pkt)
pid := p.PID() pid := p.PID()
cc := p.ContinuityCounter() if pid == mts.PatPid && len(s.buf) > 0 {
if s.discard {
if pid != mts.PatPid {
return nil
}
s.discard = false
}
if pid == mts.VideoPid {
expect, exists := s.repairer.ExpectedCC(pid)
s.repairer.IncExpectedCC(pid)
if !exists {
s.repairer.SetExpectedCC(pid, cc)
s.repairer.IncExpectedCC(pid)
} else if cc != expect {
s.repairer.SetExpectedCC(pid, cc)
s.discard = true
s.buf = s.buf[:0]
return nil
}
}
if s.fail || (pid == mts.PatPid && len(s.buf) != 0) {
err := s.fixAndSend() err := s.fixAndSend()
if err != nil { if err != nil {
s.failed() s.failed = true
s.repairer.Failed()
return err return err
} }
s.fail = false
s.buf = s.buf[:0] s.buf = s.buf[:0]
} }
s.buf = append(s.buf, s.chunk.Bytes()...)
return nil return nil
} }
// failed sets the s.fail flag to true, and let's the discontinuity
// repairer know that there has been a failed send.
func (s *mtsSender) failed() {
s.fail = true
s.repairer.Failed()
}
// fixAndSend uses the discontinuity repairer to ensure there is not a // fixAndSend uses the discontinuity repairer to ensure there is not a
// discontinuity, and if so sets the discontinuity indicator of the PAT packet. // discontinuity, and if so sets the discontinuity indicator of the PAT packet.
func (ms *mtsSender) fixAndSend() error { func (ms *mtsSender) fixAndSend() error {
@ -221,9 +201,9 @@ func (s *mtsSender) close() error { return nil }
// release will set the s.fail flag to fals and clear the buffer if // release will set the s.fail flag to fals and clear the buffer if
// the previous send was a fail. // the previous send was a fail.
func (s *mtsSender) release() { func (s *mtsSender) release() {
if s.fail { if s.failed {
s.fail = false
s.buf = s.buf[:0] s.buf = s.buf[:0]
s.failed = false
} }
s.chunk.Close() s.chunk.Close()
s.chunk = nil s.chunk = nil