forked from mirror/redis
Compare commits
2 Commits
master
...
rendezvous
Author | SHA1 | Date |
---|---|---|
Vladimir Mihailenco | 039194a5d3 | |
rafaeleyng | dbd3e7ca18 |
|
@ -18,28 +18,28 @@ limitations under the License.
|
||||||
package consistenthash
|
package consistenthash
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"hash/crc32"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/cespare/xxhash"
|
||||||
|
"github.com/go-redis/redis/v7/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Hash func(data []byte) uint32
|
|
||||||
|
|
||||||
type Map struct {
|
type Map struct {
|
||||||
hash Hash
|
hash internal.Hash
|
||||||
replicas int
|
replicas int
|
||||||
keys []int // Sorted
|
keys []uint64 // Sorted
|
||||||
hashMap map[int]string
|
hashMap map[uint64]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(replicas int, fn Hash) *Map {
|
func New(replicas int, fn internal.Hash) *Map {
|
||||||
m := &Map{
|
m := &Map{
|
||||||
replicas: replicas,
|
replicas: replicas,
|
||||||
hash: fn,
|
hash: fn,
|
||||||
hashMap: make(map[int]string),
|
hashMap: make(map[uint64]string),
|
||||||
}
|
}
|
||||||
if m.hash == nil {
|
if m.hash == nil {
|
||||||
m.hash = crc32.ChecksumIEEE
|
m.hash = xxhash.Sum64
|
||||||
}
|
}
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
@ -53,12 +53,14 @@ func (m *Map) IsEmpty() bool {
|
||||||
func (m *Map) Add(keys ...string) {
|
func (m *Map) Add(keys ...string) {
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
for i := 0; i < m.replicas; i++ {
|
for i := 0; i < m.replicas; i++ {
|
||||||
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
|
hash := m.hash([]byte(strconv.Itoa(i) + key))
|
||||||
m.keys = append(m.keys, hash)
|
m.keys = append(m.keys, hash)
|
||||||
m.hashMap[hash] = key
|
m.hashMap[hash] = key
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sort.Ints(m.keys)
|
sort.Slice(m.keys, func(i, j int) bool {
|
||||||
|
return m.keys[i] < m.keys[j]
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gets the closest item in the hash to the provided key.
|
// Gets the closest item in the hash to the provided key.
|
||||||
|
@ -67,7 +69,7 @@ func (m *Map) Get(key string) string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
hash := int(m.hash([]byte(key)))
|
hash := m.hash([]byte(key))
|
||||||
|
|
||||||
// Binary search for appropriate replica.
|
// Binary search for appropriate replica.
|
||||||
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
|
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
|
||||||
|
|
|
@ -26,12 +26,12 @@ func TestHashing(t *testing.T) {
|
||||||
|
|
||||||
// Override the hash function to return easier to reason about values. Assumes
|
// Override the hash function to return easier to reason about values. Assumes
|
||||||
// the keys can be converted to an integer.
|
// the keys can be converted to an integer.
|
||||||
hash := New(3, func(key []byte) uint32 {
|
hash := New(3, func(key []byte) uint64 {
|
||||||
i, err := strconv.Atoi(string(key))
|
i, err := strconv.ParseUint(string(key), 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
return uint32(i)
|
return i
|
||||||
})
|
})
|
||||||
|
|
||||||
// Given the above hash function, this will give replicas with "hashes":
|
// Given the above hash function, this will give replicas with "hashes":
|
||||||
|
@ -76,6 +76,7 @@ func TestConsistency(t *testing.T) {
|
||||||
t.Errorf("Fetching 'Ben' from both hashes should be the same")
|
t.Errorf("Fetching 'Ben' from both hashes should be the same")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
hash1.Add("Ben", "Becky", "Bobby")
|
||||||
hash2.Add("Becky", "Ben", "Bobby")
|
hash2.Add("Becky", "Ben", "Bobby")
|
||||||
|
|
||||||
if hash1.Get("Ben") != hash2.Get("Ben") ||
|
if hash1.Get("Ben") != hash2.Get("Ben") ||
|
||||||
|
@ -108,3 +109,23 @@ func benchmarkGet(b *testing.B, shards int) {
|
||||||
hash.Get(buckets[i&(shards-1)])
|
hash.Get(buckets[i&(shards-1)])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDistribution(t *testing.T) {
|
||||||
|
hash := New(1000, nil)
|
||||||
|
hash.Add("1", "2", "3", "4", "5", "6")
|
||||||
|
|
||||||
|
results := make(map[string]int, 10)
|
||||||
|
|
||||||
|
for i := 0; i < 1000000; i++ {
|
||||||
|
key := strconv.Itoa(i)
|
||||||
|
|
||||||
|
site := hash.Get(key)
|
||||||
|
if val, ok := results[site]; ok {
|
||||||
|
results[site] = val + 1
|
||||||
|
} else {
|
||||||
|
results[site] = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(results)
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
package internal
|
||||||
|
|
||||||
|
type Hash func(data []byte) uint64
|
|
@ -0,0 +1,59 @@
|
||||||
|
package rendezvoushash
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/cespare/xxhash"
|
||||||
|
"github.com/go-redis/redis/v7/internal"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Map struct {
|
||||||
|
hash internal.Hash
|
||||||
|
sites []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(fn internal.Hash) *Map {
|
||||||
|
m := &Map{
|
||||||
|
hash: fn,
|
||||||
|
}
|
||||||
|
if m.hash == nil {
|
||||||
|
m.hash = xxhash.Sum64
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns true if there are no items available.
|
||||||
|
func (m *Map) IsEmpty() bool {
|
||||||
|
return len(m.sites) == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adds some keys to the hash.
|
||||||
|
func (m *Map) Add(sites ...string) {
|
||||||
|
for _, site := range sites {
|
||||||
|
m.sites = append(m.sites, site)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gets the closest item in the hash to the provided key.
|
||||||
|
func (m *Map) Get(key string) string {
|
||||||
|
if m.IsEmpty() {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// find the site that, when hashed with the key, yields the largest weight
|
||||||
|
var targetSite string
|
||||||
|
|
||||||
|
var maxWeight uint64
|
||||||
|
buf := make([]byte, len(key), 2*len(key))
|
||||||
|
copy(buf, key)
|
||||||
|
for _, site := range m.sites {
|
||||||
|
buf = buf[:len(key)]
|
||||||
|
buf = append(buf, site...)
|
||||||
|
siteWeight := m.hash(buf)
|
||||||
|
|
||||||
|
if siteWeight > maxWeight {
|
||||||
|
maxWeight = siteWeight
|
||||||
|
targetSite = site
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return targetSite
|
||||||
|
}
|
|
@ -0,0 +1,106 @@
|
||||||
|
package rendezvoushash
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHashing(t *testing.T) {
|
||||||
|
hash := New(nil)
|
||||||
|
hash.Add("site1", "site2", "site3")
|
||||||
|
|
||||||
|
verifyFn := func(cases map[string]string) {
|
||||||
|
for k, v := range cases {
|
||||||
|
site := hash.Get(k)
|
||||||
|
if site != v {
|
||||||
|
t.Errorf("Asking for %s, should have return site %s, returned site %s", k, v, site)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := map[string]string{
|
||||||
|
"key1": "site2",
|
||||||
|
"key2": "site1",
|
||||||
|
"key3": "site2",
|
||||||
|
"key4": "site1",
|
||||||
|
"key5": "site2",
|
||||||
|
"key6": "site3",
|
||||||
|
"key7": "site1",
|
||||||
|
"key8": "site1",
|
||||||
|
"key9": "site3",
|
||||||
|
"key10": "site2",
|
||||||
|
"key11": "site3",
|
||||||
|
"key12": "site1",
|
||||||
|
"key13": "site2",
|
||||||
|
"key14": "site2",
|
||||||
|
"key15": "site3",
|
||||||
|
"key16": "site2",
|
||||||
|
}
|
||||||
|
|
||||||
|
verifyFn(testCases)
|
||||||
|
|
||||||
|
hash.Add("site4")
|
||||||
|
|
||||||
|
// remaps existing keys to all sites
|
||||||
|
testCases["key1"] = "site4"
|
||||||
|
testCases["key2"] = "site4"
|
||||||
|
testCases["key9"] = "site4"
|
||||||
|
testCases["key10"] = "site4"
|
||||||
|
testCases["key11"] = "site4"
|
||||||
|
testCases["key12"] = "site4"
|
||||||
|
testCases["key15"] = "site4"
|
||||||
|
|
||||||
|
// add new keys
|
||||||
|
testCases["key17"] = "site1"
|
||||||
|
testCases["key18"] = "site2"
|
||||||
|
testCases["key19"] = "site4"
|
||||||
|
testCases["key20"] = "site4"
|
||||||
|
testCases["key21"] = "site1"
|
||||||
|
testCases["key22"] = "site2"
|
||||||
|
|
||||||
|
verifyFn(testCases)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8) }
|
||||||
|
func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32) }
|
||||||
|
func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128) }
|
||||||
|
func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512) }
|
||||||
|
|
||||||
|
func benchmarkGet(b *testing.B, shards int) {
|
||||||
|
|
||||||
|
hash := New(nil)
|
||||||
|
|
||||||
|
var buckets []string
|
||||||
|
for i := 0; i < shards; i++ {
|
||||||
|
buckets = append(buckets, fmt.Sprintf("shard-%d", i))
|
||||||
|
}
|
||||||
|
|
||||||
|
hash.Add(buckets...)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
hash.Get(buckets[i&(shards-1)])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDistribution(t *testing.T) {
|
||||||
|
hash := New(nil)
|
||||||
|
hash.Add("1", "2", "3", "4", "5", "6")
|
||||||
|
|
||||||
|
results := make(map[string]int, 10)
|
||||||
|
|
||||||
|
for i := 0; i < 1000000; i++ {
|
||||||
|
key := strconv.Itoa(i)
|
||||||
|
|
||||||
|
site := hash.Get(key)
|
||||||
|
if val, ok := results[site]; ok {
|
||||||
|
results[site] = val + 1
|
||||||
|
} else {
|
||||||
|
results[site] = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(results)
|
||||||
|
}
|
45
ring.go
45
ring.go
|
@ -14,10 +14,24 @@ import (
|
||||||
"github.com/go-redis/redis/v7/internal/consistenthash"
|
"github.com/go-redis/redis/v7/internal/consistenthash"
|
||||||
"github.com/go-redis/redis/v7/internal/hashtag"
|
"github.com/go-redis/redis/v7/internal/hashtag"
|
||||||
"github.com/go-redis/redis/v7/internal/pool"
|
"github.com/go-redis/redis/v7/internal/pool"
|
||||||
|
"github.com/go-redis/redis/v7/internal/rendezvoushash"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Hash is type of hash function used in consistent hash.
|
// Hash is type of hash function used in hash maps.
|
||||||
type Hash consistenthash.Hash
|
type Hash internal.Hash
|
||||||
|
|
||||||
|
type HashAlgorithm int
|
||||||
|
|
||||||
|
type HashMap interface {
|
||||||
|
IsEmpty() bool
|
||||||
|
Add(keys ...string)
|
||||||
|
Get(key string) string
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
ConsistentHash HashAlgorithm = iota
|
||||||
|
RendezvousHash
|
||||||
|
)
|
||||||
|
|
||||||
var errRingShardsDown = errors.New("redis: all ring shards are down")
|
var errRingShardsDown = errors.New("redis: all ring shards are down")
|
||||||
|
|
||||||
|
@ -35,10 +49,12 @@ type RingOptions struct {
|
||||||
// Shard is considered down after 3 subsequent failed checks.
|
// Shard is considered down after 3 subsequent failed checks.
|
||||||
HeartbeatFrequency time.Duration
|
HeartbeatFrequency time.Duration
|
||||||
|
|
||||||
// Hash function used in consistent hash.
|
// Hash function used in the hash algorithm.
|
||||||
// Default is crc32.ChecksumIEEE.
|
// Default is crc32.ChecksumIEEE.
|
||||||
Hash Hash
|
Hash Hash
|
||||||
|
|
||||||
|
HashAlgorithm HashAlgorithm
|
||||||
|
|
||||||
// Number of replicas in consistent hash.
|
// Number of replicas in consistent hash.
|
||||||
// Default is 100 replicas.
|
// Default is 100 replicas.
|
||||||
//
|
//
|
||||||
|
@ -183,7 +199,7 @@ type ringShards struct {
|
||||||
opt *RingOptions
|
opt *RingOptions
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
hash *consistenthash.Map
|
hash HashMap
|
||||||
shards map[string]*ringShard // read only
|
shards map[string]*ringShard // read only
|
||||||
list []*ringShard // read only
|
list []*ringShard // read only
|
||||||
len int
|
len int
|
||||||
|
@ -194,7 +210,7 @@ func newRingShards(opt *RingOptions) *ringShards {
|
||||||
return &ringShards{
|
return &ringShards{
|
||||||
opt: opt,
|
opt: opt,
|
||||||
|
|
||||||
hash: newConsistentHash(opt),
|
hash: newHash(opt),
|
||||||
shards: make(map[string]*ringShard),
|
shards: make(map[string]*ringShard),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -294,7 +310,7 @@ func (c *ringShards) rebalance() {
|
||||||
shards := c.shards
|
shards := c.shards
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
|
||||||
hash := newConsistentHash(c.opt)
|
hash := newHash(c.opt)
|
||||||
var shardsNum int
|
var shardsNum int
|
||||||
for name, shard := range shards {
|
for name, shard := range shards {
|
||||||
if shard.IsUp() {
|
if shard.IsUp() {
|
||||||
|
@ -346,7 +362,7 @@ type ring struct {
|
||||||
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
|
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ring is a Redis client that uses consistent hashing to distribute
|
// Ring is a Redis client that uses hashing to distribute
|
||||||
// keys across multiple Redis servers (shards). It's safe for
|
// keys across multiple Redis servers (shards). It's safe for
|
||||||
// concurrent use by multiple goroutines.
|
// concurrent use by multiple goroutines.
|
||||||
//
|
//
|
||||||
|
@ -721,6 +737,17 @@ func (c *Ring) Watch(fn func(*Tx) error, keys ...string) error {
|
||||||
return shards[0].Client.Watch(fn, keys...)
|
return shards[0].Client.Watch(fn, keys...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConsistentHash(opt *RingOptions) *consistenthash.Map {
|
func newHash(opt *RingOptions) HashMap {
|
||||||
return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
|
if opt.HashAlgorithm == RendezvousHash {
|
||||||
|
return newRendezvousHash(opt)
|
||||||
|
}
|
||||||
|
return newConsistentHash(opt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newConsistentHash(opt *RingOptions) HashMap {
|
||||||
|
return consistenthash.New(opt.HashReplicas, internal.Hash(opt.Hash))
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRendezvousHash(opt *RingOptions) HashMap {
|
||||||
|
return rendezvoushash.New(internal.Hash(opt.Hash))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue