Merge pull request #30 from go-redis/fix/cleanup

Cleanup.
This commit is contained in:
Vladimir Mihailenco 2014-06-28 14:51:39 +03:00
commit f25e9b104b
6 changed files with 64 additions and 50 deletions

View File

@ -17,7 +17,7 @@ func (c *Client) Multi() *Multi {
Client: &Client{ Client: &Client{
baseClient: &baseClient{ baseClient: &baseClient{
opt: c.opt, opt: c.opt,
connPool: newSingleConnPool(c.connPool, nil, true), connPool: newSingleConnPool(c.connPool, true),
}, },
}, },
} }

View File

@ -17,7 +17,7 @@ var (
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func appendCmd(buf []byte, args []string) []byte { func appendArgs(buf []byte, args []string) []byte {
buf = append(buf, '*') buf = append(buf, '*')
buf = strconv.AppendUint(buf, uint64(len(args)), 10) buf = strconv.AppendUint(buf, uint64(len(args)), 10)
buf = append(buf, '\r', '\n') buf = append(buf, '\r', '\n')

65
pool.go
View File

@ -162,7 +162,7 @@ func (p *connPool) Get() (*conn, bool, error) {
} }
if p.conns.Len() < p.opt.PoolSize { if p.conns.Len() < p.opt.PoolSize {
cn, err := p.dial() cn, err := p.new()
if err != nil { if err != nil {
p.cond.L.Unlock() p.cond.L.Unlock()
return nil, false, err return nil, false, err
@ -277,60 +277,68 @@ func (p *connPool) Close() error {
type singleConnPool struct { type singleConnPool struct {
pool pool pool pool
l sync.RWMutex cnMtx sync.Mutex
cn *conn cn *conn
reusable bool reusable bool
closed bool closed bool
} }
func newSingleConnPool(pool pool, cn *conn, reusable bool) *singleConnPool { func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
return &singleConnPool{ return &singleConnPool{
pool: pool, pool: pool,
cn: cn,
reusable: reusable, reusable: reusable,
} }
} }
func (p *singleConnPool) SetConn(cn *conn) {
p.cnMtx.Lock()
p.cn = cn
p.cnMtx.Unlock()
}
func (p *singleConnPool) Get() (*conn, bool, error) { func (p *singleConnPool) Get() (*conn, bool, error) {
p.l.RLock() defer p.cnMtx.Unlock()
p.cnMtx.Lock()
if p.closed { if p.closed {
p.l.RUnlock()
return nil, false, errClosed return nil, false, errClosed
} }
if p.cn != nil { if p.cn != nil {
p.l.RUnlock()
return p.cn, false, nil return p.cn, false, nil
} }
p.l.RUnlock()
p.l.Lock()
cn, isNew, err := p.pool.Get() cn, isNew, err := p.pool.Get()
if err != nil { if err != nil {
p.l.Unlock()
return nil, false, err return nil, false, err
} }
p.cn = cn p.cn = cn
p.l.Unlock()
return cn, isNew, nil return p.cn, isNew, nil
} }
func (p *singleConnPool) Put(cn *conn) error { func (p *singleConnPool) Put(cn *conn) error {
p.l.Lock() defer p.cnMtx.Unlock()
p.cnMtx.Lock()
if p.cn != cn { if p.cn != cn {
panic("p.cn != cn") panic("p.cn != cn")
} }
if p.closed { if p.closed {
p.l.Unlock()
return errClosed return errClosed
} }
p.l.Unlock()
return nil return nil
} }
func (p *singleConnPool) put() error {
err := p.pool.Put(p.cn)
p.cn = nil
return err
}
func (p *singleConnPool) Remove(cn *conn) error { func (p *singleConnPool) Remove(cn *conn) error {
defer p.l.Unlock() defer p.cnMtx.Unlock()
p.l.Lock() p.cnMtx.Lock()
if p.cn == nil { if p.cn == nil {
panic("p.cn == nil") panic("p.cn == nil")
} }
@ -350,8 +358,8 @@ func (p *singleConnPool) remove() error {
} }
func (p *singleConnPool) Len() int { func (p *singleConnPool) Len() int {
defer p.l.Unlock() defer p.cnMtx.Unlock()
p.l.Lock() p.cnMtx.Lock()
if p.cn == nil { if p.cn == nil {
return 0 return 0
} }
@ -359,8 +367,8 @@ func (p *singleConnPool) Len() int {
} }
func (p *singleConnPool) Size() int { func (p *singleConnPool) Size() int {
defer p.l.Unlock() defer p.cnMtx.Unlock()
p.l.Lock() p.cnMtx.Lock()
if p.cn == nil { if p.cn == nil {
return 0 return 0
} }
@ -368,18 +376,18 @@ func (p *singleConnPool) Size() int {
} }
func (p *singleConnPool) Filter(f func(*conn) bool) { func (p *singleConnPool) Filter(f func(*conn) bool) {
p.l.Lock() p.cnMtx.Lock()
if p.cn != nil { if p.cn != nil {
if !f(p.cn) { if !f(p.cn) {
p.remove() p.remove()
} }
} }
p.l.Unlock() p.cnMtx.Unlock()
} }
func (p *singleConnPool) Close() error { func (p *singleConnPool) Close() error {
defer p.l.Unlock() defer p.cnMtx.Unlock()
p.l.Lock() p.cnMtx.Lock()
if p.closed { if p.closed {
return nil return nil
} }
@ -387,11 +395,10 @@ func (p *singleConnPool) Close() error {
var err error var err error
if p.cn != nil { if p.cn != nil {
if p.reusable { if p.reusable {
err = p.pool.Put(p.cn) err = p.put()
} else { } else {
err = p.pool.Remove(p.cn) err = p.remove()
} }
} }
p.cn = nil
return err return err
} }

View File

@ -14,7 +14,7 @@ func (c *Client) PubSub() *PubSub {
return &PubSub{ return &PubSub{
baseClient: &baseClient{ baseClient: &baseClient{
opt: c.opt, opt: c.opt,
connPool: newSingleConnPool(c.connPool, nil, false), connPool: newSingleConnPool(c.connPool, false),
}, },
} }
} }

View File

@ -14,9 +14,9 @@ type baseClient struct {
} }
func (c *baseClient) writeCmd(cn *conn, cmds ...Cmder) error { func (c *baseClient) writeCmd(cn *conn, cmds ...Cmder) error {
buf := make([]byte, 0, 1000) buf := make([]byte, 0, 64)
for _, cmd := range cmds { for _, cmd := range cmds {
buf = appendCmd(buf, cmd.args()) buf = appendArgs(buf, cmd.args())
} }
_, err := cn.Write(buf) _, err := cn.Write(buf)
@ -29,8 +29,8 @@ func (c *baseClient) conn() (*conn, error) {
return nil, err return nil, err
} }
if isNew && (c.opt.Password != "" || c.opt.DB > 0) { if isNew {
if err = c.init(cn, c.opt.Password, c.opt.DB); err != nil { if err := c.initConn(cn); err != nil {
c.removeConn(cn) c.removeConn(cn)
return nil, err return nil, err
} }
@ -39,26 +39,31 @@ func (c *baseClient) conn() (*conn, error) {
return cn, nil return cn, nil
} }
func (c *baseClient) init(cn *conn, password string, db int64) error { func (c *baseClient) initConn(cn *conn) error {
// Client is not closed on purpose. if c.opt.Password == "" || c.opt.DB == 0 {
return nil
}
pool := newSingleConnPool(c.connPool, false)
pool.SetConn(cn)
// Client is not closed because we want to reuse underlying connection.
client := &Client{ client := &Client{
baseClient: &baseClient{ baseClient: &baseClient{
opt: c.opt, opt: c.opt,
connPool: newSingleConnPool(c.connPool, cn, false), connPool: pool,
}, },
} }
if password != "" { if c.opt.Password != "" {
auth := client.Auth(password) if err := client.Auth(c.opt.Password).Err(); err != nil {
if auth.Err() != nil { return err
return auth.Err()
} }
} }
if db > 0 { if c.opt.DB > 0 {
sel := client.Select(db) if err := client.Select(c.opt.DB).Err(); err != nil {
if sel.Err() != nil { return err
return sel.Err()
} }
} }
@ -102,14 +107,16 @@ func (c *baseClient) run(cmd Cmder) {
return return
} }
cn.writeTimeout = c.opt.WriteTimeout
if timeout := cmd.writeTimeout(); timeout != nil { if timeout := cmd.writeTimeout(); timeout != nil {
cn.writeTimeout = *timeout cn.writeTimeout = *timeout
} else {
cn.writeTimeout = c.opt.WriteTimeout
} }
cn.readTimeout = c.opt.ReadTimeout
if timeout := cmd.readTimeout(); timeout != nil { if timeout := cmd.readTimeout(); timeout != nil {
cn.readTimeout = *timeout cn.readTimeout = *timeout
} else {
cn.readTimeout = c.opt.ReadTimeout
} }
if err := c.writeCmd(cn, cmd); err != nil { if err := c.writeCmd(cn, cmd); err != nil {

View File

@ -94,7 +94,7 @@ func (c *sentinelClient) PubSub() *PubSub {
return &PubSub{ return &PubSub{
baseClient: &baseClient{ baseClient: &baseClient{
opt: c.opt, opt: c.opt,
connPool: newSingleConnPool(c.connPool, nil, false), connPool: newSingleConnPool(c.connPool, false),
}, },
} }
} }