Use Rendezvous in Ring. Thanks @rafaeleyng for initial idea and implementation

This commit is contained in:
Vladimir Mihailenco 2020-06-08 12:42:43 +03:00
parent 10561b3aa4
commit 694e518a8c
8 changed files with 178 additions and 306 deletions

4
.prettierrc Normal file
View File

@ -0,0 +1,4 @@
semi: false
singleQuote: true
proseWrap: always
printWidth: 80

View File

@ -2,37 +2,63 @@
## v8 (unreleased) ## v8 (unreleased)
- All commands accept `context.Context` as first argument. - All commands require `context.Context` as a first argument, e.g.
`rdb.Ping(ctx)`. If you are not using `context.Context` yet, the simplest
option is to define package variable `var ctx = context.TODO()` and use it
when `ctx` is expected.
- Ring uses Rendezvous Hashing by default which provides better distribution.
This means that existing keys must be moved to a new location or key will be
inaccessible / lost. To use old hashing scheme:
```go
import "github.com/golang/groupcache/consistenthash"
ring := redis.NewRing(&redis.RingOptions{
NewConsistentHash: func() {
return consistenthash.New(100, crc32.ChecksumIEEE)
},
})
```
- Basic support for OpenTelemetry instrumentation. - Basic support for OpenTelemetry instrumentation.
## v7.3 ## v7.3
- New option `Options.Username` which causes client to use `AuthACL`. Be aware if your connection URL contains username. - New option `Options.Username` which causes client to use `AuthACL`. Be aware
if your connection URL contains username.
## v7.2 ## v7.2
- Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored for Redis 3 users. - Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored
for Redis 3 users.
## v7.1 ## v7.1
- Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements `fmt.Stringer` interface. - Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements
`fmt.Stringer` interface.
## v7 ## v7
- _Important_. Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a transactional pipeline. - _Important_. Tx.Pipeline now returns a non-transactional pipeline. Use
- WrapProcess is replaced with more convenient AddHook that has access to context.Context. Tx.TxPipeline for a transactional pipeline.
- WrapProcess is replaced with more convenient AddHook that has access to
context.Context.
- WithContext now can not be used to create a shallow copy of the client. - WithContext now can not be used to create a shallow copy of the client.
- New methods ProcessContext, DoContext, and ExecContext. - New methods ProcessContext, DoContext, and ExecContext.
- Client respects Context.Deadline when setting net.Conn deadline. - Client respects Context.Deadline when setting net.Conn deadline.
- Client listens on Context.Done while waiting for a connection from the pool and returns an error when context context is cancelled. - Client listens on Context.Done while waiting for a connection from the pool
- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to `*Message` to allow detecting reconnections. and returns an error when context context is cancelled.
- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()` helper is added to parse the time. - Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to
`*Message` to allow detecting reconnections.
- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()`
helper is added to parse the time.
- `SetLimiter` is removed and added `Options.Limiter` instead. - `SetLimiter` is removed and added `Options.Limiter` instead.
- `HMSet` is deprecated as of Redis v4. - `HMSet` is deprecated as of Redis v4.
## v6.15 ## v6.15
- Cluster and Ring pipelines process commands for each node in its own goroutine. - Cluster and Ring pipelines process commands for each node in its own
goroutine.
## 6.14 ## 6.14
@ -40,16 +66,23 @@
- Added Options.MaxConnAge. - Added Options.MaxConnAge.
- PoolStats.FreeConns is renamed to PoolStats.IdleConns. - PoolStats.FreeConns is renamed to PoolStats.IdleConns.
- Add Client.Do to simplify creating custom commands. - Add Client.Do to simplify creating custom commands.
- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers. - Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool
helpers.
- Lower memory usage. - Lower memory usage.
## v6.13 ## v6.13
- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards. - Ring got new options called `HashReplicas` and `Hash`. It is recommended to
- Cluster client was optimized to use much less memory when reloading cluster state. set `HashReplicas = 1000` for better keys distribution between shards.
- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout occurres. In most cases it is recommended to use PubSub.Channel instead. - Cluster client was optimized to use much less memory when reloading cluster
state.
- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not
lose data when timeout occurres. In most cases it is recommended to use
PubSub.Channel instead.
- Dialer.KeepAlive is set to 5 minutes by default. - Dialer.KeepAlive is set to 5 minutes by default.
## v6.12 ## v6.12
- ClusterClient got new option called `ClusterSlots` which allows to build cluster of normal Redis Servers that don't have cluster mode enabled. See https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup - ClusterClient got new option called `ClusterSlots` which allows to build
cluster of normal Redis Servers that don't have cluster mode enabled. See
https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup

