mirror of https://bitbucket.org/ausocean/av.git
551 lines
15 KiB
Go
551 lines
15 KiB
Go
/*
|
|
NAME
|
|
mtsSender_test.go
|
|
|
|
DESCRIPTION
|
|
mtsSender_test.go contains tests that validate the functionalilty of the
|
|
mtsSender under senders.go. Tests include checks that the mtsSender is
|
|
segmenting sends correctly, and also that it can correct discontinuities.
|
|
|
|
AUTHORS
|
|
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
|
|
|
LICENSE
|
|
mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
|
|
|
|
It is free software: you can redistribute it and/or modify them
|
|
under the terms of the GNU General Public License as published by the
|
|
Free Software Foundation, either version 3 of the License, or (at your
|
|
option) any later version.
|
|
|
|
It is distributed in the hope that it will be useful, but WITHOUT
|
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
in gpl.txt. If not, see http://www.gnu.org/licenses.
|
|
*/
|
|
package revid
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/Comcast/gots/packet"
|
|
"github.com/Comcast/gots/pes"
|
|
|
|
"bitbucket.org/ausocean/av/stream/mts"
|
|
"bitbucket.org/ausocean/av/stream/mts/meta"
|
|
"bitbucket.org/ausocean/utils/logger"
|
|
"bitbucket.org/ausocean/utils/ring"
|
|
)
|
|
|
|
// Ring buffer sizes and read/write timeouts.
|
|
const (
|
|
rbSize = 100
|
|
rbElementSize = 150000
|
|
wTimeout = 10 * time.Millisecond
|
|
rTimeout = 10 * time.Millisecond
|
|
)
|
|
|
|
var (
|
|
errSendFailed = errors.New("send failed")
|
|
)
|
|
|
|
// sender simulates sending of video data, creating discontinuities if
|
|
// testDiscontinuities is set to true.
|
|
type sender struct {
|
|
buf [][]byte
|
|
testDiscontinuities bool
|
|
discontinuityAt int
|
|
currentPkt int
|
|
}
|
|
|
|
// send takes d and neglects if testDiscontinuities is true, returning an error,
|
|
// otherwise d is appended to senders buf.
|
|
func (ts *sender) send(d []byte) error {
|
|
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
|
|
ts.currentPkt++
|
|
return errSendFailed
|
|
}
|
|
cpy := make([]byte, len(d))
|
|
copy(cpy, d)
|
|
ts.buf = append(ts.buf, cpy)
|
|
ts.currentPkt++
|
|
return nil
|
|
}
|
|
|
|
// log implements the required logging func for some of the structs in use
|
|
// within tests.
|
|
func log(lvl int8, msg string, args ...interface{}) {
|
|
var l string
|
|
switch lvl {
|
|
case logger.Warning:
|
|
l = "warning"
|
|
case logger.Debug:
|
|
l = "debug"
|
|
case logger.Info:
|
|
l = "info"
|
|
case logger.Error:
|
|
l = "error"
|
|
case logger.Fatal:
|
|
l = "fatal"
|
|
}
|
|
msg = l + ": " + msg
|
|
for i := 0; i < len(args); i++ {
|
|
msg += " %v"
|
|
}
|
|
fmt.Printf(msg, args)
|
|
}
|
|
|
|
// TestSegment ensures that the mtsSender correctly segments data into clips
|
|
// based on positioning of PSI in the mtsEncoder's output stream.
|
|
func TestMtsSenderSegment(t *testing.T) {
|
|
mts.Meta = meta.New()
|
|
|
|
// Create ringBuffer, sender, loadsender and the MPEGTS encoder.
|
|
tstSender := &sender{}
|
|
loadSender := newMtsSender(tstSender, false, log)
|
|
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
|
|
encoder := mts.NewEncoder((*buffer)(rb), 25)
|
|
|
|
// Turn time based PSI writing off for encoder.
|
|
const psiSendCount = 10
|
|
encoder.TimeBasedPsi(false, psiSendCount)
|
|
|
|
const noOfPacketsToWrite = 100
|
|
for i := 0; i < noOfPacketsToWrite; i++ {
|
|
// Insert a payload so that we check that the segmentation works correctly
|
|
// in this regard. Packet number will be used.
|
|
encoder.Write([]byte{byte(i)})
|
|
rb.Flush()
|
|
|
|
for {
|
|
next, err := rb.Next(rTimeout)
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
err = loadSender.load(next.Bytes())
|
|
if err != nil {
|
|
t.Fatalf("Unexpected err: %v\n", err)
|
|
}
|
|
|
|
err = loadSender.send()
|
|
if err != nil {
|
|
t.Fatalf("Unexpected err: %v\n", err)
|
|
}
|
|
loadSender.release()
|
|
next.Close()
|
|
next = nil
|
|
}
|
|
}
|
|
|
|
result := tstSender.buf
|
|
expectData := 0
|
|
for clipNo, clip := range result {
|
|
t.Logf("Checking clip: %v\n", clipNo)
|
|
|
|
// Check that the clip is of expected length.
|
|
clipLen := len(clip)
|
|
if clipLen != psiSendCount*mts.PacketSize {
|
|
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
|
|
}
|
|
|
|
// Also check that the first packet is a PAT.
|
|
firstPkt := clip[:mts.PacketSize]
|
|
var pkt packet.Packet
|
|
copy(pkt[:], firstPkt)
|
|
pid := pkt.PID()
|
|
if pid != mts.PatPid {
|
|
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
|
|
}
|
|
|
|
// Check that the clip data is okay.
|
|
for i := 0; i < len(clip); i += mts.PacketSize {
|
|
copy(pkt[:], clip[i:i+mts.PacketSize])
|
|
if pkt.PID() == mts.VideoPid {
|
|
payload, err := pkt.Payload()
|
|
if err != nil {
|
|
t.Fatalf("Unexpected err: %v\n", err)
|
|
}
|
|
|
|
// Parse PES from the MTS payload.
|
|
pes, err := pes.NewPESHeader(payload)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected err: %v\n", err)
|
|
}
|
|
|
|
// Get the data from the PES packet and convert to an int.
|
|
data := int8(pes.Data()[0])
|
|
|
|
// Calc expected data in the PES and then check.
|
|
if data != int8(expectData) {
|
|
t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData)
|
|
}
|
|
expectData++
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestMtsSenderDiscontinuity(t *testing.T) {
|
|
mts.Meta = meta.New()
|
|
|
|
// Create ringBuffer sender, loadSender and the MPEGTS encoder.
|
|
const clipWithDiscontinuity = 3
|
|
tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
|
|
loadSender := newMtsSender(tstSender, false, log)
|
|
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
|
|
encoder := mts.NewEncoder((*buffer)(rb), 25)
|
|
|
|
// Turn time based PSI writing off for encoder.
|
|
const psiSendCount = 10
|
|
encoder.TimeBasedPsi(false, psiSendCount)
|
|
|
|
const noOfPacketsToWrite = 100
|
|
for i := 0; i < noOfPacketsToWrite; i++ {
|
|
// Our payload will just be packet number.
|
|
encoder.Write([]byte{byte(i)})
|
|
rb.Flush()
|
|
|
|
for {
|
|
next, err := rb.Next(rTimeout)
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
err = loadSender.load(next.Bytes())
|
|
if err != nil {
|
|
t.Fatalf("Unexpected err: %v\n", err)
|
|
}
|
|
|
|
loadSender.send()
|
|
loadSender.release()
|
|
next.Close()
|
|
next = nil
|
|
}
|
|
}
|
|
|
|
result := tstSender.buf
|
|
|
|
// First check that we have less clips as expected.
|
|
expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1
|
|
gotLen := len(result)
|
|
if gotLen != expectedLen {
|
|
t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)
|
|
}
|
|
|
|
// Now check that the discontinuity indicator is set at the discontinuityClip PAT.
|
|
disconClip := result[clipWithDiscontinuity]
|
|
firstPkt := disconClip[:mts.PacketSize]
|
|
var pkt packet.Packet
|
|
copy(pkt[:], firstPkt)
|
|
discon, err := (*packet.AdaptationField)(&pkt).Discontinuity()
|
|
if err != nil {
|
|
t.Fatalf("Unexpected err: %v\n", err)
|
|
}
|
|
|
|
if !discon {
|
|
t.Fatalf("Did not get discontinuity indicator for PAT")
|
|
}
|
|
}
|
|
|
|
// TestNewMultiSender checks that newMultiSender performs as expected when an
|
|
// active function is not provided, and when an active function is provided.
|
|
func TestNewMultiSender(t *testing.T) {
|
|
// First test without giving an 'active' function.
|
|
_, err := newMultiSender(nil, nil)
|
|
if err == nil {
|
|
t.Fatal("did not get expected error")
|
|
}
|
|
|
|
// Now test with providing an active function.
|
|
_, err = newMultiSender(nil, func() bool { return true })
|
|
if err != nil {
|
|
t.Fatalf("unespected error: %v", err)
|
|
}
|
|
}
|
|
|
|
// dummyLoadSender is a loadSender implementation that allows us to simulate
|
|
// the behaviour of a loadSender and check that it performas as expected.
|
|
type dummyLoadSender struct {
|
|
data []byte
|
|
buf [][]byte
|
|
failOnSend bool
|
|
failHandled bool
|
|
retry bool
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// newDummyLoadSender returns a pointer to a new dummyLoadSender.
|
|
func newDummyLoadSender(fail bool, retry bool) *dummyLoadSender {
|
|
return &dummyLoadSender{failOnSend: fail, failHandled: true, retry: retry}
|
|
}
|
|
|
|
// load takes a byte slice and assigns it to the dummyLoadSenders data slice.
|
|
func (s *dummyLoadSender) load(d []byte) error {
|
|
s.data = d
|
|
return nil
|
|
}
|
|
|
|
// send will append to dummyLoadSender's buf slice, only if failOnSend is false.
|
|
// If failOnSend is set to true, we expect that data sent won't be written to
|
|
// the buf simulating a failed send.
|
|
func (s *dummyLoadSender) send() error {
|
|
if !s.getFailOnSend() {
|
|
s.buf = append(s.buf, s.data)
|
|
return nil
|
|
}
|
|
s.failHandled = false
|
|
return errSendFailed
|
|
}
|
|
|
|
func (s *dummyLoadSender) getFailOnSend() bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.failOnSend
|
|
}
|
|
|
|
// release sets dummyLoadSender's data slice to nil. data can be checked to see
|
|
// if release has been called at the right time.
|
|
func (s *dummyLoadSender) release() {
|
|
s.data = nil
|
|
}
|
|
|
|
func (s *dummyLoadSender) close() error { return nil }
|
|
|
|
// handleSendFail simply sets the failHandled flag to true. This can be checked
|
|
// to see if handleSendFail has been called by the multiSender at the right time.
|
|
func (s *dummyLoadSender) handleSendFail(err error) error {
|
|
s.failHandled = true
|
|
return nil
|
|
}
|
|
|
|
func (s *dummyLoadSender) retrySend() bool { return s.retry }
|
|
|
|
// TestMultiSenderWrite checks that we can do basic writing to multiple senders
|
|
// using the multiSender.
|
|
func TestMultiSenderWrite(t *testing.T) {
|
|
senders := []loadSender{
|
|
newDummyLoadSender(false, false),
|
|
newDummyLoadSender(false, false),
|
|
newDummyLoadSender(false, false),
|
|
}
|
|
ms, err := newMultiSender(senders, func() bool { return true })
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
// Perform some multiSender writes.
|
|
const noOfWrites = 5
|
|
for i := byte(0); i < noOfWrites; i++ {
|
|
ms.Write([]byte{i})
|
|
}
|
|
|
|
// Check that the senders got the data correctly from the writes.
|
|
for i := byte(0); i < noOfWrites; i++ {
|
|
for j, dest := range ms.senders {
|
|
got := dest.(*dummyLoadSender).buf[i][0]
|
|
if got != i {
|
|
t.Errorf("Did not get expected result for sender: %v. \nGot: %v\nWant: %v\n", j, got, i)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestMultiSenderNotActiveNoRetry checks that if the active func passed to
|
|
// newMultiSender returns false before a write, or in the middle of write with
|
|
// retries, then we return from Write as expected.
|
|
func TestMultiSenderNotActiveNoRetry(t *testing.T) {
|
|
senders := []loadSender{
|
|
newDummyLoadSender(false, false),
|
|
newDummyLoadSender(false, false),
|
|
newDummyLoadSender(false, false),
|
|
}
|
|
|
|
// This will allow us to simulate a change in running state of
|
|
// multiSender's 'owner'.
|
|
active := true
|
|
activeFunc := func() bool {
|
|
return active
|
|
}
|
|
|
|
ms, err := newMultiSender(senders, activeFunc)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
// We will perform two writes. We expect the second write not to be complete,
|
|
// i.e. the senders should not send anything on this write.
|
|
ms.Write([]byte{0x00})
|
|
active = false
|
|
ms.Write([]byte{0x01})
|
|
|
|
// Check that the senders only sent data once.
|
|
for _, dest := range ms.senders {
|
|
if len(dest.(*dummyLoadSender).buf) != 1 {
|
|
t.Errorf("length of sender buf is not 1 as expected")
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestMultiSenderNotActiveRetry checks that we correctly returns from a call to
|
|
// multiSender.Write when the active callback func return false during repeated
|
|
// send retries.
|
|
func TestMultiSenderNotActiveRetry(t *testing.T) {
|
|
senders := []loadSender{
|
|
newDummyLoadSender(false, false),
|
|
}
|
|
|
|
// Active will simulate the running state of the multiSender's 'owner'.
|
|
active := true
|
|
|
|
// We will run the ms.Write as routine so we need some sync.
|
|
var mu sync.Mutex
|
|
|
|
// Once we use setActive to change the state of the fake owner, this will
|
|
// return false and we expect the ms.Write method to return from the continous
|
|
// send retry state.
|
|
activeFunc := func() bool {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
return active
|
|
}
|
|
|
|
ms, err := newMultiSender(senders, activeFunc)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
// We run this in background so that we can change running state during the
|
|
// the write. We then expect done to be true after some period of time.
|
|
done := false
|
|
go func() {
|
|
ms.Write([]byte{0x00})
|
|
done = true
|
|
}()
|
|
|
|
// Wait for half a second and then change the active state.
|
|
time.Sleep(500 * time.Millisecond)
|
|
mu.Lock()
|
|
active = false
|
|
mu.Unlock()
|
|
|
|
// Wait half a second for the routine to return and check that done is true.
|
|
time.Sleep(500 * time.Millisecond)
|
|
if !done {
|
|
t.Fatal("multiSender.Write did not return as expected with active=false")
|
|
}
|
|
}
|
|
|
|
// TestMultiSenderFailNoRetry checks that behaviour is as expected when a sender
|
|
// fails at a send and does not retry.
|
|
func TestMultiSenderFailNoRetry(t *testing.T) {
|
|
senders := []loadSender{
|
|
newDummyLoadSender(false, false),
|
|
newDummyLoadSender(false, false),
|
|
newDummyLoadSender(false, false),
|
|
}
|
|
|
|
ms, err := newMultiSender(senders, func() bool { return true })
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
// We will perform two writes. We expect the second write not to be complete,
|
|
// i.e. the senders should not send anything on this write.
|
|
ms.Write([]byte{0x00})
|
|
|
|
// Make second sender fail a send.
|
|
const failedSenderIdx = 1
|
|
failedSender := ms.senders[failedSenderIdx].(*dummyLoadSender)
|
|
failedSender.failOnSend = true
|
|
ms.Write([]byte{0x01})
|
|
|
|
// Check that handleSendFail was called.
|
|
if !failedSender.failHandled {
|
|
t.Fatal("the failed send was not handled")
|
|
}
|
|
|
|
// Now for next send we don't want to fail.
|
|
failedSender.failOnSend = false
|
|
ms.Write([]byte{0x02})
|
|
|
|
// Check number of slices sent for each sender and also check data.
|
|
for i, sender := range ms.senders {
|
|
// First check number of slices sent for each sender.
|
|
wantLen := 3
|
|
if i == failedSenderIdx {
|
|
wantLen = 2
|
|
}
|
|
curSender := sender.(*dummyLoadSender)
|
|
gotLen := len(curSender.buf)
|
|
if gotLen != wantLen {
|
|
t.Errorf("len of sender that failed is not expected: \nGot: %v\nWant: %v\n", gotLen, wantLen)
|
|
}
|
|
|
|
// Now check the quality of the data.
|
|
wantData := [][]byte{{0x00}, {0x01}, {0x02}}
|
|
if i == failedSenderIdx {
|
|
wantData = [][]byte{{0x00}, {0x02}}
|
|
}
|
|
gotData := curSender.buf
|
|
if !reflect.DeepEqual(gotData, wantData) {
|
|
t.Errorf("unexpect data sent through sender idx: %v. \nGot: %v\nWant: %v\n", i, gotData, wantData)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestMultiSenderFailRetry checks that a if a sender is set to retry on failed
|
|
// sends, that it does so repeatedly until it can successfully send.
|
|
func TestMultiSenderFailRetry(t *testing.T) {
|
|
// NB: This is only being tested with one sender - this is AusOcean's use case.
|
|
senders := []loadSender{newDummyLoadSender(false, true)}
|
|
ms, err := newMultiSender(senders, func() bool { return true })
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
// Perform one write with successful send.
|
|
ms.Write([]byte{0x00})
|
|
|
|
// Now cause sender to fail on next write.
|
|
failedSender := ms.senders[0].(*dummyLoadSender)
|
|
failedSender.failOnSend = true
|
|
|
|
// Wrap next write in a routine. It will keep trying to send until we set
|
|
// failOnSend to false.
|
|
done := false
|
|
go func() {
|
|
ms.Write([]byte{0x01})
|
|
done = true
|
|
}()
|
|
|
|
// Now set failOnSend to false.
|
|
failedSender.mu.Lock()
|
|
failedSender.failOnSend = false
|
|
failedSender.mu.Unlock()
|
|
|
|
// Sleep and then check that we've successfully returned from the write.
|
|
time.Sleep(10 * time.Millisecond)
|
|
if done != true {
|
|
t.Fatal("did not exit write when send was successful")
|
|
}
|
|
|
|
// Write on last time.
|
|
ms.Write([]byte{0x02})
|
|
|
|
// Check that all the data is there.
|
|
got := failedSender.buf
|
|
want := [][]byte{{0x00}, {0x01}, {0x02}}
|
|
if !reflect.DeepEqual(got, want) {
|
|
t.Errorf("sender did not send expected data. \nGot: %v\nWant: %v\n", got, want)
|
|
}
|
|
}
|