Cleanup cmds info

This commit is contained in:
Vladimir Mihailenco 2018-05-17 15:21:51 +03:00
parent d5f5c24505
commit 18b2e30835
3 changed files with 67 additions and 33 deletions

View File

@ -300,10 +300,9 @@ func (c *clusterNodes) GC(generation uint32) {
} }
} }
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
var node *clusterNode var node *clusterNode
var err error var err error
c.mu.RLock() c.mu.RLock()
if c.closed { if c.closed {
err = pool.ErrClosed err = pool.ErrClosed
@ -311,6 +310,11 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node = c.allNodes[addr] node = c.allNodes[addr]
} }
c.mu.RUnlock() c.mu.RUnlock()
return node, err
}
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node, err := c.Get(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -580,11 +584,11 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt.init() opt.init()
c := &ClusterClient{ c := &ClusterClient{
opt: opt, opt: opt,
nodes: newClusterNodes(opt), nodes: newClusterNodes(opt),
cmdsInfoCache: newCmdsInfoCache(),
} }
c.state = newClusterStateHolder(c.loadState) c.state = newClusterStateHolder(c.loadState)
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
c.process = c.defaultProcess c.process = c.defaultProcess
c.processPipeline = c.defaultProcessPipeline c.processPipeline = c.defaultProcessPipeline
@ -630,17 +634,39 @@ func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
} }
func (c *ClusterClient) cmdInfo(name string) *CommandInfo { func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) {
cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) { addrs, err := c.nodes.Addrs()
node, err := c.nodes.Random() if err != nil {
return nil, err
}
var firstErr error
for _, addr := range addrs {
node, err := c.nodes.Get(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return node.Client.Command().Result() if node == nil {
}) continue
}
info, err := node.Client.Command().Result()
if err == nil {
return info, nil
}
if firstErr == nil {
firstErr = err
}
}
return nil, firstErr
}
func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
cmdsInfo, err := c.cmdsInfoCache.Get()
if err != nil { if err != nil {
return nil return nil
} }
info := cmdsInfo[name] info := cmdsInfo[name]
if info == nil { if info == nil {
internal.Logf("info for cmd=%s not found", name) internal.Logf("info for cmd=%s not found", name)
@ -704,13 +730,14 @@ func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
if len(keys) == 0 { if len(keys) == 0 {
return fmt.Errorf("redis: keys don't hash to the same slot") return fmt.Errorf("redis: Watch requires at least one key")
} }
slot := hashtag.Slot(keys[0]) slot := hashtag.Slot(keys[0])
for _, key := range keys[1:] { for _, key := range keys[1:] {
if hashtag.Slot(key) != slot { if hashtag.Slot(key) != slot {
return fmt.Errorf("redis: Watch requires all keys to be in the same slot") err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
return err
} }
} }

View File

@ -1027,17 +1027,21 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type cmdsInfoCache struct { type cmdsInfoCache struct {
fn func() (map[string]*CommandInfo, error)
once internal.Once once internal.Once
cmds map[string]*CommandInfo cmds map[string]*CommandInfo
} }
func newCmdsInfoCache() *cmdsInfoCache { func newCmdsInfoCache(fn func() (map[string]*CommandInfo, error)) *cmdsInfoCache {
return &cmdsInfoCache{} return &cmdsInfoCache{
fn: fn,
}
} }
func (c *cmdsInfoCache) Do(fn func() (map[string]*CommandInfo, error)) (map[string]*CommandInfo, error) { func (c *cmdsInfoCache) Get() (map[string]*CommandInfo, error) {
err := c.once.Do(func() error { err := c.once.Do(func() error {
cmds, err := fn() cmds, err := c.fn()
if err != nil { if err != nil {
return err return err
} }

37
ring.go
View File

@ -304,10 +304,11 @@ func NewRing(opt *RingOptions) *Ring {
opt.init() opt.init()
ring := &Ring{ ring := &Ring{
opt: opt, opt: opt,
shards: newRingShards(), shards: newRingShards(),
cmdsInfoCache: newCmdsInfoCache(),
} }
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
ring.processPipeline = ring.defaultProcessPipeline ring.processPipeline = ring.defaultProcessPipeline
ring.cmdable.setProcessor(ring.Process) ring.cmdable.setProcessor(ring.Process)
@ -428,21 +429,23 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error {
} }
} }
func (c *Ring) cmdInfo(name string) *CommandInfo { func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) {
cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) { shards := c.shards.List()
shards := c.shards.List() firstErr := errRingShardsDown
firstErr := errRingShardsDown for _, shard := range shards {
for _, shard := range shards { cmdsInfo, err := shard.Client.Command().Result()
cmdsInfo, err := shard.Client.Command().Result() if err == nil {
if err == nil { return cmdsInfo, nil
return cmdsInfo, nil
}
if firstErr == nil {
firstErr = err
}
} }
return nil, firstErr if firstErr == nil {
}) firstErr = err
}
}
return nil, firstErr
}
func (c *Ring) cmdInfo(name string) *CommandInfo {
cmdsInfo, err := c.cmdsInfoCache.Get()
if err != nil { if err != nil {
return nil return nil
} }