From f3cf74ce513cb482ed5984462c563e0ef1e340a6 Mon Sep 17 00:00:00 2001 From: Saxon Nelson-Milton Date: Sat, 19 Nov 2022 15:35:39 +1030 Subject: [PATCH] cmd/vidforward: add slate image write functionality This includes adding facilities to handle termination signals and err handling. Some modifications have been made to the file input to accomodate the concurrency requirements. The slate read mechanism is still fairly rudimentary and can only read from a hardcoded file, but at this stage is for prototyping purposes. --- cmd/vidforward/main.go | 179 +++++++++++++++++++++++++++++++++++++---- codec/codecutil/lex.go | 1 + device/file/file.go | 72 +++++++++++------ go.mod | 2 - 4 files changed, 214 insertions(+), 40 deletions(-) diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go index 994338c6..cd7df336 100644 --- a/cmd/vidforward/main.go +++ b/cmd/vidforward/main.go @@ -36,9 +36,12 @@ import ( "net/http" "strconv" "sync" + "time" "bitbucket.org/ausocean/av/codec/codecutil" + "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/device/file" "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/iot/pi/netlogger" @@ -54,14 +57,18 @@ const ( // Logging configuration. const ( - logPath = "/var/log/netsender/netsender.log" + logPath = "/var/log/vidforward/vidforward.log" logMaxSize = 500 // MB logMaxBackup = 10 logMaxAge = 28 // days logVerbosity = logging.Info - logSuppress = false + logSuppress = true ) +// recvErrorDelay is a delay used when there's recv issues. It is intended to +// prevent spamming from a single client. +const recvErrorDelay = 7 * time.Second + type MAC string // Broadcast is representative of a broadcast to be forwarded. @@ -78,9 +85,15 @@ type Broadcast struct { // and a recv handler which is invoked when a camera wishes to get its video // forwarded to youtube. type broadcastManager struct { - broadcasts map[MAC]Broadcast - log logging.Logger - mu sync.Mutex + broadcasts map[MAC]Broadcast + slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image. + log logging.Logger + mu sync.Mutex +} + +// newBroadcastManager returns a new broadcastManager with the provided logger. +func newBroadcastManager(l logging.Logger) *broadcastManager { + return &broadcastManager{log: l, broadcasts: make(map[MAC]Broadcast), slateExitSignals: make(map[MAC]chan struct{})} } // recvHandler handles recv requests for video forwarding. The MAC is firstly @@ -93,9 +106,16 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() ma := MAC(q.Get("ma")) - _, ok := m.broadcasts[ma] - if !ok { + if !m.isActive(ma) { m.errorLogWrite(w, "forward request mac is not mapped, doing nothing", "mac", ma) + time.Sleep(recvErrorDelay) + return + } + + // We can't receive video if we're in slate mode. + if m.getStatus(ma) == "slate" { + m.errorLogWrite(w, "cannot receive video for this mac, status is slate", "mac", ma) + time.Sleep(recvErrorDelay) return } @@ -137,7 +157,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { for i, frame := range h264Clip.Frames() { _, err := rv.Write(frame.Media) if err != nil { - m.errorLogWrite(w, "could not write frame", "no.", i) + m.errorLogWrite(w, "could not write frame", "no.", i, "error", err) return } } @@ -153,6 +173,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { // control handles control API requests. func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) { + m.log.Info("control request", "method", r.Method) switch r.Method { case http.MethodPut: m.processRequest(w, r, m.createOrUpdate) @@ -204,6 +225,25 @@ func (m *broadcastManager) getPipeline(ma MAC) *revid.Revid { return v.RV } +// getStatus gets the broadcast's status corresponding to the provided MAC. +func (m *broadcastManager) getStatus(ma MAC) string { + m.mu.Lock() + defer m.mu.Unlock() + v, ok := m.broadcasts[ma] + if !ok { + return "" + } + return v.Status +} + +// isActive returns true if a MAC is registered to the broadcast manager. +func (m *broadcastManager) isActive(ma MAC) bool { + m.mu.Lock() + defer m.mu.Unlock() + _, ok := m.broadcasts[ma] + return ok +} + // createOrUpdate creates or updates a Broadcast record. The revid pipeline // corresponding to the broadcast MAC is firsty configured/re-configured, and // the pipeline is "started", which will ready it for receiving video on its @@ -234,15 +274,126 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { return fmt.Errorf("could not start revid pipeline: %w", err) } - if broadcast.Status == "slate" { - go m.writeSlate(broadcast.MAC, broadcast.RV) + switch broadcast.Status { + case "create": + fallthrough + case "play": + signal, ok := m.slateExitSignals[broadcast.MAC] + if ok { + close(signal) + delete(m.slateExitSignals, broadcast.MAC) + } + case "slate": + m.log.Debug("slate request") + // If there's a signal channel it means that we're already writing the slate + // image and theres nothing to do, so return. + _, ok := m.slateExitSignals[broadcast.MAC] + if ok { + m.log.Warning("already writing slate") + return nil + } + + // First create a signal that can be used to stop the slate writing routine. + // This will be provided to the writeSlate routine below. + signalCh := make(chan struct{}) + m.slateExitSignals[broadcast.MAC] = signalCh + + // Also create an errCh that will be used to communicate errors from the + // writeSlate routine. + errCh := make(chan error) + + go m.writeSlate(broadcast.RV, errCh, signalCh) + + // We'll watch out for any errors that happen within a 5 second window. This + // will indicate something seriously wrong with init, like a missing file etc. + const startupWindowDuration = 5 * time.Second + startupWindow := time.NewTimer(startupWindowDuration) + select { + + // If this triggers first, we're all good. + case <-startupWindow.C: + m.log.Debug("out of error window") + + // We consider any errors after this either to be normal i.e. as a result + // of stopping the slate input, or something that can not be handled, and + // only logged, therefore we can close the error channel errCh now. + // This will also let the routine know that errors can no longer be sent + // down errCh. + close(errCh) + + // This means we got a slate error pretty early and need to let caller know. + case err := <-errCh: + return fmt.Errorf("could not write slate image: %w", err) + } + default: + return fmt.Errorf("unknown status string: %s", broadcast.Status) } return nil } -// This is just a stub. Eventually this will handle writing of a slate image -// to the destination RTMP URL. -func (m *broadcastManager) writeSlate(ma MAC, rv *revid.Revid) {} +// writeSlate is a routine that employs a file input device and h264 lexer to +// write a h264 encoded slate image to the provided revid pipeline. +func (m *broadcastManager) writeSlate(rv *revid.Revid, errCh chan error, exitSignal chan struct{}) { + m.log.Info("writing slate") + const ( + // This is temporary and will eventually be part of a broadcast configuration + // where the remote vidforward API user can provide the slate image. + slateFileName = "slate.h264" + + // Assume 25fps until this becomes configurable. + slateFrameRate = 25 + + loopSetting = true + frameDelay = time.Second / slateFrameRate + ) + + fileInput := file.NewWith(m.log, slateFileName, loopSetting) + err := fileInput.Start() + if err != nil { + errCh <- err + return + } + + // This will wait for a signal from the provided slateExitSignal (or from a + // timeout) to stop writing the slate by "Stopping" the file input which will + // terminate the Lex function. + go func() { + slateTimeoutTimer := time.NewTimer(24 * time.Hour) + select { + case <-slateTimeoutTimer.C: + m.log.Warning("slate timeout") + case <-exitSignal: + m.log.Info("slate exist signal") + } + m.log.Info("stopping file input") + fileInput.Stop() + }() + + // Begin lexing the slate file and send frames to rv pipeline. We'll stay in + // here until file input closes or there's an unexpected error. + err = h264.Lex(rv, fileInput, frameDelay) + + // If we get to this point, it means that the we've finished lexing for some + // reason; let's figure out why. + select { + // The only reason we'd get a receive on errCh from this side is if its been + // closed. This means that we've exceeded the "startup error" period, and that + // either the error is normal from stopping the input, or we can no longer inform + // the caller and just need to log the problem. + case <-errCh: + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + m.log.Debug("got expected error", "error", err) + return + } + m.log.Error("got unexpected error", "error", err) + + // This means that a problem occured pretty early in lexing. + default: + m.log.Error("unexpected error during lex startup", "error", err) + errCh <- err + } + m.log.Error("finished writing slate") +} // delete removes a broadcast from the record. func (m *broadcastManager) delete(broadcast Broadcast) error { @@ -281,7 +432,7 @@ func main() { // lumberjack and netloggers. log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress) - bm := &broadcastManager{log: log, broadcasts: map[MAC]Broadcast{}} + bm := newBroadcastManager(log) http.HandleFunc("/recv", bm.recv) http.HandleFunc("/control", bm.control) http.ListenAndServe(*host+":"+*port, nil) diff --git a/codec/codecutil/lex.go b/codec/codecutil/lex.go index b8a1ce7d..9772bd22 100644 --- a/codec/codecutil/lex.go +++ b/codec/codecutil/lex.go @@ -174,6 +174,7 @@ func newRingBuffer(sz, cap int, timeout time.Duration) *ringBuffer { buf: make([][]byte, cap), n: cap, ch: make(chan []byte, cap), + timeout: timeout, } for i := range rb.buf { rb.buf[i] = make([]byte, sz) diff --git a/device/file/file.go b/device/file/file.go index bb211782..0e3d0f01 100644 --- a/device/file/file.go +++ b/device/file/file.go @@ -30,6 +30,7 @@ import ( "fmt" "io" "os" + "sync" "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/utils/logging" @@ -39,14 +40,23 @@ import ( // audio or video data. type AVFile struct { f *os.File - cfg config.Config + path string + loop bool isRunning bool log logging.Logger + set bool + mu sync.Mutex } // NewAVFile returns a new AVFile. func New(l logging.Logger) *AVFile { return &AVFile{log: l} } +// NewWith returns a new AVFile with required params provided i.e. the Set +// method does not need to be called. +func NewWith(l logging.Logger, path string, loop bool) *AVFile { + return &AVFile{log: l, path: path, loop: loop, set: true} +} + // Name returns the name of the device. func (m *AVFile) Name() string { return "File" @@ -54,15 +64,22 @@ func (m *AVFile) Name() string { // Set simply sets the AVFile's config to the passed config. func (m *AVFile) Set(c config.Config) error { - m.cfg = c + m.path = c.InputPath + m.loop = c.Loop + m.set = true return nil } // Start will open the file at the location of the InputPath field of the // config struct. func (m *AVFile) Start() error { + m.mu.Lock() + defer m.mu.Unlock() var err error - m.f, err = os.Open(m.cfg.InputPath) + if !m.set { + return errors.New("AVFile has not been set with config") + } + m.f, err = os.Open(m.path) if err != nil { return fmt.Errorf("could not open media file: %w", err) } @@ -72,6 +89,8 @@ func (m *AVFile) Start() error { // Stop will close the file such that any further reads will fail. func (m *AVFile) Stop() error { + m.mu.Lock() + defer m.mu.Unlock() err := m.f.Close() if err == nil { m.isRunning = false @@ -83,33 +102,38 @@ func (m *AVFile) Stop() error { // Read implements io.Reader. If start has not been called, or Start has been // called and Stop has since been called, an error is returned. func (m *AVFile) Read(p []byte) (int, error) { - if m.f != nil { - n, err := m.f.Read(p) - if err != nil && err != io.EOF { - return n, err - } + m.mu.Lock() + defer m.mu.Unlock() + if m.f == nil { + return 0, errors.New("AV file is closed, AVFile not started") + } - if (n < len(p) || err == io.EOF) && m.cfg.Loop { - m.log.Info("looping input file") - // In the case that we reach end of file but loop is true, we want to - // seek to start and keep reading from there. - _, err = m.f.Seek(0, io.SeekStart) - if err != nil { - return 0, fmt.Errorf("could not seek to start of file for input loop: %w", err) - } - - // Now that we've seeked to start, let's try reading again. - n, err = m.f.Read(p) - if err != nil { - return n, fmt.Errorf("could not read after start seek: %w", err) - } - } + n, err := m.f.Read(p) + if err != nil && err != io.EOF { return n, err } - return 0, errors.New("AV file is closed") + + if (n < len(p) || err == io.EOF) && m.loop { + m.log.Info("looping input file") + // In the case that we reach end of file but loop is true, we want to + // seek to start and keep reading from there. + _, err = m.f.Seek(0, io.SeekStart) + if err != nil { + return 0, fmt.Errorf("could not seek to start of file for input loop: %w", err) + } + + // Now that we've seeked to start, let's try reading again. + n, err = m.f.Read(p) + if err != nil { + return n, fmt.Errorf("could not read after start seek: %w", err) + } + } + return n, err } // IsRunning is used to determine if the AVFile device is running. func (m *AVFile) IsRunning() bool { + m.mu.Lock() + defer m.mu.Unlock() return m.f != nil && m.isRunning } diff --git a/go.mod b/go.mod index 85f7d39d..7addbb62 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module bitbucket.org/ausocean/av go 1.16 -replace bitbucket.org/ausocean/utils v1.3.2 => ../utils - require ( bitbucket.org/ausocean/iot v1.3.3 bitbucket.org/ausocean/utils v1.3.2