forked from mirror/redis
Merge pull request #391 from go-redis/fix/cluster-loading
Better cluster loading handling.
This commit is contained in:
commit
5aae583e0c
41
cluster.go
41
cluster.go
|
@ -15,6 +15,11 @@ import (
|
|||
type clusterNode struct {
|
||||
Client *Client
|
||||
Latency time.Duration
|
||||
loading time.Time
|
||||
}
|
||||
|
||||
func (n *clusterNode) Loading() bool {
|
||||
return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
|
||||
}
|
||||
|
||||
// ClusterClient is a Redis Cluster client representing a pool of zero
|
||||
|
@ -218,10 +223,20 @@ func (c *ClusterClient) slotSlaveNode(slot int) (*clusterNode, error) {
|
|||
case 1:
|
||||
return nodes[0], nil
|
||||
case 2:
|
||||
return nodes[1], nil
|
||||
if slave := nodes[1]; !slave.Loading() {
|
||||
return slave, nil
|
||||
}
|
||||
return nodes[0], nil
|
||||
default:
|
||||
n := rand.Intn(len(nodes)-1) + 1
|
||||
return nodes[n], nil
|
||||
var slave *clusterNode
|
||||
for i := 0; i < 10; i++ {
|
||||
n := rand.Intn(len(nodes)-1) + 1
|
||||
slave = nodes[n]
|
||||
if !slave.Loading() {
|
||||
break
|
||||
}
|
||||
}
|
||||
return slave, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -287,28 +302,22 @@ func (c *ClusterClient) Process(cmd Cmder) error {
|
|||
pipe := node.Client.Pipeline()
|
||||
pipe.Process(NewCmd("ASKING"))
|
||||
pipe.Process(cmd)
|
||||
_, _ = pipe.Exec()
|
||||
_, err = pipe.Exec()
|
||||
pipe.Close()
|
||||
ask = false
|
||||
} else {
|
||||
node.Client.Process(cmd)
|
||||
err = node.Client.Process(cmd)
|
||||
}
|
||||
|
||||
// If there is no (real) error, we are done!
|
||||
err := cmd.Err()
|
||||
// If there is no (real) error - we are done.
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If slave is loading, read from master
|
||||
if errors.IsLoading(cmd.Err()) && c.opt.ReadOnly {
|
||||
trynode, err := c.slotMasterNode(slot)
|
||||
if err == nil && trynode != node {
|
||||
node = trynode
|
||||
continue
|
||||
} else {
|
||||
break
|
||||
}
|
||||
// If slave is loading - read from master.
|
||||
if c.opt.ReadOnly && errors.IsLoading(err) {
|
||||
node.loading = time.Now()
|
||||
continue
|
||||
}
|
||||
|
||||
// On network errors try random node.
|
||||
|
|
|
@ -10,8 +10,6 @@ const Nil = RedisError("redis: nil")
|
|||
|
||||
type RedisError string
|
||||
|
||||
var LoadingError RedisError = "LOADING Redis is loading the dataset in memory"
|
||||
|
||||
func (e RedisError) Error() string { return string(e) }
|
||||
|
||||
func IsRetryable(err error) bool {
|
||||
|
@ -69,5 +67,5 @@ func IsMoved(err error) (moved bool, ask bool, addr string) {
|
|||
}
|
||||
|
||||
func IsLoading(err error) bool {
|
||||
return err.Error() == string(LoadingError)
|
||||
return strings.HasPrefix(err.Error(), "LOADING")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue