Merge pull request #2157 from kavu/fix/ring_heartbeat_leak

fix: provide a signal channel to end heartbeat goroutine
This commit is contained in:
Vladimir Mihailenco 2022-07-28 13:23:32 +03:00 committed by GitHub
commit 1492628ad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 29 additions and 19 deletions

20
ring.go
View File

@ -310,19 +310,20 @@ func (c *ringShards) Random() (*ringShard, error) {
} }
// Heartbeat monitors state of each shard in the ring. // Heartbeat monitors state of each shard in the ring.
func (c *ringShards) Heartbeat(frequency time.Duration) { func (c *ringShards) Heartbeat(ctx context.Context, frequency time.Duration) {
ticker := time.NewTicker(frequency) ticker := time.NewTicker(frequency)
defer ticker.Stop() defer ticker.Stop()
ctx := context.Background() for {
for range ticker.C { select {
case <-ticker.C:
var rebalance bool var rebalance bool
for _, shard := range c.List() { for _, shard := range c.List() {
err := shard.Client.Ping(ctx).Err() err := shard.Client.Ping(ctx).Err()
isUp := err == nil || err == pool.ErrPoolTimeout isUp := err == nil || err == pool.ErrPoolTimeout
if shard.Vote(isUp) { if shard.Vote(isUp) {
internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard) internal.Logger.Printf(ctx, "ring shard state changed: %s", shard)
rebalance = true rebalance = true
} }
} }
@ -330,6 +331,9 @@ func (c *ringShards) Heartbeat(frequency time.Duration) {
if rebalance { if rebalance {
c.rebalance() c.rebalance()
} }
case <-ctx.Done():
return
}
} }
} }
@ -390,6 +394,7 @@ type ring struct {
opt *RingOptions opt *RingOptions
shards *ringShards shards *ringShards
cmdsInfoCache *cmdsInfoCache //nolint:structcheck cmdsInfoCache *cmdsInfoCache //nolint:structcheck
heartbeatCancelFn context.CancelFunc
} }
// Ring is a Redis client that uses consistent hashing to distribute // Ring is a Redis client that uses consistent hashing to distribute
@ -415,17 +420,20 @@ type Ring struct {
func NewRing(opt *RingOptions) *Ring { func NewRing(opt *RingOptions) *Ring {
opt.init() opt.init()
hbCtx, hbCancel := context.WithCancel(context.Background())
ring := Ring{ ring := Ring{
ring: &ring{ ring: &ring{
opt: opt, opt: opt,
shards: newRingShards(opt), shards: newRingShards(opt),
heartbeatCancelFn: hbCancel,
}, },
} }
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
ring.cmdable = ring.Process ring.cmdable = ring.Process
go ring.shards.Heartbeat(opt.HeartbeatFrequency) go ring.shards.Heartbeat(hbCtx, opt.HeartbeatFrequency)
return &ring return &ring
} }
@ -713,5 +721,7 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er
// It is rare to Close a Ring, as the Ring is meant to be long-lived // It is rare to Close a Ring, as the Ring is meant to be long-lived
// and shared between many goroutines. // and shared between many goroutines.
func (c *Ring) Close() error { func (c *Ring) Close() error {
c.heartbeatCancelFn()
return c.shards.Close() return c.shards.Close()
} }