mirror of https://github.com/go-redis/redis.git
fix: replace heartbeat signal channel with context.WithCancel
This commit is contained in:
parent
f032c126db
commit
20d0ca235e
26
ring.go
26
ring.go
|
@ -310,11 +310,10 @@ func (c *ringShards) Random() (*ringShard, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Heartbeat monitors state of each shard in the ring.
|
// Heartbeat monitors state of each shard in the ring.
|
||||||
func (c *ringShards) Heartbeat(frequency time.Duration, closeCh chan struct{}) {
|
func (c *ringShards) Heartbeat(ctx context.Context, frequency time.Duration) {
|
||||||
ticker := time.NewTicker(frequency)
|
ticker := time.NewTicker(frequency)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
@ -332,7 +331,7 @@ func (c *ringShards) Heartbeat(frequency time.Duration, closeCh chan struct{}) {
|
||||||
if rebalance {
|
if rebalance {
|
||||||
c.rebalance()
|
c.rebalance()
|
||||||
}
|
}
|
||||||
case <-closeCh:
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -392,10 +391,10 @@ func (c *ringShards) Close() error {
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type ring struct {
|
type ring struct {
|
||||||
opt *RingOptions
|
opt *RingOptions
|
||||||
shards *ringShards
|
shards *ringShards
|
||||||
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
|
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
|
||||||
hearbeatCloseSignal chan struct{}
|
heartbeatCancelFn context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ring is a Redis client that uses consistent hashing to distribute
|
// Ring is a Redis client that uses consistent hashing to distribute
|
||||||
|
@ -421,20 +420,20 @@ type Ring struct {
|
||||||
func NewRing(opt *RingOptions) *Ring {
|
func NewRing(opt *RingOptions) *Ring {
|
||||||
opt.init()
|
opt.init()
|
||||||
|
|
||||||
hearbeatCloseSignal := make(chan struct{})
|
hbCtx, hbCancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
ring := Ring{
|
ring := Ring{
|
||||||
ring: &ring{
|
ring: &ring{
|
||||||
opt: opt,
|
opt: opt,
|
||||||
shards: newRingShards(opt),
|
shards: newRingShards(opt),
|
||||||
hearbeatCloseSignal: hearbeatCloseSignal,
|
heartbeatCancelFn: hbCancel,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
|
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
|
||||||
ring.cmdable = ring.Process
|
ring.cmdable = ring.Process
|
||||||
|
|
||||||
go ring.shards.Heartbeat(opt.HeartbeatFrequency, hearbeatCloseSignal)
|
go ring.shards.Heartbeat(hbCtx, opt.HeartbeatFrequency)
|
||||||
|
|
||||||
return &ring
|
return &ring
|
||||||
}
|
}
|
||||||
|
@ -722,6 +721,7 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er
|
||||||
// It is rare to Close a Ring, as the Ring is meant to be long-lived
|
// It is rare to Close a Ring, as the Ring is meant to be long-lived
|
||||||
// and shared between many goroutines.
|
// and shared between many goroutines.
|
||||||
func (c *Ring) Close() error {
|
func (c *Ring) Close() error {
|
||||||
close(c.hearbeatCloseSignal)
|
c.heartbeatCancelFn()
|
||||||
|
|
||||||
return c.shards.Close()
|
return c.shards.Close()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue