diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go index 1d6ee8a7..7c3e7e8d 100644 --- a/cmd/vidforward/main.go +++ b/cmd/vidforward/main.go @@ -380,22 +380,26 @@ func (m *broadcastManager) getPipeline(ma MAC) (*revid.Revid, error) { panic("shouldn't be getting pipeline if this mac isn't registered") } + return m.initOrStartPipeline(b.rv, b.urls) +} + +// initOrStartPipeline ensures that provided Revid pointer points to an +// initialised and running revid pipeline. +func (m *broadcastManager) initOrStartPipeline(rv *revid.Revid, urls []string) (*revid.Revid, error){ var err error - if b.rv == nil { - b.rv, err = newRevid(m.log, b.urls) + if rv == nil { + rv, err = newRevid(m.log, urls) if err != nil { return nil, fmt.Errorf("could not create new revid: %v",err) } } - - if !b.rv.Running() { - err = b.rv.Start() + if !rv.Running() { + err = rv.Start() if err != nil { return nil, fmt.Errorf("could not start revid pipeline: %v",err) } } - - return b.rv, nil + return rv, nil } // getStatus gets the broadcast's status corresponding to the provided MAC. @@ -432,15 +436,15 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { // If there's no old broadcast, we need to create a new revid pipeline. if !ok { - m.log.Debug("no old broadcast, creating new pipeline","mac",broadcast.mac) - var err error - broadcast.rv, err = newRevid(m.log, broadcast.urls) - if err != nil { - return fmt.Errorf("could not initialise revid: %w", err) - } maybeOld = &broadcast } + var err error + maybeOld.rv, err = m.initOrStartPipeline(maybeOld.rv, broadcast.urls) + if err != nil { + return fmt.Errorf("could not get revid pipeline: %w", err) + } + // If the URLS have changed, we need to update the revid pipeline. // We won't enter this if we've just created a new revid pipeline. if !reflect.DeepEqual(maybeOld.urls, broadcast.urls) { @@ -480,9 +484,9 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { signalCh := make(chan struct{}) m.slateExitSignals[broadcast.mac] = signalCh - err := writeSlateAndCheckErrors(maybeOld.rv, signalCh, m.log) + err = writeSlateAndCheckErrors(maybeOld.rv, signalCh, m.log) if err != nil { - return err + return fmt.Errorf("could not write slate and check for errors: %w", err) } default: return fmt.Errorf("unknown status string: %s", broadcast.status) @@ -495,12 +499,6 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { // And then replace the old record with the new one in the map. m.broadcasts[broadcast.mac] = &broadcast - // Start the pipeline (it's a no-op if it's already started). - err := broadcast.rv.Start() - if err != nil { - return fmt.Errorf("could not start revid pipeline: %w", err) - } - return nil }