From 38d1749d56f6ee41baf8222b3aa0aa0c6950f41d Mon Sep 17 00:00:00 2001 From: monkey92t Date: Mon, 2 Aug 2021 19:01:01 +0800 Subject: [PATCH] adjust the code (#1842) * Upgrade redis-server version (#1833) * Upgrade redis-server version Signed-off-by: monkey * XAutoClaim changed the return value Signed-off-by: monkey * add cmd: geosearch, geosearchstore (#1836) * add cmd: geosearch, geosearchstore Signed-off-by: monkey92t * GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing Signed-off-by: monkey92t * adjust the code, and fix #1553, #1676 Signed-off-by: monkey92t --- Makefile | 2 +- cluster.go | 1 - cluster_test.go | 8 +- command.go | 177 +++++++++++++++++++++++++++++++++ command_test.go | 2 +- commands.go | 39 ++++++++ commands_test.go | 209 +++++++++++++++++++++++++++++++++++++-- export_test.go | 10 +- internal/proto/writer.go | 2 +- options.go | 5 +- pipeline.go | 40 ++------ pool_test.go | 1 - redis_test.go | 17 ---- ring.go | 2 +- ring_test.go | 1 - 15 files changed, 439 insertions(+), 77 deletions(-) diff --git a/Makefile b/Makefile index 5501164f..b16709c8 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ bench: testdeps testdata/redis: mkdir -p $@ - wget -qO- https://download.redis.io/releases/redis-6.2.1.tar.gz | tar xvz --strip-components=1 -C $@ + wget -qO- https://download.redis.io/releases/redis-6.2.5.tar.gz | tar xvz --strip-components=1 -C $@ testdata/redis/src/redis-server: testdata/redis cd $< && make all diff --git a/cluster.go b/cluster.go index d85c043a..bfc7d4e5 100644 --- a/cluster.go +++ b/cluster.go @@ -795,7 +795,6 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { _ = pipe.Process(ctx, NewCmd(ctx, "asking")) _ = pipe.Process(ctx, cmd) _, lastErr = pipe.Exec(ctx) - _ = pipe.Close() ask = false } else { lastErr = node.Client.Process(ctx, cmd) diff --git a/cluster_test.go b/cluster_test.go index 4c4e4d31..fa692620 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -515,9 +515,7 @@ var _ = Describe("ClusterClient", func() { pipe = client.Pipeline().(*redis.Pipeline) }) - AfterEach(func() { - Expect(pipe.Close()).NotTo(HaveOccurred()) - }) + AfterEach(func() {}) assertPipeline() }) @@ -527,9 +525,7 @@ var _ = Describe("ClusterClient", func() { pipe = client.TxPipeline().(*redis.Pipeline) }) - AfterEach(func() { - Expect(pipe.Close()).NotTo(HaveOccurred()) - }) + AfterEach(func() {}) assertPipeline() }) diff --git a/command.go b/command.go index 1c53a660..d1be1043 100644 --- a/command.go +++ b/command.go @@ -2564,6 +2564,183 @@ func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error { //------------------------------------------------------------------------------ +// GeoSearchQuery is used for GEOSearch/GEOSearchStore command query. +type GeoSearchQuery struct { + Member string + + // Latitude and Longitude when using FromLonLat option. + Longitude float64 + Latitude float64 + + // Distance and unit when using ByRadius option. + // Can use m, km, ft, or mi. Default is km. + Radius float64 + RadiusUnit string + + // Height, width and unit when using ByBox option. + // Can be m, km, ft, or mi. Default is km. + BoxWidth float64 + BoxHeight float64 + BoxUnit string + + // Can be ASC or DESC. Default is no sort order. + Sort string + Count int + CountAny bool +} + +type GeoSearchLocationQuery struct { + GeoSearchQuery + + WithCoord bool + WithDist bool + WithHash bool +} + +type GeoSearchStoreQuery struct { + GeoSearchQuery + + // When using the StoreDist option, the command stores the items in a + // sorted set populated with their distance from the center of the circle or box, + // as a floating-point number, in the same unit specified for that shape. + StoreDist bool +} + +func geoSearchLocationArgs(q *GeoSearchLocationQuery, args []interface{}) []interface{} { + args = geoSearchArgs(&q.GeoSearchQuery, args) + + if q.WithCoord { + args = append(args, "withcoord") + } + if q.WithDist { + args = append(args, "withdist") + } + if q.WithHash { + args = append(args, "withhash") + } + + return args +} + +func geoSearchArgs(q *GeoSearchQuery, args []interface{}) []interface{} { + if q.Member != "" { + args = append(args, "frommember", q.Member) + } else { + args = append(args, "fromlonlat", q.Longitude, q.Latitude) + } + + if q.Radius > 0 { + if q.RadiusUnit == "" { + q.RadiusUnit = "km" + } + args = append(args, "byradius", q.Radius, q.RadiusUnit) + } else { + if q.BoxUnit == "" { + q.BoxUnit = "km" + } + args = append(args, "bybox", q.BoxWidth, q.BoxHeight, q.BoxUnit) + } + + if q.Sort != "" { + args = append(args, q.Sort) + } + + if q.Count > 0 { + args = append(args, "count", q.Count) + if q.CountAny { + args = append(args, "any") + } + } + + return args +} + +type GeoSearchLocationCmd struct { + baseCmd + + opt *GeoSearchLocationQuery + val []GeoLocation +} + +var _ Cmder = (*GeoSearchLocationCmd)(nil) + +func NewGeoSearchLocationCmd( + ctx context.Context, opt *GeoSearchLocationQuery, args ...interface{}, +) *GeoSearchLocationCmd { + return &GeoSearchLocationCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + opt: opt, + } +} + +func (cmd *GeoSearchLocationCmd) Val() []GeoLocation { + return cmd.val +} + +func (cmd *GeoSearchLocationCmd) Result() ([]GeoLocation, error) { + return cmd.val, cmd.err +} + +func (cmd *GeoSearchLocationCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *GeoSearchLocationCmd) readReply(rd *proto.Reader) error { + n, err := rd.ReadArrayLen() + if err != nil { + return err + } + + cmd.val = make([]GeoLocation, n) + for i := 0; i < n; i++ { + _, err = rd.ReadArrayLen() + if err != nil { + return err + } + + var loc GeoLocation + + loc.Name, err = rd.ReadString() + if err != nil { + return err + } + if cmd.opt.WithDist { + loc.Dist, err = rd.ReadFloat() + if err != nil { + return err + } + } + if cmd.opt.WithHash { + loc.GeoHash, err = rd.ReadInt() + if err != nil { + return err + } + } + if cmd.opt.WithCoord { + if err = rd.ReadFixedArrayLen(2); err != nil { + return err + } + loc.Longitude, err = rd.ReadFloat() + if err != nil { + return err + } + loc.Latitude, err = rd.ReadFloat() + if err != nil { + return err + } + } + + cmd.val[i] = loc + } + + return nil +} + +//------------------------------------------------------------------------------ + type GeoPos struct { Longitude, Latitude float64 } diff --git a/command_test.go b/command_test.go index d110d0c3..9877df42 100644 --- a/command_test.go +++ b/command_test.go @@ -4,7 +4,7 @@ import ( "errors" "time" - redis "github.com/go-redis/redis/v8" + "github.com/go-redis/redis/v8" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" diff --git a/commands.go b/commands.go index 346e38e9..b63aa013 100644 --- a/commands.go +++ b/commands.go @@ -244,6 +244,7 @@ type Cmdable interface { XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd XInfoStream(ctx context.Context, key string) *XInfoStreamCmd + XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd @@ -304,6 +305,8 @@ type Cmdable interface { ClientList(ctx context.Context) *StringCmd ClientPause(ctx context.Context, dur time.Duration) *BoolCmd ClientID(ctx context.Context) *IntCmd + ClientUnblock(ctx context.Context, id int64) *IntCmd + ClientUnblockWithError(ctx context.Context, id int64) *IntCmd ConfigGet(ctx context.Context, parameter string) *MapStringStringCmd ConfigResetStat(ctx context.Context) *StatusCmd ConfigSet(ctx context.Context, parameter, value string) *StatusCmd @@ -320,6 +323,7 @@ type Cmdable interface { ShutdownSave(ctx context.Context) *StatusCmd ShutdownNoSave(ctx context.Context) *StatusCmd SlaveOf(ctx context.Context, host, port string) *StatusCmd + SlowLogGet(ctx context.Context, num int64) *SlowLogCmd Time(ctx context.Context) *TimeCmd DebugObject(ctx context.Context, key string) *StringCmd ReadOnly(ctx context.Context) *StatusCmd @@ -364,6 +368,9 @@ type Cmdable interface { GeoRadiusStore(ctx context.Context, key string, longitude, latitude float64, query *GeoRadiusQuery) *IntCmd GeoRadiusByMember(ctx context.Context, key, member string, query *GeoRadiusQuery) *GeoLocationCmd GeoRadiusByMemberStore(ctx context.Context, key, member string, query *GeoRadiusQuery) *IntCmd + GeoSearch(ctx context.Context, key string, q *GeoSearchQuery) *StringSliceCmd + GeoSearchLocation(ctx context.Context, key string, q *GeoSearchLocationQuery) *GeoSearchLocationCmd + GeoSearchStore(ctx context.Context, key, store string, q *GeoSearchStoreQuery) *IntCmd GeoDist(ctx context.Context, key string, member1, member2, unit string) *FloatCmd GeoHash(ctx context.Context, key string, members ...string) *StringSliceCmd } @@ -3240,6 +3247,38 @@ func (c cmdable) GeoRadiusByMemberStore( return cmd } +func (c cmdable) GeoSearch(ctx context.Context, key string, q *GeoSearchQuery) *StringSliceCmd { + args := make([]interface{}, 0, 13) + args = append(args, "geosearch", key) + args = geoSearchArgs(q, args) + cmd := NewStringSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) GeoSearchLocation( + ctx context.Context, key string, q *GeoSearchLocationQuery, +) *GeoSearchLocationCmd { + args := make([]interface{}, 0, 16) + args = append(args, "geosearch", key) + args = geoSearchLocationArgs(q, args) + cmd := NewGeoSearchLocationCmd(ctx, q, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) GeoSearchStore(ctx context.Context, key, store string, q *GeoSearchStoreQuery) *IntCmd { + args := make([]interface{}, 0, 15) + args = append(args, "geosearchstore", store, key) + args = geoSearchArgs(&q.GeoSearchQuery, args) + if q.StoreDist { + args = append(args, "storedist") + } + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) GeoDist( ctx context.Context, key string, member1, member2, unit string, ) *FloatCmd { diff --git a/commands_test.go b/commands_test.go index eb46a790..06d6e401 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4657,7 +4657,7 @@ var _ = Describe("Commands", func() { } msgs, start, err := client.XAutoClaim(ctx, xca).Result() Expect(err).NotTo(HaveOccurred()) - Expect(start).To(Equal("2-0")) + Expect(start).To(Equal("3-0")) Expect(msgs).To(Equal([]redis.XMessage{{ ID: "1-0", Values: map[string]interface{}{"uno": "un"}, @@ -4669,19 +4669,16 @@ var _ = Describe("Commands", func() { xca.Start = start msgs, start, err = client.XAutoClaim(ctx, xca).Result() Expect(err).NotTo(HaveOccurred()) - Expect(start).To(Equal("3-0")) + Expect(start).To(Equal("0-0")) Expect(msgs).To(Equal([]redis.XMessage{{ - ID: "2-0", - Values: map[string]interface{}{"dos": "deux"}, - }, { ID: "3-0", Values: map[string]interface{}{"tres": "troix"}, }})) ids, start, err := client.XAutoClaimJustID(ctx, xca).Result() Expect(err).NotTo(HaveOccurred()) - Expect(start).To(Equal("3-0")) - Expect(ids).To(Equal([]string{"2-0", "3-0"})) + Expect(start).To(Equal("0-0")) + Expect(ids).To(Equal([]string{"3-0"})) }) It("should XClaim", func() { @@ -5167,6 +5164,204 @@ var _ = Describe("Commands", func() { nil, })) }) + + It("should geo search", func() { + q := &redis.GeoSearchQuery{ + Member: "Catania", + BoxWidth: 400, + BoxHeight: 100, + BoxUnit: "km", + Sort: "asc", + } + val, err := client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania"})) + + q.BoxHeight = 400 + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania", "Palermo"})) + + q.Count = 1 + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania"})) + + q.CountAny = true + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Palermo"})) + + q = &redis.GeoSearchQuery{ + Member: "Catania", + Radius: 100, + RadiusUnit: "km", + Sort: "asc", + } + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania"})) + + q.Radius = 400 + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania", "Palermo"})) + + q.Count = 1 + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania"})) + + q.CountAny = true + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Palermo"})) + + q = &redis.GeoSearchQuery{ + Longitude: 15, + Latitude: 37, + BoxWidth: 200, + BoxHeight: 200, + BoxUnit: "km", + Sort: "asc", + } + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania"})) + + q.BoxWidth, q.BoxHeight = 400, 400 + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania", "Palermo"})) + + q.Count = 1 + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania"})) + + q.CountAny = true + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Palermo"})) + + q = &redis.GeoSearchQuery{ + Longitude: 15, + Latitude: 37, + Radius: 100, + RadiusUnit: "km", + Sort: "asc", + } + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania"})) + + q.Radius = 200 + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania", "Palermo"})) + + q.Count = 1 + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Catania"})) + + q.CountAny = true + val, err = client.GeoSearch(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]string{"Palermo"})) + }) + + It("should geo search with options", func() { + q := &redis.GeoSearchLocationQuery{ + GeoSearchQuery: redis.GeoSearchQuery{ + Longitude: 15, + Latitude: 37, + Radius: 200, + RadiusUnit: "km", + Sort: "asc", + }, + WithHash: true, + WithDist: true, + WithCoord: true, + } + val, err := client.GeoSearchLocation(ctx, "Sicily", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal([]redis.GeoLocation{ + { + Name: "Catania", + Longitude: 15.08726745843887329, + Latitude: 37.50266842333162032, + Dist: 56.4413, + GeoHash: 3479447370796909, + }, + { + Name: "Palermo", + Longitude: 13.36138933897018433, + Latitude: 38.11555639549629859, + Dist: 190.4424, + GeoHash: 3479099956230698, + }, + })) + }) + + It("should geo search store", func() { + q := &redis.GeoSearchStoreQuery{ + GeoSearchQuery: redis.GeoSearchQuery{ + Longitude: 15, + Latitude: 37, + Radius: 200, + RadiusUnit: "km", + Sort: "asc", + }, + StoreDist: false, + } + + val, err := client.GeoSearchStore(ctx, "Sicily", "key1", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal(int64(2))) + + q.StoreDist = true + val, err = client.GeoSearchStore(ctx, "Sicily", "key2", q).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal(int64(2))) + + loc, err := client.GeoSearchLocation(ctx, "key1", &redis.GeoSearchLocationQuery{ + GeoSearchQuery: q.GeoSearchQuery, + WithCoord: true, + WithDist: true, + WithHash: true, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(loc).To(Equal([]redis.GeoLocation{ + { + Name: "Catania", + Longitude: 15.08726745843887329, + Latitude: 37.50266842333162032, + Dist: 56.4413, + GeoHash: 3479447370796909, + }, + { + Name: "Palermo", + Longitude: 13.36138933897018433, + Latitude: 38.11555639549629859, + Dist: 190.4424, + GeoHash: 3479099956230698, + }, + })) + + v, err := client.ZRangeWithScores(ctx, "key2", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal([]redis.Z{ + { + Score: 56.441257870158204, + Member: "Catania", + }, + { + Score: 190.44242984775784, + Member: "Palermo", + }, + })) + }) }) Describe("marshaling/unmarshaling", func() { diff --git a/export_test.go b/export_test.go index 49c4b94c..e243a192 100644 --- a/export_test.go +++ b/export_test.go @@ -60,21 +60,21 @@ func (c *ClusterClient) SwapNodes(ctx context.Context, key string) error { return nil } -func (state *clusterState) IsConsistent(ctx context.Context) bool { - if len(state.Masters) < 3 { +func (c *clusterState) IsConsistent(ctx context.Context) bool { + if len(c.Masters) < 3 { return false } - for _, master := range state.Masters { + for _, master := range c.Masters { s := master.Client.Info(ctx, "replication").Val() if !strings.Contains(s, "role:master") { return false } } - if len(state.Slaves) < 3 { + if len(c.Slaves) < 3 { return false } - for _, slave := range state.Slaves { + for _, slave := range c.Slaves { s := slave.Client.Info(ctx, "replication").Val() if !strings.Contains(s, "role:slave") { return false diff --git a/internal/proto/writer.go b/internal/proto/writer.go index 72b30441..f2dc5c93 100644 --- a/internal/proto/writer.go +++ b/internal/proto/writer.go @@ -13,7 +13,7 @@ import ( type writer interface { io.Writer io.ByteWriter - // io.StringWriter + // WriteString implement io.StringWriter. WriteString(s string) (n int, err error) } diff --git a/options.go b/options.go index 5d39bf04..e1e85a66 100644 --- a/options.go +++ b/options.go @@ -247,7 +247,10 @@ func setupTCPConn(u *url.URL) (*Options, error) { } if u.Scheme == "rediss" { - o.TLSConfig = &tls.Config{ServerName: h} + o.TLSConfig = &tls.Config{ + ServerName: h, + MinVersion: tls.VersionTLS12, + } } return o, nil diff --git a/pipeline.go b/pipeline.go index c6ec3409..aa2fc8f0 100644 --- a/pipeline.go +++ b/pipeline.go @@ -3,8 +3,6 @@ package redis import ( "context" "sync" - - "github.com/go-redis/redis/v8/internal/pool" ) type pipelineExecer func(context.Context, []Cmder) error @@ -26,8 +24,7 @@ type Pipeliner interface { StatefulCmdable Do(ctx context.Context, args ...interface{}) *Cmd Process(ctx context.Context, cmd Cmder) error - Close() error - Discard() error + Discard() Exec(ctx context.Context) ([]Cmder, error) } @@ -43,9 +40,8 @@ type Pipeline struct { ctx context.Context exec pipelineExecer - mu sync.Mutex - cmds []Cmder - closed bool + mu sync.Mutex + cmds []Cmder } func (c *Pipeline) init() { @@ -67,29 +63,11 @@ func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error { return nil } -// Close closes the pipeline, releasing any open resources. -func (c *Pipeline) Close() error { - c.mu.Lock() - _ = c.discard() - c.closed = true - c.mu.Unlock() - return nil -} - // Discard resets the pipeline and discards queued commands. -func (c *Pipeline) Discard() error { +func (c *Pipeline) Discard() { c.mu.Lock() - err := c.discard() - c.mu.Unlock() - return err -} - -func (c *Pipeline) discard() error { - if c.closed { - return pool.ErrClosed - } c.cmds = c.cmds[:0] - return nil + c.mu.Unlock() } // Exec executes all previously queued commands using one @@ -101,10 +79,6 @@ func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) { c.mu.Lock() defer c.mu.Unlock() - if c.closed { - return nil, pool.ErrClosed - } - if len(c.cmds) == 0 { return nil, nil } @@ -119,9 +93,7 @@ func (c *Pipeline) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]C if err := fn(c); err != nil { return nil, err } - cmds, err := c.Exec(ctx) - _ = c.Close() - return cmds, err + return c.Exec(ctx) } func (c *Pipeline) Pipeline() Pipeliner { diff --git a/pool_test.go b/pool_test.go index 81318199..5e83549c 100644 --- a/pool_test.go +++ b/pool_test.go @@ -72,7 +72,6 @@ var _ = Describe("pool", func() { Expect(cmds).To(HaveLen(1)) Expect(ping.Err()).NotTo(HaveOccurred()) Expect(ping.Val()).To(Equal("PONG")) - Expect(pipe.Close()).NotTo(HaveOccurred()) }) pool := client.Pool() diff --git a/redis_test.go b/redis_test.go index 9f189052..c2d2dc02 100644 --- a/redis_test.go +++ b/redis_test.go @@ -136,17 +136,6 @@ var _ = Describe("Client", func() { Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred()) }) - It("should close pipeline without closing the client", func() { - pipeline := client.Pipeline() - Expect(pipeline.Close()).NotTo(HaveOccurred()) - - pipeline.Ping(ctx) - _, err := pipeline.Exec(ctx) - Expect(err).To(MatchError("redis: client is closed")) - - Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred()) - }) - It("should close pubsub when client is closed", func() { pubsub := client.Subscribe(ctx) Expect(client.Close()).NotTo(HaveOccurred()) @@ -157,12 +146,6 @@ var _ = Describe("Client", func() { Expect(pubsub.Close()).NotTo(HaveOccurred()) }) - It("should close pipeline when client is closed", func() { - pipeline := client.Pipeline() - Expect(client.Close()).NotTo(HaveOccurred()) - Expect(pipeline.Close()).NotTo(HaveOccurred()) - }) - It("should select DB", func() { db2 := redis.NewClient(&redis.Options{ Addr: redisAddr, diff --git a/ring.go b/ring.go index 5e35b6bb..7446d32c 100644 --- a/ring.go +++ b/ring.go @@ -308,7 +308,7 @@ func (c *ringShards) Random() (*ringShard, error) { return c.GetByKey(strconv.Itoa(rand.Int())) } -// heartbeat monitors state of each shard in the ring. +// Heartbeat monitors state of each shard in the ring. func (c *ringShards) Heartbeat(frequency time.Duration) { ticker := time.NewTicker(frequency) defer ticker.Stop() diff --git a/ring_test.go b/ring_test.go index 4a434a5b..432a1c07 100644 --- a/ring_test.go +++ b/ring_test.go @@ -123,7 +123,6 @@ var _ = Describe("Redis Ring", func() { cmds, err := pipe.Exec(ctx) Expect(err).NotTo(HaveOccurred()) Expect(cmds).To(HaveLen(100)) - Expect(pipe.Close()).NotTo(HaveOccurred()) for _, cmd := range cmds { Expect(cmd.Err()).NotTo(HaveOccurred())