Fix stream decoder for slow reader

This commit is contained in:
Masaaki Goshima 2021-05-03 16:25:26 +09:00
parent ac159966aa
commit 03e2c96136
3 changed files with 105 additions and 20 deletions

View File

@ -0,0 +1,78 @@
package benchmark
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"testing"
gojson "github.com/goccy/go-json"
)
// Benchmark decoding from a slow io.Reader that never fills the buffer completely
func Benchmark_Decode_SlowReader_EncodingJson(b *testing.B) {
var expected LargePayload
if err := json.Unmarshal(LargeFixture, &expected); err != nil {
b.Fatal(err)
}
for _, chunkSize := range [5]int{16384, 4096, 1024, 256, 64} {
b.Run(fmt.Sprintf("chunksize %v", chunkSize), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
index = 0
var got LargePayload
if err := json.NewDecoder(slowReader{chunkSize: chunkSize}).Decode(&got); err != nil {
b.Fatal(err)
}
if !reflect.DeepEqual(expected, got) {
b.Fatalf("failed to decode. expected:[%+v] but got:[%+v]", expected, got)
}
}
})
}
}
func Benchmark_Decode_SlowReader_GoJson(b *testing.B) {
var expected LargePayload
if err := json.Unmarshal(LargeFixture, &expected); err != nil {
b.Fatal(err)
}
for _, chunkSize := range []int{16384, 4096, 1024, 256, 64} {
b.Run(fmt.Sprintf("chunksize %v", chunkSize), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
index = 0
var got LargePayload
if err := gojson.NewDecoder(slowReader{chunkSize: chunkSize}).Decode(&got); err != nil {
b.Fatal(err)
}
if !reflect.DeepEqual(expected, got) {
b.Fatalf("failed to decode. expected:[%+v] but got:[%+v]", expected, got)
}
}
})
}
}
type slowReader struct {
chunkSize int
}
var index int
func (s slowReader) Read(p []byte) (n int, err error) {
smallBuf := make([]byte, Min(s.chunkSize, len(p)))
x := bytes.NewReader(LargeFixture)
n, err = x.ReadAt(smallBuf, int64(index))
index += n
copy(p, smallBuf)
return
}
func Min(x, y int) int {
if x < y {
return x
}
return y
}

View File

@ -17,6 +17,7 @@ type stream struct {
r io.Reader
offset int64
cursor int64
filledBuffer bool
allRead bool
useNumber bool
disallowUnknownFields bool
@ -52,6 +53,11 @@ func (s *stream) stat() ([]byte, int64, unsafe.Pointer) {
return s.buf, s.cursor, (*sliceHeader)(unsafe.Pointer(&s.buf)).data
}
func (s *stream) statForRetry() ([]byte, int64, unsafe.Pointer) {
s.cursor-- // for retry ( because caller progress cursor position in each loop )
return s.buf, s.cursor, (*sliceHeader)(unsafe.Pointer(&s.buf)).data
}
func (s *stream) reset() {
s.offset += s.cursor
s.buf = s.buf[s.cursor:]
@ -60,10 +66,12 @@ func (s *stream) reset() {
}
func (s *stream) readBuf() []byte {
if s.filledBuffer {
s.bufSize *= 2
remainBuf := s.buf
s.buf = make([]byte, s.bufSize)
copy(s.buf, remainBuf)
}
return s.buf[s.cursor:]
}
@ -76,6 +84,11 @@ func (s *stream) read() bool {
buf[last] = nul
n, err := s.r.Read(buf[:last])
s.length = s.cursor + int64(n)
if n == last {
s.filledBuffer = true
} else {
s.filledBuffer = false
}
if err == io.EOF {
s.allRead = true
} else if err != nil {
@ -131,8 +144,7 @@ func (s *stream) skipObject(depth int64) error {
if char(p, cursor) == nul {
s.cursor = cursor
if s.read() {
s.cursor-- // for retry current character
_, cursor, p = s.stat()
_, cursor, p = s.statForRetry()
continue
}
return errUnexpectedEndOfJSON("string of object", cursor)
@ -142,8 +154,7 @@ func (s *stream) skipObject(depth int64) error {
case nul:
s.cursor = cursor
if s.read() {
s.cursor-- // for retry current character
_, cursor, p = s.stat()
_, cursor, p = s.statForRetry()
continue
}
return errUnexpectedEndOfJSON("string of object", cursor)
@ -196,8 +207,7 @@ func (s *stream) skipArray(depth int64) error {
if char(p, cursor) == nul {
s.cursor = cursor
if s.read() {
s.cursor-- // for retry current character
_, cursor, p = s.stat()
_, cursor, p = s.statForRetry()
continue
}
return errUnexpectedEndOfJSON("string of object", cursor)
@ -207,8 +217,7 @@ func (s *stream) skipArray(depth int64) error {
case nul:
s.cursor = cursor
if s.read() {
s.cursor-- // for retry current character
_, cursor, p = s.stat()
_, cursor, p = s.statForRetry()
continue
}
return errUnexpectedEndOfJSON("string of object", cursor)
@ -256,8 +265,7 @@ func (s *stream) skipValue(depth int64) error {
if char(p, cursor) == nul {
s.cursor = cursor
if s.read() {
s.cursor-- // for retry current character
_, cursor, p = s.stat()
_, cursor, p = s.statForRetry()
continue
}
return errUnexpectedEndOfJSON("value of string", s.totalOffset())
@ -268,8 +276,7 @@ func (s *stream) skipValue(depth int64) error {
case nul:
s.cursor = cursor
if s.read() {
s.cursor-- // for retry current character
_, cursor, p = s.stat()
_, cursor, p = s.statForRetry()
continue
}
return errUnexpectedEndOfJSON("value of string", s.totalOffset())

View File

@ -360,14 +360,14 @@ func decodeKeyByBitmapUint8Stream(d *structDecoder, s *stream) (*structFieldSet,
if !s.read() {
return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset())
}
buf, cursor, p = s.stat()
buf, cursor, p = s.statForRetry()
}
case nul:
s.cursor = cursor
if !s.read() {
return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset())
}
buf, cursor, p = s.stat()
buf, cursor, p = s.statForRetry()
}
}
}
@ -457,14 +457,14 @@ func decodeKeyByBitmapUint16Stream(d *structDecoder, s *stream) (*structFieldSet
if !s.read() {
return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset())
}
buf, cursor, p = s.stat()
buf, cursor, p = s.statForRetry()
}
case nul:
s.cursor = cursor
if !s.read() {
return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset())
}
buf, cursor, p = s.stat()
buf, cursor, p = s.statForRetry()
}
}
}