forked from mirror/redis
Create hashtag package.
This commit is contained in:
parent
cbc5360e78
commit
0bf3759a6d
34
cluster.go
34
cluster.go
|
@ -3,10 +3,11 @@ package redis
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/redis.v3/internal/hashtag"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ClusterClient is a Redis Cluster client representing a pool of zero
|
// ClusterClient is a Redis Cluster client representing a pool of zero
|
||||||
|
@ -34,7 +35,7 @@ type ClusterClient struct {
|
||||||
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
||||||
client := &ClusterClient{
|
client := &ClusterClient{
|
||||||
addrs: opt.Addrs,
|
addrs: opt.Addrs,
|
||||||
slots: make([][]string, hashSlots),
|
slots: make([][]string, hashtag.SlotNumber),
|
||||||
clients: make(map[string]*Client),
|
clients: make(map[string]*Client),
|
||||||
opt: opt,
|
opt: opt,
|
||||||
}
|
}
|
||||||
|
@ -47,7 +48,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
||||||
// Watch creates new transaction and marks the keys to be watched
|
// Watch creates new transaction and marks the keys to be watched
|
||||||
// for conditional execution of a transaction.
|
// for conditional execution of a transaction.
|
||||||
func (c *ClusterClient) Watch(keys ...string) (*Multi, error) {
|
func (c *ClusterClient) Watch(keys ...string) (*Multi, error) {
|
||||||
addr := c.slotMasterAddr(hashSlot(keys[0]))
|
addr := c.slotMasterAddr(hashtag.Slot(keys[0]))
|
||||||
client, err := c.getClient(addr)
|
client, err := c.getClient(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -138,7 +139,7 @@ func (c *ClusterClient) randomClient() (client *Client, err error) {
|
||||||
func (c *ClusterClient) process(cmd Cmder) {
|
func (c *ClusterClient) process(cmd Cmder) {
|
||||||
var ask bool
|
var ask bool
|
||||||
|
|
||||||
slot := hashSlot(cmd.clusterKey())
|
slot := hashtag.Slot(cmd.clusterKey())
|
||||||
|
|
||||||
addr := c.slotMasterAddr(slot)
|
addr := c.slotMasterAddr(slot)
|
||||||
client, err := c.getClient(addr)
|
client, err := c.getClient(addr)
|
||||||
|
@ -215,7 +216,7 @@ func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
|
||||||
seen[addr] = struct{}{}
|
seen[addr] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < hashSlots; i++ {
|
for i := 0; i < hashtag.SlotNumber; i++ {
|
||||||
c.slots[i] = c.slots[i][:0]
|
c.slots[i] = c.slots[i][:0]
|
||||||
}
|
}
|
||||||
for _, info := range slots {
|
for _, info := range slots {
|
||||||
|
@ -333,26 +334,3 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
||||||
IdleTimeout: opt.IdleTimeout,
|
IdleTimeout: opt.IdleTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
const hashSlots = 16384
|
|
||||||
|
|
||||||
func hashKey(key string) string {
|
|
||||||
if s := strings.IndexByte(key, '{'); s > -1 {
|
|
||||||
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
|
|
||||||
return key[s+1 : s+e+1]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return key
|
|
||||||
}
|
|
||||||
|
|
||||||
// hashSlot returns a consistent slot number between 0 and 16383
|
|
||||||
// for any given string key.
|
|
||||||
func hashSlot(key string) int {
|
|
||||||
key = hashKey(key)
|
|
||||||
if key == "" {
|
|
||||||
return rand.Intn(hashSlots)
|
|
||||||
}
|
|
||||||
return int(crc16sum(key)) % hashSlots
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
package redis
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"gopkg.in/redis.v3/internal/hashtag"
|
||||||
|
)
|
||||||
|
|
||||||
// ClusterPipeline is not thread-safe.
|
// ClusterPipeline is not thread-safe.
|
||||||
type ClusterPipeline struct {
|
type ClusterPipeline struct {
|
||||||
commandable
|
commandable
|
||||||
|
@ -48,7 +52,7 @@ func (pipe *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
|
||||||
|
|
||||||
cmdsMap := make(map[string][]Cmder)
|
cmdsMap := make(map[string][]Cmder)
|
||||||
for _, cmd := range cmds {
|
for _, cmd := range cmds {
|
||||||
slot := hashSlot(cmd.clusterKey())
|
slot := hashtag.Slot(cmd.clusterKey())
|
||||||
addr := pipe.cluster.slotMasterAddr(slot)
|
addr := pipe.cluster.slotMasterAddr(slot)
|
||||||
cmdsMap[addr] = append(cmdsMap[addr], cmd)
|
cmdsMap[addr] = append(cmdsMap[addr], cmd)
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
"gopkg.in/redis.v3"
|
"gopkg.in/redis.v3"
|
||||||
|
"gopkg.in/redis.v3/internal/hashtag"
|
||||||
)
|
)
|
||||||
|
|
||||||
type clusterScenario struct {
|
type clusterScenario struct {
|
||||||
|
@ -182,7 +183,7 @@ var _ = Describe("Cluster", func() {
|
||||||
rand.Seed(100)
|
rand.Seed(100)
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
Expect(redis.HashSlot(test.key)).To(Equal(test.slot), "for %s", test.key)
|
Expect(hashtag.Slot(test.key)).To(Equal(test.slot), "for %s", test.key)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -198,7 +199,7 @@ var _ = Describe("Cluster", func() {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
Expect(redis.HashSlot(test.one)).To(Equal(redis.HashSlot(test.two)), "for %s <-> %s", test.one, test.two)
|
Expect(hashtag.Slot(test.one)).To(Equal(hashtag.Slot(test.two)), "for %s <-> %s", test.one, test.two)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -232,7 +233,7 @@ var _ = Describe("Cluster", func() {
|
||||||
It("should CLUSTER KEYSLOT", func() {
|
It("should CLUSTER KEYSLOT", func() {
|
||||||
hashSlot, err := cluster.primary().ClusterKeySlot("somekey").Result()
|
hashSlot, err := cluster.primary().ClusterKeySlot("somekey").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(hashSlot).To(Equal(int64(11058)))
|
Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
|
It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
|
||||||
|
@ -315,7 +316,7 @@ var _ = Describe("Cluster", func() {
|
||||||
It("should follow redirects", func() {
|
It("should follow redirects", func() {
|
||||||
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
|
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
slot := redis.HashSlot("A")
|
slot := hashtag.Slot("A")
|
||||||
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||||
|
|
||||||
val, err := client.Get("A").Result()
|
val, err := client.Get("A").Result()
|
||||||
|
@ -328,7 +329,7 @@ var _ = Describe("Cluster", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should perform multi-pipelines", func() {
|
It("should perform multi-pipelines", func() {
|
||||||
slot := redis.HashSlot("A")
|
slot := hashtag.Slot("A")
|
||||||
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||||
|
|
||||||
pipe := client.Pipeline()
|
pipe := client.Pipeline()
|
||||||
|
@ -361,7 +362,7 @@ var _ = Describe("Cluster", func() {
|
||||||
MaxRedirects: -1,
|
MaxRedirects: -1,
|
||||||
})
|
})
|
||||||
|
|
||||||
slot := redis.HashSlot("A")
|
slot := hashtag.Slot("A")
|
||||||
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||||
|
|
||||||
err := client.Get("A").Err()
|
err := client.Get("A").Err()
|
||||||
|
|
|
@ -11,7 +11,3 @@ var NewConnDialer = newConnDialer
|
||||||
func (cn *conn) SetNetConn(netcn net.Conn) {
|
func (cn *conn) SetNetConn(netcn net.Conn) {
|
||||||
cn.netcn = netcn
|
cn.netcn = netcn
|
||||||
}
|
}
|
||||||
|
|
||||||
func HashSlot(key string) int {
|
|
||||||
return hashSlot(key)
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,4 +1,11 @@
|
||||||
package redis
|
package hashtag
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
const SlotNumber = 16384
|
||||||
|
|
||||||
// CRC16 implementation according to CCITT standards.
|
// CRC16 implementation according to CCITT standards.
|
||||||
// Copyright 2001-2010 Georges Menie (www.menie.org)
|
// Copyright 2001-2010 Georges Menie (www.menie.org)
|
||||||
|
@ -39,6 +46,25 @@ var crc16tab = [256]uint16{
|
||||||
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
|
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Key(key string) string {
|
||||||
|
if s := strings.IndexByte(key, '{'); s > -1 {
|
||||||
|
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
|
||||||
|
return key[s+1 : s+e+1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
|
// hashSlot returns a consistent slot number between 0 and 16383
|
||||||
|
// for any given string key.
|
||||||
|
func Slot(key string) int {
|
||||||
|
key = Key(key)
|
||||||
|
if key == "" {
|
||||||
|
return rand.Intn(SlotNumber)
|
||||||
|
}
|
||||||
|
return int(crc16sum(key)) % SlotNumber
|
||||||
|
}
|
||||||
|
|
||||||
func crc16sum(key string) (crc uint16) {
|
func crc16sum(key string) (crc uint16) {
|
||||||
for i := 0; i < len(key); i++ {
|
for i := 0; i < len(key); i++ {
|
||||||
crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff]
|
crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff]
|
|
@ -1,4 +1,4 @@
|
||||||
package redis
|
package hashtag
|
||||||
|
|
||||||
import (
|
import (
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
5
ring.go
5
ring.go
|
@ -8,6 +8,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gopkg.in/redis.v3/internal/consistenthash"
|
"gopkg.in/redis.v3/internal/consistenthash"
|
||||||
|
"gopkg.in/redis.v3/internal/hashtag"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -151,7 +152,7 @@ func (ring *Ring) getClient(key string) (*Client, error) {
|
||||||
return nil, errClosed
|
return nil, errClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
name := ring.hash.Get(hashKey(key))
|
name := ring.hash.Get(hashtag.Key(key))
|
||||||
if name == "" {
|
if name == "" {
|
||||||
ring.mx.RUnlock()
|
ring.mx.RUnlock()
|
||||||
return nil, errRingShardsDown
|
return nil, errRingShardsDown
|
||||||
|
@ -297,7 +298,7 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) {
|
||||||
|
|
||||||
cmdsMap := make(map[string][]Cmder)
|
cmdsMap := make(map[string][]Cmder)
|
||||||
for _, cmd := range cmds {
|
for _, cmd := range cmds {
|
||||||
name := pipe.ring.hash.Get(hashKey(cmd.clusterKey()))
|
name := pipe.ring.hash.Get(hashtag.Key(cmd.clusterKey()))
|
||||||
if name == "" {
|
if name == "" {
|
||||||
cmd.setErr(errRingShardsDown)
|
cmd.setErr(errRingShardsDown)
|
||||||
if retErr == nil {
|
if retErr == nil {
|
||||||
|
|
Loading…
Reference in New Issue