From f6974ebb5c40a8adf90d2cacab6dc297f4eba4c2 Mon Sep 17 00:00:00 2001 From: ffenix113 Date: Fri, 5 Nov 2021 18:46:21 +0100 Subject: [PATCH] fix: update some argument counts in pre-allocs In some cases number of pre-allocated places in argument array is missing 1 or 2 elements, which results in re-allocation of twice as large array --- bench_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ commands.go | 8 ++++---- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/bench_test.go b/bench_test.go index 708da5d9..5644f50c 100644 --- a/bench_test.go +++ b/bench_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "strconv" "strings" "sync" "testing" @@ -233,6 +234,45 @@ func BenchmarkZAdd(b *testing.B) { }) } +func BenchmarkXRead(b *testing.B) { + ctx := context.Background() + client := benchmarkRedisClient(ctx, 10) + defer client.Close() + + args := redis.XAddArgs{ + Stream: "1", + ID: "*", + Values: map[string]string{"uno": "dos"}, + } + + lenStreams := 16 + streams := make([]string, 0, lenStreams) + for i := 0; i < lenStreams; i++ { + streams = append(streams, strconv.Itoa(i)) + } + for i := 0; i < lenStreams; i++ { + streams = append(streams, "0") + } + + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + client.XAdd(ctx, &args) + + err := client.XRead(ctx, &redis.XReadArgs{ + Streams: streams, + Count: 1, + Block: time.Second, + }).Err() + if err != nil { + b.Fatal(err) + } + } + }) +} + var clientSink *redis.Client func BenchmarkWithContext(b *testing.B) { diff --git a/commands.go b/commands.go index c76242d7..9196692d 100644 --- a/commands.go +++ b/commands.go @@ -1778,7 +1778,7 @@ type XReadArgs struct { } func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd { - args := make([]interface{}, 0, 5+len(a.Streams)) + args := make([]interface{}, 0, 6+len(a.Streams)) args = append(args, "xread") keyPos := int8(1) @@ -1860,7 +1860,7 @@ type XReadGroupArgs struct { } func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd { - args := make([]interface{}, 0, 8+len(a.Streams)) + args := make([]interface{}, 0, 10+len(a.Streams)) args = append(args, "xreadgroup", "group", a.Group, a.Consumer) keyPos := int8(4) @@ -1957,7 +1957,7 @@ func (c cmdable) XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAuto } func xAutoClaimArgs(ctx context.Context, a *XAutoClaimArgs) []interface{} { - args := make([]interface{}, 0, 9) + args := make([]interface{}, 0, 8) args = append(args, "xautoclaim", a.Stream, a.Group, a.Consumer, formatMs(ctx, a.MinIdle), a.Start) if a.Count > 0 { args = append(args, "count", a.Count) @@ -1989,7 +1989,7 @@ func (c cmdable) XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCm } func xClaimArgs(a *XClaimArgs) []interface{} { - args := make([]interface{}, 0, 4+len(a.Messages)) + args := make([]interface{}, 0, 5+len(a.Messages)) args = append(args, "xclaim", a.Stream,