diff --git a/.circleci/config.yml b/.circleci/config.yml index b1ae37f2..b25ea562 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,7 +3,9 @@ jobs: build: docker: # CircleCI Go images available at: https://hub.docker.com/r/circleci/golang/ - - image: circleci/golang:1.11 + - image: circleci/golang:1.12 + environment: + GO111MODULE: "on" working_directory: /go/src/bitbucket.org/ausocean/av diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..6c6977ea --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module bitbucket.org/ausocean/av + +go 1.12 + +require ( + bitbucket.org/ausocean/iot v1.2.4 + bitbucket.org/ausocean/utils v1.2.4 + github.com/BurntSushi/toml v0.3.1 // indirect + github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 + github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect + github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect + github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 + github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 + github.com/mewkiz/flac v1.0.5 + github.com/pkg/errors v0.8.1 // indirect + github.com/sergi/go-diff v1.0.0 // indirect + github.com/stretchr/testify v1.3.0 // indirect + go.uber.org/atomic v1.3.2 // indirect + go.uber.org/multierr v1.1.0 // indirect + go.uber.org/zap v1.9.1 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/yaml.v2 v2.2.2 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..fd14eb02 --- /dev/null +++ b/go.sum @@ -0,0 +1,45 @@ +bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4= +bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= +bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20= +bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= +github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= +github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c= +github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= +github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig= +github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= +github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw= +github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks= +github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA= +github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s= +github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc= +github.com/mewkiz/flac v1.0.5/go.mod h1:EHZNU32dMF6alpurYyKHDLYpW1lYpBZ5WrXi/VuNIGs= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= +github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/init/Makefile b/init/Makefile index 787d3149..0bbe57c7 100644 --- a/init/Makefile +++ b/init/Makefile @@ -1,18 +1,28 @@ -# install files and directories required by NetSender clients (such as gpio-netsender, revid-cli, etc.) -# NB: the default (soft) install does not override conf files +# Install files and directories required by NetSender clients (such as gpio-netsender, revid-cli, etc.) +# and create a dhcpcd.enter-hook for setting the MAC address. +# MA and DK can be optionally passed to Make, e.g, for a hard (first-time) installation: +# sudo MA=mac DK=dk install_hard +# NB: The default (soft) install does not override conf files. USER := $(shell whoami) PATH := /usr/local/go/bin:$(PATH) +ifeq ($(MA),) + MA := "00:E0:4C:00:00:01" +endif +ifeq ($(DK),) + DK := 0 +endif .SILENT:make_dirs .SILENT:soft_copy_files .SILENT:hard_copy_files +.SILENT:set_mac .SILENT:syncreboot .SILENT:clean -install: as_root make_dirs soft_copy_files syncreboot +install: as_root make_dirs soft_copy_files @echo "Install complete" -install_hard: as_root make_dirs hard_copy_files syncreboot +install_hard: as_root make_dirs hard_copy_files set_mac syncreboot @echo "Hard install complete" as_root: @@ -39,7 +49,7 @@ soft_copy_files: if [ -f /etc/netsender.conf ] ; then \ echo "/etc/netsender.conf left unmodified" ; \ else \ - cp netsender.conf /etc; \ + printf "ma $(MA)\ndk $(DK)\n" > /etc/netsender.conf; \ chown pi /etc/netsender.conf; \ fi @@ -53,9 +63,13 @@ hard_copy_files: echo "Backed up netsender.conf to /etc/netsender.conf.bak"; \ cp /etc/netsender.conf /etc/netsender.conf.bak ; \ fi - cp -f netsender.conf /etc + printf "ma $(MA)\ndk $(DK)\n" > /etc/netsender.conf chown pi /etc/netsender.conf +set_mac: + printf "ip link set eth0 address $(MA)\n" > /etc/dhcpcd.enter-hook + chmod guo+x /etc/dhcpcd.enter-hook + syncreboot: cd ../../utils/cmd/syncreboot; make; make install diff --git a/init/netsender.conf b/init/netsender.conf deleted file mode 100644 index 9706d8c6..00000000 --- a/init/netsender.conf +++ /dev/null @@ -1,4 +0,0 @@ -# /etc/netsender.conf -# replace with the actual MAC address and device key respectively -ma 00:00:00:00:00:00 -dk 0 \ No newline at end of file diff --git a/revid/mtsSender_test.go b/revid/mtsSender_test.go new file mode 100644 index 00000000..ad4a18ba --- /dev/null +++ b/revid/mtsSender_test.go @@ -0,0 +1,260 @@ +/* +NAME + mtsSender_test.go + +DESCRIPTION + mtsSender_test.go contains tests that validate the functionalilty of the + mtsSender under senders.go. Tests include checks that the mtsSender is + segmenting sends correctly, and also that it can correct discontinuities. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ +package revid + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/Comcast/gots/packet" + "github.com/Comcast/gots/pes" + + "bitbucket.org/ausocean/av/stream/mts" + "bitbucket.org/ausocean/av/stream/mts/meta" + "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" +) + +// Ring buffer sizes and read/write timeouts. +const ( + rbSize = 100 + rbElementSize = 150000 + wTimeout = 10 * time.Millisecond + rTimeout = 10 * time.Millisecond +) + +// sender simulates sending of video data, creating discontinuities if +// testDiscontinuities is set to true. +type sender struct { + buf [][]byte + testDiscontinuities bool + discontinuityAt int + currentPkt int +} + +// send takes d and neglects if testDiscontinuities is true, returning an error, +// otherwise d is appended to senders buf. +func (ts *sender) send(d []byte) error { + if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { + ts.currentPkt++ + return errors.New("could not send") + } + cpy := make([]byte, len(d)) + copy(cpy, d) + ts.buf = append(ts.buf, cpy) + ts.currentPkt++ + return nil +} + +// log implements the required logging func for some of the structs in use +// within tests. +func log(lvl int8, msg string, args ...interface{}) { + var l string + switch lvl { + case logger.Warning: + l = "warning" + case logger.Debug: + l = "debug" + case logger.Info: + l = "info" + case logger.Error: + l = "error" + case logger.Fatal: + l = "fatal" + } + msg = l + ": " + msg + for i := 0; i < len(args); i++ { + msg += " %v" + } + fmt.Printf(msg, args) +} + +// buffer implements io.Writer and handles the writing of data to a +// ring buffer used in tests. +type buffer ring.Buffer + +// Write implements the io.Writer interface. +func (b *buffer) Write(d []byte) (int, error) { + r := (*ring.Buffer)(b) + n, err := r.Write(d) + r.Flush() + return n, err +} + +// TestSegment ensures that the mtsSender correctly segments data into clips +// based on positioning of PSI in the mtsEncoder's output stream. +func TestSegment(t *testing.T) { + mts.Meta = meta.New() + + // Create ringBuffer, sender, loadsender and the MPEGTS encoder. + tstSender := &sender{} + loadSender := newMtsSender(tstSender, log) + rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) + encoder := mts.NewEncoder((*buffer)(rb), 25) + + // Turn time based PSI writing off for encoder. + const psiSendCount = 10 + encoder.TimeBasedPsi(false, psiSendCount) + + const noOfPacketsToWrite = 100 + for i := 0; i < noOfPacketsToWrite; i++ { + // Insert a payload so that we check that the segmentation works correctly + // in this regard. Packet number will be used. + encoder.Encode([]byte{byte(i)}) + rb.Flush() + + for { + next, err := rb.Next(rTimeout) + if err != nil { + break + } + + err = loadSender.load(next) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + err = loadSender.send() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + loadSender.release() + } + } + + result := tstSender.buf + expectData := 0 + for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) + + // Check that the clip is of expected length. + clipLen := len(clip) + if clipLen != psiSendCount*mts.PacketSize { + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) + } + + // Also check that the first packet is a PAT. + firstPkt := clip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + pid := pkt.PID() + if pid != mts.PatPid { + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } + + // Check that the clip data is okay. + for i := 0; i < len(clip); i += mts.PacketSize { + copy(pkt[:], clip[i:i+mts.PacketSize]) + if pkt.PID() == mts.VideoPid { + payload, err := pkt.Payload() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Parse PES from the MTS payload. + pes, err := pes.NewPESHeader(payload) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Get the data from the PES packet and convert to an int. + data := int8(pes.Data()[0]) + + // Calc expected data in the PES and then check. + if data != int8(expectData) { + t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) + } + expectData++ + } + } + } +} + +func TestSendFailDiscontinuity(t *testing.T) { + mts.Meta = meta.New() + + // Create ringBuffer sender, loadSender and the MPEGTS encoder. + const clipWithDiscontinuity = 3 + tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} + loadSender := newMtsSender(tstSender, log) + rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) + encoder := mts.NewEncoder((*buffer)(rb), 25) + + // Turn time based PSI writing off for encoder. + const psiSendCount = 10 + encoder.TimeBasedPsi(false, psiSendCount) + + const noOfPacketsToWrite = 100 + for i := 0; i < noOfPacketsToWrite; i++ { + // Our payload will just be packet number. + encoder.Encode([]byte{byte(i)}) + rb.Flush() + + for { + next, err := rb.Next(rTimeout) + if err != nil { + break + } + + err = loadSender.load(next) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + loadSender.send() + loadSender.release() + } + } + + result := tstSender.buf + + // First check that we have less clips as expected. + expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 + gotLen := len(result) + if gotLen != expectedLen { + t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) + } + + // Now check that the discontinuity indicator is set at the discontinuityClip PAT. + disconClip := result[clipWithDiscontinuity] + firstPkt := disconClip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + discon, err := (*packet.AdaptationField)(&pkt).Discontinuity() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + if !discon { + t.Fatalf("Did not get discontinuity indicator for PAT") + } + +} diff --git a/revid/revid.go b/revid/revid.go index f36cbca0..33c3d0cc 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -114,10 +114,6 @@ type Revid struct { // destination is the target endpoint. destination []loadSender - // rtpSender is an unbuffered sender. - // It is used to isolate RTP from ring buffer-induced delays. - rtpSender *rtpSender - // bitrate hold the last send bitrate calculation result. bitrate int @@ -143,40 +139,22 @@ type packer struct { // are deemed to be successful, although a successful // write may include a dropped frame. func (p *packer) Write(frame []byte) (int, error) { - if len(p.owner.destination) != 0 { - n, err := p.owner.buffer.Write(frame) - if err != nil { - if err == ring.ErrDropped { - p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) - return len(frame), nil - } - p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) - return n, err - } + if len(p.owner.destination) == 0 { + panic("must have at least 1 destination") } - // If we have an rtp sender bypass ringbuffer and give straight to sender - if p.owner.rtpSender != nil { - err := p.owner.rtpSender.send(frame) - if err != nil { - p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error()) + n, err := p.owner.buffer.Write(frame) + if err != nil { + if err == ring.ErrDropped { + p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) + return len(frame), nil } + p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) + return n, err } - p.packetCount++ - var hasRtmp bool - for _, d := range p.owner.config.Outputs { - if d == Rtmp { - hasRtmp = true - break - } - } - now := time.Now() - if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) { - p.owner.buffer.Flush() - p.packetCount = 0 - p.lastTime = now - } + p.owner.buffer.Flush() + return len(frame), nil } @@ -232,7 +210,7 @@ func (r *Revid) reset(config Config) error { r.buffer = ring.NewBuffer(mtsRbSize, mtsRbElementSize, writeTimeout) } - r.destination = r.destination[:0] + r.destination = make([]loadSender, 0, len(r.config.Outputs)) for _, typ := range r.config.Outputs { switch typ { case File: @@ -241,12 +219,6 @@ func (r *Revid) reset(config Config) error { return err } r.destination = append(r.destination, s) - case FfmpegRtmp: - s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate)) - if err != nil { - return err - } - r.destination = append(r.destination, s) case Rtmp: s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { @@ -254,18 +226,18 @@ func (r *Revid) reset(config Config) error { } r.destination = append(r.destination, s) case Http: - r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) - case Udp: - s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) + switch r.Config().Packetization { + case Mpegts: + r.destination = append(r.destination, newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil)) + default: + r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) + } + case Rtp: + s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { return err } r.destination = append(r.destination, s) - case Rtp: - r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) - if err != nil { - return err - } } } @@ -387,23 +359,25 @@ func (r *Revid) Update(vars map[string]string) error { for key, value := range vars { switch key { case "Output": - r.config.Outputs = make([]uint8, 1) - // FIXME(kortschak): There can be only one! - // How do we specify outputs after the first? - // - // Maybe we shouldn't be doing this! - switch value { - case "File": - r.config.Outputs[0] = File - case "Http": - r.config.Outputs[0] = Http - case "Rtmp": - r.config.Outputs[0] = Rtmp - case "FfmpegRtmp": - r.config.Outputs[0] = FfmpegRtmp - default: - r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) - continue + outputs := strings.Split(value, ",") + r.config.Outputs = make([]uint8, len(outputs)) + + for i, output := range outputs { + switch output { + case "File": + r.config.Outputs[i] = File + case "Http": + r.config.Outputs[i] = Http + case "Rtmp": + r.config.Outputs[i] = Rtmp + case "FfmpegRtmp": + r.config.Outputs[i] = FfmpegRtmp + case "Rtp": + r.config.Outputs[i] = Rtp + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid output param", "value", value) + continue + } } case "Packetization": @@ -425,6 +399,8 @@ func (r *Revid) Update(vars map[string]string) error { r.config.FramesPerClip = uint(f) case "RtmpUrl": r.config.RtmpUrl = value + case "RtpAddr": + r.config.RtpAddress = value case "Bitrate": v, err := strconv.ParseUint(value, 10, 0) if err != nil { diff --git a/revid/senders.go b/revid/senders.go index af7657b2..ac76736e 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,14 +29,13 @@ LICENSE package revid import ( - "errors" "fmt" - "io" "net" "os" - "os/exec" "strconv" + "github.com/Comcast/gots/packet" + "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/rtp" @@ -45,6 +44,33 @@ import ( "bitbucket.org/ausocean/utils/ring" ) +// Sender is intended to provided functionality for the sending of a byte slice +// to an implemented destination. +type Sender interface { + // send takes the bytes slice d and sends to a particular destination as + // implemented. + send(d []byte) error +} + +// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind. +type minimalHttpSender struct { + client *netsender.Sender + log func(lvl int8, msg string, args ...interface{}) +} + +// newMinimalHttpSender returns a pointer to a new minimalHttpSender. +func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender { + return &minimalHttpSender{ + client: ns, + log: log, + } +} + +// send takes a bytes slice d and sends to http using s' http client. +func (s *minimalHttpSender) send(d []byte) error { + return httpSend(d, s.client, s.log) +} + // loadSender is a destination to send a *ring.Chunk to. // When a loadSender has finished using the *ring.Chunk // it must be Closed. @@ -105,6 +131,86 @@ func (s *fileSender) close() error { return s.file.Close() } +// mtsSender implemented loadSender and provides sending capability specifically +// for use with MPEGTS packetization. It handles the construction of appropriately +// lengthed clips based on PSI. It also fixes accounts for discontinuities by +// setting the discontinuity indicator for the first packet of a clip. +type mtsSender struct { + sender Sender + buf []byte + next []byte + pkt packet.Packet + failed bool + discarded bool + repairer *mts.DiscontinuityRepairer + chunk *ring.Chunk + curPid int +} + +// newMtsSender returns a new mtsSender. +func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { + return &mtsSender{ + sender: s, + repairer: mts.NewDiscontinuityRepairer(), + } +} + +// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and +// assigning to s.curPid. s.next if exists is also appended to the sender buf. +func (s *mtsSender) load(c *ring.Chunk) error { + if s.next != nil { + s.buf = append(s.buf, s.next...) + } + s.chunk = c + bytes := s.chunk.Bytes() + s.next = bytes + copy(s.pkt[:], bytes) + s.curPid = s.pkt.PID() + return nil +} + +// send checks the currently loaded paackets PID; if it is a PAT then what is in +// the mtsSenders buffer is fixed and sent. +func (ms *mtsSender) send() error { + if ms.curPid == mts.PatPid && len(ms.buf) > 0 { + err := ms.fixAndSend() + if err != nil { + return err + } + ms.buf = ms.buf[:0] + } + return nil +} + +// fixAndSend checks for discontinuities in the senders buffer and then sends. +// If a discontinuity is found the PAT packet at the start of the clip has it's +// discontintuity indicator set to true. +func (ms *mtsSender) fixAndSend() error { + err := ms.repairer.Repair(ms.buf) + if err == nil { + err = ms.sender.send(ms.buf) + if err == nil { + return nil + } + } + ms.failed = true + ms.repairer.Failed() + return err +} + +func (s *mtsSender) close() error { return nil } + +// release will set the s.fail flag to false and clear the buffer if +// the previous send was a fail. The currently loaded chunk is also closed. +func (s *mtsSender) release() { + if s.failed { + s.buf = s.buf[:0] + s.failed = false + } + s.chunk.Close() + s.chunk = nil +} + // httpSender implements loadSender for posting HTTP to NetReceiver type httpSender struct { client *netsender.Sender @@ -133,15 +239,19 @@ func (s *httpSender) send() error { // if the chunk has been cleared. return nil } + return httpSend(s.chunk.Bytes(), s.client, s.log) +} + +func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { // Only send if "V0" is configured as an input. send := false - ip := s.client.Param("ip") + ip := client.Param("ip") pins := netsender.MakePins(ip, "V") for i, pin := range pins { if pin.Name == "V0" { send = true - pins[i].Value = s.chunk.Len() - pins[i].Data = s.chunk.Bytes() + pins[i].Value = len(d) + pins[i].Data = d pins[i].MimeType = "video/mp2t" break } @@ -152,17 +262,16 @@ func (s *httpSender) send() error { } var err error var reply string - reply, _, err = s.client.Send(netsender.RequestRecv, pins) + reply, _, err = client.Send(netsender.RequestRecv, pins) if err != nil { return err } - - return s.extractMeta(reply) + return extractMeta(reply, log) } // extractMeta looks at a reply at extracts any time or location data - then used // to update time and location information in the mpegts encoder. -func (s *httpSender) extractMeta(r string) error { +func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error { dec, err := netsender.NewJSONDecoder(r) if err != nil { return nil @@ -170,18 +279,18 @@ func (s *httpSender) extractMeta(r string) error { // Extract time from reply t, err := dec.Int("ts") if err != nil { - s.log(logger.Warning, pkg+"No timestamp in reply") + log(logger.Warning, pkg+"No timestamp in reply") } else { - s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) + log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) mts.Meta.Add("ts", strconv.Itoa(t)) } // Extract location from reply g, err := dec.String("ll") if err != nil { - s.log(logger.Warning, pkg+"No location in reply") + log(logger.Warning, pkg+"No location in reply") } else { - s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) + log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) mts.Meta.Add("loc", g) } @@ -197,59 +306,6 @@ func (s *httpSender) release() { func (s *httpSender) close() error { return nil } -// ffmpegSender implements loadSender for an FFMPEG RTMP destination. -type ffmpegSender struct { - ffmpeg io.WriteCloser - - chunk *ring.Chunk -} - -func newFfmpegSender(url, framerate string) (*ffmpegSender, error) { - cmd := exec.Command(ffmpegPath, - "-f", "h264", - "-r", framerate, - "-i", "-", - "-f", "lavfi", - "-i", "aevalsrc=0", - "-fflags", "nobuffer", - "-vcodec", "copy", - "-acodec", "aac", - "-map", "0:0", - "-map", "1:0", - "-strict", "experimental", - "-f", "flv", - url, - ) - w, err := cmd.StdinPipe() - if err != nil { - return nil, err - } - err = cmd.Start() - if err != nil { - return nil, err - } - return &ffmpegSender{ffmpeg: w}, nil -} - -func (s *ffmpegSender) load(c *ring.Chunk) error { - s.chunk = c - return nil -} - -func (s *ffmpegSender) send() error { - _, err := s.chunk.WriteTo(s.ffmpeg) - return err -} - -func (s *ffmpegSender) release() { - s.chunk.Close() - s.chunk = nil -} - -func (s *ffmpegSender) close() error { - return s.ffmpeg.Close() -} - // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn @@ -332,46 +388,12 @@ func (s *rtmpSender) close() error { return nil } -// udpSender implements loadSender for a native udp destination. -type udpSender struct { - conn net.Conn - log func(lvl int8, msg string, args ...interface{}) - chunk *ring.Chunk -} - -func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) { - conn, err := net.Dial("udp", addr) - if err != nil { - return nil, err - } - return &udpSender{ - conn: conn, - log: log, - }, nil -} - -func (s *udpSender) load(c *ring.Chunk) error { - s.chunk = c - return nil -} - -func (s *udpSender) send() error { - _, err := s.chunk.WriteTo(s.conn) - return err -} - -func (s *udpSender) release() { - s.chunk.Close() - s.chunk = nil -} - -func (s *udpSender) close() error { return nil } - // TODO: Write restart func for rtpSender // rtpSender implements loadSender for a native udp destination with rtp packetization. type rtpSender struct { log func(lvl int8, msg string, args ...interface{}) encoder *rtp.Encoder + chunk *ring.Chunk } func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { @@ -386,12 +408,19 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{ return s, nil } -func (s *rtpSender) send(d []byte) error { - var err error - if d != nil { - _, err = s.encoder.Write(d) - } else { - err = errors.New("no data to send provided") - } +func (s *rtpSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *rtpSender) close() error { return nil } + +func (s *rtpSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *rtpSender) send() error { + _, err := s.chunk.WriteTo(s.encoder) return err } diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go new file mode 100644 index 00000000..adccebad --- /dev/null +++ b/stream/mts/discontinuity.go @@ -0,0 +1,109 @@ +/* +NAME + discontinuity.go + +DESCRIPTION + discontinuity.go provides functionality for detecting discontinuities in + mpegts and accounting for using the discontinuity indicator in the adaptation + field. + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + discontinuity.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package mts + +import ( + "github.com/Comcast/gots/packet" +) + +// discontinuityRepairer provides function to detect discontinuities in mpegts +// and set the discontinuity indicator as appropriate. +type DiscontinuityRepairer struct { + expCC map[int]int +} + +// NewDiscontinuityRepairer returns a pointer to a new discontinuityRepairer. +func NewDiscontinuityRepairer() *DiscontinuityRepairer { + return &DiscontinuityRepairer{ + expCC: map[int]int{ + PatPid: 16, + PmtPid: 16, + VideoPid: 16, + }, + } +} + +// Failed is to be called in the case of a failed send. This will decrement the +// expectedCC so that it aligns with the failed chunks cc. +func (dr *DiscontinuityRepairer) Failed() { + dr.decExpectedCC(PatPid) +} + +// Repair takes a clip of mpegts and checks that the first packet, which should +// be a PAT, contains a cc that is expected, otherwise the discontinuity indicator +// is set to true. +func (dr *DiscontinuityRepairer) Repair(d []byte) error { + var pkt packet.Packet + copy(pkt[:], d[:PacketSize]) + pid := pkt.PID() + if pid != PatPid { + panic("Clip to repair must have PAT first") + } + cc := pkt.ContinuityCounter() + expect, _ := dr.ExpectedCC(pid) + if cc != int(expect) { + if packet.ContainsAdaptationField(&pkt) { + (*packet.AdaptationField)(&pkt).SetDiscontinuity(true) + } else { + err := addAdaptationField(&pkt, DiscontinuityIndicator(true)) + if err != nil { + return err + } + } + dr.SetExpectedCC(pid, cc) + copy(d[:PacketSize], pkt[:]) + } + dr.IncExpectedCC(pid) + return nil +} + +// expectedCC returns the expected cc. If the cc hasn't been used yet, then 16 +// and false is returned. +func (dr *DiscontinuityRepairer) ExpectedCC(pid int) (int, bool) { + if dr.expCC[pid] == 16 { + return 16, false + } + return dr.expCC[pid], true +} + +// incExpectedCC increments the expected cc. +func (dr *DiscontinuityRepairer) IncExpectedCC(pid int) { + dr.expCC[pid] = (dr.expCC[pid] + 1) & 0xf +} + +// decExpectedCC decrements the expected cc. +func (dr *DiscontinuityRepairer) decExpectedCC(pid int) { + dr.expCC[pid] = (dr.expCC[pid] - 1) & 0xf +} + +// setExpectedCC sets the expected cc. +func (dr *DiscontinuityRepairer) SetExpectedCC(pid, cc int) { + dr.expCC[pid] = cc +} diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 6ac11cc3..b1e098a4 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -85,7 +85,8 @@ var ( ) const ( - psiInterval = 1 * time.Second + psiInterval = 1 * time.Second + psiSendCount = 7 ) // Meta allows addition of metadata to encoded mts from outside of this pkg. @@ -130,6 +131,10 @@ type Encoder struct { continuity map[int]byte + timeBasedPsi bool + pktCount int + psiSendCount int + psiLastTime time.Time } @@ -141,6 +146,10 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder { frameInterval: time.Duration(float64(time.Second) / fps), ptsOffset: ptsOffset, + timeBasedPsi: true, + + pktCount: 8, + continuity: map[int]byte{ patPid: 0, pmtPid: 0, @@ -159,11 +168,22 @@ const ( hasPTS = 0x2 ) +// TimeBasedPsi allows for the setting of the PSI writing method, therefore, if +// PSI is written based on some time duration, or based on a packet count. +// If b is true, then time based PSI is used, otherwise the PSI is written +// every sendCount. +func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { + e.timeBasedPsi = b + e.psiSendCount = sendCount + e.pktCount = e.psiSendCount +} + // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { now := time.Now() - if now.Sub(e.psiLastTime) > psiInterval { + if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { + e.pktCount = 0 err := e.writePSI() if err != nil { return err @@ -204,6 +224,7 @@ func (e *Encoder) Encode(nalu []byte) error { if err != nil { return err } + e.pktCount++ } e.tick() @@ -226,6 +247,7 @@ func (e *Encoder) writePSI() error { if err != nil { return err } + e.pktCount++ pmtTable, err = updateMeta(pmtTable) if err != nil { return err @@ -243,6 +265,7 @@ func (e *Encoder) writePSI() error { if err != nil { return err } + e.pktCount++ return nil } diff --git a/stream/mts/meta/meta.go b/stream/mts/meta/meta.go index 481b5ae5..188c2d4e 100644 --- a/stream/mts/meta/meta.go +++ b/stream/mts/meta/meta.go @@ -144,6 +144,9 @@ func (m *Data) Delete(key string) { // Encode takes the meta data map and encodes into a byte slice with header // describing the version, length of data and data in TSV format. func (m *Data) Encode() []byte { + if m.enc == nil { + panic("Meta has not been initialized yet") + } m.enc = m.enc[:headSize] // Iterate over map and append entries, only adding tab if we're not on the diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index 705f88de..18fe8b5f 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -31,6 +31,8 @@ package mts import ( "errors" "fmt" + + "github.com/Comcast/gots/packet" ) // General mpegts packet properties. @@ -55,11 +57,14 @@ const HeadSize = 4 // Consts relating to adaptation field. const ( - AdaptationIdx = 4 // Index to the adaptation field (index of AFL). - AdaptationControlIdx = 3 // Index to octet with adaptation field control. - AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields. - DefaultAdaptationSize = 2 // Default size of the adaptation field. - AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3. + AdaptationIdx = 4 // Index to the adaptation field (index of AFL). + AdaptationControlIdx = 3 // Index to octet with adaptation field control. + AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields. + DefaultAdaptationSize = 2 // Default size of the adaptation field. + AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3. + DefaultAdaptationBodySize = 1 // Default size of the adaptation field body. + DiscontinuityIndicatorMask = 0x80 // Mask for the discontinuity indicator at the discontinuity indicator idk. + DiscontinuityIndicatorIdx = AdaptationIdx + 1 // The index at which the discontinuity indicator is found in an MTS packet. ) // TODO: make this better - currently doesn't make sense. @@ -257,3 +262,52 @@ func (p *Packet) Bytes(buf []byte) []byte { buf = append(buf, p.Payload...) return buf } + +type Option func(p *packet.Packet) + +// addAdaptationField adds an adaptation field to p, and applys the passed options to this field. +// TODO: this will probably break if we already have adaptation field. +func addAdaptationField(p *packet.Packet, options ...Option) error { + if packet.ContainsAdaptationField((*packet.Packet)(p)) { + return errors.New("Adaptation field is already present in packet") + } + // Create space for adaptation field. + copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize]) + + // TODO: seperate into own function + // Update adaptation field control. + p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask + p[AdaptationControlIdx] |= AdaptationControlMask + // Default the adaptationfield. + resetAdaptation(p) + + // Apply and options that have bee passed. + for _, option := range options { + option(p) + } + return nil +} + +// resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field +// exists, otherwise an error is returned. +func resetAdaptation(p *packet.Packet) error { + if !packet.ContainsAdaptationField((*packet.Packet)(p)) { + return errors.New("No adaptation field in this packet") + } + p[AdaptationIdx] = DefaultAdaptationBodySize + p[AdaptationIdx+1] = 0x00 + return nil +} + +// DiscontinuityIndicator returns and Option that will set p's discontinuity +// indicator according to f. +func DiscontinuityIndicator(f bool) Option { + return func(p *packet.Packet) { + set := byte(DiscontinuityIndicatorMask) + if !f { + set = 0x00 + } + p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask + p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set + } +}