diff --git a/.prettierrc b/.prettierrc new file mode 100644 index 00000000..d44905ff --- /dev/null +++ b/.prettierrc @@ -0,0 +1,4 @@ +semi: false +singleQuote: true +proseWrap: always +printWidth: 80 diff --git a/CHANGELOG.md b/CHANGELOG.md index 3764d87c..fc4dc03e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,37 +2,63 @@ ## v8 (unreleased) -- All commands accept `context.Context` as first argument. +- All commands require `context.Context` as a first argument, e.g. + `rdb.Ping(ctx)`. If you are not using `context.Context` yet, the simplest + option is to define package variable `var ctx = context.TODO()` and use it + when `ctx` is expected. +- Ring uses Rendezvous Hashing by default which provides better distribution. + This means that existing keys must be moved to a new location or key will be + inaccessible / lost. To use old hashing scheme: + +```go +import "github.com/golang/groupcache/consistenthash" + +ring := redis.NewRing(&redis.RingOptions{ + NewConsistentHash: func() { + return consistenthash.New(100, crc32.ChecksumIEEE) + }, +}) +``` + - Basic support for OpenTelemetry instrumentation. ## v7.3 -- New option `Options.Username` which causes client to use `AuthACL`. Be aware if your connection URL contains username. +- New option `Options.Username` which causes client to use `AuthACL`. Be aware + if your connection URL contains username. ## v7.2 -- Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored for Redis 3 users. +- Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored + for Redis 3 users. ## v7.1 -- Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements `fmt.Stringer` interface. +- Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements + `fmt.Stringer` interface. ## v7 -- _Important_. Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a transactional pipeline. -- WrapProcess is replaced with more convenient AddHook that has access to context.Context. +- _Important_. Tx.Pipeline now returns a non-transactional pipeline. Use + Tx.TxPipeline for a transactional pipeline. +- WrapProcess is replaced with more convenient AddHook that has access to + context.Context. - WithContext now can not be used to create a shallow copy of the client. - New methods ProcessContext, DoContext, and ExecContext. - Client respects Context.Deadline when setting net.Conn deadline. -- Client listens on Context.Done while waiting for a connection from the pool and returns an error when context context is cancelled. -- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to `*Message` to allow detecting reconnections. -- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()` helper is added to parse the time. +- Client listens on Context.Done while waiting for a connection from the pool + and returns an error when context context is cancelled. +- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to + `*Message` to allow detecting reconnections. +- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()` + helper is added to parse the time. - `SetLimiter` is removed and added `Options.Limiter` instead. - `HMSet` is deprecated as of Redis v4. ## v6.15 -- Cluster and Ring pipelines process commands for each node in its own goroutine. +- Cluster and Ring pipelines process commands for each node in its own + goroutine. ## 6.14 @@ -40,16 +66,23 @@ - Added Options.MaxConnAge. - PoolStats.FreeConns is renamed to PoolStats.IdleConns. - Add Client.Do to simplify creating custom commands. -- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers. +- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool + helpers. - Lower memory usage. ## v6.13 -- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards. -- Cluster client was optimized to use much less memory when reloading cluster state. -- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout occurres. In most cases it is recommended to use PubSub.Channel instead. +- Ring got new options called `HashReplicas` and `Hash`. It is recommended to + set `HashReplicas = 1000` for better keys distribution between shards. +- Cluster client was optimized to use much less memory when reloading cluster + state. +- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not + lose data when timeout occurres. In most cases it is recommended to use + PubSub.Channel instead. - Dialer.KeepAlive is set to 5 minutes by default. ## v6.12 -- ClusterClient got new option called `ClusterSlots` which allows to build cluster of normal Redis Servers that don't have cluster mode enabled. See https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup +- ClusterClient got new option called `ClusterSlots` which allows to build + cluster of normal Redis Servers that don't have cluster mode enabled. See + https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup diff --git a/go.mod b/go.mod index 553e9949..94099e31 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/go-redis/redis/v8 require ( + github.com/cespare/xxhash v1.1.0 + github.com/dgryski/go-rendezvous v0.0.0-20180401054734-3692eb46c031 github.com/onsi/ginkgo v1.10.1 github.com/onsi/gomega v1.7.0 go.opentelemetry.io/otel v0.5.0 diff --git a/go.sum b/go.sum index b7f8fe65..969e540e 100644 --- a/go.sum +++ b/go.sum @@ -2,14 +2,19 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 h1:qELHH0AWCvf98Yf+CNIJx9vOZOfHFDDzgDRYsnNk/vs= github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/benbjohnson/clock v1.0.0 h1:78Jk/r6m4wCi6sndMpty7A//t4dw/RW5fV4ZgDVfX1w= github.com/benbjohnson/clock v1.0.0/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20180401054734-3692eb46c031 h1:GqrUYGzmGuc00lpc+K0wwrqshfkKLwgYFJiCyOZFMVE= +github.com/dgryski/go-rendezvous v0.0.0-20180401054734-3692eb46c031/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -45,6 +50,7 @@ github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e/go.mo github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= diff --git a/internal/consistenthash/consistenthash.go b/internal/consistenthash/consistenthash.go deleted file mode 100644 index a9c56f07..00000000 --- a/internal/consistenthash/consistenthash.go +++ /dev/null @@ -1,81 +0,0 @@ -/* -Copyright 2013 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package consistenthash provides an implementation of a ring hash. -package consistenthash - -import ( - "hash/crc32" - "sort" - "strconv" -) - -type Hash func(data []byte) uint32 - -type Map struct { - hash Hash - replicas int - keys []int // Sorted - hashMap map[int]string -} - -func New(replicas int, fn Hash) *Map { - m := &Map{ - replicas: replicas, - hash: fn, - hashMap: make(map[int]string), - } - if m.hash == nil { - m.hash = crc32.ChecksumIEEE - } - return m -} - -// Returns true if there are no items available. -func (m *Map) IsEmpty() bool { - return len(m.keys) == 0 -} - -// Adds some keys to the hash. -func (m *Map) Add(keys ...string) { - for _, key := range keys { - for i := 0; i < m.replicas; i++ { - hash := int(m.hash([]byte(strconv.Itoa(i) + key))) - m.keys = append(m.keys, hash) - m.hashMap[hash] = key - } - } - sort.Ints(m.keys) -} - -// Gets the closest item in the hash to the provided key. -func (m *Map) Get(key string) string { - if m.IsEmpty() { - return "" - } - - hash := int(m.hash([]byte(key))) - - // Binary search for appropriate replica. - idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash }) - - // Means we have cycled back to the first replica. - if idx == len(m.keys) { - idx = 0 - } - - return m.hashMap[m.keys[idx]] -} diff --git a/internal/consistenthash/consistenthash_test.go b/internal/consistenthash/consistenthash_test.go deleted file mode 100644 index 1a37fd7f..00000000 --- a/internal/consistenthash/consistenthash_test.go +++ /dev/null @@ -1,110 +0,0 @@ -/* -Copyright 2013 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package consistenthash - -import ( - "fmt" - "strconv" - "testing" -) - -func TestHashing(t *testing.T) { - - // Override the hash function to return easier to reason about values. Assumes - // the keys can be converted to an integer. - hash := New(3, func(key []byte) uint32 { - i, err := strconv.Atoi(string(key)) - if err != nil { - panic(err) - } - return uint32(i) - }) - - // Given the above hash function, this will give replicas with "hashes": - // 2, 4, 6, 12, 14, 16, 22, 24, 26 - hash.Add("6", "4", "2") - - testCases := map[string]string{ - "2": "2", - "11": "2", - "23": "4", - "27": "2", - } - - for k, v := range testCases { - if hash.Get(k) != v { - t.Errorf("Asking for %s, should have yielded %s", k, v) - } - } - - // Adds 8, 18, 28 - hash.Add("8") - - // 27 should now map to 8. - testCases["27"] = "8" - - for k, v := range testCases { - if hash.Get(k) != v { - t.Errorf("Asking for %s, should have yielded %s", k, v) - } - } - -} - -func TestConsistency(t *testing.T) { - hash1 := New(1, nil) - hash2 := New(1, nil) - - hash1.Add("Bill", "Bob", "Bonny") - hash2.Add("Bob", "Bonny", "Bill") - - if hash1.Get("Ben") != hash2.Get("Ben") { - t.Errorf("Fetching 'Ben' from both hashes should be the same") - } - - hash2.Add("Becky", "Ben", "Bobby") - - if hash1.Get("Ben") != hash2.Get("Ben") || - hash1.Get("Bob") != hash2.Get("Bob") || - hash1.Get("Bonny") != hash2.Get("Bonny") { - t.Errorf("Direct matches should always return the same entry") - } - -} - -func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8) } -func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32) } -func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128) } -func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512) } - -func benchmarkGet(b *testing.B, shards int) { - - hash := New(50, nil) - - var buckets []string - for i := 0; i < shards; i++ { - buckets = append(buckets, fmt.Sprintf("shard-%d", i)) - } - - hash.Add(buckets...) - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - hash.Get(buckets[i&(shards-1)]) - } -} diff --git a/ring.go b/ring.go index 059ffdf9..9e02f944 100644 --- a/ring.go +++ b/ring.go @@ -10,55 +10,61 @@ import ( "sync/atomic" "time" + "github.com/cespare/xxhash" + "github.com/dgryski/go-rendezvous" + "github.com/go-redis/redis/v8/internal" - "github.com/go-redis/redis/v8/internal/consistenthash" "github.com/go-redis/redis/v8/internal/hashtag" "github.com/go-redis/redis/v8/internal/pool" ) -// Hash is type of hash function used in consistent hash. -type Hash consistenthash.Hash - var errRingShardsDown = errors.New("redis: all ring shards are down") +//------------------------------------------------------------------------------ + +type ConsistentHash interface { + Get(string) string +} + +type rendezvousWrapper struct { + *rendezvous.Rendezvous +} + +func (w rendezvousWrapper) Get(key string) string { + return w.Lookup(key) +} + +func newRendezvous(shards []string) ConsistentHash { + return rendezvousWrapper{rendezvous.New(shards, xxhash.Sum64String)} +} + +//------------------------------------------------------------------------------ + // RingOptions are used to configure a ring client and should be // passed to NewRing. type RingOptions struct { // Map of name => host:port addresses of ring shards. Addrs map[string]string + // NewClient creates a shard client with provided name and options. + NewClient func(name string, opt *Options) *Client + // Frequency of PING commands sent to check shards availability. // Shard is considered down after 3 subsequent failed checks. HeartbeatFrequency time.Duration - // Hash function used in consistent hash. - // Default is crc32.ChecksumIEEE. - Hash Hash - - // Number of replicas in consistent hash. - // Default is 100 replicas. + // NewConsistentHash returns a consistent hash that is used + // to distribute keys across the shards. // - // Higher number of replicas will provide less deviation, that is keys will be - // distributed to nodes more evenly. - // - // Following is deviation for common nreplicas: - // -------------------------------------------------------- - // | nreplicas | standard error | 99% confidence interval | - // | 10 | 0.3152 | (0.37, 1.98) | - // | 100 | 0.0997 | (0.76, 1.28) | - // | 1000 | 0.0316 | (0.92, 1.09) | - // -------------------------------------------------------- - // - // See https://arxiv.org/abs/1406.2294 for reference - HashReplicas int - - // NewClient creates a shard client with provided name and options. - NewClient func(name string, opt *Options) *Client + // See https://medium.com/@dgryski/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8 + // for consistent hashing algorithmic tradeoffs. + NewConsistentHash func(shards []string) ConsistentHash // Following options are copied from Options struct. OnConnect func(*Conn) error + Username string DB int Password string @@ -79,12 +85,18 @@ type RingOptions struct { } func (opt *RingOptions) init() { + if opt.NewClient == nil { + opt.NewClient = func(name string, opt *Options) *Client { + return NewClient(opt) + } + } + if opt.HeartbeatFrequency == 0 { opt.HeartbeatFrequency = 500 * time.Millisecond } - if opt.HashReplicas == 0 { - opt.HashReplicas = 100 + if opt.NewConsistentHash == nil { + opt.NewConsistentHash = newRendezvous } switch opt.MinRetryBackoff { @@ -127,6 +139,15 @@ type ringShard struct { down int32 } +func newRingShard(opt *RingOptions, name, addr string) *ringShard { + clopt := opt.clientOptions() + clopt.Addr = addr + + return &ringShard{ + Client: opt.NewClient(name, clopt), + } +} + func (shard *ringShard) String() string { var state string if shard.IsUp() { @@ -167,41 +188,59 @@ func (shard *ringShard) Vote(up bool) bool { type ringShards struct { opt *RingOptions - mu sync.RWMutex - hash *consistenthash.Map - shards map[string]*ringShard // read only - list []*ringShard // read only - len int - closed bool + mu sync.RWMutex + hash ConsistentHash + shards map[string]*ringShard // read only + list []*ringShard // read only + numShard int + closed bool } func newRingShards(opt *RingOptions) *ringShards { - return &ringShards{ + shards := make(map[string]*ringShard, len(opt.Addrs)) + list := make([]*ringShard, 0, len(shards)) + + for name, addr := range opt.Addrs { + shard := newRingShard(opt, name, addr) + shards[name] = shard + + list = append(list, shard) + } + + c := &ringShards{ opt: opt, - hash: newConsistentHash(opt), - shards: make(map[string]*ringShard), + shards: shards, + list: list, } -} + c.rebalance() -func (c *ringShards) Add(name string, cl *Client) { - shard := &ringShard{Client: cl} - c.hash.Add(name) - c.shards[name] = shard - c.list = append(c.list, shard) + return c } func (c *ringShards) List() []*ringShard { + var list []*ringShard + c.mu.RLock() - list := c.list + if !c.closed { + list = c.list + } c.mu.RUnlock() + return list } func (c *ringShards) Hash(key string) string { + key = hashtag.Key(key) + + var hash string + c.mu.RLock() - hash := c.hash.Get(key) + if c.numShard > 0 { + hash = c.hash.Get(key) + } c.mu.RUnlock() + return hash } @@ -215,6 +254,11 @@ func (c *ringShards) GetByKey(key string) (*ringShard, error) { return nil, pool.ErrClosed } + if c.numShard == 0 { + c.mu.RUnlock() + return nil, errRingShardsDown + } + hash := c.hash.Get(key) if hash == "" { c.mu.RUnlock() @@ -227,13 +271,13 @@ func (c *ringShards) GetByKey(key string) (*ringShard, error) { return shard, nil } -func (c *ringShards) GetByHash(name string) (*ringShard, error) { - if name == "" { +func (c *ringShards) GetByName(shardName string) (*ringShard, error) { + if shardName == "" { return c.Random() } c.mu.RLock() - shard := c.shards[name] + shard := c.shards[shardName] c.mu.RUnlock() return shard, nil } @@ -247,23 +291,14 @@ func (c *ringShards) Heartbeat(frequency time.Duration) { ticker := time.NewTicker(frequency) defer ticker.Stop() - ctx := context.TODO() + ctx := context.Background() for range ticker.C { var rebalance bool - c.mu.RLock() - - if c.closed { - c.mu.RUnlock() - break - } - - shards := c.list - c.mu.RUnlock() - - for _, shard := range shards { + for _, shard := range c.List() { err := shard.Client.Ping(ctx).Err() - if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { + isUp := err == nil || err == pool.ErrPoolTimeout + if shard.Vote(isUp) { internal.Logger.Printf("ring shard state changed: %s", shard) rebalance = true } @@ -281,24 +316,25 @@ func (c *ringShards) rebalance() { shards := c.shards c.mu.RUnlock() - hash := newConsistentHash(c.opt) - var shardsNum int + liveShards := make([]string, 0, len(shards)) + for name, shard := range shards { if shard.IsUp() { - hash.Add(name) - shardsNum++ + liveShards = append(liveShards, name) } } + hash := c.opt.NewConsistentHash(liveShards) + c.mu.Lock() c.hash = hash - c.len = shardsNum + c.numShard = len(liveShards) c.mu.Unlock() } func (c *ringShards) Len() int { c.mu.RLock() - l := c.len + l := c.numShard c.mu.RUnlock() return l } @@ -364,29 +400,15 @@ func NewRing(opt *RingOptions) *Ring { }, ctx: context.Background(), } + ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) ring.cmdable = ring.Process - for name, addr := range opt.Addrs { - shard := newRingShard(opt, name, addr) - ring.shards.Add(name, shard) - } - go ring.shards.Heartbeat(opt.HeartbeatFrequency) return &ring } -func newRingShard(opt *RingOptions, name, addr string) *Client { - clopt := opt.clientOptions() - clopt.Addr = addr - - if opt.NewClient != nil { - return opt.NewClient(name, clopt) - } - return NewClient(clopt) -} - func (c *Ring) Context() context.Context { return c.ctx } @@ -623,7 +645,7 @@ func (c *Ring) generalProcessPipeline( cmdInfo := c.cmdInfo(cmd.Name()) hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) if hash != "" { - hash = c.shards.Hash(hashtag.Key(hash)) + hash = c.shards.Hash(hash) } cmdsMap[hash] = append(cmdsMap[hash], cmd) } @@ -646,7 +668,7 @@ func (c *Ring) processShardPipeline( ctx context.Context, hash string, cmds []Cmder, tx bool, ) error { //TODO: retry? - shard, err := c.shards.GetByHash(hash) + shard, err := c.shards.GetByName(hash) if err != nil { setCmdsErr(cmds, err) return err @@ -700,7 +722,3 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er return shards[0].Client.Watch(ctx, fn, keys...) } - -func newConsistentHash(opt *RingOptions) *consistenthash.Map { - return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash)) -} diff --git a/ring_test.go b/ring_test.go index ba48382b..0017aed6 100644 --- a/ring_test.go +++ b/ring_test.go @@ -54,8 +54,8 @@ var _ = Describe("Redis Ring", func() { setRingKeys() // Both shards should have some keys now. - Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=57")) - Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=43")) + Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=56")) + Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=44")) }) It("distributes keys when using EVAL", func() { @@ -71,8 +71,8 @@ var _ = Describe("Redis Ring", func() { Expect(err).NotTo(HaveOccurred()) } - Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=57")) - Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=43")) + Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=56")) + Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=44")) }) It("uses single shard when one of the shards is down", func() { @@ -100,7 +100,7 @@ var _ = Describe("Redis Ring", func() { setRingKeys() // RingShard2 should have its keys. - Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=43")) + Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=44")) }) It("supports hash tags", func() { @@ -131,8 +131,8 @@ var _ = Describe("Redis Ring", func() { } // Both shards should have some keys now. - Expect(ringShard1.Info(ctx).Val()).To(ContainSubstring("keys=57")) - Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=43")) + Expect(ringShard1.Info(ctx).Val()).To(ContainSubstring("keys=56")) + Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=44")) }) It("is consistent with ring", func() { @@ -427,22 +427,22 @@ var _ = Describe("Ring watch", func() { It("should discard", func() { err := ring.Watch(ctx, func(tx *redis.Tx) error { cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - pipe.Set(ctx, "key1", "hello1", 0) + pipe.Set(ctx, "{shard}key1", "hello1", 0) pipe.Discard() - pipe.Set(ctx, "key2", "hello2", 0) + pipe.Set(ctx, "{shard}key2", "hello2", 0) return nil }) Expect(err).NotTo(HaveOccurred()) Expect(cmds).To(HaveLen(1)) return err - }, "key1", "key2") + }, "{shard}key1", "{shard}key2") Expect(err).NotTo(HaveOccurred()) - get := ring.Get(ctx, "key1") + get := ring.Get(ctx, "{shard}key1") Expect(get.Err()).To(Equal(redis.Nil)) Expect(get.Val()).To(Equal("")) - get = ring.Get(ctx, "key2") + get = ring.Get(ctx, "{shard}key2") Expect(get.Err()).NotTo(HaveOccurred()) Expect(get.Val()).To(Equal("hello2")) })