From 2a0840b680b10bb94e07f2956b4794f723766d60 Mon Sep 17 00:00:00 2001 From: hyfrey Date: Thu, 19 Jul 2018 17:10:40 +0800 Subject: [PATCH] Hash function and nreplicas used in consistent hash can be set in RingOptions --- ring.go | 49 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/ring.go b/ring.go index b47a109..99783b3 100644 --- a/ring.go +++ b/ring.go @@ -16,7 +16,8 @@ import ( "github.com/go-redis/redis/internal/pool" ) -const nreplicas = 100 +// Hash is type of hash function used in consistent hash +type Hash consistenthash.Hash var errRingShardsDown = errors.New("redis: all ring shards are down") @@ -30,6 +31,24 @@ type RingOptions struct { // Shard is considered down after 3 subsequent failed checks. HeartbeatFrequency time.Duration + // Hash function used in consistent hash, will use crc32.ChecksumIEEE + // from hash/crc32 package if not specified + HashFn Hash + + // Number of replicas in consistent hash, default value is 100 + // higher number of replicas will provide less deviation, that is keys will be + // distributed to nodes more evenly. + // + // Following is deviation for common nreplicas: + // -------------------------------------------------------- + // | nreplicas | standard error | 99% confidence interval | + // | 10 | 0.3152 | (0.37, 1.98) | + // | 100 | 0.0997 | (0.76, 1.28) | + // | 1000 | 0.0316 | (0.92, 1.09) | + // -------------------------------------------------------- + // See https://arxiv.org/abs/1406.2294 for reference + Nreplicas int + // Following options are copied from Options struct. OnConnect func(*Conn) error @@ -56,6 +75,10 @@ func (opt *RingOptions) init() { opt.HeartbeatFrequency = 500 * time.Millisecond } + if opt.Nreplicas == 0 { + opt.Nreplicas = 100 + } + switch opt.MinRetryBackoff { case -1: opt.MinRetryBackoff = 0 @@ -133,17 +156,21 @@ func (shard *ringShard) Vote(up bool) bool { //------------------------------------------------------------------------------ type ringShards struct { - mu sync.RWMutex - hash *consistenthash.Map - shards map[string]*ringShard // read only - list []*ringShard // read only - closed bool + mu sync.RWMutex + nreplicas int + hashfn Hash + hash *consistenthash.Map + shards map[string]*ringShard // read only + list []*ringShard // read only + closed bool } -func newRingShards() *ringShards { +func newRingShards(nreplicas int, fn Hash) *ringShards { return &ringShards{ - hash: consistenthash.New(nreplicas, nil), - shards: make(map[string]*ringShard), + nreplicas: nreplicas, + hashfn: fn, + hash: consistenthash.New(nreplicas, consistenthash.Hash(fn)), + shards: make(map[string]*ringShard), } } @@ -238,7 +265,7 @@ func (c *ringShards) Heartbeat(frequency time.Duration) { // rebalance removes dead shards from the Ring. func (c *ringShards) rebalance() { - hash := consistenthash.New(nreplicas, nil) + hash := consistenthash.New(c.nreplicas, consistenthash.Hash(c.hashfn)) for name, shard := range c.shards { if shard.IsUp() { hash.Add(name) @@ -305,7 +332,7 @@ func NewRing(opt *RingOptions) *Ring { ring := &Ring{ opt: opt, - shards: newRingShards(), + shards: newRingShards(opt.Nreplicas, opt.HashFn), } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)