Merge pull request #211 from goccy/feature/fix-stream-decoder

Fix the stream decoder when using a slow reader
This commit is contained in:
Masaaki Goshima 2021-05-04 02:42:07 +09:00 committed by GitHub
commit 3eb4729dc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 117 additions and 20 deletions

View File

@ -31,6 +31,18 @@ jobs:
GOGC: 1 GOGC: 1
- name: test with race detector - name: test with race detector
run: go test -v -race ./ -count=1 run: go test -v -race ./ -count=1
bench:
name: Benchmark
runs-on: ubuntu-latest
steps:
- name: setup Go
uses: actions/setup-go@v2
with:
go-version: 1.16
- name: checkout
uses: actions/checkout@v2
- name: run benchmark
run: cd benchmarks && go test -bench .
coverage: coverage:
name: Coverage name: Coverage
runs-on: ubuntu-latest runs-on: ubuntu-latest

View File

@ -0,0 +1,78 @@
package benchmark
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"testing"
gojson "github.com/goccy/go-json"
)
// Benchmark decoding from a slow io.Reader that never fills the buffer completely
// Benchmark_Decode_SlowReader_EncodingJson measures encoding/json stream
// decoding from an io.Reader that returns at most chunkSize bytes per Read,
// simulating a slow network or file source.
func Benchmark_Decode_SlowReader_EncodingJson(b *testing.B) {
	var expected LargePayload
	if err := json.Unmarshal(LargeFixture, &expected); err != nil {
		b.Fatal(err)
	}
	// Use a slice literal (was the fixed-size array [5]int) so chunk sizes can
	// be added or removed without touching a length, and so this loop matches
	// the sibling go-json benchmark below.
	for _, chunkSize := range []int{16384, 4096, 1024, 256, 64} {
		b.Run(fmt.Sprintf("chunksize %v", chunkSize), func(b *testing.B) {
			b.ReportAllocs()
			for i := 0; i < b.N; i++ {
				index = 0 // rewind the shared slowReader position before each decode
				var got LargePayload
				if err := json.NewDecoder(slowReader{chunkSize: chunkSize}).Decode(&got); err != nil {
					b.Fatal(err)
				}
				if !reflect.DeepEqual(expected, got) {
					b.Fatalf("failed to decode. expected:[%+v] but got:[%+v]", expected, got)
				}
			}
		})
	}
}
// Benchmark_Decode_SlowReader_GoJson measures go-json stream decoding from an
// io.Reader that yields at most chunkSize bytes per Read, simulating a slow
// input source.
func Benchmark_Decode_SlowReader_GoJson(b *testing.B) {
	var want LargePayload
	err := json.Unmarshal(LargeFixture, &want)
	if err != nil {
		b.Fatal(err)
	}
	chunkSizes := []int{16384, 4096, 1024, 256, 64}
	for _, size := range chunkSizes {
		name := fmt.Sprintf("chunksize %v", size)
		b.Run(name, func(b *testing.B) {
			b.ReportAllocs()
			for i := 0; i < b.N; i++ {
				// Rewind the shared slowReader position before each decode.
				index = 0
				var decoded LargePayload
				if decodeErr := gojson.NewDecoder(slowReader{chunkSize: size}).Decode(&decoded); decodeErr != nil {
					b.Fatal(decodeErr)
				}
				if !reflect.DeepEqual(want, decoded) {
					b.Fatalf("failed to decode. expected:[%+v] but got:[%+v]", want, decoded)
				}
			}
		})
	}
}
type slowReader struct {
chunkSize int
}
var index int
func (s slowReader) Read(p []byte) (n int, err error) {
smallBuf := make([]byte, Min(s.chunkSize, len(p)))
x := bytes.NewReader(LargeFixture)
n, err = x.ReadAt(smallBuf, int64(index))
index += n
copy(p, smallBuf)
return
}
// Min returns the smaller of x and y.
func Min(x, y int) int {
	if y < x {
		return y
	}
	return x
}

View File

