revid: Buffered MtsSender

The mtsSender now has a ringBuffer and tests have been updated accordingly. The mtsSender now uses an output routine to get data from it's ringBuffer to send.
 Revid now uses ioext.multiWriteClosers for encoders to write to so that senders can be closed and therefore any output routines.
This commit is contained in:
Saxon 2019-04-08 19:02:42 +09:30
parent 4978db2f2b
commit 66622920d5
3 changed files with 274 additions and 108 deletions

View File

@ -44,6 +44,7 @@ import (
"bitbucket.org/ausocean/av/container/flv"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ioext"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
@ -111,6 +112,8 @@ type Revid struct {
// encoder holds the required encoders, which then write to destinations.
encoder []io.Writer
writeClosers []io.WriteCloser
// bitrate hold the last send bitrate calculation result.
bitrate int
@ -178,42 +181,42 @@ func (r *Revid) setConfig(config Config) error {
}
// setupPipeline constructs a data pipeline.
func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.Writer) io.Writer) error {
func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error {
r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout))
r.encoder = r.encoder[:0]
// mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders
// will hold senders that require FLV encoding.
var mtsSenders, flvSenders []io.Writer
var mtsSenders, flvSenders []io.WriteCloser
// We will go through our outputs and create the corresponding senders to add
// to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the
// output requires FLV encoding.
var w io.Writer
var wc io.WriteCloser
for _, out := range r.config.Outputs {
switch out {
case Http:
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil)
mtsSenders = append(mtsSenders, w)
wc = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout)
mtsSenders = append(mtsSenders, wc)
case Rtp:
w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
wc, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error())
}
mtsSenders = append(mtsSenders, w)
mtsSenders = append(mtsSenders, wc)
case File:
w, err := newFileSender(r.config.OutputPath)
wc, err := newFileSender(r.config.OutputPath)
if err != nil {
return err
}
mtsSenders = append(mtsSenders, w)
mtsSenders = append(mtsSenders, wc)
case Rtmp:
w, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
wc, err := newRtmpSender(r.config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error())
}
flvSenders = append(flvSenders, w)
flvSenders = append(flvSenders, wc)
}
}
@ -279,7 +282,7 @@ func (r *Revid) reset(config Config) error {
return err
}
err = r.setupPipeline(newMtsEncoder, newFlvEncoder, io.MultiWriter)
err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser)
if err != nil {
return err
}
@ -338,15 +341,21 @@ func (r *Revid) Stop() {
return
}
for _, wc := range r.writeClosers {
err := wc.Close()
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error())
return
}
}
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
r.setIsRunning(false)
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
// If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill()
}
r.wg.Wait()
r.setIsRunning(false)
}
func (r *Revid) Update(vars map[string]string) error {

View File

@ -35,6 +35,8 @@ import (
"net"
"os"
"strconv"
"sync"
"time"
"github.com/Comcast/gots/packet"
@ -43,6 +45,7 @@ import (
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
// Log is used by the multiSender.
@ -68,6 +71,8 @@ func (s *httpSender) Write(d []byte) (int, error) {
return len(d), httpSend(d, s.client, s.log)
}
func (s *httpSender) Close() error { return nil }
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
// Only send if "V0" is configured as an input.
send := false
@ -129,7 +134,7 @@ type fileSender struct {
data []byte
}
func newFileSender(path string) (io.Writer, error) {
func newFileSender(path string) (*fileSender, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
@ -142,27 +147,37 @@ func (s *fileSender) Write(d []byte) (int, error) {
return s.file.Write(d)
}
func (s *fileSender) close() error { return s.file.Close() }
func (s *fileSender) Close() error { return s.file.Close() }
// mtsSender implements loadSender and provides sending capability specifically
// mtsSender implements io.WriteCloser and provides sending capability specifically
// for use with MPEGTS packetization. It handles the construction of appropriately
// lengthed clips based on PSI. It also fixes accounts for discontinuities by
// lengthed clips based on PSI. It also accounts for discontinuities by
// setting the discontinuity indicator for the first packet of a clip.
type mtsSender struct {
dst io.Writer
dst io.WriteCloser
buf []byte
ringBuf *ring.Buffer
next []byte
pkt packet.Packet
repairer *mts.DiscontinuityRepairer
curPid int
quit chan struct{}
log func(lvl int8, msg string, args ...interface{})
wg sync.WaitGroup
}
// newMtsSender returns a new mtsSender.
func newMtsSender(dst io.Writer, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender {
s := &mtsSender{
dst: dst,
repairer: mts.NewDiscontinuityRepairer(),
log: log,
ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout),
quit: make(chan struct{}),
}
s.wg.Add(1)
go s.output()
return s
}
// Write implements io.Writer.
@ -176,20 +191,70 @@ func (s *mtsSender) Write(d []byte) (int, error) {
copy(s.pkt[:], bytes)
s.curPid = s.pkt.PID()
if s.curPid == mts.PatPid && len(s.buf) > 0 {
err := s.repairer.Repair(s.buf)
if err == nil {
_, err = s.dst.Write(s.buf)
if err == nil {
goto done
}
_, err := s.ringBuf.Write(s.buf)
if err != nil {
s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error())
}
s.repairer.Failed()
done:
s.ringBuf.Flush()
s.buf = s.buf[:0]
}
return len(d), nil
}
// Close implements io.Closer.
func (s *mtsSender) Close() error {
close(s.quit)
s.wg.Wait()
return nil
}
// output is a routine start at creation of the mtsSender. It will get data
// from the mtsSenders ringBuffer and attempt to send.
func (s *mtsSender) output() {
var chunk *ring.Chunk
loop:
for {
select {
case <-s.quit:
s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine")
defer s.wg.Done()
return
default:
// If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil {
var err error
chunk, err = s.ringBuf.Next(readTimeout)
switch err {
case nil:
case ring.ErrTimeout:
s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout")
continue
default:
s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error())
fallthrough
case io.EOF:
goto loop
}
// If chunk is not nil, then we need to try sending it off.
} else {
err := s.repairer.Repair(chunk.Bytes())
if err != nil {
chunk.Close()
chunk = nil
continue
}
_, err = s.dst.Write(chunk.Bytes())
if err != nil {
s.repairer.Failed()
continue
}
chunk.Close()
chunk = nil
}
}
}
}
// rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct {
conn *rtmp.Conn
@ -238,7 +303,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) {
}
func (s *rtmpSender) restart() error {
s.close()
s.Close()
var err error
for n := 0; n < s.retries; n++ {
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
@ -253,7 +318,7 @@ func (s *rtmpSender) restart() error {
return err
}
func (s *rtmpSender) close() error {
func (s *rtmpSender) Close() error {
if s.conn != nil {
return s.conn.Close()
}
@ -290,3 +355,5 @@ func (s *rtpSender) Write(d []byte) (int, error) {
}
return len(d), nil
}
func (s *rtpSender) Close() error { return nil }

View File

@ -30,7 +30,6 @@ package revid
import (
"errors"
"fmt"
"testing"
"time"
@ -40,7 +39,6 @@ import (
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
// Ring buffer sizes and read/write timeouts.
@ -55,20 +53,25 @@ var (
errSendFailed = errors.New("send failed")
)
// sender simulates sending of video data, creating discontinuities if
// testDiscontinuities is set to true.
// destination simulates a destination for the mtsSender. It allows for the
// emulation of failed and delayed sends.
type destination struct {
buf [][]byte
testDiscontinuities bool
discontinuityAt int
currentPkt int
buf [][]byte
testFails bool
failAt int
currentPkt int
t *testing.T
sendDelay time.Duration
delayAt int
}
// Write implements io.Writer.
// Write takes d and neglects if testDiscontinuities is true, returning an error,
// otherwise d is appended to senders buf.
func (ts *destination) Write(d []byte) (int, error) {
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
ts.t.Log("writing clip to destination")
if ts.delayAt != 0 && ts.currentPkt == ts.delayAt {
time.Sleep(ts.sendDelay)
}
if ts.testFails && ts.currentPkt == ts.failAt {
ts.t.Log("failed send")
ts.currentPkt++
return 0, errSendFailed
}
@ -79,9 +82,12 @@ func (ts *destination) Write(d []byte) (int, error) {
return len(d), nil
}
// log implements the required logging func for some of the structs in use
// within tests.
func log(lvl int8, msg string, args ...interface{}) {
func (ts *destination) Close() error { return nil }
// dummyLogger will allow logging to be done by the testing pkg.
type dummyLogger testing.T
func (dl dummyLogger) log(lvl int8, msg string, args ...interface{}) {
var l string
switch lvl {
case logger.Warning:
@ -99,7 +105,11 @@ func log(lvl int8, msg string, args ...interface{}) {
for i := 0; i < len(args); i++ {
msg += " %v"
}
fmt.Printf(msg, args)
if len(args) == 0 {
dl.Log(msg + "\n")
return
}
dl.Logf(msg+"\n", args)
}
// TestSegment ensures that the mtsSender correctly segments data into clips
@ -107,38 +117,28 @@ func log(lvl int8, msg string, args ...interface{}) {
func TestMtsSenderSegment(t *testing.T) {
mts.Meta = meta.New()
// Create ringBuffer, sender, loadsender and the MPEGTS encoder.
tstDst := &destination{}
loadSender := newMtsSender(tstDst, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25)
// Create ringBuffer, sender, sender and the MPEGTS encoder.
tstDst := &destination{t: t}
sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout)
encoder := mts.NewEncoder(sender, 25)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
// Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number.
t.Log("writing packets")
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
// Insert a payload so that we check that the segmentation works correctly
// in this regard. Packet number will be used.
encoder.Write([]byte{byte(i)})
rb.Flush()
for {
next, err := rb.Next(rTimeout)
if err != nil {
break
}
_, err = loadSender.Write(next.Bytes())
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
next.Close()
next = nil
}
}
// Give the mtsSender some time to finish up and then Close it.
time.Sleep(10 * time.Millisecond)
sender.Close()
// Check the data.
result := tstDst.buf
expectData := 0
for clipNo, clip := range result {
@ -160,9 +160,11 @@ func TestMtsSenderSegment(t *testing.T) {
}
// Check that the clip data is okay.
t.Log("checking clip data")
for i := 0; i < len(clip); i += mts.PacketSize {
copy(pkt[:], clip[i:i+mts.PacketSize])
if pkt.PID() == mts.VideoPid {
t.Log("got video PID")
payload, err := pkt.Payload()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
@ -187,61 +189,149 @@ func TestMtsSenderSegment(t *testing.T) {
}
}
// TestMtsSenderFailedSend checks that a failed send is correctly handled by
// the mtsSender. The mtsSender should try to send the same clip again.
func TestMtsSenderFailedSend(t *testing.T) {
mts.Meta = meta.New()
// Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3
tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt}
sender := newMtsSender(tstDst, dummyLogger(*t).log, ringBufferSize, ringBufferElementSize, writeTimeout)
encoder := mts.NewEncoder(sender, 25)
// Turn time based PSI writing off for encoder and send PSI every 10 packets.
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
// Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number.
t.Log("writing packets")
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
encoder.Write([]byte{byte(i)})
}
// Give the mtsSender some time to finish up and then Close it.
time.Sleep(10 * time.Millisecond)
sender.Close()
// Check that we have data as expected.
result := tstDst.buf
expectData := 0
for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo)
// Check that the clip is of expected length.
clipLen := len(clip)
if clipLen != psiSendCount*mts.PacketSize {
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
}
// Also check that the first packet is a PAT.
firstPkt := clip[:mts.PacketSize]
var pkt packet.Packet
copy(pkt[:], firstPkt)
pid := pkt.PID()
if pid != mts.PatPid {
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
}
// Check that the clip data is okay.
t.Log("checking clip data")
for i := 0; i < len(clip); i += mts.PacketSize {
copy(pkt[:], clip[i:i+mts.PacketSize])
if pkt.PID() == mts.VideoPid {
t.Log("got video PID")
payload, err := pkt.Payload()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Parse PES from the MTS payload.
pes, err := pes.NewPESHeader(payload)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Get the data from the PES packet and convert to an int.
data := int8(pes.Data()[0])
// Calc expected data in the PES and then check.
if data != int8(expectData) {
t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData)
}
expectData++
}
}
}
}
// TestMtsSenderDiscontinuity checks that a discontinuity in a stream is
// correctly handled by the mtsSender. A discontinuity is caused by overflowing
// the mtsSender's ringBuffer. It is expected that the next clip seen has the
// disconinuity indicator applied.
func TestMtsSenderDiscontinuity(t *testing.T) {
mts.Meta = meta.New()
// Create ringBuffer sender, loadSender and the MPEGTS encoder.
const clipWithDiscontinuity = 3
tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
loadSender := newMtsSender(tstDst, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25)
// Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3
tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay}
sender := newMtsSender(tstDst, dummyLogger(*t).log, 1, ringBufferElementSize, writeTimeout)
encoder := mts.NewEncoder(sender, 25)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
// Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number.
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
// Our payload will just be packet number.
encoder.Write([]byte{byte(i)})
rb.Flush()
for {
next, err := rb.Next(rTimeout)
if err != nil {
break
}
_, err = loadSender.Write(next.Bytes())
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
next.Close()
next = nil
}
}
// Give mtsSender time to finish up then Close.
time.Sleep(100 * time.Millisecond)
sender.Close()
// Check the data.
result := tstDst.buf
expectedCC := 0
for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo)
// First check that we have less clips as expected.
expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1
gotLen := len(result)
if gotLen != expectedLen {
t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)
}
// Check that the clip is of expected length.
clipLen := len(clip)
if clipLen != psiSendCount*mts.PacketSize {
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
}
// Now check that the discontinuity indicator is set at the discontinuityClip PAT.
disconClip := result[clipWithDiscontinuity]
firstPkt := disconClip[:mts.PacketSize]
var pkt packet.Packet
copy(pkt[:], firstPkt)
discon, err := (*packet.AdaptationField)(&pkt).Discontinuity()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Also check that the first packet is a PAT.
firstPkt := clip[:mts.PacketSize]
var pkt packet.Packet
copy(pkt[:], firstPkt)
pid := pkt.PID()
if pid != mts.PatPid {
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
}
if !discon {
t.Fatalf("Did not get discontinuity indicator for PAT")
// Get the discontinuity indicator
discon, _ := (*packet.AdaptationField)(&pkt).Discontinuity()
// Check the continuity counter.
cc := pkt.ContinuityCounter()
if cc != expectedCC {
t.Log("discontinuity found")
expectedCC = cc
if !discon {
t.Errorf("discontinuity indicator not set where expected for clip: %v", clipNo)
}
} else {
if discon && clipNo != 0 {
t.Errorf("did not expect discontinuity indicator to be set for clip: %v", clipNo)
}
}
expectedCC = (expectedCC + 1) & 0xf
}
}