av/revid/mtsSender_test.go

249 lines
6.2 KiB
Go

/*
NAME
mtsSender_test.go
DESCRIPTION
mtsSender_test.go contains tests that validate the functionality of the
mtsSender under senders.go. Tests include checks that the mtsSender is
segmenting sends correctly, and also that it can correct discontinuities.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"errors"
"fmt"
"testing"
"time"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/mts/meta"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"github.com/Comcast/gots/packet"
"github.com/Comcast/gots/pes"
)
// Ring buffer sizes and read/write timeouts.
// These values are shared by the tests in this file when constructing and
// reading from the ring buffer that sits between the encoder and the sender.
const (
// rbSize is passed as the first argument to ring.NewBuffer
// (presumably the number of elements - confirm against the ring package).
rbSize = 100
// rbElementSize is passed as the second argument to ring.NewBuffer
// (presumably the per-element capacity in bytes - confirm against the ring package).
rbElementSize = 150000
// wTimeout is passed as the third argument to ring.NewBuffer.
wTimeout = 10 * time.Millisecond
// rTimeout is the timeout handed to rb.Next when reading from the buffer.
rTimeout = 10 * time.Millisecond
)
// testSender is a sender used by the tests in this file. It records every
// payload it is asked to send and can optionally simulate a single failed
// send so that discontinuity handling can be exercised.
type testSender struct {
	Buf       [][]byte // Copies of the payloads that were sent successfully, in order.
	tstDiscon bool     // When true, the send attempt with index disconAt fails.
	disconAt  int      // Zero-based index of the send attempt that should fail.
	curPktNo  int      // Number of send attempts made so far.
}

// send stores a copy of d in ts.Buf and returns nil. If failure simulation is
// enabled (tstDiscon) and this is the attempt numbered disconAt, nothing is
// stored and an error is returned instead.
func (ts *testSender) send(d []byte) error {
	if ts.tstDiscon && ts.curPktNo == ts.disconAt {
		// The failed attempt must be counted too; otherwise curPktNo would
		// stay equal to disconAt and every subsequent send would also fail.
		ts.curPktNo++
		return errors.New("could not send")
	}
	// Store a copy so later changes to d by the caller cannot alter what was
	// recorded.
	cpy := make([]byte, len(d))
	copy(cpy, d)
	ts.Buf = append(ts.Buf, cpy)
	ts.curPktNo++
	return nil
}
// log is a minimal logging function handed to newMtsSender by the tests. It
// prints msg to stdout prefixed with the name of the level lvl, followed by
// each of args formatted with %v. Unrecognised levels get an empty prefix.
func log(lvl int8, msg string, args ...interface{}) {
	var l string
	switch lvl {
	case logger.Warning:
		l = "warning"
	case logger.Debug:
		l = "debug"
	case logger.Info:
		l = "info"
	case logger.Error:
		l = "error"
	case logger.Fatal:
		l = "fatal"
	}
	msg = l + ": " + msg
	// One verb per argument so that each value is appended to the message.
	for i := 0; i < len(args); i++ {
		msg += " %v"
	}
	// args must be expanded with "..."; passing the slice itself supplies a
	// single operand for len(args) verbs and yields %!v(MISSING) output.
	// The newline keeps consecutive log messages on separate lines.
	fmt.Printf(msg+"\n", args...)
}
// tstPacker is the writer given to the MPEG-TS encoder in these tests. It
// forwards everything it receives into a ring buffer.
type tstPacker struct {
	rb *ring.Buffer // Destination ring buffer for encoder output.
}

// Write writes d to the underlying ring buffer and then flushes the buffer.
// The byte count and error reported by the ring buffer write are returned
// unchanged; the flush happens regardless of the write's outcome.
func (p *tstPacker) Write(d []byte) (int, error) {
	written, writeErr := p.rb.Write(d)
	p.rb.Flush()
	return written, writeErr
}
// TestSegment encodes a sequence of single byte payloads, passes the encoder
// output through the mtsSender and then checks every clip received by the
// test sender: each clip must be psiSendCount packets long, must start with a
// PAT, and the video payloads across all clips must count up from zero in the
// order they were written.
func TestSegment(t *testing.T) {
	mts.Meta = meta.New()

	// Create ringbuffer, tst sender, loadsender and the mpegts encoder.
	rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
	tstSender := &testSender{Buf: make([][]byte, 0)}
	loadSender := newMtsSender(tstSender, log)
	packer := &tstPacker{rb: rb}
	encoder := mts.NewEncoder(packer, 25)

	// Turn time based psi writing off for encoder.
	const psiSendCount = 10
	encoder.TimeBasedPsi(false, psiSendCount)

	const noOfPacketsToWrite = 100
	for i := 0; i < noOfPacketsToWrite; i++ {
		// Our payload will just be packet no.
		encoder.Encode([]byte{byte(i)})
		rb.Flush()

		// Drain everything currently readable from the ring buffer; an error
		// from Next (e.g. the read timing out) ends the drain.
		for {
			next, err := rb.Next(rTimeout)
			if err != nil {
				break
			}

			err = loadSender.load(next)
			if err != nil {
				t.Fatalf("Unexpected err: %v\n", err)
			}

			err = loadSender.send()
			if err != nil {
				t.Fatalf("Unexpected err: %v\n", err)
			}
			loadSender.release()
		}
	}

	result := tstSender.Buf

	// Without this guard the checks below would be skipped entirely and the
	// test would pass even though nothing was sent.
	if len(result) == 0 {
		t.Fatal("No clips were received by the test sender")
	}

	expectData := 0
	var pkt [mts.PacketSize]byte
	for clipNo, clip := range result {
		// Check that the clip is the right length.
		clipLen := len(clip)
		if clipLen != psiSendCount*mts.PacketSize {
			t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
		}

		// Also check that the first packet is a PAT.
		copy(pkt[:], clip[:mts.PacketSize])
		firstPid := (*packet.Packet)(&pkt).PID()
		if firstPid != mts.PatPid {
			t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, firstPid)
		}

		// Check that the clip data is okay.
		for i := 0; i < len(clip); i += mts.PacketSize {
			copy(pkt[:], clip[i:i+mts.PacketSize])
			p := (*packet.Packet)(&pkt)
			if p.PID() != mts.VideoPid {
				continue
			}

			// Mts payload.
			payload, err := p.Payload()
			if err != nil {
				t.Fatalf("Unexpected err: %v\n", err)
			}

			// Parse pes from the mts payload. The result is not named "pes"
			// so that the imported pes package is not shadowed.
			hdr, err := pes.NewPESHeader(payload)
			if err != nil {
				t.Fatalf("Unexpected err: %v\n", err)
			}

			// Guard the index below so an empty payload is reported as a
			// test failure rather than a panic.
			pesData := hdr.Data()
			if len(pesData) == 0 {
				t.Fatalf("Empty pes data. ClipNo: %v, pktNoInClip: %v\n", clipNo, i/mts.PacketSize)
			}

			// Get the data from the pes packet and convert to int.
			data := int8(pesData[0])

			// Calc expected data in the pes and then check.
			if data != int8(expectData) {
				t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData)
			}
			expectData++
		}
	}
}
// TestSendFailDiscontinuity runs encoder output through the mtsSender while
// the test sender is configured to fail one send (tstDiscon with disconAt set
// to disconClipNo). It then checks that one clip fewer than normal was
// received and that the first packet of the clip at index disconClipNo has
// the discontinuity indicator set.
func TestSendFailDiscontinuity(t *testing.T) {
	mts.Meta = meta.New()

	// Create ringbuffer, tst sender, loadsender and the mpegts encoder.
	rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
	const disconClipNo = 3
	tstSender := &testSender{tstDiscon: true, disconAt: disconClipNo}
	loadSender := newMtsSender(tstSender, log)
	packer := tstPacker{rb: rb}
	encoder := mts.NewEncoder(&packer, 25)

	// Turn time based psi writing off for encoder.
	const psiSendCount = 10
	encoder.TimeBasedPsi(false, psiSendCount)

	const noOfPacketsToWrite = 100
	for i := 0; i < noOfPacketsToWrite; i++ {
		// Our payload will just be packet no.
		encoder.Encode([]byte{byte(i)})
		rb.Flush()

		next, err := rb.Next(rTimeout)
		if err != nil {
			t.Fatalf("Unexpected err: %v\n", err)
		}

		err = loadSender.load(next)
		if err != nil {
			t.Fatalf("Unexpected err: %v\n", err)
		}

		// The error is deliberately ignored: tstSender is set up to fail a
		// send, and the outcome is verified through the received clips below.
		_ = loadSender.send()
		loadSender.release()
	}

	result := tstSender.Buf

	// First check that we have less clips.
	expectedLen := ((noOfPacketsToWrite / psiSendCount) - 1)
	gotLen := len(result)
	if gotLen != expectedLen {
		t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)
	}

	// The length mismatch above is non-fatal, so make sure the clip we are
	// about to inspect exists rather than panicking on the index.
	if gotLen <= disconClipNo {
		t.Fatalf("Not enough clips to check discontinuity. Got: %v, need at least: %v\n", gotLen, disconClipNo+1)
	}

	// Now check that the discontinuity indicator is set at the disconClip pat.
	disconClip := result[disconClipNo]
	if len(disconClip) < mts.PacketSize {
		t.Fatalf("Clip %v is shorter than one packet. Got: %v bytes\n", disconClipNo, len(disconClip))
	}
	firstPkt := disconClip[:mts.PacketSize]
	var pkt [mts.PacketSize]byte
	copy(pkt[:], firstPkt)
	discon, err := (*packet.AdaptationField)((*packet.Packet)(&pkt)).Discontinuity()
	if err != nil {
		t.Fatalf("Unexpected err: %v\n", err)
	}

	if !discon {
		t.Fatalf("Did not get discontinuity indicator for PAT")
	}
}