mirror of https://bitbucket.org/ausocean/av.git
Merged in using-vring (pull request #324)
revid: now using variable ring buffer Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
commit
949c4ee2af
|
@ -86,15 +86,10 @@ const (
|
|||
defaultAudioInputCodec = codecutil.ADPCM
|
||||
defaultPSITime = 2
|
||||
|
||||
// MTS ring buffer defaults.
|
||||
defaultMTSRBSize = 100
|
||||
defaultMTSRBElementSize = 300000
|
||||
defaultMTSRBWriteTimeout = 5
|
||||
|
||||
// RTMP ring buffer defaults.
|
||||
defaultRTMPRBSize = 100
|
||||
defaultRTMPRBElementSize = 300000
|
||||
defaultRTMPRBWriteTimeout = 5
|
||||
// Ring buffer defaults.
|
||||
defaultRBMaxElements = 10000
|
||||
defaultRBCapacity = 200000000 // bytes (200MB)
|
||||
defaultRBWriteTimeout = 5
|
||||
|
||||
// Motion filter parameter defaults.
|
||||
defaultMinFPS = 1.0
|
||||
|
@ -282,15 +277,10 @@ type Config struct {
|
|||
Filters []int // Defines the methods of filtering to be used in between lexing and encoding.
|
||||
PSITime int // Sets the time between a packet being sent.
|
||||
|
||||
// RTMP ring buffer parameters.
|
||||
RTMPRBSize int // The number of elements in the RTMP sender ringbuffer.
|
||||
RTMPRBElementSize int // The element size in bytes of the RTMP sender RingBuffer.
|
||||
RTMPRBWriteTimeout int // The ringbuffer write timeout in seconds.
|
||||
|
||||
// MTS ring buffer parameters.
|
||||
MTSRBSize int // The number of elements in the MTS sender ringbuffer.
|
||||
MTSRBElementSize int // The element size in bytes of the MTS sender RingBuffer.
|
||||
MTSRBWriteTimeout int // The ringbuffer write timeout in seconds.
|
||||
// Ring buffer parameters.
|
||||
RBMaxElements int // The maximum possible number of elements in ring buffer.
|
||||
RBCapacity int // The total number of bytes available for the ring buffer.
|
||||
RBWriteTimeout int // The ringbuffer write timeout in seconds.
|
||||
|
||||
// Motion filter parameters.
|
||||
MinFPS float64 // The reduced framerate of the video when there is no motion.
|
||||
|
@ -310,51 +300,48 @@ type Config struct {
|
|||
// TypeData contains information about all of the variables that
|
||||
// can be set over the web. It is a psuedo const.
|
||||
var TypeData = map[string]string{
|
||||
"AutoWhiteBalance": "enum:off,auto,sun,cloud,shade,tungsten,fluorescent,incandescent,flash,horizon",
|
||||
"BitDepth": "int",
|
||||
"Brightness": "uint",
|
||||
"BurstPeriod": "uint",
|
||||
"CameraChan": "int",
|
||||
"CBR": "bool",
|
||||
"ClipDuration": "uint",
|
||||
"Exposure": "enum:auto,night,nightpreview,backlight,spotlight,sports,snow,beach,verylong,fixedfps,antishake,fireworks",
|
||||
"Filters": "enums:NoOp,MOG,VariableFPS,KNN",
|
||||
"FrameRate": "uint",
|
||||
"Height": "uint",
|
||||
"HorizontalFlip": "bool",
|
||||
"HTTPAddress": "string",
|
||||
"Input": "enum:raspivid,rtsp,v4l,file",
|
||||
"InputCodec": "enum:H264,MJPEG",
|
||||
"InputPath": "string",
|
||||
"KNNHistory": "uint",
|
||||
"KNNKernel": "float",
|
||||
"KNNMinArea": "float",
|
||||
"KNNThreshold": "float",
|
||||
"logging": "enum:Debug,Info,Warning,Error,Fatal",
|
||||
"MinFPS": "float",
|
||||
"MinFrames": "uint",
|
||||
"MOGHistory": "uint",
|
||||
"MOGMinArea": "float",
|
||||
"MOGThreshold": "float",
|
||||
"MTSRBElementSize": "int",
|
||||
"MTSRBSize": "int",
|
||||
"MTSRBWriteTimeout": "int",
|
||||
"Output": "enum:File,Http,Rtmp,Rtp",
|
||||
"OutputPath": "string",
|
||||
"Outputs": "enums:File,Http,Rtmp,Rtp",
|
||||
"Quantization": "uint",
|
||||
"Rotation": "uint",
|
||||
"RTMPRBElementSize": "int",
|
||||
"RTMPRBSize": "int",
|
||||
"RTMPRBWriteTimeout": "int",
|
||||
"RTMPURL": "string",
|
||||
"RTPAddress": "string",
|
||||
"Saturation": "int",
|
||||
"ShowWindows": "bool",
|
||||
"VBRBitrate": "int",
|
||||
"VBRQuality": "enum:standard,fair,good,great,excellent",
|
||||
"VerticalFlip": "bool",
|
||||
"Width": "uint",
|
||||
"AutoWhiteBalance": "enum:off,auto,sun,cloud,shade,tungsten,fluorescent,incandescent,flash,horizon",
|
||||
"BitDepth": "int",
|
||||
"Brightness": "uint",
|
||||
"BurstPeriod": "uint",
|
||||
"CameraChan": "int",
|
||||
"CBR": "bool",
|
||||
"ClipDuration": "uint",
|
||||
"Exposure": "enum:auto,night,nightpreview,backlight,spotlight,sports,snow,beach,verylong,fixedfps,antishake,fireworks",
|
||||
"Filters": "enums:NoOp,MOG,VariableFPS,KNN",
|
||||
"FrameRate": "uint",
|
||||
"Height": "uint",
|
||||
"HorizontalFlip": "bool",
|
||||
"HTTPAddress": "string",
|
||||
"Input": "enum:raspivid,rtsp,v4l,file",
|
||||
"InputCodec": "enum:H264,MJPEG",
|
||||
"InputPath": "string",
|
||||
"KNNHistory": "uint",
|
||||
"KNNKernel": "float",
|
||||
"KNNMinArea": "float",
|
||||
"KNNThreshold": "float",
|
||||
"logging": "enum:Debug,Info,Warning,Error,Fatal",
|
||||
"MinFPS": "float",
|
||||
"MinFrames": "uint",
|
||||
"MOGHistory": "uint",
|
||||
"MOGMinArea": "float",
|
||||
"MOGThreshold": "float",
|
||||
"RBCapacity": "uint",
|
||||
"RBMaxElements": "uint",
|
||||
"RBWriteTimeout": "uint",
|
||||
"Output": "enum:File,Http,Rtmp,Rtp",
|
||||
"OutputPath": "string",
|
||||
"Outputs": "enums:File,Http,Rtmp,Rtp",
|
||||
"Quantization": "uint",
|
||||
"Rotation": "uint",
|
||||
"RTMPURL": "string",
|
||||
"RTPAddress": "string",
|
||||
"Saturation": "int",
|
||||
"ShowWindows": "bool",
|
||||
"VBRBitrate": "int",
|
||||
"VBRQuality": "enum:standard,fair,good,great,excellent",
|
||||
"VerticalFlip": "bool",
|
||||
"Width": "uint",
|
||||
}
|
||||
|
||||
// Validation errors.
|
||||
|
@ -454,34 +441,19 @@ func (c *Config) Validate() error {
|
|||
c.RTPAddress = defaultRtpAddr
|
||||
}
|
||||
|
||||
if c.RTMPRBSize <= 0 {
|
||||
c.Logger.Log(logger.Info, pkg+"RTMPRBSize bad or unset, defaulting", "RTMPRBSize", defaultRTMPRBSize)
|
||||
c.RTMPRBSize = defaultRTMPRBSize
|
||||
if c.RBMaxElements <= 0 {
|
||||
c.Logger.Log(logger.Info, pkg+"RBMaxElements bad or unset, defaulting", "RBMaxElements", defaultRBMaxElements)
|
||||
c.RBMaxElements = defaultRBMaxElements
|
||||
}
|
||||
|
||||
if c.RTMPRBElementSize <= 0 {
|
||||
c.Logger.Log(logger.Info, pkg+"RTMPRBElementSize bad or unset, defaulting", "RTMPRBElementSize", defaultRTMPRBElementSize)
|
||||
c.RTMPRBElementSize = defaultRTMPRBElementSize
|
||||
if c.RBCapacity <= 0 {
|
||||
c.Logger.Log(logger.Info, pkg+"RBCapacity bad or unset, defaulting", "RBCapacity", defaultRBCapacity)
|
||||
c.RBCapacity = defaultRBCapacity
|
||||
}
|
||||
|
||||
if c.RTMPRBWriteTimeout <= 0 {
|
||||
c.Logger.Log(logger.Info, pkg+"RTMPRBWriteTimeout bad or unset, defaulting", "RTMPRBWriteTimeout", defaultRTMPRBWriteTimeout)
|
||||
c.RTMPRBWriteTimeout = defaultRTMPRBWriteTimeout
|
||||
}
|
||||
|
||||
if c.MTSRBSize <= 0 {
|
||||
c.Logger.Log(logger.Info, pkg+"MTSRBSize bad or unset, defaulting", "MTSRBSize", defaultMTSRBSize)
|
||||
c.MTSRBSize = defaultMTSRBSize
|
||||
}
|
||||
|
||||
if c.MTSRBElementSize <= 0 {
|
||||
c.Logger.Log(logger.Info, pkg+"MTSRBElementSize bad or unset, defaulting", "MTSRBElementSize", defaultMTSRBElementSize)
|
||||
c.MTSRBElementSize = defaultMTSRBElementSize
|
||||
}
|
||||
|
||||
if c.MTSRBWriteTimeout <= 0 {
|
||||
c.Logger.Log(logger.Info, pkg+"MTSRBWriteTimeout bad or unset, defaulting", "MTSRBWriteTimeout", defaultMTSRBWriteTimeout)
|
||||
c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout
|
||||
if c.RBWriteTimeout <= 0 {
|
||||
c.Logger.Log(logger.Info, pkg+"RBWriteTimeout bad or unset, defaulting", "RBWriteTimeout", defaultRBWriteTimeout)
|
||||
c.RBWriteTimeout = defaultRBWriteTimeout
|
||||
}
|
||||
|
||||
if c.PSITime <= 0 {
|
||||
|
|
|
@ -54,20 +54,7 @@ import (
|
|||
"bitbucket.org/ausocean/iot/pi/netsender"
|
||||
"bitbucket.org/ausocean/utils/ioext"
|
||||
"bitbucket.org/ausocean/utils/logger"
|
||||
"bitbucket.org/ausocean/utils/ring"
|
||||
)
|
||||
|
||||
// Ring buffer defaults.
|
||||
const (
|
||||
// MTS ring buffer defaults.
|
||||
defaultMTSRBSize = 1000
|
||||
defaultMTSRBElementSize = 100000
|
||||
defaultMTSRBWriteTimeout = 5
|
||||
|
||||
// RTMP ring buffer defaults.
|
||||
defaultRTMPRBSize = 1000
|
||||
defaultRTMPRBElementSize = 300000
|
||||
defaultRTMPRBWriteTimeout = 5
|
||||
"bitbucket.org/ausocean/utils/vring"
|
||||
)
|
||||
|
||||
// RTMP connection properties.
|
||||
|
@ -274,10 +261,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
|||
for _, out := range r.cfg.Outputs {
|
||||
switch out {
|
||||
case config.OutputHTTP:
|
||||
rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not initialise MTS ring buffer: %w", err)
|
||||
}
|
||||
w = newMtsSender(
|
||||
newHttpSender(r.ns, r.cfg.Logger.Log),
|
||||
r.cfg.Logger.Log,
|
||||
ring.NewBuffer(r.cfg.MTSRBSize, r.cfg.MTSRBElementSize, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second),
|
||||
rb,
|
||||
r.cfg.ClipDuration,
|
||||
)
|
||||
mtsSenders = append(mtsSenders, w)
|
||||
|
@ -294,11 +285,15 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
|||
}
|
||||
mtsSenders = append(mtsSenders, w)
|
||||
case config.OutputRTMP:
|
||||
rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not initialise RTMP ring buffer: %w", err)
|
||||
}
|
||||
w, err := newRtmpSender(
|
||||
r.cfg.RTMPURL,
|
||||
rtmpConnectionTimeout,
|
||||
rtmpConnectionMaxTries,
|
||||
ring.NewBuffer(r.cfg.RTMPRBSize, r.cfg.RTMPRBElementSize, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second),
|
||||
rb,
|
||||
r.cfg.Logger.Log,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -705,48 +700,48 @@ func (r *Revid) Update(vars map[string]string) error {
|
|||
default:
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value)
|
||||
}
|
||||
case "RTMPRBSize":
|
||||
case "RTMPRBMaxElements":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v < 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBSize var", "value", value)
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBMaxElements var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.RTMPRBSize = v
|
||||
case "RTMPRBElementSize":
|
||||
r.cfg.RBMaxElements = v
|
||||
case "RTMPRBCapacity":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v < 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBElementSize var", "value", value)
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBCapacity var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.RTMPRBElementSize = v
|
||||
r.cfg.RBCapacity = v
|
||||
case "RTMPRBWriteTimeout":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v <= 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBWriteTimeout var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.RTMPRBWriteTimeout = v
|
||||
case "MTSRBSize":
|
||||
r.cfg.RBWriteTimeout = v
|
||||
case "MTSRBMaxElements":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v < 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBSize var", "value", value)
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBMaxElements var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.MTSRBSize = v
|
||||
case "MTSRBElementSize":
|
||||
r.cfg.RBMaxElements = v
|
||||
case "MTSRBCapacity":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v < 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBElementSize var", "value", value)
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBCapacity var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.MTSRBElementSize = v
|
||||
r.cfg.RBCapacity = v
|
||||
case "MTSRBWriteTimeout":
|
||||
v, err := strconv.Atoi(value)
|
||||
if err != nil || v <= 0 {
|
||||
r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBWriteTimeout var", "value", value)
|
||||
break
|
||||
}
|
||||
r.cfg.MTSRBWriteTimeout = v
|
||||
r.cfg.RBWriteTimeout = v
|
||||
case "CBR":
|
||||
v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)]
|
||||
if !ok {
|
||||
|
|
|
@ -44,7 +44,7 @@ import (
|
|||
"bitbucket.org/ausocean/av/protocol/rtp"
|
||||
"bitbucket.org/ausocean/iot/pi/netsender"
|
||||
"bitbucket.org/ausocean/utils/logger"
|
||||
"bitbucket.org/ausocean/utils/ring"
|
||||
"bitbucket.org/ausocean/utils/vring"
|
||||
)
|
||||
|
||||
// Log is used by the multiSender.
|
||||
|
@ -161,7 +161,7 @@ func (s *fileSender) Close() error { return s.file.Close() }
|
|||
type mtsSender struct {
|
||||
dst io.WriteCloser
|
||||
buf []byte
|
||||
ring *ring.Buffer
|
||||
ring *vring.Buffer
|
||||
next []byte
|
||||
pkt packet.Packet
|
||||
repairer *mts.DiscontinuityRepairer
|
||||
|
@ -174,7 +174,7 @@ type mtsSender struct {
|
|||
}
|
||||
|
||||
// newMtsSender returns a new mtsSender.
|
||||
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender {
|
||||
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *vring.Buffer, clipDur time.Duration) *mtsSender {
|
||||
s := &mtsSender{
|
||||
dst: dst,
|
||||
repairer: mts.NewDiscontinuityRepairer(),
|
||||
|
@ -190,7 +190,7 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int
|
|||
|
||||
// output starts an mtsSender's data handling routine.
|
||||
func (s *mtsSender) output() {
|
||||
var chunk *ring.Chunk
|
||||
var elem *vring.Element
|
||||
for {
|
||||
select {
|
||||
case <-s.done:
|
||||
|
@ -198,14 +198,14 @@ func (s *mtsSender) output() {
|
|||
defer s.wg.Done()
|
||||
return
|
||||
default:
|
||||
// If chunk is nil then we're ready to get another from the ringBuffer.
|
||||
if chunk == nil {
|
||||
// If elem is nil then we're ready to get another from the ringBuffer.
|
||||
if elem == nil {
|
||||
var err error
|
||||
chunk, err = s.ring.Next(mtsRBReadTimeout)
|
||||
elem, err = s.ring.Next(mtsRBReadTimeout)
|
||||
switch err {
|
||||
case nil, io.EOF:
|
||||
continue
|
||||
case ring.ErrTimeout:
|
||||
case vring.ErrNextTimeout:
|
||||
s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout")
|
||||
continue
|
||||
default:
|
||||
|
@ -213,20 +213,20 @@ func (s *mtsSender) output() {
|
|||
continue
|
||||
}
|
||||
}
|
||||
err := s.repairer.Repair(chunk.Bytes())
|
||||
err := s.repairer.Repair(elem.Bytes())
|
||||
if err != nil {
|
||||
chunk.Close()
|
||||
chunk = nil
|
||||
elem.Close()
|
||||
elem = nil
|
||||
continue
|
||||
}
|
||||
s.log(logger.Debug, pkg+"mtsSender: writing")
|
||||
_, err = s.dst.Write(chunk.Bytes())
|
||||
_, err = s.dst.Write(elem.Bytes())
|
||||
if err != nil {
|
||||
s.repairer.Failed()
|
||||
continue
|
||||
}
|
||||
chunk.Close()
|
||||
chunk = nil
|
||||
elem.Close()
|
||||
elem = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -273,12 +273,12 @@ type rtmpSender struct {
|
|||
timeout uint
|
||||
retries int
|
||||
log func(lvl int8, msg string, args ...interface{})
|
||||
ring *ring.Buffer
|
||||
ring *vring.Buffer
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
|
||||
func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
|
||||
var conn *rtmp.Conn
|
||||
var err error
|
||||
for n := 0; n < retries; n++ {
|
||||
|
@ -307,7 +307,7 @@ func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log f
|
|||
|
||||
// output starts an mtsSender's data handling routine.
|
||||
func (s *rtmpSender) output() {
|
||||
var chunk *ring.Chunk
|
||||
var elem *vring.Element
|
||||
for {
|
||||
select {
|
||||
case <-s.done:
|
||||
|
@ -315,14 +315,14 @@ func (s *rtmpSender) output() {
|
|||
defer s.wg.Done()
|
||||
return
|
||||
default:
|
||||
// If chunk is nil then we're ready to get another from the ring buffer.
|
||||
if chunk == nil {
|
||||
// If elem is nil then we're ready to get another from the ring buffer.
|
||||
if elem == nil {
|
||||
var err error
|
||||
chunk, err = s.ring.Next(rtmpRBReadTimeout)
|
||||
elem, err = s.ring.Next(rtmpRBReadTimeout)
|
||||
switch err {
|
||||
case nil, io.EOF:
|
||||
continue
|
||||
case ring.ErrTimeout:
|
||||
case vring.ErrNextTimeout:
|
||||
s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout")
|
||||
continue
|
||||
default:
|
||||
|
@ -338,7 +338,7 @@ func (s *rtmpSender) output() {
|
|||
continue
|
||||
}
|
||||
}
|
||||
_, err := s.conn.Write(chunk.Bytes())
|
||||
_, err := s.conn.Write(elem.Bytes())
|
||||
switch err {
|
||||
case nil, rtmp.ErrInvalidFlvTag:
|
||||
default:
|
||||
|
@ -349,8 +349,8 @@ func (s *rtmpSender) output() {
|
|||
}
|
||||
continue
|
||||
}
|
||||
chunk.Close()
|
||||
chunk = nil
|
||||
elem.Close()
|
||||
elem = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,13 +39,20 @@ import (
|
|||
"bitbucket.org/ausocean/av/container/mts"
|
||||
"bitbucket.org/ausocean/av/container/mts/meta"
|
||||
"bitbucket.org/ausocean/utils/logger"
|
||||
"bitbucket.org/ausocean/utils/ring"
|
||||
"bitbucket.org/ausocean/utils/vring"
|
||||
)
|
||||
|
||||
var (
|
||||
errSendFailed = errors.New("send failed")
|
||||
)
|
||||
|
||||
// Ring buffer parameters.
|
||||
const (
|
||||
rbMaxElements = 10000
|
||||
rbCapacity = 200000000 // bytes
|
||||
rbWriteTimeout = 5
|
||||
)
|
||||
|
||||
// destination simulates a destination for the mtsSender. It allows for the
|
||||
// emulation of failed and delayed sends.
|
||||
type destination struct {
|
||||
|
@ -134,7 +141,11 @@ func TestMtsSenderSegment(t *testing.T) {
|
|||
// Create ringBuffer, sender, sender and the MPEGTS encoder.
|
||||
const numberOfClips = 11
|
||||
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
|
||||
rb, err := vring.NewBuffer(rbMaxElements, rbCapacity, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("could not initialise ring buffer: %v", err)
|
||||
}
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0)
|
||||
|
||||
const psiSendCount = 10
|
||||
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
||||
|
@ -213,7 +224,11 @@ func TestMtsSenderFailedSend(t *testing.T) {
|
|||
// Create destination, the mtsSender and the mtsEncoder
|
||||
const clipToFailAt = 3
|
||||
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
|
||||
rb, err := vring.NewBuffer(rbMaxElements, rbCapacity, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("could not initialise ring buffer: %v", err)
|
||||
}
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0)
|
||||
|
||||
const psiSendCount = 10
|
||||
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
||||
|
@ -294,7 +309,11 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
|
|||
// Create destination, the mtsSender and the mtsEncoder.
|
||||
const clipToDelay = 3
|
||||
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0)
|
||||
rb, err := vring.NewBuffer(1, rbCapacity, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("could not initialise ring buffer: %v", err)
|
||||
}
|
||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, rb, 0)
|
||||
|
||||
const psiSendCount = 10
|
||||
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
||||
|
|
Loading…
Reference in New Issue