diff --git a/cluster.go b/cluster.go index c33b5bc..1ada183 100644 --- a/cluster.go +++ b/cluster.go @@ -155,7 +155,7 @@ type clusterNode struct { latency uint32 // atomic generation uint32 // atomic - loading uint32 // atomic + failing uint32 // atomic } func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { @@ -203,21 +203,21 @@ func (n *clusterNode) Latency() time.Duration { return time.Duration(latency) * time.Microsecond } -func (n *clusterNode) MarkAsLoading() { - atomic.StoreUint32(&n.loading, uint32(time.Now().Unix())) +func (n *clusterNode) MarkAsFailing() { + atomic.StoreUint32(&n.failing, uint32(time.Now().Unix())) } -func (n *clusterNode) Loading() bool { - const minute = int64(time.Minute / time.Second) +func (n *clusterNode) Failing() bool { + const timeout = 15 // 15 seconds - loading := atomic.LoadUint32(&n.loading) - if loading == 0 { + failing := atomic.LoadUint32(&n.failing) + if failing == 0 { return false } - if time.Now().Unix()-int64(loading) < minute { + if time.Now().Unix()-int64(failing) < timeout { return true } - atomic.StoreUint32(&n.loading, 0) + atomic.StoreUint32(&n.failing, 0) return false } @@ -522,7 +522,7 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { case 1: return nodes[0], nil case 2: - if slave := nodes[1]; !slave.Loading() { + if slave := nodes[1]; !slave.Failing() { return slave, nil } return nodes[0], nil @@ -531,7 +531,7 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { for i := 0; i < 10; i++ { n := rand.Intn(len(nodes)-1) + 1 slave = nodes[n] - if !slave.Loading() { + if !slave.Failing() { return slave, nil } } @@ -551,7 +551,7 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { var node *clusterNode for _, n := range nodes { - if n.Loading() { + if n.Failing() { continue } if node == nil || node.Latency()-n.Latency() > threshold { @@ -561,10 +561,13 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { return node, nil } -func (c *clusterState) slotRandomNode(slot int) *clusterNode { +func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) { nodes := c.slotNodes(slot) + if len(nodes) == 0 { + return c.nodes.Random() + } n := rand.Intn(len(nodes)) - return nodes[n] + return nodes[n], nil } func (c *clusterState) slotNodes(slot int) []*clusterNode { @@ -742,23 +745,26 @@ func (c *ClusterClient) ProcessContext(ctx context.Context, cmd Cmder) error { } func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { + cmdInfo := c.cmdInfo(cmd.Name()) + slot := c.cmdSlot(cmd) + var node *clusterNode var ask bool for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + var err error + if attempt > 0 { time.Sleep(c.retryBackoff(attempt)) } if node == nil { - var err error - _, node, err = c.cmdSlotAndNode(cmd) + node, err = c.cmdNode(cmdInfo, slot) if err != nil { cmd.setErr(err) break } } - var err error if ask { pipe := node.Client.Pipeline() _ = pipe.Process(NewCmd("ASKING")) @@ -780,7 +786,7 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { // If slave is loading - pick another node. if c.opt.ReadOnly && internal.IsLoadingError(err) { - node.MarkAsLoading() + node.MarkAsFailing() node = nil continue } @@ -807,11 +813,9 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { continue } - // Second try random node. - node, err = c.nodes.Random() - if err != nil { - break - } + // Second try another node. + node.MarkAsFailing() + node = nil continue } @@ -1100,17 +1104,20 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error { cmdsAreReadOnly := c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) for _, cmd := range cmds { + slot := c.cmdSlot(cmd) + var node *clusterNode var err error if cmdsAreReadOnly { - _, node, err = c.cmdSlotAndNode(cmd) + cmdInfo := c.cmdInfo(cmd.Name()) + node, err = c.cmdNode(cmdInfo, slot) } else { - slot := c.cmdSlot(cmd) node, err = state.slotMasterNode(slot) } if err != nil { return err } + cmdsMap.mu.Lock() cmdsMap.m[node] = append(cmdsMap.m[node], cmd) cmdsMap.mu.Unlock() @@ -1162,7 +1169,7 @@ func (c *ClusterClient) pipelineReadCmds( } if c.opt.ReadOnly && internal.IsLoadingError(err) { - node.MarkAsLoading() + node.MarkAsFailing() } else if internal.IsRedisError(err) { continue } @@ -1529,14 +1536,6 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo { return info } -func cmdSlot(cmd Cmder, pos int) int { - if pos == 0 { - return hashtag.RandomSlot() - } - firstKey := cmd.stringArg(pos) - return hashtag.Slot(firstKey) -} - func (c *ClusterClient) cmdSlot(cmd Cmder) int { args := cmd.Args() if args[0] == "cluster" && args[1] == "getkeysinslot" { @@ -1547,32 +1546,31 @@ func (c *ClusterClient) cmdSlot(cmd Cmder) int { return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) } -func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) { +func cmdSlot(cmd Cmder, pos int) int { + if pos == 0 { + return hashtag.RandomSlot() + } + firstKey := cmd.stringArg(pos) + return hashtag.Slot(firstKey) +} + +func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, error) { state, err := c.state.Get() if err != nil { - return 0, nil, err + return nil, err } - cmdInfo := c.cmdInfo(cmd.Name()) - slot := c.cmdSlot(cmd) - if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { if c.opt.RouteByLatency { - node, err := state.slotClosestNode(slot) - return slot, node, err + return state.slotClosestNode(slot) } - if c.opt.RouteRandomly { - node := state.slotRandomNode(slot) - return slot, node, nil + return state.slotRandomNode(slot) } - - node, err := state.slotSlaveNode(slot) - return slot, node, err + return state.slotSlaveNode(slot) } - node, err := state.slotMasterNode(slot) - return slot, node, err + return state.slotMasterNode(slot) } func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {