Merged in improve-logging (pull request #389)

Improve debug and info logging quality and frequency

* cmd/rv: added some more debug and info logging

* revid: improving logging in revid.go

* revid: improved logging in senders.go

* revid: don't need to set log level as netsender does it

* added some extract log messages

Approved-by: Trek Hopton <trek.hopton@gmail.com>
This commit is contained in:
Saxon Milton 2020-03-27 12:20:51 +00:00
parent fc3c89231d
commit 0fa50d1a46
3 changed files with 145 additions and 67 deletions

View File

@ -129,20 +129,24 @@ func main() {
if canProfile {
profile(log)
defer pprof.StopCPUProfile()
log.Log(logger.Info, "profiling started")
}
var rv *revid.Revid
log.Log(logger.Debug, "initialising netsender client")
ns, err := netsender.New(log, nil, readPin(rv), nil, config.TypeData)
if err != nil {
log.Log(logger.Fatal, pkg+"could not initialise netsender client: "+err.Error())
}
log.Log(logger.Debug, "initialising revid")
rv, err = revid.New(config.Config{Logger: log}, ns)
if err != nil {
log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error())
}
log.Log(logger.Debug, "beginning main loop")
run(rv, ns, log, netLog)
}
@ -151,6 +155,7 @@ func main() {
func run(rv *revid.Revid, ns *netsender.Sender, l *logger.Logger, nl *netlogger.Logger) {
var vs int
for {
l.Log(logger.Debug, "running netsender")
err := ns.Run()
if err != nil {
l.Log(logger.Warning, pkg+"Run Failed. Retrying...", "error", err.Error())
@ -158,38 +163,47 @@ func run(rv *revid.Revid, ns *netsender.Sender, l *logger.Logger, nl *netlogger.
continue
}
l.Log(logger.Debug, "sending logs")
err = nl.Send(ns)
if err != nil {
l.Log(logger.Warning, pkg+"Logs could not be sent", "error", err.Error())
}
// If var sum hasn't changed we skip rest of loop.
l.Log(logger.Debug, "checking varsum")
newVs := ns.VarSum()
if vs == newVs {
sleep(ns, l)
continue
}
vs = newVs
l.Log(logger.Info, "varsum changed", "vs", vs)
l.Log(logger.Debug, "getting new vars")
vars, err := ns.Vars()
if err != nil {
l.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
time.Sleep(netSendRetryTime)
continue
}
l.Log(logger.Debug, "got new vars", "vars", vars)
// Configure revid based on the vars.
l.Log(logger.Debug, "updating revid's configuration")
err = rv.Update(vars)
if err != nil {
l.Log(logger.Warning, pkg+"Couldn't update revid", "error", err.Error())
l.Log(logger.Warning, pkg+"couldn't update revid", "error", err.Error())
sleep(ns, l)
continue
}
l.Log(logger.Info, "revid successfully reconfigured")
l.Log(logger.Debug, "checking mode")
switch ns.Mode() {
case modePaused:
l.Log(logger.Debug, "mode is Paused, stopping revid")
rv.Stop()
case modeNormal, modeLoop:
l.Log(logger.Debug, "mode is Normal or Loop, starting revid")
err = rv.Start()
if err != nil {
l.Log(logger.Error, pkg+"could not start revid", "error", err.Error())
@ -198,6 +212,7 @@ func run(rv *revid.Revid, ns *netsender.Sender, l *logger.Logger, nl *netlogger.
continue
}
case modeBurst:
l.Log(logger.Debug, "mode is Burst, bursting revid")
err = rv.Burst()
if err != nil {
l.Log(logger.Warning, pkg+"could not start burst", "error", err.Error())
@ -207,6 +222,7 @@ func run(rv *revid.Revid, ns *netsender.Sender, l *logger.Logger, nl *netlogger.
}
ns.SetMode(modePaused, &vs)
}
l.Log(logger.Info, "revid updated with new mode")
sleep(ns, l)
}
@ -227,12 +243,14 @@ func profile(l *logger.Logger) {
// sleep uses a delay to halt the program based on the monitoring period
// netsender parameter (mp) defined in the netsender.conf config.
func sleep(ns *netsender.Sender, l *logger.Logger) {
l.Log(logger.Debug, "sleeping")
t, err := strconv.Atoi(ns.Param("mp"))
if err != nil {
l.Log(logger.Error, pkg+"could not get sleep time, using default", "error", err)
t = defaultSleepTime
}
time.Sleep(time.Duration(t) * time.Second)
l.Log(logger.Debug, "finished sleeping")
}
// readPin provides a callback function of consistent signature for use by

View File

@ -154,13 +154,14 @@ func (r *Revid) Bitrate() int {
// configuration; checking validity and returning errors if not valid. It then
// sets up the data pipeline accordingly to this configuration.
func (r *Revid) reset(c config.Config) error {
r.cfg.Logger.Log(logger.Debug, "setting config")
err := r.setConfig(c)
if err != nil {
return err
}
r.cfg.Logger.Log(logger.Info, "config set")
r.cfg.Logger.SetLevel(c.LogLevel)
r.cfg.Logger.Log(logger.Debug, "setting up revid pipeline")
err = r.setupPipeline(
func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) {
var st int
@ -215,6 +216,7 @@ func (r *Revid) reset(c config.Config) error {
},
ioext.MultiWriteCloser,
)
r.cfg.Logger.Log(logger.Info, "finished setting pipeline")
if err != nil {
return err
@ -227,10 +229,12 @@ func (r *Revid) reset(c config.Config) error {
// revid config.
func (r *Revid) setConfig(config config.Config) error {
r.cfg.Logger = config.Logger
r.cfg.Logger.Log(logger.Debug, "validating config")
err := config.Validate()
if err != nil {
return errors.New("Config struct is bad: " + err.Error())
}
r.cfg.Logger.Log(logger.Info, "config validated")
r.cfg = config
return nil
}
@ -257,6 +261,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
for _, out := range r.cfg.Outputs {
switch out {
case config.OutputHTTP:
r.cfg.Logger.Log(logger.Debug, "using HTTP output")
rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second)
if err != nil {
return fmt.Errorf("could not initialise MTS ring buffer: %w", err)
@ -269,18 +274,21 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
)
mtsSenders = append(mtsSenders, w)
case config.OutputRTP:
r.cfg.Logger.Log(logger.Debug, "using RTP output")
w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report)
if err != nil {
r.cfg.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error())
}
mtsSenders = append(mtsSenders, w)
case config.OutputFile:
r.cfg.Logger.Log(logger.Debug, "using File output")
w, err := newFileSender(r.cfg.OutputPath)
if err != nil {
return err
}
mtsSenders = append(mtsSenders, w)
case config.OutputRTMP:
r.cfg.Logger.Log(logger.Debug, "using RTMP output")
rb, err := vring.NewBuffer(r.cfg.RBMaxElements, r.cfg.RBCapacity, time.Duration(r.cfg.RBWriteTimeout)*time.Second)
if err != nil {
return fmt.Errorf("could not initialise RTMP ring buffer: %w", err)
@ -326,73 +334,61 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
l := len(r.cfg.Filters)
r.filters = []filter.Filter{filter.NewNoOp(r.encoders)}
if l != 0 {
r.cfg.Logger.Log(logger.Debug, "setting up filters", "filters", r.cfg.Filters)
r.filters = make([]filter.Filter, l)
dst := r.encoders
for i := l - 1; i >= 0; i-- {
switch r.cfg.Filters[i] {
case config.FilterNoOp:
r.cfg.Logger.Log(logger.Debug, "using NoOp filter")
r.filters[i] = filter.NewNoOp(dst)
case config.FilterMOG:
r.cfg.Logger.Log(logger.Debug, "using MOG filter")
r.filters[i] = filter.NewMOG(dst, r.cfg)
case config.FilterVariableFPS:
r.cfg.Logger.Log(logger.Debug, "using Variable FPS MOG filter")
r.filters[i] = filter.NewVariableFPS(dst, r.cfg.MinFPS, filter.NewMOG(dst, r.cfg))
case config.FilterKNN:
r.cfg.Logger.Log(logger.Debug, "using KNN filter")
r.filters[i] = filter.NewKNN(dst, r.cfg)
case config.FilterDiff:
r.cfg.Logger.Log(logger.Debug, "using gocv difference filter")
r.filters[i] = filter.NewDiff(dst, r.cfg)
case config.FilterBasic:
r.cfg.Logger.Log(logger.Debug, "using go difference filter")
r.filters[i] = filter.NewBasic(dst, r.cfg)
default:
panic("Undefined Filter")
}
dst = r.filters[i]
}
r.cfg.Logger.Log(logger.Info, "filters set up")
}
switch r.cfg.Input {
case config.InputRaspivid:
r.cfg.Logger.Log(logger.Debug, "using raspivid input")
r.input = raspivid.New(r.cfg.Logger)
switch r.cfg.InputCodec {
case codecutil.H264:
r.lexTo = h264.Lex
case codecutil.MJPEG:
r.lexTo = mjpeg.Lex
}
r.setLexer(r.cfg.InputCodec, false)
case config.InputV4L:
r.cfg.Logger.Log(logger.Debug, "using V4L input")
r.input = webcam.New(r.cfg.Logger)
switch r.cfg.InputCodec {
case codecutil.H264:
r.lexTo = h264.Lex
case codecutil.MJPEG:
r.lexTo = mjpeg.Lex
}
r.setLexer(r.cfg.InputCodec, false)
case config.InputFile:
r.cfg.Logger.Log(logger.Debug, "using file input")
r.input = file.New()
switch r.cfg.InputCodec {
case codecutil.H264:
r.lexTo = h264.Lex
case codecutil.MJPEG:
r.lexTo = mjpeg.Lex
}
r.setLexer(r.cfg.InputCodec, false)
case config.InputRTSP:
r.cfg.Logger.Log(logger.Debug, "using RTSP input")
r.input = geovision.New(r.cfg.Logger)
switch r.cfg.InputCodec {
case codecutil.H264:
r.lexTo = h264.NewExtractor().Extract
case codecutil.H265:
r.lexTo = h265.NewLexer(false).Lex
case codecutil.MJPEG:
r.lexTo = mjpeg.NewExtractor().Extract
}
r.setLexer(r.cfg.InputCodec, true)
case config.InputAudio:
r.cfg.Logger.Log(logger.Debug, "using audio input")
err := r.setupAudio()
if err != nil {
return err
@ -401,14 +397,43 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
// Configure the input device. We know that defaults are set, so no need to
// return error, but we should log.
r.cfg.Logger.Log(logger.Debug, "configuring input device")
err := r.input.Set(r.cfg)
if err != nil {
r.cfg.Logger.Log(logger.Warning, pkg+"errors from configuring input device", "errors", err)
}
r.cfg.Logger.Log(logger.Info, "input device configured")
return nil
}
// setLexer sets the revid input lexer based on input codec and whether input
// is RTSP or not, in which case an RTP/<codec> extractor is used.
func (r *Revid) setLexer(c uint8, isRTSP bool) {
switch c {
case codecutil.H264:
r.cfg.Logger.Log(logger.Debug, "using H.264 codec")
r.lexTo = h264.Lex
if isRTSP {
r.lexTo = h264.NewExtractor().Extract
}
case codecutil.H265:
r.cfg.Logger.Log(logger.Debug, "using H.265 codec")
r.lexTo = h265.NewLexer(false).Lex
if !isRTSP {
panic("byte stream H.265 lexing not implemented")
}
case codecutil.MJPEG:
r.cfg.Logger.Log(logger.Debug, "using MJPEG codec")
r.lexTo = mjpeg.Lex
if isRTSP {
r.lexTo = mjpeg.NewExtractor().Extract
}
default:
panic("unrecognised codec")
}
}
// Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output.
//
@ -422,18 +447,21 @@ func (r *Revid) Start() error {
r.mu.Lock()
defer r.mu.Unlock()
r.cfg.Logger.Log(logger.Info, pkg+"starting Revid")
r.cfg.Logger.Log(logger.Debug, pkg+"resetting revid")
err := r.reset(r.cfg)
if err != nil {
r.Stop()
return err
}
r.cfg.Logger.Log(logger.Info, "revid reset")
// Calculate delay between frames based on FileFPS.
d := time.Duration(0)
if r.cfg.FileFPS != 0 {
d = time.Duration(1000/r.cfg.FileFPS) * time.Millisecond
}
r.cfg.Logger.Log(logger.Debug, "starting input processing routine")
r.wg.Add(1)
go r.processFrom(r.input, d)
@ -454,48 +482,54 @@ func (r *Revid) Stop() {
r.mu.Lock()
defer r.mu.Unlock()
r.cfg.Logger.Log(logger.Debug, "stopping input")
err := r.input.Stop()
if err != nil {
r.cfg.Logger.Log(logger.Error, pkg+"could not stop input", "error", err.Error())
} else {
r.cfg.Logger.Log(logger.Info, "input stopped")
}
r.cfg.Logger.Log(logger.Info, pkg+"closing pid peline")
r.cfg.Logger.Log(logger.Debug, "closing pipeline")
err = r.encoders.Close()
if err != nil {
r.cfg.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error())
} else {
r.cfg.Logger.Log(logger.Info, "pipeline closed")
}
for _, filter := range r.filters {
err = filter.Close()
if err != nil {
r.cfg.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error())
r.cfg.Logger.Log(logger.Error, pkg+"failed to close filters", "error", err.Error())
} else {
r.cfg.Logger.Log(logger.Info, "filters closed")
}
}
r.cfg.Logger.Log(logger.Info, pkg+"closed pipeline")
r.cfg.Logger.Log(logger.Info, pkg+"waiting for routines to close")
r.cfg.Logger.Log(logger.Debug, "waiting for routines to finish")
r.wg.Wait()
r.cfg.Logger.Log(logger.Info, "routines finished")
r.cfg.Logger.Log(logger.Info, pkg+"revid stopped")
r.running = false
}
// Burst starts revid, waits for time specified, and then stops revid.
func (r *Revid) Burst() error {
r.cfg.Logger.Log(logger.Info, pkg+"starting burst")
r.cfg.Logger.Log(logger.Debug, "starting revid")
err := r.Start()
if err != nil {
return fmt.Errorf("could not start revid: %w", err)
}
r.cfg.Logger.Log(logger.Info, "revid started")
dur := time.Duration(r.cfg.BurstPeriod) * time.Second
time.Sleep(dur)
r.cfg.Logger.Log(logger.Info, pkg+"stopping burst")
r.cfg.Logger.Log(logger.Debug, "stopping revid")
r.Stop()
r.cfg.Logger.Log(logger.Info, "revid stopped")
return nil
}
@ -511,12 +545,15 @@ func (r *Revid) IsRunning() bool {
// Update is safe for concurrent use.
func (r *Revid) Update(vars map[string]string) error {
if r.IsRunning() {
r.cfg.Logger.Log(logger.Debug, "revid running; stopping for re-config")
r.Stop()
r.cfg.Logger.Log(logger.Info, "revid was running; stopped for re-config")
}
r.mu.Lock()
defer r.mu.Unlock()
//look through the vars and update revid where needed
r.cfg.Logger.Log(logger.Debug, "checking vars from server", "vars", vars)
for key, value := range vars {
switch key {
case "Input":
@ -863,7 +900,8 @@ func (r *Revid) Update(vars map[string]string) error {
}
}
}
r.cfg.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.cfg))
r.cfg.Logger.Log(logger.Info, "finished reconfig")
r.cfg.Logger.Log(logger.Debug, pkg+"config changed", "config", r.cfg)
return nil
}
@ -882,21 +920,24 @@ func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) {
// Lex data from input device, in, until finished or an error is encountered.
// For a continuous source e.g. a camera or microphone, we should remain
// in this call indefinitely unless in.Stop() is called and an io.EOF is forced.
r.cfg.Logger.Log(logger.Info, pkg+"lexing")
r.cfg.Logger.Log(logger.Debug, pkg+"lexing")
err = r.lexTo(r.filters[0], in, delay)
switch err {
case nil, io.EOF:
r.cfg.Logger.Log(logger.Info, "end of file")
case io.ErrUnexpectedEOF:
r.cfg.Logger.Log(logger.Info, pkg+"unexpected EOF from input")
default:
r.err <- err
}
r.cfg.Logger.Log(logger.Info, "finished reading input")
r.cfg.Logger.Log(logger.Debug, "stopping input")
err = in.Stop()
if err != nil {
r.err <- fmt.Errorf("could not stop input source: %w", err)
} else {
r.cfg.Logger.Log(logger.Info, "input stopped")
}
}
r.cfg.Logger.Log(logger.Info, pkg+"finished lexing")
}

View File

@ -75,9 +75,13 @@ func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...
// Write implements io.Writer.
func (s *httpSender) Write(d []byte) (int, error) {
s.log(logger.Debug, "HTTP sending")
err := httpSend(d, s.client, s.log)
if err == nil {
s.log(logger.Debug, "good send", "len", len(d))
s.report(len(d))
} else {
s.log(logger.Debug, "bad send", "error", err)
}
return len(d), err
}
@ -88,6 +92,7 @@ func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string,
// Only send if "V0" is configured as an input.
send := false
ip := client.Param("ip")
log(logger.Debug, "making pins, and sending recv request", "ip", ip)
pins := netsender.MakePins(ip, "V")
for i, pin := range pins {
if pin.Name == "V0" {
@ -108,6 +113,7 @@ func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string,
if err != nil {
return err
}
log(logger.Debug, "good request", "reply", reply)
return extractMeta(reply, log)
}
@ -200,7 +206,7 @@ func (s *mtsSender) output() {
for {
select {
case <-s.done:
s.log(logger.Info, pkg+"mtsSender: got done signal, terminating output routine")
s.log(logger.Info, pkg+"terminating sender output routine")
defer s.wg.Done()
return
default:
@ -212,10 +218,10 @@ func (s *mtsSender) output() {
case nil, io.EOF:
continue
case vring.ErrNextTimeout:
s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout")
s.log(logger.Debug, pkg+"ring buffer read timeout")
continue
default:
s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error())
s.log(logger.Error, pkg+"unexpected error", "error", err.Error())
continue
}
}
@ -225,11 +231,14 @@ func (s *mtsSender) output() {
elem = nil
continue
}
s.log(logger.Debug, pkg+"mtsSender: writing")
s.log(logger.Debug, pkg+"writing")
_, err = s.dst.Write(elem.Bytes())
if err != nil {
s.log(logger.Debug, "failed write, repairing MTS", "error", err)
s.repairer.Failed()
continue
} else {
s.log(logger.Debug, "good write")
}
elem.Close()
elem = nil
@ -258,7 +267,7 @@ func (s *mtsSender) Write(d []byte) (int, error) {
s.ring.Flush()
}
if err != nil {
s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf))
s.log(logger.Warning, pkg+"ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf))
}
s.buf = s.buf[:0]
}
@ -267,8 +276,10 @@ func (s *mtsSender) Write(d []byte) (int, error) {
// Close implements io.Closer.
func (s *mtsSender) Close() error {
s.log(logger.Debug, "closing sender output routine")
close(s.done)
s.wg.Wait()
s.log(logger.Info, "sender output routine closed")
return nil
}
@ -293,9 +304,9 @@ func newRtmpSender(url string, timeout uint, retries int, rb *vring.Buffer, log
if err == nil {
break
}
log(logger.Error, err.Error())
log(logger.Error, "dial error", "error", err)
if n < retries-1 {
log(logger.Info, pkg+"retry rtmp connection")
log(logger.Info, pkg+"retrying dial")
}
}
s := &rtmpSender{
@ -319,7 +330,7 @@ func (s *rtmpSender) output() {
for {
select {
case <-s.done:
s.log(logger.Info, pkg+"rtmpSender: got done signal, terminating output routine")
s.log(logger.Info, pkg+"terminating sender output routine")
defer s.wg.Done()
return
default:
@ -331,29 +342,32 @@ func (s *rtmpSender) output() {
case nil, io.EOF:
continue
case vring.ErrNextTimeout:
s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout")
s.log(logger.Debug, pkg+"ring buffer read timeout")
continue
default:
s.log(logger.Error, pkg+"rtmpSender: unexpected error", "error", err.Error())
s.log(logger.Error, pkg+"unexpected error", "error", err.Error())
continue
}
}
if s.conn == nil {
s.log(logger.Warning, pkg+"rtmpSender: no rtmp connection, going to restart...")
s.log(logger.Warning, pkg+"no rtmp connection, re-dialing")
err := s.restart()
if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error())
s.log(logger.Warning, pkg+"could not restart connection", "error", err)
continue
}
}
_, err := s.conn.Write(elem.Bytes())
b := elem.Bytes()
s.log(logger.Debug, "writing to conn", "len", len(b))
_, err := s.conn.Write(b)
switch err {
case nil, rtmp.ErrInvalidFlvTag:
s.log(logger.Debug, "good write to conn")
default:
s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error())
s.log(logger.Warning, pkg+"send error, re-dialing", "error", err)
err = s.restart()
if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error())
s.log(logger.Warning, pkg+"could not restart connection", "error", err)
}
continue
}
@ -365,12 +379,13 @@ func (s *rtmpSender) output() {
// Write implements io.Writer.
func (s *rtmpSender) Write(d []byte) (int, error) {
s.log(logger.Debug, "writing to ring buffer")
_, err := s.ring.Write(d)
if err == nil {
s.ring.Flush()
}
if err != nil {
s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error())
s.log(logger.Debug, "good ring buffer write", "len", len(d))
} else {
s.log(logger.Warning, pkg+"ring buffer write error", "error", err.Error())
}
s.report(len(d))
return len(d), nil
@ -380,11 +395,12 @@ func (s *rtmpSender) restart() error {
s.close()
var err error
for n := 0; n < s.retries; n++ {
s.log(logger.Debug, "dialing", "dials", n)
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
if err == nil {
break
}
s.log(logger.Error, err.Error())
s.log(logger.Error, "dial error", "error", err)
if n < s.retries-1 {
s.log(logger.Info, pkg+"retry rtmp connection")
}
@ -393,14 +409,17 @@ func (s *rtmpSender) restart() error {
}
func (s *rtmpSender) Close() error {
s.log(logger.Debug, "closing output routine")
if s.done != nil {
close(s.done)
}
s.wg.Wait()
s.log(logger.Info, "output routine closed")
return s.close()
}
func (s *rtmpSender) close() error {
s.log(logger.Debug, "closing connection")
if s.conn == nil {
return nil
}