From 03e2c96136f44c66f415da6e12664f9b378e284f Mon Sep 17 00:00:00 2001 From: Masaaki Goshima Date: Mon, 3 May 2021 16:25:26 +0900 Subject: [PATCH] Fix stream decoder for slow reader --- benchmarks/slow_reader_test.go | 78 ++++++++++++++++++++++++++++++++++ decode_stream.go | 39 ++++++++++------- decode_struct.go | 8 ++-- 3 files changed, 105 insertions(+), 20 deletions(-) create mode 100644 benchmarks/slow_reader_test.go diff --git a/benchmarks/slow_reader_test.go b/benchmarks/slow_reader_test.go new file mode 100644 index 0000000..bd9623c --- /dev/null +++ b/benchmarks/slow_reader_test.go @@ -0,0 +1,78 @@ +package benchmark + +import ( + "bytes" + "encoding/json" + "fmt" + "reflect" + "testing" + + gojson "github.com/goccy/go-json" +) + +// Benchmark decoding from a slow io.Reader that never fills the buffer completely +func Benchmark_Decode_SlowReader_EncodingJson(b *testing.B) { + var expected LargePayload + if err := json.Unmarshal(LargeFixture, &expected); err != nil { + b.Fatal(err) + } + for _, chunkSize := range [5]int{16384, 4096, 1024, 256, 64} { + b.Run(fmt.Sprintf("chunksize %v", chunkSize), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + index = 0 + var got LargePayload + if err := json.NewDecoder(slowReader{chunkSize: chunkSize}).Decode(&got); err != nil { + b.Fatal(err) + } + if !reflect.DeepEqual(expected, got) { + b.Fatalf("failed to decode. expected:[%+v] but got:[%+v]", expected, got) + } + } + }) + } +} + +func Benchmark_Decode_SlowReader_GoJson(b *testing.B) { + var expected LargePayload + if err := json.Unmarshal(LargeFixture, &expected); err != nil { + b.Fatal(err) + } + for _, chunkSize := range []int{16384, 4096, 1024, 256, 64} { + b.Run(fmt.Sprintf("chunksize %v", chunkSize), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + index = 0 + var got LargePayload + if err := gojson.NewDecoder(slowReader{chunkSize: chunkSize}).Decode(&got); err != nil { + b.Fatal(err) + } + if !reflect.DeepEqual(expected, got) { + b.Fatalf("failed to decode. expected:[%+v] but got:[%+v]", expected, got) + } + } + }) + } +} + +type slowReader struct { + chunkSize int +} + +var index int + +func (s slowReader) Read(p []byte) (n int, err error) { + smallBuf := make([]byte, Min(s.chunkSize, len(p))) + x := bytes.NewReader(LargeFixture) + n, err = x.ReadAt(smallBuf, int64(index)) + index += n + copy(p, smallBuf) + return +} + +func Min(x, y int) int { + if x < y { + return x + } + return y +} diff --git a/decode_stream.go b/decode_stream.go index b1c99f9..d7fc08a 100644 --- a/decode_stream.go +++ b/decode_stream.go @@ -17,6 +17,7 @@ type stream struct { r io.Reader offset int64 cursor int64 + filledBuffer bool allRead bool useNumber bool disallowUnknownFields bool @@ -52,6 +53,11 @@ func (s *stream) stat() ([]byte, int64, unsafe.Pointer) { return s.buf, s.cursor, (*sliceHeader)(unsafe.Pointer(&s.buf)).data } +func (s *stream) statForRetry() ([]byte, int64, unsafe.Pointer) { + s.cursor-- // for retry ( because caller progress cursor position in each loop ) + return s.buf, s.cursor, (*sliceHeader)(unsafe.Pointer(&s.buf)).data +} + func (s *stream) reset() { s.offset += s.cursor s.buf = s.buf[s.cursor:] @@ -60,10 +66,12 @@ func (s *stream) reset() { } func (s *stream) readBuf() []byte { - s.bufSize *= 2 - remainBuf := s.buf - s.buf = make([]byte, s.bufSize) - copy(s.buf, remainBuf) + if s.filledBuffer { + s.bufSize *= 2 + remainBuf := s.buf + s.buf = make([]byte, s.bufSize) + copy(s.buf, remainBuf) + } return s.buf[s.cursor:] } @@ -76,6 +84,11 @@ func (s *stream) read() bool { buf[last] = nul n, err := s.r.Read(buf[:last]) s.length = s.cursor + int64(n) + if n == last { + s.filledBuffer = true + } else { + s.filledBuffer = false + } if err == io.EOF { s.allRead = true } else if err != nil { @@ -131,8 +144,7 @@ func (s *stream) skipObject(depth int64) error { if char(p, cursor) == nul { s.cursor = cursor if s.read() { - s.cursor-- // for retry current character - _, cursor, p = s.stat() + _, cursor, p = s.statForRetry() continue } return errUnexpectedEndOfJSON("string of object", cursor) @@ -142,8 +154,7 @@ func (s *stream) skipObject(depth int64) error { case nul: s.cursor = cursor if s.read() { - s.cursor-- // for retry current character - _, cursor, p = s.stat() + _, cursor, p = s.statForRetry() continue } return errUnexpectedEndOfJSON("string of object", cursor) @@ -196,8 +207,7 @@ func (s *stream) skipArray(depth int64) error { if char(p, cursor) == nul { s.cursor = cursor if s.read() { - s.cursor-- // for retry current character - _, cursor, p = s.stat() + _, cursor, p = s.statForRetry() continue } return errUnexpectedEndOfJSON("string of object", cursor) @@ -207,8 +217,7 @@ func (s *stream) skipArray(depth int64) error { case nul: s.cursor = cursor if s.read() { - s.cursor-- // for retry current character - _, cursor, p = s.stat() + _, cursor, p = s.statForRetry() continue } return errUnexpectedEndOfJSON("string of object", cursor) @@ -256,8 +265,7 @@ func (s *stream) skipValue(depth int64) error { if char(p, cursor) == nul { s.cursor = cursor if s.read() { - s.cursor-- // for retry current character - _, cursor, p = s.stat() + _, cursor, p = s.statForRetry() continue } return errUnexpectedEndOfJSON("value of string", s.totalOffset()) @@ -268,8 +276,7 @@ func (s *stream) skipValue(depth int64) error { case nul: s.cursor = cursor if s.read() { - s.cursor-- // for retry current character - _, cursor, p = s.stat() + _, cursor, p = s.statForRetry() continue } return errUnexpectedEndOfJSON("value of string", s.totalOffset()) diff --git a/decode_struct.go b/decode_struct.go index 4c79d6f..da32d6f 100644 --- a/decode_struct.go +++ b/decode_struct.go @@ -360,14 +360,14 @@ func decodeKeyByBitmapUint8Stream(d *structDecoder, s *stream) (*structFieldSet, if !s.read() { return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset()) } - buf, cursor, p = s.stat() + buf, cursor, p = s.statForRetry() } case nul: s.cursor = cursor if !s.read() { return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset()) } - buf, cursor, p = s.stat() + buf, cursor, p = s.statForRetry() } } } @@ -457,14 +457,14 @@ func decodeKeyByBitmapUint16Stream(d *structDecoder, s *stream) (*structFieldSet if !s.read() { return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset()) } - buf, cursor, p = s.stat() + buf, cursor, p = s.statForRetry() } case nul: s.cursor = cursor if !s.read() { return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset()) } - buf, cursor, p = s.stat() + buf, cursor, p = s.statForRetry() } } }