stream/mts: made mtsSender more general

The mtsSender can now use any loadSender to send fixed and segmented mts data to.
This commit is contained in:
saxon 2019-02-15 23:35:45 +10:30
parent 31d36577b1
commit b6bf41b17d
2 changed files with 31 additions and 20 deletions

View File

@ -211,7 +211,7 @@ func (r *Revid) reset(config Config) error {
case Http:
switch r.Config().Packetization {
case Mpegts:
r.destination = append(r.destination, newMtsSender(r.ns, r.config.Logger.Log))
r.destination = append(r.destination, newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil))
default:
r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log))
}

View File

@ -105,72 +105,83 @@ func (s *fileSender) close() error {
return s.file.Close()
}
// mtsSender provides http sending capability specifically for use with
// mtsSender provides sending capability specifically for use with
// mpegts packetization. It handles the construction of appropriately lengthed
// clips based on PSI. It also fixes accounts for discontinuities by setting
// the discontinuity indicator for the first packet of a clip.
type mtsSender struct {
hs *httpSender
buf []byte
pkt [mts.PacketSize]byte
sendFailed bool
repairer *mts.DiscontinuityRepairer
ls loadSender
buf []byte
pkt [mts.PacketSize]byte
fail bool
repairer *mts.DiscontinuityRepairer
chunk *ring.Chunk
}
// newmtsSender returns a new mtsSender.
func newMtsSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
func newMtsSender(s loadSender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{
hs: newHttpSender(ns, log),
ls: s,
repairer: mts.NewDiscontinuityRepairer(),
}
}
// load takes a *ring.Chunk and extracts bytes copying into s.pkt for use by the sender.
func (s *mtsSender) load(c *ring.Chunk) error {
copy(s.pkt[:], c.Bytes())
s.chunk = c
return nil
}
// send checks the most recently loaded packet and if it is a PAT then the clip
// in s.buf is sent, otherwise the packet is added to s.buf.
func (s *mtsSender) send() error {
if s.sendFailed || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) {
copy(s.pkt[:], s.chunk.Bytes())
if s.fail || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) {
err := s.fixAndSend()
if err != nil {
s.failed()
return err
}
s.sendFailed = false
s.fail = false
s.buf = s.buf[:0]
}
s.buf = append(s.buf, s.pkt[:]...)
s.buf = append(s.buf, s.chunk.Bytes()...)
return nil
}
// failed sets the s.sendFailed flag to true, and let's the discontinuity
// failed sets the s.fail flag to true, and let's the discontinuity
// repairer know that there has been a failed send.
func (s *mtsSender) failed() {
s.sendFailed = true
s.fail = true
s.repairer.Failed()
}
// fixAndSend uses the discontinuity repairer to ensure there is not a
// discontinuity, and if so sets the discontinuity indicator of the PAT packet.
func (s *mtsSender) fixAndSend() error {
err := s.repairer.Repair(s.buf)
err := s.repairer.Repair(s.chunk.Bytes())
if err != nil {
return err
}
return s.hs.httpSend(s.buf)
err = s.ls.load(s.chunk)
if err != nil {
return err
}
err = s.ls.send()
if err != nil {
return err
}
s.ls.release()
return nil
}
func (s *mtsSender) close() error { return nil }
// release will set the s.sendFailed flag to fals and clear the buffer if
// release will set the s.fail flag to fals and clear the buffer if
// the previous send was a fail.
func (s *mtsSender) release() {
if s.sendFailed {
s.sendFailed = false
if s.fail {
s.fail = false
s.buf = s.buf[:0]
}
}