From 26e77709aa0884317353f2715d1a50281fad9ffc Mon Sep 17 00:00:00 2001 From: Saxon Nelson-Milton Date: Thu, 15 Jun 2023 06:39:31 +0930 Subject: [PATCH] vidforward: only update broadcast if changed For control requests, we only update aspects of the broadcast if they have changed. Therefore, if the urls list has not changed the revid pipeline is not updated, similarly if the status has not changed, we don't do anything there. --- cmd/vidforward/file.go | 2 +- cmd/vidforward/file_test.go | 24 ++++--- cmd/vidforward/main.go | 126 ++++++++++++++++++++++++------------ 3 files changed, 99 insertions(+), 53 deletions(-) diff --git a/cmd/vidforward/file.go b/cmd/vidforward/file.go index 1bb96624..39ab39b1 100644 --- a/cmd/vidforward/file.go +++ b/cmd/vidforward/file.go @@ -51,7 +51,7 @@ type BroadcastBasic struct { // ManagerBasic is a crude version of the BroadcastManager struct use to simplify // marshal/unmarshal overriding. type ManagerBasic struct { - Broadcasts map[MAC]Broadcast + Broadcasts map[MAC]*Broadcast SlateExitSignals []MAC } diff --git a/cmd/vidforward/file_test.go b/cmd/vidforward/file_test.go index ee283cb5..4d6af563 100644 --- a/cmd/vidforward/file_test.go +++ b/cmd/vidforward/file_test.go @@ -86,10 +86,10 @@ func TestBroadcastUnmarshal(t *testing.T) { tests := []struct { in []byte - expect Broadcast + expect *Broadcast }{ { - expect: Broadcast{ + expect: &Broadcast{ mac: testMAC, urls: []string{testURL}, status: statusActive, @@ -106,7 +106,7 @@ func TestBroadcastUnmarshal(t *testing.T) { t.Errorf("could not marshal json for test no. %d: %v", i, err) continue } - if !broadcastsEqual(got, test.expect) { + if !broadcastsEqual(&got, test.expect) { t.Errorf("did not get expected result.\nGot: %v\nWnt: %v\n", got, test.expect) } } @@ -124,8 +124,8 @@ func TestBroadcastManagerMarshal(t *testing.T) { }{ { in: broadcastManager{ - broadcasts: map[MAC]Broadcast{ - testMAC: Broadcast{ + broadcasts: map[MAC]*Broadcast{ + testMAC: &Broadcast{ testMAC, []string{testURL}, statusSlate, @@ -165,8 +165,8 @@ func TestBroadcastManagerUnmarshal(t *testing.T) { { in: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URLs\":[\"" + testURL + "\"],\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"), expect: broadcastManager{ - broadcasts: map[MAC]Broadcast{ - testMAC: Broadcast{ + broadcasts: map[MAC]*Broadcast{ + testMAC: &Broadcast{ testMAC, []string{testURL}, statusSlate, @@ -201,7 +201,7 @@ func broadcastManagersEqual(m1, m2 broadcastManager) bool { return true } -func broadcastMapsEqual(m1, m2 map[MAC]Broadcast) bool { +func broadcastMapsEqual(m1, m2 map[MAC]*Broadcast) bool { return mapsEqual(m1, m2, broadcastsEqual) } @@ -238,7 +238,13 @@ func watchdogNotifiersEqual(w1, w2 watchdogNotifier) bool { return true } -func broadcastsEqual(b1, b2 Broadcast) bool { +func broadcastsEqual(b1, b2 *Broadcast) bool { + if b1 == nil { + panic("b1 is nil") + } + if b2 == nil { + panic("b2 is nil") + } if b1.mac != b2.mac || !reflect.DeepEqual(b1.urls, b2.urls) || b1.status != b2.status || ((b1.rv == nil || b2.rv == nil) && b1.rv != b2.rv) { return false diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go index 814462f4..b3a7e350 100644 --- a/cmd/vidforward/main.go +++ b/cmd/vidforward/main.go @@ -36,7 +36,9 @@ import ( "net" "net/http" "os" + "reflect" "strconv" + "strings" "sync" "time" @@ -89,13 +91,22 @@ type Broadcast struct { rv *revid.Revid // The revid pipeline which will handle forwarding to youtube. } +// equal checks to see if the broadcast is equal to another broadcast. +// NOTE: This is not a deep equal, and is only used to check if a broadcast +// should be updated. +func (b *Broadcast) equal(other Broadcast) bool { + return b.mac == other.mac && + b.status == other.status && + reflect.DeepEqual(b.urls, other.urls) +} + // broadcastManager manages a map of Broadcasts we expect to be forwarding video // for. The broadcastManager is communicated with through a series of HTTP request // handlers. There is a basic REST API through which we can add/delete broadcasts, // and a recv handler which is invoked when a camera wishes to get its video // forwarded to youtube. type broadcastManager struct { - broadcasts map[MAC]Broadcast + broadcasts map[MAC]*Broadcast slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image. log logging.Logger dogNotifier *watchdogNotifier @@ -106,7 +117,7 @@ type broadcastManager struct { func newBroadcastManager(l logging.Logger) (*broadcastManager, error) { m := &broadcastManager{ log: l, - broadcasts: make(map[MAC]Broadcast), + broadcasts: make(map[MAC]*Broadcast), slateExitSignals: make(map[MAC]chan struct{}), } notifier, err := newWatchdogNotifier(l, terminationCallback(m)) @@ -359,57 +370,86 @@ func (m *broadcastManager) isActive(ma MAC) bool { return ok } -// createOrUpdate creates or updates a Broadcast record. The revid pipeline -// corresponding to the broadcast MAC is firsty configured/re-configured, and -// the pipeline is "started", which will ready it for receiving video on its -// input. In the case that the status is "slate", we will spin up a routine to -// handle writing a slate image to the pipeline. +// createOrUpdate creates or updates a Broadcast record. If the record already +// exists, it will be updated with the new data. If it doesn't exist, a new +// revid pipeline will be created and started. Actions occur according to the +// status field of the broadcast i.e. whether we expect data from a source +// or write the slate image. func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { m.mu.Lock() defer m.mu.Unlock() - var err error - broadcast.rv, err = newRevid(m.log, broadcast.urls) - if err != nil { - return fmt.Errorf("could not initialise revid: %w", err) + // Try to get any old broadcasts for the provided MAC. + maybeOld, ok := m.broadcasts[broadcast.mac] + + // If there's no old broadcast, we need to create a new revid pipeline. + if !ok { + var err error + broadcast.rv, err = newRevid(m.log, broadcast.urls) + if err != nil { + return fmt.Errorf("could not initialise revid: %w", err) + } + maybeOld = &broadcast } - m.broadcasts[broadcast.mac] = broadcast - err = broadcast.rv.Start() + // If the URLS have changed, we need to update the revid pipeline. + // We won't enter this if we've just created a new revid pipeline. + if !reflect.DeepEqual(maybeOld.urls, broadcast.urls) { + urls := strings.Join(broadcast.urls, ",") + err := maybeOld.rv.Update(map[string]string{"RTMPURL": urls}) + if err != nil { + return fmt.Errorf("could not update revid: %w", err) + } + } + + // If the status has changed, we need to update accordingly. + // i.e. if the status is slate, we need to start writing the slate image. + // We won't enter this if we've just created a new revid pipeline. + if maybeOld.status != broadcast.status { + switch broadcast.status { + case statusActive, statusPlay, statusCreate: + m.log.Info("updating configuration for mac", "mac", broadcast.mac) + signal, ok := m.slateExitSignals[broadcast.mac] + if ok { + close(signal) + delete(m.slateExitSignals, broadcast.mac) + } + case statusSlate: + m.log.Info("slate request") + // If there's a signal channel it means that we're already writing the slate + // image and theres nothing to do, so return. + _, ok := m.slateExitSignals[broadcast.mac] + if ok { + m.log.Warning("already writing slate") + return nil + } + + // First create a signal that can be used to stop the slate writing routine. + // This will be provided to the writeSlate routine below. + signalCh := make(chan struct{}) + m.slateExitSignals[broadcast.mac] = signalCh + + err := writeSlateAndCheckErrors(broadcast.rv, signalCh, m.log) + if err != nil { + return err + } + default: + return fmt.Errorf("unknown status string: %s", broadcast.status) + } + } + + // We need to give the revid pipeline to the new broadcast record. + broadcast.rv = maybeOld.rv + + // And then replace the old record with the new one in the map. + m.broadcasts[broadcast.mac] = &broadcast + + // Start the pipeline (it's a no-op if it's already started). + err := broadcast.rv.Start() if err != nil { return fmt.Errorf("could not start revid pipeline: %w", err) } - switch broadcast.status { - case statusActive, statusPlay, statusCreate: - m.log.Info("updating configuration for mac", "mac", broadcast.mac) - signal, ok := m.slateExitSignals[broadcast.mac] - if ok { - close(signal) - delete(m.slateExitSignals, broadcast.mac) - } - case statusSlate: - m.log.Info("slate request") - // If there's a signal channel it means that we're already writing the slate - // image and theres nothing to do, so return. - _, ok := m.slateExitSignals[broadcast.mac] - if ok { - m.log.Warning("already writing slate") - return nil - } - - // First create a signal that can be used to stop the slate writing routine. - // This will be provided to the writeSlate routine below. - signalCh := make(chan struct{}) - m.slateExitSignals[broadcast.mac] = signalCh - - err = writeSlateAndCheckErrors(broadcast.rv, signalCh, m.log) - if err != nil { - return err - } - default: - return fmt.Errorf("unknown status string: %s", broadcast.status) - } return nil }