From bc5f9a68785e14425561c26a1383f7401ee31bf0 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Fri, 22 Sep 2017 12:23:46 +0300 Subject: [PATCH] Replace PoolStats.Requests with PoolStats.Misses --- cluster.go | 4 ++-- commands_test.go | 6 ++--- internal/pool/pool.go | 8 +++---- pool_test.go | 8 +++---- pubsub_test.go | 6 ++--- ring.go | 52 +++++++++++++++++++++++++++---------------- 6 files changed, 49 insertions(+), 35 deletions(-) diff --git a/cluster.go b/cluster.go index 6f435251..019a2d44 100644 --- a/cluster.go +++ b/cluster.go @@ -801,8 +801,8 @@ func (c *ClusterClient) PoolStats() *PoolStats { for _, node := range state.masters { s := node.Client.connPool.Stats() - acc.Requests += s.Requests acc.Hits += s.Hits + acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns @@ -812,8 +812,8 @@ func (c *ClusterClient) PoolStats() *PoolStats { for _, node := range state.slaves { s := node.Client.connPool.Stats() - acc.Requests += s.Requests acc.Hits += s.Hits + acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns diff --git a/commands_test.go b/commands_test.go index 0811d12a..6b81f23c 100644 --- a/commands_test.go +++ b/commands_test.go @@ -36,9 +36,9 @@ var _ = Describe("Commands", func() { Expect(cmds[0].Err()).To(MatchError("ERR Client sent AUTH, but no password is set")) Expect(cmds[1].Err()).To(MatchError("ERR Client sent AUTH, but no password is set")) - stats := client.Pool().Stats() - Expect(stats.Requests).To(Equal(uint32(2))) + stats := client.PoolStats() Expect(stats.Hits).To(Equal(uint32(1))) + Expect(stats.Misses).To(Equal(uint32(1))) Expect(stats.Timeouts).To(Equal(uint32(0))) Expect(stats.TotalConns).To(Equal(uint32(1))) Expect(stats.FreeConns).To(Equal(uint32(1))) @@ -1391,8 +1391,8 @@ var _ = Describe("Commands", func() { Expect(client.Ping().Err()).NotTo(HaveOccurred()) stats := client.PoolStats() - Expect(stats.Requests).To(Equal(uint32(3))) Expect(stats.Hits).To(Equal(uint32(1))) + Expect(stats.Misses).To(Equal(uint32(2))) Expect(stats.Timeouts).To(Equal(uint32(0))) }) diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 26a891bf..836ec104 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -23,8 +23,8 @@ var timers = sync.Pool{ // Stats contains pool state information and accumulated stats. type Stats struct { - Requests uint32 // number of times a connection was requested by the pool Hits uint32 // number of times free connection was found in the pool + Misses uint32 // number of times free connection was NOT found in the pool Timeouts uint32 // number of times a wait timeout occurred TotalConns uint32 // number of total connections in the pool @@ -151,8 +151,6 @@ func (p *ConnPool) Get() (*Conn, bool, error) { return nil, false, ErrClosed } - atomic.AddUint32(&p.stats.Requests, 1) - select { case p.queue <- struct{}{}: default: @@ -190,6 +188,8 @@ func (p *ConnPool) Get() (*Conn, bool, error) { return cn, false, nil } + atomic.AddUint32(&p.stats.Misses, 1) + newcn, err := p.NewConn() if err != nil { <-p.queue @@ -266,8 +266,8 @@ func (p *ConnPool) FreeLen() int { func (p *ConnPool) Stats() *Stats { return &Stats{ - Requests: atomic.LoadUint32(&p.stats.Requests), Hits: atomic.LoadUint32(&p.stats.Hits), + Misses: atomic.LoadUint32(&p.stats.Misses), Timeouts: atomic.LoadUint32(&p.stats.Timeouts), TotalConns: uint32(p.Len()), diff --git a/pool_test.go b/pool_test.go index 2cc1cdfb..0ca09adc 100644 --- a/pool_test.go +++ b/pool_test.go @@ -95,8 +95,8 @@ var _ = Describe("pool", func() { Expect(pool.FreeLen()).To(Equal(1)) stats := pool.Stats() - Expect(stats.Requests).To(Equal(uint32(4))) Expect(stats.Hits).To(Equal(uint32(2))) + Expect(stats.Misses).To(Equal(uint32(2))) Expect(stats.Timeouts).To(Equal(uint32(0))) }) @@ -112,16 +112,16 @@ var _ = Describe("pool", func() { Expect(pool.FreeLen()).To(Equal(1)) stats := pool.Stats() - Expect(stats.Requests).To(Equal(uint32(101))) Expect(stats.Hits).To(Equal(uint32(100))) + Expect(stats.Misses).To(Equal(uint32(1))) Expect(stats.Timeouts).To(Equal(uint32(0))) }) It("removes idle connections", func() { stats := client.PoolStats() Expect(stats).To(Equal(&redis.PoolStats{ - Requests: 1, Hits: 0, + Misses: 1, Timeouts: 0, TotalConns: 1, FreeConns: 1, @@ -132,8 +132,8 @@ var _ = Describe("pool", func() { stats = client.PoolStats() Expect(stats).To(Equal(&redis.PoolStats{ - Requests: 1, Hits: 0, + Misses: 1, Timeouts: 0, TotalConns: 0, FreeConns: 0, diff --git a/pubsub_test.go b/pubsub_test.go index 1d9dfcb9..6fc04a19 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -68,7 +68,7 @@ var _ = Describe("PubSub", func() { } stats := client.PoolStats() - Expect(stats.Requests - stats.Hits).To(Equal(uint32(2))) + Expect(stats.Misses).To(Equal(uint32(2))) }) It("should pub/sub channels", func() { @@ -191,7 +191,7 @@ var _ = Describe("PubSub", func() { } stats := client.PoolStats() - Expect(stats.Requests - stats.Hits).To(Equal(uint32(2))) + Expect(stats.Misses).To(Equal(uint32(2))) }) It("should ping/pong", func() { @@ -290,8 +290,8 @@ var _ = Describe("PubSub", func() { Eventually(done).Should(Receive()) stats := client.PoolStats() - Expect(stats.Requests).To(Equal(uint32(2))) Expect(stats.Hits).To(Equal(uint32(1))) + Expect(stats.Misses).To(Equal(uint32(1))) }) It("returns an error when subscribe fails", func() { diff --git a/ring.go b/ring.go index a9314fb5..7b3d2d14 100644 --- a/ring.go +++ b/ring.go @@ -145,9 +145,10 @@ type Ring struct { opt *RingOptions nreplicas int - mu sync.RWMutex - hash *consistenthash.Map - shards map[string]*ringShard + mu sync.RWMutex + hash *consistenthash.Map + shards map[string]*ringShard + shardsList []*ringShard cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo @@ -169,12 +170,21 @@ func NewRing(opt *RingOptions) *Ring { for name, addr := range opt.Addrs { clopt := opt.clientOptions() clopt.Addr = addr - ring.addClient(name, NewClient(clopt)) + ring.addShard(name, NewClient(clopt)) } go ring.heartbeat() return ring } +func (c *Ring) addShard(name string, cl *Client) { + shard := &ringShard{Client: cl} + c.mu.Lock() + c.hash.Add(name) + c.shards[name] = shard + c.shardsList = append(c.shardsList, shard) + c.mu.Unlock() +} + // Options returns read-only Options that were used to create the client. func (c *Ring) Options() *RingOptions { return c.opt @@ -186,11 +196,15 @@ func (c *Ring) retryBackoff(attempt int) time.Duration { // PoolStats returns accumulated connection pool stats. func (c *Ring) PoolStats() *PoolStats { + c.mu.RLock() + shards := c.shardsList + c.mu.RUnlock() + var acc PoolStats - for _, shard := range c.shards { + for _, shard := range shards { s := shard.Client.connPool.Stats() - acc.Requests += s.Requests acc.Hits += s.Hits + acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns @@ -229,9 +243,13 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub { // ForEachShard concurrently calls the fn on each live shard in the ring. // It returns the first error if any. func (c *Ring) ForEachShard(fn func(client *Client) error) error { + c.mu.RLock() + shards := c.shardsList + c.mu.RUnlock() + var wg sync.WaitGroup errCh := make(chan error, 1) - for _, shard := range c.shards { + for _, shard := range shards { if shard.IsDown() { continue } @@ -261,10 +279,11 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error { func (c *Ring) cmdInfo(name string) *CommandInfo { err := c.cmdsInfoOnce.Do(func() error { c.mu.RLock() - defer c.mu.RUnlock() + shards := c.shardsList + c.mu.RUnlock() var firstErr error - for _, shard := range c.shards { + for _, shard := range shards { cmdsInfo, err := shard.Client.Command().Result() if err == nil { c.cmdsInfo = cmdsInfo @@ -286,13 +305,6 @@ func (c *Ring) cmdInfo(name string) *CommandInfo { return info } -func (c *Ring) addClient(name string, cl *Client) { - c.mu.Lock() - c.hash.Add(name) - c.shards[name] = &ringShard{Client: cl} - c.mu.Unlock() -} - func (c *Ring) shardByKey(key string) (*ringShard, error) { key = hashtag.Key(key) @@ -372,7 +384,10 @@ func (c *Ring) heartbeat() { break } - for _, shard := range c.shards { + shards := c.shardsList + c.mu.RUnlock() + + for _, shard := range shards { err := shard.Client.Ping().Err() if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { internal.Logf("ring shard state changed: %s", shard) @@ -380,8 +395,6 @@ func (c *Ring) heartbeat() { } } - c.mu.RUnlock() - if rebalance { c.rebalance() } @@ -409,6 +422,7 @@ func (c *Ring) Close() error { } c.hash = nil c.shards = nil + c.shardsList = nil return firstErr }