forked from mirror/redis
Merge pull request #127 from go-redis/fix/ring-keys-hashing
ring: fix key hashing in Ring pipeline.
This commit is contained in:
commit
2cfe5df7d2
|
@ -326,7 +326,7 @@ const hashSlots = 16384
|
||||||
func hashKey(key string) string {
|
func hashKey(key string) string {
|
||||||
if s := strings.IndexByte(key, '{'); s > -1 {
|
if s := strings.IndexByte(key, '{'); s > -1 {
|
||||||
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
|
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
|
||||||
key = key[s+1 : s+e+1]
|
return key[s+1 : s+e+1]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return key
|
return key
|
||||||
|
|
28
ring.go
28
ring.go
|
@ -150,23 +150,19 @@ func (ring *Ring) getClient(key string) (*Client, error) {
|
||||||
return nil, errClosed
|
return nil, errClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
name := ring.hash.Get(key)
|
name := ring.hash.Get(hashKey(key))
|
||||||
if name == "" {
|
if name == "" {
|
||||||
ring.mx.RUnlock()
|
ring.mx.RUnlock()
|
||||||
return nil, errRingShardsDown
|
return nil, errRingShardsDown
|
||||||
}
|
}
|
||||||
|
|
||||||
if shard, ok := ring.shards[name]; ok {
|
cl := ring.shards[name].Client
|
||||||
ring.mx.RUnlock()
|
|
||||||
return shard.Client, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ring.mx.RUnlock()
|
ring.mx.RUnlock()
|
||||||
return nil, errRingShardsDown
|
return cl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ring *Ring) process(cmd Cmder) {
|
func (ring *Ring) process(cmd Cmder) {
|
||||||
cl, err := ring.getClient(hashKey(cmd.clusterKey()))
|
cl, err := ring.getClient(cmd.clusterKey())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.setErr(err)
|
cmd.setErr(err)
|
||||||
return
|
return
|
||||||
|
@ -299,7 +295,11 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) {
|
||||||
|
|
||||||
cmdsMap := make(map[string][]Cmder)
|
cmdsMap := make(map[string][]Cmder)
|
||||||
for _, cmd := range cmds {
|
for _, cmd := range cmds {
|
||||||
name := pipe.ring.hash.Get(cmd.clusterKey())
|
name := pipe.ring.hash.Get(hashKey(cmd.clusterKey()))
|
||||||
|
if name == "" {
|
||||||
|
cmd.setErr(errRingShardsDown)
|
||||||
|
continue
|
||||||
|
}
|
||||||
cmdsMap[name] = append(cmdsMap[name], cmd)
|
cmdsMap[name] = append(cmdsMap[name], cmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -307,15 +307,7 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) {
|
||||||
failedCmdsMap := make(map[string][]Cmder)
|
failedCmdsMap := make(map[string][]Cmder)
|
||||||
|
|
||||||
for name, cmds := range cmdsMap {
|
for name, cmds := range cmdsMap {
|
||||||
client, err := pipe.ring.getClient(name)
|
client := pipe.ring.shards[name].Client
|
||||||
if err != nil {
|
|
||||||
setCmdsErr(cmds, err)
|
|
||||||
if retErr == nil {
|
|
||||||
retErr = err
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
cn, err := client.conn()
|
cn, err := client.conn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setCmdsErr(cmds, err)
|
setCmdsErr(cmds, err)
|
||||||
|
|
48
ring_test.go
48
ring_test.go
|
@ -1,6 +1,7 @@
|
||||||
package redis_test
|
package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/rand"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -23,8 +24,8 @@ var _ = Describe("Redis ring", func() {
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
ring = redis.NewRing(&redis.RingOptions{
|
ring = redis.NewRing(&redis.RingOptions{
|
||||||
Addrs: map[string]string{
|
Addrs: map[string]string{
|
||||||
"ringShard1": ":" + ringShard1Port,
|
"ringShardOne": ":" + ringShard1Port,
|
||||||
"ringShard2": ":" + ringShard2Port,
|
"ringShardTwo": ":" + ringShard2Port,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -82,6 +83,16 @@ var _ = Describe("Redis ring", func() {
|
||||||
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
|
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("supports hash tags", func() {
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
err := ring.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
|
||||||
|
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
|
||||||
|
})
|
||||||
|
|
||||||
Describe("pipelining", func() {
|
Describe("pipelining", func() {
|
||||||
It("uses both shards", func() {
|
It("uses both shards", func() {
|
||||||
pipe := ring.Pipeline()
|
pipe := ring.Pipeline()
|
||||||
|
@ -104,16 +115,41 @@ var _ = Describe("Redis ring", func() {
|
||||||
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
|
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("is consistent", func() {
|
It("is consistent with ring", func() {
|
||||||
|
var keys []string
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
key := make([]byte, 64)
|
||||||
|
_, err := rand.Read(key)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
keys = append(keys, string(key))
|
||||||
|
}
|
||||||
|
|
||||||
_, err := ring.Pipelined(func(pipe *redis.RingPipeline) error {
|
_, err := ring.Pipelined(func(pipe *redis.RingPipeline) error {
|
||||||
pipe.Set("mykey", "pipeline", 0)
|
for _, key := range keys {
|
||||||
|
pipe.Set(key, "value", 0).Err()
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
val, err := ring.Get("mykey").Result()
|
for _, key := range keys {
|
||||||
|
val, err := ring.Get(key).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).To(Equal("value"))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
It("supports hash tags", func() {
|
||||||
|
_, err := ring.Pipelined(func(pipe *redis.RingPipeline) error {
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(val).To(Equal("pipeline"))
|
|
||||||
|
Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
|
||||||
|
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue