Simplify createOrUpdate method

Every time we want to create or update, let's just start a new
revid pipeline. Now that our frequency of requests to vidforward
to create or update are much lower (because of the new state
machine) we can deal with the overhead of creating a new pipeline
and reduce the complexity of the code.

Approved-by: Trek Hopton
This commit is contained in:
Saxon Milton 2024-04-10 22:17:40 +00:00
parent b992358fd6
commit 2dd2c464c6
1 changed files with 33 additions and 57 deletions

View File

@ -38,7 +38,6 @@ import (
"os"
"reflect"
"strconv"
"strings"
"sync"
"time"
"runtime/debug"
@ -139,7 +138,7 @@ func terminationCallback(m *broadcastManager) func() {
return
}
m.log.Info("successfully saved broadcast manager state on termination signal")
logTrace(m.log.Debug,m.log.Warning)
logTrace(m.log.Debug, m.log.Warning)
}
}
@ -449,66 +448,43 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
// Try to get any old broadcasts for the provided MAC.
maybeOld, ok := m.broadcasts[broadcast.mac]
// If there's no old broadcast, we need to create a new revid pipeline.
if !ok {
maybeOld = &broadcast
}
m.log.Debug("attempting to start init or start pipeline")
var err error
maybeOld.rv, err = m.initOrStartPipeline(maybeOld.rv, broadcast.urls)
if err != nil {
return fmt.Errorf("could not get revid pipeline: %w", err)
}
m.log.Debug("finished attempting to init or start pipeline")
// If the URLS have changed, we need to update the revid pipeline.
// We won't enter this if we've just created a new revid pipeline.
if !reflect.DeepEqual(maybeOld.urls, broadcast.urls) {
m.log.Debug("RTMP URLs have changed, updating pipeline config", "mac", broadcast.mac, "old", maybeOld.urls, "new", broadcast.urls)
urls := strings.Join(broadcast.urls, ",")
err := maybeOld.rv.Update(map[string]string{"RTMPURL": urls})
if err != nil {
return fmt.Errorf("could not update revid: %w", err)
if ok {
if maybeOld.rv != nil {
m.log.Debug("stopping old revid pipeline", "mac", broadcast.mac)
closeDone := make(chan struct{})
go func() { maybeOld.rv.Stop(); close(closeDone) }()
select {
case <-closeDone:
m.log.Debug("stopped old revid pipeline", "mac", broadcast.mac)
case <-time.After(5 * time.Second):
m.log.Warning("could not stop old revid pipeline, looks like we'll end up with some leaked memory then :(", "mac", broadcast.mac)
}
}
m.log.Debug("finished updating pipeline config")
}
// If the status has changed, we need to update accordingly.
// i.e. if the status is slate, we need to start writing the slate image.
// We won't enter this if we've just created a new revid pipeline.
if maybeOld.status != broadcast.status {
m.log.Debug("status has changed, starting or stopping slate", "mac", broadcast.mac, "old", maybeOld.status, "new", broadcast.status)
switch broadcast.status {
case statusActive, statusPlay, statusCreate:
m.log.Info("updating configuration for mac", "mac", broadcast.mac)
signal, ok := m.slateExitSignals[broadcast.mac]
if ok {
close(signal)
delete(m.slateExitSignals, broadcast.mac)
}
case statusSlate:
m.log.Info("slate request")
// If there's a signal channel it means that we're already writing the slate
// image and theres nothing to do, so return.
_, ok := m.slateExitSignals[broadcast.mac]
if ok {
m.log.Warning("already writing slate")
return nil
}
var err error
maybeOld.rv, err = newRevid(m.log, broadcast.urls)
if err != nil {
return fmt.Errorf("could not create new revid: %w", err)
}
maybeOld.rv.Start()
// First create a signal that can be used to stop the slate writing routine.
// This will be provided to the writeSlate routine below.
signalCh := make(chan struct{})
m.slateExitSignals[broadcast.mac] = signalCh
m.log.Info("updating configuration for mac", "mac", broadcast.mac)
signal, ok := m.slateExitSignals[broadcast.mac]
if ok {
close(signal)
delete(m.slateExitSignals, broadcast.mac)
}
err = writeSlateAndCheckErrors(maybeOld.rv, signalCh, m.log)
if err != nil {
return fmt.Errorf("could not write slate and check for errors: %w", err)
}
default:
return fmt.Errorf("unknown status string: %s", broadcast.status)
if broadcast.status == statusSlate {
// First create a signal that can be used to stop the slate writing routine.
// This will be provided to the writeSlate routine below.
signalCh := make(chan struct{})
m.slateExitSignals[broadcast.mac] = signalCh
err = writeSlateAndCheckErrors(maybeOld.rv, signalCh, m.log)
if err != nil {
return fmt.Errorf("could not write slate and check for errors: %w", err)
}
}