Compare commits

...

2 Commits

Author SHA1 Message Date
Vladimir Mihailenco 039194a5d3 Switch to xxhash64 2020-04-19 08:52:58 +03:00
rafaeleyng dbd3e7ca18 implement randezvous-hashing as an option for ring 2020-04-18 19:18:11 -03:00
6 changed files with 242 additions and 24 deletions

View File

@ -18,28 +18,28 @@ limitations under the License.
package consistenthash package consistenthash
import ( import (
"hash/crc32"
"sort" "sort"
"strconv" "strconv"
"github.com/cespare/xxhash"
"github.com/go-redis/redis/v7/internal"
) )
type Hash func(data []byte) uint32
type Map struct { type Map struct {
hash Hash hash internal.Hash
replicas int replicas int
keys []int // Sorted keys []uint64 // Sorted
hashMap map[int]string hashMap map[uint64]string
} }
func New(replicas int, fn Hash) *Map { func New(replicas int, fn internal.Hash) *Map {
m := &Map{ m := &Map{
replicas: replicas, replicas: replicas,
hash: fn, hash: fn,
hashMap: make(map[int]string), hashMap: make(map[uint64]string),
} }
if m.hash == nil { if m.hash == nil {
m.hash = crc32.ChecksumIEEE m.hash = xxhash.Sum64
} }
return m return m
} }
@ -53,12 +53,14 @@ func (m *Map) IsEmpty() bool {
func (m *Map) Add(keys ...string) { func (m *Map) Add(keys ...string) {
for _, key := range keys { for _, key := range keys {
for i := 0; i < m.replicas; i++ { for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key))) hash := m.hash([]byte(strconv.Itoa(i) + key))
m.keys = append(m.keys, hash) m.keys = append(m.keys, hash)
m.hashMap[hash] = key m.hashMap[hash] = key
} }
} }
sort.Ints(m.keys) sort.Slice(m.keys, func(i, j int) bool {
return m.keys[i] < m.keys[j]
})
} }
// Gets the closest item in the hash to the provided key. // Gets the closest item in the hash to the provided key.
@ -67,7 +69,7 @@ func (m *Map) Get(key string) string {
return "" return ""
} }
hash := int(m.hash([]byte(key))) hash := m.hash([]byte(key))
// Binary search for appropriate replica. // Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash }) idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })

View File

@ -26,12 +26,12 @@ func TestHashing(t *testing.T) {
// Override the hash function to return easier to reason about values. Assumes // Override the hash function to return easier to reason about values. Assumes
// the keys can be converted to an integer. // the keys can be converted to an integer.
hash := New(3, func(key []byte) uint32 { hash := New(3, func(key []byte) uint64 {
i, err := strconv.Atoi(string(key)) i, err := strconv.ParseUint(string(key), 10, 64)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return uint32(i) return i
}) })
// Given the above hash function, this will give replicas with "hashes": // Given the above hash function, this will give replicas with "hashes":
@ -76,6 +76,7 @@ func TestConsistency(t *testing.T) {
t.Errorf("Fetching 'Ben' from both hashes should be the same") t.Errorf("Fetching 'Ben' from both hashes should be the same")
} }
hash1.Add("Ben", "Becky", "Bobby")
hash2.Add("Becky", "Ben", "Bobby") hash2.Add("Becky", "Ben", "Bobby")
if hash1.Get("Ben") != hash2.Get("Ben") || if hash1.Get("Ben") != hash2.Get("Ben") ||
@ -108,3 +109,23 @@ func benchmarkGet(b *testing.B, shards int) {
hash.Get(buckets[i&(shards-1)]) hash.Get(buckets[i&(shards-1)])
} }
} }
func TestDistribution(t *testing.T) {
hash := New(1000, nil)
hash.Add("1", "2", "3", "4", "5", "6")
results := make(map[string]int, 10)
for i := 0; i < 1000000; i++ {
key := strconv.Itoa(i)
site := hash.Get(key)
if val, ok := results[site]; ok {
results[site] = val + 1
} else {
results[site] = 1
}
}
fmt.Println(results)
}

3
internal/hash.go Normal file
View File

@ -0,0 +1,3 @@
package internal
type Hash func(data []byte) uint64

View File

@ -0,0 +1,59 @@
package rendezvoushash
import (
"github.com/cespare/xxhash"
"github.com/go-redis/redis/v7/internal"
)
type Map struct {
hash internal.Hash
sites []string
}
func New(fn internal.Hash) *Map {
m := &Map{
hash: fn,
}
if m.hash == nil {
m.hash = xxhash.Sum64
}
return m
}
// Returns true if there are no items available.
func (m *Map) IsEmpty() bool {
return len(m.sites) == 0
}
// Adds some keys to the hash.
func (m *Map) Add(sites ...string) {
for _, site := range sites {
m.sites = append(m.sites, site)
}
}
// Gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
if m.IsEmpty() {
return ""
}
// find the site that, when hashed with the key, yields the largest weight
var targetSite string
var maxWeight uint64
buf := make([]byte, len(key), 2*len(key))
copy(buf, key)
for _, site := range m.sites {
buf = buf[:len(key)]
buf = append(buf, site...)
siteWeight := m.hash(buf)
if siteWeight > maxWeight {
maxWeight = siteWeight
targetSite = site
}
}
return targetSite
}

View File

@ -0,0 +1,106 @@
package rendezvoushash
import (
"fmt"
"strconv"
"testing"
)
func TestHashing(t *testing.T) {
hash := New(nil)
hash.Add("site1", "site2", "site3")
verifyFn := func(cases map[string]string) {
for k, v := range cases {
site := hash.Get(k)
if site != v {
t.Errorf("Asking for %s, should have return site %s, returned site %s", k, v, site)
}
}
}
testCases := map[string]string{
"key1": "site2",
"key2": "site1",
"key3": "site2",
"key4": "site1",
"key5": "site2",
"key6": "site3",
"key7": "site1",
"key8": "site1",
"key9": "site3",
"key10": "site2",
"key11": "site3",
"key12": "site1",
"key13": "site2",
"key14": "site2",
"key15": "site3",
"key16": "site2",
}
verifyFn(testCases)
hash.Add("site4")
// remaps existing keys to all sites
testCases["key1"] = "site4"
testCases["key2"] = "site4"
testCases["key9"] = "site4"
testCases["key10"] = "site4"
testCases["key11"] = "site4"
testCases["key12"] = "site4"
testCases["key15"] = "site4"
// add new keys
testCases["key17"] = "site1"
testCases["key18"] = "site2"
testCases["key19"] = "site4"
testCases["key20"] = "site4"
testCases["key21"] = "site1"
testCases["key22"] = "site2"
verifyFn(testCases)
}
func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8) }
func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32) }
func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128) }
func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512) }
func benchmarkGet(b *testing.B, shards int) {
hash := New(nil)
var buckets []string
for i := 0; i < shards; i++ {
buckets = append(buckets, fmt.Sprintf("shard-%d", i))
}
hash.Add(buckets...)
b.ResetTimer()
for i := 0; i < b.N; i++ {
hash.Get(buckets[i&(shards-1)])
}
}
func TestDistribution(t *testing.T) {
hash := New(nil)
hash.Add("1", "2", "3", "4", "5", "6")
results := make(map[string]int, 10)
for i := 0; i < 1000000; i++ {
key := strconv.Itoa(i)
site := hash.Get(key)
if val, ok := results[site]; ok {
results[site] = val + 1
} else {
results[site] = 1
}
}
fmt.Println(results)
}

45
ring.go
View File

@ -14,10 +14,24 @@ import (
"github.com/go-redis/redis/v7/internal/consistenthash" "github.com/go-redis/redis/v7/internal/consistenthash"
"github.com/go-redis/redis/v7/internal/hashtag" "github.com/go-redis/redis/v7/internal/hashtag"
"github.com/go-redis/redis/v7/internal/pool" "github.com/go-redis/redis/v7/internal/pool"
"github.com/go-redis/redis/v7/internal/rendezvoushash"
) )
// Hash is type of hash function used in consistent hash. // Hash is type of hash function used in hash maps.
type Hash consistenthash.Hash type Hash internal.Hash
type HashAlgorithm int
type HashMap interface {
IsEmpty() bool
Add(keys ...string)
Get(key string) string
}
const (
ConsistentHash HashAlgorithm = iota
RendezvousHash
)
var errRingShardsDown = errors.New("redis: all ring shards are down") var errRingShardsDown = errors.New("redis: all ring shards are down")
@ -35,10 +49,12 @@ type RingOptions struct {
// Shard is considered down after 3 subsequent failed checks. // Shard is considered down after 3 subsequent failed checks.
HeartbeatFrequency time.Duration HeartbeatFrequency time.Duration
// Hash function used in consistent hash. // Hash function used in the hash algorithm.
// Default is crc32.ChecksumIEEE. // Default is crc32.ChecksumIEEE.
Hash Hash Hash Hash
HashAlgorithm HashAlgorithm
// Number of replicas in consistent hash. // Number of replicas in consistent hash.
// Default is 100 replicas. // Default is 100 replicas.
// //
@ -183,7 +199,7 @@ type ringShards struct {
opt *RingOptions opt *RingOptions
mu sync.RWMutex mu sync.RWMutex
hash *consistenthash.Map hash HashMap
shards map[string]*ringShard // read only shards map[string]*ringShard // read only
list []*ringShard // read only list []*ringShard // read only
len int len int
@ -194,7 +210,7 @@ func newRingShards(opt *RingOptions) *ringShards {
return &ringShards{ return &ringShards{
opt: opt, opt: opt,
hash: newConsistentHash(opt), hash: newHash(opt),
shards: make(map[string]*ringShard), shards: make(map[string]*ringShard),
} }
} }
@ -294,7 +310,7 @@ func (c *ringShards) rebalance() {
shards := c.shards shards := c.shards
c.mu.RUnlock() c.mu.RUnlock()
hash := newConsistentHash(c.opt) hash := newHash(c.opt)
var shardsNum int var shardsNum int
for name, shard := range shards { for name, shard := range shards {
if shard.IsUp() { if shard.IsUp() {
@ -346,7 +362,7 @@ type ring struct {
cmdsInfoCache *cmdsInfoCache //nolint:structcheck cmdsInfoCache *cmdsInfoCache //nolint:structcheck
} }
// Ring is a Redis client that uses consistent hashing to distribute // Ring is a Redis client that uses hashing to distribute
// keys across multiple Redis servers (shards). It's safe for // keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines. // concurrent use by multiple goroutines.
// //
@ -721,6 +737,17 @@ func (c *Ring) Watch(fn func(*Tx) error, keys ...string) error {
return shards[0].Client.Watch(fn, keys...) return shards[0].Client.Watch(fn, keys...)
} }
func newConsistentHash(opt *RingOptions) *consistenthash.Map { func newHash(opt *RingOptions) HashMap {
return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash)) if opt.HashAlgorithm == RendezvousHash {
return newRendezvousHash(opt)
}
return newConsistentHash(opt)
}
func newConsistentHash(opt *RingOptions) HashMap {
return consistenthash.New(opt.HashReplicas, internal.Hash(opt.Hash))
}
func newRendezvousHash(opt *RingOptions) HashMap {
return rendezvoushash.New(internal.Hash(opt.Hash))
} }