diff --git a/internal/consistenthash/consistenthash.go b/internal/consistenthash/consistenthash.go index a9c56f0..fd746b3 100644 --- a/internal/consistenthash/consistenthash.go +++ b/internal/consistenthash/consistenthash.go @@ -21,18 +21,19 @@ import ( "hash/crc32" "sort" "strconv" + + "github.com/go-redis/redis/v7/internal" ) -type Hash func(data []byte) uint32 type Map struct { - hash Hash + hash internal.Hash replicas int keys []int // Sorted hashMap map[int]string } -func New(replicas int, fn Hash) *Map { +func New(replicas int, fn internal.Hash) *Map { m := &Map{ replicas: replicas, hash: fn, diff --git a/internal/hash.go b/internal/hash.go new file mode 100644 index 0000000..64ccf22 --- /dev/null +++ b/internal/hash.go @@ -0,0 +1,3 @@ +package internal + +type Hash func(data []byte) uint32 diff --git a/internal/rendezvoushash/rendezvoushash.go b/internal/rendezvoushash/rendezvoushash.go new file mode 100644 index 0000000..2832325 --- /dev/null +++ b/internal/rendezvoushash/rendezvoushash.go @@ -0,0 +1,58 @@ +package rendezvoushash + +import ( + "crypto/sha1" + "hash/crc32" + + "github.com/go-redis/redis/v7/internal" +) + +type Map struct { + hash internal.Hash + sites []string +} + +func New(fn internal.Hash) *Map { + m := &Map{ + hash: fn, + } + if m.hash == nil { + m.hash = crc32.ChecksumIEEE + } + return m +} + +// Returns true if there are no items available. +func (m *Map) IsEmpty() bool { + return len(m.sites) == 0 +} + +// Adds some keys to the hash. +func (m *Map) Add(sites ...string) { + for _, site := range sites { + m.sites = append(m.sites, site) + } +} + +// Gets the closest item in the hash to the provided key. +func (m *Map) Get(key string) string { + if m.IsEmpty() { + return "" + } + + // find the site that, when hashed with the key, yields the largest weight + maxWeight := uint32(0) + targetSite := "" + for _, site := range m.sites { + hasher := sha1.New() + hasher.Write([]byte(site + key)) + siteWeight := m.hash(hasher.Sum(nil)) + + if siteWeight > maxWeight { + maxWeight = siteWeight + targetSite = site + } + } + + return targetSite +} diff --git a/internal/rendezvoushash/rendezvoushash_test.go b/internal/rendezvoushash/rendezvoushash_test.go new file mode 100644 index 0000000..5aabf5e --- /dev/null +++ b/internal/rendezvoushash/rendezvoushash_test.go @@ -0,0 +1,86 @@ +package rendezvoushash + +import ( + "fmt" + "hash/crc32" + "testing" +) + +func TestHashing(t *testing.T) { + hash := New(crc32.ChecksumIEEE) + hash.Add("site1", "site2", "site3") + + verifyFn := func(cases map[string]string) { + for k, v := range cases { + site := hash.Get(k) + if site != v { + t.Errorf("Asking for %s, should have return site %s, returned site %s", k, v, site) + } + } + } + + testCases := map[string]string{ + "key1": "site2", + "key2": "site1", + "key3": "site2", + "key4": "site1", + "key5": "site2", + "key6": "site3", + "key7": "site1", + "key8": "site1", + "key9": "site3", + "key10": "site2", + "key11": "site3", + "key12": "site1", + "key13": "site2", + "key14": "site2", + "key15": "site3", + "key16": "site2", + } + + verifyFn(testCases) + + hash.Add("site4") + + // remaps existing keys to all sites + testCases["key1"] = "site4" + testCases["key2"] = "site4" + testCases["key9"] = "site4" + testCases["key10"] = "site4" + testCases["key11"] = "site4" + testCases["key12"] = "site4" + testCases["key15"] = "site4" + + // add new keys + testCases["key17"] = "site1" + testCases["key18"] = "site2" + testCases["key19"] = "site4" + testCases["key20"] = "site4" + testCases["key21"] = "site1" + testCases["key22"] = "site2" + + verifyFn(testCases) +} + +func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8) } +func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32) } +func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128) } +func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512) } + +func benchmarkGet(b *testing.B, shards int) { + + hash := New(nil) + + var buckets []string + for i := 0; i < shards; i++ { + buckets = append(buckets, fmt.Sprintf("shard-%d", i)) + } + + hash.Add(buckets...) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + hash.Get(buckets[i&(shards-1)]) + } +} diff --git a/ring.go b/ring.go index 44fc623..8410394 100644 --- a/ring.go +++ b/ring.go @@ -14,10 +14,24 @@ import ( "github.com/go-redis/redis/v7/internal/consistenthash" "github.com/go-redis/redis/v7/internal/hashtag" "github.com/go-redis/redis/v7/internal/pool" + "github.com/go-redis/redis/v7/internal/rendezvoushash" ) -// Hash is type of hash function used in consistent hash. -type Hash consistenthash.Hash +// Hash is type of hash function used in hash maps. +type Hash internal.Hash + +type HashAlgorithm int + +type HashMap interface { + IsEmpty() bool + Add(keys ...string) + Get(key string) string +} + +const ( + ConsistentHash HashAlgorithm = iota + RendezvousHash +) var errRingShardsDown = errors.New("redis: all ring shards are down") @@ -35,10 +49,12 @@ type RingOptions struct { // Shard is considered down after 3 subsequent failed checks. HeartbeatFrequency time.Duration - // Hash function used in consistent hash. + // Hash function used in the hash algorithm. // Default is crc32.ChecksumIEEE. Hash Hash + HashAlgorithm HashAlgorithm + // Number of replicas in consistent hash. // Default is 100 replicas. // @@ -183,7 +199,7 @@ type ringShards struct { opt *RingOptions mu sync.RWMutex - hash *consistenthash.Map + hash HashMap shards map[string]*ringShard // read only list []*ringShard // read only len int @@ -194,7 +210,7 @@ func newRingShards(opt *RingOptions) *ringShards { return &ringShards{ opt: opt, - hash: newConsistentHash(opt), + hash: newHash(opt), shards: make(map[string]*ringShard), } } @@ -294,7 +310,7 @@ func (c *ringShards) rebalance() { shards := c.shards c.mu.RUnlock() - hash := newConsistentHash(c.opt) + hash := newHash(c.opt) var shardsNum int for name, shard := range shards { if shard.IsUp() { @@ -346,7 +362,7 @@ type ring struct { cmdsInfoCache *cmdsInfoCache //nolint:structcheck } -// Ring is a Redis client that uses consistent hashing to distribute +// Ring is a Redis client that uses hashing to distribute // keys across multiple Redis servers (shards). It's safe for // concurrent use by multiple goroutines. // @@ -721,6 +737,17 @@ func (c *Ring) Watch(fn func(*Tx) error, keys ...string) error { return shards[0].Client.Watch(fn, keys...) } -func newConsistentHash(opt *RingOptions) *consistenthash.Map { - return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash)) +func newHash(opt *RingOptions) HashMap { + if opt.HashAlgorithm == RendezvousHash { + return newRendezvousHash(opt) + } + return newConsistentHash(opt) +} + +func newConsistentHash(opt *RingOptions) HashMap { + return consistenthash.New(opt.HashReplicas, internal.Hash(opt.Hash)) +} + +func newRendezvousHash(opt *RingOptions) HashMap { + return rendezvoushash.New(internal.Hash(opt.Hash)) }