Merged in buffered-mts-sender (pull request #180)

revid: Buffered MtsSender

Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Saxon Milton 2019-04-14 01:44:53 +00:00
commit 899a2fe89e
6 changed files with 292 additions and 118 deletions

12
go.mod
View File

@ -4,21 +4,11 @@ go 1.12
require ( require (
bitbucket.org/ausocean/iot v1.2.4 bitbucket.org/ausocean/iot v1.2.4
bitbucket.org/ausocean/utils v1.2.4 bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884
github.com/mewkiz/flac v1.0.5 github.com/mewkiz/flac v1.0.5
github.com/pkg/errors v0.8.1 // indirect
github.com/sergi/go-diff v1.0.0 // indirect
github.com/stretchr/testify v1.3.0 // indirect
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
) )

14
go.sum
View File

@ -1,23 +1,34 @@
bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4= bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4=
bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU=
bitbucket.org/ausocean/iot v1.2.5 h1:udD5X4oXUuKwdjO7bcq4StcDdjP8fJa2L0FnJJwF+6Q=
bitbucket.org/ausocean/iot v1.2.5/go.mod h1:dOclxXkdxAQGWO7Y5KcP1wpNfxg9oKUA2VqjJ3Le4RA=
bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e h1:rn7Z1vE6m1NSH+mrPJPgquEfBDsqzBEH4Y6fxzyB6kA=
bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20= bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20=
bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw= bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw=
bitbucket.org/ausocean/utils v1.2.5 h1:70lkWnoN1SUxiIBIKDiDzaYZ2bjczdNYSAsKj7DUpl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw=
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/adrianmo/go-nmea v1.1.1-0.20190109062325-c448653979f7/go.mod h1:HHPxPAm2kmev+61qmkZh7xgZF/7qHtSpsWppip2Ipv8=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c= github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c=
github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs=
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig=
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs=
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw=
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4/go.mod h1:2RvX5ZjVtsznNZPEt4xwJXNJrM3VTZoQf7V6gk0ysvs=
github.com/kidoman/embd v0.0.0-20170508013040-d3d8c0c5c68d/go.mod h1:ACKj9jnzOzj1lw2ETilpFGK7L9dtJhAzT7T1OhAGtRQ=
github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA=
github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s=
github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc= github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc=
@ -29,16 +40,19 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e h1:3NIzz7weXhh3NToPgbtlQtKiVgerEaG4/nY2skGoGG0= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e h1:3NIzz7weXhh3NToPgbtlQtKiVgerEaG4/nY2skGoGG0=
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e/go.mod h1:CaowXBWOiSGWEpBBV8LoVnQTVPV4ycyviC9IBLj8dRw= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e/go.mod h1:CaowXBWOiSGWEpBBV8LoVnQTVPV4ycyviC9IBLj8dRw=
github.com/yryz/ds18b20 v0.0.0-20180211073435-3cf383a40624/go.mod h1:MqFju5qeLDFh+S9PqxYT7TEla8xeW7bgGr/69q3oki0=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/sys v0.0.0-20190305064518-30e92a19ae4a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=

View File

@ -44,6 +44,7 @@ import (
"bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/flv"
"bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ioext"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
) )
@ -111,6 +112,9 @@ type Revid struct {
// encoder holds the required encoders, which then write to destinations. // encoder holds the required encoders, which then write to destinations.
encoder []io.Writer encoder []io.Writer
// writeClosers holds the senders that the encoders will write to.
writeClosers []io.WriteCloser
// bitrate hold the last send bitrate calculation result. // bitrate hold the last send bitrate calculation result.
bitrate int bitrate int
@ -177,24 +181,29 @@ func (r *Revid) setConfig(config Config) error {
return nil return nil
} }
// setupPipeline constructs a data pipeline. // setupPipeline constructs the revid dataPipeline. Inputs, encoders and
func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.Writer) io.Writer) error { // senders are created and linked based on the current revid config.
//
// mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder
// respectively. multiWriter will be used to create an ioext.multiWriteCloser
// so that encoders can write to multiple senders.
func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error {
r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout))
r.encoder = r.encoder[:0] r.encoder = r.encoder[:0]
// mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders
// will hold senders that require FLV encoding. // will hold senders that require FLV encoding.
var mtsSenders, flvSenders []io.Writer var mtsSenders, flvSenders []io.WriteCloser
// We will go through our outputs and create the corresponding senders to add // We will go through our outputs and create the corresponding senders to add
// to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the
// output requires FLV encoding. // output requires FLV encoding.
var w io.Writer var w io.WriteCloser
for _, out := range r.config.Outputs { for _, out := range r.config.Outputs {
switch out { switch out {
case Http: case Http:
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil) w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout)
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
case Rtp: case Rtp:
w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
@ -279,7 +288,7 @@ func (r *Revid) reset(config Config) error {
return err return err
} }
err = r.setupPipeline(newMtsEncoder, newFlvEncoder, io.MultiWriter) err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser)
if err != nil { if err != nil {
return err return err
} }
@ -338,14 +347,19 @@ func (r *Revid) Stop() {
return return
} }
for _, w := range r.writeClosers {
err := w.Close()
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error())
}
}
r.config.Logger.Log(logger.Info, pkg+"stopping revid") r.config.Logger.Log(logger.Info, pkg+"stopping revid")
r.setIsRunning(false)
r.config.Logger.Log(logger.Info, pkg+"killing input proccess") r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
// If a cmd process is running, we kill! // If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill() r.cmd.Process.Kill()
} }
r.setIsRunning(false)
r.wg.Wait() r.wg.Wait()
} }

