Merge pull request #244 from anmic/feature/pool-metrics

Add pool instrumentation.
This commit is contained in:
Vladimir Mihailenco 2016-01-25 15:47:06 +02:00
commit d2b13f523d
5 changed files with 57 additions and 0 deletions

View File

@ -56,6 +56,22 @@ func (c *ClusterClient) Watch(keys ...string) (*Multi, error) {
return client.Watch(keys...) return client.Watch(keys...)
} }
// PoolStats returns accumulated connection pool stats
func (c *ClusterClient) PoolStats() *PoolStats {
acc := PoolStats{}
c.clientsMx.RLock()
for _, client := range c.clients {
m := client.PoolStats()
acc.TotalConns += m.TotalConns
acc.FreeConns += m.FreeConns
acc.Requests += m.Requests
acc.Waits += m.Waits
acc.Timeouts += m.Timeouts
}
c.clientsMx.RUnlock()
return &acc
}
// Close closes the cluster client, releasing any open resources. // Close closes the cluster client, releasing any open resources.
// //
// It is rare to Close a ClusterClient, as the ClusterClient is meant // It is rare to Close a ClusterClient, as the ClusterClient is meant
@ -306,6 +322,7 @@ type ClusterOptions struct {
ReadTimeout time.Duration ReadTimeout time.Duration
WriteTimeout time.Duration WriteTimeout time.Duration
// PoolSize applies per redis node and not for the whole cluster.
PoolSize int PoolSize int
PoolTimeout time.Duration PoolTimeout time.Duration
IdleTimeout time.Duration IdleTimeout time.Duration

View File

@ -313,6 +313,10 @@ var _ = Describe("Cluster", func() {
Expect(cnt).To(Equal(int64(1))) Expect(cnt).To(Equal(int64(1)))
}) })
It("should return pool stats", func() {
Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{}))
})
It("should follow redirects", func() { It("should follow redirects", func() {
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred()) Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())

26
pool.go
View File

@ -16,6 +16,16 @@ var (
errPoolTimeout = errors.New("redis: connection pool timeout") errPoolTimeout = errors.New("redis: connection pool timeout")
) )
// PoolStats contains pool state information and accumulated stats
type PoolStats struct {
Requests uint64 // number of times a connection was requested by the pool
Waits uint64 // number of times our pool had to wait for a connection to avail
Timeouts uint64 // number of times a wait timeout occurred
TotalConns uint64 // the number of total connections in the pool
FreeConns uint64 // the number of free connections in the pool
}
type pool interface { type pool interface {
First() *conn First() *conn
Get() (*conn, bool, error) Get() (*conn, bool, error)
@ -24,6 +34,7 @@ type pool interface {
Len() int Len() int
FreeLen() int FreeLen() int
Close() error Close() error
Stats() PoolStats
} }
type connList struct { type connList struct {
@ -127,6 +138,7 @@ type connPool struct {
opt *Options opt *Options
conns *connList conns *connList
freeConns chan *conn freeConns chan *conn
stats PoolStats
_closed int32 _closed int32
@ -226,6 +238,8 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) {
return return
} }
atomic.AddUint64(&p.stats.Requests, 1)
// Fetch first non-idle connection, if available. // Fetch first non-idle connection, if available.
if cn = p.First(); cn != nil { if cn = p.First(); cn != nil {
return return
@ -244,10 +258,12 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) {
} }
// Otherwise, wait for the available connection. // Otherwise, wait for the available connection.
atomic.AddUint64(&p.stats.Waits, 1)
if cn = p.wait(); cn != nil { if cn = p.wait(); cn != nil {
return return
} }
atomic.AddUint64(&p.stats.Timeouts, 1)
err = errPoolTimeout err = errPoolTimeout
return return
} }
@ -298,6 +314,12 @@ func (p *connPool) FreeLen() int {
return len(p.freeConns) return len(p.freeConns)
} }
func (p *connPool) Stats() PoolStats {
p.stats.TotalConns = uint64(p.Len())
p.stats.FreeConns = uint64(p.FreeLen())
return p.stats
}
func (p *connPool) Close() (retErr error) { func (p *connPool) Close() (retErr error) {
if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) { if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) {
return errClosed return errClosed
@ -386,6 +408,8 @@ func (p *singleConnPool) FreeLen() int {
return 0 return 0
} }
func (p *singleConnPool) Stats() PoolStats { return PoolStats{} }
func (p *singleConnPool) Close() error { func (p *singleConnPool) Close() error {
return nil return nil
} }
@ -493,6 +517,8 @@ func (p *stickyConnPool) FreeLen() int {
return 0 return 0
} }
func (p *stickyConnPool) Stats() PoolStats { return PoolStats{} }
func (p *stickyConnPool) Reset(reason error) (err error) { func (p *stickyConnPool) Reset(reason error) (err error) {
p.mx.Lock() p.mx.Lock()
if p.cn != nil { if p.cn != nil {

View File

@ -192,3 +192,9 @@ func NewClient(opt *Options) *Client {
pool := newConnPool(opt) pool := newConnPool(opt)
return newClient(opt, pool) return newClient(opt, pool)
} }
// PoolStats returns connection pool stats
func (c *Client) PoolStats() *PoolStats {
stats := c.baseClient.connPool.Stats()
return &stats
}

View File

@ -35,6 +35,10 @@ var _ = Describe("Client", func() {
Expect(val).To(Equal("PONG")) Expect(val).To(Equal("PONG"))
}) })
It("should return pool stats", func() {
Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{}))
})
It("should support custom dialers", func() { It("should support custom dialers", func() {
custom := redis.NewClient(&redis.Options{ custom := redis.NewClient(&redis.Options{
Dialer: func() (net.Conn, error) { Dialer: func() (net.Conn, error) {