stream/mts: made modifications such that the segment test is passing

This commit is contained in:
saxon 2019-02-16 23:52:40 +10:30
parent 3356457c71
commit a5cb1c5abb
5 changed files with 54 additions and 16 deletions

View File

@ -34,6 +34,7 @@ import (
"time" "time"
"bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/mts/meta"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
"github.com/Comcast/gots/packet" "github.com/Comcast/gots/packet"
@ -77,18 +78,24 @@ func log(lvl int8, msg string, args ...interface{}) {
fmt.Printf(msg, args) fmt.Printf(msg, args)
} }
type Chunk struct { type tstPacker struct {
buf []byte rb *ring.Buffer
off int }
owner *ring.Buffer
func (p *tstPacker) Write(d []byte) (int, error) {
n, err := p.rb.Write(d)
p.rb.Flush()
return n, err
} }
func TestSegment(t *testing.T) { func TestSegment(t *testing.T) {
mts.Meta = meta.New()
// Create ringbuffer tst sender, loadsender and the mpegts encoder // Create ringbuffer tst sender, loadsender and the mpegts encoder
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
tstSender := &testSender{} tstSender := &testSender{}
loadSender := newMtsSender(tstSender, log) loadSender := newMtsSender(tstSender, log)
encoder := mts.NewEncoder(rb, 25) packer := tstPacker{rb: rb}
encoder := mts.NewEncoder(&packer, 25)
// Turn time based psi writing off for encoder // Turn time based psi writing off for encoder
const psiSendCount = 10 const psiSendCount = 10
@ -98,21 +105,24 @@ func TestSegment(t *testing.T) {
for i := 0; i < noOfPacketsToWrite; i++ { for i := 0; i < noOfPacketsToWrite; i++ {
// Our payload will just be packet no // Our payload will just be packet no
encoder.Encode([]byte{byte(i)}) encoder.Encode([]byte{byte(i)})
rb.Flush()
next, err := rb.Next(rTimeout) next, err := rb.Next(rTimeout)
if err != nil { if err != nil {
t.Errorf("Unexpected err: %v\n", err) t.Fatalf("Unexpected err: %v\n", err)
} }
err = loadSender.load(next) err = loadSender.load(next)
if err != nil { if err != nil {
t.Errorf("Unexpected err: %v\n", err) t.Fatalf("Unexpected err: %v\n", err)
} }
err = loadSender.send() err = loadSender.send()
if err != nil { if err != nil {
t.Errorf("Unexpected err: %v\n", err) t.Fatalf("Unexpected err: %v\n", err)
} }
loadSender.release()
} }
result := tstSender.Buf result := tstSender.Buf
@ -120,7 +130,7 @@ func TestSegment(t *testing.T) {
// Check that the clip is the right length // Check that the clip is the right length
clipLen := len(clip) clipLen := len(clip)
if clipLen != psiSendCount { if clipLen != psiSendCount {
t.Errorf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount, clip) t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
} }
// Also check that the first packet is a PAT // Also check that the first packet is a PAT
@ -129,7 +139,7 @@ func TestSegment(t *testing.T) {
copy(pkt[:], firstPkt) copy(pkt[:], firstPkt)
pid := (*packet.Packet)(&pkt).PID() pid := (*packet.Packet)(&pkt).PID()
if pid != mts.PatPid { if pid != mts.PatPid {
t.Errorf("First packte of clip %v is not pat, but rather: %v\n", clipNo, pid) t.Fatalf("First packte of clip %v is not pat, but rather: %v\n", clipNo, pid)
} }
} }
} }

View File

@ -131,6 +131,32 @@ type packer struct {
packetCount uint packetCount uint
} }
// Write implements the io.Writer interface.
//
// Unless the ring buffer returns an error, all writes
// are deemed to be successful, although a successful
// write may include a dropped frame.
func (p *packer) Write(frame []byte) (int, error) {
if len(frame) > ringBufferElementSize {
p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame))
return len(frame), nil
}
n, err := p.owner.buffer.Write(frame)
if err != nil {
if err == ring.ErrDropped {
p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame))
return len(frame), nil
}
p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
return n, err
}
p.owner.buffer.Flush()
return len(frame), nil
}
// New returns a pointer to a new Revid with the desired configuration, and/or // New returns a pointer to a new Revid with the desired configuration, and/or
// an error if construction of the new instance was not successful. // an error if construction of the new instance was not successful.
func New(c Config, ns *netsender.Sender) (*Revid, error) { func New(c Config, ns *netsender.Sender) (*Revid, error) {
@ -263,13 +289,13 @@ func (r *Revid) reset(config Config) error {
} }
} }
} }
r.encoder = stream.NopEncoder(r.buffer) r.encoder = stream.NopEncoder(&r.packer)
case Mpegts: case Mpegts:
r.config.Logger.Log(logger.Info, pkg+"using MPEGTS packetisation") r.config.Logger.Log(logger.Info, pkg+"using MPEGTS packetisation")
r.encoder = mts.NewEncoder(r.buffer, float64(r.config.FrameRate)) r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate))
case Flv: case Flv:
r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation") r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation")
r.encoder, err = flv.NewEncoder(r.buffer, true, true, int(r.config.FrameRate)) r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate))
if err != nil { if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"failed to open FLV encoder", err.Error()) r.config.Logger.Log(logger.Fatal, pkg+"failed to open FLV encoder", err.Error())
} }

View File

@ -169,7 +169,6 @@ func (s *mtsSender) send() error {
} }
s.discard = false s.discard = false
} }
if pid == mts.VideoPid { if pid == mts.VideoPid {
expect, exists := s.repairer.ExpectedCC(pid) expect, exists := s.repairer.ExpectedCC(pid)
if !exists { if !exists {
@ -181,8 +180,7 @@ func (s *mtsSender) send() error {
return nil return nil
} }
} }
if s.fail || (pid == mts.PatPid && len(s.buf) != 0) {
if s.fail || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) {
err := s.fixAndSend() err := s.fixAndSend()
if err != nil { if err != nil {
s.failed() s.failed()

View File

@ -171,6 +171,7 @@ const (
func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
e.timeBasedPsi = b e.timeBasedPsi = b
e.psiSendCount = sendCount e.psiSendCount = sendCount
e.pktCount = e.psiSendCount + 1
} }
// generate handles the incoming data and generates equivalent mpegts packets - // generate handles the incoming data and generates equivalent mpegts packets -

View File

@ -144,6 +144,9 @@ func (m *Data) Delete(key string) {
// Encode takes the meta data map and encodes into a byte slice with header // Encode takes the meta data map and encodes into a byte slice with header
// describing the version, length of data and data in TSV format. // describing the version, length of data and data in TSV format.
func (m *Data) Encode() []byte { func (m *Data) Encode() []byte {
if m.enc == nil {
panic("Meta has not been initialized yet")
}
m.enc = m.enc[:headSize] m.enc = m.enc[:headSize]
// Iterate over map and append entries, only adding tab if we're not on the // Iterate over map and append entries, only adding tab if we're not on the