From 2dd2c464c653d4756db29bf6ac9ecc4038dffdd0 Mon Sep 17 00:00:00 2001 From: Saxon Milton Date: Wed, 10 Apr 2024 22:17:40 +0000 Subject: [PATCH 1/2] Simplify createOrUpdate method Every time we want to create or update, let's just start a new revid pipeline. Now that our frequency of requests to vidforward to create or update are much lower (because of the new state machine) we can deal with the overhead of creating a new pipeline and reduce the complexity of the code. Approved-by: Trek Hopton --- cmd/vidforward/main.go | 90 ++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 57 deletions(-) diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go index 94bafc41..a07b3b2d 100644 --- a/cmd/vidforward/main.go +++ b/cmd/vidforward/main.go @@ -38,7 +38,6 @@ import ( "os" "reflect" "strconv" - "strings" "sync" "time" "runtime/debug" @@ -139,7 +138,7 @@ func terminationCallback(m *broadcastManager) func() { return } m.log.Info("successfully saved broadcast manager state on termination signal") - logTrace(m.log.Debug,m.log.Warning) + logTrace(m.log.Debug, m.log.Warning) } } @@ -449,66 +448,43 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { // Try to get any old broadcasts for the provided MAC. maybeOld, ok := m.broadcasts[broadcast.mac] - - // If there's no old broadcast, we need to create a new revid pipeline. - if !ok { - maybeOld = &broadcast - } - - m.log.Debug("attempting to start init or start pipeline") - var err error - maybeOld.rv, err = m.initOrStartPipeline(maybeOld.rv, broadcast.urls) - if err != nil { - return fmt.Errorf("could not get revid pipeline: %w", err) - } - m.log.Debug("finished attempting to init or start pipeline") - - // If the URLS have changed, we need to update the revid pipeline. - // We won't enter this if we've just created a new revid pipeline. - if !reflect.DeepEqual(maybeOld.urls, broadcast.urls) { - m.log.Debug("RTMP URLs have changed, updating pipeline config", "mac", broadcast.mac, "old", maybeOld.urls, "new", broadcast.urls) - urls := strings.Join(broadcast.urls, ",") - err := maybeOld.rv.Update(map[string]string{"RTMPURL": urls}) - if err != nil { - return fmt.Errorf("could not update revid: %w", err) + if ok { + if maybeOld.rv != nil { + m.log.Debug("stopping old revid pipeline", "mac", broadcast.mac) + closeDone := make(chan struct{}) + go func() { maybeOld.rv.Stop(); close(closeDone) }() + select { + case <-closeDone: + m.log.Debug("stopped old revid pipeline", "mac", broadcast.mac) + case <-time.After(5 * time.Second): + m.log.Warning("could not stop old revid pipeline, looks like we'll end up with some leaked memory then :(", "mac", broadcast.mac) + } } - m.log.Debug("finished updating pipeline config") } - // If the status has changed, we need to update accordingly. - // i.e. if the status is slate, we need to start writing the slate image. - // We won't enter this if we've just created a new revid pipeline. - if maybeOld.status != broadcast.status { - m.log.Debug("status has changed, starting or stopping slate", "mac", broadcast.mac, "old", maybeOld.status, "new", broadcast.status) - switch broadcast.status { - case statusActive, statusPlay, statusCreate: - m.log.Info("updating configuration for mac", "mac", broadcast.mac) - signal, ok := m.slateExitSignals[broadcast.mac] - if ok { - close(signal) - delete(m.slateExitSignals, broadcast.mac) - } - case statusSlate: - m.log.Info("slate request") - // If there's a signal channel it means that we're already writing the slate - // image and theres nothing to do, so return. - _, ok := m.slateExitSignals[broadcast.mac] - if ok { - m.log.Warning("already writing slate") - return nil - } + var err error + maybeOld.rv, err = newRevid(m.log, broadcast.urls) + if err != nil { + return fmt.Errorf("could not create new revid: %w", err) + } + maybeOld.rv.Start() - // First create a signal that can be used to stop the slate writing routine. - // This will be provided to the writeSlate routine below. - signalCh := make(chan struct{}) - m.slateExitSignals[broadcast.mac] = signalCh + m.log.Info("updating configuration for mac", "mac", broadcast.mac) + signal, ok := m.slateExitSignals[broadcast.mac] + if ok { + close(signal) + delete(m.slateExitSignals, broadcast.mac) + } - err = writeSlateAndCheckErrors(maybeOld.rv, signalCh, m.log) - if err != nil { - return fmt.Errorf("could not write slate and check for errors: %w", err) - } - default: - return fmt.Errorf("unknown status string: %s", broadcast.status) + if broadcast.status == statusSlate { + // First create a signal that can be used to stop the slate writing routine. + // This will be provided to the writeSlate routine below. + signalCh := make(chan struct{}) + m.slateExitSignals[broadcast.mac] = signalCh + + err = writeSlateAndCheckErrors(maybeOld.rv, signalCh, m.log) + if err != nil { + return fmt.Errorf("could not write slate and check for errors: %w", err) } } From 9f56bee095184c0c580fdd50be2b352424572273 Mon Sep 17 00:00:00 2001 From: Trek Hopton Date: Wed, 17 Apr 2024 07:17:57 +0000 Subject: [PATCH 2/2] cmd/treatment: remove build tags for treatment This was done because I don't know why this wouldn't build on circleci with the standard pi audio command in pi.go. cmd/treatment: merge pi.go into main.go There's no need for two files Approved-by: Alan Noble --- cmd/treatment/circleci.go | 34 ---------------------------------- cmd/treatment/main.go | 25 ++++++++++++++----------- cmd/treatment/pi3.go | 38 -------------------------------------- 3 files changed, 14 insertions(+), 83 deletions(-) delete mode 100644 cmd/treatment/circleci.go delete mode 100644 cmd/treatment/pi3.go diff --git a/cmd/treatment/circleci.go b/cmd/treatment/circleci.go deleted file mode 100644 index 5e496075..00000000 --- a/cmd/treatment/circleci.go +++ /dev/null @@ -1,34 +0,0 @@ -// +build !pi3 - -/* -DESCRIPTION - circleci.go defines a dummy initialisation command for running on circleci. - -AUTHORS - Ella Pietraroia - Scott Barnard - -LICENSE - Copyright (C) 2020 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - in gpl.txt. If not, see http://www.gnu.org/licenses. -*/ - -package main - -import "bitbucket.org/ausocean/utils/logging" - -const audioCmd = "" - -func initCommand(log logging.Logger) {} diff --git a/cmd/treatment/main.go b/cmd/treatment/main.go index c6460456..825eb4f8 100644 --- a/cmd/treatment/main.go +++ b/cmd/treatment/main.go @@ -81,17 +81,21 @@ const ( // Channel modes. const ( modeStereo = "Stereo" - modeLeft = "LeftMono" - modeRight = "RightMono" - modeMute = "Mute" + modeLeft = "LeftMono" + modeRight = "RightMono" + modeMute = "Mute" ) // Variable map to send to netreceiver/vidgrind. var varMap = map[string]string{ - "SpeakerMode": "enum:"+strings.Join([]string{modeStereo, modeLeft, modeRight, modeMute}, ","), - "AudioFilePath": "string", + "SpeakerMode": "enum:" + strings.Join([]string{modeStereo, modeLeft, modeRight, modeMute}, ","), + "AudioFilePath": "string", } +const audioCmd = "aplay" + +func initCommand(l logging.Logger) { checkPath(audioCmd, l) } + func main() { mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}}) @@ -214,15 +218,15 @@ func run(rv *revid.Revid, ns *netsender.Sender, file *string, l logging.Logger, // setChannels handles the muting of one, both, or neither of the channels. It takes in SpeakerMode // and sets the relevant volumes. -func setChannels(mode string, l logging.Logger) error { +func setChannels(mode string, l logging.Logger) error { l.Info("mode is", "mode", mode) // Set the volume of each channel. vols := map[string]string{ modeStereo: "100%,100%", - modeLeft: "0%,100%", - modeRight: "100%,0%", - modeMute: "0%,0%", + modeLeft: "0%,100%", + modeRight: "100%,0%", + modeMute: "0%,0%", }[mode] if vols == "" { l.Warning("invalid SpeakeMode", "SpeakerMode", mode) @@ -231,7 +235,7 @@ func setChannels(mode string, l logging.Logger) error { // Create the command to change the channel volumes. cmd := exec.Command("amixer", "sset", "Speaker", vols) - + // Pipe the output to stdout and stderr. outPipe, err := cmd.StdoutPipe() if err != nil { @@ -274,7 +278,6 @@ func setChannels(mode string, l logging.Logger) error { return fmt.Errorf("channel set command failed: %s", &errBuff) } - l.Info("mode set to", "mode", mode) return nil } diff --git a/cmd/treatment/pi3.go b/cmd/treatment/pi3.go deleted file mode 100644 index bbad0764..00000000 --- a/cmd/treatment/pi3.go +++ /dev/null @@ -1,38 +0,0 @@ -// +build pi3 - -/* -DESCRIPTION - pi3.go defines an initialisation function for use when running on the - Raspberry Pi 3. - -AUTHORS - Ella Pietraroia - Scott Barnard - Saxon Nelson-Milton - -LICENSE - Copyright (C) 2020 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - in gpl.txt. If not, see http://www.gnu.org/licenses. -*/ - -package main - -import ( - "bitbucket.org/ausocean/utils/logging" -) - -const audioCmd = "aplay" - -func initCommand(l logging.Logger) { checkPath(audioCmd, l) }