vidforward: add config file watcher

This change adds a file watcher and then uses this file watcher
to perform updates based on changes in a configuration file. This
configuration file contains logging parameters for the time being
in the hope that it will help with debugging.

Testing was also added for this functionality.
This commit is contained in:
Saxon Nelson-Milton 2023-06-13 21:01:47 +09:30
parent 853aa31260
commit bcbb187bef
5 changed files with 271 additions and 3 deletions

View File

@ -32,6 +32,7 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
@ -46,6 +47,10 @@ import (
"gopkg.in/natefinch/lumberjack.v2"
)
// This is the path to the vidforward configuration.
// This contains parameters such as log level and logging filters.
const configFileName = "/etc/vidforward/config.json"
// Server defaults.
const (
defaultPort = "8080"
@ -125,6 +130,49 @@ func terminationCallback(m *broadcastManager) func() {
}
}
// loadConfig loads the vidforward configuration file. This primarily concerns logging
// configuration for the time being, with the intended use case of debugging.
func (m *broadcastManager) loadConfig() error {
m.mu.Lock()
defer m.mu.Unlock()
data, err := ioutil.ReadFile(configFileName)
if err != nil {
return fmt.Errorf("could not read config file: %w", err)
}
var cfg struct {
LogLevel string `json:"LogLevel"`
LogSuppress bool `json:"LogSuppress"`
LogCallerFilters []string `json:"LogCallerFilters"`
}
if err = json.Unmarshal(data, &cfg); err != nil {
return fmt.Errorf("could not unmarshal config file: %w", err)
}
m.log.(*logging.JSONLogger).SetLevel(map[string]int8{
"debug": logging.Debug,
"info": logging.Info,
"warning": logging.Warning,
"error": logging.Error,
"fatal": logging.Fatal,
}[cfg.LogLevel])
m.log.(*logging.JSONLogger).SetSuppress(cfg.LogSuppress)
m.log.(*logging.JSONLogger).SetCallerFilters(cfg.LogCallerFilters...)
return nil
}
// This is a callback that can be used by file watchers to reload the config.
func (m *broadcastManager) onConfigChange() {
err := m.loadConfig()
if err != nil {
m.log.Error("could not load config", "error", err)
return
}
}
// recvHandler handles recv requests for video forwarding. The MAC is firstly
// checked to ensure it is "active" i.e. should be sending data, and then the
// video is extracted from the request body and provided to the revid pipeline
@ -413,6 +461,16 @@ func main() {
log.Warning("could not load previous state", "error", err)
}
// Try to load the config file.
err = m.loadConfig()
if err != nil {
log.Warning("could not load config file", "error", err)
}
// Set up a file watcher to watch the config file. This will allow us
// to perform updates to configuration while the service is running.
watchFile(configFileName, m.onConfigChange, log)
http.HandleFunc("/recv", m.recv)
http.HandleFunc("/control", m.control)
http.HandleFunc("/slate", m.slate)

79
cmd/vidforward/watcher.go Normal file
View File

