diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 0329afe4..f89f31cd 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -34,6 +34,7 @@ import ( "time" "bitbucket.org/ausocean/av/stream/mts" + "bitbucket.org/ausocean/av/stream/mts/meta" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" "github.com/Comcast/gots/packet" @@ -77,18 +78,24 @@ func log(lvl int8, msg string, args ...interface{}) { fmt.Printf(msg, args) } -type Chunk struct { - buf []byte - off int - owner *ring.Buffer +type tstPacker struct { + rb *ring.Buffer +} + +func (p *tstPacker) Write(d []byte) (int, error) { + n, err := p.rb.Write(d) + p.rb.Flush() + return n, err } func TestSegment(t *testing.T) { + mts.Meta = meta.New() // Create ringbuffer tst sender, loadsender and the mpegts encoder rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) tstSender := &testSender{} loadSender := newMtsSender(tstSender, log) - encoder := mts.NewEncoder(rb, 25) + packer := tstPacker{rb: rb} + encoder := mts.NewEncoder(&packer, 25) // Turn time based psi writing off for encoder const psiSendCount = 10 @@ -98,21 +105,24 @@ func TestSegment(t *testing.T) { for i := 0; i < noOfPacketsToWrite; i++ { // Our payload will just be packet no encoder.Encode([]byte{byte(i)}) + rb.Flush() next, err := rb.Next(rTimeout) if err != nil { - t.Errorf("Unexpected err: %v\n", err) + t.Fatalf("Unexpected err: %v\n", err) } err = loadSender.load(next) if err != nil { - t.Errorf("Unexpected err: %v\n", err) + t.Fatalf("Unexpected err: %v\n", err) } err = loadSender.send() if err != nil { - t.Errorf("Unexpected err: %v\n", err) + t.Fatalf("Unexpected err: %v\n", err) } + + loadSender.release() } result := tstSender.Buf @@ -120,7 +130,7 @@ func TestSegment(t *testing.T) { // Check that the clip is the right length clipLen := len(clip) if clipLen != psiSendCount { - t.Errorf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount, clip) + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) } // Also check that the first packet is a PAT @@ -129,7 +139,7 @@ func TestSegment(t *testing.T) { copy(pkt[:], firstPkt) pid := (*packet.Packet)(&pkt).PID() if pid != mts.PatPid { - t.Errorf("First packte of clip %v is not pat, but rather: %v\n", clipNo, pid) + t.Fatalf("First packte of clip %v is not pat, but rather: %v\n", clipNo, pid) } } } diff --git a/revid/revid.go b/revid/revid.go index 2016c53f..fa594566 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -131,6 +131,32 @@ type packer struct { packetCount uint } +// Write implements the io.Writer interface. +// +// Unless the ring buffer returns an error, all writes +// are deemed to be successful, although a successful +// write may include a dropped frame. +func (p *packer) Write(frame []byte) (int, error) { + if len(frame) > ringBufferElementSize { + p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame)) + return len(frame), nil + } + + n, err := p.owner.buffer.Write(frame) + if err != nil { + if err == ring.ErrDropped { + p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) + return len(frame), nil + } + p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) + return n, err + } + + p.owner.buffer.Flush() + + return len(frame), nil +} + // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { @@ -263,13 +289,13 @@ func (r *Revid) reset(config Config) error { } } } - r.encoder = stream.NopEncoder(r.buffer) + r.encoder = stream.NopEncoder(&r.packer) case Mpegts: r.config.Logger.Log(logger.Info, pkg+"using MPEGTS packetisation") - r.encoder = mts.NewEncoder(r.buffer, float64(r.config.FrameRate)) + r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate)) case Flv: r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation") - r.encoder, err = flv.NewEncoder(r.buffer, true, true, int(r.config.FrameRate)) + r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate)) if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to open FLV encoder", err.Error()) } diff --git a/revid/senders.go b/revid/senders.go index ba1dac93..870cf15f 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -169,7 +169,6 @@ func (s *mtsSender) send() error { } s.discard = false } - if pid == mts.VideoPid { expect, exists := s.repairer.ExpectedCC(pid) if !exists { @@ -181,8 +180,7 @@ func (s *mtsSender) send() error { return nil } } - - if s.fail || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) { + if s.fail || (pid == mts.PatPid && len(s.buf) != 0) { err := s.fixAndSend() if err != nil { s.failed() diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index f1a11263..e262abe8 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -171,6 +171,7 @@ const ( func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { e.timeBasedPsi = b e.psiSendCount = sendCount + e.pktCount = e.psiSendCount + 1 } // generate handles the incoming data and generates equivalent mpegts packets - diff --git a/stream/mts/meta/meta.go b/stream/mts/meta/meta.go index 481b5ae5..188c2d4e 100644 --- a/stream/mts/meta/meta.go +++ b/stream/mts/meta/meta.go @@ -144,6 +144,9 @@ func (m *Data) Delete(key string) { // Encode takes the meta data map and encodes into a byte slice with header // describing the version, length of data and data in TSV format. func (m *Data) Encode() []byte { + if m.enc == nil { + panic("Meta has not been initialized yet") + } m.enc = m.enc[:headSize] // Iterate over map and append entries, only adding tab if we're not on the