diff --git a/cluster.go b/cluster.go index 647a25be..72bace76 100644 --- a/cluster.go +++ b/cluster.go @@ -14,8 +14,8 @@ import ( "github.com/go-redis/redis/internal/proto" ) -var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes") -var errNilClusterState = internal.RedisError("redis: cannot load cluster slots") +var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") +var errNilClusterState = fmt.Errorf("redis: cannot load cluster slots") // ClusterOptions are used to configure a cluster client and should be // passed to NewClusterClient. @@ -64,6 +64,19 @@ func (opt *ClusterOptions) init() { opt.ReadOnly = true } + switch opt.ReadTimeout { + case -1: + opt.ReadTimeout = 0 + case 0: + opt.ReadTimeout = 3 * time.Second + } + switch opt.WriteTimeout { + case -1: + opt.WriteTimeout = 0 + case 0: + opt.WriteTimeout = opt.ReadTimeout + } + switch opt.MinRetryBackoff { case -1: opt.MinRetryBackoff = 0 @@ -192,6 +205,19 @@ func (c *clusterNodes) Close() error { return firstErr } +func (c *clusterNodes) Err() error { + c.mu.RLock() + defer c.mu.RUnlock() + + if c.closed { + return pool.ErrClosed + } + if len(c.addrs) == 0 { + return errClusterNoNodes + } + return nil +} + func (c *clusterNodes) NextGeneration() uint32 { c.generation++ return c.generation @@ -468,13 +494,22 @@ func (c *ClusterClient) Options() *ClusterOptions { return c.opt } -func (c *ClusterClient) state() *clusterState { +func (c *ClusterClient) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + +func (c *ClusterClient) state() (*clusterState, error) { v := c._state.Load() if v != nil { - return v.(*clusterState) + return v.(*clusterState), nil } + + if err := c.nodes.Err(); err != nil { + return nil, err + } + c.lazyReloadState() - return nil + return nil, errNilClusterState } func (c *ClusterClient) cmdInfo(name string) *CommandInfo { @@ -495,15 +530,20 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo { if err != nil { return nil } - return c.cmdsInfo[name] + info := c.cmdsInfo[name] + if info == nil { + internal.Logf("info for cmd=%s not found", name) + } + return info +} + +func (c *ClusterClient) cmdSlot(cmd Cmder) int { + cmdInfo := c.cmdInfo(cmd.Name()) + firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) + return hashtag.Slot(firstKey) } func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) { - if state == nil { - node, err := c.nodes.Random() - return 0, node, err - } - cmdInfo := c.cmdInfo(cmd.Name()) firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) slot := hashtag.Slot(firstKey) @@ -523,19 +563,51 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl } func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { - state := c.state() - - var node *clusterNode - var err error - if state != nil && len(keys) > 0 { - node, err = state.slotMasterNode(hashtag.Slot(keys[0])) - } else { - node, err = c.nodes.Random() + if len(keys) == 0 { + return fmt.Errorf("redis: keys don't hash to the same slot") } + + state, err := c.state() if err != nil { return err } - return node.Client.Watch(fn, keys...) + + slot := hashtag.Slot(keys[0]) + for _, key := range keys[1:] { + if hashtag.Slot(key) != slot { + return fmt.Errorf("redis: Watch requires all keys to be in the same slot") + } + } + + node, err := state.slotMasterNode(slot) + if err != nil { + return err + } + + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + err = node.Client.Watch(fn, keys...) + if err == nil { + break + } + + moved, ask, addr := internal.IsMovedError(err) + if moved || ask { + c.lazyReloadState() + node, err = c.nodes.GetOrCreate(addr) + if err != nil { + return err + } + continue + } + + return err + } + + return err } // Close closes the cluster client, releasing any open resources. @@ -547,7 +619,13 @@ func (c *ClusterClient) Close() error { } func (c *ClusterClient) Process(cmd Cmder) error { - slot, node, err := c.cmdSlotAndNode(c.state(), cmd) + state, err := c.state() + if err != nil { + cmd.setErr(err) + return err + } + + _, node, err := c.cmdSlotAndNode(state, cmd) if err != nil { cmd.setErr(err) return err @@ -556,7 +634,7 @@ func (c *ClusterClient) Process(cmd Cmder) error { var ask bool for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { if attempt > 0 { - time.Sleep(node.Client.retryBackoff(attempt)) + time.Sleep(c.retryBackoff(attempt)) } if ask { @@ -572,7 +650,7 @@ func (c *ClusterClient) Process(cmd Cmder) error { // If there is no error - we are done. if err == nil { - return nil + break } // If slave is loading - read from master. @@ -582,12 +660,11 @@ func (c *ClusterClient) Process(cmd Cmder) error { continue } - // On network errors try random node. - if internal.IsRetryableError(err) || internal.IsClusterDownError(err) { - node, err = c.nodes.Random() - if err != nil { - cmd.setErr(err) - return err + if internal.IsRetryableError(err) { + var nodeErr error + node, nodeErr = c.nodes.Random() + if nodeErr != nil { + break } continue } @@ -596,20 +673,13 @@ func (c *ClusterClient) Process(cmd Cmder) error { var addr string moved, ask, addr = internal.IsMovedError(err) if moved || ask { - state := c.state() - if state != nil && slot >= 0 { - master, _ := state.slotMasterNode(slot) - if moved && (master == nil || master.Client.getAddr() != addr) { - c.lazyReloadState() - } - } + c.lazyReloadState() - node, err = c.nodes.GetOrCreate(addr) - if err != nil { - cmd.setErr(err) - return err + var nodeErr error + node, nodeErr = c.nodes.GetOrCreate(addr) + if nodeErr != nil { + break } - continue } @@ -622,9 +692,9 @@ func (c *ClusterClient) Process(cmd Cmder) error { // ForEachMaster concurrently calls the fn on each master node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { - state := c.state() - if state == nil { - return errNilClusterState + state, err := c.state() + if err != nil { + return err } var wg sync.WaitGroup @@ -655,9 +725,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { // ForEachSlave concurrently calls the fn on each slave node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { - state := c.state() - if state == nil { - return errNilClusterState + state, err := c.state() + if err != nil { + return err } var wg sync.WaitGroup @@ -688,9 +758,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { // ForEachNode concurrently calls the fn on each known node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { - state := c.state() - if state == nil { - return errNilClusterState + state, err := c.state() + if err != nil { + return err } var wg sync.WaitGroup @@ -728,7 +798,7 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { func (c *ClusterClient) PoolStats() *PoolStats { var acc PoolStats - state := c.state() + state, _ := c.state() if state == nil { return &acc } @@ -762,10 +832,8 @@ func (c *ClusterClient) lazyReloadState() { go func() { defer atomic.StoreUint32(&c.reloading, 0) - var state *clusterState for { - var err error - state, err = c.reloadState() + state, err := c.reloadState() if err == pool.ErrClosed { return } @@ -776,11 +844,10 @@ func (c *ClusterClient) lazyReloadState() { } c._state.Store(state) + time.Sleep(5 * time.Second) + c.nodes.GC(state.generation) break } - - time.Sleep(3 * time.Second) - c.nodes.GC(state.generation) }() } @@ -843,10 +910,15 @@ func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *ClusterClient) pipelineExec(cmds []Cmder) error { cmdsMap, err := c.mapCmdsByNode(cmds) if err != nil { + setCmdsErr(cmds, err) return err } - for i := 0; i <= c.opt.MaxRedirects; i++ { + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { @@ -856,8 +928,12 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { continue } - err = c.pipelineProcessCmds(cn, cmds, failedCmds) - node.Client.releaseConn(cn, err) + err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) + if err == nil || internal.IsRedisError(err) { + _ = node.Client.connPool.Put(cn) + } else { + _ = node.Client.connPool.Remove(cn) + } } if len(failedCmds) == 0 { @@ -866,21 +942,20 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { cmdsMap = failedCmds } - var firstErr error - for _, cmd := range cmds { - if err := cmd.Err(); err != nil { - firstErr = err - break - } - } - return firstErr + return firstCmdsErr(cmds) } func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) { - state := c.state() + state, err := c.state() + if err != nil { + setCmdsErr(cmds, err) + return nil, err + } + cmdsMap := make(map[*clusterNode][]Cmder) for _, cmd := range cmds { - _, node, err := c.cmdSlotAndNode(state, cmd) + slot := c.cmdSlot(cmd) + node, err := state.slotMasterNode(slot) if err != nil { return nil, err } @@ -890,11 +965,12 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e } func (c *ClusterClient) pipelineProcessCmds( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { cn.SetWriteTimeout(c.opt.WriteTimeout) if err := writeCmd(cn, cmds...); err != nil { setCmdsErr(cmds, err) + failedCmds[node] = cmds return err } @@ -907,46 +983,53 @@ func (c *ClusterClient) pipelineProcessCmds( func (c *ClusterClient) pipelineReadCmds( cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { - var firstErr error for _, cmd := range cmds { err := cmd.readReply(cn) if err == nil { continue } - if firstErr == nil { - firstErr = err + if c.checkMovedErr(cmd, err, failedCmds) { + continue } - err = c.checkMovedErr(cmd, failedCmds) - if err != nil && firstErr == nil { - firstErr = err + if internal.IsRedisError(err) { + continue } + + return err } - return firstErr + return nil } -func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error { - moved, ask, addr := internal.IsMovedError(cmd.Err()) +func (c *ClusterClient) checkMovedErr( + cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder, +) bool { + moved, ask, addr := internal.IsMovedError(err) + if moved { c.lazyReloadState() node, err := c.nodes.GetOrCreate(addr) if err != nil { - return err + return false } failedCmds[node] = append(failedCmds[node], cmd) + return true } + if ask { node, err := c.nodes.GetOrCreate(addr) if err != nil { - return err + return false } failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd) + return true } - return nil + + return false } // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. @@ -963,25 +1046,25 @@ func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { } func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { - cmdsMap, err := c.mapCmdsBySlot(cmds) + state, err := c.state() if err != nil { return err } - state := c.state() - if state == nil { - return errNilClusterState - } - + cmdsMap := c.mapCmdsBySlot(cmds) for slot, cmds := range cmdsMap { node, err := state.slotMasterNode(slot) if err != nil { setCmdsErr(cmds, err) continue } - cmdsMap := map[*clusterNode][]Cmder{node: cmds} - for i := 0; i <= c.opt.MaxRedirects; i++ { + + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { @@ -992,7 +1075,11 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { } err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) - node.Client.releaseConn(cn, err) + if err == nil || internal.IsRedisError(err) { + _ = node.Client.connPool.Put(cn) + } else { + _ = node.Client.connPool.Remove(cn) + } } if len(failedCmds) == 0 { @@ -1002,27 +1089,16 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { } } - var firstErr error - for _, cmd := range cmds { - if err := cmd.Err(); err != nil { - firstErr = err - break - } - } - return firstErr + return firstCmdsErr(cmds) } -func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) (map[int][]Cmder, error) { - state := c.state() +func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { cmdsMap := make(map[int][]Cmder) for _, cmd := range cmds { - slot, _, err := c.cmdSlotAndNode(state, cmd) - if err != nil { - return nil, err - } + slot := c.cmdSlot(cmd) cmdsMap[slot] = append(cmdsMap[slot], cmd) } - return cmdsMap, nil + return cmdsMap } func (c *ClusterClient) txPipelineProcessCmds( @@ -1039,22 +1115,20 @@ func (c *ClusterClient) txPipelineProcessCmds( cn.SetReadTimeout(c.opt.ReadTimeout) if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil { + setCmdsErr(cmds, err) return err } - _, err := pipelineReadCmds(cn, cmds) - return err + return pipelineReadCmds(cn, cmds) } func (c *ClusterClient) txPipelineReadQueued( cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { - var firstErr error - // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil && firstErr == nil { - firstErr = err + if err := statusCmd.readReply(cn); err != nil { + return err } for _, cmd := range cmds { @@ -1063,15 +1137,11 @@ func (c *ClusterClient) txPipelineReadQueued( continue } - cmd.setErr(err) - if firstErr == nil { - firstErr = err + if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) { + continue } - err = c.checkMovedErr(cmd, failedCmds) - if err != nil && firstErr == nil { - firstErr = err - } + return err } // Parse number of replies. @@ -1085,7 +1155,13 @@ func (c *ClusterClient) txPipelineReadQueued( switch line[0] { case proto.ErrorReply: - return proto.ParseErrorReply(line) + err := proto.ParseErrorReply(line) + for _, cmd := range cmds { + if !c.checkMovedErr(cmd, err, failedCmds) { + break + } + } + return err case proto.ArrayReply: // ok default: @@ -1093,7 +1169,7 @@ func (c *ClusterClient) txPipelineReadQueued( return err } - return firstErr + return nil } func (c *ClusterClient) pubSub(channels []string) *PubSub { @@ -1112,7 +1188,12 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub { slot = -1 } - masterNode, err := c.state().slotMasterNode(slot) + state, err := c.state() + if err != nil { + return nil, err + } + + masterNode, err := state.slotMasterNode(slot) if err != nil { return nil, err } diff --git a/cluster_test.go b/cluster_test.go index 324bd1ce..b2106da9 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -200,7 +200,7 @@ var _ = Describe("ClusterClient", func() { Eventually(func() string { return client.Get("A").Val() - }).Should(Equal("VALUE")) + }, 30*time.Second).Should(Equal("VALUE")) cnt, err := client.Del("A").Result() Expect(err).NotTo(HaveOccurred()) @@ -215,7 +215,7 @@ var _ = Describe("ClusterClient", func() { Eventually(func() string { return client.Get("A").Val() - }).Should(Equal("VALUE")) + }, 30*time.Second).Should(Equal("VALUE")) }) It("distributes keys", func() { @@ -227,7 +227,7 @@ var _ = Describe("ClusterClient", func() { for _, master := range cluster.masters() { Eventually(func() string { return master.Info("keyspace").Val() - }, 5*time.Second).Should(Or( + }, 30*time.Second).Should(Or( ContainSubstring("keys=31"), ContainSubstring("keys=29"), ContainSubstring("keys=40"), @@ -251,7 +251,7 @@ var _ = Describe("ClusterClient", func() { for _, master := range cluster.masters() { Eventually(func() string { return master.Info("keyspace").Val() - }, 5*time.Second).Should(Or( + }, 30*time.Second).Should(Or( ContainSubstring("keys=31"), ContainSubstring("keys=29"), ContainSubstring("keys=40"), @@ -320,10 +320,6 @@ var _ = Describe("ClusterClient", func() { Expect(err).NotTo(HaveOccurred()) Expect(cmds).To(HaveLen(14)) - if opt.RouteByLatency { - return - } - for _, key := range keys { slot := hashtag.Slot(key) client.SwapSlotNodes(slot) @@ -432,6 +428,9 @@ var _ = Describe("ClusterClient", func() { }) AfterEach(func() { + _ = client.ForEachMaster(func(master *redis.Client) error { + return master.FlushDB().Err() + }) Expect(client.Close()).NotTo(HaveOccurred()) }) @@ -560,6 +559,9 @@ var _ = Describe("ClusterClient", func() { }) AfterEach(func() { + _ = client.ForEachMaster(func(master *redis.Client) error { + return master.FlushDB().Err() + }) Expect(client.Close()).NotTo(HaveOccurred()) }) @@ -575,10 +577,19 @@ var _ = Describe("ClusterClient", func() { _ = client.ForEachMaster(func(master *redis.Client) error { return master.FlushDB().Err() }) + + _ = client.ForEachSlave(func(slave *redis.Client) error { + Eventually(func() int64 { + return client.DBSize().Val() + }, 30*time.Second).Should(Equal(int64(0))) + return nil + }) }) AfterEach(func() { - client.FlushDB() + _ = client.ForEachMaster(func(master *redis.Client) error { + return master.FlushDB().Err() + }) Expect(client.Close()).NotTo(HaveOccurred()) }) @@ -597,7 +608,7 @@ var _ = Describe("ClusterClient without nodes", func() { Expect(client.Close()).NotTo(HaveOccurred()) }) - It("returns an error", func() { + It("Ping returns an error", func() { err := client.Ping().Err() Expect(err).To(MatchError("redis: cluster has no nodes")) }) @@ -626,7 +637,7 @@ var _ = Describe("ClusterClient without valid nodes", func() { It("returns an error", func() { err := client.Ping().Err() - Expect(err).To(MatchError("ERR This instance has cluster support disabled")) + Expect(err).To(MatchError("redis: cannot load cluster slots")) }) It("pipeline returns an error", func() { @@ -634,7 +645,7 @@ var _ = Describe("ClusterClient without valid nodes", func() { pipe.Ping() return nil }) - Expect(err).To(MatchError("ERR This instance has cluster support disabled")) + Expect(err).To(MatchError("redis: cannot load cluster slots")) }) }) @@ -664,7 +675,7 @@ var _ = Describe("ClusterClient timeout", func() { It("Tx timeouts", func() { err := client.Watch(func(tx *redis.Tx) error { return tx.Ping().Err() - }) + }, "foo") Expect(err).To(HaveOccurred()) Expect(err.(net.Error).Timeout()).To(BeTrue()) }) @@ -676,42 +687,20 @@ var _ = Describe("ClusterClient timeout", func() { return nil }) return err - }) + }, "foo") Expect(err).To(HaveOccurred()) Expect(err.(net.Error).Timeout()).To(BeTrue()) }) } - Context("read timeout", func() { + const pause = time.Second + + Context("read/write timeout", func() { BeforeEach(func() { opt := redisClusterOptions() - opt.ReadTimeout = time.Nanosecond - opt.WriteTimeout = -1 - client = cluster.clusterClient(opt) - }) - - testTimeout() - }) - - Context("write timeout", func() { - BeforeEach(func() { - opt := redisClusterOptions() - opt.ReadTimeout = time.Nanosecond - opt.WriteTimeout = -1 - client = cluster.clusterClient(opt) - }) - - testTimeout() - }) - - Context("ClientPause timeout", func() { - const pause = time.Second - - BeforeEach(func() { - opt := redisClusterOptions() - opt.ReadTimeout = pause / 10 - opt.WriteTimeout = pause / 10 - opt.MaxRedirects = -1 + opt.ReadTimeout = 100 * time.Millisecond + opt.WriteTimeout = 100 * time.Millisecond + opt.MaxRedirects = 1 client = cluster.clusterClient(opt) err := client.ForEachNode(func(client *redis.Client) error { diff --git a/command.go b/command.go index 0e5b2016..a796a93f 100644 --- a/command.go +++ b/command.go @@ -46,10 +46,21 @@ type Cmder interface { func setCmdsErr(cmds []Cmder, e error) { for _, cmd := range cmds { - cmd.setErr(e) + if cmd.Err() == nil { + cmd.setErr(e) + } } } +func firstCmdsErr(cmds []Cmder) error { + for _, cmd := range cmds { + if err := cmd.Err(); err != nil { + return err + } + } + return nil +} + func writeCmd(cn *pool.Conn, cmds ...Cmder) error { cn.Wb.Reset() for _, cmd := range cmds { @@ -95,7 +106,6 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int { return 1 } if info == nil { - internal.Logf("info for cmd=%s not found", cmd.Name()) return -1 } return int(info.FirstKeyPos) diff --git a/commands_test.go b/commands_test.go index 4298cba6..0811d12a 100644 --- a/commands_test.go +++ b/commands_test.go @@ -27,11 +27,21 @@ var _ = Describe("Commands", func() { Describe("server", func() { It("should Auth", func() { - _, err := client.Pipelined(func(pipe redis.Pipeliner) error { + cmds, err := client.Pipelined(func(pipe redis.Pipeliner) error { pipe.Auth("password") + pipe.Auth("") return nil }) Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set")) + Expect(cmds[0].Err()).To(MatchError("ERR Client sent AUTH, but no password is set")) + Expect(cmds[1].Err()).To(MatchError("ERR Client sent AUTH, but no password is set")) + + stats := client.Pool().Stats() + Expect(stats.Requests).To(Equal(uint32(2))) + Expect(stats.Hits).To(Equal(uint32(1))) + Expect(stats.Timeouts).To(Equal(uint32(0))) + Expect(stats.TotalConns).To(Equal(uint32(1))) + Expect(stats.FreeConns).To(Equal(uint32(1))) }) It("should Echo", func() { @@ -187,6 +197,29 @@ var _ = Describe("Commands", func() { Expect(tm).To(BeTemporally("~", time.Now(), 3*time.Second)) }) + It("Should Command", func() { + cmds, err := client.Command().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(cmds)).To(BeNumerically("~", 180, 10)) + + cmd := cmds["mget"] + Expect(cmd.Name).To(Equal("mget")) + Expect(cmd.Arity).To(Equal(int8(-2))) + Expect(cmd.Flags).To(ContainElement("readonly")) + Expect(cmd.FirstKeyPos).To(Equal(int8(1))) + Expect(cmd.LastKeyPos).To(Equal(int8(-1))) + Expect(cmd.StepCount).To(Equal(int8(1))) + + cmd = cmds["ping"] + Expect(cmd.Name).To(Equal("ping")) + Expect(cmd.Arity).To(Equal(int8(-1))) + Expect(cmd.Flags).To(ContainElement("stale")) + Expect(cmd.Flags).To(ContainElement("fast")) + Expect(cmd.FirstKeyPos).To(Equal(int8(0))) + Expect(cmd.LastKeyPos).To(Equal(int8(0))) + Expect(cmd.StepCount).To(Equal(int8(0))) + }) + }) Describe("debugging", func() { @@ -2887,24 +2920,6 @@ var _ = Describe("Commands", func() { }) - Describe("Command", func() { - - It("returns map of commands", func() { - cmds, err := client.Command().Result() - Expect(err).NotTo(HaveOccurred()) - Expect(len(cmds)).To(BeNumerically("~", 180, 10)) - - cmd := cmds["mget"] - Expect(cmd.Name).To(Equal("mget")) - Expect(cmd.Arity).To(Equal(int8(-2))) - Expect(cmd.Flags).To(ContainElement("readonly")) - Expect(cmd.FirstKeyPos).To(Equal(int8(1))) - Expect(cmd.LastKeyPos).To(Equal(int8(-1))) - Expect(cmd.StepCount).To(Equal(int8(1))) - }) - - }) - Describe("Eval", func() { It("returns keys and values", func() { diff --git a/export_test.go b/export_test.go index 3b7965d7..bcc18c45 100644 --- a/export_test.go +++ b/export_test.go @@ -20,8 +20,13 @@ func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error) } func (c *ClusterClient) SlotAddrs(slot int) []string { + state, err := c.state() + if err != nil { + panic(err) + } + var addrs []string - for _, n := range c.state().slotNodes(slot) { + for _, n := range state.slotNodes(slot) { addrs = append(addrs, n.Client.getAddr()) } return addrs @@ -29,7 +34,12 @@ func (c *ClusterClient) SlotAddrs(slot int) []string { // SwapSlot swaps a slot's master/slave address for testing MOVED redirects. func (c *ClusterClient) SwapSlotNodes(slot int) { - nodes := c.state().slots[slot] + state, err := c.state() + if err != nil { + panic(err) + } + + nodes := state.slots[slot] if len(nodes) == 2 { nodes[0], nodes[1] = nodes[1], nodes[0] } diff --git a/internal/error.go b/internal/error.go index 90f6503a..e1b8be6b 100644 --- a/internal/error.go +++ b/internal/error.go @@ -13,10 +13,23 @@ type RedisError string func (e RedisError) Error() string { return string(e) } func IsRetryableError(err error) bool { - return IsNetworkError(err) || err.Error() == "ERR max number of clients reached" + if IsNetworkError(err) { + return true + } + s := err.Error() + if s == "ERR max number of clients reached" { + return true + } + if strings.HasPrefix(s, "LOADING ") { + return true + } + if strings.HasPrefix(s, "CLUSTERDOWN ") { + return true + } + return false } -func IsInternalError(err error) bool { +func IsRedisError(err error) bool { _, ok := err.(RedisError) return ok } @@ -33,7 +46,7 @@ func IsBadConn(err error, allowTimeout bool) bool { if err == nil { return false } - if IsInternalError(err) { + if IsRedisError(err) { return false } if allowTimeout { @@ -45,7 +58,7 @@ func IsBadConn(err error, allowTimeout bool) bool { } func IsMovedError(err error) (moved bool, ask bool, addr string) { - if !IsInternalError(err) { + if !IsRedisError(err) { return } @@ -69,7 +82,3 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) { func IsLoadingError(err error) bool { return strings.HasPrefix(err.Error(), "LOADING ") } - -func IsClusterDownError(err error) bool { - return strings.HasPrefix(err.Error(), "CLUSTERDOWN ") -} diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 2159cf63..cd94329d 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -63,7 +63,7 @@ func (p *Reader) ReadLine() ([]byte, error) { return nil, bufio.ErrBufferFull } if len(line) == 0 { - return nil, internal.RedisError("redis: reply is empty") + return nil, fmt.Errorf("redis: reply is empty") } if isNilReply(line) { return nil, internal.Nil diff --git a/internal/proto/scan.go b/internal/proto/scan.go index 3ab40b94..0431a877 100644 --- a/internal/proto/scan.go +++ b/internal/proto/scan.go @@ -11,7 +11,7 @@ import ( func Scan(b []byte, v interface{}) error { switch v := v.(type) { case nil: - return internal.RedisError("redis: Scan(nil)") + return fmt.Errorf("redis: Scan(nil)") case *string: *v = internal.BytesToString(b) return nil diff --git a/main_test.go b/main_test.go index 30f09c61..64f25d99 100644 --- a/main_test.go +++ b/main_test.go @@ -3,6 +3,7 @@ package redis_test import ( "errors" "fmt" + "log" "net" "os" "os/exec" @@ -51,7 +52,7 @@ var cluster = &clusterScenario{ } func init() { - //redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)) + redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)) } var _ = BeforeSuite(func() { diff --git a/pubsub.go b/pubsub.go index 4a5c65f5..e754a16f 100644 --- a/pubsub.go +++ b/pubsub.go @@ -95,7 +95,10 @@ func (c *PubSub) releaseConn(cn *pool.Conn, err error) { } func (c *PubSub) _releaseConn(cn *pool.Conn, err error) { - if internal.IsBadConn(err, true) && c.cn == cn { + if c.cn != cn { + return + } + if internal.IsBadConn(err, true) { _ = c.closeTheCn() } } diff --git a/redis.go b/redis.go index b18973cd..db1f39c3 100644 --- a/redis.go +++ b/redis.go @@ -197,8 +197,11 @@ type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error) func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer { return func(cmds []Cmder) error { - var firstErr error - for i := 0; i <= c.opt.MaxRetries; i++ { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + cn, _, err := c.getConn() if err != nil { setCmdsErr(cmds, err) @@ -206,18 +209,18 @@ func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer { } canRetry, err := p(cn, cmds) - c.releaseConn(cn, err) - if err == nil { - return nil - } - if firstErr == nil { - firstErr = err + + if err == nil || internal.IsRedisError(err) { + _ = c.connPool.Put(cn) + break } + _ = c.connPool.Remove(cn) + if !canRetry || !internal.IsRetryableError(err) { break } } - return firstErr + return firstCmdsErr(cmds) } } @@ -230,23 +233,17 @@ func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, err // Set read timeout for all commands. cn.SetReadTimeout(c.opt.ReadTimeout) - return pipelineReadCmds(cn, cmds) + return true, pipelineReadCmds(cn, cmds) } -func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) { - for i, cmd := range cmds { +func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { + for _, cmd := range cmds { err := cmd.readReply(cn) - if err == nil { - continue - } - if i == 0 { - retry = true - } - if firstErr == nil { - firstErr = err + if err != nil && !internal.IsRedisError(err) { + return err } } - return + return nil } func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { @@ -260,11 +257,11 @@ func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, e cn.SetReadTimeout(c.opt.ReadTimeout) if err := c.txPipelineReadQueued(cn, cmds); err != nil { + setCmdsErr(cmds, err) return false, err } - _, err := pipelineReadCmds(cn, cmds) - return false, err + return false, pipelineReadCmds(cn, cmds) } func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { @@ -276,21 +273,16 @@ func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { } func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error { - var firstErr error - // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil && firstErr == nil { - firstErr = err + if err := statusCmd.readReply(cn); err != nil { + return err } - for _, cmd := range cmds { + for _ = range cmds { err := statusCmd.readReply(cn) - if err != nil { - cmd.setErr(err) - if firstErr == nil { - firstErr = err - } + if err != nil && !internal.IsRedisError(err) { + return err } } diff --git a/ring.go b/ring.go index 72d52bf7..a9314fb5 100644 --- a/ring.go +++ b/ring.go @@ -34,7 +34,9 @@ type RingOptions struct { DB int Password string - MaxRetries int + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration DialTimeout time.Duration ReadTimeout time.Duration @@ -50,6 +52,19 @@ func (opt *RingOptions) init() { if opt.HeartbeatFrequency == 0 { opt.HeartbeatFrequency = 500 * time.Millisecond } + + switch opt.MinRetryBackoff { + case -1: + opt.MinRetryBackoff = 0 + case 0: + opt.MinRetryBackoff = 8 * time.Millisecond + } + switch opt.MaxRetryBackoff { + case -1: + opt.MaxRetryBackoff = 0 + case 0: + opt.MaxRetryBackoff = 512 * time.Millisecond + } } func (opt *RingOptions) clientOptions() *Options { @@ -165,6 +180,10 @@ func (c *Ring) Options() *RingOptions { return c.opt } +func (c *Ring) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + // PoolStats returns accumulated connection pool stats. func (c *Ring) PoolStats() *PoolStats { var acc PoolStats @@ -241,6 +260,9 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error { func (c *Ring) cmdInfo(name string) *CommandInfo { err := c.cmdsInfoOnce.Do(func() error { + c.mu.RLock() + defer c.mu.RUnlock() + var firstErr error for _, shard := range c.shards { cmdsInfo, err := shard.Client.Command().Result() @@ -257,7 +279,11 @@ func (c *Ring) cmdInfo(name string) *CommandInfo { if err != nil { return nil } - return c.cmdsInfo[name] + info := c.cmdsInfo[name] + if info == nil { + internal.Logf("info for cmd=%s not found", name) + } + return info } func (c *Ring) addClient(name string, cl *Client) { @@ -399,7 +425,7 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().pipelined(fn) } -func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { +func (c *Ring) pipelineExec(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.Name()) @@ -410,36 +436,33 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { cmdsMap[name] = append(cmdsMap[name], cmd) } - for i := 0; i <= c.opt.MaxRetries; i++ { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + var failedCmdsMap map[string][]Cmder for name, cmds := range cmdsMap { shard, err := c.shardByName(name) if err != nil { setCmdsErr(cmds, err) - if firstErr == nil { - firstErr = err - } continue } cn, _, err := shard.Client.getConn() if err != nil { setCmdsErr(cmds, err) - if firstErr == nil { - firstErr = err - } continue } canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) - shard.Client.releaseConn(cn, err) - if err == nil { + if err == nil || internal.IsRedisError(err) { + _ = shard.Client.connPool.Put(cn) continue } - if firstErr == nil { - firstErr = err - } + _ = shard.Client.connPool.Remove(cn) + if canRetry && internal.IsRetryableError(err) { if failedCmdsMap == nil { failedCmdsMap = make(map[string][]Cmder) @@ -454,5 +477,5 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { cmdsMap = failedCmdsMap } - return firstErr + return firstCmdsErr(cmds) } diff --git a/tx.go b/tx.go index 5ef89619..93c295d9 100644 --- a/tx.go +++ b/tx.go @@ -36,11 +36,10 @@ func (c *Client) Watch(fn func(*Tx) error, keys ...string) error { return err } } - firstErr := fn(tx) - if err := tx.Close(); err != nil && firstErr == nil { - firstErr = err - } - return firstErr + + err := fn(tx) + _ = tx.Close() + return err } // close closes the transaction, releasing any open resources. @@ -53,7 +52,7 @@ func (c *Tx) Close() error { // of a transaction. func (c *Tx) Watch(keys ...string) *StatusCmd { args := make([]interface{}, 1+len(keys)) - args[0] = "WATCH" + args[0] = "watch" for i, key := range keys { args[1+i] = key } @@ -65,7 +64,7 @@ func (c *Tx) Watch(keys ...string) *StatusCmd { // Unwatch flushes all the previously watched keys for a transaction. func (c *Tx) Unwatch(keys ...string) *StatusCmd { args := make([]interface{}, 1+len(keys)) - args[0] = "UNWATCH" + args[0] = "unwatch" for i, key := range keys { args[1+i] = key }