mirror of https://github.com/go-redis/redis.git
Merge pull request #70 from go-redis/feature/faster-pool
Faster, lock-free pool
This commit is contained in:
commit
f9d30778d2
|
@ -4,7 +4,6 @@ services:
|
||||||
- redis-server
|
- redis-server
|
||||||
|
|
||||||
go:
|
go:
|
||||||
- 1.2
|
|
||||||
- 1.3
|
- 1.3
|
||||||
- 1.4
|
- 1.4
|
||||||
- tip
|
- tip
|
||||||
|
|
|
@ -150,7 +150,9 @@ var _ = Describe("Command", func() {
|
||||||
wg.Add(n)
|
wg.Add(n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
err := client.Incr(key).Err()
|
err := client.Incr(key).Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -18,6 +18,7 @@ var _ = Describe("Commands", func() {
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
client = redis.NewTCPClient(&redis.Options{
|
client = redis.NewTCPClient(&redis.Options{
|
||||||
Addr: redisAddr,
|
Addr: redisAddr,
|
||||||
|
PoolTimeout: 30 * time.Second,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -1116,6 +1117,8 @@ var _ = Describe("Commands", func() {
|
||||||
started := make(chan bool)
|
started := make(chan bool)
|
||||||
done := make(chan bool)
|
done := make(chan bool)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
|
||||||
started <- true
|
started <- true
|
||||||
bLPop := client.BLPop(0, "list")
|
bLPop := client.BLPop(0, "list")
|
||||||
Expect(bLPop.Err()).NotTo(HaveOccurred())
|
Expect(bLPop.Err()).NotTo(HaveOccurred())
|
||||||
|
@ -1161,6 +1164,8 @@ var _ = Describe("Commands", func() {
|
||||||
started := make(chan bool)
|
started := make(chan bool)
|
||||||
done := make(chan bool)
|
done := make(chan bool)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
|
||||||
started <- true
|
started <- true
|
||||||
brpop := client.BRPop(0, "list")
|
brpop := client.BRPop(0, "list")
|
||||||
Expect(brpop.Err()).NotTo(HaveOccurred())
|
Expect(brpop.Err()).NotTo(HaveOccurred())
|
||||||
|
@ -2190,7 +2195,9 @@ var _ = Describe("Commands", func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
cmds, err := safeIncr()
|
cmds, err := safeIncr()
|
||||||
if err == redis.TxFailedErr {
|
if err == redis.TxFailedErr {
|
||||||
|
|
181
pool.go
181
pool.go
|
@ -1,12 +1,12 @@
|
||||||
package redis
|
package redis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gopkg.in/bufio.v1"
|
"gopkg.in/bufio.v1"
|
||||||
|
@ -14,6 +14,7 @@ import (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errClosed = errors.New("redis: client is closed")
|
errClosed = errors.New("redis: client is closed")
|
||||||
|
errPoolTimeout = errors.New("redis: connection pool timeout")
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -37,13 +38,9 @@ type conn struct {
|
||||||
rd *bufio.Reader
|
rd *bufio.Reader
|
||||||
buf []byte
|
buf []byte
|
||||||
|
|
||||||
inUse bool
|
|
||||||
usedAt time.Time
|
usedAt time.Time
|
||||||
|
|
||||||
readTimeout time.Duration
|
readTimeout time.Duration
|
||||||
writeTimeout time.Duration
|
writeTimeout time.Duration
|
||||||
|
|
||||||
elem *list.Element
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {
|
func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {
|
||||||
|
@ -87,6 +84,10 @@ func (cn *conn) Close() error {
|
||||||
return cn.netcn.Close()
|
return cn.netcn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cn *conn) isIdle(timeout time.Duration) bool {
|
||||||
|
return timeout > 0 && time.Since(cn.usedAt) > timeout
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type connPool struct {
|
type connPool struct {
|
||||||
|
@ -94,12 +95,10 @@ type connPool struct {
|
||||||
rl *rateLimiter
|
rl *rateLimiter
|
||||||
|
|
||||||
opt *options
|
opt *options
|
||||||
|
conns chan *conn
|
||||||
|
|
||||||
cond *sync.Cond
|
size int32
|
||||||
conns *list.List
|
closed int32
|
||||||
|
|
||||||
idleNum int
|
|
||||||
closed bool
|
|
||||||
|
|
||||||
lastDialErr error
|
lastDialErr error
|
||||||
}
|
}
|
||||||
|
@ -110,12 +109,46 @@ func newConnPool(dial func() (*conn, error), opt *options) *connPool {
|
||||||
rl: newRateLimiter(time.Second, 2*opt.PoolSize),
|
rl: newRateLimiter(time.Second, 2*opt.PoolSize),
|
||||||
|
|
||||||
opt: opt,
|
opt: opt,
|
||||||
|
conns: make(chan *conn, opt.PoolSize),
|
||||||
cond: sync.NewCond(&sync.Mutex{}),
|
|
||||||
conns: list.New(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *connPool) isClosed() bool { return atomic.LoadInt32(&p.closed) > 0 }
|
||||||
|
|
||||||
|
// First available connection, non-blocking
|
||||||
|
func (p *connPool) first() *conn {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case cn := <-p.conns:
|
||||||
|
if !cn.isIdle(p.opt.IdleTimeout) {
|
||||||
|
return cn
|
||||||
|
}
|
||||||
|
p.remove(cn)
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
panic("not reached")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for available connection, blocking
|
||||||
|
func (p *connPool) wait() (*conn, error) {
|
||||||
|
deadline := time.After(p.opt.PoolTimeout)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case cn := <-p.conns:
|
||||||
|
if !cn.isIdle(p.opt.IdleTimeout) {
|
||||||
|
return cn, nil
|
||||||
|
}
|
||||||
|
p.remove(cn)
|
||||||
|
case <-deadline:
|
||||||
|
return nil, errPoolTimeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
panic("not reached")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Establish a new connection
|
||||||
func (p *connPool) new() (*conn, error) {
|
func (p *connPool) new() (*conn, error) {
|
||||||
if !p.rl.Check() {
|
if !p.rl.Check() {
|
||||||
err := fmt.Errorf(
|
err := fmt.Errorf(
|
||||||
|
@ -132,60 +165,29 @@ func (p *connPool) new() (*conn, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *connPool) Get() (*conn, bool, error) {
|
func (p *connPool) Get() (*conn, bool, error) {
|
||||||
p.cond.L.Lock()
|
if p.isClosed() {
|
||||||
|
|
||||||
if p.closed {
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return nil, false, errClosed
|
return nil, false, errClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.opt.IdleTimeout > 0 {
|
// Fetch first non-idle connection, if available
|
||||||
for el := p.conns.Front(); el != nil; el = el.Next() {
|
if cn := p.first(); cn != nil {
|
||||||
cn := el.Value.(*conn)
|
|
||||||
if cn.inUse {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if time.Since(cn.usedAt) > p.opt.IdleTimeout {
|
|
||||||
if err := p.remove(cn); err != nil {
|
|
||||||
log.Printf("remove failed: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for p.conns.Len() >= p.opt.PoolSize && p.idleNum == 0 {
|
|
||||||
p.cond.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.idleNum > 0 {
|
|
||||||
elem := p.conns.Front()
|
|
||||||
cn := elem.Value.(*conn)
|
|
||||||
if cn.inUse {
|
|
||||||
panic("pool: precondition failed")
|
|
||||||
}
|
|
||||||
cn.inUse = true
|
|
||||||
p.conns.MoveToBack(elem)
|
|
||||||
p.idleNum--
|
|
||||||
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return cn, false, nil
|
return cn, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.conns.Len() < p.opt.PoolSize {
|
// Try to create a new one
|
||||||
|
if ref := atomic.AddInt32(&p.size, 1); int(ref) <= p.opt.PoolSize {
|
||||||
cn, err := p.new()
|
cn, err := p.new()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.cond.L.Unlock()
|
atomic.AddInt32(&p.size, -1) // Undo ref increment
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cn.inUse = true
|
|
||||||
cn.elem = p.conns.PushBack(cn)
|
|
||||||
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return cn, true, nil
|
return cn, true, nil
|
||||||
}
|
}
|
||||||
|
atomic.AddInt32(&p.size, -1)
|
||||||
|
|
||||||
panic("not reached")
|
// Otherwise, wait for the available connection
|
||||||
|
cn, err := p.wait()
|
||||||
|
return cn, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *connPool) Put(cn *conn) error {
|
func (p *connPool) Put(cn *conn) error {
|
||||||
|
@ -195,92 +197,67 @@ func (p *connPool) Put(cn *conn) error {
|
||||||
return p.Remove(cn)
|
return p.Remove(cn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if p.isClosed() {
|
||||||
|
return errClosed
|
||||||
|
}
|
||||||
if p.opt.IdleTimeout > 0 {
|
if p.opt.IdleTimeout > 0 {
|
||||||
cn.usedAt = time.Now()
|
cn.usedAt = time.Now()
|
||||||
}
|
}
|
||||||
|
p.conns <- cn
|
||||||
p.cond.L.Lock()
|
|
||||||
if p.closed {
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return errClosed
|
|
||||||
}
|
|
||||||
cn.inUse = false
|
|
||||||
p.conns.MoveToFront(cn.elem)
|
|
||||||
p.idleNum++
|
|
||||||
p.cond.Signal()
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *connPool) Remove(cn *conn) error {
|
func (p *connPool) Remove(cn *conn) error {
|
||||||
p.cond.L.Lock()
|
if p.isClosed() {
|
||||||
if p.closed {
|
|
||||||
// Noop, connection is already closed.
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
err := p.remove(cn)
|
return p.remove(cn)
|
||||||
p.cond.Signal()
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *connPool) remove(cn *conn) error {
|
func (p *connPool) remove(cn *conn) error {
|
||||||
p.conns.Remove(cn.elem)
|
atomic.AddInt32(&p.size, -1)
|
||||||
cn.elem = nil
|
|
||||||
if !cn.inUse {
|
|
||||||
p.idleNum--
|
|
||||||
}
|
|
||||||
return cn.Close()
|
return cn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Len returns number of idle connections.
|
// Len returns number of idle connections.
|
||||||
func (p *connPool) Len() int {
|
func (p *connPool) Len() int {
|
||||||
defer p.cond.L.Unlock()
|
return len(p.conns)
|
||||||
p.cond.L.Lock()
|
|
||||||
return p.idleNum
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns number of connections in the pool.
|
// Size returns number of connections in the pool.
|
||||||
func (p *connPool) Size() int {
|
func (p *connPool) Size() int {
|
||||||
defer p.cond.L.Unlock()
|
return int(atomic.LoadInt32(&p.size))
|
||||||
p.cond.L.Lock()
|
|
||||||
return p.conns.Len()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *connPool) Filter(f func(*conn) bool) {
|
func (p *connPool) Filter(f func(*conn) bool) {
|
||||||
p.cond.L.Lock()
|
for {
|
||||||
for el, next := p.conns.Front(), p.conns.Front(); el != nil; el = next {
|
select {
|
||||||
next = el.Next()
|
case cn := <-p.conns:
|
||||||
cn := el.Value.(*conn)
|
|
||||||
if !f(cn) {
|
if !f(cn) {
|
||||||
p.remove(cn)
|
p.remove(cn)
|
||||||
}
|
}
|
||||||
|
default:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
p.cond.L.Unlock()
|
}
|
||||||
|
panic("not reached")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *connPool) Close() error {
|
func (p *connPool) Close() (err error) {
|
||||||
defer p.cond.L.Unlock()
|
if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
|
||||||
p.cond.L.Lock()
|
|
||||||
if p.closed {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
p.closed = true
|
|
||||||
p.rl.Close()
|
p.rl.Close()
|
||||||
var retErr error
|
|
||||||
for {
|
for {
|
||||||
e := p.conns.Front()
|
if p.Size() < 1 {
|
||||||
if e == nil {
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err := p.remove(e.Value.(*conn)); err != nil {
|
if e := p.remove(<-p.conns); e != nil {
|
||||||
log.Printf("cn.Close failed: %s", err)
|
err = e
|
||||||
retErr = err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return retErr
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
25
pool_test.go
25
pool_test.go
|
@ -2,6 +2,8 @@ package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
@ -15,7 +17,9 @@ var _ = Describe("Pool", func() {
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
cb()
|
cb()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -130,3 +134,24 @@ var _ = Describe("Pool", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
func BenchmarkPool(b *testing.B) {
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: redisAddr,
|
||||||
|
IdleTimeout: 100 * time.Millisecond,
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
pool := client.Pool()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
conn, _, err := pool.Get()
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("no error expected on pool.Get but received: %s", err.Error())
|
||||||
|
}
|
||||||
|
if err = pool.Put(conn); err != nil {
|
||||||
|
b.Fatalf("no error expected on pool.Put but received: %s", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
32
redis.go
32
redis.go
|
@ -148,25 +148,49 @@ type options struct {
|
||||||
WriteTimeout time.Duration
|
WriteTimeout time.Duration
|
||||||
|
|
||||||
PoolSize int
|
PoolSize int
|
||||||
|
PoolTimeout time.Duration
|
||||||
IdleTimeout time.Duration
|
IdleTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type Options struct {
|
type Options struct {
|
||||||
|
// The network type, either "tcp" or "unix".
|
||||||
|
// Default: "tcp"
|
||||||
Network string
|
Network string
|
||||||
|
// The network address.
|
||||||
Addr string
|
Addr string
|
||||||
|
|
||||||
// Dialer creates new network connection and has priority over
|
// Dialer creates new network connection and has priority over
|
||||||
// Network and Addr options.
|
// Network and Addr options.
|
||||||
Dialer func() (net.Conn, error)
|
Dialer func() (net.Conn, error)
|
||||||
|
|
||||||
|
// An optional password. Must match the password specified in the
|
||||||
|
// `requirepass` server configuration option.
|
||||||
Password string
|
Password string
|
||||||
|
// Select a database.
|
||||||
|
// Default: 0
|
||||||
DB int64
|
DB int64
|
||||||
|
|
||||||
|
// Sets the deadline for establishing new connections. If reached,
|
||||||
|
// deal attepts will fail with a timeout.
|
||||||
DialTimeout time.Duration
|
DialTimeout time.Duration
|
||||||
|
// Sets the deadline for socket reads. If reached, commands will
|
||||||
|
// fail with a timeout instead of blocking.
|
||||||
ReadTimeout time.Duration
|
ReadTimeout time.Duration
|
||||||
|
// Sets the deadline for socket writes. If reached, commands will
|
||||||
|
// fail with a timeout instead of blocking.
|
||||||
WriteTimeout time.Duration
|
WriteTimeout time.Duration
|
||||||
|
|
||||||
|
// The maximum number of socket connections.
|
||||||
|
// Default: 10
|
||||||
PoolSize int
|
PoolSize int
|
||||||
|
// If all socket connections is the pool are busy, the pool will wait
|
||||||
|
// this amount of time for a conection to become available, before
|
||||||
|
// returning an error.
|
||||||
|
// Default: 5s
|
||||||
|
PoolTimeout time.Duration
|
||||||
|
// Evict connections from the pool after they have been idle for longer
|
||||||
|
// than specified in this option.
|
||||||
|
// Default: 0 = no eviction
|
||||||
IdleTimeout time.Duration
|
IdleTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -191,6 +215,13 @@ func (opt *Options) getDialTimeout() time.Duration {
|
||||||
return opt.DialTimeout
|
return opt.DialTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (opt *Options) getPoolTimeout() time.Duration {
|
||||||
|
if opt.PoolTimeout == 0 {
|
||||||
|
return 5 * time.Second
|
||||||
|
}
|
||||||
|
return opt.PoolTimeout
|
||||||
|
}
|
||||||
|
|
||||||
func (opt *Options) options() *options {
|
func (opt *Options) options() *options {
|
||||||
return &options{
|
return &options{
|
||||||
DB: opt.DB,
|
DB: opt.DB,
|
||||||
|
@ -201,6 +232,7 @@ func (opt *Options) options() *options {
|
||||||
WriteTimeout: opt.WriteTimeout,
|
WriteTimeout: opt.WriteTimeout,
|
||||||
|
|
||||||
PoolSize: opt.getPoolSize(),
|
PoolSize: opt.getPoolSize(),
|
||||||
|
PoolTimeout: opt.getPoolTimeout(),
|
||||||
IdleTimeout: opt.IdleTimeout,
|
IdleTimeout: opt.IdleTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
35
sentinel.go
35
sentinel.go
|
@ -12,17 +12,39 @@ import (
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type FailoverOptions struct {
|
type FailoverOptions struct {
|
||||||
|
// The master name.
|
||||||
MasterName string
|
MasterName string
|
||||||
|
// Seed addresses of sentinel nodes.
|
||||||
SentinelAddrs []string
|
SentinelAddrs []string
|
||||||
|
|
||||||
|
// An optional password. Must match the password specified in the
|
||||||
|
// `requirepass` server configuration option.
|
||||||
Password string
|
Password string
|
||||||
|
// Select a database.
|
||||||
|
// Default: 0
|
||||||
DB int64
|
DB int64
|
||||||
|
|
||||||
PoolSize int
|
// Sets the deadline for establishing new connections. If reached,
|
||||||
|
// deal attepts will fail with a timeout.
|
||||||
DialTimeout time.Duration
|
DialTimeout time.Duration
|
||||||
|
// Sets the deadline for socket reads. If reached, commands will
|
||||||
|
// fail with a timeout instead of blocking.
|
||||||
ReadTimeout time.Duration
|
ReadTimeout time.Duration
|
||||||
|
// Sets the deadline for socket writes. If reached, commands will
|
||||||
|
// fail with a timeout instead of blocking.
|
||||||
WriteTimeout time.Duration
|
WriteTimeout time.Duration
|
||||||
|
|
||||||
|
// The maximum number of socket connections.
|
||||||
|
// Default: 10
|
||||||
|
PoolSize int
|
||||||
|
// If all socket connections is the pool are busy, the pool will wait
|
||||||
|
// this amount of time for a conection to become available, before
|
||||||
|
// returning an error.
|
||||||
|
// Default: 5s
|
||||||
|
PoolTimeout time.Duration
|
||||||
|
// Evict connections from the pool after they have been idle for longer
|
||||||
|
// than specified in this option.
|
||||||
|
// Default: 0 = no eviction
|
||||||
IdleTimeout time.Duration
|
IdleTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,6 +55,13 @@ func (opt *FailoverOptions) getPoolSize() int {
|
||||||
return opt.PoolSize
|
return opt.PoolSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (opt *FailoverOptions) getPoolTimeout() time.Duration {
|
||||||
|
if opt.PoolTimeout == 0 {
|
||||||
|
return 5 * time.Second
|
||||||
|
}
|
||||||
|
return opt.PoolTimeout
|
||||||
|
}
|
||||||
|
|
||||||
func (opt *FailoverOptions) getDialTimeout() time.Duration {
|
func (opt *FailoverOptions) getDialTimeout() time.Duration {
|
||||||
if opt.DialTimeout == 0 {
|
if opt.DialTimeout == 0 {
|
||||||
return 5 * time.Second
|
return 5 * time.Second
|
||||||
|
@ -50,6 +79,7 @@ func (opt *FailoverOptions) options() *options {
|
||||||
WriteTimeout: opt.WriteTimeout,
|
WriteTimeout: opt.WriteTimeout,
|
||||||
|
|
||||||
PoolSize: opt.getPoolSize(),
|
PoolSize: opt.getPoolSize(),
|
||||||
|
PoolTimeout: opt.getPoolTimeout(),
|
||||||
IdleTimeout: opt.IdleTimeout,
|
IdleTimeout: opt.IdleTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -169,6 +199,7 @@ func (d *sentinelFailover) MasterAddr() (string, error) {
|
||||||
WriteTimeout: d.opt.WriteTimeout,
|
WriteTimeout: d.opt.WriteTimeout,
|
||||||
|
|
||||||
PoolSize: d.opt.PoolSize,
|
PoolSize: d.opt.PoolSize,
|
||||||
|
PoolTimeout: d.opt.PoolTimeout,
|
||||||
IdleTimeout: d.opt.IdleTimeout,
|
IdleTimeout: d.opt.IdleTimeout,
|
||||||
})
|
})
|
||||||
masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
|
masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
|
||||||
|
|
Loading…
Reference in New Issue