ShardHealthCheckFn renamed to HeartbeatFn

This commit is contained in:
Mykhailo Alipa 2024-03-14 11:38:24 +01:00
parent 61baaf5f5d
commit 7b064d44d6
1 changed files with 8 additions and 10 deletions

18
ring.go
View File

@ -22,8 +22,8 @@ import (
var errRingShardsDown = errors.New("redis: all ring shards are down") var errRingShardsDown = errors.New("redis: all ring shards are down")
// defaultShardHealthCheckFn is the default function used to check the shard liveness // defaultHeartbeatFn is the default function used to check the shard liveness
var defaultShardHealthCheckFn = func(ctx context.Context, client *Client) bool { var defaultHeartbeatFn = func(ctx context.Context, client *Client) bool {
err := client.Ping(ctx).Err() err := client.Ping(ctx).Err()
return err == nil || err == pool.ErrPoolTimeout return err == nil || err == pool.ErrPoolTimeout
} }
@ -34,8 +34,6 @@ type ConsistentHash interface {
Get(string) string Get(string) string
} }
type ShardHealthCheckFn func(ctx context.Context, client *Client) bool
type rendezvousWrapper struct { type rendezvousWrapper struct {
*rendezvous.Rendezvous *rendezvous.Rendezvous
} }
@ -62,13 +60,13 @@ type RingOptions struct {
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn. // ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string ClientName string
// Frequency of executing ShardHealthCheckFn to check shards availability. // Frequency of executing HeartbeatFn to check shards availability.
// Shard is considered down after 3 subsequent failed checks. // Shard is considered down after 3 subsequent failed checks.
HeartbeatFrequency time.Duration HeartbeatFrequency time.Duration
// A function used to check the shard liveness // A function used to check the shard liveness
// if not set, defaults to defaultShardHealthCheckFn // if not set, defaults to defaultHeartbeatFn
ShardHealthCheckFn ShardHealthCheckFn HeartbeatFn func(ctx context.Context, client *Client) bool
// NewConsistentHash returns a consistent hash that is used // NewConsistentHash returns a consistent hash that is used
// to distribute keys across the shards. // to distribute keys across the shards.
@ -125,8 +123,8 @@ func (opt *RingOptions) init() {
opt.HeartbeatFrequency = 500 * time.Millisecond opt.HeartbeatFrequency = 500 * time.Millisecond
} }
if opt.ShardHealthCheckFn == nil { if opt.HeartbeatFn == nil {
opt.ShardHealthCheckFn = defaultShardHealthCheckFn opt.HeartbeatFn = defaultHeartbeatFn
} }
if opt.NewConsistentHash == nil { if opt.NewConsistentHash == nil {
@ -424,7 +422,7 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) {
var rebalance bool var rebalance bool
for _, shard := range c.List() { for _, shard := range c.List() {
isUp := c.opt.ShardHealthCheckFn(ctx, shard.Client) isUp := c.opt.HeartbeatFn(ctx, shard.Client)
if shard.Vote(isUp) { if shard.Vote(isUp) {
internal.Logger.Printf(ctx, "ring shard state changed: %s", shard) internal.Logger.Printf(ctx, "ring shard state changed: %s", shard)
rebalance = true rebalance = true