revid: move processFrom func and deletec inputs.go file

inputs.go can now go considering it only held "start functions" which have been replaced
by the AVDevice and it's implementations.
This commit is contained in:
Saxon 2019-11-12 16:06:04 +10:30
parent dec39a3636
commit 866e398496
2 changed files with 8 additions and 358 deletions

View File

@ -1,358 +0,0 @@
/*
DESCRIPTION
inputs.go contains code for interfacing with various revid inputs to obtain
input data streams.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Trek Hopton <trek@ausocean.org>
LICENSE
revid is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"errors"
"fmt"
"io"
"net"
"os"
"os/exec"
"strconv"
"strings"
"time"
"bitbucket.org/ausocean/av/codec/codecutil"
gvconfig "bitbucket.org/ausocean/av/device/geovision/config"
"bitbucket.org/ausocean/av/protocol/rtcp"
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/av/protocol/rtsp"
avconfig "bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/utils/logger"
)
// TODO: remove this when config has configurable user and pass.
const (
ipCamUser = "admin"
ipCamPass = "admin"
)
// Constants for real time clients.
const (
rtpPort = 60000
rtcpPort = 60001
defaultServerRTCPPort = 17301
)
// startRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output.
func (r *Revid) startRaspivid() (func() error, error) {
r.cfg.Logger.Log(logger.Info, pkg+"starting raspivid")
const disabled = "0"
args := []string{
"--output", "-",
"--nopreview",
"--timeout", disabled,
"--width", fmt.Sprint(r.cfg.Width),
"--height", fmt.Sprint(r.cfg.Height),
"--bitrate", fmt.Sprint(r.cfg.Bitrate * 1000), // Convert from kbps to bps.
"--framerate", fmt.Sprint(r.cfg.FrameRate),
"--rotation", fmt.Sprint(r.cfg.Rotation),
"--brightness", fmt.Sprint(r.cfg.Brightness),
"--saturation", fmt.Sprint(r.cfg.Saturation),
"--exposure", fmt.Sprint(r.cfg.Exposure),
"--awb", fmt.Sprint(r.cfg.AutoWhiteBalance),
}
if r.cfg.FlipHorizontal {
args = append(args, "--hflip")
}
if r.cfg.FlipVertical {
args = append(args, "--vflip")
}
if r.cfg.FlipHorizontal {
args = append(args, "--hflip")
}
switch r.cfg.InputCodec {
default:
return nil, fmt.Errorf("revid: invalid input codec: %v", r.cfg.InputCodec)
case codecutil.H264:
args = append(args,
"--codec", "H264",
"--inline",
"--intra", fmt.Sprint(r.cfg.MinFrames),
)
if r.cfg.VBR {
args = append(args, "-qp", fmt.Sprint(r.cfg.Quantization))
}
case codecutil.MJPEG:
args = append(args, "--codec", "MJPEG")
}
r.cfg.Logger.Log(logger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " "))
r.cmd = exec.Command("raspivid", args...)
stdout, err := r.cmd.StdoutPipe()
if err != nil {
return nil, err
}
err = r.cmd.Start()
if err != nil {
return nil, fmt.Errorf("could not start raspivid command: %w", err)
}
r.wg.Add(1)
go r.processFrom(stdout, 0)
return nil, nil
}
// startV4l sets up webcam input and starts the revid.processFrom routine.
func (r *Revid) startV4L() (func() error, error) {
const defaultVideo = "/dev/video0"
r.cfg.Logger.Log(logger.Info, pkg+"starting webcam")
if r.cfg.InputPath == "" {
r.cfg.Logger.Log(logger.Info, pkg+"using default video device", "device", defaultVideo)
r.cfg.InputPath = defaultVideo
}
args := []string{
"-i", r.cfg.InputPath,
"-f", "h264",
"-r", fmt.Sprint(r.cfg.FrameRate),
}
br := r.cfg.Bitrate * 1000
args = append(args,
"-b:v", fmt.Sprint(br),
"-maxrate", fmt.Sprint(br),
"-bufsize", fmt.Sprint(br/2),
"-s", fmt.Sprintf("%dx%d", r.cfg.Width, r.cfg.Height),
"-",
)
r.cfg.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " "))
r.cmd = exec.Command("ffmpeg", args...)
stdout, err := r.cmd.StdoutPipe()
if err != nil {
return nil, nil
}
err = r.cmd.Start()
if err != nil {
r.cfg.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error())
return nil, nil
}
r.wg.Add(1)
go r.processFrom(stdout, time.Duration(0))
return nil, nil
}
// setupInputForFile sets up input from file and starts the revid.processFrom
// routine.
func (r *Revid) setupInputForFile() (func() error, error) {
f, err := os.Open(r.cfg.InputPath)
if err != nil {
r.cfg.Logger.Log(logger.Error, err.Error())
r.Stop()
return nil, err
}
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
r.wg.Add(1)
go r.processFrom(f, 0)
return func() error { return f.Close() }, nil
}
// startRTSPCamera uses RTSP to request an RTP stream from an IP camera. An RTP
// client is created from which RTP packets containing either h264/h265 can be
// read by the selected lexer.
//
// TODO(saxon): this function should really be startGeoVision. It's much too
// specific to be called startRTSPCamera.
func (r *Revid) startRTSPCamera() (func() error, error) {
r.cfg.Logger.Log(logger.Info, pkg+"starting geovision...")
err := gvconfig.Set(
r.cfg.CameraIP,
gvconfig.Channel(r.cfg.CameraChan),
gvconfig.CodecOut(
map[uint8]gvconfig.Codec{
codecutil.H264: gvconfig.CodecH264,
codecutil.H265: gvconfig.CodecH265,
codecutil.MJPEG: gvconfig.CodecMJPEG,
}[r.cfg.InputCodec],
),
gvconfig.Height(int(r.cfg.Height)),
gvconfig.FrameRate(int(r.cfg.FrameRate)),
gvconfig.VariableBitrate(r.cfg.VBR),
gvconfig.VBRQuality(
map[avconfig.Quality]gvconfig.Quality{
avconfig.QualityStandard: gvconfig.QualityStandard,
avconfig.QualityFair: gvconfig.QualityFair,
avconfig.QualityGood: gvconfig.QualityGood,
avconfig.QualityGreat: gvconfig.QualityGreat,
avconfig.QualityExcellent: gvconfig.QualityExcellent,
}[r.cfg.VBRQuality],
),
gvconfig.VBRBitrate(r.cfg.VBRBitrate),
gvconfig.CBRBitrate(int(r.cfg.Bitrate)),
gvconfig.Refresh(float64(r.cfg.MinFrames)/float64(r.cfg.FrameRate)),
)
if err != nil {
return nil, fmt.Errorf("could not set IPCamera settings: %w", err)
}
r.cfg.Logger.Log(logger.Info, pkg+"completed geovision configuration")
time.Sleep(5 * time.Second)
rtspClt, local, remote, err := rtsp.NewClient("rtsp://" + ipCamUser + ":" + ipCamPass + "@" + r.cfg.CameraIP + ":8554/" + "CH002.sdp")
if err != nil {
return nil, err
}
r.cfg.Logger.Log(logger.Info, pkg+"created RTSP client")
resp, err := rtspClt.Options()
if err != nil {
return nil, err
}
r.cfg.Logger.Log(logger.Debug, pkg+"RTSP OPTIONS response", "response", resp.String())
resp, err = rtspClt.Describe()
if err != nil {
return nil, err
}
r.cfg.Logger.Log(logger.Debug, pkg+"RTSP DESCRIBE response", "response", resp.String())
resp, err = rtspClt.Setup("track1", fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort))
if err != nil {
return nil, err
}
r.cfg.Logger.Log(logger.Debug, pkg+"RTSP SETUP response", "response", resp.String())
rtpCltAddr, rtcpCltAddr, rtcpSvrAddr, err := formAddrs(local, remote, *resp)
if err != nil {
return nil, err
}
r.cfg.Logger.Log(logger.Info, pkg+"RTSP session setup complete")
rtpClt, err := rtp.NewClient(rtpCltAddr)
if err != nil {
return nil, err
}
rtcpClt, err := rtcp.NewClient(rtcpCltAddr, rtcpSvrAddr, rtpClt, r.cfg.Logger.Log)
if err != nil {
return nil, err
}
r.cfg.Logger.Log(logger.Info, pkg+"RTCP and RTP clients created")
// Check errors from RTCP client until it has stopped running.
go func() {
for {
err, ok := <-rtcpClt.Err()
if ok {
r.cfg.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error())
} else {
return
}
}
}()
// Start the RTCP client.
rtcpClt.Start()
r.cfg.Logger.Log(logger.Info, pkg+"RTCP client started")
// Start reading data from the RTP client.
r.wg.Add(1)
go r.processFrom(rtpClt, time.Second/time.Duration(r.cfg.FrameRate))
r.cfg.Logger.Log(logger.Info, pkg+"started input processor")
resp, err = rtspClt.Play()
if err != nil {
return nil, err
}
r.cfg.Logger.Log(logger.Debug, pkg+"RTSP server PLAY response", "response", resp.String())
r.cfg.Logger.Log(logger.Info, pkg+"play requested, now receiving stream")
return func() error {
err := rtpClt.Close()
if err != nil {
return fmt.Errorf("could not close RTP client: %w", err)
}
err = rtspClt.Close()
if err != nil {
return fmt.Errorf("could not close RTSP client: %w", err)
}
rtcpClt.Stop()
r.cfg.Logger.Log(logger.Info, pkg+"RTP, RTSP and RTCP clients stopped and closed")
return nil
}, nil
}
// formAddrs is a helper function to form the addresses for the RTP client,
// RTCP client, and the RTSP server's RTCP addr using the local, remote addresses
// of the RTSP conn, and the SETUP method response.
func formAddrs(local, remote *net.TCPAddr, setupResp rtsp.Response) (rtpCltAddr, rtcpCltAddr, rtcpSvrAddr string, err error) {
svrRTCPPort, err := parseSvrRTCPPort(setupResp)
if err != nil {
return "", "", "", err
}
rtpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtpPort)
rtcpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtcpPort)
rtcpSvrAddr = strings.Split(remote.String(), ":")[0] + ":" + strconv.Itoa(svrRTCPPort)
return
}
// parseServerRTCPPort is a helper function to get the RTSP server's RTCP port.
func parseSvrRTCPPort(resp rtsp.Response) (int, error) {
transport := resp.Header.Get("Transport")
for _, p := range strings.Split(transport, ";") {
if strings.Contains(p, "server_port") {
port, err := strconv.Atoi(strings.Split(p, "-")[1])
if err != nil {
return 0, err
}
return port, nil
}
}
return 0, errors.New("SETUP response did not provide RTCP port")
}
// processFrom is run as a routine to read from a input data source, lex and
// then send individual access units to revid's encoders.
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.err <- r.lexTo(r.encoders, read, delay)
r.cfg.Logger.Log(logger.Info, pkg+"finished lexing")
r.wg.Done()
}

View File

@ -703,3 +703,11 @@ func (r *Revid) Update(vars map[string]string) error {
r.cfg.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.cfg))
return nil
}
// processFrom is run as a routine to read from a input data source, lex and
// then send individual access units to revid's encoders.
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.err <- r.lexTo(r.encoders, read, delay)
r.cfg.Logger.Log(logger.Info, pkg+"finished lexing")
r.wg.Done()
}