From 6506a3021fee732935515800b80419c2f966da00 Mon Sep 17 00:00:00 2001 From: Saxon Nelson-Milton Date: Sat, 19 Dec 2020 12:52:49 +1030 Subject: [PATCH] device/file & revid/pipeline.go: add looping functionality to file device and removing from processFrom --- device/file/file.go | 21 +++++++++++++++- revid/pipeline.go | 59 +++++++++++++++++++-------------------------- 2 files changed, 45 insertions(+), 35 deletions(-) diff --git a/device/file/file.go b/device/file/file.go index 0ec2d928..c78f56e5 100644 --- a/device/file/file.go +++ b/device/file/file.go @@ -82,7 +82,26 @@ func (m *AVFile) Stop() error { // called and Stop has since been called, an error is returned. func (m *AVFile) Read(p []byte) (int, error) { if m.f != nil { - return m.f.Read(p) + n, err := m.f.Read() + if err != nil { + // In the case that we reach end of file but loop is true, we want to + // seek to start and keep reading from there. + if err == io.EOF && m.cfg.Loop { + _, err = m.f.Seek(0,io.SeekStart) + if err != nil { + return 0, fmt.Errorf("could not seek to start of file for input loop: %w",err) + } + + // Now that we've seeked to start, let's try reading again. + n, err = m.f.Read() + if err != nil { + return n, fmt.Errorf("could not read after start seek: %w",err) + } + return n, nil + } + return n, err + } + return n, nil } return 0, errors.New("AV file is closed") } diff --git a/revid/pipeline.go b/revid/pipeline.go index 7d770a42..7cda947d 100644 --- a/revid/pipeline.go +++ b/revid/pipeline.go @@ -332,41 +332,32 @@ func (r *Revid) setLexer(c uint8, isRTSP bool) error { func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) { defer r.wg.Done() - for l := true; l; l = r.cfg.Loop { - err := in.Start() - if err != nil { - r.err <- fmt.Errorf("could not start input device: %w", err) - return - } + err := in.Start() + if err != nil { + r.err <- fmt.Errorf("could not start input device: %w", err) + return + } - // Lex data from input device, in, until finished or an error is encountered. - // For a continuous source e.g. a camera or microphone, we should remain - // in this call indefinitely unless in.Stop() is called and an io.EOF is forced. - r.cfg.Logger.Log(logger.Debug, "lexing") - err = r.lexTo(r.filters[0], in, delay) - switch err { - case nil, io.EOF: - r.cfg.Logger.Log(logger.Info, "end of file") - case io.ErrUnexpectedEOF: - r.cfg.Logger.Log(logger.Info, "unexpected EOF from input") - default: - r.err <- err - } - r.cfg.Logger.Log(logger.Info, "finished reading input") + // Lex data from input device, in, until finished or an error is encountered. + // For a continuous source e.g. a camera or microphone, we should remain + // in this call indefinitely unless in.Stop() is called and an io.EOF is forced. + r.cfg.Logger.Log(logger.Debug, "lexing") + err = r.lexTo(r.filters[0], in, delay) + switch err { + case nil, io.EOF: + r.cfg.Logger.Log(logger.Info, "end of file") + case io.ErrUnexpectedEOF: + r.cfg.Logger.Log(logger.Info, "unexpected EOF from input") + default: + r.err <- err + } + r.cfg.Logger.Log(logger.Info, "finished reading input") - r.cfg.Logger.Log(logger.Debug, "stopping input") - err = in.Stop() - if err != nil { - r.err <- fmt.Errorf("could not stop input source: %w", err) - } else { - r.cfg.Logger.Log(logger.Info, "input stopped") - } - - // If we're looping and we get a stop signal we return. - select { - case <-r.stop: - return - default: - } + r.cfg.Logger.Log(logger.Debug, "stopping input") + err = in.Stop() + if err != nil { + r.err <- fmt.Errorf("could not stop input source: %w", err) + } else { + r.cfg.Logger.Log(logger.Info, "input stopped") } }