diff --git a/cmd/rv/main.go b/cmd/rv/main.go index a93a1fb4..a3b60d3a 100644 --- a/cmd/rv/main.go +++ b/cmd/rv/main.go @@ -129,20 +129,24 @@ func main() { if canProfile { profile(log) defer pprof.StopCPUProfile() + log.Log(logger.Info, "profiling started") } var rv *revid.Revid + log.Log(logger.Debug, "initialising netsender client") ns, err := netsender.New(log, nil, readPin(rv), nil, config.TypeData) if err != nil { log.Log(logger.Fatal, pkg+"could not initialise netsender client: "+err.Error()) } + log.Log(logger.Debug, "initialising revid") rv, err = revid.New(config.Config{Logger: log}, ns) if err != nil { log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) } + log.Log(logger.Debug, "beginning main loop") run(rv, ns, log, netLog) } @@ -151,6 +155,7 @@ func main() { func run(rv *revid.Revid, ns *netsender.Sender, l *logger.Logger, nl *netlogger.Logger) { var vs int for { + l.Log(logger.Debug, "running netsender") err := ns.Run() if err != nil { l.Log(logger.Warning, pkg+"Run Failed. Retrying...", "error", err.Error()) @@ -158,38 +163,47 @@ func run(rv *revid.Revid, ns *netsender.Sender, l *logger.Logger, nl *netlogger. continue } + l.Log(logger.Debug, "sending logs") err = nl.Send(ns) if err != nil { l.Log(logger.Warning, pkg+"Logs could not be sent", "error", err.Error()) } - // If var sum hasn't changed we skip rest of loop. + l.Log(logger.Debug, "checking varsum") newVs := ns.VarSum() if vs == newVs { sleep(ns, l) continue } vs = newVs + l.Log(logger.Info, "varsum changed", "vs", vs) + l.Log(logger.Debug, "getting new vars") vars, err := ns.Vars() if err != nil { l.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) time.Sleep(netSendRetryTime) continue } + l.Log(logger.Debug, "got new vars", "vars", vars) // Configure revid based on the vars. + l.Log(logger.Debug, "updating revid's configuration") err = rv.Update(vars) if err != nil { - l.Log(logger.Warning, pkg+"Couldn't update revid", "error", err.Error()) + l.Log(logger.Warning, pkg+"couldn't update revid", "error", err.Error()) sleep(ns, l) continue } + l.Log(logger.Info, "revid successfully reconfigured") + l.Log(logger.Debug, "checking mode") switch ns.Mode() { case modePaused: + l.Log(logger.Debug, "mode is Paused, stopping revid") rv.Stop() case modeNormal, modeLoop: + l.Log(logger.Debug, "mode is Normal or Loop, starting revid") err = rv.Start() if err != nil { l.Log(logger.Error, pkg+"could not start revid", "error", err.Error()) @@ -198,6 +212,7 @@ func run(rv *revid.Revid, ns *netsender.Sender, l *logger.Logger, nl *netlogger. continue } case modeBurst: + l.Log(logger.Debug, "mode is Burst, bursting revid") err = rv.Burst() if err != nil { l.Log(logger.Warning, pkg+"could not start burst", "error", err.Error()) @@ -207,6 +222,7 @@ func run(rv *revid.Revid, ns *netsender.Sender, l *logger.Logger, nl *netlogger. } ns.SetMode(modePaused, &vs) } + l.Log(logger.Info, "revid updated with new mode") sleep(ns, l) } @@ -227,12 +243,14 @@ func profile(l *logger.Logger) { // sleep uses a delay to halt the program based on the monitoring period // netsender parameter (mp) defined in the netsender.conf config. func sleep(ns *netsender.Sender, l *logger.Logger) { + l.Log(logger.Debug, "sleeping") t, err := strconv.Atoi(ns.Param("mp")) if err != nil { l.Log(logger.Error, pkg+"could not get sleep time, using default", "error", err) t = defaultSleepTime } time.Sleep(time.Duration(t) * time.Second) + l.Log(logger.Debug, "finished sleeping") } // readPin provides a callback function of consistent signature for use by diff --git a/revid/revid.go b/revid/revid.go index 69646547..5703bf61 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -154,13 +154,14 @@ func (r *Revid) Bitrate() int { // configuration; checking validity and returning errors if not valid. It then // sets up the data pipeline accordingly to this configuration. func (r *Revid) reset(c config.Config) error { + r.cfg.Logger.Log(logger.Debug, "setting config") err := r.setConfig(c) if err != nil { return err } + r.cfg.Logger.Log(logger.Info, "config set") - r.cfg.Logger.SetLevel(c.LogLevel) - + r.cfg.Logger.Log(logger.Debug, "setting up revid pipeline") err = r.setupPipeline( func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) { var st int @@ -215,6 +216,7 @@ func (r *Revid) reset(c config.Config) error { }, ioext.MultiWriteCloser, ) + r.cfg.Logger.Log(logger.Info, "finished setting pipeline") if err != nil { return err @@ -227,10 +229,12 @@ func (r *Revid) reset(c config.Config) error { // revid config. func (r *Revid) setConfig(config config.Config) error { r.cfg.Logger = config.Logger + r.cfg.Logger.Log(logger.Debug, "validating config") err := config.Validate() if err != nil { return errors.New("Config struct is bad: " + err.Error()) } + r.cfg.Logger.Log(logger.Info, "config validated") r.cfg = config return nil } @@ -257,6 +261,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. for _, out := range r.cfg.Outputs { switch out { case config.OutputHTTP: + r.cfg.Logger.Log(logger.Debug, "using HTTP output") rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second) if err != nil { return fmt.Errorf("could not initialise MTS ring buffer: %w", err) @@ -269,18 +274,21 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. ) mtsSenders = append(mtsSenders, w) case config.OutputRTP: + r.cfg.Logger.Log(logger.Debug, "using RTP output") w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) } mtsSenders = append(mtsSenders, w) case config.OutputFile: + r.cfg.Logger.Log(logger.Debug, "using File output") w, err := newFileSender(r.cfg.OutputPath) if err != nil { return err } mtsSenders = append(mtsSenders, w) case config.OutputRTMP: + r.cfg.Logger.Log(logger.Debug, "using RTMP output") rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second) if err != nil { return fmt.Errorf("could not initialise RTMP ring buffer: %w", err) @@ -326,73 +334,61 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. l := len(r.cfg.Filters) r.filters = []filter.Filter{filter.NewNoOp(r.encoders)} if l != 0 { + r.cfg.Logger.Log(logger.Debug, "setting up filters", "filters", r.cfg.Filters) r.filters = make([]filter.Filter, l) dst := r.encoders for i := l - 1; i >= 0; i-- { switch r.cfg.Filters[i] { case config.FilterNoOp: + r.cfg.Logger.Log(logger.Debug, "using NoOp filter") r.filters[i] = filter.NewNoOp(dst) case config.FilterMOG: + r.cfg.Logger.Log(logger.Debug, "using MOG filter") r.filters[i] = filter.NewMOG(dst, r.cfg) case config.FilterVariableFPS: + r.cfg.Logger.Log(logger.Debug, "using Variable FPS MOG filter") r.filters[i] = filter.NewVariableFPS(dst, r.cfg.MinFPS, filter.NewMOG(dst, r.cfg)) case config.FilterKNN: + r.cfg.Logger.Log(logger.Debug, "using KNN filter") r.filters[i] = filter.NewKNN(dst, r.cfg) case config.FilterDiff: + r.cfg.Logger.Log(logger.Debug, "using gocv difference filter") r.filters[i] = filter.NewDiff(dst, r.cfg) case config.FilterBasic: + r.cfg.Logger.Log(logger.Debug, "using go difference filter") r.filters[i] = filter.NewBasic(dst, r.cfg) default: panic("Undefined Filter") } dst = r.filters[i] } - + r.cfg.Logger.Log(logger.Info, "filters set up") } switch r.cfg.Input { case config.InputRaspivid: + r.cfg.Logger.Log(logger.Debug, "using raspivid input") r.input = raspivid.New(r.cfg.Logger) - - switch r.cfg.InputCodec { - case codecutil.H264: - r.lexTo = h264.Lex - case codecutil.MJPEG: - r.lexTo = mjpeg.Lex - } + r.setLexer(r.cfg.InputCodec, false) case config.InputV4L: + r.cfg.Logger.Log(logger.Debug, "using V4L input") r.input = webcam.New(r.cfg.Logger) - switch r.cfg.InputCodec { - case codecutil.H264: - r.lexTo = h264.Lex - case codecutil.MJPEG: - r.lexTo = mjpeg.Lex - } + r.setLexer(r.cfg.InputCodec, false) case config.InputFile: + r.cfg.Logger.Log(logger.Debug, "using file input") r.input = file.New() - switch r.cfg.InputCodec { - case codecutil.H264: - r.lexTo = h264.Lex - case codecutil.MJPEG: - r.lexTo = mjpeg.Lex - } + r.setLexer(r.cfg.InputCodec, false) case config.InputRTSP: + r.cfg.Logger.Log(logger.Debug, "using RTSP input") r.input = geovision.New(r.cfg.Logger) - - switch r.cfg.InputCodec { - case codecutil.H264: - r.lexTo = h264.NewExtractor().Extract - case codecutil.H265: - r.lexTo = h265.NewLexer(false).Lex - case codecutil.MJPEG: - r.lexTo = mjpeg.NewExtractor().Extract - } + r.setLexer(r.cfg.InputCodec, true) case config.InputAudio: + r.cfg.Logger.Log(logger.Debug, "using audio input") err := r.setupAudio() if err != nil { return err @@ -401,14 +397,43 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. // Configure the input device. We know that defaults are set, so no need to // return error, but we should log. + r.cfg.Logger.Log(logger.Debug, "configuring input device") err := r.input.Set(r.cfg) if err != nil { r.cfg.Logger.Log(logger.Warning, pkg+"errors from configuring input device", "errors", err) } + r.cfg.Logger.Log(logger.Info, "input device configured") return nil } +// setLexer sets the revid input lexer based on input codec and whether input +// is RTSP or not, in which case an RTP/ extractor is used. +func (r *Revid) setLexer(c uint8, isRTSP bool) { + switch c { + case codecutil.H264: + r.cfg.Logger.Log(logger.Debug, "using H.264 codec") + r.lexTo = h264.Lex + if isRTSP { + r.lexTo = h264.NewExtractor().Extract + } + case codecutil.H265: + r.cfg.Logger.Log(logger.Debug, "using H.265 codec") + r.lexTo = h265.NewLexer(false).Lex + if !isRTSP { + panic("byte stream H.265 lexing not implemented") + } + case codecutil.MJPEG: + r.cfg.Logger.Log(logger.Debug, "using MJPEG codec") + r.lexTo = mjpeg.Lex + if isRTSP { + r.lexTo = mjpeg.NewExtractor().Extract + } + default: + panic("unrecognised codec") + } +} + // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. // @@ -422,18 +447,21 @@ func (r *Revid) Start() error { r.mu.Lock() defer r.mu.Unlock() - r.cfg.Logger.Log(logger.Info, pkg+"starting Revid") + r.cfg.Logger.Log(logger.Debug, pkg+"resetting revid") err := r.reset(r.cfg) if err != nil { r.Stop() return err } + r.cfg.Logger.Log(logger.Info, "revid reset") // Calculate delay between frames based on FileFPS. d := time.Duration(0) if r.cfg.FileFPS != 0 { d = time.Duration(1000/r.cfg.FileFPS) * time.Millisecond } + + r.cfg.Logger.Log(logger.Debug, "starting input processing routine") r.wg.Add(1) go r.processFrom(r.input, d) @@ -454,48 +482,54 @@ func (r *Revid) Stop() { r.mu.Lock() defer r.mu.Unlock() + r.cfg.Logger.Log(logger.Debug, "stopping input") err := r.input.Stop() if err != nil { r.cfg.Logger.Log(logger.Error, pkg+"could not stop input", "error", err.Error()) + } else { + r.cfg.Logger.Log(logger.Info, "input stopped") } - r.cfg.Logger.Log(logger.Info, pkg+"closing pid peline") + r.cfg.Logger.Log(logger.Debug, "closing pipeline") err = r.encoders.Close() if err != nil { r.cfg.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error()) + } else { + r.cfg.Logger.Log(logger.Info, "pipeline closed") } for _, filter := range r.filters { err = filter.Close() if err != nil { - r.cfg.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error()) + r.cfg.Logger.Log(logger.Error, pkg+"failed to close filters", "error", err.Error()) + } else { + r.cfg.Logger.Log(logger.Info, "filters closed") } } - r.cfg.Logger.Log(logger.Info, pkg+"closed pipeline") - - r.cfg.Logger.Log(logger.Info, pkg+"waiting for routines to close") - + r.cfg.Logger.Log(logger.Debug, "waiting for routines to finish") r.wg.Wait() + r.cfg.Logger.Log(logger.Info, "routines finished") - r.cfg.Logger.Log(logger.Info, pkg+"revid stopped") r.running = false } // Burst starts revid, waits for time specified, and then stops revid. func (r *Revid) Burst() error { - r.cfg.Logger.Log(logger.Info, pkg+"starting burst") - + r.cfg.Logger.Log(logger.Debug, "starting revid") err := r.Start() if err != nil { return fmt.Errorf("could not start revid: %w", err) } + r.cfg.Logger.Log(logger.Info, "revid started") dur := time.Duration(r.cfg.BurstPeriod) * time.Second time.Sleep(dur) - r.cfg.Logger.Log(logger.Info, pkg+"stopping burst") + r.cfg.Logger.Log(logger.Debug, "stopping revid") r.Stop() + r.cfg.Logger.Log(logger.Info, "revid stopped") + return nil } @@ -511,12 +545,15 @@ func (r *Revid) IsRunning() bool { // Update is safe for concurrent use. func (r *Revid) Update(vars map[string]string) error { if r.IsRunning() { + r.cfg.Logger.Log(logger.Debug, "revid running; stopping for re-config") r.Stop() + r.cfg.Logger.Log(logger.Info, "revid was running; stopped for re-config") } r.mu.Lock() defer r.mu.Unlock() //look through the vars and update revid where needed + r.cfg.Logger.Log(logger.Debug, "checking vars from server", "vars", vars) for key, value := range vars { switch key { case "Input": @@ -863,7 +900,8 @@ func (r *Revid) Update(vars map[string]string) error { } } } - r.cfg.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.cfg)) + r.cfg.Logger.Log(logger.Info, "finished reconfig") + r.cfg.Logger.Log(logger.Debug, pkg+"config changed", "config", r.cfg) return nil } @@ -882,21 +920,24 @@ func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) { // Lex data from input device, in, until finished or an error is encountered. // For a continuous source e.g. a camera or microphone, we should remain // in this call indefinitely unless in.Stop() is called and an io.EOF is forced. - r.cfg.Logger.Log(logger.Info, pkg+"lexing") + r.cfg.Logger.Log(logger.Debug, pkg+"lexing") err = r.lexTo(r.filters[0], in, delay) switch err { case nil, io.EOF: + r.cfg.Logger.Log(logger.Info, "end of file") case io.ErrUnexpectedEOF: r.cfg.Logger.Log(logger.Info, pkg+"unexpected EOF from input") default: r.err <- err } + r.cfg.Logger.Log(logger.Info, "finished reading input") + r.cfg.Logger.Log(logger.Debug, "stopping input") err = in.Stop() if err != nil { r.err <- fmt.Errorf("could not stop input source: %w", err) + } else { + r.cfg.Logger.Log(logger.Info, "input stopped") } } - - r.cfg.Logger.Log(logger.Info, pkg+"finished lexing") } diff --git a/revid/senders.go b/revid/senders.go index 4929bd40..96d13a8d 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -75,9 +75,13 @@ func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ... // Write implements io.Writer. func (s *httpSender) Write(d []byte) (int, error) { + s.log(logger.Debug, "HTTP sending") err := httpSend(d, s.client, s.log) if err == nil { + s.log(logger.Debug, "good send", "len", len(d)) s.report(len(d)) + } else { + s.log(logger.Debug, "bad send", "error", err) } return len(d), err } @@ -88,6 +92,7 @@ func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, // Only send if "V0" is configured as an input. send := false ip := client.Param("ip") + log(logger.Debug, "making pins, and sending recv request", "ip", ip) pins := netsender.MakePins(ip, "V") for i, pin := range pins { if pin.Name == "V0" { @@ -108,6 +113,7 @@ func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, if err != nil { return err } + log(logger.Debug, "good request", "reply", reply) return extractMeta(reply, log) } @@ -200,7 +206,7 @@ func (s *mtsSender) output() { for { select { case <-s.done: - s.log(logger.Info, pkg+"mtsSender: got done signal, terminating output routine") + s.log(logger.Info, pkg+"terminating sender output routine") defer s.wg.Done() return default: @@ -212,10 +218,10 @@ func (s *mtsSender) output() { case nil, io.EOF: continue case vring.ErrNextTimeout: - s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") + s.log(logger.Debug, pkg+"ring buffer read timeout") continue default: - s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) + s.log(logger.Error, pkg+"unexpected error", "error", err.Error()) continue } } @@ -225,11 +231,14 @@ func (s *mtsSender) output() { elem = nil continue } - s.log(logger.Debug, pkg+"mtsSender: writing") + s.log(logger.Debug, pkg+"writing") _, err = s.dst.Write(elem.Bytes()) if err != nil { + s.log(logger.Debug, "failed write, repairing MTS", "error", err) s.repairer.Failed() continue + } else { + s.log(logger.Debug, "good write") } elem.Close() elem = nil @@ -258,7 +267,7 @@ func (s *mtsSender) Write(d []byte) (int, error) { s.ring.Flush() } if err != nil { - s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf)) + s.log(logger.Warning, pkg+"ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf)) } s.buf = s.buf[:0] } @@ -267,8 +276,10 @@ func (s *mtsSender) Write(d []byte) (int, error) { // Close implements io.Closer. func (s *mtsSender) Close() error { + s.log(logger.Debug, "closing sender output routine") close(s.done) s.wg.Wait() + s.log(logger.Info, "sender output routine closed") return nil } @@ -293,9 +304,9 @@ func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log if err == nil { break } - log(logger.Error, err.Error()) + log(logger.Error, "dial error", "error", err) if n < retries-1 { - log(logger.Info, pkg+"retry rtmp connection") + log(logger.Info, pkg+"retrying dial") } } s := &rtmpSender{ @@ -319,7 +330,7 @@ func (s *rtmpSender) output() { for { select { case <-s.done: - s.log(logger.Info, pkg+"rtmpSender: got done signal, terminating output routine") + s.log(logger.Info, pkg+"terminating sender output routine") defer s.wg.Done() return default: @@ -331,29 +342,32 @@ func (s *rtmpSender) output() { case nil, io.EOF: continue case vring.ErrNextTimeout: - s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout") + s.log(logger.Debug, pkg+"ring buffer read timeout") continue default: - s.log(logger.Error, pkg+"rtmpSender: unexpected error", "error", err.Error()) + s.log(logger.Error, pkg+"unexpected error", "error", err.Error()) continue } } if s.conn == nil { - s.log(logger.Warning, pkg+"rtmpSender: no rtmp connection, going to restart...") + s.log(logger.Warning, pkg+"no rtmp connection, re-dialing") err := s.restart() if err != nil { - s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) + s.log(logger.Warning, pkg+"could not restart connection", "error", err) continue } } - _, err := s.conn.Write(elem.Bytes()) + b := elem.Bytes() + s.log(logger.Debug, "writing to conn", "len", len(b)) + _, err := s.conn.Write(b) switch err { case nil, rtmp.ErrInvalidFlvTag: + s.log(logger.Debug, "good write to conn") default: - s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error()) + s.log(logger.Warning, pkg+"send error, re-dialing", "error", err) err = s.restart() if err != nil { - s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) + s.log(logger.Warning, pkg+"could not restart connection", "error", err) } continue } @@ -365,12 +379,13 @@ func (s *rtmpSender) output() { // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { + s.log(logger.Debug, "writing to ring buffer") _, err := s.ring.Write(d) if err == nil { s.ring.Flush() - } - if err != nil { - s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error()) + s.log(logger.Debug, "good ring buffer write", "len", len(d)) + } else { + s.log(logger.Warning, pkg+"ring buffer write error", "error", err.Error()) } s.report(len(d)) return len(d), nil @@ -380,11 +395,12 @@ func (s *rtmpSender) restart() error { s.close() var err error for n := 0; n < s.retries; n++ { + s.log(logger.Debug, "dialing", "dials", n) s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) if err == nil { break } - s.log(logger.Error, err.Error()) + s.log(logger.Error, "dial error", "error", err) if n < s.retries-1 { s.log(logger.Info, pkg+"retry rtmp connection") } @@ -393,14 +409,17 @@ func (s *rtmpSender) restart() error { } func (s *rtmpSender) Close() error { + s.log(logger.Debug, "closing output routine") if s.done != nil { close(s.done) } s.wg.Wait() + s.log(logger.Info, "output routine closed") return s.close() } func (s *rtmpSender) close() error { + s.log(logger.Debug, "closing connection") if s.conn == nil { return nil }