redis/internal/pool/pool_single.go

209 lines
3.8 KiB
Go
Raw Normal View History

package pool
2019-08-02 00:59:53 +03:00
import (
"context"
"fmt"
"sync/atomic"
)
2019-08-02 00:59:53 +03:00
// Lifecycle states of a SingleConnPool. The value lives in the pool's state
// field and is only read or written through sync/atomic.
const (
	// stateDefault: no connection is bound yet; Get fetches one from the
	// underlying pool and moves the pool to stateInited.
	stateDefault = 0
	// stateInited: a connection is bound and is passed around through the
	// pool's channel.
	stateInited = 1
	// stateClosed: Close has run; Get reports ErrClosed.
	stateClosed = 2
)
2019-06-04 14:05:29 +03:00
2019-08-08 10:36:13 +03:00
// BadConnError reports that a connection is in a bad state. It optionally
// wraps the error that caused the connection to be considered bad.
type BadConnError struct {
	wrapped error
}

var _ error = (*BadConnError)(nil)

// Error implements the error interface. The text is a fixed prefix, followed
// by ": " and the wrapped error's message when a cause is present.
func (e BadConnError) Error() string {
	const msg = "redis: Conn is in a bad state"
	if e.wrapped == nil {
		return msg
	}
	return msg + ": " + e.wrapped.Error()
}

