Merged in use-gvctrl-in-revid (pull request #266)

Use gvctrl in revid

Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
Saxon Milton 2019-10-25 01:27:05 +00:00
commit 392ed399d9
14 changed files with 213 additions and 56 deletions

View File

@ -3,7 +3,7 @@ jobs:
build:
docker:
# CircleCI Go images available at: https://hub.docker.com/r/circleci/golang/
- image: circleci/golang:1.12
- image: circleci/golang:1.13
environment:
GO111MODULE: "on"

View File

@ -108,7 +108,7 @@ func main() {
}
logSender := smartlogger.New(logPath)
log = logger.New(int8(logLevel), &logSender.LogRoller)
log = logger.New(int8(logLevel), &logSender.LogRoller, true)
log.Log(logger.Info, "log-netsender: Logger Initialized")
if !validLogLevel {
log.Log(logger.Error, "invalid log level was defaulted to Info")

View File

@ -109,7 +109,7 @@ func handleFlags() revid.Config {
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
inputCodecPtr = flag.String("InputCodec", "H264", "The codec of the input: H264, Mjpeg, PCM, ADPCM")
inputPtr = flag.String("Input", "", "The input type: Raspivid, File, v4l, Audio, RTSP")
rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.")
cameraIPPtr = flag.String("CameraIP", "", "The IP of the RTSP server")
verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Debug, Info, Warning, Error, Fatal")
rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)")
logPathPtr = flag.String("LogPath", defaultLogPath, "The log path")
@ -158,7 +158,7 @@ func handleFlags() revid.Config {
cfg.LogLevel = defaultLogVerbosity
}
log = logger.New(cfg.LogLevel, &smartlogger.New(*logPathPtr).LogRoller)
log = logger.New(cfg.LogLevel, &smartlogger.New(*logPathPtr).LogRoller, true)
cfg.Logger = log
@ -231,7 +231,7 @@ func handleFlags() revid.Config {
netsender.ConfigFile = *configFilePtr
}
cfg.RTSPURL = *rtspURLPtr
cfg.CameraIP = *cameraIPPtr
cfg.Rotation = *rotationPtr
cfg.FlipHorizontal = *horizontalFlipPtr
cfg.FlipVertical = *verticalFlipPtr

View File

@ -143,7 +143,6 @@ func (e *Extracter) Extract(dst io.Writer, src io.Reader, delay time.Duration) e
}
}
}
return nil
}
// handleSTAPA parses NAL units from an aggregation packet and writes
@ -207,7 +206,7 @@ func (e *Extracter) handleFUA(d []byte) {
func (e *Extracter) writeWithPrefix(d []byte) {
e.toWrite = append(e.toWrite, d...)
curType, _ := NALType(e.toWrite)
if e.buf.Len() != 0 && (curType == h264dec.NALTypeIDR || curType == h264dec.NALTypeNonIDR) {
if e.buf.Len() != 0 && (curType == h264dec.NALTypeSPS || curType == h264dec.NALTypeIDR || curType == h264dec.NALTypeNonIDR) {
e.buf.WriteTo(e.dst)
e.buf.Reset()
e.buf.Write(aud)

3
go.mod
View File

@ -5,7 +5,7 @@ go 1.13
require (
bitbucket.org/ausocean/iot v1.2.7
bitbucket.org/ausocean/test v0.0.0-20190821085226-7a524f2344ba
bitbucket.org/ausocean/utils v1.2.9
bitbucket.org/ausocean/utils v1.2.10
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480
@ -13,5 +13,6 @@ require (
github.com/mewkiz/flac v1.0.5
github.com/pkg/errors v0.8.1
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e
gocv.io/x/gocv v0.21.0
gopkg.in/yaml.v2 v2.2.2 // indirect
)

7
go.sum
View File

@ -1,15 +1,20 @@
bitbucket.org/ausocean/av v0.0.0-20190416003121-6ee286e98874/go.mod h1:DxZEprrNNQ2slHKAQVUHryDaWc5CbjxyHAvomhzg+AE=
bitbucket.org/ausocean/av/input/gvctrl v0.0.0-20191017223116-ce6c12cce8cd h1:L99pvZZtdy3v54ym6GswYi8SOGgz+4Tr8hiIOI+nSiQ=
bitbucket.org/ausocean/av/input/gvctrl v0.0.0-20191017223116-ce6c12cce8cd/go.mod h1:Hg522DOVaj23J7CIxknCxmNsLGdg1iZ+Td1FDcTOdLQ=
bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU=
bitbucket.org/ausocean/iot v1.2.6 h1:KAAY1KZDbyOpoKajT1dM8BawupHiW9hUOelseSV1Ptc=
bitbucket.org/ausocean/iot v1.2.6/go.mod h1:71AYHh8yGZ8XyzDBskwIWMF+8E8ORagXpXE24wlhoE0=
bitbucket.org/ausocean/iot v1.2.7 h1:dZgrmVtuXnzHgybDthn0bYgAJms9euTONXBsqsx9g5M=
bitbucket.org/ausocean/iot v1.2.7/go.mod h1:aAWgPo2f8sD2OPmxae1E5/iD9+tKY/iW4pcQMQXUvHM=
bitbucket.org/ausocean/test v0.0.0-20190821085226-7a524f2344ba/go.mod h1:MbKtu9Pu8l3hiVGX6ep8S1VwAVY5uCbifCFOYsm914w=
bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.2.6/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.2.8 h1:hyxAIqYBqjqCguG+6A/kKyrAihyeUt2LziZg6CH0gLU=
bitbucket.org/ausocean/utils v1.2.8/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.2.9 h1:g45C6KCNvCLOGFv+ZnmDbQOOdnwpIsvzuNOD141CTVI=
bitbucket.org/ausocean/utils v1.2.9/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.2.10 h1:JTS7n+K+0o/FQFWKjdGgA1ElZ4TQu9aHX3wTJXOayXw=
bitbucket.org/ausocean/utils v1.2.10/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw=
@ -55,6 +60,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
gocv.io/x/gocv v0.21.0 h1:dVjagrupZrfCRY0qPEaYWgoNMRpBel6GYDH4mvQOK8Y=
gocv.io/x/gocv v0.21.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs=
golang.org/x/sys v0.0.0-20190305064518-30e92a19ae4a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -47,7 +47,7 @@ func TestDevice(t *testing.T) {
n := 2 // Number of periods to wait while recording.
// Create a new audio Device, start, read/lex, and then stop it.
l := logger.New(logger.Debug, os.Stderr)
l := logger.New(logger.Debug, os.Stderr, true)
ai, err := NewDevice(ac, l)
// If there was an error opening the device, skip this test.
if _, ok := err.(OpenError); ok {

View File

@ -1,3 +0,0 @@
module bitbucket.org/ausocean/av/input/gvctrl
go 1.12

View File

@ -52,7 +52,7 @@ const (
// from which (as well as username and password) two hashes are generated.
// The generated hex is encoded into a url encoded form and returned as a string.
func getLogin(c *http.Client, id, host string) (string, error) {
req, err := http.NewRequest("GET", "https://"+host+loginSubDir, nil)
req, err := http.NewRequest("GET", "http://"+host+loginSubDir, nil)
if err != nil {
return "", fmt.Errorf("can't create GET request for log-in page: %v", err)
}

View File

@ -28,8 +28,10 @@ LICENSE
package rtp
import (
"fmt"
"net"
"sync"
"time"
)
// Client describes an RTP client that can receive an RTP stream and implements
@ -58,6 +60,7 @@ func NewClient(addr string) (*Client, error) {
if err != nil {
return nil, err
}
return c, nil
}
@ -81,6 +84,11 @@ func (c *Client) Read(p []byte) (int, error) {
return n, err
}
// Close will close the RTP client's connection.
func (c *Client) Close() error {
return c.r.PacketConn.Close()
}
// setSequence sets the most recently received sequence number, and updates the
// cycles count if the sequence number has rolled over.
func (c *Client) setSequence(s uint16) {
@ -113,6 +121,11 @@ type PacketReader struct {
// Read implements io.Reader.
func (r PacketReader) Read(b []byte) (int, error) {
const readTimeout = 5 * time.Second
err := r.PacketConn.SetReadDeadline(time.Now().Add(readTimeout))
if err != nil {
return 0, fmt.Errorf("could not set read deadline for PacketConn: %w", err)
}
n, _, err := r.PacketConn.ReadFrom(b)
return n, err
}

View File

@ -63,6 +63,19 @@ var AutoWhiteBalanceModes = [...]string{
"horizon",
}
// quality represents video quality.
type quality int
// The different video qualities that can be used for variable bitrate when
// using the GeoVision camera.
const (
qualityStandard quality = iota
qualityFair
qualityGood
qualityGreat
qualityExcellent
)
// Enums to define inputs, outputs and codecs.
const (
// Indicates no option has been set.
@ -100,8 +113,11 @@ const (
defaultInputCodec = codecutil.H264
defaultVerbosity = logger.Error
defaultRtpAddr = "localhost:6970"
defaultRTSPURL = "rtsp://admin:admin@192.168.1.50:8554/CH001.sdp"
defaultCameraIP = "192.168.1.50"
defaultVBR = false
defaultVBRQuality = qualityStandard
defaultBurstPeriod = 10 // Seconds
defaultVBRBitrate = 500 // kbps
// Raspivid video defaults.
defaultBrightness = 50
@ -113,7 +129,7 @@ const (
defaultMinFrames = 100
defaultClipDuration = 0
defaultQuantization = 30
defaultBitrate = 400000
defaultBitrate = 400
// Audio defaults.
defaultAudioInputCodec = codecutil.ADPCM
@ -152,7 +168,7 @@ type Config struct {
// File:
// Location must be specified in InputPath field.
// RTSP:
// RTSPURL must also be defined.
// CameraIP should also be defined.
Input uint8
// InputCodec defines the input codec we wish to use, and therefore defines the
@ -181,9 +197,9 @@ type Config struct {
// RTMP is to be used as an output.
RTMPURL string
// RTSPURL specifies the RTSP server URL for RTSP input. This must be defined
// when Input is RTSP.
RTSPURL string
// CameraIP is the IP address of the camera in the case of the input camera
// being an IP camera.
CameraIP string
// OutputPath defines the output destination for File output. This must be
// defined if File output is to be used.
@ -204,11 +220,27 @@ type Config struct {
// defined in /etc/netsender.conf.
HTTPAddress string
// Quantization defines the quantization level, which may be a value between
// 0-40. This will only take effect if the Quantize field is true and if we
// are using Raspivid input.
// VBR indicates whether we wish to use constant or variable bitrate. If VBR
// is true then we will use variable bitrate, and constant bitrate otherwise.
// In the case of the Pi camera, variable bitrate quality is controlled by
// the Quantization parameter below. In the case of the GeoVision camera,
// variable bitrate quality is controlled by firstly the VBRQuality parameter
// and second the VBRBitrate parameter.
VBR bool
// Quantization defines the quantization level, which will determine variable
// bitrate quality in the case of input from the Pi Camera.
Quantization uint
// VBRQuality describes the general quality of video from the GeoVision camera
// under variable bitrate. VBRQuality can be one 5 consts defined:
// qualityStandard, qualityFair, qualityGood, qualityGreat and qualityExcellent.
VBRQuality quality
// VBRBitrate describes maximal bitrate for the GeoVision camera when under
// variable bitrate.
VBRBitrate int
// MinFrames defines the frequency of key NAL units SPS, PPS and IDR in
// number of NAL units. This will also determine the frequency of PSI if the
// output container is MPEG-TS. If ClipDuration is less than MinFrames,
@ -251,7 +283,7 @@ type Config struct {
Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input.
Height uint // Height defines the input video height Raspivid input.
Width uint // Width defines the input video width Raspivid input.
Bitrate uint // Bitrate specifies the input bitrate for Raspivid input.
Bitrate uint // Bitrate specifies the bitrate for constant bitrate in kbps.
FlipHorizontal bool // FlipHorizontal flips video horizontally for Raspivid input.
FlipVertical bool // FlipVertial flips video vertically for Raspivid input.
@ -285,13 +317,13 @@ func (c *Config) Validate() error {
c.Logger.Log(logger.Info, pkg+"bad LogLevel mode defined, defaulting", "LogLevel", defaultVerbosity)
}
switch c.Input {
case Raspivid, V4L, File, Audio:
case RTSP:
if c.RTSPURL == "" {
c.Logger.Log(logger.Info, pkg+"no RTSPURL defined, defaulting", "RTSPURL", defaultRTSPURL)
c.RTSPURL = defaultRTSPURL
if c.CameraIP == "" {
c.Logger.Log(logger.Info, pkg+"no CameraIP defined, defaulting", "CameraIP", defaultCameraIP)
c.CameraIP = defaultCameraIP
}
switch c.Input {
case Raspivid, V4L, File, Audio, RTSP:
case NothingDefined:
c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput)
c.Input = defaultInput
@ -481,6 +513,18 @@ func (c *Config) Validate() error {
c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout
}
switch c.VBRQuality {
case qualityStandard, qualityFair, qualityGood, qualityGreat, qualityExcellent:
default:
c.Logger.Log(logger.Info, pkg+"VBRQuality bad or unset, defaulting", "VBRQuality", defaultVBRQuality)
c.VBRQuality = defaultVBRQuality
}
if c.VBRBitrate <= 0 {
c.Logger.Log(logger.Info, pkg+"VBRBitrate bad or unset, defaulting", "VBRBitrate", defaultVBRBitrate)
c.VBRBitrate = defaultVBRBitrate
}
return nil
}

View File

@ -40,12 +40,19 @@ import (
"time"
"bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/av/input/gvctrl"
"bitbucket.org/ausocean/av/protocol/rtcp"
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/av/protocol/rtsp"
"bitbucket.org/ausocean/utils/logger"
)
// TODO: remove this when gvctrl has configurable user and pass.
const (
ipCamUser = "admin"
ipCamPass = "admin"
)
// startRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output.
func (r *Revid) startRaspivid() (func() error, error) {
@ -58,7 +65,7 @@ func (r *Revid) startRaspivid() (func() error, error) {
"--timeout", disabled,
"--width", fmt.Sprint(r.config.Width),
"--height", fmt.Sprint(r.config.Height),
"--bitrate", fmt.Sprint(r.config.Bitrate),
"--bitrate", fmt.Sprint(r.config.Bitrate * 1000), // Convert from kbps to bps.
"--framerate", fmt.Sprint(r.config.FrameRate),
"--rotation", fmt.Sprint(r.config.Rotation),
"--brightness", fmt.Sprint(r.config.Brightness),
@ -87,7 +94,7 @@ func (r *Revid) startRaspivid() (func() error, error) {
"--inline",
"--intra", fmt.Sprint(r.config.MinFrames),
)
if r.config.Quantization != 0 {
if r.config.VBR {
args = append(args, "-qp", fmt.Sprint(r.config.Quantization))
}
case codecutil.MJPEG:
@ -102,7 +109,7 @@ func (r *Revid) startRaspivid() (func() error, error) {
}
err = r.cmd.Start()
if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
return nil, fmt.Errorf("could not start raspivid command: %w", err)
}
r.wg.Add(1)
@ -126,10 +133,11 @@ func (r *Revid) startV4L() (func() error, error) {
"-r", fmt.Sprint(r.config.FrameRate),
}
br := r.config.Bitrate * 1000
args = append(args,
"-b:v", fmt.Sprint(r.config.Bitrate),
"-maxrate", fmt.Sprint(r.config.Bitrate),
"-bufsize", fmt.Sprint(r.config.Bitrate/2),
"-b:v", fmt.Sprint(br),
"-maxrate", fmt.Sprint(br),
"-bufsize", fmt.Sprint(br/2),
"-s", fmt.Sprintf("%dx%d", r.config.Width, r.config.Height),
"-",
)
@ -172,39 +180,76 @@ func (r *Revid) setupInputForFile() (func() error, error) {
// startRTSPCamera uses RTSP to request an RTP stream from an IP camera. An RTP
// client is created from which RTP packets containing either h264/h265 can be
// read by the selected lexer.
//
// TODO(saxon): this function should really be startGeoVision. It's much too
// specific to be called startRTSPCamera.
func (r *Revid) startRTSPCamera() (func() error, error) {
rtspClt, local, remote, err := rtsp.NewClient(r.config.RTSPURL)
r.config.Logger.Log(logger.Info, pkg+"starting geovision...")
err := gvctrl.Set(
r.config.CameraIP,
gvctrl.CodecOut(
map[uint8]gvctrl.Codec{
codecutil.H264: gvctrl.CodecH264,
codecutil.H265: gvctrl.CodecH265,
codecutil.MJPEG: gvctrl.CodecMJPEG,
}[r.config.InputCodec],
),
gvctrl.Height(int(r.config.Height)),
gvctrl.FrameRate(int(r.config.FrameRate)),
gvctrl.VariableBitrate(r.config.VBR),
gvctrl.VBRQuality(
map[quality]gvctrl.Quality{
qualityStandard: gvctrl.QualityStandard,
qualityFair: gvctrl.QualityFair,
qualityGood: gvctrl.QualityGood,
qualityGreat: gvctrl.QualityGreat,
qualityExcellent: gvctrl.QualityExcellent,
}[r.config.VBRQuality],
),
gvctrl.VBRBitrate(r.config.VBRBitrate),
gvctrl.CBRBitrate(int(r.config.Bitrate)),
gvctrl.Refresh(float64(r.config.MinFrames)/float64(r.config.FrameRate)),
)
if err != nil {
return nil, fmt.Errorf("could not set IPCamera settings: %w", err)
}
r.config.Logger.Log(logger.Info, pkg+"completed geovision configuration")
time.Sleep(5 * time.Second)
rtspClt, local, remote, err := rtsp.NewClient("rtsp://" + ipCamUser + ":" + ipCamPass + "@" + r.config.CameraIP + ":8554/" + "CH002.sdp")
if err != nil {
return nil, err
}
r.config.Logger.Log(logger.Info, pkg+"created RTSP client")
resp, err := rtspClt.Options()
if err != nil {
return nil, err
}
r.config.Logger.Log(logger.Info, pkg+"RTSP OPTIONS response", "response", resp.String())
r.config.Logger.Log(logger.Debug, pkg+"RTSP OPTIONS response", "response", resp.String())
resp, err = rtspClt.Describe()
if err != nil {
return nil, err
}
r.config.Logger.Log(logger.Info, pkg+"RTSP DESCRIBE response", "response", resp.String())
r.config.Logger.Log(logger.Debug, pkg+"RTSP DESCRIBE response", "response", resp.String())
resp, err = rtspClt.Setup("track1", fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort))
if err != nil {
return nil, err
}
r.config.Logger.Log(logger.Info, pkg+"RTSP SETUP response", "response", resp.String())
r.config.Logger.Log(logger.Debug, pkg+"RTSP SETUP response", "response", resp.String())
rtpCltAddr, rtcpCltAddr, rtcpSvrAddr, err := formAddrs(local, remote, *resp)
if err != nil {
return nil, err
}
resp, err = rtspClt.Play()
if err != nil {
return nil, err
}
r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String())
r.config.Logger.Log(logger.Info, pkg+"RTSP session setup complete")
rtpClt, err := rtp.NewClient(rtpCltAddr)
if err != nil {
@ -216,6 +261,8 @@ func (r *Revid) startRTSPCamera() (func() error, error) {
return nil, err
}
r.config.Logger.Log(logger.Info, pkg+"RTCP and RTP clients created")
// Check errors from RTCP client until it has stopped running.
go func() {
for {
@ -231,13 +278,35 @@ func (r *Revid) startRTSPCamera() (func() error, error) {
// Start the RTCP client.
rtcpClt.Start()
r.config.Logger.Log(logger.Info, pkg+"RTCP client started")
// Start reading data from the RTP client.
r.wg.Add(1)
go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate))
r.config.Logger.Log(logger.Info, pkg+"started input processor")
resp, err = rtspClt.Play()
if err != nil {
return nil, err
}
r.config.Logger.Log(logger.Debug, pkg+"RTSP server PLAY response", "response", resp.String())
r.config.Logger.Log(logger.Info, pkg+"play requested, now receiving stream")
return func() error {
rtspClt.Close()
err := rtpClt.Close()
if err != nil {
return fmt.Errorf("could not close RTP client: %w", err)
}
err = rtspClt.Close()
if err != nil {
return fmt.Errorf("could not close RTSP client: %w", err)
}
rtcpClt.Stop()
r.config.Logger.Log(logger.Info, pkg+"RTP, RTSP and RTCP clients stopped and closed")
return nil
}, nil
}
@ -274,8 +343,7 @@ func parseSvrRTCPPort(resp rtsp.Response) (int, error) {
// processFrom is run as a routine to read from a input data source, lex and
// then send individual access units to revid's encoders.
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.config.Logger.Log(logger.Info, pkg+"reading input data")
r.err <- r.lexTo(r.encoders, read, delay)
r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
r.config.Logger.Log(logger.Info, pkg+"finished lexing")
r.wg.Done()
}

View File

@ -134,17 +134,12 @@ func (r *Revid) Config() Config {
return r.config
}
// TODO(Saxon): put more thought into error severity.
// TODO(Saxon): put more thought into error severity and how to handle these.
func (r *Revid) handleErrors() {
for {
err := <-r.err
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error())
r.Stop()
err = r.Start()
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"failed to restart revid", "error", err.Error())
}
}
}
}
@ -362,7 +357,6 @@ func (r *Revid) Start() error {
}
r.closeInput, err = r.setupInput()
if err != nil {
r.Stop()
return err
}
r.running = true
@ -394,12 +388,17 @@ func (r *Revid) Stop() {
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error())
}
r.config.Logger.Log(logger.Info, pkg+"closed pipeline")
if r.cmd != nil && r.cmd.Process != nil {
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
r.cmd.Process.Kill()
}
r.config.Logger.Log(logger.Info, pkg+"waiting for routines to close")
r.wg.Wait()
r.config.Logger.Log(logger.Info, pkg+"revid stopped")
r.running = false
}
@ -423,6 +422,13 @@ func (r *Revid) Update(vars map[string]string) error {
//look through the vars and update revid where needed
for key, value := range vars {
switch key {
case "Input":
v, ok := map[string]uint8{"raspivid": Raspivid, "rtsp": RTSP}[strings.ToLower(value)]
if !ok {
r.config.Logger.Log(logger.Warning, pkg+"invalid input var", "value", value)
break
}
r.config.Input = v
case "Saturation":
s, err := strconv.ParseInt(value, 10, 0)
if err != nil {
@ -618,6 +624,27 @@ func (r *Revid) Update(vars map[string]string) error {
break
}
r.config.MTSRBWriteTimeout = v
case "VBR":
v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)]
if !ok {
r.config.Logger.Log(logger.Warning, pkg+"invalid VBR var", "value", value)
break
}
r.config.VBR = v
case "VBRQuality":
v, ok := map[string]quality{"standard": qualityStandard, "fair": qualityFair, "good": qualityGood, "great": qualityGreat, "excellent": qualityExcellent}[strings.ToLower(value)]
if !ok {
r.config.Logger.Log(logger.Warning, pkg+"invalid VBRQuality var", "value", value)
break
}
r.config.VBRQuality = v
case "VBRBitrate":
v, err := strconv.Atoi(value)
if err != nil || v <= 0 {
r.config.Logger.Log(logger.Warning, pkg+"invalid VBRBitrate var", "value", value)
break
}
r.config.VBRBitrate = v
}
}
r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config))

View File

@ -219,6 +219,7 @@ func (s *mtsSender) output() {
chunk = nil
continue
}
s.log(logger.Debug, pkg+"mtsSender: writing")
_, err = s.dst.Write(chunk.Bytes())
if err != nil {
s.repairer.Failed()