diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index c5d91cdd..5c42e72c 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -51,6 +51,8 @@ const ( rTimeout = 10 * time.Millisecond ) +// sender simulates sending of video data, creating discontinuities if +// testDiscontinuities is set to true. type sender struct { buf [][]byte testDiscontinuities bool @@ -58,6 +60,8 @@ type sender struct { currentPkt int } +// send takes d and neglects if testDiscontinuities is true, returning an error, +// otherwise d is appended to senders buf. func (ts *sender) send(d []byte) error { if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { ts.currentPkt++ @@ -70,6 +74,8 @@ func (ts *sender) send(d []byte) error { return nil } +// log implements the required logging func for some of the structs in use +// within tests. func log(lvl int8, msg string, args ...interface{}) { var l string switch lvl { @@ -91,32 +97,38 @@ func log(lvl int8, msg string, args ...interface{}) { fmt.Printf(msg, args) } +// tstPacker implements io.Writer and handles the writing of data to the +// ringBuffer used in tests. type tstPacker struct { rb *ring.Buffer } +// Write writes to tstPacker's ringBuffer. func (p *tstPacker) Write(d []byte) (int, error) { n, err := p.rb.Write(d) p.rb.Flush() return n, err } +// TestSegment ensures that the mtsSender correctly segments data into clips +// based on positioning of PSI in the mtsEncoder's output stream. func TestSegment(t *testing.T) { mts.Meta = meta.New() - // Create ringbuffer tst sender, loadsender and the mpegts encoder + // Create ringBuffer, sender, loadsender and the MPEGTS encoder. rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) tstSender := &sender{} loadSender := newMtsSender(tstSender, log) packer := &tstPacker{rb: rb} encoder := mts.NewEncoder(packer, 25) - // Turn time based psi writing off for encoder + // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Our payload will just be packet no + // Insert a payload so that we check that the segmentation works correctly + // in this regard. Packet no will be used. encoder.Encode([]byte{byte(i)}) rb.Flush() @@ -143,13 +155,13 @@ func TestSegment(t *testing.T) { expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) - // Check that the clip is the right length + // Check that the clip is of expected length. clipLen := len(clip) if clipLen != psiSendCount*mts.PacketSize { t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) } - // Also check that the first packet is a PAT + // Also check that the first packet is a PAT. firstPkt := clip[:mts.PacketSize] var pkt [mts.PacketSize]byte copy(pkt[:], firstPkt) @@ -158,29 +170,28 @@ func TestSegment(t *testing.T) { t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) } - // Check that the clip data is okay + // Check that the clip data is okay. for i := 0; i < len(clip); i += mts.PacketSize { firstPkt := clip[i : i+mts.PacketSize] copy(pkt[:], firstPkt) p := (*packet.Packet)(&pkt) pid := p.PID() if pid == mts.VideoPid { - // Mts payload payload, err := p.Payload() if err != nil { t.Fatalf("Unexpected err: %v\n", err) } - // Parse pes from the mts payload + // Parse PES from the MTS payload. pes, err := pes.NewPESHeader(payload) if err != nil { t.Fatalf("Unexpected err: %v\n", err) } - // Get the data from the pes packet and convert to int + // Get the data from the PES packet and convert to an int. data := int8(pes.Data()[0]) - // Calc expected data in the pes and then check + // Calc expected data in the PES and then check. if data != int8(expectData) { t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) } @@ -192,7 +203,7 @@ func TestSegment(t *testing.T) { func TestSendFailDiscontinuity(t *testing.T) { mts.Meta = meta.New() - // Create ringbuffer tst sender, loadsender and the mpegts encoder + // Create ringBuffer sender, loadSender and the MPEGTS encoder. rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) const disconClipNo = 3 tstSender := &sender{testDiscontinuities: true, discontinuityAt: disconClipNo} @@ -200,13 +211,13 @@ func TestSendFailDiscontinuity(t *testing.T) { packer := tstPacker{rb: rb} encoder := mts.NewEncoder(&packer, 25) - // Turn time based psi writing off for encoder + // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Our payload will just be packet no + // Our payload will just be packet no. encoder.Encode([]byte{byte(i)}) rb.Flush() @@ -229,13 +240,13 @@ func TestSendFailDiscontinuity(t *testing.T) { result := tstSender.buf - // First check that we have less clips + // First check that we have less clips as expected. expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 gotLen := len(result) if gotLen != expectedLen { t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) } - // Now check that the discontonuity indicator is set at the disconClip pat + // Now check that the discontinuity indicator is set at the discontinuityClip PAT. disconClip := result[disconClipNo] firstPkt := disconClip[:mts.PacketSize] var pkt [mts.PacketSize]byte diff --git a/revid/senders.go b/revid/senders.go index 9c8619b3..46a5ad24 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -45,17 +45,21 @@ import ( "github.com/Comcast/gots/packet" ) +// Sender is intended to provided functionality for the sending of a byte slice +// to an implemented destination. type Sender interface { + // send takes the bytes slice d and sends to a particular destination as + // implemented. send(d []byte) error } -// httpSender implements loadSender for posting HTTP to NetReceiver +// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind. type minimalHttpSender struct { client *netsender.Sender - - log func(lvl int8, msg string, args ...interface{}) + log func(lvl int8, msg string, args ...interface{}) } +// newMinimalHttpSender returns a pointer to a new minimalHttpSender. func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender { return &minimalHttpSender{ client: ns, @@ -63,6 +67,7 @@ func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, a } } +// send takes a bytes slice d and sends to http using s' http client. func (s *minimalHttpSender) send(d []byte) error { return httpSend(d, s.client, s.log) } @@ -127,10 +132,10 @@ func (s *fileSender) close() error { return s.file.Close() } -// mtsSender provides sending capability specifically for use with -// mpegts packetization. It handles the construction of appropriately lengthed -// clips based on PSI. It also fixes accounts for discontinuities by setting -// the discontinuity indicator for the first packet of a clip. +// mtsSender implemented loadSender and provides sending capability specifically +// for use with MPEGTS packetization. It handles the construction of appropriately +// lengthed clips based on PSI. It also fixes accounts for discontinuities by +// setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { sender Sender buf []byte @@ -143,7 +148,7 @@ type mtsSender struct { curPid int } -// newmtsSender returns a new mtsSender. +// newMtsSender returns a new mtsSender. func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ sender: s, @@ -151,12 +156,13 @@ func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) } } -// load takes a *ring.Chunk and extracts bytes copying into s.pkt for use by the sender. +// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and +// assigning to s.curPid. s.next if exists is also appended to the sender buf. func (s *mtsSender) load(c *ring.Chunk) error { - s.chunk = c if s.next != nil { s.buf = append(s.buf, s.next...) } + s.chunk = c bytes := s.chunk.Bytes() cpy := make([]byte, len(bytes)) copy(cpy, bytes) @@ -168,8 +174,8 @@ func (s *mtsSender) load(c *ring.Chunk) error { return nil } -// send checks the most recently loaded packet and if it is a PAT then the clip -// in s.buf is sent, otherwise the packet is added to s.buf. +// send checks the currently loaded paackets PID; if it is a PAT then what is in +// the mtsSenders buffer is fixed and sent. func (s *mtsSender) send() error { if s.curPid == mts.PatPid && len(s.buf) > 0 { err := s.fixAndSend() @@ -183,8 +189,9 @@ func (s *mtsSender) send() error { return nil } -// fixAndSend uses the discontinuity repairer to ensure there is not a -// discontinuity, and if so sets the discontinuity indicator of the PAT packet. +// fixAndSend checks for discontinuities in the senders buffer and then sends. +// If a discontinuity is found the PAT packet at the start of the clip has it's +// discontintuity indicator set to true. func (ms *mtsSender) fixAndSend() error { err := ms.repairer.Repair(ms.buf) if err != nil { @@ -199,8 +206,8 @@ func (ms *mtsSender) fixAndSend() error { func (s *mtsSender) close() error { return nil } -// release will set the s.fail flag to fals and clear the buffer if -// the previous send was a fail. +// release will set the s.fail flag to false and clear the buffer if +// the previous send was a fail. The currently loaded chunk is also closed. func (s *mtsSender) release() { if s.failed { s.buf = s.buf[:0]