diff --git a/cluster.go b/cluster.go index 43498ee9..178db178 100644 --- a/cluster.go +++ b/cluster.go @@ -256,29 +256,13 @@ type ClusterOptions struct { // giving up. Default: 16 MaxRedirects int - // The maximum number of TCP sockets per connection. Default: 5 - PoolSize int - - // Timeout settings - DialTimeout, ReadTimeout, WriteTimeout, IdleTimeout time.Duration -} - -func (opt *ClusterOptions) getPoolSize() int { - if opt.PoolSize < 1 { - return 5 - } - return opt.PoolSize -} - -func (opt *ClusterOptions) getDialTimeout() time.Duration { - if opt.DialTimeout == 0 { - return 5 * time.Second - } - return opt.DialTimeout + // Following options are copied from `redis.Options`. + PoolSize int + DialTimeout, ReadTimeout, WriteTimeout, PoolTimeout, IdleTimeout time.Duration } func (opt *ClusterOptions) getMaxRedirects() int { - if opt.MaxRedirects < 1 { + if opt.MaxRedirects == 0 { return 16 } return opt.MaxRedirects @@ -289,11 +273,12 @@ func (opt *ClusterOptions) clientOptions() *Options { DB: 0, Password: opt.Password, - DialTimeout: opt.getDialTimeout(), + DialTimeout: opt.DialTimeout, ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, - PoolSize: opt.getPoolSize(), + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, } } diff --git a/pool.go b/pool.go index 7681836b..764777b7 100644 --- a/pool.go +++ b/pool.go @@ -133,7 +133,7 @@ func (p *connPool) First() *conn { select { case cn := <-p.freeConns: if cn.isIdle(p.opt.IdleTimeout) { - p.remove(cn) + p.Remove(cn) continue } return cn @@ -151,7 +151,7 @@ func (p *connPool) wait(timeout time.Duration) *conn { select { case cn := <-p.freeConns: if cn.isIdle(p.opt.IdleTimeout) { - p.remove(cn) + p.Remove(cn) continue } return cn @@ -215,10 +215,6 @@ func (p *connPool) Put(cn *conn) error { log.Printf("redis: connection has unread data: %q", b) return p.Remove(cn) } - - if p.isClosed() { - return errClosed - } if p.opt.IdleTimeout > 0 { cn.usedAt = time.Now() } @@ -228,13 +224,18 @@ func (p *connPool) Put(cn *conn) error { func (p *connPool) Remove(cn *conn) error { if p.isClosed() { - return nil + atomic.AddInt32(&p.size, -1) + return cn.Close() + } + + // Replace existing connection with new one and unblock `wait`. + newcn, err := p.new() + if err != nil { + atomic.AddInt32(&p.size, -1) + } else { + p.Put(newcn) } - return p.remove(cn) -} -func (p *connPool) remove(cn *conn) error { - atomic.AddInt32(&p.size, -1) return cn.Close() } @@ -259,7 +260,7 @@ func (p *connPool) Close() (retErr error) { if cn == nil { break } - if err := p.remove(cn); err != nil { + if err := p.Remove(cn); err != nil { retErr = err } } diff --git a/pool_test.go b/pool_test.go index 2960daad..310730b2 100644 --- a/pool_test.go +++ b/pool_test.go @@ -7,11 +7,13 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "gopkg.in/redis.v2" ) var _ = Describe("Pool", func() { var client *redis.Client + var perform = func(n int, cb func()) { wg := &sync.WaitGroup{} for i := 0; i < n; i++ { @@ -27,13 +29,14 @@ var _ = Describe("Pool", func() { } BeforeEach(func() { - client = redis.NewTCPClient(&redis.Options{ - Addr: redisAddr, + client = redis.NewClient(&redis.Options{ + Addr: redisAddr, + PoolSize: 10, }) }) AfterEach(func() { - client.FlushDb() + Expect(client.FlushDb().Err()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred()) }) @@ -98,8 +101,8 @@ var _ = Describe("Pool", func() { }) pool := client.Pool() - Expect(pool.Size()).To(Equal(0)) - Expect(pool.Len()).To(Equal(0)) + Expect(pool.Size()).To(Equal(10)) + Expect(pool.Len()).To(Equal(10)) }) It("should remove broken connections", func() { @@ -133,6 +136,48 @@ var _ = Describe("Pool", func() { Expect(pool.Len()).To(Equal(1)) }) + It("should unblock client when connection is removed", func() { + pool := client.Pool() + + // Reserve one connection. + cn, _, err := client.Pool().Get() + Expect(err).NotTo(HaveOccurred()) + + // Reserve the rest of connections. + for i := 0; i < 9; i++ { + _, _, err := client.Pool().Get() + Expect(err).NotTo(HaveOccurred()) + } + + var ping *redis.StatusCmd + started := make(chan bool, 1) + done := make(chan bool, 1) + go func() { + started <- true + ping = client.Ping() + done <- true + }() + <-started + + // Check that Ping is blocked. + select { + case <-done: + panic("Ping is not blocked") + default: + // ok + } + + Expect(pool.Remove(cn)).NotTo(HaveOccurred()) + + // Check that Ping is unblocked. + select { + case <-done: + // ok + case <-time.After(time.Second): + panic("Ping is not unblocked") + } + Expect(ping.Err()).NotTo(HaveOccurred()) + }) }) func BenchmarkPool(b *testing.B) { diff --git a/redis.go b/redis.go index b8fd3e82..b5e335d2 100644 --- a/redis.go +++ b/redis.go @@ -160,10 +160,8 @@ type Options struct { // The maximum number of socket connections. // Default: 10 PoolSize int - // If all socket connections is the pool are busy, the pool will wait - // this amount of time for a conection to become available, before - // returning an error. - // Default: 5s + // PoolTimeout specifies amount of time client waits for a free + // connection in the pool. Default timeout is 1s. PoolTimeout time.Duration // Evict connections from the pool after they have been idle for longer // than specified in this option. @@ -194,7 +192,7 @@ func (opt *Options) getDialTimeout() time.Duration { func (opt *Options) getPoolTimeout() time.Duration { if opt.PoolTimeout == 0 { - return 5 * time.Second + return 1 * time.Second } return opt.PoolTimeout } diff --git a/redis_test.go b/redis_test.go index b4b9195e..09b7e79e 100644 --- a/redis_test.go +++ b/redis_test.go @@ -92,7 +92,7 @@ var _ = Describe("Client", func() { It("should support idle-timeouts", func() { idle := redis.NewTCPClient(&redis.Options{ Addr: redisAddr, - IdleTimeout: time.Nanosecond, + IdleTimeout: 100 * time.Microsecond, }) defer idle.Close()