From f7a4bd5023cc73904b5e3bd6355837a990d7d5e7 Mon Sep 17 00:00:00 2001 From: Anatolii Mihailenco Date: Tue, 19 Jan 2016 18:36:40 +0200 Subject: [PATCH] Add pool instrumentation. --- cluster.go | 17 +++++++++++++++++ cluster_test.go | 4 ++++ pool.go | 26 ++++++++++++++++++++++++++ redis.go | 6 ++++++ redis_test.go | 4 ++++ 5 files changed, 57 insertions(+) diff --git a/cluster.go b/cluster.go index 2015a85e..d3139fd7 100644 --- a/cluster.go +++ b/cluster.go @@ -56,6 +56,22 @@ func (c *ClusterClient) Watch(keys ...string) (*Multi, error) { return client.Watch(keys...) } +// PoolStats returns accumulated connection pool stats +func (c *ClusterClient) PoolStats() *PoolStats { + acc := PoolStats{} + c.clientsMx.RLock() + for _, client := range c.clients { + m := client.PoolStats() + acc.TotalConns += m.TotalConns + acc.FreeConns += m.FreeConns + acc.Requests += m.Requests + acc.Waits += m.Waits + acc.Timeouts += m.Timeouts + } + c.clientsMx.RUnlock() + return &acc +} + // Close closes the cluster client, releasing any open resources. // // It is rare to Close a ClusterClient, as the ClusterClient is meant @@ -306,6 +322,7 @@ type ClusterOptions struct { ReadTimeout time.Duration WriteTimeout time.Duration + // PoolSize applies per redis node and not for the whole cluster. PoolSize int PoolTimeout time.Duration IdleTimeout time.Duration diff --git a/cluster_test.go b/cluster_test.go index 8f6b5e64..80e1fbba 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -313,6 +313,10 @@ var _ = Describe("Cluster", func() { Expect(cnt).To(Equal(int64(1))) }) + It("should return pool stats", func() { + Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{})) + }) + It("should follow redirects", func() { Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred()) diff --git a/pool.go b/pool.go index 5ed72097..d007f1a7 100644 --- a/pool.go +++ b/pool.go @@ -16,6 +16,16 @@ var ( errPoolTimeout = errors.New("redis: connection pool timeout") ) +// PoolStats contains pool state information and accumulated stats +type PoolStats struct { + Requests uint64 // number of times a connection was requested by the pool + Waits uint64 // number of times our pool had to wait for a connection to avail + Timeouts uint64 // number of times a wait timeout occurred + + TotalConns uint64 // the number of total connections in the pool + FreeConns uint64 // the number of free connections in the pool +} + type pool interface { First() *conn Get() (*conn, bool, error) @@ -24,6 +34,7 @@ type pool interface { Len() int FreeLen() int Close() error + Stats() PoolStats } type connList struct { @@ -127,6 +138,7 @@ type connPool struct { opt *Options conns *connList freeConns chan *conn + stats PoolStats _closed int32 @@ -226,6 +238,8 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) { return } + atomic.AddUint64(&p.stats.Requests, 1) + // Fetch first non-idle connection, if available. if cn = p.First(); cn != nil { return @@ -244,10 +258,12 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) { } // Otherwise, wait for the available connection. + atomic.AddUint64(&p.stats.Waits, 1) if cn = p.wait(); cn != nil { return } + atomic.AddUint64(&p.stats.Timeouts, 1) err = errPoolTimeout return } @@ -298,6 +314,12 @@ func (p *connPool) FreeLen() int { return len(p.freeConns) } +func (p *connPool) Stats() PoolStats { + p.stats.TotalConns = uint64(p.Len()) + p.stats.FreeConns = uint64(p.FreeLen()) + return p.stats +} + func (p *connPool) Close() (retErr error) { if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) { return errClosed @@ -386,6 +408,8 @@ func (p *singleConnPool) FreeLen() int { return 0 } +func (p *singleConnPool) Stats() PoolStats { return PoolStats{} } + func (p *singleConnPool) Close() error { return nil } @@ -493,6 +517,8 @@ func (p *stickyConnPool) FreeLen() int { return 0 } +func (p *stickyConnPool) Stats() PoolStats { return PoolStats{} } + func (p *stickyConnPool) Reset(reason error) (err error) { p.mx.Lock() if p.cn != nil { diff --git a/redis.go b/redis.go index 087564fd..abfbc6bb 100644 --- a/redis.go +++ b/redis.go @@ -192,3 +192,9 @@ func NewClient(opt *Options) *Client { pool := newConnPool(opt) return newClient(opt, pool) } + +// PoolStats returns connection pool stats +func (c *Client) PoolStats() *PoolStats { + stats := c.baseClient.connPool.Stats() + return &stats +} diff --git a/redis_test.go b/redis_test.go index f1ebf62a..38222025 100644 --- a/redis_test.go +++ b/redis_test.go @@ -35,6 +35,10 @@ var _ = Describe("Client", func() { Expect(val).To(Equal("PONG")) }) + It("should return pool stats", func() { + Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{})) + }) + It("should support custom dialers", func() { custom := redis.NewClient(&redis.Options{ Dialer: func() (net.Conn, error) {