pool: put connection to the list before returning it.

This commit is contained in:
Vladimir Mihailenco 2015-06-03 16:45:46 +03:00
parent 3fc16811b5
commit a8fe55571b
4 changed files with 55 additions and 28 deletions

View File

@ -80,6 +80,7 @@ var _ = BeforeSuite(func() {
var _ = AfterSuite(func() { var _ = AfterSuite(func() {
Expect(redisMain.Close()).NotTo(HaveOccurred()) Expect(redisMain.Close()).NotTo(HaveOccurred())
Expect(ringShard1.Close()).NotTo(HaveOccurred()) Expect(ringShard1.Close()).NotTo(HaveOccurred())
Expect(ringShard2.Close()).NotTo(HaveOccurred()) Expect(ringShard2.Close()).NotTo(HaveOccurred())

View File

@ -3,6 +3,7 @@ package redis
import ( import (
"errors" "errors"
"fmt" "fmt"
"log"
) )
var errDiscard = errors.New("redis: Discard can be used only inside Exec") var errDiscard = errors.New("redis: Discard can be used only inside Exec")
@ -18,7 +19,10 @@ type Multi struct {
func (c *Client) Multi() *Multi { func (c *Client) Multi() *Multi {
multi := &Multi{ multi := &Multi{
base: &baseClient{opt: c.opt, connPool: newSingleConnPool(c.connPool, true)}, base: &baseClient{
opt: c.opt,
connPool: newSingleConnPool(c.connPool, true),
},
} }
multi.commandable.process = multi.process multi.commandable.process = multi.process
return multi return multi
@ -34,7 +38,7 @@ func (c *Multi) process(cmd Cmder) {
func (c *Multi) Close() error { func (c *Multi) Close() error {
if err := c.Unwatch().Err(); err != nil { if err := c.Unwatch().Err(); err != nil {
return err log.Printf("redis: Unwatch failed: %s", err)
} }
return c.base.Close() return c.base.Close()
} }

54
pool.go
View File

@ -258,10 +258,12 @@ func (p *connPool) Remove(cn *conn) error {
// Replace existing connection with new one and unblock waiter. // Replace existing connection with new one and unblock waiter.
newcn, err := p.new() newcn, err := p.new()
if err != nil { if err != nil {
log.Printf("redis: new failed: %s", err)
return p.conns.Remove(cn) return p.conns.Remove(cn)
} }
err = p.conns.Replace(cn, newcn)
p.freeConns <- newcn p.freeConns <- newcn
return p.conns.Replace(cn, newcn) return err
} }
// Len returns total number of connections. // Len returns total number of connections.
@ -313,13 +315,11 @@ func (p *connPool) reaper() {
type singleConnPool struct { type singleConnPool struct {
pool pool pool pool
cnMtx sync.Mutex
cn *conn
reusable bool reusable bool
cn *conn
closed bool closed bool
mx sync.Mutex
} }
func newSingleConnPool(pool pool, reusable bool) *singleConnPool { func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
@ -330,20 +330,24 @@ func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
} }
func (p *singleConnPool) SetConn(cn *conn) { func (p *singleConnPool) SetConn(cn *conn) {
p.cnMtx.Lock() p.mx.Lock()
if p.cn != nil {
panic("p.cn != nil")
}
p.cn = cn p.cn = cn
p.cnMtx.Unlock() p.mx.Unlock()
} }
func (p *singleConnPool) First() *conn { func (p *singleConnPool) First() *conn {
defer p.cnMtx.Unlock() p.mx.Lock()
p.cnMtx.Lock() cn := p.cn
return p.cn p.mx.Unlock()
return cn
} }
func (p *singleConnPool) Get() (*conn, error) { func (p *singleConnPool) Get() (*conn, error) {
defer p.cnMtx.Unlock() defer p.mx.Unlock()
p.cnMtx.Lock() p.mx.Lock()
if p.closed { if p.closed {
return nil, errClosed return nil, errClosed
@ -362,8 +366,8 @@ func (p *singleConnPool) Get() (*conn, error) {
} }
func (p *singleConnPool) Put(cn *conn) error { func (p *singleConnPool) Put(cn *conn) error {
defer p.cnMtx.Unlock() defer p.mx.Unlock()
p.cnMtx.Lock() p.mx.Lock()
if p.cn != cn { if p.cn != cn {
panic("p.cn != cn") panic("p.cn != cn")
} }
@ -374,8 +378,8 @@ func (p *singleConnPool) Put(cn *conn) error {
} }
func (p *singleConnPool) Remove(cn *conn) error { func (p *singleConnPool) Remove(cn *conn) error {
defer p.cnMtx.Unlock() defer p.mx.Unlock()
p.cnMtx.Lock() p.mx.Lock()
if p.cn == nil { if p.cn == nil {
panic("p.cn == nil") panic("p.cn == nil")
} }
@ -395,8 +399,8 @@ func (p *singleConnPool) remove() error {
} }
func (p *singleConnPool) Len() int { func (p *singleConnPool) Len() int {
defer p.cnMtx.Unlock() defer p.mx.Unlock()
p.cnMtx.Lock() p.mx.Lock()
if p.cn == nil { if p.cn == nil {
return 0 return 0
} }
@ -404,19 +408,19 @@ func (p *singleConnPool) Len() int {
} }
func (p *singleConnPool) FreeLen() int { func (p *singleConnPool) FreeLen() int {
defer p.cnMtx.Unlock() defer p.mx.Unlock()
p.cnMtx.Lock() p.mx.Lock()
if p.cn == nil { if p.cn == nil {
return 0
}
return 1 return 1
} }
return 0
}
func (p *singleConnPool) Close() error { func (p *singleConnPool) Close() error {
defer p.cnMtx.Unlock() defer p.mx.Unlock()
p.cnMtx.Lock() p.mx.Lock()
if p.closed { if p.closed {
return nil return errClosed
} }
p.closed = true p.closed = true
var err error var err error

View File

@ -88,6 +88,24 @@ var _ = Describe("Client", func() {
Expect(client.Ping().Err()).NotTo(HaveOccurred()) Expect(client.Ping().Err()).NotTo(HaveOccurred())
}) })
It("should close pubsub when client is closed", func() {
pubsub := client.PubSub()
Expect(client.Close()).NotTo(HaveOccurred())
Expect(pubsub.Close()).NotTo(HaveOccurred())
})
It("should close multi when client is closed", func() {
multi := client.Multi()
Expect(client.Close()).NotTo(HaveOccurred())
Expect(multi.Close()).NotTo(HaveOccurred())
})
It("should close pipeline when client is closed", func() {
pipeline := client.Pipeline()
Expect(client.Close()).NotTo(HaveOccurred())
Expect(pipeline.Close()).NotTo(HaveOccurred())
})
It("should support idle-timeouts", func() { It("should support idle-timeouts", func() {
idle := redis.NewClient(&redis.Options{ idle := redis.NewClient(&redis.Options{
Addr: redisAddr, Addr: redisAddr,