Merge pull request #2153 from kavu/fix/2126_ringshards_panic

fix: Handle panic in ringShards Hash function when Ring got closed
This commit is contained in:
Vladimir Mihailenco 2022-07-28 13:25:06 +03:00 committed by GitHub
commit f99baf4fed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 27 additions and 14 deletions

26
ring.go
View File

@ -259,10 +259,11 @@ func (c *ringShards) Hash(key string) string {
var hash string
c.mu.RLock()
defer c.mu.RUnlock()
if c.numShard > 0 {
hash = c.hash.Get(key)
}
c.mu.RUnlock()
return hash
}
@ -271,27 +272,22 @@ func (c *ringShards) GetByKey(key string) (*ringShard, error) {
key = hashtag.Key(key)
c.mu.RLock()
defer c.mu.RUnlock()
if c.closed {
c.mu.RUnlock()
return nil, pool.ErrClosed
}
if c.numShard == 0 {
c.mu.RUnlock()
return nil, errRingShardsDown
}
hash := c.hash.Get(key)
if hash == "" {
c.mu.RUnlock()
return nil, errRingShardsDown
}
shard := c.shards[hash]
c.mu.RUnlock()
return shard, nil
return c.shards[hash], nil
}
func (c *ringShards) GetByName(shardName string) (*ringShard, error) {
@ -300,9 +296,9 @@ func (c *ringShards) GetByName(shardName string) (*ringShard, error) {
}
c.mu.RLock()
shard := c.shards[shardName]
c.mu.RUnlock()
return shard, nil
defer c.mu.RUnlock()
return c.shards[shardName], nil
}
func (c *ringShards) Random() (*ringShard, error) {
@ -361,9 +357,9 @@ func (c *ringShards) rebalance() {
func (c *ringShards) Len() int {
c.mu.RLock()
l := c.numShard
c.mu.RUnlock()
return l
defer c.mu.RUnlock()
return c.numShard
}
func (c *ringShards) Close() error {
@ -381,8 +377,10 @@ func (c *ringShards) Close() error {
firstErr = err
}
}
c.hash = nil
c.shards = nil
c.numShard = 0
c.list = nil
return firstErr

View File

@ -114,6 +114,21 @@ var _ = Describe("Redis Ring", func() {
})
Describe("pipeline", func() {
It("doesn't panic closed ring, returns error", func() {
pipe := ring.Pipeline()
for i := 0; i < 3; i++ {
err := pipe.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
Expect(err).NotTo(HaveOccurred())
}
Expect(ring.Close()).NotTo(HaveOccurred())
Expect(func() {
_, execErr := pipe.Exec(ctx)
Expect(execErr).To(HaveOccurred())
}).NotTo(Panic())
})
It("distributes keys", func() {
pipe := ring.Pipeline()
for i := 0; i < 100; i++ {