@ -0,0 +1,79 @@
/*
DESCRIPTION
watcher.go provides a tool for watching a file for modifications and
performing an action when the file is modified.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import (
"fmt"
"path"
"bitbucket.org/ausocean/utils/logging"
"github.com/fsnotify/fsnotify"
)
// watchFile watches a file for modifications and calls onWrite when the file
// is modified. Technically, the directory is watched instead of the file.
// This is because watching the file itself will cause problems if changes
// are done atomically.
// See fsnotify documentation:
// https://godocs.io/github.com/fsnotify/fsnotify#hdr-Watching_files
func watchFile(file string, onWrite func(), l logging.Logger) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("could not create watcher: %w", err)
}
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
l.Warning("watcher events chan closed, terminating")
return
}
if event.Op&fsnotify.Write == fsnotify.Write && event.Name == file {
l.Info("file modification event", "file", file)
onWrite()
}
case err, ok := <-watcher.Errors:
if !ok {
l.Warning("watcher error chan closed, terminating")
return
}
l.Error("file watcher error", "error", err)
}
}
}()
// Watch the directory over the file.
err = watcher.Add(path.Dir(file))
if err != nil {
return fmt.Errorf("could not add file %s to watcher: %w", file, err)
}
return nil
}

View File

@ -0,0 +1,124 @@
package main
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"bitbucket.org/ausocean/utils/logging"
)
// Delay between changing a file and the file watcher picking up the change.
const watchTimeAllowance = 1 * time.Second
// TestWatchFile tests the watchFile function. It creates a temporary file,
// watches it, writes to it, and checks if the onWrite function was called.
func TestWatchFile(t *testing.T) {
tmpFile, err := ioutil.TempFile("", "example")
if err != nil {
t.Fatalf("could not create temporary file: %v", err)
}
defer os.Remove(tmpFile.Name())
// We'll check this to see if the onWrite function was called.
called := false
err = watchFile(tmpFile.Name(), func() {
called = true
}, (*logging.TestLogger)(t))
if err != nil {
t.Fatalf("watchFile failed: %v", err)
}
if _, err := tmpFile.Write([]byte("hello world")); err != nil {
t.Fatalf("could not write to temporary file: %v", err)
}
if err := tmpFile.Close(); err != nil {
t.Fatalf("could not close temporary file: %v", err)
}
// Allow some time for the file watcher to pick up the change.
time.Sleep(watchTimeAllowance)
if !called {
t.Errorf("onWrite was not called after modifying the file")
}
}
// TestWatchFileFileNotExistYet tests the watchFile function in the case
// that the file to be watched does not exist on the first call to watchFile.
// It creates a temporary directory, watches a file in that directory, creates
// and writes to the file, and checks if the onWrite function was called.
func TestWatchFileFileNotExistYet(t *testing.T) {
// Create a temporary directory.
tmpDir, err := ioutil.TempDir("", "example")
if err != nil {
t.Fatalf("could not create temporary directory: %v", err)
}
defer os.Remove(tmpDir) // clean up
// File that does not exist yet but will be created in the temporary directory.
fileName := filepath.Join(tmpDir, "testfile")
called := false
err = watchFile(fileName, func() {
called = true
}, (*logging.TestLogger)(t))
if err != nil {
t.Fatalf("watchFile failed: %v", err)
}
// Create and write to the file.
err = ioutil.WriteFile(fileName, []byte("hello world"), 0666)
if err != nil {
t.Fatalf("could not write to file: %v", err)
}
// Allow some time for the file watcher to pick up the change.
time.Sleep(watchTimeAllowance)
if !called {
t.Errorf("onWrite was not called after creating and modifying the file")
}
}
// TestWatchFileMultipleChanges tests the watchFile function in the case
// that the file to be watched is modified multiple times. It creates a
// temporary file, watches it, writes to it twice, and checks if the onWrite
// function was called twice.
func TestWatchFileMultipleChanges(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "example")
if err != nil {
t.Fatalf("could not create temporary file: %v", err)
}
defer os.Remove(tmpfile.Name())
// We'll count how many times onWrite was called.
calledCount := 0
err = watchFile(tmpfile.Name(), func() {
calledCount++
}, (*logging.TestLogger)(t))
if err != nil {
t.Fatalf("watchFile failed: %v", err)
}
// Write to the file twice.
for i := 0; i < 2; i++ {
if _, err := tmpfile.Write([]byte("hello world")); err != nil {
t.Fatalf("could not write to temporary file: %v", err)
}
if err := tmpfile.Sync(); err != nil {
t.Fatalf("could not sync temporary file: %v", err)
}
// Allow some time for the file watcher to pick up the change.
time.Sleep(watchTimeAllowance)
}
if calledCount != 2 {
t.Errorf("onWrite was not called the expected number of times after modifying the file")
}
}

5
go.mod
View File

@ -4,7 +4,7 @@ go 1.18
require (
bitbucket.org/ausocean/iot v1.4.1
bitbucket.org/ausocean/utils v1.3.2
bitbucket.org/ausocean/utils v1.4.0
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480
@ -22,6 +22,8 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
require github.com/fsnotify/fsnotify v1.6.0
require (
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af // indirect
github.com/fogleman/gg v1.3.0 // indirect
@ -35,4 +37,5 @@ require (
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/image v0.0.0-20210216034530-4410531fe030 // indirect
golang.org/x/sys v0.0.0-20220908164124-27713097b956 // indirect
)

8
go.sum
View File

@ -3,8 +3,8 @@ bitbucket.org/ausocean/iot v1.4.1 h1:PcRu9dS5CbKyw1FZjEc4MR9CQ+ku9MkH9ZjA0f6Mm1c
bitbucket.org/ausocean/iot v1.4.1/go.mod h1:NbEg2PvYSHDdUsy5eMmihBySpWfqaHiMdspQDZdDe8o=
bitbucket.org/ausocean/utils v1.2.11/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.3.0/go.mod h1:yWsulKjbBgwL17/w55MQ6cIT9jmNeOkwpd2gUIxAcIY=
bitbucket.org/ausocean/utils v1.3.2 h1:U+jNixVPH1Ih0QpOckI4djhEc/T6FAzt9znADRcJ07s=
bitbucket.org/ausocean/utils v1.3.2/go.mod h1:XgvCH4DQLCd6NYMzsSqwhHmPr+qzYks5M8IDpdNnZiU=
bitbucket.org/ausocean/utils v1.4.0 h1:ceNRscC49fAVY9tIOAN3Jyg7g+A9JTKjqgxEGb7A+9E=
bitbucket.org/ausocean/utils v1.4.0/go.mod h1:XgvCH4DQLCd6NYMzsSqwhHmPr+qzYks5M8IDpdNnZiU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
@ -36,6 +36,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/go-audio/aiff v0.0.0-20180403003018-6c3a8a6aff12/go.mod h1:AMSAp6W1zd0koOdX6QDgGIuBDTUvLa2SLQtm7d9eM3c=
github.com/go-audio/audio v0.0.0-20180206231410-b697a35b5608/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs=
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 h1:4sGU+UABMMsRJyD+Y2yzMYxq0GJFUsRRESI0P1gZ2ig=
@ -141,6 +143,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210304124612-50617c2ba197/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956 h1:XeJjHH1KiLpKGb6lvMiksZ9l0fVUh+AmGcm0nOMEBOY=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=