Merge pull request #634 from go-redis/fix/stale-conns

Fix/stale conns
This commit is contained in:
Vladimir Mihailenco 2017-09-11 09:16:46 +03:00 committed by GitHub
commit 6ee2076b1d
6 changed files with 40 additions and 47 deletions

View File

@ -205,17 +205,19 @@ func (c *clusterNodes) Close() error {
return firstErr return firstErr
} }
func (c *clusterNodes) Err() error { func (c *clusterNodes) Addrs() ([]string, error) {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() closed := c.closed
addrs := c.addrs
c.mu.RUnlock()
if c.closed { if closed {
return pool.ErrClosed return nil, pool.ErrClosed
} }
if len(c.addrs) == 0 { if len(addrs) == 0 {
return errClusterNoNodes return nil, errClusterNoNodes
} }
return nil return addrs, nil
} }
func (c *clusterNodes) NextGeneration() uint32 { func (c *clusterNodes) NextGeneration() uint32 {
@ -298,16 +300,9 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
} }
func (c *clusterNodes) Random() (*clusterNode, error) { func (c *clusterNodes) Random() (*clusterNode, error) {
c.mu.RLock() addrs, err := c.Addrs()
closed := c.closed if err != nil {
addrs := c.addrs return nil, err
c.mu.RUnlock()
if closed {
return nil, pool.ErrClosed
}
if len(addrs) == 0 {
return nil, errClusterNoNodes
} }
var nodeErr error var nodeErr error
@ -504,7 +499,8 @@ func (c *ClusterClient) state() (*clusterState, error) {
return v.(*clusterState), nil return v.(*clusterState), nil
} }
if err := c.nodes.Err(); err != nil { _, err := c.nodes.Addrs()
if err != nil {
return nil, err return nil, err
} }
@ -808,8 +804,10 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Requests += s.Requests acc.Requests += s.Requests
acc.Hits += s.Hits acc.Hits += s.Hits
acc.Timeouts += s.Timeouts acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns acc.FreeConns += s.FreeConns
acc.StaleConns += s.StaleConns
} }
for _, node := range state.slaves { for _, node := range state.slaves {
@ -817,8 +815,10 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Requests += s.Requests acc.Requests += s.Requests
acc.Hits += s.Hits acc.Hits += s.Hits
acc.Timeouts += s.Timeouts acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns acc.FreeConns += s.FreeConns
acc.StaleConns += s.StaleConns
} }
return &acc return &acc
@ -877,21 +877,12 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
break break
} }
var n int
for _, node := range nodes { for _, node := range nodes {
nn, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
if err != nil { if err != nil {
internal.Logf("ReapStaleConns failed: %s", err) internal.Logf("ReapStaleConns failed: %s", err)
} else {
n += nn
} }
} }
s := c.PoolStats()
internal.Logf(
"reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
)
} }
} }

View File

@ -27,8 +27,9 @@ type Stats struct {
Hits uint32 // number of times free connection was found in the pool Hits uint32 // number of times free connection was found in the pool
Timeouts uint32 // number of times a wait timeout occurred Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // the number of total connections in the pool TotalConns uint32 // number of total connections in the pool
FreeConns uint32 // the number of free connections in the pool FreeConns uint32 // number of free connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
} }
type Pooler interface { type Pooler interface {
@ -265,11 +266,13 @@ func (p *ConnPool) FreeLen() int {
func (p *ConnPool) Stats() *Stats { func (p *ConnPool) Stats() *Stats {
return &Stats{ return &Stats{
Requests: atomic.LoadUint32(&p.stats.Requests), Requests: atomic.LoadUint32(&p.stats.Requests),
Hits: atomic.LoadUint32(&p.stats.Hits), Hits: atomic.LoadUint32(&p.stats.Hits),
Timeouts: atomic.LoadUint32(&p.stats.Timeouts), Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()), TotalConns: uint32(p.Len()),
FreeConns: uint32(p.FreeLen()), FreeConns: uint32(p.FreeLen()),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
} }
} }
@ -362,10 +365,6 @@ func (p *ConnPool) reaper(frequency time.Duration) {
internal.Logf("ReapStaleConns failed: %s", err) internal.Logf("ReapStaleConns failed: %s", err)
continue continue
} }
s := p.Stats() atomic.AddUint32(&p.stats.StaleConns, uint32(n))
internal.Logf(
"reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
)
} }
} }

View File

@ -3,7 +3,6 @@ package redis_test
import ( import (
"errors" "errors"
"fmt" "fmt"
"log"
"net" "net"
"os" "os"
"os/exec" "os/exec"
@ -51,10 +50,6 @@ var cluster = &clusterScenario{
clients: make(map[string]*redis.Client, 6), clients: make(map[string]*redis.Client, 6),
} }
func init() {
redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
}
var _ = BeforeSuite(func() { var _ = BeforeSuite(func() {
var err error var err error

View File

@ -205,6 +205,7 @@ type PoolStats struct {
Hits uint32 // number of times free connection was found in the pool Hits uint32 // number of times free connection was found in the pool
Timeouts uint32 // number of times a wait timeout occurred Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // the number of total connections in the pool TotalConns uint32 // number of total connections in the pool
FreeConns uint32 // the number of free connections in the pool FreeConns uint32 // number of free connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
} }

View File

@ -3,6 +3,7 @@ package redis
import ( import (
"fmt" "fmt"
"log" "log"
"os"
"time" "time"
"github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal"
@ -13,6 +14,10 @@ import (
// Redis nil reply, .e.g. when key does not exist. // Redis nil reply, .e.g. when key does not exist.
const Nil = internal.Nil const Nil = internal.Nil
func init() {
SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
}
func SetLogger(logger *log.Logger) { func SetLogger(logger *log.Logger) {
internal.Logger = logger internal.Logger = logger
} }

View File

@ -301,8 +301,10 @@ func (d *sentinelFailover) listen(sentinel *sentinelClient) {
msg, err := pubsub.ReceiveMessage() msg, err := pubsub.ReceiveMessage()
if err != nil { if err != nil {
internal.Logf("sentinel: ReceiveMessage failed: %s", err) if err != pool.ErrClosed {
pubsub.Close() internal.Logf("sentinel: ReceiveMessage failed: %s", err)
pubsub.Close()
}
d.resetSentinel() d.resetSentinel()
return return
} }