From fcf53a2a787f0e1398a961fdccf877b81e97535b Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sun, 9 Oct 2016 08:18:57 +0000 Subject: [PATCH] Better cluster loading handling. --- cluster.go | 41 ++++++++++++++++++++++++--------------- internal/errors/errors.go | 4 +--- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/cluster.go b/cluster.go index e5011544..7689a74e 100644 --- a/cluster.go +++ b/cluster.go @@ -15,6 +15,11 @@ import ( type clusterNode struct { Client *Client Latency time.Duration + loading time.Time +} + +func (n *clusterNode) Loading() bool { + return !n.loading.IsZero() && time.Since(n.loading) < time.Minute } // ClusterClient is a Redis Cluster client representing a pool of zero @@ -218,10 +223,20 @@ func (c *ClusterClient) slotSlaveNode(slot int) (*clusterNode, error) { case 1: return nodes[0], nil case 2: - return nodes[1], nil + if slave := nodes[1]; !slave.Loading() { + return slave, nil + } + return nodes[0], nil default: - n := rand.Intn(len(nodes)-1) + 1 - return nodes[n], nil + var slave *clusterNode + for i := 0; i < 10; i++ { + n := rand.Intn(len(nodes)-1) + 1 + slave = nodes[n] + if !slave.Loading() { + break + } + } + return slave, nil } } @@ -287,28 +302,22 @@ func (c *ClusterClient) Process(cmd Cmder) error { pipe := node.Client.Pipeline() pipe.Process(NewCmd("ASKING")) pipe.Process(cmd) - _, _ = pipe.Exec() + _, err = pipe.Exec() pipe.Close() ask = false } else { - node.Client.Process(cmd) + err = node.Client.Process(cmd) } - // If there is no (real) error, we are done! - err := cmd.Err() + // If there is no (real) error - we are done. if err == nil { return nil } - // If slave is loading, read from master - if errors.IsLoading(cmd.Err()) && c.opt.ReadOnly { - trynode, err := c.slotMasterNode(slot) - if err == nil && trynode != node { - node = trynode - continue - } else { - break - } + // If slave is loading - read from master. + if c.opt.ReadOnly && errors.IsLoading(err) { + node.loading = time.Now() + continue } // On network errors try random node. diff --git a/internal/errors/errors.go b/internal/errors/errors.go index 49a80eae..c5abc0be 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -10,8 +10,6 @@ const Nil = RedisError("redis: nil") type RedisError string -var LoadingError RedisError = "LOADING Redis is loading the dataset in memory" - func (e RedisError) Error() string { return string(e) } func IsRetryable(err error) bool { @@ -69,5 +67,5 @@ func IsMoved(err error) (moved bool, ask bool, addr string) { } func IsLoading(err error) bool { - return err.Error() == string(LoadingError) + return strings.HasPrefix(err.Error(), "LOADING") }