revid & stream/mts: senders now handle clip duration and cc fixing for mts.

Removed rtpSender as we now put in loadSender slice. Removed packer write method as no longer required to do clip duration logic. For initialisation of encoders, they are now no longer writing
to packer, but now straight to the ring buffer. Wrote ausOceanSender which will deal with clip logic and cc fixing for mts. Added some functionality to mts/mpegts.go that allows adding
of adaptation fields to mts packets.
This commit is contained in:
saxon 2019-02-15 12:01:07 +10:30
parent bfdefa97f8
commit 7d03830a4e
3 changed files with 148 additions and 73 deletions

View File

@ -112,10 +112,6 @@ type Revid struct {
// destination is the target endpoint.
destination []loadSender
// rtpSender is an unbuffered sender.
// It is used to isolate RTP from ring buffer-induced delays.
rtpSender *rtpSender
// bitrate hold the last send bitrate calculation result.
bitrate int
@ -135,54 +131,6 @@ type packer struct {
packetCount uint
}
// Write implements the io.Writer interface.
//
// Unless the ring buffer returns an error, all writes
// are deemed to be successful, although a successful
// write may include a dropped frame.
func (p *packer) Write(frame []byte) (int, error) {
if len(frame) > ringBufferElementSize {
p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame))
return len(frame), nil
}
if len(p.owner.destination) != 0 {
n, err := p.owner.buffer.Write(frame)
if err != nil {
if err == ring.ErrDropped {
p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame))
return len(frame), nil
}
p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
return n, err
}
}
// If we have an rtp sender bypass ringbuffer and give straight to sender
if p.owner.rtpSender != nil {
err := p.owner.rtpSender.send(frame)
if err != nil {
p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error())
}
}
p.packetCount++
var hasRtmp bool
for _, d := range p.owner.config.Outputs {
if d == Rtmp {
hasRtmp = true
break
}
}
now := time.Now()
if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) {
p.owner.buffer.Flush()
p.packetCount = 0
p.lastTime = now
}
return len(frame), nil
}
// New returns a pointer to a new Revid with the desired configuration, and/or
// an error if construction of the new instance was not successful.
func New(c Config, ns *netsender.Sender) (*Revid, error) {
@ -239,7 +187,7 @@ func (r *Revid) reset(config Config) error {
}
}
r.destination = r.destination[:0]
r.destination = make([]loadSender, len(r.config.Outputs))
for _, typ := range r.config.Outputs {
switch typ {
case File:
@ -269,10 +217,11 @@ func (r *Revid) reset(config Config) error {
}
r.destination = append(r.destination, s)
case Rtp:
r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
if err != nil {
return err
}
r.destination = append(r.destination, s)
}
}
@ -309,13 +258,13 @@ func (r *Revid) reset(config Config) error {
}
}
}
r.encoder = stream.NopEncoder(&r.packer)
r.encoder = stream.NopEncoder(r.buffer)
case Mpegts:
r.config.Logger.Log(logger.Info, pkg+"using MPEGTS packetisation")
r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate))
r.encoder = mts.NewEncoder(r.buffer, float64(r.config.FrameRate))
case Flv:
r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation")
r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate))
r.encoder, err = flv.NewEncoder(r.buffer, true, true, int(r.config.FrameRate))
if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"failed to open FLV encoder", err.Error())
}

View File

