mirror of https://bitbucket.org/ausocean/av.git
revid: resolved conflicts
This commit is contained in:
commit
524989d2ed
|
@ -3,7 +3,9 @@ jobs:
|
|||
build:
|
||||
docker:
|
||||
# CircleCI Go images available at: https://hub.docker.com/r/circleci/golang/
|
||||
- image: circleci/golang:1.11
|
||||
- image: circleci/golang:1.12
|
||||
environment:
|
||||
GO111MODULE: "on"
|
||||
|
||||
working_directory: /go/src/bitbucket.org/ausocean/av
|
||||
|
||||
|
|
|
@ -109,25 +109,26 @@ func handleFlags() revid.Config {
|
|||
inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg")
|
||||
rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp")
|
||||
packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None")
|
||||
quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
|
||||
verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Info, Warning, Error, Fatal")
|
||||
framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent")
|
||||
rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint")
|
||||
bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video")
|
||||
outputPathPtr = flag.String("OutputPath", "", "The directory of the output file")
|
||||
inputFilePtr = flag.String("InputPath", "", "The directory of the input file")
|
||||
heightPtr = flag.Uint("Height", 0, "Height in pixels")
|
||||
widthPtr = flag.Uint("Width", 0, "Width in pixels")
|
||||
frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video")
|
||||
httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts")
|
||||
quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40")
|
||||
intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send")
|
||||
verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No")
|
||||
horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No")
|
||||
rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)")
|
||||
logPathPtr = flag.String("LogPath", defaultLogPath, "The log path")
|
||||
configFilePtr = flag.String("ConfigFile", "", "NetSender config file")
|
||||
rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint")
|
||||
outputPathPtr = flag.String("OutputPath", "", "The directory of the output file")
|
||||
inputFilePtr = flag.String("InputPath", "", "The directory of the input file")
|
||||
verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Info, Warning, Error, Fatal")
|
||||
httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts")
|
||||
quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
|
||||
sendRetryPtr = flag.Bool("retry", false, "Specify whether a failed send should be retried.")
|
||||
verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No")
|
||||
horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No")
|
||||
framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent")
|
||||
bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video")
|
||||
heightPtr = flag.Uint("Height", 0, "Height in pixels")
|
||||
widthPtr = flag.Uint("Width", 0, "Width in pixels")
|
||||
frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video")
|
||||
quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40")
|
||||
intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send")
|
||||
rotationPtr = flag.Uint("Rotatation", 0, "Rotate video output. (0-359 degrees)")
|
||||
brightnessPtr = flag.Uint("Brightness", 50, "Set Brightness: 0-100 ")
|
||||
saturationPtr = flag.Int("Saturation", 0, "Set Saturation: -100:100")
|
||||
exposurePtr = flag.String("Exposure", "auto", "Set exposure mode: "+strings.Join(revid.ExposureModes[:], ","))
|
||||
|
@ -243,6 +244,7 @@ func handleFlags() revid.Config {
|
|||
}
|
||||
|
||||
cfg.Quantize = *quantizePtr
|
||||
cfg.Rotation = *rotationPtr
|
||||
cfg.FlipHorizontal = *horizontalFlipPtr
|
||||
cfg.FlipVertical = *verticalFlipPtr
|
||||
cfg.FramesPerClip = *framesPerClipPtr
|
||||
|
@ -275,10 +277,10 @@ func run(cfg revid.Config) {
|
|||
readPin := func(pin *netsender.Pin) error {
|
||||
switch {
|
||||
case pin.Name == "X23":
|
||||
if rv == nil {
|
||||
pin.Value = -1
|
||||
pin.Value = -1
|
||||
if rv != nil {
|
||||
pin.Value = rv.Bitrate()
|
||||
}
|
||||
pin.Value = rv.Bitrate()
|
||||
case pin.Name[0] == 'X':
|
||||
return sds.ReadSystem(pin)
|
||||
default:
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
module bitbucket.org/ausocean/av
|
||||
|
||||
go 1.12
|
||||
|
||||
require (
|
||||
bitbucket.org/ausocean/iot v1.2.4
|
||||
bitbucket.org/ausocean/utils v1.2.4
|
||||
github.com/BurntSushi/toml v0.3.1 // indirect
|
||||
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
|
||||
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
|
||||
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect
|
||||
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480
|
||||
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884
|
||||
github.com/mewkiz/flac v1.0.5
|
||||
github.com/pkg/errors v0.8.1 // indirect
|
||||
github.com/sergi/go-diff v1.0.0 // indirect
|
||||
github.com/stretchr/testify v1.3.0 // indirect
|
||||
go.uber.org/atomic v1.3.2 // indirect
|
||||
go.uber.org/multierr v1.1.0 // indirect
|
||||
go.uber.org/zap v1.9.1 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.2.2 // indirect
|
||||
)
|
|
@ -0,0 +1,45 @@
|
|||
bitbucket.org/ausocean/iot v1.2.4 h1:M/473iQ0d4q+76heerjAQuqXzQyc5dZ3F7Bfuq6X7q4=
|
||||
bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU=
|
||||
bitbucket.org/ausocean/utils v1.2.4 h1:/Kc7RflLH8eXP5j/5gNRJW18pnyRhu/Hkf2SvIZPm20=
|
||||
bitbucket.org/ausocean/utils v1.2.4/go.mod h1:5JIXFTAMMNl5Ob79tpZfDCJ+gOO8rj7v4ORj56tHZpw=
|
||||
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw=
|
||||
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7/go.mod h1:O5HA0jgDXkBp+jw0770QNBT8fsRJCbH7JXmM7wxLUBU=
|
||||
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
|
||||
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
|
||||
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
|
||||
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
|
||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c=
|
||||
github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs=
|
||||
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig=
|
||||
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs=
|
||||
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 h1:2TaXIaVA4ff/MHHezOj83tCypALTFAcXOImcFWNa3jw=
|
||||
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884/go.mod h1:UiqzUyfX0zs3pJ/DPyvS5v8sN6s5bXPUDDIVA5v8dks=
|
||||
github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 h1:Hc1iKlyxNHp3CV59G2E/qabUkHvEwOIJxDK0CJ7CRjA=
|
||||
github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21/go.mod h1:LlQmBGkOuV/SKzEDXBPKauvN2UqCgzXO2XjecTGj40s=
|
||||
github.com/mewkiz/flac v1.0.5 h1:dHGW/2kf+/KZ2GGqSVayNEhL9pluKn/rr/h/QqD9Ogc=
|
||||
github.com/mewkiz/flac v1.0.5/go.mod h1:EHZNU32dMF6alpurYyKHDLYpW1lYpBZ5WrXi/VuNIGs=
|
||||
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
|
||||
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
|
||||
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
|
||||
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
|
@ -1,18 +1,28 @@
|
|||
# install files and directories required by NetSender clients (such as gpio-netsender, revid-cli, etc.)
|
||||
# NB: the default (soft) install does not override conf files
|
||||
# Install files and directories required by NetSender clients (such as gpio-netsender, revid-cli, etc.)
|
||||
# and create a dhcpcd.enter-hook for setting the MAC address.
|
||||
# MA and DK can be optionally passed to Make, e.g, for a hard (first-time) installation:
|
||||
# sudo MA=mac DK=dk install_hard
|
||||
# NB: The default (soft) install does not override conf files.
|
||||
USER := $(shell whoami)
|
||||
PATH := /usr/local/go/bin:$(PATH)
|
||||
ifeq ($(MA),)
|
||||
MA := "00:E0:4C:00:00:01"
|
||||
endif
|
||||
ifeq ($(DK),)
|
||||
DK := 0
|
||||
endif
|
||||
|
||||
.SILENT:make_dirs
|
||||
.SILENT:soft_copy_files
|
||||
.SILENT:hard_copy_files
|
||||
.SILENT:set_mac
|
||||
.SILENT:syncreboot
|
||||
.SILENT:clean
|
||||
|
||||
install: as_root make_dirs soft_copy_files syncreboot
|
||||
install: as_root make_dirs soft_copy_files
|
||||
@echo "Install complete"
|
||||
|
||||
install_hard: as_root make_dirs hard_copy_files syncreboot
|
||||
install_hard: as_root make_dirs hard_copy_files set_mac syncreboot
|
||||
@echo "Hard install complete"
|
||||
|
||||
as_root:
|
||||
|
@ -39,7 +49,7 @@ soft_copy_files:
|
|||
if [ -f /etc/netsender.conf ] ; then \
|
||||
echo "/etc/netsender.conf left unmodified" ; \
|
||||
else \
|
||||
cp netsender.conf /etc; \
|
||||
printf "ma $(MA)\ndk $(DK)\n" > /etc/netsender.conf; \
|
||||
chown pi /etc/netsender.conf; \
|
||||
fi
|
||||
|
||||
|
@ -53,9 +63,13 @@ hard_copy_files:
|
|||
echo "Backed up netsender.conf to /etc/netsender.conf.bak"; \
|
||||
cp /etc/netsender.conf /etc/netsender.conf.bak ; \
|
||||
fi
|
||||
cp -f netsender.conf /etc
|
||||
printf "ma $(MA)\ndk $(DK)\n" > /etc/netsender.conf
|
||||
chown pi /etc/netsender.conf
|
||||
|
||||
set_mac:
|
||||
printf "ip link set eth0 address $(MA)\n" > /etc/dhcpcd.enter-hook
|
||||
chmod guo+x /etc/dhcpcd.enter-hook
|
||||
|
||||
syncreboot:
|
||||
cd ../../utils/cmd/syncreboot; make; make install
|
||||
|
||||
|
|
|
@ -1,4 +0,0 @@
|
|||
# /etc/netsender.conf
|
||||
# replace with the actual MAC address and device key respectively
|
||||
ma 00:00:00:00:00:00
|
||||
dk 0
|
|
@ -69,6 +69,7 @@ type Config struct {
|
|||
Logger Logger
|
||||
SendRetry bool
|
||||
BurstPeriod uint
|
||||
Rotation uint
|
||||
Brightness uint
|
||||
Saturation int
|
||||
Exposure string
|
||||
|
@ -148,6 +149,7 @@ const (
|
|||
defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15.
|
||||
defaultRtpAddr = "localhost:6970"
|
||||
defaultBurstPeriod = 10 // Seconds
|
||||
defaultRotation = 0 // Degrees
|
||||
)
|
||||
|
||||
// Validate checks for any errors in the config fields and defaults settings
|
||||
|
@ -247,6 +249,11 @@ func (c *Config) Validate(r *Revid) error {
|
|||
c.FramesPerClip = defaultFramesPerClip
|
||||
}
|
||||
|
||||
if c.Rotation > 359 {
|
||||
c.Logger.Log(logger.Warning, pkg+"bad rotate angle, defaulting", "angle", defaultRotation)
|
||||
c.Rotation = defaultRotation
|
||||
}
|
||||
|
||||
if c.Width == 0 {
|
||||
c.Logger.Log(logger.Info, pkg+"no width defined, defaulting", "width", defaultWidth)
|
||||
c.Width = defaultWidth
|
||||
|
|
129
revid/revid.go
129
revid/revid.go
|
@ -114,10 +114,6 @@ type Revid struct {
|
|||
// destination is the target endpoint.
|
||||
destination []loadSender
|
||||
|
||||
// rtpSender is an unbuffered sender.
|
||||
// It is used to isolate RTP from ring buffer-induced delays.
|
||||
rtpSender *rtpSender
|
||||
|
||||
// bitrate hold the last send bitrate calculation result.
|
||||
bitrate int
|
||||
|
||||
|
@ -143,40 +139,22 @@ type packer struct {
|
|||
// are deemed to be successful, although a successful
|
||||
// write may include a dropped frame.
|
||||
func (p *packer) Write(frame []byte) (int, error) {
|
||||
if len(p.owner.destination) != 0 {
|
||||
n, err := p.owner.buffer.Write(frame)
|
||||
if err != nil {
|
||||
if err == ring.ErrDropped {
|
||||
p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame))
|
||||
return len(frame), nil
|
||||
}
|
||||
p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
|
||||
return n, err
|
||||
}
|
||||
if len(p.owner.destination) == 0 {
|
||||
panic("must have at least 1 destination")
|
||||
}
|
||||
|
||||
// If we have an rtp sender bypass ringbuffer and give straight to sender
|
||||
if p.owner.rtpSender != nil {
|
||||
err := p.owner.rtpSender.send(frame)
|
||||
if err != nil {
|
||||
p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error())
|
||||
n, err := p.owner.buffer.Write(frame)
|
||||
if err != nil {
|
||||
if err == ring.ErrDropped {
|
||||
p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame))
|
||||
return len(frame), nil
|
||||
}
|
||||
p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
|
||||
return n, err
|
||||
}
|
||||
|
||||
p.packetCount++
|
||||
var hasRtmp bool
|
||||
for _, d := range p.owner.config.Outputs {
|
||||
if d == Rtmp {
|
||||
hasRtmp = true
|
||||
break
|
||||
}
|
||||
}
|
||||
now := time.Now()
|
||||
if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) {
|
||||
p.owner.buffer.Flush()
|
||||
p.packetCount = 0
|
||||
p.lastTime = now
|
||||
}
|
||||
p.owner.buffer.Flush()
|
||||
|
||||
return len(frame), nil
|
||||
}
|
||||
|
||||
|
@ -232,7 +210,7 @@ func (r *Revid) reset(config Config) error {
|
|||
r.buffer = ring.NewBuffer(mtsRbSize, mtsRbElementSize, writeTimeout)
|
||||
}
|
||||
|
||||
r.destination = r.destination[:0]
|
||||
r.destination = make([]loadSender, 0, len(r.config.Outputs))
|
||||
for _, typ := range r.config.Outputs {
|
||||
switch typ {
|
||||
case File:
|
||||
|
@ -241,12 +219,6 @@ func (r *Revid) reset(config Config) error {
|
|||
return err
|
||||
}
|
||||
r.destination = append(r.destination, s)
|
||||
case FfmpegRtmp:
|
||||
s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.destination = append(r.destination, s)
|
||||
case Rtmp:
|
||||
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
|
||||
if err != nil {
|
||||
|
@ -254,18 +226,18 @@ func (r *Revid) reset(config Config) error {
|
|||
}
|
||||
r.destination = append(r.destination, s)
|
||||
case Http:
|
||||
r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log))
|
||||
case Udp:
|
||||
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
|
||||
switch r.Config().Packetization {
|
||||
case Mpegts:
|
||||
r.destination = append(r.destination, newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil))
|
||||
default:
|
||||
r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log))
|
||||
}
|
||||
case Rtp:
|
||||
s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.destination = append(r.destination, s)
|
||||
case Rtp:
|
||||
r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -403,23 +375,25 @@ func (r *Revid) Update(vars map[string]string) error {
|
|||
case "Awb":
|
||||
r.config.Awb = value
|
||||
case "Output":
|
||||
r.config.Outputs = make([]uint8, 1)
|
||||
// FIXME(kortschak): There can be only one!
|
||||
// How do we specify outputs after the first?
|
||||
//
|
||||
// Maybe we shouldn't be doing this!
|
||||
switch value {
|
||||
case "File":
|
||||
r.config.Outputs[0] = File
|
||||
case "Http":
|
||||
r.config.Outputs[0] = Http
|
||||
case "Rtmp":
|
||||
r.config.Outputs[0] = Rtmp
|
||||
case "FfmpegRtmp":
|
||||
r.config.Outputs[0] = FfmpegRtmp
|
||||
default:
|
||||
r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
|
||||
continue
|
||||
outputs := strings.Split(value, ",")
|
||||
r.config.Outputs = make([]uint8, len(outputs))
|
||||
|
||||
for i, output := range outputs {
|
||||
switch output {
|
||||
case "File":
|
||||
r.config.Outputs[i] = File
|
||||
case "Http":
|
||||
r.config.Outputs[i] = Http
|
||||
case "Rtmp":
|
||||
r.config.Outputs[i] = Rtmp
|
||||
case "FfmpegRtmp":
|
||||
r.config.Outputs[i] = FfmpegRtmp
|
||||
case "Rtp":
|
||||
r.config.Outputs[i] = Rtp
|
||||
default:
|
||||
r.config.Logger.Log(logger.Warning, pkg+"invalid output param", "value", value)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
case "Packetization":
|
||||
|
@ -441,6 +415,8 @@ func (r *Revid) Update(vars map[string]string) error {
|
|||
r.config.FramesPerClip = uint(f)
|
||||
case "RtmpUrl":
|
||||
r.config.RtmpUrl = value
|
||||
case "RtpAddress":
|
||||
r.config.RtpAddress = value
|
||||
case "Bitrate":
|
||||
v, err := strconv.ParseUint(value, 10, 0)
|
||||
if err != nil {
|
||||
|
@ -473,6 +449,13 @@ func (r *Revid) Update(vars map[string]string) error {
|
|||
break
|
||||
}
|
||||
r.config.FrameRate = uint(v)
|
||||
case "Rotation":
|
||||
v, err := strconv.ParseUint(value, 10, 0)
|
||||
if err != nil || v > 359 {
|
||||
r.config.Logger.Log(logger.Warning, pkg+"invalid rotation param", "value", value)
|
||||
break
|
||||
}
|
||||
r.config.Rotation = uint(v)
|
||||
case "HttpAddress":
|
||||
r.config.HttpAddress = value
|
||||
case "Quantization":
|
||||
|
@ -489,6 +472,7 @@ func (r *Revid) Update(vars map[string]string) error {
|
|||
break
|
||||
}
|
||||
r.config.IntraRefreshPeriod = uint(p)
|
||||
|
||||
case "HorizontalFlip":
|
||||
switch strings.ToLower(value) {
|
||||
case "true":
|
||||
|
@ -625,6 +609,7 @@ func (r *Revid) startRaspivid() error {
|
|||
"--height", fmt.Sprint(r.config.Height),
|
||||
"--bitrate", fmt.Sprint(r.config.Bitrate),
|
||||
"--framerate", fmt.Sprint(r.config.FrameRate),
|
||||
"--rotation", fmt.Sprint(r.config.Rotation),
|
||||
"--brightness", fmt.Sprint(r.config.Brightness),
|
||||
"--saturation", fmt.Sprint(r.config.Saturation),
|
||||
"--exposure", fmt.Sprint(r.config.Exposure),
|
||||
|
@ -634,9 +619,14 @@ func (r *Revid) startRaspivid() error {
|
|||
if r.config.FlipHorizontal {
|
||||
args = append(args, "--hflip")
|
||||
}
|
||||
|
||||
if r.config.FlipVertical {
|
||||
args = append(args, "--vflip")
|
||||
}
|
||||
if r.config.FlipHorizontal {
|
||||
args = append(args, "--hflip")
|
||||
}
|
||||
|
||||
switch r.config.InputCodec {
|
||||
default:
|
||||
return fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec)
|
||||
|
@ -683,14 +673,7 @@ func (r *Revid) startV4L() error {
|
|||
"-f", "h264",
|
||||
"-r", fmt.Sprint(r.config.FrameRate),
|
||||
}
|
||||
switch {
|
||||
case r.config.FlipHorizontal && r.config.FlipVertical:
|
||||
args = append(args, "-vf", "hflip,vflip")
|
||||
case r.config.FlipHorizontal:
|
||||
args = append(args, "-vf", "hflip")
|
||||
case r.config.FlipVertical:
|
||||
args = append(args, "-vf", "vflip")
|
||||
}
|
||||
|
||||
args = append(args,
|
||||
"-b:v", fmt.Sprint(r.config.Bitrate),
|
||||
"-maxrate", fmt.Sprint(r.config.Bitrate),
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
package revid
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"bitbucket.org/ausocean/iot/pi/netsender"
|
||||
)
|
||||
|
||||
const raspividPath = "/usr/local/bin/raspivid"
|
||||
|
||||
// Suppress all test logging, except for t.Errorf output.
|
||||
var silent bool
|
||||
|
||||
// TestRaspivid tests that raspivid starts correctly.
|
||||
// It is intended to be run on a Raspberry Pi.
|
||||
func TestRaspivid(t *testing.T) {
|
||||
if _, err := os.Stat(raspividPath); os.IsNotExist(err) {
|
||||
t.Skip("Skipping TestRaspivid since no raspivid found.")
|
||||
}
|
||||
|
||||
var logger testLogger
|
||||
ns, err := netsender.New(&logger, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Errorf("netsender.New failed with error %v", err)
|
||||
}
|
||||
|
||||
var c Config
|
||||
c.Logger = &logger
|
||||
c.Input = Raspivid
|
||||
c.Outputs = make([]uint8, 1)
|
||||
|
||||
rv, err := New(c, ns)
|
||||
if err != nil {
|
||||
t.Errorf("revid.New failed with error %v", err)
|
||||
}
|
||||
|
||||
err = rv.Start()
|
||||
if err != nil {
|
||||
t.Errorf("revid.Start failed with error %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// testLogger implements a netsender.Logger.
|
||||
type testLogger struct{}
|
||||
|
||||
// SetLevel normally sets the logging level, but it is a no-op in our case.
|
||||
func (tl *testLogger) SetLevel(level int8) {
|
||||
}
|
||||
|
||||
// Log requests the Logger to write a message at the given level.
|
||||
func (tl *testLogger) Log(level int8, msg string, params ...interface{}) {
|
||||
logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"}
|
||||
if level < -1 || level > 5 {
|
||||
panic("Invalid log level")
|
||||
}
|
||||
if !silent {
|
||||
fmt.Printf("%s: %s\n", logLevels[level+1], msg)
|
||||
}
|
||||
if level == 5 {
|
||||
buf := make([]byte, 1<<16)
|
||||
size := runtime.Stack(buf, true)
|
||||
fmt.Printf("%s\n", string(buf[:size]))
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
247
revid/senders.go
247
revid/senders.go
|
@ -29,14 +29,13 @@ LICENSE
|
|||
package revid
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
|
||||
"github.com/Comcast/gots/packet"
|
||||
|
||||
"bitbucket.org/ausocean/av/rtmp"
|
||||
"bitbucket.org/ausocean/av/stream/mts"
|
||||
"bitbucket.org/ausocean/av/stream/rtp"
|
||||
|
@ -45,6 +44,33 @@ import (
|
|||
"bitbucket.org/ausocean/utils/ring"
|
||||
)
|
||||
|
||||
// Sender is intended to provided functionality for the sending of a byte slice
|
||||
// to an implemented destination.
|
||||
type Sender interface {
|
||||
// send takes the bytes slice d and sends to a particular destination as
|
||||
// implemented.
|
||||
send(d []byte) error
|
||||
}
|
||||
|
||||
// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind.
|
||||
type minimalHttpSender struct {
|
||||
client *netsender.Sender
|
||||
log func(lvl int8, msg string, args ...interface{})
|
||||
}
|
||||
|
||||
// newMinimalHttpSender returns a pointer to a new minimalHttpSender.
|
||||
func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender {
|
||||
return &minimalHttpSender{
|
||||
client: ns,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
// send takes a bytes slice d and sends to http using s' http client.
|
||||
func (s *minimalHttpSender) send(d []byte) error {
|
||||
return httpSend(d, s.client, s.log)
|
||||
}
|
||||
|
||||
// loadSender is a destination to send a *ring.Chunk to.
|
||||
// When a loadSender has finished using the *ring.Chunk
|
||||
// it must be Closed.
|
||||
|
@ -105,6 +131,86 @@ func (s *fileSender) close() error {
|
|||
return s.file.Close()
|
||||
}
|
||||
|
||||
// mtsSender implemented loadSender and provides sending capability specifically
|
||||
// for use with MPEGTS packetization. It handles the construction of appropriately
|
||||
// lengthed clips based on PSI. It also fixes accounts for discontinuities by
|
||||
// setting the discontinuity indicator for the first packet of a clip.
|
||||
type mtsSender struct {
|
||||
sender Sender
|
||||
buf []byte
|
||||
next []byte
|
||||
pkt packet.Packet
|
||||
failed bool
|
||||
discarded bool
|
||||
repairer *mts.DiscontinuityRepairer
|
||||
chunk *ring.Chunk
|
||||
curPid int
|
||||
}
|
||||
|
||||
// newMtsSender returns a new mtsSender.
|
||||
func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
|
||||
return &mtsSender{
|
||||
sender: s,
|
||||
repairer: mts.NewDiscontinuityRepairer(),
|
||||
}
|
||||
}
|
||||
|
||||
// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and
|
||||
// assigning to s.curPid. s.next if exists is also appended to the sender buf.
|
||||
func (s *mtsSender) load(c *ring.Chunk) error {
|
||||
if s.next != nil {
|
||||
s.buf = append(s.buf, s.next...)
|
||||
}
|
||||
s.chunk = c
|
||||
bytes := s.chunk.Bytes()
|
||||
s.next = bytes
|
||||
copy(s.pkt[:], bytes)
|
||||
s.curPid = s.pkt.PID()
|
||||
return nil
|
||||
}
|
||||
|
||||
// send checks the currently loaded paackets PID; if it is a PAT then what is in
|
||||
// the mtsSenders buffer is fixed and sent.
|
||||
func (ms *mtsSender) send() error {
|
||||
if ms.curPid == mts.PatPid && len(ms.buf) > 0 {
|
||||
err := ms.fixAndSend()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ms.buf = ms.buf[:0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// fixAndSend checks for discontinuities in the senders buffer and then sends.
|
||||
// If a discontinuity is found the PAT packet at the start of the clip has it's
|
||||
// discontintuity indicator set to true.
|
||||
func (ms *mtsSender) fixAndSend() error {
|
||||
err := ms.repairer.Repair(ms.buf)
|
||||
if err == nil {
|
||||
err = ms.sender.send(ms.buf)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
ms.failed = true
|
||||
ms.repairer.Failed()
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *mtsSender) close() error { return nil }
|
||||
|
||||
// release will set the s.fail flag to false and clear the buffer if
|
||||
// the previous send was a fail. The currently loaded chunk is also closed.
|
||||
func (s *mtsSender) release() {
|
||||
if s.failed {
|
||||
s.buf = s.buf[:0]
|
||||
s.failed = false
|
||||
}
|
||||
s.chunk.Close()
|
||||
s.chunk = nil
|
||||
}
|
||||
|
||||
// httpSender implements loadSender for posting HTTP to NetReceiver
|
||||
type httpSender struct {
|
||||
client *netsender.Sender
|
||||
|
@ -133,15 +239,19 @@ func (s *httpSender) send() error {
|
|||
// if the chunk has been cleared.
|
||||
return nil
|
||||
}
|
||||
return httpSend(s.chunk.Bytes(), s.client, s.log)
|
||||
}
|
||||
|
||||
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
|
||||
// Only send if "V0" is configured as an input.
|
||||
send := false
|
||||
ip := s.client.Param("ip")
|
||||
ip := client.Param("ip")
|
||||
pins := netsender.MakePins(ip, "V")
|
||||
for i, pin := range pins {
|
||||
if pin.Name == "V0" {
|
||||
send = true
|
||||
pins[i].Value = s.chunk.Len()
|
||||
pins[i].Data = s.chunk.Bytes()
|
||||
pins[i].Value = len(d)
|
||||
pins[i].Data = d
|
||||
pins[i].MimeType = "video/mp2t"
|
||||
break
|
||||
}
|
||||
|
@ -152,17 +262,16 @@ func (s *httpSender) send() error {
|
|||
}
|
||||
var err error
|
||||
var reply string
|
||||
reply, _, err = s.client.Send(netsender.RequestRecv, pins)
|
||||
reply, _, err = client.Send(netsender.RequestRecv, pins)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.extractMeta(reply)
|
||||
return extractMeta(reply, log)
|
||||
}
|
||||
|
||||
// extractMeta looks at a reply at extracts any time or location data - then used
|
||||
// to update time and location information in the mpegts encoder.
|
||||
func (s *httpSender) extractMeta(r string) error {
|
||||
func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error {
|
||||
dec, err := netsender.NewJSONDecoder(r)
|
||||
if err != nil {
|
||||
return nil
|
||||
|
@ -170,18 +279,18 @@ func (s *httpSender) extractMeta(r string) error {
|
|||
// Extract time from reply
|
||||
t, err := dec.Int("ts")
|
||||
if err != nil {
|
||||
s.log(logger.Warning, pkg+"No timestamp in reply")
|
||||
log(logger.Warning, pkg+"No timestamp in reply")
|
||||
} else {
|
||||
s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
|
||||
log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
|
||||
mts.Meta.Add("ts", strconv.Itoa(t))
|
||||
}
|
||||
|
||||
// Extract location from reply
|
||||
g, err := dec.String("ll")
|
||||
if err != nil {
|
||||
s.log(logger.Warning, pkg+"No location in reply")
|
||||
log(logger.Warning, pkg+"No location in reply")
|
||||
} else {
|
||||
s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
|
||||
log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
|
||||
mts.Meta.Add("loc", g)
|
||||
}
|
||||
|
||||
|
@ -197,59 +306,6 @@ func (s *httpSender) release() {
|
|||
|
||||
func (s *httpSender) close() error { return nil }
|
||||
|
||||
// ffmpegSender implements loadSender for an FFMPEG RTMP destination.
|
||||
type ffmpegSender struct {
|
||||
ffmpeg io.WriteCloser
|
||||
|
||||
chunk *ring.Chunk
|
||||
}
|
||||
|
||||
func newFfmpegSender(url, framerate string) (*ffmpegSender, error) {
|
||||
cmd := exec.Command(ffmpegPath,
|
||||
"-f", "h264",
|
||||
"-r", framerate,
|
||||
"-i", "-",
|
||||
"-f", "lavfi",
|
||||
"-i", "aevalsrc=0",
|
||||
"-fflags", "nobuffer",
|
||||
"-vcodec", "copy",
|
||||
"-acodec", "aac",
|
||||
"-map", "0:0",
|
||||
"-map", "1:0",
|
||||
"-strict", "experimental",
|
||||
"-f", "flv",
|
||||
url,
|
||||
)
|
||||
w, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ffmpegSender{ffmpeg: w}, nil
|
||||
}
|
||||
|
||||
func (s *ffmpegSender) load(c *ring.Chunk) error {
|
||||
s.chunk = c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ffmpegSender) send() error {
|
||||
_, err := s.chunk.WriteTo(s.ffmpeg)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *ffmpegSender) release() {
|
||||
s.chunk.Close()
|
||||
s.chunk = nil
|
||||
}
|
||||
|
||||
func (s *ffmpegSender) close() error {
|
||||
return s.ffmpeg.Close()
|
||||
}
|
||||
|
||||
// rtmpSender implements loadSender for a native RTMP destination.
|
||||
type rtmpSender struct {
|
||||
conn *rtmp.Conn
|
||||
|
@ -332,46 +388,12 @@ func (s *rtmpSender) close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// udpSender implements loadSender for a native udp destination.
|
||||
type udpSender struct {
|
||||
conn net.Conn
|
||||
log func(lvl int8, msg string, args ...interface{})
|
||||
chunk *ring.Chunk
|
||||
}
|
||||
|
||||
func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) {
|
||||
conn, err := net.Dial("udp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &udpSender{
|
||||
conn: conn,
|
||||
log: log,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *udpSender) load(c *ring.Chunk) error {
|
||||
s.chunk = c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *udpSender) send() error {
|
||||
_, err := s.chunk.WriteTo(s.conn)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *udpSender) release() {
|
||||
s.chunk.Close()
|
||||
s.chunk = nil
|
||||
}
|
||||
|
||||
func (s *udpSender) close() error { return nil }
|
||||
|
||||
// TODO: Write restart func for rtpSender
|
||||
// rtpSender implements loadSender for a native udp destination with rtp packetization.
|
||||
type rtpSender struct {
|
||||
log func(lvl int8, msg string, args ...interface{})
|
||||
encoder *rtp.Encoder
|
||||
chunk *ring.Chunk
|
||||
}
|
||||
|
||||
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
|
||||
|
@ -386,12 +408,19 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{
|
|||
return s, nil
|
||||
}
|
||||
|
||||
func (s *rtpSender) send(d []byte) error {
|
||||
var err error
|
||||
if d != nil {
|
||||
_, err = s.encoder.Write(d)
|
||||
} else {
|
||||
err = errors.New("no data to send provided")
|
||||
}
|
||||
func (s *rtpSender) load(c *ring.Chunk) error {
|
||||
s.chunk = c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *rtpSender) close() error { return nil }
|
||||
|
||||
func (s *rtpSender) release() {
|
||||
s.chunk.Close()
|
||||
s.chunk = nil
|
||||
}
|
||||
|
||||
func (s *rtpSender) send() error {
|
||||
_, err := s.chunk.WriteTo(s.encoder)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,260 @@
|
|||
/*
|
||||
NAME
|
||||
mtsSender_test.go
|
||||
|
||||
DESCRIPTION
|
||||
mtsSender_test.go contains tests that validate the functionalilty of the
|
||||
mtsSender under senders.go. Tests include checks that the mtsSender is
|
||||
segmenting sends correctly, and also that it can correct discontinuities.
|
||||
|
||||
AUTHORS
|
||||
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
||||
|
||||
LICENSE
|
||||
mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
|
||||
|
||||
It is free software: you can redistribute it and/or modify them
|
||||
under the terms of the GNU General Public License as published by the
|
||||
Free Software Foundation, either version 3 of the License, or (at your
|
||||
option) any later version.
|
||||
|
||||
It is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||
*/
|
||||
package revid
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Comcast/gots/packet"
|
||||
"github.com/Comcast/gots/pes"
|
||||
|
||||
"bitbucket.org/ausocean/av/stream/mts"
|
||||
"bitbucket.org/ausocean/av/stream/mts/meta"
|
||||
"bitbucket.org/ausocean/utils/logger"
|
||||
"bitbucket.org/ausocean/utils/ring"
|
||||
)
|
||||
|
||||
// Ring buffer sizes and read/write timeouts.
|
||||
const (
|
||||
rbSize = 100
|
||||
rbElementSize = 150000
|
||||
wTimeout = 10 * time.Millisecond
|
||||
rTimeout = 10 * time.Millisecond
|
||||
)
|
||||
|
||||
// sender simulates sending of video data, creating discontinuities if
|
||||
// testDiscontinuities is set to true.
|
||||
type sender struct {
|
||||
buf [][]byte
|
||||
testDiscontinuities bool
|
||||
discontinuityAt int
|
||||
currentPkt int
|
||||
}
|
||||
|
||||
// send takes d and neglects if testDiscontinuities is true, returning an error,
|
||||
// otherwise d is appended to senders buf.
|
||||
func (ts *sender) send(d []byte) error {
|
||||
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
|
||||
ts.currentPkt++
|
||||
return errors.New("could not send")
|
||||
}
|
||||
cpy := make([]byte, len(d))
|
||||
copy(cpy, d)
|
||||
ts.buf = append(ts.buf, cpy)
|
||||
ts.currentPkt++
|
||||
return nil
|
||||
}
|
||||
|
||||
// log implements the required logging func for some of the structs in use
|
||||
// within tests.
|
||||
func log(lvl int8, msg string, args ...interface{}) {
|
||||
var l string
|
||||
switch lvl {
|
||||
case logger.Warning:
|
||||
l = "warning"
|
||||
case logger.Debug:
|
||||
l = "debug"
|
||||
case logger.Info:
|
||||
l = "info"
|
||||
case logger.Error:
|
||||
l = "error"
|
||||
case logger.Fatal:
|
||||
l = "fatal"
|
||||
}
|
||||
msg = l + ": " + msg
|
||||
for i := 0; i < len(args); i++ {
|
||||
msg += " %v"
|
||||
}
|
||||
fmt.Printf(msg, args)
|
||||
}
|
||||
|
||||
// buffer implements io.Writer and handles the writing of data to a
|
||||
// ring buffer used in tests.
|
||||
type buffer ring.Buffer
|
||||
|
||||
// Write implements the io.Writer interface.
|
||||
func (b *buffer) Write(d []byte) (int, error) {
|
||||
r := (*ring.Buffer)(b)
|
||||
n, err := r.Write(d)
|
||||
r.Flush()
|
||||
return n, err
|
||||
}
|
||||
|
||||
// TestSegment ensures that the mtsSender correctly segments data into clips
|
||||
// based on positioning of PSI in the mtsEncoder's output stream.
|
||||
func TestSegment(t *testing.T) {
|
||||
mts.Meta = meta.New()
|
||||
|
||||
// Create ringBuffer, sender, loadsender and the MPEGTS encoder.
|
||||
tstSender := &sender{}
|
||||
loadSender := newMtsSender(tstSender, log)
|
||||
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
|
||||
encoder := mts.NewEncoder((*buffer)(rb), 25)
|
||||
|
||||
// Turn time based PSI writing off for encoder.
|
||||
const psiSendCount = 10
|
||||
encoder.TimeBasedPsi(false, psiSendCount)
|
||||
|
||||
const noOfPacketsToWrite = 100
|
||||
for i := 0; i < noOfPacketsToWrite; i++ {
|
||||
// Insert a payload so that we check that the segmentation works correctly
|
||||
// in this regard. Packet number will be used.
|
||||
encoder.Encode([]byte{byte(i)})
|
||||
rb.Flush()
|
||||
|
||||
for {
|
||||
next, err := rb.Next(rTimeout)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
err = loadSender.load(next)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected err: %v\n", err)
|
||||
}
|
||||
|
||||
err = loadSender.send()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected err: %v\n", err)
|
||||
}
|
||||
loadSender.release()
|
||||
}
|
||||
}
|
||||
|
||||
result := tstSender.buf
|
||||
expectData := 0
|
||||
for clipNo, clip := range result {
|
||||
t.Logf("Checking clip: %v\n", clipNo)
|
||||
|
||||
// Check that the clip is of expected length.
|
||||
clipLen := len(clip)
|
||||
if clipLen != psiSendCount*mts.PacketSize {
|
||||
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
|
||||
}
|
||||
|
||||
// Also check that the first packet is a PAT.
|
||||
firstPkt := clip[:mts.PacketSize]
|
||||
var pkt packet.Packet
|
||||
copy(pkt[:], firstPkt)
|
||||
pid := pkt.PID()
|
||||
if pid != mts.PatPid {
|
||||
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
|
||||
}
|
||||
|
||||
// Check that the clip data is okay.
|
||||
for i := 0; i < len(clip); i += mts.PacketSize {
|
||||
copy(pkt[:], clip[i:i+mts.PacketSize])
|
||||
if pkt.PID() == mts.VideoPid {
|
||||
payload, err := pkt.Payload()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected err: %v\n", err)
|
||||
}
|
||||
|
||||
// Parse PES from the MTS payload.
|
||||
pes, err := pes.NewPESHeader(payload)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected err: %v\n", err)
|
||||
}
|
||||
|
||||
// Get the data from the PES packet and convert to an int.
|
||||
data := int8(pes.Data()[0])
|
||||
|
||||
// Calc expected data in the PES and then check.
|
||||
if data != int8(expectData) {
|
||||
t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData)
|
||||
}
|
||||
expectData++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendFailDiscontinuity(t *testing.T) {
|
||||
mts.Meta = meta.New()
|
||||
|
||||
// Create ringBuffer sender, loadSender and the MPEGTS encoder.
|
||||
const clipWithDiscontinuity = 3
|
||||
tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
|
||||
loadSender := newMtsSender(tstSender, log)
|
||||
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
|
||||
encoder := mts.NewEncoder((*buffer)(rb), 25)
|
||||
|
||||
// Turn time based PSI writing off for encoder.
|
||||
const psiSendCount = 10
|
||||
encoder.TimeBasedPsi(false, psiSendCount)
|
||||
|
||||
const noOfPacketsToWrite = 100
|
||||
for i := 0; i < noOfPacketsToWrite; i++ {
|
||||
// Our payload will just be packet number.
|
||||
encoder.Encode([]byte{byte(i)})
|
||||
rb.Flush()
|
||||
|
||||
for {
|
||||
next, err := rb.Next(rTimeout)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
err = loadSender.load(next)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected err: %v\n", err)
|
||||
}
|
||||
|
||||
loadSender.send()
|
||||
loadSender.release()
|
||||
}
|
||||
}
|
||||
|
||||
result := tstSender.buf
|
||||
|
||||
// First check that we have less clips as expected.
|
||||
expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1
|
||||
gotLen := len(result)
|
||||
if gotLen != expectedLen {
|
||||
t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)
|
||||
}
|
||||
|
||||
// Now check that the discontinuity indicator is set at the discontinuityClip PAT.
|
||||
disconClip := result[clipWithDiscontinuity]
|
||||
firstPkt := disconClip[:mts.PacketSize]
|
||||
var pkt packet.Packet
|
||||
copy(pkt[:], firstPkt)
|
||||
discon, err := (*packet.AdaptationField)(&pkt).Discontinuity()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected err: %v\n", err)
|
||||
}
|
||||
|
||||
if !discon {
|
||||
t.Fatalf("Did not get discontinuity indicator for PAT")
|
||||
}
|
||||
|
||||
}
|
|
@ -75,28 +75,32 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp
|
|||
port = uint16(pi)
|
||||
}
|
||||
|
||||
if !path.IsAbs(u.Path) {
|
||||
return protocol, host, port, app, playpath, nil
|
||||
}
|
||||
elems := strings.SplitN(u.Path[1:], "/", 3)
|
||||
app = elems[0]
|
||||
if app == "" {
|
||||
if len(u.Path) < 1 || !path.IsAbs(u.Path) {
|
||||
return protocol, host, port, app, playpath, errInvalidURL
|
||||
}
|
||||
playpath = elems[1]
|
||||
if len(elems) == 3 && len(elems[2]) != 0 {
|
||||
playpath = path.Join(elems[1:]...)
|
||||
switch ext := path.Ext(playpath); ext {
|
||||
case ".f4v", ".mp4":
|
||||
playpath = "mp4:" + playpath[:len(playpath)-len(ext)]
|
||||
case ".mp3":
|
||||
playpath = "mp3:" + playpath[:len(playpath)-len(ext)]
|
||||
case ".flv":
|
||||
playpath = playpath[:len(playpath)-len(ext)]
|
||||
elems := strings.SplitN(u.Path[1:], "/", 3)
|
||||
if len(elems) < 2 || elems[0] == "" || elems[1] == "" {
|
||||
return protocol, host, port, app, playpath, errInvalidURL
|
||||
}
|
||||
app = elems[0]
|
||||
playpath = path.Join(elems[1:]...)
|
||||
|
||||
switch ext := path.Ext(playpath); ext {
|
||||
case ".f4v", ".mp4":
|
||||
playpath = playpath[:len(playpath)-len(ext)]
|
||||
if !strings.HasPrefix(playpath, "mp4:") {
|
||||
playpath = "mp4:" + playpath
|
||||
}
|
||||
if u.RawQuery != "" {
|
||||
playpath += "?" + u.RawQuery
|
||||
case ".mp3":
|
||||
playpath = playpath[:len(playpath)-len(ext)]
|
||||
if !strings.HasPrefix(playpath, "mp3:") {
|
||||
playpath = "mp3:" + playpath
|
||||
}
|
||||
case ".flv":
|
||||
playpath = playpath[:len(playpath)-len(ext)]
|
||||
}
|
||||
if u.RawQuery != "" {
|
||||
playpath += "?" + u.RawQuery
|
||||
}
|
||||
|
||||
switch {
|
||||
|
|
|
@ -0,0 +1,200 @@
|
|||
/*
|
||||
NAME
|
||||
parseurl_test.go
|
||||
|
||||
DESCRIPTION
|
||||
See Readme.md
|
||||
|
||||
AUTHOR
|
||||
Dan Kortschak <dan@ausocean.org>
|
||||
|
||||
LICENSE
|
||||
parseurl.go is Copyright (C) 2019 the Australian Ocean Lab (AusOcean)
|
||||
|
||||
It is free software: you can redistribute it and/or modify them
|
||||
under the terms of the GNU General Public License as published by the
|
||||
Free Software Foundation, either version 3 of the License, or (at your
|
||||
option) any later version.
|
||||
|
||||
It is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||
*/
|
||||
package rtmp
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
var parseURLTests = []struct {
|
||||
url string
|
||||
wantProtocol int32
|
||||
wantHost string
|
||||
wantPort uint16
|
||||
wantApp string
|
||||
wantPlaypath string
|
||||
wantErr error
|
||||
}{
|
||||
{
|
||||
url: "rtmp://addr",
|
||||
wantErr: errInvalidURL,
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/",
|
||||
wantErr: errInvalidURL,
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/live2",
|
||||
wantErr: errInvalidURL,
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/live2/",
|
||||
wantErr: errInvalidURL,
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/key",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "key",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/instancename",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "instancename",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/instancename/",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "instancename",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/mp4:path.f4v",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "mp4:path",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/mp4:path.f4v?param1=value1¶m2=value2",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "mp4:path?param1=value1¶m2=value2",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/mp4:path/to/file.f4v",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "mp4:path/to/file",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/mp4:path/to/file.f4v?param1=value1¶m2=value2",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "mp4:path/to/file?param1=value1¶m2=value2",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/path.mp4",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "mp4:path",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/path.mp4?param1=value1¶m2=value2",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "mp4:path?param1=value1¶m2=value2",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/path/to/file.mp4",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "mp4:path/to/file",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/path/to/file.mp4?param1=value1¶m2=value2",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "mp4:path/to/file?param1=value1¶m2=value2",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/path.flv",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "path",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/path.flv?param1=value1¶m2=value2",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "path?param1=value1¶m2=value2",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/path/to/file.flv",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "path/to/file",
|
||||
},
|
||||
{
|
||||
url: "rtmp://addr/appname/path/to/file.flv?param1=value1¶m2=value2",
|
||||
wantHost: "addr",
|
||||
wantPort: 1935,
|
||||
wantApp: "appname",
|
||||
wantPlaypath: "path/to/file?param1=value1¶m2=value2",
|
||||
},
|
||||
}
|
||||
|
||||
func TestParseURL(t *testing.T) {
|
||||
for _, test := range parseURLTests {
|
||||
func() {
|
||||
defer func() {
|
||||
p := recover()
|
||||
if p != nil {
|
||||
t.Errorf("unexpected panic for %q: %v", test.url, p)
|
||||
}
|
||||
}()
|
||||
|
||||
protocol, host, port, app, playpath, err := parseURL(test.url)
|
||||
if err != test.wantErr {
|
||||
t.Errorf("unexpected error for %q: got:%v want:%v", test.url, err, test.wantErr)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if protocol != test.wantProtocol {
|
||||
t.Errorf("unexpected protocol for %q: got:%v want:%v", test.url, protocol, test.wantProtocol)
|
||||
}
|
||||
if host != test.wantHost {
|
||||
t.Errorf("unexpected host for %q: got:%v want:%v", test.url, host, test.wantHost)
|
||||
}
|
||||
if port != test.wantPort {
|
||||
t.Errorf("unexpected port for %q: got:%v want:%v", test.url, port, test.wantPort)
|
||||
}
|
||||
if app != test.wantApp {
|
||||
t.Errorf("unexpected app for %q: got:%v want:%v", test.url, app, test.wantApp)
|
||||
}
|
||||
if playpath != test.wantPlaypath {
|
||||
t.Errorf("unexpected playpath for %q: got:%v want:%v", test.url, playpath, test.wantPlaypath)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
NAME
|
||||
discontinuity.go
|
||||
|
||||
DESCRIPTION
|
||||
discontinuity.go provides functionality for detecting discontinuities in
|
||||
mpegts and accounting for using the discontinuity indicator in the adaptation
|
||||
field.
|
||||
|
||||
AUTHOR
|
||||
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
||||
|
||||
LICENSE
|
||||
discontinuity.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
|
||||
|
||||
It is free software: you can redistribute it and/or modify them
|
||||
under the terms of the GNU General Public License as published by the
|
||||
Free Software Foundation, either version 3 of the License, or (at your
|
||||
option) any later version.
|
||||
|
||||
It is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
|
||||
*/
|
||||
|
||||
package mts
|
||||
|
||||
import (
|
||||
"github.com/Comcast/gots/packet"
|
||||
)
|
||||
|
||||
// discontinuityRepairer provides function to detect discontinuities in mpegts
|
||||
// and set the discontinuity indicator as appropriate.
|
||||
type DiscontinuityRepairer struct {
|
||||
expCC map[int]int
|
||||
}
|
||||
|
||||
// NewDiscontinuityRepairer returns a pointer to a new discontinuityRepairer.
|
||||
func NewDiscontinuityRepairer() *DiscontinuityRepairer {
|
||||
return &DiscontinuityRepairer{
|
||||
expCC: map[int]int{
|
||||
PatPid: 16,
|
||||
PmtPid: 16,
|
||||
VideoPid: 16,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Failed is to be called in the case of a failed send. This will decrement the
|
||||
// expectedCC so that it aligns with the failed chunks cc.
|
||||
func (dr *DiscontinuityRepairer) Failed() {
|
||||
dr.decExpectedCC(PatPid)
|
||||
}
|
||||
|
||||
// Repair takes a clip of mpegts and checks that the first packet, which should
|
||||
// be a PAT, contains a cc that is expected, otherwise the discontinuity indicator
|
||||
// is set to true.
|
||||
func (dr *DiscontinuityRepairer) Repair(d []byte) error {
|
||||
var pkt packet.Packet
|
||||
copy(pkt[:], d[:PacketSize])
|
||||
pid := pkt.PID()
|
||||
if pid != PatPid {
|
||||
panic("Clip to repair must have PAT first")
|
||||
}
|
||||
cc := pkt.ContinuityCounter()
|
||||
expect, _ := dr.ExpectedCC(pid)
|
||||
if cc != int(expect) {
|
||||
if packet.ContainsAdaptationField(&pkt) {
|
||||
(*packet.AdaptationField)(&pkt).SetDiscontinuity(true)
|
||||
} else {
|
||||
err := addAdaptationField(&pkt, DiscontinuityIndicator(true))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
dr.SetExpectedCC(pid, cc)
|
||||
copy(d[:PacketSize], pkt[:])
|
||||
}
|
||||
dr.IncExpectedCC(pid)
|
||||
return nil
|
||||
}
|
||||
|
||||
// expectedCC returns the expected cc. If the cc hasn't been used yet, then 16
|
||||
// and false is returned.
|
||||
func (dr *DiscontinuityRepairer) ExpectedCC(pid int) (int, bool) {
|
||||
if dr.expCC[pid] == 16 {
|
||||
return 16, false
|
||||
}
|
||||
return dr.expCC[pid], true
|
||||
}
|
||||
|
||||
// incExpectedCC increments the expected cc.
|
||||
func (dr *DiscontinuityRepairer) IncExpectedCC(pid int) {
|
||||
dr.expCC[pid] = (dr.expCC[pid] + 1) & 0xf
|
||||
}
|
||||
|
||||
// decExpectedCC decrements the expected cc.
|
||||
func (dr *DiscontinuityRepairer) decExpectedCC(pid int) {
|
||||
dr.expCC[pid] = (dr.expCC[pid] - 1) & 0xf
|
||||
}
|
||||
|
||||
// setExpectedCC sets the expected cc.
|
||||
func (dr *DiscontinuityRepairer) SetExpectedCC(pid, cc int) {
|
||||
dr.expCC[pid] = cc
|
||||
}
|
|
@ -85,7 +85,8 @@ var (
|
|||
)
|
||||
|
||||
const (
|
||||
psiInterval = 1 * time.Second
|
||||
psiInterval = 1 * time.Second
|
||||
psiSendCount = 7
|
||||
)
|
||||
|
||||
// Meta allows addition of metadata to encoded mts from outside of this pkg.
|
||||
|
@ -130,6 +131,10 @@ type Encoder struct {
|
|||
|
||||
continuity map[int]byte
|
||||
|
||||
timeBasedPsi bool
|
||||
pktCount int
|
||||
psiSendCount int
|
||||
|
||||
psiLastTime time.Time
|
||||
}
|
||||
|
||||
|
@ -141,6 +146,10 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder {
|
|||
frameInterval: time.Duration(float64(time.Second) / fps),
|
||||
ptsOffset: ptsOffset,
|
||||
|
||||
timeBasedPsi: true,
|
||||
|
||||
pktCount: 8,
|
||||
|
||||
continuity: map[int]byte{
|
||||
patPid: 0,
|
||||
pmtPid: 0,
|
||||
|
@ -159,11 +168,22 @@ const (
|
|||
hasPTS = 0x2
|
||||
)
|
||||
|
||||
// TimeBasedPsi allows for the setting of the PSI writing method, therefore, if
|
||||
// PSI is written based on some time duration, or based on a packet count.
|
||||
// If b is true, then time based PSI is used, otherwise the PSI is written
|
||||
// every sendCount.
|
||||
func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
|
||||
e.timeBasedPsi = b
|
||||
e.psiSendCount = sendCount
|
||||
e.pktCount = e.psiSendCount
|
||||
}
|
||||
|
||||
// generate handles the incoming data and generates equivalent mpegts packets -
|
||||
// sending them to the output channel.
|
||||
func (e *Encoder) Encode(nalu []byte) error {
|
||||
now := time.Now()
|
||||
if now.Sub(e.psiLastTime) > psiInterval {
|
||||
if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) {
|
||||
e.pktCount = 0
|
||||
err := e.writePSI()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -204,6 +224,7 @@ func (e *Encoder) Encode(nalu []byte) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.pktCount++
|
||||
}
|
||||
|
||||
e.tick()
|
||||
|
@ -226,6 +247,7 @@ func (e *Encoder) writePSI() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.pktCount++
|
||||
pmtTable, err = updateMeta(pmtTable)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -243,6 +265,7 @@ func (e *Encoder) writePSI() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.pktCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -144,6 +144,9 @@ func (m *Data) Delete(key string) {
|
|||
// Encode takes the meta data map and encodes into a byte slice with header
|
||||
// describing the version, length of data and data in TSV format.
|
||||
func (m *Data) Encode() []byte {
|
||||
if m.enc == nil {
|
||||
panic("Meta has not been initialized yet")
|
||||
}
|
||||
m.enc = m.enc[:headSize]
|
||||
|
||||
// Iterate over map and append entries, only adding tab if we're not on the
|
||||
|
|
|
@ -31,6 +31,8 @@ package mts
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/Comcast/gots/packet"
|
||||
)
|
||||
|
||||
// General mpegts packet properties.
|
||||
|
@ -55,11 +57,14 @@ const HeadSize = 4
|
|||
|
||||
// Consts relating to adaptation field.
|
||||
const (
|
||||
AdaptationIdx = 4 // Index to the adaptation field (index of AFL).
|
||||
AdaptationControlIdx = 3 // Index to octet with adaptation field control.
|
||||
AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields.
|
||||
DefaultAdaptationSize = 2 // Default size of the adaptation field.
|
||||
AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3.
|
||||
AdaptationIdx = 4 // Index to the adaptation field (index of AFL).
|
||||
AdaptationControlIdx = 3 // Index to octet with adaptation field control.
|
||||
AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields.
|
||||
DefaultAdaptationSize = 2 // Default size of the adaptation field.
|
||||
AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3.
|
||||
DefaultAdaptationBodySize = 1 // Default size of the adaptation field body.
|
||||
DiscontinuityIndicatorMask = 0x80 // Mask for the discontinuity indicator at the discontinuity indicator idk.
|
||||
DiscontinuityIndicatorIdx = AdaptationIdx + 1 // The index at which the discontinuity indicator is found in an MTS packet.
|
||||
)
|
||||
|
||||
// TODO: make this better - currently doesn't make sense.
|
||||
|
@ -257,3 +262,52 @@ func (p *Packet) Bytes(buf []byte) []byte {
|
|||
buf = append(buf, p.Payload...)
|
||||
return buf
|
||||
}
|
||||
|
||||
type Option func(p *packet.Packet)
|
||||
|
||||
// addAdaptationField adds an adaptation field to p, and applys the passed options to this field.
|
||||
// TODO: this will probably break if we already have adaptation field.
|
||||
func addAdaptationField(p *packet.Packet, options ...Option) error {
|
||||
if packet.ContainsAdaptationField((*packet.Packet)(p)) {
|
||||
return errors.New("Adaptation field is already present in packet")
|
||||
}
|
||||
// Create space for adaptation field.
|
||||
copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize])
|
||||
|
||||
// TODO: seperate into own function
|
||||
// Update adaptation field control.
|
||||
p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask
|
||||
p[AdaptationControlIdx] |= AdaptationControlMask
|
||||
// Default the adaptationfield.
|
||||
resetAdaptation(p)
|
||||
|
||||
// Apply and options that have bee passed.
|
||||
for _, option := range options {
|
||||
option(p)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field
|
||||
// exists, otherwise an error is returned.
|
||||
func resetAdaptation(p *packet.Packet) error {
|
||||
if !packet.ContainsAdaptationField((*packet.Packet)(p)) {
|
||||
return errors.New("No adaptation field in this packet")
|
||||
}
|
||||
p[AdaptationIdx] = DefaultAdaptationBodySize
|
||||
p[AdaptationIdx+1] = 0x00
|
||||
return nil
|
||||
}
|
||||
|
||||
// DiscontinuityIndicator returns and Option that will set p's discontinuity
|
||||
// indicator according to f.
|
||||
func DiscontinuityIndicator(f bool) Option {
|
||||
return func(p *packet.Packet) {
|
||||
set := byte(DiscontinuityIndicatorMask)
|
||||
if !f {
|
||||
set = 0x00
|
||||
}
|
||||
p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask
|
||||
p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue