mirror of https://github.com/go-redis/redis.git
Cleanup.
This commit is contained in:
parent
72a6069cc5
commit
678b8b3667
2
multi.go
2
multi.go
|
@ -17,7 +17,7 @@ func (c *Client) Multi() *Multi {
|
|||
Client: &Client{
|
||||
baseClient: &baseClient{
|
||||
opt: c.opt,
|
||||
connPool: newSingleConnPool(c.connPool, nil, true),
|
||||
connPool: newSingleConnPool(c.connPool, true),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ var (
|
|||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
func appendCmd(buf []byte, args []string) []byte {
|
||||
func appendArgs(buf []byte, args []string) []byte {
|
||||
buf = append(buf, '*')
|
||||
buf = strconv.AppendUint(buf, uint64(len(args)), 10)
|
||||
buf = append(buf, '\r', '\n')
|
||||
|
|
63
pool.go
63
pool.go
|
@ -162,7 +162,7 @@ func (p *connPool) Get() (*conn, bool, error) {
|
|||
}
|
||||
|
||||
if p.conns.Len() < p.opt.PoolSize {
|
||||
cn, err := p.dial()
|
||||
cn, err := p.new()
|
||||
if err != nil {
|
||||
p.cond.L.Unlock()
|
||||
return nil, false, err
|
||||
|
@ -277,60 +277,68 @@ func (p *connPool) Close() error {
|
|||
type singleConnPool struct {
|
||||
pool pool
|
||||
|
||||
l sync.RWMutex
|
||||
cnMtx sync.Mutex
|
||||
cn *conn
|
||||
|
||||
reusable bool
|
||||
|
||||
closed bool
|
||||
}
|
||||
|
||||
func newSingleConnPool(pool pool, cn *conn, reusable bool) *singleConnPool {
|
||||
func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
|
||||
return &singleConnPool{
|
||||
pool: pool,
|
||||
cn: cn,
|
||||
reusable: reusable,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *singleConnPool) SetConn(cn *conn) {
|
||||
p.cnMtx.Lock()
|
||||
p.cn = cn
|
||||
p.cnMtx.Unlock()
|
||||
}
|
||||
|
||||
func (p *singleConnPool) Get() (*conn, bool, error) {
|
||||
p.l.RLock()
|
||||
defer p.cnMtx.Unlock()
|
||||
p.cnMtx.Lock()
|
||||
|
||||
if p.closed {
|
||||
p.l.RUnlock()
|
||||
return nil, false, errClosed
|
||||
}
|
||||
if p.cn != nil {
|
||||
p.l.RUnlock()
|
||||
return p.cn, false, nil
|
||||
}
|
||||
p.l.RUnlock()
|
||||
|
||||
p.l.Lock()
|
||||
cn, isNew, err := p.pool.Get()
|
||||
if err != nil {
|
||||
p.l.Unlock()
|
||||
return nil, false, err
|
||||
}
|
||||
p.cn = cn
|
||||
p.l.Unlock()
|
||||
return cn, isNew, nil
|
||||
|
||||
return p.cn, isNew, nil
|
||||
}
|
||||
|
||||
func (p *singleConnPool) Put(cn *conn) error {
|
||||
p.l.Lock()
|
||||
defer p.cnMtx.Unlock()
|
||||
p.cnMtx.Lock()
|
||||
if p.cn != cn {
|
||||
panic("p.cn != cn")
|
||||
}
|
||||
if p.closed {
|
||||
p.l.Unlock()
|
||||
return errClosed
|
||||
}
|
||||
p.l.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *singleConnPool) put() error {
|
||||
err := p.pool.Put(p.cn)
|
||||
p.cn = nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *singleConnPool) Remove(cn *conn) error {
|
||||
defer p.l.Unlock()
|
||||
p.l.Lock()
|
||||
defer p.cnMtx.Unlock()
|
||||
p.cnMtx.Lock()
|
||||
if p.cn == nil {
|
||||
panic("p.cn == nil")
|
||||
}
|
||||
|
@ -350,8 +358,8 @@ func (p *singleConnPool) remove() error {
|
|||
}
|
||||
|
||||
func (p *singleConnPool) Len() int {
|
||||
defer p.l.Unlock()
|
||||
p.l.Lock()
|
||||
defer p.cnMtx.Unlock()
|
||||
p.cnMtx.Lock()
|
||||
if p.cn == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -359,8 +367,8 @@ func (p *singleConnPool) Len() int {
|
|||
}
|
||||
|
||||
func (p *singleConnPool) Size() int {
|
||||
defer p.l.Unlock()
|
||||
p.l.Lock()
|
||||
defer p.cnMtx.Unlock()
|
||||
p.cnMtx.Lock()
|
||||
if p.cn == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -368,18 +376,18 @@ func (p *singleConnPool) Size() int {
|
|||
}
|
||||
|
||||
func (p *singleConnPool) Filter(f func(*conn) bool) {
|
||||
p.l.Lock()
|
||||
p.cnMtx.Lock()
|
||||
if p.cn != nil {
|
||||
if !f(p.cn) {
|
||||
p.remove()
|
||||
}
|
||||
}
|
||||
p.l.Unlock()
|
||||
p.cnMtx.Unlock()
|
||||
}
|
||||
|
||||
func (p *singleConnPool) Close() error {
|
||||
defer p.l.Unlock()
|
||||
p.l.Lock()
|
||||
defer p.cnMtx.Unlock()
|
||||
p.cnMtx.Lock()
|
||||
if p.closed {
|
||||
return nil
|
||||
}
|
||||
|
@ -387,11 +395,10 @@ func (p *singleConnPool) Close() error {
|
|||
var err error
|
||||
if p.cn != nil {
|
||||
if p.reusable {
|
||||
err = p.pool.Put(p.cn)
|
||||
err = p.put()
|
||||
} else {
|
||||
err = p.pool.Remove(p.cn)
|
||||
err = p.remove()
|
||||
}
|
||||
}
|
||||
p.cn = nil
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ func (c *Client) PubSub() *PubSub {
|
|||
return &PubSub{
|
||||
baseClient: &baseClient{
|
||||
opt: c.opt,
|
||||
connPool: newSingleConnPool(c.connPool, nil, false),
|
||||
connPool: newSingleConnPool(c.connPool, false),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
41
redis.go
41
redis.go
|
@ -14,9 +14,9 @@ type baseClient struct {
|
|||
}
|
||||
|
||||
func (c *baseClient) writeCmd(cn *conn, cmds ...Cmder) error {
|
||||
buf := make([]byte, 0, 1000)
|
||||
buf := make([]byte, 0, 64)
|
||||
for _, cmd := range cmds {
|
||||
buf = appendCmd(buf, cmd.args())
|
||||
buf = appendArgs(buf, cmd.args())
|
||||
}
|
||||
|
||||
_, err := cn.Write(buf)
|
||||
|
@ -29,8 +29,8 @@ func (c *baseClient) conn() (*conn, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if isNew && (c.opt.Password != "" || c.opt.DB > 0) {
|
||||
if err = c.init(cn, c.opt.Password, c.opt.DB); err != nil {
|
||||
if isNew {
|
||||
if err := c.initConn(cn); err != nil {
|
||||
c.removeConn(cn)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -39,26 +39,31 @@ func (c *baseClient) conn() (*conn, error) {
|
|||
return cn, nil
|
||||
}
|
||||
|
||||
func (c *baseClient) init(cn *conn, password string, db int64) error {
|
||||
// Client is not closed on purpose.
|
||||
func (c *baseClient) initConn(cn *conn) error {
|
||||
if c.opt.Password == "" || c.opt.DB == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
pool := newSingleConnPool(c.connPool, false)
|
||||
pool.SetConn(cn)
|
||||
|
||||
// Client is not closed because we want to reuse underlying connection.
|
||||
client := &Client{
|
||||
baseClient: &baseClient{
|
||||
opt: c.opt,
|
||||
connPool: newSingleConnPool(c.connPool, cn, false),
|
||||
connPool: pool,
|
||||
},
|
||||
}
|
||||
|
||||
if password != "" {
|
||||
auth := client.Auth(password)
|
||||
if auth.Err() != nil {
|
||||
return auth.Err()
|
||||
if c.opt.Password != "" {
|
||||
if err := client.Auth(c.opt.Password).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if db > 0 {
|
||||
sel := client.Select(db)
|
||||
if sel.Err() != nil {
|
||||
return sel.Err()
|
||||
if c.opt.DB > 0 {
|
||||
if err := client.Select(c.opt.DB).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,14 +107,16 @@ func (c *baseClient) run(cmd Cmder) {
|
|||
return
|
||||
}
|
||||
|
||||
cn.writeTimeout = c.opt.WriteTimeout
|
||||
if timeout := cmd.writeTimeout(); timeout != nil {
|
||||
cn.writeTimeout = *timeout
|
||||
} else {
|
||||
cn.writeTimeout = c.opt.WriteTimeout
|
||||
}
|
||||
|
||||
cn.readTimeout = c.opt.ReadTimeout
|
||||
if timeout := cmd.readTimeout(); timeout != nil {
|
||||
cn.readTimeout = *timeout
|
||||
} else {
|
||||
cn.readTimeout = c.opt.ReadTimeout
|
||||
}
|
||||
|
||||
if err := c.writeCmd(cn, cmd); err != nil {
|
||||
|
|
|
@ -94,7 +94,7 @@ func (c *sentinelClient) PubSub() *PubSub {
|
|||
return &PubSub{
|
||||
baseClient: &baseClient{
|
||||
opt: c.opt,
|
||||
connPool: newSingleConnPool(c.connPool, nil, false),
|
||||
connPool: newSingleConnPool(c.connPool, false),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue