mirror of https://bitbucket.org/ausocean/av.git
cmd/vidforward: add watchdog notifier
This change adds a "watchdog notifier" utility which tracks the health of request handlers and notifies an external systemd watchdog if everything looks good. This allows us to cause a termination if any request handlers get hung.
This commit is contained in:
parent
f6e00342f7
commit
8f8b9ca0f4
|
@ -86,20 +86,45 @@ type broadcastManager struct {
|
|||
broadcasts map[MAC]Broadcast
|
||||
slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image.
|
||||
log logging.Logger
|
||||
dogNotifier *watchdogNotifier
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// newBroadcastManager returns a new broadcastManager with the provided logger.
|
||||
func newBroadcastManager(l logging.Logger) *broadcastManager {
|
||||
return &broadcastManager{log: l, broadcasts: make(map[MAC]Broadcast), slateExitSignals: make(map[MAC]chan struct{})}
|
||||
func newBroadcastManager(l logging.Logger) (*broadcastManager, error) {
|
||||
m := &broadcastManager{
|
||||
log: l,
|
||||
broadcasts: make(map[MAC]Broadcast),
|
||||
slateExitSignals: make(map[MAC]chan struct{}),
|
||||
}
|
||||
notifier, err := newWatchdogNotifier(l, func() {
|
||||
err := m.save()
|
||||
if err != nil {
|
||||
m.log.Error("could not save on notifier termination signal", "error", err)
|
||||
return
|
||||
}
|
||||
m.log.Info("successfully saved broadcast manager state on termination signal")
|
||||
})
|
||||
m.dogNotifier = notifier
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// save is currently just a stub, but will eventually save the broadcastManager's
|
||||
// state to file.
|
||||
func (b *broadcastManager) save() error { return nil }
|
||||
|
||||
// recvHandler handles recv requests for video forwarding. The MAC is firstly
|
||||
// checked to ensure it is "active" i.e. should be sending data, and then the
|
||||
// video is extracted from the request body and provided to the revid pipeline
|
||||
// corresponding to said MAC.
|
||||
// Clips of MPEG-TS h264 are the only accepted format and codec.
|
||||
func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
|
||||
done := m.dogNotifier.handlerInvoked("recv")
|
||||
defer done()
|
||||
|
||||
m.log.Debug("recv handler")
|
||||
q := r.URL.Query()
|
||||
ma := MAC(q.Get("ma"))
|
||||
|
@ -171,6 +196,9 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
// control handles control API requests.
|
||||
func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) {
|
||||
done := m.dogNotifier.handlerInvoked("control")
|
||||
defer done()
|
||||
|
||||
m.log.Info("control request", "method", r.Method)
|
||||
switch r.Method {
|
||||
case http.MethodPut:
|
||||
|
@ -359,8 +387,14 @@ func main() {
|
|||
// lumberjack and netloggers.
|
||||
log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress)
|
||||
|
||||
bm := newBroadcastManager(log)
|
||||
http.HandleFunc("/recv", bm.recv)
|
||||
http.HandleFunc("/control", bm.control)
|
||||
m, err := newBroadcastManager(log)
|
||||
if err != nil {
|
||||
log.Fatal("could not create new broadcast manager", "error", err)
|
||||
}
|
||||
http.HandleFunc("/recv", m.recv)
|
||||
http.HandleFunc("/control", m.control)
|
||||
|
||||
go m.dogNotifier.notify()
|
||||
|
||||
http.ListenAndServe(*host+":"+*port, nil)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
/*
|
||||
DESCRIPTION
|
||||
notifier.go provides a tool for notifying a systemd watchdog under healthy
|
||||
operation of the vidforward service.
|
||||
|
||||
AUTHORS
|
||||
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
||||
|
||||
LICENSE
|
||||
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
|
||||
|
||||
It is free software: you can redistribute it and/or modify them
|
||||
under the terms of the GNU General Public License as published by the
|
||||
Free Software Foundation, either version 3 of the License, or (at your
|
||||
option) any later version.
|
||||
|
||||
It is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"bitbucket.org/ausocean/utils/logging"
|
||||
"github.com/coreos/go-systemd/daemon"
|
||||
)
|
||||
|
||||
// By default we assume we should be notifying a systemd watchdog. This can be
|
||||
// toggled off by using the nowatchdog build tag (see nowatchdog.go file).
|
||||
var notifyWatchdog = true
|
||||
|
||||
// watchdogNotifier keeps track of the watchdog interval from the external
|
||||
// sysd service settings, the currently active request handlers and a curId
|
||||
// field that is is incremented to generate new handler ids for storage.
|
||||
type watchdogNotifier struct {
|
||||
watchdogInterval time.Duration
|
||||
activeHandlers map[int]handlerInfo
|
||||
curId int
|
||||
termCallback func()
|
||||
log logging.Logger
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// handlerInfo keeps track of a handlers name (for any logging purposes) and
|
||||
// time at which the handler was invoked, which is later used to calculate time
|
||||
// active and therefore heatlh.
|
||||
type handlerInfo struct {
|
||||
name string
|
||||
time time.Time
|
||||
}
|
||||
|
||||
// newWatchdogNotifier creates a new watchdogNotifier with the provided logger
|
||||
// and termination callback that is called if a SIGINT or SIGTERM signal is
|
||||
// received. Recommended use of this is an attempted state save.
|
||||
func newWatchdogNotifier(l logging.Logger, termCallback func()) (*watchdogNotifier, error) {
|
||||
var (
|
||||
interval = 1 * time.Minute
|
||||
err error
|
||||
)
|
||||
|
||||
if notifyWatchdog {
|
||||
const clearEnvVars = false
|
||||
interval, err = daemon.SdWatchdogEnabled(clearEnvVars)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &watchdogNotifier{
|
||||
activeHandlers: make(map[int]handlerInfo),
|
||||
watchdogInterval: interval,
|
||||
log: l,
|
||||
termCallback: termCallback,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// notify is to be called as a routine. This is responsible for checking if the
|
||||
// handlers are healthy and then notifying the watchdog if so, otherwise we
|
||||
// wait, continue and check again until they are. If the handlers take too long to
|
||||
// become healthy, we risk exceeding the watchdog interval causing a process restart.
|
||||
// notify also starts a routine to monitor for any SIGINT or SIGTERM, upon which
|
||||
// a callback that's provided at initialisation is called.
|
||||
func (n *watchdogNotifier) notify() {
|
||||
notifyTicker := time.NewTicker(n.watchdogInterval / 2)
|
||||
|
||||
go func() {
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
sig := <-sigs
|
||||
n.log.Warning("received termination signal, calling termination callback", "signal", sig.String())
|
||||
n.termCallback()
|
||||
}()
|
||||
|
||||
for {
|
||||
if n.handlersUnhealthy() {
|
||||
const unhealthyHandlerWait = 1 * time.Second
|
||||
time.Sleep(unhealthyHandlerWait)
|
||||
continue
|
||||
}
|
||||
|
||||
<-notifyTicker.C
|
||||
|
||||
if !notifyWatchdog {
|
||||
continue
|
||||
}
|
||||
|
||||
// If this fails for any reason it indicates a systemd service configuration
|
||||
// issue, and therefore programmer error, so do fatal log to cause crash.
|
||||
supported, err := daemon.SdNotify(false, daemon.SdNotifyWatchdog)
|
||||
if err != nil {
|
||||
n.log.Fatal("error from systemd watchdog notify", "error", err)
|
||||
}
|
||||
|
||||
if !supported {
|
||||
n.log.Fatal("watchdog notification not supported")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handlersUnhealthy returns true if it is detected that any handlers are unhealthy,
|
||||
// that is, if they have been handling for longer than the unhealthyHandleDuration.
|
||||
func (n *watchdogNotifier) handlersUnhealthy() bool {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
for _, info := range n.activeHandlers {
|
||||
const unhealthyHandleDuration = 30 * time.Second
|
||||
if time.Now().Sub(info.time) > unhealthyHandleDuration {
|
||||
n.log.Warning("handler unhealthy", "name", info.name)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// handlerInvoked is to be called at the start of a request handler to indicate
|
||||
// that handling has begun. The name and start time of the handler is recorded
|
||||
// in the active handlers map with a unique ID as the key. A function is returned
|
||||
// that must be called at exit of the handler to indicate that handling has
|
||||
// finished. It is recommended this be done using a defer statement immediately
|
||||
// after receiveing it.
|
||||
func (n *watchdogNotifier) handlerInvoked(name string) func() {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
id := n.curId
|
||||
n.curId++
|
||||
n.activeHandlers[id] = handlerInfo{time: time.Now(), name: name}
|
||||
|
||||
return func() {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
if _, ok := n.activeHandlers[id]; !ok {
|
||||
log.Fatal("handler id not in map", "name", name)
|
||||
}
|
||||
|
||||
delete(n.activeHandlers, id)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
//go:build nowatchdog
|
||||
// +build nowatchdog
|
||||
|
||||
/*
|
||||
DESCRIPTION
|
||||
nowatchdog.go sets the notifyWatchdog global to false.
|
||||
|
||||
AUTHORS
|
||||
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
||||
|
||||
LICENSE
|
||||
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
|
||||
|
||||
It is free software: you can redistribute it and/or modify them
|
||||
under the terms of the GNU General Public License as published by the
|
||||
Free Software Foundation, either version 3 of the License, or (at your
|
||||
option) any later version.
|
||||
|
||||
It is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
func init() {
|
||||
notifyWatchdog = false
|
||||
}
|
1
go.mod
1
go.mod
|
@ -6,6 +6,7 @@ require (
|
|||
bitbucket.org/ausocean/iot v1.3.3
|
||||
bitbucket.org/ausocean/utils v1.3.2
|
||||
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
|
||||
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
|
||||
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480
|
||||
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884
|
||||
github.com/google/go-cmp v0.4.1
|
||||
|
|
2
go.sum
2
go.sum
|
@ -26,6 +26,8 @@ github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl
|
|||
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U=
|
||||
github.com/cocoonlife/goalsa v0.0.0-20160812085113-b711ae6f3eff/go.mod h1:5aLO409bJnd+jCw0t/SB/DhHkVBhPAE31lnHJnYQxy0=
|
||||
github.com/cocoonlife/testify v0.0.0-20160218172820-792cc1faeb64/go.mod h1:LoCAz53rbPcqs8Da2BjB/yDy4gxMtiSQmqnYI/DGH+U=
|
||||
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU=
|
||||
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/cryptix/wav v0.0.0-20180415113528-8bdace674401/go.mod h1:knK8fd+KPlGGqSUWogv1DQzGTwnfUvAi0cIoWyOG7+U=
|
||||
github.com/d2r2/go-dht v0.0.0-20200119175940-4ba96621a218/go.mod h1:AzSqP4S4/6pINOKg3VC79WC7YY3zskQcrXMFzphCry0=
|
||||
github.com/d2r2/go-logger v0.0.0-20210606094344-60e9d1233e22/go.mod h1:eSx+YfcVy5vCjRZBNIhpIpfCGFMQ6XSOSQkDk7+VCpg=
|
||||
|
|
Loading…
Reference in New Issue