Speedup WithContext

This commit is contained in:
Vladimir Mihailenco 2019-05-31 17:03:20 +03:00
parent 8476dfea4a
commit 84422d7ae7
9 changed files with 717 additions and 690 deletions

View File

@ -2,6 +2,7 @@ package redis_test
import (
"bytes"
"context"
"fmt"
"strings"
"testing"
@ -198,3 +199,140 @@ func BenchmarkZAdd(b *testing.B) {
}
})
}
var clientSink *redis.Client
func BenchmarkWithContext(b *testing.B) {
rdb := benchmarkRedisClient(10)
defer rdb.Close()
ctx := context.Background()
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
clientSink = rdb.WithContext(ctx)
}
}
var ringSink *redis.Ring
func BenchmarkRingWithContext(b *testing.B) {
rdb := redis.NewRing(&redis.RingOptions{})
defer rdb.Close()
ctx := context.Background()
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
ringSink = rdb.WithContext(ctx)
}
}
//------------------------------------------------------------------------------
func newClusterScenario() *clusterScenario {
return &clusterScenario{
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
nodeIds: make([]string, 6),
processes: make(map[string]*redisProcess, 6),
clients: make(map[string]*redis.Client, 6),
}
}
func BenchmarkClusterPing(b *testing.B) {
if testing.Short() {
b.Skip("skipping in short mode")
}
cluster := newClusterScenario()
if err := startCluster(cluster); err != nil {
b.Fatal(err)
}
defer stopCluster(cluster)
client := cluster.clusterClient(redisClusterOptions())
defer client.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
err := client.Ping().Err()
if err != nil {
b.Fatal(err)
}
}
})
}
func BenchmarkClusterSetString(b *testing.B) {
if testing.Short() {
b.Skip("skipping in short mode")
}
cluster := newClusterScenario()
if err := startCluster(cluster); err != nil {
b.Fatal(err)
}
defer stopCluster(cluster)
client := cluster.clusterClient(redisClusterOptions())
defer client.Close()
value := string(bytes.Repeat([]byte{'1'}, 10000))
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
err := client.Set("key", value, 0).Err()
if err != nil {
b.Fatal(err)
}
}
})
}
func BenchmarkClusterReloadState(b *testing.B) {
if testing.Short() {
b.Skip("skipping in short mode")
}
cluster := newClusterScenario()
if err := startCluster(cluster); err != nil {
b.Fatal(err)
}
defer stopCluster(cluster)
client := cluster.clusterClient(redisClusterOptions())
defer client.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := client.ReloadState()
if err != nil {
b.Fatal(err)
}
}
}
var clusterSink *redis.ClusterClient
func BenchmarkClusterWithContext(b *testing.B) {
rdb := redis.NewClusterClient(&redis.ClusterOptions{})
defer rdb.Close()
ctx := context.Background()
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
clusterSink = rdb.WithContext(ctx)
}
}

View File

@ -639,19 +639,22 @@ func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
//------------------------------------------------------------------------------
// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
type clusterClient struct {
cmdable
hooks
opt *ClusterOptions
nodes *clusterNodes
state *clusterStateHolder
cmdsInfoCache *cmdsInfoCache
}
// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
*clusterClient
ctx context.Context
hooks
}
// NewClusterClient returns a Redis Cluster client as described in
@ -660,8 +663,10 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt.init()
c := &ClusterClient{
opt: opt,
nodes: newClusterNodes(opt),
clusterClient: &clusterClient{
opt: opt,
nodes: newClusterNodes(opt),
},
}
c.state = newClusterStateHolder(c.loadState)
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
@ -675,7 +680,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
}
func (c *ClusterClient) init() {
c.cmdable.setProcessor(c.Process)
c.cmdable = c.Process
}
func (c *ClusterClient) Context() context.Context {
@ -689,15 +694,8 @@ func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
if ctx == nil {
panic("nil context")
}
c2 := c.clone()
c2.ctx = ctx
return c2
}
func (c *ClusterClient) clone() *ClusterClient {
clone := *c
clone.hooks.copy()
clone.init()
clone.ctx = ctx
return &clone
}
@ -1013,7 +1011,7 @@ func (c *ClusterClient) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.processPipeline,
}
pipe.statefulCmdable.setProcessor(pipe.Process)
pipe.init()
return &pipe
}
@ -1207,7 +1205,7 @@ func (c *ClusterClient) TxPipeline() Pipeliner {
pipe := Pipeline{
exec: c.processTxPipeline,
}
pipe.statefulCmdable.setProcessor(pipe.Process)
pipe.init()
return &pipe
}

View File

