diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go index 98107647..871dceaf 100644 --- a/cmd/vidforward/main.go +++ b/cmd/vidforward/main.go @@ -86,20 +86,45 @@ type broadcastManager struct { broadcasts map[MAC]Broadcast slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image. log logging.Logger + dogNotifier *watchdogNotifier mu sync.Mutex } // newBroadcastManager returns a new broadcastManager with the provided logger. -func newBroadcastManager(l logging.Logger) *broadcastManager { - return &broadcastManager{log: l, broadcasts: make(map[MAC]Broadcast), slateExitSignals: make(map[MAC]chan struct{})} +func newBroadcastManager(l logging.Logger) (*broadcastManager, error) { + m := &broadcastManager{ + log: l, + broadcasts: make(map[MAC]Broadcast), + slateExitSignals: make(map[MAC]chan struct{}), + } + notifier, err := newWatchdogNotifier(l, func() { + err := m.save() + if err != nil { + m.log.Error("could not save on notifier termination signal", "error", err) + return + } + m.log.Info("successfully saved broadcast manager state on termination signal") + }) + m.dogNotifier = notifier + if err != nil { + return nil, err + } + return m, nil } +// save is currently just a stub, but will eventually save the broadcastManager's +// state to file. +func (b *broadcastManager) save() error { return nil } + // recvHandler handles recv requests for video forwarding. The MAC is firstly // checked to ensure it is "active" i.e. should be sending data, and then the // video is extracted from the request body and provided to the revid pipeline // corresponding to said MAC. // Clips of MPEG-TS h264 are the only accepted format and codec. 
func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { + done := m.dogNotifier.handlerInvoked("recv") + defer done() + m.log.Debug("recv handler") q := r.URL.Query() ma := MAC(q.Get("ma")) @@ -171,6 +196,9 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { // control handles control API requests. func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) { + done := m.dogNotifier.handlerInvoked("control") + defer done() + m.log.Info("control request", "method", r.Method) switch r.Method { case http.MethodPut: @@ -359,8 +387,14 @@ func main() { // lumberjack and netloggers. log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress) - bm := newBroadcastManager(log) - http.HandleFunc("/recv", bm.recv) - http.HandleFunc("/control", bm.control) + m, err := newBroadcastManager(log) + if err != nil { + log.Fatal("could not create new broadcast manager", "error", err) + } + http.HandleFunc("/recv", m.recv) + http.HandleFunc("/control", m.control) + + go m.dogNotifier.notify() + http.ListenAndServe(*host+":"+*port, nil) } diff --git a/cmd/vidforward/notifier.go b/cmd/vidforward/notifier.go new file mode 100644 index 00000000..cb0d87ea --- /dev/null +++ b/cmd/vidforward/notifier.go @@ -0,0 +1,171 @@ +/* +DESCRIPTION + notifier.go provides a tool for notifying a systemd watchdog under healthy + operation of the vidforward service. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + Copyright (C) 2022 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package main + +import ( + "log" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "bitbucket.org/ausocean/utils/logging" + "github.com/coreos/go-systemd/daemon" +) + +// By default we assume we should be notifying a systemd watchdog. This can be +// toggled off by using the nowatchdog build tag (see nowatchdog.go file). +var notifyWatchdog = true + +// watchdogNotifier keeps track of the watchdog interval from the external +// systemd service settings, the currently active request handlers and a curId +// field that is incremented to generate new handler ids for storage. +type watchdogNotifier struct { + watchdogInterval time.Duration + activeHandlers map[int]handlerInfo + curId int + termCallback func() + log logging.Logger + mu sync.Mutex +} + +// handlerInfo keeps track of a handler's name (for any logging purposes) and +// time at which the handler was invoked, which is later used to calculate time +// active and therefore health. +type handlerInfo struct { + name string + time time.Time +} + +// newWatchdogNotifier creates a new watchdogNotifier with the provided logger +// and termination callback that is called if a SIGINT or SIGTERM signal is +// received. Recommended use of this is an attempted state save. +func newWatchdogNotifier(l logging.Logger, termCallback func()) (*watchdogNotifier, error) { + var ( + interval = 1 * time.Minute + err error + ) + + if notifyWatchdog { + const clearEnvVars = false + interval, err = daemon.SdWatchdogEnabled(clearEnvVars) + if err != nil { + return nil, err + } + } + + return &watchdogNotifier{ + activeHandlers: make(map[int]handlerInfo), + watchdogInterval: interval, + log: l, + termCallback: termCallback, + }, nil +} + +// notify is to be called as a routine. 
This is responsible for checking if the +// handlers are healthy and then notifying the watchdog if so, otherwise we +// wait, continue and check again until they are. If the handlers take too long to +// become healthy, we risk exceeding the watchdog interval causing a process restart. +// notify also starts a routine to monitor for any SIGINT or SIGTERM, upon which +// a callback that's provided at initialisation is called. +func (n *watchdogNotifier) notify() { + notifyTicker := time.NewTicker(n.watchdogInterval / 2) + + go func() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + sig := <-sigs + n.log.Warning("received termination signal, calling termination callback", "signal", sig.String()) + n.termCallback() + }() + + for { + if n.handlersUnhealthy() { + const unhealthyHandlerWait = 1 * time.Second + time.Sleep(unhealthyHandlerWait) + continue + } + + <-notifyTicker.C + + if !notifyWatchdog { + continue + } + + // If this fails for any reason it indicates a systemd service configuration + // issue, and therefore programmer error, so do fatal log to cause crash. + supported, err := daemon.SdNotify(false, daemon.SdNotifyWatchdog) + if err != nil { + n.log.Fatal("error from systemd watchdog notify", "error", err) + } + + if !supported { + n.log.Fatal("watchdog notification not supported") + } + } +} + +// handlersUnhealthy returns true if it is detected that any handlers are unhealthy, +// that is, if they have been handling for longer than the unhealthyHandleDuration. +func (n *watchdogNotifier) handlersUnhealthy() bool { + n.mu.Lock() + defer n.mu.Unlock() + for _, info := range n.activeHandlers { + const unhealthyHandleDuration = 30 * time.Second + if time.Now().Sub(info.time) > unhealthyHandleDuration { + n.log.Warning("handler unhealthy", "name", info.name) + return true + } + } + return false +} + +// handlerInvoked is to be called at the start of a request handler to indicate +// that handling has begun. 
The name and start time of the handler are recorded +// in the active handlers map with a unique ID as the key. A function is returned +// that must be called at exit of the handler to indicate that handling has +// finished. It is recommended this be done using a defer statement immediately +// after receiving it. +func (n *watchdogNotifier) handlerInvoked(name string) func() { + n.mu.Lock() + defer n.mu.Unlock() + + id := n.curId + n.curId++ + n.activeHandlers[id] = handlerInfo{time: time.Now(), name: name} + + return func() { + n.mu.Lock() + defer n.mu.Unlock() + + if _, ok := n.activeHandlers[id]; !ok { + log.Fatal("handler id not in map", "name", name) + } + + delete(n.activeHandlers, id) + } +} diff --git a/cmd/vidforward/nowatchdog.go b/cmd/vidforward/nowatchdog.go new file mode 100644 index 00000000..fa31fa4c --- /dev/null +++ b/cmd/vidforward/nowatchdog.go @@ -0,0 +1,32 @@ +//go:build nowatchdog +// +build nowatchdog + +/* +DESCRIPTION + nowatchdog.go sets the notifyWatchdog global to false. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + Copyright (C) 2022 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. 
+*/ + +package main + +func init() { + notifyWatchdog = false +} diff --git a/go.mod b/go.mod index 7addbb62..8ba7ec85 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( bitbucket.org/ausocean/iot v1.3.3 bitbucket.org/ausocean/utils v1.3.2 github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 + github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 github.com/google/go-cmp v0.4.1 diff --git a/go.sum b/go.sum index d860c7a9..d340b280 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,8 @@ github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U= github.com/cocoonlife/goalsa v0.0.0-20160812085113-b711ae6f3eff/go.mod h1:5aLO409bJnd+jCw0t/SB/DhHkVBhPAE31lnHJnYQxy0= github.com/cocoonlife/testify v0.0.0-20160218172820-792cc1faeb64/go.mod h1:LoCAz53rbPcqs8Da2BjB/yDy4gxMtiSQmqnYI/DGH+U= +github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU= +github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/cryptix/wav v0.0.0-20180415113528-8bdace674401/go.mod h1:knK8fd+KPlGGqSUWogv1DQzGTwnfUvAi0cIoWyOG7+U= github.com/d2r2/go-dht v0.0.0-20200119175940-4ba96621a218/go.mod h1:AzSqP4S4/6pINOKg3VC79WC7YY3zskQcrXMFzphCry0= github.com/d2r2/go-logger v0.0.0-20210606094344-60e9d1233e22/go.mod h1:eSx+YfcVy5vCjRZBNIhpIpfCGFMQ6XSOSQkDk7+VCpg=