revid: addressing PR feedback

This commit is contained in:
Saxon 2019-04-18 18:31:49 +09:30
parent 74c995d452
commit d76b60a515
5 changed files with 33 additions and 26 deletions

View File

@ -26,6 +26,7 @@ package mts
import (
"bytes"
"io"
"io/ioutil"
"testing"
@ -35,25 +36,21 @@ import (
"bitbucket.org/ausocean/av/container/mts/meta"
)
type buffer bytes.Buffer
type nopCloser struct{ io.Writer }
func (b *buffer) Write(d []byte) (int, error) {
return (*bytes.Buffer)(b).Write(d)
}
func (b *buffer) Close() error { return nil }
func (nopCloser) Close() error { return nil }
// TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data.
// It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm.
func TestEncodePcm(t *testing.T) {
Meta = meta.New()
var buf buffer
var buf bytes.Buffer
sampleRate := 48000
sampleSize := 2
blockSize := 16000
writeFreq := float64(sampleRate*sampleSize) / float64(blockSize)
e := NewEncoder(&buf, writeFreq, Audio)
e := NewEncoder(&nopCloser{&buf}, writeFreq, Audio)
inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm"
inPcm, err := ioutil.ReadFile(inPath)

View File

@ -47,8 +47,8 @@ const fps = 25
// write this to psi.
func TestMetaEncode1(t *testing.T) {
Meta = meta.New()
var buf buffer
e := NewEncoder(&buf, fps, Video)
var buf bytes.Buffer
e := NewEncoder(&nopCloser{&buf}, fps, Video)
Meta.Add("ts", "12345678")
if err := e.writePSI(); err != nil {
t.Errorf(errUnexpectedErr, err.Error())
@ -75,8 +75,8 @@ func TestMetaEncode1(t *testing.T) {
// into psi.
func TestMetaEncode2(t *testing.T) {
Meta = meta.New()
var buf buffer
e := NewEncoder(&buf, fps, Video)
var buf bytes.Buffer
e := NewEncoder(&nopCloser{&buf}, fps, Video)
Meta.Add("ts", "12345678")
Meta.Add("loc", "1234,4321,1234")
if err := e.writePSI(); err != nil {

View File

@ -52,8 +52,6 @@ import (
const (
rbSize = 1000
rbElementSize = 100000
wTimeout = 0 * time.Second
rTimeout = 0 * time.Second
)
// RTMP connection properties.
@ -120,6 +118,9 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) {
}
// Config returns a copy of revids current config.
//
// This is not intended, nor is it safe, to be used concurrently with any other
// exported functionalilty from this file.
func (r *Revid) Config() Config {
return r.config
}
@ -207,7 +208,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int)
for _, out := range r.config.Outputs {
switch out {
case Http:
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, wTimeout)
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, 0)
mtsSenders = append(mtsSenders, w)
case Rtp:
w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
@ -275,6 +276,9 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int)
// Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output.
//
// This is not intended, nor is it safe, to be used concurrently with any other
// exported functionalilty from this file.
func (r *Revid) Start() error {
if r.isRunning {
r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running")
@ -291,6 +295,9 @@ func (r *Revid) Start() error {
// Stop closes down the pipeline. This closes encoders and sender output routines,
// connections, and/or files.
//
// This is not intended, nor is it safe, to be used concurrently with any other
// exported functionalilty from this file.
func (r *Revid) Stop() {
if !r.isRunning {
r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running")
@ -313,6 +320,9 @@ func (r *Revid) Stop() {
// Update takes a map of variables and their values and edits the current config
// if the variables are recognised as valid parameters.
//
// This is not intended, nor is it safe, to be used concurrently with any other
// exported functionalilty from this file.
func (r *Revid) Update(vars map[string]string) error {
if r.isRunning {
r.Stop()

View File

@ -192,7 +192,7 @@ func (s *mtsSender) output() {
// If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil {
var err error
chunk, err = s.ring.Next(rTimeout)
chunk, err = s.ring.Next(0)
switch err {
case nil, io.EOF:
continue
@ -301,7 +301,7 @@ func (s *rtmpSender) output() {
// If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil {
var err error
chunk, err = s.ring.Next(rTimeout)
chunk, err = s.ring.Next(0)
switch err {
case nil, io.EOF:
continue

View File

@ -133,7 +133,7 @@ func TestMtsSenderSegment(t *testing.T) {
// Create ringBuffer, sender, sender and the MPEGTS encoder.
const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout)
sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder.
@ -210,8 +210,8 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3
tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout)
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder and send PSI every 10 packets.
@ -227,11 +227,11 @@ func TestMtsSenderFailedSend(t *testing.T) {
}
// Wait until the destination has all the data, then close the sender.
<-tstDst.done
<-dst.done
sender.Close()
// Check that we have data as expected.
result := tstDst.buf
result := dst.buf
expectData := 0
for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo)
@ -290,8 +290,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3
tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, rbElementSize, wTimeout)
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder.
@ -306,11 +306,11 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
}
// Wait until the destination has all the data, then close the sender.
<-tstDst.done
<-dst.done
sender.Close()
// Check the data.
result := tstDst.buf
result := dst.buf
expectedCC := 0
for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo)