Add functionality for broadcastManager state save

This change adds functionality for broadcastManager state save.
This is done by marshalling the broadcastManager and saving to a
file. Loading is performed by unmarshalling an re-populating a
broadcastManager value. Testing has been added for this
functionality. Other functionality has been added a side effect.
This commit is contained in:
Saxon Nelson-Milton 2023-01-11 09:01:38 +10:30
parent 94a7971ec2
commit 842e2a2d93
9 changed files with 677 additions and 89 deletions

35
cmd/vidforward/debug.go Normal file
View File

@ -0,0 +1,35 @@
//go:build debug
// +build debug
/*
DESCRIPTION
When this file is included in build by using the debug build tag, the logging
level is changed to debug.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022-2023 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import "bitbucket.org/ausocean/utils/logging"
func init() {
loggingLevel = logging.Debug
}

172
cmd/vidforward/file.go Normal file
View File

@ -0,0 +1,172 @@
/*
DESCRIPTION
files.go provides the functionality required for saving and loading
broadcastManager state. This includes marshalling/unmarshalling overrides.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022-2023 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import (
"encoding/json"
"fmt"
"os"
"bitbucket.org/ausocean/av/cmd/vidforward/global"
)
// inTest is used to indicate if we are within a test; some functionality is not
// employed in this case.
var inTest bool
// The file name for the broadcast manager state save.
const fileName = "state.json"
// BroadcastBasic is a crude version of the Broadcast used to simplify
// marshal/unmarshal overriding.
type BroadcastBasic struct {
MAC
URL string
Status string
}
// ManagerBasic is a crude version of the BroadcastManager struct use to simplify
// marshal/unmarshal overriding.
type ManagerBasic struct {
Broadcasts map[MAC]Broadcast
SlateExitSignals []MAC
}
// MarshalJSON calls the default marshalling behaviour for the BroadcastBasic
// struct using the information from b.
func (b Broadcast) MarshalJSON() ([]byte, error) {
return json.Marshal(BroadcastBasic{
MAC: b.mac,
URL: b.url,
Status: b.status,
})
}
// UnmarshalJSON unmarshals into a value of the BroadcastBasic struct and then
// populates a Broadcast value.
func (b *Broadcast) UnmarshalJSON(data []byte) error {
var bm BroadcastBasic
err := json.Unmarshal(data, &bm)
if err != nil {
return fmt.Errorf("could not unmarshal JSON: %w", err)
}
b.mac = bm.MAC
b.url = bm.URL
b.status = bm.Status
b.rv, err = newRevid(global.GetLogger(), b.url)
if err != nil {
return fmt.Errorf("could not populate RV field: %w", err)
}
return nil
}
// MarshalJSON calls the default marshaller for a ManagerBasic value using data
// from a broadcastManager value.
func (m *broadcastManager) MarshalJSON() ([]byte, error) {
var signals []MAC
for k := range m.slateExitSignals {
signals = append(signals, k)
}
return json.Marshal(ManagerBasic{
Broadcasts: m.broadcasts,
SlateExitSignals: signals,
})
}
// UnmarshalJSON populates a ManagerBasic value from the provided data and then
// populates the receiver broadcastManager to a usable state based on this data.
func (m *broadcastManager) UnmarshalJSON(data []byte) error {
var mb ManagerBasic
err := json.Unmarshal(data, &mb)
if err != nil {
return fmt.Errorf("could not unmarshal JSON: %w", err)
}
m.broadcasts = mb.Broadcasts
m.slateExitSignals = make(map[MAC]chan struct{})
m.log = global.GetLogger()
notifier, err := newWatchdogNotifier(m.log, terminationCallback(m))
if err != nil {
return fmt.Errorf("could not create watchdog notifier: %w", err)
}
m.dogNotifier = notifier
for _, mac := range mb.SlateExitSignals {
sigCh := make(chan struct{})
m.slateExitSignals[mac] = sigCh
rv := m.getPipeline(mac)
if rv == nil {
panic("no pipeline for MAC")
}
if !inTest {
err := writeSlateAndCheckErrors(rv, sigCh, m.log)
if err != nil {
return fmt.Errorf("couldn't write slate for MAC %v: %w", mac, err)
}
}
}
return nil
}
// save utilises marshalling functionality to save the broadcastManager state
// to a file.
func (m *broadcastManager) save() error {
f, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
return fmt.Errorf("could not open file: %w", err)
}
defer f.Close()
bytes, err := json.Marshal(m)
if err != nil {
return fmt.Errorf("could not marshal broadcast manager: %w", err)
}
_, err = f.Write(bytes)
if err != nil {
return fmt.Errorf("could not write bytes to file: %w", err)
}
return nil
}
// load populates a broadcastManager value based on the previously saved state.
func (m *broadcastManager) load() error {
bytes, err := os.ReadFile(fileName)
if err != nil {
return fmt.Errorf("could not read state file: %w", err)
}
err = json.Unmarshal(bytes, &m)
if err != nil {
return fmt.Errorf("could not unmarshal state data: %w", err)
}
return nil
}

298
cmd/vidforward/file_test.go Normal file
View File

@ -0,0 +1,298 @@
/*
DESCRIPTION
file_test.go provides testing for functionality contained in file.go.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022-2023 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import (
"bytes"
"encoding/json"
"reflect"
"testing"
"bitbucket.org/ausocean/av/cmd/vidforward/global"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/utils/logging"
)
const (
testURL = "rtmp://some-random-url.abcdef-12345"
testMAC = "78:90:AE:7B:2C:76"
)
func init(){
inTest = true
}
func TestBroadcastMarshal(t *testing.T) {
logger := (*logging.TestLogger)(t)
// Marshalling functionality uses this.
global.SetLogger(logger)
tests := []struct {
in Broadcast
expect []byte
}{
{
in: Broadcast{
mac: testMAC,
url: testURL,
status: statusActive,
rv: newRevidForTest((*logging.TestLogger)(t), testURL, t),
},
expect: []byte("{\"MAC\":\"" + testMAC + "\",\"URL\":\"" + testURL + "\",\"Status\":\"" + statusActive + "\"}"),
},
}
for i, test := range tests {
got, err := test.in.MarshalJSON()
if err != nil {
t.Errorf("could not marshal json for test no. %d: %v", i, err)
continue
}
if !bytes.Equal(got, test.expect) {
t.Errorf("did not get expected result.\nGot: %v\nWnt: %v\n", string(got), string(test.expect))
}
}
}
func TestBroadcastUnmarshal(t *testing.T) {
logger := (*logging.TestLogger)(t)
// Marshalling functionality uses this.
global.SetLogger(logger)
tests := []struct {
in []byte
expect Broadcast
}{
{
expect: Broadcast{
mac: testMAC,
url: testURL,
status: statusActive,
rv: newRevidForTest(logger, testURL, t),
},
in: []byte("{\"MAC\":\"" + testMAC + "\",\"URL\":\"" + testURL + "\",\"Status\":\"" + statusActive + "\"}"),
},
}
for i, test := range tests {
var got Broadcast
err := got.UnmarshalJSON(test.in)
if err != nil {
t.Errorf("could not marshal json for test no. %d: %v", i, err)
continue
}
if !broadcastsEqual(got, test.expect) {
t.Errorf("did not get expected result.\nGot: %v\nWnt: %v\n", got, test.expect)
}
}
}
func TestBroadcastManagerMarshal(t *testing.T) {
logger := (*logging.TestLogger)(t)
// Marshalling functionality uses this.
global.SetLogger(logger)
tests := []struct {
in broadcastManager
expect []byte
}{
{
in: broadcastManager{
broadcasts: map[MAC]Broadcast{
testMAC: Broadcast{
testMAC,
testURL,
statusSlate,
newRevidForTest((*logging.TestLogger)(t), testURL, t),
},
},
slateExitSignals: newExitSignalsForTest(t, testMAC),
log: logger,
dogNotifier: newWatchdogNotifierForTest(t, logger),
},
expect: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URL\":\"" + testURL + "\",\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"),
},
}
for i, test := range tests {
got, err := test.in.MarshalJSON()
if err != nil {
t.Errorf("could not marshal json for test no. %d: %v", i, err)
continue
}
if !bytes.Equal(got, test.expect) {
t.Errorf("did not get expected result.\nGot: %v\nWnt: %v\n", string(got), string(test.expect))
}
}
}
func TestBroadcastManagerUnmarshal(t *testing.T) {
logger := (*logging.TestLogger)(t)
// Marshalling functionality uses this.
global.SetLogger(logger)
tests := []struct {
in []byte
expect broadcastManager
}{
{
in: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URL\":\"" + testURL + "\",\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"),
expect: broadcastManager{
broadcasts: map[MAC]Broadcast{
testMAC: Broadcast{
testMAC,
testURL,
statusSlate,
newRevidForTest((*logging.TestLogger)(t), testURL, t),
},
},
slateExitSignals: newExitSignalsForTest(t, testMAC),
log: logger,
dogNotifier: newWatchdogNotifierForTest(t, logger),
},
},
}
for i, test := range tests {
var got broadcastManager
if err := json.Unmarshal(test.in, &got); err != nil {
t.Errorf("could not unmarshal json for test no. %d: %v", i, err)
continue
}
if !broadcastManagersEqual(got, test.expect) {
t.Errorf("did not get expected result.\nGot: %+v\nWnt: %+v\n", got, test.expect)
}
}
}
func broadcastManagersEqual(m1, m2 broadcastManager) bool {
if !broadcastMapsEqual(m1.broadcasts, m2.broadcasts) ||
!slateExitSignalMapsEqual(m1.slateExitSignals, m2.slateExitSignals) ||
!watchdogNotifiersEqual(*m1.dogNotifier, *m2.dogNotifier) {
return false
}
return true
}
func broadcastMapsEqual(m1, m2 map[MAC]Broadcast) bool {
return mapsEqual(m1,m2,broadcastsEqual)
}
func slateExitSignalMapsEqual(m1, m2 map[MAC]chan struct{}) bool {
return mapsEqual(m1,m2,func(v1, v2 chan struct{}) bool {
return ((v1 == nil || v2 == nil ) && v1 == v2) || (v1 != nil && v2 != nil)
})
}
func activeHandlersMapEqual(m1, m2 map[int]handlerInfo) bool {
return mapsEqual(m1,m2, func(v1, v2 handlerInfo) bool { return v1.name == v2.name })
}
// mapsEqual is a generic function to check that any two maps are equal based on
// the provided value compare function cmp.
func mapsEqual[K comparable, V any](m1, m2 map[K]V, cmp func(v1, v2 V) bool) bool {
if len(m1) != len(m2) {
return false
}
for k, v1 := range m1 {
v2, ok := m2[k]
if !ok || !cmp(v1,v2) {
return false
}
}
return true
}
func watchdogNotifiersEqual(w1, w2 watchdogNotifier) bool {
if w1.watchdogInterval != w2.watchdogInterval ||
!activeHandlersMapEqual(w1.activeHandlers, w2.activeHandlers) {
return false
}
return true
}
func broadcastsEqual(b1, b2 Broadcast) bool {
if b1.mac != b2.mac || b1.url != b2.url || b1.status != b2.status ||
((b1.rv == nil || b2.rv == nil) && b1.rv != b2.rv) {
return false
}
if b1.rv != nil && !configsEqual(b1.rv.Config(), b2.rv.Config()) {
return false
}
return true
}
// configsEqual returns true if the provided config.Config values are equal. The
// comparison is shallow given that only fields of basic types are compared, not
// structs or interfaces.
func configsEqual(cfg1, cfg2 config.Config) bool {
cfg1ValOf := reflect.ValueOf(cfg1)
cfg2ValOf := reflect.ValueOf(cfg2)
for i := 0; i < cfg1ValOf.NumField(); i++ {
if cfg1ValOf.Field(i).Kind() == reflect.Struct || cfg1ValOf.Field(i).Kind() == reflect.Interface {
continue
}
if !reflect.DeepEqual(cfg1ValOf.Field(i).Interface(), cfg2ValOf.Field(i).Interface()) {
return false
}
}
return true
}
// newRevidForTest allows us to create revid in table driven test entry.
func newRevidForTest(log logging.Logger, url string, t *testing.T) *revid.Revid {
r, err := newRevid(log, url)
if err != nil {
t.Fatalf("could not create revid pipeline: %v", err)
return nil
}
return r
}
// newExitSignalsForTest creates a map of chan struct{} for the provided MACs.
// This is used to populate the slateExitSignals field in the broadcastManager.
func newExitSignalsForTest(t *testing.T, macs ...MAC) map[MAC]chan struct{} {
sigMap := make(map[MAC]chan struct{})
for _, m := range macs {
sigMap[m] = make(chan struct{})
}
return sigMap
}
// newWatchdogNotifierForTest allows us to create watchdog notifier in test table.
func newWatchdogNotifierForTest(t *testing.T, l logging.Logger) *watchdogNotifier {
n, err := newWatchdogNotifier(l, func() {})
if err != nil {
t.Fatalf("could not create new watchdog notifier: %v", err)
return nil
}
return n
}

View File

@ -0,0 +1,55 @@
/*
DESCRIPTION
logger.go provides a "safe" global logger by following the singleton pattern.
Usage of this should be avoided if possible, but in some instances it might be
necessary, for example implementations of interfaces where logging is required
but do not offer parameters where a logger can be passed as an argument.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package global
import "bitbucket.org/ausocean/utils/logging"
var logger *globalLogger = nil
type globalLogger struct {
logging.Logger
}
// SetLogger sets the global logger. This must be set, and only once, before
// the GetLogger function is called. If these requirements are violated panics
// will occur.
func SetLogger(l logging.Logger) {
if logger != nil {
logger.Fatal("attempting set of already instantiated global logger")
}
logger = &globalLogger{l}
}
// GetLogger returns the global logger. If this has not been set, a panic will
// occur.
func GetLogger() logging.Logger {
if logger == nil {
panic("attempted get of uninstantiated global logger")
}
return logger
}

View File

@ -38,11 +38,11 @@ import (
"sync"
"time"
"bitbucket.org/ausocean/av/cmd/vidforward/global"
"bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netlogger"
"bitbucket.org/ausocean/utils/logging"
"gopkg.in/natefinch/lumberjack.v2"
)
@ -69,12 +69,20 @@ const recvErrorDelay = 7 * time.Second
type MAC string
// The possible states for a broadcast.
const (
statusActive = "active"
statusSlate = "slate"
statusCreate = "create"
statusPlay = "play"
)
// Broadcast is representative of a broadcast to be forwarded.
type Broadcast struct {
MAC // MAC address of the device from which the video is being received.
URL string // The destination youtube RTMP URL.
Status string // The broadcast status i.e. active, inactive and slate.
RV *revid.Revid // The revid pipeline which will handle forwarding to youtube.
mac MAC // MAC address of the device from which the video is being received.
url string // The destination youtube RTMP URL.
status string // The broadcast status i.e. active or slate.
rv *revid.Revid // The revid pipeline which will handle forwarding to youtube.
}
// broadcastManager manages a map of Broadcasts we expect to be forwarding video
@ -97,25 +105,27 @@ func newBroadcastManager(l logging.Logger) (*broadcastManager, error) {
broadcasts: make(map[MAC]Broadcast),
slateExitSignals: make(map[MAC]chan struct{}),
}
notifier, err := newWatchdogNotifier(l, func() {
notifier, err := newWatchdogNotifier(l, terminationCallback(m))
if err != nil {
return nil, err
}
m.dogNotifier = notifier
return m, nil
}
// terminationCallback provides a callback that saves the provided
// broadcastManagers state.
func terminationCallback(m *broadcastManager) func() {
return func() {
err := m.save()
if err != nil {
m.log.Error("could not save on notifier termination signal", "error", err)
return
}
m.log.Info("successfully saved broadcast manager state on termination signal")
})
m.dogNotifier = notifier
if err != nil {
return nil, err
}
return m, nil
}
// save is currently just a stub, but will eventually save the broadcastManager's
// state to file.
func (b *broadcastManager) save() error { return nil }
// recvHandler handles recv requests for video forwarding. The MAC is firstly
// checked to ensure it is "active" i.e. should be sending data, and then the
// video is extracted from the request body and provided to the revid pipeline
@ -136,7 +146,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
}
// We can't receive video if we're in slate mode.
if m.getStatus(ma) == "slate" {
if m.getStatus(ma) == statusSlate {
m.errorLogWrite(m.log, w, "cannot receive video for this mac, status is slate", "mac", ma)
time.Sleep(recvErrorDelay)
return
@ -241,7 +251,7 @@ func (m *broadcastManager) getPipeline(ma MAC) *revid.Revid {
if !ok {
return nil
}
return v.RV
return v.rv
}
// getStatus gets the broadcast's status corresponding to the provided MAC.
@ -252,7 +262,7 @@ func (m *broadcastManager) getStatus(ma MAC) string {
if !ok {
return ""
}
return v.Status
return v.status
}
// isActive returns true if a MAC is registered to the broadcast manager.
@ -277,36 +287,36 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
Input: config.InputManual,
InputCodec: codecutil.H264_AU,
Outputs: []uint8{config.OutputRTMP},
RTMPURL: broadcast.URL,
RTMPURL: broadcast.url,
LogLevel: logging.Debug,
}
var err error
broadcast.RV, err = revid.New(cfg, nil)
broadcast.rv, err = revid.New(cfg, nil)
if err != nil {
return fmt.Errorf("could not initialise revid: %w", err)
}
m.broadcasts[broadcast.MAC] = broadcast
err = broadcast.RV.Start()
m.broadcasts[broadcast.mac] = broadcast
err = broadcast.rv.Start()
if err != nil {
return fmt.Errorf("could not start revid pipeline: %w", err)
}
switch broadcast.Status {
case "create":
switch broadcast.status {
case statusCreate:
fallthrough
case "play":
signal, ok := m.slateExitSignals[broadcast.MAC]
case statusActive, statusPlay:
signal, ok := m.slateExitSignals[broadcast.mac]
if ok {
close(signal)
delete(m.slateExitSignals, broadcast.MAC)
delete(m.slateExitSignals, broadcast.mac)
}
case "slate":
case statusSlate:
m.log.Debug("slate request")
// If there's a signal channel it means that we're already writing the slate
// image and theres nothing to do, so return.
_, ok := m.slateExitSignals[broadcast.MAC]
_, ok := m.slateExitSignals[broadcast.mac]
if ok {
m.log.Warning("already writing slate")
return nil
@ -315,37 +325,14 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
// First create a signal that can be used to stop the slate writing routine.
// This will be provided to the writeSlate routine below.
signalCh := make(chan struct{})
m.slateExitSignals[broadcast.MAC] = signalCh
m.slateExitSignals[broadcast.mac] = signalCh
// Also create an errCh that will be used to communicate errors from the
// writeSlate routine.
errCh := make(chan error)
go writeSlate(broadcast.RV, errCh, signalCh, m.log)
// We'll watch out for any errors that happen within a 5 second window. This
// will indicate something seriously wrong with init, like a missing file etc.
const startupWindowDuration = 5 * time.Second
startupWindow := time.NewTimer(startupWindowDuration)
select {
// If this triggers first, we're all good.
case <-startupWindow.C:
m.log.Debug("out of error window")
// We consider any errors after this either to be normal i.e. as a result
// of stopping the slate input, or something that can not be handled, and
// only logged, therefore we can close the error channel errCh now.
// This will also let the routine know that errors can no longer be sent
// down errCh.
close(errCh)
// This means we got a slate error pretty early and need to let caller know.
case err := <-errCh:
return fmt.Errorf("could not write slate image: %w", err)
err = writeSlateAndCheckErrors(broadcast.rv, signalCh, m.log)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown status string: %s", broadcast.Status)
return fmt.Errorf("unknown status string: %s", broadcast.status)
}
return nil
}
@ -353,12 +340,12 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
// delete removes a broadcast from the record.
func (m *broadcastManager) delete(broadcast Broadcast) error {
m.mu.Lock()
b, ok := m.broadcasts[broadcast.MAC]
b, ok := m.broadcasts[broadcast.mac]
if !ok {
return errors.New("no broadcast by that mac in record")
}
b.RV.Stop()
delete(m.broadcasts, broadcast.MAC)
b.rv.Stop()
delete(m.broadcasts, broadcast.mac)
m.mu.Unlock()
return nil
}
@ -380,12 +367,11 @@ func main() {
MaxAge: logMaxAge,
}
// Create netlogger to handle logging to cloud.
netLog := netlogger.New()
// Create logger that we call methods on to log, which in turn writes to the
// lumberjack and netloggers.
log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress)
log := logging.New(logVerbosity, io.MultiWriter(fileLog), logSuppress)
global.SetLogger(log)
m, err := newBroadcastManager(log)
if err != nil {

View File

@ -27,6 +27,7 @@ package main
import (
"errors"
"fmt"
"io"
"time"
@ -35,6 +36,37 @@ import (
"bitbucket.org/ausocean/utils/logging"
)
func writeSlateAndCheckErrors(dst io.Writer, signalCh chan struct{}, log logging.Logger) error {
// Also create an errCh that will be used to communicate errors from the
// writeSlate routine.
errCh := make(chan error)
go writeSlate(dst, errCh, signalCh, log)
// We'll watch out for any errors that happen within a 5 second window. This
// will indicate something seriously wrong with init, like a missing file etc.
const startupWindowDuration = 5 * time.Second
startupWindow := time.NewTimer(startupWindowDuration)
select {
// If this triggers first, we're all good.
case <-startupWindow.C:
log.Debug("out of error window")
// We consider any errors after this either to be normal i.e. as a result
// of stopping the slate input, or something that can not be handled, and
// only logged, therefore we can close the error channel errCh now.
// This will also let the routine know that errors can no longer be sent
// down errCh.
close(errCh)
// This means we got a slate error pretty early and need to let caller know.
case err := <-errCh:
return fmt.Errorf("could not write slate image: %w", err)
}
return nil
}
// writeSlate is a routine that employs a file input device and h264 lexer to
// write a h264 encoded slate image to the provided revid pipeline.
func writeSlate(dst io.Writer, errCh chan error, exitSignal chan struct{}, log logging.Logger) {
@ -67,7 +99,7 @@ func writeSlate(dst io.Writer, errCh chan error, exitSignal chan struct{}, log l
case <-slateTimeoutTimer.C:
log.Warning("slate timeout")
case <-exitSignal:
log.Info("slate exist signal")
log.Info("slate exit signal")
}
log.Info("stopping file input")
fileInput.Stop()

View File

@ -29,9 +29,26 @@ import (
"net/http"
"strconv"
"bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/utils/logging"
)
var loggingLevel = logging.Info
func newRevid(log logging.Logger, url string) (*revid.Revid, error) {
return revid.New(
config.Config{
Logger: log,
Input: config.InputManual,
InputCodec: codecutil.H264_AU,
Outputs: []uint8{config.OutputRTMP},
RTMPURL: url,
LogLevel: loggingLevel,
}, nil)
}
// writeError logs an error and writes to w in JSON format.
func (m *broadcastManager) errorLogWrite(log logging.Logger, w http.ResponseWriter, msg string, args ...interface{}) {
log.Error(msg, args...)

19
go.mod
View File

@ -1,6 +1,6 @@
module bitbucket.org/ausocean/av
go 1.16
go 1.18
require (
bitbucket.org/ausocean/iot v1.3.3
@ -16,8 +16,23 @@ require (
github.com/pkg/errors v0.9.1
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e
gocv.io/x/gocv v0.29.0
golang.org/x/tools v0.5.0 // indirect
gonum.org/v1/gonum v0.8.2
gonum.org/v1/plot v0.9.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
require (
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af // indirect
github.com/fogleman/gg v1.3.0 // indirect
github.com/go-fonts/liberation v0.1.1 // indirect
github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/mattetti/audio v0.0.0-20180912171649-01576cde1f21 // indirect
github.com/phpdave11/gofpdf v1.4.2 // indirect
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/image v0.0.0-20210216034530-4410531fe030 // indirect
golang.org/x/text v0.3.5 // indirect
)

22
go.sum
View File

@ -96,7 +96,6 @@ github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cb
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e h1:3NIzz7weXhh3NToPgbtlQtKiVgerEaG4/nY2skGoGG0=
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e/go.mod h1:CaowXBWOiSGWEpBBV8LoVnQTVPV4ycyviC9IBLj8dRw=
github.com/yryz/ds18b20 v0.0.0-20180211073435-3cf383a40624/go.mod h1:MqFju5qeLDFh+S9PqxYT7TEla8xeW7bgGr/69q3oki0=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
@ -110,7 +109,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -129,18 +127,11 @@ golang.org/x/image v0.0.0-20210216034530-4410531fe030 h1:lP9pYkih3DUSC641giIXa2X
golang.org/x/image v0.0.0-20210216034530-4410531fe030/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -150,28 +141,15 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210304124612-50617c2ba197/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190927191325-030b2cf1153e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.5.0 h1:+bSpV5HIeWkuvgaMfI3UmKRThoTA5ODJTUd8T17NO+4=
golang.org/x/tools v0.5.0/go.mod h1:N+Kgy78s5I24c24dU8OfWNEotWjutIs8SnJvn5IDq+k=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=