/*
NAME
  revid.go

AUTHORS
  Saxon A. Nelson-Milton
  Alan Noble
  Dan Kortschak
  Trek Hopton

LICENSE
  revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)

  It is free software: you can redistribute it and/or modify them
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  in gpl.txt. If not, see http://www.gnu.org/licenses.
*/

// Package revid provides an API for reading, transcoding, and writing audio/video streams and files.
package revid

import (
	"errors"
	"fmt"
	"io"
	"os/exec"
	"strconv"
	"strings"
	"sync"
	"time"

	"bitbucket.org/ausocean/av/codec/codecutil"
	"bitbucket.org/ausocean/av/codec/h264"
	"bitbucket.org/ausocean/av/codec/h265"
	"bitbucket.org/ausocean/av/codec/mjpeg"
	"bitbucket.org/ausocean/av/container/flv"
	"bitbucket.org/ausocean/av/container/mts"
	"bitbucket.org/ausocean/av/device"
	"bitbucket.org/ausocean/av/device/file"
	"bitbucket.org/ausocean/av/device/geovision"
	"bitbucket.org/ausocean/av/device/raspivid"
	"bitbucket.org/ausocean/av/device/webcam"
	"bitbucket.org/ausocean/av/filter"
	"bitbucket.org/ausocean/av/revid/config"
	"bitbucket.org/ausocean/iot/pi/netsender"
	"bitbucket.org/ausocean/utils/ioext"
	"bitbucket.org/ausocean/utils/logger"
	"bitbucket.org/ausocean/utils/ring"
)

// Ring buffer defaults.
//
// NOTE(review): these defaults are not referenced in this file. The
// corresponding config fields (MTSRB*, RTMPRB*) are passed to ring.NewBuffer
// in setupPipeline, where the *WriteTimeout values are multiplied by
// time.Second, so the timeouts below are presumably in seconds. Confirm
// where the defaults are applied.
const (
	// MTS ring buffer defaults.
	defaultMTSRBSize         = 1000   // Number of elements.
	defaultMTSRBElementSize  = 100000 // Size of each element (presumably bytes).
	defaultMTSRBWriteTimeout = 5      // Presumably seconds; see note above.

	// RTMP ring buffer defaults.
	defaultRTMPRBSize         = 1000   // Number of elements.
	defaultRTMPRBElementSize  = 300000 // Size of each element (presumably bytes).
	defaultRTMPRBWriteTimeout = 5      // Presumably seconds; see note above.
)

// RTMP connection properties.
const ( rtmpConnectionMaxTries = 5 rtmpConnectionTimeout = 10 ) // Motion filter parameters const ( knnMinArea = 25.0 knnThreshold = 300 knnHistory = 300 knnKernel = 9 mogMinArea = 25.0 mogThreshold = 20.0 mogHistory = 500 showWindows = true minFPS = 1.0 ) const pkg = "revid: " type Logger interface { SetLevel(int8) Log(level int8, message string, params ...interface{}) } // Revid provides methods to control a revid session; providing methods // to start, stop and change the state of an instance using the Config struct. type Revid struct { // config holds the Revid configuration. // For historical reasons it also handles logging. // FIXME(kortschak): The relationship of concerns // in config/ns is weird. cfg config.Config // ns holds the netsender.Sender responsible for HTTP. ns *netsender.Sender // input will capture audio or video from which we can read data. input device.AVDevice // closeInput holds the cleanup function return from setupInput and is called // in Revid.Stop(). closeInput func() error // cmd is the exec'd process that may be used to produce // the input stream. // FIXME(kortschak): This should not exist. Replace this // with a context.Context cancellation. cmd *exec.Cmd // lexTo, encoder and packer handle transcoding the input stream. lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error // filter will hold the filter interface that will write to the chosen filter from the lexer. filter filter.Filter // encoders will hold the multiWriteCloser that writes to encoders from the filter. encoders io.WriteCloser // running is used to keep track of revid's running state between methods. running bool // mu is used to protect isRunning during concurrent use. mu sync.Mutex // wg will be used to wait for any processing routines to finish. wg sync.WaitGroup // err will channel errors from revid routines to the handle errors routine. 
err chan error } // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c config.Config, ns *netsender.Sender) (*Revid, error) { r := Revid{ns: ns, err: make(chan error)} err := r.setConfig(c) if err != nil { return nil, fmt.Errorf("could not set config, failed with error: %w", err) } r.cfg.Logger.SetLevel(c.LogLevel) go r.handleErrors() return &r, nil } // Config returns a copy of revids current config. // // Config is not safe for concurrent use. func (r *Revid) Config() config.Config { return r.cfg } // TODO(Saxon): put more thought into error severity and how to handle these. func (r *Revid) handleErrors() { for { err := <-r.err if err != nil { r.cfg.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) } } } // Bitrate returns the result of the most recent bitrate check. // // TODO: get this working again. func (r *Revid) Bitrate() int { return -1 } // reset swaps the current config of a Revid with the passed // configuration; checking validity and returning errors if not valid. It then // sets up the data pipeline accordingly to this configuration. 
func (r *Revid) reset(c config.Config) error { err := r.setConfig(c) if err != nil { return err } r.cfg.Logger.SetLevel(c.LogLevel) err = r.setupPipeline( func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) { var st int var encOptions []func(*mts.Encoder) error switch r.cfg.Input { case config.InputRaspivid: switch r.cfg.InputCodec { case codecutil.H264: st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) default: panic("unknown input codec for raspivid input") } case config.InputFile, config.InputV4L: switch r.cfg.InputCodec { case codecutil.H264: st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) default: panic("unknown input codec for v4l or input file input") } case config.InputRTSP: switch r.cfg.InputCodec { case codecutil.H265: st = mts.EncodeH265 case codecutil.H264: st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) default: panic("unknown input codec for RTSP input") } case config.InputAudio: st = mts.EncodeAudio default: panic("unknown input type") } return mts.NewEncoder(dst, float64(fps), st, encOptions...) }, func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { return flv.NewEncoder(dst, true, true, fps) }, ioext.MultiWriteCloser, ) if err != nil { return err } return nil } // setConfig takes a config, checks it's validity and then replaces the current // revid config. func (r *Revid) setConfig(config config.Config) error { r.cfg.Logger = config.Logger err := config.Validate() if err != nil { return errors.New("Config struct is bad: " + err.Error()) } r.cfg = config return nil } // setupPipeline constructs the revid dataPipeline. Inputs, encoders and // senders are created and linked based on the current revid config. 
// // mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder // respectively. multiWriter will be used to create an ioext.multiWriteCloser // so that encoders can write to multiple senders. func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.WriteCloser, error), flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { // encoders will hold the encoders that are required for revid's current // configuration. var encoders []io.WriteCloser // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. var mtsSenders, flvSenders []io.WriteCloser // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. var w io.WriteCloser for _, out := range r.cfg.Outputs { switch out { case config.OutputHTTP: w = newMtsSender( newHttpSender(r.ns, r.cfg.Logger.Log), r.cfg.Logger.Log, ring.NewBuffer(r.cfg.MTSRBSize, r.cfg.MTSRBElementSize, time.Duration(r.cfg.MTSRBWriteTimeout)*time.Second), r.cfg.ClipDuration, ) mtsSenders = append(mtsSenders, w) case config.OutputRTP: w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) } mtsSenders = append(mtsSenders, w) case config.OutputFile: w, err := newFileSender(r.cfg.OutputPath) if err != nil { return err } mtsSenders = append(mtsSenders, w) case config.OutputRTMP: w, err := newRtmpSender( r.cfg.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, ring.NewBuffer(r.cfg.RTMPRBSize, r.cfg.RTMPRBElementSize, time.Duration(r.cfg.RTMPRBWriteTimeout)*time.Second), r.cfg.Logger.Log, ) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) } flvSenders = append(flvSenders, w) } } // 
If we have some senders that require MPEGTS encoding then add an MPEGTS // encoder to revid's encoder slice, and give this encoder the mtsSenders // as a destination. if len(mtsSenders) != 0 { mw := multiWriter(mtsSenders...) e, _ := mtsEnc(mw, r.cfg.WriteRate) encoders = append(encoders, e) } // If we have some senders that require FLV encoding then add an FLV // encoder to revid's encoder slice, and give this encoder the flvSenders // as a destination. if len(flvSenders) != 0 { mw := multiWriter(flvSenders...) e, err := flvEnc(mw, int(r.cfg.FrameRate)) if err != nil { return err } encoders = append(encoders, e) } r.encoders = multiWriter(encoders...) switch r.cfg.Filter { case config.FilterNoOp: r.filter = filter.NewNoOp(r.encoders) case config.FilterMOG: r.filter = filter.NewMOGFilter(r.encoders, mogMinArea, mogThreshold, mogHistory, showWindows) case config.FilterVariableFPS: r.filter = filter.NewVariableFPSFilter(r.encoders, minFPS, filter.NewMOGFilter(r.encoders, mogMinArea, mogThreshold, mogHistory, showWindows)) case config.FilterKNN: r.filter = filter.NewKNNFilter(r.encoders, knnMinArea, knnThreshold, knnHistory, knnKernel, showWindows) default: panic("Undefined Filter") } switch r.cfg.Input { case config.InputRaspivid: r.input = raspivid.New(r.cfg.Logger) switch r.cfg.InputCodec { case codecutil.H264: r.lexTo = h264.Lex case codecutil.MJPEG: r.lexTo = mjpeg.Lex } case config.InputV4L: r.input = webcam.New(r.cfg.Logger) switch r.cfg.InputCodec { case codecutil.H264: r.lexTo = h264.Lex case codecutil.MJPEG: r.lexTo = mjpeg.Lex } case config.InputFile: r.input = file.New() switch r.cfg.InputCodec { case codecutil.H264: r.lexTo = h264.Lex case codecutil.MJPEG: r.lexTo = mjpeg.Lex } case config.InputRTSP: r.input = geovision.New(r.cfg.Logger) switch r.cfg.InputCodec { case codecutil.H264: r.lexTo = h264.NewExtractor().Extract case codecutil.H265: r.lexTo = h265.NewLexer(false).Lex case codecutil.MJPEG: panic("not implemented") } case config.InputAudio: err := 
r.setupAudio() if err != nil { return err } } // Configure the input device. We know that defaults are set, so no need to // return error, but we should log. err := r.input.Set(r.cfg) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"errors from configuring input device", "errors", err) } return nil } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. // // Start is safe for concurrent use. func (r *Revid) Start() error { if r.IsRunning() { r.cfg.Logger.Log(logger.Warning, pkg+"start called, but revid already running") return nil } r.mu.Lock() defer r.mu.Unlock() r.cfg.Logger.Log(logger.Info, pkg+"starting Revid") err := r.reset(r.cfg) if err != nil { r.Stop() return err } err = r.input.Start() if err != nil { return fmt.Errorf("could not start input device: %w", err) } r.wg.Add(1) go r.processFrom(r.input, 0) r.running = true return nil } // Stop closes down the pipeline. This closes encoders and sender output routines, // connections, and/or files. // // Stop is safe for concurrent use. 
func (r *Revid) Stop() { if !r.IsRunning() { r.cfg.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") return } r.mu.Lock() defer r.mu.Unlock() err := r.input.Stop() if err != nil { r.cfg.Logger.Log(logger.Error, pkg+"could not stop input", "error", err.Error()) } r.cfg.Logger.Log(logger.Info, pkg+"closing pid peline") err = r.encoders.Close() if err != nil { r.cfg.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error()) } err = r.filter.Close() if err != nil { r.cfg.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error()) } r.cfg.Logger.Log(logger.Info, pkg+"closed pipeline") if r.cmd != nil && r.cmd.Process != nil { r.cfg.Logger.Log(logger.Info, pkg+"killing input proccess") r.cmd.Process.Kill() } r.cfg.Logger.Log(logger.Info, pkg+"waiting for routines to close") r.wg.Wait() r.cfg.Logger.Log(logger.Info, pkg+"revid stopped") r.running = false } func (r *Revid) IsRunning() bool { r.mu.Lock() defer r.mu.Unlock() return r.running } // Update takes a map of variables and their values and edits the current config // if the variables are recognised as valid parameters. // // Update is safe for concurrent use. 
func (r *Revid) Update(vars map[string]string) error { if r.IsRunning() { r.Stop() } r.mu.Lock() defer r.mu.Unlock() //look through the vars and update revid where needed for key, value := range vars { switch key { case "Input": v, ok := map[string]uint8{"raspivid": config.InputRaspivid, "rtsp": config.InputRTSP, "v4l": config.InputV4L, "file": config.InputFile}[strings.ToLower(value)] if !ok { r.cfg.Logger.Log(logger.Warning, pkg+"invalid input var", "value", value) break } r.cfg.Input = v case "Saturation": s, err := strconv.Atoi(value) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"invalid saturation param", "value", value) break } r.cfg.Saturation = int(s) case "Brightness": b, err := strconv.Atoi(value) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"invalid brightness param", "value", value) break } r.cfg.Brightness = uint(b) case "Exposure": r.cfg.Exposure = value case "AutoWhiteBalance": r.cfg.AutoWhiteBalance = value case "InputCodec": switch value { case "H264": r.cfg.InputCodec = codecutil.H264 case "MJPEG": r.cfg.InputCodec = codecutil.MJPEG default: r.cfg.Logger.Log(logger.Warning, pkg+"invalid InputCodec variable value", "value", value) } case "Outputs": outputs := strings.Split(value, ",") r.cfg.Outputs = make([]uint8, len(outputs)) for i, output := range outputs { switch output { case "File": r.cfg.Outputs[i] = config.OutputFile case "Http": r.cfg.Outputs[i] = config.OutputHTTP case "Rtmp": r.cfg.Outputs[i] = config.OutputRTMP case "Rtp": r.cfg.Outputs[i] = config.OutputRTP default: r.cfg.Logger.Log(logger.Warning, pkg+"invalid outputs param", "value", value) continue } } case "Output": r.cfg.Outputs = make([]uint8, 1) switch value { case "File": r.cfg.Outputs[0] = config.OutputFile case "Http": r.cfg.Outputs[0] = config.OutputHTTP case "Rtmp": r.cfg.Outputs[0] = config.OutputRTMP case "Rtp": r.cfg.Outputs[0] = config.OutputRTP default: r.cfg.Logger.Log(logger.Warning, pkg+"invalid output param", "value", value) continue } case 
"RtmpUrl": r.cfg.RTMPURL = value case "RtpAddress": r.cfg.RTPAddress = value case "Bitrate": v, err := strconv.Atoi(value) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } r.cfg.Bitrate = uint(v) case "OutputPath": r.cfg.OutputPath = value case "InputPath": r.cfg.InputPath = value case "Height": h, err := strconv.Atoi(value) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value) break } r.cfg.Height = uint(h) case "Width": w, err := strconv.Atoi(value) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value) break } r.cfg.Width = uint(w) case "FrameRate": v, err := strconv.Atoi(value) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } r.cfg.FrameRate = uint(v) case "Rotation": v, err := strconv.Atoi(value) if err != nil || v > 359 { r.cfg.Logger.Log(logger.Warning, pkg+"invalid rotation param", "value", value) break } r.cfg.Rotation = uint(v) case "HttpAddress": r.cfg.HTTPAddress = value case "Quantization": v, err := strconv.Atoi(value) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", v) break } r.cfg.Quantization = uint(v) case "MinFrames": v, err := strconv.Atoi(value) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"invalid MinFrames param", "value", value) break } r.cfg.MinFrames = uint(v) case "ClipDuration": v, err := strconv.Atoi(value) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"invalid ClipDuration param", "value", value) break } r.cfg.ClipDuration = time.Duration(v) * time.Second case "HorizontalFlip": switch strings.ToLower(value) { case "true": r.cfg.HorizontalFlip = true case "false": r.cfg.HorizontalFlip = false default: r.cfg.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { case "true": r.cfg.VerticalFlip = true case "false": 
r.cfg.VerticalFlip = false default: r.cfg.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } case "Filter": m := map[string]int{"NoOp": config.FilterNoOp, "MOG": config.FilterMOG, "VariableFPS": config.FilterVariableFPS, "KNN": config.FilterKNN} v, ok := m[value] if !ok { r.cfg.Logger.Log(logger.Warning, pkg+"invalid FilterMethod param", "value", value) } r.cfg.Filter = v case "PSITime": v, err := strconv.Atoi(value) if err != nil || v < 0 { r.cfg.Logger.Log(logger.Warning, pkg+"invalid PSITime var", "value", value) break } r.cfg.PSITime = v case "BurstPeriod": v, err := strconv.Atoi(value) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value) break } r.cfg.BurstPeriod = uint(v) case "Logging": switch value { case "Debug": r.cfg.LogLevel = logger.Debug case "Info": r.cfg.LogLevel = logger.Info case "Warning": r.cfg.LogLevel = logger.Warning case "Error": r.cfg.LogLevel = logger.Error case "Fatal": r.cfg.LogLevel = logger.Fatal default: r.cfg.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value) } case "RTMPRBSize": v, err := strconv.Atoi(value) if err != nil || v < 0 { r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBSize var", "value", value) break } r.cfg.RTMPRBSize = v case "RTMPRBElementSize": v, err := strconv.Atoi(value) if err != nil || v < 0 { r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBElementSize var", "value", value) break } r.cfg.RTMPRBElementSize = v case "RTMPRBWriteTimeout": v, err := strconv.Atoi(value) if err != nil || v <= 0 { r.cfg.Logger.Log(logger.Warning, pkg+"invalid RTMPRBWriteTimeout var", "value", value) break } r.cfg.RTMPRBWriteTimeout = v case "MTSRBSize": v, err := strconv.Atoi(value) if err != nil || v < 0 { r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBSize var", "value", value) break } r.cfg.MTSRBSize = v case "MTSRBElementSize": v, err := strconv.Atoi(value) if err != nil || v < 0 { r.cfg.Logger.Log(logger.Warning, pkg+"invalid 
MTSRBElementSize var", "value", value) break } r.cfg.MTSRBElementSize = v case "MTSRBWriteTimeout": v, err := strconv.Atoi(value) if err != nil || v <= 0 { r.cfg.Logger.Log(logger.Warning, pkg+"invalid MTSRBWriteTimeout var", "value", value) break } r.cfg.MTSRBWriteTimeout = v case "CBR": v, ok := map[string]bool{"true": true, "false": false}[strings.ToLower(value)] if !ok { r.cfg.Logger.Log(logger.Warning, pkg+"invalid CBR var", "value", value) break } r.cfg.CBR = v case "VBRQuality": v, ok := map[string]config.Quality{"standard": config.QualityStandard, "fair": config.QualityFair, "good": config.QualityGood, "great": config.QualityGreat, "excellent": config.QualityExcellent}[strings.ToLower(value)] if !ok { r.cfg.Logger.Log(logger.Warning, pkg+"invalid VBRQuality var", "value", value) break } r.cfg.VBRQuality = v case "VBRBitrate": v, err := strconv.Atoi(value) if err != nil || v <= 0 { r.cfg.Logger.Log(logger.Warning, pkg+"invalid VBRBitrate var", "value", value) break } r.cfg.VBRBitrate = v case "CameraChan": v, err := strconv.Atoi(value) if err != nil || (v != 1 && v != 2) { r.cfg.Logger.Log(logger.Warning, pkg+"invalid CameraChan var", "value", value) break } r.cfg.CameraChan = v } } r.cfg.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.cfg)) return nil } // processFrom is run as a routine to read from a input data source, lex and // then send individual access units to revid's encoders. func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.err <- r.lexTo(r.filter, read, delay) r.cfg.Logger.Log(logger.Info, pkg+"finished lexing") r.wg.Done() }