diff --git a/cluster.go b/cluster.go index 781dedb..7a4c2c8 100644 --- a/cluster.go +++ b/cluster.go @@ -483,22 +483,20 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode { type ClusterClient struct { cmdable - opt *ClusterOptions - nodes *clusterNodes + opt *ClusterOptions + nodes *clusterNodes + cmdsInfoCache *cmdsInfoCache _state atomic.Value stateErrMu sync.RWMutex stateErr error - cmdsInfoOnce internal.Once - cmdsInfo map[string]*CommandInfo - process func(Cmder) error processPipeline func([]Cmder) error processTxPipeline func([]Cmder) error // Reports whether slots reloading is in progress. - reloading uint32 + reloading uint32 // atomic } // NewClusterClient returns a Redis Cluster client as described in @@ -507,8 +505,9 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { opt.init() c := &ClusterClient{ - opt: opt, - nodes: newClusterNodes(opt), + opt: opt, + nodes: newClusterNodes(opt), + cmdsInfoCache: newCmdsInfoCache(), } c.process = c.defaultProcess @@ -535,24 +534,17 @@ func (c *ClusterClient) retryBackoff(attempt int) time.Duration { } func (c *ClusterClient) cmdInfo(name string) *CommandInfo { - err := c.cmdsInfoOnce.Do(func() error { + cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) { node, err := c.nodes.Random() if err != nil { - return err + return nil, err } - - cmdsInfo, err := node.Client.Command().Result() - if err != nil { - return err - } - - c.cmdsInfo = cmdsInfo - return nil + return node.Client.Command().Result() }) if err != nil { return nil } - info := c.cmdsInfo[name] + info := cmdsInfo[name] if info == nil { internal.Logf("info for cmd=%s not found", name) } diff --git a/command.go b/command.go index 7d45c3f..1588ca2 100644 --- a/command.go +++ b/command.go @@ -1023,3 +1023,26 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { cmd.val = v.(map[string]*CommandInfo) return nil } + +//------------------------------------------------------------------------------ + +type cmdsInfoCache struct { + once internal.Once + cmds map[string]*CommandInfo +} + +func newCmdsInfoCache() *cmdsInfoCache { + return &cmdsInfoCache{} +} + +func (c *cmdsInfoCache) Do(fn func() (map[string]*CommandInfo, error)) (map[string]*CommandInfo, error) { + err := c.once.Do(func() error { + cmds, err := fn() + if err != nil { + return err + } + c.cmds = cmds + return nil + }) + return c.cmds, err +} diff --git a/redis.go b/redis.go index 20f1ed1..1f2167a 100644 --- a/redis.go +++ b/redis.go @@ -11,7 +11,7 @@ import ( "github.com/go-redis/redis/internal/proto" ) -// Nil reply redis returned when key does not exist. +// Nil reply Redis returns when key does not exist. const Nil = proto.Nil func init() { @@ -119,10 +119,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error { return nil } -// WrapProcess replaces the process func. It takes a function createWrapper -// which is supplied by the user. createWrapper takes the old process func as -// an input and returns the new wrapper process func. createWrapper should -// use call the old process func within the new process func. +// WrapProcess wraps function that processes Redis commands. func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { c.process = fn(c.process) } diff --git a/ring.go b/ring.go index 10f33ed..d07e806 100644 --- a/ring.go +++ b/ring.go @@ -15,6 +15,8 @@ import ( "github.com/go-redis/redis/internal/pool" ) +const nreplicas = 100 + var errRingShardsDown = errors.New("redis: all ring shards are down") // RingOptions are used to configure a ring client and should be @@ -142,30 +144,25 @@ func (shard *ringShard) Vote(up bool) bool { type Ring struct { cmdable - opt *RingOptions - nreplicas int + opt *RingOptions + cmdsInfoCache *cmdsInfoCache mu sync.RWMutex hash *consistenthash.Map - shards map[string]*ringShard - shardsList []*ringShard + shards map[string]*ringShard // read only + shardsList []*ringShard // read only processPipeline func([]Cmder) error - cmdsInfoOnce internal.Once - cmdsInfo map[string]*CommandInfo - closed bool } func NewRing(opt *RingOptions) *Ring { - const nreplicas = 100 - opt.init() ring := &Ring{ - opt: opt, - nreplicas: nreplicas, + opt: opt, + cmdsInfoCache: newCmdsInfoCache(), hash: consistenthash.New(nreplicas, nil), shards: make(map[string]*ringShard), @@ -186,11 +183,9 @@ func NewRing(opt *RingOptions) *Ring { func (c *Ring) addShard(name string, cl *Client) { shard := &ringShard{Client: cl} - c.mu.Lock() c.hash.Add(name) c.shards[name] = shard c.shardsList = append(c.shardsList, shard) - c.mu.Unlock() } // Options returns read-only Options that were used to create the client. @@ -285,31 +280,27 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error { } func (c *Ring) cmdInfo(name string) *CommandInfo { - err := c.cmdsInfoOnce.Do(func() error { + cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) { c.mu.RLock() shards := c.shardsList c.mu.RUnlock() - var firstErr error + firstErr := errRingShardsDown for _, shard := range shards { cmdsInfo, err := shard.Client.Command().Result() if err == nil { - c.cmdsInfo = cmdsInfo - return nil + return cmdsInfo, nil } if firstErr == nil { firstErr = err } } - return firstErr + return nil, firstErr }) if err != nil { return nil } - if c.cmdsInfo == nil { - return nil - } - info := c.cmdsInfo[name] + info := cmdsInfo[name] if info == nil { internal.Logf("info for cmd=%s not found", name) } @@ -380,7 +371,7 @@ func (c *Ring) Process(cmd Cmder) error { // rebalance removes dead shards from the Ring. func (c *Ring) rebalance() { - hash := consistenthash.New(c.nreplicas, nil) + hash := consistenthash.New(nreplicas, nil) for name, shard := range c.shards { if shard.IsUp() { hash.Add(name)