adjust the code (#1842)

* Upgrade redis-server version (#1833)

* Upgrade redis-server version

Signed-off-by: monkey <golang@88.com>

* XAutoClaim changed the return value

Signed-off-by: monkey <golang@88.com>

* add cmd: geosearch, geosearchstore (#1836)

* add cmd: geosearch, geosearchstore

Signed-off-by: monkey92t <golang@88.com>

* GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing

Signed-off-by: monkey92t <golang@88.com>

* adjust the code, and fix #1553, #1676

Signed-off-by: monkey92t <golang@88.com>
This commit is contained in:
monkey92t 2021-08-02 19:01:01 +08:00 committed by GitHub
parent b8245b56f9
commit 38d1749d56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 439 additions and 77 deletions

View File

@ -14,7 +14,7 @@ bench: testdeps
testdata/redis: testdata/redis:
mkdir -p $@ mkdir -p $@
wget -qO- https://download.redis.io/releases/redis-6.2.1.tar.gz | tar xvz --strip-components=1 -C $@ wget -qO- https://download.redis.io/releases/redis-6.2.5.tar.gz | tar xvz --strip-components=1 -C $@
testdata/redis/src/redis-server: testdata/redis testdata/redis/src/redis-server: testdata/redis
cd $< && make all cd $< && make all

View File

@ -795,7 +795,6 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
_ = pipe.Process(ctx, NewCmd(ctx, "asking")) _ = pipe.Process(ctx, NewCmd(ctx, "asking"))
_ = pipe.Process(ctx, cmd) _ = pipe.Process(ctx, cmd)
_, lastErr = pipe.Exec(ctx) _, lastErr = pipe.Exec(ctx)
_ = pipe.Close()
ask = false ask = false
} else { } else {
lastErr = node.Client.Process(ctx, cmd) lastErr = node.Client.Process(ctx, cmd)

View File

@ -515,9 +515,7 @@ var _ = Describe("ClusterClient", func() {
pipe = client.Pipeline().(*redis.Pipeline) pipe = client.Pipeline().(*redis.Pipeline)
}) })
AfterEach(func() { AfterEach(func() {})
Expect(pipe.Close()).NotTo(HaveOccurred())
})
assertPipeline() assertPipeline()
}) })
@ -527,9 +525,7 @@ var _ = Describe("ClusterClient", func() {
pipe = client.TxPipeline().(*redis.Pipeline) pipe = client.TxPipeline().(*redis.Pipeline)
}) })
AfterEach(func() { AfterEach(func() {})
Expect(pipe.Close()).NotTo(HaveOccurred())
})
assertPipeline() assertPipeline()
}) })

View File

