forked from mirror/redis
Add pool instrumentation.
This commit is contained in:
parent
dd1ac33826
commit
f7a4bd5023
17
cluster.go
17
cluster.go
|
@ -56,6 +56,22 @@ func (c *ClusterClient) Watch(keys ...string) (*Multi, error) {
|
|||
return client.Watch(keys...)
|
||||
}
|
||||
|
||||
// PoolStats returns accumulated connection pool stats
|
||||
func (c *ClusterClient) PoolStats() *PoolStats {
|
||||
acc := PoolStats{}
|
||||
c.clientsMx.RLock()
|
||||
for _, client := range c.clients {
|
||||
m := client.PoolStats()
|
||||
acc.TotalConns += m.TotalConns
|
||||
acc.FreeConns += m.FreeConns
|
||||
acc.Requests += m.Requests
|
||||
acc.Waits += m.Waits
|
||||
acc.Timeouts += m.Timeouts
|
||||
}
|
||||
c.clientsMx.RUnlock()
|
||||
return &acc
|
||||
}
|
||||
|
||||
// Close closes the cluster client, releasing any open resources.
|
||||
//
|
||||
// It is rare to Close a ClusterClient, as the ClusterClient is meant
|
||||
|
@ -306,6 +322,7 @@ type ClusterOptions struct {
|
|||
ReadTimeout time.Duration
|
||||
WriteTimeout time.Duration
|
||||
|
||||
// PoolSize applies per redis node and not for the whole cluster.
|
||||
PoolSize int
|
||||
PoolTimeout time.Duration
|
||||
IdleTimeout time.Duration
|
||||
|
|
|
@ -313,6 +313,10 @@ var _ = Describe("Cluster", func() {
|
|||
Expect(cnt).To(Equal(int64(1)))
|
||||
})
|
||||
|
||||
It("should return pool stats", func() {
|
||||
Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{}))
|
||||
})
|
||||
|
||||
It("should follow redirects", func() {
|
||||
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
|
||||
|
||||
|
|
26
pool.go
26
pool.go
|
@ -16,6 +16,16 @@ var (
|
|||
errPoolTimeout = errors.New("redis: connection pool timeout")
|
||||
)
|
||||
|
||||
// PoolStats contains pool state information and accumulated stats
|
||||
type PoolStats struct {
|
||||
Requests uint64 // number of times a connection was requested by the pool
|
||||
Waits uint64 // number of times our pool had to wait for a connection to avail
|
||||
Timeouts uint64 // number of times a wait timeout occurred
|
||||
|
||||
TotalConns uint64 // the number of total connections in the pool
|
||||
FreeConns uint64 // the number of free connections in the pool
|
||||
}
|
||||
|
||||
type pool interface {
|
||||
First() *conn
|
||||
Get() (*conn, bool, error)
|
||||
|
@ -24,6 +34,7 @@ type pool interface {
|
|||
Len() int
|
||||
FreeLen() int
|
||||
Close() error
|
||||
Stats() PoolStats
|
||||
}
|
||||
|
||||
type connList struct {
|
||||
|
@ -127,6 +138,7 @@ type connPool struct {
|
|||
opt *Options
|
||||
conns *connList
|
||||
freeConns chan *conn
|
||||
stats PoolStats
|
||||
|
||||
_closed int32
|
||||
|
||||
|
@ -226,6 +238,8 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
atomic.AddUint64(&p.stats.Requests, 1)
|
||||
|
||||
// Fetch first non-idle connection, if available.
|
||||
if cn = p.First(); cn != nil {
|
||||
return
|
||||
|
@ -244,10 +258,12 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) {
|
|||
}
|
||||
|
||||
// Otherwise, wait for the available connection.
|
||||
atomic.AddUint64(&p.stats.Waits, 1)
|
||||
if cn = p.wait(); cn != nil {
|
||||
return
|
||||
}
|
||||
|
||||
atomic.AddUint64(&p.stats.Timeouts, 1)
|
||||
err = errPoolTimeout
|
||||
return
|
||||
}
|
||||
|
@ -298,6 +314,12 @@ func (p *connPool) FreeLen() int {
|
|||
return len(p.freeConns)
|
||||
}
|
||||
|
||||
func (p *connPool) Stats() PoolStats {
|
||||
p.stats.TotalConns = uint64(p.Len())
|
||||
p.stats.FreeConns = uint64(p.FreeLen())
|
||||
return p.stats
|
||||
}
|
||||
|
||||
func (p *connPool) Close() (retErr error) {
|
||||
if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) {
|
||||
return errClosed
|
||||
|
@ -386,6 +408,8 @@ func (p *singleConnPool) FreeLen() int {
|
|||
return 0
|
||||
}
|
||||
|
||||
func (p *singleConnPool) Stats() PoolStats { return PoolStats{} }
|
||||
|
||||
func (p *singleConnPool) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
@ -493,6 +517,8 @@ func (p *stickyConnPool) FreeLen() int {
|
|||
return 0
|
||||
}
|
||||
|
||||
func (p *stickyConnPool) Stats() PoolStats { return PoolStats{} }
|
||||
|
||||
func (p *stickyConnPool) Reset(reason error) (err error) {
|
||||
p.mx.Lock()
|
||||
if p.cn != nil {
|
||||
|
|
6
redis.go
6
redis.go
|
@ -192,3 +192,9 @@ func NewClient(opt *Options) *Client {
|
|||
pool := newConnPool(opt)
|
||||
return newClient(opt, pool)
|
||||
}
|
||||
|
||||
// PoolStats returns connection pool stats
|
||||
func (c *Client) PoolStats() *PoolStats {
|
||||
stats := c.baseClient.connPool.Stats()
|
||||
return &stats
|
||||
}
|
||||
|
|
|
@ -35,6 +35,10 @@ var _ = Describe("Client", func() {
|
|||
Expect(val).To(Equal("PONG"))
|
||||
})
|
||||
|
||||
It("should return pool stats", func() {
|
||||
Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{}))
|
||||
})
|
||||
|
||||
It("should support custom dialers", func() {
|
||||
custom := redis.NewClient(&redis.Options{
|
||||
Dialer: func() (net.Conn, error) {
|
||||
|
|
Loading…
Reference in New Issue