From d7de7b2b5b7403e085a5602e098278862b862f2d Mon Sep 17 00:00:00 2001 From: Saxon Nelson-Milton Date: Wed, 16 Dec 2020 13:26:27 +1030 Subject: [PATCH] revid: removed mutex locking and put unexported pipeline specific code in pipeline.go --- revid/pipeline.go | 366 ++++++++++++++++++++++++++++++++++++++++++++++ revid/revid.go | 355 +------------------------------------------- 2 files changed, 370 insertions(+), 351 deletions(-) create mode 100644 revid/pipeline.go diff --git a/revid/pipeline.go b/revid/pipeline.go new file mode 100644 index 00000000..9f29c873 --- /dev/null +++ b/revid/pipeline.go @@ -0,0 +1,366 @@ +/* +DESCRIPTION + pipeline.go provides functionality for set up of the revid processing pipeline. + +AUTHORS + Saxon A. Nelson-Milton + Alan Noble + Dan Kortschak + Trek Hopton + Scott Barnard + +LICENSE + Copyright (C) 2017-2020 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package revid + +import ( + "errors" + "fmt" + "io" + "time" + + "bitbucket.org/ausocean/av/codec/codecutil" + "bitbucket.org/ausocean/av/codec/h264" + "bitbucket.org/ausocean/av/codec/h265" + "bitbucket.org/ausocean/av/codec/mjpeg" + "bitbucket.org/ausocean/av/container/flv" + "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/device" + "bitbucket.org/ausocean/av/device/file" + "bitbucket.org/ausocean/av/device/geovision" + "bitbucket.org/ausocean/av/device/raspivid" + "bitbucket.org/ausocean/av/device/webcam" + "bitbucket.org/ausocean/av/filter" + "bitbucket.org/ausocean/av/revid/config" + "bitbucket.org/ausocean/utils/ioext" + "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" +) + +// TODO(Saxon): put more thought into error severity and how to handle these. +func (r *Revid) handleErrors() { + for { + err := <-r.err + if err != nil { + r.cfg.Logger.Log(logger.Error, "async error", "error", err.Error()) + } + } +} + +// reset swaps the current config of a Revid with the passed +// configuration; checking validity and returning errors if not valid. It then +// sets up the data pipeline accordingly to this configuration. +func (r *Revid) reset(c config.Config) error { + r.cfg.Logger.Log(logger.Debug, "setting config") + err := r.setConfig(c) + if err != nil { + return err + } + r.cfg.Logger.Log(logger.Info, "config set") + + r.cfg.Logger.Log(logger.Debug, "setting up revid pipeline") + + err = r.setupPipeline( + func(dst io.WriteCloser, rate float64) (io.WriteCloser, error) { + var st int + var encOptions []func(*mts.Encoder) error + + switch r.cfg.Input { + case config.InputRaspivid, config.InputFile, config.InputV4L, config.InputRTSP: + switch r.cfg.InputCodec { + case codecutil.H265: + if r.cfg.Input != config.InputRTSP { + panic("H265 codec valid only for InputRTSP") + } + st = mts.EncodeH265 + case codecutil.H264: + st = mts.EncodeH264 + case codecutil.MJPEG: + st = mts.EncodeMJPEG + encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) + r.cfg.CBR = true + default: + panic("unknown input codec for Raspivid, File, V4l or RTSP input") + } + case config.InputAudio: + st = mts.EncodeAudio + encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) + rate = 1 / r.cfg.RecPeriod + default: + panic("unknown input type") + } + encOptions = append(encOptions, mts.MediaType(st), mts.Rate(rate)) + return mts.NewEncoder(dst, &encLog{r.cfg.Logger}, encOptions...) + }, + func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { + return flv.NewEncoder(dst, true, true, fps) + }, + ioext.MultiWriteCloser, + ) + r.cfg.Logger.Log(logger.Info, "finished setting pipeline") + + if err != nil { + return err + } + + return nil +} + +// setConfig takes a config, checks it's validity and then replaces the current +// revid config. +func (r *Revid) setConfig(config config.Config) error { + r.cfg.Logger = config.Logger + r.cfg.Logger.Log(logger.Debug, "validating config") + err := config.Validate() + if err != nil { + return errors.New("Config struct is bad: " + err.Error()) + } + r.cfg.Logger.Log(logger.Info, "config validated") + r.cfg = config + r.cfg.Logger.SetLevel(r.cfg.LogLevel) + return nil +} + +// setupPipeline constructs the revid dataPipeline. Inputs, encoders and +// senders are created and linked based on the current revid config. +// +// mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder +// respectively. multiWriter will be used to create an ioext.multiWriteCloser +// so that encoders can write to multiple senders. +func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.WriteCloser, error), flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { + // encoders will hold the encoders that are required for revid's current + // configuration. + var encoders []io.WriteCloser + + // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders + // will hold senders that require FLV encoding. + var mtsSenders, flvSenders []io.WriteCloser + + // Calculate no. of ring buffer elements based on starting element size + // const and config directed max ring buffer size, then create buffer. + // This is only used if the selected output uses a ring buffer. + nElements := r.cfg.RBCapacity / rbStartingElementSize + writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second + + // We will go through our outputs and create the corresponding senders to add + // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the + // output requires FLV encoding. + var w io.WriteCloser + for _, out := range r.cfg.Outputs { + switch out { + case config.OutputHTTP: + r.cfg.Logger.Log(logger.Debug, "using HTTP output") + rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout) + hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report) + w = newMTSSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration) + mtsSenders = append(mtsSenders, w) + + case config.OutputRTP: + r.cfg.Logger.Log(logger.Debug, "using RTP output") + w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report) + if err != nil { + r.cfg.Logger.Log(logger.Warning, "rtp connect error", "error", err.Error()) + } + mtsSenders = append(mtsSenders, w) + case config.OutputFile: + r.cfg.Logger.Log(logger.Debug, "using File output") + w, err := newFileSender(r.cfg.OutputPath) + if err != nil { + return err + } + mtsSenders = append(mtsSenders, w) + case config.OutputRTMP: + r.cfg.Logger.Log(logger.Debug, "using RTMP output") + rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout) + w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report) + if err != nil { + r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error()) + } + flvSenders = append(flvSenders, w) + } + } + + // If we have some senders that require MPEGTS encoding then add an MPEGTS + // encoder to revid's encoder slice, and give this encoder the mtsSenders + // as a destination. + if len(mtsSenders) != 0 { + mw := multiWriter(mtsSenders...) + e, _ := mtsEnc(mw, float64(r.cfg.FrameRate)) + encoders = append(encoders, e) + } + + // If we have some senders that require FLV encoding then add an FLV + // encoder to revid's encoder slice, and give this encoder the flvSenders + // as a destination. + if len(flvSenders) != 0 { + mw := multiWriter(flvSenders...) + e, err := flvEnc(mw, int(r.cfg.FrameRate)) + if err != nil { + return err + } + encoders = append(encoders, e) + } + + r.encoders = multiWriter(encoders...) + + l := len(r.cfg.Filters) + r.filters = []filter.Filter{filter.NewNoOp(r.encoders)} + if l != 0 { + r.cfg.Logger.Log(logger.Debug, "setting up filters", "filters", r.cfg.Filters) + r.filters = make([]filter.Filter, l) + dst := r.encoders + + for i := l - 1; i >= 0; i-- { + switch r.cfg.Filters[i] { + case config.FilterNoOp: + r.cfg.Logger.Log(logger.Debug, "using NoOp filter") + r.filters[i] = filter.NewNoOp(dst) + case config.FilterMOG: + r.cfg.Logger.Log(logger.Debug, "using MOG filter") + r.filters[i] = filter.NewMOG(dst, r.cfg) + case config.FilterVariableFPS: + r.cfg.Logger.Log(logger.Debug, "using Variable FPS MOG filter") + r.filters[i] = filter.NewVariableFPS(dst, r.cfg.MinFPS, filter.NewMOG(dst, r.cfg)) + case config.FilterKNN: + r.cfg.Logger.Log(logger.Debug, "using KNN filter") + r.filters[i] = filter.NewKNN(dst, r.cfg) + case config.FilterDiff: + r.cfg.Logger.Log(logger.Debug, "using gocv difference filter") + r.filters[i] = filter.NewDiff(dst, r.cfg) + case config.FilterBasic: + r.cfg.Logger.Log(logger.Debug, "using go difference filter") + r.filters[i] = filter.NewBasic(dst, r.cfg) + default: + panic("Undefined Filter") + } + dst = r.filters[i] + } + r.cfg.Logger.Log(logger.Info, "filters set up") + } + + switch r.cfg.Input { + case config.InputRaspivid: + r.cfg.Logger.Log(logger.Debug, "using raspivid input") + r.input = raspivid.New(r.cfg.Logger) + r.setLexer(r.cfg.InputCodec, false) + + case config.InputV4L: + r.cfg.Logger.Log(logger.Debug, "using V4L input") + r.input = webcam.New(r.cfg.Logger) + r.setLexer(r.cfg.InputCodec, false) + + case config.InputFile: + r.cfg.Logger.Log(logger.Debug, "using file input") + r.input = file.New() + r.setLexer(r.cfg.InputCodec, false) + + case config.InputRTSP: + r.cfg.Logger.Log(logger.Debug, "using RTSP input") + r.input = geovision.New(r.cfg.Logger) + r.setLexer(r.cfg.InputCodec, true) + + case config.InputAudio: + r.cfg.Logger.Log(logger.Debug, "using audio input") + err := r.setupAudio() + if err != nil { + return err + } + } + + // Configure the input device. We know that defaults are set, so no need to + // return error, but we should log. + r.cfg.Logger.Log(logger.Debug, "configuring input device") + err := r.input.Set(r.cfg) + if err != nil { + r.cfg.Logger.Log(logger.Warning, "errors from configuring input device", "errors", err) + } + r.cfg.Logger.Log(logger.Info, "input device configured") + + return nil +} + +// setLexer sets the revid input lexer based on input codec and whether input +// is RTSP or not, in which case an RTP/ extractor is used. +func (r *Revid) setLexer(c uint8, isRTSP bool) { + switch c { + case codecutil.H264: + r.cfg.Logger.Log(logger.Debug, "using H.264 codec") + r.lexTo = h264.Lex + if isRTSP { + r.lexTo = h264.NewExtractor().Extract + } + case codecutil.H265: + r.cfg.Logger.Log(logger.Debug, "using H.265 codec") + r.lexTo = h265.NewExtractor(false).Extract + if !isRTSP { + panic("byte stream H.265 lexing not implemented") + } + case codecutil.MJPEG: + r.cfg.Logger.Log(logger.Debug, "using MJPEG codec") + r.lexTo = mjpeg.Lex + if isRTSP { + r.lexTo = mjpeg.NewExtractor().Extract + } + default: + panic("unrecognised codec") + } +} + +// processFrom is run as a routine to read from a input data source, lex and +// then send individual access units to revid's encoders. +func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) { + defer r.wg.Done() + + for l := true; l; l = r.cfg.Loop { + err := in.Start() + if err != nil { + r.err <- fmt.Errorf("could not start input device: %w", err) + return + } + + // Lex data from input device, in, until finished or an error is encountered. + // For a continuous source e.g. a camera or microphone, we should remain + // in this call indefinitely unless in.Stop() is called and an io.EOF is forced. + r.cfg.Logger.Log(logger.Debug, "lexing") + err = r.lexTo(r.filters[0], in, delay) + switch err { + case nil, io.EOF: + r.cfg.Logger.Log(logger.Info, "end of file") + case io.ErrUnexpectedEOF: + r.cfg.Logger.Log(logger.Info, "unexpected EOF from input") + default: + r.err <- err + } + r.cfg.Logger.Log(logger.Info, "finished reading input") + + r.cfg.Logger.Log(logger.Debug, "stopping input") + err = in.Stop() + if err != nil { + r.err <- fmt.Errorf("could not stop input source: %w", err) + } else { + r.cfg.Logger.Log(logger.Info, "input stopped") + } + + // If we're looping and we get a stop signal we return. + select { + case <-r.stop: + return + default: + } + } +} diff --git a/revid/revid.go b/revid/revid.go index 2acdea3d..5a2d20ec 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -30,30 +30,17 @@ LICENSE package revid import ( - "errors" "fmt" "io" "sync" "time" - "bitbucket.org/ausocean/av/codec/codecutil" - "bitbucket.org/ausocean/av/codec/h264" - "bitbucket.org/ausocean/av/codec/h265" - "bitbucket.org/ausocean/av/codec/mjpeg" - "bitbucket.org/ausocean/av/container/flv" - "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/device" - "bitbucket.org/ausocean/av/device/file" - "bitbucket.org/ausocean/av/device/geovision" - "bitbucket.org/ausocean/av/device/raspivid" - "bitbucket.org/ausocean/av/device/webcam" "bitbucket.org/ausocean/av/filter" "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/bitrate" - "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) // Misc consts. @@ -106,9 +93,6 @@ type Revid struct { // running is used to keep track of revid's running state between methods. running bool - // mu is used to protect isRunning during concurrent use. - mu sync.Mutex - // wg will be used to wait for any processing routines to finish. wg sync.WaitGroup @@ -135,299 +119,23 @@ func New(c config.Config, ns *netsender.Sender) (*Revid, error) { } // Config returns a copy of revids current config. -// -// Config is not safe for concurrent use. func (r *Revid) Config() config.Config { return r.cfg } -// TODO(Saxon): put more thought into error severity and how to handle these. -func (r *Revid) handleErrors() { - for { - err := <-r.err - if err != nil { - r.cfg.Logger.Log(logger.Error, "async error", "error", err.Error()) - } - } -} - // Bitrate returns the result of the most recent bitrate check. func (r *Revid) Bitrate() int { return r.bitrate.Bitrate() } -// reset swaps the current config of a Revid with the passed -// configuration; checking validity and returning errors if not valid. It then -// sets up the data pipeline accordingly to this configuration. -func (r *Revid) reset(c config.Config) error { - r.cfg.Logger.Log(logger.Debug, "setting config") - err := r.setConfig(c) - if err != nil { - return err - } - r.cfg.Logger.Log(logger.Info, "config set") - - r.cfg.Logger.Log(logger.Debug, "setting up revid pipeline") - - err = r.setupPipeline( - func(dst io.WriteCloser, rate float64) (io.WriteCloser, error) { - var st int - var encOptions []func(*mts.Encoder) error - - switch r.cfg.Input { - case config.InputRaspivid,config.InputFile, config.InputV4L,config.InputRTSP: - switch r.cfg.InputCodec { - case codecutil.H265: - if r.cfg.Input != config.InputRTSP { - panic("H265 codec valid only for InputRTSP") - } - st = mts.EncodeH265 - case codecutil.H264: - st = mts.EncodeH264 - case codecutil.MJPEG: - st = mts.EncodeMJPEG - encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) - r.cfg.CBR = true - default: - panic("unknown input codec for Raspivid, File, V4l or RTSP input") - } - case config.InputAudio: - st = mts.EncodeAudio - encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) - rate = 1 / r.cfg.RecPeriod - default: - panic("unknown input type") - } - encOptions = append(encOptions, mts.MediaType(st), mts.Rate(rate)) - return mts.NewEncoder(dst, &encLog{r.cfg.Logger}, encOptions...) - }, - func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { - return flv.NewEncoder(dst, true, true, fps) - }, - ioext.MultiWriteCloser, - ) - r.cfg.Logger.Log(logger.Info, "finished setting pipeline") - - if err != nil { - return err - } - - return nil -} - -// setConfig takes a config, checks it's validity and then replaces the current -// revid config. -func (r *Revid) setConfig(config config.Config) error { - r.cfg.Logger = config.Logger - r.cfg.Logger.Log(logger.Debug, "validating config") - err := config.Validate() - if err != nil { - return errors.New("Config struct is bad: " + err.Error()) - } - r.cfg.Logger.Log(logger.Info, "config validated") - r.cfg = config - r.cfg.Logger.SetLevel(r.cfg.LogLevel) - return nil -} - -// setupPipeline constructs the revid dataPipeline. Inputs, encoders and -// senders are created and linked based on the current revid config. -// -// mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder -// respectively. multiWriter will be used to create an ioext.multiWriteCloser -// so that encoders can write to multiple senders. -func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.WriteCloser, error), flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { - // encoders will hold the encoders that are required for revid's current - // configuration. - var encoders []io.WriteCloser - - // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders - // will hold senders that require FLV encoding. - var mtsSenders, flvSenders []io.WriteCloser - - // Calculate no. of ring buffer elements based on starting element size - // const and config directed max ring buffer size, then create buffer. - // This is only used if the selected output uses a ring buffer. - nElements := r.cfg.RBCapacity / rbStartingElementSize - writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second - - // We will go through our outputs and create the corresponding senders to add - // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the - // output requires FLV encoding. - var w io.WriteCloser - for _, out := range r.cfg.Outputs { - switch out { - case config.OutputHTTP: - r.cfg.Logger.Log(logger.Debug, "using HTTP output") - rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout) - hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report) - w = newMTSSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration) - mtsSenders = append(mtsSenders, w) - - case config.OutputRTP: - r.cfg.Logger.Log(logger.Debug, "using RTP output") - w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report) - if err != nil { - r.cfg.Logger.Log(logger.Warning, "rtp connect error", "error", err.Error()) - } - mtsSenders = append(mtsSenders, w) - case config.OutputFile: - r.cfg.Logger.Log(logger.Debug, "using File output") - w, err := newFileSender(r.cfg.OutputPath) - if err != nil { - return err - } - mtsSenders = append(mtsSenders, w) - case config.OutputRTMP: - r.cfg.Logger.Log(logger.Debug, "using RTMP output") - rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout) - w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report) - if err != nil { - r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error()) - } - flvSenders = append(flvSenders, w) - } - } - - // If we have some senders that require MPEGTS encoding then add an MPEGTS - // encoder to revid's encoder slice, and give this encoder the mtsSenders - // as a destination. - if len(mtsSenders) != 0 { - mw := multiWriter(mtsSenders...) - e, _ := mtsEnc(mw, float64(r.cfg.FrameRate)) - encoders = append(encoders, e) - } - - // If we have some senders that require FLV encoding then add an FLV - // encoder to revid's encoder slice, and give this encoder the flvSenders - // as a destination. - if len(flvSenders) != 0 { - mw := multiWriter(flvSenders...) - e, err := flvEnc(mw, int(r.cfg.FrameRate)) - if err != nil { - return err - } - encoders = append(encoders, e) - } - - r.encoders = multiWriter(encoders...) - - l := len(r.cfg.Filters) - r.filters = []filter.Filter{filter.NewNoOp(r.encoders)} - if l != 0 { - r.cfg.Logger.Log(logger.Debug, "setting up filters", "filters", r.cfg.Filters) - r.filters = make([]filter.Filter, l) - dst := r.encoders - - for i := l - 1; i >= 0; i-- { - switch r.cfg.Filters[i] { - case config.FilterNoOp: - r.cfg.Logger.Log(logger.Debug, "using NoOp filter") - r.filters[i] = filter.NewNoOp(dst) - case config.FilterMOG: - r.cfg.Logger.Log(logger.Debug, "using MOG filter") - r.filters[i] = filter.NewMOG(dst, r.cfg) - case config.FilterVariableFPS: - r.cfg.Logger.Log(logger.Debug, "using Variable FPS MOG filter") - r.filters[i] = filter.NewVariableFPS(dst, r.cfg.MinFPS, filter.NewMOG(dst, r.cfg)) - case config.FilterKNN: - r.cfg.Logger.Log(logger.Debug, "using KNN filter") - r.filters[i] = filter.NewKNN(dst, r.cfg) - case config.FilterDiff: - r.cfg.Logger.Log(logger.Debug, "using gocv difference filter") - r.filters[i] = filter.NewDiff(dst, r.cfg) - case config.FilterBasic: - r.cfg.Logger.Log(logger.Debug, "using go difference filter") - r.filters[i] = filter.NewBasic(dst, r.cfg) - default: - panic("Undefined Filter") - } - dst = r.filters[i] - } - r.cfg.Logger.Log(logger.Info, "filters set up") - } - - switch r.cfg.Input { - case config.InputRaspivid: - r.cfg.Logger.Log(logger.Debug, "using raspivid input") - r.input = raspivid.New(r.cfg.Logger) - r.setLexer(r.cfg.InputCodec, false) - - case config.InputV4L: - r.cfg.Logger.Log(logger.Debug, "using V4L input") - r.input = webcam.New(r.cfg.Logger) - r.setLexer(r.cfg.InputCodec, false) - - case config.InputFile: - r.cfg.Logger.Log(logger.Debug, "using file input") - r.input = file.New() - r.setLexer(r.cfg.InputCodec, false) - - case config.InputRTSP: - r.cfg.Logger.Log(logger.Debug, "using RTSP input") - r.input = geovision.New(r.cfg.Logger) - r.setLexer(r.cfg.InputCodec, true) - - case config.InputAudio: - r.cfg.Logger.Log(logger.Debug, "using audio input") - err := r.setupAudio() - if err != nil { - return err - } - } - - // Configure the input device. We know that defaults are set, so no need to - // return error, but we should log. - r.cfg.Logger.Log(logger.Debug, "configuring input device") - err := r.input.Set(r.cfg) - if err != nil { - r.cfg.Logger.Log(logger.Warning, "errors from configuring input device", "errors", err) - } - r.cfg.Logger.Log(logger.Info, "input device configured") - - return nil -} - -// setLexer sets the revid input lexer based on input codec and whether input -// is RTSP or not, in which case an RTP/ extractor is used. -func (r *Revid) setLexer(c uint8, isRTSP bool) { - switch c { - case codecutil.H264: - r.cfg.Logger.Log(logger.Debug, "using H.264 codec") - r.lexTo = h264.Lex - if isRTSP { - r.lexTo = h264.NewExtractor().Extract - } - case codecutil.H265: - r.cfg.Logger.Log(logger.Debug, "using H.265 codec") - r.lexTo = h265.NewExtractor(false).Extract - if !isRTSP { - panic("byte stream H.265 lexing not implemented") - } - case codecutil.MJPEG: - r.cfg.Logger.Log(logger.Debug, "using MJPEG codec") - r.lexTo = mjpeg.Lex - if isRTSP { - r.lexTo = mjpeg.NewExtractor().Extract - } - default: - panic("unrecognised codec") - } -} - // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. -// -// Start is safe for concurrent use. func (r *Revid) Start() error { - if r.IsRunning() { + if r.running { r.cfg.Logger.Log(logger.Warning, "start called, but revid already running") return nil } - r.mu.Lock() - defer r.mu.Unlock() - r.stop = make(chan struct{}) r.cfg.Logger.Log(logger.Debug, "resetting revid") @@ -457,17 +165,12 @@ func (r *Revid) Start() error { // Stop closes down the pipeline. This closes encoders and sender output routines, // connections, and/or files. -// -// Stop is safe for concurrent use. func (r *Revid) Stop() { - if !r.IsRunning() { + if !r.running { r.cfg.Logger.Log(logger.Warning, "stop called but revid isn't running") return } - r.mu.Lock() - defer r.mu.Unlock() - close(r.stop) r.cfg.Logger.Log(logger.Debug, "stopping input") @@ -521,25 +224,19 @@ func (r *Revid) Burst() error { return nil } -func (r *Revid) IsRunning() bool { - r.mu.Lock() - defer r.mu.Unlock() +func (r *Revid) Running() bool { return r.running } // Update takes a map of variables and their values and edits the current config // if the variables are recognised as valid parameters. -// -// Update is safe for concurrent use. func (r *Revid) Update(vars map[string]string) error { - if r.IsRunning() { + if r.running { r.cfg.Logger.Log(logger.Debug, "revid running; stopping for re-config") r.Stop() r.cfg.Logger.Log(logger.Info, "revid was running; stopped for re-config") } - r.mu.Lock() - defer r.mu.Unlock() //look through the vars and update revid where needed r.cfg.Logger.Log(logger.Debug, "checking vars from server", "vars", vars) r.cfg.Update(vars) @@ -547,47 +244,3 @@ func (r *Revid) Update(vars map[string]string) error { r.cfg.Logger.Log(logger.Debug, "config changed", "config", r.cfg) return nil } - -// processFrom is run as a routine to read from a input data source, lex and -// then send individual access units to revid's encoders. -func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) { - defer r.wg.Done() - - for l := true; l; l = r.cfg.Loop { - err := in.Start() - if err != nil { - r.err <- fmt.Errorf("could not start input device: %w", err) - return - } - - // Lex data from input device, in, until finished or an error is encountered. - // For a continuous source e.g. a camera or microphone, we should remain - // in this call indefinitely unless in.Stop() is called and an io.EOF is forced. - r.cfg.Logger.Log(logger.Debug, "lexing") - err = r.lexTo(r.filters[0], in, delay) - switch err { - case nil, io.EOF: - r.cfg.Logger.Log(logger.Info, "end of file") - case io.ErrUnexpectedEOF: - r.cfg.Logger.Log(logger.Info, "unexpected EOF from input") - default: - r.err <- err - } - r.cfg.Logger.Log(logger.Info, "finished reading input") - - r.cfg.Logger.Log(logger.Debug, "stopping input") - err = in.Stop() - if err != nil { - r.err <- fmt.Errorf("could not stop input source: %w", err) - } else { - r.cfg.Logger.Log(logger.Info, "input stopped") - } - - // If we're looping and we get a stop signal we return. - select { - case <-r.stop: - return - default: - } - } -}