diff --git a/benchmark_decode_test.go b/benchmark_decode_test.go
new file mode 100644
index 0000000..dc4c9b4
--- /dev/null
+++ b/benchmark_decode_test.go
@@ -0,0 +1,317 @@
+package redis
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/go-redis/redis/v8/internal/proto"
+)
+
+var ctx = context.TODO()
+
+// ConnStub is a net.Conn that replays a canned RESP response. Read rewinds
+// to the start of the buffer once it has been fully consumed, so a single
+// response can serve any number of commands.
+type ConnStub struct {
+	buff []byte
+	idx  int
+}
+
+func (c *ConnStub) Read(b []byte) (n int, err error) {
+	n = copy(b, c.buff[c.idx:])
+	if len(c.buff) > c.idx+n {
+		c.idx += n
+	} else {
+		c.idx = 0
+	}
+	return n, nil
+}
+func (c *ConnStub) Write(b []byte) (n int, err error)  { return len(b), nil }
+func (c *ConnStub) Close() error                       { return nil }
+func (c *ConnStub) LocalAddr() net.Addr                { return nil }
+func (c *ConnStub) RemoteAddr() net.Addr               { return nil }
+func (c *ConnStub) SetDeadline(_ time.Time) error      { return nil }
+func (c *ConnStub) SetReadDeadline(_ time.Time) error  { return nil }
+func (c *ConnStub) SetWriteDeadline(_ time.Time) error { return nil }
+
+// ClientStub wraps a Cmdable whose connections are all ConnStubs, so every
+// command decodes the response installed via SetResponse.
+type ClientStub struct {
+	Cmdable
+	name string
+	buff []byte
+	conn []*ConnStub
+}
+
+// SetResponse installs the raw RESP payload on the stub and on every
+// connection created so far.
+func (stub *ClientStub) SetResponse(b []byte) {
+	stub.buff = make([]byte, len(b))
+	copy(stub.buff, b)
+
+	for _, c := range stub.conn {
+		c.buff = make([]byte, len(b))
+		copy(c.buff, b)
+	}
+}
+
+// AppendConn registers a newly dialed stub connection, seeding it with the
+// current response if one has already been set.
+func (stub *ClientStub) AppendConn(c *ConnStub) {
+	if len(stub.buff) > 0 {
+		c.buff = make([]byte, len(stub.buff))
+		copy(c.buff, stub.buff)
+	}
+	stub.conn = append(stub.conn, c)
+}
+
+func (stub *ClientStub) Name() string {
+	return stub.name
+}
+
+func NewDecodeClient() *ClientStub {
+	stub := &ClientStub{
+		name: "Client",
+	}
+	stub.Cmdable = NewClient(&Options{
+		PoolSize: 128,
+		Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
+			conn := &ConnStub{}
+			stub.AppendConn(conn)
+			return conn, nil
+		},
+	})
+	return stub
+}
+
+func NewDecodeClusterClient() *ClientStub {
+	stub := &ClientStub{
+		name: "Cluster",
+	}
+	client := NewClusterClient(&ClusterOptions{
+		PoolSize: 128,
+		Addrs:    []string{"127.0.0.1:6379"},
+		Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
+			c := &ConnStub{}
+			stub.AppendConn(c)
+			return c, nil
+		},
+		ClusterSlots: func(_ context.Context) ([]ClusterSlot, error) {
+			return []ClusterSlot{
+				{
+					Start: 0,
+					End:   16383,
+					Nodes: []ClusterNode{{Addr: "127.0.0.1:6379"}},
+				},
+			}, nil
+		},
+	})
+	// Prime the command info cache: the cluster client uses COMMAND metadata
+	// for key routing, so fetch it once from a real server and serve the
+	// cached result for the rest of the benchmark.
+	tmpClient := NewClient(&Options{Addr: ":6379"})
+	cmdCache, err := tmpClient.Command(ctx).Result()
+	_ = tmpClient.Close()
+	client.cmdsInfoCache = newCmdsInfoCache(func(_ context.Context) (map[string]*CommandInfo, error) {
+		return cmdCache, err
+	})
+
+	stub.Cmdable = client
+	return stub
+}
+
+func BenchmarkDecode(b *testing.B) {
+	stubs := []*ClientStub{
+		NewDecodeClient(),
+		NewDecodeClusterClient(),
+	}
+
+	for _, stub := range stubs {
+		b.Run(fmt.Sprintf("RespError-%s", stub.Name()), func(b *testing.B) {
+			respError(b, stub)
+		})
+		b.Run(fmt.Sprintf("RespStatus-%s", stub.Name()), func(b *testing.B) {
+			respStatus(b, stub)
+		})
+		b.Run(fmt.Sprintf("RespInt-%s", stub.Name()), func(b *testing.B) {
+			respInt(b, stub)
+		})
+		b.Run(fmt.Sprintf("RespString-%s", stub.Name()), func(b *testing.B) {
+			respString(b, stub)
+		})
+		b.Run(fmt.Sprintf("RespArray-%s", stub.Name()), func(b *testing.B) {
+			respArray(b, stub)
+		})
+		b.Run(fmt.Sprintf("RespPipeline-%s", stub.Name()), func(b *testing.B) {
+			respPipeline(b, stub)
+		})
+		b.Run(fmt.Sprintf("RespTxPipeline-%s", stub.Name()), func(b *testing.B) {
+			respTxPipeline(b, stub)
+		})
+
+		// Concurrency variants: dynamic goroutines vs. a fixed worker pool.
+		for _, pool := range []int{5, 20, 50, 100} {
+			pool := pool
+			b.Run(fmt.Sprintf("DynamicGoroutine-%s-pool=%d", stub.Name(), pool), func(b *testing.B) {
+				dynamicGoroutine(b, stub, pool)
+			})
+		}
+		for _, pool := range []int{5, 20, 50, 100} {
+			pool := pool
+			b.Run(fmt.Sprintf("StaticGoroutine-%s-pool=%d", stub.Name(), pool), func(b *testing.B) {
+				staticGoroutine(b, stub, pool)
+			})
+		}
+	}
+}
+
+func respError(b *testing.B, stub *ClientStub) {
+	stub.SetResponse([]byte("-ERR test error\r\n"))
+	respErr := proto.RedisError("ERR test error")
+	var err error
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if err = stub.Get(ctx, "key").Err(); err != respErr {
+			b.Fatalf("response error, got %q, want %q", err, respErr)
+		}
+	}
+}
+
+func respStatus(b *testing.B, stub *ClientStub) {
+	stub.SetResponse([]byte("+OK\r\n"))
+	var val string
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if val = stub.Set(ctx, "key", "value", 0).Val(); val != "OK" {
+			b.Fatalf("response error, got %q, want OK", val)
+		}
+	}
+}
+
+func respInt(b *testing.B, stub *ClientStub) {
+	stub.SetResponse([]byte(":10\r\n"))
+	var val int64
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if val = stub.Incr(ctx, "key").Val(); val != 10 {
+			b.Fatalf("response error, got %d, want 10", val)
+		}
+	}
+}
+
+func respString(b *testing.B, stub *ClientStub) {
+	stub.SetResponse([]byte("$5\r\nhello\r\n"))
+	var val string
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if val = stub.Get(ctx, "key").Val(); val != "hello" {
+			b.Fatalf("response error, got %q, want hello", val)
+		}
+	}
+}
+
+func respArray(b *testing.B, stub *ClientStub) {
+	stub.SetResponse([]byte("*3\r\n$5\r\nhello\r\n:10\r\n+OK\r\n"))
+	var val []interface{}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if val = stub.MGet(ctx, "key").Val(); len(val) != 3 {
+			b.Fatalf("response error, got %d elements, want 3", len(val))
+		}
+	}
+}
+
+func respPipeline(b *testing.B, stub *ClientStub) {
+	stub.SetResponse([]byte("+OK\r\n$5\r\nhello\r\n:1\r\n"))
+	var pipe Pipeliner
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		pipe = stub.Pipeline()
+		set := pipe.Set(ctx, "key", "value", 0)
+		get := pipe.Get(ctx, "key")
+		del := pipe.Del(ctx, "key")
+		_, err := pipe.Exec(ctx)
+		if err != nil {
+			b.Fatalf("response error, got %q, want nil", err)
+		}
+		if set.Val() != "OK" || get.Val() != "hello" || del.Val() != 1 {
+			b.Fatal("response error")
+		}
+	}
+}
+
+func respTxPipeline(b *testing.B, stub *ClientStub) {
+	stub.SetResponse([]byte("+OK\r\n+QUEUED\r\n+QUEUED\r\n+QUEUED\r\n*3\r\n+OK\r\n$5\r\nhello\r\n:1\r\n"))
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		var set *StatusCmd
+		var get *StringCmd
+		var del *IntCmd
+		_, err := stub.TxPipelined(ctx, func(pipe Pipeliner) error {
+			set = pipe.Set(ctx, "key", "value", 0)
+			get = pipe.Get(ctx, "key")
+			del = pipe.Del(ctx, "key")
+			return nil
+		})
+		if err != nil {
+			b.Fatalf("response error, got %q, want nil", err)
+		}
+		if set.Val() != "OK" || get.Val() != "hello" || del.Val() != 1 {
+			b.Fatal("response error")
+		}
+	}
+}
+
+func dynamicGoroutine(b *testing.B, stub *ClientStub, concurrency int) {
+	stub.SetResponse([]byte("$5\r\nhello\r\n"))
+	c := make(chan struct{}, concurrency)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		c <- struct{}{}
+		go func() {
+			// b.Fatal must be called from the benchmark goroutine, so fail via panic.
+			if val := stub.Get(ctx, "key").Val(); val != "hello" {
+				panic(fmt.Sprintf("response error, got %q, want hello", val))
+			}
+			<-c
+		}()
+	}
+	// We deliberately don't wait for the remaining goroutines to finish;
+	// it does not affect the measured results.
+	close(c)
+}
+
+func staticGoroutine(b *testing.B, stub *ClientStub, concurrency int) {
+	stub.SetResponse([]byte("$5\r\nhello\r\n"))
+	c := make(chan struct{}, concurrency)
+
+	b.ResetTimer()
+
+	// A fixed pool of workers consumes b.N jobs from the channel.
+	for i := 0; i < concurrency; i++ {
+		go func() {
+			for {
+				_, ok := <-c
+				if !ok {
+					return
+				}
+				if val := stub.Get(ctx, "key").Val(); val != "hello" {
+					panic(fmt.Sprintf("response error, got %q, want hello", val))
+				}
+			}
+		}()
+	}
+	for i := 0; i < b.N; i++ {
+		c <- struct{}{}
+	}
+	close(c)
+}
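If it helps to sanity-check the rewinding ConnStub outside of a benchmark, a plain test along these lines would do. This is a hypothetical sketch (TestConnStubReplay is not part of the diff) that relies only on the helpers added above:

	func TestConnStubReplay(t *testing.T) {
		stub := NewDecodeClient()
		stub.SetResponse([]byte("$5\r\nhello\r\n"))
		// A single canned bulk string should satisfy any number of GETs,
		// because ConnStub.Read rewinds once the buffer is exhausted.
		for i := 0; i < 3; i++ {
			if got := stub.Get(ctx, "key").Val(); got != "hello" {
				t.Fatalf("replay %d: got %q, want hello", i, got)
			}
		}
	}

The benchmarks can be run without the regular test suite using the standard flags, e.g. go test -run='^$' -bench=BenchmarkDecode -benchmem. Note that NewDecodeClusterClient issues one real COMMAND call to a Redis server expected at 127.0.0.1:6379 to prime its command info cache; if that call fails, the error is cached and the cluster variants degrade accordingly.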