Merged in modify-update (pull request #506)

vidforward: only update broadcast if changed

Resolves issues #405 and #406

Approved-by: Trek Hopton
Approved-by: David Sutton
This commit is contained in:
Saxon Milton 2023-06-21 10:34:37 +00:00
commit d232734d50
3 changed files with 99 additions and 53 deletions

View File

@ -51,7 +51,7 @@ type BroadcastBasic struct {
// ManagerBasic is a crude version of the BroadcastManager struct use to simplify // ManagerBasic is a crude version of the BroadcastManager struct use to simplify
// marshal/unmarshal overriding. // marshal/unmarshal overriding.
type ManagerBasic struct { type ManagerBasic struct {
Broadcasts map[MAC]Broadcast Broadcasts map[MAC]*Broadcast
SlateExitSignals []MAC SlateExitSignals []MAC
} }

View File

@ -86,10 +86,10 @@ func TestBroadcastUnmarshal(t *testing.T) {
tests := []struct { tests := []struct {
in []byte in []byte
expect Broadcast expect *Broadcast
}{ }{
{ {
expect: Broadcast{ expect: &Broadcast{
mac: testMAC, mac: testMAC,
urls: []string{testURL}, urls: []string{testURL},
status: statusActive, status: statusActive,
@ -106,7 +106,7 @@ func TestBroadcastUnmarshal(t *testing.T) {
t.Errorf("could not marshal json for test no. %d: %v", i, err) t.Errorf("could not marshal json for test no. %d: %v", i, err)
continue continue
} }
if !broadcastsEqual(got, test.expect) { if !broadcastsEqual(&got, test.expect) {
t.Errorf("did not get expected result.\nGot: %v\nWnt: %v\n", got, test.expect) t.Errorf("did not get expected result.\nGot: %v\nWnt: %v\n", got, test.expect)
} }
} }
@ -124,8 +124,8 @@ func TestBroadcastManagerMarshal(t *testing.T) {
}{ }{
{ {
in: broadcastManager{ in: broadcastManager{
broadcasts: map[MAC]Broadcast{ broadcasts: map[MAC]*Broadcast{
testMAC: Broadcast{ testMAC: &Broadcast{
testMAC, testMAC,
[]string{testURL}, []string{testURL},
statusSlate, statusSlate,
@ -165,8 +165,8 @@ func TestBroadcastManagerUnmarshal(t *testing.T) {
{ {
in: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URLs\":[\"" + testURL + "\"],\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"), in: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URLs\":[\"" + testURL + "\"],\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"),
expect: broadcastManager{ expect: broadcastManager{
broadcasts: map[MAC]Broadcast{ broadcasts: map[MAC]*Broadcast{
testMAC: Broadcast{ testMAC: &Broadcast{
testMAC, testMAC,
[]string{testURL}, []string{testURL},
statusSlate, statusSlate,
@ -201,7 +201,7 @@ func broadcastManagersEqual(m1, m2 broadcastManager) bool {
return true return true
} }
func broadcastMapsEqual(m1, m2 map[MAC]Broadcast) bool { func broadcastMapsEqual(m1, m2 map[MAC]*Broadcast) bool {
return mapsEqual(m1, m2, broadcastsEqual) return mapsEqual(m1, m2, broadcastsEqual)
} }
@ -238,7 +238,13 @@ func watchdogNotifiersEqual(w1, w2 watchdogNotifier) bool {
return true return true
} }
func broadcastsEqual(b1, b2 Broadcast) bool { func broadcastsEqual(b1, b2 *Broadcast) bool {
if b1 == nil {
panic("b1 is nil")
}
if b2 == nil {
panic("b2 is nil")
}
if b1.mac != b2.mac || !reflect.DeepEqual(b1.urls, b2.urls) || b1.status != b2.status || if b1.mac != b2.mac || !reflect.DeepEqual(b1.urls, b2.urls) || b1.status != b2.status ||
((b1.rv == nil || b2.rv == nil) && b1.rv != b2.rv) { ((b1.rv == nil || b2.rv == nil) && b1.rv != b2.rv) {
return false return false

View File

@ -36,7 +36,9 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"reflect"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
@ -89,13 +91,22 @@ type Broadcast struct {
rv *revid.Revid // The revid pipeline which will handle forwarding to youtube. rv *revid.Revid // The revid pipeline which will handle forwarding to youtube.
} }
// equal checks to see if the broadcast is equal to another broadcast.
// NOTE: This is not a deep equal, and is only used to check if a broadcast
// should be updated.
func (b *Broadcast) equal(other Broadcast) bool {
return b.mac == other.mac &&
b.status == other.status &&
reflect.DeepEqual(b.urls, other.urls)
}
// broadcastManager manages a map of Broadcasts we expect to be forwarding video // broadcastManager manages a map of Broadcasts we expect to be forwarding video
// for. The broadcastManager is communicated with through a series of HTTP request // for. The broadcastManager is communicated with through a series of HTTP request
// handlers. There is a basic REST API through which we can add/delete broadcasts, // handlers. There is a basic REST API through which we can add/delete broadcasts,
// and a recv handler which is invoked when a camera wishes to get its video // and a recv handler which is invoked when a camera wishes to get its video
// forwarded to youtube. // forwarded to youtube.
type broadcastManager struct { type broadcastManager struct {
broadcasts map[MAC]Broadcast broadcasts map[MAC]*Broadcast
slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image. slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image.
log logging.Logger log logging.Logger
dogNotifier *watchdogNotifier dogNotifier *watchdogNotifier
@ -106,7 +117,7 @@ type broadcastManager struct {
func newBroadcastManager(l logging.Logger) (*broadcastManager, error) { func newBroadcastManager(l logging.Logger) (*broadcastManager, error) {
m := &broadcastManager{ m := &broadcastManager{
log: l, log: l,
broadcasts: make(map[MAC]Broadcast), broadcasts: make(map[MAC]*Broadcast),
slateExitSignals: make(map[MAC]chan struct{}), slateExitSignals: make(map[MAC]chan struct{}),
} }
notifier, err := newWatchdogNotifier(l, terminationCallback(m)) notifier, err := newWatchdogNotifier(l, terminationCallback(m))
@ -359,57 +370,86 @@ func (m *broadcastManager) isActive(ma MAC) bool {
return ok return ok
} }
// createOrUpdate creates or updates a Broadcast record. The revid pipeline // createOrUpdate creates or updates a Broadcast record. If the record already
// corresponding to the broadcast MAC is firsty configured/re-configured, and // exists, it will be updated with the new data. If it doesn't exist, a new
// the pipeline is "started", which will ready it for receiving video on its // revid pipeline will be created and started. Actions occur according to the
// input. In the case that the status is "slate", we will spin up a routine to // status field of the broadcast i.e. whether we expect data from a source
// handle writing a slate image to the pipeline. // or write the slate image.
func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
var err error // Try to get any old broadcasts for the provided MAC.
broadcast.rv, err = newRevid(m.log, broadcast.urls) maybeOld, ok := m.broadcasts[broadcast.mac]
if err != nil {
return fmt.Errorf("could not initialise revid: %w", err) // If there's no old broadcast, we need to create a new revid pipeline.
if !ok {
var err error
broadcast.rv, err = newRevid(m.log, broadcast.urls)
if err != nil {
return fmt.Errorf("could not initialise revid: %w", err)
}
maybeOld = &broadcast
} }
m.broadcasts[broadcast.mac] = broadcast // If the URLS have changed, we need to update the revid pipeline.
err = broadcast.rv.Start() // We won't enter this if we've just created a new revid pipeline.
if !reflect.DeepEqual(maybeOld.urls, broadcast.urls) {
urls := strings.Join(broadcast.urls, ",")
err := maybeOld.rv.Update(map[string]string{"RTMPURL": urls})
if err != nil {
return fmt.Errorf("could not update revid: %w", err)
}
}
// If the status has changed, we need to update accordingly.
// i.e. if the status is slate, we need to start writing the slate image.
// We won't enter this if we've just created a new revid pipeline.
if maybeOld.status != broadcast.status {
switch broadcast.status {
case statusActive, statusPlay, statusCreate:
m.log.Info("updating configuration for mac", "mac", broadcast.mac)
signal, ok := m.slateExitSignals[broadcast.mac]
if ok {
close(signal)
delete(m.slateExitSignals, broadcast.mac)
}
case statusSlate:
m.log.Info("slate request")
// If there's a signal channel it means that we're already writing the slate
// image and theres nothing to do, so return.
_, ok := m.slateExitSignals[broadcast.mac]
if ok {
m.log.Warning("already writing slate")
return nil
}
// First create a signal that can be used to stop the slate writing routine.
// This will be provided to the writeSlate routine below.
signalCh := make(chan struct{})
m.slateExitSignals[broadcast.mac] = signalCh
err := writeSlateAndCheckErrors(broadcast.rv, signalCh, m.log)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown status string: %s", broadcast.status)
}
}
// We need to give the revid pipeline to the new broadcast record.
broadcast.rv = maybeOld.rv
// And then replace the old record with the new one in the map.
m.broadcasts[broadcast.mac] = &broadcast
// Start the pipeline (it's a no-op if it's already started).
err := broadcast.rv.Start()
if err != nil { if err != nil {
return fmt.Errorf("could not start revid pipeline: %w", err) return fmt.Errorf("could not start revid pipeline: %w", err)
} }
switch broadcast.status {
case statusActive, statusPlay, statusCreate:
m.log.Info("updating configuration for mac", "mac", broadcast.mac)
signal, ok := m.slateExitSignals[broadcast.mac]
if ok {
close(signal)
delete(m.slateExitSignals, broadcast.mac)
}
case statusSlate:
m.log.Info("slate request")
// If there's a signal channel it means that we're already writing the slate
// image and theres nothing to do, so return.
_, ok := m.slateExitSignals[broadcast.mac]
if ok {
m.log.Warning("already writing slate")
return nil
}
// First create a signal that can be used to stop the slate writing routine.
// This will be provided to the writeSlate routine below.
signalCh := make(chan struct{})
m.slateExitSignals[broadcast.mac] = signalCh
err = writeSlateAndCheckErrors(broadcast.rv, signalCh, m.log)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown status string: %s", broadcast.status)
}
return nil return nil
} }