/*
NAME
  mtsSender_test.go

DESCRIPTION
  mtsSender_test.go contains tests that validate the functionality
  of the mtsSender under senders.go. Tests include checks that
  the mtsSender is segmenting sends correctly, and also that it can
  correct discontinuities.

AUTHORS
  Saxon A. Nelson-Milton

LICENSE
  mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)

  It is free software: you can redistribute it and/or modify them
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  in gpl.txt. If not, see http://www.gnu.org/licenses.
*/

package revid

import (
	"errors"
	"fmt"
	"testing"
	"time"

	"github.com/Comcast/gots/packet"
	"github.com/Comcast/gots/pes"

	"bitbucket.org/ausocean/av/stream/mts"
	"bitbucket.org/ausocean/av/stream/mts/meta"
	"bitbucket.org/ausocean/utils/logger"
	"bitbucket.org/ausocean/utils/ring"
)

// Ring buffer sizes and read/write timeouts.
const ( rbSize = 100 rbElementSize = 150000 wTimeout = 10 * time.Millisecond rTimeout = 10 * time.Millisecond ) type sender struct { buf [][]byte tstDiscon bool disconAt int curPktNo int } func (ts *sender) send(d []byte) error { if ts.tstDiscon && ts.curPktNo == ts.disconAt { ts.curPktNo++ return errors.New("could not send") } cpy := make([]byte, len(d)) copy(cpy, d) ts.buf = append(ts.buf, cpy) ts.curPktNo++ return nil } func log(lvl int8, msg string, args ...interface{}) { var l string switch lvl { case logger.Warning: l = "warning" case logger.Debug: l = "debug" case logger.Info: l = "info" case logger.Error: l = "error" case logger.Fatal: l = "fatal" } msg = l + ": " + msg for i := 0; i < len(args); i++ { msg += " %v" } fmt.Printf(msg, args) } type tstPacker struct { rb *ring.Buffer } func (p *tstPacker) Write(d []byte) (int, error) { n, err := p.rb.Write(d) p.rb.Flush() return n, err } func TestSegment(t *testing.T) { mts.Meta = meta.New() // Create ringbuffer tst sender, loadsender and the mpegts encoder rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) tstSender := &sender{} loadSender := newMtsSender(tstSender, log) packer := &tstPacker{rb: rb} encoder := mts.NewEncoder(packer, 25) // Turn time based psi writing off for encoder const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { // Our payload will just be packet no encoder.Encode([]byte{byte(i)}) rb.Flush() for { next, err := rb.Next(rTimeout) if err != nil { break } err = loadSender.load(next) if err != nil { t.Fatalf("Unexpected err: %v\n", err) } err = loadSender.send() if err != nil { t.Fatalf("Unexpected err: %v\n", err) } loadSender.release() } } result := tstSender.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) // Check that the clip is the right length clipLen := len(clip) if clipLen != psiSendCount*mts.PacketSize { t.Fatalf("Clip %v is not correct length. 
Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) } // Also check that the first packet is a PAT firstPkt := clip[:mts.PacketSize] var pkt [mts.PacketSize]byte copy(pkt[:], firstPkt) pid := (*packet.Packet)(&pkt).PID() if pid != mts.PatPid { t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) } // Check that the clip data is okay for i := 0; i < len(clip); i += mts.PacketSize { firstPkt := clip[i : i+mts.PacketSize] copy(pkt[:], firstPkt) p := (*packet.Packet)(&pkt) pid := p.PID() if pid == mts.VideoPid { // Mts payload payload, err := p.Payload() if err != nil { t.Fatalf("Unexpected err: %v\n", err) } // Parse pes from the mts payload pes, err := pes.NewPESHeader(payload) if err != nil { t.Fatalf("Unexpected err: %v\n", err) } // Get the data from the pes packet and convert to int data := int8(pes.Data()[0]) // Calc expected data in the pes and then check if data != int8(expectData) { t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) } expectData++ } } } } func TestSendFailDiscontinuity(t *testing.T) { mts.Meta = meta.New() // Create ringbuffer tst sender, loadsender and the mpegts encoder rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) const disconClipNo = 3 tstSender := &sender{tstDiscon: true, disconAt: disconClipNo} loadSender := newMtsSender(tstSender, log) packer := tstPacker{rb: rb} encoder := mts.NewEncoder(&packer, 25) // Turn time based psi writing off for encoder const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { // Our payload will just be packet no encoder.Encode([]byte{byte(i)}) rb.Flush() for { next, err := rb.Next(rTimeout) if err != nil { break } err = loadSender.load(next) if err != nil { t.Fatalf("Unexpected err: %v\n", err) } _ = loadSender.send() loadSender.release() } } result := tstSender.buf // First 
check that we have less clips expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 gotLen := len(result) if gotLen != expectedLen { t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) } // Now check that the discontonuity indicator is set at the disconClip pat disconClip := result[disconClipNo] firstPkt := disconClip[:mts.PacketSize] var pkt [mts.PacketSize]byte copy(pkt[:], firstPkt) discon, err := (*packet.AdaptationField)((*packet.Packet)(&pkt)).Discontinuity() if err != nil { t.Fatalf("Unexpected err: %v\n", err) } if !discon { t.Fatalf("Did not get discontinuity indicator for PAT") } }