forked from mirror/redis
Better cluster loading handling.
This commit is contained in:
parent
be32042426
commit
fcf53a2a78
41
cluster.go
41
cluster.go
|
@ -15,6 +15,11 @@ import (
|
||||||
type clusterNode struct {
|
type clusterNode struct {
|
||||||
Client *Client
|
Client *Client
|
||||||
Latency time.Duration
|
Latency time.Duration
|
||||||
|
loading time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *clusterNode) Loading() bool {
|
||||||
|
return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClusterClient is a Redis Cluster client representing a pool of zero
|
// ClusterClient is a Redis Cluster client representing a pool of zero
|
||||||
|
@ -218,10 +223,20 @@ func (c *ClusterClient) slotSlaveNode(slot int) (*clusterNode, error) {
|
||||||
case 1:
|
case 1:
|
||||||
return nodes[0], nil
|
return nodes[0], nil
|
||||||
case 2:
|
case 2:
|
||||||
return nodes[1], nil
|
if slave := nodes[1]; !slave.Loading() {
|
||||||
|
return slave, nil
|
||||||
|
}
|
||||||
|
return nodes[0], nil
|
||||||
default:
|
default:
|
||||||
n := rand.Intn(len(nodes)-1) + 1
|
var slave *clusterNode
|
||||||
return nodes[n], nil
|
for i := 0; i < 10; i++ {
|
||||||
|
n := rand.Intn(len(nodes)-1) + 1
|
||||||
|
slave = nodes[n]
|
||||||
|
if !slave.Loading() {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return slave, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -287,28 +302,22 @@ func (c *ClusterClient) Process(cmd Cmder) error {
|
||||||
pipe := node.Client.Pipeline()
|
pipe := node.Client.Pipeline()
|
||||||
pipe.Process(NewCmd("ASKING"))
|
pipe.Process(NewCmd("ASKING"))
|
||||||
pipe.Process(cmd)
|
pipe.Process(cmd)
|
||||||
_, _ = pipe.Exec()
|
_, err = pipe.Exec()
|
||||||
pipe.Close()
|
pipe.Close()
|
||||||
ask = false
|
ask = false
|
||||||
} else {
|
} else {
|
||||||
node.Client.Process(cmd)
|
err = node.Client.Process(cmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there is no (real) error, we are done!
|
// If there is no (real) error - we are done.
|
||||||
err := cmd.Err()
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// If slave is loading, read from master
|
// If slave is loading - read from master.
|
||||||
if errors.IsLoading(cmd.Err()) && c.opt.ReadOnly {
|
if c.opt.ReadOnly && errors.IsLoading(err) {
|
||||||
trynode, err := c.slotMasterNode(slot)
|
node.loading = time.Now()
|
||||||
if err == nil && trynode != node {
|
continue
|
||||||
node = trynode
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// On network errors try random node.
|
// On network errors try random node.
|
||||||
|
|
|
@ -10,8 +10,6 @@ const Nil = RedisError("redis: nil")
|
||||||
|
|
||||||
type RedisError string
|
type RedisError string
|
||||||
|
|
||||||
var LoadingError RedisError = "LOADING Redis is loading the dataset in memory"
|
|
||||||
|
|
||||||
func (e RedisError) Error() string { return string(e) }
|
func (e RedisError) Error() string { return string(e) }
|
||||||
|
|
||||||
func IsRetryable(err error) bool {
|
func IsRetryable(err error) bool {
|
||||||
|
@ -69,5 +67,5 @@ func IsMoved(err error) (moved bool, ask bool, addr string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func IsLoading(err error) bool {
|
func IsLoading(err error) bool {
|
||||||
return err.Error() == string(LoadingError)
|
return strings.HasPrefix(err.Error(), "LOADING")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue