Merged in input-loop-in-device (pull request #438)

device/file & revid/pipeline.go: add looping functionality to file device and removing from processFrom

Approved-by: Trek Hopton
This commit is contained in:
Saxon Milton 2020-12-21 02:06:21 +00:00
commit 10ede9dc84
2 changed files with 44 additions and 36 deletions

View File

@ -37,7 +37,7 @@ import (
// AVFile is an implementation of the AVDevice interface for a file containg
// audio or video data.
type AVFile struct {
f io.ReadCloser
f *os.File
cfg config.Config
isRunning bool
}
@ -82,7 +82,24 @@ func (m *AVFile) Stop() error {
// called and Stop has since been called, an error is returned.
func (m *AVFile) Read(p []byte) (int, error) {
if m.f != nil {
return m.f.Read(p)
n, err := m.f.Read(p)
if err != nil {
// In the case that we reach end of file but loop is true, we want to
// seek to start and keep reading from there.
if err == io.EOF && m.cfg.Loop {
_, err = m.f.Seek(0, io.SeekStart)
if err != nil {
return 0, fmt.Errorf("could not seek to start of file for input loop: %w", err)
}
// Now that we've seeked to start, let's try reading again.
n, err = m.f.Read(p)
if err != nil {
return n, fmt.Errorf("could not read after start seek: %w", err)
}
}
}
return n, err
}
return 0, errors.New("AV file is closed")
}

View File

@ -332,41 +332,32 @@ func (r *Revid) setLexer(c uint8, isRTSP bool) error {
func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) {
defer r.wg.Done()
for l := true; l; l = r.cfg.Loop {
err := in.Start()
if err != nil {
r.err <- fmt.Errorf("could not start input device: %w", err)
return
}
err := in.Start()
if err != nil {
r.err <- fmt.Errorf("could not start input device: %w", err)
return
}
// Lex data from input device, in, until finished or an error is encountered.
// For a continuous source e.g. a camera or microphone, we should remain
// in this call indefinitely unless in.Stop() is called and an io.EOF is forced.
r.cfg.Logger.Log(logger.Debug, "lexing")
err = r.lexTo(r.filters[0], in, delay)
switch err {
case nil, io.EOF:
r.cfg.Logger.Log(logger.Info, "end of file")
case io.ErrUnexpectedEOF:
r.cfg.Logger.Log(logger.Info, "unexpected EOF from input")
default:
r.err <- err
}
r.cfg.Logger.Log(logger.Info, "finished reading input")
// Lex data from input device, in, until finished or an error is encountered.
// For a continuous source e.g. a camera or microphone, we should remain
// in this call indefinitely unless in.Stop() is called and an io.EOF is forced.
r.cfg.Logger.Log(logger.Debug, "lexing")
err = r.lexTo(r.filters[0], in, delay)
switch err {
case nil, io.EOF:
r.cfg.Logger.Log(logger.Info, "end of file")
case io.ErrUnexpectedEOF:
r.cfg.Logger.Log(logger.Info, "unexpected EOF from input")
default:
r.err <- err
}
r.cfg.Logger.Log(logger.Info, "finished reading input")
r.cfg.Logger.Log(logger.Debug, "stopping input")
err = in.Stop()
if err != nil {
r.err <- fmt.Errorf("could not stop input source: %w", err)
} else {
r.cfg.Logger.Log(logger.Info, "input stopped")
}
// If we're looping and we get a stop signal we return.
select {
case <-r.stop:
return
default:
}
r.cfg.Logger.Log(logger.Debug, "stopping input")
err = in.Stop()
if err != nil {
r.err <- fmt.Errorf("could not stop input source: %w", err)
} else {
r.cfg.Logger.Log(logger.Info, "input stopped")
}
}