// Unwrap returns the wrapped cause (nil when there is none) so that
// errors.Is and errors.As can inspect it.
func (e BadConnError) Unwrap() error {
	return e.wrapped
}
type SingleConnPool struct {
2019-08-08 14:22:01 +03:00
pool Pooler
level int32 // atomic
state uint32 // atomic
ch chan *Conn
2019-08-08 10:36:13 +03:00
_badConnError atomic.Value
}
var _ Pooler = (*SingleConnPool)(nil)
// NewSingleConnPool returns a SingleConnPool backed by pool. If pool already
// is a *SingleConnPool it is reused rather than wrapped again. In both cases
// the pool's level counter is incremented by one, so every call should be
// matched by a Close.
func NewSingleConnPool(pool Pooler) *SingleConnPool {
	if existing, ok := pool.(*SingleConnPool); ok {
		atomic.AddInt32(&existing.level, 1)
		return existing
	}

	fresh := &SingleConnPool{
		pool: pool,
		ch:   make(chan *Conn, 1),
	}
	atomic.AddInt32(&fresh.level, 1)
	return fresh
}
// SetConn binds cn to a pool that has no connection yet, moving the state
// from stateDefault to stateInited and placing cn on the channel. It panics
// if the pool is in any other state.
func (p *SingleConnPool) SetConn(cn *Conn) {
	if !atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
		panic("not reached")
	}
	p.ch <- cn
}
2019-08-18 17:03:32 +03:00
// NewConn delegates to the underlying pool's NewConn and returns its result
// unchanged. It does not touch this pool's state or channel.
func (p *SingleConnPool) NewConn(ctx context.Context) (*Conn, error) {
return p.pool.NewConn(ctx)
}
// CloseConn delegates to the underlying pool's CloseConn and returns its
// result unchanged. It does not touch this pool's state or channel.
func (p *SingleConnPool) CloseConn(cn *Conn) error {
return p.pool.CloseConn(cn)
}
2019-08-18 17:03:32 +03:00
func (p *SingleConnPool) Get(ctx context.Context) (*Conn, error) {
// In worst case this races with Close which is not a very common operation.
for i := 0; i < 1000; i++ {
switch atomic.LoadUint32(&p.state) {
case stateDefault:
2019-08-18 17:03:32 +03:00
cn, err := p.pool.Get(ctx)
if err != nil {
return nil, err
}
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
return cn, nil
}
2019-08-08 10:36:13 +03:00
p.pool.Remove(cn, ErrClosed)
case stateInited:
2019-08-08 10:36:13 +03:00
if err := p.badConnError(); err != nil {
return nil, err
}
cn, ok := <-p.ch
if !ok {
return nil, ErrClosed
}
return cn, nil
case stateClosed:
return nil, ErrClosed
default:
panic("not reached")
}
2019-08-02 00:59:53 +03:00
}
return nil, fmt.Errorf("redis: SingleConnPool.Get: infinite loop")
}
2018-05-28 17:27:24 +03:00
// Put hands cn back by sending it on the pool's channel. If the send panics
// (as it does once Close has closed the channel), the panic is swallowed and
// cn is released to the underlying pool through freeConn instead.
func (p *SingleConnPool) Put(cn *Conn) {
	defer func() {
		if r := recover(); r == nil {
			return
		}
		p.freeConn(cn)
	}()

	p.ch <- cn
}
func (p *SingleConnPool) freeConn(cn *Conn) {
2019-08-08 10:36:13 +03:00
if err := p.badConnError(); err != nil {
p.pool.Remove(cn, err)
} else {
p.pool.Put(cn)
}
}
2019-08-08 10:36:13 +03:00
func (p *SingleConnPool) Remove(cn *Conn, reason error) {
defer func() {
if recover() != nil {
2019-08-08 10:36:13 +03:00
p.pool.Remove(cn, ErrClosed)
}
}()
2019-08-08 10:36:13 +03:00
p._badConnError.Store(BadConnError{wrapped: reason})
p.ch <- cn
}
func (p *SingleConnPool) Len() int {
switch atomic.LoadUint32(&p.state) {
case stateDefault:
2019-08-02 00:59:53 +03:00
return 0
case stateInited:
return 1
case stateClosed:
return 0
default:
panic("not reached")
2019-08-02 00:59:53 +03:00
}
}
2018-05-28 17:27:24 +03:00
// IdleLen returns the number of connections currently buffered in the pool's
// channel, i.e. handed back but not yet picked up again.
func (p *SingleConnPool) IdleLen() int {
return len(p.ch)
}
// Stats returns a pointer to a freshly allocated zero-value Stats on every
// call; this pool does not populate any of its fields.
func (p *SingleConnPool) Stats() *Stats {
return &Stats{}
}
// Close decrements the level counter and, once it is no longer positive,
// shuts the pool down: the state is switched to stateClosed, the channel is
// closed, and a connection still sitting in the channel is released through
// freeConn.
//
// It returns nil while other holders remain and after a successful shutdown,
// ErrClosed if the pool was already closed, and an error if the state could
// not be switched within 1000 attempts.
func (p *SingleConnPool) Close() error {
	if remaining := atomic.AddInt32(&p.level, -1); remaining > 0 {
		return nil
	}

	for attempt := 0; attempt < 1000; attempt++ {
		current := atomic.LoadUint32(&p.state)
		if current == stateClosed {
			return ErrClosed
		}
		if !atomic.CompareAndSwapUint32(&p.state, current, stateClosed) {
			// State moved between the load and the swap; look again.
			continue
		}

		close(p.ch)
		// A closed channel still yields a buffered value, if one is there.
		if cn, ok := <-p.ch; ok {
			p.freeConn(cn)
		}
		return nil
	}
	return fmt.Errorf("redis: SingleConnPool.Close: infinite loop")
}
func (p *SingleConnPool) Reset() error {
2019-08-08 10:36:13 +03:00
if p.badConnError() == nil {
return nil
}
select {
case cn, ok := <-p.ch:
if !ok {
return ErrClosed
}
2019-08-08 10:36:13 +03:00
p.pool.Remove(cn, ErrClosed)
2019-08-08 14:04:19 +03:00
p._badConnError.Store(BadConnError{wrapped: nil})
default:
return fmt.Errorf("redis: SingleConnPool does not have a Conn")
}
if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
state := atomic.LoadUint32(&p.state)
return fmt.Errorf("redis: invalid SingleConnPool state: %d", state)
}
return nil
}
2019-08-08 10:36:13 +03:00
func (p *SingleConnPool) badConnError() error {
if v := p._badConnError.Load(); v != nil {
2019-08-08 14:04:19 +03:00
err := v.(BadConnError)
if err.wrapped != nil {
return err
}
2019-08-08 10:36:13 +03:00
}
return nil
}