/* NAME revid-cli - command line interface for Revid. DESCRIPTION See Readme.md AUTHORS Saxon A. Nelson-Milton <saxon@ausocean.org> Jack Richardson <jack@ausocean.org> LICENSE revid-cli is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ package main import ( "flag" "os" "runtime/pprof" "strconv" "strings" "time" "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts/meta" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/sds" "bitbucket.org/ausocean/iot/pi/smartlogger" "bitbucket.org/ausocean/utils/logger" ) const ( // progName is the program name for logging purposes. progName = "revid-cli" // Logging is set to INFO level. defaultLogVerbosity = logger.Info ) // Revid modes const ( normal = "Normal" paused = "Paused" burst = "Burst" ) // Other misc consts const ( netSendRetryTime = 5 * time.Second defaultRunDuration = 24 * time.Hour revidStopTime = 5 * time.Second defaultLogPath = "/var/log/netsender" pkg = "revid-cli:" ) // canProfile is set to false with revid-cli is built with "-tags profile". var canProfile = true // The logger that will be used throughout var log *logger.Logger const ( metaPreambleKey = "copyright" metaPreambleData = "ausocean.org/license/content2019" ) func main() { mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}}) useNetsender := flag.Bool("NetSender", false, "Are we checking vars through netsender?") runDurationPtr := flag.Duration("runDuration", defaultRunDuration, "How long do you want revid to run for?") cfg := handleFlags() if !*useNetsender { rv, err := revid.New(cfg, nil) if err != nil { cfg.Logger.Log(logger.Fatal, pkg+"failed to initialiase revid", "error", err.Error()) } if err = rv.Start(); err != nil { cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error()) } time.Sleep(*runDurationPtr) if err = rv.Stop(); err != nil { cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error()) } return } if err := run(cfg); err != nil { log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error()) } } // handleFlags parses command line flags and returns a revid configuration // based on them. func handleFlags() revid.Config { var cfg revid.Config var ( cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Info, Warning, Error, Fatal") framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent") rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint") bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video") outputPathPtr = flag.String("OutputPath", "", "The directory of the output file") inputFilePtr = flag.String("InputPath", "", "The directory of the input file") heightPtr = flag.Uint("Height", 0, "Height in pixels") widthPtr = flag.Uint("Width", 0, "Width in pixels") frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video") httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts") quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40") intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send") verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No") horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No") rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)") logPathPtr = flag.String("LogPath", defaultLogPath, "The log path") configFilePtr = flag.String("ConfigFile", "", "NetSender config file") sendRetryPtr = flag.Bool("retry", false, "Specify whether a failed send should be retried.") ) var outputs flagStrings flag.Var(&outputs, "Output", "output type: Http, Rtmp, File, Udp, Rtp (may be used more than once)") flag.Parse() switch *verbosityPtr { case "Debug": cfg.LogLevel = logger.Debug case "Info": cfg.LogLevel = logger.Info case "Warning": cfg.LogLevel = logger.Warning case "Error": cfg.LogLevel = logger.Error case "Fatal": cfg.LogLevel = logger.Fatal default: cfg.LogLevel = defaultLogVerbosity } log = logger.New(cfg.LogLevel, &smartlogger.New(*logPathPtr).LogRoller) cfg.Logger = log if *cpuprofile != "" { if canProfile { f, err := os.Create(*cpuprofile) if err != nil { log.Log(logger.Fatal, pkg+"could not create CPU profile", "error", err.Error()) } if err := pprof.StartCPUProfile(f); err != nil { log.Log(logger.Fatal, pkg+"could not start CPU profile", "error", err.Error()) } defer pprof.StopCPUProfile() } else { log.Log(logger.Warning, pkg+"ignoring cpuprofile flag - http/pprof built in.") } } switch *inputPtr { case "Raspivid": cfg.Input = revid.Raspivid case "v4l": cfg.Input = revid.V4L case "File": cfg.Input = revid.File case "": default: log.Log(logger.Error, pkg+"bad input argument") } switch *inputCodecPtr { case "H264": cfg.InputCodec = revid.H264 case "": default: log.Log(logger.Error, pkg+"bad input codec argument") } if len(outputs) == 0 { cfg.Outputs = make([]uint8, 1) } for _, o := range outputs { switch o { case "File": cfg.Outputs = append(cfg.Outputs, revid.File) case "Http": cfg.Outputs = append(cfg.Outputs, revid.Http) case "Rtmp": cfg.Outputs = append(cfg.Outputs, revid.Rtmp) case "FfmpegRtmp": cfg.Outputs = append(cfg.Outputs, revid.FfmpegRtmp) case "Udp": cfg.Outputs = append(cfg.Outputs, revid.Udp) case "Rtp": cfg.Outputs = append(cfg.Outputs, revid.Rtp) case "": default: log.Log(logger.Error, pkg+"bad output argument", "arg", o) } } switch *rtmpMethodPtr { case "Ffmpeg": cfg.RtmpMethod = revid.Ffmpeg case "LibRtmp": cfg.RtmpMethod = revid.LibRtmp case "": default: log.Log(logger.Error, pkg+"bad rtmp method argument") } switch *packetizationPtr { case "", "None": cfg.Packetization = revid.None case "Mpegts": cfg.Packetization = revid.Mpegts case "Flv": cfg.Packetization = revid.Flv default: log.Log(logger.Error, pkg+"bad packetization argument") } if *configFilePtr != "" { netsender.ConfigFile = *configFilePtr } cfg.Quantize = *quantizePtr cfg.FlipHorizontal = *horizontalFlipPtr cfg.FlipVertical = *verticalFlipPtr cfg.FramesPerClip = *framesPerClipPtr cfg.RtmpUrl = *rtmpUrlPtr cfg.Bitrate = *bitratePtr cfg.OutputPath = *outputPathPtr cfg.InputPath = *inputFilePtr cfg.Height = *heightPtr cfg.Width = *widthPtr cfg.FrameRate = *frameRatePtr cfg.HttpAddress = *httpAddressPtr cfg.Quantization = *quantizationPtr cfg.IntraRefreshPeriod = *intraRefreshPeriodPtr cfg.RtpAddress = *rtpAddrPtr cfg.SendRetry = *sendRetryPtr return cfg } // initialize then run the main NetSender client func run(cfg revid.Config) error { log.Log(logger.Info, pkg+"running in NetSender mode") var vars map[string]string var rv *revid.Revid readPin := func(pin *netsender.Pin) error { switch { case pin.Name == "X23": pin.Value = rv.Bitrate() case pin.Name[0] == 'X': return sds.ReadSystem(pin) default: pin.Value = -1 } return nil // Return error only if we want NetSender to generate an error } ns, err := netsender.New(log, nil, readPin, nil) if err != nil { return err } rv, err = revid.New(cfg, ns) if err != nil { log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) } vars, _ = ns.Vars() vs := ns.VarSum() // Update revid to get latest config settings from netreceiver. err = rv.Update(vars) if err != nil { return err } // If mode on netreceiver isn't paused then we can start revid. if ns.Mode() != paused && ns.Mode() != burst { err = rv.Start() if err != nil { return err } } if ns.Mode() == burst { ns.SetMode(paused, &vs) } for { err = ns.Run() if err != nil { log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue } // If var sum hasn't change we continue if vs == ns.VarSum() { goto sleep } vars, err = ns.Vars() if err != nil { log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) time.Sleep(netSendRetryTime) continue } vs = ns.VarSum() err = rv.Update(vars) if err != nil { return err } switch ns.Mode() { case paused: case normal: err = rv.Start() if err != nil { return err } case burst: log.Log(logger.Info, pkg+"Starting burst...") err = rv.Start() if err != nil { return err } time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) log.Log(logger.Info, pkg+"Stopping burst...") err = rv.Stop() if err != nil { return err } ns.SetMode(paused, &vs) } sleep: sleepTime, err := strconv.Atoi(ns.Param("mp")) if err != nil { return err } time.Sleep(time.Duration(sleepTime) * time.Second) } } // flagStrings implements an appending string set flag. type flagStrings []string func (v *flagStrings) String() string { if *v != nil { return strings.Join(*v, ",") } return "" } func (v *flagStrings) Set(s string) error { if s == "" { return nil } for _, e := range *v { if e == s { return nil } } *v = append(*v, s) return nil } func (v *flagStrings) Get() interface{} { return *v }