fix: provide a signal channel to end heartbeat goroutine

This commit is contained in:
Max Riveiro 2022-07-14 22:32:31 +03:00
parent e061db8c13
commit f032c126db
1 changed files with 28 additions and 18 deletions

46
ring.go
View File

@ -310,25 +310,30 @@ func (c *ringShards) Random() (*ringShard, error) {
}
// Heartbeat monitors state of each shard in the ring.
func (c *ringShards) Heartbeat(frequency time.Duration) {
func (c *ringShards) Heartbeat(frequency time.Duration, closeCh chan struct{}) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
ctx := context.Background()
for range ticker.C {
var rebalance bool
for {
select {
case <-ticker.C:
var rebalance bool
for _, shard := range c.List() {
err := shard.Client.Ping(ctx).Err()
isUp := err == nil || err == pool.ErrPoolTimeout
if shard.Vote(isUp) {
internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard)
rebalance = true
for _, shard := range c.List() {
err := shard.Client.Ping(ctx).Err()
isUp := err == nil || err == pool.ErrPoolTimeout
if shard.Vote(isUp) {
internal.Logger.Printf(ctx, "ring shard state changed: %s", shard)
rebalance = true
}
}
}
if rebalance {
c.rebalance()
if rebalance {
c.rebalance()
}
case <-closeCh:
return
}
}
}
@ -387,9 +392,10 @@ func (c *ringShards) Close() error {
//------------------------------------------------------------------------------
type ring struct {
opt *RingOptions
shards *ringShards
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
opt *RingOptions
shards *ringShards
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
hearbeatCloseSignal chan struct{}
}
// Ring is a Redis client that uses consistent hashing to distribute
@ -415,17 +421,20 @@ type Ring struct {
func NewRing(opt *RingOptions) *Ring {
opt.init()
hearbeatCloseSignal := make(chan struct{})
ring := Ring{
ring: &ring{
opt: opt,
shards: newRingShards(opt),
opt: opt,
shards: newRingShards(opt),
hearbeatCloseSignal: hearbeatCloseSignal,
},
}
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
ring.cmdable = ring.Process
go ring.shards.Heartbeat(opt.HeartbeatFrequency)
go ring.shards.Heartbeat(opt.HeartbeatFrequency, hearbeatCloseSignal)
return &ring
}
@ -713,5 +722,6 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er
// It is rare to Close a Ring, as the Ring is meant to be long-lived
// and shared between many goroutines.
func (c *Ring) Close() error {
close(c.hearbeatCloseSignal)
return c.shards.Close()
}