From f032c126db3e2c1a239ce1790b0ab81994df75cf Mon Sep 17 00:00:00 2001 From: Max Riveiro Date: Thu, 14 Jul 2022 22:32:31 +0300 Subject: [PATCH] fix: provide a signal channel to end heartbeat goroutine --- ring.go | 46 ++++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/ring.go b/ring.go index 7126da6..5ea4ae5 100644 --- a/ring.go +++ b/ring.go @@ -310,25 +310,30 @@ func (c *ringShards) Random() (*ringShard, error) { } // Heartbeat monitors state of each shard in the ring. -func (c *ringShards) Heartbeat(frequency time.Duration) { +func (c *ringShards) Heartbeat(frequency time.Duration, closeCh chan struct{}) { ticker := time.NewTicker(frequency) defer ticker.Stop() ctx := context.Background() - for range ticker.C { - var rebalance bool + for { + select { + case <-ticker.C: + var rebalance bool - for _, shard := range c.List() { - err := shard.Client.Ping(ctx).Err() - isUp := err == nil || err == pool.ErrPoolTimeout - if shard.Vote(isUp) { - internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard) - rebalance = true + for _, shard := range c.List() { + err := shard.Client.Ping(ctx).Err() + isUp := err == nil || err == pool.ErrPoolTimeout + if shard.Vote(isUp) { + internal.Logger.Printf(ctx, "ring shard state changed: %s", shard) + rebalance = true + } } - } - if rebalance { - c.rebalance() + if rebalance { + c.rebalance() + } + case <-closeCh: + return } } } @@ -387,9 +392,10 @@ func (c *ringShards) Close() error { //------------------------------------------------------------------------------ type ring struct { - opt *RingOptions - shards *ringShards - cmdsInfoCache *cmdsInfoCache //nolint:structcheck + opt *RingOptions + shards *ringShards + cmdsInfoCache *cmdsInfoCache //nolint:structcheck + hearbeatCloseSignal chan struct{} } // Ring is a Redis client that uses consistent hashing to distribute @@ -415,17 +421,20 @@ type Ring struct { func NewRing(opt *RingOptions) *Ring { opt.init() + hearbeatCloseSignal := make(chan struct{}) + ring := Ring{ ring: &ring{ - opt: opt, - shards: newRingShards(opt), + opt: opt, + shards: newRingShards(opt), + hearbeatCloseSignal: hearbeatCloseSignal, }, } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) ring.cmdable = ring.Process - go ring.shards.Heartbeat(opt.HeartbeatFrequency) + go ring.shards.Heartbeat(opt.HeartbeatFrequency, hearbeatCloseSignal) return &ring } @@ -713,5 +722,6 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er // It is rare to Close a Ring, as the Ring is meant to be long-lived // and shared between many goroutines. func (c *Ring) Close() error { + close(c.hearbeatCloseSignal) return c.shards.Close() }