diff --git a/.circleci/config.yml b/.circleci/config.yml index b25ea562..85bfbcc7 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,7 +3,7 @@ jobs: build: docker: # CircleCI Go images available at: https://hub.docker.com/r/circleci/golang/ - - image: circleci/golang:1.12 + - image: circleci/golang:1.13 environment: GO111MODULE: "on" diff --git a/cmd/audio-netsender/main.go b/cmd/audio-netsender/main.go index b4cbfc12..ac9e662d 100644 --- a/cmd/audio-netsender/main.go +++ b/cmd/audio-netsender/main.go @@ -108,7 +108,7 @@ func main() { } logSender := smartlogger.New(logPath) - log = logger.New(int8(logLevel), &logSender.LogRoller) + log = logger.New(int8(logLevel), &logSender.LogRoller, true) log.Log(logger.Info, "log-netsender: Logger Initialized") if !validLogLevel { log.Log(logger.Error, "invalid log level was defaulted to Info") diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 5eca3ff7..ae5e76af 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -109,7 +109,7 @@ func handleFlags() revid.Config { cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") inputCodecPtr = flag.String("InputCodec", "H264", "The codec of the input: H264, Mjpeg, PCM, ADPCM") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, v4l, Audio, RTSP") - rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") + cameraIPPtr = flag.String("CameraIP", "", "The IP of the RTSP server") verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Debug, Info, Warning, Error, Fatal") rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: : (port is generally 6970-6999)") logPathPtr = flag.String("LogPath", defaultLogPath, "The log path") @@ -158,7 +158,7 @@ func handleFlags() revid.Config { cfg.LogLevel = defaultLogVerbosity } - log = logger.New(cfg.LogLevel, &smartlogger.New(*logPathPtr).LogRoller) + log = logger.New(cfg.LogLevel, &smartlogger.New(*logPathPtr).LogRoller, true) cfg.Logger = log @@ -231,7 +231,7 @@ func handleFlags() revid.Config { netsender.ConfigFile = *configFilePtr } - cfg.RTSPURL = *rtspURLPtr + cfg.CameraIP = *cameraIPPtr cfg.Rotation = *rotationPtr cfg.FlipHorizontal = *horizontalFlipPtr cfg.FlipVertical = *verticalFlipPtr diff --git a/codec/h264/extract.go b/codec/h264/extract.go index f432de31..7964600a 100644 --- a/codec/h264/extract.go +++ b/codec/h264/extract.go @@ -143,7 +143,6 @@ func (e *Extracter) Extract(dst io.Writer, src io.Reader, delay time.Duration) e } } } - return nil } // handleSTAPA parses NAL units from an aggregation packet and writes @@ -207,7 +206,7 @@ func (e *Extracter) handleFUA(d []byte) { func (e *Extracter) writeWithPrefix(d []byte) { e.toWrite = append(e.toWrite, d...) curType, _ := NALType(e.toWrite) - if e.buf.Len() != 0 && (curType == h264dec.NALTypeIDR || curType == h264dec.NALTypeNonIDR) { + if e.buf.Len() != 0 && (curType == h264dec.NALTypeSPS || curType == h264dec.NALTypeIDR || curType == h264dec.NALTypeNonIDR) { e.buf.WriteTo(e.dst) e.buf.Reset() e.buf.Write(aud) diff --git a/go.mod b/go.mod index c0553442..922e40a9 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( bitbucket.org/ausocean/iot v1.2.7 bitbucket.org/ausocean/test v0.0.0-20190821085226-7a524f2344ba - bitbucket.org/ausocean/utils v1.2.9 + bitbucket.org/ausocean/utils v1.2.10 github.com/BurntSushi/toml v0.3.1 // indirect github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 @@ -13,5 +13,6 @@ require ( github.com/mewkiz/flac v1.0.5 github.com/pkg/errors v0.8.1 github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e + gocv.io/x/gocv v0.21.0 gopkg.in/yaml.v2 v2.2.2 // indirect ) diff --git a/go.sum b/go.sum index c7035a5d..231895b1 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,20 @@ bitbucket.org/ausocean/av v0.0.0-20190416003121-6ee286e98874/go.mod h1:DxZEprrNNQ2slHKAQVUHryDaWc5CbjxyHAvomhzg+AE= +bitbucket.org/ausocean/av/input/gvctrl v0.0.0-20191017223116-ce6c12cce8cd h1:L99pvZZtdy3v54ym6GswYi8SOGgz+4Tr8hiIOI+nSiQ= +bitbucket.org/ausocean/av/input/gvctrl v0.0.0-20191017223116-ce6c12cce8cd/go.mod h1:Hg522DOVaj23J7CIxknCxmNsLGdg1iZ+Td1FDcTOdLQ= bitbucket.org/ausocean/iot v1.2.4/go.mod h1:5HVLgPHccW2PxS7WDUQO6sKWMgk3Vfze/7d5bHs8EWU= bitbucket.org/ausocean/iot v1.2.6 h1:KAAY1KZDbyOpoKajT1dM8BawupHiW9hUOelseSV1Ptc= bitbucket.org/ausocean/iot v1.2.6/go.mod h1:71AYHh8yGZ8XyzDBskwIWMF+8E8ORagXpXE24wlhoE0= bitbucket.org/ausocean/iot v1.2.7 h1:dZgrmVtuXnzHgybDthn0bYgAJms9euTONXBsqsx9g5M= bitbucket.org/ausocean/iot v1.2.7/go.mod h1:aAWgPo2f8sD2OPmxae1E5/iD9+tKY/iW4pcQMQXUvHM= +bitbucket.org/ausocean/test v0.0.0-20190821085226-7a524f2344ba/go.mod h1:MbKtu9Pu8l3hiVGX6ep8S1VwAVY5uCbifCFOYsm914w= bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.6/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.8 h1:hyxAIqYBqjqCguG+6A/kKyrAihyeUt2LziZg6CH0gLU= bitbucket.org/ausocean/utils v1.2.8/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.9 h1:g45C6KCNvCLOGFv+ZnmDbQOOdnwpIsvzuNOD141CTVI= bitbucket.org/ausocean/utils v1.2.9/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= +bitbucket.org/ausocean/utils v1.2.10 h1:JTS7n+K+0o/FQFWKjdGgA1ElZ4TQu9aHX3wTJXOayXw= +bitbucket.org/ausocean/utils v1.2.10/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= @@ -55,6 +60,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +gocv.io/x/gocv v0.21.0 h1:dVjagrupZrfCRY0qPEaYWgoNMRpBel6GYDH4mvQOK8Y= +gocv.io/x/gocv v0.21.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs= golang.org/x/sys v0.0.0-20190305064518-30e92a19ae4a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/input/audio/audio_test.go b/input/audio/audio_test.go index 5618c63a..5389524b 100644 --- a/input/audio/audio_test.go +++ b/input/audio/audio_test.go @@ -47,7 +47,7 @@ func TestDevice(t *testing.T) { n := 2 // Number of periods to wait while recording. // Create a new audio Device, start, read/lex, and then stop it. - l := logger.New(logger.Debug, os.Stderr) + l := logger.New(logger.Debug, os.Stderr, true) ai, err := NewDevice(ac, l) // If there was an error opening the device, skip this test. if _, ok := err.(OpenError); ok { diff --git a/input/gvctrl/go.mod b/input/gvctrl/go.mod deleted file mode 100644 index 1ffc4877..00000000 --- a/input/gvctrl/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module bitbucket.org/ausocean/av/input/gvctrl - -go 1.12 diff --git a/input/gvctrl/request.go b/input/gvctrl/request.go index 3dd72b93..2fa5d9cb 100644 --- a/input/gvctrl/request.go +++ b/input/gvctrl/request.go @@ -52,7 +52,7 @@ const ( // from which (as well as username and password) two hashes are generated. // The generated hex is encoded into a url encoded form and returned as a string. func getLogin(c *http.Client, id, host string) (string, error) { - req, err := http.NewRequest("GET", "https://"+host+loginSubDir, nil) + req, err := http.NewRequest("GET", "http://"+host+loginSubDir, nil) if err != nil { return "", fmt.Errorf("can't create GET request for log-in page: %v", err) } diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index e8418b0d..80fe3fc5 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -28,8 +28,10 @@ LICENSE package rtp import ( + "fmt" "net" "sync" + "time" ) // Client describes an RTP client that can receive an RTP stream and implements @@ -58,6 +60,7 @@ func NewClient(addr string) (*Client, error) { if err != nil { return nil, err } + return c, nil } @@ -81,6 +84,11 @@ func (c *Client) Read(p []byte) (int, error) { return n, err } +// Close will close the RTP client's connection. +func (c *Client) Close() error { + return c.r.PacketConn.Close() +} + // setSequence sets the most recently received sequence number, and updates the // cycles count if the sequence number has rolled over. func (c *Client) setSequence(s uint16) { @@ -113,6 +121,11 @@ type PacketReader struct { // Read implements io.Reader. func (r PacketReader) Read(b []byte) (int, error) { + const readTimeout = 5 * time.Second + err := r.PacketConn.SetReadDeadline(time.Now().Add(readTimeout)) + if err != nil { + return 0, fmt.Errorf("could not set read deadline for PacketConn: %w", err) + } n, _, err := r.PacketConn.ReadFrom(b) return n, err } diff --git a/revid/config.go b/revid/config.go index 29acb42c..b00cd63e 100644 --- a/revid/config.go +++ b/revid/config.go @@ -63,6 +63,19 @@ var AutoWhiteBalanceModes = [...]string{ "horizon", } +// quality represents video quality. +type quality int + +// The different video qualities that can be used for variable bitrate when +// using the GeoVision camera. +const ( + qualityStandard quality = iota + qualityFair + qualityGood + qualityGreat + qualityExcellent +) + // Enums to define inputs, outputs and codecs. const ( // Indicates no option has been set. @@ -100,8 +113,11 @@ const ( defaultInputCodec = codecutil.H264 defaultVerbosity = logger.Error defaultRtpAddr = "localhost:6970" - defaultRTSPURL = "rtsp://admin:admin@192.168.1.50:8554/CH001.sdp" - defaultBurstPeriod = 10 // Seconds + defaultCameraIP = "192.168.1.50" + defaultVBR = false + defaultVBRQuality = qualityStandard + defaultBurstPeriod = 10 // Seconds + defaultVBRBitrate = 500 // kbps // Raspivid video defaults. defaultBrightness = 50 @@ -113,7 +129,7 @@ const ( defaultMinFrames = 100 defaultClipDuration = 0 defaultQuantization = 30 - defaultBitrate = 400000 + defaultBitrate = 400 // Audio defaults. defaultAudioInputCodec = codecutil.ADPCM @@ -152,7 +168,7 @@ type Config struct { // File: // Location must be specified in InputPath field. // RTSP: - // RTSPURL must also be defined. + // CameraIP should also be defined. Input uint8 // InputCodec defines the input codec we wish to use, and therefore defines the @@ -181,9 +197,9 @@ type Config struct { // RTMP is to be used as an output. RTMPURL string - // RTSPURL specifies the RTSP server URL for RTSP input. This must be defined - // when Input is RTSP. - RTSPURL string + // CameraIP is the IP address of the camera in the case of the input camera + // being an IP camera. + CameraIP string // OutputPath defines the output destination for File output. This must be // defined if File output is to be used. @@ -204,11 +220,27 @@ type Config struct { // defined in /etc/netsender.conf. HTTPAddress string - // Quantization defines the quantization level, which may be a value between - // 0-40. This will only take effect if the Quantize field is true and if we - // are using Raspivid input. + // VBR indicates whether we wish to use constant or variable bitrate. If VBR + // is true then we will use variable bitrate, and constant bitrate otherwise. + // In the case of the Pi camera, variable bitrate quality is controlled by + // the Quantization parameter below. In the case of the GeoVision camera, + // variable bitrate quality is controlled by firstly the VBRQuality parameter + // and second the VBRBitrate parameter. + VBR bool + + // Quantization defines the quantization level, which will determine variable + // bitrate quality in the case of input from the Pi Camera. Quantization uint + // VBRQuality describes the general quality of video from the GeoVision camera + // under variable bitrate. VBRQuality can be one 5 consts defined: + // qualityStandard, qualityFair, qualityGood, qualityGreat and qualityExcellent. + VBRQuality quality + + // VBRBitrate describes maximal bitrate for the GeoVision camera when under + // variable bitrate. + VBRBitrate int + // MinFrames defines the frequency of key NAL units SPS, PPS and IDR in // number of NAL units. This will also determine the frequency of PSI if the // output container is MPEG-TS. If ClipDuration is less than MinFrames, @@ -251,7 +283,7 @@ type Config struct { Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input. Height uint // Height defines the input video height Raspivid input. Width uint // Width defines the input video width Raspivid input. - Bitrate uint // Bitrate specifies the input bitrate for Raspivid input. + Bitrate uint // Bitrate specifies the bitrate for constant bitrate in kbps. FlipHorizontal bool // FlipHorizontal flips video horizontally for Raspivid input. FlipVertical bool // FlipVertial flips video vertically for Raspivid input. @@ -285,13 +317,13 @@ func (c *Config) Validate() error { c.Logger.Log(logger.Info, pkg+"bad LogLevel mode defined, defaulting", "LogLevel", defaultVerbosity) } + if c.CameraIP == "" { + c.Logger.Log(logger.Info, pkg+"no CameraIP defined, defaulting", "CameraIP", defaultCameraIP) + c.CameraIP = defaultCameraIP + } + switch c.Input { - case Raspivid, V4L, File, Audio: - case RTSP: - if c.RTSPURL == "" { - c.Logger.Log(logger.Info, pkg+"no RTSPURL defined, defaulting", "RTSPURL", defaultRTSPURL) - c.RTSPURL = defaultRTSPURL - } + case Raspivid, V4L, File, Audio, RTSP: case NothingDefined: c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Input = defaultInput @@ -481,6 +513,18 @@ func (c *Config) Validate() error { c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout } + switch c.VBRQuality { + case qualityStandard, qualityFair, qualityGood, qualityGreat, qualityExcellent: + default: + c.Logger.Log(logger.Info, pkg+"VBRQuality bad or unset, defaulting", "VBRQuality", defaultVBRQuality) + c.VBRQuality = defaultVBRQuality + } + + if c.VBRBitrate <= 0 { + c.Logger.Log(logger.Info, pkg+"VBRBitrate bad or unset, defaulting", "VBRBitrate", defaultVBRBitrate) + c.VBRBitrate = defaultVBRBitrate + } + return nil } diff --git a/revid/inputs.go b/revid/inputs.go index 4df3abb3..e83806e0 100644 --- a/revid/inputs.go +++ b/revid/inputs.go @@ -40,12 +40,19 @@ import ( "time" "bitbucket.org/ausocean/av/codec/codecutil" + "bitbucket.org/ausocean/av/input/gvctrl" "bitbucket.org/ausocean/av/protocol/rtcp" "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/av/protocol/rtsp" "bitbucket.org/ausocean/utils/logger" ) +// TODO: remove this when gvctrl has configurable user and pass. +const ( + ipCamUser = "admin" + ipCamPass = "admin" +) + // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. func (r *Revid) startRaspivid() (func() error, error) { @@ -58,7 +65,7 @@ func (r *Revid) startRaspivid() (func() error, error) { "--timeout", disabled, "--width", fmt.Sprint(r.config.Width), "--height", fmt.Sprint(r.config.Height), - "--bitrate", fmt.Sprint(r.config.Bitrate), + "--bitrate", fmt.Sprint(r.config.Bitrate * 1000), // Convert from kbps to bps. "--framerate", fmt.Sprint(r.config.FrameRate), "--rotation", fmt.Sprint(r.config.Rotation), "--brightness", fmt.Sprint(r.config.Brightness), @@ -87,7 +94,7 @@ func (r *Revid) startRaspivid() (func() error, error) { "--inline", "--intra", fmt.Sprint(r.config.MinFrames), ) - if r.config.Quantization != 0 { + if r.config.VBR { args = append(args, "-qp", fmt.Sprint(r.config.Quantization)) } case codecutil.MJPEG: @@ -102,7 +109,7 @@ func (r *Revid) startRaspivid() (func() error, error) { } err = r.cmd.Start() if err != nil { - r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) + return nil, fmt.Errorf("could not start raspivid command: %w", err) } r.wg.Add(1) @@ -126,10 +133,11 @@ func (r *Revid) startV4L() (func() error, error) { "-r", fmt.Sprint(r.config.FrameRate), } + br := r.config.Bitrate * 1000 args = append(args, - "-b:v", fmt.Sprint(r.config.Bitrate), - "-maxrate", fmt.Sprint(r.config.Bitrate), - "-bufsize", fmt.Sprint(r.config.Bitrate/2), + "-b:v", fmt.Sprint(br), + "-maxrate", fmt.Sprint(br), + "-bufsize", fmt.Sprint(br/2), "-s", fmt.Sprintf("%dx%d", r.config.Width, r.config.Height), "-", ) @@ -172,39 +180,76 @@ func (r *Revid) setupInputForFile() (func() error, error) { // startRTSPCamera uses RTSP to request an RTP stream from an IP camera. An RTP // client is created from which RTP packets containing either h264/h265 can be // read by the selected lexer. +// +// TODO(saxon): this function should really be startGeoVision. It's much too +// specific to be called startRTSPCamera. func (r *Revid) startRTSPCamera() (func() error, error) { - rtspClt, local, remote, err := rtsp.NewClient(r.config.RTSPURL) + r.config.Logger.Log(logger.Info, pkg+"starting geovision...") + + err := gvctrl.Set( + r.config.CameraIP, + gvctrl.CodecOut( + map[uint8]gvctrl.Codec{ + codecutil.H264: gvctrl.CodecH264, + codecutil.H265: gvctrl.CodecH265, + codecutil.MJPEG: gvctrl.CodecMJPEG, + }[r.config.InputCodec], + ), + gvctrl.Height(int(r.config.Height)), + gvctrl.FrameRate(int(r.config.FrameRate)), + gvctrl.VariableBitrate(r.config.VBR), + gvctrl.VBRQuality( + map[quality]gvctrl.Quality{ + qualityStandard: gvctrl.QualityStandard, + qualityFair: gvctrl.QualityFair, + qualityGood: gvctrl.QualityGood, + qualityGreat: gvctrl.QualityGreat, + qualityExcellent: gvctrl.QualityExcellent, + }[r.config.VBRQuality], + ), + gvctrl.VBRBitrate(r.config.VBRBitrate), + gvctrl.CBRBitrate(int(r.config.Bitrate)), + gvctrl.Refresh(float64(r.config.MinFrames)/float64(r.config.FrameRate)), + ) + if err != nil { + return nil, fmt.Errorf("could not set IPCamera settings: %w", err) + } + + r.config.Logger.Log(logger.Info, pkg+"completed geovision configuration") + + time.Sleep(5 * time.Second) + + rtspClt, local, remote, err := rtsp.NewClient("rtsp://" + ipCamUser + ":" + ipCamPass + "@" + r.config.CameraIP + ":8554/" + "CH002.sdp") if err != nil { return nil, err } + r.config.Logger.Log(logger.Info, pkg+"created RTSP client") + resp, err := rtspClt.Options() if err != nil { return nil, err } - r.config.Logger.Log(logger.Info, pkg+"RTSP OPTIONS response", "response", resp.String()) + r.config.Logger.Log(logger.Debug, pkg+"RTSP OPTIONS response", "response", resp.String()) resp, err = rtspClt.Describe() if err != nil { return nil, err } - r.config.Logger.Log(logger.Info, pkg+"RTSP DESCRIBE response", "response", resp.String()) + r.config.Logger.Log(logger.Debug, pkg+"RTSP DESCRIBE response", "response", resp.String()) resp, err = rtspClt.Setup("track1", fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) if err != nil { return nil, err } - r.config.Logger.Log(logger.Info, pkg+"RTSP SETUP response", "response", resp.String()) + r.config.Logger.Log(logger.Debug, pkg+"RTSP SETUP response", "response", resp.String()) + rtpCltAddr, rtcpCltAddr, rtcpSvrAddr, err := formAddrs(local, remote, *resp) if err != nil { return nil, err } - resp, err = rtspClt.Play() - if err != nil { - return nil, err - } - r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String()) + r.config.Logger.Log(logger.Info, pkg+"RTSP session setup complete") rtpClt, err := rtp.NewClient(rtpCltAddr) if err != nil { @@ -216,6 +261,8 @@ func (r *Revid) startRTSPCamera() (func() error, error) { return nil, err } + r.config.Logger.Log(logger.Info, pkg+"RTCP and RTP clients created") + // Check errors from RTCP client until it has stopped running. go func() { for { @@ -231,13 +278,35 @@ func (r *Revid) startRTSPCamera() (func() error, error) { // Start the RTCP client. rtcpClt.Start() + r.config.Logger.Log(logger.Info, pkg+"RTCP client started") + // Start reading data from the RTP client. r.wg.Add(1) go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate)) + r.config.Logger.Log(logger.Info, pkg+"started input processor") + + resp, err = rtspClt.Play() + if err != nil { + return nil, err + } + r.config.Logger.Log(logger.Debug, pkg+"RTSP server PLAY response", "response", resp.String()) + r.config.Logger.Log(logger.Info, pkg+"play requested, now receiving stream") + return func() error { - rtspClt.Close() + err := rtpClt.Close() + if err != nil { + return fmt.Errorf("could not close RTP client: %w", err) + } + + err = rtspClt.Close() + if err != nil { + return fmt.Errorf("could not close RTSP client: %w", err) + } + rtcpClt.Stop() + + r.config.Logger.Log(logger.Info, pkg+"RTP, RTSP and RTCP clients stopped and closed") return nil }, nil } @@ -274,8 +343,7 @@ func parseSvrRTCPPort(resp rtsp.Response) (int, error) { // processFrom is run as a routine to read from a input data source, lex and // then send individual access units to revid's encoders. func (r *Revid) processFrom(read io.Reader, delay time.Duration) { - r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoders, read, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading input data") + r.config.Logger.Log(logger.Info, pkg+"finished lexing") r.wg.Done() } diff --git a/revid/revid.go b/revid/revid.go index 7a867bcd..1d9736a1 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -62,7 +62,7 @@ const ( defaultServerRTCPPort = 17301 ) -const pkg = "revid:" +const pkg = "revid: " type Logger interface { SetLevel(int8) @@ -134,17 +134,12 @@ func (r *Revid) Config() Config { return r.config } -// TODO(Saxon): put more thought into error severity. +// TODO(Saxon): put more thought into error severity and how to handle these. func (r *Revid) handleErrors() { for { err := <-r.err if err != nil { r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) - r.Stop() - err = r.Start() - if err != nil { - r.config.Logger.Log(logger.Error, pkg+"failed to restart revid", "error", err.Error()) - } } } } @@ -362,7 +357,6 @@ func (r *Revid) Start() error { } r.closeInput, err = r.setupInput() if err != nil { - r.Stop() return err } r.running = true @@ -394,12 +388,17 @@ func (r *Revid) Stop() { if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error()) } + r.config.Logger.Log(logger.Info, pkg+"closed pipeline") if r.cmd != nil && r.cmd.Process != nil { r.config.Logger.Log(logger.Info, pkg+"killing input proccess") r.cmd.Process.Kill() } + r.config.Logger.Log(logger.Info, pkg+"waiting for routines to close") + r.wg.Wait() + + r.config.Logger.Log(logger.Info, pkg+"revid stopped") r.running = false } @@ -423,6 +422,13 @@ func (r *Revid) Update(vars map[string]string) error { //look through the vars and update revid where needed for key, value := range vars { switch key { + case "Input": + v, ok := map[string]uint8{"raspivid": Raspivid, "rtsp": RTSP}[strings.ToLower(value)] + if !ok { + r.config.Logger.Log(logger.Warning, pkg+"invalid input var", "value", value) + break + } + r.config.Input = v case "Saturation": s, err := strconv.ParseInt(value, 10, 0) if err != nil { @@ -618,6 +624,27 @@ func (r *Revid) Update(vars map[string]string) error { break } r.config.MTSRBWriteTimeout = v + case "VBR": + v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)] + if !ok { + r.config.Logger.Log(logger.Warning, pkg+"invalid VBR var", "value", value) + break + } + r.config.VBR = v + case "VBRQuality": + v, ok := map[string]quality{"standard": qualityStandard, "fair": qualityFair, "good": qualityGood, "great": qualityGreat, "excellent": qualityExcellent}[strings.ToLower(value)] + if !ok { + r.config.Logger.Log(logger.Warning, pkg+"invalid VBRQuality var", "value", value) + break + } + r.config.VBRQuality = v + case "VBRBitrate": + v, err := strconv.Atoi(value) + if err != nil || v <= 0 { + r.config.Logger.Log(logger.Warning, pkg+"invalid VBRBitrate var", "value", value) + break + } + r.config.VBRBitrate = v } } r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config)) diff --git a/revid/senders.go b/revid/senders.go index 432ad9d8..d1e1c18c 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -219,6 +219,7 @@ func (s *mtsSender) output() { chunk = nil continue } + s.log(logger.Debug, pkg+"mtsSender: writing") _, err = s.dst.Write(chunk.Bytes()) if err != nil { s.repairer.Failed()