From 3f3d587eeb2604dea70c4183102e06b42cbabd72 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 17 Feb 2019 03:35:59 +1030 Subject: [PATCH] revid: mtsSender test for segmenting actually working now --- revid/mtsSender_test.go | 64 +++++++++++++++++++------------------ revid/senders.go | 2 ++ stream/mts/discontinuity.go | 4 +-- stream/mts/encoder.go | 6 ++-- 4 files changed, 40 insertions(+), 36 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 14144c54..3a275ba0 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -29,6 +29,7 @@ LICENSE package revid import ( + "errors" "fmt" "testing" "time" @@ -58,9 +59,11 @@ type testSender struct { func (ts *testSender) send(d []byte) error { if ts.tstDiscon && ts.curPktNo == ts.disconAt { - return nil + return errors.New("could not send") } - ts.Buf = append(ts.Buf, d) + cpy := make([]byte, len(d)) + copy(cpy, d) + ts.Buf = append(ts.Buf, cpy) ts.curPktNo++ return nil } @@ -100,10 +103,10 @@ func TestSegment(t *testing.T) { mts.Meta = meta.New() // Create ringbuffer tst sender, loadsender and the mpegts encoder rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - tstSender := &testSender{} + tstSender := &testSender{Buf: make([][]byte, 0)} loadSender := newMtsSender(tstSender, log) - packer := tstPacker{rb: rb} - encoder := mts.NewEncoder(&packer, 25) + packer := &tstPacker{rb: rb} + encoder := mts.NewEncoder(packer, 25) // Turn time based psi writing off for encoder const psiSendCount = 10 @@ -115,29 +118,32 @@ func TestSegment(t *testing.T) { encoder.Encode([]byte{byte(i)}) rb.Flush() - next, err := rb.Next(rTimeout) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + for { + next, err := rb.Next(rTimeout) + if err != nil { + break + } - err = loadSender.load(next) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + err = loadSender.load(next) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } - err = loadSender.send() - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + err = loadSender.send() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } - loadSender.release() + loadSender.release() + } } result := tstSender.Buf + expectData := 0 for clipNo, clip := range result { // Check that the clip is the right length clipLen := len(clip) - if clipLen != psiSendCount { + if clipLen != psiSendCount*mts.PacketSize { t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) } @@ -152,6 +158,7 @@ func TestSegment(t *testing.T) { // Check that the clip data is okay for i := 0; i < len(clip); i += mts.PacketSize { + firstPkt := clip[i : i+mts.PacketSize] copy(pkt[:], firstPkt) p := (*packet.Packet)(&pkt) pid := p.PID() @@ -169,19 +176,18 @@ func TestSegment(t *testing.T) { } // Get the data from the pes packet and convert to int - data := int(pes.Data()[0]) + data := int8(pes.Data()[0]) // Calc expected data in the pes and then check - expectedData := clipNo*10 + ((i / mts.PacketSize) - 2) - if data != expectedData { - t.Fatalf("Did not get expected pkt data. Got: %v, want: %v\n", data, expectedData) + if data != int8(expectData) { + t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) } + expectData++ } } } } -/* func TestSendFailDiscontinuity(t *testing.T) { mts.Meta = meta.New() // Create ringbuffer tst sender, loadsender and the mpegts encoder @@ -212,10 +218,7 @@ func TestSendFailDiscontinuity(t *testing.T) { t.Fatalf("Unexpected err: %v\n", err) } - err = loadSender.send() - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + _ = loadSender.send() loadSender.release() } @@ -226,9 +229,8 @@ func TestSendFailDiscontinuity(t *testing.T) { expectedLen := ((noOfPacketsToWrite / psiSendCount) - 1) gotLen := len(result) if gotLen != expectedLen { - t.Fatalf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) + t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) } - // Now check that the discontonuity indicator is set at the disconClip pat disconClip := result[disconClipNo] firstPkt := disconClip[:mts.PacketSize] @@ -242,5 +244,5 @@ func TestSendFailDiscontinuity(t *testing.T) { if !discon { t.Fatalf("Did not get discontinuity indicator for PAT") } + } -*/ diff --git a/revid/senders.go b/revid/senders.go index 870cf15f..54712cd1 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -171,8 +171,10 @@ func (s *mtsSender) send() error { } if pid == mts.VideoPid { expect, exists := s.repairer.ExpectedCC(pid) + s.repairer.IncExpectedCC(pid) if !exists { s.repairer.SetExpectedCC(pid, cc) + s.repairer.IncExpectedCC(pid) } else if cc != expect { s.repairer.SetExpectedCC(pid, cc) s.discard = true diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index 30f8dfc3..b7cc3bed 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -67,7 +67,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { } cc := p.ContinuityCounter() expect, exists := dr.ExpectedCC(pid) - dr.incExpectedCC(pid) + dr.IncExpectedCC(pid) if !exists { dr.SetExpectedCC(pid, cc) } else if cc != int(expect) { @@ -95,7 +95,7 @@ func (dr *DiscontinuityRepairer) ExpectedCC(pid int) (int, bool) { } // incExpectedCC increments the expected cc. -func (dr *DiscontinuityRepairer) incExpectedCC(pid int) { +func (dr *DiscontinuityRepairer) IncExpectedCC(pid int) { dr.expCC[pid] = (dr.expCC[pid] + 1) & 0xf } diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index e262abe8..fec41608 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -171,19 +171,19 @@ const ( func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { e.timeBasedPsi = b e.psiSendCount = sendCount - e.pktCount = e.psiSendCount + 1 + e.pktCount = e.psiSendCount } // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { now := time.Now() - if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || e.pktCount > e.psiSendCount { + if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || e.pktCount >= e.psiSendCount { + e.pktCount = 0 err := e.writePSI() if err != nil { return err } - e.pktCount = 0 e.psiLastTime = now }