av/cmd/vidforward/notifier.go

188 lines
5.7 KiB
Go

/*
DESCRIPTION
notifier.go provides a tool for notifying a systemd watchdog under healthy
operation of the vidforward service.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import (
"os"
"os/signal"
"sync"
"syscall"
"time"
"bitbucket.org/ausocean/utils/logging"
"github.com/coreos/go-systemd/daemon"
)
// By default we assume we should be notifying a systemd watchdog. This can be
// toggled off by using the nowatchdog build tag (see nowatchdog.go file).
var notifyWatchdog = true
// watchdogNotifier keeps track of the watchdog interval from the external
// sysd service settings, the currently active request handlers and a curId
// field that is is incremented to generate new handler ids for storage.
type watchdogNotifier struct {
watchdogInterval time.Duration
activeHandlers map[int]handlerInfo
curId int
termCallback func()
log logging.Logger
mu sync.Mutex
haveRun bool
}
// handlerInfo keeps track of a handlers name (for any logging purposes) and
// time at which the handler was invoked, which is later used to calculate time
// active and therefore heatlh.
type handlerInfo struct {
name string
time time.Time
}
// newWatchdogNotifier creates a new watchdogNotifier with the provided logger
// and termination callback that is called if a SIGINT or SIGTERM signal is
// received. Recommended use of this is an attempted state save.
func newWatchdogNotifier(l logging.Logger, termCallback func()) (*watchdogNotifier, error) {
interval := 1 * time.Minute
return &watchdogNotifier{
activeHandlers: make(map[int]handlerInfo),
watchdogInterval: interval,
log: l,
termCallback: termCallback,
}, nil
}
// notify is to be called as a routine. This is responsible for checking if the
// handlers are healthy and then notifying the watchdog if so, otherwise we
// wait, continue and check again until they are. If the handlers take too long to
// become healthy, we risk exceeding the watchdog interval causing a process restart.
// notify also starts a routine to monitor for any SIGINT or SIGTERM, upon which
// a callback that's provided at initialisation is called.
func (n *watchdogNotifier) notify() {
notifyTicker := time.NewTicker(n.watchdogInterval / 2.0)
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs
n.log.Warning("received termination signal, calling termination callback", "signal", sig.String())
n.termCallback()
}()
for {
if n.handlersUnhealthy() {
const unhealthyHandlerWait = 1 * time.Second
time.Sleep(unhealthyHandlerWait)
continue
}
<-notifyTicker.C
if !notifyWatchdog {
continue
}
if !n.haveRun {
n.haveRun = true
const clearEnvVars = false
ok, err := daemon.SdNotify(clearEnvVars, daemon.SdNotifyReady)
if err != nil {
n.log.Fatal("unexpected watchog notify read error", "error", err)
}
if !ok {
n.log.Fatal("watchdog notification not supported")
}
n.watchdogInterval, err = daemon.SdWatchdogEnabled(clearEnvVars)
if err != nil {
n.log.Fatal("unexpected watchdog error", "error", err)
}
if n.watchdogInterval == 0 {
n.log.Fatal("Watchdog not enabled or this is the wrong PID")
}
}
// If this fails for any reason it indicates a systemd service configuration
// issue, and therefore programmer error, so do fatal log to cause crash.
n.log.Debug("notifying watchdog")
supported, err := daemon.SdNotify(false, daemon.SdNotifyWatchdog)
if err != nil {
n.log.Fatal("error from systemd watchdog notify", "error", err)
}
if !supported {
n.log.Fatal("watchdog notification not supported")
}
}
}
// handlersUnhealthy returns true if it is detected that any handlers are unhealthy,
// that is, if they have been handling for longer than the unhealthyHandleDuration.
func (n *watchdogNotifier) handlersUnhealthy() bool {
n.mu.Lock()
defer n.mu.Unlock()
for _, info := range n.activeHandlers {
const unhealthyHandleDuration = 30 * time.Second
if time.Now().Sub(info.time) > unhealthyHandleDuration {
n.log.Warning("handler unhealthy", "name", info.name)
return true
}
}
return false
}
// handlerInvoked is to be called at the start of a request handler to indicate
// that handling has begun. The name and start time of the handler is recorded
// in the active handlers map with a unique ID as the key. A function is returned
// that must be called at exit of the handler to indicate that handling has
// finished. It is recommended this be done using a defer statement immediately
// after receiveing it.
func (n *watchdogNotifier) handlerInvoked(name string) func() {
n.mu.Lock()
defer n.mu.Unlock()
n.log.Info("handler invoked", "name", name)
id := n.curId
n.curId++
n.activeHandlers[id] = handlerInfo{time: time.Now(), name: name}
return func() {
n.mu.Lock()
defer n.mu.Unlock()
n.log.Info("handler done", "name", name)
if _, ok := n.activeHandlers[id]; !ok {
n.log.Fatal("handler id not in map", "name", name)
}
delete(n.activeHandlers, id)
}
}