/* DESCRIPTION inputs.go contains code for interfacing with various revid inputs to obtain input data streams. AUTHORS Saxon A. Nelson-Milton Alan Noble Dan Kortschak Trek Hopton LICENSE revid is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License in gpl.txt. If not, see http://www.gnu.org/licenses. */ package revid import ( "errors" "fmt" "io" "net" "os" "os/exec" "strconv" "strings" "time" "bitbucket.org/ausocean/av/codec/codecutil" gvconfig "bitbucket.org/ausocean/av/device/geovision/config" "bitbucket.org/ausocean/av/protocol/rtcp" "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/av/protocol/rtsp" avconfig "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/utils/logger" ) // TODO: remove this when config has configurable user and pass. const ( ipCamUser = "admin" ipCamPass = "admin" ) // Constants for real time clients. const ( rtpPort = 60000 rtcpPort = 60001 defaultServerRTCPPort = 17301 ) // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. func (r *Revid) startRaspivid() (func() error, error) { r.cfg.Logger.Log(logger.Info, pkg+"starting raspivid") const disabled = "0" args := []string{ "--output", "-", "--nopreview", "--timeout", disabled, "--width", fmt.Sprint(r.cfg.Width), "--height", fmt.Sprint(r.cfg.Height), "--bitrate", fmt.Sprint(r.cfg.Bitrate * 1000), // Convert from kbps to bps. "--framerate", fmt.Sprint(r.cfg.FrameRate), "--rotation", fmt.Sprint(r.cfg.Rotation), "--brightness", fmt.Sprint(r.cfg.Brightness), "--saturation", fmt.Sprint(r.cfg.Saturation), "--exposure", fmt.Sprint(r.cfg.Exposure), "--awb", fmt.Sprint(r.cfg.AutoWhiteBalance), } if r.cfg.FlipHorizontal { args = append(args, "--hflip") } if r.cfg.FlipVertical { args = append(args, "--vflip") } if r.cfg.FlipHorizontal { args = append(args, "--hflip") } switch r.cfg.InputCodec { default: return nil, fmt.Errorf("revid: invalid input codec: %v", r.cfg.InputCodec) case codecutil.H264: args = append(args, "--codec", "H264", "--inline", "--intra", fmt.Sprint(r.cfg.MinFrames), ) if r.cfg.VBR { args = append(args, "-qp", fmt.Sprint(r.cfg.Quantization)) } case codecutil.MJPEG: args = append(args, "--codec", "MJPEG") } r.cfg.Logger.Log(logger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " ")) r.cmd = exec.Command("raspivid", args...) stdout, err := r.cmd.StdoutPipe() if err != nil { return nil, err } err = r.cmd.Start() if err != nil { return nil, fmt.Errorf("could not start raspivid command: %w", err) } r.wg.Add(1) go r.processFrom(stdout, 0) return nil, nil } // startV4l sets up webcam input and starts the revid.processFrom routine. func (r *Revid) startV4L() (func() error, error) { const defaultVideo = "/dev/video0" r.cfg.Logger.Log(logger.Info, pkg+"starting webcam") if r.cfg.InputPath == "" { r.cfg.Logger.Log(logger.Info, pkg+"using default video device", "device", defaultVideo) r.cfg.InputPath = defaultVideo } args := []string{ "-i", r.cfg.InputPath, "-f", "h264", "-r", fmt.Sprint(r.cfg.FrameRate), } br := r.cfg.Bitrate * 1000 args = append(args, "-b:v", fmt.Sprint(br), "-maxrate", fmt.Sprint(br), "-bufsize", fmt.Sprint(br/2), "-s", fmt.Sprintf("%dx%d", r.cfg.Width, r.cfg.Height), "-", ) r.cfg.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " ")) r.cmd = exec.Command("ffmpeg", args...) stdout, err := r.cmd.StdoutPipe() if err != nil { return nil, nil } err = r.cmd.Start() if err != nil { r.cfg.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error()) return nil, nil } r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) return nil, nil } // setupInputForFile sets up input from file and starts the revid.processFrom // routine. func (r *Revid) setupInputForFile() (func() error, error) { f, err := os.Open(r.cfg.InputPath) if err != nil { r.cfg.Logger.Log(logger.Error, err.Error()) r.Stop() return nil, err } // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. r.wg.Add(1) go r.processFrom(f, 0) return func() error { return f.Close() }, nil } // startRTSPCamera uses RTSP to request an RTP stream from an IP camera. An RTP // client is created from which RTP packets containing either h264/h265 can be // read by the selected lexer. // // TODO(saxon): this function should really be startGeoVision. It's much too // specific to be called startRTSPCamera. func (r *Revid) startRTSPCamera() (func() error, error) { r.cfg.Logger.Log(logger.Info, pkg+"starting geovision...") err := gvconfig.Set( r.cfg.CameraIP, gvconfig.Channel(r.cfg.CameraChan), gvconfig.CodecOut( map[uint8]gvconfig.Codec{ codecutil.H264: gvconfig.CodecH264, codecutil.H265: gvconfig.CodecH265, codecutil.MJPEG: gvconfig.CodecMJPEG, }[r.cfg.InputCodec], ), gvconfig.Height(int(r.cfg.Height)), gvconfig.FrameRate(int(r.cfg.FrameRate)), gvconfig.VariableBitrate(r.cfg.VBR), gvconfig.VBRQuality( map[avconfig.Quality]gvconfig.Quality{ avconfig.QualityStandard: gvconfig.QualityStandard, avconfig.QualityFair: gvconfig.QualityFair, avconfig.QualityGood: gvconfig.QualityGood, avconfig.QualityGreat: gvconfig.QualityGreat, avconfig.QualityExcellent: gvconfig.QualityExcellent, }[r.cfg.VBRQuality], ), gvconfig.VBRBitrate(r.cfg.VBRBitrate), gvconfig.CBRBitrate(int(r.cfg.Bitrate)), gvconfig.Refresh(float64(r.cfg.MinFrames)/float64(r.cfg.FrameRate)), ) if err != nil { return nil, fmt.Errorf("could not set IPCamera settings: %w", err) } r.cfg.Logger.Log(logger.Info, pkg+"completed geovision configuration") time.Sleep(5 * time.Second) rtspClt, local, remote, err := rtsp.NewClient("rtsp://" + ipCamUser + ":" + ipCamPass + "@" + r.cfg.CameraIP + ":8554/" + "CH002.sdp") if err != nil { return nil, err } r.cfg.Logger.Log(logger.Info, pkg+"created RTSP client") resp, err := rtspClt.Options() if err != nil { return nil, err } r.cfg.Logger.Log(logger.Debug, pkg+"RTSP OPTIONS response", "response", resp.String()) resp, err = rtspClt.Describe() if err != nil { return nil, err } r.cfg.Logger.Log(logger.Debug, pkg+"RTSP DESCRIBE response", "response", resp.String()) resp, err = rtspClt.Setup("track1", fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) if err != nil { return nil, err } r.cfg.Logger.Log(logger.Debug, pkg+"RTSP SETUP response", "response", resp.String()) rtpCltAddr, rtcpCltAddr, rtcpSvrAddr, err := formAddrs(local, remote, *resp) if err != nil { return nil, err } r.cfg.Logger.Log(logger.Info, pkg+"RTSP session setup complete") rtpClt, err := rtp.NewClient(rtpCltAddr) if err != nil { return nil, err } rtcpClt, err := rtcp.NewClient(rtcpCltAddr, rtcpSvrAddr, rtpClt, r.cfg.Logger.Log) if err != nil { return nil, err } r.cfg.Logger.Log(logger.Info, pkg+"RTCP and RTP clients created") // Check errors from RTCP client until it has stopped running. go func() { for { err, ok := <-rtcpClt.Err() if ok { r.cfg.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error()) } else { return } } }() // Start the RTCP client. rtcpClt.Start() r.cfg.Logger.Log(logger.Info, pkg+"RTCP client started") // Start reading data from the RTP client. r.wg.Add(1) go r.processFrom(rtpClt, time.Second/time.Duration(r.cfg.FrameRate)) r.cfg.Logger.Log(logger.Info, pkg+"started input processor") resp, err = rtspClt.Play() if err != nil { return nil, err } r.cfg.Logger.Log(logger.Debug, pkg+"RTSP server PLAY response", "response", resp.String()) r.cfg.Logger.Log(logger.Info, pkg+"play requested, now receiving stream") return func() error { err := rtpClt.Close() if err != nil { return fmt.Errorf("could not close RTP client: %w", err) } err = rtspClt.Close() if err != nil { return fmt.Errorf("could not close RTSP client: %w", err) } rtcpClt.Stop() r.cfg.Logger.Log(logger.Info, pkg+"RTP, RTSP and RTCP clients stopped and closed") return nil }, nil } // formAddrs is a helper function to form the addresses for the RTP client, // RTCP client, and the RTSP server's RTCP addr using the local, remote addresses // of the RTSP conn, and the SETUP method response. func formAddrs(local, remote *net.TCPAddr, setupResp rtsp.Response) (rtpCltAddr, rtcpCltAddr, rtcpSvrAddr string, err error) { svrRTCPPort, err := parseSvrRTCPPort(setupResp) if err != nil { return "", "", "", err } rtpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtpPort) rtcpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtcpPort) rtcpSvrAddr = strings.Split(remote.String(), ":")[0] + ":" + strconv.Itoa(svrRTCPPort) return } // parseServerRTCPPort is a helper function to get the RTSP server's RTCP port. func parseSvrRTCPPort(resp rtsp.Response) (int, error) { transport := resp.Header.Get("Transport") for _, p := range strings.Split(transport, ";") { if strings.Contains(p, "server_port") { port, err := strconv.Atoi(strings.Split(p, "-")[1]) if err != nil { return 0, err } return port, nil } } return 0, errors.New("SETUP response did not provide RTCP port") } // processFrom is run as a routine to read from a input data source, lex and // then send individual access units to revid's encoders. func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.err <- r.lexTo(r.encoders, read, delay) r.cfg.Logger.Log(logger.Info, pkg+"finished lexing") r.wg.Done() }