vidforward: only update broadcast if changed

For control requests, we only update aspects of the broadcast if
they have changed. Therefore, if the urls list has not changed
the revid pipeline is not updated, similarly if the status has
not changed, we don't do anything there.
This commit is contained in:
Saxon Nelson-Milton 2023-06-15 06:39:31 +09:30
parent 41fd1e345b
commit 26e77709aa
3 changed files with 99 additions and 53 deletions

View File

@ -51,7 +51,7 @@ type BroadcastBasic struct {
// ManagerBasic is a crude version of the BroadcastManager struct use to simplify // ManagerBasic is a crude version of the BroadcastManager struct use to simplify
// marshal/unmarshal overriding. // marshal/unmarshal overriding.
type ManagerBasic struct { type ManagerBasic struct {
Broadcasts map[MAC]Broadcast Broadcasts map[MAC]*Broadcast
SlateExitSignals []MAC SlateExitSignals []MAC
} }

View File

@ -86,10 +86,10 @@ func TestBroadcastUnmarshal(t *testing.T) {
tests := []struct { tests := []struct {
in []byte in []byte
expect Broadcast expect *Broadcast
}{ }{
{ {
expect: Broadcast{ expect: &Broadcast{
mac: testMAC, mac: testMAC,
urls: []string{testURL}, urls: []string{testURL},
status: statusActive, status: statusActive,
@ -106,7 +106,7 @@ func TestBroadcastUnmarshal(t *testing.T) {
t.Errorf("could not marshal json for test no. %d: %v", i, err) t.Errorf("could not marshal json for test no. %d: %v", i, err)
continue continue
} }
if !broadcastsEqual(got, test.expect) { if !broadcastsEqual(&got, test.expect) {
t.Errorf("did not get expected result.\nGot: %v\nWnt: %v\n", got, test.expect) t.Errorf("did not get expected result.\nGot: %v\nWnt: %v\n", got, test.expect)
} }
} }
@ -124,8 +124,8 @@ func TestBroadcastManagerMarshal(t *testing.T) {
}{ }{
{ {
in: broadcastManager{ in: broadcastManager{
broadcasts: map[MAC]Broadcast{ broadcasts: map[MAC]*Broadcast{
testMAC: Broadcast{ testMAC: &Broadcast{
testMAC, testMAC,
[]string{testURL}, []string{testURL},
statusSlate, statusSlate,
@ -165,8 +165,8 @@ func TestBroadcastManagerUnmarshal(t *testing.T) {
{ {
in: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URLs\":[\"" + testURL + "\"],\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"), in: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URLs\":[\"" + testURL + "\"],\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"),
expect: broadcastManager{ expect: broadcastManager{
broadcasts: map[MAC]Broadcast{ broadcasts: map[MAC]*Broadcast{
testMAC: Broadcast{ testMAC: &Broadcast{
testMAC, testMAC,
[]string{testURL}, []string{testURL},
statusSlate, statusSlate,
@ -201,7 +201,7 @@ func broadcastManagersEqual(m1, m2 broadcastManager) bool {
return true return true
} }
func broadcastMapsEqual(m1, m2 map[MAC]Broadcast) bool { func broadcastMapsEqual(m1, m2 map[MAC]*Broadcast) bool {
return mapsEqual(m1, m2, broadcastsEqual) return mapsEqual(m1, m2, broadcastsEqual)
} }
@ -238,7 +238,13 @@ func watchdogNotifiersEqual(w1, w2 watchdogNotifier) bool {
return true return true
} }
func broadcastsEqual(b1, b2 Broadcast) bool { func broadcastsEqual(b1, b2 *Broadcast) bool {
if b1 == nil {
panic("b1 is nil")
}
if b2 == nil {
panic("b2 is nil")
}
if b1.mac != b2.mac || !reflect.DeepEqual(b1.urls, b2.urls) || b1.status != b2.status || if b1.mac != b2.mac || !reflect.DeepEqual(b1.urls, b2.urls) || b1.status != b2.status ||
((b1.rv == nil || b2.rv == nil) && b1.rv != b2.rv) { ((b1.rv == nil || b2.rv == nil) && b1.rv != b2.rv) {
return false return false

View File

@ -36,7 +36,9 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"reflect"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
@ -89,13 +91,22 @@ type Broadcast struct {
rv *revid.Revid // The revid pipeline which will handle forwarding to youtube. rv *revid.Revid // The revid pipeline which will handle forwarding to youtube.
} }
// equal checks to see if the broadcast is equal to another broadcast.
// NOTE: This is not a deep equal, and is only used to check if a broadcast
// should be updated.
func (b *Broadcast) equal(other Broadcast) bool {
return b.mac == other.mac &&
b.status == other.status &&
reflect.DeepEqual(b.urls, other.urls)
}
// broadcastManager manages a map of Broadcasts we expect to be forwarding video // broadcastManager manages a map of Broadcasts we expect to be forwarding video
// for. The broadcastManager is communicated with through a series of HTTP request // for. The broadcastManager is communicated with through a series of HTTP request
// handlers. There is a basic REST API through which we can add/delete broadcasts, // handlers. There is a basic REST API through which we can add/delete broadcasts,
// and a recv handler which is invoked when a camera wishes to get its video // and a recv handler which is invoked when a camera wishes to get its video
// forwarded to youtube. // forwarded to youtube.
type broadcastManager struct { type broadcastManager struct {
broadcasts map[MAC]Broadcast broadcasts map[MAC]*Broadcast
slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image. slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image.
log logging.Logger log logging.Logger
dogNotifier *watchdogNotifier dogNotifier *watchdogNotifier
@ -106,7 +117,7 @@ type broadcastManager struct {
func newBroadcastManager(l logging.Logger) (*broadcastManager, error) { func newBroadcastManager(l logging.Logger) (*broadcastManager, error) {
m := &broadcastManager{ m := &broadcastManager{
log: l, log: l,
broadcasts: make(map[MAC]Broadcast), broadcasts: make(map[MAC]*Broadcast),
slateExitSignals: make(map[MAC]chan struct{}), slateExitSignals: make(map[MAC]chan struct{}),
} }
notifier, err := newWatchdogNotifier(l, terminationCallback(m)) notifier, err := newWatchdogNotifier(l, terminationCallback(m))
@ -359,57 +370,86 @@ func (m *broadcastManager) isActive(ma MAC) bool {
return ok return ok
} }
// createOrUpdate creates or updates a Broadcast record. The revid pipeline // createOrUpdate creates or updates a Broadcast record. If the record already
// corresponding to the broadcast MAC is firsty configured/re-configured, and // exists, it will be updated with the new data. If it doesn't exist, a new
// the pipeline is "started", which will ready it for receiving video on its // revid pipeline will be created and started. Actions occur according to the
// input. In the case that the status is "slate", we will spin up a routine to // status field of the broadcast i.e. whether we expect data from a source
// handle writing a slate image to the pipeline. // or write the slate image.
func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
var err error // Try to get any old broadcasts for the provided MAC.
broadcast.rv, err = newRevid(m.log, broadcast.urls) maybeOld, ok := m.broadcasts[broadcast.mac]
if err != nil {
return fmt.Errorf("could not initialise revid: %w", err) // If there's no old broadcast, we need to create a new revid pipeline.
if !ok {
var err error
broadcast.rv, err = newRevid(m.log, broadcast.urls)
if err != nil {
return fmt.Errorf("could not initialise revid: %w", err)
}
maybeOld = &broadcast
} }
m.broadcasts[broadcast.mac] = broadcast // If the URLS have changed, we need to update the revid pipeline.
err = broadcast.rv.Start() // We won't enter this if we've just created a new revid pipeline.
if !reflect.DeepEqual(maybeOld.urls, broadcast.urls) {
urls := strings.Join(broadcast.urls, ",")
err := maybeOld.rv.Update(map[string]string{"RTMPURL": urls})
if err != nil {
return fmt.Errorf("could not update revid: %w", err)
}
}
// If the status has changed, we need to update accordingly.
// i.e. if the status is slate, we need to start writing the slate image.
// We won't enter this if we've just created a new revid pipeline.
if maybeOld.status != broadcast.status {
switch broadcast.status {
case statusActive, statusPlay, statusCreate:
m.log.Info("updating configuration for mac", "mac", broadcast.mac)
signal, ok := m.slateExitSignals[broadcast.mac]
if ok {
close(signal)
delete(m.slateExitSignals, broadcast.mac)
}
case statusSlate:
m.log.Info("slate request")
// If there's a signal channel it means that we're already writing the slate
// image and theres nothing to do, so return.
_, ok := m.slateExitSignals[broadcast.mac]
if ok {
m.log.Warning("already writing slate")
return nil
}
// First create a signal that can be used to stop the slate writing routine.
// This will be provided to the writeSlate routine below.
signalCh := make(chan struct{})
m.slateExitSignals[broadcast.mac] = signalCh
err := writeSlateAndCheckErrors(broadcast.rv, signalCh, m.log)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown status string: %s", broadcast.status)
}
}
// We need to give the revid pipeline to the new broadcast record.
broadcast.rv = maybeOld.rv
// And then replace the old record with the new one in the map.
m.broadcasts[broadcast.mac] = &broadcast
// Start the pipeline (it's a no-op if it's already started).
err := broadcast.rv.Start()
if err != nil { if err != nil {
return fmt.Errorf("could not start revid pipeline: %w", err) return fmt.Errorf("could not start revid pipeline: %w", err)
} }
switch broadcast.status {
case statusActive, statusPlay, statusCreate:
m.log.Info("updating configuration for mac", "mac", broadcast.mac)
signal, ok := m.slateExitSignals[broadcast.mac]
if ok {
close(signal)
delete(m.slateExitSignals, broadcast.mac)
}
case statusSlate:
m.log.Info("slate request")
// If there's a signal channel it means that we're already writing the slate
// image and theres nothing to do, so return.
_, ok := m.slateExitSignals[broadcast.mac]
if ok {
m.log.Warning("already writing slate")
return nil
}
// First create a signal that can be used to stop the slate writing routine.
// This will be provided to the writeSlate routine below.
signalCh := make(chan struct{})
m.slateExitSignals[broadcast.mac] = signalCh
err = writeSlateAndCheckErrors(broadcast.rv, signalCh, m.log)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown status string: %s", broadcast.status)
}
return nil return nil
} }