From 18b2e30835f248bffe475cb69c7267204de1bc1e Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Thu, 17 May 2018 15:21:51 +0300 Subject: [PATCH] Cleanup cmds info --- cluster.go | 51 +++++++++++++++++++++++++++++++++++++++------------ command.go | 12 ++++++++---- ring.go | 37 ++++++++++++++++++++----------------- 3 files changed, 67 insertions(+), 33 deletions(-) diff --git a/cluster.go b/cluster.go index 5e6b52c6..67e07e35 100644 --- a/cluster.go +++ b/cluster.go @@ -300,10 +300,9 @@ func (c *clusterNodes) GC(generation uint32) { } } -func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { +func (c *clusterNodes) Get(addr string) (*clusterNode, error) { var node *clusterNode var err error - c.mu.RLock() if c.closed { err = pool.ErrClosed @@ -311,6 +310,11 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { node = c.allNodes[addr] } c.mu.RUnlock() + return node, err +} + +func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { + node, err := c.Get(addr) if err != nil { return nil, err } @@ -580,11 +584,11 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { opt.init() c := &ClusterClient{ - opt: opt, - nodes: newClusterNodes(opt), - cmdsInfoCache: newCmdsInfoCache(), + opt: opt, + nodes: newClusterNodes(opt), } c.state = newClusterStateHolder(c.loadState) + c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo) c.process = c.defaultProcess c.processPipeline = c.defaultProcessPipeline @@ -630,17 +634,39 @@ func (c *ClusterClient) retryBackoff(attempt int) time.Duration { return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) } -func (c *ClusterClient) cmdInfo(name string) *CommandInfo { - cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) { - node, err := c.nodes.Random() +func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) { + addrs, err := c.nodes.Addrs() + if err != nil { + return nil, err + } + + var firstErr error + for _, addr := range addrs { + node, err := c.nodes.Get(addr) if err != nil { return nil, err } - return node.Client.Command().Result() - }) + if node == nil { + continue + } + + info, err := node.Client.Command().Result() + if err == nil { + return info, nil + } + if firstErr == nil { + firstErr = err + } + } + return nil, firstErr +} + +func (c *ClusterClient) cmdInfo(name string) *CommandInfo { + cmdsInfo, err := c.cmdsInfoCache.Get() if err != nil { return nil } + info := cmdsInfo[name] if info == nil { internal.Logf("info for cmd=%s not found", name) @@ -704,13 +730,14 @@ func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) { func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { if len(keys) == 0 { - return fmt.Errorf("redis: keys don't hash to the same slot") + return fmt.Errorf("redis: Watch requires at least one key") } slot := hashtag.Slot(keys[0]) for _, key := range keys[1:] { if hashtag.Slot(key) != slot { - return fmt.Errorf("redis: Watch requires all keys to be in the same slot") + err := fmt.Errorf("redis: Watch requires all keys to be in the same slot") + return err } } diff --git a/command.go b/command.go index 1588ca25..552c897b 100644 --- a/command.go +++ b/command.go @@ -1027,17 +1027,21 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { //------------------------------------------------------------------------------ type cmdsInfoCache struct { + fn func() (map[string]*CommandInfo, error) + once internal.Once cmds map[string]*CommandInfo } -func newCmdsInfoCache() *cmdsInfoCache { - return &cmdsInfoCache{} +func newCmdsInfoCache(fn func() (map[string]*CommandInfo, error)) *cmdsInfoCache { + return &cmdsInfoCache{ + fn: fn, + } } -func (c *cmdsInfoCache) Do(fn func() (map[string]*CommandInfo, error)) (map[string]*CommandInfo, error) { +func (c *cmdsInfoCache) Get() (map[string]*CommandInfo, error) { err := c.once.Do(func() error { - cmds, err := fn() + cmds, err := c.fn() if err != nil { return err } diff --git a/ring.go b/ring.go index 6d287741..362bd031 100644 --- a/ring.go +++ b/ring.go @@ -304,10 +304,11 @@ func NewRing(opt *RingOptions) *Ring { opt.init() ring := &Ring{ - opt: opt, - shards: newRingShards(), - cmdsInfoCache: newCmdsInfoCache(), + opt: opt, + shards: newRingShards(), } + ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) + ring.processPipeline = ring.defaultProcessPipeline ring.cmdable.setProcessor(ring.Process) @@ -428,21 +429,23 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error { } } -func (c *Ring) cmdInfo(name string) *CommandInfo { - cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) { - shards := c.shards.List() - firstErr := errRingShardsDown - for _, shard := range shards { - cmdsInfo, err := shard.Client.Command().Result() - if err == nil { - return cmdsInfo, nil - } - if firstErr == nil { - firstErr = err - } +func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) { + shards := c.shards.List() + firstErr := errRingShardsDown + for _, shard := range shards { + cmdsInfo, err := shard.Client.Command().Result() + if err == nil { + return cmdsInfo, nil } - return nil, firstErr - }) + if firstErr == nil { + firstErr = err + } + } + return nil, firstErr +} + +func (c *Ring) cmdInfo(name string) *CommandInfo { + cmdsInfo, err := c.cmdsInfoCache.Get() if err != nil { return nil }