From 0bf3759a6dbf6f0fbb93e5c4d625a9fd638d0769 Mon Sep 17 00:00:00 2001 From: Anatolii Mihailenco Date: Wed, 30 Dec 2015 15:53:45 +0200 Subject: [PATCH] Create hashtag package. --- cluster.go | 34 ++++--------------- cluster_pipeline.go | 6 +++- cluster_test.go | 13 +++---- export_test.go | 4 --- crc16.go => internal/hashtag/hashtag.go | 28 ++++++++++++++- .../hashtag/hashtag_test.go | 2 +- ring.go | 5 +-- 7 files changed, 49 insertions(+), 43 deletions(-) rename crc16.go => internal/hashtag/hashtag.go (83%) rename crc16_test.go => internal/hashtag/hashtag_test.go (96%) diff --git a/cluster.go b/cluster.go index 418844e7..2015a85e 100644 --- a/cluster.go +++ b/cluster.go @@ -3,10 +3,11 @@ package redis import ( "log" "math/rand" - "strings" "sync" "sync/atomic" "time" + + "gopkg.in/redis.v3/internal/hashtag" ) // ClusterClient is a Redis Cluster client representing a pool of zero @@ -34,7 +35,7 @@ type ClusterClient struct { func NewClusterClient(opt *ClusterOptions) *ClusterClient { client := &ClusterClient{ addrs: opt.Addrs, - slots: make([][]string, hashSlots), + slots: make([][]string, hashtag.SlotNumber), clients: make(map[string]*Client), opt: opt, } @@ -47,7 +48,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { // Watch creates new transaction and marks the keys to be watched // for conditional execution of a transaction. func (c *ClusterClient) Watch(keys ...string) (*Multi, error) { - addr := c.slotMasterAddr(hashSlot(keys[0])) + addr := c.slotMasterAddr(hashtag.Slot(keys[0])) client, err := c.getClient(addr) if err != nil { return nil, err @@ -138,7 +139,7 @@ func (c *ClusterClient) randomClient() (client *Client, err error) { func (c *ClusterClient) process(cmd Cmder) { var ask bool - slot := hashSlot(cmd.clusterKey()) + slot := hashtag.Slot(cmd.clusterKey()) addr := c.slotMasterAddr(slot) client, err := c.getClient(addr) @@ -215,7 +216,7 @@ func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { seen[addr] = struct{}{} } - for i := 0; i < hashSlots; i++ { + for i := 0; i < hashtag.SlotNumber; i++ { c.slots[i] = c.slots[i][:0] } for _, info := range slots { @@ -333,26 +334,3 @@ func (opt *ClusterOptions) clientOptions() *Options { IdleTimeout: opt.IdleTimeout, } } - -//------------------------------------------------------------------------------ - -const hashSlots = 16384 - -func hashKey(key string) string { - if s := strings.IndexByte(key, '{'); s > -1 { - if e := strings.IndexByte(key[s+1:], '}'); e > 0 { - return key[s+1 : s+e+1] - } - } - return key -} - -// hashSlot returns a consistent slot number between 0 and 16383 -// for any given string key. -func hashSlot(key string) int { - key = hashKey(key) - if key == "" { - return rand.Intn(hashSlots) - } - return int(crc16sum(key)) % hashSlots -} diff --git a/cluster_pipeline.go b/cluster_pipeline.go index eb5cd2d0..73d272bb 100644 --- a/cluster_pipeline.go +++ b/cluster_pipeline.go @@ -1,5 +1,9 @@ package redis +import ( + "gopkg.in/redis.v3/internal/hashtag" +) + // ClusterPipeline is not thread-safe. type ClusterPipeline struct { commandable @@ -48,7 +52,7 @@ func (pipe *ClusterPipeline) Exec() (cmds []Cmder, retErr error) { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { - slot := hashSlot(cmd.clusterKey()) + slot := hashtag.Slot(cmd.clusterKey()) addr := pipe.cluster.slotMasterAddr(slot) cmdsMap[addr] = append(cmdsMap[addr], cmd) } diff --git a/cluster_test.go b/cluster_test.go index fe004c96..8f6b5e64 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -16,6 +16,7 @@ import ( . "github.com/onsi/gomega" "gopkg.in/redis.v3" + "gopkg.in/redis.v3/internal/hashtag" ) type clusterScenario struct { @@ -182,7 +183,7 @@ var _ = Describe("Cluster", func() { rand.Seed(100) for _, test := range tests { - Expect(redis.HashSlot(test.key)).To(Equal(test.slot), "for %s", test.key) + Expect(hashtag.Slot(test.key)).To(Equal(test.slot), "for %s", test.key) } }) @@ -198,7 +199,7 @@ var _ = Describe("Cluster", func() { } for _, test := range tests { - Expect(redis.HashSlot(test.one)).To(Equal(redis.HashSlot(test.two)), "for %s <-> %s", test.one, test.two) + Expect(hashtag.Slot(test.one)).To(Equal(hashtag.Slot(test.two)), "for %s <-> %s", test.one, test.two) } }) @@ -232,7 +233,7 @@ var _ = Describe("Cluster", func() { It("should CLUSTER KEYSLOT", func() { hashSlot, err := cluster.primary().ClusterKeySlot("somekey").Result() Expect(err).NotTo(HaveOccurred()) - Expect(hashSlot).To(Equal(int64(11058))) + Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey")))) }) It("should CLUSTER COUNT-FAILURE-REPORTS", func() { @@ -315,7 +316,7 @@ var _ = Describe("Cluster", func() { It("should follow redirects", func() { Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred()) - slot := redis.HashSlot("A") + slot := hashtag.Slot("A") Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) val, err := client.Get("A").Result() @@ -328,7 +329,7 @@ var _ = Describe("Cluster", func() { }) It("should perform multi-pipelines", func() { - slot := redis.HashSlot("A") + slot := hashtag.Slot("A") Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) pipe := client.Pipeline() @@ -361,7 +362,7 @@ var _ = Describe("Cluster", func() { MaxRedirects: -1, }) - slot := redis.HashSlot("A") + slot := hashtag.Slot("A") Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) err := client.Get("A").Err() diff --git a/export_test.go b/export_test.go index 66ccec25..e7b4b056 100644 --- a/export_test.go +++ b/export_test.go @@ -11,7 +11,3 @@ var NewConnDialer = newConnDialer func (cn *conn) SetNetConn(netcn net.Conn) { cn.netcn = netcn } - -func HashSlot(key string) int { - return hashSlot(key) -} diff --git a/crc16.go b/internal/hashtag/hashtag.go similarity index 83% rename from crc16.go rename to internal/hashtag/hashtag.go index a7f3b569..2866488e 100644 --- a/crc16.go +++ b/internal/hashtag/hashtag.go @@ -1,4 +1,11 @@ -package redis +package hashtag + +import ( + "math/rand" + "strings" +) + +const SlotNumber = 16384 // CRC16 implementation according to CCITT standards. // Copyright 2001-2010 Georges Menie (www.menie.org) @@ -39,6 +46,25 @@ var crc16tab = [256]uint16{ 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, } +func Key(key string) string { + if s := strings.IndexByte(key, '{'); s > -1 { + if e := strings.IndexByte(key[s+1:], '}'); e > 0 { + return key[s+1 : s+e+1] + } + } + return key +} + +// hashSlot returns a consistent slot number between 0 and 16383 +// for any given string key. +func Slot(key string) int { + key = Key(key) + if key == "" { + return rand.Intn(SlotNumber) + } + return int(crc16sum(key)) % SlotNumber +} + func crc16sum(key string) (crc uint16) { for i := 0; i < len(key); i++ { crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff] diff --git a/crc16_test.go b/internal/hashtag/hashtag_test.go similarity index 96% rename from crc16_test.go rename to internal/hashtag/hashtag_test.go index a6b34162..8132878d 100644 --- a/crc16_test.go +++ b/internal/hashtag/hashtag_test.go @@ -1,4 +1,4 @@ -package redis +package hashtag import ( . "github.com/onsi/ginkgo" diff --git a/ring.go b/ring.go index ff77f4d6..d6bcf805 100644 --- a/ring.go +++ b/ring.go @@ -8,6 +8,7 @@ import ( "time" "gopkg.in/redis.v3/internal/consistenthash" + "gopkg.in/redis.v3/internal/hashtag" ) var ( @@ -151,7 +152,7 @@ func (ring *Ring) getClient(key string) (*Client, error) { return nil, errClosed } - name := ring.hash.Get(hashKey(key)) + name := ring.hash.Get(hashtag.Key(key)) if name == "" { ring.mx.RUnlock() return nil, errRingShardsDown @@ -297,7 +298,7 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { - name := pipe.ring.hash.Get(hashKey(cmd.clusterKey())) + name := pipe.ring.hash.Get(hashtag.Key(cmd.clusterKey())) if name == "" { cmd.setErr(errRingShardsDown) if retErr == nil {