Extract commands info cache

This commit is contained in:
Vladimir Mihailenco 2018-03-06 14:50:48 +02:00
parent 0082bdcd4b
commit f13fc5381c
4 changed files with 50 additions and 47 deletions

View File

@ -483,22 +483,20 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
type ClusterClient struct { type ClusterClient struct {
cmdable cmdable
opt *ClusterOptions opt *ClusterOptions
nodes *clusterNodes nodes *clusterNodes
cmdsInfoCache *cmdsInfoCache
_state atomic.Value _state atomic.Value
stateErrMu sync.RWMutex stateErrMu sync.RWMutex
stateErr error stateErr error
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
process func(Cmder) error process func(Cmder) error
processPipeline func([]Cmder) error processPipeline func([]Cmder) error
processTxPipeline func([]Cmder) error processTxPipeline func([]Cmder) error
// Reports whether slots reloading is in progress. // Reports whether slots reloading is in progress.
reloading uint32 reloading uint32 // atomic
} }
// NewClusterClient returns a Redis Cluster client as described in // NewClusterClient returns a Redis Cluster client as described in
@ -507,8 +505,9 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt.init() opt.init()
c := &ClusterClient{ c := &ClusterClient{
opt: opt, opt: opt,
nodes: newClusterNodes(opt), nodes: newClusterNodes(opt),
cmdsInfoCache: newCmdsInfoCache(),
} }
c.process = c.defaultProcess c.process = c.defaultProcess
@ -535,24 +534,17 @@ func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
} }
func (c *ClusterClient) cmdInfo(name string) *CommandInfo { func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
err := c.cmdsInfoOnce.Do(func() error { cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) {
node, err := c.nodes.Random() node, err := c.nodes.Random()
if err != nil { if err != nil {
return err return nil, err
} }
return node.Client.Command().Result()
cmdsInfo, err := node.Client.Command().Result()
if err != nil {
return err
}
c.cmdsInfo = cmdsInfo
return nil
}) })
if err != nil { if err != nil {
return nil return nil
} }
info := c.cmdsInfo[name] info := cmdsInfo[name]
if info == nil { if info == nil {
internal.Logf("info for cmd=%s not found", name) internal.Logf("info for cmd=%s not found", name)
} }

View File

@ -1023,3 +1023,26 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
cmd.val = v.(map[string]*CommandInfo) cmd.val = v.(map[string]*CommandInfo)
return nil return nil
} }
//------------------------------------------------------------------------------
type cmdsInfoCache struct {
once internal.Once
cmds map[string]*CommandInfo
}
func newCmdsInfoCache() *cmdsInfoCache {
return &cmdsInfoCache{}
}
func (c *cmdsInfoCache) Do(fn func() (map[string]*CommandInfo, error)) (map[string]*CommandInfo, error) {
err := c.once.Do(func() error {
cmds, err := fn()
if err != nil {
return err
}
c.cmds = cmds
return nil
})
return c.cmds, err
}

View File

@ -11,7 +11,7 @@ import (
"github.com/go-redis/redis/internal/proto" "github.com/go-redis/redis/internal/proto"
) )
// Nil reply redis returned when key does not exist. // Nil reply Redis returns when key does not exist.
const Nil = proto.Nil const Nil = proto.Nil
func init() { func init() {
@ -119,10 +119,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
return nil return nil
} }
// WrapProcess replaces the process func. It takes a function createWrapper // WrapProcess wraps function that processes Redis commands.
// which is supplied by the user. createWrapper takes the old process func as
// an input and returns the new wrapper process func. createWrapper should
// use call the old process func within the new process func.
func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
c.process = fn(c.process) c.process = fn(c.process)
} }

37
ring.go
View File

@ -15,6 +15,8 @@ import (
"github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/pool"
) )
const nreplicas = 100
var errRingShardsDown = errors.New("redis: all ring shards are down") var errRingShardsDown = errors.New("redis: all ring shards are down")
// RingOptions are used to configure a ring client and should be // RingOptions are used to configure a ring client and should be
@ -142,30 +144,25 @@ func (shard *ringShard) Vote(up bool) bool {
type Ring struct { type Ring struct {
cmdable cmdable
opt *RingOptions opt *RingOptions
nreplicas int cmdsInfoCache *cmdsInfoCache
mu sync.RWMutex mu sync.RWMutex
hash *consistenthash.Map hash *consistenthash.Map
shards map[string]*ringShard shards map[string]*ringShard // read only
shardsList []*ringShard shardsList []*ringShard // read only
processPipeline func([]Cmder) error processPipeline func([]Cmder) error
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
closed bool closed bool
} }
func NewRing(opt *RingOptions) *Ring { func NewRing(opt *RingOptions) *Ring {
const nreplicas = 100
opt.init() opt.init()
ring := &Ring{ ring := &Ring{
opt: opt, opt: opt,
nreplicas: nreplicas, cmdsInfoCache: newCmdsInfoCache(),
hash: consistenthash.New(nreplicas, nil), hash: consistenthash.New(nreplicas, nil),
shards: make(map[string]*ringShard), shards: make(map[string]*ringShard),
@ -186,11 +183,9 @@ func NewRing(opt *RingOptions) *Ring {
func (c *Ring) addShard(name string, cl *Client) { func (c *Ring) addShard(name string, cl *Client) {
shard := &ringShard{Client: cl} shard := &ringShard{Client: cl}
c.mu.Lock()
c.hash.Add(name) c.hash.Add(name)
c.shards[name] = shard c.shards[name] = shard
c.shardsList = append(c.shardsList, shard) c.shardsList = append(c.shardsList, shard)
c.mu.Unlock()
} }
// Options returns read-only Options that were used to create the client. // Options returns read-only Options that were used to create the client.
@ -285,31 +280,27 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error {
} }
func (c *Ring) cmdInfo(name string) *CommandInfo { func (c *Ring) cmdInfo(name string) *CommandInfo {
err := c.cmdsInfoOnce.Do(func() error { cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) {
c.mu.RLock() c.mu.RLock()
shards := c.shardsList shards := c.shardsList
c.mu.RUnlock() c.mu.RUnlock()
var firstErr error firstErr := errRingShardsDown
for _, shard := range shards { for _, shard := range shards {
cmdsInfo, err := shard.Client.Command().Result() cmdsInfo, err := shard.Client.Command().Result()
if err == nil { if err == nil {
c.cmdsInfo = cmdsInfo return cmdsInfo, nil
return nil
} }
if firstErr == nil { if firstErr == nil {
firstErr = err firstErr = err
} }
} }
return firstErr return nil, firstErr
}) })
if err != nil { if err != nil {
return nil return nil
} }
if c.cmdsInfo == nil { info := cmdsInfo[name]
return nil
}
info := c.cmdsInfo[name]
if info == nil { if info == nil {
internal.Logf("info for cmd=%s not found", name) internal.Logf("info for cmd=%s not found", name)
} }
@ -380,7 +371,7 @@ func (c *Ring) Process(cmd Cmder) error {
// rebalance removes dead shards from the Ring. // rebalance removes dead shards from the Ring.
func (c *Ring) rebalance() { func (c *Ring) rebalance() {
hash := consistenthash.New(c.nreplicas, nil) hash := consistenthash.New(nreplicas, nil)
for name, shard := range c.shards { for name, shard := range c.shards {
if shard.IsUp() { if shard.IsUp() {
hash.Add(name) hash.Add(name)