This commit is contained in:
scruzin 2019-03-08 16:59:23 +10:30
commit aced40b5bd
12 changed files with 726 additions and 192 deletions

View File

@ -3,7 +3,9 @@ jobs:
build:
docker:
# CircleCI Go images available at: https://hub.docker.com/r/circleci/golang/
- image: circleci/golang:1.11
- image: circleci/golang:1.12
environment:
GO111MODULE: "on"
working_directory: /go/src/bitbucket.org/ausocean/av

23
go.mod Normal file
View File

@ -0,0 +1,23 @@
module bitbucket.org/ausocean/av
go 1.12
require (
bitbucket.org/ausocean/iot v1.2.4
bitbucket.org/ausocean/utils v1.2.4
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884
github.com/mewkiz/flac v1.0.5
github.com/pkg/errors v0.8.1 // indirect
github.com/sergi/go-diff v1.0.0 // indirect
github.com/stretchr/testify v1.3.0 // indirect
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)

45
go.sum Normal file
View File

@ -0,0 +1,45 @@
bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4=
bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU=
bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20=
bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw=
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c=
github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs=
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig=
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs=
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw=
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks=
github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA=
github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s=
github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc=
github.com/mewkiz/flac v1.0.5/go.mod h1:EHZNU32dMF6alpurYyKHDLYpW1lYpBZ5WrXi/VuNIGs=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -1,18 +1,28 @@
# install files and directories required by NetSender clients (such as gpio-netsender, revid-cli, etc.)
# NB: the default (soft) install does not override conf files
# Install files and directories required by NetSender clients (such as gpio-netsender, revid-cli, etc.)
# and create a dhcpcd.enter-hook for setting the MAC address.
# MA and DK can be optionally passed to Make, e.g, for a hard (first-time) installation:
# sudo MA=mac DK=dk install_hard
# NB: The default (soft) install does not override conf files.
USER := $(shell whoami)
PATH := /usr/local/go/bin:$(PATH)
ifeq ($(MA),)
MA := "00:E0:4C:00:00:01"
endif
ifeq ($(DK),)
DK := 0
endif
.SILENT:make_dirs
.SILENT:soft_copy_files
.SILENT:hard_copy_files
.SILENT:set_mac
.SILENT:syncreboot
.SILENT:clean
install: as_root make_dirs soft_copy_files syncreboot
install: as_root make_dirs soft_copy_files
@echo "Install complete"
install_hard: as_root make_dirs hard_copy_files syncreboot
install_hard: as_root make_dirs hard_copy_files set_mac syncreboot
@echo "Hard install complete"
as_root:
@ -39,7 +49,7 @@ soft_copy_files:
if [ -f /etc/netsender.conf ] ; then \
echo "/etc/netsender.conf left unmodified" ; \
else \
cp netsender.conf /etc; \
printf "ma $(MA)\ndk $(DK)\n" > /etc/netsender.conf; \
chown pi /etc/netsender.conf; \
fi
@ -53,9 +63,13 @@ hard_copy_files:
echo "Backed up netsender.conf to /etc/netsender.conf.bak"; \
cp /etc/netsender.conf /etc/netsender.conf.bak ; \
fi
cp -f netsender.conf /etc
printf "ma $(MA)\ndk $(DK)\n" > /etc/netsender.conf
chown pi /etc/netsender.conf
set_mac:
printf "ip link set eth0 address $(MA)\n" > /etc/dhcpcd.enter-hook
chmod guo+x /etc/dhcpcd.enter-hook
syncreboot:
cd ../../utils/cmd/syncreboot; make; make install

View File

@ -1,4 +0,0 @@
# /etc/netsender.conf
# replace with the actual MAC address and device key respectively
ma 00:00:00:00:00:00
dk 0

260
revid/mtsSender_test.go Normal file
View File

@ -0,0 +1,260 @@
/*
NAME
mtsSender_test.go
DESCRIPTION
mtsSender_test.go contains tests that validate the functionalilty of the
mtsSender under senders.go. Tests include checks that the mtsSender is
segmenting sends correctly, and also that it can correct discontinuities.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"errors"
"fmt"
"testing"
"time"
"github.com/Comcast/gots/packet"
"github.com/Comcast/gots/pes"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/mts/meta"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
// Ring buffer sizes and read/write timeouts.
const (
rbSize = 100
rbElementSize = 150000
wTimeout = 10 * time.Millisecond
rTimeout = 10 * time.Millisecond
)
// sender simulates sending of video data, creating discontinuities if
// testDiscontinuities is set to true.
type sender struct {
buf [][]byte
testDiscontinuities bool
discontinuityAt int
currentPkt int
}
// send takes d and neglects if testDiscontinuities is true, returning an error,
// otherwise d is appended to senders buf.
func (ts *sender) send(d []byte) error {
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
ts.currentPkt++
return errors.New("could not send")
}
cpy := make([]byte, len(d))
copy(cpy, d)
ts.buf = append(ts.buf, cpy)
ts.currentPkt++
return nil
}
// log implements the required logging func for some of the structs in use
// within tests.
func log(lvl int8, msg string, args ...interface{}) {
var l string
switch lvl {
case logger.Warning:
l = "warning"
case logger.Debug:
l = "debug"
case logger.Info:
l = "info"
case logger.Error:
l = "error"
case logger.Fatal:
l = "fatal"
}
msg = l + ": " + msg
for i := 0; i < len(args); i++ {
msg += " %v"
}
fmt.Printf(msg, args)
}
// buffer implements io.Writer and handles the writing of data to a
// ring buffer used in tests.
type buffer ring.Buffer
// Write implements the io.Writer interface.
func (b *buffer) Write(d []byte) (int, error) {
r := (*ring.Buffer)(b)
n, err := r.Write(d)
r.Flush()
return n, err
}
// TestSegment ensures that the mtsSender correctly segments data into clips
// based on positioning of PSI in the mtsEncoder's output stream.
func TestSegment(t *testing.T) {
mts.Meta = meta.New()
// Create ringBuffer, sender, loadsender and the MPEGTS encoder.
tstSender := &sender{}
loadSender := newMtsSender(tstSender, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
// Insert a payload so that we check that the segmentation works correctly
// in this regard. Packet number will be used.
encoder.Encode([]byte{byte(i)})
rb.Flush()
for {
next, err := rb.Next(rTimeout)
if err != nil {
break
}
err = loadSender.load(next)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
err = loadSender.send()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
loadSender.release()
}
}
result := tstSender.buf
expectData := 0
for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo)
// Check that the clip is of expected length.
clipLen := len(clip)
if clipLen != psiSendCount*mts.PacketSize {
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
}
// Also check that the first packet is a PAT.
firstPkt := clip[:mts.PacketSize]
var pkt packet.Packet
copy(pkt[:], firstPkt)
pid := pkt.PID()
if pid != mts.PatPid {
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
}
// Check that the clip data is okay.
for i := 0; i < len(clip); i += mts.PacketSize {
copy(pkt[:], clip[i:i+mts.PacketSize])
if pkt.PID() == mts.VideoPid {
payload, err := pkt.Payload()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Parse PES from the MTS payload.
pes, err := pes.NewPESHeader(payload)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Get the data from the PES packet and convert to an int.
data := int8(pes.Data()[0])
// Calc expected data in the PES and then check.
if data != int8(expectData) {
t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData)
}
expectData++
}
}
}
}
func TestSendFailDiscontinuity(t *testing.T) {
mts.Meta = meta.New()
// Create ringBuffer sender, loadSender and the MPEGTS encoder.
const clipWithDiscontinuity = 3
tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
loadSender := newMtsSender(tstSender, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
// Our payload will just be packet number.
encoder.Encode([]byte{byte(i)})
rb.Flush()
for {
next, err := rb.Next(rTimeout)
if err != nil {
break
}
err = loadSender.load(next)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
loadSender.send()
loadSender.release()
}
}
result := tstSender.buf
// First check that we have less clips as expected.
expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1
gotLen := len(result)
if gotLen != expectedLen {
t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)
}
// Now check that the discontinuity indicator is set at the discontinuityClip PAT.
disconClip := result[clipWithDiscontinuity]
firstPkt := disconClip[:mts.PacketSize]
var pkt packet.Packet
copy(pkt[:], firstPkt)
discon, err := (*packet.AdaptationField)(&pkt).Discontinuity()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
if !discon {
t.Fatalf("Did not get discontinuity indicator for PAT")
}
}

View File

@ -114,10 +114,6 @@ type Revid struct {
// destination is the target endpoint.
destination []loadSender
// rtpSender is an unbuffered sender.
// It is used to isolate RTP from ring buffer-induced delays.
rtpSender *rtpSender
// bitrate hold the last send bitrate calculation result.
bitrate int
@ -143,7 +139,10 @@ type packer struct {
// are deemed to be successful, although a successful
// write may include a dropped frame.
func (p *packer) Write(frame []byte) (int, error) {
if len(p.owner.destination) != 0 {
if len(p.owner.destination) == 0 {
panic("must have at least 1 destination")
}
n, err := p.owner.buffer.Write(frame)
if err != nil {
if err == ring.ErrDropped {
@ -153,30 +152,9 @@ func (p *packer) Write(frame []byte) (int, error) {
p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
return n, err
}
}
// If we have an rtp sender bypass ringbuffer and give straight to sender
if p.owner.rtpSender != nil {
err := p.owner.rtpSender.send(frame)
if err != nil {
p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error())
}
}
p.packetCount++
var hasRtmp bool
for _, d := range p.owner.config.Outputs {
if d == Rtmp {
hasRtmp = true
break
}
}
now := time.Now()
if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) {
p.owner.buffer.Flush()
p.packetCount = 0
p.lastTime = now
}
return len(frame), nil
}
@ -232,7 +210,7 @@ func (r *Revid) reset(config Config) error {
r.buffer = ring.NewBuffer(mtsRbSize, mtsRbElementSize, writeTimeout)
}
r.destination = r.destination[:0]
r.destination = make([]loadSender, 0, len(r.config.Outputs))
for _, typ := range r.config.Outputs {
switch typ {
case File:
@ -241,12 +219,6 @@ func (r *Revid) reset(config Config) error {
return err
}
r.destination = append(r.destination, s)
case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate))
if err != nil {
return err
}
r.destination = append(r.destination, s)
case Rtmp:
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil {
@ -254,18 +226,18 @@ func (r *Revid) reset(config Config) error {
}
r.destination = append(r.destination, s)
case Http:
switch r.Config().Packetization {
case Mpegts:
r.destination = append(r.destination, newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil))
default:
r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log))
case Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
}
case Rtp:
s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
if err != nil {
return err
}
r.destination = append(r.destination, s)
case Rtp:
r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
if err != nil {
return err
}
}
}
@ -387,24 +359,26 @@ func (r *Revid) Update(vars map[string]string) error {
for key, value := range vars {
switch key {
case "Output":
r.config.Outputs = make([]uint8, 1)
// FIXME(kortschak): There can be only one!
// How do we specify outputs after the first?
//
// Maybe we shouldn't be doing this!
switch value {
outputs := strings.Split(value, ",")
r.config.Outputs = make([]uint8, len(outputs))
for i, output := range outputs {
switch output {
case "File":
r.config.Outputs[0] = File
r.config.Outputs[i] = File
case "Http":
r.config.Outputs[0] = Http
r.config.Outputs[i] = Http
case "Rtmp":
r.config.Outputs[0] = Rtmp
r.config.Outputs[i] = Rtmp
case "FfmpegRtmp":
r.config.Outputs[0] = FfmpegRtmp
r.config.Outputs[i] = FfmpegRtmp
case "Rtp":
r.config.Outputs[i] = Rtp
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
r.config.Logger.Log(logger.Warning, pkg+"invalid output param", "value", value)
continue
}
}
case "Packetization":
switch value {
@ -425,6 +399,8 @@ func (r *Revid) Update(vars map[string]string) error {
r.config.FramesPerClip = uint(f)
case "RtmpUrl":
r.config.RtmpUrl = value
case "RtpAddr":
r.config.RtpAddress = value
case "Bitrate":
v, err := strconv.ParseUint(value, 10, 0)
if err != nil {

View File

@ -29,14 +29,13 @@ LICENSE
package revid
import (
"errors"
"fmt"
"io"
"net"
"os"
"os/exec"
"strconv"
"github.com/Comcast/gots/packet"
"bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp"
@ -45,6 +44,33 @@ import (
"bitbucket.org/ausocean/utils/ring"
)
// Sender is intended to provided functionality for the sending of a byte slice
// to an implemented destination.
type Sender interface {
// send takes the bytes slice d and sends to a particular destination as
// implemented.
send(d []byte) error
}
// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind.
type minimalHttpSender struct {
client *netsender.Sender
log func(lvl int8, msg string, args ...interface{})
}
// newMinimalHttpSender returns a pointer to a new minimalHttpSender.
func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender {
return &minimalHttpSender{
client: ns,
log: log,
}
}
// send takes a bytes slice d and sends to http using s' http client.
func (s *minimalHttpSender) send(d []byte) error {
return httpSend(d, s.client, s.log)
}
// loadSender is a destination to send a *ring.Chunk to.
// When a loadSender has finished using the *ring.Chunk
// it must be Closed.
@ -105,6 +131,86 @@ func (s *fileSender) close() error {
return s.file.Close()
}
// mtsSender implemented loadSender and provides sending capability specifically
// for use with MPEGTS packetization. It handles the construction of appropriately
// lengthed clips based on PSI. It also fixes accounts for discontinuities by
// setting the discontinuity indicator for the first packet of a clip.
type mtsSender struct {
sender Sender
buf []byte
next []byte
pkt packet.Packet
failed bool
discarded bool
repairer *mts.DiscontinuityRepairer
chunk *ring.Chunk
curPid int
}
// newMtsSender returns a new mtsSender.
func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{
sender: s,
repairer: mts.NewDiscontinuityRepairer(),
}
}
// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and
// assigning to s.curPid. s.next if exists is also appended to the sender buf.
func (s *mtsSender) load(c *ring.Chunk) error {
if s.next != nil {
s.buf = append(s.buf, s.next...)
}
s.chunk = c
bytes := s.chunk.Bytes()
s.next = bytes
copy(s.pkt[:], bytes)
s.curPid = s.pkt.PID()
return nil
}
// send checks the currently loaded paackets PID; if it is a PAT then what is in
// the mtsSenders buffer is fixed and sent.
func (ms *mtsSender) send() error {
if ms.curPid == mts.PatPid && len(ms.buf) > 0 {
err := ms.fixAndSend()
if err != nil {
return err
}
ms.buf = ms.buf[:0]
}
return nil
}
// fixAndSend checks for discontinuities in the senders buffer and then sends.
// If a discontinuity is found the PAT packet at the start of the clip has it's
// discontintuity indicator set to true.
func (ms *mtsSender) fixAndSend() error {
err := ms.repairer.Repair(ms.buf)
if err == nil {
err = ms.sender.send(ms.buf)
if err == nil {
return nil
}
}
ms.failed = true
ms.repairer.Failed()
return err
}
func (s *mtsSender) close() error { return nil }
// release will set the s.fail flag to false and clear the buffer if
// the previous send was a fail. The currently loaded chunk is also closed.
func (s *mtsSender) release() {
if s.failed {
s.buf = s.buf[:0]
s.failed = false
}
s.chunk.Close()
s.chunk = nil
}
// httpSender implements loadSender for posting HTTP to NetReceiver
type httpSender struct {
client *netsender.Sender
@ -133,15 +239,19 @@ func (s *httpSender) send() error {
// if the chunk has been cleared.
return nil
}
return httpSend(s.chunk.Bytes(), s.client, s.log)
}
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
// Only send if "V0" is configured as an input.
send := false
ip := s.client.Param("ip")
ip := client.Param("ip")
pins := netsender.MakePins(ip, "V")
for i, pin := range pins {
if pin.Name == "V0" {
send = true
pins[i].Value = s.chunk.Len()
pins[i].Data = s.chunk.Bytes()
pins[i].Value = len(d)
pins[i].Data = d
pins[i].MimeType = "video/mp2t"
break
}
@ -152,17 +262,16 @@ func (s *httpSender) send() error {
}
var err error
var reply string
reply, _, err = s.client.Send(netsender.RequestRecv, pins)
reply, _, err = client.Send(netsender.RequestRecv, pins)
if err != nil {
return err
}
return s.extractMeta(reply)
return extractMeta(reply, log)
}
// extractMeta looks at a reply at extracts any time or location data - then used
// to update time and location information in the mpegts encoder.
func (s *httpSender) extractMeta(r string) error {
func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error {
dec, err := netsender.NewJSONDecoder(r)
if err != nil {
return nil
@ -170,18 +279,18 @@ func (s *httpSender) extractMeta(r string) error {
// Extract time from reply
t, err := dec.Int("ts")
if err != nil {
s.log(logger.Warning, pkg+"No timestamp in reply")
log(logger.Warning, pkg+"No timestamp in reply")
} else {
s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.Meta.Add("ts", strconv.Itoa(t))
}
// Extract location from reply
g, err := dec.String("ll")
if err != nil {
s.log(logger.Warning, pkg+"No location in reply")
log(logger.Warning, pkg+"No location in reply")
} else {
s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.Meta.Add("loc", g)
}
@ -197,59 +306,6 @@ func (s *httpSender) release() {
func (s *httpSender) close() error { return nil }
// ffmpegSender implements loadSender for an FFMPEG RTMP destination.
type ffmpegSender struct {
ffmpeg io.WriteCloser
chunk *ring.Chunk
}
func newFfmpegSender(url, framerate string) (*ffmpegSender, error) {
cmd := exec.Command(ffmpegPath,
"-f", "h264",
"-r", framerate,
"-i", "-",
"-f", "lavfi",
"-i", "aevalsrc=0",
"-fflags", "nobuffer",
"-vcodec", "copy",
"-acodec", "aac",
"-map", "0:0",
"-map", "1:0",
"-strict", "experimental",
"-f", "flv",
url,
)
w, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
err = cmd.Start()
if err != nil {
return nil, err
}
return &ffmpegSender{ffmpeg: w}, nil
}
func (s *ffmpegSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *ffmpegSender) send() error {
_, err := s.chunk.WriteTo(s.ffmpeg)
return err
}
func (s *ffmpegSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *ffmpegSender) close() error {
return s.ffmpeg.Close()
}
// rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct {
conn *rtmp.Conn
@ -332,46 +388,12 @@ func (s *rtmpSender) close() error {
return nil
}
// udpSender implements loadSender for a native udp destination.
type udpSender struct {
conn net.Conn
log func(lvl int8, msg string, args ...interface{})
chunk *ring.Chunk
}
func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) {
conn, err := net.Dial("udp", addr)
if err != nil {
return nil, err
}
return &udpSender{
conn: conn,
log: log,
}, nil
}
func (s *udpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *udpSender) send() error {
_, err := s.chunk.WriteTo(s.conn)
return err
}
func (s *udpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *udpSender) close() error { return nil }
// TODO: Write restart func for rtpSender
// rtpSender implements loadSender for a native udp destination with rtp packetization.
type rtpSender struct {
log func(lvl int8, msg string, args ...interface{})
encoder *rtp.Encoder
chunk *ring.Chunk
}
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
@ -386,12 +408,19 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{
return s, nil
}
func (s *rtpSender) send(d []byte) error {
var err error
if d != nil {
_, err = s.encoder.Write(d)
} else {
err = errors.New("no data to send provided")
}
func (s *rtpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *rtpSender) close() error { return nil }
func (s *rtpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *rtpSender) send() error {
_, err := s.chunk.WriteTo(s.encoder)
return err
}

109
stream/mts/discontinuity.go Normal file
View File

@ -0,0 +1,109 @@
/*
NAME
discontinuity.go
DESCRIPTION
discontinuity.go provides functionality for detecting discontinuities in
mpegts and accounting for using the discontinuity indicator in the adaptation
field.
AUTHOR
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
discontinuity.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package mts
import (
"github.com/Comcast/gots/packet"
)
// discontinuityRepairer provides function to detect discontinuities in mpegts
// and set the discontinuity indicator as appropriate.
type DiscontinuityRepairer struct {
expCC map[int]int
}
// NewDiscontinuityRepairer returns a pointer to a new discontinuityRepairer.
func NewDiscontinuityRepairer() *DiscontinuityRepairer {
return &DiscontinuityRepairer{
expCC: map[int]int{
PatPid: 16,
PmtPid: 16,
VideoPid: 16,
},
}
}
// Failed is to be called in the case of a failed send. This will decrement the
// expectedCC so that it aligns with the failed chunks cc.
func (dr *DiscontinuityRepairer) Failed() {
dr.decExpectedCC(PatPid)
}
// Repair takes a clip of mpegts and checks that the first packet, which should
// be a PAT, contains a cc that is expected, otherwise the discontinuity indicator
// is set to true.
func (dr *DiscontinuityRepairer) Repair(d []byte) error {
var pkt packet.Packet
copy(pkt[:], d[:PacketSize])
pid := pkt.PID()
if pid != PatPid {
panic("Clip to repair must have PAT first")
}
cc := pkt.ContinuityCounter()
expect, _ := dr.ExpectedCC(pid)
if cc != int(expect) {
if packet.ContainsAdaptationField(&pkt) {
(*packet.AdaptationField)(&pkt).SetDiscontinuity(true)
} else {
err := addAdaptationField(&pkt, DiscontinuityIndicator(true))
if err != nil {
return err
}
}
dr.SetExpectedCC(pid, cc)
copy(d[:PacketSize], pkt[:])
}
dr.IncExpectedCC(pid)
return nil
}
// expectedCC returns the expected cc. If the cc hasn't been used yet, then 16
// and false is returned.
func (dr *DiscontinuityRepairer) ExpectedCC(pid int) (int, bool) {
if dr.expCC[pid] == 16 {
return 16, false
}
return dr.expCC[pid], true
}
// incExpectedCC increments the expected cc.
func (dr *DiscontinuityRepairer) IncExpectedCC(pid int) {
dr.expCC[pid] = (dr.expCC[pid] + 1) & 0xf
}
// decExpectedCC decrements the expected cc.
func (dr *DiscontinuityRepairer) decExpectedCC(pid int) {
dr.expCC[pid] = (dr.expCC[pid] - 1) & 0xf
}
// setExpectedCC sets the expected cc.
func (dr *DiscontinuityRepairer) SetExpectedCC(pid, cc int) {
dr.expCC[pid] = cc
}

View File

@ -86,6 +86,7 @@ var (
const (
psiInterval = 1 * time.Second
psiSendCount = 7
)
// Meta allows addition of metadata to encoded mts from outside of this pkg.
@ -130,6 +131,10 @@ type Encoder struct {
continuity map[int]byte
timeBasedPsi bool
pktCount int
psiSendCount int
psiLastTime time.Time
}
@ -141,6 +146,10 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder {
frameInterval: time.Duration(float64(time.Second) / fps),
ptsOffset: ptsOffset,
timeBasedPsi: true,
pktCount: 8,
continuity: map[int]byte{
patPid: 0,
pmtPid: 0,
@ -159,11 +168,22 @@ const (
hasPTS = 0x2
)
// TimeBasedPsi allows for the setting of the PSI writing method, therefore, if
// PSI is written based on some time duration, or based on a packet count.
// If b is true, then time based PSI is used, otherwise the PSI is written
// every sendCount.
func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
e.timeBasedPsi = b
e.psiSendCount = sendCount
e.pktCount = e.psiSendCount
}
// generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel.
func (e *Encoder) Encode(nalu []byte) error {
now := time.Now()
if now.Sub(e.psiLastTime) > psiInterval {
if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) {
e.pktCount = 0
err := e.writePSI()
if err != nil {
return err
@ -204,6 +224,7 @@ func (e *Encoder) Encode(nalu []byte) error {
if err != nil {
return err
}
e.pktCount++
}
e.tick()
@ -226,6 +247,7 @@ func (e *Encoder) writePSI() error {
if err != nil {
return err
}
e.pktCount++
pmtTable, err = updateMeta(pmtTable)
if err != nil {
return err
@ -243,6 +265,7 @@ func (e *Encoder) writePSI() error {
if err != nil {
return err
}
e.pktCount++
return nil
}

View File

@ -144,6 +144,9 @@ func (m *Data) Delete(key string) {
// Encode takes the meta data map and encodes into a byte slice with header
// describing the version, length of data and data in TSV format.
func (m *Data) Encode() []byte {
if m.enc == nil {
panic("Meta has not been initialized yet")
}
m.enc = m.enc[:headSize]
// Iterate over map and append entries, only adding tab if we're not on the

View File

@ -31,6 +31,8 @@ package mts
import (
"errors"
"fmt"
"github.com/Comcast/gots/packet"
)
// General mpegts packet properties.
@ -60,6 +62,9 @@ const (
AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields.
DefaultAdaptationSize = 2 // Default size of the adaptation field.
AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3.
DefaultAdaptationBodySize = 1 // Default size of the adaptation field body.
DiscontinuityIndicatorMask = 0x80 // Mask for the discontinuity indicator at the discontinuity indicator idk.
DiscontinuityIndicatorIdx = AdaptationIdx + 1 // The index at which the discontinuity indicator is found in an MTS packet.
)
// TODO: make this better - currently doesn't make sense.
@ -257,3 +262,52 @@ func (p *Packet) Bytes(buf []byte) []byte {
buf = append(buf, p.Payload...)
return buf
}
type Option func(p *packet.Packet)
// addAdaptationField adds an adaptation field to p, and applys the passed options to this field.
// TODO: this will probably break if we already have adaptation field.
func addAdaptationField(p *packet.Packet, options ...Option) error {
if packet.ContainsAdaptationField((*packet.Packet)(p)) {
return errors.New("Adaptation field is already present in packet")
}
// Create space for adaptation field.
copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize])
// TODO: seperate into own function
// Update adaptation field control.
p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask
p[AdaptationControlIdx] |= AdaptationControlMask
// Default the adaptationfield.
resetAdaptation(p)
// Apply and options that have bee passed.
for _, option := range options {
option(p)
}
return nil
}
// resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field
// exists, otherwise an error is returned.
func resetAdaptation(p *packet.Packet) error {
if !packet.ContainsAdaptationField((*packet.Packet)(p)) {
return errors.New("No adaptation field in this packet")
}
p[AdaptationIdx] = DefaultAdaptationBodySize
p[AdaptationIdx+1] = 0x00
return nil
}
// DiscontinuityIndicator returns and Option that will set p's discontinuity
// indicator according to f.
func DiscontinuityIndicator(f bool) Option {
return func(p *packet.Packet) {
set := byte(DiscontinuityIndicatorMask)
if !f {
set = 0x00
}
p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask
p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set
}
}