From 866e398496867998f04db8666b88eed42adc9a7a Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 12 Nov 2019 16:06:04 +1030 Subject: [PATCH] revid: move processFrom func and deletec inputs.go file inputs.go can now go considering it only held "start functions" which have been replaced by the AVDevice and it's implementations. --- revid/inputs.go | 358 ------------------------------------------------ revid/revid.go | 8 ++ 2 files changed, 8 insertions(+), 358 deletions(-) delete mode 100644 revid/inputs.go diff --git a/revid/inputs.go b/revid/inputs.go deleted file mode 100644 index 124d800b..00000000 --- a/revid/inputs.go +++ /dev/null @@ -1,358 +0,0 @@ -/* -DESCRIPTION - inputs.go contains code for interfacing with various revid inputs to obtain - input data streams. - -AUTHORS - Saxon A. Nelson-Milton - Alan Noble - Dan Kortschak - Trek Hopton - -LICENSE - revid is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - in gpl.txt. If not, see http://www.gnu.org/licenses. -*/ - -package revid - -import ( - "errors" - "fmt" - "io" - "net" - "os" - "os/exec" - "strconv" - "strings" - "time" - - "bitbucket.org/ausocean/av/codec/codecutil" - gvconfig "bitbucket.org/ausocean/av/device/geovision/config" - "bitbucket.org/ausocean/av/protocol/rtcp" - "bitbucket.org/ausocean/av/protocol/rtp" - "bitbucket.org/ausocean/av/protocol/rtsp" - avconfig "bitbucket.org/ausocean/av/revid/config" - "bitbucket.org/ausocean/utils/logger" -) - -// TODO: remove this when config has configurable user and pass. -const ( - ipCamUser = "admin" - ipCamPass = "admin" -) - -// Constants for real time clients. -const ( - rtpPort = 60000 - rtcpPort = 60001 - defaultServerRTCPPort = 17301 -) - -// startRaspivid sets up things for input from raspivid i.e. starts -// a raspivid process and pipes it's data output. -func (r *Revid) startRaspivid() (func() error, error) { - r.cfg.Logger.Log(logger.Info, pkg+"starting raspivid") - - const disabled = "0" - args := []string{ - "--output", "-", - "--nopreview", - "--timeout", disabled, - "--width", fmt.Sprint(r.cfg.Width), - "--height", fmt.Sprint(r.cfg.Height), - "--bitrate", fmt.Sprint(r.cfg.Bitrate * 1000), // Convert from kbps to bps. - "--framerate", fmt.Sprint(r.cfg.FrameRate), - "--rotation", fmt.Sprint(r.cfg.Rotation), - "--brightness", fmt.Sprint(r.cfg.Brightness), - "--saturation", fmt.Sprint(r.cfg.Saturation), - "--exposure", fmt.Sprint(r.cfg.Exposure), - "--awb", fmt.Sprint(r.cfg.AutoWhiteBalance), - } - - if r.cfg.FlipHorizontal { - args = append(args, "--hflip") - } - - if r.cfg.FlipVertical { - args = append(args, "--vflip") - } - if r.cfg.FlipHorizontal { - args = append(args, "--hflip") - } - - switch r.cfg.InputCodec { - default: - return nil, fmt.Errorf("revid: invalid input codec: %v", r.cfg.InputCodec) - case codecutil.H264: - args = append(args, - "--codec", "H264", - "--inline", - "--intra", fmt.Sprint(r.cfg.MinFrames), - ) - if r.cfg.VBR { - args = append(args, "-qp", fmt.Sprint(r.cfg.Quantization)) - } - case codecutil.MJPEG: - args = append(args, "--codec", "MJPEG") - } - r.cfg.Logger.Log(logger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " ")) - r.cmd = exec.Command("raspivid", args...) - - stdout, err := r.cmd.StdoutPipe() - if err != nil { - return nil, err - } - err = r.cmd.Start() - if err != nil { - return nil, fmt.Errorf("could not start raspivid command: %w", err) - } - - r.wg.Add(1) - go r.processFrom(stdout, 0) - return nil, nil -} - -// startV4l sets up webcam input and starts the revid.processFrom routine. -func (r *Revid) startV4L() (func() error, error) { - const defaultVideo = "/dev/video0" - - r.cfg.Logger.Log(logger.Info, pkg+"starting webcam") - if r.cfg.InputPath == "" { - r.cfg.Logger.Log(logger.Info, pkg+"using default video device", "device", defaultVideo) - r.cfg.InputPath = defaultVideo - } - - args := []string{ - "-i", r.cfg.InputPath, - "-f", "h264", - "-r", fmt.Sprint(r.cfg.FrameRate), - } - - br := r.cfg.Bitrate * 1000 - args = append(args, - "-b:v", fmt.Sprint(br), - "-maxrate", fmt.Sprint(br), - "-bufsize", fmt.Sprint(br/2), - "-s", fmt.Sprintf("%dx%d", r.cfg.Width, r.cfg.Height), - "-", - ) - - r.cfg.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " ")) - r.cmd = exec.Command("ffmpeg", args...) - - stdout, err := r.cmd.StdoutPipe() - if err != nil { - return nil, nil - } - - err = r.cmd.Start() - if err != nil { - r.cfg.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error()) - return nil, nil - } - - r.wg.Add(1) - go r.processFrom(stdout, time.Duration(0)) - return nil, nil -} - -// setupInputForFile sets up input from file and starts the revid.processFrom -// routine. -func (r *Revid) setupInputForFile() (func() error, error) { - f, err := os.Open(r.cfg.InputPath) - if err != nil { - r.cfg.Logger.Log(logger.Error, err.Error()) - r.Stop() - return nil, err - } - - // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - r.wg.Add(1) - go r.processFrom(f, 0) - return func() error { return f.Close() }, nil -} - -// startRTSPCamera uses RTSP to request an RTP stream from an IP camera. An RTP -// client is created from which RTP packets containing either h264/h265 can be -// read by the selected lexer. -// -// TODO(saxon): this function should really be startGeoVision. It's much too -// specific to be called startRTSPCamera. -func (r *Revid) startRTSPCamera() (func() error, error) { - r.cfg.Logger.Log(logger.Info, pkg+"starting geovision...") - - err := gvconfig.Set( - r.cfg.CameraIP, - gvconfig.Channel(r.cfg.CameraChan), - gvconfig.CodecOut( - map[uint8]gvconfig.Codec{ - codecutil.H264: gvconfig.CodecH264, - codecutil.H265: gvconfig.CodecH265, - codecutil.MJPEG: gvconfig.CodecMJPEG, - }[r.cfg.InputCodec], - ), - gvconfig.Height(int(r.cfg.Height)), - gvconfig.FrameRate(int(r.cfg.FrameRate)), - gvconfig.VariableBitrate(r.cfg.VBR), - gvconfig.VBRQuality( - map[avconfig.Quality]gvconfig.Quality{ - avconfig.QualityStandard: gvconfig.QualityStandard, - avconfig.QualityFair: gvconfig.QualityFair, - avconfig.QualityGood: gvconfig.QualityGood, - avconfig.QualityGreat: gvconfig.QualityGreat, - avconfig.QualityExcellent: gvconfig.QualityExcellent, - }[r.cfg.VBRQuality], - ), - gvconfig.VBRBitrate(r.cfg.VBRBitrate), - gvconfig.CBRBitrate(int(r.cfg.Bitrate)), - gvconfig.Refresh(float64(r.cfg.MinFrames)/float64(r.cfg.FrameRate)), - ) - if err != nil { - return nil, fmt.Errorf("could not set IPCamera settings: %w", err) - } - - r.cfg.Logger.Log(logger.Info, pkg+"completed geovision configuration") - - time.Sleep(5 * time.Second) - - rtspClt, local, remote, err := rtsp.NewClient("rtsp://" + ipCamUser + ":" + ipCamPass + "@" + r.cfg.CameraIP + ":8554/" + "CH002.sdp") - if err != nil { - return nil, err - } - - r.cfg.Logger.Log(logger.Info, pkg+"created RTSP client") - - resp, err := rtspClt.Options() - if err != nil { - return nil, err - } - r.cfg.Logger.Log(logger.Debug, pkg+"RTSP OPTIONS response", "response", resp.String()) - - resp, err = rtspClt.Describe() - if err != nil { - return nil, err - } - r.cfg.Logger.Log(logger.Debug, pkg+"RTSP DESCRIBE response", "response", resp.String()) - - resp, err = rtspClt.Setup("track1", fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) - if err != nil { - return nil, err - } - r.cfg.Logger.Log(logger.Debug, pkg+"RTSP SETUP response", "response", resp.String()) - - rtpCltAddr, rtcpCltAddr, rtcpSvrAddr, err := formAddrs(local, remote, *resp) - if err != nil { - return nil, err - } - - r.cfg.Logger.Log(logger.Info, pkg+"RTSP session setup complete") - - rtpClt, err := rtp.NewClient(rtpCltAddr) - if err != nil { - return nil, err - } - - rtcpClt, err := rtcp.NewClient(rtcpCltAddr, rtcpSvrAddr, rtpClt, r.cfg.Logger.Log) - if err != nil { - return nil, err - } - - r.cfg.Logger.Log(logger.Info, pkg+"RTCP and RTP clients created") - - // Check errors from RTCP client until it has stopped running. - go func() { - for { - err, ok := <-rtcpClt.Err() - if ok { - r.cfg.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error()) - } else { - return - } - } - }() - - // Start the RTCP client. - rtcpClt.Start() - - r.cfg.Logger.Log(logger.Info, pkg+"RTCP client started") - - // Start reading data from the RTP client. - r.wg.Add(1) - go r.processFrom(rtpClt, time.Second/time.Duration(r.cfg.FrameRate)) - - r.cfg.Logger.Log(logger.Info, pkg+"started input processor") - - resp, err = rtspClt.Play() - if err != nil { - return nil, err - } - r.cfg.Logger.Log(logger.Debug, pkg+"RTSP server PLAY response", "response", resp.String()) - r.cfg.Logger.Log(logger.Info, pkg+"play requested, now receiving stream") - - return func() error { - err := rtpClt.Close() - if err != nil { - return fmt.Errorf("could not close RTP client: %w", err) - } - - err = rtspClt.Close() - if err != nil { - return fmt.Errorf("could not close RTSP client: %w", err) - } - - rtcpClt.Stop() - - r.cfg.Logger.Log(logger.Info, pkg+"RTP, RTSP and RTCP clients stopped and closed") - return nil - }, nil -} - -// formAddrs is a helper function to form the addresses for the RTP client, -// RTCP client, and the RTSP server's RTCP addr using the local, remote addresses -// of the RTSP conn, and the SETUP method response. -func formAddrs(local, remote *net.TCPAddr, setupResp rtsp.Response) (rtpCltAddr, rtcpCltAddr, rtcpSvrAddr string, err error) { - svrRTCPPort, err := parseSvrRTCPPort(setupResp) - if err != nil { - return "", "", "", err - } - rtpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtpPort) - rtcpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtcpPort) - rtcpSvrAddr = strings.Split(remote.String(), ":")[0] + ":" + strconv.Itoa(svrRTCPPort) - return -} - -// parseServerRTCPPort is a helper function to get the RTSP server's RTCP port. -func parseSvrRTCPPort(resp rtsp.Response) (int, error) { - transport := resp.Header.Get("Transport") - for _, p := range strings.Split(transport, ";") { - if strings.Contains(p, "server_port") { - port, err := strconv.Atoi(strings.Split(p, "-")[1]) - if err != nil { - return 0, err - } - return port, nil - } - } - return 0, errors.New("SETUP response did not provide RTCP port") -} - -// processFrom is run as a routine to read from a input data source, lex and -// then send individual access units to revid's encoders. -func (r *Revid) processFrom(read io.Reader, delay time.Duration) { - r.err <- r.lexTo(r.encoders, read, delay) - r.cfg.Logger.Log(logger.Info, pkg+"finished lexing") - r.wg.Done() -} diff --git a/revid/revid.go b/revid/revid.go index dd3eb6c0..f5212486 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -703,3 +703,11 @@ func (r *Revid) Update(vars map[string]string) error { r.cfg.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.cfg)) return nil } + +// processFrom is run as a routine to read from a input data source, lex and +// then send individual access units to revid's encoders. +func (r *Revid) processFrom(read io.Reader, delay time.Duration) { + r.err <- r.lexTo(r.encoders, read, delay) + r.cfg.Logger.Log(logger.Info, pkg+"finished lexing") + r.wg.Done() +}