mirror of https://bitbucket.org/ausocean/av.git
revid: removed ringBuffer after lexer
Now that we want buffered senders (as required), the ringBuffer that was after the lexer has been removed. Instead, we now have an ioext.multiWriterCloser to which the lexer writes to. This then writes to the encoders, and then encoders write to each of their own multiWriteClosers, which write to the appropriate senders. We now call close on the first multiWriteCloser to close down the entired pipeline, as this close call propogates through each level. We have removed the outputClips routine as it's not required anymore to get data from the revid ringBuffer, and have removed other things that were used by this, like the IsRunning function. We have also updated tests to work with these changes - they are passing.
This commit is contained in:
parent
899a2fe89e
commit
f59879b51d
|
@ -55,8 +55,7 @@ var (
|
||||||
// Encoder provides properties required for the generation of flv video
|
// Encoder provides properties required for the generation of flv video
|
||||||
// from raw video data
|
// from raw video data
|
||||||
type Encoder struct {
|
type Encoder struct {
|
||||||
dst io.Writer
|
dst io.WriteCloser
|
||||||
|
|
||||||
fps int
|
fps int
|
||||||
audio bool
|
audio bool
|
||||||
video bool
|
video bool
|
||||||
|
@ -64,7 +63,7 @@ type Encoder struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEncoder retuns a new FLV encoder.
|
// NewEncoder retuns a new FLV encoder.
|
||||||
func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) {
|
func NewEncoder(dst io.WriteCloser, audio, video bool, fps int) (*Encoder, error) {
|
||||||
e := Encoder{
|
e := Encoder{
|
||||||
dst: dst,
|
dst: dst,
|
||||||
fps: fps,
|
fps: fps,
|
||||||
|
@ -261,3 +260,8 @@ func (e *Encoder) Write(frame []byte) (int, error) {
|
||||||
|
|
||||||
return len(frame), nil
|
return len(frame), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close will close the encoder destination.
|
||||||
|
func (e *Encoder) Close() error {
|
||||||
|
return e.dst.Close()
|
||||||
|
}
|
||||||
|
|
|
@ -35,12 +35,20 @@ import (
|
||||||
"bitbucket.org/ausocean/av/container/mts/meta"
|
"bitbucket.org/ausocean/av/container/mts/meta"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type buffer bytes.Buffer
|
||||||
|
|
||||||
|
func (b *buffer) Write(d []byte) (int, error) {
|
||||||
|
return b.Write(d)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *buffer) Close() error { return nil }
|
||||||
|
|
||||||
// TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data.
|
// TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data.
|
||||||
// It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm.
|
// It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm.
|
||||||
func TestEncodePcm(t *testing.T) {
|
func TestEncodePcm(t *testing.T) {
|
||||||
Meta = meta.New()
|
Meta = meta.New()
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf buffer
|
||||||
sampleRate := 48000
|
sampleRate := 48000
|
||||||
sampleSize := 2
|
sampleSize := 2
|
||||||
blockSize := 16000
|
blockSize := 16000
|
||||||
|
@ -69,7 +77,7 @@ func TestEncodePcm(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
clip := buf.Bytes()
|
clip := (*bytes.Buffer)(&buf).Bytes()
|
||||||
|
|
||||||
// Get the first MTS packet to check
|
// Get the first MTS packet to check
|
||||||
var pkt packet.Packet
|
var pkt packet.Packet
|
||||||
|
|
|
@ -127,7 +127,7 @@ const (
|
||||||
|
|
||||||
// Encoder encapsulates properties of an mpegts generator.
|
// Encoder encapsulates properties of an mpegts generator.
|
||||||
type Encoder struct {
|
type Encoder struct {
|
||||||
dst io.Writer
|
dst io.WriteCloser
|
||||||
|
|
||||||
clock time.Duration
|
clock time.Duration
|
||||||
lastTime time.Time
|
lastTime time.Time
|
||||||
|
@ -149,7 +149,7 @@ type Encoder struct {
|
||||||
|
|
||||||
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
|
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
|
||||||
// calls write for every frame, the rate will be the frame rate of the video.
|
// calls write for every frame, the rate will be the frame rate of the video.
|
||||||
func NewEncoder(dst io.Writer, rate float64, mediaType int) *Encoder {
|
func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder {
|
||||||
var mPid int
|
var mPid int
|
||||||
var sid byte
|
var sid byte
|
||||||
switch mediaType {
|
switch mediaType {
|
||||||
|
@ -327,3 +327,7 @@ func updateMeta(b []byte) ([]byte, error) {
|
||||||
err := p.AddDescriptor(psi.MetadataTag, Meta.Encode())
|
err := p.AddDescriptor(psi.MetadataTag, Meta.Encode())
|
||||||
return []byte(p), err
|
return []byte(p), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *Encoder) Close() error {
|
||||||
|
return e.dst.Close()
|
||||||
|
}
|
||||||
|
|
|
@ -47,14 +47,13 @@ const fps = 25
|
||||||
// write this to psi.
|
// write this to psi.
|
||||||
func TestMetaEncode1(t *testing.T) {
|
func TestMetaEncode1(t *testing.T) {
|
||||||
Meta = meta.New()
|
Meta = meta.New()
|
||||||
var b []byte
|
var buf buffer
|
||||||
buf := bytes.NewBuffer(b)
|
e := NewEncoder(&buf, fps, Video)
|
||||||
e := NewEncoder(buf, fps, Video)
|
|
||||||
Meta.Add("ts", "12345678")
|
Meta.Add("ts", "12345678")
|
||||||
if err := e.writePSI(); err != nil {
|
if err := e.writePSI(); err != nil {
|
||||||
t.Errorf(errUnexpectedErr, err.Error())
|
t.Errorf(errUnexpectedErr, err.Error())
|
||||||
}
|
}
|
||||||
out := buf.Bytes()
|
out := (*bytes.Buffer)(&buf).Bytes()
|
||||||
got := out[PacketSize+4:]
|
got := out[PacketSize+4:]
|
||||||
|
|
||||||
want := []byte{
|
want := []byte{
|
||||||
|
@ -76,15 +75,14 @@ func TestMetaEncode1(t *testing.T) {
|
||||||
// into psi.
|
// into psi.
|
||||||
func TestMetaEncode2(t *testing.T) {
|
func TestMetaEncode2(t *testing.T) {
|
||||||
Meta = meta.New()
|
Meta = meta.New()
|
||||||
var b []byte
|
var buf buffer
|
||||||
buf := bytes.NewBuffer(b)
|
e := NewEncoder(&buf, fps, Video)
|
||||||
e := NewEncoder(buf, fps, Video)
|
|
||||||
Meta.Add("ts", "12345678")
|
Meta.Add("ts", "12345678")
|
||||||
Meta.Add("loc", "1234,4321,1234")
|
Meta.Add("loc", "1234,4321,1234")
|
||||||
if err := e.writePSI(); err != nil {
|
if err := e.writePSI(); err != nil {
|
||||||
t.Errorf(errUnexpectedErr, err.Error())
|
t.Errorf(errUnexpectedErr, err.Error())
|
||||||
}
|
}
|
||||||
out := buf.Bytes()
|
out := (*bytes.Buffer)(&buf).Bytes()
|
||||||
got := out[PacketSize+4:]
|
got := out[PacketSize+4:]
|
||||||
want := []byte{
|
want := []byte{
|
||||||
0x00, 0x02, 0xb0, 0x36, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x24,
|
0x00, 0x02, 0xb0, 0x36, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x24,
|
||||||
|
|
244
revid/revid.go
244
revid/revid.go
|
@ -8,6 +8,7 @@ DESCRIPTION
|
||||||
AUTHORS
|
AUTHORS
|
||||||
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
||||||
Alan Noble <alan@ausocean.org>
|
Alan Noble <alan@ausocean.org>
|
||||||
|
Dan Kortschak <dan@ausocean.org>
|
||||||
|
|
||||||
LICENSE
|
LICENSE
|
||||||
revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
|
revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
|
||||||
|
@ -26,7 +27,6 @@ LICENSE
|
||||||
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
|
|
||||||
package revid
|
package revid
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -46,15 +46,14 @@ import (
|
||||||
"bitbucket.org/ausocean/iot/pi/netsender"
|
"bitbucket.org/ausocean/iot/pi/netsender"
|
||||||
"bitbucket.org/ausocean/utils/ioext"
|
"bitbucket.org/ausocean/utils/ioext"
|
||||||
"bitbucket.org/ausocean/utils/logger"
|
"bitbucket.org/ausocean/utils/logger"
|
||||||
"bitbucket.org/ausocean/utils/ring"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Ring buffer sizes and read/write timeouts.
|
// mtsSender ringBuffer sizes.
|
||||||
const (
|
const (
|
||||||
ringBufferSize = 1000
|
rbSize = 1000
|
||||||
ringBufferElementSize = 100000
|
rbElementSize = 100000
|
||||||
writeTimeout = 10 * time.Millisecond
|
wTimeout = 1 * time.Second
|
||||||
readTimeout = 10 * time.Millisecond
|
rTimeout = 1 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// RTMP connection properties.
|
// RTMP connection properties.
|
||||||
|
@ -63,17 +62,6 @@ const (
|
||||||
rtmpConnectionTimeout = 10
|
rtmpConnectionTimeout = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
// Duration of video for each clip sent out.
|
|
||||||
const clipDuration = 1 * time.Second
|
|
||||||
|
|
||||||
// Time duration between bitrate checks.
|
|
||||||
const bitrateTime = 1 * time.Minute
|
|
||||||
|
|
||||||
// After a send fail, this is the delay before another send.
|
|
||||||
const sendFailedDelay = 5 * time.Millisecond
|
|
||||||
|
|
||||||
const ffmpegPath = "/usr/local/bin/ffmpeg"
|
|
||||||
|
|
||||||
const pkg = "revid:"
|
const pkg = "revid:"
|
||||||
|
|
||||||
type Logger interface {
|
type Logger interface {
|
||||||
|
@ -89,6 +77,7 @@ type Revid struct {
|
||||||
// FIXME(kortschak): The relationship of concerns
|
// FIXME(kortschak): The relationship of concerns
|
||||||
// in config/ns is weird.
|
// in config/ns is weird.
|
||||||
config Config
|
config Config
|
||||||
|
|
||||||
// ns holds the netsender.Sender responsible for HTTP.
|
// ns holds the netsender.Sender responsible for HTTP.
|
||||||
ns *netsender.Sender
|
ns *netsender.Sender
|
||||||
|
|
||||||
|
@ -105,40 +94,19 @@ type Revid struct {
|
||||||
// lexTo, encoder and packer handle transcoding the input stream.
|
// lexTo, encoder and packer handle transcoding the input stream.
|
||||||
lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error
|
lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error
|
||||||
|
|
||||||
// buffer handles passing frames from the transcoder
|
// mwc will hold the multiWriteCloser that writes to encoders from the lexer.
|
||||||
// to the target destination.
|
mwc io.WriteCloser
|
||||||
buffer *buffer
|
|
||||||
|
|
||||||
// encoder holds the required encoders, which then write to destinations.
|
|
||||||
encoder []io.Writer
|
|
||||||
|
|
||||||
// writeClosers holds the senders that the encoders will write to.
|
|
||||||
writeClosers []io.WriteCloser
|
|
||||||
|
|
||||||
// bitrate hold the last send bitrate calculation result.
|
|
||||||
bitrate int
|
|
||||||
|
|
||||||
mu sync.Mutex
|
|
||||||
isRunning bool
|
|
||||||
|
|
||||||
|
// wg will be used to wait for any processing routines to finish.
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
// isRunning is used to keep track of revid's running state between methods.
|
||||||
|
isRunning bool
|
||||||
|
|
||||||
|
// err will channel errors from revid routines to the handle errors routine.
|
||||||
err chan error
|
err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
// buffer is a wrapper for a ring.Buffer and provides function to write and
|
|
||||||
// flush in one Write call.
|
|
||||||
type buffer ring.Buffer
|
|
||||||
|
|
||||||
// Write implements the io.Writer interface. It will write to the underlying
|
|
||||||
// ring.Buffer and then flush to indicate a complete ring.Buffer write.
|
|
||||||
func (b *buffer) Write(d []byte) (int, error) {
|
|
||||||
r := (*ring.Buffer)(b)
|
|
||||||
n, err := r.Write(d)
|
|
||||||
r.Flush()
|
|
||||||
return n, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// New returns a pointer to a new Revid with the desired configuration, and/or
|
// New returns a pointer to a new Revid with the desired configuration, and/or
|
||||||
// an error if construction of the new instance was not successful.
|
// an error if construction of the new instance was not successful.
|
||||||
func New(c Config, ns *netsender.Sender) (*Revid, error) {
|
func New(c Config, ns *netsender.Sender) (*Revid, error) {
|
||||||
|
@ -167,10 +135,40 @@ func (r *Revid) handleErrors() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bitrate returns the result of the most recent bitrate check.
|
// Bitrate returns the result of the most recent bitrate check.
|
||||||
|
//
|
||||||
|
// TODO: get this working again.
|
||||||
func (r *Revid) Bitrate() int {
|
func (r *Revid) Bitrate() int {
|
||||||
return r.bitrate
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reset swaps the current config of a Revid with the passed
|
||||||
|
// configuration; checking validity and returning errors if not valid.
|
||||||
|
func (r *Revid) reset(config Config) error {
|
||||||
|
err := r.setConfig(config)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = r.setupPipeline(
|
||||||
|
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
|
||||||
|
e := mts.NewEncoder(dst, float64(fps), mts.Video)
|
||||||
|
return e, nil
|
||||||
|
},
|
||||||
|
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
|
||||||
|
return flv.NewEncoder(dst, true, true, fps)
|
||||||
|
},
|
||||||
|
ioext.MultiWriteCloser,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// setConfig takes a config, checks it's validity and then replaces the current
|
||||||
|
// revid config.
|
||||||
func (r *Revid) setConfig(config Config) error {
|
func (r *Revid) setConfig(config Config) error {
|
||||||
r.config.Logger = config.Logger
|
r.config.Logger = config.Logger
|
||||||
err := config.Validate(r)
|
err := config.Validate(r)
|
||||||
|
@ -187,10 +185,10 @@ func (r *Revid) setConfig(config Config) error {
|
||||||
// mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder
|
// mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder
|
||||||
// respectively. multiWriter will be used to create an ioext.multiWriteCloser
|
// respectively. multiWriter will be used to create an ioext.multiWriteCloser
|
||||||
// so that encoders can write to multiple senders.
|
// so that encoders can write to multiple senders.
|
||||||
func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error {
|
func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error {
|
||||||
r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout))
|
// encoders will hold the encoders that are required for revid's current
|
||||||
|
// configuration.
|
||||||
r.encoder = r.encoder[:0]
|
var encoders []io.WriteCloser
|
||||||
|
|
||||||
// mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders
|
// mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders
|
||||||
// will hold senders that require FLV encoding.
|
// will hold senders that require FLV encoding.
|
||||||
|
@ -203,7 +201,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W
|
||||||
for _, out := range r.config.Outputs {
|
for _, out := range r.config.Outputs {
|
||||||
switch out {
|
switch out {
|
||||||
case Http:
|
case Http:
|
||||||
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout)
|
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, wTimeout)
|
||||||
mtsSenders = append(mtsSenders, w)
|
mtsSenders = append(mtsSenders, w)
|
||||||
case Rtp:
|
case Rtp:
|
||||||
w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
|
w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
|
||||||
|
@ -232,7 +230,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W
|
||||||
if len(mtsSenders) != 0 {
|
if len(mtsSenders) != 0 {
|
||||||
mw := multiWriter(mtsSenders...)
|
mw := multiWriter(mtsSenders...)
|
||||||
e, _ := mtsEnc(mw, int(r.config.FrameRate))
|
e, _ := mtsEnc(mw, int(r.config.FrameRate))
|
||||||
r.encoder = append(r.encoder, e)
|
encoders = append(encoders, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we have some senders that require FLV encoding then add an FLV
|
// If we have some senders that require FLV encoding then add an FLV
|
||||||
|
@ -244,9 +242,11 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
r.encoder = append(r.encoder, e)
|
encoders = append(encoders, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.mwc = multiWriter(encoders...)
|
||||||
|
|
||||||
switch r.config.Input {
|
switch r.config.Input {
|
||||||
case Raspivid:
|
case Raspivid:
|
||||||
r.setupInput = r.startRaspivid
|
r.setupInput = r.startRaspivid
|
||||||
|
@ -267,72 +267,15 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMtsEncoder(dst io.Writer, fps int) (io.Writer, error) {
|
|
||||||
e := mts.NewEncoder(dst, float64(fps), mts.Video)
|
|
||||||
return e, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func newFlvEncoder(dst io.Writer, fps int) (io.Writer, error) {
|
|
||||||
e, err := flv.NewEncoder(dst, true, true, fps)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return e, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// reset swaps the current config of a Revid with the passed
|
|
||||||
// configuration; checking validity and returning errors if not valid.
|
|
||||||
func (r *Revid) reset(config Config) error {
|
|
||||||
err := r.setConfig(config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsRunning returns true if revid is running.
|
|
||||||
func (r *Revid) IsRunning() bool {
|
|
||||||
r.mu.Lock()
|
|
||||||
ret := r.isRunning
|
|
||||||
r.mu.Unlock()
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Revid) Config() Config {
|
|
||||||
r.mu.Lock()
|
|
||||||
cfg := r.config
|
|
||||||
r.mu.Unlock()
|
|
||||||
return cfg
|
|
||||||
}
|
|
||||||
|
|
||||||
// setIsRunning sets r.isRunning using b.
|
|
||||||
func (r *Revid) setIsRunning(b bool) {
|
|
||||||
r.mu.Lock()
|
|
||||||
r.isRunning = b
|
|
||||||
r.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start invokes a Revid to start processing video from a defined input
|
// Start invokes a Revid to start processing video from a defined input
|
||||||
// and packetising (if theres packetization) to a defined output.
|
// and packetising (if theres packetization) to a defined output.
|
||||||
func (r *Revid) Start() error {
|
func (r *Revid) Start() error {
|
||||||
if r.IsRunning() {
|
if r.isRunning {
|
||||||
r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running")
|
r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
|
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
|
||||||
// TODO: this doesn't need to be here
|
r.isRunning = true
|
||||||
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
|
|
||||||
r.setIsRunning(true)
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
|
|
||||||
r.wg.Add(1)
|
|
||||||
go r.outputClips()
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
|
|
||||||
err := r.setupInput()
|
err := r.setupInput()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Stop()
|
r.Stop()
|
||||||
|
@ -340,33 +283,35 @@ func (r *Revid) Start() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop halts any processing of video data from a camera or file
|
// Stop closes down the pipeline. This closes encoders and sender output routines,
|
||||||
|
// connections, and/or files.
|
||||||
func (r *Revid) Stop() {
|
func (r *Revid) Stop() {
|
||||||
if !r.IsRunning() {
|
if !r.isRunning {
|
||||||
r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running")
|
r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, w := range r.writeClosers {
|
r.config.Logger.Log(logger.Info, pkg+"closing pipeline")
|
||||||
err := w.Close()
|
err := r.mwc.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error())
|
r.config.Logger.Log(logger.Error, pkg+"got error while closing pipeline", "error", err.Error())
|
||||||
}
|
}
|
||||||
}
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
|
|
||||||
// If a cmd process is running, we kill!
|
|
||||||
if r.cmd != nil && r.cmd.Process != nil {
|
if r.cmd != nil && r.cmd.Process != nil {
|
||||||
|
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
|
||||||
r.cmd.Process.Kill()
|
r.cmd.Process.Kill()
|
||||||
}
|
}
|
||||||
r.setIsRunning(false)
|
|
||||||
r.wg.Wait()
|
r.wg.Wait()
|
||||||
|
r.isRunning = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update takes a map of variables and their values and edits the current config
|
||||||
|
// if the variables are recognised as valid parameters.
|
||||||
func (r *Revid) Update(vars map[string]string) error {
|
func (r *Revid) Update(vars map[string]string) error {
|
||||||
if r.IsRunning() {
|
if r.isRunning {
|
||||||
r.Stop()
|
r.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
//look through the vars and update revid where needed
|
//look through the vars and update revid where needed
|
||||||
for key, value := range vars {
|
for key, value := range vars {
|
||||||
switch key {
|
switch key {
|
||||||
|
@ -497,55 +442,6 @@ func (r *Revid) Update(vars map[string]string) error {
|
||||||
return r.reset(r.config)
|
return r.reset(r.config)
|
||||||
}
|
}
|
||||||
|
|
||||||
// outputClips takes the clips produced in the packClips method and outputs them
|
|
||||||
// to the desired output defined in the revid config
|
|
||||||
func (r *Revid) outputClips() {
|
|
||||||
defer r.wg.Done()
|
|
||||||
lastTime := time.Now()
|
|
||||||
var count int
|
|
||||||
loop:
|
|
||||||
for r.IsRunning() {
|
|
||||||
// If the ring buffer has something we can read and send off
|
|
||||||
chunk, err := (*ring.Buffer)(r.buffer).Next(readTimeout)
|
|
||||||
switch err {
|
|
||||||
case nil:
|
|
||||||
// Do nothing.
|
|
||||||
case ring.ErrTimeout:
|
|
||||||
r.config.Logger.Log(logger.Debug, pkg+"ring buffer read timeout")
|
|
||||||
continue
|
|
||||||
default:
|
|
||||||
r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error())
|
|
||||||
fallthrough
|
|
||||||
case io.EOF:
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
|
|
||||||
// Loop over encoders and hand bytes over to each one.
|
|
||||||
for _, e := range r.encoder {
|
|
||||||
_, err := chunk.WriteTo(e)
|
|
||||||
if err != nil {
|
|
||||||
r.err <- err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Release the chunk back to the ring buffer.
|
|
||||||
chunk.Close()
|
|
||||||
|
|
||||||
// FIXME(saxon): this doesn't work anymore.
|
|
||||||
now := time.Now()
|
|
||||||
deltaTime := now.Sub(lastTime)
|
|
||||||
if deltaTime > bitrateTime {
|
|
||||||
// FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate.
|
|
||||||
r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second))
|
|
||||||
r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate)
|
|
||||||
r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", (*ring.Buffer)(r.buffer).Len())
|
|
||||||
lastTime = now
|
|
||||||
count = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore")
|
|
||||||
}
|
|
||||||
|
|
||||||
// startRaspivid sets up things for input from raspivid i.e. starts
|
// startRaspivid sets up things for input from raspivid i.e. starts
|
||||||
// a raspivid process and pipes it's data output.
|
// a raspivid process and pipes it's data output.
|
||||||
func (r *Revid) startRaspivid() error {
|
func (r *Revid) startRaspivid() error {
|
||||||
|
@ -670,7 +566,7 @@ func (r *Revid) setupInputForFile() error {
|
||||||
|
|
||||||
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
|
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
|
||||||
r.config.Logger.Log(logger.Info, pkg+"reading input data")
|
r.config.Logger.Log(logger.Info, pkg+"reading input data")
|
||||||
r.err <- r.lexTo(r.buffer, read, delay)
|
r.err <- r.lexTo(r.mwc, read, delay)
|
||||||
r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
|
r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
|
||||||
r.wg.Done()
|
r.wg.Done()
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,11 +47,8 @@ func TestRaspivid(t *testing.T) {
|
||||||
// testLogger implements a netsender.Logger.
|
// testLogger implements a netsender.Logger.
|
||||||
type testLogger struct{}
|
type testLogger struct{}
|
||||||
|
|
||||||
// SetLevel normally sets the logging level, but it is a no-op in our case.
|
func (tl *testLogger) SetLevel(level int8) {}
|
||||||
func (tl *testLogger) SetLevel(level int8) {
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log requests the Logger to write a message at the given level.
|
|
||||||
func (tl *testLogger) Log(level int8, msg string, params ...interface{}) {
|
func (tl *testLogger) Log(level int8, msg string, params ...interface{}) {
|
||||||
logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"}
|
logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"}
|
||||||
if level < -1 || level > 5 {
|
if level < -1 || level > 5 {
|
||||||
|
@ -70,44 +67,23 @@ func (tl *testLogger) Log(level int8, msg string, params ...interface{}) {
|
||||||
|
|
||||||
// tstMtsEncoder emulates the mts.Encoder to the extent of the dst field.
|
// tstMtsEncoder emulates the mts.Encoder to the extent of the dst field.
|
||||||
// This will allow access to the dst to check that it has been set corrctly.
|
// This will allow access to the dst to check that it has been set corrctly.
|
||||||
type tstMtsEncoder struct {
|
type tstMtsEncoder struct{ dst io.WriteCloser }
|
||||||
dst io.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
// newTstMtsEncoder returns a pointer to a newTsMtsEncoder.
|
func (e *tstMtsEncoder) Write(d []byte) (int, error) { return len(d), nil }
|
||||||
func newTstMtsEncoder(dst io.Writer, fps int) (io.Writer, error) {
|
func (e *tstMtsEncoder) Close() error { return nil }
|
||||||
return &tstMtsEncoder{dst: dst}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *tstMtsEncoder) Write(d []byte) (int, error) { return 0, nil }
|
|
||||||
|
|
||||||
// tstFlvEncoder emulates the flv.Encoder to the extent of the dst field.
|
// tstFlvEncoder emulates the flv.Encoder to the extent of the dst field.
|
||||||
// This will allow access to the dst to check that it has been set corrctly.
|
// This will allow access to the dst to check that it has been set corrctly.
|
||||||
type tstFlvEncoder struct {
|
type tstFlvEncoder struct{ dst io.WriteCloser }
|
||||||
dst io.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
// newTstFlvEncoder returns a pointer to a new tstFlvEncoder.
|
|
||||||
func newTstFlvEncoder(dst io.Writer, fps int) (io.Writer, error) {
|
|
||||||
return &tstFlvEncoder{dst: dst}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil }
|
func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil }
|
||||||
|
func (e *tstFlvEncoder) Close() error { return nil }
|
||||||
|
|
||||||
// dummyMultiWriter emulates the MultiWriter provided by std lib, so that we
|
// dummyMultiWriter emulates the MultiWriter provided by std lib, so that we
|
||||||
// can access the destinations.
|
// can access the destinations.
|
||||||
type dummyMultiWriter struct {
|
type dummyMultiWriter struct{ dst []io.WriteCloser }
|
||||||
dst []io.WriteCloser
|
|
||||||
}
|
|
||||||
|
|
||||||
func newDummyMultiWriter(dst ...io.WriteCloser) io.WriteCloser {
|
|
||||||
return &dummyMultiWriter{
|
|
||||||
dst: dst,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil }
|
func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil }
|
||||||
|
|
||||||
func (w *dummyMultiWriter) Close() error { return nil }
|
func (w *dummyMultiWriter) Close() error { return nil }
|
||||||
|
|
||||||
// TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the
|
// TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the
|
||||||
|
@ -216,20 +192,30 @@ func TestResetEncoderSenderSetup(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// This logic is what we want to check.
|
// This logic is what we want to check.
|
||||||
err = rv.setupPipeline(newTstMtsEncoder, newTstFlvEncoder, newDummyMultiWriter)
|
err = rv.setupPipeline(
|
||||||
|
func(dst io.WriteCloser, rate int) (io.WriteCloser, error) {
|
||||||
|
return &tstMtsEncoder{dst: dst}, nil
|
||||||
|
},
|
||||||
|
func(dst io.WriteCloser, rate int) (io.WriteCloser, error) {
|
||||||
|
return &tstFlvEncoder{dst: dst}, nil
|
||||||
|
},
|
||||||
|
func(writers ...io.WriteCloser) io.WriteCloser {
|
||||||
|
return &dummyMultiWriter{dst: writers}
|
||||||
|
},
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v for test %v", err, testNum)
|
t.Fatalf("unexpected error: %v for test %v", err, testNum)
|
||||||
}
|
}
|
||||||
|
|
||||||
// First check that we have the correct number of encoders.
|
// First check that we have the correct number of encoders.
|
||||||
got := len(rv.encoder)
|
got := len(rv.mwc.(*dummyMultiWriter).dst)
|
||||||
want := len(test.encoders)
|
want := len(test.encoders)
|
||||||
if got != want {
|
if got != want {
|
||||||
t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want)
|
t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now check the correctness of encoders and their destinations.
|
// Now check the correctness of encoders and their destinations.
|
||||||
for _, e := range rv.encoder {
|
for _, e := range rv.mwc.(*dummyMultiWriter).dst {
|
||||||
// Get e's type.
|
// Get e's type.
|
||||||
encoderType := fmt.Sprintf("%T", e)
|
encoderType := fmt.Sprintf("%T", e)
|
||||||
|
|
||||||
|
|
|
@ -193,7 +193,7 @@ func (s *mtsSender) output() {
|
||||||
// If chunk is nil then we're ready to get another from the ringBuffer.
|
// If chunk is nil then we're ready to get another from the ringBuffer.
|
||||||
if chunk == nil {
|
if chunk == nil {
|
||||||
var err error
|
var err error
|
||||||
chunk, err = s.ringBuf.Next(readTimeout)
|
chunk, err = s.ringBuf.Next(rTimeout)
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -41,14 +41,6 @@ import (
|
||||||
"bitbucket.org/ausocean/utils/logger"
|
"bitbucket.org/ausocean/utils/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Ring buffer sizes and read/write timeouts.
|
|
||||||
const (
|
|
||||||
rbSize = 100
|
|
||||||
rbElementSize = 150000
|
|
||||||
wTimeout = 10 * time.Millisecond
|
|
||||||
rTimeout = 10 * time.Millisecond
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errSendFailed = errors.New("send failed")
|
errSendFailed = errors.New("send failed")
|
||||||
)
|
)
|
||||||
|
@ -119,7 +111,7 @@ func TestMtsSenderSegment(t *testing.T) {
|
||||||
|
|
||||||
// Create ringBuffer, sender, sender and the MPEGTS encoder.
|
// Create ringBuffer, sender, sender and the MPEGTS encoder.
|
||||||
tstDst := &destination{t: t}
|
tstDst := &destination{t: t}
|
||||||
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout)
|
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout)
|
||||||
encoder := mts.NewEncoder(sender, 25, mts.Video)
|
encoder := mts.NewEncoder(sender, 25, mts.Video)
|
||||||
|
|
||||||
// Turn time based PSI writing off for encoder.
|
// Turn time based PSI writing off for encoder.
|
||||||
|
@ -197,7 +189,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
|
||||||
// Create destination, the mtsSender and the mtsEncoder
|
// Create destination, the mtsSender and the mtsEncoder
|
||||||
const clipToFailAt = 3
|
const clipToFailAt = 3
|
||||||
tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt}
|
tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt}
|
||||||
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout)
|
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, rbSize, rbElementSize, wTimeout)
|
||||||
encoder := mts.NewEncoder(sender, 25, mts.Video)
|
encoder := mts.NewEncoder(sender, 25, mts.Video)
|
||||||
|
|
||||||
// Turn time based PSI writing off for encoder and send PSI every 10 packets.
|
// Turn time based PSI writing off for encoder and send PSI every 10 packets.
|
||||||
|
@ -277,7 +269,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
|
||||||
// Create destination, the mtsSender and the mtsEncoder.
|
// Create destination, the mtsSender and the mtsEncoder.
|
||||||
const clipToDelay = 3
|
const clipToDelay = 3
|
||||||
tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay}
|
tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay}
|
||||||
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, ringBufferElementSize, writeTimeout)
|
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, rbElementSize, wTimeout)
|
||||||
encoder := mts.NewEncoder(sender, 25, mts.Video)
|
encoder := mts.NewEncoder(sender, 25, mts.Video)
|
||||||
|
|
||||||
// Turn time based PSI writing off for encoder.
|
// Turn time based PSI writing off for encoder.
|
||||||
|
|
Loading…
Reference in New Issue