stream/mts: wrote sender interface and a minimalHttpSender type

This commit is contained in:
saxon 2019-02-16 14:26:51 +10:30
parent 34daa45b46
commit 361f5edbc9
3 changed files with 40 additions and 23 deletions

View File

@ -211,7 +211,7 @@ func (r *Revid) reset(config Config) error {
case Http:
switch r.Config().Packetization {
case Mpegts:
r.destination = append(r.destination, newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil))
r.destination = append(r.destination, newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil))
default:
r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log))
}

View File

@ -45,6 +45,28 @@ import (
"github.com/Comcast/gots/packet"
)
type sender interface {
send(d []byte) error
}
// httpSender implements loadSender for posting HTTP to NetReceiver
type minimalHttpSender struct {
client *netsender.Sender
log func(lvl int8, msg string, args ...interface{})
}
func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender {
return &minimalHttpSender{
client: ns,
log: log,
}
}
func (s *minimalHttpSender) send(d []byte) error {
return httpSend(d, s.client, s.log)
}
// loadSender is a destination to send a *ring.Chunk to.
// When a loadSender has finished using the *ring.Chunk
// it must be Closed.
@ -110,7 +132,7 @@ func (s *fileSender) close() error {
// clips based on PSI. It also fixes accounts for discontinuities by setting
// the discontinuity indicator for the first packet of a clip.
type mtsSender struct {
ls loadSender
sender sender
buf []byte
pkt [mts.PacketSize]byte
fail bool
@ -120,9 +142,9 @@ type mtsSender struct {
}
// newmtsSender returns a new mtsSender.
func newMtsSender(s loadSender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
func newMtsSender(s sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{
ls: s,
sender: s,
repairer: mts.NewDiscontinuityRepairer(),
}
}
@ -178,20 +200,15 @@ func (s *mtsSender) failed() {
// fixAndSend uses the discontinuity repairer to ensure there is not a
// discontinuity, and if so sets the discontinuity indicator of the PAT packet.
func (s *mtsSender) fixAndSend() error {
err := s.repairer.Repair(s.buf)
func (ms *mtsSender) fixAndSend() error {
err := ms.repairer.Repair(ms.buf)
if err != nil {
return err
}
err = s.ls.load(ring.NewChunk(s.buf))
err = ms.sender.send(ms.buf)
if err != nil {
return err
}
err = s.ls.send()
if err != nil {
return err
}
s.ls.release()
return nil
}
@ -234,13 +251,13 @@ func (s *httpSender) send() error {
// if the chunk has been cleared.
return nil
}
return s.httpSend(s.chunk.Bytes())
return httpSend(s.chunk.Bytes(), s.client, s.log)
}
func (s *httpSender) httpSend(d []byte) error {
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
// Only send if "V0" is configured as an input.
send := false
ip := s.client.Param("ip")
ip := client.Param("ip")
pins := netsender.MakePins(ip, "V")
for i, pin := range pins {
if pin.Name == "V0" {
@ -257,16 +274,16 @@ func (s *httpSender) httpSend(d []byte) error {
}
var err error
var reply string
reply, _, err = s.client.Send(netsender.RequestRecv, pins)
reply, _, err = client.Send(netsender.RequestRecv, pins)
if err != nil {
return err
}
return s.extractMeta(reply)
return extractMeta(reply, log)
}
// extractMeta looks at a reply at extracts any time or location data - then used
// to update time and location information in the mpegts encoder.
func (s *httpSender) extractMeta(r string) error {
func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error {
dec, err := netsender.NewJSONDecoder(r)
if err != nil {
return nil
@ -274,18 +291,18 @@ func (s *httpSender) extractMeta(r string) error {
// Extract time from reply
t, err := dec.Int("ts")
if err != nil {
s.log(logger.Warning, pkg+"No timestamp in reply")
log(logger.Warning, pkg+"No timestamp in reply")
} else {
s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.Meta.Add("ts", strconv.Itoa(t))
}
// Extract location from reply
g, err := dec.String("ll")
if err != nil {
s.log(logger.Warning, pkg+"No location in reply")
log(logger.Warning, pkg+"No location in reply")
} else {
s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.Meta.Add("loc", g)
}

View File

@ -59,7 +59,7 @@ func (dr *DiscontinuityRepairer) Repair(d []byte) error {
panic("Clip to repair must have PAT first")
}
cc := p.ContinuityCounter()
expect, exists := dr.expectedCC()
expect, exists := dr.expectedCC(3)
dr.incExpectedCC()
if !exists {
dr.setExpectedCC(uint8(cc))