@ -1,13 +1,11 @@
package redis_test
import (
"bytes"
"fmt"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/go-redis/redis"
@ -1054,92 +1052,3 @@ var _ = Describe("ClusterClient timeout", func() {
testTimeout()
})
})
//------------------------------------------------------------------------------
func newClusterScenario() *clusterScenario {
return &clusterScenario{
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
nodeIds: make([]string, 6),
processes: make(map[string]*redisProcess, 6),
clients: make(map[string]*redis.Client, 6),
}
}
func BenchmarkClusterPing(b *testing.B) {
if testing.Short() {
b.Skip("skipping in short mode")
}
cluster := newClusterScenario()
if err := startCluster(cluster); err != nil {
b.Fatal(err)
}
defer stopCluster(cluster)
client := cluster.clusterClient(redisClusterOptions())
defer client.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
err := client.Ping().Err()
if err != nil {
b.Fatal(err)
}
}
})
}
func BenchmarkClusterSetString(b *testing.B) {
if testing.Short() {
b.Skip("skipping in short mode")
}
cluster := newClusterScenario()
if err := startCluster(cluster); err != nil {
b.Fatal(err)
}
defer stopCluster(cluster)
client := cluster.clusterClient(redisClusterOptions())
defer client.Close()
value := string(bytes.Repeat([]byte{'1'}, 10000))
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
err := client.Set("key", value, 0).Err()
if err != nil {
b.Fatal(err)
}
}
})
}
func BenchmarkClusterReloadState(b *testing.B) {
if testing.Short() {
b.Skip("skipping in short mode")
}
cluster := newClusterScenario()
if err := startCluster(cluster); err != nil {
b.Fatal(err)
}
defer stopCluster(cluster)
client := cluster.clusterClient(redisClusterOptions())
defer client.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := client.ReloadState()
if err != nil {
b.Fatal(err)
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -36,6 +36,7 @@ var _ Pipeliner = (*Pipeline)(nil)
// http://redis.io/topics/pipelining. It's safe for concurrent use
// by multiple goroutines.
type Pipeline struct {
cmdable
statefulCmdable
exec pipelineExecer
@ -45,6 +46,11 @@ type Pipeline struct {
closed bool
}
func (c *Pipeline) init() {
c.cmdable = c.Process
c.statefulCmdable = c.Process
}
func (c *Pipeline) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...)
_ = c.Process(cmd)

View File

@ -37,12 +37,12 @@ type hooks struct {
hooks []Hook
}
func (hs *hooks) AddHook(hook Hook) {
hs.hooks = append(hs.hooks, hook)
func (hs *hooks) lazyCopy() {
hs.hooks = hs.hooks[:len(hs.hooks):len(hs.hooks)]
}
func (hs *hooks) copy() {
hs.hooks = hs.hooks[:len(hs.hooks):len(hs.hooks)]
func (hs *hooks) AddHook(hook Hook) {
hs.hooks = append(hs.hooks, hook)
}
func (hs hooks) process(ctx context.Context, cmd Cmder, fn func(Cmder) error) error {
@ -452,15 +452,18 @@ func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
//------------------------------------------------------------------------------
type client struct {
baseClient
cmdable
hooks
}
// Client is a Redis client representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
type Client struct {
baseClient
cmdable
*client
ctx context.Context
hooks
}
// NewClient returns a client to the Redis Server specified by Options.
@ -468,9 +471,11 @@ func NewClient(opt *Options) *Client {
opt.init()
c := Client{
baseClient: baseClient{
opt: opt,
connPool: newConnPool(opt),
client: &client{
baseClient: baseClient{
opt: opt,
connPool: newConnPool(opt),
},
},
}
c.init()
@ -479,7 +484,7 @@ func NewClient(opt *Options) *Client {
}
func (c *Client) init() {
c.cmdable.setProcessor(c.Process)
c.cmdable = c.Process
}
func (c *Client) Context() context.Context {
@ -493,15 +498,8 @@ func (c *Client) WithContext(ctx context.Context) *Client {
if ctx == nil {
panic("nil context")
}
c2 := c.clone()
c2.ctx = ctx
return c2
}
func (c *Client) clone() *Client {
clone := *c
clone.hooks.copy()
clone.init()
clone.ctx = ctx
return &clone
}
@ -543,7 +541,7 @@ func (c *Client) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.processPipeline,
}
pipe.statefulCmdable.setProcessor(pipe.Process)
pipe.init()
return &pipe
}
@ -556,7 +554,7 @@ func (c *Client) TxPipeline() Pipeliner {
pipe := Pipeline{
exec: c.processTxPipeline,
}
pipe.statefulCmdable.setProcessor(pipe.Process)
pipe.init()
return &pipe
}
@ -622,6 +620,7 @@ func (c *Client) PSubscribe(channels ...string) *PubSub {
// Conn is like Client, but its pool contains single connection.
type Conn struct {
baseClient
cmdable
statefulCmdable
}
@ -632,7 +631,8 @@ func newConn(opt *Options, cn *pool.Conn) *Conn {
connPool: pool.NewSingleConnPool(cn),
},
}
c.statefulCmdable.setProcessor(c.Process)
c.cmdable = c.Process
c.statefulCmdable = c.Process
return &c
}
@ -648,7 +648,7 @@ func (c *Conn) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.processPipeline,
}
pipe.statefulCmdable.setProcessor(pipe.Process)
pipe.init()
return &pipe
}
@ -661,6 +661,6 @@ func (c *Conn) TxPipeline() Pipeliner {
pipe := Pipeline{
exec: c.processTxPipeline,
}
pipe.statefulCmdable.setProcessor(pipe.Process)
pipe.init()
return &pipe
}

70
ring.go
View File

@ -323,6 +323,14 @@ func (c *ringShards) Close() error {
//------------------------------------------------------------------------------
type ring struct {
cmdable
hooks
opt *RingOptions
shards *ringShards
cmdsInfoCache *cmdsInfoCache
}
// Ring is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
@ -338,26 +346,23 @@ func (c *ringShards) Close() error {
// and can tolerate losing data when one of the servers dies.
// Otherwise you should use Redis Cluster.
type Ring struct {
cmdable
opt *RingOptions
shards *ringShards
cmdsInfoCache *cmdsInfoCache
*ring
ctx context.Context
hooks
}
func NewRing(opt *RingOptions) *Ring {
opt.init()
ring := &Ring{
opt: opt,
shards: newRingShards(opt),
ring := Ring{
ring: &ring{
opt: opt,
shards: newRingShards(opt),
},
}
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
ring.init()
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
for name, addr := range opt.Addrs {
clopt := opt.clientOptions()
clopt.Addr = addr
@ -366,11 +371,11 @@ func NewRing(opt *RingOptions) *Ring {
go ring.shards.Heartbeat(opt.HeartbeatFrequency)
return ring
return &ring
}
func (c *Ring) init() {
c.cmdable.setProcessor(c.Process)
c.cmdable = c.Process
}
func (c *Ring) Context() context.Context {
@ -384,16 +389,20 @@ func (c *Ring) WithContext(ctx context.Context) *Ring {
if ctx == nil {
panic("nil context")
}
c2 := c.clone()
c2.ctx = ctx
return c2
clone := *c
clone.ctx = ctx
return &clone
}
func (c *Ring) clone() *Ring {
cp := *c
cp.init()
// Do creates a Cmd from the args and processes the cmd.
func (c *Ring) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...)
c.Process(cmd)
return cmd
}
return &cp
func (c *Ring) Process(cmd Cmder) error {
return c.hooks.process(c.ctx, cmd, c.process)
}
// Options returns read-only Options that were used to create the client.
@ -523,17 +532,6 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
return c.shards.GetByKey(firstKey)
}
// Do creates a Cmd from the args and processes the cmd.
func (c *Ring) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...)
c.Process(cmd)
return cmd
}
func (c *Ring) Process(cmd Cmder) error {
return c.hooks.process(c.ctx, cmd, c.process)
}
func (c *Ring) process(cmd Cmder) error {
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
@ -557,18 +555,18 @@ func (c *Ring) process(cmd Cmder) error {
return cmd.Err()
}
func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}
func (c *Ring) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.processPipeline,
}
pipe.cmdable.setProcessor(pipe.Process)
pipe.init()
return &pipe
}
func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}
func (c *Ring) processPipeline(cmds []Cmder) error {
return c.hooks.processPipeline(c.ctx, cmds, c._processPipeline)
}

View File

@ -87,14 +87,15 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
}
c := Client{
baseClient: baseClient{
opt: opt,
connPool: failover.Pool(),
onClose: failover.Close,
client: &client{
baseClient: baseClient{
opt: opt,
connPool: failover.Pool(),
onClose: failover.Close,
},
},
}
c.cmdable.setProcessor(c.Process)
c.cmdable = c.Process
return &c
}
@ -129,13 +130,8 @@ func (c *SentinelClient) WithContext(ctx context.Context) *SentinelClient {
if ctx == nil {
panic("nil context")
}
c2 := c.clone()
c2.ctx = ctx
return c2
}
func (c *SentinelClient) clone() *SentinelClient {
clone := *c
clone.ctx = ctx
return &clone
}

14
tx.go
View File

@ -15,6 +15,7 @@ const TxFailedErr = proto.RedisError("redis: transaction failed")
// by multiple goroutines, because Exec resets list of watched keys.
// If you don't need WATCH it is better to use Pipeline.
type Tx struct {
cmdable
statefulCmdable
baseClient
@ -34,7 +35,8 @@ func (c *Client) newTx() *Tx {
}
func (c *Tx) init() {
c.statefulCmdable.setProcessor(c.Process)
c.cmdable = c.Process
c.statefulCmdable = c.Process
}
func (c *Tx) Context() context.Context {
@ -48,14 +50,8 @@ func (c *Tx) WithContext(ctx context.Context) *Tx {
if ctx == nil {
panic("nil context")
}
c2 := c.clone()
c2.ctx = ctx
return c2
}
func (c *Tx) clone() *Tx {
clone := *c
clone.init()
clone.ctx = ctx
return &clone
}
@ -117,7 +113,7 @@ func (c *Tx) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.processTxPipeline,
}
pipe.statefulCmdable.setProcessor(pipe.Process)
pipe.init()
return &pipe
}