forked from mirror/redis
Hash function and nreplicas used in consistent hash can be set in RingOptions
This commit is contained in:
parent
ee41b90923
commit
2a0840b680
49
ring.go
49
ring.go
|
@ -16,7 +16,8 @@ import (
|
||||||
"github.com/go-redis/redis/internal/pool"
|
"github.com/go-redis/redis/internal/pool"
|
||||||
)
|
)
|
||||||
|
|
||||||
const nreplicas = 100
|
// Hash is type of hash function used in consistent hash
|
||||||
|
type Hash consistenthash.Hash
|
||||||
|
|
||||||
var errRingShardsDown = errors.New("redis: all ring shards are down")
|
var errRingShardsDown = errors.New("redis: all ring shards are down")
|
||||||
|
|
||||||
|
@ -30,6 +31,24 @@ type RingOptions struct {
|
||||||
// Shard is considered down after 3 subsequent failed checks.
|
// Shard is considered down after 3 subsequent failed checks.
|
||||||
HeartbeatFrequency time.Duration
|
HeartbeatFrequency time.Duration
|
||||||
|
|
||||||
|
// Hash function used in consistent hash, will use crc32.ChecksumIEEE
|
||||||
|
// from hash/crc32 package if not specified
|
||||||
|
HashFn Hash
|
||||||
|
|
||||||
|
// Number of replicas in consistent hash, default value is 100
|
||||||
|
// higher number of replicas will provide less deviation, that is keys will be
|
||||||
|
// distributed to nodes more evenly.
|
||||||
|
//
|
||||||
|
// Following is deviation for common nreplicas:
|
||||||
|
// --------------------------------------------------------
|
||||||
|
// | nreplicas | standard error | 99% confidence interval |
|
||||||
|
// | 10 | 0.3152 | (0.37, 1.98) |
|
||||||
|
// | 100 | 0.0997 | (0.76, 1.28) |
|
||||||
|
// | 1000 | 0.0316 | (0.92, 1.09) |
|
||||||
|
// --------------------------------------------------------
|
||||||
|
// See https://arxiv.org/abs/1406.2294 for reference
|
||||||
|
Nreplicas int
|
||||||
|
|
||||||
// Following options are copied from Options struct.
|
// Following options are copied from Options struct.
|
||||||
|
|
||||||
OnConnect func(*Conn) error
|
OnConnect func(*Conn) error
|
||||||
|
@ -56,6 +75,10 @@ func (opt *RingOptions) init() {
|
||||||
opt.HeartbeatFrequency = 500 * time.Millisecond
|
opt.HeartbeatFrequency = 500 * time.Millisecond
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opt.Nreplicas == 0 {
|
||||||
|
opt.Nreplicas = 100
|
||||||
|
}
|
||||||
|
|
||||||
switch opt.MinRetryBackoff {
|
switch opt.MinRetryBackoff {
|
||||||
case -1:
|
case -1:
|
||||||
opt.MinRetryBackoff = 0
|
opt.MinRetryBackoff = 0
|
||||||
|
@ -133,17 +156,21 @@ func (shard *ringShard) Vote(up bool) bool {
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type ringShards struct {
|
type ringShards struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
hash *consistenthash.Map
|
nreplicas int
|
||||||
shards map[string]*ringShard // read only
|
hashfn Hash
|
||||||
list []*ringShard // read only
|
hash *consistenthash.Map
|
||||||
closed bool
|
shards map[string]*ringShard // read only
|
||||||
|
list []*ringShard // read only
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRingShards() *ringShards {
|
func newRingShards(nreplicas int, fn Hash) *ringShards {
|
||||||
return &ringShards{
|
return &ringShards{
|
||||||
hash: consistenthash.New(nreplicas, nil),
|
nreplicas: nreplicas,
|
||||||
shards: make(map[string]*ringShard),
|
hashfn: fn,
|
||||||
|
hash: consistenthash.New(nreplicas, consistenthash.Hash(fn)),
|
||||||
|
shards: make(map[string]*ringShard),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -238,7 +265,7 @@ func (c *ringShards) Heartbeat(frequency time.Duration) {
|
||||||
|
|
||||||
// rebalance removes dead shards from the Ring.
|
// rebalance removes dead shards from the Ring.
|
||||||
func (c *ringShards) rebalance() {
|
func (c *ringShards) rebalance() {
|
||||||
hash := consistenthash.New(nreplicas, nil)
|
hash := consistenthash.New(c.nreplicas, consistenthash.Hash(c.hashfn))
|
||||||
for name, shard := range c.shards {
|
for name, shard := range c.shards {
|
||||||
if shard.IsUp() {
|
if shard.IsUp() {
|
||||||
hash.Add(name)
|
hash.Add(name)
|
||||||
|
@ -305,7 +332,7 @@ func NewRing(opt *RingOptions) *Ring {
|
||||||
|
|
||||||
ring := &Ring{
|
ring := &Ring{
|
||||||
opt: opt,
|
opt: opt,
|
||||||
shards: newRingShards(),
|
shards: newRingShards(opt.Nreplicas, opt.HashFn),
|
||||||
}
|
}
|
||||||
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
|
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue