mirror of https://bitbucket.org/ausocean/av.git
device/file & revid/pipeline.go: add looping functionality to file device and removing from processFrom
This commit is contained in:
parent
528733c559
commit
6506a3021f
|
@ -82,7 +82,26 @@ func (m *AVFile) Stop() error {
|
||||||
// called and Stop has since been called, an error is returned.
|
// called and Stop has since been called, an error is returned.
|
||||||
func (m *AVFile) Read(p []byte) (int, error) {
|
func (m *AVFile) Read(p []byte) (int, error) {
|
||||||
if m.f != nil {
|
if m.f != nil {
|
||||||
return m.f.Read(p)
|
n, err := m.f.Read()
|
||||||
|
if err != nil {
|
||||||
|
// In the case that we reach end of file but loop is true, we want to
|
||||||
|
// seek to start and keep reading from there.
|
||||||
|
if err == io.EOF && m.cfg.Loop {
|
||||||
|
_, err = m.f.Seek(0,io.SeekStart)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("could not seek to start of file for input loop: %w",err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we've seeked to start, let's try reading again.
|
||||||
|
n, err = m.f.Read()
|
||||||
|
if err != nil {
|
||||||
|
return n, fmt.Errorf("could not read after start seek: %w",err)
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
}
|
}
|
||||||
return 0, errors.New("AV file is closed")
|
return 0, errors.New("AV file is closed")
|
||||||
}
|
}
|
||||||
|
|
|
@ -332,41 +332,32 @@ func (r *Revid) setLexer(c uint8, isRTSP bool) error {
|
||||||
func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) {
|
func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) {
|
||||||
defer r.wg.Done()
|
defer r.wg.Done()
|
||||||
|
|
||||||
for l := true; l; l = r.cfg.Loop {
|
err := in.Start()
|
||||||
err := in.Start()
|
if err != nil {
|
||||||
if err != nil {
|
r.err <- fmt.Errorf("could not start input device: %w", err)
|
||||||
r.err <- fmt.Errorf("could not start input device: %w", err)
|
return
|
||||||
return
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Lex data from input device, in, until finished or an error is encountered.
|
// Lex data from input device, in, until finished or an error is encountered.
|
||||||
// For a continuous source e.g. a camera or microphone, we should remain
|
// For a continuous source e.g. a camera or microphone, we should remain
|
||||||
// in this call indefinitely unless in.Stop() is called and an io.EOF is forced.
|
// in this call indefinitely unless in.Stop() is called and an io.EOF is forced.
|
||||||
r.cfg.Logger.Log(logger.Debug, "lexing")
|
r.cfg.Logger.Log(logger.Debug, "lexing")
|
||||||
err = r.lexTo(r.filters[0], in, delay)
|
err = r.lexTo(r.filters[0], in, delay)
|
||||||
switch err {
|
switch err {
|
||||||
case nil, io.EOF:
|
case nil, io.EOF:
|
||||||
r.cfg.Logger.Log(logger.Info, "end of file")
|
r.cfg.Logger.Log(logger.Info, "end of file")
|
||||||
case io.ErrUnexpectedEOF:
|
case io.ErrUnexpectedEOF:
|
||||||
r.cfg.Logger.Log(logger.Info, "unexpected EOF from input")
|
r.cfg.Logger.Log(logger.Info, "unexpected EOF from input")
|
||||||
default:
|
default:
|
||||||
r.err <- err
|
r.err <- err
|
||||||
}
|
}
|
||||||
r.cfg.Logger.Log(logger.Info, "finished reading input")
|
r.cfg.Logger.Log(logger.Info, "finished reading input")
|
||||||
|
|
||||||
r.cfg.Logger.Log(logger.Debug, "stopping input")
|
r.cfg.Logger.Log(logger.Debug, "stopping input")
|
||||||
err = in.Stop()
|
err = in.Stop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.err <- fmt.Errorf("could not stop input source: %w", err)
|
r.err <- fmt.Errorf("could not stop input source: %w", err)
|
||||||
} else {
|
} else {
|
||||||
r.cfg.Logger.Log(logger.Info, "input stopped")
|
r.cfg.Logger.Log(logger.Info, "input stopped")
|
||||||
}
|
|
||||||
|
|
||||||
// If we're looping and we get a stop signal we return.
|
|
||||||
select {
|
|
||||||
case <-r.stop:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue