forked from mirror/redis
Merge pull request #812 from hyfrey/feature/ringhash
Hash function and nreplicas in consistent hash can be set in RingOptions
This commit is contained in:
commit
493945402e
42
ring.go
42
ring.go
|
@ -16,7 +16,8 @@ import (
|
|||
"github.com/go-redis/redis/internal/pool"
|
||||
)
|
||||
|
||||
const nreplicas = 100
|
||||
// Hash is type of hash function used in consistent hash.
|
||||
type Hash consistenthash.Hash
|
||||
|
||||
var errRingShardsDown = errors.New("redis: all ring shards are down")
|
||||
|
||||
|
@ -30,6 +31,27 @@ type RingOptions struct {
|
|||
// Shard is considered down after 3 subsequent failed checks.
|
||||
HeartbeatFrequency time.Duration
|
||||
|
||||
// Hash function used in consistent hash.
|
||||
// Default is crc32.ChecksumIEEE.
|
||||
Hash Hash
|
||||
|
||||
// Number of replicas in consistent hash.
|
||||
// Default is 100 replicas.
|
||||
//
|
||||
// Higher number of replicas will provide less deviation, that is keys will be
|
||||
// distributed to nodes more evenly.
|
||||
//
|
||||
// Following is deviation for common nreplicas:
|
||||
// --------------------------------------------------------
|
||||
// | nreplicas | standard error | 99% confidence interval |
|
||||
// | 10 | 0.3152 | (0.37, 1.98) |
|
||||
// | 100 | 0.0997 | (0.76, 1.28) |
|
||||
// | 1000 | 0.0316 | (0.92, 1.09) |
|
||||
// --------------------------------------------------------
|
||||
//
|
||||
// See https://arxiv.org/abs/1406.2294 for reference
|
||||
HashReplicas int
|
||||
|
||||
// Following options are copied from Options struct.
|
||||
|
||||
OnConnect func(*Conn) error
|
||||
|
@ -56,6 +78,10 @@ func (opt *RingOptions) init() {
|
|||
opt.HeartbeatFrequency = 500 * time.Millisecond
|
||||
}
|
||||
|
||||
if opt.HashReplicas == 0 {
|
||||
opt.HashReplicas = 100
|
||||
}
|
||||
|
||||
switch opt.MinRetryBackoff {
|
||||
case -1:
|
||||
opt.MinRetryBackoff = 0
|
||||
|
@ -133,6 +159,8 @@ func (shard *ringShard) Vote(up bool) bool {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type ringShards struct {
|
||||
opt *RingOptions
|
||||
|
||||
mu sync.RWMutex
|
||||
hash *consistenthash.Map
|
||||
shards map[string]*ringShard // read only
|
||||
|
@ -140,9 +168,9 @@ type ringShards struct {
|
|||
closed bool
|
||||
}
|
||||
|
||||
func newRingShards() *ringShards {
|
||||
func newRingShards(opt *RingOptions) *ringShards {
|
||||
return &ringShards{
|
||||
hash: consistenthash.New(nreplicas, nil),
|
||||
hash: newConsistentHash(opt),
|
||||
shards: make(map[string]*ringShard),
|
||||
}
|
||||
}
|
||||
|
@ -238,7 +266,7 @@ func (c *ringShards) Heartbeat(frequency time.Duration) {
|
|||
|
||||
// rebalance removes dead shards from the Ring.
|
||||
func (c *ringShards) rebalance() {
|
||||
hash := consistenthash.New(nreplicas, nil)
|
||||
hash := newConsistentHash(c.opt)
|
||||
for name, shard := range c.shards {
|
||||
if shard.IsUp() {
|
||||
hash.Add(name)
|
||||
|
@ -305,7 +333,7 @@ func NewRing(opt *RingOptions) *Ring {
|
|||
|
||||
ring := &Ring{
|
||||
opt: opt,
|
||||
shards: newRingShards(),
|
||||
shards: newRingShards(opt),
|
||||
}
|
||||
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
|
||||
|
||||
|
@ -570,3 +598,7 @@ func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
|
|||
func (c *Ring) Close() error {
|
||||
return c.shards.Close()
|
||||
}
|
||||
|
||||
func newConsistentHash(opt *RingOptions) *consistenthash.Map {
|
||||
return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue