diff --git a/cluster.go b/cluster.go index 463cae4f..ab06890b 100644 --- a/cluster.go +++ b/cluster.go @@ -146,6 +146,7 @@ func (c *ClusterClient) process(cmd Cmder) { pipe.Process(NewCmd("ASKING")) pipe.Process(cmd) _, _ = pipe.Exec() + pipe.Close() ask = false } else { client.Process(cmd) diff --git a/pool.go b/pool.go index 03bb1a00..2c387e9c 100644 --- a/pool.go +++ b/pool.go @@ -163,8 +163,12 @@ func (p *connPool) First() *conn { select { case cn := <-p.freeConns: if p.isIdle(cn) { - p.conns.Remove(cn) - continue + var err error + cn, err = p.replace(cn) + if err != nil { + log.Printf("redis: replace failed: %s", err) + continue + } } return cn default: @@ -181,8 +185,12 @@ func (p *connPool) wait() *conn { select { case cn := <-p.freeConns: if p.isIdle(cn) { - p.Remove(cn) - continue + var err error + cn, err = p.replace(cn) + if err != nil { + log.Printf("redis: replace failed: %s", err) + continue + } } return cn case <-deadline: @@ -257,16 +265,24 @@ func (p *connPool) Put(cn *conn) error { return nil } -func (p *connPool) Remove(cn *conn) error { - // Replace existing connection with new one and unblock waiter. +func (p *connPool) replace(cn *conn) (*conn, error) { newcn, err := p.new() if err != nil { - log.Printf("redis: new failed: %s", err) - return p.conns.Remove(cn) + _ = p.conns.Remove(cn) + return nil, err + } + _ = p.conns.Replace(cn, newcn) + return newcn, nil +} + +func (p *connPool) Remove(cn *conn) error { + // Replace existing connection with new one and unblock waiter. + newcn, err := p.replace(cn) + if err != nil { + return err } - err = p.conns.Replace(cn, newcn) p.freeConns <- newcn - return err + return nil } // Len returns total number of connections.