ring: fix key hashing in Ring pipeline.

This commit is contained in:
Vladimir Mihailenco 2015-06-24 15:37:41 +03:00
parent a43e6ec46d
commit 1608a33e55
3 changed files with 53 additions and 25 deletions

View File

@ -326,7 +326,7 @@ const hashSlots = 16384
func hashKey(key string) string { func hashKey(key string) string {
if s := strings.IndexByte(key, '{'); s > -1 { if s := strings.IndexByte(key, '{'); s > -1 {
if e := strings.IndexByte(key[s+1:], '}'); e > 0 { if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
key = key[s+1 : s+e+1] return key[s+1 : s+e+1]
} }
} }
return key return key

28
ring.go
View File

@ -150,23 +150,19 @@ func (ring *Ring) getClient(key string) (*Client, error) {
return nil, errClosed return nil, errClosed
} }
name := ring.hash.Get(key) name := ring.hash.Get(hashKey(key))
if name == "" { if name == "" {
ring.mx.RUnlock() ring.mx.RUnlock()
return nil, errRingShardsDown return nil, errRingShardsDown
} }
if shard, ok := ring.shards[name]; ok { cl := ring.shards[name].Client
ring.mx.RUnlock()
return shard.Client, nil
}
ring.mx.RUnlock() ring.mx.RUnlock()
return nil, errRingShardsDown return cl, nil
} }
func (ring *Ring) process(cmd Cmder) { func (ring *Ring) process(cmd Cmder) {
cl, err := ring.getClient(hashKey(cmd.clusterKey())) cl, err := ring.getClient(cmd.clusterKey())
if err != nil { if err != nil {
cmd.setErr(err) cmd.setErr(err)
return return
@ -299,7 +295,11 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) {
cmdsMap := make(map[string][]Cmder) cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds { for _, cmd := range cmds {
name := pipe.ring.hash.Get(cmd.clusterKey()) name := pipe.ring.hash.Get(hashKey(cmd.clusterKey()))
if name == "" {
cmd.setErr(errRingShardsDown)
continue
}
cmdsMap[name] = append(cmdsMap[name], cmd) cmdsMap[name] = append(cmdsMap[name], cmd)
} }
@ -307,15 +307,7 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) {
failedCmdsMap := make(map[string][]Cmder) failedCmdsMap := make(map[string][]Cmder)
for name, cmds := range cmdsMap { for name, cmds := range cmdsMap {
client, err := pipe.ring.getClient(name) client := pipe.ring.shards[name].Client
if err != nil {
setCmdsErr(cmds, err)
if retErr == nil {
retErr = err
}
continue
}
cn, err := client.conn() cn, err := client.conn()
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)

View File

@ -1,6 +1,7 @@
package redis_test package redis_test
import ( import (
"crypto/rand"
"fmt" "fmt"
"time" "time"
@ -23,8 +24,8 @@ var _ = Describe("Redis ring", func() {
BeforeEach(func() { BeforeEach(func() {
ring = redis.NewRing(&redis.RingOptions{ ring = redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{ Addrs: map[string]string{
"ringShard1": ":" + ringShard1Port, "ringShardOne": ":" + ringShard1Port,
"ringShard2": ":" + ringShard2Port, "ringShardTwo": ":" + ringShard2Port,
}, },
}) })
@ -82,6 +83,16 @@ var _ = Describe("Redis ring", func() {
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43")) Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
}) })
It("supports hash tags", func() {
for i := 0; i < 100; i++ {
err := ring.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
Expect(err).NotTo(HaveOccurred())
}
Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
})
Describe("pipelining", func() { Describe("pipelining", func() {
It("uses both shards", func() { It("uses both shards", func() {
pipe := ring.Pipeline() pipe := ring.Pipeline()
@ -104,16 +115,41 @@ var _ = Describe("Redis ring", func() {
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43")) Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
}) })
It("is consistent", func() { It("is consistent with ring", func() {
var keys []string
for i := 0; i < 100; i++ {
key := make([]byte, 64)
_, err := rand.Read(key)
Expect(err).NotTo(HaveOccurred())
keys = append(keys, string(key))
}
_, err := ring.Pipelined(func(pipe *redis.RingPipeline) error { _, err := ring.Pipelined(func(pipe *redis.RingPipeline) error {
pipe.Set("mykey", "pipeline", 0) for _, key := range keys {
pipe.Set(key, "value", 0).Err()
}
return nil return nil
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
val, err := ring.Get("mykey").Result() for _, key := range keys {
val, err := ring.Get(key).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("value"))
}
})
It("supports hash tags", func() {
_, err := ring.Pipelined(func(pipe *redis.RingPipeline) error {
for i := 0; i < 100; i++ {
pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
}
return nil
})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("pipeline"))
Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
}) })
}) })
}) })