Merged in remove-early-ringbuffer (pull request #184)

revid: removed main ringBuffer

Approved-by: Alan Noble <anoble@gmail.com>
Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Saxon Milton 2019-04-27 12:24:08 +00:00
commit b4ff40e269
10 changed files with 296 additions and 278 deletions

View File

@ -282,7 +282,7 @@ func run(cfg revid.Config) {
for { for {
err = ns.Run() err = ns.Run()
if err != nil { if err != nil {
log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) log.Log(logger.Warning, pkg+"Run Failed. Retrying...", "error", err.Error())
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }

View File

@ -55,8 +55,7 @@ var (
// Encoder provides properties required for the generation of flv video // Encoder provides properties required for the generation of flv video
// from raw video data // from raw video data
type Encoder struct { type Encoder struct {
dst io.Writer dst io.WriteCloser
fps int fps int
audio bool audio bool
video bool video bool
@ -64,7 +63,7 @@ type Encoder struct {
} }
// NewEncoder retuns a new FLV encoder. // NewEncoder retuns a new FLV encoder.
func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) { func NewEncoder(dst io.WriteCloser, audio, video bool, fps int) (*Encoder, error) {
e := Encoder{ e := Encoder{
dst: dst, dst: dst,
fps: fps, fps: fps,
@ -261,3 +260,8 @@ func (e *Encoder) Write(frame []byte) (int, error) {
return len(frame), nil return len(frame), nil
} }
// Close will close the encoder destination.
func (e *Encoder) Close() error {
return e.dst.Close()
}

View File

@ -26,6 +26,7 @@ package mts
import ( import (
"bytes" "bytes"
"io"
"io/ioutil" "io/ioutil"
"testing" "testing"
@ -35,6 +36,10 @@ import (
"bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/meta"
) )
type nopCloser struct{ io.Writer }
func (nopCloser) Close() error { return nil }
// TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data. // TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data.
// It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm. // It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm.
func TestEncodePcm(t *testing.T) { func TestEncodePcm(t *testing.T) {
@ -45,7 +50,7 @@ func TestEncodePcm(t *testing.T) {
sampleSize := 2 sampleSize := 2
blockSize := 16000 blockSize := 16000
writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) writeFreq := float64(sampleRate*sampleSize) / float64(blockSize)
e := NewEncoder(&buf, writeFreq, Audio) e := NewEncoder(nopCloser{&buf}, writeFreq, Audio)
inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm"
inPcm, err := ioutil.ReadFile(inPath) inPcm, err := ioutil.ReadFile(inPath)

View File

@ -127,7 +127,7 @@ const (
// Encoder encapsulates properties of an mpegts generator. // Encoder encapsulates properties of an mpegts generator.
type Encoder struct { type Encoder struct {
dst io.Writer dst io.WriteCloser
clock time.Duration clock time.Duration
lastTime time.Time lastTime time.Time
@ -149,7 +149,7 @@ type Encoder struct {
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
// calls write for every frame, the rate will be the frame rate of the video. // calls write for every frame, the rate will be the frame rate of the video.
func NewEncoder(dst io.Writer, rate float64, mediaType int) *Encoder { func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder {
var mPid int var mPid int
var sid byte var sid byte
switch mediaType { switch mediaType {
@ -327,3 +327,7 @@ func updateMeta(b []byte) ([]byte, error) {
err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) err := p.AddDescriptor(psi.MetadataTag, Meta.Encode())
return []byte(p), err return []byte(p), err
} }
func (e *Encoder) Close() error {
return e.dst.Close()
}

View File

@ -47,9 +47,8 @@ const fps = 25
// write this to psi. // write this to psi.
func TestMetaEncode1(t *testing.T) { func TestMetaEncode1(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var b []byte var buf bytes.Buffer
buf := bytes.NewBuffer(b) e := NewEncoder(nopCloser{&buf}, fps, Video)
e := NewEncoder(buf, fps, Video)
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {
t.Errorf(errUnexpectedErr, err.Error()) t.Errorf(errUnexpectedErr, err.Error())
@ -76,9 +75,8 @@ func TestMetaEncode1(t *testing.T) {
// into psi. // into psi.
func TestMetaEncode2(t *testing.T) { func TestMetaEncode2(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var b []byte var buf bytes.Buffer
buf := bytes.NewBuffer(b) e := NewEncoder(nopCloser{&buf}, fps, Video)
e := NewEncoder(buf, fps, Video)
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
Meta.Add("loc", "1234,4321,1234") Meta.Add("loc", "1234,4321,1234")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {

View File

@ -222,6 +222,8 @@ func (rs *rtmpSender) Write(p []byte) (int, error) {
return n, nil return n, nil
} }
func (rs *rtmpSender) Close() error { return nil }
// TestFromFile tests streaming from an video file comprising raw H.264. // TestFromFile tests streaming from an video file comprising raw H.264.
// The test file is supplied via the RTMP_TEST_FILE environment variable. // The test file is supplied via the RTMP_TEST_FILE environment variable.
func TestFromFile(t *testing.T) { func TestFromFile(t *testing.T) {

View File

@ -8,6 +8,7 @@ DESCRIPTION
AUTHORS AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org> Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org> Alan Noble <alan@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE LICENSE
revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
@ -20,13 +21,12 @@ LICENSE
It is distributed in the hope that it will be useful, but WITHOUT It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details. for more details.
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. in gpl.txt. If not, see http://www.gnu.org/licenses.
*/ */
// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
package revid package revid
import ( import (
@ -46,15 +46,12 @@ import (
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/ioext"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
) )
// Ring buffer sizes and read/write timeouts. // mtsSender ringBuffer sizes.
const ( const (
ringBufferSize = 1000 rbSize = 1000
ringBufferElementSize = 100000 rbElementSize = 100000
writeTimeout = 10 * time.Millisecond
readTimeout = 10 * time.Millisecond
) )
// RTMP connection properties. // RTMP connection properties.
@ -63,17 +60,6 @@ const (
rtmpConnectionTimeout = 10 rtmpConnectionTimeout = 10
) )
// Duration of video for each clip sent out.
const clipDuration = 1 * time.Second
// Time duration between bitrate checks.
const bitrateTime = 1 * time.Minute
// After a send fail, this is the delay before another send.
const sendFailedDelay = 5 * time.Millisecond
const ffmpegPath = "/usr/local/bin/ffmpeg"
const pkg = "revid:" const pkg = "revid:"
type Logger interface { type Logger interface {
@ -89,6 +75,7 @@ type Revid struct {
// FIXME(kortschak): The relationship of concerns // FIXME(kortschak): The relationship of concerns
// in config/ns is weird. // in config/ns is weird.
config Config config Config
// ns holds the netsender.Sender responsible for HTTP. // ns holds the netsender.Sender responsible for HTTP.
ns *netsender.Sender ns *netsender.Sender
@ -105,40 +92,19 @@ type Revid struct {
// lexTo, encoder and packer handle transcoding the input stream. // lexTo, encoder and packer handle transcoding the input stream.
lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error
// buffer handles passing frames from the transcoder // encoders will hold the multiWriteCloser that writes to encoders from the lexer.
// to the target destination. encoders io.WriteCloser
buffer *buffer
// encoder holds the required encoders, which then write to destinations. // isRunning is used to keep track of revid's running state between methods.
encoder []io.Writer
// writeClosers holds the senders that the encoders will write to.
writeClosers []io.WriteCloser
// bitrate hold the last send bitrate calculation result.
bitrate int
mu sync.Mutex
isRunning bool isRunning bool
// wg will be used to wait for any processing routines to finish.
wg sync.WaitGroup wg sync.WaitGroup
// err will channel errors from revid routines to the handle errors routine.
err chan error err chan error
} }
// buffer is a wrapper for a ring.Buffer and provides function to write and
// flush in one Write call.
type buffer ring.Buffer
// Write implements the io.Writer interface. It will write to the underlying
// ring.Buffer and then flush to indicate a complete ring.Buffer write.
func (b *buffer) Write(d []byte) (int, error) {
r := (*ring.Buffer)(b)
n, err := r.Write(d)
r.Flush()
return n, err
}
// New returns a pointer to a new Revid with the desired configuration, and/or // New returns a pointer to a new Revid with the desired configuration, and/or
// an error if construction of the new instance was not successful. // an error if construction of the new instance was not successful.
func New(c Config, ns *netsender.Sender) (*Revid, error) { func New(c Config, ns *netsender.Sender) (*Revid, error) {
@ -151,6 +117,13 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) {
return &r, nil return &r, nil
} }
// Config returns a copy of revids current config.
//
// Config is not safe for concurrent use.
func (r *Revid) Config() Config {
return r.config
}
// TODO(Saxon): put more thought into error severity. // TODO(Saxon): put more thought into error severity.
func (r *Revid) handleErrors() { func (r *Revid) handleErrors() {
for { for {
@ -167,10 +140,41 @@ func (r *Revid) handleErrors() {
} }
// Bitrate returns the result of the most recent bitrate check. // Bitrate returns the result of the most recent bitrate check.
//
// TODO: get this working again.
func (r *Revid) Bitrate() int { func (r *Revid) Bitrate() int {
return r.bitrate return -1
} }
// reset swaps the current config of a Revid with the passed
// configuration; checking validity and returning errors if not valid. It then
// sets up the data pipeline accordingly to this configuration.
func (r *Revid) reset(config Config) error {
err := r.setConfig(config)
if err != nil {
return err
}
err = r.setupPipeline(
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
e := mts.NewEncoder(dst, float64(fps), mts.Video)
return e, nil
},
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
return flv.NewEncoder(dst, true, true, fps)
},
ioext.MultiWriteCloser,
)
if err != nil {
return err
}
return nil
}
// setConfig takes a config, checks it's validity and then replaces the current
// revid config.
func (r *Revid) setConfig(config Config) error { func (r *Revid) setConfig(config Config) error {
r.config.Logger = config.Logger r.config.Logger = config.Logger
err := config.Validate(r) err := config.Validate(r)
@ -187,10 +191,10 @@ func (r *Revid) setConfig(config Config) error {
// mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder // mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder
// respectively. multiWriter will be used to create an ioext.multiWriteCloser // respectively. multiWriter will be used to create an ioext.multiWriteCloser
// so that encoders can write to multiple senders. // so that encoders can write to multiple senders.
func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error {
r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) // encoders will hold the encoders that are required for revid's current
// configuration.
r.encoder = r.encoder[:0] var encoders []io.WriteCloser
// mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders
// will hold senders that require FLV encoding. // will hold senders that require FLV encoding.
@ -203,7 +207,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W
for _, out := range r.config.Outputs { for _, out := range r.config.Outputs {
switch out { switch out {
case Http: case Http:
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, 0)
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
case Rtp: case Rtp:
w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
@ -232,7 +236,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W
if len(mtsSenders) != 0 { if len(mtsSenders) != 0 {
mw := multiWriter(mtsSenders...) mw := multiWriter(mtsSenders...)
e, _ := mtsEnc(mw, int(r.config.FrameRate)) e, _ := mtsEnc(mw, int(r.config.FrameRate))
r.encoder = append(r.encoder, e) encoders = append(encoders, e)
} }
// If we have some senders that require FLV encoding then add an FLV // If we have some senders that require FLV encoding then add an FLV
@ -244,9 +248,11 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W
if err != nil { if err != nil {
return err return err
} }
r.encoder = append(r.encoder, e) encoders = append(encoders, e)
} }
r.encoders = multiWriter(encoders...)
switch r.config.Input { switch r.config.Input {
case Raspivid: case Raspivid:
r.setupInput = r.startRaspivid r.setupInput = r.startRaspivid
@ -267,72 +273,17 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W
return nil return nil
} }
func newMtsEncoder(dst io.Writer, fps int) (io.Writer, error) {
e := mts.NewEncoder(dst, float64(fps), mts.Video)
return e, nil
}
func newFlvEncoder(dst io.Writer, fps int) (io.Writer, error) {
e, err := flv.NewEncoder(dst, true, true, fps)
if err != nil {
return nil, err
}
return e, nil
}
// reset swaps the current config of a Revid with the passed
// configuration; checking validity and returning errors if not valid.
func (r *Revid) reset(config Config) error {
err := r.setConfig(config)
if err != nil {
return err
}
err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser)
if err != nil {
return err
}
return nil
}
// IsRunning returns true if revid is running.
func (r *Revid) IsRunning() bool {
r.mu.Lock()
ret := r.isRunning
r.mu.Unlock()
return ret
}
func (r *Revid) Config() Config {
r.mu.Lock()
cfg := r.config
r.mu.Unlock()
return cfg
}
// setIsRunning sets r.isRunning using b.
func (r *Revid) setIsRunning(b bool) {
r.mu.Lock()
r.isRunning = b
r.mu.Unlock()
}
// Start invokes a Revid to start processing video from a defined input // Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output. // and packetising (if theres packetization) to a defined output.
//
// Start is not safe for concurrent use.
func (r *Revid) Start() error { func (r *Revid) Start() error {
if r.IsRunning() { if r.isRunning {
r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running") r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running")
return nil return nil
} }
r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Info, pkg+"starting Revid")
// TODO: this doesn't need to be here r.isRunning = true
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
r.setIsRunning(true)
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
r.wg.Add(1)
go r.outputClips()
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
err := r.setupInput() err := r.setupInput()
if err != nil { if err != nil {
r.Stop() r.Stop()
@ -340,33 +291,39 @@ func (r *Revid) Start() error {
return err return err
} }
// Stop halts any processing of video data from a camera or file // Stop closes down the pipeline. This closes encoders and sender output routines,
// connections, and/or files.
//
// Stop is not safe for concurrent use.
func (r *Revid) Stop() { func (r *Revid) Stop() {
if !r.IsRunning() { if !r.isRunning {
r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running")
return return
} }
for _, w := range r.writeClosers { r.config.Logger.Log(logger.Info, pkg+"closing pipeline")
err := w.Close() err := r.encoders.Close()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error())
}
} }
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
// If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
r.cmd.Process.Kill() r.cmd.Process.Kill()
} }
r.setIsRunning(false)
r.wg.Wait() r.wg.Wait()
r.isRunning = false
} }
// Update takes a map of variables and their values and edits the current config
// if the variables are recognised as valid parameters.
//
// Update is not safe for concurrent use.
func (r *Revid) Update(vars map[string]string) error { func (r *Revid) Update(vars map[string]string) error {
if r.IsRunning() { if r.isRunning {
r.Stop() r.Stop()
} }
//look through the vars and update revid where needed //look through the vars and update revid where needed
for key, value := range vars { for key, value := range vars {
switch key { switch key {
@ -497,55 +454,6 @@ func (r *Revid) Update(vars map[string]string) error {
return r.reset(r.config) return r.reset(r.config)
} }
// outputClips takes the clips produced in the packClips method and outputs them
// to the desired output defined in the revid config
func (r *Revid) outputClips() {
defer r.wg.Done()
lastTime := time.Now()
var count int
loop:
for r.IsRunning() {
// If the ring buffer has something we can read and send off
chunk, err := (*ring.Buffer)(r.buffer).Next(readTimeout)
switch err {
case nil:
// Do nothing.
case ring.ErrTimeout:
r.config.Logger.Log(logger.Debug, pkg+"ring buffer read timeout")
continue
default:
r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error())
fallthrough
case io.EOF:
break loop
}
// Loop over encoders and hand bytes over to each one.
for _, e := range r.encoder {
_, err := chunk.WriteTo(e)
if err != nil {
r.err <- err
}
}
// Release the chunk back to the ring buffer.
chunk.Close()
// FIXME(saxon): this doesn't work anymore.
now := time.Now()
deltaTime := now.Sub(lastTime)
if deltaTime > bitrateTime {
// FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate.
r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second))
r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate)
r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", (*ring.Buffer)(r.buffer).Len())
lastTime = now
count = 0
}
}
r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore")
}
// startRaspivid sets up things for input from raspivid i.e. starts // startRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output. // a raspivid process and pipes it's data output.
func (r *Revid) startRaspivid() error { func (r *Revid) startRaspivid() error {
@ -670,7 +578,7 @@ func (r *Revid) setupInputForFile() error {
func (r *Revid) processFrom(read io.Reader, delay time.Duration) { func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.config.Logger.Log(logger.Info, pkg+"reading input data") r.config.Logger.Log(logger.Info, pkg+"reading input data")
r.err <- r.lexTo(r.buffer, read, delay) r.err <- r.lexTo(r.encoders, read, delay)
r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
r.wg.Done() r.wg.Done()
} }

View File

@ -1,3 +1,31 @@
/*
NAME
revid_test.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean).
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid package revid
import ( import (
@ -47,11 +75,8 @@ func TestRaspivid(t *testing.T) {
// testLogger implements a netsender.Logger. // testLogger implements a netsender.Logger.
type testLogger struct{} type testLogger struct{}
// SetLevel normally sets the logging level, but it is a no-op in our case. func (tl *testLogger) SetLevel(level int8) {}
func (tl *testLogger) SetLevel(level int8) {
}
// Log requests the Logger to write a message at the given level.
func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { func (tl *testLogger) Log(level int8, msg string, params ...interface{}) {
logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"} logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"}
if level < -1 || level > 5 { if level < -1 || level > 5 {
@ -71,44 +96,35 @@ func (tl *testLogger) Log(level int8, msg string, params ...interface{}) {
// tstMtsEncoder emulates the mts.Encoder to the extent of the dst field. // tstMtsEncoder emulates the mts.Encoder to the extent of the dst field.
// This will allow access to the dst to check that it has been set corrctly. // This will allow access to the dst to check that it has been set corrctly.
type tstMtsEncoder struct { type tstMtsEncoder struct {
dst io.Writer // dst is here solely to detect the type stored in the encoder.
// No data is written to dst.
dst io.WriteCloser
} }
// newTstMtsEncoder returns a pointer to a newTsMtsEncoder. func (e *tstMtsEncoder) Write(d []byte) (int, error) { return len(d), nil }
func newTstMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { func (e *tstMtsEncoder) Close() error { return nil }
return &tstMtsEncoder{dst: dst}, nil
}
func (e *tstMtsEncoder) Write(d []byte) (int, error) { return 0, nil }
// tstFlvEncoder emulates the flv.Encoder to the extent of the dst field. // tstFlvEncoder emulates the flv.Encoder to the extent of the dst field.
// This will allow access to the dst to check that it has been set corrctly. // This will allow access to the dst to check that it has been set corrctly.
type tstFlvEncoder struct { type tstFlvEncoder struct {
dst io.Writer // dst is here solely to detect the type stored in the encoder.
} // No data is written to dst.
dst io.WriteCloser
// newTstFlvEncoder returns a pointer to a new tstFlvEncoder.
func newTstFlvEncoder(dst io.Writer, fps int) (io.Writer, error) {
return &tstFlvEncoder{dst: dst}, nil
} }
func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil } func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil }
func (e *tstFlvEncoder) Close() error { return nil }
// dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we
// can access the destinations. // can access the destinations.
type dummyMultiWriter struct { type dummyMultiWriter struct {
// dst is here solely to detect the types stored in the multiWriter.
// No data is written to dst.
dst []io.WriteCloser dst []io.WriteCloser
} }
func newDummyMultiWriter(dst ...io.WriteCloser) io.WriteCloser {
return &dummyMultiWriter{
dst: dst,
}
}
func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil }
func (w *dummyMultiWriter) Close() error { return nil }
func (w *dummyMultiWriter) Close() error { return nil }
// TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the // TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the
// revid.encoder slice and the senders the encoders write to. // revid.encoder slice and the senders the encoders write to.
@ -216,20 +232,30 @@ func TestResetEncoderSenderSetup(t *testing.T) {
} }
// This logic is what we want to check. // This logic is what we want to check.
err = rv.setupPipeline(newTstMtsEncoder, newTstFlvEncoder, newDummyMultiWriter) err = rv.setupPipeline(
func(dst io.WriteCloser, rate int) (io.WriteCloser, error) {
return &tstMtsEncoder{dst: dst}, nil
},
func(dst io.WriteCloser, rate int) (io.WriteCloser, error) {
return &tstFlvEncoder{dst: dst}, nil
},
func(writers ...io.WriteCloser) io.WriteCloser {
return &dummyMultiWriter{dst: writers}
},
)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v for test %v", err, testNum) t.Fatalf("unexpected error: %v for test %v", err, testNum)
} }
// First check that we have the correct number of encoders. // First check that we have the correct number of encoders.
got := len(rv.encoder) got := len(rv.encoders.(*dummyMultiWriter).dst)
want := len(test.encoders) want := len(test.encoders)
if got != want { if got != want {
t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want) t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want)
} }
// Now check the correctness of encoders and their destinations. // Now check the correctness of encoders and their destinations.
for _, e := range rv.encoder { for _, e := range rv.encoders.(*dummyMultiWriter).dst {
// Get e's type. // Get e's type.
encoderType := fmt.Sprintf("%T", e) encoderType := fmt.Sprintf("%T", e)
@ -245,7 +271,7 @@ func TestResetEncoderSenderSetup(t *testing.T) {
} }
// Now check that this encoder has correct number of destinations (senders). // Now check that this encoder has correct number of destinations (senders).
var ms io.Writer var ms io.WriteCloser
switch encoderType { switch encoderType {
case mtsEncoderStr: case mtsEncoderStr:
ms = e.(*tstMtsEncoder).dst ms = e.(*tstMtsEncoder).dst

View File

@ -29,7 +29,6 @@ LICENSE
package revid package revid
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -119,7 +118,7 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{}))
// Extract location from reply // Extract location from reply
g, err := dec.String("ll") g, err := dec.String("ll")
if err != nil { if err != nil {
log(logger.Warning, pkg+"No location in reply") log(logger.Debug, pkg+"No location in reply")
} else { } else {
log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.Meta.Add("loc", g) mts.Meta.Add("loc", g)
@ -156,24 +155,24 @@ func (s *fileSender) Close() error { return s.file.Close() }
type mtsSender struct { type mtsSender struct {
dst io.WriteCloser dst io.WriteCloser
buf []byte buf []byte
ringBuf *ring.Buffer ring *ring.Buffer
next []byte next []byte
pkt packet.Packet pkt packet.Packet
repairer *mts.DiscontinuityRepairer repairer *mts.DiscontinuityRepairer
curPid int curPid int
quit chan struct{} done chan struct{}
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
wg sync.WaitGroup wg sync.WaitGroup
} }
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), ringSize int, ringElementSize int, wTimeout time.Duration) *mtsSender {
s := &mtsSender{ s := &mtsSender{
dst: dst, dst: dst,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
log: log, log: log,
ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), ring: ring.NewBuffer(ringSize, ringElementSize, wTimeout),
quit: make(chan struct{}), done: make(chan struct{}),
} }
s.wg.Add(1) s.wg.Add(1)
go s.output() go s.output()
@ -185,25 +184,23 @@ func (s *mtsSender) output() {
var chunk *ring.Chunk var chunk *ring.Chunk
for { for {
select { select {
case <-s.quit: case <-s.done:
s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") s.log(logger.Info, pkg+"mtsSender: got done signal, terminating output routine")
defer s.wg.Done() defer s.wg.Done()
return return
default: default:
// If chunk is nil then we're ready to get another from the ringBuffer. // If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil { if chunk == nil {
var err error var err error
chunk, err = s.ringBuf.Next(readTimeout) chunk, err = s.ring.Next(0)
switch err { switch err {
case nil: case nil, io.EOF:
continue continue
case ring.ErrTimeout: case ring.ErrTimeout:
s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout")
continue continue
default: default:
s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error())
fallthrough
case io.EOF:
continue continue
} }
} }
@ -235,11 +232,11 @@ func (s *mtsSender) Write(d []byte) (int, error) {
copy(s.pkt[:], bytes) copy(s.pkt[:], bytes)
s.curPid = s.pkt.PID() s.curPid = s.pkt.PID()
if s.curPid == mts.PatPid && len(s.buf) > 0 { if s.curPid == mts.PatPid && len(s.buf) > 0 {
_, err := s.ringBuf.Write(s.buf) _, err := s.ring.Write(s.buf)
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error())
} }
s.ringBuf.Flush() s.ring.Flush()
s.buf = s.buf[:0] s.buf = s.buf[:0]
} }
return len(d), nil return len(d), nil
@ -247,21 +244,21 @@ func (s *mtsSender) Write(d []byte) (int, error) {
// Close implements io.Closer. // Close implements io.Closer.
func (s *mtsSender) Close() error { func (s *mtsSender) Close() error {
close(s.quit) close(s.done)
s.wg.Wait() s.wg.Wait()
return nil return nil
} }
// rtmpSender implements loadSender for a native RTMP destination. // rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct { type rtmpSender struct {
conn *rtmp.Conn conn *rtmp.Conn
url string url string
timeout uint timeout uint
retries int retries int
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
ring *ring.Buffer
data []byte done chan struct{}
wg sync.WaitGroup
} }
func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
@ -283,24 +280,76 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
timeout: timeout, timeout: timeout,
retries: retries, retries: retries,
log: log, log: log,
ring: ring.NewBuffer(10, rbElementSize, 0),
done: make(chan struct{}),
} }
s.wg.Add(1)
go s.output()
return s, err return s, err
} }
// output starts an mtsSender's data handling routine.
func (s *rtmpSender) output() {
var chunk *ring.Chunk
for {
select {
case <-s.done:
s.log(logger.Info, pkg+"rtmpSender: got done signal, terminating output routine")
defer s.wg.Done()
return
default:
// If chunk is nil then we're ready to get another from the ring buffer.
if chunk == nil {
var err error
chunk, err = s.ring.Next(0)
switch err {
case nil, io.EOF:
continue
case ring.ErrTimeout:
s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout")
continue
default:
s.log(logger.Error, pkg+"rtmpSender: unexpected error", "error", err.Error())
continue
}
}
if s.conn == nil {
s.log(logger.Warning, pkg+"rtmpSender: no rtmp connection, going to restart...")
err := s.restart()
if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error())
continue
}
}
_, err := s.conn.Write(chunk.Bytes())
switch err {
case nil, rtmp.ErrInvalidFlvTag:
default:
s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error())
err = s.restart()
if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error())
}
continue
}
chunk.Close()
chunk = nil
}
}
}
// Write implements io.Writer. // Write implements io.Writer.
func (s *rtmpSender) Write(d []byte) (int, error) { func (s *rtmpSender) Write(d []byte) (int, error) {
if s.conn == nil { _, err := s.ring.Write(d)
return 0, errors.New("no rtmp connection, cannot write")
}
_, err := s.conn.Write(d)
if err != nil { if err != nil {
err = s.restart() s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error())
} }
return len(d), err s.ring.Flush()
return len(d), nil
} }
func (s *rtmpSender) restart() error { func (s *rtmpSender) restart() error {
s.Close() s.close()
var err error var err error
for n := 0; n < s.retries; n++ { for n := 0; n < s.retries; n++ {
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
@ -316,6 +365,14 @@ func (s *rtmpSender) restart() error {
} }
func (s *rtmpSender) Close() error { func (s *rtmpSender) Close() error {
if s.done != nil {
close(s.done)
}
s.wg.Wait()
return s.close()
}
func (s *rtmpSender) close() error {
if s.conn == nil { if s.conn == nil {
return nil return nil
} }

View File

@ -41,14 +41,6 @@ import (
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
// Ring buffer sizes and read/write timeouts.
const (
rbSize = 100
rbElementSize = 150000
wTimeout = 10 * time.Millisecond
rTimeout = 10 * time.Millisecond
)
var ( var (
errSendFailed = errors.New("send failed") errSendFailed = errors.New("send failed")
) )
@ -56,29 +48,50 @@ var (
// destination simulates a destination for the mtsSender. It allows for the // destination simulates a destination for the mtsSender. It allows for the
// emulation of failed and delayed sends. // emulation of failed and delayed sends.
type destination struct { type destination struct {
buf [][]byte // Holds the clips written to this destination using Write.
testFails bool buf [][]byte
failAt int
currentPkt int // testFails is set to true if we would like a write to fail at a particular
t *testing.T // clip as determined by failAt.
sendDelay time.Duration testFails bool
delayAt int failAt int
// Holds the current clip number.
currentClip int
// Pointer to the testing.T of a test where this struct is being used. This
// is used so that logging can be done through the testing log utilities.
t *testing.T
// sendDelay is the amount of time we would like a Write to be delayed when
// we hit the clip number indicated by delayAt.
sendDelay time.Duration
delayAt int
// done will be used to send a signal to the main routine to indicate that
// the destination has received all clips. doneAt indicates the final clip
// number.
done chan struct{}
doneAt int
} }
func (ts *destination) Write(d []byte) (int, error) { func (ts *destination) Write(d []byte) (int, error) {
ts.t.Log("writing clip to destination") ts.t.Log("writing clip to destination")
if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { if ts.delayAt != 0 && ts.currentClip == ts.delayAt {
time.Sleep(ts.sendDelay) time.Sleep(ts.sendDelay)
} }
if ts.testFails && ts.currentPkt == ts.failAt { if ts.testFails && ts.currentClip == ts.failAt {
ts.t.Log("failed send") ts.t.Log("failed send")
ts.currentPkt++ ts.currentClip++
return 0, errSendFailed return 0, errSendFailed
} }
cpy := make([]byte, len(d)) cpy := make([]byte, len(d))
copy(cpy, d) copy(cpy, d)
ts.buf = append(ts.buf, cpy) ts.buf = append(ts.buf, cpy)
ts.currentPkt++ if ts.currentClip == ts.doneAt {
close(ts.done)
}
ts.currentClip++
return len(d), nil return len(d), nil
} }
@ -118,8 +131,9 @@ func TestMtsSenderSegment(t *testing.T) {
mts.Meta = meta.New() mts.Meta = meta.New()
// Create ringBuffer, sender, sender and the MPEGTS encoder. // Create ringBuffer, sender, sender and the MPEGTS encoder.
tstDst := &destination{t: t} const numberOfClips = 11
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
@ -134,12 +148,12 @@ func TestMtsSenderSegment(t *testing.T) {
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
} }
// Give the mtsSender some time to finish up and then Close it. // Wait until the destination has all the data, then close the sender.
time.Sleep(10 * time.Millisecond) <-dst.done
sender.Close() sender.Close()
// Check the data. // Check the data.
result := tstDst.buf result := dst.buf
expectData := 0 expectData := 0
for clipNo, clip := range result { for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo) t.Logf("Checking clip: %v\n", clipNo)
@ -196,8 +210,8 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder // Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3 const clipToFailAt = 3
tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder and send PSI every 10 packets. // Turn time based PSI writing off for encoder and send PSI every 10 packets.
@ -212,12 +226,12 @@ func TestMtsSenderFailedSend(t *testing.T) {
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
} }
// Give the mtsSender some time to finish up and then Close it. // Wait until the destination has all the data, then close the sender.
time.Sleep(10 * time.Millisecond) <-dst.done
sender.Close() sender.Close()
// Check that we have data as expected. // Check that we have data as expected.
result := tstDst.buf result := dst.buf
expectData := 0 expectData := 0
for clipNo, clip := range result { for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo) t.Logf("Checking clip: %v\n", clipNo)
@ -276,8 +290,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder. // Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3 const clipToDelay = 3
tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, ringBufferElementSize, writeTimeout) sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
@ -291,12 +305,12 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
} }
// Give mtsSender time to finish up then Close. // Wait until the destination has all the data, then close the sender.
time.Sleep(100 * time.Millisecond) <-dst.done
sender.Close() sender.Close()
// Check the data. // Check the data.
result := tstDst.buf result := dst.buf
expectedCC := 0 expectedCC := 0
for clipNo, clip := range result { for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo) t.Logf("Checking clip: %v\n", clipNo)