diff --git a/.circleci/config.yml b/.circleci/config.yml index b1ae37f2..b25ea562 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,7 +3,9 @@ jobs: build: docker: # CircleCI Go images available at: https://hub.docker.com/r/circleci/golang/ - - image: circleci/golang:1.11 + - image: circleci/golang:1.12 + environment: + GO111MODULE: "on" working_directory: /go/src/bitbucket.org/ausocean/av diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 893f249b..116aa2a7 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -109,25 +109,26 @@ func handleFlags() revid.Config { inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None") - quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") - verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Info, Warning, Error, Fatal") - framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent") - rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint") - bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video") - outputPathPtr = flag.String("OutputPath", "", "The directory of the output file") - inputFilePtr = flag.String("InputPath", "", "The directory of the input file") - heightPtr = flag.Uint("Height", 0, "Height in pixels") - widthPtr = flag.Uint("Width", 0, "Width in pixels") - frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video") - httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts") - quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40") - intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send") - verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No") - horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No") rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: : (port is generally 6970-6999)") logPathPtr = flag.String("LogPath", defaultLogPath, "The log path") configFilePtr = flag.String("ConfigFile", "", "NetSender config file") + rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint") + outputPathPtr = flag.String("OutputPath", "", "The directory of the output file") + inputFilePtr = flag.String("InputPath", "", "The directory of the input file") + verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Info, Warning, Error, Fatal") + httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts") + quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") sendRetryPtr = flag.Bool("retry", false, "Specify whether a failed send should be retried.") + verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No") + horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No") + framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent") + bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video") + heightPtr = flag.Uint("Height", 0, "Height in pixels") + widthPtr = flag.Uint("Width", 0, "Width in pixels") + frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video") + quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40") + intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send") + rotationPtr = flag.Uint("Rotatation", 0, "Rotate video output. (0-359 degrees)") brightnessPtr = flag.Uint("Brightness", 50, "Set Brightness: 0-100 ") saturationPtr = flag.Int("Saturation", 0, "Set Saturation: -100:100") exposurePtr = flag.String("Exposure", "auto", "Set exposure mode: "+strings.Join(revid.ExposureModes[:], ",")) @@ -243,6 +244,7 @@ func handleFlags() revid.Config { } cfg.Quantize = *quantizePtr + cfg.Rotation = *rotationPtr cfg.FlipHorizontal = *horizontalFlipPtr cfg.FlipVertical = *verticalFlipPtr cfg.FramesPerClip = *framesPerClipPtr @@ -275,10 +277,10 @@ func run(cfg revid.Config) { readPin := func(pin *netsender.Pin) error { switch { case pin.Name == "X23": - if rv == nil { - pin.Value = -1 + pin.Value = -1 + if rv != nil { + pin.Value = rv.Bitrate() } - pin.Value = rv.Bitrate() case pin.Name[0] == 'X': return sds.ReadSystem(pin) default: diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..6c6977ea --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module bitbucket.org/ausocean/av + +go 1.12 + +require ( + bitbucket.org/ausocean/iot v1.2.4 + bitbucket.org/ausocean/utils v1.2.4 + github.com/BurntSushi/toml v0.3.1 // indirect + github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 + github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect + github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect + github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 + github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 + github.com/mewkiz/flac v1.0.5 + github.com/pkg/errors v0.8.1 // indirect + github.com/sergi/go-diff v1.0.0 // indirect + github.com/stretchr/testify v1.3.0 // indirect + go.uber.org/atomic v1.3.2 // indirect + go.uber.org/multierr v1.1.0 // indirect + go.uber.org/zap v1.9.1 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/yaml.v2 v2.2.2 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..fd14eb02 --- /dev/null +++ b/go.sum @@ -0,0 +1,45 @@ +bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4= +bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= +bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20= +bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= +github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= +github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c= +github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= +github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig= +github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= +github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw= +github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks= +github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA= +github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s= +github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc= +github.com/mewkiz/flac v1.0.5/go.mod h1:EHZNU32dMF6alpurYyKHDLYpW1lYpBZ5WrXi/VuNIGs= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= +github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/init/Makefile b/init/Makefile index 787d3149..0bbe57c7 100644 --- a/init/Makefile +++ b/init/Makefile @@ -1,18 +1,28 @@ -# install files and directories required by NetSender clients (such as gpio-netsender, revid-cli, etc.) -# NB: the default (soft) install does not override conf files +# Install files and directories required by NetSender clients (such as gpio-netsender, revid-cli, etc.) +# and create a dhcpcd.enter-hook for setting the MAC address. +# MA and DK can be optionally passed to Make, e.g, for a hard (first-time) installation: +# sudo MA=mac DK=dk install_hard +# NB: The default (soft) install does not override conf files. USER := $(shell whoami) PATH := /usr/local/go/bin:$(PATH) +ifeq ($(MA),) + MA := "00:E0:4C:00:00:01" +endif +ifeq ($(DK),) + DK := 0 +endif .SILENT:make_dirs .SILENT:soft_copy_files .SILENT:hard_copy_files +.SILENT:set_mac .SILENT:syncreboot .SILENT:clean -install: as_root make_dirs soft_copy_files syncreboot +install: as_root make_dirs soft_copy_files @echo "Install complete" -install_hard: as_root make_dirs hard_copy_files syncreboot +install_hard: as_root make_dirs hard_copy_files set_mac syncreboot @echo "Hard install complete" as_root: @@ -39,7 +49,7 @@ soft_copy_files: if [ -f /etc/netsender.conf ] ; then \ echo "/etc/netsender.conf left unmodified" ; \ else \ - cp netsender.conf /etc; \ + printf "ma $(MA)\ndk $(DK)\n" > /etc/netsender.conf; \ chown pi /etc/netsender.conf; \ fi @@ -53,9 +63,13 @@ hard_copy_files: echo "Backed up netsender.conf to /etc/netsender.conf.bak"; \ cp /etc/netsender.conf /etc/netsender.conf.bak ; \ fi - cp -f netsender.conf /etc + printf "ma $(MA)\ndk $(DK)\n" > /etc/netsender.conf chown pi /etc/netsender.conf +set_mac: + printf "ip link set eth0 address $(MA)\n" > /etc/dhcpcd.enter-hook + chmod guo+x /etc/dhcpcd.enter-hook + syncreboot: cd ../../utils/cmd/syncreboot; make; make install diff --git a/init/netsender.conf b/init/netsender.conf deleted file mode 100644 index 9706d8c6..00000000 --- a/init/netsender.conf +++ /dev/null @@ -1,4 +0,0 @@ -# /etc/netsender.conf -# replace with the actual MAC address and device key respectively -ma 00:00:00:00:00:00 -dk 0 \ No newline at end of file diff --git a/revid/config.go b/revid/config.go index dab5a565..64183318 100644 --- a/revid/config.go +++ b/revid/config.go @@ -69,6 +69,7 @@ type Config struct { Logger Logger SendRetry bool BurstPeriod uint + Rotation uint Brightness uint Saturation int Exposure string @@ -148,6 +149,7 @@ const ( defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15. defaultRtpAddr = "localhost:6970" defaultBurstPeriod = 10 // Seconds + defaultRotation = 0 // Degrees ) // Validate checks for any errors in the config fields and defaults settings @@ -247,6 +249,11 @@ func (c *Config) Validate(r *Revid) error { c.FramesPerClip = defaultFramesPerClip } + if c.Rotation > 359 { + c.Logger.Log(logger.Warning, pkg+"bad rotate angle, defaulting", "angle", defaultRotation) + c.Rotation = defaultRotation + } + if c.Width == 0 { c.Logger.Log(logger.Info, pkg+"no width defined, defaulting", "width", defaultWidth) c.Width = defaultWidth diff --git a/revid/revid.go b/revid/revid.go index cee8fc0d..cb04733d 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -114,10 +114,6 @@ type Revid struct { // destination is the target endpoint. destination []loadSender - // rtpSender is an unbuffered sender. - // It is used to isolate RTP from ring buffer-induced delays. - rtpSender *rtpSender - // bitrate hold the last send bitrate calculation result. bitrate int @@ -143,40 +139,22 @@ type packer struct { // are deemed to be successful, although a successful // write may include a dropped frame. func (p *packer) Write(frame []byte) (int, error) { - if len(p.owner.destination) != 0 { - n, err := p.owner.buffer.Write(frame) - if err != nil { - if err == ring.ErrDropped { - p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) - return len(frame), nil - } - p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) - return n, err - } + if len(p.owner.destination) == 0 { + panic("must have at least 1 destination") } - // If we have an rtp sender bypass ringbuffer and give straight to sender - if p.owner.rtpSender != nil { - err := p.owner.rtpSender.send(frame) - if err != nil { - p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error()) + n, err := p.owner.buffer.Write(frame) + if err != nil { + if err == ring.ErrDropped { + p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) + return len(frame), nil } + p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) + return n, err } - p.packetCount++ - var hasRtmp bool - for _, d := range p.owner.config.Outputs { - if d == Rtmp { - hasRtmp = true - break - } - } - now := time.Now() - if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) { - p.owner.buffer.Flush() - p.packetCount = 0 - p.lastTime = now - } + p.owner.buffer.Flush() + return len(frame), nil } @@ -232,7 +210,7 @@ func (r *Revid) reset(config Config) error { r.buffer = ring.NewBuffer(mtsRbSize, mtsRbElementSize, writeTimeout) } - r.destination = r.destination[:0] + r.destination = make([]loadSender, 0, len(r.config.Outputs)) for _, typ := range r.config.Outputs { switch typ { case File: @@ -241,12 +219,6 @@ func (r *Revid) reset(config Config) error { return err } r.destination = append(r.destination, s) - case FfmpegRtmp: - s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate)) - if err != nil { - return err - } - r.destination = append(r.destination, s) case Rtmp: s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { @@ -254,18 +226,18 @@ func (r *Revid) reset(config Config) error { } r.destination = append(r.destination, s) case Http: - r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) - case Udp: - s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) + switch r.Config().Packetization { + case Mpegts: + r.destination = append(r.destination, newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil)) + default: + r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) + } + case Rtp: + s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { return err } r.destination = append(r.destination, s) - case Rtp: - r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) - if err != nil { - return err - } } } @@ -403,23 +375,25 @@ func (r *Revid) Update(vars map[string]string) error { case "Awb": r.config.Awb = value case "Output": - r.config.Outputs = make([]uint8, 1) - // FIXME(kortschak): There can be only one! - // How do we specify outputs after the first? - // - // Maybe we shouldn't be doing this! - switch value { - case "File": - r.config.Outputs[0] = File - case "Http": - r.config.Outputs[0] = Http - case "Rtmp": - r.config.Outputs[0] = Rtmp - case "FfmpegRtmp": - r.config.Outputs[0] = FfmpegRtmp - default: - r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) - continue + outputs := strings.Split(value, ",") + r.config.Outputs = make([]uint8, len(outputs)) + + for i, output := range outputs { + switch output { + case "File": + r.config.Outputs[i] = File + case "Http": + r.config.Outputs[i] = Http + case "Rtmp": + r.config.Outputs[i] = Rtmp + case "FfmpegRtmp": + r.config.Outputs[i] = FfmpegRtmp + case "Rtp": + r.config.Outputs[i] = Rtp + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid output param", "value", value) + continue + } } case "Packetization": @@ -441,6 +415,8 @@ func (r *Revid) Update(vars map[string]string) error { r.config.FramesPerClip = uint(f) case "RtmpUrl": r.config.RtmpUrl = value + case "RtpAddress": + r.config.RtpAddress = value case "Bitrate": v, err := strconv.ParseUint(value, 10, 0) if err != nil { @@ -473,6 +449,13 @@ func (r *Revid) Update(vars map[string]string) error { break } r.config.FrameRate = uint(v) + case "Rotation": + v, err := strconv.ParseUint(value, 10, 0) + if err != nil || v > 359 { + r.config.Logger.Log(logger.Warning, pkg+"invalid rotation param", "value", value) + break + } + r.config.Rotation = uint(v) case "HttpAddress": r.config.HttpAddress = value case "Quantization": @@ -489,6 +472,7 @@ func (r *Revid) Update(vars map[string]string) error { break } r.config.IntraRefreshPeriod = uint(p) + case "HorizontalFlip": switch strings.ToLower(value) { case "true": @@ -625,6 +609,7 @@ func (r *Revid) startRaspivid() error { "--height", fmt.Sprint(r.config.Height), "--bitrate", fmt.Sprint(r.config.Bitrate), "--framerate", fmt.Sprint(r.config.FrameRate), + "--rotation", fmt.Sprint(r.config.Rotation), "--brightness", fmt.Sprint(r.config.Brightness), "--saturation", fmt.Sprint(r.config.Saturation), "--exposure", fmt.Sprint(r.config.Exposure), @@ -634,9 +619,14 @@ func (r *Revid) startRaspivid() error { if r.config.FlipHorizontal { args = append(args, "--hflip") } + if r.config.FlipVertical { args = append(args, "--vflip") } + if r.config.FlipHorizontal { + args = append(args, "--hflip") + } + switch r.config.InputCodec { default: return fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) @@ -683,14 +673,7 @@ func (r *Revid) startV4L() error { "-f", "h264", "-r", fmt.Sprint(r.config.FrameRate), } - switch { - case r.config.FlipHorizontal && r.config.FlipVertical: - args = append(args, "-vf", "hflip,vflip") - case r.config.FlipHorizontal: - args = append(args, "-vf", "hflip") - case r.config.FlipVertical: - args = append(args, "-vf", "vflip") - } + args = append(args, "-b:v", fmt.Sprint(r.config.Bitrate), "-maxrate", fmt.Sprint(r.config.Bitrate), diff --git a/revid/revid_test.go b/revid/revid_test.go new file mode 100644 index 00000000..d88e4e9a --- /dev/null +++ b/revid/revid_test.go @@ -0,0 +1,68 @@ +package revid + +import ( + "fmt" + "os" + "runtime" + "testing" + + "bitbucket.org/ausocean/iot/pi/netsender" +) + +const raspividPath = "/usr/local/bin/raspivid" + +// Suppress all test logging, except for t.Errorf output. +var silent bool + +// TestRaspivid tests that raspivid starts correctly. +// It is intended to be run on a Raspberry Pi. +func TestRaspivid(t *testing.T) { + if _, err := os.Stat(raspividPath); os.IsNotExist(err) { + t.Skip("Skipping TestRaspivid since no raspivid found.") + } + + var logger testLogger + ns, err := netsender.New(&logger, nil, nil, nil) + if err != nil { + t.Errorf("netsender.New failed with error %v", err) + } + + var c Config + c.Logger = &logger + c.Input = Raspivid + c.Outputs = make([]uint8, 1) + + rv, err := New(c, ns) + if err != nil { + t.Errorf("revid.New failed with error %v", err) + } + + err = rv.Start() + if err != nil { + t.Errorf("revid.Start failed with error %v", err) + } +} + +// testLogger implements a netsender.Logger. +type testLogger struct{} + +// SetLevel normally sets the logging level, but it is a no-op in our case. +func (tl *testLogger) SetLevel(level int8) { +} + +// Log requests the Logger to write a message at the given level. +func (tl *testLogger) Log(level int8, msg string, params ...interface{}) { + logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"} + if level < -1 || level > 5 { + panic("Invalid log level") + } + if !silent { + fmt.Printf("%s: %s\n", logLevels[level+1], msg) + } + if level == 5 { + buf := make([]byte, 1<<16) + size := runtime.Stack(buf, true) + fmt.Printf("%s\n", string(buf[:size])) + os.Exit(1) + } +} diff --git a/revid/senders.go b/revid/senders.go index af7657b2..ac76736e 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,14 +29,13 @@ LICENSE package revid import ( - "errors" "fmt" - "io" "net" "os" - "os/exec" "strconv" + "github.com/Comcast/gots/packet" + "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/rtp" @@ -45,6 +44,33 @@ import ( "bitbucket.org/ausocean/utils/ring" ) +// Sender is intended to provided functionality for the sending of a byte slice +// to an implemented destination. +type Sender interface { + // send takes the bytes slice d and sends to a particular destination as + // implemented. + send(d []byte) error +} + +// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind. +type minimalHttpSender struct { + client *netsender.Sender + log func(lvl int8, msg string, args ...interface{}) +} + +// newMinimalHttpSender returns a pointer to a new minimalHttpSender. +func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender { + return &minimalHttpSender{ + client: ns, + log: log, + } +} + +// send takes a bytes slice d and sends to http using s' http client. +func (s *minimalHttpSender) send(d []byte) error { + return httpSend(d, s.client, s.log) +} + // loadSender is a destination to send a *ring.Chunk to. // When a loadSender has finished using the *ring.Chunk // it must be Closed. @@ -105,6 +131,86 @@ func (s *fileSender) close() error { return s.file.Close() } +// mtsSender implemented loadSender and provides sending capability specifically +// for use with MPEGTS packetization. It handles the construction of appropriately +// lengthed clips based on PSI. It also fixes accounts for discontinuities by +// setting the discontinuity indicator for the first packet of a clip. +type mtsSender struct { + sender Sender + buf []byte + next []byte + pkt packet.Packet + failed bool + discarded bool + repairer *mts.DiscontinuityRepairer + chunk *ring.Chunk + curPid int +} + +// newMtsSender returns a new mtsSender. +func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { + return &mtsSender{ + sender: s, + repairer: mts.NewDiscontinuityRepairer(), + } +} + +// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and +// assigning to s.curPid. s.next if exists is also appended to the sender buf. +func (s *mtsSender) load(c *ring.Chunk) error { + if s.next != nil { + s.buf = append(s.buf, s.next...) + } + s.chunk = c + bytes := s.chunk.Bytes() + s.next = bytes + copy(s.pkt[:], bytes) + s.curPid = s.pkt.PID() + return nil +} + +// send checks the currently loaded paackets PID; if it is a PAT then what is in +// the mtsSenders buffer is fixed and sent. +func (ms *mtsSender) send() error { + if ms.curPid == mts.PatPid && len(ms.buf) > 0 { + err := ms.fixAndSend() + if err != nil { + return err + } + ms.buf = ms.buf[:0] + } + return nil +} + +// fixAndSend checks for discontinuities in the senders buffer and then sends. +// If a discontinuity is found the PAT packet at the start of the clip has it's +// discontintuity indicator set to true. +func (ms *mtsSender) fixAndSend() error { + err := ms.repairer.Repair(ms.buf) + if err == nil { + err = ms.sender.send(ms.buf) + if err == nil { + return nil + } + } + ms.failed = true + ms.repairer.Failed() + return err +} + +func (s *mtsSender) close() error { return nil } + +// release will set the s.fail flag to false and clear the buffer if +// the previous send was a fail. The currently loaded chunk is also closed. +func (s *mtsSender) release() { + if s.failed { + s.buf = s.buf[:0] + s.failed = false + } + s.chunk.Close() + s.chunk = nil +} + // httpSender implements loadSender for posting HTTP to NetReceiver type httpSender struct { client *netsender.Sender @@ -133,15 +239,19 @@ func (s *httpSender) send() error { // if the chunk has been cleared. return nil } + return httpSend(s.chunk.Bytes(), s.client, s.log) +} + +func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { // Only send if "V0" is configured as an input. send := false - ip := s.client.Param("ip") + ip := client.Param("ip") pins := netsender.MakePins(ip, "V") for i, pin := range pins { if pin.Name == "V0" { send = true - pins[i].Value = s.chunk.Len() - pins[i].Data = s.chunk.Bytes() + pins[i].Value = len(d) + pins[i].Data = d pins[i].MimeType = "video/mp2t" break } @@ -152,17 +262,16 @@ func (s *httpSender) send() error { } var err error var reply string - reply, _, err = s.client.Send(netsender.RequestRecv, pins) + reply, _, err = client.Send(netsender.RequestRecv, pins) if err != nil { return err } - - return s.extractMeta(reply) + return extractMeta(reply, log) } // extractMeta looks at a reply at extracts any time or location data - then used // to update time and location information in the mpegts encoder. -func (s *httpSender) extractMeta(r string) error { +func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error { dec, err := netsender.NewJSONDecoder(r) if err != nil { return nil @@ -170,18 +279,18 @@ func (s *httpSender) extractMeta(r string) error { // Extract time from reply t, err := dec.Int("ts") if err != nil { - s.log(logger.Warning, pkg+"No timestamp in reply") + log(logger.Warning, pkg+"No timestamp in reply") } else { - s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) + log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) mts.Meta.Add("ts", strconv.Itoa(t)) } // Extract location from reply g, err := dec.String("ll") if err != nil { - s.log(logger.Warning, pkg+"No location in reply") + log(logger.Warning, pkg+"No location in reply") } else { - s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) + log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) mts.Meta.Add("loc", g) } @@ -197,59 +306,6 @@ func (s *httpSender) release() { func (s *httpSender) close() error { return nil } -// ffmpegSender implements loadSender for an FFMPEG RTMP destination. -type ffmpegSender struct { - ffmpeg io.WriteCloser - - chunk *ring.Chunk -} - -func newFfmpegSender(url, framerate string) (*ffmpegSender, error) { - cmd := exec.Command(ffmpegPath, - "-f", "h264", - "-r", framerate, - "-i", "-", - "-f", "lavfi", - "-i", "aevalsrc=0", - "-fflags", "nobuffer", - "-vcodec", "copy", - "-acodec", "aac", - "-map", "0:0", - "-map", "1:0", - "-strict", "experimental", - "-f", "flv", - url, - ) - w, err := cmd.StdinPipe() - if err != nil { - return nil, err - } - err = cmd.Start() - if err != nil { - return nil, err - } - return &ffmpegSender{ffmpeg: w}, nil -} - -func (s *ffmpegSender) load(c *ring.Chunk) error { - s.chunk = c - return nil -} - -func (s *ffmpegSender) send() error { - _, err := s.chunk.WriteTo(s.ffmpeg) - return err -} - -func (s *ffmpegSender) release() { - s.chunk.Close() - s.chunk = nil -} - -func (s *ffmpegSender) close() error { - return s.ffmpeg.Close() -} - // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn @@ -332,46 +388,12 @@ func (s *rtmpSender) close() error { return nil } -// udpSender implements loadSender for a native udp destination. -type udpSender struct { - conn net.Conn - log func(lvl int8, msg string, args ...interface{}) - chunk *ring.Chunk -} - -func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) { - conn, err := net.Dial("udp", addr) - if err != nil { - return nil, err - } - return &udpSender{ - conn: conn, - log: log, - }, nil -} - -func (s *udpSender) load(c *ring.Chunk) error { - s.chunk = c - return nil -} - -func (s *udpSender) send() error { - _, err := s.chunk.WriteTo(s.conn) - return err -} - -func (s *udpSender) release() { - s.chunk.Close() - s.chunk = nil -} - -func (s *udpSender) close() error { return nil } - // TODO: Write restart func for rtpSender // rtpSender implements loadSender for a native udp destination with rtp packetization. type rtpSender struct { log func(lvl int8, msg string, args ...interface{}) encoder *rtp.Encoder + chunk *ring.Chunk } func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { @@ -386,12 +408,19 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{ return s, nil } -func (s *rtpSender) send(d []byte) error { - var err error - if d != nil { - _, err = s.encoder.Write(d) - } else { - err = errors.New("no data to send provided") - } +func (s *rtpSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *rtpSender) close() error { return nil } + +func (s *rtpSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *rtpSender) send() error { + _, err := s.chunk.WriteTo(s.encoder) return err } diff --git a/revid/senders_test.go b/revid/senders_test.go new file mode 100644 index 00000000..ad4a18ba --- /dev/null +++ b/revid/senders_test.go @@ -0,0 +1,260 @@ +/* +NAME + mtsSender_test.go + +DESCRIPTION + mtsSender_test.go contains tests that validate the functionalilty of the + mtsSender under senders.go. Tests include checks that the mtsSender is + segmenting sends correctly, and also that it can correct discontinuities. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ +package revid + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/Comcast/gots/packet" + "github.com/Comcast/gots/pes" + + "bitbucket.org/ausocean/av/stream/mts" + "bitbucket.org/ausocean/av/stream/mts/meta" + "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" +) + +// Ring buffer sizes and read/write timeouts. +const ( + rbSize = 100 + rbElementSize = 150000 + wTimeout = 10 * time.Millisecond + rTimeout = 10 * time.Millisecond +) + +// sender simulates sending of video data, creating discontinuities if +// testDiscontinuities is set to true. +type sender struct { + buf [][]byte + testDiscontinuities bool + discontinuityAt int + currentPkt int +} + +// send takes d and neglects if testDiscontinuities is true, returning an error, +// otherwise d is appended to senders buf. +func (ts *sender) send(d []byte) error { + if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { + ts.currentPkt++ + return errors.New("could not send") + } + cpy := make([]byte, len(d)) + copy(cpy, d) + ts.buf = append(ts.buf, cpy) + ts.currentPkt++ + return nil +} + +// log implements the required logging func for some of the structs in use +// within tests. +func log(lvl int8, msg string, args ...interface{}) { + var l string + switch lvl { + case logger.Warning: + l = "warning" + case logger.Debug: + l = "debug" + case logger.Info: + l = "info" + case logger.Error: + l = "error" + case logger.Fatal: + l = "fatal" + } + msg = l + ": " + msg + for i := 0; i < len(args); i++ { + msg += " %v" + } + fmt.Printf(msg, args) +} + +// buffer implements io.Writer and handles the writing of data to a +// ring buffer used in tests. +type buffer ring.Buffer + +// Write implements the io.Writer interface. +func (b *buffer) Write(d []byte) (int, error) { + r := (*ring.Buffer)(b) + n, err := r.Write(d) + r.Flush() + return n, err +} + +// TestSegment ensures that the mtsSender correctly segments data into clips +// based on positioning of PSI in the mtsEncoder's output stream. +func TestSegment(t *testing.T) { + mts.Meta = meta.New() + + // Create ringBuffer, sender, loadsender and the MPEGTS encoder. + tstSender := &sender{} + loadSender := newMtsSender(tstSender, log) + rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) + encoder := mts.NewEncoder((*buffer)(rb), 25) + + // Turn time based PSI writing off for encoder. + const psiSendCount = 10 + encoder.TimeBasedPsi(false, psiSendCount) + + const noOfPacketsToWrite = 100 + for i := 0; i < noOfPacketsToWrite; i++ { + // Insert a payload so that we check that the segmentation works correctly + // in this regard. Packet number will be used. + encoder.Encode([]byte{byte(i)}) + rb.Flush() + + for { + next, err := rb.Next(rTimeout) + if err != nil { + break + } + + err = loadSender.load(next) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + err = loadSender.send() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + loadSender.release() + } + } + + result := tstSender.buf + expectData := 0 + for clipNo, clip := range result { + t.Logf("Checking clip: %v\n", clipNo) + + // Check that the clip is of expected length. + clipLen := len(clip) + if clipLen != psiSendCount*mts.PacketSize { + t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip) + } + + // Also check that the first packet is a PAT. + firstPkt := clip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + pid := pkt.PID() + if pid != mts.PatPid { + t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid) + } + + // Check that the clip data is okay. + for i := 0; i < len(clip); i += mts.PacketSize { + copy(pkt[:], clip[i:i+mts.PacketSize]) + if pkt.PID() == mts.VideoPid { + payload, err := pkt.Payload() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Parse PES from the MTS payload. + pes, err := pes.NewPESHeader(payload) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + // Get the data from the PES packet and convert to an int. + data := int8(pes.Data()[0]) + + // Calc expected data in the PES and then check. + if data != int8(expectData) { + t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData) + } + expectData++ + } + } + } +} + +func TestSendFailDiscontinuity(t *testing.T) { + mts.Meta = meta.New() + + // Create ringBuffer sender, loadSender and the MPEGTS encoder. + const clipWithDiscontinuity = 3 + tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} + loadSender := newMtsSender(tstSender, log) + rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) + encoder := mts.NewEncoder((*buffer)(rb), 25) + + // Turn time based PSI writing off for encoder. + const psiSendCount = 10 + encoder.TimeBasedPsi(false, psiSendCount) + + const noOfPacketsToWrite = 100 + for i := 0; i < noOfPacketsToWrite; i++ { + // Our payload will just be packet number. + encoder.Encode([]byte{byte(i)}) + rb.Flush() + + for { + next, err := rb.Next(rTimeout) + if err != nil { + break + } + + err = loadSender.load(next) + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + loadSender.send() + loadSender.release() + } + } + + result := tstSender.buf + + // First check that we have less clips as expected. + expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 + gotLen := len(result) + if gotLen != expectedLen { + t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen) + } + + // Now check that the discontinuity indicator is set at the discontinuityClip PAT. + disconClip := result[clipWithDiscontinuity] + firstPkt := disconClip[:mts.PacketSize] + var pkt packet.Packet + copy(pkt[:], firstPkt) + discon, err := (*packet.AdaptationField)(&pkt).Discontinuity() + if err != nil { + t.Fatalf("Unexpected err: %v\n", err) + } + + if !discon { + t.Fatalf("Did not get discontinuity indicator for PAT") + } + +} diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index 4fea7d82..132c9acd 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -75,28 +75,32 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp port = uint16(pi) } - if !path.IsAbs(u.Path) { - return protocol, host, port, app, playpath, nil - } - elems := strings.SplitN(u.Path[1:], "/", 3) - app = elems[0] - if app == "" { + if len(u.Path) < 1 || !path.IsAbs(u.Path) { return protocol, host, port, app, playpath, errInvalidURL } - playpath = elems[1] - if len(elems) == 3 && len(elems[2]) != 0 { - playpath = path.Join(elems[1:]...) - switch ext := path.Ext(playpath); ext { - case ".f4v", ".mp4": - playpath = "mp4:" + playpath[:len(playpath)-len(ext)] - case ".mp3": - playpath = "mp3:" + playpath[:len(playpath)-len(ext)] - case ".flv": - playpath = playpath[:len(playpath)-len(ext)] + elems := strings.SplitN(u.Path[1:], "/", 3) + if len(elems) < 2 || elems[0] == "" || elems[1] == "" { + return protocol, host, port, app, playpath, errInvalidURL + } + app = elems[0] + playpath = path.Join(elems[1:]...) + + switch ext := path.Ext(playpath); ext { + case ".f4v", ".mp4": + playpath = playpath[:len(playpath)-len(ext)] + if !strings.HasPrefix(playpath, "mp4:") { + playpath = "mp4:" + playpath } - if u.RawQuery != "" { - playpath += "?" + u.RawQuery + case ".mp3": + playpath = playpath[:len(playpath)-len(ext)] + if !strings.HasPrefix(playpath, "mp3:") { + playpath = "mp3:" + playpath } + case ".flv": + playpath = playpath[:len(playpath)-len(ext)] + } + if u.RawQuery != "" { + playpath += "?" + u.RawQuery } switch { diff --git a/rtmp/parseurl_test.go b/rtmp/parseurl_test.go new file mode 100644 index 00000000..47743693 --- /dev/null +++ b/rtmp/parseurl_test.go @@ -0,0 +1,200 @@ +/* +NAME + parseurl_test.go + +DESCRIPTION + See Readme.md + +AUTHOR + Dan Kortschak + +LICENSE + parseurl.go is Copyright (C) 2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ +package rtmp + +import ( + "testing" +) + +var parseURLTests = []struct { + url string + wantProtocol int32 + wantHost string + wantPort uint16 + wantApp string + wantPlaypath string + wantErr error +}{ + { + url: "rtmp://addr", + wantErr: errInvalidURL, + }, + { + url: "rtmp://addr/", + wantErr: errInvalidURL, + }, + { + url: "rtmp://addr/live2", + wantErr: errInvalidURL, + }, + { + url: "rtmp://addr/live2/", + wantErr: errInvalidURL, + }, + { + url: "rtmp://addr/appname/key", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "key", + }, + { + url: "rtmp://addr/appname/instancename", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "instancename", + }, + { + url: "rtmp://addr/appname/instancename/", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "instancename", + }, + { + url: "rtmp://addr/appname/mp4:path.f4v", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "mp4:path", + }, + { + url: "rtmp://addr/appname/mp4:path.f4v?param1=value1¶m2=value2", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "mp4:path?param1=value1¶m2=value2", + }, + { + url: "rtmp://addr/appname/mp4:path/to/file.f4v", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "mp4:path/to/file", + }, + { + url: "rtmp://addr/appname/mp4:path/to/file.f4v?param1=value1¶m2=value2", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "mp4:path/to/file?param1=value1¶m2=value2", + }, + { + url: "rtmp://addr/appname/path.mp4", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "mp4:path", + }, + { + url: "rtmp://addr/appname/path.mp4?param1=value1¶m2=value2", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "mp4:path?param1=value1¶m2=value2", + }, + { + url: "rtmp://addr/appname/path/to/file.mp4", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "mp4:path/to/file", + }, + { + url: "rtmp://addr/appname/path/to/file.mp4?param1=value1¶m2=value2", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "mp4:path/to/file?param1=value1¶m2=value2", + }, + { + url: "rtmp://addr/appname/path.flv", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "path", + }, + { + url: "rtmp://addr/appname/path.flv?param1=value1¶m2=value2", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "path?param1=value1¶m2=value2", + }, + { + url: "rtmp://addr/appname/path/to/file.flv", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "path/to/file", + }, + { + url: "rtmp://addr/appname/path/to/file.flv?param1=value1¶m2=value2", + wantHost: "addr", + wantPort: 1935, + wantApp: "appname", + wantPlaypath: "path/to/file?param1=value1¶m2=value2", + }, +} + +func TestParseURL(t *testing.T) { + for _, test := range parseURLTests { + func() { + defer func() { + p := recover() + if p != nil { + t.Errorf("unexpected panic for %q: %v", test.url, p) + } + }() + + protocol, host, port, app, playpath, err := parseURL(test.url) + if err != test.wantErr { + t.Errorf("unexpected error for %q: got:%v want:%v", test.url, err, test.wantErr) + return + } + if err != nil { + return + } + if protocol != test.wantProtocol { + t.Errorf("unexpected protocol for %q: got:%v want:%v", test.url, protocol, test.wantProtocol) + } + if host != test.wantHost { + t.Errorf("unexpected host for %q: got:%v want:%v", test.url, host, test.wantHost) + } + if port != test.wantPort { + t.Errorf("unexpected port for %q: got:%v want:%v", test.url, port, test.wantPort) + } + if app != test.wantApp { + t.Errorf("unexpected app for %q: got:%v want:%v", test.url, app, test.wantApp) + } + if playpath != test.wantPlaypath { + t.Errorf("unexpected playpath for %q: got:%v want:%v", test.url, playpath, test.wantPlaypath) + } + }() + } +} diff --git a/stream/mts/discontinuity.go b/stream/mts/discontinuity.go new file mode 100644 index 00000000..adccebad --- /dev/null +++ b/stream/mts/discontinuity.go @@ -0,0 +1,109 @@ +/* +NAME + discontinuity.go + +DESCRIPTION + discontinuity.go provides functionality for detecting discontinuities in + mpegts and accounting for using the discontinuity indicator in the adaptation + field. + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + discontinuity.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package mts + +import ( + "github.com/Comcast/gots/packet" +) + +// discontinuityRepairer provides function to detect discontinuities in mpegts +// and set the discontinuity indicator as appropriate. +type DiscontinuityRepairer struct { + expCC map[int]int +} + +// NewDiscontinuityRepairer returns a pointer to a new discontinuityRepairer. +func NewDiscontinuityRepairer() *DiscontinuityRepairer { + return &DiscontinuityRepairer{ + expCC: map[int]int{ + PatPid: 16, + PmtPid: 16, + VideoPid: 16, + }, + } +} + +// Failed is to be called in the case of a failed send. This will decrement the +// expectedCC so that it aligns with the failed chunks cc. +func (dr *DiscontinuityRepairer) Failed() { + dr.decExpectedCC(PatPid) +} + +// Repair takes a clip of mpegts and checks that the first packet, which should +// be a PAT, contains a cc that is expected, otherwise the discontinuity indicator +// is set to true. +func (dr *DiscontinuityRepairer) Repair(d []byte) error { + var pkt packet.Packet + copy(pkt[:], d[:PacketSize]) + pid := pkt.PID() + if pid != PatPid { + panic("Clip to repair must have PAT first") + } + cc := pkt.ContinuityCounter() + expect, _ := dr.ExpectedCC(pid) + if cc != int(expect) { + if packet.ContainsAdaptationField(&pkt) { + (*packet.AdaptationField)(&pkt).SetDiscontinuity(true) + } else { + err := addAdaptationField(&pkt, DiscontinuityIndicator(true)) + if err != nil { + return err + } + } + dr.SetExpectedCC(pid, cc) + copy(d[:PacketSize], pkt[:]) + } + dr.IncExpectedCC(pid) + return nil +} + +// expectedCC returns the expected cc. If the cc hasn't been used yet, then 16 +// and false is returned. +func (dr *DiscontinuityRepairer) ExpectedCC(pid int) (int, bool) { + if dr.expCC[pid] == 16 { + return 16, false + } + return dr.expCC[pid], true +} + +// incExpectedCC increments the expected cc. +func (dr *DiscontinuityRepairer) IncExpectedCC(pid int) { + dr.expCC[pid] = (dr.expCC[pid] + 1) & 0xf +} + +// decExpectedCC decrements the expected cc. +func (dr *DiscontinuityRepairer) decExpectedCC(pid int) { + dr.expCC[pid] = (dr.expCC[pid] - 1) & 0xf +} + +// setExpectedCC sets the expected cc. +func (dr *DiscontinuityRepairer) SetExpectedCC(pid, cc int) { + dr.expCC[pid] = cc +} diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 6ac11cc3..b1e098a4 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -85,7 +85,8 @@ var ( ) const ( - psiInterval = 1 * time.Second + psiInterval = 1 * time.Second + psiSendCount = 7 ) // Meta allows addition of metadata to encoded mts from outside of this pkg. @@ -130,6 +131,10 @@ type Encoder struct { continuity map[int]byte + timeBasedPsi bool + pktCount int + psiSendCount int + psiLastTime time.Time } @@ -141,6 +146,10 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder { frameInterval: time.Duration(float64(time.Second) / fps), ptsOffset: ptsOffset, + timeBasedPsi: true, + + pktCount: 8, + continuity: map[int]byte{ patPid: 0, pmtPid: 0, @@ -159,11 +168,22 @@ const ( hasPTS = 0x2 ) +// TimeBasedPsi allows for the setting of the PSI writing method, therefore, if +// PSI is written based on some time duration, or based on a packet count. +// If b is true, then time based PSI is used, otherwise the PSI is written +// every sendCount. +func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { + e.timeBasedPsi = b + e.psiSendCount = sendCount + e.pktCount = e.psiSendCount +} + // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { now := time.Now() - if now.Sub(e.psiLastTime) > psiInterval { + if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { + e.pktCount = 0 err := e.writePSI() if err != nil { return err @@ -204,6 +224,7 @@ func (e *Encoder) Encode(nalu []byte) error { if err != nil { return err } + e.pktCount++ } e.tick() @@ -226,6 +247,7 @@ func (e *Encoder) writePSI() error { if err != nil { return err } + e.pktCount++ pmtTable, err = updateMeta(pmtTable) if err != nil { return err @@ -243,6 +265,7 @@ func (e *Encoder) writePSI() error { if err != nil { return err } + e.pktCount++ return nil } diff --git a/stream/mts/meta/meta.go b/stream/mts/meta/meta.go index 481b5ae5..188c2d4e 100644 --- a/stream/mts/meta/meta.go +++ b/stream/mts/meta/meta.go @@ -144,6 +144,9 @@ func (m *Data) Delete(key string) { // Encode takes the meta data map and encodes into a byte slice with header // describing the version, length of data and data in TSV format. func (m *Data) Encode() []byte { + if m.enc == nil { + panic("Meta has not been initialized yet") + } m.enc = m.enc[:headSize] // Iterate over map and append entries, only adding tab if we're not on the diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index 705f88de..18fe8b5f 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -31,6 +31,8 @@ package mts import ( "errors" "fmt" + + "github.com/Comcast/gots/packet" ) // General mpegts packet properties. @@ -55,11 +57,14 @@ const HeadSize = 4 // Consts relating to adaptation field. const ( - AdaptationIdx = 4 // Index to the adaptation field (index of AFL). - AdaptationControlIdx = 3 // Index to octet with adaptation field control. - AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields. - DefaultAdaptationSize = 2 // Default size of the adaptation field. - AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3. + AdaptationIdx = 4 // Index to the adaptation field (index of AFL). + AdaptationControlIdx = 3 // Index to octet with adaptation field control. + AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields. + DefaultAdaptationSize = 2 // Default size of the adaptation field. + AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3. + DefaultAdaptationBodySize = 1 // Default size of the adaptation field body. + DiscontinuityIndicatorMask = 0x80 // Mask for the discontinuity indicator at the discontinuity indicator idk. + DiscontinuityIndicatorIdx = AdaptationIdx + 1 // The index at which the discontinuity indicator is found in an MTS packet. ) // TODO: make this better - currently doesn't make sense. @@ -257,3 +262,52 @@ func (p *Packet) Bytes(buf []byte) []byte { buf = append(buf, p.Payload...) return buf } + +type Option func(p *packet.Packet) + +// addAdaptationField adds an adaptation field to p, and applys the passed options to this field. +// TODO: this will probably break if we already have adaptation field. +func addAdaptationField(p *packet.Packet, options ...Option) error { + if packet.ContainsAdaptationField((*packet.Packet)(p)) { + return errors.New("Adaptation field is already present in packet") + } + // Create space for adaptation field. + copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize]) + + // TODO: seperate into own function + // Update adaptation field control. + p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask + p[AdaptationControlIdx] |= AdaptationControlMask + // Default the adaptationfield. + resetAdaptation(p) + + // Apply and options that have bee passed. + for _, option := range options { + option(p) + } + return nil +} + +// resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field +// exists, otherwise an error is returned. +func resetAdaptation(p *packet.Packet) error { + if !packet.ContainsAdaptationField((*packet.Packet)(p)) { + return errors.New("No adaptation field in this packet") + } + p[AdaptationIdx] = DefaultAdaptationBodySize + p[AdaptationIdx+1] = 0x00 + return nil +} + +// DiscontinuityIndicator returns and Option that will set p's discontinuity +// indicator according to f. +func DiscontinuityIndicator(f bool) Option { + return func(p *packet.Packet) { + set := byte(DiscontinuityIndicatorMask) + if !f { + set = 0x00 + } + p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask + p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set + } +}