From 1608a33e551385996c40013451d22c70d6d732d5 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 24 Jun 2015 15:37:41 +0300 Subject: [PATCH] ring: fix key hashing in Ring pipeline. --- cluster.go | 2 +- ring.go | 28 ++++++++++------------------ ring_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 53 insertions(+), 25 deletions(-) diff --git a/cluster.go b/cluster.go index 99d3eeb..cbf00b2 100644 --- a/cluster.go +++ b/cluster.go @@ -326,7 +326,7 @@ const hashSlots = 16384 func hashKey(key string) string { if s := strings.IndexByte(key, '{'); s > -1 { if e := strings.IndexByte(key[s+1:], '}'); e > 0 { - key = key[s+1 : s+e+1] + return key[s+1 : s+e+1] } } return key diff --git a/ring.go b/ring.go index 4b620da..26b06b6 100644 --- a/ring.go +++ b/ring.go @@ -150,23 +150,19 @@ func (ring *Ring) getClient(key string) (*Client, error) { return nil, errClosed } - name := ring.hash.Get(key) + name := ring.hash.Get(hashKey(key)) if name == "" { ring.mx.RUnlock() return nil, errRingShardsDown } - if shard, ok := ring.shards[name]; ok { - ring.mx.RUnlock() - return shard.Client, nil - } - + cl := ring.shards[name].Client ring.mx.RUnlock() - return nil, errRingShardsDown + return cl, nil } func (ring *Ring) process(cmd Cmder) { - cl, err := ring.getClient(hashKey(cmd.clusterKey())) + cl, err := ring.getClient(cmd.clusterKey()) if err != nil { cmd.setErr(err) return @@ -299,7 +295,11 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { - name := pipe.ring.hash.Get(cmd.clusterKey()) + name := pipe.ring.hash.Get(hashKey(cmd.clusterKey())) + if name == "" { + cmd.setErr(errRingShardsDown) + continue + } cmdsMap[name] = append(cmdsMap[name], cmd) } @@ -307,15 +307,7 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) { failedCmdsMap := make(map[string][]Cmder) for name, cmds := range cmdsMap { - client, err := pipe.ring.getClient(name) - if err != nil { - setCmdsErr(cmds, err) - if retErr == nil { - retErr = err - } - continue - } - + client := pipe.ring.shards[name].Client cn, err := client.conn() if err != nil { setCmdsErr(cmds, err) diff --git a/ring_test.go b/ring_test.go index b77e39f..117a973 100644 --- a/ring_test.go +++ b/ring_test.go @@ -1,6 +1,7 @@ package redis_test import ( + "crypto/rand" "fmt" "time" @@ -23,8 +24,8 @@ var _ = Describe("Redis ring", func() { BeforeEach(func() { ring = redis.NewRing(&redis.RingOptions{ Addrs: map[string]string{ - "ringShard1": ":" + ringShard1Port, - "ringShard2": ":" + ringShard2Port, + "ringShardOne": ":" + ringShard1Port, + "ringShardTwo": ":" + ringShard2Port, }, }) @@ -82,6 +83,16 @@ var _ = Describe("Redis ring", func() { Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43")) }) + It("supports hash tags", func() { + for i := 0; i < 100; i++ { + err := ring.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + } + + Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys=")) + Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100")) + }) + Describe("pipelining", func() { It("uses both shards", func() { pipe := ring.Pipeline() @@ -104,16 +115,41 @@ var _ = Describe("Redis ring", func() { Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43")) }) - It("is consistent", func() { + It("is consistent with ring", func() { + var keys []string + for i := 0; i < 100; i++ { + key := make([]byte, 64) + _, err := rand.Read(key) + Expect(err).NotTo(HaveOccurred()) + keys = append(keys, string(key)) + } + _, err := ring.Pipelined(func(pipe *redis.RingPipeline) error { - pipe.Set("mykey", "pipeline", 0) + for _, key := range keys { + pipe.Set(key, "value", 0).Err() + } return nil }) Expect(err).NotTo(HaveOccurred()) - val, err := ring.Get("mykey").Result() + for _, key := range keys { + val, err := ring.Get(key).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value")) + } + }) + + It("supports hash tags", func() { + _, err := ring.Pipelined(func(pipe *redis.RingPipeline) error { + for i := 0; i < 100; i++ { + pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err() + } + return nil + }) Expect(err).NotTo(HaveOccurred()) - Expect(val).To(Equal("pipeline")) + + Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys=")) + Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100")) }) }) })