diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..77b7be5 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +version: 2 +updates: +- package-ecosystem: gomod + directory: / + schedule: + interval: weekly +- package-ecosystem: github-actions + directory: / + schedule: + interval: weekly diff --git a/README.md b/README.md index bc4624e..2c36f78 100644 --- a/README.md +++ b/README.md @@ -166,3 +166,11 @@ Lastly, run: ``` go test ``` + +## Contributors + +Thanks to all the people who already contributed! + + + + diff --git a/cluster.go b/cluster.go index 95d6585..d85c043 100644 --- a/cluster.go +++ b/cluster.go @@ -68,6 +68,9 @@ type ClusterOptions struct { ReadTimeout time.Duration WriteTimeout time.Duration + // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). + PoolFIFO bool + // PoolSize applies per cluster node and not for the whole cluster. PoolSize int MinIdleConns int @@ -146,6 +149,7 @@ func (opt *ClusterOptions) clientOptions() *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, + PoolFIFO: opt.PoolFIFO, PoolSize: opt.PoolSize, MinIdleConns: opt.MinIdleConns, MaxConnAge: opt.MaxConnAge, @@ -591,8 +595,16 @@ func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) { if len(nodes) == 0 { return c.nodes.Random() } - n := rand.Intn(len(nodes)) - return nodes[n], nil + if len(nodes) == 1 { + return nodes[0], nil + } + randomNodes := rand.Perm(len(nodes)) + for _, idx := range randomNodes { + if node := nodes[idx]; !node.Failing() { + return node, nil + } + } + return nodes[randomNodes[0]], nil } func (c *clusterState) slotNodes(slot int) []*clusterNode { diff --git a/cluster_commands.go b/cluster_commands.go index 336ea98..085bce8 100644 --- a/cluster_commands.go +++ b/cluster_commands.go @@ -8,55 +8,63 @@ import ( func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd { cmd := NewIntCmd(ctx, "dbsize") - var size int64 - err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error { - n, err := master.DBSize(ctx).Result() + _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error { + var size int64 + err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error { + n, err := master.DBSize(ctx).Result() + if err != nil { + return err + } + atomic.AddInt64(&size, n) + return nil + }) if err != nil { - return err + cmd.SetErr(err) + } else { + cmd.val = size } - atomic.AddInt64(&size, n) return nil }) - if err != nil { - cmd.SetErr(err) - return cmd - } - cmd.val = size return cmd } func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCmd { cmd := NewStringCmd(ctx, "script", "load", script) - mu := &sync.Mutex{} - err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { - val, err := shard.ScriptLoad(ctx, script).Result() + _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error { + mu := &sync.Mutex{} + err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { + val, err := shard.ScriptLoad(ctx, script).Result() + if err != nil { + return err + } + + mu.Lock() + if cmd.Val() == "" { + cmd.val = val + } + mu.Unlock() + + return nil + }) if err != nil { - return err + cmd.SetErr(err) } - - mu.Lock() - if cmd.Val() == "" { - cmd.val = val - } - mu.Unlock() - return nil }) - if err != nil { - cmd.SetErr(err) - } - return cmd } func (c *ClusterClient) ScriptFlush(ctx context.Context) *StatusCmd { cmd := NewStatusCmd(ctx, "script", "flush") - _ = c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { - shard.ScriptFlush(ctx) - + _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error { + err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { + return shard.ScriptFlush(ctx).Err() + }) + if err != nil { + cmd.SetErr(err) + } return nil }) - return cmd } @@ -74,26 +82,28 @@ func (c *ClusterClient) ScriptExists(ctx context.Context, hashes ...string) *Boo result[i] = true } - mu := &sync.Mutex{} - err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { - val, err := shard.ScriptExists(ctx, hashes...).Result() + _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error { + mu := &sync.Mutex{} + err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { + val, err := shard.ScriptExists(ctx, hashes...).Result() + if err != nil { + return err + } + + mu.Lock() + for i, v := range val { + result[i] = result[i] && v + } + mu.Unlock() + + return nil + }) if err != nil { - return err + cmd.SetErr(err) + } else { + cmd.val = result } - - mu.Lock() - for i, v := range val { - result[i] = result[i] && v - } - mu.Unlock() - return nil }) - if err != nil { - cmd.SetErr(err) - } - - cmd.val = result - return cmd } diff --git a/commands.go b/commands.go index 57ff0d0..346e38e 100644 --- a/commands.go +++ b/commands.go @@ -190,6 +190,7 @@ type Cmdable interface { LSet(ctx context.Context, key string, index int64, value interface{}) *StatusCmd LTrim(ctx context.Context, key string, start, stop int64) *StatusCmd RPop(ctx context.Context, key string) *StringCmd + RPopCount(ctx context.Context, key string, count int) *StringSliceCmd RPopLPush(ctx context.Context, source, destination string) *StringCmd RPush(ctx context.Context, key string, values ...interface{}) *IntCmd RPushX(ctx context.Context, key string, values ...interface{}) *IntCmd @@ -1452,6 +1453,12 @@ func (c cmdable) RPop(ctx context.Context, key string) *StringCmd { return cmd } +func (c cmdable) RPopCount(ctx context.Context, key string, count int) *StringSliceCmd { + cmd := NewStringSliceCmd(ctx, "rpop", key, count) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) RPopLPush(ctx context.Context, source, destination string) *StringCmd { cmd := NewStringCmd(ctx, "rpoplpush", source, destination) _ = c(ctx, cmd) diff --git a/commands_test.go b/commands_test.go index 66f69a5..fb9bbfc 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2282,6 +2282,20 @@ var _ = Describe("Commands", func() { Expect(lRange.Val()).To(Equal([]string{"one", "two"})) }) + It("should RPopCount", func() { + rPush := client.RPush(ctx, "list", "one", "two", "three", "four") + Expect(rPush.Err()).NotTo(HaveOccurred()) + Expect(rPush.Val()).To(Equal(int64(4))) + + rPopCount := client.RPopCount(ctx, "list", 2) + Expect(rPopCount.Err()).NotTo(HaveOccurred()) + Expect(rPopCount.Val()).To(Equal([]string{"four", "three"})) + + lRange := client.LRange(ctx, "list", 0, -1) + Expect(lRange.Err()).NotTo(HaveOccurred()) + Expect(lRange.Val()).To(Equal([]string{"one", "two"})) + }) + It("should RPopLPush", func() { rPush := client.RPush(ctx, "list", "one") Expect(rPush.Err()).NotTo(HaveOccurred()) @@ -4113,6 +4127,45 @@ var _ = Describe("Commands", func() { })) }) + It("should ZUnion", func() { + err := client.ZAddArgs(ctx, "zset1", redis.ZAddArgs{ + Members: []redis.Z{ + {Score: 1, Member: "one"}, + {Score: 2, Member: "two"}, + }, + }).Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.ZAddArgs(ctx, "zset2", redis.ZAddArgs{ + Members: []redis.Z{ + {Score: 1, Member: "one"}, + {Score: 2, Member: "two"}, + {Score: 3, Member: "three"}, + }, + }).Err() + Expect(err).NotTo(HaveOccurred()) + + union, err := client.ZUnion(ctx, redis.ZStore{ + Keys: []string{"zset1", "zset2"}, + Weights: []float64{2, 3}, + Aggregate: "sum", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(union).To(Equal([]string{"one", "three", "two"})) + + unionScores, err := client.ZUnionWithScores(ctx, redis.ZStore{ + Keys: []string{"zset1", "zset2"}, + Weights: []float64{2, 3}, + Aggregate: "sum", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(unionScores).To(Equal([]redis.Z{ + {Score: 5, Member: "one"}, + {Score: 9, Member: "three"}, + {Score: 10, Member: "two"}, + })) + }) + It("should ZUnionStore", func() { err := client.ZAdd(ctx, "zset1", redis.Z{Score: 1, Member: "one"}).Err() Expect(err).NotTo(HaveOccurred()) @@ -4339,6 +4392,33 @@ var _ = Describe("Commands", func() { Expect(n).To(Equal(int64(3))) }) + // TODO XTrimMaxLenApprox/XTrimMinIDApprox There is a bug in the limit parameter. + // TODO Don't test it for now. + // TODO link: https://github.com/redis/redis/issues/9046 + It("should XTrimMaxLen", func() { + n, err := client.XTrimMaxLen(ctx, "stream", 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + + It("should XTrimMaxLenApprox", func() { + n, err := client.XTrimMaxLenApprox(ctx, "stream", 0, 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + + It("should XTrimMinID", func() { + n, err := client.XTrimMinID(ctx, "stream", "4-0").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + + It("should XTrimMinIDApprox", func() { + n, err := client.XTrimMinIDApprox(ctx, "stream", "4-0", 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + It("should XAdd", func() { id, err := client.XAdd(ctx, &redis.XAddArgs{ Stream: "stream", diff --git a/example_test.go b/example_test.go index 7e04275..53bd90f 100644 --- a/example_test.go +++ b/example_test.go @@ -417,7 +417,7 @@ func ExampleClient_Watch() { // Actual opperation (local in optimistic lock). n++ - // Operation is commited only if the watched keys remain unchanged. + // Operation is committed only if the watched keys remain unchanged. _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { pipe.Set(ctx, key, n, 0) return nil diff --git a/internal/pool/conncheck.go b/internal/pool/conncheck.go new file mode 100644 index 0000000..5dd60df --- /dev/null +++ b/internal/pool/conncheck.go @@ -0,0 +1,45 @@ +// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos + +package pool + +import ( + "errors" + "io" + "net" + "syscall" +) + +var errUnexpectedRead = errors.New("unexpected read from socket") + +func connCheck(conn net.Conn) error { + sysConn, ok := conn.(syscall.Conn) + if !ok { + return nil + } + rawConn, err := sysConn.SyscallConn() + if err != nil { + return err + } + + var sysErr error + err = rawConn.Read(func(fd uintptr) bool { + var buf [1]byte + n, err := syscall.Read(int(fd), buf[:]) + switch { + case n == 0 && err == nil: + sysErr = io.EOF + case n > 0: + sysErr = errUnexpectedRead + case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK: + sysErr = nil + default: + sysErr = err + } + return true + }) + if err != nil { + return err + } + + return sysErr +} diff --git a/internal/pool/conncheck_dummy.go b/internal/pool/conncheck_dummy.go new file mode 100644 index 0000000..1daf986 --- /dev/null +++ b/internal/pool/conncheck_dummy.go @@ -0,0 +1,9 @@ +// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos + +package pool + +import "net" + +func connCheck(conn net.Conn) error { + return nil +} diff --git a/internal/pool/conncheck_test.go b/internal/pool/conncheck_test.go new file mode 100644 index 0000000..0332174 --- /dev/null +++ b/internal/pool/conncheck_test.go @@ -0,0 +1,46 @@ +// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos + +package pool + +import ( + "net" + "net/http/httptest" + "testing" + "time" +) + +func Test_connCheck(t *testing.T) { + // tests with real conns + ts := httptest.NewServer(nil) + defer ts.Close() + + t.Run("good conn", func(t *testing.T) { + conn, err := net.DialTimeout(ts.Listener.Addr().Network(), ts.Listener.Addr().String(), time.Second) + if err != nil { + t.Fatalf(err.Error()) + } + defer conn.Close() + if err = connCheck(conn); err != nil { + t.Fatalf(err.Error()) + } + conn.Close() + + if err = connCheck(conn); err == nil { + t.Fatalf("expect has error") + } + }) + + t.Run("bad conn 2", func(t *testing.T) { + conn, err := net.DialTimeout(ts.Listener.Addr().Network(), ts.Listener.Addr().String(), time.Second) + if err != nil { + t.Fatalf(err.Error()) + } + defer conn.Close() + + ts.Close() + + if err = connCheck(conn); err == nil { + t.Fatalf("expect has err") + } + }) +} diff --git a/internal/pool/main_test.go b/internal/pool/main_test.go index 2365dbc..c54a38d 100644 --- a/internal/pool/main_test.go +++ b/internal/pool/main_test.go @@ -2,9 +2,12 @@ package pool_test import ( "context" + "fmt" "net" "sync" + "syscall" "testing" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -32,5 +35,87 @@ func perform(n int, cbs ...func(int)) { } func dummyDialer(context.Context) (net.Conn, error) { - return &net.TCPConn{}, nil + // return &net.TCPConn{}, nil + return newDummyConn(), nil +} + +func newDummyConn() net.Conn { + return &dummyConn{ + rawConn: &dummyRawConn{}, + } +} + +var _ net.Conn = (*dummyConn)(nil) +var _ syscall.Conn = (*dummyConn)(nil) + +type dummyConn struct { + rawConn *dummyRawConn +} + +func (d *dummyConn) SyscallConn() (syscall.RawConn, error) { + return d.rawConn, nil +} + +var errDummy = fmt.Errorf("dummyConn err") + +func (d *dummyConn) Read(b []byte) (n int, err error) { + return 0, errDummy +} + +func (d *dummyConn) Write(b []byte) (n int, err error) { + return 0, errDummy +} + +func (d *dummyConn) Close() error { + d.rawConn.Close() + return nil +} + +func (d *dummyConn) LocalAddr() net.Addr { + return &net.TCPAddr{} +} + +func (d *dummyConn) RemoteAddr() net.Addr { + return &net.TCPAddr{} +} + +func (d *dummyConn) SetDeadline(t time.Time) error { + return nil +} + +func (d *dummyConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (d *dummyConn) SetWriteDeadline(t time.Time) error { + return nil +} + +var _ syscall.RawConn = (*dummyRawConn)(nil) + +type dummyRawConn struct { + closed bool + mux sync.Mutex +} + +func (d *dummyRawConn) Control(f func(fd uintptr)) error { + return nil +} + +func (d *dummyRawConn) Read(f func(fd uintptr) (done bool)) error { + d.mux.Lock() + defer d.mux.Unlock() + if d.closed { + return fmt.Errorf("dummyRawConn closed") + } + return nil +} + +func (d *dummyRawConn) Write(f func(fd uintptr) (done bool)) error { + return nil +} +func (d *dummyRawConn) Close() { + d.mux.Lock() + d.closed = true + d.mux.Unlock() } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index a8f0a6d..577923a 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -57,6 +57,7 @@ type Options struct { Dialer func(context.Context) (net.Conn, error) OnClose func(*Conn) error + PoolFIFO bool PoolSize int MinIdleConns int MaxConnAge time.Duration @@ -308,13 +309,21 @@ func (p *ConnPool) freeTurn() { } func (p *ConnPool) popIdle() *Conn { - if len(p.idleConns) == 0 { + n := len(p.idleConns) + if n == 0 { return nil } - idx := len(p.idleConns) - 1 - cn := p.idleConns[idx] - p.idleConns = p.idleConns[:idx] + var cn *Conn + if p.opt.PoolFIFO { + cn = p.idleConns[0] + copy(p.idleConns, p.idleConns[1:]) + p.idleConns = p.idleConns[:n-1] + } else { + idx := n - 1 + cn = p.idleConns[idx] + p.idleConns = p.idleConns[:idx] + } p.idleConnsLen-- p.checkMinIdleConns() return cn @@ -511,7 +520,7 @@ func (p *ConnPool) reapStaleConn() *Conn { func (p *ConnPool) isStaleConn(cn *Conn) bool { if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { - return false + return connCheck(cn.netConn) != nil } now := time.Now() @@ -522,5 +531,5 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool { return true } - return false + return connCheck(cn.netConn) != nil } diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index 795aef3..6c94fc2 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -6,10 +6,10 @@ import ( "testing" "time" - "github.com/go-redis/redis/v8/internal/pool" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + + "github.com/go-redis/redis/v8/internal/pool" ) var _ = Describe("ConnPool", func() { @@ -285,6 +285,8 @@ var _ = Describe("conns reaper", func() { cn.SetUsedAt(time.Now().Add(-2 * idleTimeout)) case "aged": cn.SetCreatedAt(time.Now().Add(-2 * maxAge)) + case "conncheck": + cn.Close() } conns = append(conns, cn) staleConns = append(staleConns, cn) @@ -371,6 +373,7 @@ var _ = Describe("conns reaper", func() { assert("idle") assert("aged") + assert("conncheck") }) var _ = Describe("race", func() { diff --git a/main_test.go b/main_test.go index 0cb2b1d..dd9d874 100644 --- a/main_test.go +++ b/main_test.go @@ -12,10 +12,10 @@ import ( "testing" "time" - "github.com/go-redis/redis/v8" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + + "github.com/go-redis/redis/v8" ) const ( @@ -117,7 +117,7 @@ func TestGinkgoSuite(t *testing.T) { RunSpecs(t, "go-redis") } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ func redisOptions() *redis.Options { return &redis.Options{ @@ -364,7 +364,7 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) { return p, nil } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ type badConnError string @@ -409,7 +409,7 @@ func (cn *badConn) Write([]byte) (int, error) { return 0, badConnError("bad connection") } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ type hook struct { beforeProcess func(ctx context.Context, cmd redis.Cmder) (context.Context, error) diff --git a/options.go b/options.go index 8bbc27b..5d39bf0 100644 --- a/options.go +++ b/options.go @@ -76,6 +76,10 @@ type Options struct { // Default is ReadTimeout. WriteTimeout time.Duration + // Type of connection pool. + // true for FIFO pool, false for LIFO pool. + // Note that fifo has higher overhead compared to lifo. + PoolFIFO bool // Maximum number of socket connections. // Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS. PoolSize int @@ -291,6 +295,7 @@ func newConnPool(opt *Options) *pool.ConnPool { Dialer: func(ctx context.Context) (net.Conn, error) { return opt.Dialer(ctx, opt.Network, opt.Addr) }, + PoolFIFO: opt.PoolFIFO, PoolSize: opt.PoolSize, MinIdleConns: opt.MinIdleConns, MaxConnAge: opt.MaxConnAge, diff --git a/pool_test.go b/pool_test.go index 08acc6d..8131819 100644 --- a/pool_test.go +++ b/pool_test.go @@ -87,8 +87,9 @@ var _ = Describe("pool", func() { cn.SetNetConn(&badConn{}) client.Pool().Put(ctx, cn) + // connCheck will automatically remove damaged connections. err = client.Ping(ctx).Err() - Expect(err).To(MatchError("bad connection")) + Expect(err).NotTo(HaveOccurred()) val, err := client.Ping(ctx).Result() Expect(err).NotTo(HaveOccurred()) diff --git a/ring.go b/ring.go index 34d05f3..5e35b6b 100644 --- a/ring.go +++ b/ring.go @@ -78,6 +78,9 @@ type RingOptions struct { ReadTimeout time.Duration WriteTimeout time.Duration + // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). + PoolFIFO bool + PoolSize int MinIdleConns int MaxConnAge time.Duration @@ -138,6 +141,7 @@ func (opt *RingOptions) clientOptions() *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, + PoolFIFO: opt.PoolFIFO, PoolSize: opt.PoolSize, MinIdleConns: opt.MinIdleConns, MaxConnAge: opt.MaxConnAge, diff --git a/sentinel.go b/sentinel.go index 15f3366..361f229 100644 --- a/sentinel.go +++ b/sentinel.go @@ -57,6 +57,9 @@ type FailoverOptions struct { ReadTimeout time.Duration WriteTimeout time.Duration + // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). + PoolFIFO bool + PoolSize int MinIdleConns int MaxConnAge time.Duration @@ -86,6 +89,7 @@ func (opt *FailoverOptions) clientOptions() *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, + PoolFIFO: opt.PoolFIFO, PoolSize: opt.PoolSize, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, @@ -115,6 +119,7 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, + PoolFIFO: opt.PoolFIFO, PoolSize: opt.PoolSize, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, @@ -146,6 +151,7 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, + PoolFIFO: opt.PoolFIFO, PoolSize: opt.PoolSize, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, diff --git a/sentinel_test.go b/sentinel_test.go index f5cfa3d..12e734d 100644 --- a/sentinel_test.go +++ b/sentinel_test.go @@ -192,7 +192,7 @@ var _ = Describe("NewFailoverClusterClient", func() { err = master.Shutdown(ctx).Err() Expect(err).NotTo(HaveOccurred()) Eventually(func() error { - return sentinelMaster.Ping(ctx).Err() + return master.Ping(ctx).Err() }, "15s", "100ms").Should(HaveOccurred()) // Check that client picked up new master. diff --git a/tx_test.go b/tx_test.go index 4681122..11e5b0d 100644 --- a/tx_test.go +++ b/tx_test.go @@ -123,7 +123,7 @@ var _ = Describe("Tx", func() { Expect(num).To(Equal(int64(N))) }) - It("should recover from bad connection", func() { + It("should remove from bad connection", func() { // Put bad connection in the pool. cn, err := client.Pool().Get(context.Background()) Expect(err).NotTo(HaveOccurred()) @@ -134,17 +134,14 @@ var _ = Describe("Tx", func() { do := func() error { err := client.Watch(ctx, func(tx *redis.Tx) error { _, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - pipe.Ping(ctx) - return nil + return pipe.Ping(ctx).Err() }) return err }) return err } - err = do() - Expect(err).To(MatchError("bad connection")) - + // connCheck will automatically remove damaged connections. err = do() Expect(err).NotTo(HaveOccurred()) }) diff --git a/universal.go b/universal.go index bb5f8b6..1e962ab 100644 --- a/universal.go +++ b/universal.go @@ -35,6 +35,9 @@ type UniversalOptions struct { ReadTimeout time.Duration WriteTimeout time.Duration + // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). + PoolFIFO bool + PoolSize int MinIdleConns int MaxConnAge time.Duration @@ -83,6 +86,7 @@ func (o *UniversalOptions) Cluster() *ClusterOptions { DialTimeout: o.DialTimeout, ReadTimeout: o.ReadTimeout, WriteTimeout: o.WriteTimeout, + PoolFIFO: o.PoolFIFO, PoolSize: o.PoolSize, MinIdleConns: o.MinIdleConns, MaxConnAge: o.MaxConnAge, @@ -120,6 +124,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions { ReadTimeout: o.ReadTimeout, WriteTimeout: o.WriteTimeout, + PoolFIFO: o.PoolFIFO, PoolSize: o.PoolSize, MinIdleConns: o.MinIdleConns, MaxConnAge: o.MaxConnAge, @@ -155,6 +160,7 @@ func (o *UniversalOptions) Simple() *Options { ReadTimeout: o.ReadTimeout, WriteTimeout: o.WriteTimeout, + PoolFIFO: o.PoolFIFO, PoolSize: o.PoolSize, MinIdleConns: o.MinIdleConns, MaxConnAge: o.MaxConnAge,