Fix pool panic on slow connection with MaxRetries > 0.

This commit is contained in:
Vladimir Mihailenco 2015-10-13 12:02:29 +03:00
parent 8ca66a53e6
commit 25164333ff
10 changed files with 112 additions and 39 deletions

View File

@ -43,11 +43,8 @@ func (cn *conn) init(opt *Options) error {
return nil return nil
} }
// Use connection to connect to Redis. // Temp client for Auth and Select.
pool := newSingleConnPoolConn(cn) client := newClient(opt, newSingleConnPool(cn))
// Client is not closed because we want to reuse underlying connection.
client := newClient(opt, pool)
if opt.Password != "" { if opt.Password != "" {
if err := client.Auth(opt.Password).Err(); err != nil { if err := client.Auth(opt.Password).Err(); err != nil {

25
conn_test.go Normal file
View File

@ -0,0 +1,25 @@
package redis_test
import (
"net"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"gopkg.in/redis.v3"
)
var _ = Describe("newConnDialer with bad connection", func() {
It("should return an error", func() {
dialer := redis.NewConnDialer(&redis.Options{
Dialer: func() (net.Conn, error) {
return &badConn{}, nil
},
MaxRetries: 3,
Password: "password",
DB: 1,
})
_, err := dialer()
Expect(err).To(MatchError("bad connection"))
})
})

View File

@ -6,6 +6,8 @@ func (c *baseClient) Pool() pool {
return c.connPool return c.connPool
} }
var NewConnDialer = newConnDialer
func (cn *conn) SetNetConn(netcn net.Conn) { func (cn *conn) SetNetConn(netcn net.Conn) {
cn.netcn = netcn cn.netcn = netcn
} }

View File

@ -232,7 +232,15 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
var errTimeout = syscall.ETIMEDOUT var (
errTimeout = syscall.ETIMEDOUT
)
type badConnError string
func (e badConnError) Error() string { return string(e) }
func (e badConnError) Timeout() bool { return false }
func (e badConnError) Temporary() bool { return false }
type badConn struct { type badConn struct {
net.TCPConn net.TCPConn
@ -250,7 +258,7 @@ func (cn *badConn) Read([]byte) (int, error) {
if cn.readErr != nil { if cn.readErr != nil {
return 0, cn.readErr return 0, cn.readErr
} }
return 0, net.UnknownNetworkError("badConn") return 0, badConnError("bad connection")
} }
func (cn *badConn) Write([]byte) (int, error) { func (cn *badConn) Write([]byte) (int, error) {
@ -260,5 +268,5 @@ func (cn *badConn) Write([]byte) (int, error) {
if cn.writeErr != nil { if cn.writeErr != nil {
return 0, cn.writeErr return 0, cn.writeErr
} }
return 0, net.UnknownNetworkError("badConn") return 0, badConnError("bad connection")
} }

View File

@ -22,7 +22,7 @@ func (c *Client) Multi() *Multi {
multi := &Multi{ multi := &Multi{
base: &baseClient{ base: &baseClient{
opt: c.opt, opt: c.opt,
connPool: newSingleConnPool(c.connPool, true), connPool: newStickyConnPool(c.connPool, true),
}, },
} }
multi.commandable.process = multi.process multi.commandable.process = multi.process

86
pool.go
View File

@ -314,6 +314,52 @@ func (p *connPool) reaper() {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type singleConnPool struct { type singleConnPool struct {
cn *conn
}
func newSingleConnPool(cn *conn) *singleConnPool {
return &singleConnPool{
cn: cn,
}
}
func (p *singleConnPool) First() *conn {
return p.cn
}
func (p *singleConnPool) Get() (*conn, error) {
return p.cn, nil
}
func (p *singleConnPool) Put(cn *conn) error {
if p.cn != cn {
panic("p.cn != cn")
}
return nil
}
func (p *singleConnPool) Remove(cn *conn) error {
if p.cn != cn {
panic("p.cn != cn")
}
return nil
}
func (p *singleConnPool) Len() int {
return 1
}
func (p *singleConnPool) FreeLen() int {
return 0
}
func (p *singleConnPool) Close() error {
return nil
}
//------------------------------------------------------------------------------
type stickyConnPool struct {
pool pool pool pool
reusable bool reusable bool
@ -322,27 +368,21 @@ type singleConnPool struct {
mx sync.Mutex mx sync.Mutex
} }
func newSingleConnPool(pool pool, reusable bool) *singleConnPool { func newStickyConnPool(pool pool, reusable bool) *stickyConnPool {
return &singleConnPool{ return &stickyConnPool{
pool: pool, pool: pool,
reusable: reusable, reusable: reusable,
} }
} }
func newSingleConnPoolConn(cn *conn) *singleConnPool { func (p *stickyConnPool) First() *conn {
return &singleConnPool{
cn: cn,
}
}
func (p *singleConnPool) First() *conn {
p.mx.Lock() p.mx.Lock()
cn := p.cn cn := p.cn
p.mx.Unlock() p.mx.Unlock()
return cn return cn
} }
func (p *singleConnPool) Get() (*conn, error) { func (p *stickyConnPool) Get() (*conn, error) {
defer p.mx.Unlock() defer p.mx.Unlock()
p.mx.Lock() p.mx.Lock()
@ -362,15 +402,13 @@ func (p *singleConnPool) Get() (*conn, error) {
return p.cn, nil return p.cn, nil
} }
func (p *singleConnPool) put() (err error) { func (p *stickyConnPool) put() (err error) {
if p.pool != nil {
err = p.pool.Put(p.cn) err = p.pool.Put(p.cn)
}
p.cn = nil p.cn = nil
return err return err
} }
func (p *singleConnPool) Put(cn *conn) error { func (p *stickyConnPool) Put(cn *conn) error {
defer p.mx.Unlock() defer p.mx.Unlock()
p.mx.Lock() p.mx.Lock()
if p.cn != cn { if p.cn != cn {
@ -382,30 +420,32 @@ func (p *singleConnPool) Put(cn *conn) error {
return nil return nil
} }
func (p *singleConnPool) remove() (err error) { func (p *stickyConnPool) remove() (err error) {
if p.pool != nil {
err = p.pool.Remove(p.cn) err = p.pool.Remove(p.cn)
}
p.cn = nil p.cn = nil
return err return err
} }
func (p *singleConnPool) Remove(cn *conn) error { func (p *stickyConnPool) Remove(cn *conn) error {
defer p.mx.Unlock() defer p.mx.Unlock()
p.mx.Lock() p.mx.Lock()
if p.cn == nil { if p.cn == nil {
panic("p.cn == nil") panic("p.cn == nil")
} }
if cn != nil && cn != p.cn { if cn != nil && p.cn != cn {
panic("cn != p.cn") panic("p.cn != cn")
} }
if p.closed { if p.closed {
return errClosed return errClosed
} }
if cn == nil {
return p.remove() return p.remove()
} else {
return nil
}
} }
func (p *singleConnPool) Len() int { func (p *stickyConnPool) Len() int {
defer p.mx.Unlock() defer p.mx.Unlock()
p.mx.Lock() p.mx.Lock()
if p.cn == nil { if p.cn == nil {
@ -414,7 +454,7 @@ func (p *singleConnPool) Len() int {
return 1 return 1
} }
func (p *singleConnPool) FreeLen() int { func (p *stickyConnPool) FreeLen() int {
defer p.mx.Unlock() defer p.mx.Unlock()
p.mx.Lock() p.mx.Lock()
if p.cn == nil { if p.cn == nil {
@ -423,7 +463,7 @@ func (p *singleConnPool) FreeLen() int {
return 0 return 0
} }
func (p *singleConnPool) Close() error { func (p *stickyConnPool) Close() error {
defer p.mx.Unlock() defer p.mx.Unlock()
p.mx.Lock() p.mx.Lock()
if p.closed { if p.closed {

View File

@ -11,7 +11,7 @@ import (
"gopkg.in/redis.v3" "gopkg.in/redis.v3"
) )
var _ = Describe("Pool", func() { var _ = Describe("pool", func() {
var client *redis.Client var client *redis.Client
var perform = func(n int, cb func()) { var perform = func(n int, cb func()) {

View File

@ -29,7 +29,7 @@ func (c *Client) PubSub() *PubSub {
return &PubSub{ return &PubSub{
baseClient: &baseClient{ baseClient: &baseClient{
opt: c.opt, opt: c.opt,
connPool: newSingleConnPool(c.connPool, false), connPool: newStickyConnPool(c.connPool, false),
}, },
} }
} }

View File

@ -161,7 +161,8 @@ var _ = Describe("Client", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cn.SetNetConn(&badConn{}) cn.SetNetConn(&badConn{})
Expect(client.Pool().Put(cn)).NotTo(HaveOccurred()) err = client.Pool().Put(cn)
Expect(err).NotTo(HaveOccurred())
err = client.Ping().Err() err = client.Ping().Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())

View File

@ -90,7 +90,7 @@ func (c *sentinelClient) PubSub() *PubSub {
return &PubSub{ return &PubSub{
baseClient: &baseClient{ baseClient: &baseClient{
opt: c.opt, opt: c.opt,
connPool: newSingleConnPool(c.connPool, false), connPool: newStickyConnPool(c.connPool, false),
}, },
} }
} }