revid: cleaned up documentation in senders.go and mtsSender_test.go

This commit is contained in:
Saxon 2019-03-01 13:28:34 +10:30
parent 8baff93918
commit 5eb832e6c3
2 changed files with 49 additions and 31 deletions

View File

@ -51,6 +51,8 @@ const (
rTimeout = 10 * time.Millisecond rTimeout = 10 * time.Millisecond
) )
// sender simulates sending of video data, creating discontinuities if
// testDiscontinuities is set to true.
type sender struct { type sender struct {
buf [][]byte buf [][]byte
testDiscontinuities bool testDiscontinuities bool
@ -58,6 +60,8 @@ type sender struct {
currentPkt int currentPkt int
} }
// send takes d and neglects if testDiscontinuities is true, returning an error,
// otherwise d is appended to senders buf.
func (ts *sender) send(d []byte) error { func (ts *sender) send(d []byte) error {
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
ts.currentPkt++ ts.currentPkt++
@ -70,6 +74,8 @@ func (ts *sender) send(d []byte) error {
return nil return nil
} }
// log implements the required logging func for some of the structs in use
// within tests.
func log(lvl int8, msg string, args ...interface{}) { func log(lvl int8, msg string, args ...interface{}) {
var l string var l string
switch lvl { switch lvl {
@ -91,32 +97,38 @@ func log(lvl int8, msg string, args ...interface{}) {
fmt.Printf(msg, args) fmt.Printf(msg, args)
} }
// tstPacker implements io.Writer and handles the writing of data to the
// ringBuffer used in tests.
type tstPacker struct { type tstPacker struct {
rb *ring.Buffer rb *ring.Buffer
} }
// Write writes to tstPacker's ringBuffer.
func (p *tstPacker) Write(d []byte) (int, error) { func (p *tstPacker) Write(d []byte) (int, error) {
n, err := p.rb.Write(d) n, err := p.rb.Write(d)
p.rb.Flush() p.rb.Flush()
return n, err return n, err
} }
// TestSegment ensures that the mtsSender correctly segments data into clips
// based on positioning of PSI in the mtsEncoder's output stream.
func TestSegment(t *testing.T) { func TestSegment(t *testing.T) {
mts.Meta = meta.New() mts.Meta = meta.New()
// Create ringbuffer tst sender, loadsender and the mpegts encoder // Create ringBuffer, sender, loadsender and the MPEGTS encoder.
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
tstSender := &sender{} tstSender := &sender{}
loadSender := newMtsSender(tstSender, log) loadSender := newMtsSender(tstSender, log)
packer := &tstPacker{rb: rb} packer := &tstPacker{rb: rb}
encoder := mts.NewEncoder(packer, 25) encoder := mts.NewEncoder(packer, 25)
// Turn time based psi writing off for encoder // Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount) encoder.TimeBasedPsi(false, psiSendCount)
const noOfPacketsToWrite = 100 const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ { for i := 0; i < noOfPacketsToWrite; i++ {
// Our payload will just be packet no // Insert a payload so that we check that the segmentation works correctly
// in this regard. Packet no will be used.
encoder.Encode([]byte{byte(i)}) encoder.Encode([]byte{byte(i)})
rb.Flush() rb.Flush()
@ -143,13 +155,13 @@ func TestSegment(t *testing.T) {
expectData := 0 expectData := 0
for clipNo, clip := range result { for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo) t.Logf("Checking clip: %v\n", clipNo)
// Check that the clip is the right length // Check that the clip is of expected length.
clipLen := len(clip) clipLen := len(clip)
if clipLen != psiSendCount*mts.PacketSize { if clipLen != psiSendCount*mts.PacketSize {
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
} }
// Also check that the first packet is a PAT // Also check that the first packet is a PAT.
firstPkt := clip[:mts.PacketSize] firstPkt := clip[:mts.PacketSize]
var pkt [mts.PacketSize]byte var pkt [mts.PacketSize]byte
copy(pkt[:], firstPkt) copy(pkt[:], firstPkt)
@ -158,29 +170,28 @@ func TestSegment(t *testing.T) {
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
} }
// Check that the clip data is okay // Check that the clip data is okay.
for i := 0; i < len(clip); i += mts.PacketSize { for i := 0; i < len(clip); i += mts.PacketSize {
firstPkt := clip[i : i+mts.PacketSize] firstPkt := clip[i : i+mts.PacketSize]
copy(pkt[:], firstPkt) copy(pkt[:], firstPkt)
p := (*packet.Packet)(&pkt) p := (*packet.Packet)(&pkt)
pid := p.PID() pid := p.PID()
if pid == mts.VideoPid { if pid == mts.VideoPid {
// Mts payload
payload, err := p.Payload() payload, err := p.Payload()
if err != nil { if err != nil {
t.Fatalf("Unexpected err: %v\n", err) t.Fatalf("Unexpected err: %v\n", err)
} }
// Parse pes from the mts payload // Parse PES from the MTS payload.
pes, err := pes.NewPESHeader(payload) pes, err := pes.NewPESHeader(payload)
if err != nil { if err != nil {
t.Fatalf("Unexpected err: %v\n", err) t.Fatalf("Unexpected err: %v\n", err)
} }
// Get the data from the pes packet and convert to int // Get the data from the PES packet and convert to an int.
data := int8(pes.Data()[0]) data := int8(pes.Data()[0])
// Calc expected data in the pes and then check // Calc expected data in the PES and then check.
if data != int8(expectData) { if data != int8(expectData) {
t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData)
} }
@ -192,7 +203,7 @@ func TestSegment(t *testing.T) {
func TestSendFailDiscontinuity(t *testing.T) { func TestSendFailDiscontinuity(t *testing.T) {
mts.Meta = meta.New() mts.Meta = meta.New()
// Create ringbuffer tst sender, loadsender and the mpegts encoder // Create ringBuffer sender, loadSender and the MPEGTS encoder.
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
const disconClipNo = 3 const disconClipNo = 3
tstSender := &sender{testDiscontinuities: true, discontinuityAt: disconClipNo} tstSender := &sender{testDiscontinuities: true, discontinuityAt: disconClipNo}
@ -200,13 +211,13 @@ func TestSendFailDiscontinuity(t *testing.T) {
packer := tstPacker{rb: rb} packer := tstPacker{rb: rb}
encoder := mts.NewEncoder(&packer, 25) encoder := mts.NewEncoder(&packer, 25)
// Turn time based psi writing off for encoder // Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount) encoder.TimeBasedPsi(false, psiSendCount)
const noOfPacketsToWrite = 100 const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ { for i := 0; i < noOfPacketsToWrite; i++ {
// Our payload will just be packet no // Our payload will just be packet no.
encoder.Encode([]byte{byte(i)}) encoder.Encode([]byte{byte(i)})
rb.Flush() rb.Flush()
@ -229,13 +240,13 @@ func TestSendFailDiscontinuity(t *testing.T) {
result := tstSender.buf result := tstSender.buf
// First check that we have less clips // First check that we have less clips as expected.
expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1
gotLen := len(result) gotLen := len(result)
if gotLen != expectedLen { if gotLen != expectedLen {
t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)
} }
// Now check that the discontonuity indicator is set at the disconClip pat // Now check that the discontinuity indicator is set at the discontinuityClip PAT.
disconClip := result[disconClipNo] disconClip := result[disconClipNo]
firstPkt := disconClip[:mts.PacketSize] firstPkt := disconClip[:mts.PacketSize]
var pkt [mts.PacketSize]byte var pkt [mts.PacketSize]byte

View File

@ -45,17 +45,21 @@ import (
"github.com/Comcast/gots/packet" "github.com/Comcast/gots/packet"
) )
// Sender is intended to provided functionality for the sending of a byte slice
// to an implemented destination.
type Sender interface { type Sender interface {
// send takes the bytes slice d and sends to a particular destination as
// implemented.
send(d []byte) error send(d []byte) error
} }
// httpSender implements loadSender for posting HTTP to NetReceiver // minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind.
type minimalHttpSender struct { type minimalHttpSender struct {
client *netsender.Sender client *netsender.Sender
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
} }
// newMinimalHttpSender returns a pointer to a new minimalHttpSender.
func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender { func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender {
return &minimalHttpSender{ return &minimalHttpSender{
client: ns, client: ns,
@ -63,6 +67,7 @@ func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, a
} }
} }
// send takes a bytes slice d and sends to http using s' http client.
func (s *minimalHttpSender) send(d []byte) error { func (s *minimalHttpSender) send(d []byte) error {
return httpSend(d, s.client, s.log) return httpSend(d, s.client, s.log)
} }
@ -127,10 +132,10 @@ func (s *fileSender) close() error {
return s.file.Close() return s.file.Close()
} }
// mtsSender provides sending capability specifically for use with // mtsSender implemented loadSender and provides sending capability specifically
// mpegts packetization. It handles the construction of appropriately lengthed // for use with MPEGTS packetization. It handles the construction of appropriately
// clips based on PSI. It also fixes accounts for discontinuities by setting // lengthed clips based on PSI. It also fixes accounts for discontinuities by
// the discontinuity indicator for the first packet of a clip. // setting the discontinuity indicator for the first packet of a clip.
type mtsSender struct { type mtsSender struct {
sender Sender sender Sender
buf []byte buf []byte
@ -143,7 +148,7 @@ type mtsSender struct {
curPid int curPid int
} }
// newmtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{ return &mtsSender{
sender: s, sender: s,
@ -151,12 +156,13 @@ func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{}))
} }
} }
// load takes a *ring.Chunk and extracts bytes copying into s.pkt for use by the sender. // load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and
// assigning to s.curPid. s.next if exists is also appended to the sender buf.
func (s *mtsSender) load(c *ring.Chunk) error { func (s *mtsSender) load(c *ring.Chunk) error {
s.chunk = c
if s.next != nil { if s.next != nil {
s.buf = append(s.buf, s.next...) s.buf = append(s.buf, s.next...)
} }
s.chunk = c
bytes := s.chunk.Bytes() bytes := s.chunk.Bytes()
cpy := make([]byte, len(bytes)) cpy := make([]byte, len(bytes))
copy(cpy, bytes) copy(cpy, bytes)
@ -168,8 +174,8 @@ func (s *mtsSender) load(c *ring.Chunk) error {
return nil return nil
} }
// send checks the most recently loaded packet and if it is a PAT then the clip // send checks the currently loaded paackets PID; if it is a PAT then what is in
// in s.buf is sent, otherwise the packet is added to s.buf. // the mtsSenders buffer is fixed and sent.
func (s *mtsSender) send() error { func (s *mtsSender) send() error {
if s.curPid == mts.PatPid && len(s.buf) > 0 { if s.curPid == mts.PatPid && len(s.buf) > 0 {
err := s.fixAndSend() err := s.fixAndSend()
@ -183,8 +189,9 @@ func (s *mtsSender) send() error {
return nil return nil
} }
// fixAndSend uses the discontinuity repairer to ensure there is not a // fixAndSend checks for discontinuities in the senders buffer and then sends.
// discontinuity, and if so sets the discontinuity indicator of the PAT packet. // If a discontinuity is found the PAT packet at the start of the clip has it's
// discontintuity indicator set to true.
func (ms *mtsSender) fixAndSend() error { func (ms *mtsSender) fixAndSend() error {
err := ms.repairer.Repair(ms.buf) err := ms.repairer.Repair(ms.buf)
if err != nil { if err != nil {
@ -199,8 +206,8 @@ func (ms *mtsSender) fixAndSend() error {
func (s *mtsSender) close() error { return nil } func (s *mtsSender) close() error { return nil }
// release will set the s.fail flag to fals and clear the buffer if // release will set the s.fail flag to false and clear the buffer if
// the previous send was a fail. // the previous send was a fail. The currently loaded chunk is also closed.
func (s *mtsSender) release() { func (s *mtsSender) release() {
if s.failed { if s.failed {
s.buf = s.buf[:0] s.buf = s.buf[:0]