@ -29,7 +29,6 @@ LICENSE
package revid
import (
"errors"
"fmt"
"io"
"net"
@ -43,6 +42,7 @@ import (
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"github.com/Comcast/gots/packet"
)
// loadSender is a destination to send a *ring.Chunk to.
@ -105,6 +105,62 @@ func (s *fileSender) close() error {
return s.file.Close()
}
type ausOceanSender struct {
hs *httpSender
buf []byte
pkt [mts.PacketSize]byte
sendFailed bool
repairer *mts.DiscontinuityRepairer
}
func newAusOceanSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *ausOceanSender {
return &ausOceanSender{
hs: newHttpSender(ns, log),
repairer: mts.NewDiscontinuityRepairer(),
}
}
func (s *ausOceanSender) load(c *ring.Chunk) error {
copy(s.pkt[:], c.Bytes())
return nil
}
func (s *ausOceanSender) send() error {
if s.sendFailed || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) {
err := s.fixAndSend()
if err != nil {
s.failed()
return err
}
s.sendFailed = false
s.buf = s.buf[:0]
}
s.buf = append(s.buf, s.pkt[:]...)
return nil
}
func (s *ausOceanSender) failed() {
s.sendFailed = true
s.repairer.Failed()
}
func (s *ausOceanSender) fixAndSend() error {
err := s.repairer.Repair(s.buf)
if err != nil {
return err
}
return s.hs.httpSend(s.buf)
}
func (s *ausOceanSender) close() error { return nil }
func (s *ausOceanSender) release() {
if s.sendFailed {
s.sendFailed = false
s.buf = s.buf[:0]
}
}
// httpSender implements loadSender for posting HTTP to NetReceiver
type httpSender struct {
client *netsender.Sender
@ -133,6 +189,10 @@ func (s *httpSender) send() error {
// if the chunk has been cleared.
return nil
}
return s.httpSend(s.chunk.Bytes())
}
func (s *httpSender) httpSend(d []byte) error {
// Only send if "V0" is configured as an input.
send := false
ip := s.client.Param("ip")
@ -140,8 +200,8 @@ func (s *httpSender) send() error {
for i, pin := range pins {
if pin.Name == "V0" {
send = true
pins[i].Value = s.chunk.Len()
pins[i].Data = s.chunk.Bytes()
pins[i].Value = len(d)
pins[i].Data = d
pins[i].MimeType = "video/mp2t"
break
}
@ -156,7 +216,6 @@ func (s *httpSender) send() error {
if err != nil {
return err
}
return s.extractMeta(reply)
}
@ -370,6 +429,7 @@ func (s *udpSender) close() error { return nil }
type rtpSender struct {
log func(lvl int8, msg string, args ...interface{})
encoder *rtp.Encoder
chunk *ring.Chunk
}
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
@ -384,12 +444,16 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{
return s, nil
}
func (s *rtpSender) send(d []byte) error {
var err error
if d != nil {
_, err = s.encoder.Write(d)
} else {
err = errors.New("no data to send provided")
func (s *rtpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *rtpSender) close() error { return nil }
func (s *rtpSender) release() {}
func (s *rtpSender) send() error {
_, err := s.chunk.WriteTo(s.encoder)
return err
}

View File

@ -30,6 +30,8 @@ package mts
import (
"errors"
"github.com/Comcast/gots/packet"
)
// General mpegts packet properties.
@ -59,6 +61,9 @@ const (
AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields.
DefaultAdaptationSize = 2 // Default size of the adaptation field.
AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3.
DefaultAdaptationBodySize = 1
DiscontinuityIndicatorMask = 0x80
DiscontinuityIndicatorIdx = AdaptationIdx + 1
)
// TODO: make this better - currently doesn't make sense.
@ -244,3 +249,60 @@ func (p *Packet) Bytes(buf []byte) []byte {
buf = append(buf, p.Payload...)
return buf
}
type Option func(p *[PacketSize]byte)
// addAdaptationField adds an adaptation field to p, and applys the passed options to this field.
// TODO: this will probably break if we already have adaptation field.
func addAdaptationField(p *[PacketSize]byte, options ...Option) error {
b, err := packet.ContainsAdaptationField((*packet.Packet)(p))
if err != nil {
return err
}
if b {
return errors.New("Adaptation field is already present in packet")
}
// Create space for adaptation field.
copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize])
// TODO: seperate into own function
// Update adaptation field control.
p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask
p[AdaptationControlIdx] |= AdaptationControlMask
// Default the adaptationfield.
resetAdaptation(p)
// Apply and options that have bee passed.
for _, option := range options {
option(p)
}
return nil
}
// resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field
// exists, otherwise an error is returned.
func resetAdaptation(p *[PacketSize]byte) error {
b, err := packet.ContainsAdaptationField((*packet.Packet)(p))
if err != nil {
return err
}
if !b {
return errors.New("No adaptation field in this packet")
}
p[AdaptationIdx] = DefaultAdaptationBodySize
p[AdaptationIdx+1] = 0x00
return nil
}
// DiscontinuityIndicator returns and Option that will set p's discontinuity
// indicator according to f.
func DiscontinuityIndicator(f bool) Option {
return func(p *[PacketSize]byte) {
set := byte(DiscontinuityIndicatorMask)
if !f {
set = 0x00
}
p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask
p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set
}
}