Rework retrying

This commit is contained in:
Vladimir Mihailenco 2018-02-15 13:00:54 +02:00
parent 4598ed0eac
commit fa7f64f7f2
4 changed files with 219 additions and 129 deletions

View File

@ -13,10 +13,10 @@ import (
"github.com/go-redis/redis/internal/hashtag"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
"github.com/go-redis/redis/internal/singleflight"
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
var errNilClusterState = fmt.Errorf("redis: cannot load cluster slots")
// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
@ -122,7 +122,7 @@ type clusterNode struct {
Client *Client
latency uint32 // atomic
generation uint32
generation uint32 // atomic
loading int64 // atomic
}
@ -141,6 +141,14 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
return &node
}
func (n *clusterNode) Close() error {
return n.Client.Close()
}
func (n *clusterNode) Test() error {
return n.Client.ClusterInfo().Err()
}
func (n *clusterNode) updateLatency() {
const probes = 10
@ -154,14 +162,6 @@ func (n *clusterNode) updateLatency() {
atomic.StoreUint32(&n.latency, latency)
}
func (n *clusterNode) Close() error {
return n.Client.Close()
}
func (n *clusterNode) Test() error {
return n.Client.ClusterInfo().Err()
}
func (n *clusterNode) Latency() time.Duration {
latency := atomic.LoadUint32(&n.latency)
return time.Duration(latency) * time.Microsecond
@ -186,14 +186,16 @@ func (n *clusterNode) Loading() bool {
}
func (n *clusterNode) Generation() uint32 {
return n.generation
return atomic.LoadUint32(&n.generation)
}
func (n *clusterNode) SetGeneration(gen uint32) {
if gen < n.generation {
panic("gen < n.generation")
for {
v := atomic.LoadUint32(&n.generation)
if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
break
}
}
n.generation = gen
}
//------------------------------------------------------------------------------
@ -202,16 +204,21 @@ type clusterNodes struct {
opt *ClusterOptions
mu sync.RWMutex
allAddrs []string
addrs []string
nodes map[string]*clusterNode
closed bool
nodeCreateGroup singleflight.Group
generation uint32
}
func newClusterNodes(opt *ClusterOptions) *clusterNodes {
return &clusterNodes{
opt: opt,
allAddrs: opt.Addrs,
nodes: make(map[string]*clusterNode),
}
}
@ -231,6 +238,7 @@ func (c *clusterNodes) Close() error {
firstErr = err
}
}
c.addrs = nil
c.nodes = nil
@ -238,9 +246,16 @@ func (c *clusterNodes) Close() error {
}
func (c *clusterNodes) Addrs() ([]string, error) {
var addrs []string
c.mu.RLock()
closed := c.closed
addrs := c.addrs
if !closed {
if len(c.addrs) > 0 {
addrs = c.addrs
} else {
addrs = c.allAddrs
}
}
c.mu.RUnlock()
if closed {
@ -310,6 +325,14 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return node, nil
}
v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
node := newClusterNode(c.opt, addr)
return node, node.Test()
})
if err != nil {
return nil, err
}
c.mu.Lock()
defer c.mu.Unlock()
@ -319,12 +342,15 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node, ok = c.nodes[addr]
if ok {
_ = v.(*clusterNode).Close()
return node, nil
}
node = v.(*clusterNode)
c.allAddrs = appendIfNotExists(c.allAddrs, addr)
c.addrs = append(c.addrs, addr)
node = newClusterNode(c.opt, addr)
c.nodes[addr] = node
return node, nil
}
@ -334,20 +360,8 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
return nil, err
}
var nodeErr error
for i := 0; i <= c.opt.MaxRedirects; i++ {
n := rand.Intn(len(addrs))
node, err := c.GetOrCreate(addrs[n])
if err != nil {
return nil, err
}
nodeErr = node.Test()
if nodeErr == nil {
return node, nil
}
}
return nil, nodeErr
return c.GetOrCreate(addrs[n])
}
//------------------------------------------------------------------------------
@ -472,7 +486,10 @@ type ClusterClient struct {
opt *ClusterOptions
nodes *clusterNodes
_state atomic.Value
stateErrMu sync.RWMutex
stateErr error
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
@ -501,20 +518,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c.cmdable.setProcessor(c.Process)
// Add initial nodes.
for _, addr := range opt.Addrs {
_, _ = c.nodes.GetOrCreate(addr)
}
// Preload cluster slots.
for i := 0; i < 10; i++ {
state, err := c.reloadState()
if err == nil {
c._state.Store(state)
break
}
}
c.reloadState()
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
@ -531,21 +535,6 @@ func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
func (c *ClusterClient) state() (*clusterState, error) {
v := c._state.Load()
if v != nil {
return v.(*clusterState), nil
}
_, err := c.nodes.Addrs()
if err != nil {
return nil, err
}
c.lazyReloadState()
return nil, errNilClusterState
}
func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
err := c.cmdsInfoOnce.Do(func() error {
node, err := c.nodes.Random()
@ -584,7 +573,12 @@ func (c *ClusterClient) cmdSlot(cmd Cmder) int {
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}
func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
state, err := c.state()
if err != nil {
return 0, nil, err
}
cmdInfo := c.cmdInfo(cmd.Name())
slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
@ -602,16 +596,24 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl
return slot, node, err
}
func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
state, err := c.state()
if err != nil {
return nil, err
}
nodes := state.slotNodes(slot)
if len(nodes) > 0 {
return nodes[0], nil
}
return c.nodes.Random()
}
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
if len(keys) == 0 {
return fmt.Errorf("redis: keys don't hash to the same slot")
}
state, err := c.state()
if err != nil {
return err
}
slot := hashtag.Slot(keys[0])
for _, key := range keys[1:] {
if hashtag.Slot(key) != slot {
@ -619,7 +621,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
}
}
node, err := state.slotMasterNode(slot)
node, err := c.slotMasterNode(slot)
if err != nil {
return err
}
@ -649,10 +651,11 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
}
if err == pool.ErrClosed {
node, err = state.slotMasterNode(slot)
node, err = c.slotMasterNode(slot)
if err != nil {
return err
}
continue
}
return err
@ -683,13 +686,7 @@ func (c *ClusterClient) Process(cmd Cmder) error {
}
func (c *ClusterClient) defaultProcess(cmd Cmder) error {
state, err := c.state()
if err != nil {
cmd.setErr(err)
return err
}
_, node, err := c.cmdSlotAndNode(state, cmd)
_, node, err := c.cmdSlotAndNode(cmd)
if err != nil {
cmd.setErr(err)
return err
@ -747,11 +744,12 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
}
if err == pool.ErrClosed {
_, node, err = c.cmdSlotAndNode(state, cmd)
_, node, err = c.cmdSlotAndNode(cmd)
if err != nil {
cmd.setErr(err)
return err
break
}
continue
}
break
@ -903,31 +901,37 @@ func (c *ClusterClient) lazyReloadState() {
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
return
}
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
for {
state, err := c.reloadState()
if err == pool.ErrClosed {
return
}
if err != nil {
time.Sleep(time.Millisecond)
continue
}
c._state.Store(state)
time.Sleep(5 * time.Second)
c.nodes.GC(state.generation)
break
if c.reloadState() {
time.Sleep(time.Second)
}
atomic.StoreUint32(&c.reloading, 0)
}()
}
// Not thread-safe.
func (c *ClusterClient) reloadState() (*clusterState, error) {
func (c *ClusterClient) reloadState() bool {
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
state, err := c.loadState()
if err == nil {
c._state.Store(state)
c.nodes.GC(state.generation)
return true
}
c.setStateErr(err)
switch err {
case pool.ErrClosed, errClusterNoNodes:
return false
}
}
return false
}
func (c *ClusterClient) loadState() (*clusterState, error) {
node, err := c.nodes.Random()
if err != nil {
return nil, err
@ -941,6 +945,27 @@ func (c *ClusterClient) reloadState() (*clusterState, error) {
return newClusterState(c.nodes, slots, node.Client.opt.Addr)
}
func (c *ClusterClient) state() (*clusterState, error) {
v := c._state.Load()
if v != nil {
return v.(*clusterState), nil
}
return nil, c.getStateErr()
}
func (c *ClusterClient) setStateErr(err error) {
c.stateErrMu.Lock()
c.stateErr = err
c.stateErrMu.Unlock()
}
func (c *ClusterClient) getStateErr() error {
c.stateErrMu.RLock()
err := c.stateErr
c.stateErrMu.RUnlock()
return err
}
// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
ticker := time.NewTicker(idleCheckFrequency)
@ -1055,15 +1080,17 @@ func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cm
func (c *ClusterClient) pipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := writeCmd(cn, cmds...); err != nil {
_ = cn.SetWriteTimeout(c.opt.WriteTimeout)
err := writeCmd(cn, cmds...)
if err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
return err
}
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
_ = cn.SetReadTimeout(c.opt.ReadTimeout)
return c.pipelineReadCmds(cn, cmds, failedCmds)
}
@ -1280,12 +1307,7 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub {
slot = -1
}
state, err := c.state()
if err != nil {
return nil, err
}
masterNode, err := state.slotMasterNode(slot)
masterNode, err := c.slotMasterNode(slot)
if err != nil {
return nil, err
}

View File

@ -190,13 +190,11 @@ var _ = Describe("ClusterClient", func() {
assertClusterClient := func() {
It("should GET/SET/DEL", func() {
val, err := client.Get("A").Result()
err := client.Get("A").Err()
Expect(err).To(Equal(redis.Nil))
Expect(val).To(Equal(""))
val, err = client.Set("A", "VALUE", 0).Result()
err = client.Set("A", "VALUE", 0).Err()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("OK"))
Eventually(func() string {
return client.Get("A").Val()
@ -295,9 +293,9 @@ var _ = Describe("ClusterClient", func() {
}
wg.Wait()
n, err := client.Get("key").Int64()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(100)))
Eventually(func() string {
return client.Get("key").Val()
}, 30*time.Second).Should(Equal("100"))
})
Describe("pipelining", func() {
@ -578,12 +576,13 @@ var _ = Describe("ClusterClient", func() {
opt = redisClusterOptions()
client = cluster.clusterClient(opt)
_ = client.ForEachSlave(func(slave *redis.Client) error {
defer GinkgoRecover()
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
_ = client.ForEachSlave(func(slave *redis.Client) error {
defer GinkgoRecover()
Eventually(func() int64 {
return slave.DBSize().Val()
}, 30*time.Second).Should(Equal(int64(0)))
@ -670,7 +669,7 @@ var _ = Describe("ClusterClient without valid nodes", func() {
It("returns an error", func() {
err := client.Ping().Err()
Expect(err).To(MatchError("redis: cannot load cluster slots"))
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
})
It("pipeline returns an error", func() {
@ -678,7 +677,7 @@ var _ = Describe("ClusterClient without valid nodes", func() {
pipe.Ping()
return nil
})
Expect(err).To(MatchError("redis: cannot load cluster slots"))
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
})
})
@ -743,7 +742,8 @@ var _ = Describe("ClusterClient timeout", func() {
})
AfterEach(func() {
client.ForEachNode(func(client *redis.Client) error {
_ = client.ForEachNode(func(client *redis.Client) error {
defer GinkgoRecover()
Eventually(func() error {
return client.Ping().Err()
}, 2*pause).ShouldNot(HaveOccurred())

View File

@ -42,6 +42,10 @@ func IsNetworkError(err error) bool {
return ok
}
func IsReadOnlyError(err error) bool {
return strings.HasPrefix(err.Error(), "READONLY ")
}
func IsBadConn(err error, allowTimeout bool) bool {
if err == nil {
return false

View File

@ -0,0 +1,64 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight
import "sync"
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}