From a26777b697df9de84619ebbb639ae3f4b579cfbc Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 17 Jan 2020 16:02:51 +1030 Subject: [PATCH 1/9] codec/h264&h265/lex.go: returning errors even if io.EOF, and not bothering to return what's currently in buffer if error --- codec/h264/lex.go | 24 +++++------------------- codec/h265/lex.go | 2 -- 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/codec/h264/lex.go b/codec/h264/lex.go index 176c8b3b..1f169afe 100644 --- a/codec/h264/lex.go +++ b/codec/h264/lex.go @@ -31,6 +31,7 @@ LICENSE package h264 import ( + "fmt" "io" "time" @@ -66,25 +67,19 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { buf := make([]byte, len(h264Prefix), bufSize) copy(buf, h264Prefix[:]) writeOut := false -outer: + for { var b byte var err error buf, b, err = c.ScanUntil(buf, 0x00) if err != nil { - if err != io.EOF { - return err - } - break + return fmt.Errorf("can't scan until: %w", err) } for n := 1; b == 0x0 && n < 4; n++ { b, err = c.ReadByte() if err != nil { - if err != io.EOF { - return err - } - break outer + return fmt.Errorf("can't read byte: %w", err) } buf = append(buf, b) @@ -106,10 +101,7 @@ outer: b, err = c.ReadByte() if err != nil { - if err != io.EOF { - return err - } - break outer + return fmt.Errorf("can't read byte: %w", err) } buf = append(buf, b) @@ -127,10 +119,4 @@ outer: } } } - if len(buf) == len(h264Prefix) { - return nil - } - <-tick - _, err := dst.Write(buf) - return err } diff --git a/codec/h265/lex.go b/codec/h265/lex.go index 55db6be6..6534e571 100644 --- a/codec/h265/lex.go +++ b/codec/h265/lex.go @@ -76,8 +76,6 @@ func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error { n, err := src.Read(buf) switch err { case nil: // Do nothing. - case io.EOF: - return nil default: return fmt.Errorf("source read error: %w\n", err) } From e45f51a31f912925e6991213ebe937d840345717 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 17 Jan 2020 16:07:37 +1030 Subject: [PATCH 2/9] codec/mjpeg/lex.go: return error if io.EOF is encountered --- codec/mjpeg/lex.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/codec/mjpeg/lex.go b/codec/mjpeg/lex.go index 46ab4ce8..d8b671b2 100644 --- a/codec/mjpeg/lex.go +++ b/codec/mjpeg/lex.go @@ -63,10 +63,7 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { return nil } if err != nil { - if err == io.EOF { - return nil - } - return err + return fmt.Errorf("can't read source: %w", err) } if !bytes.Equal(buf, []byte{0xff, 0xd8}) { return fmt.Errorf("parser: not MJPEG frame start: %#v", buf) @@ -75,10 +72,7 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { for { b, err := r.ReadByte() if err != nil { - if err == io.EOF { - return nil - } - return err + return fmt.Errorf("can't read byte: %w", err) } buf = append(buf, b) if last == 0xff && b == 0xd9 { From 399ecb88746f84955f03c3afe67d08a8a7e18f58 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 17 Jan 2020 16:23:28 +1030 Subject: [PATCH 3/9] revid: do more thinking about errors in processFrom --- revid/revid.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index a05c9650..03f9bd05 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -847,7 +847,12 @@ func (r *Revid) Update(vars map[string]string) error { // processFrom is run as a routine to read from a input data source, lex and // then send individual access units to revid's encoders. func (r *Revid) processFrom(read io.Reader, delay time.Duration) { - r.err <- r.lexTo(r.filters[0], read, delay) - r.cfg.Logger.Log(logger.Info, pkg+"finished lexing") + err := r.lexTo(r.filters[0], read, delay) + switch { + case err == nil: // Do nothing. + case errors.Is(err, io.EOF): // TODO: handle this depending on loop mode. + default: + r.err <- err + } r.wg.Done() } From fdf393566a61fdfa6a6f64aadf290e77e3c6b9f4 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 17 Jan 2020 16:41:12 +1030 Subject: [PATCH 4/9] codec/h265/lex_test.go: adapted test for changes to error lex can return --- codec/h265/lex_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/codec/h265/lex_test.go b/codec/h265/lex_test.go index 1a409e4c..4ed57c12 100644 --- a/codec/h265/lex_test.go +++ b/codec/h265/lex_test.go @@ -28,6 +28,7 @@ LICENSE package h265 import ( + "errors" "io" "testing" ) @@ -247,7 +248,9 @@ func TestLex(t *testing.T) { r := &rtpReader{packets: test.packets} d := &destination{} err := NewLexer(test.donl).Lex(d, r, 0) - if err != nil { + switch { + case err == nil || errors.Is(err, io.EOF): // Do nothing + default: t.Fatalf("error lexing: %v\n", err) } From f64c986efc1096ca08515fab6a29ec2ade46dfca Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 20 Jan 2020 15:03:49 +1030 Subject: [PATCH 5/9] codec: corrected error handling in lexers --- codec/h264/lex.go | 21 +++++++++++++++++---- codec/h265/lex.go | 10 ++++++++-- codec/mjpeg/lex.go | 9 ++++++--- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/codec/h264/lex.go b/codec/h264/lex.go index 1f169afe..3d9fb8a4 100644 --- a/codec/h264/lex.go +++ b/codec/h264/lex.go @@ -31,7 +31,6 @@ LICENSE package h264 import ( - "fmt" "io" "time" @@ -68,18 +67,25 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { copy(buf, h264Prefix[:]) writeOut := false +outer: for { var b byte var err error buf, b, err = c.ScanUntil(buf, 0x00) if err != nil { - return fmt.Errorf("can't scan until: %w", err) + if err != io.EOF { + return err + } + break } for n := 1; b == 0x0 && n < 4; n++ { b, err = c.ReadByte() if err != nil { - return fmt.Errorf("can't read byte: %w", err) + if err != io.EOF { + return err + } + break outer } buf = append(buf, b) @@ -101,7 +107,10 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { b, err = c.ReadByte() if err != nil { - return fmt.Errorf("can't read byte: %w", err) + if err != io.EOF { + return err + } + break outer } buf = append(buf, b) @@ -119,4 +128,8 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { } } } + if len(buf) == len(h264Prefix) { + return io.EOF + } + return io.ErrUnexpectedEOF } diff --git a/codec/h265/lex.go b/codec/h265/lex.go index 6534e571..2fb41a9d 100644 --- a/codec/h265/lex.go +++ b/codec/h265/lex.go @@ -77,7 +77,13 @@ func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error { switch err { case nil: // Do nothing. default: - return fmt.Errorf("source read error: %w\n", err) + if err == io.EOF { + if l.buf.Len() == 0 { + return io.EOF + } + return io.ErrUnexpectedEOF + } + return err } // Get payload from RTP packet. @@ -179,7 +185,7 @@ func (l *Lexer) handleFragmentation(d []byte) { } } -// handlePACI will handl PACI packets +// handlePACI will handle PACI packets // // TODO: complete this func (l *Lexer) handlePACI(d []byte) { diff --git a/codec/mjpeg/lex.go b/codec/mjpeg/lex.go index d8b671b2..aea4ce9f 100644 --- a/codec/mjpeg/lex.go +++ b/codec/mjpeg/lex.go @@ -60,10 +60,10 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { buf := make([]byte, 2, 4<<10) n, err := r.Read(buf) if n < 2 { - return nil + return io.ErrUnexpectedEOF } if err != nil { - return fmt.Errorf("can't read source: %w", err) + return err } if !bytes.Equal(buf, []byte{0xff, 0xd8}) { return fmt.Errorf("parser: not MJPEG frame start: %#v", buf) @@ -72,7 +72,10 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { for { b, err := r.ReadByte() if err != nil { - return fmt.Errorf("can't read byte: %w", err) + if err == io.EOF { + return io.ErrUnexpectedEOF + } + return err } buf = append(buf, b) if last == 0xff && b == 0xd9 { From 2d2e1b0ad143b9cea6d5197f82382e71f71b0061 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 20 Jan 2020 16:10:10 +1030 Subject: [PATCH 6/9] revid/revid.go: added logger message back --- revid/revid.go | 1 + 1 file changed, 1 insertion(+) diff --git a/revid/revid.go b/revid/revid.go index 03f9bd05..d563cc8c 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -848,6 +848,7 @@ func (r *Revid) Update(vars map[string]string) error { // then send individual access units to revid's encoders. func (r *Revid) processFrom(read io.Reader, delay time.Duration) { err := r.lexTo(r.filters[0], read, delay) + r.cfg.Logger.Log(logger.Debug, pkg+"finished lexing") switch { case err == nil: // Do nothing. case errors.Is(err, io.EOF): // TODO: handle this depending on loop mode. From bf289221deb159f4537c5fa900b0a81041178a63 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 22 Jan 2020 13:08:02 +1030 Subject: [PATCH 7/9] codec/h264/lex.go: simplified return logic --- codec/h264/lex.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/codec/h264/lex.go b/codec/h264/lex.go index 3d9fb8a4..633d3f53 100644 --- a/codec/h264/lex.go +++ b/codec/h264/lex.go @@ -67,16 +67,12 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { copy(buf, h264Prefix[:]) writeOut := false -outer: for { var b byte var err error buf, b, err = c.ScanUntil(buf, 0x00) if err != nil { - if err != io.EOF { - return err - } - break + return err } for n := 1; b == 0x0 && n < 4; n++ { @@ -85,7 +81,7 @@ outer: if err != io.EOF { return err } - break outer + return io.ErrUnexpectedEOF } buf = append(buf, b) @@ -110,7 +106,7 @@ outer: if err != io.EOF { return err } - break outer + return io.ErrUnexpectedEOF } buf = append(buf, b) @@ -128,8 +124,4 @@ outer: } } } - if len(buf) == len(h264Prefix) { - return io.EOF - } - return io.ErrUnexpectedEOF } From 78484b5f5bc93b6ea2f554c371872eed16236d6d Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 22 Jan 2020 13:43:08 +1030 Subject: [PATCH 8/9] codec/h264/lex.go: fixed handling of errors from ScanUntil --- codec/h264/lex.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/codec/h264/lex.go b/codec/h264/lex.go index 633d3f53..6159a15e 100644 --- a/codec/h264/lex.go +++ b/codec/h264/lex.go @@ -72,7 +72,13 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { var err error buf, b, err = c.ScanUntil(buf, 0x00) if err != nil { - return err + if err != io.EOF { + return err + } + if len(buf) != 0 { + return io.ErrUnexpectedEOF + } + return io.EOF } for n := 1; b == 0x0 && n < 4; n++ { From 9b8667e56ca21f7d29ca57bb127eb6e13cdfb336 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 22 Jan 2020 13:45:01 +1030 Subject: [PATCH 9/9] fixed handling of errors from lex methods --- codec/h265/lex_test.go | 5 ++--- revid/revid.go | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/codec/h265/lex_test.go b/codec/h265/lex_test.go index 4ed57c12..9f725bb9 100644 --- a/codec/h265/lex_test.go +++ b/codec/h265/lex_test.go @@ -28,7 +28,6 @@ LICENSE package h265 import ( - "errors" "io" "testing" ) @@ -248,8 +247,8 @@ func TestLex(t *testing.T) { r := &rtpReader{packets: test.packets} d := &destination{} err := NewLexer(test.donl).Lex(d, r, 0) - switch { - case err == nil || errors.Is(err, io.EOF): // Do nothing + switch err { + case nil, io.EOF: // Do nothing default: t.Fatalf("error lexing: %v\n", err) } diff --git a/revid/revid.go b/revid/revid.go index d563cc8c..aa2139fb 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -849,9 +849,9 @@ func (r *Revid) Update(vars map[string]string) error { func (r *Revid) processFrom(read io.Reader, delay time.Duration) { err := r.lexTo(r.filters[0], read, delay) r.cfg.Logger.Log(logger.Debug, pkg+"finished lexing") - switch { - case err == nil: // Do nothing. - case errors.Is(err, io.EOF): // TODO: handle this depending on loop mode. + switch err { + case nil: // Do nothing. + case io.EOF: // TODO: handle this depending on loop mode. default: r.err <- err }