View File

@ -97,10 +97,10 @@ func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil }
// dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we
// can access the destinations. // can access the destinations.
type dummyMultiWriter struct { type dummyMultiWriter struct {
dst []io.Writer dst []io.WriteCloser
} }
func newDummyMultiWriter(dst ...io.Writer) io.Writer { func newDummyMultiWriter(dst ...io.WriteCloser) io.WriteCloser {
return &dummyMultiWriter{ return &dummyMultiWriter{
dst: dst, dst: dst,
} }
@ -108,6 +108,8 @@ func newDummyMultiWriter(dst ...io.Writer) io.Writer {
func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil }
func (w *dummyMultiWriter) Close() error { return nil }
// TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the // TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the
// revid.encoder slice and the senders the encoders write to. // revid.encoder slice and the senders the encoders write to.
func TestResetEncoderSenderSetup(t *testing.T) { func TestResetEncoderSenderSetup(t *testing.T) {

View File

@ -35,6 +35,8 @@ import (
"net" "net"
"os" "os"
"strconv" "strconv"
"sync"
"time"
"github.com/Comcast/gots/packet" "github.com/Comcast/gots/packet"
@ -43,6 +45,7 @@ import (
"bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
) )
// Log is used by the multiSender. // Log is used by the multiSender.
@ -68,6 +71,8 @@ func (s *httpSender) Write(d []byte) (int, error) {
return len(d), httpSend(d, s.client, s.log) return len(d), httpSend(d, s.client, s.log)
} }
func (s *httpSender) Close() error { return nil }
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
// Only send if "V0" is configured as an input. // Only send if "V0" is configured as an input.
send := false send := false
@ -129,7 +134,7 @@ type fileSender struct {
data []byte data []byte
} }
func newFileSender(path string) (io.Writer, error) { func newFileSender(path string) (*fileSender, error) {
f, err := os.Create(path) f, err := os.Create(path)
if err != nil { if err != nil {
return nil, err return nil, err
@ -142,26 +147,80 @@ func (s *fileSender) Write(d []byte) (int, error) {
return s.file.Write(d) return s.file.Write(d)
} }
func (s *fileSender) close() error { return s.file.Close() } func (s *fileSender) Close() error { return s.file.Close() }
// mtsSender implements loadSender and provides sending capability specifically // mtsSender implements io.WriteCloser and provides sending capability specifically
// for use with MPEGTS packetization. It handles the construction of appropriately // for use with MPEGTS packetization. It handles the construction of appropriately
// lengthed clips based on PSI. It also fixes accounts for discontinuities by // lengthed clips based on PSI. It also accounts for discontinuities by
// setting the discontinuity indicator for the first packet of a clip. // setting the discontinuity indicator for the first packet of a clip.
type mtsSender struct { type mtsSender struct {
dst io.Writer dst io.WriteCloser
buf []byte buf []byte
ringBuf *ring.Buffer
next []byte next []byte
pkt packet.Packet pkt packet.Packet
repairer *mts.DiscontinuityRepairer repairer *mts.DiscontinuityRepairer
curPid int curPid int
quit chan struct{}
log func(lvl int8, msg string, args ...interface{})
wg sync.WaitGroup
} }
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(dst io.Writer, log func(lvl int8, msg string, args ...interface{})) *mtsSender { func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender {
return &mtsSender{ s := &mtsSender{
dst: dst, dst: dst,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
log: log,
ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout),
quit: make(chan struct{}),
}
s.wg.Add(1)
go s.output()
return s
}
// output starts an mtsSender's data handling routine.
func (s *mtsSender) output() {
var chunk *ring.Chunk
for {
select {
case <-s.quit:
s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine")
defer s.wg.Done()
return
default:
// If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil {
var err error
chunk, err = s.ringBuf.Next(readTimeout)
switch err {
case nil:
continue
case ring.ErrTimeout:
s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout")
continue
default:
s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error())
fallthrough
case io.EOF:
continue
}
}
err := s.repairer.Repair(chunk.Bytes())
if err != nil {
chunk.Close()
chunk = nil
continue
}
_, err = s.dst.Write(chunk.Bytes())
if err != nil {
s.repairer.Failed()
continue
}
chunk.Close()
chunk = nil
}
} }
} }
@ -176,20 +235,23 @@ func (s *mtsSender) Write(d []byte) (int, error) {
copy(s.pkt[:], bytes) copy(s.pkt[:], bytes)
s.curPid = s.pkt.PID() s.curPid = s.pkt.PID()
if s.curPid == mts.PatPid && len(s.buf) > 0 { if s.curPid == mts.PatPid && len(s.buf) > 0 {
err := s.repairer.Repair(s.buf) _, err := s.ringBuf.Write(s.buf)
if err == nil { if err != nil {
_, err = s.dst.Write(s.buf) s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error())
if err == nil {
goto done
}
} }
s.repairer.Failed() s.ringBuf.Flush()
done:
s.buf = s.buf[:0] s.buf = s.buf[:0]
} }
return len(d), nil return len(d), nil
} }
// Close implements io.Closer.
func (s *mtsSender) Close() error {
close(s.quit)
s.wg.Wait()
return nil
}
// rtmpSender implements loadSender for a native RTMP destination. // rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct { type rtmpSender struct {
conn *rtmp.Conn conn *rtmp.Conn
@ -238,7 +300,7 @@ func (s *rtmpSender) Write(d []byte) (int, error) {
} }
func (s *rtmpSender) restart() error { func (s *rtmpSender) restart() error {
s.close() s.Close()
var err error var err error
for n := 0; n < s.retries; n++ { for n := 0; n < s.retries; n++ {
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
@ -253,11 +315,11 @@ func (s *rtmpSender) restart() error {
return err return err
} }
func (s *rtmpSender) close() error { func (s *rtmpSender) Close() error {
if s.conn != nil { if s.conn == nil {
return s.conn.Close() return nil
} }
return nil return s.conn.Close()
} }
// TODO: Write restart func for rtpSender // TODO: Write restart func for rtpSender
@ -290,3 +352,5 @@ func (s *rtpSender) Write(d []byte) (int, error) {
} }
return len(d), nil return len(d), nil
} }
func (s *rtpSender) Close() error { return nil }

View File

@ -30,7 +30,6 @@ package revid
import ( import (
"errors" "errors"
"fmt"
"testing" "testing"
"time" "time"
@ -40,7 +39,6 @@ import (
"bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
) )
// Ring buffer sizes and read/write timeouts. // Ring buffer sizes and read/write timeouts.
@ -55,20 +53,25 @@ var (
errSendFailed = errors.New("send failed") errSendFailed = errors.New("send failed")
) )
// sender simulates sending of video data, creating discontinuities if // destination simulates a destination for the mtsSender. It allows for the
// testDiscontinuities is set to true. // emulation of failed and delayed sends.
type destination struct { type destination struct {
buf [][]byte buf [][]byte
testDiscontinuities bool testFails bool
discontinuityAt int failAt int
currentPkt int currentPkt int
t *testing.T
sendDelay time.Duration
delayAt int
} }
// Write implements io.Writer.
// Write takes d and neglects if testDiscontinuities is true, returning an error,
// otherwise d is appended to senders buf.
func (ts *destination) Write(d []byte) (int, error) { func (ts *destination) Write(d []byte) (int, error) {
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { ts.t.Log("writing clip to destination")
if ts.delayAt != 0 && ts.currentPkt == ts.delayAt {
time.Sleep(ts.sendDelay)
}
if ts.testFails && ts.currentPkt == ts.failAt {
ts.t.Log("failed send")
ts.currentPkt++ ts.currentPkt++
return 0, errSendFailed return 0, errSendFailed
} }
@ -79,9 +82,12 @@ func (ts *destination) Write(d []byte) (int, error) {
return len(d), nil return len(d), nil
} }
// log implements the required logging func for some of the structs in use func (ts *destination) Close() error { return nil }
// within tests.
func log(lvl int8, msg string, args ...interface{}) { // dummyLogger will allow logging to be done by the testing pkg.
type dummyLogger testing.T
func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) {
var l string var l string
switch lvl { switch lvl {
case logger.Warning: case logger.Warning:
@ -99,7 +105,11 @@ func log(lvl int8, msg string, args ...interface{}) {
for i := 0; i < len(args); i++ { for i := 0; i < len(args); i++ {
msg += " %v" msg += " %v"
} }
fmt.Printf(msg, args) if len(args) == 0 {
dl.Log(msg + "\n")
return
}
dl.Logf(msg+"\n", args)
} }
// TestSegment ensures that the mtsSender correctly segments data into clips // TestSegment ensures that the mtsSender correctly segments data into clips
@ -107,38 +117,28 @@ func log(lvl int8, msg string, args ...interface{}) {
func TestMtsSenderSegment(t *testing.T) { func TestMtsSenderSegment(t *testing.T) {
mts.Meta = meta.New() mts.Meta = meta.New()
// Create ringBuffer, sender, loadsender and the MPEGTS encoder. // Create ringBuffer, sender, sender and the MPEGTS encoder.
tstDst := &destination{} tstDst := &destination{t: t}
loadSender := newMtsSender(tstDst, log) sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video)
encoder := mts.NewEncoder((*buffer)(rb), 25, mts.Video)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount) encoder.TimeBasedPsi(false, psiSendCount)
// Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number.
t.Log("writing packets")
const noOfPacketsToWrite = 100 const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ { for i := 0; i < noOfPacketsToWrite; i++ {
// Insert a payload so that we check that the segmentation works correctly
// in this regard. Packet number will be used.
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
rb.Flush()
for {
next, err := rb.Next(rTimeout)
if err != nil {
break
}
_, err = loadSender.Write(next.Bytes())
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
next.Close()
next = nil
}
} }
// Give the mtsSender some time to finish up and then Close it.
time.Sleep(10 * time.Millisecond)
sender.Close()
// Check the data.
result := tstDst.buf result := tstDst.buf
expectData := 0 expectData := 0
for clipNo, clip := range result { for clipNo, clip := range result {
@ -160,9 +160,11 @@ func TestMtsSenderSegment(t *testing.T) {
} }
// Check that the clip data is okay. // Check that the clip data is okay.
t.Log("checking clip data")
for i := 0; i < len(clip); i += mts.PacketSize { for i := 0; i < len(clip); i += mts.PacketSize {
copy(pkt[:], clip[i:i+mts.PacketSize]) copy(pkt[:], clip[i:i+mts.PacketSize])
if pkt.PID() == mts.VideoPid { if pkt.PID() == mts.VideoPid {
t.Log("got video PID")
payload, err := pkt.Payload() payload, err := pkt.Payload()
if err != nil { if err != nil {
t.Fatalf("Unexpected err: %v\n", err) t.Fatalf("Unexpected err: %v\n", err)
@ -187,61 +189,149 @@ func TestMtsSenderSegment(t *testing.T) {
} }
} }
// TestMtsSenderFailedSend checks that a failed send is correctly handled by
// the mtsSender. The mtsSender should try to send the same clip again.
func TestMtsSenderFailedSend(t *testing.T) {
mts.Meta = meta.New()
// Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3
tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout)
encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder and send PSI every 10 packets.
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
// Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number.
t.Log("writing packets")
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
encoder.Write([]byte{byte(i)})
}
// Give the mtsSender some time to finish up and then Close it.
time.Sleep(10 * time.Millisecond)
sender.Close()
// Check that we have data as expected.
result := tstDst.buf
expectData := 0
for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo)
// Check that the clip is of expected length.
clipLen := len(clip)
if clipLen != psiSendCount*mts.PacketSize {
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
}
// Also check that the first packet is a PAT.
firstPkt := clip[:mts.PacketSize]
var pkt packet.Packet
copy(pkt[:], firstPkt)
pid := pkt.PID()
if pid != mts.PatPid {
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
}
// Check that the clip data is okay.
t.Log("checking clip data")
for i := 0; i < len(clip); i += mts.PacketSize {
copy(pkt[:], clip[i:i+mts.PacketSize])
if pkt.PID() == mts.VideoPid {
t.Log("got video PID")
payload, err := pkt.Payload()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Parse PES from the MTS payload.
pes, err := pes.NewPESHeader(payload)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Get the data from the PES packet and convert to an int.
data := int8(pes.Data()[0])
// Calc expected data in the PES and then check.
if data != int8(expectData) {
t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData)
}
expectData++
}
}
}
}
// TestMtsSenderDiscontinuity checks that a discontinuity in a stream is
// correctly handled by the mtsSender. A discontinuity is caused by overflowing
// the mtsSender's ringBuffer. It is expected that the next clip seen has the
// disconinuity indicator applied.
func TestMtsSenderDiscontinuity(t *testing.T) { func TestMtsSenderDiscontinuity(t *testing.T) {
mts.Meta = meta.New() mts.Meta = meta.New()
// Create ringBuffer sender, loadSender and the MPEGTS encoder. // Create destination, the mtsSender and the mtsEncoder.
const clipWithDiscontinuity = 3 const clipToDelay = 3
tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay}
loadSender := newMtsSender(tstDst, log) sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, ringBufferElementSize, writeTimeout)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder(sender, 25, mts.Video)
encoder := mts.NewEncoder((*buffer)(rb), 25, mts.Video)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount) encoder.TimeBasedPsi(false, psiSendCount)
// Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number.
const noOfPacketsToWrite = 100 const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ { for i := 0; i < noOfPacketsToWrite; i++ {
// Our payload will just be packet number.
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
rb.Flush()
for {
next, err := rb.Next(rTimeout)
if err != nil {
break
}
_, err = loadSender.Write(next.Bytes())
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
next.Close()
next = nil
}
} }
// Give mtsSender time to finish up then Close.
time.Sleep(100 * time.Millisecond)
sender.Close()
// Check the data.
result := tstDst.buf result := tstDst.buf
expectedCC := 0
for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo)
// First check that we have less clips as expected. // Check that the clip is of expected length.
expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 clipLen := len(clip)
gotLen := len(result) if clipLen != psiSendCount*mts.PacketSize {
if gotLen != expectedLen { t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) }
}
// Now check that the discontinuity indicator is set at the discontinuityClip PAT. // Also check that the first packet is a PAT.
disconClip := result[clipWithDiscontinuity] firstPkt := clip[:mts.PacketSize]
firstPkt := disconClip[:mts.PacketSize] var pkt packet.Packet
var pkt packet.Packet copy(pkt[:], firstPkt)
copy(pkt[:], firstPkt) pid := pkt.PID()
discon, err := (*packet.AdaptationField)(&pkt).Discontinuity() if pid != mts.PatPid {
if err != nil { t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
t.Fatalf("Unexpected err: %v\n", err) }
}
if !discon { // Get the discontinuity indicator
t.Fatalf("Did not get discontinuity indicator for PAT") discon, _ := (*packet.AdaptationField)(&pkt).Discontinuity()
// Check the continuity counter.
cc := pkt.ContinuityCounter()
if cc != expectedCC {
t.Log("discontinuity found")
expectedCC = cc
if !discon {
t.Errorf("discontinuity indicator not set where expected for clip: %v", clipNo)
}
} else {
if discon && clipNo != 0 {
t.Errorf("did not expect discontinuity indicator to be set for clip: %v", clipNo)
}
}
expectedCC = (expectedCC + 1) & 0xf
} }
} }