revid: wrote test for mtsSender_test.go to see if the mtsSender is segmenting properly

This commit is contained in:
saxon 2019-02-16 16:33:39 +10:30
parent 4ddf87d63d
commit bb091f5961
4 changed files with 141 additions and 10 deletions

View File

@ -28,10 +28,114 @@ LICENSE
*/
package revid
import "testing"
import (
"fmt"
"testing"
"time"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"github.com/Comcast/gots/packet"
)
// Ring buffer sizes and read/write timeouts.
const (
rbSize = 100
rbElementSize = 150000
wTimeout = 10 * time.Millisecond
rTimeout = 10 * time.Millisecond
)
type testSender struct {
Buf [][]byte
}
func (ts *testSender) send(d []byte) error {
ts.Buf = append(ts.Buf, d)
return nil
}
func log(lvl int8, msg string, args ...interface{}) {
var l string
switch lvl {
case logger.Warning:
l = "warning"
case logger.Debug:
l = "debug"
case logger.Info:
l = "info"
case logger.Error:
l = "error"
case logger.Fatal:
l = "fatal"
}
msg = l + ": " + msg
for i := 0; i < len(args); i++ {
msg += " %v"
}
fmt.Printf(msg, args)
}
type Chunk struct {
buf []byte
off int
owner *ring.Buffer
}
func TestSegment(t *testing.T) {
// Create ringbuffer tst sender, loadsender and the mpegts encoder
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
tstSender := &testSender{}
loadSender := newMtsSender(tstSender, log)
encoder := mts.NewEncoder(rb, 25)
// Turn time based psi writing off for encoder
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
// Our payload will just be packet no
encoder.Encode([]byte{byte(i)})
next, err := rb.Next(rTimeout)
if err != nil {
unexpectErr(err, t)
}
err = loadSender.load(next)
if err != nil {
unexpectErr(err, t)
}
err = loadSender.send()
if err != nil {
unexpectErr(err, t)
}
}
result := tstSender.Buf
for clipNo, clip := range result {
// Check that the clip is the right length
clipLen := len(clip)
if clipLen != psiSendCount {
t.Errorf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount, clip)
}
// Also check that the first packet is a PAT
firstPkt := clip[:mts.PacketSize]
var pkt [mts.PacketSize]byte
copy(pkt[:], firstPkt)
pid := (*packet.Packet)(&pkt).PID()
if pid != mts.PatPid {
t.Errorf("First packte of clip %v is not pat, but rather: %v\n", clipNo, pid)
}
}
}
func unexpectErr(err error, t *testing.T) {
t.Errorf("Unexpected err: %v\n", err)
}
func TestDiscontinuity(t *testing.T) {

View File

@ -171,7 +171,11 @@ func (s *mtsSender) send() error {
}
if pid == mts.VideoPid {
if cc != s.repairer.expectedCC(pid) {
expect, exists := s.repairer.ExpectedCC(pid)
if !exists {
s.repairer.SetExpectedCC(pid, cc)
} else if cc != expect {
s.repairer.SetExpectedCC(pid, cc)
s.discard = true
s.buf = s.buf[:0]
return nil
@ -221,6 +225,8 @@ func (s *mtsSender) release() {
s.fail = false
s.buf = s.buf[:0]
}
s.chunk.Close()
s.chunk = nil
}
// httpSender implements loadSender for posting HTTP to NetReceiver
@ -513,7 +519,10 @@ func (s *rtpSender) load(c *ring.Chunk) error {
func (s *rtpSender) close() error { return nil }
func (s *rtpSender) release() {}
func (s *rtpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *rtpSender) send() error {
_, err := s.chunk.WriteTo(s.encoder)

View File

@ -66,10 +66,10 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error {
panic("Clip to repair must have PAT first")
}
cc := p.ContinuityCounter()
expect, exists := dr.expectedCC(pid)
expect, exists := dr.ExpectedCC(pid)
dr.incExpectedCC(pid)
if !exists {
dr.setExpectedCC(pid, cc)
dr.SetExpectedCC(pid, cc)
} else if cc != int(expect) {
if packet.ContainsAdaptationField(p) {
(*packet.AdaptationField)(p).SetDiscontinuity(true)
@ -79,7 +79,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error {
return err
}
}
dr.setExpectedCC(pid, cc)
dr.SetExpectedCC(pid, cc)
copy(d[:PacketSize], pkt[:])
}
return nil
@ -87,7 +87,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error {
// expectedCC returns the expected cc. If the cc hasn't been used yet, then 16
// and false is returned.
func (dr *DiscontinuityRepairer) expectedCC(pid int) (int, bool) {
func (dr *DiscontinuityRepairer) ExpectedCC(pid int) (int, bool) {
if dr.expCC[pid] == 16 {
return 16, false
}
@ -105,6 +105,6 @@ func (dr *DiscontinuityRepairer) decExpectedCC(pid int) {
}
// setExpectedCC sets the expected cc.
func (dr *DiscontinuityRepairer) setExpectedCC(pid, cc int) {
func (dr *DiscontinuityRepairer) SetExpectedCC(pid, cc int) {
dr.expCC[pid] = cc
}

View File

@ -85,7 +85,8 @@ var (
)
const (
psiInterval = 1 * time.Second
psiInterval = 1 * time.Second
psiSendCount = 7
)
// Meta allows addition of metadata to encoded mts from outside of this pkg.
@ -130,6 +131,10 @@ type Encoder struct {
continuity map[int]byte
timeBasedPsi bool
pktCount int
psiSendCount int
psiLastTime time.Time
}
@ -141,6 +146,10 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder {
frameInterval: time.Duration(float64(time.Second) / fps),
ptsOffset: ptsOffset,
timeBasedPsi: true,
pktCount: 8,
continuity: map[int]byte{
patPid: 0,
pmtPid: 0,
@ -159,15 +168,21 @@ const (
hasPTS = 0x2
)
func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
e.timeBasedPsi = b
e.psiSendCount = sendCount
}
// generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel.
func (e *Encoder) Encode(nalu []byte) error {
now := time.Now()
if now.Sub(e.psiLastTime) > psiInterval {
if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || e.pktCount > e.psiSendCount {
err := e.writePSI()
if err != nil {
return err
}
e.pktCount = 0
e.psiLastTime = now
}
@ -204,6 +219,7 @@ func (e *Encoder) Encode(nalu []byte) error {
if err != nil {
return err
}
e.pktCount++
}
e.tick()
@ -226,6 +242,7 @@ func (e *Encoder) writePSI() error {
if err != nil {
return err
}
e.pktCount++
pmtTable, err = updateMeta(pmtTable)
if err != nil {
return err
@ -243,6 +260,7 @@ func (e *Encoder) writePSI() error {
if err != nil {
return err
}
e.pktCount++
return nil
}