Merge pull request #639 from go-redis/fix/misses

Replace PoolStats.Requests with PoolStats.Misses
This commit is contained in:
Vladimir Mihailenco 2017-09-22 12:38:59 +03:00 committed by GitHub
commit 975882d73d
6 changed files with 49 additions and 35 deletions

View File

@ -801,8 +801,8 @@ func (c *ClusterClient) PoolStats() *PoolStats {
for _, node := range state.masters { for _, node := range state.masters {
s := node.Client.connPool.Stats() s := node.Client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits acc.Hits += s.Hits
acc.Misses += s.Misses
acc.Timeouts += s.Timeouts acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns acc.TotalConns += s.TotalConns
@ -812,8 +812,8 @@ func (c *ClusterClient) PoolStats() *PoolStats {
for _, node := range state.slaves { for _, node := range state.slaves {
s := node.Client.connPool.Stats() s := node.Client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits acc.Hits += s.Hits
acc.Misses += s.Misses
acc.Timeouts += s.Timeouts acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns acc.TotalConns += s.TotalConns

View File

@ -36,9 +36,9 @@ var _ = Describe("Commands", func() {
Expect(cmds[0].Err()).To(MatchError("ERR Client sent AUTH, but no password is set")) Expect(cmds[0].Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
Expect(cmds[1].Err()).To(MatchError("ERR Client sent AUTH, but no password is set")) Expect(cmds[1].Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
stats := client.Pool().Stats() stats := client.PoolStats()
Expect(stats.Requests).To(Equal(uint32(2)))
Expect(stats.Hits).To(Equal(uint32(1))) Expect(stats.Hits).To(Equal(uint32(1)))
Expect(stats.Misses).To(Equal(uint32(1)))
Expect(stats.Timeouts).To(Equal(uint32(0))) Expect(stats.Timeouts).To(Equal(uint32(0)))
Expect(stats.TotalConns).To(Equal(uint32(1))) Expect(stats.TotalConns).To(Equal(uint32(1)))
Expect(stats.FreeConns).To(Equal(uint32(1))) Expect(stats.FreeConns).To(Equal(uint32(1)))
@ -1391,8 +1391,8 @@ var _ = Describe("Commands", func() {
Expect(client.Ping().Err()).NotTo(HaveOccurred()) Expect(client.Ping().Err()).NotTo(HaveOccurred())
stats := client.PoolStats() stats := client.PoolStats()
Expect(stats.Requests).To(Equal(uint32(3)))
Expect(stats.Hits).To(Equal(uint32(1))) Expect(stats.Hits).To(Equal(uint32(1)))
Expect(stats.Misses).To(Equal(uint32(2)))
Expect(stats.Timeouts).To(Equal(uint32(0))) Expect(stats.Timeouts).To(Equal(uint32(0)))
}) })

View File

@ -23,8 +23,8 @@ var timers = sync.Pool{
// Stats contains pool state information and accumulated stats. // Stats contains pool state information and accumulated stats.
type Stats struct { type Stats struct {
Requests uint32 // number of times a connection was requested by the pool
Hits uint32 // number of times free connection was found in the pool Hits uint32 // number of times free connection was found in the pool
Misses uint32 // number of times free connection was NOT found in the pool
Timeouts uint32 // number of times a wait timeout occurred Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool TotalConns uint32 // number of total connections in the pool
@ -151,8 +151,6 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
return nil, false, ErrClosed return nil, false, ErrClosed
} }
atomic.AddUint32(&p.stats.Requests, 1)
select { select {
case p.queue <- struct{}{}: case p.queue <- struct{}{}:
default: default:
@ -190,6 +188,8 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
return cn, false, nil return cn, false, nil
} }
atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.NewConn() newcn, err := p.NewConn()
if err != nil { if err != nil {
<-p.queue <-p.queue
@ -266,8 +266,8 @@ func (p *ConnPool) FreeLen() int {
func (p *ConnPool) Stats() *Stats { func (p *ConnPool) Stats() *Stats {
return &Stats{ return &Stats{
Requests: atomic.LoadUint32(&p.stats.Requests),
Hits: atomic.LoadUint32(&p.stats.Hits), Hits: atomic.LoadUint32(&p.stats.Hits),
Misses: atomic.LoadUint32(&p.stats.Misses),
Timeouts: atomic.LoadUint32(&p.stats.Timeouts), Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()), TotalConns: uint32(p.Len()),

View File

@ -95,8 +95,8 @@ var _ = Describe("pool", func() {
Expect(pool.FreeLen()).To(Equal(1)) Expect(pool.FreeLen()).To(Equal(1))
stats := pool.Stats() stats := pool.Stats()
Expect(stats.Requests).To(Equal(uint32(4)))
Expect(stats.Hits).To(Equal(uint32(2))) Expect(stats.Hits).To(Equal(uint32(2)))
Expect(stats.Misses).To(Equal(uint32(2)))
Expect(stats.Timeouts).To(Equal(uint32(0))) Expect(stats.Timeouts).To(Equal(uint32(0)))
}) })
@ -112,16 +112,16 @@ var _ = Describe("pool", func() {
Expect(pool.FreeLen()).To(Equal(1)) Expect(pool.FreeLen()).To(Equal(1))
stats := pool.Stats() stats := pool.Stats()
Expect(stats.Requests).To(Equal(uint32(101)))
Expect(stats.Hits).To(Equal(uint32(100))) Expect(stats.Hits).To(Equal(uint32(100)))
Expect(stats.Misses).To(Equal(uint32(1)))
Expect(stats.Timeouts).To(Equal(uint32(0))) Expect(stats.Timeouts).To(Equal(uint32(0)))
}) })
It("removes idle connections", func() { It("removes idle connections", func() {
stats := client.PoolStats() stats := client.PoolStats()
Expect(stats).To(Equal(&redis.PoolStats{ Expect(stats).To(Equal(&redis.PoolStats{
Requests: 1,
Hits: 0, Hits: 0,
Misses: 1,
Timeouts: 0, Timeouts: 0,
TotalConns: 1, TotalConns: 1,
FreeConns: 1, FreeConns: 1,
@ -132,8 +132,8 @@ var _ = Describe("pool", func() {
stats = client.PoolStats() stats = client.PoolStats()
Expect(stats).To(Equal(&redis.PoolStats{ Expect(stats).To(Equal(&redis.PoolStats{
Requests: 1,
Hits: 0, Hits: 0,
Misses: 1,
Timeouts: 0, Timeouts: 0,
TotalConns: 0, TotalConns: 0,
FreeConns: 0, FreeConns: 0,

View File

@ -68,7 +68,7 @@ var _ = Describe("PubSub", func() {
} }
stats := client.PoolStats() stats := client.PoolStats()
Expect(stats.Requests - stats.Hits).To(Equal(uint32(2))) Expect(stats.Misses).To(Equal(uint32(2)))
}) })
It("should pub/sub channels", func() { It("should pub/sub channels", func() {
@ -191,7 +191,7 @@ var _ = Describe("PubSub", func() {
} }
stats := client.PoolStats() stats := client.PoolStats()
Expect(stats.Requests - stats.Hits).To(Equal(uint32(2))) Expect(stats.Misses).To(Equal(uint32(2)))
}) })
It("should ping/pong", func() { It("should ping/pong", func() {
@ -290,8 +290,8 @@ var _ = Describe("PubSub", func() {
Eventually(done).Should(Receive()) Eventually(done).Should(Receive())
stats := client.PoolStats() stats := client.PoolStats()
Expect(stats.Requests).To(Equal(uint32(2)))
Expect(stats.Hits).To(Equal(uint32(1))) Expect(stats.Hits).To(Equal(uint32(1)))
Expect(stats.Misses).To(Equal(uint32(1)))
}) })
It("returns an error when subscribe fails", func() { It("returns an error when subscribe fails", func() {

52
ring.go
View File

@ -145,9 +145,10 @@ type Ring struct {
opt *RingOptions opt *RingOptions
nreplicas int nreplicas int
mu sync.RWMutex mu sync.RWMutex
hash *consistenthash.Map hash *consistenthash.Map
shards map[string]*ringShard shards map[string]*ringShard
shardsList []*ringShard
cmdsInfoOnce internal.Once cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo cmdsInfo map[string]*CommandInfo
@ -169,12 +170,21 @@ func NewRing(opt *RingOptions) *Ring {
for name, addr := range opt.Addrs { for name, addr := range opt.Addrs {
clopt := opt.clientOptions() clopt := opt.clientOptions()
clopt.Addr = addr clopt.Addr = addr
ring.addClient(name, NewClient(clopt)) ring.addShard(name, NewClient(clopt))
} }
go ring.heartbeat() go ring.heartbeat()
return ring return ring
} }
func (c *Ring) addShard(name string, cl *Client) {
shard := &ringShard{Client: cl}
c.mu.Lock()
c.hash.Add(name)
c.shards[name] = shard
c.shardsList = append(c.shardsList, shard)
c.mu.Unlock()
}
// Options returns read-only Options that were used to create the client. // Options returns read-only Options that were used to create the client.
func (c *Ring) Options() *RingOptions { func (c *Ring) Options() *RingOptions {
return c.opt return c.opt
@ -186,11 +196,15 @@ func (c *Ring) retryBackoff(attempt int) time.Duration {
// PoolStats returns accumulated connection pool stats. // PoolStats returns accumulated connection pool stats.
func (c *Ring) PoolStats() *PoolStats { func (c *Ring) PoolStats() *PoolStats {
c.mu.RLock()
shards := c.shardsList
c.mu.RUnlock()
var acc PoolStats var acc PoolStats
for _, shard := range c.shards { for _, shard := range shards {
s := shard.Client.connPool.Stats() s := shard.Client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits acc.Hits += s.Hits
acc.Misses += s.Misses
acc.Timeouts += s.Timeouts acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns acc.FreeConns += s.FreeConns
@ -229,9 +243,13 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub {
// ForEachShard concurrently calls the fn on each live shard in the ring. // ForEachShard concurrently calls the fn on each live shard in the ring.
// It returns the first error if any. // It returns the first error if any.
func (c *Ring) ForEachShard(fn func(client *Client) error) error { func (c *Ring) ForEachShard(fn func(client *Client) error) error {
c.mu.RLock()
shards := c.shardsList
c.mu.RUnlock()
var wg sync.WaitGroup var wg sync.WaitGroup
errCh := make(chan error, 1) errCh := make(chan error, 1)
for _, shard := range c.shards { for _, shard := range shards {
if shard.IsDown() { if shard.IsDown() {
continue continue
} }
@ -261,10 +279,11 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error {
func (c *Ring) cmdInfo(name string) *CommandInfo { func (c *Ring) cmdInfo(name string) *CommandInfo {
err := c.cmdsInfoOnce.Do(func() error { err := c.cmdsInfoOnce.Do(func() error {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() shards := c.shardsList
c.mu.RUnlock()
var firstErr error var firstErr error
for _, shard := range c.shards { for _, shard := range shards {
cmdsInfo, err := shard.Client.Command().Result() cmdsInfo, err := shard.Client.Command().Result()
if err == nil { if err == nil {
c.cmdsInfo = cmdsInfo c.cmdsInfo = cmdsInfo
@ -286,13 +305,6 @@ func (c *Ring) cmdInfo(name string) *CommandInfo {
return info return info
} }
func (c *Ring) addClient(name string, cl *Client) {
c.mu.Lock()
c.hash.Add(name)
c.shards[name] = &ringShard{Client: cl}
c.mu.Unlock()
}
func (c *Ring) shardByKey(key string) (*ringShard, error) { func (c *Ring) shardByKey(key string) (*ringShard, error) {
key = hashtag.Key(key) key = hashtag.Key(key)
@ -372,7 +384,10 @@ func (c *Ring) heartbeat() {
break break
} }
for _, shard := range c.shards { shards := c.shardsList
c.mu.RUnlock()
for _, shard := range shards {
err := shard.Client.Ping().Err() err := shard.Client.Ping().Err()
if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
internal.Logf("ring shard state changed: %s", shard) internal.Logf("ring shard state changed: %s", shard)
@ -380,8 +395,6 @@ func (c *Ring) heartbeat() {
} }
} }
c.mu.RUnlock()
if rebalance { if rebalance {
c.rebalance() c.rebalance()
} }
@ -409,6 +422,7 @@ func (c *Ring) Close() error {
} }
c.hash = nil c.hash = nil
c.shards = nil c.shards = nil
c.shardsList = nil
return firstErr return firstErr
} }