av/revid/senders_test.go

268 lines
7.3 KiB
Go

/*
NAME
mtsSender_test.go
DESCRIPTION
mtsSender_test.go contains tests that validate the functionality of the
mtsSender under senders.go. Tests include checks that the mtsSender is
segmenting sends correctly, and also that it can correct discontinuities.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"errors"
"fmt"
"testing"
"time"
"github.com/Comcast/gots/packet"
"github.com/Comcast/gots/pes"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/mts/meta"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
// Ring buffer dimensions; these are handed to ring.NewBuffer as its first
// and second arguments by the tests in this file.
const (
	rbSize        = 100
	rbElementSize = 150000
)

// Ring buffer timeouts: wTimeout is the third argument given to
// ring.NewBuffer, and rTimeout is the timeout given to each Next call when
// draining the buffer.
const (
	wTimeout = 10 * time.Millisecond
	rTimeout = 10 * time.Millisecond
)
// sender is a stand-in destination used by the tests. It records a copy of
// every successfully sent slice in buf, and can be configured to fail exactly
// one send in order to simulate a discontinuity.
type sender struct {
	buf                 [][]byte // Copies of the data from each successful send, in order.
	testDiscontinuities bool     // When true, the send numbered discontinuityAt fails.
	discontinuityAt     int      // Zero-based index of the send call that should fail.
	currentPkt          int      // Number of send calls made so far.
}

// send records a copy of d in the sender's buf and returns nil, unless
// testDiscontinuities is set and this is send call number discontinuityAt, in
// which case d is dropped and an error is returned. The call counter is
// advanced in both cases.
func (s *sender) send(d []byte) error {
	callNo := s.currentPkt
	s.currentPkt++

	if s.testDiscontinuities && callNo == s.discontinuityAt {
		return errors.New("could not send")
	}

	// Store a copy so that later changes to d by the caller do not alter
	// what was recorded.
	stored := append(make([]byte, 0, len(d)), d...)
	s.buf = append(s.buf, stored)
	return nil
}
// log implements the logging func required by some of the structs used within
// these tests. It writes one line to standard output consisting of the name
// of the level lvl, then msg, then each of args formatted with its default
// (%v) formatting and separated by spaces. A lvl that matches none of the
// logger levels handled below results in an empty level name.
func log(lvl int8, msg string, args ...interface{}) {
	var l string
	switch lvl {
	case logger.Warning:
		l = "warning"
	case logger.Debug:
		l = "debug"
	case logger.Info:
		l = "info"
	case logger.Error:
		l = "error"
	case logger.Fatal:
		l = "fatal"
	}

	// Each arg is formatted on its own rather than by using msg as a format
	// string: passing the args slice as a single operand to Printf leaves the
	// verbs unmatched, and a literal '%' in msg would be misread as a verb.
	out := l + ": " + msg
	for _, a := range args {
		out += " " + fmt.Sprint(a)
	}
	fmt.Println(out)
}
// TestMtsSenderSegment checks that the mtsSender segments the MPEG-TS
// encoder's output into clips. Each clip received by the test sender must be
// psiSendCount packets long and begin with a PAT, and the one byte payloads
// carried by the video packets must arrive in the order they were written.
func TestMtsSenderSegment(t *testing.T) {
	mts.Meta = meta.New()

	// Set up the recording sender, the mtsSender under test, and an encoder
	// that writes into a ring buffer.
	dst := &sender{}
	ms := newMtsSender(dst, log)
	rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
	enc := mts.NewEncoder((*buffer)(rb), 25)

	// Turn time based PSI writing off for encoder.
	const psiSendCount = 10
	enc.TimeBasedPsi(false, psiSendCount)

	const noOfPacketsToWrite = 100
	for n := 0; n < noOfPacketsToWrite; n++ {
		// The payload is just the write number, so that ordering can be
		// verified once the clips have been collected.
		enc.Write([]byte{byte(n)})
		rb.Flush()

		// Drain whatever the ring buffer currently holds through the
		// mtsSender; a read error (e.g. timeout) ends the drain.
		for chunk, err := rb.Next(rTimeout); err == nil; chunk, err = rb.Next(rTimeout) {
			err = ms.load(chunk.Bytes())
			if err != nil {
				t.Fatalf("Unexpected err: %v\n", err)
			}
			err = ms.send()
			if err != nil {
				t.Fatalf("Unexpected err: %v\n", err)
			}
			ms.release()
			chunk.Close()
		}
	}

	// wantPayload tracks the next expected video payload across all clips.
	wantPayload := 0
	for clipNo, clip := range dst.buf {
		t.Logf("Checking clip: %v\n", clipNo)

		// Every clip must hold exactly psiSendCount packets.
		if got, want := len(clip), psiSendCount*mts.PacketSize; got != want {
			t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, got, want, clip)
		}

		// Every clip must start with a PAT.
		var pkt packet.Packet
		copy(pkt[:], clip[:mts.PacketSize])
		if pid := pkt.PID(); pid != mts.PatPid {
			t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
		}

		// Walk the clip packet by packet, checking the payload of each video
		// packet against the running count.
		for off := 0; off < len(clip); off += mts.PacketSize {
			copy(pkt[:], clip[off:off+mts.PacketSize])
			if pkt.PID() != mts.VideoPid {
				continue
			}

			payload, err := pkt.Payload()
			if err != nil {
				t.Fatalf("Unexpected err: %v\n", err)
			}

			// The MTS payload holds a PES packet; its first data byte is the
			// value that was written to the encoder.
			hdr, err := pes.NewPESHeader(payload)
			if err != nil {
				t.Fatalf("Unexpected err: %v\n", err)
			}

			got := int8(hdr.Data()[0])
			if got != int8(wantPayload) {
				t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, off/mts.PacketSize, got, wantPayload)
			}
			wantPayload++
		}
	}
}
// TestMtsSenderDiscontinuity checks the mtsSender's behaviour when the
// underlying sender fails on one send. The test sender is set up to fail send
// number clipWithDiscontinuity; the test then expects one less clip than
// would otherwise be received, and expects the first packet of the clip at
// index clipWithDiscontinuity in the received clips to have its discontinuity
// indicator set.
func TestMtsSenderDiscontinuity(t *testing.T) {
	mts.Meta = meta.New()

	// Create ringBuffer sender, loadSender and the MPEGTS encoder.
	const clipWithDiscontinuity = 3
	tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
	loadSender := newMtsSender(tstSender, log)
	rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
	encoder := mts.NewEncoder((*buffer)(rb), 25)

	// Turn time based PSI writing off for encoder.
	const psiSendCount = 10
	encoder.TimeBasedPsi(false, psiSendCount)

	const noOfPacketsToWrite = 100
	for i := 0; i < noOfPacketsToWrite; i++ {
		// Our payload will just be packet number.
		encoder.Write([]byte{byte(i)})
		rb.Flush()

		for {
			next, err := rb.Next(rTimeout)
			if err != nil {
				break
			}

			err = loadSender.load(next.Bytes())
			if err != nil {
				t.Fatalf("Unexpected err: %v\n", err)
			}

			// The error from send is deliberately not checked here: the test
			// sender is configured to fail one send in order to create the
			// discontinuity under test.
			loadSender.send()
			loadSender.release()
			next.Close()
		}
	}

	result := tstSender.buf

	// First check that we have less clips as expected.
	expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1
	gotLen := len(result)
	if gotLen != expectedLen {
		t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)
	}

	// The clip count check above is non-fatal, so make sure the clip we are
	// about to inspect actually exists rather than panicking on the index.
	if gotLen <= clipWithDiscontinuity {
		t.Fatalf("Not enough clips to check for discontinuity. Got: %v, need at least: %v\n", gotLen, clipWithDiscontinuity+1)
	}

	// Now check that the discontinuity indicator is set at the discontinuityClip PAT.
	disconClip := result[clipWithDiscontinuity]
	if len(disconClip) < mts.PacketSize {
		t.Fatalf("Clip %v is too short to hold a packet. Got: %v, want at least: %v\n", clipWithDiscontinuity, len(disconClip), mts.PacketSize)
	}
	firstPkt := disconClip[:mts.PacketSize]
	var pkt packet.Packet
	copy(pkt[:], firstPkt)
	discon, err := (*packet.AdaptationField)(&pkt).Discontinuity()
	if err != nil {
		t.Fatalf("Unexpected err: %v\n", err)
	}

	if !discon {
		t.Fatalf("Did not get discontinuity indicator for PAT")
	}
}
// TestNewMultiSender exercises newMultiSender both with and without an
// 'active' function: creation is expected to fail when the function is nil
// and to succeed when one is supplied.
func TestNewMultiSender(t *testing.T) {
	// A nil 'active' function should be rejected.
	if _, err := newMultiSender(nil, false, nil); err == nil {
		t.Error("did not get expected error on creation of multiSender without active function")
	}

	// Supplying an 'active' function should be accepted.
	alwaysActive := func() bool { return true }
	if _, err := newMultiSender(nil, false, alwaysActive); err != nil {
		t.Errorf("did not expect to get error on creation of multiSender with active func provided, err: %v", err)
	}
}