mirror of https://bitbucket.org/ausocean/av.git
revid/senders.go: newMtsSender=>newMTSSender and newHttpSender=>newHTTPSender
This commit is contained in:
parent
18836f521b
commit
48ecea7123
|
@ -272,8 +272,8 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
||||||
case config.OutputHTTP:
|
case config.OutputHTTP:
|
||||||
r.cfg.Logger.Log(logger.Debug, "using HTTP output")
|
r.cfg.Logger.Log(logger.Debug, "using HTTP output")
|
||||||
rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout)
|
rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout)
|
||||||
hs := newHttpSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report)
|
hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report)
|
||||||
w = newMtsSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration)
|
w = newMTSSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration)
|
||||||
mtsSenders = append(mtsSenders, w)
|
mtsSenders = append(mtsSenders, w)
|
||||||
|
|
||||||
case config.OutputRTP:
|
case config.OutputRTP:
|
||||||
|
|
|
@ -59,7 +59,7 @@ const (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
adjustedRTMPRBElementSize int
|
adjustedRTMPRBElementSize int
|
||||||
adjustedMTSRBElementSize int
|
adjustedMTSRBElementSize int
|
||||||
)
|
)
|
||||||
|
|
||||||
// httpSender provides an implemntation of io.Writer to perform sends to a http
|
// httpSender provides an implemntation of io.Writer to perform sends to a http
|
||||||
|
@ -71,7 +71,7 @@ type httpSender struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newHttpSender returns a pointer to a new httpSender.
|
// newHttpSender returns a pointer to a new httpSender.
|
||||||
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) *httpSender {
|
func newHTTPSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) *httpSender {
|
||||||
return &httpSender{
|
return &httpSender{
|
||||||
client: ns,
|
client: ns,
|
||||||
log: log,
|
log: log,
|
||||||
|
@ -192,7 +192,7 @@ type mtsSender struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newMtsSender returns a new mtsSender.
|
// newMtsSender returns a new mtsSender.
|
||||||
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender {
|
func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender {
|
||||||
s := &mtsSender{
|
s := &mtsSender{
|
||||||
dst: dst,
|
dst: dst,
|
||||||
repairer: mts.NewDiscontinuityRepairer(),
|
repairer: mts.NewDiscontinuityRepairer(),
|
||||||
|
@ -275,10 +275,10 @@ func (s *mtsSender) Write(d []byte) (int, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf))
|
s.log(logger.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf))
|
||||||
if err == ring.ErrTooLong {
|
if err == ring.ErrTooLong {
|
||||||
adjustedMTSRBElementSize = len(d)*2
|
adjustedMTSRBElementSize = len(d) * 2
|
||||||
numElements := maxBuffLen/adjustedMTSRBElementSize
|
numElements := maxBuffLen / adjustedMTSRBElementSize
|
||||||
s.ring = ring.NewBuffer(maxBuffLen/adjustedMTSRBElementSize, adjustedMTSRBElementSize, 5*time.Second)
|
s.ring = ring.NewBuffer(maxBuffLen/adjustedMTSRBElementSize, adjustedMTSRBElementSize, 5*time.Second)
|
||||||
s.log(logger.Info,"adjusted MTS ring buffer element size","new size",adjustedMTSRBElementSize,"num elements",numElements,"size(MB)",numElements*adjustedMTSRBElementSize)
|
s.log(logger.Info, "adjusted MTS ring buffer element size", "new size", adjustedMTSRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSRBElementSize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.buf = s.buf[:0]
|
s.buf = s.buf[:0]
|
||||||
|
@ -397,10 +397,10 @@ func (s *rtmpSender) Write(d []byte) (int, error) {
|
||||||
} else {
|
} else {
|
||||||
s.log(logger.Warning, "ring buffer write error", "error", err.Error())
|
s.log(logger.Warning, "ring buffer write error", "error", err.Error())
|
||||||
if err == ring.ErrTooLong {
|
if err == ring.ErrTooLong {
|
||||||
adjustedRTMPRBElementSize = len(d)*2
|
adjustedRTMPRBElementSize = len(d) * 2
|
||||||
numElements := maxBuffLen/adjustedRTMPRBElementSize
|
numElements := maxBuffLen / adjustedRTMPRBElementSize
|
||||||
s.ring = ring.NewBuffer(numElements, adjustedRTMPRBElementSize, 5*time.Second)
|
s.ring = ring.NewBuffer(numElements, adjustedRTMPRBElementSize, 5*time.Second)
|
||||||
s.log(logger.Info,"adjusted RTMP ring buffer element size","new size",adjustedRTMPRBElementSize,"num elements",numElements,"size(MB)",numElements*adjustedRTMPRBElementSize)
|
s.log(logger.Info, "adjusted RTMP ring buffer element size", "new size", adjustedRTMPRBElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPRBElementSize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.report(len(d))
|
s.report(len(d))
|
||||||
|
|
|
@ -128,7 +128,7 @@ func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) {
|
||||||
|
|
||||||
// TestSegment ensures that the mtsSender correctly segments data into clips
|
// TestSegment ensures that the mtsSender correctly segments data into clips
|
||||||
// based on positioning of PSI in the mtsEncoder's output stream.
|
// based on positioning of PSI in the mtsEncoder's output stream.
|
||||||
func TestMtsSenderSegment(t *testing.T) {
|
func TestMTSSenderSegment(t *testing.T) {
|
||||||
mts.Meta = meta.New()
|
mts.Meta = meta.New()
|
||||||
|
|
||||||
// Create ringBuffer, sender, sender and the MPEGTS encoder.
|
// Create ringBuffer, sender, sender and the MPEGTS encoder.
|
||||||
|
@ -136,7 +136,7 @@ func TestMtsSenderSegment(t *testing.T) {
|
||||||
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
|
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
|
||||||
const testRBCapacity = 50000000
|
const testRBCapacity = 50000000
|
||||||
nElements := testRBCapacity / rbStartingElementSize
|
nElements := testRBCapacity / rbStartingElementSize
|
||||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0)
|
sender := newMTSSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0)
|
||||||
|
|
||||||
const psiSendCount = 10
|
const psiSendCount = 10
|
||||||
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
||||||
|
@ -217,7 +217,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
|
||||||
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
|
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
|
||||||
const testRBCapacity = 50000000 // 50MB
|
const testRBCapacity = 50000000 // 50MB
|
||||||
nElements := testRBCapacity / rbStartingElementSize
|
nElements := testRBCapacity / rbStartingElementSize
|
||||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0)
|
sender := newMTSSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0)
|
||||||
|
|
||||||
const psiSendCount = 10
|
const psiSendCount = 10
|
||||||
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
||||||
|
@ -298,7 +298,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
|
||||||
// Create destination, the mtsSender and the mtsEncoder.
|
// Create destination, the mtsSender and the mtsEncoder.
|
||||||
const clipToDelay = 3
|
const clipToDelay = 3
|
||||||
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
|
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
|
||||||
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, rbStartingElementSize, 0), 0)
|
sender := newMTSSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, rbStartingElementSize, 0), 0)
|
||||||
|
|
||||||
const psiSendCount = 10
|
const psiSendCount = 10
|
||||||
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
|
||||||
|
|
Loading…
Reference in New Issue