@ -17,6 +17,7 @@ type stream struct {
r io.Reader r io.Reader
offset int64 offset int64
cursor int64 cursor int64
filledBuffer bool
allRead bool allRead bool
useNumber bool useNumber bool
disallowUnknownFields bool disallowUnknownFields bool
@ -52,6 +53,11 @@ func (s *stream) stat() ([]byte, int64, unsafe.Pointer) {
return s.buf, s.cursor, (*sliceHeader)(unsafe.Pointer(&s.buf)).data return s.buf, s.cursor, (*sliceHeader)(unsafe.Pointer(&s.buf)).data
} }
// statForRetry is the stat variant used on the re-read path: it first steps
// the cursor back one position — the caller's loop has already advanced past
// the nul byte that triggered the read — so the same character is examined
// again against the newly filled buffer.
func (s *stream) statForRetry() ([]byte, int64, unsafe.Pointer) {
	s.cursor-- // for retry ( because caller progress cursor position in each loop )
	return s.buf, s.cursor, (*sliceHeader)(unsafe.Pointer(&s.buf)).data
}
func (s *stream) reset() { func (s *stream) reset() {
s.offset += s.cursor s.offset += s.cursor
s.buf = s.buf[s.cursor:] s.buf = s.buf[s.cursor:]
@ -60,10 +66,12 @@ func (s *stream) reset() {
} }
func (s *stream) readBuf() []byte { func (s *stream) readBuf() []byte {
if s.filledBuffer {
s.bufSize *= 2 s.bufSize *= 2
remainBuf := s.buf remainBuf := s.buf
s.buf = make([]byte, s.bufSize) s.buf = make([]byte, s.bufSize)
copy(s.buf, remainBuf) copy(s.buf, remainBuf)
}
return s.buf[s.cursor:] return s.buf[s.cursor:]
} }
@ -76,6 +84,11 @@ func (s *stream) read() bool {
buf[last] = nul buf[last] = nul
n, err := s.r.Read(buf[:last]) n, err := s.r.Read(buf[:last])
s.length = s.cursor + int64(n) s.length = s.cursor + int64(n)
if n == last {
s.filledBuffer = true
} else {
s.filledBuffer = false
}
if err == io.EOF { if err == io.EOF {
s.allRead = true s.allRead = true
} else if err != nil { } else if err != nil {
@ -131,8 +144,7 @@ func (s *stream) skipObject(depth int64) error {
if char(p, cursor) == nul { if char(p, cursor) == nul {
s.cursor = cursor s.cursor = cursor
if s.read() { if s.read() {
s.cursor-- // for retry current character _, cursor, p = s.statForRetry()
_, cursor, p = s.stat()
continue continue
} }
return errUnexpectedEndOfJSON("string of object", cursor) return errUnexpectedEndOfJSON("string of object", cursor)
@ -142,8 +154,7 @@ func (s *stream) skipObject(depth int64) error {
case nul: case nul:
s.cursor = cursor s.cursor = cursor
if s.read() { if s.read() {
s.cursor-- // for retry current character _, cursor, p = s.statForRetry()
_, cursor, p = s.stat()
continue continue
} }
return errUnexpectedEndOfJSON("string of object", cursor) return errUnexpectedEndOfJSON("string of object", cursor)
@ -196,8 +207,7 @@ func (s *stream) skipArray(depth int64) error {
if char(p, cursor) == nul { if char(p, cursor) == nul {
s.cursor = cursor s.cursor = cursor
if s.read() { if s.read() {
s.cursor-- // for retry current character _, cursor, p = s.statForRetry()
_, cursor, p = s.stat()
continue continue
} }
return errUnexpectedEndOfJSON("string of object", cursor) return errUnexpectedEndOfJSON("string of object", cursor)
@ -207,8 +217,7 @@ func (s *stream) skipArray(depth int64) error {
case nul: case nul:
s.cursor = cursor s.cursor = cursor
if s.read() { if s.read() {
s.cursor-- // for retry current character _, cursor, p = s.statForRetry()
_, cursor, p = s.stat()
continue continue
} }
return errUnexpectedEndOfJSON("string of object", cursor) return errUnexpectedEndOfJSON("string of object", cursor)
@ -256,8 +265,7 @@ func (s *stream) skipValue(depth int64) error {
if char(p, cursor) == nul { if char(p, cursor) == nul {
s.cursor = cursor s.cursor = cursor
if s.read() { if s.read() {
s.cursor-- // for retry current character _, cursor, p = s.statForRetry()
_, cursor, p = s.stat()
continue continue
} }
return errUnexpectedEndOfJSON("value of string", s.totalOffset()) return errUnexpectedEndOfJSON("value of string", s.totalOffset())
@ -268,8 +276,7 @@ func (s *stream) skipValue(depth int64) error {
case nul: case nul:
s.cursor = cursor s.cursor = cursor
if s.read() { if s.read() {
s.cursor-- // for retry current character _, cursor, p = s.statForRetry()
_, cursor, p = s.stat()
continue continue
} }
return errUnexpectedEndOfJSON("value of string", s.totalOffset()) return errUnexpectedEndOfJSON("value of string", s.totalOffset())

View File

@ -360,14 +360,14 @@ func decodeKeyByBitmapUint8Stream(d *structDecoder, s *stream) (*structFieldSet,
if !s.read() { if !s.read() {
return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset()) return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset())
} }
buf, cursor, p = s.stat() buf, cursor, p = s.statForRetry()
} }
case nul: case nul:
s.cursor = cursor s.cursor = cursor
if !s.read() { if !s.read() {
return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset()) return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset())
} }
buf, cursor, p = s.stat() buf, cursor, p = s.statForRetry()
} }
} }
} }
@ -457,14 +457,14 @@ func decodeKeyByBitmapUint16Stream(d *structDecoder, s *stream) (*structFieldSet
if !s.read() { if !s.read() {
return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset()) return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset())
} }
buf, cursor, p = s.stat() buf, cursor, p = s.statForRetry()
} }
case nul: case nul:
s.cursor = cursor s.cursor = cursor
if !s.read() { if !s.read() {
return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset()) return nil, "", errUnexpectedEndOfJSON("string", s.totalOffset())
} }
buf, cursor, p = s.stat() buf, cursor, p = s.statForRetry()
} }
} }
} }