av/revid/senders_test.go

520 lines
14 KiB
Go

/*
NAME
mtsSender_test.go
DESCRIPTION
mtsSender_test.go contains tests that validate the functionalilty of the
mtsSender under senders.go. Tests include checks that the mtsSender is
segmenting sends correctly, and also that it can correct discontinuities.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"errors"
"fmt"
"reflect"
"sync"
"testing"
"time"
"github.com/Comcast/gots/packet"
"github.com/Comcast/gots/pes"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/mts/meta"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
// Ring buffer sizes and read/write timeouts.
const (
rbSize = 100
rbElementSize = 150000
wTimeout = 10 * time.Millisecond
rTimeout = 10 * time.Millisecond
)
var (
errSendFailed = errors.New("send failed")
)
// sender simulates sending of video data, creating discontinuities if
// testDiscontinuities is set to true.
type sender struct {
buf [][]byte
testDiscontinuities bool
discontinuityAt int
currentPkt int
}
// send takes d and neglects if testDiscontinuities is true, returning an error,
// otherwise d is appended to senders buf.
func (ts *sender) send(d []byte) error {
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
ts.currentPkt++
return errSendFailed
}
cpy := make([]byte, len(d))
copy(cpy, d)
ts.buf = append(ts.buf, cpy)
ts.currentPkt++
return nil
}
// log implements the required logging func for some of the structs in use
// within tests.
func log(lvl int8, msg string, args ...interface{}) {
var l string
switch lvl {
case logger.Warning:
l = "warning"
case logger.Debug:
l = "debug"
case logger.Info:
l = "info"
case logger.Error:
l = "error"
case logger.Fatal:
l = "fatal"
}
msg = l + ": " + msg
for i := 0; i < len(args); i++ {
msg += " %v"
}
fmt.Printf(msg, args)
}
// TestSegment ensures that the mtsSender correctly segments data into clips
// based on positioning of PSI in the mtsEncoder's output stream.
func TestMtsSenderSegment(t *testing.T) {
mts.Meta = meta.New()
// Create ringBuffer, sender, loadsender and the MPEGTS encoder.
tstSender := &sender{}
loadSender := newMtsSender(tstSender, false, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
// Insert a payload so that we check that the segmentation works correctly
// in this regard. Packet number will be used.
encoder.Write([]byte{byte(i)})
rb.Flush()
for {
next, err := rb.Next(rTimeout)
if err != nil {
break
}
err = loadSender.load(next.Bytes())
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
err = loadSender.send()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
loadSender.release()
next.Close()
next = nil
}
}
result := tstSender.buf
expectData := 0
for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo)
// Check that the clip is of expected length.
clipLen := len(clip)
if clipLen != psiSendCount*mts.PacketSize {
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
}
// Also check that the first packet is a PAT.
firstPkt := clip[:mts.PacketSize]
var pkt packet.Packet
copy(pkt[:], firstPkt)
pid := pkt.PID()
if pid != mts.PatPid {
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
}
// Check that the clip data is okay.
for i := 0; i < len(clip); i += mts.PacketSize {
copy(pkt[:], clip[i:i+mts.PacketSize])
if pkt.PID() == mts.VideoPid {
payload, err := pkt.Payload()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Parse PES from the MTS payload.
pes, err := pes.NewPESHeader(payload)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Get the data from the PES packet and convert to an int.
data := int8(pes.Data()[0])
// Calc expected data in the PES and then check.
if data != int8(expectData) {
t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData)
}
expectData++
}
}
}
}
func TestMtsSenderDiscontinuity(t *testing.T) {
mts.Meta = meta.New()
// Create ringBuffer sender, loadSender and the MPEGTS encoder.
const clipWithDiscontinuity = 3
tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
loadSender := newMtsSender(tstSender, false, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
// Our payload will just be packet number.
encoder.Write([]byte{byte(i)})
rb.Flush()
for {
next, err := rb.Next(rTimeout)
if err != nil {
break
}
err = loadSender.load(next.Bytes())
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
loadSender.send()
loadSender.release()
next.Close()
next = nil
}
}
result := tstSender.buf
// First check that we have less clips as expected.
expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1
gotLen := len(result)
if gotLen != expectedLen {
t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)
}
// Now check that the discontinuity indicator is set at the discontinuityClip PAT.
disconClip := result[clipWithDiscontinuity]
firstPkt := disconClip[:mts.PacketSize]
var pkt packet.Packet
copy(pkt[:], firstPkt)
discon, err := (*packet.AdaptationField)(&pkt).Discontinuity()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
if !discon {
t.Fatalf("Did not get discontinuity indicator for PAT")
}
}
// dummyLoadSender is a loadSender implementation that allows us to simulate
// the behaviour of a loadSender and check that it performas as expected.
type dummyLoadSender struct {
data []byte
buf [][]byte
failOnSend bool
failHandled bool
retry bool
mu sync.Mutex
}
// newDummyLoadSender returns a pointer to a new dummyLoadSender.
func newDummyLoadSender(fail bool, retry bool) *dummyLoadSender {
return &dummyLoadSender{failOnSend: fail, failHandled: true, retry: retry}
}
// load takes a byte slice and assigns it to the dummyLoadSenders data slice.
func (s *dummyLoadSender) load(d []byte) error {
s.data = d
return nil
}
// send will append to dummyLoadSender's buf slice, only if failOnSend is false.
// If failOnSend is set to true, we expect that data sent won't be written to
// the buf simulating a failed send.
func (s *dummyLoadSender) send() error {
if !s.getFailOnSend() {
s.buf = append(s.buf, s.data)
return nil
}
s.failHandled = false
return errSendFailed
}
func (s *dummyLoadSender) getFailOnSend() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.failOnSend
}
// release sets dummyLoadSender's data slice to nil. data can be checked to see
// if release has been called at the right time.
func (s *dummyLoadSender) release() {
s.data = nil
}
func (s *dummyLoadSender) close() error { return nil }
// handleSendFail simply sets the failHandled flag to true. This can be checked
// to see if handleSendFail has been called by the multiSender at the right time.
func (s *dummyLoadSender) handleSendFail(err error) error {
s.failHandled = true
return nil
}
func (s *dummyLoadSender) retrySend() bool { return s.retry }
// TestMultiSenderWrite checks that we can do basic writing to multiple senders
// using the multiSender.
func TestMultiSenderWrite(t *testing.T) {
senders := []loadSender{
newDummyLoadSender(false, false),
newDummyLoadSender(false, false),
newDummyLoadSender(false, false),
}
ms := newMultiSender(senders, func() bool { return true })
// Perform some multiSender writes.
const noOfWrites = 5
for i := byte(0); i < noOfWrites; i++ {
ms.Write([]byte{i})
}
// Check that the senders got the data correctly from the writes.
for i := byte(0); i < noOfWrites; i++ {
for j, dest := range ms.senders {
got := dest.(*dummyLoadSender).buf[i][0]
if got != i {
t.Errorf("Did not get expected result for sender: %v. \nGot: %v\nWant: %v\n", j, got, i)
}
}
}
}
// TestMultiSenderNotActiveNoRetry checks that if the active func passed to
// newMultiSender returns false before a write, or in the middle of write with
// retries, then we return from Write as expected.
func TestMultiSenderNotActiveNoRetry(t *testing.T) {
senders := []loadSender{
newDummyLoadSender(false, false),
newDummyLoadSender(false, false),
newDummyLoadSender(false, false),
}
// This will allow us to simulate a change in running state of
// multiSender's 'owner'.
active := true
activeFunc := func() bool {
return active
}
ms := newMultiSender(senders, activeFunc)
// We will perform two writes. We expect the second write not to be complete,
// i.e. the senders should not send anything on this write.
ms.Write([]byte{0x00})
active = false
ms.Write([]byte{0x01})
// Check that the senders only sent data once.
for _, dest := range ms.senders {
if len(dest.(*dummyLoadSender).buf) != 1 {
t.Errorf("length of sender buf is not 1 as expected")
}
}
}
// TestMultiSenderNotActiveRetry checks that we correctly returns from a call to
// multiSender.Write when the active callback func return false during repeated
// send retries.
func TestMultiSenderNotActiveRetry(t *testing.T) {
senders := []loadSender{
newDummyLoadSender(false, false),
}
// Active will simulate the running state of the multiSender's 'owner'.
active := true
// We will run the ms.Write as routine so we need some sync.
var mu sync.Mutex
// Once we use setActive to change the state of the fake owner, this will
// return false and we expect the ms.Write method to return from the continous
// send retry state.
activeFunc := func() bool {
mu.Lock()
defer mu.Unlock()
return active
}
ms := newMultiSender(senders, activeFunc)
// We run this in background so that we can change running state during the
// the write. We then expect done to be true after some period of time.
done := false
go func() {
ms.Write([]byte{0x00})
done = true
}()
// Wait for half a second and then change the active state.
time.Sleep(500 * time.Millisecond)
mu.Lock()
active = false
mu.Unlock()
// Wait half a second for the routine to return and check that done is true.
time.Sleep(500 * time.Millisecond)
if !done {
t.Fatal("multiSender.Write did not return as expected with active=false")
}
}
// TestMultiSenderFailNoRetry checks that behaviour is as expected when a sender
// fails at a send and does not retry.
func TestMultiSenderFailNoRetry(t *testing.T) {
senders := []loadSender{
newDummyLoadSender(false, false),
newDummyLoadSender(false, false),
newDummyLoadSender(false, false),
}
ms := newMultiSender(senders, func() bool { return true })
// We will perform two writes. We expect the second write not to be complete,
// i.e. the senders should not send anything on this write.
ms.Write([]byte{0x00})
// Make second sender fail a send.
const failedSenderIdx = 1
failedSender := ms.senders[failedSenderIdx].(*dummyLoadSender)
failedSender.failOnSend = true
ms.Write([]byte{0x01})
// Check that handleSendFail was called.
if !failedSender.failHandled {
t.Fatal("the failed send was not handled")
}
// Now for next send we don't want to fail.
failedSender.failOnSend = false
ms.Write([]byte{0x02})
// Check number of slices sent for each sender and also check data.
for i, sender := range ms.senders {
// First check number of slices sent for each sender.
wantLen := 3
if i == failedSenderIdx {
wantLen = 2
}
curSender := sender.(*dummyLoadSender)
gotLen := len(curSender.buf)
if gotLen != wantLen {
t.Errorf("len of sender that failed is not expected: \nGot: %v\nWant: %v\n", gotLen, wantLen)
}
// Now check the quality of the data.
wantData := [][]byte{{0x00}, {0x01}, {0x02}}
if i == failedSenderIdx {
wantData = [][]byte{{0x00}, {0x02}}
}
gotData := curSender.buf
if !reflect.DeepEqual(gotData, wantData) {
t.Errorf("unexpect data sent through sender idx: %v. \nGot: %v\nWant: %v\n", i, gotData, wantData)
}
}
}
// TestMultiSenderFailRetry checks that a if a sender is set to retry on failed
// sends, that it does so repeatedly until it can successfully send.
func TestMultiSenderFailRetry(t *testing.T) {
// NB: This is only being tested with one sender - this is AusOcean's use case.
senders := []loadSender{newDummyLoadSender(false, true)}
ms := newMultiSender(senders, func() bool { return true })
// Perform one write with successful send.
ms.Write([]byte{0x00})
// Now cause sender to fail on next write.
failedSender := ms.senders[0].(*dummyLoadSender)
failedSender.failOnSend = true
// Wrap next write in a routine. It will keep trying to send until we set
// failOnSend to false.
done := false
go func() {
ms.Write([]byte{0x01})
done = true
}()
// Now set failOnSend to false.
failedSender.mu.Lock()
failedSender.failOnSend = false
failedSender.mu.Unlock()
// Sleep and then check that we've successfully returned from the write.
time.Sleep(10 * time.Millisecond)
if done != true {
t.Fatal("did not exit write when send was successful")
}
// Write on last time.
ms.Write([]byte{0x02})
// Check that all the data is there.
got := failedSender.buf
want := [][]byte{{0x00}, {0x01}, {0x02}}
if !reflect.DeepEqual(got, want) {
t.Errorf("sender did not send expected data. \nGot: %v\nWant: %v\n", got, want)
}
}