Rework pipeline retrying

This commit is contained in:
Vladimir Mihailenco 2017-08-31 15:22:47 +03:00
parent 0daeac9c3e
commit dbd2c99ba9
13 changed files with 388 additions and 256 deletions

View File

@ -14,8 +14,8 @@ import (
"github.com/go-redis/redis/internal/proto"
)
var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes")
var errNilClusterState = internal.RedisError("redis: cannot load cluster slots")
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
var errNilClusterState = fmt.Errorf("redis: cannot load cluster slots")
// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
@ -64,6 +64,19 @@ func (opt *ClusterOptions) init() {
opt.ReadOnly = true
}
switch opt.ReadTimeout {
case -1:
opt.ReadTimeout = 0
case 0:
opt.ReadTimeout = 3 * time.Second
}
switch opt.WriteTimeout {
case -1:
opt.WriteTimeout = 0
case 0:
opt.WriteTimeout = opt.ReadTimeout
}
switch opt.MinRetryBackoff {
case -1:
opt.MinRetryBackoff = 0
@ -192,6 +205,19 @@ func (c *clusterNodes) Close() error {
return firstErr
}
func (c *clusterNodes) Err() error {
c.mu.RLock()
defer c.mu.RUnlock()
if c.closed {
return pool.ErrClosed
}
if len(c.addrs) == 0 {
return errClusterNoNodes
}
return nil
}
func (c *clusterNodes) NextGeneration() uint32 {
c.generation++
return c.generation
@ -468,13 +494,22 @@ func (c *ClusterClient) Options() *ClusterOptions {
return c.opt
}
func (c *ClusterClient) state() *clusterState {
func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
func (c *ClusterClient) state() (*clusterState, error) {
v := c._state.Load()
if v != nil {
return v.(*clusterState)
return v.(*clusterState), nil
}
if err := c.nodes.Err(); err != nil {
return nil, err
}
c.lazyReloadState()
return nil
return nil, errNilClusterState
}
func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
@ -495,15 +530,20 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
if err != nil {
return nil
}
return c.cmdsInfo[name]
info := c.cmdsInfo[name]
if info == nil {
internal.Logf("info for cmd=%s not found", name)
}
return info
}
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
cmdInfo := c.cmdInfo(cmd.Name())
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
return hashtag.Slot(firstKey)
}
func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
if state == nil {
node, err := c.nodes.Random()
return 0, node, err
}
cmdInfo := c.cmdInfo(cmd.Name())
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
slot := hashtag.Slot(firstKey)
@ -523,19 +563,51 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl
}
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
state := c.state()
var node *clusterNode
var err error
if state != nil && len(keys) > 0 {
node, err = state.slotMasterNode(hashtag.Slot(keys[0]))
} else {
node, err = c.nodes.Random()
if len(keys) == 0 {
return fmt.Errorf("redis: keys don't hash to the same slot")
}
state, err := c.state()
if err != nil {
return err
}
return node.Client.Watch(fn, keys...)
slot := hashtag.Slot(keys[0])
for _, key := range keys[1:] {
if hashtag.Slot(key) != slot {
return fmt.Errorf("redis: Watch requires all keys to be in the same slot")
}
}
node, err := state.slotMasterNode(slot)
if err != nil {
return err
}
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
err = node.Client.Watch(fn, keys...)
if err == nil {
break
}
moved, ask, addr := internal.IsMovedError(err)
if moved || ask {
c.lazyReloadState()
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
continue
}
return err
}
return err
}
// Close closes the cluster client, releasing any open resources.
@ -547,7 +619,13 @@ func (c *ClusterClient) Close() error {
}
func (c *ClusterClient) Process(cmd Cmder) error {
slot, node, err := c.cmdSlotAndNode(c.state(), cmd)
state, err := c.state()
if err != nil {
cmd.setErr(err)
return err
}
_, node, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
cmd.setErr(err)
return err
@ -556,7 +634,7 @@ func (c *ClusterClient) Process(cmd Cmder) error {
var ask bool
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
time.Sleep(node.Client.retryBackoff(attempt))
time.Sleep(c.retryBackoff(attempt))
}
if ask {
@ -572,7 +650,7 @@ func (c *ClusterClient) Process(cmd Cmder) error {
// If there is no error - we are done.
if err == nil {
return nil
break
}
// If slave is loading - read from master.
@ -582,12 +660,11 @@ func (c *ClusterClient) Process(cmd Cmder) error {
continue
}
// On network errors try random node.
if internal.IsRetryableError(err) || internal.IsClusterDownError(err) {
node, err = c.nodes.Random()
if err != nil {
cmd.setErr(err)
return err
if internal.IsRetryableError(err) {
var nodeErr error
node, nodeErr = c.nodes.Random()
if nodeErr != nil {
break
}
continue
}
@ -596,20 +673,13 @@ func (c *ClusterClient) Process(cmd Cmder) error {
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
state := c.state()
if state != nil && slot >= 0 {
master, _ := state.slotMasterNode(slot)
if moved && (master == nil || master.Client.getAddr() != addr) {
c.lazyReloadState()
}
}
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
cmd.setErr(err)
return err
var nodeErr error
node, nodeErr = c.nodes.GetOrCreate(addr)
if nodeErr != nil {
break
}
continue
}
@ -622,9 +692,9 @@ func (c *ClusterClient) Process(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
state := c.state()
if state == nil {
return errNilClusterState
state, err := c.state()
if err != nil {
return err
}
var wg sync.WaitGroup
@ -655,9 +725,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
state := c.state()
if state == nil {
return errNilClusterState
state, err := c.state()
if err != nil {
return err
}
var wg sync.WaitGroup
@ -688,9 +758,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
state := c.state()
if state == nil {
return errNilClusterState
state, err := c.state()
if err != nil {
return err
}
var wg sync.WaitGroup
@ -728,7 +798,7 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
func (c *ClusterClient) PoolStats() *PoolStats {
var acc PoolStats
state := c.state()
state, _ := c.state()
if state == nil {
return &acc
}
@ -762,10 +832,8 @@ func (c *ClusterClient) lazyReloadState() {
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
var state *clusterState
for {
var err error
state, err = c.reloadState()
state, err := c.reloadState()
if err == pool.ErrClosed {
return
}
@ -776,11 +844,10 @@ func (c *ClusterClient) lazyReloadState() {
}
c._state.Store(state)
time.Sleep(5 * time.Second)
c.nodes.GC(state.generation)
break
}
time.Sleep(3 * time.Second)
c.nodes.GC(state.generation)
}()
}
@ -843,10 +910,15 @@ func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsByNode(cmds)
if err != nil {
setCmdsErr(cmds, err)
return err
}
for i := 0; i <= c.opt.MaxRedirects; i++ {
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
@ -856,8 +928,12 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
continue
}
err = c.pipelineProcessCmds(cn, cmds, failedCmds)
node.Client.releaseConn(cn, err)
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
if err == nil || internal.IsRedisError(err) {
_ = node.Client.connPool.Put(cn)
} else {
_ = node.Client.connPool.Remove(cn)
}
}
if len(failedCmds) == 0 {
@ -866,21 +942,20 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
cmdsMap = failedCmds
}
var firstErr error
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
firstErr = err
break
}
}
return firstErr
return firstCmdsErr(cmds)
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
state := c.state()
state, err := c.state()
if err != nil {
setCmdsErr(cmds, err)
return nil, err
}
cmdsMap := make(map[*clusterNode][]Cmder)
for _, cmd := range cmds {
_, node, err := c.cmdSlotAndNode(state, cmd)
slot := c.cmdSlot(cmd)
node, err := state.slotMasterNode(slot)
if err != nil {
return nil, err
}
@ -890,11 +965,12 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
}
func (c *ClusterClient) pipelineProcessCmds(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := writeCmd(cn, cmds...); err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
return err
}
@ -907,46 +983,53 @@ func (c *ClusterClient) pipelineProcessCmds(
func (c *ClusterClient) pipelineReadCmds(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
var firstErr error
for _, cmd := range cmds {
err := cmd.readReply(cn)
if err == nil {
continue
}
if firstErr == nil {
firstErr = err
if c.checkMovedErr(cmd, err, failedCmds) {
continue
}
err = c.checkMovedErr(cmd, failedCmds)
if err != nil && firstErr == nil {
firstErr = err
if internal.IsRedisError(err) {
continue
}
return err
}
return firstErr
return nil
}
func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error {
moved, ask, addr := internal.IsMovedError(cmd.Err())
func (c *ClusterClient) checkMovedErr(
cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
) bool {
moved, ask, addr := internal.IsMovedError(err)
if moved {
c.lazyReloadState()
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return err
return false
}
failedCmds[node] = append(failedCmds[node], cmd)
return true
}
if ask {
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return err
return false
}
failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
return true
}
return nil
return false
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
@ -963,25 +1046,25 @@ func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
}
func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsBySlot(cmds)
state, err := c.state()
if err != nil {
return err
}
state := c.state()
if state == nil {
return errNilClusterState
}
cmdsMap := c.mapCmdsBySlot(cmds)
for slot, cmds := range cmdsMap {
node, err := state.slotMasterNode(slot)
if err != nil {
setCmdsErr(cmds, err)
continue
}
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
for i := 0; i <= c.opt.MaxRedirects; i++ {
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
@ -992,7 +1075,11 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
}
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
node.Client.releaseConn(cn, err)
if err == nil || internal.IsRedisError(err) {
_ = node.Client.connPool.Put(cn)
} else {
_ = node.Client.connPool.Remove(cn)
}
}
if len(failedCmds) == 0 {
@ -1002,27 +1089,16 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
}
}
var firstErr error
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
firstErr = err
break
}
}
return firstErr
return firstCmdsErr(cmds)
}
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) (map[int][]Cmder, error) {
state := c.state()
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
cmdsMap := make(map[int][]Cmder)
for _, cmd := range cmds {
slot, _, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
return nil, err
}
slot := c.cmdSlot(cmd)
cmdsMap[slot] = append(cmdsMap[slot], cmd)
}
return cmdsMap, nil
return cmdsMap
}
func (c *ClusterClient) txPipelineProcessCmds(
@ -1039,22 +1115,20 @@ func (c *ClusterClient) txPipelineProcessCmds(
cn.SetReadTimeout(c.opt.ReadTimeout)
if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
setCmdsErr(cmds, err)
return err
}
_, err := pipelineReadCmds(cn, cmds)
return err
return pipelineReadCmds(cn, cmds)
}
func (c *ClusterClient) txPipelineReadQueued(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
var firstErr error
// Parse queued replies.
var statusCmd StatusCmd
if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
firstErr = err
if err := statusCmd.readReply(cn); err != nil {
return err
}
for _, cmd := range cmds {
@ -1063,15 +1137,11 @@ func (c *ClusterClient) txPipelineReadQueued(
continue
}
cmd.setErr(err)
if firstErr == nil {
firstErr = err
if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) {
continue
}
err = c.checkMovedErr(cmd, failedCmds)
if err != nil && firstErr == nil {
firstErr = err
}
return err
}
// Parse number of replies.
@ -1085,7 +1155,13 @@ func (c *ClusterClient) txPipelineReadQueued(
switch line[0] {
case proto.ErrorReply:
return proto.ParseErrorReply(line)
err := proto.ParseErrorReply(line)
for _, cmd := range cmds {
if !c.checkMovedErr(cmd, err, failedCmds) {
break
}
}
return err
case proto.ArrayReply:
// ok
default:
@ -1093,7 +1169,7 @@ func (c *ClusterClient) txPipelineReadQueued(
return err
}
return firstErr
return nil
}
func (c *ClusterClient) pubSub(channels []string) *PubSub {
@ -1112,7 +1188,12 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub {
slot = -1
}
masterNode, err := c.state().slotMasterNode(slot)
state, err := c.state()
if err != nil {
return nil, err
}
masterNode, err := state.slotMasterNode(slot)
if err != nil {
return nil, err
}

View File

@ -200,7 +200,7 @@ var _ = Describe("ClusterClient", func() {
Eventually(func() string {
return client.Get("A").Val()
}).Should(Equal("VALUE"))
}, 30*time.Second).Should(Equal("VALUE"))
cnt, err := client.Del("A").Result()
Expect(err).NotTo(HaveOccurred())
@ -215,7 +215,7 @@ var _ = Describe("ClusterClient", func() {
Eventually(func() string {
return client.Get("A").Val()
}).Should(Equal("VALUE"))
}, 30*time.Second).Should(Equal("VALUE"))
})
It("distributes keys", func() {
@ -227,7 +227,7 @@ var _ = Describe("ClusterClient", func() {
for _, master := range cluster.masters() {
Eventually(func() string {
return master.Info("keyspace").Val()
}, 5*time.Second).Should(Or(
}, 30*time.Second).Should(Or(
ContainSubstring("keys=31"),
ContainSubstring("keys=29"),
ContainSubstring("keys=40"),
@ -251,7 +251,7 @@ var _ = Describe("ClusterClient", func() {
for _, master := range cluster.masters() {
Eventually(func() string {
return master.Info("keyspace").Val()
}, 5*time.Second).Should(Or(
}, 30*time.Second).Should(Or(
ContainSubstring("keys=31"),
ContainSubstring("keys=29"),
ContainSubstring("keys=40"),
@ -320,10 +320,6 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(14))
if opt.RouteByLatency {
return
}
for _, key := range keys {
slot := hashtag.Slot(key)
client.SwapSlotNodes(slot)
@ -432,6 +428,9 @@ var _ = Describe("ClusterClient", func() {
})
AfterEach(func() {
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})
@ -560,6 +559,9 @@ var _ = Describe("ClusterClient", func() {
})
AfterEach(func() {
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})
@ -575,10 +577,19 @@ var _ = Describe("ClusterClient", func() {
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
_ = client.ForEachSlave(func(slave *redis.Client) error {
Eventually(func() int64 {
return client.DBSize().Val()
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
})
AfterEach(func() {
client.FlushDB()
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})
@ -597,7 +608,7 @@ var _ = Describe("ClusterClient without nodes", func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("returns an error", func() {
It("Ping returns an error", func() {
err := client.Ping().Err()
Expect(err).To(MatchError("redis: cluster has no nodes"))
})
@ -626,7 +637,7 @@ var _ = Describe("ClusterClient without valid nodes", func() {
It("returns an error", func() {
err := client.Ping().Err()
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
Expect(err).To(MatchError("redis: cannot load cluster slots"))
})
It("pipeline returns an error", func() {
@ -634,7 +645,7 @@ var _ = Describe("ClusterClient without valid nodes", func() {
pipe.Ping()
return nil
})
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
Expect(err).To(MatchError("redis: cannot load cluster slots"))
})
})
@ -664,7 +675,7 @@ var _ = Describe("ClusterClient timeout", func() {
It("Tx timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
return tx.Ping().Err()
})
}, "foo")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
@ -676,42 +687,20 @@ var _ = Describe("ClusterClient timeout", func() {
return nil
})
return err
})
}, "foo")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
}
Context("read timeout", func() {
BeforeEach(func() {
opt := redisClusterOptions()
opt.ReadTimeout = time.Nanosecond
opt.WriteTimeout = -1
client = cluster.clusterClient(opt)
})
testTimeout()
})
Context("write timeout", func() {
BeforeEach(func() {
opt := redisClusterOptions()
opt.ReadTimeout = time.Nanosecond
opt.WriteTimeout = -1
client = cluster.clusterClient(opt)
})
testTimeout()
})
Context("ClientPause timeout", func() {
const pause = time.Second
Context("read/write timeout", func() {
BeforeEach(func() {
opt := redisClusterOptions()
opt.ReadTimeout = pause / 10
opt.WriteTimeout = pause / 10
opt.MaxRedirects = -1
opt.ReadTimeout = 100 * time.Millisecond
opt.WriteTimeout = 100 * time.Millisecond
opt.MaxRedirects = 1
client = cluster.clusterClient(opt)
err := client.ForEachNode(func(client *redis.Client) error {

View File

@ -46,8 +46,19 @@ type Cmder interface {
func setCmdsErr(cmds []Cmder, e error) {
for _, cmd := range cmds {
if cmd.Err() == nil {
cmd.setErr(e)
}
}
}
func firstCmdsErr(cmds []Cmder) error {
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
return err
}
}
return nil
}
func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
@ -95,7 +106,6 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
return 1
}
if info == nil {
internal.Logf("info for cmd=%s not found", cmd.Name())
return -1
}
return int(info.FirstKeyPos)

View File

@ -27,11 +27,21 @@ var _ = Describe("Commands", func() {
Describe("server", func() {
It("should Auth", func() {
_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Auth("password")
pipe.Auth("")
return nil
})
Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set"))
Expect(cmds[0].Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
Expect(cmds[1].Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
stats := client.Pool().Stats()
Expect(stats.Requests).To(Equal(uint32(2)))
Expect(stats.Hits).To(Equal(uint32(1)))
Expect(stats.Timeouts).To(Equal(uint32(0)))
Expect(stats.TotalConns).To(Equal(uint32(1)))
Expect(stats.FreeConns).To(Equal(uint32(1)))
})
It("should Echo", func() {
@ -187,6 +197,29 @@ var _ = Describe("Commands", func() {
Expect(tm).To(BeTemporally("~", time.Now(), 3*time.Second))
})
It("Should Command", func() {
cmds, err := client.Command().Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(cmds)).To(BeNumerically("~", 180, 10))
cmd := cmds["mget"]
Expect(cmd.Name).To(Equal("mget"))
Expect(cmd.Arity).To(Equal(int8(-2)))
Expect(cmd.Flags).To(ContainElement("readonly"))
Expect(cmd.FirstKeyPos).To(Equal(int8(1)))
Expect(cmd.LastKeyPos).To(Equal(int8(-1)))
Expect(cmd.StepCount).To(Equal(int8(1)))
cmd = cmds["ping"]
Expect(cmd.Name).To(Equal("ping"))
Expect(cmd.Arity).To(Equal(int8(-1)))
Expect(cmd.Flags).To(ContainElement("stale"))
Expect(cmd.Flags).To(ContainElement("fast"))
Expect(cmd.FirstKeyPos).To(Equal(int8(0)))
Expect(cmd.LastKeyPos).To(Equal(int8(0)))
Expect(cmd.StepCount).To(Equal(int8(0)))
})
})
Describe("debugging", func() {
@ -2887,24 +2920,6 @@ var _ = Describe("Commands", func() {
})
Describe("Command", func() {
It("returns map of commands", func() {
cmds, err := client.Command().Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(cmds)).To(BeNumerically("~", 180, 10))
cmd := cmds["mget"]
Expect(cmd.Name).To(Equal("mget"))
Expect(cmd.Arity).To(Equal(int8(-2)))
Expect(cmd.Flags).To(ContainElement("readonly"))
Expect(cmd.FirstKeyPos).To(Equal(int8(1)))
Expect(cmd.LastKeyPos).To(Equal(int8(-1)))
Expect(cmd.StepCount).To(Equal(int8(1)))
})
})
Describe("Eval", func() {
It("returns keys and values", func() {

View File

@ -20,8 +20,13 @@ func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error)
}
func (c *ClusterClient) SlotAddrs(slot int) []string {
state, err := c.state()
if err != nil {
panic(err)
}
var addrs []string
for _, n := range c.state().slotNodes(slot) {
for _, n := range state.slotNodes(slot) {
addrs = append(addrs, n.Client.getAddr())
}
return addrs
@ -29,7 +34,12 @@ func (c *ClusterClient) SlotAddrs(slot int) []string {
// SwapSlot swaps a slot's master/slave address for testing MOVED redirects.
func (c *ClusterClient) SwapSlotNodes(slot int) {
nodes := c.state().slots[slot]
state, err := c.state()
if err != nil {
panic(err)
}
nodes := state.slots[slot]
if len(nodes) == 2 {
nodes[0], nodes[1] = nodes[1], nodes[0]
}

View File

@ -13,10 +13,23 @@ type RedisError string
func (e RedisError) Error() string { return string(e) }
func IsRetryableError(err error) bool {
return IsNetworkError(err) || err.Error() == "ERR max number of clients reached"
if IsNetworkError(err) {
return true
}
s := err.Error()
if s == "ERR max number of clients reached" {
return true
}
if strings.HasPrefix(s, "LOADING ") {
return true
}
if strings.HasPrefix(s, "CLUSTERDOWN ") {
return true
}
return false
}
func IsInternalError(err error) bool {
func IsRedisError(err error) bool {
_, ok := err.(RedisError)
return ok
}
@ -33,7 +46,7 @@ func IsBadConn(err error, allowTimeout bool) bool {
if err == nil {
return false
}
if IsInternalError(err) {
if IsRedisError(err) {
return false
}
if allowTimeout {
@ -45,7 +58,7 @@ func IsBadConn(err error, allowTimeout bool) bool {
}
func IsMovedError(err error) (moved bool, ask bool, addr string) {
if !IsInternalError(err) {
if !IsRedisError(err) {
return
}
@ -69,7 +82,3 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ")
}
func IsClusterDownError(err error) bool {
return strings.HasPrefix(err.Error(), "CLUSTERDOWN ")
}

View File

@ -63,7 +63,7 @@ func (p *Reader) ReadLine() ([]byte, error) {
return nil, bufio.ErrBufferFull
}
if len(line) == 0 {
return nil, internal.RedisError("redis: reply is empty")
return nil, fmt.Errorf("redis: reply is empty")
}
if isNilReply(line) {
return nil, internal.Nil

View File

@ -11,7 +11,7 @@ import (
func Scan(b []byte, v interface{}) error {
switch v := v.(type) {
case nil:
return internal.RedisError("redis: Scan(nil)")
return fmt.Errorf("redis: Scan(nil)")
case *string:
*v = internal.BytesToString(b)
return nil

View File

@ -3,6 +3,7 @@ package redis_test
import (
"errors"
"fmt"
"log"
"net"
"os"
"os/exec"
@ -51,7 +52,7 @@ var cluster = &clusterScenario{
}
func init() {
//redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
}
var _ = BeforeSuite(func() {

View File

@ -95,7 +95,10 @@ func (c *PubSub) releaseConn(cn *pool.Conn, err error) {
}
func (c *PubSub) _releaseConn(cn *pool.Conn, err error) {
if internal.IsBadConn(err, true) && c.cn == cn {
if c.cn != cn {
return
}
if internal.IsBadConn(err, true) {
_ = c.closeTheCn()
}
}

View File

@ -197,8 +197,11 @@ type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
return func(cmds []Cmder) error {
var firstErr error
for i := 0; i <= c.opt.MaxRetries; i++ {
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
cn, _, err := c.getConn()
if err != nil {
setCmdsErr(cmds, err)
@ -206,18 +209,18 @@ func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
}
canRetry, err := p(cn, cmds)
c.releaseConn(cn, err)
if err == nil {
return nil
}
if firstErr == nil {
firstErr = err
if err == nil || internal.IsRedisError(err) {
_ = c.connPool.Put(cn)
break
}
_ = c.connPool.Remove(cn)
if !canRetry || !internal.IsRetryableError(err) {
break
}
}
return firstErr
return firstCmdsErr(cmds)
}
}
@ -230,23 +233,17 @@ func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, err
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
return pipelineReadCmds(cn, cmds)
return true, pipelineReadCmds(cn, cmds)
}
func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) {
for i, cmd := range cmds {
func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
for _, cmd := range cmds {
err := cmd.readReply(cn)
if err == nil {
continue
}
if i == 0 {
retry = true
}
if firstErr == nil {
firstErr = err
if err != nil && !internal.IsRedisError(err) {
return err
}
}
return
return nil
}
func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
@ -260,11 +257,11 @@ func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, e
cn.SetReadTimeout(c.opt.ReadTimeout)
if err := c.txPipelineReadQueued(cn, cmds); err != nil {
setCmdsErr(cmds, err)
return false, err
}
_, err := pipelineReadCmds(cn, cmds)
return false, err
return false, pipelineReadCmds(cn, cmds)
}
func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
@ -276,21 +273,16 @@ func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
}
func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error {
var firstErr error
// Parse queued replies.
var statusCmd StatusCmd
if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
firstErr = err
if err := statusCmd.readReply(cn); err != nil {
return err
}
for _, cmd := range cmds {
for _ = range cmds {
err := statusCmd.readReply(cn)
if err != nil {
cmd.setErr(err)
if firstErr == nil {
firstErr = err
}
if err != nil && !internal.IsRedisError(err) {
return err
}
}

53
ring.go
View File

@ -35,6 +35,8 @@ type RingOptions struct {
Password string
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
@ -50,6 +52,19 @@ func (opt *RingOptions) init() {
if opt.HeartbeatFrequency == 0 {
opt.HeartbeatFrequency = 500 * time.Millisecond
}
switch opt.MinRetryBackoff {
case -1:
opt.MinRetryBackoff = 0
case 0:
opt.MinRetryBackoff = 8 * time.Millisecond
}
switch opt.MaxRetryBackoff {
case -1:
opt.MaxRetryBackoff = 0
case 0:
opt.MaxRetryBackoff = 512 * time.Millisecond
}
}
func (opt *RingOptions) clientOptions() *Options {
@ -165,6 +180,10 @@ func (c *Ring) Options() *RingOptions {
return c.opt
}
func (c *Ring) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
// PoolStats returns accumulated connection pool stats.
func (c *Ring) PoolStats() *PoolStats {
var acc PoolStats
@ -241,6 +260,9 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error {
func (c *Ring) cmdInfo(name string) *CommandInfo {
err := c.cmdsInfoOnce.Do(func() error {
c.mu.RLock()
defer c.mu.RUnlock()
var firstErr error
for _, shard := range c.shards {
cmdsInfo, err := shard.Client.Command().Result()
@ -257,7 +279,11 @@ func (c *Ring) cmdInfo(name string) *CommandInfo {
if err != nil {
return nil
}
return c.cmdsInfo[name]
info := c.cmdsInfo[name]
if info == nil {
internal.Logf("info for cmd=%s not found", name)
}
return info
}
func (c *Ring) addClient(name string, cl *Client) {
@ -399,7 +425,7 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}
func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
func (c *Ring) pipelineExec(cmds []Cmder) error {
cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
cmdInfo := c.cmdInfo(cmd.Name())
@ -410,36 +436,33 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
cmdsMap[name] = append(cmdsMap[name], cmd)
}
for i := 0; i <= c.opt.MaxRetries; i++ {
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
var failedCmdsMap map[string][]Cmder
for name, cmds := range cmdsMap {
shard, err := c.shardByName(name)
if err != nil {
setCmdsErr(cmds, err)
if firstErr == nil {
firstErr = err
}
continue
}
cn, _, err := shard.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
if firstErr == nil {
firstErr = err
}
continue
}
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
shard.Client.releaseConn(cn, err)
if err == nil {
if err == nil || internal.IsRedisError(err) {
_ = shard.Client.connPool.Put(cn)
continue
}
if firstErr == nil {
firstErr = err
}
_ = shard.Client.connPool.Remove(cn)
if canRetry && internal.IsRetryableError(err) {
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
@ -454,5 +477,5 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
cmdsMap = failedCmdsMap
}
return firstErr
return firstCmdsErr(cmds)
}

13
tx.go
View File

@ -36,11 +36,10 @@ func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
return err
}
}
firstErr := fn(tx)
if err := tx.Close(); err != nil && firstErr == nil {
firstErr = err
}
return firstErr
err := fn(tx)
_ = tx.Close()
return err
}
// close closes the transaction, releasing any open resources.
@ -53,7 +52,7 @@ func (c *Tx) Close() error {
// of a transaction.
func (c *Tx) Watch(keys ...string) *StatusCmd {
args := make([]interface{}, 1+len(keys))
args[0] = "WATCH"
args[0] = "watch"
for i, key := range keys {
args[1+i] = key
}
@ -65,7 +64,7 @@ func (c *Tx) Watch(keys ...string) *StatusCmd {
// Unwatch flushes all the previously watched keys for a transaction.
func (c *Tx) Unwatch(keys ...string) *StatusCmd {
args := make([]interface{}, 1+len(keys))
args[0] = "UNWATCH"
args[0] = "unwatch"
for i, key := range keys {
args[1+i] = key
}