From 7d03830a4eed27d4bf5a595f28b84223eb4b624a Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 15 Feb 2019 12:01:07 +1030 Subject: [PATCH 01/43] revid & stream/mts: senders now handle clip duration and cc fixing for mts. Removed rtpSender as we now put in loadSender slice. Removed packer write method as no longer required to do clip duration logic. For initialisation of encoders, they are now no longer writing to packer, but now straight to the ring buffer. Wrote ausOceanSender which will deal with clip logic and cc fixing for mts. Added some functionality to mts/mpegts.go that allows adding of adaptation fields to mts packets. --- revid/revid.go | 63 ++++---------------------------- revid/senders.go | 86 ++++++++++++++++++++++++++++++++++++++------ stream/mts/mpegts.go | 72 ++++++++++++++++++++++++++++++++++--- 3 files changed, 148 insertions(+), 73 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index e426272a..b35c3890 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -112,10 +112,6 @@ type Revid struct { // destination is the target endpoint. destination []loadSender - // rtpSender is an unbuffered sender. - // It is used to isolate RTP from ring buffer-induced delays. - rtpSender *rtpSender - // bitrate hold the last send bitrate calculation result. bitrate int @@ -135,54 +131,6 @@ type packer struct { packetCount uint } -// Write implements the io.Writer interface. -// -// Unless the ring buffer returns an error, all writes -// are deemed to be successful, although a successful -// write may include a dropped frame. -func (p *packer) Write(frame []byte) (int, error) { - if len(frame) > ringBufferElementSize { - p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame)) - return len(frame), nil - } - - if len(p.owner.destination) != 0 { - n, err := p.owner.buffer.Write(frame) - if err != nil { - if err == ring.ErrDropped { - p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) - return len(frame), nil - } - p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) - return n, err - } - } - - // If we have an rtp sender bypass ringbuffer and give straight to sender - if p.owner.rtpSender != nil { - err := p.owner.rtpSender.send(frame) - if err != nil { - p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error()) - } - } - - p.packetCount++ - var hasRtmp bool - for _, d := range p.owner.config.Outputs { - if d == Rtmp { - hasRtmp = true - break - } - } - now := time.Now() - if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) { - p.owner.buffer.Flush() - p.packetCount = 0 - p.lastTime = now - } - return len(frame), nil -} - // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { @@ -239,7 +187,7 @@ func (r *Revid) reset(config Config) error { } } - r.destination = r.destination[:0] + r.destination = make([]loadSender, len(r.config.Outputs)) for _, typ := range r.config.Outputs { switch typ { case File: @@ -269,10 +217,11 @@ func (r *Revid) reset(config Config) error { } r.destination = append(r.destination, s) case Rtp: - r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) + s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { return err } + r.destination = append(r.destination, s) } } @@ -309,13 +258,13 @@ func (r *Revid) reset(config Config) error { } } } - r.encoder = stream.NopEncoder(&r.packer) + r.encoder = stream.NopEncoder(r.buffer) case Mpegts: r.config.Logger.Log(logger.Info, pkg+"using MPEGTS packetisation") - r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate)) + r.encoder = mts.NewEncoder(r.buffer, float64(r.config.FrameRate)) case Flv: r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation") - r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate)) + r.encoder, err = flv.NewEncoder(r.buffer, true, true, int(r.config.FrameRate)) if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to open FLV encoder", err.Error()) } diff --git a/revid/senders.go b/revid/senders.go index e86cb24b..f94e6a6c 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,7 +29,6 @@ LICENSE package revid import ( - "errors" "fmt" "io" "net" @@ -43,6 +42,7 @@ import ( "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" + "github.com/Comcast/gots/packet" ) // loadSender is a destination to send a *ring.Chunk to. @@ -105,6 +105,62 @@ func (s *fileSender) close() error { return s.file.Close() } +type ausOceanSender struct { + hs *httpSender + buf []byte + pkt [mts.PacketSize]byte + sendFailed bool + repairer *mts.DiscontinuityRepairer +} + +func newAusOceanSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *ausOceanSender { + return &ausOceanSender{ + hs: newHttpSender(ns, log), + repairer: mts.NewDiscontinuityRepairer(), + } +} + +func (s *ausOceanSender) load(c *ring.Chunk) error { + copy(s.pkt[:], c.Bytes()) + return nil +} + +func (s *ausOceanSender) send() error { + if s.sendFailed || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) { + err := s.fixAndSend() + if err != nil { + s.failed() + return err + } + s.sendFailed = false + s.buf = s.buf[:0] + } + s.buf = append(s.buf, s.pkt[:]...) + return nil +} + +func (s *ausOceanSender) failed() { + s.sendFailed = true + s.repairer.Failed() +} + +func (s *ausOceanSender) fixAndSend() error { + err := s.repairer.Repair(s.buf) + if err != nil { + return err + } + return s.hs.httpSend(s.buf) +} + +func (s *ausOceanSender) close() error { return nil } + +func (s *ausOceanSender) release() { + if s.sendFailed { + s.sendFailed = false + s.buf = s.buf[:0] + } +} + // httpSender implements loadSender for posting HTTP to NetReceiver type httpSender struct { client *netsender.Sender @@ -133,6 +189,10 @@ func (s *httpSender) send() error { // if the chunk has been cleared. return nil } + return s.httpSend(s.chunk.Bytes()) +} + +func (s *httpSender) httpSend(d []byte) error { // Only send if "V0" is configured as an input. send := false ip := s.client.Param("ip") @@ -140,8 +200,8 @@ func (s *httpSender) send() error { for i, pin := range pins { if pin.Name == "V0" { send = true - pins[i].Value = s.chunk.Len() - pins[i].Data = s.chunk.Bytes() + pins[i].Value = len(d) + pins[i].Data = d pins[i].MimeType = "video/mp2t" break } @@ -156,7 +216,6 @@ func (s *httpSender) send() error { if err != nil { return err } - return s.extractMeta(reply) } @@ -370,6 +429,7 @@ func (s *udpSender) close() error { return nil } type rtpSender struct { log func(lvl int8, msg string, args ...interface{}) encoder *rtp.Encoder + chunk *ring.Chunk } func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { @@ -384,12 +444,16 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{ return s, nil } -func (s *rtpSender) send(d []byte) error { - var err error - if d != nil { - _, err = s.encoder.Write(d) - } else { - err = errors.New("no data to send provided") - } +func (s *rtpSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *rtpSender) close() error { return nil } + +func (s *rtpSender) release() {} + +func (s *rtpSender) send() error { + _, err := s.chunk.WriteTo(s.encoder) return err } diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index 08791af7..0cfa0ca9 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -30,6 +30,8 @@ package mts import ( "errors" + + "github.com/Comcast/gots/packet" ) // General mpegts packet properties. @@ -54,11 +56,14 @@ const HeadSize = 4 // Consts relating to adaptation field. const ( - AdaptationIdx = 4 // Index to the adaptation field (index of AFL). - AdaptationControlIdx = 3 // Index to octet with adaptation field control. - AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields. - DefaultAdaptationSize = 2 // Default size of the adaptation field. - AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3. + AdaptationIdx = 4 // Index to the adaptation field (index of AFL). + AdaptationControlIdx = 3 // Index to octet with adaptation field control. + AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields. + DefaultAdaptationSize = 2 // Default size of the adaptation field. + AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3. + DefaultAdaptationBodySize = 1 + DiscontinuityIndicatorMask = 0x80 + DiscontinuityIndicatorIdx = AdaptationIdx + 1 ) // TODO: make this better - currently doesn't make sense. @@ -244,3 +249,60 @@ func (p *Packet) Bytes(buf []byte) []byte { buf = append(buf, p.Payload...) return buf } + +type Option func(p *[PacketSize]byte) + +// addAdaptationField adds an adaptation field to p, and applys the passed options to this field. +// TODO: this will probably break if we already have adaptation field. +func addAdaptationField(p *[PacketSize]byte, options ...Option) error { + b, err := packet.ContainsAdaptationField((*packet.Packet)(p)) + if err != nil { + return err + } + if b { + return errors.New("Adaptation field is already present in packet") + } + // Create space for adaptation field. + copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize]) + + // TODO: seperate into own function + // Update adaptation field control. + p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask + p[AdaptationControlIdx] |= AdaptationControlMask + // Default the adaptationfield. + resetAdaptation(p) + + // Apply and options that have bee passed. + for _, option := range options { + option(p) + } + return nil +} + +// resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field +// exists, otherwise an error is returned. +func resetAdaptation(p *[PacketSize]byte) error { + b, err := packet.ContainsAdaptationField((*packet.Packet)(p)) + if err != nil { + return err + } + if !b { + return errors.New("No adaptation field in this packet") + } + p[AdaptationIdx] = DefaultAdaptationBodySize + p[AdaptationIdx+1] = 0x00 + return nil +} + +// DiscontinuityIndicator returns and Option that will set p's discontinuity +// indicator according to f. +func DiscontinuityIndicator(f bool) Option { + return func(p *[PacketSize]byte) { + set := byte(DiscontinuityIndicatorMask) + if !f { + set = 0x00 + } + p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask + p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set + } +} From d3a8bb20de9cfded86f19274dd331d8da2269d9e Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 15 Feb 2019 13:05:48 +1030 Subject: [PATCH 02/43] revid: using ausOceanSender if we're doing http output with mpegtrs packetization --- revid/revid.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index b35c3890..4ead6163 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -209,7 +209,12 @@ func (r *Revid) reset(config Config) error { } r.destination = append(r.destination, s) case Http: - r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) + switch r.Config().Packetization { + case Mpegts: + r.destination = append(r.destination, newAusOceanSender(r.ns, r.config.Logger.Log)) + default: + r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) + } case Udp: s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { From 694ec5d009ce3a579be3c10721bd11582776d382 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 15 Feb 2019 13:43:01 +1030 Subject: [PATCH 03/43] stream/mts: fixed build errors --- stream/mts/mpegts.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index 0cfa0ca9..23fcee13 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -255,11 +255,7 @@ type Option func(p *[PacketSize]byte) // addAdaptationField adds an adaptation field to p, and applys the passed options to this field. // TODO: this will probably break if we already have adaptation field. func addAdaptationField(p *[PacketSize]byte, options ...Option) error { - b, err := packet.ContainsAdaptationField((*packet.Packet)(p)) - if err != nil { - return err - } - if b { + if packet.ContainsAdaptationField((*packet.Packet)(p)) { return errors.New("Adaptation field is already present in packet") } // Create space for adaptation field. @@ -282,11 +278,7 @@ func addAdaptationField(p *[PacketSize]byte, options ...Option) error { // resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field // exists, otherwise an error is returned. func resetAdaptation(p *[PacketSize]byte) error { - b, err := packet.ContainsAdaptationField((*packet.Packet)(p)) - if err != nil { - return err - } - if !b { + if !packet.ContainsAdaptationField((*packet.Packet)(p)) { return errors.New("No adaptation field in this packet") } p[AdaptationIdx] = DefaultAdaptationBodySize From 6964ac513e6319e9506e75eb23612c0528dda04e Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 15 Feb 2019 13:49:37 +1030 Subject: [PATCH 04/43] stream/mts: adding discontinuity.go which contains discontinuity repairer --- stream/mts/discontinuity.go | 99 +++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 stream/mts/discontinuity.go diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go new file mode 100644 index 00000000..69378916 --- /dev/null +++ b/stream/mts/discontinuity.go @@ -0,0 +1,99 @@ +/* +NAME + discontinuity.go + +DESCRIPTION + discontinuity.go provides functionality for detecting discontinuities in + mpegts and accounting for using the discontinuity indicator in the adaptation + field. + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + discontinuity.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package mts + +import "github.com/Comcast/gots/packet" + +// discontinuityRepairer provides function to detect discontinuities in mpegts +// and set the discontinuity indicator as appropriate. +type DiscontinuityRepairer struct { + expCC uint8 +} + +// NewDiscontinuityRepairer returns a pointer to a new discontinuityRepairer +func NewDiscontinuityRepairer() *DiscontinuityRepairer { + return &DiscontinuityRepairer{expCC: 16} +} + +func (dr *DiscontinuityRepairer) Failed() { + dr.decExpectedCC() +} + +// Repair takes a slice of mpegts +func (dr *DiscontinuityRepairer) Repair(d []byte) error { + var pkt [PacketSize]byte + copy(pkt[:], d[:PacketSize]) + p := (*packet.Packet)(&pkt) + if p.PID() != PatPid { + panic("Clip to repair must have PAT first") + } + cc := p.ContinuityCounter() + expect, exists := dr.expectedCC() + dr.incExpectedCC() + if !exists { + dr.setExpectedCC(uint8(cc)) + } else if cc != int(expect) { + if packet.ContainsAdaptationField(p) { + (*packet.AdaptationField)(p).SetDiscontinuity(true) + } else { + err := addAdaptationField(&pkt, DiscontinuityIndicator(true)) + if err != nil { + return err + } + } + dr.setExpectedCC(uint8(cc)) + copy(d[:PacketSize], pkt[:]) + } + return nil +} + +// expectedCC returns the expected cc for the given pid. If the cc hasn't been +// used yet, then 16 and false is returned. +func (dr *DiscontinuityRepairer) expectedCC() (byte, bool) { + if dr.expCC == 16 { + return 16, false + } + return dr.expCC, true +} + +// incExpectedCC increments the expected cc in dr's cc map for the given pid. +func (dr *DiscontinuityRepairer) incExpectedCC() { + dr.expCC = (dr.expCC + 1) & 0xf +} + +// decExpectedCC decrements the expected cc in dr's cc map for the given pid. +func (dr *DiscontinuityRepairer) decExpectedCC() { + dr.expCC = (dr.expCC - 1) & 0xf +} + +// setExpectedCC sets the expected cc in dr's cc map for the given pid, and cc. +func (dr *DiscontinuityRepairer) setExpectedCC(cc uint8) { + dr.expCC = cc +} From 3aa94887ebd2342073396a56789cceeb8de476f5 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 15 Feb 2019 14:33:18 +1030 Subject: [PATCH 05/43] revid/senders.go: commenting of ausOceanSender --- revid/senders.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/revid/senders.go b/revid/senders.go index f94e6a6c..f34e9588 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -105,6 +105,10 @@ func (s *fileSender) close() error { return s.file.Close() } +// ausOceanSender provides http sending capability specifically for use with +// mpegts packetization. It handles the construction of appropriately lengthed +// clips based on PSI. It also fixes accounts for discontinuities by setting +// the discontinuity indicator for the first packet of a clip. type ausOceanSender struct { hs *httpSender buf []byte @@ -113,6 +117,7 @@ type ausOceanSender struct { repairer *mts.DiscontinuityRepairer } +// newAusOceanSender returns a new ausOceanSender. func newAusOceanSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *ausOceanSender { return &ausOceanSender{ hs: newHttpSender(ns, log), @@ -120,11 +125,14 @@ func newAusOceanSender(ns *netsender.Sender, log func(lvl int8, msg string, args } } +// load takes a *ring.Chunk and extracts bytes copying into s.pkt for use by the sender. func (s *ausOceanSender) load(c *ring.Chunk) error { copy(s.pkt[:], c.Bytes()) return nil } +// send checks the most recently loaded packet and if it is a PAT then the clip +// in s.buf is sent, otherwise the packet is added to s.buf. func (s *ausOceanSender) send() error { if s.sendFailed || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) { err := s.fixAndSend() @@ -139,11 +147,15 @@ func (s *ausOceanSender) send() error { return nil } +// failed sets the s.sendFailed flag to true, and let's the discontinuity +// repairer know that there has been a failed send. func (s *ausOceanSender) failed() { s.sendFailed = true s.repairer.Failed() } +// fixAndSend uses the discontinuity repairer to ensure there is not a +// discontinuity, and if so sets the discontinuity indicator of the PAT packet. func (s *ausOceanSender) fixAndSend() error { err := s.repairer.Repair(s.buf) if err != nil { @@ -154,6 +166,8 @@ func (s *ausOceanSender) fixAndSend() error { func (s *ausOceanSender) close() error { return nil } +// release will set the s.sendFailed flag to fals and clear the buffer if +// the previous send was a fail. func (s *ausOceanSender) release() { if s.sendFailed { s.sendFailed = false From 404436883721792787b073971aeb7c359d341275 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 15 Feb 2019 22:24:07 +1030 Subject: [PATCH 06/43] stream/mts: fixed commenting in discontinuity.go --- stream/mts/discontinuity.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index 69378916..5187dcb8 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -37,16 +37,20 @@ type DiscontinuityRepairer struct { expCC uint8 } -// NewDiscontinuityRepairer returns a pointer to a new discontinuityRepairer +// NewDiscontinuityRepairer returns a pointer to a new discontinuityRepairer. func NewDiscontinuityRepairer() *DiscontinuityRepairer { return &DiscontinuityRepairer{expCC: 16} } +// Failed is to be called in the case of a failed send. This will decrement the +// expectedCC so that it aligns with the failed chunks cc. func (dr *DiscontinuityRepairer) Failed() { dr.decExpectedCC() } -// Repair takes a slice of mpegts +// Repair takes a clip of mpegts and checks that the first packet, which should +// be a PAT, contains a cc that is expected, otherwise the discontinuity indicator +// is set to true. func (dr *DiscontinuityRepairer) Repair(d []byte) error { var pkt [PacketSize]byte copy(pkt[:], d[:PacketSize]) @@ -74,8 +78,8 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { return nil } -// expectedCC returns the expected cc for the given pid. If the cc hasn't been -// used yet, then 16 and false is returned. +// expectedCC returns the expected cc. If the cc hasn't been used yet, then 16 +// and false is returned. func (dr *DiscontinuityRepairer) expectedCC() (byte, bool) { if dr.expCC == 16 { return 16, false @@ -83,17 +87,17 @@ func (dr *DiscontinuityRepairer) expectedCC() (byte, bool) { return dr.expCC, true } -// incExpectedCC increments the expected cc in dr's cc map for the given pid. +// incExpectedCC increments the expected cc. func (dr *DiscontinuityRepairer) incExpectedCC() { dr.expCC = (dr.expCC + 1) & 0xf } -// decExpectedCC decrements the expected cc in dr's cc map for the given pid. +// decExpectedCC decrements the expected cc. func (dr *DiscontinuityRepairer) decExpectedCC() { dr.expCC = (dr.expCC - 1) & 0xf } -// setExpectedCC sets the expected cc in dr's cc map for the given pid, and cc. +// setExpectedCC sets the expected cc. func (dr *DiscontinuityRepairer) setExpectedCC(cc uint8) { dr.expCC = cc } From 31d36577b12404952fc96a729fdf370fee4cab03 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 15 Feb 2019 23:17:13 +1030 Subject: [PATCH 07/43] stream/mts: ausOceanSender => mtsSender --- revid/revid.go | 2 +- revid/senders.go | 22 +++++++++++----------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 4ead6163..de59b7a8 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -211,7 +211,7 @@ func (r *Revid) reset(config Config) error { case Http: switch r.Config().Packetization { case Mpegts: - r.destination = append(r.destination, newAusOceanSender(r.ns, r.config.Logger.Log)) + r.destination = append(r.destination, newMtsSender(r.ns, r.config.Logger.Log)) default: r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) } diff --git a/revid/senders.go b/revid/senders.go index f34e9588..0bda8f23 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -105,11 +105,11 @@ func (s *fileSender) close() error { return s.file.Close() } -// ausOceanSender provides http sending capability specifically for use with +// mtsSender provides http sending capability specifically for use with // mpegts packetization. It handles the construction of appropriately lengthed // clips based on PSI. It also fixes accounts for discontinuities by setting // the discontinuity indicator for the first packet of a clip. -type ausOceanSender struct { +type mtsSender struct { hs *httpSender buf []byte pkt [mts.PacketSize]byte @@ -117,23 +117,23 @@ type ausOceanSender struct { repairer *mts.DiscontinuityRepairer } -// newAusOceanSender returns a new ausOceanSender. -func newAusOceanSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *ausOceanSender { - return &ausOceanSender{ +// newmtsSender returns a new mtsSender. +func newMtsSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { + return &mtsSender{ hs: newHttpSender(ns, log), repairer: mts.NewDiscontinuityRepairer(), } } // load takes a *ring.Chunk and extracts bytes copying into s.pkt for use by the sender. -func (s *ausOceanSender) load(c *ring.Chunk) error { +func (s *mtsSender) load(c *ring.Chunk) error { copy(s.pkt[:], c.Bytes()) return nil } // send checks the most recently loaded packet and if it is a PAT then the clip // in s.buf is sent, otherwise the packet is added to s.buf. -func (s *ausOceanSender) send() error { +func (s *mtsSender) send() error { if s.sendFailed || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) { err := s.fixAndSend() if err != nil { @@ -149,14 +149,14 @@ func (s *ausOceanSender) send() error { // failed sets the s.sendFailed flag to true, and let's the discontinuity // repairer know that there has been a failed send. -func (s *ausOceanSender) failed() { +func (s *mtsSender) failed() { s.sendFailed = true s.repairer.Failed() } // fixAndSend uses the discontinuity repairer to ensure there is not a // discontinuity, and if so sets the discontinuity indicator of the PAT packet. -func (s *ausOceanSender) fixAndSend() error { +func (s *mtsSender) fixAndSend() error { err := s.repairer.Repair(s.buf) if err != nil { return err @@ -164,11 +164,11 @@ func (s *ausOceanSender) fixAndSend() error { return s.hs.httpSend(s.buf) } -func (s *ausOceanSender) close() error { return nil } +func (s *mtsSender) close() error { return nil } // release will set the s.sendFailed flag to fals and clear the buffer if // the previous send was a fail. -func (s *ausOceanSender) release() { +func (s *mtsSender) release() { if s.sendFailed { s.sendFailed = false s.buf = s.buf[:0] From b6bf41b17d9ba60ccc0d8c0b16b75ceab5ac984e Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 15 Feb 2019 23:35:45 +1030 Subject: [PATCH 08/43] stream/mts: made mtsSender more general The mtsSender can now use any loadSender to send fixed and segmented mts data to. --- revid/revid.go | 2 +- revid/senders.go | 49 +++++++++++++++++++++++++++++------------------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index de59b7a8..190d2c59 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -211,7 +211,7 @@ func (r *Revid) reset(config Config) error { case Http: switch r.Config().Packetization { case Mpegts: - r.destination = append(r.destination, newMtsSender(r.ns, r.config.Logger.Log)) + r.destination = append(r.destination, newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil)) default: r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) } diff --git a/revid/senders.go b/revid/senders.go index 0bda8f23..5a20b94b 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -105,72 +105,83 @@ func (s *fileSender) close() error { return s.file.Close() } -// mtsSender provides http sending capability specifically for use with +// mtsSender provides sending capability specifically for use with // mpegts packetization. It handles the construction of appropriately lengthed // clips based on PSI. It also fixes accounts for discontinuities by setting // the discontinuity indicator for the first packet of a clip. type mtsSender struct { - hs *httpSender - buf []byte - pkt [mts.PacketSize]byte - sendFailed bool - repairer *mts.DiscontinuityRepairer + ls loadSender + buf []byte + pkt [mts.PacketSize]byte + fail bool + repairer *mts.DiscontinuityRepairer + chunk *ring.Chunk } // newmtsSender returns a new mtsSender. -func newMtsSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { +func newMtsSender(s loadSender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ - hs: newHttpSender(ns, log), + ls: s, repairer: mts.NewDiscontinuityRepairer(), } } // load takes a *ring.Chunk and extracts bytes copying into s.pkt for use by the sender. func (s *mtsSender) load(c *ring.Chunk) error { - copy(s.pkt[:], c.Bytes()) + s.chunk = c return nil } // send checks the most recently loaded packet and if it is a PAT then the clip // in s.buf is sent, otherwise the packet is added to s.buf. func (s *mtsSender) send() error { - if s.sendFailed || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) { + copy(s.pkt[:], s.chunk.Bytes()) + if s.fail || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) { err := s.fixAndSend() if err != nil { s.failed() return err } - s.sendFailed = false + s.fail = false s.buf = s.buf[:0] } - s.buf = append(s.buf, s.pkt[:]...) + s.buf = append(s.buf, s.chunk.Bytes()...) return nil } -// failed sets the s.sendFailed flag to true, and let's the discontinuity +// failed sets the s.fail flag to true, and let's the discontinuity // repairer know that there has been a failed send. func (s *mtsSender) failed() { - s.sendFailed = true + s.fail = true s.repairer.Failed() } // fixAndSend uses the discontinuity repairer to ensure there is not a // discontinuity, and if so sets the discontinuity indicator of the PAT packet. func (s *mtsSender) fixAndSend() error { - err := s.repairer.Repair(s.buf) + err := s.repairer.Repair(s.chunk.Bytes()) if err != nil { return err } - return s.hs.httpSend(s.buf) + err = s.ls.load(s.chunk) + if err != nil { + return err + } + err = s.ls.send() + if err != nil { + return err + } + s.ls.release() + return nil } func (s *mtsSender) close() error { return nil } -// release will set the s.sendFailed flag to fals and clear the buffer if +// release will set the s.fail flag to fals and clear the buffer if // the previous send was a fail. func (s *mtsSender) release() { - if s.sendFailed { - s.sendFailed = false + if s.fail { + s.fail = false s.buf = s.buf[:0] } } From bea0000340227528cd72daaa78091ec35277e68c Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 15 Feb 2019 23:55:51 +1030 Subject: [PATCH 09/43] stream/mts: creating ring.Chunk so that we can use another loadSender --- revid/senders.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/revid/senders.go b/revid/senders.go index 5a20b94b..65374fd2 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -159,11 +159,11 @@ func (s *mtsSender) failed() { // fixAndSend uses the discontinuity repairer to ensure there is not a // discontinuity, and if so sets the discontinuity indicator of the PAT packet. func (s *mtsSender) fixAndSend() error { - err := s.repairer.Repair(s.chunk.Bytes()) + err := s.repairer.Repair(s.buf) if err != nil { return err } - err = s.ls.load(s.chunk) + err = s.ls.load(ring.NewChunk(s.buf)) if err != nil { return err } From 281aa47fd5a61c8c182af1b7075a88f0a91c1a17 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 16 Feb 2019 01:40:35 +1030 Subject: [PATCH 10/43] stream/mts: fixing discontinuities that could be caused by ringbuffer --- revid/mtsSender_test.go | 39 +++++++++++++++++++++++++++++++++++++ revid/senders.go | 20 +++++++++++++++++++ stream/mts/discontinuity.go | 2 +- 3 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 revid/mtsSender_test.go diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go new file mode 100644 index 00000000..8c925b8d --- /dev/null +++ b/revid/mtsSender_test.go @@ -0,0 +1,39 @@ +/* +NAME + mtsSender_test.go + +DESCRIPTION + mtsSender_test.go contains tests that validate the functionalilty of the + mtsSender under senders.go. Tests include checks that the mtsSender is + segmenting sends correctly, and also that it can correct discontinuities. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ +package revid + +import "testing" + +func TestSegment(t *testing.T) { + +} + +func TestDiscontinuity(t *testing.T) { + +} diff --git a/revid/senders.go b/revid/senders.go index 65374fd2..6018ad97 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -114,6 +114,7 @@ type mtsSender struct { buf []byte pkt [mts.PacketSize]byte fail bool + discard bool repairer *mts.DiscontinuityRepairer chunk *ring.Chunk } @@ -136,6 +137,25 @@ func (s *mtsSender) load(c *ring.Chunk) error { // in s.buf is sent, otherwise the packet is added to s.buf. func (s *mtsSender) send() error { copy(s.pkt[:], s.chunk.Bytes()) + p := (*packet.Packet)(&s.pkt) + pid := p.PID() + cc := p.ContinuityCounter() + + if s.discard { + if pid != mts.PatPid { + return nil + } + s.discard = false + } + + if pid == mts.VideoPid { + if cc != s.repairer.expectedCC(pid) { + s.discard = true + s.buf = s.buf[:0] + return nil + } + } + if s.fail || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) { err := s.fixAndSend() if err != nil { diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index 5187dcb8..ff122032 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -80,7 +80,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { // expectedCC returns the expected cc. If the cc hasn't been used yet, then 16 // and false is returned. -func (dr *DiscontinuityRepairer) expectedCC() (byte, bool) { +func (dr *DiscontinuityRepairer) expectedCC(pid int) (byte, bool) { if dr.expCC == 16 { return 16, false } From 361f5edbc99d52cb42cc6df8496fba6cce26b42c Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 16 Feb 2019 14:26:51 +1030 Subject: [PATCH 11/43] stream/mts: wrote sender interface and a minimalHttpSender type --- revid/revid.go | 2 +- revid/senders.go | 59 ++++++++++++++++++++++++------------- stream/mts/discontinuity.go | 2 +- 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 190d2c59..2016c53f 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -211,7 +211,7 @@ func (r *Revid) reset(config Config) error { case Http: switch r.Config().Packetization { case Mpegts: - r.destination = append(r.destination, newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil)) + r.destination = append(r.destination, newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil)) default: r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) } diff --git a/revid/senders.go b/revid/senders.go index 6018ad97..8e102096 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -45,6 +45,28 @@ import ( "github.com/Comcast/gots/packet" ) +type sender interface { + send(d []byte) error +} + +// httpSender implements loadSender for posting HTTP to NetReceiver +type minimalHttpSender struct { + client *netsender.Sender + + log func(lvl int8, msg string, args ...interface{}) +} + +func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender { + return &minimalHttpSender{ + client: ns, + log: log, + } +} + +func (s *minimalHttpSender) send(d []byte) error { + return httpSend(d, s.client, s.log) +} + // loadSender is a destination to send a *ring.Chunk to. // When a loadSender has finished using the *ring.Chunk // it must be Closed. @@ -110,7 +132,7 @@ func (s *fileSender) close() error { // clips based on PSI. It also fixes accounts for discontinuities by setting // the discontinuity indicator for the first packet of a clip. type mtsSender struct { - ls loadSender + sender sender buf []byte pkt [mts.PacketSize]byte fail bool @@ -120,9 +142,9 @@ type mtsSender struct { } // newmtsSender returns a new mtsSender. -func newMtsSender(s loadSender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { +func newMtsSender(s sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ - ls: s, + sender: s, repairer: mts.NewDiscontinuityRepairer(), } } @@ -178,20 +200,15 @@ func (s *mtsSender) failed() { // fixAndSend uses the discontinuity repairer to ensure there is not a // discontinuity, and if so sets the discontinuity indicator of the PAT packet. -func (s *mtsSender) fixAndSend() error { - err := s.repairer.Repair(s.buf) +func (ms *mtsSender) fixAndSend() error { + err := ms.repairer.Repair(ms.buf) if err != nil { return err } - err = s.ls.load(ring.NewChunk(s.buf)) + err = ms.sender.send(ms.buf) if err != nil { return err } - err = s.ls.send() - if err != nil { - return err - } - s.ls.release() return nil } @@ -234,13 +251,13 @@ func (s *httpSender) send() error { // if the chunk has been cleared. return nil } - return s.httpSend(s.chunk.Bytes()) + return httpSend(s.chunk.Bytes(), s.client, s.log) } -func (s *httpSender) httpSend(d []byte) error { +func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { // Only send if "V0" is configured as an input. send := false - ip := s.client.Param("ip") + ip := client.Param("ip") pins := netsender.MakePins(ip, "V") for i, pin := range pins { if pin.Name == "V0" { @@ -257,16 +274,16 @@ func (s *httpSender) httpSend(d []byte) error { } var err error var reply string - reply, _, err = s.client.Send(netsender.RequestRecv, pins) + reply, _, err = client.Send(netsender.RequestRecv, pins) if err != nil { return err } - return s.extractMeta(reply) + return extractMeta(reply, log) } // extractMeta looks at a reply at extracts any time or location data - then used // to update time and location information in the mpegts encoder. -func (s *httpSender) extractMeta(r string) error { +func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error { dec, err := netsender.NewJSONDecoder(r) if err != nil { return nil @@ -274,18 +291,18 @@ func (s *httpSender) extractMeta(r string) error { // Extract time from reply t, err := dec.Int("ts") if err != nil { - s.log(logger.Warning, pkg+"No timestamp in reply") + log(logger.Warning, pkg+"No timestamp in reply") } else { - s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) + log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) mts.Meta.Add("ts", strconv.Itoa(t)) } // Extract location from reply g, err := dec.String("ll") if err != nil { - s.log(logger.Warning, pkg+"No location in reply") + log(logger.Warning, pkg+"No location in reply") } else { - s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) + log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) mts.Meta.Add("loc", g) } diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index ff122032..d1c6165a 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -59,7 +59,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { panic("Clip to repair must have PAT first") } cc := p.ContinuityCounter() - expect, exists := dr.expectedCC() + expect, exists := dr.expectedCC(3) dr.incExpectedCC() if !exists { dr.setExpectedCC(uint8(cc)) From 4ddf87d63d2f0ace2d90db6a405a57bd0b179308 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 16 Feb 2019 14:38:10 +1030 Subject: [PATCH 12/43] stream/mts/discontinuity.go: fixed expectedCC logic --- stream/mts/discontinuity.go | 41 ++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index d1c6165a..0c6f6071 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -34,18 +34,24 @@ import "github.com/Comcast/gots/packet" // discontinuityRepairer provides function to detect discontinuities in mpegts // and set the discontinuity indicator as appropriate. type DiscontinuityRepairer struct { - expCC uint8 + expCC map[int]int } // NewDiscontinuityRepairer returns a pointer to a new discontinuityRepairer. func NewDiscontinuityRepairer() *DiscontinuityRepairer { - return &DiscontinuityRepairer{expCC: 16} + return &DiscontinuityRepairer{ + expCC: map[int]int{ + PatPid: 16, + PmtPid: 16, + VideoPid: 16, + }, + } } // Failed is to be called in the case of a failed send. This will decrement the // expectedCC so that it aligns with the failed chunks cc. func (dr *DiscontinuityRepairer) Failed() { - dr.decExpectedCC() + dr.decExpectedCC(PatPid) } // Repair takes a clip of mpegts and checks that the first packet, which should @@ -55,14 +61,15 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { var pkt [PacketSize]byte copy(pkt[:], d[:PacketSize]) p := (*packet.Packet)(&pkt) - if p.PID() != PatPid { + pid := p.PID() + if pid != PatPid { panic("Clip to repair must have PAT first") } cc := p.ContinuityCounter() - expect, exists := dr.expectedCC(3) - dr.incExpectedCC() + expect, exists := dr.expectedCC(pid) + dr.incExpectedCC(pid) if !exists { - dr.setExpectedCC(uint8(cc)) + dr.setExpectedCC(pid, cc) } else if cc != int(expect) { if packet.ContainsAdaptationField(p) { (*packet.AdaptationField)(p).SetDiscontinuity(true) @@ -72,7 +79,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { return err } } - dr.setExpectedCC(uint8(cc)) + dr.setExpectedCC(pid, cc) copy(d[:PacketSize], pkt[:]) } return nil @@ -80,24 +87,24 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { // expectedCC returns the expected cc. If the cc hasn't been used yet, then 16 // and false is returned. -func (dr *DiscontinuityRepairer) expectedCC(pid int) (byte, bool) { - if dr.expCC == 16 { +func (dr *DiscontinuityRepairer) expectedCC(pid int) (int, bool) { + if dr.expCC[pid] == 16 { return 16, false } - return dr.expCC, true + return dr.expCC[pid], true } // incExpectedCC increments the expected cc. -func (dr *DiscontinuityRepairer) incExpectedCC() { - dr.expCC = (dr.expCC + 1) & 0xf +func (dr *DiscontinuityRepairer) incExpectedCC(pid int) { + dr.expCC[pid] = (dr.expCC[pid] + 1) & 0xf } // decExpectedCC decrements the expected cc. -func (dr *DiscontinuityRepairer) decExpectedCC() { - dr.expCC = (dr.expCC - 1) & 0xf +func (dr *DiscontinuityRepairer) decExpectedCC(pid int) { + dr.expCC[pid] = (dr.expCC[pid] - 1) & 0xf } // setExpectedCC sets the expected cc. -func (dr *DiscontinuityRepairer) setExpectedCC(cc uint8) { - dr.expCC = cc +func (dr *DiscontinuityRepairer) setExpectedCC(pid, cc int) { + dr.expCC[pid] = cc } From bb091f5961bb44bc8360e175d253b616c243c761 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 16 Feb 2019 16:33:39 +1030 Subject: [PATCH 13/43] revid: wrote test for mtsSender_test.go to see if the mtsSender is segmenting properly --- revid/mtsSender_test.go | 106 +++++++++++++++++++++++++++++++++++- revid/senders.go | 13 ++++- stream/mts/discontinuity.go | 10 ++-- stream/mts/encoder.go | 22 +++++++- 4 files changed, 141 insertions(+), 10 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 8c925b8d..d2ae56bc 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -28,10 +28,114 @@ LICENSE */ package revid -import "testing" +import ( + "fmt" + "testing" + "time" + + "bitbucket.org/ausocean/av/stream/mts" + "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" + "github.com/Comcast/gots/packet" +) + +// Ring buffer sizes and read/write timeouts. +const ( + rbSize = 100 + rbElementSize = 150000 + wTimeout = 10 * time.Millisecond + rTimeout = 10 * time.Millisecond +) + +type testSender struct { + Buf [][]byte +} + +func (ts *testSender) send(d []byte) error { + ts.Buf = append(ts.Buf, d) + return nil +} + +func log(lvl int8, msg string, args ...interface{}) { + var l string + switch lvl { + case logger.Warning: + l = "warning" + case logger.Debug: + l = "debug" + case logger.Info: + l = "info" + case logger.Error: + l = "error" + case logger.Fatal: + l = "fatal" + } + msg = l + ": " + msg + for i := 0; i < len(args); i++ { + msg += " %v" + } + fmt.Printf(msg, args) +} + +type Chunk struct { + buf []byte + off int + owner *ring.Buffer +} func TestSegment(t *testing.T) { + // Create ringbuffer tst sender, loadsender and the mpegts encoder + rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) + tstSender := &testSender{} + loadSender := newMtsSender(tstSender, log) + encoder := mts.NewEncoder(rb, 25) + // Turn time based psi writing off for encoder + const psiSendCount = 10 + encoder.TimeBasedPsi(false, psiSendCount) + + const noOfPacketsToWrite = 100 + for i := 0; i < noOfPacketsToWrite; i++ { + // Our payload will just be packet no + encoder.Encode([]byte{byte(i)}) + + next, err := rb.Next(rTimeout) + if err != nil { + unexpectErr(err, t) + } + + err = loadSender.load(next) + if err != nil { + unexpectErr(err, t) + } + + err = loadSender.send() + if err != nil { + unexpectErr(err, t) + } + } + + result := tstSender.Buf + for clipNo, clip := range result { + // Check that the clip is the right length + clipLen := len(clip) + if clipLen != psiSendCount { + t.Errorf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount, clip) + } + + // Also check that the first packet is a PAT + firstPkt := clip[:mts.PacketSize] + var pkt [mts.PacketSize]byte + copy(pkt[:], firstPkt) + pid := (*packet.Packet)(&pkt).PID() + if pid != mts.PatPid { + t.Errorf("First packte of clip %v is not pat, but rather: %v\n", clipNo, pid) + } + } +} + +func unexpectErr(err error, t *testing.T) { + t.Errorf("Unexpected err: %v\n", err) } func TestDiscontinuity(t *testing.T) { diff --git a/revid/senders.go b/revid/senders.go index 8e102096..ba1dac93 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -171,7 +171,11 @@ func (s *mtsSender) send() error { } if pid == mts.VideoPid { - if cc != s.repairer.expectedCC(pid) { + expect, exists := s.repairer.ExpectedCC(pid) + if !exists { + s.repairer.SetExpectedCC(pid, cc) + } else if cc != expect { + s.repairer.SetExpectedCC(pid, cc) s.discard = true s.buf = s.buf[:0] return nil @@ -221,6 +225,8 @@ func (s *mtsSender) release() { s.fail = false s.buf = s.buf[:0] } + s.chunk.Close() + s.chunk = nil } // httpSender implements loadSender for posting HTTP to NetReceiver @@ -513,7 +519,10 @@ func (s *rtpSender) load(c *ring.Chunk) error { func (s *rtpSender) close() error { return nil } -func (s *rtpSender) release() {} +func (s *rtpSender) release() { + s.chunk.Close() + s.chunk = nil +} func (s *rtpSender) send() error { _, err := s.chunk.WriteTo(s.encoder) diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index 0c6f6071..30f8dfc3 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -66,10 +66,10 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { panic("Clip to repair must have PAT first") } cc := p.ContinuityCounter() - expect, exists := dr.expectedCC(pid) + expect, exists := dr.ExpectedCC(pid) dr.incExpectedCC(pid) if !exists { - dr.setExpectedCC(pid, cc) + dr.SetExpectedCC(pid, cc) } else if cc != int(expect) { if packet.ContainsAdaptationField(p) { (*packet.AdaptationField)(p).SetDiscontinuity(true) @@ -79,7 +79,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { return err } } - dr.setExpectedCC(pid, cc) + dr.SetExpectedCC(pid, cc) copy(d[:PacketSize], pkt[:]) } return nil @@ -87,7 +87,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { // expectedCC returns the expected cc. If the cc hasn't been used yet, then 16 // and false is returned. -func (dr *DiscontinuityRepairer) expectedCC(pid int) (int, bool) { +func (dr *DiscontinuityRepairer) ExpectedCC(pid int) (int, bool) { if dr.expCC[pid] == 16 { return 16, false } @@ -105,6 +105,6 @@ func (dr *DiscontinuityRepairer) decExpectedCC(pid int) { } // setExpectedCC sets the expected cc. -func (dr *DiscontinuityRepairer) setExpectedCC(pid, cc int) { +func (dr *DiscontinuityRepairer) SetExpectedCC(pid, cc int) { dr.expCC[pid] = cc } diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index e63c15bc..f1a11263 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -85,7 +85,8 @@ var ( ) const ( - psiInterval = 1 * time.Second + psiInterval = 1 * time.Second + psiSendCount = 7 ) // Meta allows addition of metadata to encoded mts from outside of this pkg. @@ -130,6 +131,10 @@ type Encoder struct { continuity map[int]byte + timeBasedPsi bool + pktCount int + psiSendCount int + psiLastTime time.Time } @@ -141,6 +146,10 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder { frameInterval: time.Duration(float64(time.Second) / fps), ptsOffset: ptsOffset, + timeBasedPsi: true, + + pktCount: 8, + continuity: map[int]byte{ patPid: 0, pmtPid: 0, @@ -159,15 +168,21 @@ const ( hasPTS = 0x2 ) +func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { + e.timeBasedPsi = b + e.psiSendCount = sendCount +} + // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { now := time.Now() - if now.Sub(e.psiLastTime) > psiInterval { + if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || e.pktCount > e.psiSendCount { err := e.writePSI() if err != nil { return err } + e.pktCount = 0 e.psiLastTime = now } @@ -204,6 +219,7 @@ func (e *Encoder) Encode(nalu []byte) error { if err != nil { return err } + e.pktCount++ } e.tick() @@ -226,6 +242,7 @@ func (e *Encoder) writePSI() error { if err != nil { return err } + e.pktCount++ pmtTable, err = updateMeta(pmtTable) if err != nil { return err @@ -243,6 +260,7 @@ func (e *Encoder) writePSI() error { if err != nil { return err } + e.pktCount++ return nil } From 3356457c71fcb50ad7fd6f65311c54e7ac84d1c9 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 16 Feb 2019 16:43:15 +1030 Subject: [PATCH 14/43] revid: not wrapping t.Errorf --- revid/mtsSender_test.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index d2ae56bc..0329afe4 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -101,17 +101,17 @@ func TestSegment(t *testing.T) { next, err := rb.Next(rTimeout) if err != nil { - unexpectErr(err, t) + t.Errorf("Unexpected err: %v\n", err) } err = loadSender.load(next) if err != nil { - unexpectErr(err, t) + t.Errorf("Unexpected err: %v\n", err) } err = loadSender.send() if err != nil { - unexpectErr(err, t) + t.Errorf("Unexpected err: %v\n", err) } } @@ -134,10 +134,6 @@ func TestSegment(t *testing.T) { } } -func unexpectErr(err error, t *testing.T) { - t.Errorf("Unexpected err: %v\n", err) -} - func TestDiscontinuity(t *testing.T) { } From a5cb1c5abb476166fb7350e0b57e9e962cd6ff58 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 16 Feb 2019 23:52:40 +1030 Subject: [PATCH 15/43] stream/mts: made modifications such that the segment test is passing --- revid/mtsSender_test.go | 30 ++++++++++++++++++++---------- revid/revid.go | 32 +++++++++++++++++++++++++++++--- revid/senders.go | 4 +--- stream/mts/encoder.go | 1 + stream/mts/meta/meta.go | 3 +++ 5 files changed, 54 insertions(+), 16 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 0329afe4..f89f31cd 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -34,6 +34,7 @@ import ( "time" "bitbucket.org/ausocean/av/stream/mts" + "bitbucket.org/ausocean/av/stream/mts/meta" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" "github.com/Comcast/gots/packet" @@ -77,18 +78,24 @@ func log(lvl int8, msg string, args ...interface{}) { fmt.Printf(msg, args) } -type Chunk struct { - buf []byte - off int - owner *ring.Buffer +type tstPacker struct { + rb *ring.Buffer +} + +func (p *tstPacker) Write(d []byte) (int, error) { + n, err := p.rb.Write(d) + p.rb.Flush() + return n, err } func TestSegment(t *testing.T) { + mts.Meta = meta.New() // Create ringbuffer tst sender, loadsender and the mpegts encoder rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) tstSender := &testSender{} loadSender := newMtsSender(tstSender, log) - encoder := mts.NewEncoder(rb, 25) + packer := tstPacker{rb: rb} + encoder := mts.NewEncoder(&packer, 25) // Turn time based psi writing off for encoder const psiSendCount = 10 @@ -98,21 +105,24 @@ func TestSegment(t *testing.T) { for i := 0; i < noOfPacketsToWrite; i++ { // Our payload will just be packet no encoder.Encode([]byte{byte(i)}) + rb.Flush() next, err := rb.Next(rTimeout) if err != nil { - t.Errorf("Unexpected err: %v\n", err) + t.Fatalf("Unexpected err: %v\n", err) } err = loadSender.load(next) if err != nil { - t.Errorf("Unexpected err: %v\n", err) + t.Fatalf("Unexpected err: %v\n", err) } err = loadSender.send() if err != nil { - t.Errorf("Unexpected err: %v\n", err) + t.Fatalf("Unexpected err: %v\n", err) } + + loadSender.release() } result := tstSender.Buf @@ -120,7 +130,7 @@ func TestSegment(t *testing.T) { // Check that the clip is the right length clipLen := len(clip) if clipLen != psiSendCount { - t.Errorf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount, clip) + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) } // Also check that the first packet is a PAT @@ -129,7 +139,7 @@ func TestSegment(t *testing.T) { copy(pkt[:], firstPkt) pid := (*packet.Packet)(&pkt).PID() if pid != mts.PatPid { - t.Errorf("First packte of clip %v is not pat, but rather: %v\n", clipNo, pid) + t.Fatalf("First packte of clip %v is not pat, but rather: %v\n", clipNo, pid) } } } diff --git a/revid/revid.go b/revid/revid.go index 2016c53f..fa594566 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -131,6 +131,32 @@ type packer struct { packetCount uint } +// Write implements the io.Writer interface. +// +// Unless the ring buffer returns an error, all writes +// are deemed to be successful, although a successful +// write may include a dropped frame. +func (p *packer) Write(frame []byte) (int, error) { + if len(frame) > ringBufferElementSize { + p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame)) + return len(frame), nil + } + + n, err := p.owner.buffer.Write(frame) + if err != nil { + if err == ring.ErrDropped { + p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) + return len(frame), nil + } + p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) + return n, err + } + + p.owner.buffer.Flush() + + return len(frame), nil +} + // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { @@ -263,13 +289,13 @@ func (r *Revid) reset(config Config) error { } } } - r.encoder = stream.NopEncoder(r.buffer) + r.encoder = stream.NopEncoder(&r.packer) case Mpegts: r.config.Logger.Log(logger.Info, pkg+"using MPEGTS packetisation") - r.encoder = mts.NewEncoder(r.buffer, float64(r.config.FrameRate)) + r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate)) case Flv: r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation") - r.encoder, err = flv.NewEncoder(r.buffer, true, true, int(r.config.FrameRate)) + r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate)) if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to open FLV encoder", err.Error()) } diff --git a/revid/senders.go b/revid/senders.go index ba1dac93..870cf15f 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -169,7 +169,6 @@ func (s *mtsSender) send() error { } s.discard = false } - if pid == mts.VideoPid { expect, exists := s.repairer.ExpectedCC(pid) if !exists { @@ -181,8 +180,7 @@ func (s *mtsSender) send() error { return nil } } - - if s.fail || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) { + if s.fail || (pid == mts.PatPid && len(s.buf) != 0) { err := s.fixAndSend() if err != nil { s.failed() diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index f1a11263..e262abe8 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -171,6 +171,7 @@ const ( func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { e.timeBasedPsi = b e.psiSendCount = sendCount + e.pktCount = e.psiSendCount + 1 } // generate handles the incoming data and generates equivalent mpegts packets - diff --git a/stream/mts/meta/meta.go b/stream/mts/meta/meta.go index 481b5ae5..188c2d4e 100644 --- a/stream/mts/meta/meta.go +++ b/stream/mts/meta/meta.go @@ -144,6 +144,9 @@ func (m *Data) Delete(key string) { // Encode takes the meta data map and encodes into a byte slice with header // describing the version, length of data and data in TSV format. func (m *Data) Encode() []byte { + if m.enc == nil { + panic("Meta has not been initialized yet") + } m.enc = m.enc[:headSize] // Iterate over map and append entries, only adding tab if we're not on the From b3b8c6bb443c2ab40398405b788b06cbed8b7c32 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 17 Feb 2019 00:31:30 +1030 Subject: [PATCH 16/43] stream/mts: checking data is also good in mts segment test --- revid/mtsSender_test.go | 103 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 3 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index f89f31cd..14144c54 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -38,6 +38,7 @@ import ( "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" "github.com/Comcast/gots/packet" + "github.com/Comcast/gots/pes" ) // Ring buffer sizes and read/write timeouts. @@ -49,11 +50,18 @@ const ( ) type testSender struct { - Buf [][]byte + Buf [][]byte + tstDiscon bool + disconAt int + curPktNo int } func (ts *testSender) send(d []byte) error { + if ts.tstDiscon && ts.curPktNo == ts.disconAt { + return nil + } ts.Buf = append(ts.Buf, d) + ts.curPktNo++ return nil } @@ -139,11 +147,100 @@ func TestSegment(t *testing.T) { copy(pkt[:], firstPkt) pid := (*packet.Packet)(&pkt).PID() if pid != mts.PatPid { - t.Fatalf("First packte of clip %v is not pat, but rather: %v\n", clipNo, pid) + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } + + // Check that the clip data is okay + for i := 0; i < len(clip); i += mts.PacketSize { + copy(pkt[:], firstPkt) + p := (*packet.Packet)(&pkt) + pid := p.PID() + if pid == mts.VideoPid { + // Mts payload + payload, err := p.Payload() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Parse pes from the mts payload + pes, err := pes.NewPESHeader(payload) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Get the data from the pes packet and convert to int + data := int(pes.Data()[0]) + + // Calc expected data in the pes and then check + expectedData := clipNo*10 + ((i / mts.PacketSize) - 2) + if data != expectedData { + t.Fatalf("Did not get expected pkt data. Got: %v, want: %v\n", data, expectedData) + } + } } } } -func TestDiscontinuity(t *testing.T) { +/* +func TestSendFailDiscontinuity(t *testing.T) { + mts.Meta = meta.New() + // Create ringbuffer tst sender, loadsender and the mpegts encoder + rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) + const disconClipNo = 3 + tstSender := &testSender{tstDiscon: true, disconAt: disconClipNo} + loadSender := newMtsSender(tstSender, log) + packer := tstPacker{rb: rb} + encoder := mts.NewEncoder(&packer, 25) + // Turn time based psi writing off for encoder + const psiSendCount = 10 + encoder.TimeBasedPsi(false, psiSendCount) + + const noOfPacketsToWrite = 100 + for i := 0; i < noOfPacketsToWrite; i++ { + // Our payload will just be packet no + encoder.Encode([]byte{byte(i)}) + rb.Flush() + + next, err := rb.Next(rTimeout) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + err = loadSender.load(next) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + err = loadSender.send() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + loadSender.release() + } + + result := tstSender.Buf + + // First check that we have less clips + expectedLen := ((noOfPacketsToWrite / psiSendCount) - 1) + gotLen := len(result) + if gotLen != expectedLen { + t.Fatalf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) + } + + // Now check that the discontonuity indicator is set at the disconClip pat + disconClip := result[disconClipNo] + firstPkt := disconClip[:mts.PacketSize] + var pkt [mts.PacketSize]byte + copy(pkt[:], firstPkt) + discon, err := (*packet.AdaptationField)((*packet.Packet)(&pkt)).Discontinuity() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + if !discon { + t.Fatalf("Did not get discontinuity indicator for PAT") + } } +*/ From 3f3d587eeb2604dea70c4183102e06b42cbabd72 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 17 Feb 2019 03:35:59 +1030 Subject: [PATCH 17/43] revid: mtsSender test for segmenting actually working now --- revid/mtsSender_test.go | 64 +++++++++++++++++++------------------ revid/senders.go | 2 ++ stream/mts/discontinuity.go | 4 +-- stream/mts/encoder.go | 6 ++-- 4 files changed, 40 insertions(+), 36 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 14144c54..3a275ba0 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -29,6 +29,7 @@ LICENSE package revid import ( + "errors" "fmt" "testing" "time" @@ -58,9 +59,11 @@ type testSender struct { func (ts *testSender) send(d []byte) error { if ts.tstDiscon && ts.curPktNo == ts.disconAt { - return nil + return errors.New("could not send") } - ts.Buf = append(ts.Buf, d) + cpy := make([]byte, len(d)) + copy(cpy, d) + ts.Buf = append(ts.Buf, cpy) ts.curPktNo++ return nil } @@ -100,10 +103,10 @@ func TestSegment(t *testing.T) { mts.Meta = meta.New() // Create ringbuffer tst sender, loadsender and the mpegts encoder rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - tstSender := &testSender{} + tstSender := &testSender{Buf: make([][]byte, 0)} loadSender := newMtsSender(tstSender, log) - packer := tstPacker{rb: rb} - encoder := mts.NewEncoder(&packer, 25) + packer := &tstPacker{rb: rb} + encoder := mts.NewEncoder(packer, 25) // Turn time based psi writing off for encoder const psiSendCount = 10 @@ -115,29 +118,32 @@ func TestSegment(t *testing.T) { encoder.Encode([]byte{byte(i)}) rb.Flush() - next, err := rb.Next(rTimeout) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + for { + next, err := rb.Next(rTimeout) + if err != nil { + break + } - err = loadSender.load(next) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + err = loadSender.load(next) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } - err = loadSender.send() - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + err = loadSender.send() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } - loadSender.release() + loadSender.release() + } } result := tstSender.Buf + expectData := 0 for clipNo, clip := range result { // Check that the clip is the right length clipLen := len(clip) - if clipLen != psiSendCount { + if clipLen != psiSendCount*mts.PacketSize { t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) } @@ -152,6 +158,7 @@ func TestSegment(t *testing.T) { // Check that the clip data is okay for i := 0; i < len(clip); i += mts.PacketSize { + firstPkt := clip[i : i+mts.PacketSize] copy(pkt[:], firstPkt) p := (*packet.Packet)(&pkt) pid := p.PID() @@ -169,19 +176,18 @@ func TestSegment(t *testing.T) { } // Get the data from the pes packet and convert to int - data := int(pes.Data()[0]) + data := int8(pes.Data()[0]) // Calc expected data in the pes and then check - expectedData := clipNo*10 + ((i / mts.PacketSize) - 2) - if data != expectedData { - t.Fatalf("Did not get expected pkt data. Got: %v, want: %v\n", data, expectedData) + if data != int8(expectData) { + t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) } + expectData++ } } } } -/* func TestSendFailDiscontinuity(t *testing.T) { mts.Meta = meta.New() // Create ringbuffer tst sender, loadsender and the mpegts encoder @@ -212,10 +218,7 @@ func TestSendFailDiscontinuity(t *testing.T) { t.Fatalf("Unexpected err: %v\n", err) } - err = loadSender.send() - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } + _ = loadSender.send() loadSender.release() } @@ -226,9 +229,8 @@ func TestSendFailDiscontinuity(t *testing.T) { expectedLen := ((noOfPacketsToWrite / psiSendCount) - 1) gotLen := len(result) if gotLen != expectedLen { - t.Fatalf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) + t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) } - // Now check that the discontonuity indicator is set at the disconClip pat disconClip := result[disconClipNo] firstPkt := disconClip[:mts.PacketSize] @@ -242,5 +244,5 @@ func TestSendFailDiscontinuity(t *testing.T) { if !discon { t.Fatalf("Did not get discontinuity indicator for PAT") } + } -*/ diff --git a/revid/senders.go b/revid/senders.go index 870cf15f..54712cd1 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -171,8 +171,10 @@ func (s *mtsSender) send() error { } if pid == mts.VideoPid { expect, exists := s.repairer.ExpectedCC(pid) + s.repairer.IncExpectedCC(pid) if !exists { s.repairer.SetExpectedCC(pid, cc) + s.repairer.IncExpectedCC(pid) } else if cc != expect { s.repairer.SetExpectedCC(pid, cc) s.discard = true diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index 30f8dfc3..b7cc3bed 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -67,7 +67,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { } cc := p.ContinuityCounter() expect, exists := dr.ExpectedCC(pid) - dr.incExpectedCC(pid) + dr.IncExpectedCC(pid) if !exists { dr.SetExpectedCC(pid, cc) } else if cc != int(expect) { @@ -95,7 +95,7 @@ func (dr *DiscontinuityRepairer) ExpectedCC(pid int) (int, bool) { } // incExpectedCC increments the expected cc. -func (dr *DiscontinuityRepairer) incExpectedCC(pid int) { +func (dr *DiscontinuityRepairer) IncExpectedCC(pid int) { dr.expCC[pid] = (dr.expCC[pid] + 1) & 0xf } diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index e262abe8..fec41608 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -171,19 +171,19 @@ const ( func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { e.timeBasedPsi = b e.psiSendCount = sendCount - e.pktCount = e.psiSendCount + 1 + e.pktCount = e.psiSendCount } // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { now := time.Now() - if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || e.pktCount > e.psiSendCount { + if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || e.pktCount >= e.psiSendCount { + e.pktCount = 0 err := e.writePSI() if err != nil { return err } - e.pktCount = 0 e.psiLastTime = now } From 819c9a784c730c287d289320b9ea85bb99394de2 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 17 Feb 2019 05:32:44 +1030 Subject: [PATCH 18/43] revid: mtsSender_test.go passing segmenting and discontinuity tests --- revid/mtsSender_test.go | 33 ++++++++++++--------- revid/senders.go | 64 ++++++++++++++--------------------------- 2 files changed, 41 insertions(+), 56 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 3a275ba0..094d1844 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -59,6 +59,8 @@ type testSender struct { func (ts *testSender) send(d []byte) error { if ts.tstDiscon && ts.curPktNo == ts.disconAt { + fmt.Println("SendFailed") + ts.curPktNo++ return errors.New("could not send") } cpy := make([]byte, len(d)) @@ -103,7 +105,7 @@ func TestSegment(t *testing.T) { mts.Meta = meta.New() // Create ringbuffer tst sender, loadsender and the mpegts encoder rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - tstSender := &testSender{Buf: make([][]byte, 0)} + tstSender := &testSender{} loadSender := newMtsSender(tstSender, log) packer := &tstPacker{rb: rb} encoder := mts.NewEncoder(packer, 25) @@ -141,6 +143,7 @@ func TestSegment(t *testing.T) { result := tstSender.Buf expectData := 0 for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) // Check that the clip is the right length clipLen := len(clip) if clipLen != psiSendCount*mts.PacketSize { @@ -208,25 +211,27 @@ func TestSendFailDiscontinuity(t *testing.T) { encoder.Encode([]byte{byte(i)}) rb.Flush() - next, err := rb.Next(rTimeout) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) + for { + next, err := rb.Next(rTimeout) + if err != nil { + break + } + + err = loadSender.load(next) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + _ = loadSender.send() + + loadSender.release() } - - err = loadSender.load(next) - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - - _ = loadSender.send() - - loadSender.release() } result := tstSender.Buf // First check that we have less clips - expectedLen := ((noOfPacketsToWrite / psiSendCount) - 1) + expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 gotLen := len(result) if gotLen != expectedLen { t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) diff --git a/revid/senders.go b/revid/senders.go index 54712cd1..9f4745cd 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -132,13 +132,14 @@ func (s *fileSender) close() error { // clips based on PSI. It also fixes accounts for discontinuities by setting // the discontinuity indicator for the first packet of a clip. type mtsSender struct { - sender sender - buf []byte - pkt [mts.PacketSize]byte - fail bool - discard bool - repairer *mts.DiscontinuityRepairer - chunk *ring.Chunk + sender sender + buf []byte + next []byte + pkt [mts.PacketSize]byte + failed bool + discarded bool + repairer *mts.DiscontinuityRepairer + chunk *ring.Chunk } // newmtsSender returns a new mtsSender. @@ -158,50 +159,29 @@ func (s *mtsSender) load(c *ring.Chunk) error { // send checks the most recently loaded packet and if it is a PAT then the clip // in s.buf is sent, otherwise the packet is added to s.buf. func (s *mtsSender) send() error { - copy(s.pkt[:], s.chunk.Bytes()) + if s.next != nil { + s.buf = append(s.buf, s.next...) + } + bytes := s.chunk.Bytes() + cpy := make([]byte, len(bytes)) + copy(cpy, bytes) + s.next = cpy + + copy(s.pkt[:], cpy) p := (*packet.Packet)(&s.pkt) pid := p.PID() - cc := p.ContinuityCounter() - - if s.discard { - if pid != mts.PatPid { - return nil - } - s.discard = false - } - if pid == mts.VideoPid { - expect, exists := s.repairer.ExpectedCC(pid) - s.repairer.IncExpectedCC(pid) - if !exists { - s.repairer.SetExpectedCC(pid, cc) - s.repairer.IncExpectedCC(pid) - } else if cc != expect { - s.repairer.SetExpectedCC(pid, cc) - s.discard = true - s.buf = s.buf[:0] - return nil - } - } - if s.fail || (pid == mts.PatPid && len(s.buf) != 0) { + if pid == mts.PatPid && len(s.buf) > 0 { err := s.fixAndSend() if err != nil { - s.failed() + s.failed = true + s.repairer.Failed() return err } - s.fail = false s.buf = s.buf[:0] } - s.buf = append(s.buf, s.chunk.Bytes()...) return nil } -// failed sets the s.fail flag to true, and let's the discontinuity -// repairer know that there has been a failed send. -func (s *mtsSender) failed() { - s.fail = true - s.repairer.Failed() -} - // fixAndSend uses the discontinuity repairer to ensure there is not a // discontinuity, and if so sets the discontinuity indicator of the PAT packet. func (ms *mtsSender) fixAndSend() error { @@ -221,9 +201,9 @@ func (s *mtsSender) close() error { return nil } // release will set the s.fail flag to fals and clear the buffer if // the previous send was a fail. func (s *mtsSender) release() { - if s.fail { - s.fail = false + if s.failed { s.buf = s.buf[:0] + s.failed = false } s.chunk.Close() s.chunk = nil From ca0a008c597580ddbaedb0f8d36e75920cc7fd2e Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 17 Feb 2019 12:50:33 +1030 Subject: [PATCH 19/43] revid: correct cc logic is discontinuity.go --- revid/mtsSender_test.go | 2 -- stream/mts/discontinuity.go | 12 ++++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 094d1844..de1b3ef4 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -59,7 +59,6 @@ type testSender struct { func (ts *testSender) send(d []byte) error { if ts.tstDiscon && ts.curPktNo == ts.disconAt { - fmt.Println("SendFailed") ts.curPktNo++ return errors.New("could not send") } @@ -135,7 +134,6 @@ func TestSegment(t *testing.T) { if err != nil { t.Fatalf("Unexpected err: %v\n", err) } - loadSender.release() } } diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index b7cc3bed..34624554 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -29,7 +29,9 @@ LICENSE package mts -import "github.com/Comcast/gots/packet" +import ( + "github.com/Comcast/gots/packet" +) // discontinuityRepairer provides function to detect discontinuities in mpegts // and set the discontinuity indicator as appropriate. @@ -66,11 +68,8 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { panic("Clip to repair must have PAT first") } cc := p.ContinuityCounter() - expect, exists := dr.ExpectedCC(pid) - dr.IncExpectedCC(pid) - if !exists { - dr.SetExpectedCC(pid, cc) - } else if cc != int(expect) { + expect, _ := dr.ExpectedCC(pid) + if cc != int(expect) { if packet.ContainsAdaptationField(p) { (*packet.AdaptationField)(p).SetDiscontinuity(true) } else { @@ -82,6 +81,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { dr.SetExpectedCC(pid, cc) copy(d[:PacketSize], pkt[:]) } + dr.IncExpectedCC(pid) return nil } From f1625d27f55264f734608152dd0e0b4c159ef20f Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 17 Feb 2019 13:11:17 +1030 Subject: [PATCH 20/43] revid: fixed destination slice bug --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index fa594566..e65d3b51 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -213,7 +213,7 @@ func (r *Revid) reset(config Config) error { } } - r.destination = make([]loadSender, len(r.config.Outputs)) + r.destination = make([]loadSender, 0, len(r.config.Outputs)) for _, typ := range r.config.Outputs { switch typ { case File: From 9e9e349cbfc3354b0d508412b3516c44086b2b9e Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 17 Feb 2019 13:27:01 +1030 Subject: [PATCH 21/43] stream: reverted some changes I made to tick() in both mts encoder and rtp encoder --- stream/mts/encoder.go | 4 +--- stream/rtp/encoder.go | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index fec41608..78a841f5 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -267,9 +267,7 @@ func (e *Encoder) writePSI() error { // tick advances the clock one frame interval. func (e *Encoder) tick() { - now := time.Now() - e.clock += now.Sub(e.lastTime) - e.lastTime = now + e.clock += e.frameInterval } // pts retuns the current presentation timestamp. diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index 28119b43..26016a23 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -121,9 +121,7 @@ func (e *Encoder) Encode(payload []byte) error { // tick advances the clock one frame interval. func (e *Encoder) tick() { - now := time.Now() - e.clock += now.Sub(e.lastTime) - e.lastTime = now + e.clock += e.frameInterval } // nxtTimestamp gets the next timestamp From 31025a114abba107b2b40b10275ce035790c21f8 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 28 Feb 2019 18:19:06 +1030 Subject: [PATCH 22/43] stream/mts: fixed logic regarding psi writing frequency --- stream/mts/encoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 78a841f5..74df4746 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -178,7 +178,7 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { now := time.Now() - if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || e.pktCount >= e.psiSendCount { + if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { e.pktCount = 0 err := e.writePSI() if err != nil { From ae4debd388127068b68638fd48518d63f4b8dafe Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 10:00:33 +1030 Subject: [PATCH 23/43] revid & stream/mts: fixed file headers regarding licensing location --- revid/mtsSender_test.go | 2 +- revid/senders.go | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index de1b3ef4..9a0cfab3 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -24,7 +24,7 @@ LICENSE for more details. You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + in gpl.txt. If not, see http://www.gnu.org/licenses. */ package revid diff --git a/revid/senders.go b/revid/senders.go index 85f86a7d..03b4afc6 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -140,6 +140,7 @@ type mtsSender struct { discarded bool repairer *mts.DiscontinuityRepairer chunk *ring.Chunk + curPid int } // newmtsSender returns a new mtsSender. @@ -153,12 +154,6 @@ func newMtsSender(s sender, log func(lvl int8, msg string, args ...interface{})) // load takes a *ring.Chunk and extracts bytes copying into s.pkt for use by the sender. func (s *mtsSender) load(c *ring.Chunk) error { s.chunk = c - return nil -} - -// send checks the most recently loaded packet and if it is a PAT then the clip -// in s.buf is sent, otherwise the packet is added to s.buf. -func (s *mtsSender) send() error { if s.next != nil { s.buf = append(s.buf, s.next...) } @@ -169,8 +164,14 @@ func (s *mtsSender) send() error { copy(s.pkt[:], cpy) p := (*packet.Packet)(&s.pkt) - pid := p.PID() - if pid == mts.PatPid && len(s.buf) > 0 { + s.curPid = p.PID() + return nil +} + +// send checks the most recently loaded packet and if it is a PAT then the clip +// in s.buf is sent, otherwise the packet is added to s.buf. +func (s *mtsSender) send() error { + if s.curPid == mts.PatPid && len(s.buf) > 0 { err := s.fixAndSend() if err != nil { s.failed = true From 6188d756fd79926e57c12d61d8fa1bf2c868d51c Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 10:01:52 +1030 Subject: [PATCH 24/43] revid: moved some mtsSender logic into load rather than send so that we're not doing the same work over and over again in the case of send failure and retry --- revid/mtsSender_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 9a0cfab3..de1b3ef4 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -24,7 +24,7 @@ LICENSE for more details. You should have received a copy of the GNU General Public License - in gpl.txt. If not, see http://www.gnu.org/licenses. + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ package revid From 559d15d6eb8e82dbe1c22a73e39bbb18f3781144 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 10:02:43 +1030 Subject: [PATCH 25/43] revid & stream/mts: fixed file headers in mtsSender_test.go and discontinuity.go regarding licensing location --- revid/mtsSender_test.go | 2 +- stream/mts/discontinuity.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index de1b3ef4..9a0cfab3 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -24,7 +24,7 @@ LICENSE for more details. You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + in gpl.txt. If not, see http://www.gnu.org/licenses. */ package revid diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index 34624554..68905bd4 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -24,7 +24,7 @@ LICENSE for more details. You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). + in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ package mts From 91d21f1da8457bfa4998cfb4a314e8fde711ddb5 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 10:03:39 +1030 Subject: [PATCH 26/43] revid: fixed import stanzas in mtsSender_test.go file header --- revid/mtsSender_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 9a0cfab3..c96613cb 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -34,12 +34,13 @@ import ( "testing" "time" + "github.com/Comcast/gots/packet" + "github.com/Comcast/gots/pes" + "bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts/meta" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" - "github.com/Comcast/gots/packet" - "github.com/Comcast/gots/pes" ) // Ring buffer sizes and read/write timeouts. From 74110dee313e1b35c3e669eff4f8bbd2da004032 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 10:16:54 +1030 Subject: [PATCH 27/43] revid: sender interface renamed to Sender and tstSender in mtsSender_test.go renamed to sender --- revid/mtsSender_test.go | 8 ++++---- revid/senders.go | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index c96613cb..c4a4f1ab 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -51,14 +51,14 @@ const ( rTimeout = 10 * time.Millisecond ) -type testSender struct { +type sender struct { Buf [][]byte tstDiscon bool disconAt int curPktNo int } -func (ts *testSender) send(d []byte) error { +func (ts *sender) send(d []byte) error { if ts.tstDiscon && ts.curPktNo == ts.disconAt { ts.curPktNo++ return errors.New("could not send") @@ -105,7 +105,7 @@ func TestSegment(t *testing.T) { mts.Meta = meta.New() // Create ringbuffer tst sender, loadsender and the mpegts encoder rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - tstSender := &testSender{} + tstSender := &sender{} loadSender := newMtsSender(tstSender, log) packer := &tstPacker{rb: rb} encoder := mts.NewEncoder(packer, 25) @@ -195,7 +195,7 @@ func TestSendFailDiscontinuity(t *testing.T) { // Create ringbuffer tst sender, loadsender and the mpegts encoder rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) const disconClipNo = 3 - tstSender := &testSender{tstDiscon: true, disconAt: disconClipNo} + tstSender := &sender{tstDiscon: true, disconAt: disconClipNo} loadSender := newMtsSender(tstSender, log) packer := tstPacker{rb: rb} encoder := mts.NewEncoder(&packer, 25) diff --git a/revid/senders.go b/revid/senders.go index 03b4afc6..9c8619b3 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -45,7 +45,7 @@ import ( "github.com/Comcast/gots/packet" ) -type sender interface { +type Sender interface { send(d []byte) error } @@ -132,7 +132,7 @@ func (s *fileSender) close() error { // clips based on PSI. It also fixes accounts for discontinuities by setting // the discontinuity indicator for the first packet of a clip. type mtsSender struct { - sender sender + sender Sender buf []byte next []byte pkt [mts.PacketSize]byte @@ -144,7 +144,7 @@ type mtsSender struct { } // newmtsSender returns a new mtsSender. -func newMtsSender(s sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { +func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ sender: s, repairer: mts.NewDiscontinuityRepairer(), From b3f2439c5b361c32598b561dce9c34765242e7ee Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 11:11:20 +1030 Subject: [PATCH 28/43] revid: unexported buf in sender within mtsSender_test.go --- revid/mtsSender_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index c4a4f1ab..c6bbe48a 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -52,7 +52,7 @@ const ( ) type sender struct { - Buf [][]byte + buf [][]byte tstDiscon bool disconAt int curPktNo int @@ -65,7 +65,7 @@ func (ts *sender) send(d []byte) error { } cpy := make([]byte, len(d)) copy(cpy, d) - ts.Buf = append(ts.Buf, cpy) + ts.buf = append(ts.buf, cpy) ts.curPktNo++ return nil } @@ -139,7 +139,7 @@ func TestSegment(t *testing.T) { } } - result := tstSender.Buf + result := tstSender.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) @@ -227,7 +227,7 @@ func TestSendFailDiscontinuity(t *testing.T) { } } - result := tstSender.Buf + result := tstSender.buf // First check that we have less clips expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 From 8baff93918e3c6625073adf4e4100709b08bedca Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 11:17:40 +1030 Subject: [PATCH 29/43] revid: renamed fields of sender in mtsSender_test.go --- revid/mtsSender_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index c6bbe48a..c5d91cdd 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -52,21 +52,21 @@ const ( ) type sender struct { - buf [][]byte - tstDiscon bool - disconAt int - curPktNo int + buf [][]byte + testDiscontinuities bool + discontinuityAt int + currentPkt int } func (ts *sender) send(d []byte) error { - if ts.tstDiscon && ts.curPktNo == ts.disconAt { - ts.curPktNo++ + if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { + ts.currentPkt++ return errors.New("could not send") } cpy := make([]byte, len(d)) copy(cpy, d) ts.buf = append(ts.buf, cpy) - ts.curPktNo++ + ts.currentPkt++ return nil } @@ -195,7 +195,7 @@ func TestSendFailDiscontinuity(t *testing.T) { // Create ringbuffer tst sender, loadsender and the mpegts encoder rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) const disconClipNo = 3 - tstSender := &sender{tstDiscon: true, disconAt: disconClipNo} + tstSender := &sender{testDiscontinuities: true, discontinuityAt: disconClipNo} loadSender := newMtsSender(tstSender, log) packer := tstPacker{rb: rb} encoder := mts.NewEncoder(&packer, 25) From 5eb832e6c380a2fd5d69b674368223dfc69998fa Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 13:28:34 +1030 Subject: [PATCH 30/43] revid: cleaned up documentation in senders.go and mtsSender_test.go --- revid/mtsSender_test.go | 41 ++++++++++++++++++++++++++--------------- revid/senders.go | 39 +++++++++++++++++++++++---------------- 2 files changed, 49 insertions(+), 31 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index c5d91cdd..5c42e72c 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -51,6 +51,8 @@ const ( rTimeout = 10 * time.Millisecond ) +// sender simulates sending of video data, creating discontinuities if +// testDiscontinuities is set to true. type sender struct { buf [][]byte testDiscontinuities bool @@ -58,6 +60,8 @@ type sender struct { currentPkt int } +// send takes d and neglects if testDiscontinuities is true, returning an error, +// otherwise d is appended to senders buf. func (ts *sender) send(d []byte) error { if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { ts.currentPkt++ @@ -70,6 +74,8 @@ func (ts *sender) send(d []byte) error { return nil } +// log implements the required logging func for some of the structs in use +// within tests. func log(lvl int8, msg string, args ...interface{}) { var l string switch lvl { @@ -91,32 +97,38 @@ func log(lvl int8, msg string, args ...interface{}) { fmt.Printf(msg, args) } +// tstPacker implements io.Writer and handles the writing of data to the +// ringBuffer used in tests. type tstPacker struct { rb *ring.Buffer } +// Write writes to tstPacker's ringBuffer. func (p *tstPacker) Write(d []byte) (int, error) { n, err := p.rb.Write(d) p.rb.Flush() return n, err } +// TestSegment ensures that the mtsSender correctly segments data into clips +// based on positioning of PSI in the mtsEncoder's output stream. func TestSegment(t *testing.T) { mts.Meta = meta.New() - // Create ringbuffer tst sender, loadsender and the mpegts encoder + // Create ringBuffer, sender, loadsender and the MPEGTS encoder. rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) tstSender := &sender{} loadSender := newMtsSender(tstSender, log) packer := &tstPacker{rb: rb} encoder := mts.NewEncoder(packer, 25) - // Turn time based psi writing off for encoder + // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Our payload will just be packet no + // Insert a payload so that we check that the segmentation works correctly + // in this regard. Packet no will be used. encoder.Encode([]byte{byte(i)}) rb.Flush() @@ -143,13 +155,13 @@ func TestSegment(t *testing.T) { expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) - // Check that the clip is the right length + // Check that the clip is of expected length. clipLen := len(clip) if clipLen != psiSendCount*mts.PacketSize { t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) } - // Also check that the first packet is a PAT + // Also check that the first packet is a PAT. firstPkt := clip[:mts.PacketSize] var pkt [mts.PacketSize]byte copy(pkt[:], firstPkt) @@ -158,29 +170,28 @@ func TestSegment(t *testing.T) { t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) } - // Check that the clip data is okay + // Check that the clip data is okay. for i := 0; i < len(clip); i += mts.PacketSize { firstPkt := clip[i : i+mts.PacketSize] copy(pkt[:], firstPkt) p := (*packet.Packet)(&pkt) pid := p.PID() if pid == mts.VideoPid { - // Mts payload payload, err := p.Payload() if err != nil { t.Fatalf("Unexpected err: %v\n", err) } - // Parse pes from the mts payload + // Parse PES from the MTS payload. pes, err := pes.NewPESHeader(payload) if err != nil { t.Fatalf("Unexpected err: %v\n", err) } - // Get the data from the pes packet and convert to int + // Get the data from the PES packet and convert to an int. data := int8(pes.Data()[0]) - // Calc expected data in the pes and then check + // Calc expected data in the PES and then check. if data != int8(expectData) { t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) } @@ -192,7 +203,7 @@ func TestSegment(t *testing.T) { func TestSendFailDiscontinuity(t *testing.T) { mts.Meta = meta.New() - // Create ringbuffer tst sender, loadsender and the mpegts encoder + // Create ringBuffer sender, loadSender and the MPEGTS encoder. rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) const disconClipNo = 3 tstSender := &sender{testDiscontinuities: true, discontinuityAt: disconClipNo} @@ -200,13 +211,13 @@ func TestSendFailDiscontinuity(t *testing.T) { packer := tstPacker{rb: rb} encoder := mts.NewEncoder(&packer, 25) - // Turn time based psi writing off for encoder + // Turn time based PSI writing off for encoder. const psiSendCount = 10 encoder.TimeBasedPsi(false, psiSendCount) const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Our payload will just be packet no + // Our payload will just be packet no. encoder.Encode([]byte{byte(i)}) rb.Flush() @@ -229,13 +240,13 @@ func TestSendFailDiscontinuity(t *testing.T) { result := tstSender.buf - // First check that we have less clips + // First check that we have less clips as expected. expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 gotLen := len(result) if gotLen != expectedLen { t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) } - // Now check that the discontonuity indicator is set at the disconClip pat + // Now check that the discontinuity indicator is set at the discontinuityClip PAT. disconClip := result[disconClipNo] firstPkt := disconClip[:mts.PacketSize] var pkt [mts.PacketSize]byte diff --git a/revid/senders.go b/revid/senders.go index 9c8619b3..46a5ad24 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -45,17 +45,21 @@ import ( "github.com/Comcast/gots/packet" ) +// Sender is intended to provided functionality for the sending of a byte slice +// to an implemented destination. type Sender interface { + // send takes the bytes slice d and sends to a particular destination as + // implemented. send(d []byte) error } -// httpSender implements loadSender for posting HTTP to NetReceiver +// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind. type minimalHttpSender struct { client *netsender.Sender - - log func(lvl int8, msg string, args ...interface{}) + log func(lvl int8, msg string, args ...interface{}) } +// newMinimalHttpSender returns a pointer to a new minimalHttpSender. func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender { return &minimalHttpSender{ client: ns, @@ -63,6 +67,7 @@ func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, a } } +// send takes a bytes slice d and sends to http using s' http client. func (s *minimalHttpSender) send(d []byte) error { return httpSend(d, s.client, s.log) } @@ -127,10 +132,10 @@ func (s *fileSender) close() error { return s.file.Close() } -// mtsSender provides sending capability specifically for use with -// mpegts packetization. It handles the construction of appropriately lengthed -// clips based on PSI. It also fixes accounts for discontinuities by setting -// the discontinuity indicator for the first packet of a clip. +// mtsSender implemented loadSender and provides sending capability specifically +// for use with MPEGTS packetization. It handles the construction of appropriately +// lengthed clips based on PSI. It also fixes accounts for discontinuities by +// setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { sender Sender buf []byte @@ -143,7 +148,7 @@ type mtsSender struct { curPid int } -// newmtsSender returns a new mtsSender. +// newMtsSender returns a new mtsSender. func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ sender: s, @@ -151,12 +156,13 @@ func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) } } -// load takes a *ring.Chunk and extracts bytes copying into s.pkt for use by the sender. +// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and +// assigning to s.curPid. s.next if exists is also appended to the sender buf. func (s *mtsSender) load(c *ring.Chunk) error { - s.chunk = c if s.next != nil { s.buf = append(s.buf, s.next...) } + s.chunk = c bytes := s.chunk.Bytes() cpy := make([]byte, len(bytes)) copy(cpy, bytes) @@ -168,8 +174,8 @@ func (s *mtsSender) load(c *ring.Chunk) error { return nil } -// send checks the most recently loaded packet and if it is a PAT then the clip -// in s.buf is sent, otherwise the packet is added to s.buf. +// send checks the currently loaded paackets PID; if it is a PAT then what is in +// the mtsSenders buffer is fixed and sent. func (s *mtsSender) send() error { if s.curPid == mts.PatPid && len(s.buf) > 0 { err := s.fixAndSend() @@ -183,8 +189,9 @@ func (s *mtsSender) send() error { return nil } -// fixAndSend uses the discontinuity repairer to ensure there is not a -// discontinuity, and if so sets the discontinuity indicator of the PAT packet. +// fixAndSend checks for discontinuities in the senders buffer and then sends. +// If a discontinuity is found the PAT packet at the start of the clip has it's +// discontintuity indicator set to true. func (ms *mtsSender) fixAndSend() error { err := ms.repairer.Repair(ms.buf) if err != nil { @@ -199,8 +206,8 @@ func (ms *mtsSender) fixAndSend() error { func (s *mtsSender) close() error { return nil } -// release will set the s.fail flag to fals and clear the buffer if -// the previous send was a fail. +// release will set the s.fail flag to false and clear the buffer if +// the previous send was a fail. The currently loaded chunk is also closed. func (s *mtsSender) release() { if s.failed { s.buf = s.buf[:0] From b1b5ff2f497ea38c1efa047070298751f4f7b2e9 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 13:30:06 +1030 Subject: [PATCH 31/43] revid: fixed import stanzas in senders.go --- revid/senders.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/revid/senders.go b/revid/senders.go index 46a5ad24..5c38842f 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -36,13 +36,14 @@ import ( "os/exec" "strconv" + "github.com/Comcast/gots/packet" + "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" - "github.com/Comcast/gots/packet" ) // Sender is intended to provided functionality for the sending of a byte slice From 88eac4900d8b7e1a9c2e4e3121d8f72170d92554 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 13:32:58 +1030 Subject: [PATCH 32/43] revid: discarding error from loadSender.send() in mtsSender_test.go TestSendFailDiscontinuity --- revid/mtsSender_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 5c42e72c..dc2ae822 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -232,8 +232,7 @@ func TestSendFailDiscontinuity(t *testing.T) { t.Fatalf("Unexpected err: %v\n", err) } - _ = loadSender.send() - + loadSender.send() loadSender.release() } } From 7070c4e4343694674e9b4942ff11f0e92c7167dd Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 13:35:22 +1030 Subject: [PATCH 33/43] revid: in TestSendFailDiscontinuity disconClipNo const => clipWithDiscontinuity --- revid/mtsSender_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index dc2ae822..d07bdcfe 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -205,8 +205,8 @@ func TestSendFailDiscontinuity(t *testing.T) { mts.Meta = meta.New() // Create ringBuffer sender, loadSender and the MPEGTS encoder. rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - const disconClipNo = 3 - tstSender := &sender{testDiscontinuities: true, discontinuityAt: disconClipNo} + const clipWithDiscontinuity = 3 + tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} loadSender := newMtsSender(tstSender, log) packer := tstPacker{rb: rb} encoder := mts.NewEncoder(&packer, 25) @@ -246,7 +246,7 @@ func TestSendFailDiscontinuity(t *testing.T) { t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) } // Now check that the discontinuity indicator is set at the discontinuityClip PAT. - disconClip := result[disconClipNo] + disconClip := result[clipWithDiscontinuity] firstPkt := disconClip[:mts.PacketSize] var pkt [mts.PacketSize]byte copy(pkt[:], firstPkt) From ff4778945e3abb76c0bd412f85f471846f6423ea Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 14:48:26 +1030 Subject: [PATCH 34/43] revid: not doing redundant conversions --- revid/mtsSender_test.go | 15 +++++++-------- revid/senders.go | 5 ++--- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index d07bdcfe..81270191 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -128,7 +128,7 @@ func TestSegment(t *testing.T) { const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { // Insert a payload so that we check that the segmentation works correctly - // in this regard. Packet no will be used. + // in this regard. Packet number will be used. encoder.Encode([]byte{byte(i)}) rb.Flush() @@ -163,9 +163,9 @@ func TestSegment(t *testing.T) { // Also check that the first packet is a PAT. firstPkt := clip[:mts.PacketSize] - var pkt [mts.PacketSize]byte + var pkt packet.Packet copy(pkt[:], firstPkt) - pid := (*packet.Packet)(&pkt).PID() + pid := pkt.PID() if pid != mts.PatPid { t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) } @@ -174,10 +174,9 @@ func TestSegment(t *testing.T) { for i := 0; i < len(clip); i += mts.PacketSize { firstPkt := clip[i : i+mts.PacketSize] copy(pkt[:], firstPkt) - p := (*packet.Packet)(&pkt) - pid := p.PID() + pid := pkt.PID() if pid == mts.VideoPid { - payload, err := p.Payload() + payload, err := pkt.Payload() if err != nil { t.Fatalf("Unexpected err: %v\n", err) } @@ -248,9 +247,9 @@ func TestSendFailDiscontinuity(t *testing.T) { // Now check that the discontinuity indicator is set at the discontinuityClip PAT. disconClip := result[clipWithDiscontinuity] firstPkt := disconClip[:mts.PacketSize] - var pkt [mts.PacketSize]byte + var pkt packet.Packet copy(pkt[:], firstPkt) - discon, err := (*packet.AdaptationField)((*packet.Packet)(&pkt)).Discontinuity() + discon, err := (*packet.AdaptationField)(&pkt).Discontinuity() if err != nil { t.Fatalf("Unexpected err: %v\n", err) } diff --git a/revid/senders.go b/revid/senders.go index 5c38842f..c1c31b3f 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -141,7 +141,7 @@ type mtsSender struct { sender Sender buf []byte next []byte - pkt [mts.PacketSize]byte + pkt packet.Packet failed bool discarded bool repairer *mts.DiscontinuityRepairer @@ -170,8 +170,7 @@ func (s *mtsSender) load(c *ring.Chunk) error { s.next = cpy copy(s.pkt[:], cpy) - p := (*packet.Packet)(&s.pkt) - s.curPid = p.PID() + s.curPid = s.pkt.PID() return nil } From 2570cb6ecb7abe53093bc1b3d29ec3ecb8b5fa0a Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 15:14:01 +1030 Subject: [PATCH 35/43] revid: simplified some mtsSender logic --- revid/senders.go | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/revid/senders.go b/revid/senders.go index c1c31b3f..f4098c56 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -165,26 +165,21 @@ func (s *mtsSender) load(c *ring.Chunk) error { } s.chunk = c bytes := s.chunk.Bytes() - cpy := make([]byte, len(bytes)) - copy(cpy, bytes) - s.next = cpy - - copy(s.pkt[:], cpy) + s.next = bytes + copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() return nil } // send checks the currently loaded paackets PID; if it is a PAT then what is in // the mtsSenders buffer is fixed and sent. -func (s *mtsSender) send() error { - if s.curPid == mts.PatPid && len(s.buf) > 0 { - err := s.fixAndSend() +func (ms *mtsSender) send() error { + if ms.curPid == mts.PatPid && len(ms.buf) > 0 { + err := ms.fixAndSend() if err != nil { - s.failed = true - s.repairer.Failed() return err } - s.buf = s.buf[:0] + ms.buf = ms.buf[:0] } return nil } @@ -194,14 +189,15 @@ func (s *mtsSender) send() error { // discontintuity indicator set to true. func (ms *mtsSender) fixAndSend() error { err := ms.repairer.Repair(ms.buf) - if err != nil { - return err + if err == nil { + err = ms.sender.send(ms.buf) + if err == nil { + return nil + } } - err = ms.sender.send(ms.buf) - if err != nil { - return err - } - return nil + ms.failed = true + ms.repairer.Failed() + return err } func (s *mtsSender) close() error { return nil } From ef5e43dac7e4c7db16f62e9c8a91d27938643864 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 15:15:44 +1030 Subject: [PATCH 36/43] revid: improved code layout in mtsSender_test.go --- revid/mtsSender_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 81270191..f600a664 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -114,6 +114,7 @@ func (p *tstPacker) Write(d []byte) (int, error) { // based on positioning of PSI in the mtsEncoder's output stream. func TestSegment(t *testing.T) { mts.Meta = meta.New() + // Create ringBuffer, sender, loadsender and the MPEGTS encoder. rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) tstSender := &sender{} @@ -155,6 +156,7 @@ func TestSegment(t *testing.T) { expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) + // Check that the clip is of expected length. clipLen := len(clip) if clipLen != psiSendCount*mts.PacketSize { @@ -202,6 +204,7 @@ func TestSegment(t *testing.T) { func TestSendFailDiscontinuity(t *testing.T) { mts.Meta = meta.New() + // Create ringBuffer sender, loadSender and the MPEGTS encoder. rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) const clipWithDiscontinuity = 3 @@ -244,6 +247,7 @@ func TestSendFailDiscontinuity(t *testing.T) { if gotLen != expectedLen { t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) } + // Now check that the discontinuity indicator is set at the discontinuityClip PAT. disconClip := result[clipWithDiscontinuity] firstPkt := disconClip[:mts.PacketSize] From 8452d8f3d1fd57f1f950e89a300aa386b557b8b2 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 15:28:40 +1030 Subject: [PATCH 37/43] revid: cleaned up code structure in mtsSender_test.go --- revid/mtsSender_test.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index f600a664..eebb957a 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -116,11 +116,10 @@ func TestSegment(t *testing.T) { mts.Meta = meta.New() // Create ringBuffer, sender, loadsender and the MPEGTS encoder. - rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) tstSender := &sender{} loadSender := newMtsSender(tstSender, log) - packer := &tstPacker{rb: rb} - encoder := mts.NewEncoder(packer, 25) + rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) + encoder := mts.NewEncoder(&tstPacker{rb: rb}, 25) // Turn time based PSI writing off for encoder. const psiSendCount = 10 @@ -174,10 +173,8 @@ func TestSegment(t *testing.T) { // Check that the clip data is okay. for i := 0; i < len(clip); i += mts.PacketSize { - firstPkt := clip[i : i+mts.PacketSize] - copy(pkt[:], firstPkt) - pid := pkt.PID() - if pid == mts.VideoPid { + copy(pkt[:], clip[i:i+mts.PacketSize]) + if pkt.PID() == mts.VideoPid { payload, err := pkt.Payload() if err != nil { t.Fatalf("Unexpected err: %v\n", err) @@ -206,12 +203,11 @@ func TestSendFailDiscontinuity(t *testing.T) { mts.Meta = meta.New() // Create ringBuffer sender, loadSender and the MPEGTS encoder. - rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) const clipWithDiscontinuity = 3 tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} loadSender := newMtsSender(tstSender, log) - packer := tstPacker{rb: rb} - encoder := mts.NewEncoder(&packer, 25) + rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) + encoder := mts.NewEncoder(&tstPacker{rb: rb}, 25) // Turn time based PSI writing off for encoder. const psiSendCount = 10 From 7619f5b921bb8d7d3173ace38b07c2dead3bede6 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 1 Mar 2019 15:35:54 +1030 Subject: [PATCH 38/43] revid: mtsSender_test.go corrected comment --- revid/mtsSender_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index eebb957a..1648008b 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -215,7 +215,7 @@ func TestSendFailDiscontinuity(t *testing.T) { const noOfPacketsToWrite = 100 for i := 0; i < noOfPacketsToWrite; i++ { - // Our payload will just be packet no. + // Our payload will just be packet number. encoder.Encode([]byte{byte(i)}) rb.Flush() From 5dd3045db271886334b26e00915ccdf8a4ff9a3b Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 2 Mar 2019 13:08:48 +1030 Subject: [PATCH 39/43] revid: checking that we have more than 0 destinations before we write to the ring buffer --- revid/revid.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/revid/revid.go b/revid/revid.go index 522c72a1..1a1732a5 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -142,6 +142,10 @@ func (p *packer) Write(frame []byte) (int, error) { return len(frame), nil } + if len(p.owner.destination) == 0 { + panic("must have at least 1 destination") + } + n, err := p.owner.buffer.Write(frame) if err != nil { if err == ring.ErrDropped { From 7c6719ab5afba4ba9daaf1f5053840096a33d368 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 2 Mar 2019 13:15:18 +1030 Subject: [PATCH 40/43] stream/mts: using packet.Packet straight away and not doing unnecessary conversions --- stream/mts/discontinuity.go | 11 +++++------ stream/mts/mpegts.go | 8 ++++---- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index 68905bd4..adccebad 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -60,18 +60,17 @@ func (dr *DiscontinuityRepairer) Failed() { // be a PAT, contains a cc that is expected, otherwise the discontinuity indicator // is set to true. func (dr *DiscontinuityRepairer) Repair(d []byte) error { - var pkt [PacketSize]byte + var pkt packet.Packet copy(pkt[:], d[:PacketSize]) - p := (*packet.Packet)(&pkt) - pid := p.PID() + pid := pkt.PID() if pid != PatPid { panic("Clip to repair must have PAT first") } - cc := p.ContinuityCounter() + cc := pkt.ContinuityCounter() expect, _ := dr.ExpectedCC(pid) if cc != int(expect) { - if packet.ContainsAdaptationField(p) { - (*packet.AdaptationField)(p).SetDiscontinuity(true) + if packet.ContainsAdaptationField(&pkt) { + (*packet.AdaptationField)(&pkt).SetDiscontinuity(true) } else { err := addAdaptationField(&pkt, DiscontinuityIndicator(true)) if err != nil { diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index ea36c474..a3908659 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -263,11 +263,11 @@ func (p *Packet) Bytes(buf []byte) []byte { return buf } -type Option func(p *[PacketSize]byte) +type Option func(p *packet.Packet) // addAdaptationField adds an adaptation field to p, and applys the passed options to this field. // TODO: this will probably break if we already have adaptation field. -func addAdaptationField(p *[PacketSize]byte, options ...Option) error { +func addAdaptationField(p *packet.Packet, options ...Option) error { if packet.ContainsAdaptationField((*packet.Packet)(p)) { return errors.New("Adaptation field is already present in packet") } @@ -290,7 +290,7 @@ func addAdaptationField(p *[PacketSize]byte, options ...Option) error { // resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field // exists, otherwise an error is returned. -func resetAdaptation(p *[PacketSize]byte) error { +func resetAdaptation(p *packet.Packet) error { if !packet.ContainsAdaptationField((*packet.Packet)(p)) { return errors.New("No adaptation field in this packet") } @@ -302,7 +302,7 @@ func resetAdaptation(p *[PacketSize]byte) error { // DiscontinuityIndicator returns and Option that will set p's discontinuity // indicator according to f. func DiscontinuityIndicator(f bool) Option { - return func(p *[PacketSize]byte) { + return func(p *packet.Packet) { set := byte(DiscontinuityIndicatorMask) if !f { set = 0x00 From 9be2f95befa8db2795a25584a9348e2e448e06c6 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 2 Mar 2019 13:21:46 +1030 Subject: [PATCH 41/43] stream/mts: improved and added commenting --- stream/mts/encoder.go | 4 ++++ stream/mts/mpegts.go | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 74df4746..b1e098a4 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -168,6 +168,10 @@ const ( hasPTS = 0x2 ) +// TimeBasedPsi allows for the setting of the PSI writing method, therefore, if +// PSI is written based on some time duration, or based on a packet count. +// If b is true, then time based PSI is used, otherwise the PSI is written +// every sendCount. func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { e.timeBasedPsi = b e.psiSendCount = sendCount diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index a3908659..18fe8b5f 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -62,9 +62,9 @@ const ( AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields. DefaultAdaptationSize = 2 // Default size of the adaptation field. AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3. - DefaultAdaptationBodySize = 1 - DiscontinuityIndicatorMask = 0x80 - DiscontinuityIndicatorIdx = AdaptationIdx + 1 + DefaultAdaptationBodySize = 1 // Default size of the adaptation field body. + DiscontinuityIndicatorMask = 0x80 // Mask for the discontinuity indicator at the discontinuity indicator idk. + DiscontinuityIndicatorIdx = AdaptationIdx + 1 // The index at which the discontinuity indicator is found in an MTS packet. ) // TODO: make this better - currently doesn't make sense. From 246f4f33be0ddf42c54068c3521ee7865eb02dac Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 2 Mar 2019 20:37:04 +1030 Subject: [PATCH 42/43] revid/mtsSender_test.go: created type buffer based on ring.Buffer that may be written to to perform a ringBuffer write and flush --- revid/mtsSender_test.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 1648008b..63b1758b 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -97,16 +97,15 @@ func log(lvl int8, msg string, args ...interface{}) { fmt.Printf(msg, args) } -// tstPacker implements io.Writer and handles the writing of data to the -// ringBuffer used in tests. -type tstPacker struct { - rb *ring.Buffer -} +// buffer implements io.Writer and handles the writing of data to a +// ring buffer used in tests. +type buffer ring.Buffer -// Write writes to tstPacker's ringBuffer. -func (p *tstPacker) Write(d []byte) (int, error) { - n, err := p.rb.Write(d) - p.rb.Flush() +// Write writes to b. +func (b *buffer) Write(d []byte) (int, error) { + r := (*ring.Buffer)(b) + n, err := r.Write(d) + r.Flush() return n, err } @@ -119,7 +118,7 @@ func TestSegment(t *testing.T) { tstSender := &sender{} loadSender := newMtsSender(tstSender, log) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder(&tstPacker{rb: rb}, 25) + encoder := mts.NewEncoder((*buffer)(rb), 25) // Turn time based PSI writing off for encoder. const psiSendCount = 10 @@ -207,7 +206,7 @@ func TestSendFailDiscontinuity(t *testing.T) { tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} loadSender := newMtsSender(tstSender, log) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder(&tstPacker{rb: rb}, 25) + encoder := mts.NewEncoder((*buffer)(rb), 25) // Turn time based PSI writing off for encoder. const psiSendCount = 10 From bd3f89978e6d4b0d4961d2f6905f93818edf255b Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 2 Mar 2019 20:41:58 +1030 Subject: [PATCH 43/43] revid/mtsSender_test.go: updated comment for buffer.Write() --- revid/mtsSender_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go index 63b1758b..ad4a18ba 100644 --- a/revid/mtsSender_test.go +++ b/revid/mtsSender_test.go @@ -101,7 +101,7 @@ func log(lvl int8, msg string, args ...interface{}) { // ring buffer used in tests. type buffer ring.Buffer -// Write writes to b. +// Write implements the io.Writer interface. func (b *buffer) Write(d []byte) (int, error) { r := (*ring.Buffer)(b) n, err := r.Write(d)