diff --git a/revid/revid.go b/revid/revid.go index 190d2c59..2016c53f 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -211,7 +211,7 @@ func (r *Revid) reset(config Config) error { case Http: switch r.Config().Packetization { case Mpegts: - r.destination = append(r.destination, newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil)) + r.destination = append(r.destination, newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil)) default: r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) } diff --git a/revid/senders.go b/revid/senders.go index 6018ad97..8e102096 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -45,6 +45,28 @@ import ( "github.com/Comcast/gots/packet" ) +type sender interface { + send(d []byte) error +} + +// httpSender implements loadSender for posting HTTP to NetReceiver +type minimalHttpSender struct { + client *netsender.Sender + + log func(lvl int8, msg string, args ...interface{}) +} + +func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender { + return &minimalHttpSender{ + client: ns, + log: log, + } +} + +func (s *minimalHttpSender) send(d []byte) error { + return httpSend(d, s.client, s.log) +} + // loadSender is a destination to send a *ring.Chunk to. // When a loadSender has finished using the *ring.Chunk // it must be Closed. @@ -110,7 +132,7 @@ func (s *fileSender) close() error { // clips based on PSI. It also fixes accounts for discontinuities by setting // the discontinuity indicator for the first packet of a clip. type mtsSender struct { - ls loadSender + sender sender buf []byte pkt [mts.PacketSize]byte fail bool @@ -120,9 +142,9 @@ type mtsSender struct { } // newmtsSender returns a new mtsSender. -func newMtsSender(s loadSender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { +func newMtsSender(s sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ - ls: s, + sender: s, repairer: mts.NewDiscontinuityRepairer(), } } @@ -178,20 +200,15 @@ func (s *mtsSender) failed() { // fixAndSend uses the discontinuity repairer to ensure there is not a // discontinuity, and if so sets the discontinuity indicator of the PAT packet. -func (s *mtsSender) fixAndSend() error { - err := s.repairer.Repair(s.buf) +func (ms *mtsSender) fixAndSend() error { + err := ms.repairer.Repair(ms.buf) if err != nil { return err } - err = s.ls.load(ring.NewChunk(s.buf)) + err = ms.sender.send(ms.buf) if err != nil { return err } - err = s.ls.send() - if err != nil { - return err - } - s.ls.release() return nil } @@ -234,13 +251,13 @@ func (s *httpSender) send() error { // if the chunk has been cleared. return nil } - return s.httpSend(s.chunk.Bytes()) + return httpSend(s.chunk.Bytes(), s.client, s.log) } -func (s *httpSender) httpSend(d []byte) error { +func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { // Only send if "V0" is configured as an input. send := false - ip := s.client.Param("ip") + ip := client.Param("ip") pins := netsender.MakePins(ip, "V") for i, pin := range pins { if pin.Name == "V0" { @@ -257,16 +274,16 @@ func (s *httpSender) httpSend(d []byte) error { } var err error var reply string - reply, _, err = s.client.Send(netsender.RequestRecv, pins) + reply, _, err = client.Send(netsender.RequestRecv, pins) if err != nil { return err } - return s.extractMeta(reply) + return extractMeta(reply, log) } // extractMeta looks at a reply at extracts any time or location data - then used // to update time and location information in the mpegts encoder. -func (s *httpSender) extractMeta(r string) error { +func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error { dec, err := netsender.NewJSONDecoder(r) if err != nil { return nil @@ -274,18 +291,18 @@ func (s *httpSender) extractMeta(r string) error { // Extract time from reply t, err := dec.Int("ts") if err != nil { - s.log(logger.Warning, pkg+"No timestamp in reply") + log(logger.Warning, pkg+"No timestamp in reply") } else { - s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) + log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) mts.Meta.Add("ts", strconv.Itoa(t)) } // Extract location from reply g, err := dec.String("ll") if err != nil { - s.log(logger.Warning, pkg+"No location in reply") + log(logger.Warning, pkg+"No location in reply") } else { - s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) + log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) mts.Meta.Add("loc", g) } diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go index ff122032..d1c6165a 100644 --- a/stream/mts/discontinuity.go +++ b/stream/mts/discontinuity.go @@ -59,7 +59,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error { panic("Clip to repair must have PAT first") } cc := p.ContinuityCounter() - expect, exists := dr.expectedCC() + expect, exists := dr.expectedCC(3) dr.incExpectedCC() if !exists { dr.setExpectedCC(uint8(cc))