Fix pool waiting for connection that was removed.

This commit is contained in:
Vladimir Mihailenco 2015-04-17 14:44:56 +03:00
parent 6d8aaa46d2
commit d3fb658fef
5 changed files with 74 additions and 45 deletions

View File

@ -256,29 +256,13 @@ type ClusterOptions struct {
// giving up. Default: 16 // giving up. Default: 16
MaxRedirects int MaxRedirects int
// The maximum number of TCP sockets per connection. Default: 5 // Following options are copied from `redis.Options`.
PoolSize int PoolSize int
DialTimeout, ReadTimeout, WriteTimeout, PoolTimeout, IdleTimeout time.Duration
// Timeout settings
DialTimeout, ReadTimeout, WriteTimeout, IdleTimeout time.Duration
}
func (opt *ClusterOptions) getPoolSize() int {
if opt.PoolSize < 1 {
return 5
}
return opt.PoolSize
}
func (opt *ClusterOptions) getDialTimeout() time.Duration {
if opt.DialTimeout == 0 {
return 5 * time.Second
}
return opt.DialTimeout
} }
func (opt *ClusterOptions) getMaxRedirects() int { func (opt *ClusterOptions) getMaxRedirects() int {
if opt.MaxRedirects < 1 { if opt.MaxRedirects == 0 {
return 16 return 16
} }
return opt.MaxRedirects return opt.MaxRedirects
@ -289,11 +273,12 @@ func (opt *ClusterOptions) clientOptions() *Options {
DB: 0, DB: 0,
Password: opt.Password, Password: opt.Password,
DialTimeout: opt.getDialTimeout(), DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout, ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout, WriteTimeout: opt.WriteTimeout,
PoolSize: opt.getPoolSize(), PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout, IdleTimeout: opt.IdleTimeout,
} }
} }

23
pool.go
View File

@ -133,7 +133,7 @@ func (p *connPool) First() *conn {
select { select {
case cn := <-p.freeConns: case cn := <-p.freeConns:
if cn.isIdle(p.opt.IdleTimeout) { if cn.isIdle(p.opt.IdleTimeout) {
p.remove(cn) p.Remove(cn)
continue continue
} }
return cn return cn
@ -151,7 +151,7 @@ func (p *connPool) wait(timeout time.Duration) *conn {
select { select {
case cn := <-p.freeConns: case cn := <-p.freeConns:
if cn.isIdle(p.opt.IdleTimeout) { if cn.isIdle(p.opt.IdleTimeout) {
p.remove(cn) p.Remove(cn)
continue continue
} }
return cn return cn
@ -215,10 +215,6 @@ func (p *connPool) Put(cn *conn) error {
log.Printf("redis: connection has unread data: %q", b) log.Printf("redis: connection has unread data: %q", b)
return p.Remove(cn) return p.Remove(cn)
} }
if p.isClosed() {
return errClosed
}
if p.opt.IdleTimeout > 0 { if p.opt.IdleTimeout > 0 {
cn.usedAt = time.Now() cn.usedAt = time.Now()
} }
@ -228,13 +224,18 @@ func (p *connPool) Put(cn *conn) error {
func (p *connPool) Remove(cn *conn) error { func (p *connPool) Remove(cn *conn) error {
if p.isClosed() { if p.isClosed() {
return nil atomic.AddInt32(&p.size, -1)
} return cn.Close()
return p.remove(cn)
} }
func (p *connPool) remove(cn *conn) error { // Replace existing connection with new one and unblock `wait`.
newcn, err := p.new()
if err != nil {
atomic.AddInt32(&p.size, -1) atomic.AddInt32(&p.size, -1)
} else {
p.Put(newcn)
}
return cn.Close() return cn.Close()
} }
@ -259,7 +260,7 @@ func (p *connPool) Close() (retErr error) {
if cn == nil { if cn == nil {
break break
} }
if err := p.remove(cn); err != nil { if err := p.Remove(cn); err != nil {
retErr = err retErr = err
} }
} }

View File

@ -7,11 +7,13 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"gopkg.in/redis.v2" "gopkg.in/redis.v2"
) )
var _ = Describe("Pool", func() { var _ = Describe("Pool", func() {
var client *redis.Client var client *redis.Client
var perform = func(n int, cb func()) { var perform = func(n int, cb func()) {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
@ -27,13 +29,14 @@ var _ = Describe("Pool", func() {
} }
BeforeEach(func() { BeforeEach(func() {
client = redis.NewTCPClient(&redis.Options{ client = redis.NewClient(&redis.Options{
Addr: redisAddr, Addr: redisAddr,
PoolSize: 10,
}) })
}) })
AfterEach(func() { AfterEach(func() {
client.FlushDb() Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
Expect(client.Close()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred())
}) })
@ -98,8 +101,8 @@ var _ = Describe("Pool", func() {
}) })
pool := client.Pool() pool := client.Pool()
Expect(pool.Size()).To(Equal(0)) Expect(pool.Size()).To(Equal(10))
Expect(pool.Len()).To(Equal(0)) Expect(pool.Len()).To(Equal(10))
}) })
It("should remove broken connections", func() { It("should remove broken connections", func() {
@ -133,6 +136,48 @@ var _ = Describe("Pool", func() {
Expect(pool.Len()).To(Equal(1)) Expect(pool.Len()).To(Equal(1))
}) })
It("should unblock client when connection is removed", func() {
pool := client.Pool()
// Reserve one connection.
cn, _, err := client.Pool().Get()
Expect(err).NotTo(HaveOccurred())
// Reserve the rest of connections.
for i := 0; i < 9; i++ {
_, _, err := client.Pool().Get()
Expect(err).NotTo(HaveOccurred())
}
var ping *redis.StatusCmd
started := make(chan bool, 1)
done := make(chan bool, 1)
go func() {
started <- true
ping = client.Ping()
done <- true
}()
<-started
// Check that Ping is blocked.
select {
case <-done:
panic("Ping is not blocked")
default:
// ok
}
Expect(pool.Remove(cn)).NotTo(HaveOccurred())
// Check that Ping is unblocked.
select {
case <-done:
// ok
case <-time.After(time.Second):
panic("Ping is not unblocked")
}
Expect(ping.Err()).NotTo(HaveOccurred())
})
}) })
func BenchmarkPool(b *testing.B) { func BenchmarkPool(b *testing.B) {

View File

@ -160,10 +160,8 @@ type Options struct {
// The maximum number of socket connections. // The maximum number of socket connections.
// Default: 10 // Default: 10
PoolSize int PoolSize int
// If all socket connections is the pool are busy, the pool will wait // PoolTimeout specifies amount of time client waits for a free
// this amount of time for a conection to become available, before // connection in the pool. Default timeout is 1s.
// returning an error.
// Default: 5s
PoolTimeout time.Duration PoolTimeout time.Duration
// Evict connections from the pool after they have been idle for longer // Evict connections from the pool after they have been idle for longer
// than specified in this option. // than specified in this option.
@ -194,7 +192,7 @@ func (opt *Options) getDialTimeout() time.Duration {
func (opt *Options) getPoolTimeout() time.Duration { func (opt *Options) getPoolTimeout() time.Duration {
if opt.PoolTimeout == 0 { if opt.PoolTimeout == 0 {
return 5 * time.Second return 1 * time.Second
} }
return opt.PoolTimeout return opt.PoolTimeout
} }

View File

@ -92,7 +92,7 @@ var _ = Describe("Client", func() {
It("should support idle-timeouts", func() { It("should support idle-timeouts", func() {
idle := redis.NewTCPClient(&redis.Options{ idle := redis.NewTCPClient(&redis.Options{
Addr: redisAddr, Addr: redisAddr,
IdleTimeout: time.Nanosecond, IdleTimeout: 100 * time.Microsecond,
}) })
defer idle.Close() defer idle.Close()