mirror of https://bitbucket.org/ausocean/av.git
revid: Buffered MtsSender
The mtsSender now has a ringBuffer and tests have been updated accordingly. The mtsSender now uses an output routine to get data from it's ringBuffer to send. Revid now uses ioext.multiWriteClosers for encoders to write to so that senders can be closed and therefore any output routines.
This commit is contained in:
parent
2d15e98445
commit
5ecf06e093
|
@ -44,6 +44,7 @@ import (
|
||||||
"bitbucket.org/ausocean/av/container/flv"
|
"bitbucket.org/ausocean/av/container/flv"
|
||||||
"bitbucket.org/ausocean/av/container/mts"
|
"bitbucket.org/ausocean/av/container/mts"
|
||||||
"bitbucket.org/ausocean/iot/pi/netsender"
|
"bitbucket.org/ausocean/iot/pi/netsender"
|
||||||
|
"bitbucket.org/ausocean/utils/ioext"
|
||||||
"bitbucket.org/ausocean/utils/logger"
|
"bitbucket.org/ausocean/utils/logger"
|
||||||
"bitbucket.org/ausocean/utils/ring"
|
"bitbucket.org/ausocean/utils/ring"
|
||||||
)
|
)
|
||||||
|
@ -111,6 +112,8 @@ type Revid struct {
|
||||||
// encoder holds the required encoders, which then write to destinations.
|
// encoder holds the required encoders, which then write to destinations.
|
||||||
encoder []io.Writer
|
encoder []io.Writer
|
||||||
|
|
||||||
|
writeClosers []io.WriteCloser
|
||||||
|
|
||||||
// bitrate hold the last send bitrate calculation result.
|
// bitrate hold the last send bitrate calculation result.
|
||||||
bitrate int
|
bitrate int
|
||||||
|
|
||||||
|
@ -178,42 +181,42 @@ func (r *Revid) setConfig(config Config) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// setupPipeline constructs a data pipeline.
|
// setupPipeline constructs a data pipeline.
|
||||||
func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.Writer) io.Writer) error {
|
func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error {
|
||||||
r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout))
|
r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout))
|
||||||
|
|
||||||
r.encoder = r.encoder[:0]
|
r.encoder = r.encoder[:0]
|
||||||
|
|
||||||
// mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders
|
// mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders
|
||||||
// will hold senders that require FLV encoding.
|
// will hold senders that require FLV encoding.
|
||||||
var mtsSenders, flvSenders []io.Writer
|
var mtsSenders, flvSenders []io.WriteCloser
|
||||||
|
|
||||||
// We will go through our outputs and create the corresponding senders to add
|
// We will go through our outputs and create the corresponding senders to add
|
||||||
// to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the
|
// to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the
|
||||||
// output requires FLV encoding.
|
// output requires FLV encoding.
|
||||||
var w io.Writer
|
var wc io.WriteCloser
|
||||||
for _, out := range r.config.Outputs {
|
for _, out := range r.config.Outputs {
|
||||||
switch out {
|
switch out {
|
||||||
case Http:
|
case Http:
|
||||||
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil)
|
wc = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout)
|
||||||
mtsSenders = append(mtsSenders, w)
|
mtsSenders = append(mtsSenders, wc)
|
||||||
case Rtp:
|
case Rtp:
|
||||||
w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
|
wc, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error())
|
r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error())
|
||||||
}
|
}
|
||||||
mtsSenders = append(mtsSenders, w)
|
mtsSenders = append(mtsSenders, wc)
|
||||||
case File:
|
case File:
|
||||||
w, err := newFileSender(r.config.OutputPath)
|
wc, err := newFileSender(r.config.OutputPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
mtsSenders = append(mtsSenders, w)
|
mtsSenders = append(mtsSenders, wc)
|
||||||
case Rtmp:
|
case Rtmp:
|
||||||
w, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
|
wc, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error())
|
r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error())
|
||||||
}
|
}
|
||||||
flvSenders = append(flvSenders, w)
|
flvSenders = append(flvSenders, wc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,7 +282,7 @@ func (r *Revid) reset(config Config) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.setupPipeline(newMtsEncoder, newFlvEncoder, io.MultiWriter)
|
err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -338,15 +341,21 @@ func (r *Revid) Stop() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, wc := range r.writeClosers {
|
||||||
|
err := wc.Close()
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
|
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
|
||||||
r.setIsRunning(false)
|
|
||||||
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
|
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
|
||||||
// If a cmd process is running, we kill!
|
// If a cmd process is running, we kill!
|
||||||
if r.cmd != nil && r.cmd.Process != nil {
|
if r.cmd != nil && r.cmd.Process != nil {
|
||||||
r.cmd.Process.Kill()
|
r.cmd.Process.Kill()
|
||||||
}
|
}
|
||||||
r.wg.Wait()
|
r.wg.Wait()
|
||||||
|
r.setIsRunning(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Revid) Update(vars map[string]string) error {
|
func (r *Revid) Update(vars map[string]string) error {
|
||||||
|
|
101
revid/senders.go
101
revid/senders.go
|
@ -35,6 +35,8 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/Comcast/gots/packet"
|
"github.com/Comcast/gots/packet"
|
||||||
|
|
||||||
|
@ -43,6 +45,7 @@ import (
|
||||||
"bitbucket.org/ausocean/av/protocol/rtp"
|
"bitbucket.org/ausocean/av/protocol/rtp"
|
||||||
"bitbucket.org/ausocean/iot/pi/netsender"
|
"bitbucket.org/ausocean/iot/pi/netsender"
|
||||||
"bitbucket.org/ausocean/utils/logger"
|
"bitbucket.org/ausocean/utils/logger"
|
||||||
|
"bitbucket.org/ausocean/utils/ring"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Log is used by the multiSender.
|
// Log is used by the multiSender.
|
||||||
|
@ -68,6 +71,8 @@ func (s *httpSender) Write(d []byte) (int, error) {
|
||||||
return len(d), httpSend(d, s.client, s.log)
|
return len(d), httpSend(d, s.client, s.log)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *httpSender) Close() error { return nil }
|
||||||
|
|
||||||
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
|
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
|
||||||
// Only send if "V0" is configured as an input.
|
// Only send if "V0" is configured as an input.
|
||||||
send := false
|
send := false
|
||||||
|
@ -129,7 +134,7 @@ type fileSender struct {
|
||||||
data []byte
|
data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFileSender(path string) (io.Writer, error) {
|
func newFileSender(path string) (*fileSender, error) {
|
||||||
f, err := os.Create(path)
|
f, err := os.Create(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -142,27 +147,37 @@ func (s *fileSender) Write(d []byte) (int, error) {
|
||||||
return s.file.Write(d)
|
return s.file.Write(d)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fileSender) close() error { return s.file.Close() }
|
func (s *fileSender) Close() error { return s.file.Close() }
|
||||||
|
|
||||||
// mtsSender implements loadSender and provides sending capability specifically
|
// mtsSender implements io.WriteCloser and provides sending capability specifically
|
||||||
// for use with MPEGTS packetization. It handles the construction of appropriately
|
// for use with MPEGTS packetization. It handles the construction of appropriately
|
||||||
// lengthed clips based on PSI. It also fixes accounts for discontinuities by
|
// lengthed clips based on PSI. It also accounts for discontinuities by
|
||||||
// setting the discontinuity indicator for the first packet of a clip.
|
// setting the discontinuity indicator for the first packet of a clip.
|
||||||
type mtsSender struct {
|
type mtsSender struct {
|
||||||
dst io.Writer
|
dst io.WriteCloser
|
||||||
buf []byte
|
buf []byte
|
||||||
|
ringBuf *ring.Buffer
|
||||||
next []byte
|
next []byte
|
||||||
pkt packet.Packet
|
pkt packet.Packet
|
||||||
repairer *mts.DiscontinuityRepairer
|
repairer *mts.DiscontinuityRepairer
|
||||||
curPid int
|
curPid int
|
||||||
|
quit chan struct{}
|
||||||
|
log func(lvl int8, msg string, args ...interface{})
|
||||||
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// newMtsSender returns a new mtsSender.
|
// newMtsSender returns a new mtsSender.
|
||||||
func newMtsSender(dst io.Writer, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
|
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender {
|
||||||
return &mtsSender{
|
s := &mtsSender{
|
||||||
dst: dst,
|
dst: dst,
|
||||||
repairer: mts.NewDiscontinuityRepairer(),
|
repairer: mts.NewDiscontinuityRepairer(),
|
||||||
|
log: log,
|
||||||
|
ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout),
|
||||||
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
s.wg.Add(1)
|
||||||
|
go s.output()
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write implements io.Writer.
|
// Write implements io.Writer.
|
||||||
|
@ -176,20 +191,70 @@ func (s *mtsSender) Write(d []byte) (int, error) {
|
||||||
copy(s.pkt[:], bytes)
|
copy(s.pkt[:], bytes)
|
||||||
s.curPid = s.pkt.PID()
|
s.curPid = s.pkt.PID()
|
||||||
if s.curPid == mts.PatPid && len(s.buf) > 0 {
|
if s.curPid == mts.PatPid && len(s.buf) > 0 {
|
||||||
err := s.repairer.Repair(s.buf)
|
_, err := s.ringBuf.Write(s.buf)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
_, err = s.dst.Write(s.buf)
|
s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error())
|
||||||
if err == nil {
|
|
||||||
goto done
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
s.repairer.Failed()
|
s.ringBuf.Flush()
|
||||||
done:
|
|
||||||
s.buf = s.buf[:0]
|
s.buf = s.buf[:0]
|
||||||
}
|
}
|
||||||
return len(d), nil
|
return len(d), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close implements io.Closer.
|
||||||
|
func (s *mtsSender) Close() error {
|
||||||
|
close(s.quit)
|
||||||
|
s.wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// output is a routine start at creation of the mtsSender. It will get data
|
||||||
|
// from the mtsSenders ringBuffer and attempt to send.
|
||||||
|
func (s *mtsSender) output() {
|
||||||
|
var chunk *ring.Chunk
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.quit:
|
||||||
|
s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine")
|
||||||
|
defer s.wg.Done()
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
// If chunk is nil then we're ready to get another from the ringBuffer.
|
||||||
|
if chunk == nil {
|
||||||
|
var err error
|
||||||
|
chunk, err = s.ringBuf.Next(readTimeout)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
case ring.ErrTimeout:
|
||||||
|
s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout")
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error())
|
||||||
|
fallthrough
|
||||||
|
case io.EOF:
|
||||||
|
goto loop
|
||||||
|
}
|
||||||
|
// If chunk is not nil, then we need to try sending it off.
|
||||||
|
} else {
|
||||||
|
err := s.repairer.Repair(chunk.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
chunk.Close()
|
||||||
|
chunk = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err = s.dst.Write(chunk.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
s.repairer.Failed()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
chunk.Close()
|
||||||
|
chunk = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// rtmpSender implements loadSender for a native RTMP destination.
|
// rtmpSender implements loadSender for a native RTMP destination.
|
||||||
type rtmpSender struct {
|
type rtmpSender struct {
|
||||||
conn *rtmp.Conn
|
conn *rtmp.Conn
|
||||||
|
@ -238,7 +303,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *rtmpSender) restart() error {
|
func (s *rtmpSender) restart() error {
|
||||||
s.close()
|
s.Close()
|
||||||
var err error
|
var err error
|
||||||
for n := 0; n < s.retries; n++ {
|
for n := 0; n < s.retries; n++ {
|
||||||
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
|
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
|
||||||
|
@ -253,7 +318,7 @@ func (s *rtmpSender) restart() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *rtmpSender) close() error {
|
func (s *rtmpSender) Close() error {
|
||||||
if s.conn != nil {
|
if s.conn != nil {
|
||||||
return s.conn.Close()
|
return s.conn.Close()
|
||||||
}
|
}
|
||||||
|
@ -284,3 +349,5 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{
|
||||||
func (s *rtpSender) Write(d []byte) (int, error) {
|
func (s *rtpSender) Write(d []byte) (int, error) {
|
||||||
return s.encoder.Write(s.data)
|
return s.encoder.Write(s.data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *rtpSender) Close() error { return nil }
|
||||||
|
|
|
@ -30,7 +30,6 @@ package revid
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -40,7 +39,6 @@ import (
|
||||||
"bitbucket.org/ausocean/av/container/mts"
|
"bitbucket.org/ausocean/av/container/mts"
|
||||||
"bitbucket.org/ausocean/av/container/mts/meta"
|
"bitbucket.org/ausocean/av/container/mts/meta"
|
||||||
"bitbucket.org/ausocean/utils/logger"
|
"bitbucket.org/ausocean/utils/logger"
|
||||||
"bitbucket.org/ausocean/utils/ring"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Ring buffer sizes and read/write timeouts.
|
// Ring buffer sizes and read/write timeouts.
|
||||||
|
@ -55,20 +53,25 @@ var (
|
||||||
errSendFailed = errors.New("send failed")
|
errSendFailed = errors.New("send failed")
|
||||||
)
|
)
|
||||||
|
|
||||||
// sender simulates sending of video data, creating discontinuities if
|
// destination simulates a destination for the mtsSender. It allows for the
|
||||||
// testDiscontinuities is set to true.
|
// emulation of failed and delayed sends.
|
||||||
type destination struct {
|
type destination struct {
|
||||||
buf [][]byte
|
buf [][]byte
|
||||||
testDiscontinuities bool
|
testFails bool
|
||||||
discontinuityAt int
|
failAt int
|
||||||
currentPkt int
|
currentPkt int
|
||||||
|
t *testing.T
|
||||||
|
sendDelay time.Duration
|
||||||
|
delayAt int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write implements io.Writer.
|
|
||||||
// Write takes d and neglects if testDiscontinuities is true, returning an error,
|
|
||||||
// otherwise d is appended to senders buf.
|
|
||||||
func (ts *destination) Write(d []byte) (int, error) {
|
func (ts *destination) Write(d []byte) (int, error) {
|
||||||
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
|
ts.t.Log("writing clip to destination")
|
||||||
|
if ts.delayAt != 0 && ts.currentPkt == ts.delayAt {
|
||||||
|
time.Sleep(ts.sendDelay)
|
||||||
|
}
|
||||||
|
if ts.testFails && ts.currentPkt == ts.failAt {
|
||||||
|
ts.t.Log("failed send")
|
||||||
ts.currentPkt++
|
ts.currentPkt++
|
||||||
return 0, errSendFailed
|
return 0, errSendFailed
|
||||||
}
|
}
|
||||||
|
@ -79,9 +82,12 @@ func (ts *destination) Write(d []byte) (int, error) {
|
||||||
return len(d), nil
|
return len(d), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// log implements the required logging func for some of the structs in use
|
func (ts *destination) Close() error { return nil }
|
||||||
// within tests.
|
|
||||||
func log(lvl int8, msg string, args ...interface{}) {
|
// dummyLogger will allow logging to be done by the testing pkg.
|
||||||
|
type dummyLogger testing.T
|
||||||
|
|
||||||
|
func (dl dummyLogger) log(lvl int8, msg string, args ...interface{}) {
|
||||||
var l string
|
var l string
|
||||||
switch lvl {
|
switch lvl {
|
||||||
case logger.Warning:
|
case logger.Warning:
|
||||||
|
@ -99,7 +105,11 @@ func log(lvl int8, msg string, args ...interface{}) {
|
||||||
for i := 0; i < len(args); i++ {
|
for i := 0; i < len(args); i++ {
|
||||||
msg += " %v"
|
msg += " %v"
|
||||||
}
|
}
|
||||||
fmt.Printf(msg, args)
|
if len(args) == 0 {
|
||||||
|
dl.Log(msg + "\n")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dl.Logf(msg+"\n", args)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestSegment ensures that the mtsSender correctly segments data into clips
|
// TestSegment ensures that the mtsSender correctly segments data into clips
|
||||||
|
@ -107,38 +117,28 @@ func log(lvl int8, msg string, args ...interface{}) {
|
||||||
func TestMtsSenderSegment(t *testing.T) {
|
func TestMtsSenderSegment(t *testing.T) {
|
||||||
mts.Meta = meta.New()
|
mts.Meta = meta.New()
|
||||||
|
|
||||||
// Create ringBuffer, sender, loadsender and the MPEGTS encoder.
|
// Create ringBuffer, sender, sender and the MPEGTS encoder.
|
||||||
tstDst := &destination{}
|
tstDst := &destination{t: t}
|
||||||
loadSender := newMtsSender(tstDst, log)
|
sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout)
|
||||||
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
|
encoder := mts.NewEncoder(sender, 25)
|
||||||
encoder := mts.NewEncoder((*buffer)(rb), 25)
|
|
||||||
|
|
||||||
// Turn time based PSI writing off for encoder.
|
// Turn time based PSI writing off for encoder.
|
||||||
const psiSendCount = 10
|
const psiSendCount = 10
|
||||||
encoder.TimeBasedPsi(false, psiSendCount)
|
encoder.TimeBasedPsi(false, psiSendCount)
|
||||||
|
|
||||||
|
// Write the packets to the encoder, which will in turn write to the mtsSender.
|
||||||
|
// Payload will just be packet number.
|
||||||
|
t.Log("writing packets")
|
||||||
const noOfPacketsToWrite = 100
|
const noOfPacketsToWrite = 100
|
||||||
for i := 0; i < noOfPacketsToWrite; i++ {
|
for i := 0; i < noOfPacketsToWrite; i++ {
|
||||||
// Insert a payload so that we check that the segmentation works correctly
|
|
||||||
// in this regard. Packet number will be used.
|
|
||||||
encoder.Write([]byte{byte(i)})
|
encoder.Write([]byte{byte(i)})
|
||||||
rb.Flush()
|
|
||||||
|
|
||||||
for {
|
|
||||||
next, err := rb.Next(rTimeout)
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = loadSender.Write(next.Bytes())
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected err: %v\n", err)
|
|
||||||
}
|
|
||||||
next.Close()
|
|
||||||
next = nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Give the mtsSender some time to finish up and then Close it.
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
sender.Close()
|
||||||
|
|
||||||
|
// Check the data.
|
||||||
result := tstDst.buf
|
result := tstDst.buf
|
||||||
expectData := 0
|
expectData := 0
|
||||||
for clipNo, clip := range result {
|
for clipNo, clip := range result {
|
||||||
|
@ -160,9 +160,11 @@ func TestMtsSenderSegment(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the clip data is okay.
|
// Check that the clip data is okay.
|
||||||
|
t.Log("checking clip data")
|
||||||
for i := 0; i < len(clip); i += mts.PacketSize {
|
for i := 0; i < len(clip); i += mts.PacketSize {
|
||||||
copy(pkt[:], clip[i:i+mts.PacketSize])
|
copy(pkt[:], clip[i:i+mts.PacketSize])
|
||||||
if pkt.PID() == mts.VideoPid {
|
if pkt.PID() == mts.VideoPid {
|
||||||
|
t.Log("got video PID")
|
||||||
payload, err := pkt.Payload()
|
payload, err := pkt.Payload()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected err: %v\n", err)
|
t.Fatalf("Unexpected err: %v\n", err)
|
||||||
|
@ -187,61 +189,149 @@ func TestMtsSenderSegment(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestMtsSenderFailedSend checks that a failed send is correctly handled by
|
||||||
|
// the mtsSender. The mtsSender should try to send the same clip again.
|
||||||
|
func TestMtsSenderFailedSend(t *testing.T) {
|
||||||
|
mts.Meta = meta.New()
|
||||||
|
|
||||||
|
// Create destination, the mtsSender and the mtsEncoder
|
||||||
|
const clipToFailAt = 3
|
||||||
|
tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt}
|
||||||
|
sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout)
|
||||||
|
encoder := mts.NewEncoder(sender, 25)
|
||||||
|
|
||||||
|
// Turn time based PSI writing off for encoder and send PSI every 10 packets.
|
||||||
|
const psiSendCount = 10
|
||||||
|
encoder.TimeBasedPsi(false, psiSendCount)
|
||||||
|
|
||||||
|
// Write the packets to the encoder, which will in turn write to the mtsSender.
|
||||||
|
// Payload will just be packet number.
|
||||||
|
t.Log("writing packets")
|
||||||
|
const noOfPacketsToWrite = 100
|
||||||
|
for i := 0; i < noOfPacketsToWrite; i++ {
|
||||||
|
encoder.Write([]byte{byte(i)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Give the mtsSender some time to finish up and then Close it.
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
sender.Close()
|
||||||
|
|
||||||
|
// Check that we have data as expected.
|
||||||
|
result := tstDst.buf
|
||||||
|
expectData := 0
|
||||||
|
for clipNo, clip := range result {
|
||||||
|
t.Logf("Checking clip: %v\n", clipNo)
|
||||||
|
|
||||||
|
// Check that the clip is of expected length.
|
||||||
|
clipLen := len(clip)
|
||||||
|
if clipLen != psiSendCount*mts.PacketSize {
|
||||||
|
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also check that the first packet is a PAT.
|
||||||
|
firstPkt := clip[:mts.PacketSize]
|
||||||
|
var pkt packet.Packet
|
||||||
|
copy(pkt[:], firstPkt)
|
||||||
|
pid := pkt.PID()
|
||||||
|
if pid != mts.PatPid {
|
||||||
|
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the clip data is okay.
|
||||||
|
t.Log("checking clip data")
|
||||||
|
for i := 0; i < len(clip); i += mts.PacketSize {
|
||||||
|
copy(pkt[:], clip[i:i+mts.PacketSize])
|
||||||
|
if pkt.PID() == mts.VideoPid {
|
||||||
|
t.Log("got video PID")
|
||||||
|
payload, err := pkt.Payload()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected err: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse PES from the MTS payload.
|
||||||
|
pes, err := pes.NewPESHeader(payload)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected err: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the data from the PES packet and convert to an int.
|
||||||
|
data := int8(pes.Data()[0])
|
||||||
|
|
||||||
|
// Calc expected data in the PES and then check.
|
||||||
|
if data != int8(expectData) {
|
||||||
|
t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData)
|
||||||
|
}
|
||||||
|
expectData++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMtsSenderDiscontinuity checks that a discontinuity in a stream is
|
||||||
|
// correctly handled by the mtsSender. A discontinuity is caused by overflowing
|
||||||
|
// the mtsSender's ringBuffer. It is expected that the next clip seen has the
|
||||||
|
// disconinuity indicator applied.
|
||||||
func TestMtsSenderDiscontinuity(t *testing.T) {
|
func TestMtsSenderDiscontinuity(t *testing.T) {
|
||||||
mts.Meta = meta.New()
|
mts.Meta = meta.New()
|
||||||
|
|
||||||
// Create ringBuffer sender, loadSender and the MPEGTS encoder.
|
// Create destination, the mtsSender and the mtsEncoder.
|
||||||
const clipWithDiscontinuity = 3
|
const clipToDelay = 3
|
||||||
tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
|
tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay}
|
||||||
loadSender := newMtsSender(tstDst, log)
|
sender := newMtsSender(tstDst, dummyLogger(*t).log, 1, ringBufferElementSize, writeTimeout)
|
||||||
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
|
encoder := mts.NewEncoder(sender, 25)
|
||||||
encoder := mts.NewEncoder((*buffer)(rb), 25)
|
|
||||||
|
|
||||||
// Turn time based PSI writing off for encoder.
|
// Turn time based PSI writing off for encoder.
|
||||||
const psiSendCount = 10
|
const psiSendCount = 10
|
||||||
encoder.TimeBasedPsi(false, psiSendCount)
|
encoder.TimeBasedPsi(false, psiSendCount)
|
||||||
|
|
||||||
|
// Write the packets to the encoder, which will in turn write to the mtsSender.
|
||||||
|
// Payload will just be packet number.
|
||||||
const noOfPacketsToWrite = 100
|
const noOfPacketsToWrite = 100
|
||||||
for i := 0; i < noOfPacketsToWrite; i++ {
|
for i := 0; i < noOfPacketsToWrite; i++ {
|
||||||
// Our payload will just be packet number.
|
|
||||||
encoder.Write([]byte{byte(i)})
|
encoder.Write([]byte{byte(i)})
|
||||||
rb.Flush()
|
|
||||||
|
|
||||||
for {
|
|
||||||
next, err := rb.Next(rTimeout)
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = loadSender.Write(next.Bytes())
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected err: %v\n", err)
|
|
||||||
}
|
|
||||||
next.Close()
|
|
||||||
next = nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Give mtsSender time to finish up then Close.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
sender.Close()
|
||||||
|
|
||||||
|
// Check the data.
|
||||||
result := tstDst.buf
|
result := tstDst.buf
|
||||||
|
expectedCC := 0
|
||||||
|
for clipNo, clip := range result {
|
||||||
|
t.Logf("Checking clip: %v\n", clipNo)
|
||||||
|
|
||||||
// First check that we have less clips as expected.
|
// Check that the clip is of expected length.
|
||||||
expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1
|
clipLen := len(clip)
|
||||||
gotLen := len(result)
|
if clipLen != psiSendCount*mts.PacketSize {
|
||||||
if gotLen != expectedLen {
|
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
|
||||||
t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Now check that the discontinuity indicator is set at the discontinuityClip PAT.
|
// Also check that the first packet is a PAT.
|
||||||
disconClip := result[clipWithDiscontinuity]
|
firstPkt := clip[:mts.PacketSize]
|
||||||
firstPkt := disconClip[:mts.PacketSize]
|
var pkt packet.Packet
|
||||||
var pkt packet.Packet
|
copy(pkt[:], firstPkt)
|
||||||
copy(pkt[:], firstPkt)
|
pid := pkt.PID()
|
||||||
discon, err := (*packet.AdaptationField)(&pkt).Discontinuity()
|
if pid != mts.PatPid {
|
||||||
if err != nil {
|
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
|
||||||
t.Fatalf("Unexpected err: %v\n", err)
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if !discon {
|
// Get the discontinuity indicator
|
||||||
t.Fatalf("Did not get discontinuity indicator for PAT")
|
discon, _ := (*packet.AdaptationField)(&pkt).Discontinuity()
|
||||||
|
|
||||||
|
// Check the continuity counter.
|
||||||
|
cc := pkt.ContinuityCounter()
|
||||||
|
if cc != expectedCC {
|
||||||
|
t.Log("discontinuity found")
|
||||||
|
expectedCC = cc
|
||||||
|
if !discon {
|
||||||
|
t.Errorf("discontinuity indicator not set where expected for clip: %v", clipNo)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if discon && clipNo != 0 {
|
||||||
|
t.Errorf("did not expect discontinuity indicator to be set for clip: %v", clipNo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expectedCC = (expectedCC + 1) & 0xf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue