Merge pull request #1114 from go-redis/fix/pool-single

Add proper SingleConnPool implementation
This commit is contained in:
Vladimir Mihailenco 2019-08-03 17:48:25 +03:00 committed by GitHub
commit 742f3ccb21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 212 additions and 69 deletions

View File

@ -787,7 +787,7 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
} }
// If slave is loading - pick another node. // If slave is loading - pick another node.
if c.opt.ReadOnly && internal.IsLoadingError(err) { if c.opt.ReadOnly && isLoadingError(err) {
node.MarkAsFailing() node.MarkAsFailing()
node = nil node = nil
continue continue
@ -795,7 +795,7 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
var moved bool var moved bool
var addr string var addr string
moved, ask, addr = internal.IsMovedError(err) moved, ask, addr = isMovedError(err)
if moved || ask { if moved || ask {
node, err = c.nodes.Get(addr) node, err = c.nodes.Get(addr)
if err != nil { if err != nil {
@ -804,12 +804,12 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
continue continue
} }
if err == pool.ErrClosed || internal.IsReadOnlyError(err) { if err == pool.ErrClosed || isReadOnlyError(err) {
node = nil node = nil
continue continue
} }
if internal.IsRetryableError(err, true) { if isRetryableError(err, true) {
// First retry the same node. // First retry the same node.
if attempt == 0 { if attempt == 0 {
continue continue
@ -1173,9 +1173,9 @@ func (c *ClusterClient) pipelineReadCmds(
continue continue
} }
if c.opt.ReadOnly && internal.IsLoadingError(err) { if c.opt.ReadOnly && isLoadingError(err) {
node.MarkAsFailing() node.MarkAsFailing()
} else if internal.IsRedisError(err) { } else if isRedisError(err) {
continue continue
} }
@ -1192,7 +1192,7 @@ func (c *ClusterClient) pipelineReadCmds(
func (c *ClusterClient) checkMovedErr( func (c *ClusterClient) checkMovedErr(
cmd Cmder, err error, failedCmds *cmdsMap, cmd Cmder, err error, failedCmds *cmdsMap,
) bool { ) bool {
moved, ask, addr := internal.IsMovedError(err) moved, ask, addr := isMovedError(err)
if moved { if moved {
c.state.LazyReload() c.state.LazyReload()
@ -1346,7 +1346,7 @@ func (c *ClusterClient) txPipelineReadQueued(
continue continue
} }
if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) { if c.checkMovedErr(cmd, err, failedCmds) || isRedisError(err) {
continue continue
} }
@ -1418,7 +1418,7 @@ func (c *ClusterClient) WatchContext(ctx context.Context, fn func(*Tx) error, ke
c.state.LazyReload() c.state.LazyReload()
} }
moved, ask, addr := internal.IsMovedError(err) moved, ask, addr := isMovedError(err)
if moved || ask { if moved || ask {
node, err = c.nodes.Get(addr) node, err = c.nodes.Get(addr)
if err != nil { if err != nil {
@ -1427,7 +1427,7 @@ func (c *ClusterClient) WatchContext(ctx context.Context, fn func(*Tx) error, ke
continue continue
} }
if err == pool.ErrClosed || internal.IsReadOnlyError(err) { if err == pool.ErrClosed || isReadOnlyError(err) {
node, err = c.slotMasterNode(slot) node, err = c.slotMasterNode(slot)
if err != nil { if err != nil {
return err return err
@ -1435,7 +1435,7 @@ func (c *ClusterClient) WatchContext(ctx context.Context, fn func(*Tx) error, ke
continue continue
} }
if internal.IsRetryableError(err, true) { if isRetryableError(err, true) {
continue continue
} }

View File

@ -1,20 +1,18 @@
package internal package redis
import ( import (
"context" "context"
"errors"
"io" "io"
"net" "net"
"strings" "strings"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto" "github.com/go-redis/redis/internal/proto"
) )
var ErrSingleConnPoolClosed = errors.New("redis: SingleConnPool is closed") func isRetryableError(err error, retryTimeout bool) bool {
func IsRetryableError(err error, retryTimeout bool) bool {
switch err { switch err {
case nil, context.Canceled, context.DeadlineExceeded: case nil, context.Canceled, context.DeadlineExceeded, pool.ErrBadConn:
return false return false
case io.EOF: case io.EOF:
return true return true
@ -25,9 +23,6 @@ func IsRetryableError(err error, retryTimeout bool) bool {
} }
return true return true
} }
if err == ErrSingleConnPoolClosed {
return true
}
s := err.Error() s := err.Error()
if s == "ERR max number of clients reached" { if s == "ERR max number of clients reached" {
@ -45,18 +40,20 @@ func IsRetryableError(err error, retryTimeout bool) bool {
return false return false
} }
func IsRedisError(err error) bool { func isRedisError(err error) bool {
_, ok := err.(proto.RedisError) _, ok := err.(proto.RedisError)
return ok return ok
} }
func IsBadConn(err error, allowTimeout bool) bool { func isBadConn(err error, allowTimeout bool) bool {
if err == nil { switch err {
case nil:
return false return false
case pool.ErrBadConn:
return true
} }
if IsRedisError(err) { if isRedisError(err) {
// #790 return isReadOnlyError(err) // #790
return IsReadOnlyError(err)
} }
if allowTimeout { if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() { if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
@ -66,8 +63,8 @@ func IsBadConn(err error, allowTimeout bool) bool {
return true return true
} }
func IsMovedError(err error) (moved bool, ask bool, addr string) { func isMovedError(err error) (moved bool, ask bool, addr string) {
if !IsRedisError(err) { if !isRedisError(err) {
return return
} }
@ -89,10 +86,10 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
return return
} }
func IsLoadingError(err error) bool { func isLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ") return strings.HasPrefix(err.Error(), "LOADING ")
} }
func IsReadOnlyError(err error) bool { func isReadOnlyError(err error) bool {
return strings.HasPrefix(err.Error(), "READONLY ") return strings.HasPrefix(err.Error(), "READONLY ")
} }

View File

@ -151,6 +151,27 @@ func ExampleClient() {
// missing_key does not exist // missing_key does not exist
} }
func ExampleConn() {
conn := redisdb.Conn()
err := conn.ClientSetName("foobar").Err()
if err != nil {
panic(err)
}
// Open other connections.
for i := 0; i < 10; i++ {
go redisdb.Ping()
}
s, err := conn.ClientGetName().Result()
if err != nil {
panic(err)
}
fmt.Println(s)
// Output: foobar
}
func ExampleClient_Set() { func ExampleClient_Set() {
// Last argument is expiration. Zero means the key has no // Last argument is expiration. Zero means the key has no
// expiration time. // expiration time.

View File

@ -2,66 +2,184 @@ package pool
import ( import (
"context" "context"
"fmt"
"github.com/go-redis/redis/internal" "sync/atomic"
) )
const (
stateDefault = 0
stateInited = 1
stateClosed = 2
)
var ErrBadConn = fmt.Errorf("pg: Conn is in a bad state")
type SingleConnPool struct { type SingleConnPool struct {
cn *Conn pool Pooler
cnClosed bool
state uint32 // atomic
ch chan *Conn
level int32 // atomic
_hasBadConn uint32 // atomic
} }
var _ Pooler = (*SingleConnPool)(nil) var _ Pooler = (*SingleConnPool)(nil)
func NewSingleConnPool(cn *Conn) *SingleConnPool { func NewSingleConnPool(pool Pooler) *SingleConnPool {
return &SingleConnPool{ p, ok := pool.(*SingleConnPool)
cn: cn, if !ok {
p = &SingleConnPool{
pool: pool,
ch: make(chan *Conn, 1),
}
}
atomic.AddInt32(&p.level, 1)
return p
}
func (p *SingleConnPool) SetConn(cn *Conn) {
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
p.ch <- cn
} else {
panic("not reached")
} }
} }
func (p *SingleConnPool) NewConn(context.Context) (*Conn, error) { func (p *SingleConnPool) NewConn(c context.Context) (*Conn, error) {
panic("not implemented") return p.pool.NewConn(c)
} }
func (p *SingleConnPool) CloseConn(*Conn) error { func (p *SingleConnPool) CloseConn(cn *Conn) error {
panic("not implemented") return p.pool.CloseConn(cn)
} }
func (p *SingleConnPool) Get(ctx context.Context) (*Conn, error) { func (p *SingleConnPool) Get(c context.Context) (*Conn, error) {
if p.cnClosed { // In worst case this races with Close which is not a very common operation.
return nil, internal.ErrSingleConnPoolClosed for i := 0; i < 1000; i++ {
switch atomic.LoadUint32(&p.state) {
case stateDefault:
cn, err := p.pool.Get(c)
if err != nil {
return nil, err
} }
return p.cn, nil if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
return cn, nil
}
p.pool.Remove(cn)
case stateInited:
if p.hasBadConn() {
return nil, ErrBadConn
}
cn, ok := <-p.ch
if !ok {
return nil, ErrClosed
}
return cn, nil
case stateClosed:
return nil, ErrClosed
default:
panic("not reached")
}
}
return nil, fmt.Errorf("pg: SingleConnPool.Get: infinite loop")
} }
func (p *SingleConnPool) Put(cn *Conn) { func (p *SingleConnPool) Put(cn *Conn) {
if p.cn != cn { defer func() {
panic("p.cn != cn") if recover() != nil {
p.freeConn(cn)
}
}()
p.ch <- cn
}
func (p *SingleConnPool) freeConn(cn *Conn) {
if p.hasBadConn() {
p.pool.Remove(cn)
} else {
p.pool.Put(cn)
} }
} }
func (p *SingleConnPool) Remove(cn *Conn) { func (p *SingleConnPool) Remove(cn *Conn) {
if p.cn != cn { defer func() {
panic("p.cn != cn") if recover() != nil {
p.pool.Remove(cn)
} }
p.cnClosed = true }()
atomic.StoreUint32(&p._hasBadConn, 1)
p.ch <- cn
} }
func (p *SingleConnPool) Len() int { func (p *SingleConnPool) Len() int {
if p.cnClosed { switch atomic.LoadUint32(&p.state) {
case stateDefault:
return 0 return 0
} case stateInited:
return 1 return 1
case stateClosed:
return 0
default:
panic("not reached")
}
} }
func (p *SingleConnPool) IdleLen() int { func (p *SingleConnPool) IdleLen() int {
return 0 return len(p.ch)
} }
func (p *SingleConnPool) Stats() *Stats { func (p *SingleConnPool) Stats() *Stats {
return nil return &Stats{}
} }
func (p *SingleConnPool) Close() error { func (p *SingleConnPool) Close() error {
level := atomic.AddInt32(&p.level, -1)
if level > 0 {
return nil return nil
} }
for i := 0; i < 1000; i++ {
state := atomic.LoadUint32(&p.state)
if state == stateClosed {
return ErrClosed
}
if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
close(p.ch)
cn, ok := <-p.ch
if ok {
p.freeConn(cn)
}
return nil
}
}
return fmt.Errorf("pg: SingleConnPool.Close: infinite loop")
}
func (p *SingleConnPool) Reset() error {
if !atomic.CompareAndSwapUint32(&p._hasBadConn, 1, 0) {
return nil
}
select {
case cn, ok := <-p.ch:
if !ok {
return ErrClosed
}
p.pool.Remove(cn)
default:
return fmt.Errorf("pg: SingleConnPool does not have a Conn")
}
if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
state := atomic.LoadUint32(&p.state)
return fmt.Errorf("pg: invalid SingleConnPool state: %d", state)
}
return nil
}
func (p *SingleConnPool) hasBadConn() bool {
return atomic.LoadUint32(&p._hasBadConn) == 1
}

View File

@ -142,7 +142,7 @@ func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
if c.cn != cn { if c.cn != cn {
return return
} }
if internal.IsBadConn(err, allowTimeout) { if isBadConn(err, allowTimeout) {
c.reconnect(err) c.reconnect(err)
} }
} }

View File

@ -183,7 +183,7 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
c.limiter.ReportResult(err) c.limiter.ReportResult(err)
} }
if internal.IsBadConn(err, false) { if isBadConn(err, false) {
c.connPool.Remove(cn) c.connPool.Remove(cn)
} else { } else {
c.connPool.Put(cn) c.connPool.Put(cn)
@ -195,7 +195,7 @@ func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
c.limiter.ReportResult(err) c.limiter.ReportResult(err)
} }
if err == nil || internal.IsRedisError(err) { if err == nil || isRedisError(err) {
c.connPool.Put(cn) c.connPool.Put(cn)
} else { } else {
c.connPool.Remove(cn) c.connPool.Remove(cn)
@ -215,7 +215,10 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
return nil return nil
} }
conn := newConn(ctx, c.opt, cn) connPool := pool.NewSingleConnPool(nil)
connPool.SetConn(cn)
conn := newConn(ctx, c.opt, connPool)
_, err := conn.Pipelined(func(pipe Pipeliner) error { _, err := conn.Pipelined(func(pipe Pipeliner) error {
if c.opt.Password != "" { if c.opt.Password != "" {
pipe.Auth(c.opt.Password) pipe.Auth(c.opt.Password)
@ -252,7 +255,7 @@ func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
cn, err := c.getConn(ctx) cn, err := c.getConn(ctx)
if err != nil { if err != nil {
cmd.setErr(err) cmd.setErr(err)
if internal.IsRetryableError(err, true) { if isRetryableError(err, true) {
continue continue
} }
return err return err
@ -264,7 +267,7 @@ func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
if err != nil { if err != nil {
c.releaseConn(cn, err) c.releaseConn(cn, err)
cmd.setErr(err) cmd.setErr(err)
if internal.IsRetryableError(err, true) { if isRetryableError(err, true) {
continue continue
} }
return err return err
@ -272,7 +275,7 @@ func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply) err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
c.releaseConn(cn, err) c.releaseConn(cn, err)
if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { if err != nil && isRetryableError(err, cmd.readTimeout() == nil) {
continue continue
} }
@ -347,7 +350,7 @@ func (c *baseClient) generalProcessPipeline(
canRetry, err := p(ctx, cn, cmds) canRetry, err := p(ctx, cn, cmds)
c.releaseConnStrict(cn, err) c.releaseConnStrict(cn, err)
if !canRetry || !internal.IsRetryableError(err, true) { if !canRetry || !isRetryableError(err, true) {
break break
} }
} }
@ -374,7 +377,7 @@ func (c *baseClient) pipelineProcessCmds(
func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error { func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
for _, cmd := range cmds { for _, cmd := range cmds {
err := cmd.readReply(rd) err := cmd.readReply(rd)
if err != nil && !internal.IsRedisError(err) { if err != nil && !isRedisError(err) {
return err return err
} }
} }
@ -421,7 +424,7 @@ func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
for range cmds { for range cmds {
err = statusCmd.readReply(rd) err = statusCmd.readReply(rd)
if err != nil && !internal.IsRedisError(err) { if err != nil && !isRedisError(err) {
return err return err
} }
} }
@ -500,6 +503,10 @@ func (c *Client) WithContext(ctx context.Context) *Client {
return &clone return &clone
} }
func (c *Client) Conn() *Conn {
return newConn(c.ctx, c.opt, pool.NewSingleConnPool(c.connPool))
}
// Do creates a Cmd from the args and processes the cmd. // Do creates a Cmd from the args and processes the cmd.
func (c *Client) Do(args ...interface{}) *Cmd { func (c *Client) Do(args ...interface{}) *Cmd {
return c.DoContext(c.ctx, args...) return c.DoContext(c.ctx, args...)
@ -643,12 +650,12 @@ type Conn struct {
ctx context.Context ctx context.Context
} }
func newConn(ctx context.Context, opt *Options, cn *pool.Conn) *Conn { func newConn(ctx context.Context, opt *Options, connPool pool.Pooler) *Conn {
c := Conn{ c := Conn{
conn: &conn{ conn: &conn{
baseClient: baseClient{ baseClient: baseClient{
opt: opt, opt: opt,
connPool: pool.NewSingleConnPool(cn), connPool: connPool,
}, },
}, },
ctx: ctx, ctx: ctx,

View File

@ -568,7 +568,7 @@ func (c *Ring) process(ctx context.Context, cmd Cmder) error {
if err == nil { if err == nil {
return nil return nil
} }
if !internal.IsRetryableError(err, cmd.readTimeout() == nil) { if !isRetryableError(err, cmd.readTimeout() == nil) {
return err return err
} }
} }
@ -662,7 +662,7 @@ func (c *Ring) generalProcessPipeline(
} }
shard.Client.releaseConnStrict(cn, err) shard.Client.releaseConnStrict(cn, err)
if canRetry && internal.IsRetryableError(err, true) { if canRetry && isRetryableError(err, true) {
mu.Lock() mu.Lock()
if failedCmdsMap == nil { if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder) failedCmdsMap = make(map[string][]Cmder)