revid: applying some feedback from last PR

This commit is contained in:
Saxon 2019-04-15 10:25:35 +09:30
parent f59879b51d
commit d75ea20137
3 changed files with 34 additions and 29 deletions

View File

@ -142,7 +142,8 @@ func (r *Revid) Bitrate() int {
} }
// reset swaps the current config of a Revid with the passed // reset swaps the current config of a Revid with the passed
// configuration; checking validity and returning errors if not valid. // configuration; checking validity and returning errors if not valid. It then
// sets up the data pipeline accordinging to this configuration.
func (r *Revid) reset(config Config) error { func (r *Revid) reset(config Config) error {
err := r.setConfig(config) err := r.setConfig(config)
if err != nil { if err != nil {

View File

@ -156,7 +156,7 @@ func (s *fileSender) Close() error { return s.file.Close() }
type mtsSender struct { type mtsSender struct {
dst io.WriteCloser dst io.WriteCloser
buf []byte buf []byte
ringBuf *ring.Buffer rb *ring.Buffer
next []byte next []byte
pkt packet.Packet pkt packet.Packet
repairer *mts.DiscontinuityRepairer repairer *mts.DiscontinuityRepairer
@ -172,7 +172,7 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int
dst: dst, dst: dst,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
log: log, log: log,
ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), rb: ring.NewBuffer(rbSize, rbElementSize, wTimeout),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
s.wg.Add(1) s.wg.Add(1)
@ -193,17 +193,15 @@ func (s *mtsSender) output() {
// If chunk is nil then we're ready to get another from the ringBuffer. // If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil { if chunk == nil {
var err error var err error
chunk, err = s.ringBuf.Next(rTimeout) chunk, err = s.rb.Next(rTimeout)
switch err { switch err {
case nil: case nil, io.EOF:
continue continue
case ring.ErrTimeout: case ring.ErrTimeout:
s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout")
continue continue
default: default:
s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error())
fallthrough
case io.EOF:
continue continue
} }
} }
@ -235,11 +233,11 @@ func (s *mtsSender) Write(d []byte) (int, error) {
copy(s.pkt[:], bytes) copy(s.pkt[:], bytes)
s.curPid = s.pkt.PID() s.curPid = s.pkt.PID()
if s.curPid == mts.PatPid && len(s.buf) > 0 { if s.curPid == mts.PatPid && len(s.buf) > 0 {
_, err := s.ringBuf.Write(s.buf) _, err := s.rb.Write(s.buf)
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error())
} }
s.ringBuf.Flush() s.rb.Flush()
s.buf = s.buf[:0] s.buf = s.buf[:0]
} }
return len(d), nil return len(d), nil

View File

@ -48,29 +48,34 @@ var (
// destination simulates a destination for the mtsSender. It allows for the // destination simulates a destination for the mtsSender. It allows for the
// emulation of failed and delayed sends. // emulation of failed and delayed sends.
type destination struct { type destination struct {
buf [][]byte buf [][]byte
testFails bool testFails bool
failAt int failAt int
currentPkt int currentClip int
t *testing.T t *testing.T
sendDelay time.Duration sendDelay time.Duration
delayAt int delayAt int
done chan struct{}
doneAt int
} }
func (ts *destination) Write(d []byte) (int, error) { func (ts *destination) Write(d []byte) (int, error) {
ts.t.Log("writing clip to destination") ts.t.Log("writing clip to destination")
if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { if ts.delayAt != 0 && ts.currentClip == ts.delayAt {
time.Sleep(ts.sendDelay) time.Sleep(ts.sendDelay)
} }
if ts.testFails && ts.currentPkt == ts.failAt { if ts.testFails && ts.currentClip == ts.failAt {
ts.t.Log("failed send") ts.t.Log("failed send")
ts.currentPkt++ ts.currentClip++
return 0, errSendFailed return 0, errSendFailed
} }
cpy := make([]byte, len(d)) cpy := make([]byte, len(d))
copy(cpy, d) copy(cpy, d)
ts.buf = append(ts.buf, cpy) ts.buf = append(ts.buf, cpy)
ts.currentPkt++ if ts.currentClip == ts.doneAt {
close(ts.done)
}
ts.currentClip++
return len(d), nil return len(d), nil
} }
@ -110,7 +115,8 @@ func TestMtsSenderSegment(t *testing.T) {
mts.Meta = meta.New() mts.Meta = meta.New()
// Create ringBuffer, sender, sender and the MPEGTS encoder. // Create ringBuffer, sender, sender and the MPEGTS encoder.
tstDst := &destination{t: t} const numberOfClips = 11
tstDst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.Video)
@ -126,8 +132,8 @@ func TestMtsSenderSegment(t *testing.T) {
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
} }
// Give the mtsSender some time to finish up and then Close it. // Wait until the destination has all the data, then close the sender.
time.Sleep(10 * time.Millisecond) <-tstDst.done
sender.Close() sender.Close()
// Check the data. // Check the data.
@ -188,7 +194,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder // Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3 const clipToFailAt = 3
tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout) sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.Video)
@ -204,8 +210,8 @@ func TestMtsSenderFailedSend(t *testing.T) {
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
} }
// Give the mtsSender some time to finish up and then Close it. // Wait until the destination has all the data, then close the sender.
time.Sleep(10 * time.Millisecond) <-tstDst.done
sender.Close() sender.Close()
// Check that we have data as expected. // Check that we have data as expected.
@ -268,7 +274,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder. // Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3 const clipToDelay = 3
tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, rbElementSize, wTimeout) sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, rbElementSize, wTimeout)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.Video)
@ -283,8 +289,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
} }
// Give mtsSender time to finish up then Close. // Wait until the destination has all the data, then close the sender.
time.Sleep(100 * time.Millisecond) <-tstDst.done
sender.Close() sender.Close()
// Check the data. // Check the data.