@ -2564,6 +2564,183 @@ func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// GeoSearchQuery is used for GEOSearch/GEOSearchStore command query.
type GeoSearchQuery struct {
Member string
// Latitude and Longitude when using FromLonLat option.
Longitude float64
Latitude float64
// Distance and unit when using ByRadius option.
// Can use m, km, ft, or mi. Default is km.
Radius float64
RadiusUnit string
// Height, width and unit when using ByBox option.
// Can be m, km, ft, or mi. Default is km.
BoxWidth float64
BoxHeight float64
BoxUnit string
// Can be ASC or DESC. Default is no sort order.
Sort string
Count int
CountAny bool
}
type GeoSearchLocationQuery struct {
GeoSearchQuery
WithCoord bool
WithDist bool
WithHash bool
}
type GeoSearchStoreQuery struct {
GeoSearchQuery
// When using the StoreDist option, the command stores the items in a
// sorted set populated with their distance from the center of the circle or box,
// as a floating-point number, in the same unit specified for that shape.
StoreDist bool
}
func geoSearchLocationArgs(q *GeoSearchLocationQuery, args []interface{}) []interface{} {
args = geoSearchArgs(&q.GeoSearchQuery, args)
if q.WithCoord {
args = append(args, "withcoord")
}
if q.WithDist {
args = append(args, "withdist")
}
if q.WithHash {
args = append(args, "withhash")
}
return args
}
func geoSearchArgs(q *GeoSearchQuery, args []interface{}) []interface{} {
if q.Member != "" {
args = append(args, "frommember", q.Member)
} else {
args = append(args, "fromlonlat", q.Longitude, q.Latitude)
}
if q.Radius > 0 {
if q.RadiusUnit == "" {
q.RadiusUnit = "km"
}
args = append(args, "byradius", q.Radius, q.RadiusUnit)
} else {
if q.BoxUnit == "" {
q.BoxUnit = "km"
}
args = append(args, "bybox", q.BoxWidth, q.BoxHeight, q.BoxUnit)
}
if q.Sort != "" {
args = append(args, q.Sort)
}
if q.Count > 0 {
args = append(args, "count", q.Count)
if q.CountAny {
args = append(args, "any")
}
}
return args
}
type GeoSearchLocationCmd struct {
baseCmd
opt *GeoSearchLocationQuery
val []GeoLocation
}
var _ Cmder = (*GeoSearchLocationCmd)(nil)
func NewGeoSearchLocationCmd(
ctx context.Context, opt *GeoSearchLocationQuery, args ...interface{},
) *GeoSearchLocationCmd {
return &GeoSearchLocationCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
},
opt: opt,
}
}
func (cmd *GeoSearchLocationCmd) Val() []GeoLocation {
return cmd.val
}
func (cmd *GeoSearchLocationCmd) Result() ([]GeoLocation, error) {
return cmd.val, cmd.err
}
func (cmd *GeoSearchLocationCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *GeoSearchLocationCmd) readReply(rd *proto.Reader) error {
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
cmd.val = make([]GeoLocation, n)
for i := 0; i < n; i++ {
_, err = rd.ReadArrayLen()
if err != nil {
return err
}
var loc GeoLocation
loc.Name, err = rd.ReadString()
if err != nil {
return err
}
if cmd.opt.WithDist {
loc.Dist, err = rd.ReadFloat()
if err != nil {
return err
}
}
if cmd.opt.WithHash {
loc.GeoHash, err = rd.ReadInt()
if err != nil {
return err
}
}
if cmd.opt.WithCoord {
if err = rd.ReadFixedArrayLen(2); err != nil {
return err
}
loc.Longitude, err = rd.ReadFloat()
if err != nil {
return err
}
loc.Latitude, err = rd.ReadFloat()
if err != nil {
return err
}
}
cmd.val[i] = loc
}
return nil
}
//------------------------------------------------------------------------------
type GeoPos struct { type GeoPos struct {
Longitude, Latitude float64 Longitude, Latitude float64
} }

View File

