diff --git a/Makefile b/Makefile index 1fbdac9..187c746 100644 --- a/Makefile +++ b/Makefile @@ -3,6 +3,8 @@ all: testdeps go test ./... -short -race env GOOS=linux GOARCH=386 go test ./... go vet + go get github.com/gordonklaus/ineffassign + ineffassign . testdeps: testdata/redis/src/redis-server diff --git a/cluster.go b/cluster.go index 55bc5ba..03c186c 100644 --- a/cluster.go +++ b/cluster.go @@ -533,10 +533,12 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { n := rand.Intn(len(nodes)-1) + 1 slave = nodes[n] if !slave.Loading() { - break + return slave, nil } } - return slave, nil + + // All slaves are loading - use master. + return nodes[0], nil } } @@ -979,9 +981,10 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { break } - // If slave is loading - read from master. + // If slave is loading - pick another node. if c.opt.ReadOnly && internal.IsLoadingError(err) { node.MarkAsLoading() + node = nil continue } diff --git a/commands.go b/commands.go index 4a2eadc..85262cc 100644 --- a/commands.go +++ b/commands.go @@ -1447,6 +1447,7 @@ type XReadGroupArgs struct { Streams []string Count int64 Block time.Duration + NoAck bool } func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { @@ -1458,6 +1459,9 @@ func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { if a.Block >= 0 { args = append(args, "block", int64(a.Block/time.Millisecond)) } + if a.NoAck { + args = append(args, "noack") + } args = append(args, "streams") for _, s := range a.Streams { args = append(args, s) diff --git a/commands_test.go b/commands_test.go index 3c57323..5bea31f 100644 --- a/commands_test.go +++ b/commands_test.go @@ -69,7 +69,7 @@ var _ = Describe("Commands", func() { val, err := client.Wait(1, wait).Result() Expect(err).NotTo(HaveOccurred()) Expect(val).To(Equal(int64(0))) - Expect(time.Now()).To(BeTemporally("~", start.Add(wait), time.Second)) + Expect(time.Now()).To(BeTemporally("~", start.Add(wait), 3*time.Second)) }) It("should Select", func() { @@ -2793,10 +2793,13 @@ var _ = Describe("Commands", func() { Member: "one", }).Err() Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd("zset", redis.Z{ Score: 2, Member: "two", }).Err() + Expect(err).NotTo(HaveOccurred()) + members, err = client.ZPopMin("zset", 10).Result() Expect(err).NotTo(HaveOccurred()) Expect(members).To(Equal([]redis.Z{{ @@ -3541,8 +3544,9 @@ var _ = Describe("Commands", func() { res, err := client.XReadGroup(&redis.XReadGroupArgs{ Group: "group", Consumer: "consumer", - Streams: []string{"stream", "0"}, + Streams: []string{"stream", ">"}, }).Result() + Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal([]redis.XStream{{ Stream: "stream", Messages: []redis.XMessage{