From 1173a9589f1e25b15a43e155eb5443aa213453fb Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Mon, 11 Sep 2017 08:58:56 +0300 Subject: [PATCH 1/2] Cleanup code --- cluster.go | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/cluster.go b/cluster.go index 72bace76..487faa9f 100644 --- a/cluster.go +++ b/cluster.go @@ -205,17 +205,19 @@ func (c *clusterNodes) Close() error { return firstErr } -func (c *clusterNodes) Err() error { +func (c *clusterNodes) Addrs() ([]string, error) { c.mu.RLock() - defer c.mu.RUnlock() + closed := c.closed + addrs := c.addrs + c.mu.RUnlock() - if c.closed { - return pool.ErrClosed + if closed { + return nil, pool.ErrClosed } - if len(c.addrs) == 0 { - return errClusterNoNodes + if len(addrs) == 0 { + return nil, errClusterNoNodes } - return nil + return addrs, nil } func (c *clusterNodes) NextGeneration() uint32 { @@ -298,16 +300,9 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { } func (c *clusterNodes) Random() (*clusterNode, error) { - c.mu.RLock() - closed := c.closed - addrs := c.addrs - c.mu.RUnlock() - - if closed { - return nil, pool.ErrClosed - } - if len(addrs) == 0 { - return nil, errClusterNoNodes + addrs, err := c.Addrs() + if err != nil { + return nil, err } var nodeErr error @@ -504,7 +499,8 @@ func (c *ClusterClient) state() (*clusterState, error) { return v.(*clusterState), nil } - if err := c.nodes.Err(); err != nil { + _, err := c.nodes.Addrs() + if err != nil { return nil, err } From 5294b5dae18eff1e93fa83164d4f9dcf6f3aebe7 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Mon, 11 Sep 2017 09:10:17 +0300 Subject: [PATCH 2/2] Add PoolStats.StaleConns and enable logging by default --- cluster.go | 15 +++++---------- internal/pool/pool.go | 19 +++++++++---------- main_test.go | 5 ----- options.go | 5 +++-- redis.go | 5 +++++ sentinel.go | 6 ++++-- 6 files changed, 26 insertions(+), 29 deletions(-) diff --git a/cluster.go b/cluster.go index 487faa9f..6f435251 100644 --- a/cluster.go +++ b/cluster.go @@ -804,8 +804,10 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.Requests += s.Requests acc.Hits += s.Hits acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns + acc.StaleConns += s.StaleConns } for _, node := range state.slaves { @@ -813,8 +815,10 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.Requests += s.Requests acc.Hits += s.Hits acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns + acc.StaleConns += s.StaleConns } return &acc @@ -873,21 +877,12 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { break } - var n int for _, node := range nodes { - nn, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() + _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() if err != nil { internal.Logf("ReapStaleConns failed: %s", err) - } else { - n += nn } } - - s := c.PoolStats() - internal.Logf( - "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)", - n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts, - ) } } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 25e78aa3..26a891bf 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -27,8 +27,9 @@ type Stats struct { Hits uint32 // number of times free connection was found in the pool Timeouts uint32 // number of times a wait timeout occurred - TotalConns uint32 // the number of total connections in the pool - FreeConns uint32 // the number of free connections in the pool + TotalConns uint32 // number of total connections in the pool + FreeConns uint32 // number of free connections in the pool + StaleConns uint32 // number of stale connections removed from the pool } type Pooler interface { @@ -265,11 +266,13 @@ func (p *ConnPool) FreeLen() int { func (p *ConnPool) Stats() *Stats { return &Stats{ - Requests: atomic.LoadUint32(&p.stats.Requests), - Hits: atomic.LoadUint32(&p.stats.Hits), - Timeouts: atomic.LoadUint32(&p.stats.Timeouts), + Requests: atomic.LoadUint32(&p.stats.Requests), + Hits: atomic.LoadUint32(&p.stats.Hits), + Timeouts: atomic.LoadUint32(&p.stats.Timeouts), + TotalConns: uint32(p.Len()), FreeConns: uint32(p.FreeLen()), + StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } } @@ -362,10 +365,6 @@ func (p *ConnPool) reaper(frequency time.Duration) { internal.Logf("ReapStaleConns failed: %s", err) continue } - s := p.Stats() - internal.Logf( - "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)", - n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts, - ) + atomic.AddUint32(&p.stats.StaleConns, uint32(n)) } } diff --git a/main_test.go b/main_test.go index 64f25d99..7c5a6a96 100644 --- a/main_test.go +++ b/main_test.go @@ -3,7 +3,6 @@ package redis_test import ( "errors" "fmt" - "log" "net" "os" "os/exec" @@ -51,10 +50,6 @@ var cluster = &clusterScenario{ clients: make(map[string]*redis.Client, 6), } -func init() { - redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)) -} - var _ = BeforeSuite(func() { var err error diff --git a/options.go b/options.go index dea04545..6c4e68d2 100644 --- a/options.go +++ b/options.go @@ -205,6 +205,7 @@ type PoolStats struct { Hits uint32 // number of times free connection was found in the pool Timeouts uint32 // number of times a wait timeout occurred - TotalConns uint32 // the number of total connections in the pool - FreeConns uint32 // the number of free connections in the pool + TotalConns uint32 // number of total connections in the pool + FreeConns uint32 // number of free connections in the pool + StaleConns uint32 // number of stale connections removed from the pool } diff --git a/redis.go b/redis.go index db1f39c3..d18a152e 100644 --- a/redis.go +++ b/redis.go @@ -3,6 +3,7 @@ package redis import ( "fmt" "log" + "os" "time" "github.com/go-redis/redis/internal" @@ -13,6 +14,10 @@ import ( // Redis nil reply, .e.g. when key does not exist. const Nil = internal.Nil +func init() { + SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)) +} + func SetLogger(logger *log.Logger) { internal.Logger = logger } diff --git a/sentinel.go b/sentinel.go index 3bfdb4a3..37d06b48 100644 --- a/sentinel.go +++ b/sentinel.go @@ -301,8 +301,10 @@ func (d *sentinelFailover) listen(sentinel *sentinelClient) { msg, err := pubsub.ReceiveMessage() if err != nil { - internal.Logf("sentinel: ReceiveMessage failed: %s", err) - pubsub.Close() + if err != pool.ErrClosed { + internal.Logf("sentinel: ReceiveMessage failed: %s", err) + pubsub.Close() + } d.resetSentinel() return }