2
go.mod
View File

@ -1,6 +1,8 @@
module github.com/go-redis/redis/v8 module github.com/go-redis/redis/v8
require ( require (
github.com/cespare/xxhash v1.1.0
github.com/dgryski/go-rendezvous v0.0.0-20180401054734-3692eb46c031
github.com/onsi/ginkgo v1.10.1 github.com/onsi/ginkgo v1.10.1
github.com/onsi/gomega v1.7.0 github.com/onsi/gomega v1.7.0
go.opentelemetry.io/otel v0.5.0 go.opentelemetry.io/otel v0.5.0

6
go.sum
View File

@ -2,14 +2,19 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 h1:qELHH0AWCvf98Yf+CNIJx9vOZOfHFDDzgDRYsnNk/vs= github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 h1:qELHH0AWCvf98Yf+CNIJx9vOZOfHFDDzgDRYsnNk/vs=
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60= github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/benbjohnson/clock v1.0.0 h1:78Jk/r6m4wCi6sndMpty7A//t4dw/RW5fV4ZgDVfX1w= github.com/benbjohnson/clock v1.0.0 h1:78Jk/r6m4wCi6sndMpty7A//t4dw/RW5fV4ZgDVfX1w=
github.com/benbjohnson/clock v1.0.0/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/benbjohnson/clock v1.0.0/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20180401054734-3692eb46c031 h1:GqrUYGzmGuc00lpc+K0wwrqshfkKLwgYFJiCyOZFMVE=
github.com/dgryski/go-rendezvous v0.0.0-20180401054734-3692eb46c031/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@ -45,6 +50,7 @@ github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e/go.mo
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=

View File

@ -1,81 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package consistenthash provides an implementation of a ring hash.
package consistenthash
import (
"hash/crc32"
"sort"
"strconv"
)
type Hash func(data []byte) uint32
type Map struct {
hash Hash
replicas int
keys []int // Sorted
hashMap map[int]string
}
func New(replicas int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}
// Returns true if there are no items available.
func (m *Map) IsEmpty() bool {
return len(m.keys) == 0
}
// Adds some keys to the hash.
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}
// Gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
if m.IsEmpty() {
return ""
}
hash := int(m.hash([]byte(key)))
// Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
// Means we have cycled back to the first replica.
if idx == len(m.keys) {
idx = 0
}
return m.hashMap[m.keys[idx]]
}

View File

@ -1,110 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consistenthash
import (
"fmt"
"strconv"
"testing"
)
func TestHashing(t *testing.T) {
// Override the hash function to return easier to reason about values. Assumes
// the keys can be converted to an integer.
hash := New(3, func(key []byte) uint32 {
i, err := strconv.Atoi(string(key))
if err != nil {
panic(err)
}
return uint32(i)
})
// Given the above hash function, this will give replicas with "hashes":
// 2, 4, 6, 12, 14, 16, 22, 24, 26
hash.Add("6", "4", "2")
testCases := map[string]string{
"2": "2",
"11": "2",
"23": "4",
"27": "2",
}
for k, v := range testCases {
if hash.Get(k) != v {
t.Errorf("Asking for %s, should have yielded %s", k, v)
}
}
// Adds 8, 18, 28
hash.Add("8")
// 27 should now map to 8.
testCases["27"] = "8"
for k, v := range testCases {
if hash.Get(k) != v {
t.Errorf("Asking for %s, should have yielded %s", k, v)
}
}
}
func TestConsistency(t *testing.T) {
hash1 := New(1, nil)
hash2 := New(1, nil)
hash1.Add("Bill", "Bob", "Bonny")
hash2.Add("Bob", "Bonny", "Bill")
if hash1.Get("Ben") != hash2.Get("Ben") {
t.Errorf("Fetching 'Ben' from both hashes should be the same")
}
hash2.Add("Becky", "Ben", "Bobby")
if hash1.Get("Ben") != hash2.Get("Ben") ||
hash1.Get("Bob") != hash2.Get("Bob") ||
hash1.Get("Bonny") != hash2.Get("Bonny") {
t.Errorf("Direct matches should always return the same entry")
}
}
func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8) }
func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32) }
func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128) }
func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512) }
func benchmarkGet(b *testing.B, shards int) {
hash := New(50, nil)
var buckets []string
for i := 0; i < shards; i++ {
buckets = append(buckets, fmt.Sprintf("shard-%d", i))
}
hash.Add(buckets...)
b.ResetTimer()
for i := 0; i < b.N; i++ {
hash.Get(buckets[i&(shards-1)])
}
}

186
ring.go
View File

