diff --git a/ring.go b/ring.go index b47a109..5cbfb9b 100644 --- a/ring.go +++ b/ring.go @@ -16,7 +16,8 @@ import ( "github.com/go-redis/redis/internal/pool" ) -const nreplicas = 100 +// Hash is type of hash function used in consistent hash. +type Hash consistenthash.Hash var errRingShardsDown = errors.New("redis: all ring shards are down") @@ -30,6 +31,27 @@ type RingOptions struct { // Shard is considered down after 3 subsequent failed checks. HeartbeatFrequency time.Duration + // Hash function used in consistent hash. + // Default is crc32.ChecksumIEEE. + Hash Hash + + // Number of replicas in consistent hash. + // Default is 100 replicas. + // + // Higher number of replicas will provide less deviation, that is keys will be + // distributed to nodes more evenly. + // + // Following is deviation for common nreplicas: + // -------------------------------------------------------- + // | nreplicas | standard error | 99% confidence interval | + // | 10 | 0.3152 | (0.37, 1.98) | + // | 100 | 0.0997 | (0.76, 1.28) | + // | 1000 | 0.0316 | (0.92, 1.09) | + // -------------------------------------------------------- + // + // See https://arxiv.org/abs/1406.2294 for reference + HashReplicas int + // Following options are copied from Options struct. OnConnect func(*Conn) error @@ -56,6 +78,10 @@ func (opt *RingOptions) init() { opt.HeartbeatFrequency = 500 * time.Millisecond } + if opt.HashReplicas == 0 { + opt.HashReplicas = 100 + } + switch opt.MinRetryBackoff { case -1: opt.MinRetryBackoff = 0 @@ -133,6 +159,8 @@ func (shard *ringShard) Vote(up bool) bool { //------------------------------------------------------------------------------ type ringShards struct { + opt *RingOptions + mu sync.RWMutex hash *consistenthash.Map shards map[string]*ringShard // read only @@ -140,9 +168,9 @@ type ringShards struct { closed bool } -func newRingShards() *ringShards { +func newRingShards(opt *RingOptions) *ringShards { return &ringShards{ - hash: consistenthash.New(nreplicas, nil), + hash: newConsistentHash(opt), shards: make(map[string]*ringShard), } } @@ -238,7 +266,7 @@ func (c *ringShards) Heartbeat(frequency time.Duration) { // rebalance removes dead shards from the Ring. func (c *ringShards) rebalance() { - hash := consistenthash.New(nreplicas, nil) + hash := newConsistentHash(c.opt) for name, shard := range c.shards { if shard.IsUp() { hash.Add(name) @@ -305,7 +333,7 @@ func NewRing(opt *RingOptions) *Ring { ring := &Ring{ opt: opt, - shards: newRingShards(), + shards: newRingShards(opt), } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) @@ -570,3 +598,7 @@ func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Ring) Close() error { return c.shards.Close() } + +func newConsistentHash(opt *RingOptions) *consistenthash.Map { + return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash)) +}