forked from mirror/redis
Merge pull request #90 from go-redis/fix/pool-waiting-for-connection
Fix pool waiting for connection that was removed.
This commit is contained in:
commit
e91b88fa17
27
cluster.go
27
cluster.go
|
@ -256,29 +256,13 @@ type ClusterOptions struct {
|
||||||
// giving up. Default: 16
|
// giving up. Default: 16
|
||||||
MaxRedirects int
|
MaxRedirects int
|
||||||
|
|
||||||
// The maximum number of TCP sockets per connection. Default: 5
|
// Following options are copied from `redis.Options`.
|
||||||
PoolSize int
|
PoolSize int
|
||||||
|
DialTimeout, ReadTimeout, WriteTimeout, PoolTimeout, IdleTimeout time.Duration
|
||||||
// Timeout settings
|
|
||||||
DialTimeout, ReadTimeout, WriteTimeout, IdleTimeout time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func (opt *ClusterOptions) getPoolSize() int {
|
|
||||||
if opt.PoolSize < 1 {
|
|
||||||
return 5
|
|
||||||
}
|
|
||||||
return opt.PoolSize
|
|
||||||
}
|
|
||||||
|
|
||||||
func (opt *ClusterOptions) getDialTimeout() time.Duration {
|
|
||||||
if opt.DialTimeout == 0 {
|
|
||||||
return 5 * time.Second
|
|
||||||
}
|
|
||||||
return opt.DialTimeout
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *ClusterOptions) getMaxRedirects() int {
|
func (opt *ClusterOptions) getMaxRedirects() int {
|
||||||
if opt.MaxRedirects < 1 {
|
if opt.MaxRedirects == 0 {
|
||||||
return 16
|
return 16
|
||||||
}
|
}
|
||||||
return opt.MaxRedirects
|
return opt.MaxRedirects
|
||||||
|
@ -289,11 +273,12 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
||||||
DB: 0,
|
DB: 0,
|
||||||
Password: opt.Password,
|
Password: opt.Password,
|
||||||
|
|
||||||
DialTimeout: opt.getDialTimeout(),
|
DialTimeout: opt.DialTimeout,
|
||||||
ReadTimeout: opt.ReadTimeout,
|
ReadTimeout: opt.ReadTimeout,
|
||||||
WriteTimeout: opt.WriteTimeout,
|
WriteTimeout: opt.WriteTimeout,
|
||||||
|
|
||||||
PoolSize: opt.getPoolSize(),
|
PoolSize: opt.PoolSize,
|
||||||
|
PoolTimeout: opt.PoolTimeout,
|
||||||
IdleTimeout: opt.IdleTimeout,
|
IdleTimeout: opt.IdleTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
23
pool.go
23
pool.go
|
@ -133,7 +133,7 @@ func (p *connPool) First() *conn {
|
||||||
select {
|
select {
|
||||||
case cn := <-p.freeConns:
|
case cn := <-p.freeConns:
|
||||||
if cn.isIdle(p.opt.IdleTimeout) {
|
if cn.isIdle(p.opt.IdleTimeout) {
|
||||||
p.remove(cn)
|
p.Remove(cn)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return cn
|
return cn
|
||||||
|
@ -151,7 +151,7 @@ func (p *connPool) wait(timeout time.Duration) *conn {
|
||||||
select {
|
select {
|
||||||
case cn := <-p.freeConns:
|
case cn := <-p.freeConns:
|
||||||
if cn.isIdle(p.opt.IdleTimeout) {
|
if cn.isIdle(p.opt.IdleTimeout) {
|
||||||
p.remove(cn)
|
p.Remove(cn)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return cn
|
return cn
|
||||||
|
@ -215,10 +215,6 @@ func (p *connPool) Put(cn *conn) error {
|
||||||
log.Printf("redis: connection has unread data: %q", b)
|
log.Printf("redis: connection has unread data: %q", b)
|
||||||
return p.Remove(cn)
|
return p.Remove(cn)
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.isClosed() {
|
|
||||||
return errClosed
|
|
||||||
}
|
|
||||||
if p.opt.IdleTimeout > 0 {
|
if p.opt.IdleTimeout > 0 {
|
||||||
cn.usedAt = time.Now()
|
cn.usedAt = time.Now()
|
||||||
}
|
}
|
||||||
|
@ -228,13 +224,18 @@ func (p *connPool) Put(cn *conn) error {
|
||||||
|
|
||||||
func (p *connPool) Remove(cn *conn) error {
|
func (p *connPool) Remove(cn *conn) error {
|
||||||
if p.isClosed() {
|
if p.isClosed() {
|
||||||
return nil
|
atomic.AddInt32(&p.size, -1)
|
||||||
}
|
return cn.Close()
|
||||||
return p.remove(cn)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *connPool) remove(cn *conn) error {
|
// Replace existing connection with new one and unblock `wait`.
|
||||||
|
newcn, err := p.new()
|
||||||
|
if err != nil {
|
||||||
atomic.AddInt32(&p.size, -1)
|
atomic.AddInt32(&p.size, -1)
|
||||||
|
} else {
|
||||||
|
p.Put(newcn)
|
||||||
|
}
|
||||||
|
|
||||||
return cn.Close()
|
return cn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,7 +260,7 @@ func (p *connPool) Close() (retErr error) {
|
||||||
if cn == nil {
|
if cn == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err := p.remove(cn); err != nil {
|
if err := p.Remove(cn); err != nil {
|
||||||
retErr = err
|
retErr = err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
53
pool_test.go
53
pool_test.go
|
@ -7,11 +7,13 @@ import (
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
"gopkg.in/redis.v2"
|
"gopkg.in/redis.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Describe("Pool", func() {
|
var _ = Describe("Pool", func() {
|
||||||
var client *redis.Client
|
var client *redis.Client
|
||||||
|
|
||||||
var perform = func(n int, cb func()) {
|
var perform = func(n int, cb func()) {
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -27,13 +29,14 @@ var _ = Describe("Pool", func() {
|
||||||
}
|
}
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
client = redis.NewTCPClient(&redis.Options{
|
client = redis.NewClient(&redis.Options{
|
||||||
Addr: redisAddr,
|
Addr: redisAddr,
|
||||||
|
PoolSize: 10,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
client.FlushDb()
|
Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -98,8 +101,8 @@ var _ = Describe("Pool", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
Expect(pool.Size()).To(Equal(0))
|
Expect(pool.Size()).To(Equal(10))
|
||||||
Expect(pool.Len()).To(Equal(0))
|
Expect(pool.Len()).To(Equal(10))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should remove broken connections", func() {
|
It("should remove broken connections", func() {
|
||||||
|
@ -133,6 +136,48 @@ var _ = Describe("Pool", func() {
|
||||||
Expect(pool.Len()).To(Equal(1))
|
Expect(pool.Len()).To(Equal(1))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should unblock client when connection is removed", func() {
|
||||||
|
pool := client.Pool()
|
||||||
|
|
||||||
|
// Reserve one connection.
|
||||||
|
cn, _, err := client.Pool().Get()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
// Reserve the rest of connections.
|
||||||
|
for i := 0; i < 9; i++ {
|
||||||
|
_, _, err := client.Pool().Get()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
var ping *redis.StatusCmd
|
||||||
|
started := make(chan bool, 1)
|
||||||
|
done := make(chan bool, 1)
|
||||||
|
go func() {
|
||||||
|
started <- true
|
||||||
|
ping = client.Ping()
|
||||||
|
done <- true
|
||||||
|
}()
|
||||||
|
<-started
|
||||||
|
|
||||||
|
// Check that Ping is blocked.
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
panic("Ping is not blocked")
|
||||||
|
default:
|
||||||
|
// ok
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(pool.Remove(cn)).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
// Check that Ping is unblocked.
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
// ok
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
panic("Ping is not unblocked")
|
||||||
|
}
|
||||||
|
Expect(ping.Err()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
func BenchmarkPool(b *testing.B) {
|
func BenchmarkPool(b *testing.B) {
|
||||||
|
|
8
redis.go
8
redis.go
|
@ -160,10 +160,8 @@ type Options struct {
|
||||||
// The maximum number of socket connections.
|
// The maximum number of socket connections.
|
||||||
// Default: 10
|
// Default: 10
|
||||||
PoolSize int
|
PoolSize int
|
||||||
// If all socket connections is the pool are busy, the pool will wait
|
// PoolTimeout specifies amount of time client waits for a free
|
||||||
// this amount of time for a conection to become available, before
|
// connection in the pool. Default timeout is 1s.
|
||||||
// returning an error.
|
|
||||||
// Default: 5s
|
|
||||||
PoolTimeout time.Duration
|
PoolTimeout time.Duration
|
||||||
// Evict connections from the pool after they have been idle for longer
|
// Evict connections from the pool after they have been idle for longer
|
||||||
// than specified in this option.
|
// than specified in this option.
|
||||||
|
@ -194,7 +192,7 @@ func (opt *Options) getDialTimeout() time.Duration {
|
||||||
|
|
||||||
func (opt *Options) getPoolTimeout() time.Duration {
|
func (opt *Options) getPoolTimeout() time.Duration {
|
||||||
if opt.PoolTimeout == 0 {
|
if opt.PoolTimeout == 0 {
|
||||||
return 5 * time.Second
|
return 1 * time.Second
|
||||||
}
|
}
|
||||||
return opt.PoolTimeout
|
return opt.PoolTimeout
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ var _ = Describe("Client", func() {
|
||||||
It("should support idle-timeouts", func() {
|
It("should support idle-timeouts", func() {
|
||||||
idle := redis.NewTCPClient(&redis.Options{
|
idle := redis.NewTCPClient(&redis.Options{
|
||||||
Addr: redisAddr,
|
Addr: redisAddr,
|
||||||
IdleTimeout: time.Nanosecond,
|
IdleTimeout: 100 * time.Microsecond,
|
||||||
})
|
})
|
||||||
defer idle.Close()
|
defer idle.Close()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue