diff --git a/cluster.go b/cluster.go index 7689a74..6aea41b 100644 --- a/cluster.go +++ b/cluster.go @@ -12,6 +12,72 @@ import ( "gopkg.in/redis.v4/internal/pool" ) +// ClusterOptions are used to configure a cluster client and should be +// passed to NewClusterClient. +type ClusterOptions struct { + // A seed list of host:port addresses of cluster nodes. + Addrs []string + + // The maximum number of retries before giving up. Command is retried + // on network errors and MOVED/ASK redirects. + // Default is 16. + MaxRedirects int + + // Enables read queries for a connection to a Redis Cluster slave node. + ReadOnly bool + + // Enables routing read-only queries to the closest master or slave node. + RouteByLatency bool + + // Following options are copied from Options struct. + + Password string + + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + + // PoolSize applies per cluster node and not for the whole cluster. + PoolSize int + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration +} + +func (opt *ClusterOptions) init() { + if opt.MaxRedirects == -1 { + opt.MaxRedirects = 0 + } else if opt.MaxRedirects == 0 { + opt.MaxRedirects = 16 + } + + if opt.RouteByLatency { + opt.ReadOnly = true + } +} + +func (opt *ClusterOptions) clientOptions() *Options { + const disableIdleCheck = -1 + + return &Options{ + Password: opt.Password, + ReadOnly: opt.ReadOnly, + + DialTimeout: opt.DialTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + + // IdleCheckFrequency is not copied to disable reaper + IdleCheckFrequency: disableIdleCheck, + } +} + +//------------------------------------------------------------------------------ + type clusterNode struct { Client *Client Latency time.Duration @@ -36,8 +102,8 @@ type ClusterClient struct { slots [][]*clusterNode closed bool - cmdsInfo map[string]*CommandInfo cmdsInfoOnce *sync.Once + cmdsInfo map[string]*CommandInfo // Reports where slots reloading is in progress. reloading uint32 @@ -81,19 +147,22 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo { } c.cmdsInfoOnce = &sync.Once{} }) + if c.cmdsInfo == nil { + return nil + } return c.cmdsInfo[name] } func (c *ClusterClient) getNodes() map[string]*clusterNode { var nodes map[string]*clusterNode - c.mu.RLock() if !c.closed { nodes = make(map[string]*clusterNode, len(c.nodes)) + c.mu.RLock() for addr, node := range c.nodes { nodes[addr] = node } + c.mu.RUnlock() } - c.mu.RUnlock() return nodes } @@ -257,18 +326,11 @@ func (c *ClusterClient) slotClosestNode(slot int) (*clusterNode, error) { func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) { cmdInfo := c.cmdInfo(cmd.arg(0)) - if cmdInfo == nil { - internal.Logf("info for cmd=%s not found", cmd.arg(0)) + firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) + if firstKey == "" { node, err := c.randomNode() - return 0, node, err + return -1, node, err } - - if cmdInfo.FirstKeyPos == -1 { - node, err := c.randomNode() - return 0, node, err - } - - firstKey := cmd.arg(int(cmdInfo.FirstKeyPos)) slot := hashtag.Slot(firstKey) if cmdInfo.ReadOnly && c.opt.ReadOnly { @@ -330,9 +392,11 @@ func (c *ClusterClient) Process(cmd Cmder) error { var addr string moved, ask, addr = errors.IsMoved(err) if moved || ask { - master, _ := c.slotMasterNode(slot) - if moved && (master == nil || master.Client.getAddr() != addr) { - c.lazyReloadSlots() + if slot >= 0 { + master, _ := c.slotMasterNode(slot) + if moved && (master == nil || master.Client.getAddr() != addr) { + c.lazyReloadSlots() + } } node, err = c.nodeByAddr(addr) @@ -609,69 +673,3 @@ func (c *ClusterClient) execClusterCmds( return failedCmds, retErr } - -//------------------------------------------------------------------------------ - -// ClusterOptions are used to configure a cluster client and should be -// passed to NewClusterClient. -type ClusterOptions struct { - // A seed list of host:port addresses of cluster nodes. - Addrs []string - - // The maximum number of retries before giving up. Command is retried - // on network errors and MOVED/ASK redirects. - // Default is 16. - MaxRedirects int - - // Enables read queries for a connection to a Redis Cluster slave node. - ReadOnly bool - - // Enables routing read-only queries to the closest master or slave node. - RouteByLatency bool - - // Following options are copied from Options struct. - - Password string - - DialTimeout time.Duration - ReadTimeout time.Duration - WriteTimeout time.Duration - - // PoolSize applies per cluster node and not for the whole cluster. - PoolSize int - PoolTimeout time.Duration - IdleTimeout time.Duration - IdleCheckFrequency time.Duration -} - -func (opt *ClusterOptions) init() { - if opt.MaxRedirects == -1 { - opt.MaxRedirects = 0 - } else if opt.MaxRedirects == 0 { - opt.MaxRedirects = 16 - } - - if opt.RouteByLatency { - opt.ReadOnly = true - } -} - -func (opt *ClusterOptions) clientOptions() *Options { - const disableIdleCheck = -1 - - return &Options{ - Password: opt.Password, - ReadOnly: opt.ReadOnly, - - DialTimeout: opt.DialTimeout, - ReadTimeout: opt.ReadTimeout, - WriteTimeout: opt.WriteTimeout, - - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - - // IdleCheckFrequency is not copied to disable reaper - IdleCheckFrequency: disableIdleCheck, - } -} diff --git a/command.go b/command.go index 8ac048a..7613203 100644 --- a/command.go +++ b/command.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "github.com/go-redis/redis/internal" + "gopkg.in/redis.v4/internal/pool" "gopkg.in/redis.v4/internal/proto" ) @@ -88,6 +90,22 @@ func cmdString(cmd Cmder, val interface{}) string { } +func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int { + switch cmd.arg(0) { + case "eval", "evalsha": + if cmd.arg(2) != "0" { + return 3 + } else { + return 0 + } + } + if info == nil { + internal.Logf("info for cmd=%s not found", cmd.arg(0)) + return -1 + } + return int(info.FirstKeyPos) +} + //------------------------------------------------------------------------------ type baseCmd struct { @@ -109,12 +127,11 @@ func (cmd *baseCmd) args() []interface{} { } func (cmd *baseCmd) arg(pos int) string { - if len(cmd._args) > pos { - if s, ok := cmd._args[pos].(string); ok { - return s - } + if pos < 0 || pos >= len(cmd._args) { + return "" } - return "" + s, _ := cmd._args[pos].(string) + return s } func (cmd *baseCmd) readTimeout() *time.Duration { diff --git a/main_test.go b/main_test.go index d79a143..a9afa82 100644 --- a/main_test.go +++ b/main_test.go @@ -138,7 +138,6 @@ func redisRingOptions() *redis.RingOptions { PoolTimeout: 30 * time.Second, IdleTimeout: 500 * time.Millisecond, IdleCheckFrequency: 500 * time.Millisecond, - RouteByEvalKeys: true, } } diff --git a/ring.go b/ring.go index 08d52ef..52badb3 100644 --- a/ring.go +++ b/ring.go @@ -3,6 +3,8 @@ package redis import ( "errors" "fmt" + "math/rand" + "strconv" "sync" "sync/atomic" "time" @@ -40,9 +42,6 @@ type RingOptions struct { PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration - - // RouteByEvalKeys flag to enable eval and evalsha key position parsing for sharding - RouteByEvalKeys bool } func (opt *RingOptions) init() { @@ -131,12 +130,10 @@ type Ring struct { hash *consistenthash.Map shards map[string]*ringShard - cmdsInfo map[string]*CommandInfo cmdsInfoOnce *sync.Once + cmdsInfo map[string]*CommandInfo closed bool - - routeByEvalKeys bool } var _ Cmdable = (*Ring)(nil) @@ -159,7 +156,6 @@ func NewRing(opt *RingOptions) *Ring { clopt.Addr = addr ring.addClient(name, NewClient(clopt)) } - ring.routeByEvalKeys = opt.RouteByEvalKeys go ring.heartbeat() return ring } @@ -227,30 +223,6 @@ func (c *Ring) cmdInfo(name string) *CommandInfo { return c.cmdsInfo[name] } -func (c *Ring) getEvalFirstKey(cmd Cmder) string { - if c.routeByEvalKeys && cmd.arg(2) != "0" { - return cmd.arg(3) - } else { - return cmd.arg(0) - } -} - -func (c *Ring) cmdFirstKey(cmd Cmder) string { - switch cmd.arg(0) { - case "eval": - return c.getEvalFirstKey(cmd) - case "evalsha": - return c.getEvalFirstKey(cmd) - } - - cmdInfo := c.cmdInfo(cmd.arg(0)) - if cmdInfo == nil { - internal.Logf("info for cmd=%s not found", cmd.arg(0)) - return "" - } - return cmd.arg(int(cmdInfo.FirstKeyPos)) -} - func (c *Ring) addClient(name string, cl *Client) { c.mu.Lock() c.hash.Add(name) @@ -258,14 +230,17 @@ func (c *Ring) addClient(name string, cl *Client) { c.mu.Unlock() } -func (c *Ring) getClient(key string) (*Client, error) { +func (c *Ring) shardByKey(key string) (*Client, error) { + key = hashtag.Key(key) + c.mu.RLock() if c.closed { + c.mu.RUnlock() return nil, pool.ErrClosed } - name := c.hash.Get(hashtag.Key(key)) + name := c.hash.Get(key) if name == "" { c.mu.RUnlock() return nil, errRingShardsDown @@ -276,8 +251,32 @@ func (c *Ring) getClient(key string) (*Client, error) { return cl, nil } +func (c *Ring) randomShard() (*Client, error) { + return c.shardByKey(strconv.Itoa(rand.Int())) +} + +func (c *Ring) shardByName(name string) (*Client, error) { + if name == "" { + return c.randomShard() + } + + c.mu.RLock() + cl := c.shards[name].Client + c.mu.RUnlock() + return cl, nil +} + +func (c *Ring) cmdShard(cmd Cmder) (*Client, error) { + cmdInfo := c.cmdInfo(cmd.arg(0)) + firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) + if firstKey == "" { + return c.randomShard() + } + return c.shardByKey(firstKey) +} + func (c *Ring) Process(cmd Cmder) error { - cl, err := c.getClient(c.cmdFirstKey(cmd)) + cl, err := c.cmdShard(cmd) if err != nil { cmd.setErr(err) return err @@ -285,17 +284,18 @@ func (c *Ring) Process(cmd Cmder) error { return cl.baseClient.Process(cmd) } -// rebalance removes dead shards from the c. +// rebalance removes dead shards from the Ring. func (c *Ring) rebalance() { - defer c.mu.Unlock() - c.mu.Lock() - - c.hash = consistenthash.New(c.nreplicas, nil) + hash := consistenthash.New(c.nreplicas, nil) for name, shard := range c.shards { if shard.IsUp() { - c.hash.Add(name) + hash.Add(name) } } + + c.mu.Lock() + c.hash = hash + c.mu.Unlock() } // heartbeat monitors state of each shard in the ring. @@ -370,13 +370,10 @@ func (c *Ring) pipelineExec(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { - name := c.hash.Get(hashtag.Key(c.cmdFirstKey(cmd))) - if name == "" { - cmd.setErr(errRingShardsDown) - if retErr == nil { - retErr = errRingShardsDown - } - continue + cmdInfo := c.cmdInfo(cmd.arg(0)) + name := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) + if name != "" { + name = c.hash.Get(hashtag.Key(name)) } cmdsMap[name] = append(cmdsMap[name], cmd) } @@ -385,7 +382,15 @@ func (c *Ring) pipelineExec(cmds []Cmder) error { failedCmdsMap := make(map[string][]Cmder) for name, cmds := range cmdsMap { - client := c.shards[name].Client + client, err := c.shardByName(name) + if err != nil { + setCmdsErr(cmds, err) + if retErr == nil { + retErr = err + } + continue + } + cn, _, err := client.conn() if err != nil { setCmdsErr(cmds, err)