@ -10,55 +10,61 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/cespare/xxhash"
"github.com/dgryski/go-rendezvous"
"github.com/go-redis/redis/v8/internal" "github.com/go-redis/redis/v8/internal"
"github.com/go-redis/redis/v8/internal/consistenthash"
"github.com/go-redis/redis/v8/internal/hashtag" "github.com/go-redis/redis/v8/internal/hashtag"
"github.com/go-redis/redis/v8/internal/pool" "github.com/go-redis/redis/v8/internal/pool"
) )
// Hash is type of hash function used in consistent hash.
type Hash consistenthash.Hash
var errRingShardsDown = errors.New("redis: all ring shards are down") var errRingShardsDown = errors.New("redis: all ring shards are down")
//------------------------------------------------------------------------------
type ConsistentHash interface {
Get(string) string
}
type rendezvousWrapper struct {
*rendezvous.Rendezvous
}
func (w rendezvousWrapper) Get(key string) string {
return w.Lookup(key)
}
func newRendezvous(shards []string) ConsistentHash {
return rendezvousWrapper{rendezvous.New(shards, xxhash.Sum64String)}
}
//------------------------------------------------------------------------------
// RingOptions are used to configure a ring client and should be // RingOptions are used to configure a ring client and should be
// passed to NewRing. // passed to NewRing.
type RingOptions struct { type RingOptions struct {
// Map of name => host:port addresses of ring shards. // Map of name => host:port addresses of ring shards.
Addrs map[string]string Addrs map[string]string
// NewClient creates a shard client with provided name and options.
NewClient func(name string, opt *Options) *Client
// Frequency of PING commands sent to check shards availability. // Frequency of PING commands sent to check shards availability.
// Shard is considered down after 3 subsequent failed checks. // Shard is considered down after 3 subsequent failed checks.
HeartbeatFrequency time.Duration HeartbeatFrequency time.Duration
// Hash function used in consistent hash. // NewConsistentHash returns a consistent hash that is used
// Default is crc32.ChecksumIEEE. // to distribute keys across the shards.
Hash Hash
// Number of replicas in consistent hash.
// Default is 100 replicas.
// //
// Higher number of replicas will provide less deviation, that is keys will be // See https://medium.com/@dgryski/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8
// distributed to nodes more evenly. // for consistent hashing algorithmic tradeoffs.
// NewConsistentHash func(shards []string) ConsistentHash
// Following is deviation for common nreplicas:
// --------------------------------------------------------
// | nreplicas | standard error | 99% confidence interval |
// | 10 | 0.3152 | (0.37, 1.98) |
// | 100 | 0.0997 | (0.76, 1.28) |
// | 1000 | 0.0316 | (0.92, 1.09) |
// --------------------------------------------------------
//
// See https://arxiv.org/abs/1406.2294 for reference
HashReplicas int
// NewClient creates a shard client with provided name and options.
NewClient func(name string, opt *Options) *Client
// Following options are copied from Options struct. // Following options are copied from Options struct.
OnConnect func(*Conn) error OnConnect func(*Conn) error
Username string
DB int DB int
Password string Password string
@ -79,12 +85,18 @@ type RingOptions struct {
} }
func (opt *RingOptions) init() { func (opt *RingOptions) init() {
if opt.NewClient == nil {
opt.NewClient = func(name string, opt *Options) *Client {
return NewClient(opt)
}
}
if opt.HeartbeatFrequency == 0 { if opt.HeartbeatFrequency == 0 {
opt.HeartbeatFrequency = 500 * time.Millisecond opt.HeartbeatFrequency = 500 * time.Millisecond
} }
if opt.HashReplicas == 0 { if opt.NewConsistentHash == nil {
opt.HashReplicas = 100 opt.NewConsistentHash = newRendezvous
} }
switch opt.MinRetryBackoff { switch opt.MinRetryBackoff {
@ -127,6 +139,15 @@ type ringShard struct {
down int32 down int32
} }
func newRingShard(opt *RingOptions, name, addr string) *ringShard {
clopt := opt.clientOptions()
clopt.Addr = addr
return &ringShard{
Client: opt.NewClient(name, clopt),
}
}
func (shard *ringShard) String() string { func (shard *ringShard) String() string {
var state string var state string
if shard.IsUp() { if shard.IsUp() {
@ -168,40 +189,58 @@ type ringShards struct {
opt *RingOptions opt *RingOptions
mu sync.RWMutex mu sync.RWMutex
hash *consistenthash.Map hash ConsistentHash
shards map[string]*ringShard // read only shards map[string]*ringShard // read only
list []*ringShard // read only list []*ringShard // read only
len int numShard int
closed bool closed bool
} }
func newRingShards(opt *RingOptions) *ringShards { func newRingShards(opt *RingOptions) *ringShards {
return &ringShards{ shards := make(map[string]*ringShard, len(opt.Addrs))
list := make([]*ringShard, 0, len(shards))
for name, addr := range opt.Addrs {
shard := newRingShard(opt, name, addr)
shards[name] = shard
list = append(list, shard)
}
c := &ringShards{
opt: opt, opt: opt,
hash: newConsistentHash(opt), shards: shards,
shards: make(map[string]*ringShard), list: list,
}
} }
c.rebalance()
func (c *ringShards) Add(name string, cl *Client) { return c
shard := &ringShard{Client: cl}
c.hash.Add(name)
c.shards[name] = shard
c.list = append(c.list, shard)
} }
func (c *ringShards) List() []*ringShard { func (c *ringShards) List() []*ringShard {
var list []*ringShard
c.mu.RLock() c.mu.RLock()
list := c.list if !c.closed {
list = c.list
}
c.mu.RUnlock() c.mu.RUnlock()
return list return list
} }
func (c *ringShards) Hash(key string) string { func (c *ringShards) Hash(key string) string {
key = hashtag.Key(key)
var hash string
c.mu.RLock() c.mu.RLock()
hash := c.hash.Get(key) if c.numShard > 0 {
hash = c.hash.Get(key)
}
c.mu.RUnlock() c.mu.RUnlock()
return hash return hash
} }
@ -215,6 +254,11 @@ func (c *ringShards) GetByKey(key string) (*ringShard, error) {
return nil, pool.ErrClosed return nil, pool.ErrClosed
} }
if c.numShard == 0 {
c.mu.RUnlock()
return nil, errRingShardsDown
}
hash := c.hash.Get(key) hash := c.hash.Get(key)
if hash == "" { if hash == "" {
c.mu.RUnlock() c.mu.RUnlock()
@ -227,13 +271,13 @@ func (c *ringShards) GetByKey(key string) (*ringShard, error) {
return shard, nil return shard, nil
} }
func (c *ringShards) GetByHash(name string) (*ringShard, error) { func (c *ringShards) GetByName(shardName string) (*ringShard, error) {
if name == "" { if shardName == "" {
return c.Random() return c.Random()
} }
c.mu.RLock() c.mu.RLock()
shard := c.shards[name] shard := c.shards[shardName]
c.mu.RUnlock() c.mu.RUnlock()
return shard, nil return shard, nil
} }
@ -247,23 +291,14 @@ func (c *ringShards) Heartbeat(frequency time.Duration) {
ticker := time.NewTicker(frequency) ticker := time.NewTicker(frequency)
defer ticker.Stop() defer ticker.Stop()
ctx := context.TODO() ctx := context.Background()
for range ticker.C { for range ticker.C {
var rebalance bool var rebalance bool
c.mu.RLock() for _, shard := range c.List() {
if c.closed {
c.mu.RUnlock()
break
}
shards := c.list
c.mu.RUnlock()
for _, shard := range shards {
err := shard.Client.Ping(ctx).Err() err := shard.Client.Ping(ctx).Err()
if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { isUp := err == nil || err == pool.ErrPoolTimeout
if shard.Vote(isUp) {
internal.Logger.Printf("ring shard state changed: %s", shard) internal.Logger.Printf("ring shard state changed: %s", shard)
rebalance = true rebalance = true
} }
@ -281,24 +316,25 @@ func (c *ringShards) rebalance() {
shards := c.shards shards := c.shards
c.mu.RUnlock() c.mu.RUnlock()
hash := newConsistentHash(c.opt) liveShards := make([]string, 0, len(shards))
var shardsNum int
for name, shard := range shards { for name, shard := range shards {
if shard.IsUp() { if shard.IsUp() {
hash.Add(name) liveShards = append(liveShards, name)
shardsNum++
} }
} }
hash := c.opt.NewConsistentHash(liveShards)
c.mu.Lock() c.mu.Lock()
c.hash = hash c.hash = hash
c.len = shardsNum c.numShard = len(liveShards)
c.mu.Unlock() c.mu.Unlock()
} }
func (c *ringShards) Len() int { func (c *ringShards) Len() int {
c.mu.RLock() c.mu.RLock()
l := c.len l := c.numShard
c.mu.RUnlock() c.mu.RUnlock()
return l return l
} }
@ -364,29 +400,15 @@ func NewRing(opt *RingOptions) *Ring {
}, },
ctx: context.Background(), ctx: context.Background(),
} }
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
ring.cmdable = ring.Process ring.cmdable = ring.Process
for name, addr := range opt.Addrs {
shard := newRingShard(opt, name, addr)
ring.shards.Add(name, shard)
}
go ring.shards.Heartbeat(opt.HeartbeatFrequency) go ring.shards.Heartbeat(opt.HeartbeatFrequency)
return &ring return &ring
} }
func newRingShard(opt *RingOptions, name, addr string) *Client {
clopt := opt.clientOptions()
clopt.Addr = addr
if opt.NewClient != nil {
return opt.NewClient(name, clopt)
}
return NewClient(clopt)
}
func (c *Ring) Context() context.Context { func (c *Ring) Context() context.Context {
return c.ctx return c.ctx
} }
@ -623,7 +645,7 @@ func (c *Ring) generalProcessPipeline(
cmdInfo := c.cmdInfo(cmd.Name()) cmdInfo := c.cmdInfo(cmd.Name())
hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
if hash != "" { if hash != "" {
hash = c.shards.Hash(hashtag.Key(hash)) hash = c.shards.Hash(hash)
} }
cmdsMap[hash] = append(cmdsMap[hash], cmd) cmdsMap[hash] = append(cmdsMap[hash], cmd)
} }
@ -646,7 +668,7 @@ func (c *Ring) processShardPipeline(
ctx context.Context, hash string, cmds []Cmder, tx bool, ctx context.Context, hash string, cmds []Cmder, tx bool,
) error { ) error {
//TODO: retry? //TODO: retry?
shard, err := c.shards.GetByHash(hash) shard, err := c.shards.GetByName(hash)
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return err return err
@ -700,7 +722,3 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er
return shards[0].Client.Watch(ctx, fn, keys...) return shards[0].Client.Watch(ctx, fn, keys...)
} }
func newConsistentHash(opt *RingOptions) *consistenthash.Map {
return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
}

View File

@ -54,8 +54,8 @@ var _ = Describe("Redis Ring", func() {
setRingKeys() setRingKeys()
// Both shards should have some keys now. // Both shards should have some keys now.
Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=57")) Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=56"))
Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=43")) Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=44"))
}) })
It("distributes keys when using EVAL", func() { It("distributes keys when using EVAL", func() {
@ -71,8 +71,8 @@ var _ = Describe("Redis Ring", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
} }
Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=57")) Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=56"))
Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=43")) Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=44"))
}) })
It("uses single shard when one of the shards is down", func() { It("uses single shard when one of the shards is down", func() {
@ -100,7 +100,7 @@ var _ = Describe("Redis Ring", func() {
setRingKeys() setRingKeys()
// RingShard2 should have its keys. // RingShard2 should have its keys.
Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=43")) Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=44"))
}) })
It("supports hash tags", func() { It("supports hash tags", func() {
@ -131,8 +131,8 @@ var _ = Describe("Redis Ring", func() {
} }
// Both shards should have some keys now. // Both shards should have some keys now.
Expect(ringShard1.Info(ctx).Val()).To(ContainSubstring("keys=57")) Expect(ringShard1.Info(ctx).Val()).To(ContainSubstring("keys=56"))
Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=43")) Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=44"))
}) })
It("is consistent with ring", func() { It("is consistent with ring", func() {
@ -427,22 +427,22 @@ var _ = Describe("Ring watch", func() {
It("should discard", func() { It("should discard", func() {
err := ring.Watch(ctx, func(tx *redis.Tx) error { err := ring.Watch(ctx, func(tx *redis.Tx) error {
cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, "key1", "hello1", 0) pipe.Set(ctx, "{shard}key1", "hello1", 0)
pipe.Discard() pipe.Discard()
pipe.Set(ctx, "key2", "hello2", 0) pipe.Set(ctx, "{shard}key2", "hello2", 0)
return nil return nil
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(1)) Expect(cmds).To(HaveLen(1))
return err return err
}, "key1", "key2") }, "{shard}key1", "{shard}key2")
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
get := ring.Get(ctx, "key1") get := ring.Get(ctx, "{shard}key1")
Expect(get.Err()).To(Equal(redis.Nil)) Expect(get.Err()).To(Equal(redis.Nil))
Expect(get.Val()).To(Equal("")) Expect(get.Val()).To(Equal(""))
get = ring.Get(ctx, "key2") get = ring.Get(ctx, "{shard}key2")
Expect(get.Err()).NotTo(HaveOccurred()) Expect(get.Err()).NotTo(HaveOccurred())
Expect(get.Val()).To(Equal("hello2")) Expect(get.Val()).To(Equal("hello2"))
}) })