@ -4,7 +4,7 @@ import (
"errors" "errors"
"time" "time"
redis "github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"

View File

@ -244,6 +244,7 @@ type Cmdable interface {
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd
XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd
BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd
@ -304,6 +305,8 @@ type Cmdable interface {
ClientList(ctx context.Context) *StringCmd ClientList(ctx context.Context) *StringCmd
ClientPause(ctx context.Context, dur time.Duration) *BoolCmd ClientPause(ctx context.Context, dur time.Duration) *BoolCmd
ClientID(ctx context.Context) *IntCmd ClientID(ctx context.Context) *IntCmd
ClientUnblock(ctx context.Context, id int64) *IntCmd
ClientUnblockWithError(ctx context.Context, id int64) *IntCmd
ConfigGet(ctx context.Context, parameter string) *MapStringStringCmd ConfigGet(ctx context.Context, parameter string) *MapStringStringCmd
ConfigResetStat(ctx context.Context) *StatusCmd ConfigResetStat(ctx context.Context) *StatusCmd
ConfigSet(ctx context.Context, parameter, value string) *StatusCmd ConfigSet(ctx context.Context, parameter, value string) *StatusCmd
@ -320,6 +323,7 @@ type Cmdable interface {
ShutdownSave(ctx context.Context) *StatusCmd ShutdownSave(ctx context.Context) *StatusCmd
ShutdownNoSave(ctx context.Context) *StatusCmd ShutdownNoSave(ctx context.Context) *StatusCmd
SlaveOf(ctx context.Context, host, port string) *StatusCmd SlaveOf(ctx context.Context, host, port string) *StatusCmd
SlowLogGet(ctx context.Context, num int64) *SlowLogCmd
Time(ctx context.Context) *TimeCmd Time(ctx context.Context) *TimeCmd
DebugObject(ctx context.Context, key string) *StringCmd DebugObject(ctx context.Context, key string) *StringCmd
ReadOnly(ctx context.Context) *StatusCmd ReadOnly(ctx context.Context) *StatusCmd
@ -364,6 +368,9 @@ type Cmdable interface {
GeoRadiusStore(ctx context.Context, key string, longitude, latitude float64, query *GeoRadiusQuery) *IntCmd GeoRadiusStore(ctx context.Context, key string, longitude, latitude float64, query *GeoRadiusQuery) *IntCmd
GeoRadiusByMember(ctx context.Context, key, member string, query *GeoRadiusQuery) *GeoLocationCmd GeoRadiusByMember(ctx context.Context, key, member string, query *GeoRadiusQuery) *GeoLocationCmd
GeoRadiusByMemberStore(ctx context.Context, key, member string, query *GeoRadiusQuery) *IntCmd GeoRadiusByMemberStore(ctx context.Context, key, member string, query *GeoRadiusQuery) *IntCmd
GeoSearch(ctx context.Context, key string, q *GeoSearchQuery) *StringSliceCmd
GeoSearchLocation(ctx context.Context, key string, q *GeoSearchLocationQuery) *GeoSearchLocationCmd
GeoSearchStore(ctx context.Context, key, store string, q *GeoSearchStoreQuery) *IntCmd
GeoDist(ctx context.Context, key string, member1, member2, unit string) *FloatCmd GeoDist(ctx context.Context, key string, member1, member2, unit string) *FloatCmd
GeoHash(ctx context.Context, key string, members ...string) *StringSliceCmd GeoHash(ctx context.Context, key string, members ...string) *StringSliceCmd
} }
@ -3240,6 +3247,38 @@ func (c cmdable) GeoRadiusByMemberStore(
return cmd return cmd
} }
func (c cmdable) GeoSearch(ctx context.Context, key string, q *GeoSearchQuery) *StringSliceCmd {
args := make([]interface{}, 0, 13)
args = append(args, "geosearch", key)
args = geoSearchArgs(q, args)
cmd := NewStringSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) GeoSearchLocation(
ctx context.Context, key string, q *GeoSearchLocationQuery,
) *GeoSearchLocationCmd {
args := make([]interface{}, 0, 16)
args = append(args, "geosearch", key)
args = geoSearchLocationArgs(q, args)
cmd := NewGeoSearchLocationCmd(ctx, q, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) GeoSearchStore(ctx context.Context, key, store string, q *GeoSearchStoreQuery) *IntCmd {
args := make([]interface{}, 0, 15)
args = append(args, "geosearchstore", store, key)
args = geoSearchArgs(&q.GeoSearchQuery, args)
if q.StoreDist {
args = append(args, "storedist")
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) GeoDist( func (c cmdable) GeoDist(
ctx context.Context, key string, member1, member2, unit string, ctx context.Context, key string, member1, member2, unit string,
) *FloatCmd { ) *FloatCmd {

View File

@ -4657,7 +4657,7 @@ var _ = Describe("Commands", func() {
} }
msgs, start, err := client.XAutoClaim(ctx, xca).Result() msgs, start, err := client.XAutoClaim(ctx, xca).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(start).To(Equal("2-0")) Expect(start).To(Equal("3-0"))
Expect(msgs).To(Equal([]redis.XMessage{{ Expect(msgs).To(Equal([]redis.XMessage{{
ID: "1-0", ID: "1-0",
Values: map[string]interface{}{"uno": "un"}, Values: map[string]interface{}{"uno": "un"},
@ -4669,19 +4669,16 @@ var _ = Describe("Commands", func() {
xca.Start = start xca.Start = start
msgs, start, err = client.XAutoClaim(ctx, xca).Result() msgs, start, err = client.XAutoClaim(ctx, xca).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(start).To(Equal("3-0")) Expect(start).To(Equal("0-0"))
Expect(msgs).To(Equal([]redis.XMessage{{ Expect(msgs).To(Equal([]redis.XMessage{{
ID: "2-0",
Values: map[string]interface{}{"dos": "deux"},
}, {
ID: "3-0", ID: "3-0",
Values: map[string]interface{}{"tres": "troix"}, Values: map[string]interface{}{"tres": "troix"},
}})) }}))
ids, start, err := client.XAutoClaimJustID(ctx, xca).Result() ids, start, err := client.XAutoClaimJustID(ctx, xca).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(start).To(Equal("3-0")) Expect(start).To(Equal("0-0"))
Expect(ids).To(Equal([]string{"2-0", "3-0"})) Expect(ids).To(Equal([]string{"3-0"}))
}) })
It("should XClaim", func() { It("should XClaim", func() {
@ -5167,6 +5164,204 @@ var _ = Describe("Commands", func() {
nil, nil,
})) }))
}) })
It("should geo search", func() {
q := &redis.GeoSearchQuery{
Member: "Catania",
BoxWidth: 400,
BoxHeight: 100,
BoxUnit: "km",
Sort: "asc",
}
val, err := client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania"}))
q.BoxHeight = 400
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania", "Palermo"}))
q.Count = 1
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania"}))
q.CountAny = true
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Palermo"}))
q = &redis.GeoSearchQuery{
Member: "Catania",
Radius: 100,
RadiusUnit: "km",
Sort: "asc",
}
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania"}))
q.Radius = 400
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania", "Palermo"}))
q.Count = 1
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania"}))
q.CountAny = true
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Palermo"}))
q = &redis.GeoSearchQuery{
Longitude: 15,
Latitude: 37,
BoxWidth: 200,
BoxHeight: 200,
BoxUnit: "km",
Sort: "asc",
}
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania"}))
q.BoxWidth, q.BoxHeight = 400, 400
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania", "Palermo"}))
q.Count = 1
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania"}))
q.CountAny = true
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Palermo"}))
q = &redis.GeoSearchQuery{
Longitude: 15,
Latitude: 37,
Radius: 100,
RadiusUnit: "km",
Sort: "asc",
}
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania"}))
q.Radius = 200
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania", "Palermo"}))
q.Count = 1
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Catania"}))
q.CountAny = true
val, err = client.GeoSearch(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]string{"Palermo"}))
})
It("should geo search with options", func() {
q := &redis.GeoSearchLocationQuery{
GeoSearchQuery: redis.GeoSearchQuery{
Longitude: 15,
Latitude: 37,
Radius: 200,
RadiusUnit: "km",
Sort: "asc",
},
WithHash: true,
WithDist: true,
WithCoord: true,
}
val, err := client.GeoSearchLocation(ctx, "Sicily", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal([]redis.GeoLocation{
{
Name: "Catania",
Longitude: 15.08726745843887329,
Latitude: 37.50266842333162032,
Dist: 56.4413,
GeoHash: 3479447370796909,
},
{
Name: "Palermo",
Longitude: 13.36138933897018433,
Latitude: 38.11555639549629859,
Dist: 190.4424,
GeoHash: 3479099956230698,
},
}))
})
It("should geo search store", func() {
q := &redis.GeoSearchStoreQuery{
GeoSearchQuery: redis.GeoSearchQuery{
Longitude: 15,
Latitude: 37,
Radius: 200,
RadiusUnit: "km",
Sort: "asc",
},
StoreDist: false,
}
val, err := client.GeoSearchStore(ctx, "Sicily", "key1", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal(int64(2)))
q.StoreDist = true
val, err = client.GeoSearchStore(ctx, "Sicily", "key2", q).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal(int64(2)))
loc, err := client.GeoSearchLocation(ctx, "key1", &redis.GeoSearchLocationQuery{
GeoSearchQuery: q.GeoSearchQuery,
WithCoord: true,
WithDist: true,
WithHash: true,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(loc).To(Equal([]redis.GeoLocation{
{
Name: "Catania",
Longitude: 15.08726745843887329,
Latitude: 37.50266842333162032,
Dist: 56.4413,
GeoHash: 3479447370796909,
},
{
Name: "Palermo",
Longitude: 13.36138933897018433,
Latitude: 38.11555639549629859,
Dist: 190.4424,
GeoHash: 3479099956230698,
},
}))
v, err := client.ZRangeWithScores(ctx, "key2", 0, -1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal([]redis.Z{
{
Score: 56.441257870158204,
Member: "Catania",
},
{
Score: 190.44242984775784,
Member: "Palermo",
},
}))
})
}) })
Describe("marshaling/unmarshaling", func() { Describe("marshaling/unmarshaling", func() {

View File

@ -60,21 +60,21 @@ func (c *ClusterClient) SwapNodes(ctx context.Context, key string) error {
return nil return nil
} }
func (state *clusterState) IsConsistent(ctx context.Context) bool { func (c *clusterState) IsConsistent(ctx context.Context) bool {
if len(state.Masters) < 3 { if len(c.Masters) < 3 {
return false return false
} }
for _, master := range state.Masters { for _, master := range c.Masters {
s := master.Client.Info(ctx, "replication").Val() s := master.Client.Info(ctx, "replication").Val()
if !strings.Contains(s, "role:master") { if !strings.Contains(s, "role:master") {
return false return false
} }
} }
if len(state.Slaves) < 3 { if len(c.Slaves) < 3 {
return false return false
} }
for _, slave := range state.Slaves { for _, slave := range c.Slaves {
s := slave.Client.Info(ctx, "replication").Val() s := slave.Client.Info(ctx, "replication").Val()
if !strings.Contains(s, "role:slave") { if !strings.Contains(s, "role:slave") {
return false return false

View File

@ -13,7 +13,7 @@ import (
type writer interface { type writer interface {
io.Writer io.Writer
io.ByteWriter io.ByteWriter
// io.StringWriter // WriteString implement io.StringWriter.
WriteString(s string) (n int, err error) WriteString(s string) (n int, err error)
} }

View File

@ -247,7 +247,10 @@ func setupTCPConn(u *url.URL) (*Options, error) {
} }
if u.Scheme == "rediss" { if u.Scheme == "rediss" {
o.TLSConfig = &tls.Config{ServerName: h} o.TLSConfig = &tls.Config{
ServerName: h,
MinVersion: tls.VersionTLS12,
}
} }
return o, nil return o, nil

View File

@ -3,8 +3,6 @@ package redis
import ( import (
"context" "context"
"sync" "sync"
"github.com/go-redis/redis/v8/internal/pool"
) )
type pipelineExecer func(context.Context, []Cmder) error type pipelineExecer func(context.Context, []Cmder) error
@ -26,8 +24,7 @@ type Pipeliner interface {
StatefulCmdable StatefulCmdable
Do(ctx context.Context, args ...interface{}) *Cmd Do(ctx context.Context, args ...interface{}) *Cmd
Process(ctx context.Context, cmd Cmder) error Process(ctx context.Context, cmd Cmder) error
Close() error Discard()
Discard() error
Exec(ctx context.Context) ([]Cmder, error) Exec(ctx context.Context) ([]Cmder, error)
} }
@ -45,7 +42,6 @@ type Pipeline struct {
mu sync.Mutex mu sync.Mutex
cmds []Cmder cmds []Cmder
closed bool
} }
func (c *Pipeline) init() { func (c *Pipeline) init() {
@ -67,29 +63,11 @@ func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
return nil return nil
} }
// Close closes the pipeline, releasing any open resources.
func (c *Pipeline) Close() error {
c.mu.Lock()
_ = c.discard()
c.closed = true
c.mu.Unlock()
return nil
}
// Discard resets the pipeline and discards queued commands. // Discard resets the pipeline and discards queued commands.
func (c *Pipeline) Discard() error { func (c *Pipeline) Discard() {
c.mu.Lock() c.mu.Lock()
err := c.discard()
c.mu.Unlock()
return err
}
func (c *Pipeline) discard() error {
if c.closed {
return pool.ErrClosed
}
c.cmds = c.cmds[:0] c.cmds = c.cmds[:0]
return nil c.mu.Unlock()
} }
// Exec executes all previously queued commands using one // Exec executes all previously queued commands using one
@ -101,10 +79,6 @@ func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if c.closed {
return nil, pool.ErrClosed
}
if len(c.cmds) == 0 { if len(c.cmds) == 0 {
return nil, nil return nil, nil
} }
@ -119,9 +93,7 @@ func (c *Pipeline) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]C
if err := fn(c); err != nil { if err := fn(c); err != nil {
return nil, err return nil, err
} }
cmds, err := c.Exec(ctx) return c.Exec(ctx)
_ = c.Close()
return cmds, err
} }
func (c *Pipeline) Pipeline() Pipeliner { func (c *Pipeline) Pipeline() Pipeliner {

View File

@ -72,7 +72,6 @@ var _ = Describe("pool", func() {
Expect(cmds).To(HaveLen(1)) Expect(cmds).To(HaveLen(1))
Expect(ping.Err()).NotTo(HaveOccurred()) Expect(ping.Err()).NotTo(HaveOccurred())
Expect(ping.Val()).To(Equal("PONG")) Expect(ping.Val()).To(Equal("PONG"))
Expect(pipe.Close()).NotTo(HaveOccurred())
}) })
pool := client.Pool() pool := client.Pool()

View File

@ -136,17 +136,6 @@ var _ = Describe("Client", func() {
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred()) Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
}) })
It("should close pipeline without closing the client", func() {
pipeline := client.Pipeline()
Expect(pipeline.Close()).NotTo(HaveOccurred())
pipeline.Ping(ctx)
_, err := pipeline.Exec(ctx)
Expect(err).To(MatchError("redis: client is closed"))
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
})
It("should close pubsub when client is closed", func() { It("should close pubsub when client is closed", func() {
pubsub := client.Subscribe(ctx) pubsub := client.Subscribe(ctx)
Expect(client.Close()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred())
@ -157,12 +146,6 @@ var _ = Describe("Client", func() {
Expect(pubsub.Close()).NotTo(HaveOccurred()) Expect(pubsub.Close()).NotTo(HaveOccurred())
}) })
It("should close pipeline when client is closed", func() {
pipeline := client.Pipeline()
Expect(client.Close()).NotTo(HaveOccurred())
Expect(pipeline.Close()).NotTo(HaveOccurred())
})
It("should select DB", func() { It("should select DB", func() {
db2 := redis.NewClient(&redis.Options{ db2 := redis.NewClient(&redis.Options{
Addr: redisAddr, Addr: redisAddr,

View File

@ -308,7 +308,7 @@ func (c *ringShards) Random() (*ringShard, error) {
return c.GetByKey(strconv.Itoa(rand.Int())) return c.GetByKey(strconv.Itoa(rand.Int()))
} }
// heartbeat monitors state of each shard in the ring. // Heartbeat monitors state of each shard in the ring.
func (c *ringShards) Heartbeat(frequency time.Duration) { func (c *ringShards) Heartbeat(frequency time.Duration) {
ticker := time.NewTicker(frequency) ticker := time.NewTicker(frequency)
defer ticker.Stop() defer ticker.Stop()

View File

@ -123,7 +123,6 @@ var _ = Describe("Redis Ring", func() {
cmds, err := pipe.Exec(ctx) cmds, err := pipe.Exec(ctx)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(100)) Expect(cmds).To(HaveLen(100))
Expect(pipe.Close()).NotTo(HaveOccurred())
for _, cmd := range cmds { for _, cmd := range cmds {
Expect(cmd.Err()).NotTo(HaveOccurred()) Expect(cmd.Err()).NotTo(HaveOccurred())