diff --git a/ring.go b/ring.go index 7126da6..444acae 100644 --- a/ring.go +++ b/ring.go @@ -310,25 +310,29 @@ func (c *ringShards) Random() (*ringShard, error) { } // Heartbeat monitors state of each shard in the ring. -func (c *ringShards) Heartbeat(frequency time.Duration) { +func (c *ringShards) Heartbeat(ctx context.Context, frequency time.Duration) { ticker := time.NewTicker(frequency) defer ticker.Stop() - ctx := context.Background() - for range ticker.C { - var rebalance bool + for { + select { + case <-ticker.C: + var rebalance bool - for _, shard := range c.List() { - err := shard.Client.Ping(ctx).Err() - isUp := err == nil || err == pool.ErrPoolTimeout - if shard.Vote(isUp) { - internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard) - rebalance = true + for _, shard := range c.List() { + err := shard.Client.Ping(ctx).Err() + isUp := err == nil || err == pool.ErrPoolTimeout + if shard.Vote(isUp) { + internal.Logger.Printf(ctx, "ring shard state changed: %s", shard) + rebalance = true + } } - } - if rebalance { - c.rebalance() + if rebalance { + c.rebalance() + } + case <-ctx.Done(): + return } } } @@ -387,9 +391,10 @@ func (c *ringShards) Close() error { //------------------------------------------------------------------------------ type ring struct { - opt *RingOptions - shards *ringShards - cmdsInfoCache *cmdsInfoCache //nolint:structcheck + opt *RingOptions + shards *ringShards + cmdsInfoCache *cmdsInfoCache //nolint:structcheck + heartbeatCancelFn context.CancelFunc } // Ring is a Redis client that uses consistent hashing to distribute @@ -415,17 +420,20 @@ type Ring struct { func NewRing(opt *RingOptions) *Ring { opt.init() + hbCtx, hbCancel := context.WithCancel(context.Background()) + ring := Ring{ ring: &ring{ - opt: opt, - shards: newRingShards(opt), + opt: opt, + shards: newRingShards(opt), + heartbeatCancelFn: hbCancel, }, } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) ring.cmdable = ring.Process - go ring.shards.Heartbeat(opt.HeartbeatFrequency) + go ring.shards.Heartbeat(hbCtx, opt.HeartbeatFrequency) return &ring } @@ -713,5 +721,7 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er // It is rare to Close a Ring, as the Ring is meant to be long-lived // and shared between many goroutines. func (c *Ring) Close() error { + c.heartbeatCancelFn() + return c.shards.Close() }