/* NAME revid.go AUTHORS Saxon A. Nelson-Milton <saxon@ausocean.org> Alan Noble <alan@ausocean.org> Dan Kortschak <dan@ausocean.org> Trek Hopton <trek@ausocean.org> Scott Barnard <scott@ausocean.org> LICENSE revid is Copyright (C) 2017-2020 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License in gpl.txt. If not, see http://www.gnu.org/licenses. */ // Package revid provides an API for reading, transcoding, and writing audio/video streams and files. package revid import ( "errors" "fmt" "io" "sync" "time" "bitbucket.org/ausocean/av/codec/codecutil" "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/codec/h265" "bitbucket.org/ausocean/av/codec/mjpeg" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/device" "bitbucket.org/ausocean/av/device/file" "bitbucket.org/ausocean/av/device/geovision" "bitbucket.org/ausocean/av/device/raspivid" "bitbucket.org/ausocean/av/device/webcam" "bitbucket.org/ausocean/av/filter" "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/bitrate" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) // Misc consts. const ( rbStartingElementSize = 10000 // Bytes. rtmpConnectionMaxTries = 5 ) type Logger interface { SetLevel(int8) Log(level int8, message string, params ...interface{}) } type encLog struct{ Logger } func (el *encLog) Debug(msg string, args ...interface{}) { el.Log(logger.Debug, msg, args...) } func (el *encLog) Info(msg string, args ...interface{}) { el.Log(logger.Info, msg, args...) } func (el *encLog) Warning(msg string, args ...interface{}) { el.Log(logger.Warning, msg, args...) } func (el *encLog) Error(msg string, args ...interface{}) { el.Log(logger.Error, msg, args...) } func (el *encLog) Fatal(msg string, args ...interface{}) { el.Log(logger.Fatal, msg, args...) } // Revid provides methods to control a revid session; providing methods // to start, stop and change the state of an instance using the Config struct. type Revid struct { // config holds the Revid configuration. // For historical reasons it also handles logging. // FIXME(kortschak): The relationship of concerns // in config/ns is weird. cfg config.Config // ns holds the netsender.Sender responsible for HTTP. ns *netsender.Sender // input will capture audio or video from which we can read data. input device.AVDevice // closeInput holds the cleanup function return from setupInput and is called // in Revid.Stop(). closeInput func() error // lexTo, encoder and packer handle transcoding the input stream. lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error // filters will hold the filter interface that will write to the chosen filter from the lexer. filters []filter.Filter // encoders will hold the multiWriteCloser that writes to encoders from the filter. encoders io.WriteCloser // running is used to keep track of revid's running state between methods. running bool // mu is used to protect isRunning during concurrent use. mu sync.Mutex // wg will be used to wait for any processing routines to finish. wg sync.WaitGroup // err will channel errors from revid routines to the handle errors routine. err chan error // bitrate is used for bitrate calculations. bitrate bitrate.Calculator // stop is used to signal stopping when looping an input. stop chan struct{} } // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c config.Config, ns *netsender.Sender) (*Revid, error) { r := Revid{ns: ns, err: make(chan error)} err := r.setConfig(c) if err != nil { return nil, fmt.Errorf("could not set config, failed with error: %w", err) } go r.handleErrors() return &r, nil } // Config returns a copy of revids current config. // // Config is not safe for concurrent use. func (r *Revid) Config() config.Config { return r.cfg } // TODO(Saxon): put more thought into error severity and how to handle these. func (r *Revid) handleErrors() { for { err := <-r.err if err != nil { r.cfg.Logger.Log(logger.Error, "async error", "error", err.Error()) } } } // Bitrate returns the result of the most recent bitrate check. func (r *Revid) Bitrate() int { return r.bitrate.Bitrate() } // reset swaps the current config of a Revid with the passed // configuration; checking validity and returning errors if not valid. It then // sets up the data pipeline accordingly to this configuration. func (r *Revid) reset(c config.Config) error { r.cfg.Logger.Log(logger.Debug, "setting config") err := r.setConfig(c) if err != nil { return err } r.cfg.Logger.Log(logger.Info, "config set") r.cfg.Logger.Log(logger.Debug, "setting up revid pipeline") err = r.setupPipeline( func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) { var st int var encOptions []func(*mts.Encoder) error switch r.cfg.Input { case config.InputRaspivid: switch r.cfg.InputCodec { case codecutil.H264: st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) r.cfg.CBR = true default: panic("unknown input codec for raspivid input") } case config.InputFile, config.InputV4L: switch r.cfg.InputCodec { case codecutil.H264: st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) r.cfg.CBR = true default: panic(fmt.Sprintf("unknown input codec %d for v4l or input file input", r.cfg.InputCodec)) } case config.InputRTSP: switch r.cfg.InputCodec { case codecutil.H265: st = mts.EncodeH265 case codecutil.H264: st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) r.cfg.CBR = true default: panic("unknown input codec for RTSP input") } case config.InputAudio: st = mts.EncodeAudio default: panic("unknown input type") } encOptions = append(encOptions, mts.MediaType(st), mts.Rate(int(fps))) return mts.NewEncoder(dst, &encLog{r.cfg.Logger}, encOptions...) }, func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { return flv.NewEncoder(dst, true, true, fps) }, ioext.MultiWriteCloser, ) r.cfg.Logger.Log(logger.Info, "finished setting pipeline") if err != nil { return err } return nil } // setConfig takes a config, checks it's validity and then replaces the current // revid config. func (r *Revid) setConfig(config config.Config) error { r.cfg.Logger = config.Logger r.cfg.Logger.Log(logger.Debug, "validating config") err := config.Validate() if err != nil { return errors.New("Config struct is bad: " + err.Error()) } r.cfg.Logger.Log(logger.Info, "config validated") r.cfg = config r.cfg.Logger.SetLevel(r.cfg.LogLevel) return nil } // setupPipeline constructs the revid dataPipeline. Inputs, encoders and // senders are created and linked based on the current revid config. // // mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder // respectively. multiWriter will be used to create an ioext.multiWriteCloser // so that encoders can write to multiple senders. func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.WriteCloser, error), flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { // encoders will hold the encoders that are required for revid's current // configuration. var encoders []io.WriteCloser // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. var mtsSenders, flvSenders []io.WriteCloser // Calculate no. of ring buffer elements based on starting element size // const and config directed max ring buffer size, then create buffer. // This is only used if the selected output uses a ring buffer. nElements := r.cfg.RBCapacity / rbStartingElementSize writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. var w io.WriteCloser for _, out := range r.cfg.Outputs { switch out { case config.OutputHTTP: r.cfg.Logger.Log(logger.Debug, "using HTTP output") rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout) hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report) w = newMTSSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration) mtsSenders = append(mtsSenders, w) case config.OutputRTP: r.cfg.Logger.Log(logger.Debug, "using RTP output") w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report) if err != nil { r.cfg.Logger.Log(logger.Warning, "rtp connect error", "error", err.Error()) } mtsSenders = append(mtsSenders, w) case config.OutputFile: r.cfg.Logger.Log(logger.Debug, "using File output") w, err := newFileSender(r.cfg.OutputPath) if err != nil { return err } mtsSenders = append(mtsSenders, w) case config.OutputRTMP: r.cfg.Logger.Log(logger.Debug, "using RTMP output") rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout) w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report) if err != nil { r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error()) } flvSenders = append(flvSenders, w) } } // If we have some senders that require MPEGTS encoding then add an MPEGTS // encoder to revid's encoder slice, and give this encoder the mtsSenders // as a destination. if len(mtsSenders) != 0 { mw := multiWriter(mtsSenders...) e, _ := mtsEnc(mw, r.cfg.WriteRate) encoders = append(encoders, e) } // If we have some senders that require FLV encoding then add an FLV // encoder to revid's encoder slice, and give this encoder the flvSenders // as a destination. if len(flvSenders) != 0 { mw := multiWriter(flvSenders...) e, err := flvEnc(mw, int(r.cfg.FrameRate)) if err != nil { return err } encoders = append(encoders, e) } r.encoders = multiWriter(encoders...) l := len(r.cfg.Filters) r.filters = []filter.Filter{filter.NewNoOp(r.encoders)} if l != 0 { r.cfg.Logger.Log(logger.Debug, "setting up filters", "filters", r.cfg.Filters) r.filters = make([]filter.Filter, l) dst := r.encoders for i := l - 1; i >= 0; i-- { switch r.cfg.Filters[i] { case config.FilterNoOp: r.cfg.Logger.Log(logger.Debug, "using NoOp filter") r.filters[i] = filter.NewNoOp(dst) case config.FilterMOG: r.cfg.Logger.Log(logger.Debug, "using MOG filter") r.filters[i] = filter.NewMOG(dst, r.cfg) case config.FilterVariableFPS: r.cfg.Logger.Log(logger.Debug, "using Variable FPS MOG filter") r.filters[i] = filter.NewVariableFPS(dst, r.cfg.MinFPS, filter.NewMOG(dst, r.cfg)) case config.FilterKNN: r.cfg.Logger.Log(logger.Debug, "using KNN filter") r.filters[i] = filter.NewKNN(dst, r.cfg) case config.FilterDiff: r.cfg.Logger.Log(logger.Debug, "using gocv difference filter") r.filters[i] = filter.NewDiff(dst, r.cfg) case config.FilterBasic: r.cfg.Logger.Log(logger.Debug, "using go difference filter") r.filters[i] = filter.NewBasic(dst, r.cfg) default: panic("Undefined Filter") } dst = r.filters[i] } r.cfg.Logger.Log(logger.Info, "filters set up") } switch r.cfg.Input { case config.InputRaspivid: r.cfg.Logger.Log(logger.Debug, "using raspivid input") r.input = raspivid.New(r.cfg.Logger) r.setLexer(r.cfg.InputCodec, false) case config.InputV4L: r.cfg.Logger.Log(logger.Debug, "using V4L input") r.input = webcam.New(r.cfg.Logger) r.setLexer(r.cfg.InputCodec, false) case config.InputFile: r.cfg.Logger.Log(logger.Debug, "using file input") r.input = file.New() r.setLexer(r.cfg.InputCodec, false) case config.InputRTSP: r.cfg.Logger.Log(logger.Debug, "using RTSP input") r.input = geovision.New(r.cfg.Logger) r.setLexer(r.cfg.InputCodec, true) case config.InputAudio: r.cfg.Logger.Log(logger.Debug, "using audio input") err := r.setupAudio() if err != nil { return err } } // Configure the input device. We know that defaults are set, so no need to // return error, but we should log. r.cfg.Logger.Log(logger.Debug, "configuring input device") err := r.input.Set(r.cfg) if err != nil { r.cfg.Logger.Log(logger.Warning, "errors from configuring input device", "errors", err) } r.cfg.Logger.Log(logger.Info, "input device configured") return nil } // setLexer sets the revid input lexer based on input codec and whether input // is RTSP or not, in which case an RTP/<codec> extractor is used. func (r *Revid) setLexer(c uint8, isRTSP bool) { switch c { case codecutil.H264: r.cfg.Logger.Log(logger.Debug, "using H.264 codec") r.lexTo = h264.Lex if isRTSP { r.lexTo = h264.NewExtractor().Extract } case codecutil.H265: r.cfg.Logger.Log(logger.Debug, "using H.265 codec") r.lexTo = h265.NewExtractor(false).Extract if !isRTSP { panic("byte stream H.265 lexing not implemented") } case codecutil.MJPEG: r.cfg.Logger.Log(logger.Debug, "using MJPEG codec") r.lexTo = mjpeg.Lex if isRTSP { r.lexTo = mjpeg.NewExtractor().Extract } default: panic("unrecognised codec") } } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. // // Start is safe for concurrent use. func (r *Revid) Start() error { if r.IsRunning() { r.cfg.Logger.Log(logger.Warning, "start called, but revid already running") return nil } r.mu.Lock() defer r.mu.Unlock() r.stop = make(chan struct{}) r.cfg.Logger.Log(logger.Debug, "resetting revid") err := r.reset(r.cfg) if err != nil { r.Stop() return err } r.cfg.Logger.Log(logger.Info, "revid reset") // Calculate delay between frames based on FileFPS. d := time.Duration(0) if r.cfg.FileFPS != 0 { d = time.Duration(1000/r.cfg.FileFPS) * time.Millisecond } r.cfg.Logger.Log(logger.Debug, "starting input processing routine") r.wg.Add(1) go r.processFrom(r.input, d) r.running = true return nil } // Stop closes down the pipeline. This closes encoders and sender output routines, // connections, and/or files. // // Stop is safe for concurrent use. func (r *Revid) Stop() { if !r.IsRunning() { r.cfg.Logger.Log(logger.Warning, "stop called but revid isn't running") return } r.mu.Lock() defer r.mu.Unlock() close(r.stop) r.cfg.Logger.Log(logger.Debug, "stopping input") err := r.input.Stop() if err != nil { r.cfg.Logger.Log(logger.Error, "could not stop input", "error", err.Error()) } else { r.cfg.Logger.Log(logger.Info, "input stopped") } r.cfg.Logger.Log(logger.Debug, "closing pipeline") err = r.encoders.Close() if err != nil { r.cfg.Logger.Log(logger.Error, "failed to close pipeline", "error", err.Error()) } else { r.cfg.Logger.Log(logger.Info, "pipeline closed") } for _, filter := range r.filters { err = filter.Close() if err != nil { r.cfg.Logger.Log(logger.Error, "failed to close filters", "error", err.Error()) } else { r.cfg.Logger.Log(logger.Info, "filters closed") } } r.cfg.Logger.Log(logger.Debug, "waiting for routines to finish") r.wg.Wait() r.cfg.Logger.Log(logger.Info, "routines finished") r.running = false } // Burst starts revid, waits for time specified, and then stops revid. func (r *Revid) Burst() error { r.cfg.Logger.Log(logger.Debug, "starting revid") err := r.Start() if err != nil { return fmt.Errorf("could not start revid: %w", err) } r.cfg.Logger.Log(logger.Info, "revid started") dur := time.Duration(r.cfg.BurstPeriod) * time.Second time.Sleep(dur) r.cfg.Logger.Log(logger.Debug, "stopping revid") r.Stop() r.cfg.Logger.Log(logger.Info, "revid stopped") return nil } func (r *Revid) IsRunning() bool { r.mu.Lock() defer r.mu.Unlock() return r.running } // Update takes a map of variables and their values and edits the current config // if the variables are recognised as valid parameters. // // Update is safe for concurrent use. func (r *Revid) Update(vars map[string]string) error { if r.IsRunning() { r.cfg.Logger.Log(logger.Debug, "revid running; stopping for re-config") r.Stop() r.cfg.Logger.Log(logger.Info, "revid was running; stopped for re-config") } r.mu.Lock() defer r.mu.Unlock() //look through the vars and update revid where needed r.cfg.Logger.Log(logger.Debug, "checking vars from server", "vars", vars) r.cfg.Update(vars) r.cfg.Logger.Log(logger.Info, "finished reconfig") r.cfg.Logger.Log(logger.Debug, "config changed", "config", r.cfg) return nil } // processFrom is run as a routine to read from a input data source, lex and // then send individual access units to revid's encoders. func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) { defer r.wg.Done() for l := true; l; l = r.cfg.Loop { err := in.Start() if err != nil { r.err <- fmt.Errorf("could not start input device: %w", err) return } // Lex data from input device, in, until finished or an error is encountered. // For a continuous source e.g. a camera or microphone, we should remain // in this call indefinitely unless in.Stop() is called and an io.EOF is forced. r.cfg.Logger.Log(logger.Debug, "lexing") err = r.lexTo(r.filters[0], in, delay) switch err { case nil, io.EOF: r.cfg.Logger.Log(logger.Info, "end of file") case io.ErrUnexpectedEOF: r.cfg.Logger.Log(logger.Info, "unexpected EOF from input") default: r.err <- err } r.cfg.Logger.Log(logger.Info, "finished reading input") r.cfg.Logger.Log(logger.Debug, "stopping input") err = in.Stop() if err != nil { r.err <- fmt.Errorf("could not stop input source: %w", err) } else { r.cfg.Logger.Log(logger.Info, "input stopped") } // If we're looping and we get a stop signal we return. select { case <-r.stop: return default: } } }