diff --git a/cmd/vidforward/file.go b/cmd/vidforward/file.go index 1bb96624..39ab39b1 100644 --- a/cmd/vidforward/file.go +++ b/cmd/vidforward/file.go @@ -51,7 +51,7 @@ type BroadcastBasic struct { // ManagerBasic is a crude version of the BroadcastManager struct use to simplify // marshal/unmarshal overriding. type ManagerBasic struct { - Broadcasts map[MAC]Broadcast + Broadcasts map[MAC]*Broadcast SlateExitSignals []MAC } diff --git a/cmd/vidforward/file_test.go b/cmd/vidforward/file_test.go index ee283cb5..4d6af563 100644 --- a/cmd/vidforward/file_test.go +++ b/cmd/vidforward/file_test.go @@ -86,10 +86,10 @@ func TestBroadcastUnmarshal(t *testing.T) { tests := []struct { in []byte - expect Broadcast + expect *Broadcast }{ { - expect: Broadcast{ + expect: &Broadcast{ mac: testMAC, urls: []string{testURL}, status: statusActive, @@ -106,7 +106,7 @@ func TestBroadcastUnmarshal(t *testing.T) { t.Errorf("could not marshal json for test no. %d: %v", i, err) continue } - if !broadcastsEqual(got, test.expect) { + if !broadcastsEqual(&got, test.expect) { t.Errorf("did not get expected result.\nGot: %v\nWnt: %v\n", got, test.expect) } } @@ -124,8 +124,8 @@ func TestBroadcastManagerMarshal(t *testing.T) { }{ { in: broadcastManager{ - broadcasts: map[MAC]Broadcast{ - testMAC: Broadcast{ + broadcasts: map[MAC]*Broadcast{ + testMAC: &Broadcast{ testMAC, []string{testURL}, statusSlate, @@ -165,8 +165,8 @@ func TestBroadcastManagerUnmarshal(t *testing.T) { { in: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URLs\":[\"" + testURL + "\"],\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"), expect: broadcastManager{ - broadcasts: map[MAC]Broadcast{ - testMAC: Broadcast{ + broadcasts: map[MAC]*Broadcast{ + testMAC: &Broadcast{ testMAC, []string{testURL}, statusSlate, @@ -201,7 +201,7 @@ func broadcastManagersEqual(m1, m2 broadcastManager) bool { return true } -func broadcastMapsEqual(m1, m2 map[MAC]Broadcast) bool { +func broadcastMapsEqual(m1, m2 map[MAC]*Broadcast) bool { return mapsEqual(m1, m2, broadcastsEqual) } @@ -238,7 +238,13 @@ func watchdogNotifiersEqual(w1, w2 watchdogNotifier) bool { return true } -func broadcastsEqual(b1, b2 Broadcast) bool { +func broadcastsEqual(b1, b2 *Broadcast) bool { + if b1 == nil { + panic("b1 is nil") + } + if b2 == nil { + panic("b2 is nil") + } if b1.mac != b2.mac || !reflect.DeepEqual(b1.urls, b2.urls) || b1.status != b2.status || ((b1.rv == nil || b2.rv == nil) && b1.rv != b2.rv) { return false diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go index 814462f4..b3a7e350 100644 --- a/cmd/vidforward/main.go +++ b/cmd/vidforward/main.go @@ -36,7 +36,9 @@ import ( "net" "net/http" "os" + "reflect" "strconv" + "strings" "sync" "time" @@ -89,13 +91,22 @@ type Broadcast struct { rv *revid.Revid // The revid pipeline which will handle forwarding to youtube. } +// equal checks to see if the broadcast is equal to another broadcast. +// NOTE: This is not a deep equal, and is only used to check if a broadcast +// should be updated. +func (b *Broadcast) equal(other Broadcast) bool { + return b.mac == other.mac && + b.status == other.status && + reflect.DeepEqual(b.urls, other.urls) +} + // broadcastManager manages a map of Broadcasts we expect to be forwarding video // for. The broadcastManager is communicated with through a series of HTTP request // handlers. There is a basic REST API through which we can add/delete broadcasts, // and a recv handler which is invoked when a camera wishes to get its video // forwarded to youtube. type broadcastManager struct { - broadcasts map[MAC]Broadcast + broadcasts map[MAC]*Broadcast slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image. log logging.Logger dogNotifier *watchdogNotifier @@ -106,7 +117,7 @@ type broadcastManager struct { func newBroadcastManager(l logging.Logger) (*broadcastManager, error) { m := &broadcastManager{ log: l, - broadcasts: make(map[MAC]Broadcast), + broadcasts: make(map[MAC]*Broadcast), slateExitSignals: make(map[MAC]chan struct{}), } notifier, err := newWatchdogNotifier(l, terminationCallback(m)) @@ -359,57 +370,86 @@ func (m *broadcastManager) isActive(ma MAC) bool { return ok } -// createOrUpdate creates or updates a Broadcast record. The revid pipeline -// corresponding to the broadcast MAC is firsty configured/re-configured, and -// the pipeline is "started", which will ready it for receiving video on its -// input. In the case that the status is "slate", we will spin up a routine to -// handle writing a slate image to the pipeline. +// createOrUpdate creates or updates a Broadcast record. If the record already +// exists, it will be updated with the new data. If it doesn't exist, a new +// revid pipeline will be created and started. Actions occur according to the +// status field of the broadcast i.e. whether we expect data from a source +// or write the slate image. func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { m.mu.Lock() defer m.mu.Unlock() - var err error - broadcast.rv, err = newRevid(m.log, broadcast.urls) - if err != nil { - return fmt.Errorf("could not initialise revid: %w", err) + // Try to get any old broadcasts for the provided MAC. + maybeOld, ok := m.broadcasts[broadcast.mac] + + // If there's no old broadcast, we need to create a new revid pipeline. + if !ok { + var err error + broadcast.rv, err = newRevid(m.log, broadcast.urls) + if err != nil { + return fmt.Errorf("could not initialise revid: %w", err) + } + maybeOld = &broadcast } - m.broadcasts[broadcast.mac] = broadcast - err = broadcast.rv.Start() + // If the URLS have changed, we need to update the revid pipeline. + // We won't enter this if we've just created a new revid pipeline. + if !reflect.DeepEqual(maybeOld.urls, broadcast.urls) { + urls := strings.Join(broadcast.urls, ",") + err := maybeOld.rv.Update(map[string]string{"RTMPURL": urls}) + if err != nil { + return fmt.Errorf("could not update revid: %w", err) + } + } + + // If the status has changed, we need to update accordingly. + // i.e. if the status is slate, we need to start writing the slate image. + // We won't enter this if we've just created a new revid pipeline. + if maybeOld.status != broadcast.status { + switch broadcast.status { + case statusActive, statusPlay, statusCreate: + m.log.Info("updating configuration for mac", "mac", broadcast.mac) + signal, ok := m.slateExitSignals[broadcast.mac] + if ok { + close(signal) + delete(m.slateExitSignals, broadcast.mac) + } + case statusSlate: + m.log.Info("slate request") + // If there's a signal channel it means that we're already writing the slate + // image and theres nothing to do, so return. + _, ok := m.slateExitSignals[broadcast.mac] + if ok { + m.log.Warning("already writing slate") + return nil + } + + // First create a signal that can be used to stop the slate writing routine. + // This will be provided to the writeSlate routine below. + signalCh := make(chan struct{}) + m.slateExitSignals[broadcast.mac] = signalCh + + err := writeSlateAndCheckErrors(broadcast.rv, signalCh, m.log) + if err != nil { + return err + } + default: + return fmt.Errorf("unknown status string: %s", broadcast.status) + } + } + + // We need to give the revid pipeline to the new broadcast record. + broadcast.rv = maybeOld.rv + + // And then replace the old record with the new one in the map. + m.broadcasts[broadcast.mac] = &broadcast + + // Start the pipeline (it's a no-op if it's already started). + err := broadcast.rv.Start() if err != nil { return fmt.Errorf("could not start revid pipeline: %w", err) } - switch broadcast.status { - case statusActive, statusPlay, statusCreate: - m.log.Info("updating configuration for mac", "mac", broadcast.mac) - signal, ok := m.slateExitSignals[broadcast.mac] - if ok { - close(signal) - delete(m.slateExitSignals, broadcast.mac) - } - case statusSlate: - m.log.Info("slate request") - // If there's a signal channel it means that we're already writing the slate - // image and theres nothing to do, so return. - _, ok := m.slateExitSignals[broadcast.mac] - if ok { - m.log.Warning("already writing slate") - return nil - } - - // First create a signal that can be used to stop the slate writing routine. - // This will be provided to the writeSlate routine below. - signalCh := make(chan struct{}) - m.slateExitSignals[broadcast.mac] = signalCh - - err = writeSlateAndCheckErrors(broadcast.rv, signalCh, m.log) - if err != nil { - return err - } - default: - return fmt.Errorf("unknown status string: %s", broadcast.status) - } return nil }