diff --git a/codec/h264/lex.go b/codec/h264/lex.go index 176c8b3b..6159a15e 100644 --- a/codec/h264/lex.go +++ b/codec/h264/lex.go @@ -66,7 +66,7 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { buf := make([]byte, len(h264Prefix), bufSize) copy(buf, h264Prefix[:]) writeOut := false -outer: + for { var b byte var err error @@ -75,7 +75,10 @@ outer: if err != io.EOF { return err } - break + if len(buf) != 0 { + return io.ErrUnexpectedEOF + } + return io.EOF } for n := 1; b == 0x0 && n < 4; n++ { @@ -84,7 +87,7 @@ outer: if err != io.EOF { return err } - break outer + return io.ErrUnexpectedEOF } buf = append(buf, b) @@ -109,7 +112,7 @@ outer: if err != io.EOF { return err } - break outer + return io.ErrUnexpectedEOF } buf = append(buf, b) @@ -127,10 +130,4 @@ outer: } } } - if len(buf) == len(h264Prefix) { - return nil - } - <-tick - _, err := dst.Write(buf) - return err } diff --git a/codec/h265/lex.go b/codec/h265/lex.go index 55db6be6..2fb41a9d 100644 --- a/codec/h265/lex.go +++ b/codec/h265/lex.go @@ -76,10 +76,14 @@ func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error { n, err := src.Read(buf) switch err { case nil: // Do nothing. - case io.EOF: - return nil default: - return fmt.Errorf("source read error: %w\n", err) + if err == io.EOF { + if l.buf.Len() == 0 { + return io.EOF + } + return io.ErrUnexpectedEOF + } + return err } // Get payload from RTP packet. @@ -181,7 +185,7 @@ func (l *Lexer) handleFragmentation(d []byte) { } } -// handlePACI will handl PACI packets +// handlePACI will handle PACI packets // // TODO: complete this func (l *Lexer) handlePACI(d []byte) { diff --git a/codec/h265/lex_test.go b/codec/h265/lex_test.go index 1a409e4c..9f725bb9 100644 --- a/codec/h265/lex_test.go +++ b/codec/h265/lex_test.go @@ -247,7 +247,9 @@ func TestLex(t *testing.T) { r := &rtpReader{packets: test.packets} d := &destination{} err := NewLexer(test.donl).Lex(d, r, 0) - if err != nil { + switch err { + case nil, io.EOF: // Do nothing + default: t.Fatalf("error lexing: %v\n", err) } diff --git a/codec/mjpeg/lex.go b/codec/mjpeg/lex.go index 46ab4ce8..aea4ce9f 100644 --- a/codec/mjpeg/lex.go +++ b/codec/mjpeg/lex.go @@ -60,12 +60,9 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { buf := make([]byte, 2, 4<<10) n, err := r.Read(buf) if n < 2 { - return nil + return io.ErrUnexpectedEOF } if err != nil { - if err == io.EOF { - return nil - } return err } if !bytes.Equal(buf, []byte{0xff, 0xd8}) { @@ -76,7 +73,7 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { b, err := r.ReadByte() if err != nil { if err == io.EOF { - return nil + return io.ErrUnexpectedEOF } return err } diff --git a/revid/revid.go b/revid/revid.go index 3ab6c704..8b1b42da 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -854,7 +854,13 @@ func (r *Revid) Update(vars map[string]string) error { // processFrom is run as a routine to read from a input data source, lex and // then send individual access units to revid's encoders. func (r *Revid) processFrom(read io.Reader, delay time.Duration) { - r.err <- r.lexTo(r.filters[0], read, delay) - r.cfg.Logger.Log(logger.Info, pkg+"finished lexing") + err := r.lexTo(r.filters[0], read, delay) + r.cfg.Logger.Log(logger.Debug, pkg+"finished lexing") + switch err { + case nil: // Do nothing. + case io.EOF: // TODO: handle this depending on loop mode. + default: + r.err <- err + } r.wg.Done() }