diff --git a/Readme.md b/Readme.md index 1c3489d4..7009a48d 100644 --- a/Readme.md +++ b/Readme.md @@ -2,21 +2,15 @@ av is a collection of tools and packages written in Go for audio-video processing. -# Authors -Alan Noble -Saxon A. Nelson-Milton -Trek Hopton +Codecs, containers and protocols are organized according to directories named accordingly. -# Description - -* revid: a tool for re-muxing and re-directing video streams. -* RingBuffer: a package that implements a ring buffer with concurrency control. +cmd/revid-cli is a command-line program for reading, transcoding, and writing audio/video streams and files. # License -Copyright (C) 2017 the Australian Ocean Lab (AusOcean). +Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean). -It is free software: you can redistribute it and/or modify them +This is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. @@ -27,4 +21,4 @@ FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License or more details. You should have received a copy of the GNU General Public License -along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses/). +along with revid in gpl.txt. If not, see http://www.gnu.org/licenses/. diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 78d0e31b..87044450 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -305,7 +305,7 @@ func run(cfg revid.Config) { for { err = ns.Run() if err != nil { - log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) + log.Log(logger.Warning, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue } diff --git a/container/flv/encoder.go b/container/flv/encoder.go index 0fe794d2..306d4b66 100644 --- a/container/flv/encoder.go +++ b/container/flv/encoder.go @@ -55,8 +55,7 @@ var ( // Encoder provides properties required for the generation of flv video // from raw video data type Encoder struct { - dst io.Writer - + dst io.WriteCloser fps int audio bool video bool @@ -64,7 +63,7 @@ type Encoder struct { } // NewEncoder retuns a new FLV encoder. -func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) { +func NewEncoder(dst io.WriteCloser, audio, video bool, fps int) (*Encoder, error) { e := Encoder{ dst: dst, fps: fps, @@ -261,3 +260,8 @@ func (e *Encoder) Write(frame []byte) (int, error) { return len(frame), nil } + +// Close will close the encoder destination. +func (e *Encoder) Close() error { + return e.dst.Close() +} diff --git a/container/mts/audio_test.go b/container/mts/audio_test.go index 0674df19..39a77dff 100644 --- a/container/mts/audio_test.go +++ b/container/mts/audio_test.go @@ -26,6 +26,7 @@ package mts import ( "bytes" + "io" "io/ioutil" "testing" @@ -35,6 +36,10 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" ) +type nopCloser struct{ io.Writer } + +func (nopCloser) Close() error { return nil } + // TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data. // It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm. func TestEncodePcm(t *testing.T) { @@ -45,7 +50,7 @@ func TestEncodePcm(t *testing.T) { sampleSize := 2 chunkSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(chunkSize) - e := NewEncoder(&buf, writeFreq, Audio) + e := NewEncoder(nopCloser{&buf}, writeFreq, Audio) inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPcm, err := ioutil.ReadFile(inPath) @@ -84,7 +89,7 @@ func TestEncodePcm(t *testing.T) { for i+PacketSize <= len(clip) { // Check MTS packet - if !(pkt.PID() == audioPid) { + if !(pkt.PID() == AudioPid) { i += PacketSize if i+PacketSize <= len(clip) { copy(pkt[:], clip[i:i+PacketSize]) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index a4e4931b..61fc1ade 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -26,7 +26,6 @@ LICENSE package mts import ( - "fmt" "io" "time" @@ -128,7 +127,7 @@ const ( // Encoder encapsulates properties of an mpegts generator. type Encoder struct { - dst io.Writer + dst io.WriteCloser clock time.Duration lastTime time.Time @@ -150,7 +149,7 @@ type Encoder struct { // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // calls write for every frame, the rate will be the frame rate of the video. -func NewEncoder(dst io.Writer, rate float64, mediaType int) *Encoder { +func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { var mPid int var sid byte switch mediaType { @@ -206,9 +205,6 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { // Write implements io.Writer. Write takes raw video or audio data and encodes into mpegts, // then sending it to the encoder's io.Writer destination. func (e *Encoder) Write(data []byte) (int, error) { - if len(data) > pes.MaxPesSize { - return 0, fmt.Errorf("data size too large (Max is %v): %v", pes.MaxPesSize, len(data)) - } now := time.Now() if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { e.pktCount = 0 @@ -328,3 +324,7 @@ func updateMeta(b []byte) ([]byte, error) { err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) return []byte(p), err } + +func (e *Encoder) Close() error { + return e.dst.Close() +} diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index 2bf94d64..939de5b7 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -47,9 +47,8 @@ const fps = 25 // write this to psi. func TestMetaEncode1(t *testing.T) { Meta = meta.New() - var b []byte - buf := bytes.NewBuffer(b) - e := NewEncoder(buf, fps, Video) + var buf bytes.Buffer + e := NewEncoder(nopCloser{&buf}, fps, Video) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) @@ -76,9 +75,8 @@ func TestMetaEncode1(t *testing.T) { // into psi. func TestMetaEncode2(t *testing.T) { Meta = meta.New() - var b []byte - buf := bytes.NewBuffer(b) - e := NewEncoder(buf, fps, Video) + var buf bytes.Buffer + e := NewEncoder(nopCloser{&buf}, fps, Video) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { diff --git a/contributors.txt b/contributors.txt index 98f4eac8..0675f323 100644 --- a/contributors.txt +++ b/contributors.txt @@ -1,3 +1,5 @@ Alan Noble Saxon Nelson-Milton Jack Richardson +Dan Kortschak +Trek Hopton \ No newline at end of file diff --git a/go.mod b/go.mod index 5d3fde42..c3d766c5 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,8 @@ go 1.12 require ( bitbucket.org/ausocean/iot v1.2.4 - bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e + bitbucket.org/ausocean/utils v1.2.6 + github.com/BurntSushi/toml v0.3.1 // indirect github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect @@ -14,4 +15,5 @@ require ( github.com/sergi/go-diff v1.0.0 // indirect github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/yaml.v2 v2.2.2 // indirect ) diff --git a/go.sum b/go.sum index a23bf607..cd09945f 100644 --- a/go.sum +++ b/go.sum @@ -1,34 +1,23 @@ bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4= bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= -bitbucket.org/ausocean/iot v1.2.5 h1:udD5X4oXUuKwdjO7bcq4StcDdjP8fJa2L0FnJJwF+6Q= -bitbucket.org/ausocean/iot v1.2.5/go.mod h1:dOclxXkdxAQGWO7Y5KcP1wpNfxg9oKUA2VqjJ3Le4RA= -bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e h1:rn7Z1vE6m1NSH+mrPJPgquEfBDsqzBEH4Y6fxzyB6kA= -bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= -bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20= -bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw= -bitbucket.org/ausocean/utils v1.2.5 h1:70lkWnoN1SUxiIBIKDiDzaYZ2bjczdNYSAsKj7DUpl8= +bitbucket.org/ausocean/utils v1.2.6 h1:JN66APCV+hu6GebIHSu2KSywhLym4vigjSz5+fB0zXc= +bitbucket.org/ausocean/utils v1.2.6/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/adrianmo/go-nmea v1.1.1-0.20190109062325-c448653979f7/go.mod h1:HHPxPAm2kmev+61qmkZh7xgZF/7qHtSpsWppip2Ipv8= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c= github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4/go.mod h1:2RvX5ZjVtsznNZPEt4xwJXNJrM3VTZoQf7V6gk0ysvs= -github.com/kidoman/embd v0.0.0-20170508013040-d3d8c0c5c68d/go.mod h1:ACKj9jnzOzj1lw2ETilpFGK7L9dtJhAzT7T1OhAGtRQ= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s= github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc= @@ -40,19 +29,16 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e h1:3NIzz7weXhh3NToPgbtlQtKiVgerEaG4/nY2skGoGG0= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e/go.mod h1:CaowXBWOiSGWEpBBV8LoVnQTVPV4ycyviC9IBLj8dRw= -github.com/yryz/ds18b20 v0.0.0-20180211073435-3cf383a40624/go.mod h1:MqFju5qeLDFh+S9PqxYT7TEla8xeW7bgGr/69q3oki0= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -golang.org/x/sys v0.0.0-20190305064518-30e92a19ae4a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= diff --git a/protocol/rtmp/conn.go b/protocol/rtmp/conn.go index 9b453849..26420683 100644 --- a/protocol/rtmp/conn.go +++ b/protocol/rtmp/conn.go @@ -31,6 +31,9 @@ LICENSE Copyright (C) 2008-2009 Andrej Stepanchuk Copyright (C) 2009-2010 Howard Chu */ + +// Package rtmp provides an RTMP client implementation. +// The package currently supports live streaming to YouTube. package rtmp import ( diff --git a/protocol/rtmp/rtmp_test.go b/protocol/rtmp/rtmp_test.go index f2d661c5..1cf056cb 100644 --- a/protocol/rtmp/rtmp_test.go +++ b/protocol/rtmp/rtmp_test.go @@ -222,6 +222,8 @@ func (rs *rtmpSender) Write(p []byte) (int, error) { return n, nil } +func (rs *rtmpSender) Close() error { return nil } + // TestFromFile tests streaming from an video file comprising raw H.264. // The test file is supplied via the RTMP_TEST_FILE environment variable. func TestFromFile(t *testing.T) { diff --git a/revid/config.go b/revid/config.go index 575c1042..6986e83b 100644 --- a/revid/config.go +++ b/revid/config.go @@ -158,7 +158,7 @@ const ( defaultFramesPerClip = 1 httpFramesPerClip = 560 defaultInputCodec = H264 - defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15. + defaultVerbosity = logger.Error defaultRtpAddr = "localhost:6970" defaultBurstPeriod = 10 // Seconds defaultRotation = 0 // Degrees @@ -176,21 +176,20 @@ const ( // if particular parameters have not been defined. func (c *Config) Validate(r *Revid) error { switch c.LogLevel { - case Yes: - case No: - case NothingDefined: - c.LogLevel = defaultVerbosity - c.Logger.Log(logger.Info, pkg+"no LogLevel mode defined, defaulting", - "LogLevel", defaultVerbosity) + case logger.Debug: + case logger.Info: + case logger.Warning: + case logger.Error: + case logger.Fatal: default: - return errors.New("bad LogLevel defined in config") + c.LogLevel = defaultVerbosity + c.Logger.Log(logger.Info, pkg+"bad LogLevel mode defined, defaulting", "LogLevel", defaultVerbosity) } switch c.Input { case Raspivid, V4L, File, Audio: case NothingDefined: - c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", - defaultInput) + c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Input = defaultInput default: return errors.New("bad input type defined in config") @@ -214,47 +213,42 @@ func (c *Config) Validate(r *Revid) error { } case PCM, ADPCM: case NothingDefined: - c.Logger.Log(logger.Info, pkg+"no input codec defined, defaulting", - "inputCodec", defaultInputCodec) + c.Logger.Log(logger.Info, pkg+"no input codec defined, defaulting", "inputCodec", defaultInputCodec) c.InputCodec = defaultInputCodec - c.Logger.Log(logger.Info, pkg+"defaulting quantization", "quantization", - defaultQuantization) + c.Logger.Log(logger.Info, pkg+"defaulting quantization", "quantization", defaultQuantization) c.Quantization = defaultQuantization - default: return errors.New("bad input codec defined in config") } - for i, o := range c.Outputs { - switch o { - case File: - case Udp: - case Rtmp, FfmpegRtmp: - if c.RtmpUrl == "" { - c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Outputs[i] = Http - // FIXME(kortschak): Does this want the same line as below? - // c.FramesPerClip = httpFramesPerClip - break + if c.Outputs == nil { + c.Logger.Log(logger.Info, pkg+"no output defined, defaulting", "output", defaultOutput) + c.Outputs = append(c.Outputs, defaultOutput) + c.Packetization = defaultPacketization + } else { + for i, o := range c.Outputs { + switch o { + case File: + case Udp: + case Rtmp, FfmpegRtmp: + if c.RtmpUrl == "" { + c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") + c.Outputs[i] = Http + // FIXME(kortschak): Does this want the same line as below? + // c.FramesPerClip = httpFramesPerClip + break + } + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", "framesPerClip", defaultFramesPerClip) + c.FramesPerClip = defaultFramesPerClip + c.Packetization = Flv + c.SendRetry = true + case Http, Rtp: + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", "framesPerClip", httpFramesPerClip) + c.FramesPerClip = httpFramesPerClip + c.Packetization = Mpegts + default: + return errors.New("bad output type defined in config") } - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", - "framesPerClip", defaultFramesPerClip) - c.FramesPerClip = defaultFramesPerClip - c.Packetization = Flv - c.SendRetry = true - case NothingDefined: - c.Logger.Log(logger.Info, pkg+"no output defined, defaulting", "output", - defaultOutput) - c.Outputs[i] = defaultOutput - c.Packetization = defaultPacketization - fallthrough - case Http, Rtp: - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", - "framesPerClip", httpFramesPerClip) - c.FramesPerClip = httpFramesPerClip - c.Packetization = Mpegts - default: - return errors.New("bad output type defined in config") } } @@ -264,8 +258,7 @@ func (c *Config) Validate(r *Revid) error { } if c.FramesPerClip < 1 { - c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting", - "framesPerClip", defaultFramesPerClip) + c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip) c.FramesPerClip = defaultFramesPerClip } diff --git a/revid/revid.go b/revid/revid.go index 14923af2..e02b9404 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -8,6 +8,7 @@ DESCRIPTION AUTHORS Saxon A. Nelson-Milton Alan Noble + Dan Kortschak LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) @@ -20,13 +21,12 @@ LICENSE It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. + for more details. You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + in gpl.txt. If not, see http://www.gnu.org/licenses. */ -// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package revid import ( @@ -46,15 +46,12 @@ import ( "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) -// Ring buffer sizes and read/write timeouts. +// mtsSender ringBuffer sizes. const ( - ringBufferSize = 1000 - ringBufferElementSize = 100000 - writeTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond + rbSize = 1000 + rbElementSize = 100000 ) // RTMP connection properties. @@ -63,17 +60,6 @@ const ( rtmpConnectionTimeout = 10 ) -// Duration of video for each clip sent out. -const clipDuration = 1 * time.Second - -// Time duration between bitrate checks. -const bitrateTime = 1 * time.Minute - -// After a send fail, this is the delay before another send. -const sendFailedDelay = 5 * time.Millisecond - -const ffmpegPath = "/usr/local/bin/ffmpeg" - const pkg = "revid:" type Logger interface { @@ -89,12 +75,17 @@ type Revid struct { // FIXME(kortschak): The relationship of concerns // in config/ns is weird. config Config + // ns holds the netsender.Sender responsible for HTTP. ns *netsender.Sender // setupInput holds the current approach to setting up - // the input stream. - setupInput func() error + // the input stream. It returns a function used for cleaning up, and any errors. + setupInput func() (func() error, error) + + // closeInput holds the cleanup function return from setupInput and is called + // in Revid.Stop(). + closeInput func() error // cmd is the exec'd process that may be used to produce // the input stream. @@ -105,40 +96,19 @@ type Revid struct { // lexTo, encoder and packer handle transcoding the input stream. lexTo func(dest io.Writer, src io.Reader, delay time.Duration, bufSize int) error - // buffer handles passing frames from the transcoder - // to the target destination. - buffer *buffer + // encoders will hold the multiWriteCloser that writes to encoders from the lexer. + encoders io.WriteCloser - // encoder holds the required encoders, which then write to destinations. - encoder []io.Writer - - // writeClosers holds the senders that the encoders will write to. - writeClosers []io.WriteCloser - - // bitrate hold the last send bitrate calculation result. - bitrate int - - mu sync.Mutex + // isRunning is used to keep track of revid's running state between methods. isRunning bool + // wg will be used to wait for any processing routines to finish. wg sync.WaitGroup + // err will channel errors from revid routines to the handle errors routine. err chan error } -// buffer is a wrapper for a ring.Buffer and provides function to write and -// flush in one Write call. -type buffer ring.Buffer - -// Write implements the io.Writer interface. It will write to the underlying -// ring.Buffer and then flush to indicate a complete ring.Buffer write. -func (b *buffer) Write(d []byte) (int, error) { - r := (*ring.Buffer)(b) - n, err := r.Write(d) - r.Flush() - return n, err -} - // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { @@ -151,6 +121,13 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } +// Config returns a copy of revids current config. +// +// Config is not safe for concurrent use. +func (r *Revid) Config() Config { + return r.config +} + // TODO(Saxon): put more thought into error severity. func (r *Revid) handleErrors() { for { @@ -167,10 +144,43 @@ func (r *Revid) handleErrors() { } // Bitrate returns the result of the most recent bitrate check. +// +// TODO: get this working again. func (r *Revid) Bitrate() int { - return r.bitrate + return -1 } +// reset swaps the current config of a Revid with the passed +// configuration; checking validity and returning errors if not valid. It then +// sets up the data pipeline accordingly to this configuration. +func (r *Revid) reset(config Config) error { + err := r.setConfig(config) + if err != nil { + return err + } + + r.config.Logger.SetLevel(config.LogLevel) + + err = r.setupPipeline( + func(dst io.WriteCloser, fps, medType int) (io.WriteCloser, error) { + e := mts.NewEncoder(dst, float64(fps), mts.Video) + return e, nil + }, + func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { + return flv.NewEncoder(dst, true, true, fps) + }, + ioext.MultiWriteCloser, + ) + + if err != nil { + return err + } + + return nil +} + +// setConfig takes a config, checks it's validity and then replaces the current +// revid config. func (r *Revid) setConfig(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) @@ -187,10 +197,10 @@ func (r *Revid) setConfig(config Config) error { // mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder // respectively. multiWriter will be used to create an ioext.multiWriteCloser // so that encoders can write to multiple senders. -func (r *Revid) setupPipeline(mtsEnc func(dst io.Writer, rate, mediaType int) (io.Writer, error), flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { - r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) - - r.encoder = r.encoder[:0] +func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate, mediaType int) (io.WriteCloser, error), flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { + // encoders will hold the encoders that are required for revid's current + // configuration. + var encoders []io.WriteCloser // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. @@ -203,7 +213,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.Writer, rate, mediaType int) (i for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, 0) mtsSenders = append(mtsSenders, w) case Rtp: w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) @@ -238,7 +248,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.Writer, rate, mediaType int) (i mediaType = mts.Video } e, _ := mtsEnc(mw, int(r.config.WriteRate), mediaType) - r.encoder = append(r.encoder, e) + encoders = append(encoders, e) } // If we have some senders that require FLV encoding then add an FLV @@ -250,9 +260,11 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.Writer, rate, mediaType int) (i if err != nil { return err } - r.encoder = append(r.encoder, e) + encoders = append(encoders, e) } + r.encoders = multiWriter(encoders...) + switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid @@ -281,106 +293,65 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.Writer, rate, mediaType int) (i return nil } -func newMtsEncoder(dst io.Writer, writeRate, mediaType int) (io.Writer, error) { - e := mts.NewEncoder(dst, float64(writeRate), mediaType) - return e, nil -} - -func newFlvEncoder(dst io.Writer, fps int) (io.Writer, error) { - e, err := flv.NewEncoder(dst, true, true, fps) - if err != nil { - return nil, err - } - return e, nil -} - -// reset swaps the current config of a Revid with the passed -// configuration; checking validity and returning errors if not valid. -func (r *Revid) reset(config Config) error { - err := r.setConfig(config) - if err != nil { - return err - } - - err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser) - if err != nil { - return err - } - - return nil -} - -// IsRunning returns true if revid is running. -func (r *Revid) IsRunning() bool { - r.mu.Lock() - ret := r.isRunning - r.mu.Unlock() - return ret -} - -func (r *Revid) Config() Config { - r.mu.Lock() - cfg := r.config - r.mu.Unlock() - return cfg -} - -// setIsRunning sets r.isRunning using b. -func (r *Revid) setIsRunning(b bool) { - r.mu.Lock() - r.isRunning = b - r.mu.Unlock() -} - // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. +// +// Start is not safe for concurrent use. func (r *Revid) Start() error { - if r.IsRunning() { + if r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running") return nil } r.config.Logger.Log(logger.Info, pkg+"starting Revid") - // TODO: this doesn't need to be here - r.config.Logger.Log(logger.Debug, pkg+"setting up output") - r.setIsRunning(true) - r.config.Logger.Log(logger.Info, pkg+"starting output routine") - r.wg.Add(1) - go r.outputClips() - r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") - err := r.setupInput() + r.isRunning = true + var err error + r.closeInput, err = r.setupInput() if err != nil { r.Stop() } return err } -// Stop halts any processing of video data from a camera or file +// Stop closes down the pipeline. This closes encoders and sender output routines, +// connections, and/or files. +// +// Stop is not safe for concurrent use. func (r *Revid) Stop() { - if !r.IsRunning() { + if !r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") return } - for _, w := range r.writeClosers { - err := w.Close() + if r.closeInput != nil { + err := r.closeInput() if err != nil { - r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) + r.config.Logger.Log(logger.Error, pkg+"could not close input", "error", err.Error()) } } - r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.config.Logger.Log(logger.Info, pkg+"killing input proccess") - // If a cmd process is running, we kill! + + r.config.Logger.Log(logger.Info, pkg+"closing pipeline") + err := r.encoders.Close() + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error()) + } + if r.cmd != nil && r.cmd.Process != nil { + r.config.Logger.Log(logger.Info, pkg+"killing input proccess") r.cmd.Process.Kill() } - r.setIsRunning(false) r.wg.Wait() + r.isRunning = false } +// Update takes a map of variables and their values and edits the current config +// if the variables are recognised as valid parameters. +// +// Update is not safe for concurrent use. func (r *Revid) Update(vars map[string]string) error { - if r.IsRunning() { + if r.isRunning { r.Stop() } + //look through the vars and update revid where needed for key, value := range vars { switch key { @@ -505,64 +476,30 @@ func (r *Revid) Update(vars map[string]string) error { break } r.config.BurstPeriod = uint(v) + case "Logging": + switch value { + case "Debug": + r.config.LogLevel = logger.Debug + case "Info": + r.config.LogLevel = logger.Info + case "Warning": + r.config.LogLevel = logger.Warning + case "Error": + r.config.LogLevel = logger.Error + case "Fatal": + r.config.LogLevel = logger.Fatal + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value) + } } } r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config)) return r.reset(r.config) } -// outputClips takes the clips produced in the packClips method and outputs them -// to the desired output defined in the revid config -func (r *Revid) outputClips() { - defer r.wg.Done() - lastTime := time.Now() - var count int -loop: - for r.IsRunning() { - // If the ring buffer has something we can read and send off - chunk, err := (*ring.Buffer)(r.buffer).Next(readTimeout) - switch err { - case nil: - // Do nothing. - case ring.ErrTimeout: - r.config.Logger.Log(logger.Debug, pkg+"ring buffer read timeout") - continue - default: - r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error()) - fallthrough - case io.EOF: - break loop - } - - // Loop over encoders and hand bytes over to each one. - for _, e := range r.encoder { - _, err := chunk.WriteTo(e) - if err != nil { - r.err <- err - } - } - - // Release the chunk back to the ring buffer. - chunk.Close() - - // FIXME(saxon): this doesn't work anymore. - now := time.Now() - deltaTime := now.Sub(lastTime) - if deltaTime > bitrateTime { - // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. - r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second)) - r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate) - r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", (*ring.Buffer)(r.buffer).Len()) - lastTime = now - count = 0 - } - } - r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore") -} - // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. -func (r *Revid) startRaspivid() error { +func (r *Revid) startRaspivid() (func() error, error) { r.config.Logger.Log(logger.Info, pkg+"starting raspivid") const disabled = "0" @@ -594,7 +531,7 @@ func (r *Revid) startRaspivid() error { switch r.config.InputCodec { default: - return fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) + return nil, fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) case H264: args = append(args, "--codec", "H264", @@ -612,7 +549,7 @@ func (r *Revid) startRaspivid() error { stdout, err := r.cmd.StdoutPipe() if err != nil { - return err + return nil, err } err = r.cmd.Start() if err != nil { @@ -621,10 +558,10 @@ func (r *Revid) startRaspivid() error { r.wg.Add(1) go r.processFrom(stdout, 0, 0) - return nil + return nil, nil } -func (r *Revid) startV4L() error { +func (r *Revid) startV4L() (func() error, error) { const defaultVideo = "/dev/video0" r.config.Logger.Log(logger.Info, pkg+"starting webcam") @@ -652,38 +589,37 @@ func (r *Revid) startV4L() error { stdout, err := r.cmd.StdoutPipe() if err != nil { - return err + return nil, nil } err = r.cmd.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error()) - return err + return nil, nil } r.wg.Add(1) go r.processFrom(stdout, time.Duration(0), 0) - return nil + return nil, nil } // setupInputForFile sets things up for getting input from a file -func (r *Revid) setupInputForFile() error { +func (r *Revid) setupInputForFile() (func() error, error) { f, err := os.Open(r.config.InputPath) if err != nil { r.config.Logger.Log(logger.Error, err.Error()) r.Stop() - return err + return nil, err } - defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. r.wg.Add(1) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate), 0) - return nil + return func() error { return f.Close() }, nil } // startAudioInput is used to start capturing audio from an audio device and processing it. -func (r *Revid) startAudioInput() error { +func (r *Revid) startAudioInput() (func() error, error) { ac := &AudioConfig{ SampleRate: r.config.SampleRate, Channels: r.config.Channels, @@ -693,12 +629,12 @@ func (r *Revid) startAudioInput() error { } ai := NewAudioInput(ac) go r.processFrom(ai, time.Second/time.Duration(r.config.WriteRate), ai.ChunkSize()) - return nil + return nil, nil } func (r *Revid) processFrom(read io.Reader, delay time.Duration, bufSize int) { r.config.Logger.Log(logger.Info, pkg+"reading input data") - r.err <- r.lexTo(r.buffer, read, delay, bufSize) + r.err <- r.lexTo(r.encoders, read, delay, bufSize) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.wg.Done() } diff --git a/revid/revid_test.go b/revid/revid_test.go index ccf0de3a..18086912 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -1,3 +1,31 @@ +/* +NAME + revid_test.go + +DESCRIPTION + See Readme.md + +AUTHORS + Saxon A. Nelson-Milton + Alan Noble + +LICENSE + This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean). + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + package revid import ( @@ -31,7 +59,6 @@ func TestRaspivid(t *testing.T) { var c Config c.Logger = &logger c.Input = Raspivid - c.Outputs = make([]uint8, 1) rv, err := New(c, ns) if err != nil { @@ -47,11 +74,8 @@ func TestRaspivid(t *testing.T) { // testLogger implements a netsender.Logger. type testLogger struct{} -// SetLevel normally sets the logging level, but it is a no-op in our case. -func (tl *testLogger) SetLevel(level int8) { -} +func (tl *testLogger) SetLevel(level int8) {} -// Log requests the Logger to write a message at the given level. func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"} if level < -1 || level > 5 { @@ -71,44 +95,35 @@ func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { // tstMtsEncoder emulates the mts.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. type tstMtsEncoder struct { - dst io.Writer + // dst is here solely to detect the type stored in the encoder. + // No data is written to dst. + dst io.WriteCloser } -// newTstMtsEncoder returns a pointer to a newTsMtsEncoder. -func newTstMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { - return &tstMtsEncoder{dst: dst}, nil -} - -func (e *tstMtsEncoder) Write(d []byte) (int, error) { return 0, nil } +func (e *tstMtsEncoder) Write(d []byte) (int, error) { return len(d), nil } +func (e *tstMtsEncoder) Close() error { return nil } // tstFlvEncoder emulates the flv.Encoder to the extent of the dst field. // This will allow access to the dst to check that it has been set corrctly. type tstFlvEncoder struct { - dst io.Writer -} - -// newTstFlvEncoder returns a pointer to a new tstFlvEncoder. -func newTstFlvEncoder(dst io.Writer, fps int) (io.Writer, error) { - return &tstFlvEncoder{dst: dst}, nil + // dst is here solely to detect the type stored in the encoder. + // No data is written to dst. + dst io.WriteCloser } func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil } +func (e *tstFlvEncoder) Close() error { return nil } // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // can access the destinations. type dummyMultiWriter struct { + // dst is here solely to detect the types stored in the multiWriter. + // No data is written to dst. dst []io.WriteCloser } -func newDummyMultiWriter(dst ...io.WriteCloser) io.WriteCloser { - return &dummyMultiWriter{ - dst: dst, - } -} - func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } - -func (w *dummyMultiWriter) Close() error { return nil } +func (w *dummyMultiWriter) Close() error { return nil } // TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the // revid.encoder slice and the senders the encoders write to. @@ -216,20 +231,30 @@ func TestResetEncoderSenderSetup(t *testing.T) { } // This logic is what we want to check. - err = rv.setupPipeline(newTstMtsEncoder, newTstFlvEncoder, newDummyMultiWriter) + err = rv.setupPipeline( + func(dst io.WriteCloser, rate int) (io.WriteCloser, error) { + return &tstMtsEncoder{dst: dst}, nil + }, + func(dst io.WriteCloser, rate int) (io.WriteCloser, error) { + return &tstFlvEncoder{dst: dst}, nil + }, + func(writers ...io.WriteCloser) io.WriteCloser { + return &dummyMultiWriter{dst: writers} + }, + ) if err != nil { t.Fatalf("unexpected error: %v for test %v", err, testNum) } // First check that we have the correct number of encoders. - got := len(rv.encoder) + got := len(rv.encoders.(*dummyMultiWriter).dst) want := len(test.encoders) if got != want { t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want) } // Now check the correctness of encoders and their destinations. - for _, e := range rv.encoder { + for _, e := range rv.encoders.(*dummyMultiWriter).dst { // Get e's type. encoderType := fmt.Sprintf("%T", e) @@ -245,7 +270,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { } // Now check that this encoder has correct number of destinations (senders). - var ms io.Writer + var ms io.WriteCloser switch encoderType { case mtsEncoderStr: ms = e.(*tstMtsEncoder).dst diff --git a/revid/senders.go b/revid/senders.go index 01aa208e..17fa7dc1 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,7 +29,6 @@ LICENSE package revid import ( - "errors" "fmt" "io" "net" @@ -119,7 +118,7 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) // Extract location from reply g, err := dec.String("ll") if err != nil { - log(logger.Warning, pkg+"No location in reply") + log(logger.Debug, pkg+"No location in reply") } else { log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) mts.Meta.Add("loc", g) @@ -156,24 +155,24 @@ func (s *fileSender) Close() error { return s.file.Close() } type mtsSender struct { dst io.WriteCloser buf []byte - ringBuf *ring.Buffer + ring *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int - quit chan struct{} + done chan struct{} log func(lvl int8, msg string, args ...interface{}) wg sync.WaitGroup } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), ringSize int, ringElementSize int, wTimeout time.Duration) *mtsSender { s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), log: log, - ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), - quit: make(chan struct{}), + ring: ring.NewBuffer(ringSize, ringElementSize, wTimeout), + done: make(chan struct{}), } s.wg.Add(1) go s.output() @@ -185,25 +184,23 @@ func (s *mtsSender) output() { var chunk *ring.Chunk for { select { - case <-s.quit: - s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") + case <-s.done: + s.log(logger.Info, pkg+"mtsSender: got done signal, terminating output routine") defer s.wg.Done() return default: // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error - chunk, err = s.ringBuf.Next(readTimeout) + chunk, err = s.ring.Next(0) switch err { - case nil: + case nil, io.EOF: continue case ring.ErrTimeout: s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") continue default: s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) - fallthrough - case io.EOF: continue } } @@ -235,11 +232,11 @@ func (s *mtsSender) Write(d []byte) (int, error) { copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() if s.curPid == mts.PatPid && len(s.buf) > 0 { - _, err := s.ringBuf.Write(s.buf) + _, err := s.ring.Write(s.buf) if err != nil { s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } - s.ringBuf.Flush() + s.ring.Flush() s.buf = s.buf[:0] } return len(d), nil @@ -247,21 +244,21 @@ func (s *mtsSender) Write(d []byte) (int, error) { // Close implements io.Closer. func (s *mtsSender) Close() error { - close(s.quit) + close(s.done) s.wg.Wait() return nil } // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { - conn *rtmp.Conn - + conn *rtmp.Conn url string timeout uint retries int log func(lvl int8, msg string, args ...interface{}) - - data []byte + ring *ring.Buffer + done chan struct{} + wg sync.WaitGroup } func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { @@ -283,24 +280,76 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg timeout: timeout, retries: retries, log: log, + ring: ring.NewBuffer(10, rbElementSize, 0), + done: make(chan struct{}), } + s.wg.Add(1) + go s.output() return s, err } +// output starts an mtsSender's data handling routine. +func (s *rtmpSender) output() { + var chunk *ring.Chunk + for { + select { + case <-s.done: + s.log(logger.Info, pkg+"rtmpSender: got done signal, terminating output routine") + defer s.wg.Done() + return + default: + // If chunk is nil then we're ready to get another from the ring buffer. + if chunk == nil { + var err error + chunk, err = s.ring.Next(0) + switch err { + case nil, io.EOF: + continue + case ring.ErrTimeout: + s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout") + continue + default: + s.log(logger.Error, pkg+"rtmpSender: unexpected error", "error", err.Error()) + continue + } + } + if s.conn == nil { + s.log(logger.Warning, pkg+"rtmpSender: no rtmp connection, going to restart...") + err := s.restart() + if err != nil { + s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) + continue + } + } + _, err := s.conn.Write(chunk.Bytes()) + switch err { + case nil, rtmp.ErrInvalidFlvTag: + default: + s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error()) + err = s.restart() + if err != nil { + s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) + } + continue + } + chunk.Close() + chunk = nil + } + } +} + // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { - if s.conn == nil { - return 0, errors.New("no rtmp connection, cannot write") - } - _, err := s.conn.Write(d) + _, err := s.ring.Write(d) if err != nil { - err = s.restart() + s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error()) } - return len(d), err + s.ring.Flush() + return len(d), nil } func (s *rtmpSender) restart() error { - s.Close() + s.close() var err error for n := 0; n < s.retries; n++ { s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) @@ -316,6 +365,14 @@ func (s *rtmpSender) restart() error { } func (s *rtmpSender) Close() error { + if s.done != nil { + close(s.done) + } + s.wg.Wait() + return s.close() +} + +func (s *rtmpSender) close() error { if s.conn == nil { return nil } diff --git a/revid/senders_test.go b/revid/senders_test.go index 0a09662a..80293759 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -41,14 +41,6 @@ import ( "bitbucket.org/ausocean/utils/logger" ) -// Ring buffer sizes and read/write timeouts. -const ( - rbSize = 100 - rbElementSize = 150000 - wTimeout = 10 * time.Millisecond - rTimeout = 10 * time.Millisecond -) - var ( errSendFailed = errors.New("send failed") ) @@ -56,29 +48,50 @@ var ( // destination simulates a destination for the mtsSender. It allows for the // emulation of failed and delayed sends. type destination struct { - buf [][]byte - testFails bool - failAt int - currentPkt int - t *testing.T - sendDelay time.Duration - delayAt int + // Holds the clips written to this destination using Write. + buf [][]byte + + // testFails is set to true if we would like a write to fail at a particular + // clip as determined by failAt. + testFails bool + failAt int + + // Holds the current clip number. + currentClip int + + // Pointer to the testing.T of a test where this struct is being used. This + // is used so that logging can be done through the testing log utilities. + t *testing.T + + // sendDelay is the amount of time we would like a Write to be delayed when + // we hit the clip number indicated by delayAt. + sendDelay time.Duration + delayAt int + + // done will be used to send a signal to the main routine to indicate that + // the destination has received all clips. doneAt indicates the final clip + // number. + done chan struct{} + doneAt int } func (ts *destination) Write(d []byte) (int, error) { ts.t.Log("writing clip to destination") - if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { + if ts.delayAt != 0 && ts.currentClip == ts.delayAt { time.Sleep(ts.sendDelay) } - if ts.testFails && ts.currentPkt == ts.failAt { + if ts.testFails && ts.currentClip == ts.failAt { ts.t.Log("failed send") - ts.currentPkt++ + ts.currentClip++ return 0, errSendFailed } cpy := make([]byte, len(d)) copy(cpy, d) ts.buf = append(ts.buf, cpy) - ts.currentPkt++ + if ts.currentClip == ts.doneAt { + close(ts.done) + } + ts.currentClip++ return len(d), nil } @@ -118,8 +131,9 @@ func TestMtsSenderSegment(t *testing.T) { mts.Meta = meta.New() // Create ringBuffer, sender, sender and the MPEGTS encoder. - tstDst := &destination{t: t} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + const numberOfClips = 11 + dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} + sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -134,12 +148,12 @@ func TestMtsSenderSegment(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give the mtsSender some time to finish up and then Close it. - time.Sleep(10 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-dst.done sender.Close() // Check the data. - result := tstDst.buf + result := dst.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) @@ -196,8 +210,8 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 - tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) + dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} + sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder and send PSI every 10 packets. @@ -212,12 +226,12 @@ func TestMtsSenderFailedSend(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give the mtsSender some time to finish up and then Close it. - time.Sleep(10 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-dst.done sender.Close() // Check that we have data as expected. - result := tstDst.buf + result := dst.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) @@ -276,8 +290,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 - tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} - sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, ringBufferElementSize, writeTimeout) + dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} + sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0) encoder := mts.NewEncoder(sender, 25, mts.Video) // Turn time based PSI writing off for encoder. @@ -291,12 +305,12 @@ func TestMtsSenderDiscontinuity(t *testing.T) { encoder.Write([]byte{byte(i)}) } - // Give mtsSender time to finish up then Close. - time.Sleep(100 * time.Millisecond) + // Wait until the destination has all the data, then close the sender. + <-dst.done sender.Close() // Check the data. - result := tstDst.buf + result := dst.buf expectedCC := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo)