Merge branch 'master' into revid-audio

This commit is contained in:
Trek H 2019-05-08 16:23:56 +09:30
commit faa6246a51
16 changed files with 394 additions and 373 deletions

View File

@ -2,21 +2,15 @@
av is a collection of tools and packages written in Go for audio-video processing. av is a collection of tools and packages written in Go for audio-video processing.
# Authors Codecs, containers and protocols are organized according to directories named accordingly.
Alan Noble
Saxon A. Nelson-Milton <saxon.milton@gmail.com>
Trek Hopton <trek@ausocean.org>
# Description cmd/revid-cli is a command-line program for reading, transcoding, and writing audio/video streams and files.
* revid: a tool for re-muxing and re-directing video streams.
* RingBuffer: a package that implements a ring buffer with concurrency control.
# License # License
Copyright (C) 2017 the Australian Ocean Lab (AusOcean). Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean).
It is free software: you can redistribute it and/or modify them This is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your Free Software Foundation, either version 3 of the License, or (at your
option) any later version. option) any later version.
@ -27,4 +21,4 @@ FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
or more details. or more details.
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses/). along with revid in gpl.txt. If not, see http://www.gnu.org/licenses/.

View File

@ -305,7 +305,7 @@ func run(cfg revid.Config) {
for { for {
err = ns.Run() err = ns.Run()
if err != nil { if err != nil {
log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) log.Log(logger.Warning, pkg+"Run Failed. Retrying...", "error", err.Error())
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }

View File

@ -55,8 +55,7 @@ var (
// Encoder provides properties required for the generation of flv video // Encoder provides properties required for the generation of flv video
// from raw video data // from raw video data
type Encoder struct { type Encoder struct {
dst io.Writer dst io.WriteCloser
fps int fps int
audio bool audio bool
video bool video bool
@ -64,7 +63,7 @@ type Encoder struct {
} }
// NewEncoder retuns a new FLV encoder. // NewEncoder retuns a new FLV encoder.
func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) { func NewEncoder(dst io.WriteCloser, audio, video bool, fps int) (*Encoder, error) {
e := Encoder{ e := Encoder{
dst: dst, dst: dst,
fps: fps, fps: fps,
@ -261,3 +260,8 @@ func (e *Encoder) Write(frame []byte) (int, error) {
return len(frame), nil return len(frame), nil
} }
// Close will close the encoder destination.
func (e *Encoder) Close() error {
return e.dst.Close()
}

View File

@ -26,6 +26,7 @@ package mts
import ( import (
"bytes" "bytes"
"io"
"io/ioutil" "io/ioutil"
"testing" "testing"
@ -35,6 +36,10 @@ import (
"bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/meta"
) )
type nopCloser struct{ io.Writer }
func (nopCloser) Close() error { return nil }
// TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data. // TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data.
// It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm. // It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm.
func TestEncodePcm(t *testing.T) { func TestEncodePcm(t *testing.T) {
@ -45,7 +50,7 @@ func TestEncodePcm(t *testing.T) {
sampleSize := 2 sampleSize := 2
chunkSize := 16000 chunkSize := 16000
writeFreq := float64(sampleRate*sampleSize) / float64(chunkSize) writeFreq := float64(sampleRate*sampleSize) / float64(chunkSize)
e := NewEncoder(&buf, writeFreq, Audio) e := NewEncoder(nopCloser{&buf}, writeFreq, Audio)
inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm"
inPcm, err := ioutil.ReadFile(inPath) inPcm, err := ioutil.ReadFile(inPath)
@ -84,7 +89,7 @@ func TestEncodePcm(t *testing.T) {
for i+PacketSize <= len(clip) { for i+PacketSize <= len(clip) {
// Check MTS packet // Check MTS packet
if !(pkt.PID() == audioPid) { if !(pkt.PID() == AudioPid) {
i += PacketSize i += PacketSize
if i+PacketSize <= len(clip) { if i+PacketSize <= len(clip) {
copy(pkt[:], clip[i:i+PacketSize]) copy(pkt[:], clip[i:i+PacketSize])

View File

@ -26,7 +26,6 @@ LICENSE
package mts package mts
import ( import (
"fmt"
"io" "io"
"time" "time"
@ -128,7 +127,7 @@ const (
// Encoder encapsulates properties of an mpegts generator. // Encoder encapsulates properties of an mpegts generator.
type Encoder struct { type Encoder struct {
dst io.Writer dst io.WriteCloser
clock time.Duration clock time.Duration
lastTime time.Time lastTime time.Time
@ -150,7 +149,7 @@ type Encoder struct {
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
// calls write for every frame, the rate will be the frame rate of the video. // calls write for every frame, the rate will be the frame rate of the video.
func NewEncoder(dst io.Writer, rate float64, mediaType int) *Encoder { func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder {
var mPid int var mPid int
var sid byte var sid byte
switch mediaType { switch mediaType {
@ -206,9 +205,6 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
// Write implements io.Writer. Write takes raw video or audio data and encodes into mpegts, // Write implements io.Writer. Write takes raw video or audio data and encodes into mpegts,
// then sending it to the encoder's io.Writer destination. // then sending it to the encoder's io.Writer destination.
func (e *Encoder) Write(data []byte) (int, error) { func (e *Encoder) Write(data []byte) (int, error) {
if len(data) > pes.MaxPesSize {
return 0, fmt.Errorf("data size too large (Max is %v): %v", pes.MaxPesSize, len(data))
}
now := time.Now() now := time.Now()
if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) {
e.pktCount = 0 e.pktCount = 0
@ -328,3 +324,7 @@ func updateMeta(b []byte) ([]byte, error) {
err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) err := p.AddDescriptor(psi.MetadataTag, Meta.Encode())
return []byte(p), err return []byte(p), err
} }
func (e *Encoder) Close() error {
return e.dst.Close()
}

View File

@ -47,9 +47,8 @@ const fps = 25
// write this to psi. // write this to psi.
func TestMetaEncode1(t *testing.T) { func TestMetaEncode1(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var b []byte var buf bytes.Buffer
buf := bytes.NewBuffer(b) e := NewEncoder(nopCloser{&buf}, fps, Video)
e := NewEncoder(buf, fps, Video)
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {
t.Errorf(errUnexpectedErr, err.Error()) t.Errorf(errUnexpectedErr, err.Error())
@ -76,9 +75,8 @@ func TestMetaEncode1(t *testing.T) {
// into psi. // into psi.
func TestMetaEncode2(t *testing.T) { func TestMetaEncode2(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var b []byte var buf bytes.Buffer
buf := bytes.NewBuffer(b) e := NewEncoder(nopCloser{&buf}, fps, Video)
e := NewEncoder(buf, fps, Video)
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
Meta.Add("loc", "1234,4321,1234") Meta.Add("loc", "1234,4321,1234")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {

View File

@ -1,3 +1,5 @@
Alan Noble Alan Noble
Saxon Nelson-Milton Saxon Nelson-Milton
Jack Richardson Jack Richardson
Dan Kortschak
Trek Hopton

4
go.mod
View File

@ -4,7 +4,8 @@ go 1.12
require ( require (
bitbucket.org/ausocean/iot v1.2.4 bitbucket.org/ausocean/iot v1.2.4
bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e bitbucket.org/ausocean/utils v1.2.6
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect
@ -14,4 +15,5 @@ require (
github.com/sergi/go-diff v1.0.0 // indirect github.com/sergi/go-diff v1.0.0 // indirect
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
) )

18
go.sum
View File

@ -1,34 +1,23 @@
bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4= bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4=
bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU=
bitbucket.org/ausocean/iot v1.2.5 h1:udD5X4oXUuKwdjO7bcq4StcDdjP8fJa2L0FnJJwF+6Q= bitbucket.org/ausocean/utils v1.2.6 h1:JN66APCV+hu6GebIHSu2KSywhLym4vigjSz5+fB0zXc=
bitbucket.org/ausocean/iot v1.2.5/go.mod h1:dOclxXkdxAQGWO7Y5KcP1wpNfxg9oKUA2VqjJ3Le4RA= bitbucket.org/ausocean/utils v1.2.6/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e h1:rn7Z1vE6m1NSH+mrPJPgquEfBDsqzBEH4Y6fxzyB6kA=
bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20=
bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw=
bitbucket.org/ausocean/utils v1.2.5 h1:70lkWnoN1SUxiIBIKDiDzaYZ2bjczdNYSAsKj7DUpl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw=
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/adrianmo/go-nmea v1.1.1-0.20190109062325-c448653979f7/go.mod h1:HHPxPAm2kmev+61qmkZh7xgZF/7qHtSpsWppip2Ipv8=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c= github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c=
github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs=
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig=
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs=
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw=
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks= github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4/go.mod h1:2RvX5ZjVtsznNZPEt4xwJXNJrM3VTZoQf7V6gk0ysvs=
github.com/kidoman/embd v0.0.0-20170508013040-d3d8c0c5c68d/go.mod h1:ACKj9jnzOzj1lw2ETilpFGK7L9dtJhAzT7T1OhAGtRQ=
github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA=
github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s= github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s=
github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc= github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc=
@ -40,19 +29,16 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e h1:3NIzz7weXhh3NToPgbtlQtKiVgerEaG4/nY2skGoGG0= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e h1:3NIzz7weXhh3NToPgbtlQtKiVgerEaG4/nY2skGoGG0=
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e/go.mod h1:CaowXBWOiSGWEpBBV8LoVnQTVPV4ycyviC9IBLj8dRw= github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e/go.mod h1:CaowXBWOiSGWEpBBV8LoVnQTVPV4ycyviC9IBLj8dRw=
github.com/yryz/ds18b20 v0.0.0-20180211073435-3cf383a40624/go.mod h1:MqFju5qeLDFh+S9PqxYT7TEla8xeW7bgGr/69q3oki0=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/sys v0.0.0-20190305064518-30e92a19ae4a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=

View File

@ -31,6 +31,9 @@ LICENSE
Copyright (C) 2008-2009 Andrej Stepanchuk Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu Copyright (C) 2009-2010 Howard Chu
*/ */
// Package rtmp provides an RTMP client implementation.
// The package currently supports live streaming to YouTube.
package rtmp package rtmp
import ( import (

View File

@ -222,6 +222,8 @@ func (rs *rtmpSender) Write(p []byte) (int, error) {
return n, nil return n, nil
} }
func (rs *rtmpSender) Close() error { return nil }
// TestFromFile tests streaming from an video file comprising raw H.264. // TestFromFile tests streaming from an video file comprising raw H.264.
// The test file is supplied via the RTMP_TEST_FILE environment variable. // The test file is supplied via the RTMP_TEST_FILE environment variable.
func TestFromFile(t *testing.T) { func TestFromFile(t *testing.T) {

View File

@ -158,7 +158,7 @@ const (
defaultFramesPerClip = 1 defaultFramesPerClip = 1
httpFramesPerClip = 560 httpFramesPerClip = 560
defaultInputCodec = H264 defaultInputCodec = H264
defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15. defaultVerbosity = logger.Error
defaultRtpAddr = "localhost:6970" defaultRtpAddr = "localhost:6970"
defaultBurstPeriod = 10 // Seconds defaultBurstPeriod = 10 // Seconds
defaultRotation = 0 // Degrees defaultRotation = 0 // Degrees
@ -176,21 +176,20 @@ const (
// if particular parameters have not been defined. // if particular parameters have not been defined.
func (c *Config) Validate(r *Revid) error { func (c *Config) Validate(r *Revid) error {
switch c.LogLevel { switch c.LogLevel {
case Yes: case logger.Debug:
case No: case logger.Info:
case NothingDefined: case logger.Warning:
c.LogLevel = defaultVerbosity case logger.Error:
c.Logger.Log(logger.Info, pkg+"no LogLevel mode defined, defaulting", case logger.Fatal:
"LogLevel", defaultVerbosity)
default: default:
return errors.New("bad LogLevel defined in config") c.LogLevel = defaultVerbosity
c.Logger.Log(logger.Info, pkg+"bad LogLevel mode defined, defaulting", "LogLevel", defaultVerbosity)
} }
switch c.Input { switch c.Input {
case Raspivid, V4L, File, Audio: case Raspivid, V4L, File, Audio:
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput)
defaultInput)
c.Input = defaultInput c.Input = defaultInput
default: default:
return errors.New("bad input type defined in config") return errors.New("bad input type defined in config")
@ -214,17 +213,19 @@ func (c *Config) Validate(r *Revid) error {
} }
case PCM, ADPCM: case PCM, ADPCM:
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Info, pkg+"no input codec defined, defaulting", c.Logger.Log(logger.Info, pkg+"no input codec defined, defaulting", "inputCodec", defaultInputCodec)
"inputCodec", defaultInputCodec)
c.InputCodec = defaultInputCodec c.InputCodec = defaultInputCodec
c.Logger.Log(logger.Info, pkg+"defaulting quantization", "quantization", c.Logger.Log(logger.Info, pkg+"defaulting quantization", "quantization", defaultQuantization)
defaultQuantization)
c.Quantization = defaultQuantization c.Quantization = defaultQuantization
default: default:
return errors.New("bad input codec defined in config") return errors.New("bad input codec defined in config")
} }
if c.Outputs == nil {
c.Logger.Log(logger.Info, pkg+"no output defined, defaulting", "output", defaultOutput)
c.Outputs = append(c.Outputs, defaultOutput)
c.Packetization = defaultPacketization
} else {
for i, o := range c.Outputs { for i, o := range c.Outputs {
switch o { switch o {
case File: case File:
@ -237,26 +238,19 @@ func (c *Config) Validate(r *Revid) error {
// c.FramesPerClip = httpFramesPerClip // c.FramesPerClip = httpFramesPerClip
break break
} }
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", "framesPerClip", defaultFramesPerClip)
"framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip c.FramesPerClip = defaultFramesPerClip
c.Packetization = Flv c.Packetization = Flv
c.SendRetry = true c.SendRetry = true
case NothingDefined:
c.Logger.Log(logger.Info, pkg+"no output defined, defaulting", "output",
defaultOutput)
c.Outputs[i] = defaultOutput
c.Packetization = defaultPacketization
fallthrough
case Http, Rtp: case Http, Rtp:
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", "framesPerClip", httpFramesPerClip)
"framesPerClip", httpFramesPerClip)
c.FramesPerClip = httpFramesPerClip c.FramesPerClip = httpFramesPerClip
c.Packetization = Mpegts c.Packetization = Mpegts
default: default:
return errors.New("bad output type defined in config") return errors.New("bad output type defined in config")
} }
} }
}
if c.BurstPeriod == 0 { if c.BurstPeriod == 0 {
c.Logger.Log(logger.Info, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod) c.Logger.Log(logger.Info, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod)
@ -264,8 +258,7 @@ func (c *Config) Validate(r *Revid) error {
} }
if c.FramesPerClip < 1 { if c.FramesPerClip < 1 {
c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting", c.Logger.Log(logger.Info, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip)
"framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip c.FramesPerClip = defaultFramesPerClip
} }

View File

@ -8,6 +8,7 @@ DESCRIPTION
AUTHORS AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org> Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org> Alan Noble <alan@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE LICENSE
revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
@ -23,10 +24,9 @@ LICENSE
for more details. for more details.
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. in gpl.txt. If not, see http://www.gnu.org/licenses.
*/ */
// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
package revid package revid
import ( import (
@ -46,15 +46,12 @@ import (
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/ioext"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
) )
// Ring buffer sizes and read/write timeouts. // mtsSender ringBuffer sizes.
const ( const (
ringBufferSize = 1000 rbSize = 1000
ringBufferElementSize = 100000 rbElementSize = 100000
writeTimeout = 10 * time.Millisecond
readTimeout = 10 * time.Millisecond
) )
// RTMP connection properties. // RTMP connection properties.
@ -63,17 +60,6 @@ const (
rtmpConnectionTimeout = 10 rtmpConnectionTimeout = 10
) )
// Duration of video for each clip sent out.
const clipDuration = 1 * time.Second
// Time duration between bitrate checks.
const bitrateTime = 1 * time.Minute
// After a send fail, this is the delay before another send.
const sendFailedDelay = 5 * time.Millisecond
const ffmpegPath = "/usr/local/bin/ffmpeg"
const pkg = "revid:" const pkg = "revid:"
type Logger interface { type Logger interface {
@ -89,12 +75,17 @@ type Revid struct {
// FIXME(kortschak): The relationship of concerns // FIXME(kortschak): The relationship of concerns
// in config/ns is weird. // in config/ns is weird.
config Config config Config
// ns holds the netsender.Sender responsible for HTTP. // ns holds the netsender.Sender responsible for HTTP.
ns *netsender.Sender ns *netsender.Sender
// setupInput holds the current approach to setting up // setupInput holds the current approach to setting up
// the input stream. // the input stream. It returns a function used for cleaning up, and any errors.
setupInput func() error setupInput func() (func() error, error)
// closeInput holds the cleanup function return from setupInput and is called
// in Revid.Stop().
closeInput func() error
// cmd is the exec'd process that may be used to produce // cmd is the exec'd process that may be used to produce
// the input stream. // the input stream.
@ -105,40 +96,19 @@ type Revid struct {
// lexTo, encoder and packer handle transcoding the input stream. // lexTo, encoder and packer handle transcoding the input stream.
lexTo func(dest io.Writer, src io.Reader, delay time.Duration, bufSize int) error lexTo func(dest io.Writer, src io.Reader, delay time.Duration, bufSize int) error
// buffer handles passing frames from the transcoder // encoders will hold the multiWriteCloser that writes to encoders from the lexer.
// to the target destination. encoders io.WriteCloser
buffer *buffer
// encoder holds the required encoders, which then write to destinations. // isRunning is used to keep track of revid's running state between methods.
encoder []io.Writer
// writeClosers holds the senders that the encoders will write to.
writeClosers []io.WriteCloser
// bitrate hold the last send bitrate calculation result.
bitrate int
mu sync.Mutex
isRunning bool isRunning bool
// wg will be used to wait for any processing routines to finish.
wg sync.WaitGroup wg sync.WaitGroup
// err will channel errors from revid routines to the handle errors routine.
err chan error err chan error
} }
// buffer is a wrapper for a ring.Buffer and provides function to write and
// flush in one Write call.
type buffer ring.Buffer
// Write implements the io.Writer interface. It will write to the underlying
// ring.Buffer and then flush to indicate a complete ring.Buffer write.
func (b *buffer) Write(d []byte) (int, error) {
r := (*ring.Buffer)(b)
n, err := r.Write(d)
r.Flush()
return n, err
}
// New returns a pointer to a new Revid with the desired configuration, and/or // New returns a pointer to a new Revid with the desired configuration, and/or
// an error if construction of the new instance was not successful. // an error if construction of the new instance was not successful.
func New(c Config, ns *netsender.Sender) (*Revid, error) { func New(c Config, ns *netsender.Sender) (*Revid, error) {
@ -151,6 +121,13 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) {
return &r, nil return &r, nil
} }
// Config returns a copy of revids current config.
//
// Config is not safe for concurrent use.
func (r *Revid) Config() Config {
return r.config
}
// TODO(Saxon): put more thought into error severity. // TODO(Saxon): put more thought into error severity.
func (r *Revid) handleErrors() { func (r *Revid) handleErrors() {
for { for {
@ -167,10 +144,43 @@ func (r *Revid) handleErrors() {
} }
// Bitrate returns the result of the most recent bitrate check. // Bitrate returns the result of the most recent bitrate check.
//
// TODO: get this working again.
func (r *Revid) Bitrate() int { func (r *Revid) Bitrate() int {
return r.bitrate return -1
} }
// reset swaps the current config of a Revid with the passed
// configuration; checking validity and returning errors if not valid. It then
// sets up the data pipeline accordingly to this configuration.
func (r *Revid) reset(config Config) error {
err := r.setConfig(config)
if err != nil {
return err
}
r.config.Logger.SetLevel(config.LogLevel)
err = r.setupPipeline(
func(dst io.WriteCloser, fps, medType int) (io.WriteCloser, error) {
e := mts.NewEncoder(dst, float64(fps), mts.Video)
return e, nil
},
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
return flv.NewEncoder(dst, true, true, fps)
},
ioext.MultiWriteCloser,
)
if err != nil {
return err
}
return nil
}
// setConfig takes a config, checks it's validity and then replaces the current
// revid config.
func (r *Revid) setConfig(config Config) error { func (r *Revid) setConfig(config Config) error {
r.config.Logger = config.Logger r.config.Logger = config.Logger
err := config.Validate(r) err := config.Validate(r)
@ -187,10 +197,10 @@ func (r *Revid) setConfig(config Config) error {
// mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder // mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder
// respectively. multiWriter will be used to create an ioext.multiWriteCloser // respectively. multiWriter will be used to create an ioext.multiWriteCloser
// so that encoders can write to multiple senders. // so that encoders can write to multiple senders.
func (r *Revid) setupPipeline(mtsEnc func(dst io.Writer, rate, mediaType int) (io.Writer, error), flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate, mediaType int) (io.WriteCloser, error), flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error {
r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) // encoders will hold the encoders that are required for revid's current
// configuration.
r.encoder = r.encoder[:0] var encoders []io.WriteCloser
// mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders
// will hold senders that require FLV encoding. // will hold senders that require FLV encoding.
@ -203,7 +213,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.Writer, rate, mediaType int) (i
for _, out := range r.config.Outputs { for _, out := range r.config.Outputs {
switch out { switch out {
case Http: case Http:
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, ringBufferSize, ringBufferElementSize, writeTimeout) w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, 0)
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
case Rtp: case Rtp:
w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
@ -238,7 +248,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.Writer, rate, mediaType int) (i
mediaType = mts.Video mediaType = mts.Video
} }
e, _ := mtsEnc(mw, int(r.config.WriteRate), mediaType) e, _ := mtsEnc(mw, int(r.config.WriteRate), mediaType)
r.encoder = append(r.encoder, e) encoders = append(encoders, e)
} }
// If we have some senders that require FLV encoding then add an FLV // If we have some senders that require FLV encoding then add an FLV
@ -250,9 +260,11 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.Writer, rate, mediaType int) (i
if err != nil { if err != nil {
return err return err
} }
r.encoder = append(r.encoder, e) encoders = append(encoders, e)
} }
r.encoders = multiWriter(encoders...)
switch r.config.Input { switch r.config.Input {
case Raspivid: case Raspivid:
r.setupInput = r.startRaspivid r.setupInput = r.startRaspivid
@ -281,106 +293,65 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.Writer, rate, mediaType int) (i
return nil return nil
} }
func newMtsEncoder(dst io.Writer, writeRate, mediaType int) (io.Writer, error) {
e := mts.NewEncoder(dst, float64(writeRate), mediaType)
return e, nil
}
func newFlvEncoder(dst io.Writer, fps int) (io.Writer, error) {
e, err := flv.NewEncoder(dst, true, true, fps)
if err != nil {
return nil, err
}
return e, nil
}
// reset swaps the current config of a Revid with the passed
// configuration; checking validity and returning errors if not valid.
func (r *Revid) reset(config Config) error {
err := r.setConfig(config)
if err != nil {
return err
}
err = r.setupPipeline(newMtsEncoder, newFlvEncoder, ioext.MultiWriteCloser)
if err != nil {
return err
}
return nil
}
// IsRunning returns true if revid is running.
func (r *Revid) IsRunning() bool {
r.mu.Lock()
ret := r.isRunning
r.mu.Unlock()
return ret
}
func (r *Revid) Config() Config {
r.mu.Lock()
cfg := r.config
r.mu.Unlock()
return cfg
}
// setIsRunning sets r.isRunning using b.
func (r *Revid) setIsRunning(b bool) {
r.mu.Lock()
r.isRunning = b
r.mu.Unlock()
}
// Start invokes a Revid to start processing video from a defined input // Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output. // and packetising (if theres packetization) to a defined output.
//
// Start is not safe for concurrent use.
func (r *Revid) Start() error { func (r *Revid) Start() error {
if r.IsRunning() { if r.isRunning {
r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running") r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running")
return nil return nil
} }
r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Info, pkg+"starting Revid")
// TODO: this doesn't need to be here r.isRunning = true
r.config.Logger.Log(logger.Debug, pkg+"setting up output") var err error
r.setIsRunning(true) r.closeInput, err = r.setupInput()
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
r.wg.Add(1)
go r.outputClips()
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
err := r.setupInput()
if err != nil { if err != nil {
r.Stop() r.Stop()
} }
return err return err
} }
// Stop halts any processing of video data from a camera or file // Stop closes down the pipeline. This closes encoders and sender output routines,
// connections, and/or files.
//
// Stop is not safe for concurrent use.
func (r *Revid) Stop() { func (r *Revid) Stop() {
if !r.IsRunning() { if !r.isRunning {
r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running")
return return
} }
for _, w := range r.writeClosers { if r.closeInput != nil {
err := w.Close() err := r.closeInput()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"could not close all writeClosers, cannot stop", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"could not close input", "error", err.Error())
} }
} }
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
// If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill()
}
r.setIsRunning(false)
r.wg.Wait()
}
r.config.Logger.Log(logger.Info, pkg+"closing pipeline")
err := r.encoders.Close()
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error())
}
if r.cmd != nil && r.cmd.Process != nil {
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
r.cmd.Process.Kill()
}
r.wg.Wait()
r.isRunning = false
}
// Update takes a map of variables and their values and edits the current config
// if the variables are recognised as valid parameters.
//
// Update is not safe for concurrent use.
func (r *Revid) Update(vars map[string]string) error { func (r *Revid) Update(vars map[string]string) error {
if r.IsRunning() { if r.isRunning {
r.Stop() r.Stop()
} }
//look through the vars and update revid where needed //look through the vars and update revid where needed
for key, value := range vars { for key, value := range vars {
switch key { switch key {
@ -505,64 +476,30 @@ func (r *Revid) Update(vars map[string]string) error {
break break
} }
r.config.BurstPeriod = uint(v) r.config.BurstPeriod = uint(v)
case "Logging":
switch value {
case "Debug":
r.config.LogLevel = logger.Debug
case "Info":
r.config.LogLevel = logger.Info
case "Warning":
r.config.LogLevel = logger.Warning
case "Error":
r.config.LogLevel = logger.Error
case "Fatal":
r.config.LogLevel = logger.Fatal
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value)
}
} }
} }
r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config)) r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config))
return r.reset(r.config) return r.reset(r.config)
} }
// outputClips takes the clips produced in the packClips method and outputs them
// to the desired output defined in the revid config
func (r *Revid) outputClips() {
defer r.wg.Done()
lastTime := time.Now()
var count int
loop:
for r.IsRunning() {
// If the ring buffer has something we can read and send off
chunk, err := (*ring.Buffer)(r.buffer).Next(readTimeout)
switch err {
case nil:
// Do nothing.
case ring.ErrTimeout:
r.config.Logger.Log(logger.Debug, pkg+"ring buffer read timeout")
continue
default:
r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error())
fallthrough
case io.EOF:
break loop
}
// Loop over encoders and hand bytes over to each one.
for _, e := range r.encoder {
_, err := chunk.WriteTo(e)
if err != nil {
r.err <- err
}
}
// Release the chunk back to the ring buffer.
chunk.Close()
// FIXME(saxon): this doesn't work anymore.
now := time.Now()
deltaTime := now.Sub(lastTime)
if deltaTime > bitrateTime {
// FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate.
r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second))
r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate)
r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", (*ring.Buffer)(r.buffer).Len())
lastTime = now
count = 0
}
}
r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore")
}
// startRaspivid sets up things for input from raspivid i.e. starts // startRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output. // a raspivid process and pipes it's data output.
func (r *Revid) startRaspivid() error { func (r *Revid) startRaspivid() (func() error, error) {
r.config.Logger.Log(logger.Info, pkg+"starting raspivid") r.config.Logger.Log(logger.Info, pkg+"starting raspivid")
const disabled = "0" const disabled = "0"
@ -594,7 +531,7 @@ func (r *Revid) startRaspivid() error {
switch r.config.InputCodec { switch r.config.InputCodec {
default: default:
return fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) return nil, fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec)
case H264: case H264:
args = append(args, args = append(args,
"--codec", "H264", "--codec", "H264",
@ -612,7 +549,7 @@ func (r *Revid) startRaspivid() error {
stdout, err := r.cmd.StdoutPipe() stdout, err := r.cmd.StdoutPipe()
if err != nil { if err != nil {
return err return nil, err
} }
err = r.cmd.Start() err = r.cmd.Start()
if err != nil { if err != nil {
@ -621,10 +558,10 @@ func (r *Revid) startRaspivid() error {
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(stdout, 0, 0) go r.processFrom(stdout, 0, 0)
return nil return nil, nil
} }
func (r *Revid) startV4L() error { func (r *Revid) startV4L() (func() error, error) {
const defaultVideo = "/dev/video0" const defaultVideo = "/dev/video0"
r.config.Logger.Log(logger.Info, pkg+"starting webcam") r.config.Logger.Log(logger.Info, pkg+"starting webcam")
@ -652,38 +589,37 @@ func (r *Revid) startV4L() error {
stdout, err := r.cmd.StdoutPipe() stdout, err := r.cmd.StdoutPipe()
if err != nil { if err != nil {
return err return nil, nil
} }
err = r.cmd.Start() err = r.cmd.Start()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error()) r.config.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error())
return err return nil, nil
} }
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(stdout, time.Duration(0), 0) go r.processFrom(stdout, time.Duration(0), 0)
return nil return nil, nil
} }
// setupInputForFile sets things up for getting input from a file // setupInputForFile sets things up for getting input from a file
func (r *Revid) setupInputForFile() error { func (r *Revid) setupInputForFile() (func() error, error) {
f, err := os.Open(r.config.InputPath) f, err := os.Open(r.config.InputPath)
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, err.Error()) r.config.Logger.Log(logger.Error, err.Error())
r.Stop() r.Stop()
return err return nil, err
} }
defer f.Close()
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate), 0) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate), 0)
return nil return func() error { return f.Close() }, nil
} }
// startAudioInput is used to start capturing audio from an audio device and processing it. // startAudioInput is used to start capturing audio from an audio device and processing it.
func (r *Revid) startAudioInput() error { func (r *Revid) startAudioInput() (func() error, error) {
ac := &AudioConfig{ ac := &AudioConfig{
SampleRate: r.config.SampleRate, SampleRate: r.config.SampleRate,
Channels: r.config.Channels, Channels: r.config.Channels,
@ -693,12 +629,12 @@ func (r *Revid) startAudioInput() error {
} }
ai := NewAudioInput(ac) ai := NewAudioInput(ac)
go r.processFrom(ai, time.Second/time.Duration(r.config.WriteRate), ai.ChunkSize()) go r.processFrom(ai, time.Second/time.Duration(r.config.WriteRate), ai.ChunkSize())
return nil return nil, nil
} }
func (r *Revid) processFrom(read io.Reader, delay time.Duration, bufSize int) { func (r *Revid) processFrom(read io.Reader, delay time.Duration, bufSize int) {
r.config.Logger.Log(logger.Info, pkg+"reading input data") r.config.Logger.Log(logger.Info, pkg+"reading input data")
r.err <- r.lexTo(r.buffer, read, delay, bufSize) r.err <- r.lexTo(r.encoders, read, delay, bufSize)
r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
r.wg.Done() r.wg.Done()
} }

View File

@ -1,3 +1,31 @@
/*
NAME
revid_test.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean).
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid package revid
import ( import (
@ -31,7 +59,6 @@ func TestRaspivid(t *testing.T) {
var c Config var c Config
c.Logger = &logger c.Logger = &logger
c.Input = Raspivid c.Input = Raspivid
c.Outputs = make([]uint8, 1)
rv, err := New(c, ns) rv, err := New(c, ns)
if err != nil { if err != nil {
@ -47,11 +74,8 @@ func TestRaspivid(t *testing.T) {
// testLogger implements a netsender.Logger. // testLogger implements a netsender.Logger.
type testLogger struct{} type testLogger struct{}
// SetLevel normally sets the logging level, but it is a no-op in our case. func (tl *testLogger) SetLevel(level int8) {}
func (tl *testLogger) SetLevel(level int8) {
}
// Log requests the Logger to write a message at the given level.
func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { func (tl *testLogger) Log(level int8, msg string, params ...interface{}) {
logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"} logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"}
if level < -1 || level > 5 { if level < -1 || level > 5 {
@ -71,43 +95,34 @@ func (tl *testLogger) Log(level int8, msg string, params ...interface{}) {
// tstMtsEncoder emulates the mts.Encoder to the extent of the dst field. // tstMtsEncoder emulates the mts.Encoder to the extent of the dst field.
// This will allow access to the dst to check that it has been set corrctly. // This will allow access to the dst to check that it has been set corrctly.
type tstMtsEncoder struct { type tstMtsEncoder struct {
dst io.Writer // dst is here solely to detect the type stored in the encoder.
// No data is written to dst.
dst io.WriteCloser
} }
// newTstMtsEncoder returns a pointer to a newTsMtsEncoder. func (e *tstMtsEncoder) Write(d []byte) (int, error) { return len(d), nil }
func newTstMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { func (e *tstMtsEncoder) Close() error { return nil }
return &tstMtsEncoder{dst: dst}, nil
}
func (e *tstMtsEncoder) Write(d []byte) (int, error) { return 0, nil }
// tstFlvEncoder emulates the flv.Encoder to the extent of the dst field. // tstFlvEncoder emulates the flv.Encoder to the extent of the dst field.
// This will allow access to the dst to check that it has been set corrctly. // This will allow access to the dst to check that it has been set corrctly.
type tstFlvEncoder struct { type tstFlvEncoder struct {
dst io.Writer // dst is here solely to detect the type stored in the encoder.
} // No data is written to dst.
dst io.WriteCloser
// newTstFlvEncoder returns a pointer to a new tstFlvEncoder.
func newTstFlvEncoder(dst io.Writer, fps int) (io.Writer, error) {
return &tstFlvEncoder{dst: dst}, nil
} }
func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil } func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil }
func (e *tstFlvEncoder) Close() error { return nil }
// dummyMultiWriter emulates the MultiWriter provided by std lib, so that we // dummyMultiWriter emulates the MultiWriter provided by std lib, so that we
// can access the destinations. // can access the destinations.
type dummyMultiWriter struct { type dummyMultiWriter struct {
// dst is here solely to detect the types stored in the multiWriter.
// No data is written to dst.
dst []io.WriteCloser dst []io.WriteCloser
} }
func newDummyMultiWriter(dst ...io.WriteCloser) io.WriteCloser {
return &dummyMultiWriter{
dst: dst,
}
}
func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil }
func (w *dummyMultiWriter) Close() error { return nil } func (w *dummyMultiWriter) Close() error { return nil }
// TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the // TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the
@ -216,20 +231,30 @@ func TestResetEncoderSenderSetup(t *testing.T) {
} }
// This logic is what we want to check. // This logic is what we want to check.
err = rv.setupPipeline(newTstMtsEncoder, newTstFlvEncoder, newDummyMultiWriter) err = rv.setupPipeline(
func(dst io.WriteCloser, rate int) (io.WriteCloser, error) {
return &tstMtsEncoder{dst: dst}, nil
},
func(dst io.WriteCloser, rate int) (io.WriteCloser, error) {
return &tstFlvEncoder{dst: dst}, nil
},
func(writers ...io.WriteCloser) io.WriteCloser {
return &dummyMultiWriter{dst: writers}
},
)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v for test %v", err, testNum) t.Fatalf("unexpected error: %v for test %v", err, testNum)
} }
// First check that we have the correct number of encoders. // First check that we have the correct number of encoders.
got := len(rv.encoder) got := len(rv.encoders.(*dummyMultiWriter).dst)
want := len(test.encoders) want := len(test.encoders)
if got != want { if got != want {
t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want) t.Errorf("incorrect number of encoders in revid for test: %v. \nGot: %v\nWant: %v\n", testNum, got, want)
} }
// Now check the correctness of encoders and their destinations. // Now check the correctness of encoders and their destinations.
for _, e := range rv.encoder { for _, e := range rv.encoders.(*dummyMultiWriter).dst {
// Get e's type. // Get e's type.
encoderType := fmt.Sprintf("%T", e) encoderType := fmt.Sprintf("%T", e)
@ -245,7 +270,7 @@ func TestResetEncoderSenderSetup(t *testing.T) {
} }
// Now check that this encoder has correct number of destinations (senders). // Now check that this encoder has correct number of destinations (senders).
var ms io.Writer var ms io.WriteCloser
switch encoderType { switch encoderType {
case mtsEncoderStr: case mtsEncoderStr:
ms = e.(*tstMtsEncoder).dst ms = e.(*tstMtsEncoder).dst

View File

@ -29,7 +29,6 @@ LICENSE
package revid package revid
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -119,7 +118,7 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{}))
// Extract location from reply // Extract location from reply
g, err := dec.String("ll") g, err := dec.String("ll")
if err != nil { if err != nil {
log(logger.Warning, pkg+"No location in reply") log(logger.Debug, pkg+"No location in reply")
} else { } else {
log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.Meta.Add("loc", g) mts.Meta.Add("loc", g)
@ -156,24 +155,24 @@ func (s *fileSender) Close() error { return s.file.Close() }
type mtsSender struct { type mtsSender struct {
dst io.WriteCloser dst io.WriteCloser
buf []byte buf []byte
ringBuf *ring.Buffer ring *ring.Buffer
next []byte next []byte
pkt packet.Packet pkt packet.Packet
repairer *mts.DiscontinuityRepairer repairer *mts.DiscontinuityRepairer
curPid int curPid int
quit chan struct{} done chan struct{}
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
wg sync.WaitGroup wg sync.WaitGroup
} }
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rbSize int, rbElementSize int, wTimeout time.Duration) *mtsSender { func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), ringSize int, ringElementSize int, wTimeout time.Duration) *mtsSender {
s := &mtsSender{ s := &mtsSender{
dst: dst, dst: dst,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
log: log, log: log,
ringBuf: ring.NewBuffer(rbSize, rbElementSize, wTimeout), ring: ring.NewBuffer(ringSize, ringElementSize, wTimeout),
quit: make(chan struct{}), done: make(chan struct{}),
} }
s.wg.Add(1) s.wg.Add(1)
go s.output() go s.output()
@ -185,25 +184,23 @@ func (s *mtsSender) output() {
var chunk *ring.Chunk var chunk *ring.Chunk
for { for {
select { select {
case <-s.quit: case <-s.done:
s.log(logger.Info, pkg+"mtsSender: got quit signal, terminating output routine") s.log(logger.Info, pkg+"mtsSender: got done signal, terminating output routine")
defer s.wg.Done() defer s.wg.Done()
return return
default: default:
// If chunk is nil then we're ready to get another from the ringBuffer. // If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil { if chunk == nil {
var err error var err error
chunk, err = s.ringBuf.Next(readTimeout) chunk, err = s.ring.Next(0)
switch err { switch err {
case nil: case nil, io.EOF:
continue continue
case ring.ErrTimeout: case ring.ErrTimeout:
s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout")
continue continue
default: default:
s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error())
fallthrough
case io.EOF:
continue continue
} }
} }
@ -235,11 +232,11 @@ func (s *mtsSender) Write(d []byte) (int, error) {
copy(s.pkt[:], bytes) copy(s.pkt[:], bytes)
s.curPid = s.pkt.PID() s.curPid = s.pkt.PID()
if s.curPid == mts.PatPid && len(s.buf) > 0 { if s.curPid == mts.PatPid && len(s.buf) > 0 {
_, err := s.ringBuf.Write(s.buf) _, err := s.ring.Write(s.buf)
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error())
} }
s.ringBuf.Flush() s.ring.Flush()
s.buf = s.buf[:0] s.buf = s.buf[:0]
} }
return len(d), nil return len(d), nil
@ -247,7 +244,7 @@ func (s *mtsSender) Write(d []byte) (int, error) {
// Close implements io.Closer. // Close implements io.Closer.
func (s *mtsSender) Close() error { func (s *mtsSender) Close() error {
close(s.quit) close(s.done)
s.wg.Wait() s.wg.Wait()
return nil return nil
} }
@ -255,13 +252,13 @@ func (s *mtsSender) Close() error {
// rtmpSender implements loadSender for a native RTMP destination. // rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct { type rtmpSender struct {
conn *rtmp.Conn conn *rtmp.Conn
url string url string
timeout uint timeout uint
retries int retries int
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
ring *ring.Buffer
data []byte done chan struct{}
wg sync.WaitGroup
} }
func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
@ -283,24 +280,76 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
timeout: timeout, timeout: timeout,
retries: retries, retries: retries,
log: log, log: log,
ring: ring.NewBuffer(10, rbElementSize, 0),
done: make(chan struct{}),
} }
s.wg.Add(1)
go s.output()
return s, err return s, err
} }
// output starts an mtsSender's data handling routine.
func (s *rtmpSender) output() {
var chunk *ring.Chunk
for {
select {
case <-s.done:
s.log(logger.Info, pkg+"rtmpSender: got done signal, terminating output routine")
defer s.wg.Done()
return
default:
// If chunk is nil then we're ready to get another from the ring buffer.
if chunk == nil {
var err error
chunk, err = s.ring.Next(0)
switch err {
case nil, io.EOF:
continue
case ring.ErrTimeout:
s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout")
continue
default:
s.log(logger.Error, pkg+"rtmpSender: unexpected error", "error", err.Error())
continue
}
}
if s.conn == nil {
s.log(logger.Warning, pkg+"rtmpSender: no rtmp connection, going to restart...")
err := s.restart()
if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error())
continue
}
}
_, err := s.conn.Write(chunk.Bytes())
switch err {
case nil, rtmp.ErrInvalidFlvTag:
default:
s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error())
err = s.restart()
if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error())
}
continue
}
chunk.Close()
chunk = nil
}
}
}
// Write implements io.Writer. // Write implements io.Writer.
func (s *rtmpSender) Write(d []byte) (int, error) { func (s *rtmpSender) Write(d []byte) (int, error) {
if s.conn == nil { _, err := s.ring.Write(d)
return 0, errors.New("no rtmp connection, cannot write")
}
_, err := s.conn.Write(d)
if err != nil { if err != nil {
err = s.restart() s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error())
} }
return len(d), err s.ring.Flush()
return len(d), nil
} }
func (s *rtmpSender) restart() error { func (s *rtmpSender) restart() error {
s.Close() s.close()
var err error var err error
for n := 0; n < s.retries; n++ { for n := 0; n < s.retries; n++ {
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
@ -316,6 +365,14 @@ func (s *rtmpSender) restart() error {
} }
func (s *rtmpSender) Close() error { func (s *rtmpSender) Close() error {
if s.done != nil {
close(s.done)
}
s.wg.Wait()
return s.close()
}
func (s *rtmpSender) close() error {
if s.conn == nil { if s.conn == nil {
return nil return nil
} }

View File

@ -41,14 +41,6 @@ import (
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
// Ring buffer sizes and read/write timeouts.
const (
rbSize = 100
rbElementSize = 150000
wTimeout = 10 * time.Millisecond
rTimeout = 10 * time.Millisecond
)
var ( var (
errSendFailed = errors.New("send failed") errSendFailed = errors.New("send failed")
) )
@ -56,29 +48,50 @@ var (
// destination simulates a destination for the mtsSender. It allows for the // destination simulates a destination for the mtsSender. It allows for the
// emulation of failed and delayed sends. // emulation of failed and delayed sends.
type destination struct { type destination struct {
// Holds the clips written to this destination using Write.
buf [][]byte buf [][]byte
// testFails is set to true if we would like a write to fail at a particular
// clip as determined by failAt.
testFails bool testFails bool
failAt int failAt int
currentPkt int
// Holds the current clip number.
currentClip int
// Pointer to the testing.T of a test where this struct is being used. This
// is used so that logging can be done through the testing log utilities.
t *testing.T t *testing.T
// sendDelay is the amount of time we would like a Write to be delayed when
// we hit the clip number indicated by delayAt.
sendDelay time.Duration sendDelay time.Duration
delayAt int delayAt int
// done will be used to send a signal to the main routine to indicate that
// the destination has received all clips. doneAt indicates the final clip
// number.
done chan struct{}
doneAt int
} }
func (ts *destination) Write(d []byte) (int, error) { func (ts *destination) Write(d []byte) (int, error) {
ts.t.Log("writing clip to destination") ts.t.Log("writing clip to destination")
if ts.delayAt != 0 && ts.currentPkt == ts.delayAt { if ts.delayAt != 0 && ts.currentClip == ts.delayAt {
time.Sleep(ts.sendDelay) time.Sleep(ts.sendDelay)
} }
if ts.testFails && ts.currentPkt == ts.failAt { if ts.testFails && ts.currentClip == ts.failAt {
ts.t.Log("failed send") ts.t.Log("failed send")
ts.currentPkt++ ts.currentClip++
return 0, errSendFailed return 0, errSendFailed
} }
cpy := make([]byte, len(d)) cpy := make([]byte, len(d))
copy(cpy, d) copy(cpy, d)
ts.buf = append(ts.buf, cpy) ts.buf = append(ts.buf, cpy)
ts.currentPkt++ if ts.currentClip == ts.doneAt {
close(ts.done)
}
ts.currentClip++
return len(d), nil return len(d), nil
} }
@ -118,8 +131,9 @@ func TestMtsSenderSegment(t *testing.T) {
mts.Meta = meta.New() mts.Meta = meta.New()
// Create ringBuffer, sender, sender and the MPEGTS encoder. // Create ringBuffer, sender, sender and the MPEGTS encoder.
tstDst := &destination{t: t} const numberOfClips = 11
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
@ -134,12 +148,12 @@ func TestMtsSenderSegment(t *testing.T) {
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
} }
// Give the mtsSender some time to finish up and then Close it. // Wait until the destination has all the data, then close the sender.
time.Sleep(10 * time.Millisecond) <-dst.done
sender.Close() sender.Close()
// Check the data. // Check the data.
result := tstDst.buf result := dst.buf
expectData := 0 expectData := 0
for clipNo, clip := range result { for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo) t.Logf("Checking clip: %v\n", clipNo)
@ -196,8 +210,8 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder // Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3 const clipToFailAt = 3
tstDst := &destination{t: t, testFails: true, failAt: clipToFailAt} dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, ringBufferSize, ringBufferElementSize, writeTimeout) sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder and send PSI every 10 packets. // Turn time based PSI writing off for encoder and send PSI every 10 packets.
@ -212,12 +226,12 @@ func TestMtsSenderFailedSend(t *testing.T) {
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
} }
// Give the mtsSender some time to finish up and then Close it. // Wait until the destination has all the data, then close the sender.
time.Sleep(10 * time.Millisecond) <-dst.done
sender.Close() sender.Close()
// Check that we have data as expected. // Check that we have data as expected.
result := tstDst.buf result := dst.buf
expectData := 0 expectData := 0
for clipNo, clip := range result { for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo) t.Logf("Checking clip: %v\n", clipNo)
@ -276,8 +290,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder. // Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3 const clipToDelay = 3
tstDst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay} dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(tstDst, (*dummyLogger)(t).log, 1, ringBufferElementSize, writeTimeout) sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.Video)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
@ -291,12 +305,12 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
encoder.Write([]byte{byte(i)}) encoder.Write([]byte{byte(i)})
} }
// Give mtsSender time to finish up then Close. // Wait until the destination has all the data, then close the sender.
time.Sleep(100 * time.Millisecond) <-dst.done
sender.Close() sender.Close()
// Check the data. // Check the data.
result := tstDst.buf result := dst.buf
expectedCC := 0 expectedCC := 0
for clipNo, clip := range result { for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo) t.Logf("Checking clip: %v\n", clipNo)