mirror of https://github.com/go-redis/redis.git
Merge pull request #232 from anmic/feature/hash-tags
Create hashtag package.
This commit is contained in:
commit
8a0be38c7e
34
cluster.go
34
cluster.go
|
@ -3,10 +3,11 @@ package redis
|
|||
import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gopkg.in/redis.v3/internal/hashtag"
|
||||
)
|
||||
|
||||
// ClusterClient is a Redis Cluster client representing a pool of zero
|
||||
|
@ -34,7 +35,7 @@ type ClusterClient struct {
|
|||
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
||||
client := &ClusterClient{
|
||||
addrs: opt.Addrs,
|
||||
slots: make([][]string, hashSlots),
|
||||
slots: make([][]string, hashtag.SlotNumber),
|
||||
clients: make(map[string]*Client),
|
||||
opt: opt,
|
||||
}
|
||||
|
@ -47,7 +48,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
|||
// Watch creates new transaction and marks the keys to be watched
|
||||
// for conditional execution of a transaction.
|
||||
func (c *ClusterClient) Watch(keys ...string) (*Multi, error) {
|
||||
addr := c.slotMasterAddr(hashSlot(keys[0]))
|
||||
addr := c.slotMasterAddr(hashtag.Slot(keys[0]))
|
||||
client, err := c.getClient(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -138,7 +139,7 @@ func (c *ClusterClient) randomClient() (client *Client, err error) {
|
|||
func (c *ClusterClient) process(cmd Cmder) {
|
||||
var ask bool
|
||||
|
||||
slot := hashSlot(cmd.clusterKey())
|
||||
slot := hashtag.Slot(cmd.clusterKey())
|
||||
|
||||
addr := c.slotMasterAddr(slot)
|
||||
client, err := c.getClient(addr)
|
||||
|
@ -215,7 +216,7 @@ func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
|
|||
seen[addr] = struct{}{}
|
||||
}
|
||||
|
||||
for i := 0; i < hashSlots; i++ {
|
||||
for i := 0; i < hashtag.SlotNumber; i++ {
|
||||
c.slots[i] = c.slots[i][:0]
|
||||
}
|
||||
for _, info := range slots {
|
||||
|
@ -333,26 +334,3 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
|||
IdleTimeout: opt.IdleTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
const hashSlots = 16384
|
||||
|
||||
func hashKey(key string) string {
|
||||
if s := strings.IndexByte(key, '{'); s > -1 {
|
||||
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
|
||||
return key[s+1 : s+e+1]
|
||||
}
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// hashSlot returns a consistent slot number between 0 and 16383
|
||||
// for any given string key.
|
||||
func hashSlot(key string) int {
|
||||
key = hashKey(key)
|
||||
if key == "" {
|
||||
return rand.Intn(hashSlots)
|
||||
}
|
||||
return int(crc16sum(key)) % hashSlots
|
||||
}
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"gopkg.in/redis.v3/internal/hashtag"
|
||||
)
|
||||
|
||||
// ClusterPipeline is not thread-safe.
|
||||
type ClusterPipeline struct {
|
||||
commandable
|
||||
|
@ -48,7 +52,7 @@ func (pipe *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
|
|||
|
||||
cmdsMap := make(map[string][]Cmder)
|
||||
for _, cmd := range cmds {
|
||||
slot := hashSlot(cmd.clusterKey())
|
||||
slot := hashtag.Slot(cmd.clusterKey())
|
||||
addr := pipe.cluster.slotMasterAddr(slot)
|
||||
cmdsMap[addr] = append(cmdsMap[addr], cmd)
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
. "github.com/onsi/gomega"
|
||||
|
||||
"gopkg.in/redis.v3"
|
||||
"gopkg.in/redis.v3/internal/hashtag"
|
||||
)
|
||||
|
||||
type clusterScenario struct {
|
||||
|
@ -182,7 +183,7 @@ var _ = Describe("Cluster", func() {
|
|||
rand.Seed(100)
|
||||
|
||||
for _, test := range tests {
|
||||
Expect(redis.HashSlot(test.key)).To(Equal(test.slot), "for %s", test.key)
|
||||
Expect(hashtag.Slot(test.key)).To(Equal(test.slot), "for %s", test.key)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -198,7 +199,7 @@ var _ = Describe("Cluster", func() {
|
|||
}
|
||||
|
||||
for _, test := range tests {
|
||||
Expect(redis.HashSlot(test.one)).To(Equal(redis.HashSlot(test.two)), "for %s <-> %s", test.one, test.two)
|
||||
Expect(hashtag.Slot(test.one)).To(Equal(hashtag.Slot(test.two)), "for %s <-> %s", test.one, test.two)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -232,7 +233,7 @@ var _ = Describe("Cluster", func() {
|
|||
It("should CLUSTER KEYSLOT", func() {
|
||||
hashSlot, err := cluster.primary().ClusterKeySlot("somekey").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(hashSlot).To(Equal(int64(11058)))
|
||||
Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
|
||||
})
|
||||
|
||||
It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
|
||||
|
@ -315,7 +316,7 @@ var _ = Describe("Cluster", func() {
|
|||
It("should follow redirects", func() {
|
||||
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
|
||||
|
||||
slot := redis.HashSlot("A")
|
||||
slot := hashtag.Slot("A")
|
||||
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||
|
||||
val, err := client.Get("A").Result()
|
||||
|
@ -328,7 +329,7 @@ var _ = Describe("Cluster", func() {
|
|||
})
|
||||
|
||||
It("should perform multi-pipelines", func() {
|
||||
slot := redis.HashSlot("A")
|
||||
slot := hashtag.Slot("A")
|
||||
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||
|
||||
pipe := client.Pipeline()
|
||||
|
@ -361,7 +362,7 @@ var _ = Describe("Cluster", func() {
|
|||
MaxRedirects: -1,
|
||||
})
|
||||
|
||||
slot := redis.HashSlot("A")
|
||||
slot := hashtag.Slot("A")
|
||||
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||
|
||||
err := client.Get("A").Err()
|
||||
|
|
|
@ -11,7 +11,3 @@ var NewConnDialer = newConnDialer
|
|||
func (cn *conn) SetNetConn(netcn net.Conn) {
|
||||
cn.netcn = netcn
|
||||
}
|
||||
|
||||
func HashSlot(key string) int {
|
||||
return hashSlot(key)
|
||||
}
|
||||
|
|
|
@ -1,4 +1,11 @@
|
|||
package redis
|
||||
package hashtag
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const SlotNumber = 16384
|
||||
|
||||
// CRC16 implementation according to CCITT standards.
|
||||
// Copyright 2001-2010 Georges Menie (www.menie.org)
|
||||
|
@ -39,6 +46,25 @@ var crc16tab = [256]uint16{
|
|||
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
|
||||
}
|
||||
|
||||
func Key(key string) string {
|
||||
if s := strings.IndexByte(key, '{'); s > -1 {
|
||||
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
|
||||
return key[s+1 : s+e+1]
|
||||
}
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// hashSlot returns a consistent slot number between 0 and 16383
|
||||
// for any given string key.
|
||||
func Slot(key string) int {
|
||||
key = Key(key)
|
||||
if key == "" {
|
||||
return rand.Intn(SlotNumber)
|
||||
}
|
||||
return int(crc16sum(key)) % SlotNumber
|
||||
}
|
||||
|
||||
func crc16sum(key string) (crc uint16) {
|
||||
for i := 0; i < len(key); i++ {
|
||||
crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff]
|
|
@ -1,4 +1,4 @@
|
|||
package redis
|
||||
package hashtag
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo"
|
5
ring.go
5
ring.go
|
@ -8,6 +8,7 @@ import (
|
|||
"time"
|
||||
|
||||
"gopkg.in/redis.v3/internal/consistenthash"
|
||||
"gopkg.in/redis.v3/internal/hashtag"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -151,7 +152,7 @@ func (ring *Ring) getClient(key string) (*Client, error) {
|
|||
return nil, errClosed
|
||||
}
|
||||
|
||||
name := ring.hash.Get(hashKey(key))
|
||||
name := ring.hash.Get(hashtag.Key(key))
|
||||
if name == "" {
|
||||
ring.mx.RUnlock()
|
||||
return nil, errRingShardsDown
|
||||
|
@ -297,7 +298,7 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) {
|
|||
|
||||
cmdsMap := make(map[string][]Cmder)
|
||||
for _, cmd := range cmds {
|
||||
name := pipe.ring.hash.Get(hashKey(cmd.clusterKey()))
|
||||
name := pipe.ring.hash.Get(hashtag.Key(cmd.clusterKey()))
|
||||
if name == "" {
|
||||
cmd.setErr(errRingShardsDown)
|
||||
if retErr == nil {
|
||||
|
|
Loading…
Reference in New Issue