forked from mirror/redis
Merge pull request #72 from go-redis/feature/cluster-without-pipeline
Feature/cluster without pipeline
This commit is contained in:
commit
f8b9d6219b
|
@ -1 +1,2 @@
|
|||
*.rdb
|
||||
.test/
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
# Minimal redis.conf
|
||||
|
||||
port 6379
|
||||
daemonize no
|
||||
dir .
|
||||
save ""
|
||||
appendonly yes
|
||||
cluster-config-file nodes.conf
|
||||
cluster-node-timeout 30000
|
21
Makefile
21
Makefile
|
@ -1,4 +1,17 @@
|
|||
all:
|
||||
go test ./...
|
||||
go test ./... -cpu=2
|
||||
go test ./... -short -race
|
||||
all: testdeps
|
||||
go test ./... -v 1 -ginkgo.slowSpecThreshold=10 -cpu=1,2,4
|
||||
go test ./... -ginkgo.slowSpecThreshold=10 -short -race
|
||||
|
||||
test: testdeps
|
||||
go test ./... -v 1 -ginkgo.slowSpecThreshold=10
|
||||
|
||||
testdeps: .test/redis/src/redis-server
|
||||
|
||||
.PHONY: all test testdeps
|
||||
|
||||
.test/redis:
|
||||
mkdir -p $@
|
||||
wget -qO- https://github.com/antirez/redis/archive/3.0.tar.gz | tar xvz --strip-components=1 -C $@
|
||||
|
||||
.test/redis/src/redis-server: .test/redis
|
||||
cd $< && make all
|
||||
|
|
|
@ -0,0 +1,303 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ClusterClient struct {
|
||||
commandable
|
||||
|
||||
addrs map[string]struct{}
|
||||
slots [][]string
|
||||
conns map[string]*Client
|
||||
opt *ClusterOptions
|
||||
|
||||
// Protect addrs, slots and conns cache
|
||||
cachemx sync.RWMutex
|
||||
_reload uint32
|
||||
}
|
||||
|
||||
// NewClusterClient initializes a new cluster-aware client using given options.
|
||||
// A list of seed addresses must be provided.
|
||||
func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) {
|
||||
addrs, err := opt.getAddrSet()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := &ClusterClient{
|
||||
addrs: addrs,
|
||||
conns: make(map[string]*Client),
|
||||
opt: opt,
|
||||
_reload: 1,
|
||||
}
|
||||
client.commandable.process = client.process
|
||||
client.reloadIfDue()
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// Close closes the cluster connection
|
||||
func (c *ClusterClient) Close() error {
|
||||
c.cachemx.Lock()
|
||||
defer c.cachemx.Unlock()
|
||||
|
||||
return c.reset()
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
|
||||
// Finds the current master address for a given hash slot
|
||||
func (c *ClusterClient) getMasterAddrBySlot(hashSlot int) string {
|
||||
if addrs := c.slots[hashSlot]; len(addrs) > 0 {
|
||||
return addrs[0]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Returns a node's client for a given address
|
||||
func (c *ClusterClient) getNodeClientByAddr(addr string) *Client {
|
||||
client, ok := c.conns[addr]
|
||||
if !ok {
|
||||
opt := c.opt.clientOptions()
|
||||
opt.Addr = addr
|
||||
client = NewTCPClient(opt)
|
||||
c.conns[addr] = client
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
// Process a command
|
||||
func (c *ClusterClient) process(cmd Cmder) {
|
||||
var ask bool
|
||||
|
||||
c.reloadIfDue()
|
||||
|
||||
hashSlot := HashSlot(cmd.clusterKey())
|
||||
|
||||
c.cachemx.RLock()
|
||||
defer c.cachemx.RUnlock()
|
||||
|
||||
tried := make(map[string]struct{}, len(c.addrs))
|
||||
addr := c.getMasterAddrBySlot(hashSlot)
|
||||
for attempt := 0; attempt < c.opt.getMaxRedirects(); attempt++ {
|
||||
tried[addr] = struct{}{}
|
||||
|
||||
// Pick the connection, process request
|
||||
conn := c.getNodeClientByAddr(addr)
|
||||
if ask {
|
||||
pipe := conn.Pipeline()
|
||||
pipe.Process(NewCmd("ASKING"))
|
||||
pipe.Process(cmd)
|
||||
_, _ = pipe.Exec()
|
||||
ask = false
|
||||
} else {
|
||||
conn.Process(cmd)
|
||||
}
|
||||
|
||||
// If there is no (real) error, we are done!
|
||||
err := cmd.Err()
|
||||
if err == nil || err == Nil {
|
||||
return
|
||||
}
|
||||
|
||||
// On connection errors, pick a random, previosuly untried connection
|
||||
// and request again.
|
||||
if _, ok := err.(*net.OpError); ok || err == io.EOF {
|
||||
if addr = c.findNextAddr(tried); addr == "" {
|
||||
return
|
||||
}
|
||||
cmd.reset()
|
||||
continue
|
||||
}
|
||||
|
||||
// Check the error message, return if unexpected
|
||||
parts := strings.SplitN(err.Error(), " ", 3)
|
||||
if len(parts) != 3 {
|
||||
return
|
||||
}
|
||||
|
||||
// Handle MOVE and ASK redirections, return on any other error
|
||||
switch parts[0] {
|
||||
case "MOVED":
|
||||
c.forceReload()
|
||||
addr = parts[2]
|
||||
case "ASK":
|
||||
ask = true
|
||||
addr = parts[2]
|
||||
default:
|
||||
return
|
||||
}
|
||||
cmd.reset()
|
||||
}
|
||||
}
|
||||
|
||||
// Closes all connections and reloads slot cache, if due
|
||||
func (c *ClusterClient) reloadIfDue() (err error) {
|
||||
if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) {
|
||||
return
|
||||
}
|
||||
|
||||
var infos []ClusterSlotInfo
|
||||
|
||||
c.cachemx.Lock()
|
||||
defer c.cachemx.Unlock()
|
||||
|
||||
// Try known addresses in random order (map interation order is random in Go)
|
||||
// http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections
|
||||
// https://github.com/antirez/redis-rb-cluster/blob/fd931ed/cluster.rb#L157
|
||||
for addr := range c.addrs {
|
||||
c.reset()
|
||||
|
||||
infos, err = c.fetchClusterSlots(addr)
|
||||
if err == nil {
|
||||
c.update(infos)
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Closes all connections and flushes slots cache
|
||||
func (c *ClusterClient) reset() (err error) {
|
||||
for addr, client := range c.conns {
|
||||
if e := client.Close(); e != nil {
|
||||
err = e
|
||||
}
|
||||
delete(c.conns, addr)
|
||||
}
|
||||
c.slots = make([][]string, hashSlots)
|
||||
return
|
||||
}
|
||||
|
||||
// Forces a cache reload on next request
|
||||
func (c *ClusterClient) forceReload() {
|
||||
atomic.StoreUint32(&c._reload, 1)
|
||||
}
|
||||
|
||||
// Find the next untried address
|
||||
func (c *ClusterClient) findNextAddr(tried map[string]struct{}) string {
|
||||
for addr := range c.addrs {
|
||||
if _, ok := tried[addr]; !ok {
|
||||
return addr
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Fetch slot information
|
||||
func (c *ClusterClient) fetchClusterSlots(addr string) ([]ClusterSlotInfo, error) {
|
||||
opt := c.opt.clientOptions()
|
||||
opt.Addr = addr
|
||||
client := NewClient(opt)
|
||||
defer client.Close()
|
||||
|
||||
return client.ClusterSlots().Result()
|
||||
}
|
||||
|
||||
// Update slot information, populate slots
|
||||
func (c *ClusterClient) update(infos []ClusterSlotInfo) {
|
||||
for _, info := range infos {
|
||||
for i := info.Start; i <= info.End; i++ {
|
||||
c.slots[i] = info.Addrs
|
||||
}
|
||||
|
||||
for _, addr := range info.Addrs {
|
||||
c.addrs[addr] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
var errNoAddrs = errors.New("redis: no addresses")
|
||||
|
||||
type ClusterOptions struct {
|
||||
// A seed-list of host:port addresses of known cluster nodes
|
||||
Addrs []string
|
||||
|
||||
// An optional password
|
||||
Password string
|
||||
|
||||
// The maximum number of MOVED/ASK redirects to follow, before
|
||||
// giving up. Default: 16
|
||||
MaxRedirects int
|
||||
|
||||
// The maximum number of TCP sockets per connection. Default: 5
|
||||
PoolSize int
|
||||
|
||||
// Timeout settings
|
||||
DialTimeout, ReadTimeout, WriteTimeout, IdleTimeout time.Duration
|
||||
}
|
||||
|
||||
func (opt *ClusterOptions) getPoolSize() int {
|
||||
if opt.PoolSize < 1 {
|
||||
return 5
|
||||
}
|
||||
return opt.PoolSize
|
||||
}
|
||||
|
||||
func (opt *ClusterOptions) getDialTimeout() time.Duration {
|
||||
if opt.DialTimeout == 0 {
|
||||
return 5 * time.Second
|
||||
}
|
||||
return opt.DialTimeout
|
||||
}
|
||||
|
||||
func (opt *ClusterOptions) getMaxRedirects() int {
|
||||
if opt.MaxRedirects < 1 {
|
||||
return 16
|
||||
}
|
||||
return opt.MaxRedirects
|
||||
}
|
||||
|
||||
func (opt *ClusterOptions) getAddrSet() (map[string]struct{}, error) {
|
||||
size := len(opt.Addrs)
|
||||
if size < 1 {
|
||||
return nil, errNoAddrs
|
||||
}
|
||||
|
||||
addrs := make(map[string]struct{}, size)
|
||||
for _, addr := range opt.Addrs {
|
||||
addrs[addr] = struct{}{}
|
||||
}
|
||||
return addrs, nil
|
||||
}
|
||||
|
||||
func (opt *ClusterOptions) clientOptions() *Options {
|
||||
return &Options{
|
||||
DB: 0,
|
||||
Password: opt.Password,
|
||||
|
||||
DialTimeout: opt.getDialTimeout(),
|
||||
ReadTimeout: opt.ReadTimeout,
|
||||
WriteTimeout: opt.WriteTimeout,
|
||||
|
||||
PoolSize: opt.getPoolSize(),
|
||||
IdleTimeout: opt.IdleTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
const hashSlots = 16384
|
||||
|
||||
// HashSlot returns a consistent slot number between 0 and 16383
|
||||
// for any given string key
|
||||
func HashSlot(key string) int {
|
||||
if s := strings.IndexByte(key, '{'); s > -1 {
|
||||
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
|
||||
key = key[s+1 : s+e+1]
|
||||
}
|
||||
}
|
||||
if key == "" {
|
||||
return rand.Intn(hashSlots)
|
||||
}
|
||||
return int(crc16sum(key)) % hashSlots
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("ClusterClient", func() {
|
||||
|
||||
var subject *ClusterClient
|
||||
var populate = func() {
|
||||
subject.reset()
|
||||
subject.update([]ClusterSlotInfo{
|
||||
{0, 4095, []string{"127.0.0.1:7000", "127.0.0.1:7004"}},
|
||||
{12288, 16383, []string{"127.0.0.1:7003", "127.0.0.1:7007"}},
|
||||
{4096, 8191, []string{"127.0.0.1:7001", "127.0.0.1:7005"}},
|
||||
{8192, 12287, []string{"127.0.0.1:7002", "127.0.0.1:7006"}},
|
||||
})
|
||||
}
|
||||
|
||||
BeforeEach(func() {
|
||||
var err error
|
||||
subject, err = NewClusterClient(&ClusterOptions{
|
||||
Addrs: []string{"127.0.0.1:6379", "127.0.0.1:7003", "127.0.0.1:7006"},
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
subject.Close()
|
||||
})
|
||||
|
||||
It("should initialize", func() {
|
||||
Expect(subject.addrs).To(HaveLen(3))
|
||||
Expect(subject.slots).To(HaveLen(hashSlots))
|
||||
Expect(subject._reload).To(Equal(uint32(0)))
|
||||
})
|
||||
|
||||
It("should update slots cache", func() {
|
||||
populate()
|
||||
Expect(subject.slots[0]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"}))
|
||||
Expect(subject.slots[4095]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"}))
|
||||
Expect(subject.slots[4096]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"}))
|
||||
Expect(subject.slots[8191]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"}))
|
||||
Expect(subject.slots[8192]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"}))
|
||||
Expect(subject.slots[12287]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"}))
|
||||
Expect(subject.slots[12288]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"}))
|
||||
Expect(subject.slots[16383]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"}))
|
||||
Expect(subject.addrs).To(Equal(map[string]struct{}{
|
||||
"127.0.0.1:6379": struct{}{},
|
||||
"127.0.0.1:7000": struct{}{},
|
||||
"127.0.0.1:7001": struct{}{},
|
||||
"127.0.0.1:7002": struct{}{},
|
||||
"127.0.0.1:7003": struct{}{},
|
||||
"127.0.0.1:7004": struct{}{},
|
||||
"127.0.0.1:7005": struct{}{},
|
||||
"127.0.0.1:7006": struct{}{},
|
||||
"127.0.0.1:7007": struct{}{},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should find next addresses", func() {
|
||||
populate()
|
||||
seen := map[string]struct{}{
|
||||
"127.0.0.1:7000": struct{}{},
|
||||
"127.0.0.1:7001": struct{}{},
|
||||
"127.0.0.1:7003": struct{}{},
|
||||
}
|
||||
|
||||
addr := subject.findNextAddr(seen)
|
||||
for addr != "" {
|
||||
seen[addr] = struct{}{}
|
||||
addr = subject.findNextAddr(seen)
|
||||
}
|
||||
Expect(subject.findNextAddr(seen)).To(Equal(""))
|
||||
Expect(seen).To(Equal(map[string]struct{}{
|
||||
"127.0.0.1:6379": struct{}{},
|
||||
"127.0.0.1:7000": struct{}{},
|
||||
"127.0.0.1:7001": struct{}{},
|
||||
"127.0.0.1:7002": struct{}{},
|
||||
"127.0.0.1:7003": struct{}{},
|
||||
"127.0.0.1:7004": struct{}{},
|
||||
"127.0.0.1:7005": struct{}{},
|
||||
"127.0.0.1:7006": struct{}{},
|
||||
"127.0.0.1:7007": struct{}{},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should check if reload is due", func() {
|
||||
subject._reload = 0
|
||||
Expect(subject._reload).To(Equal(uint32(0)))
|
||||
subject.forceReload()
|
||||
Expect(subject._reload).To(Equal(uint32(1)))
|
||||
})
|
||||
})
|
|
@ -0,0 +1,232 @@
|
|||
package redis_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
"gopkg.in/redis.v2"
|
||||
)
|
||||
|
||||
var _ = Describe("Cluster", func() {
|
||||
var scenario = &clusterScenario{
|
||||
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
|
||||
nodeIDs: make([]string, 6),
|
||||
processes: make(map[string]*redisProcess, 6),
|
||||
clients: make(map[string]*redis.Client, 6),
|
||||
}
|
||||
|
||||
BeforeSuite(func() {
|
||||
// Start processes, connect individual clients
|
||||
for pos, port := range scenario.ports {
|
||||
process, err := startRedis(port, "--cluster-enabled", "yes")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:" + port})
|
||||
info, err := client.ClusterNodes().Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
scenario.processes[port] = process
|
||||
scenario.clients[port] = client
|
||||
scenario.nodeIDs[pos] = info[:40]
|
||||
}
|
||||
|
||||
// Meet cluster nodes
|
||||
for _, client := range scenario.clients {
|
||||
err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
// Bootstrap masters
|
||||
slots := []int{0, 5000, 10000, 16384}
|
||||
for pos, client := range scenario.masters() {
|
||||
err := client.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
// Bootstrap slaves
|
||||
for pos, client := range scenario.slaves() {
|
||||
masterID := scenario.nodeIDs[pos]
|
||||
|
||||
Eventually(func() string { // Wait for masters
|
||||
return client.ClusterNodes().Val()
|
||||
}, "10s").Should(ContainSubstring(masterID))
|
||||
|
||||
err := client.ClusterReplicate(masterID).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
Eventually(func() string { // Wait for slaves
|
||||
return scenario.primary().ClusterNodes().Val()
|
||||
}, "10s").Should(ContainSubstring("slave " + masterID))
|
||||
}
|
||||
|
||||
// Wait for cluster state to turn OK
|
||||
for _, client := range scenario.clients {
|
||||
Eventually(func() string {
|
||||
return client.ClusterInfo().Val()
|
||||
}, "10s").Should(ContainSubstring("cluster_state:ok"))
|
||||
}
|
||||
})
|
||||
|
||||
AfterSuite(func() {
|
||||
for _, client := range scenario.clients {
|
||||
client.Close()
|
||||
}
|
||||
for _, process := range scenario.processes {
|
||||
process.Close()
|
||||
}
|
||||
})
|
||||
|
||||
Describe("HashSlot", func() {
|
||||
|
||||
It("should calculate hash slots", func() {
|
||||
tests := []struct {
|
||||
key string
|
||||
slot int
|
||||
}{
|
||||
{"123456789", 12739},
|
||||
{"{}foo", 9500},
|
||||
{"foo{}", 5542},
|
||||
{"foo{}{bar}", 8363},
|
||||
{"", 10503},
|
||||
{"", 5176},
|
||||
{string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), 5463},
|
||||
}
|
||||
rand.Seed(100)
|
||||
|
||||
for _, test := range tests {
|
||||
Expect(redis.HashSlot(test.key)).To(Equal(test.slot), "for %s", test.key)
|
||||
}
|
||||
})
|
||||
|
||||
It("should extract keys from tags", func() {
|
||||
tests := []struct {
|
||||
one, two string
|
||||
}{
|
||||
{"foo{bar}", "bar"},
|
||||
{"{foo}bar", "foo"},
|
||||
{"{user1000}.following", "{user1000}.followers"},
|
||||
{"foo{{bar}}zap", "{bar"},
|
||||
{"foo{bar}{zap}", "bar"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
Expect(redis.HashSlot(test.one)).To(Equal(redis.HashSlot(test.two)), "for %s <-> %s", test.one, test.two)
|
||||
}
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
Describe("Commands", func() {
|
||||
|
||||
It("should CLUSTER SLOTS", func() {
|
||||
res, err := scenario.primary().ClusterSlots().Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(HaveLen(3))
|
||||
Expect(res).To(ConsistOf([]redis.ClusterSlotInfo{
|
||||
{0, 4999, []string{"127.0.0.1:8220", "127.0.0.1:8223"}},
|
||||
{5000, 9999, []string{"127.0.0.1:8221", "127.0.0.1:8224"}},
|
||||
{10000, 16383, []string{"127.0.0.1:8222", "127.0.0.1:8225"}},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should CLUSTER NODES", func() {
|
||||
res, err := scenario.primary().ClusterNodes().Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(res)).To(BeNumerically(">", 400))
|
||||
})
|
||||
|
||||
It("should CLUSTER INFO", func() {
|
||||
res, err := scenario.primary().ClusterInfo().Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
Describe("Client", func() {
|
||||
var client *redis.ClusterClient
|
||||
|
||||
BeforeEach(func() {
|
||||
var err error
|
||||
client, err = redis.NewClusterClient(&redis.ClusterOptions{
|
||||
Addrs: []string{"127.0.0.1:8220", "127.0.0.1:8221", "127.0.0.1:8222", "127.0.0.1:8223", "127.0.0.1:8224", "127.0.0.1:8225"},
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
for _, client := range scenario.clients {
|
||||
client.FlushDb()
|
||||
}
|
||||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should GET/SET/DEL", func() {
|
||||
val, err := client.Get("A").Result()
|
||||
Expect(err).To(Equal(redis.Nil))
|
||||
Expect(val).To(Equal(""))
|
||||
|
||||
val, err = client.Set("A", "VALUE").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(val).To(Equal("OK"))
|
||||
|
||||
val, err = client.Get("A").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(val).To(Equal("VALUE"))
|
||||
|
||||
cnt, err := client.Del("A").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(cnt).To(Equal(int64(1)))
|
||||
})
|
||||
|
||||
It("should follow redirects", func() {
|
||||
Expect(client.Set("A", "VALUE").Err()).NotTo(HaveOccurred())
|
||||
Expect(redis.HashSlot("A")).To(Equal(6373))
|
||||
|
||||
// Slot 6373 is stored on the second node
|
||||
defer func() {
|
||||
scenario.masters()[1].ClusterFailover()
|
||||
}()
|
||||
|
||||
slave := scenario.slaves()[1]
|
||||
Expect(slave.ClusterFailover().Err()).NotTo(HaveOccurred())
|
||||
Eventually(func() string {
|
||||
return slave.Info().Val()
|
||||
}, "10s", "200ms").Should(ContainSubstring("role:master"))
|
||||
|
||||
val, err := client.Get("A").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(val).To(Equal("VALUE"))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
type clusterScenario struct {
|
||||
ports []string
|
||||
nodeIDs []string
|
||||
processes map[string]*redisProcess
|
||||
clients map[string]*redis.Client
|
||||
}
|
||||
|
||||
func (s *clusterScenario) primary() *redis.Client {
|
||||
return s.clients[s.ports[0]]
|
||||
}
|
||||
|
||||
func (s *clusterScenario) masters() []*redis.Client {
|
||||
result := make([]*redis.Client, 3)
|
||||
for pos, port := range s.ports[:3] {
|
||||
result[pos] = s.clients[port]
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *clusterScenario) slaves() []*redis.Client {
|
||||
result := make([]*redis.Client, 3)
|
||||
for pos, port := range s.ports[3:] {
|
||||
result[pos] = s.clients[port]
|
||||
}
|
||||
return result
|
||||
}
|
195
command.go
195
command.go
|
@ -24,15 +24,18 @@ var (
|
|||
_ Cmder = (*StringIntMapCmd)(nil)
|
||||
_ Cmder = (*ZSliceCmd)(nil)
|
||||
_ Cmder = (*ScanCmd)(nil)
|
||||
_ Cmder = (*ClusterSlotCmd)(nil)
|
||||
)
|
||||
|
||||
type Cmder interface {
|
||||
args() []string
|
||||
parseReply(*bufio.Reader) error
|
||||
setErr(error)
|
||||
reset()
|
||||
|
||||
writeTimeout() *time.Duration
|
||||
readTimeout() *time.Duration
|
||||
clusterKey() string
|
||||
|
||||
Err() error
|
||||
String() string
|
||||
|
@ -63,13 +66,9 @@ type baseCmd struct {
|
|||
|
||||
err error
|
||||
|
||||
_writeTimeout, _readTimeout *time.Duration
|
||||
}
|
||||
_clusterKeyPos int
|
||||
|
||||
func newBaseCmd(args ...string) *baseCmd {
|
||||
return &baseCmd{
|
||||
_args: args,
|
||||
}
|
||||
_writeTimeout, _readTimeout *time.Duration
|
||||
}
|
||||
|
||||
func (cmd *baseCmd) Err() error {
|
||||
|
@ -95,6 +94,13 @@ func (cmd *baseCmd) writeTimeout() *time.Duration {
|
|||
return cmd._writeTimeout
|
||||
}
|
||||
|
||||
func (cmd *baseCmd) clusterKey() string {
|
||||
if cmd._clusterKeyPos > 0 && cmd._clusterKeyPos < len(cmd._args) {
|
||||
return cmd._args[cmd._clusterKeyPos]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (cmd *baseCmd) setWriteTimeout(d time.Duration) {
|
||||
cmd._writeTimeout = &d
|
||||
}
|
||||
|
@ -106,15 +112,18 @@ func (cmd *baseCmd) setErr(e error) {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type Cmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val interface{}
|
||||
}
|
||||
|
||||
func NewCmd(args ...string) *Cmd {
|
||||
return &Cmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &Cmd{baseCmd: baseCmd{_args: args}}
|
||||
}
|
||||
|
||||
func (cmd *Cmd) reset() {
|
||||
cmd.val = nil
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *Cmd) Val() interface{} {
|
||||
|
@ -137,15 +146,18 @@ func (cmd *Cmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type SliceCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val []interface{}
|
||||
}
|
||||
|
||||
func NewSliceCmd(args ...string) *SliceCmd {
|
||||
return &SliceCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &SliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *SliceCmd) reset() {
|
||||
cmd.val = nil
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *SliceCmd) Val() []interface{} {
|
||||
|
@ -173,15 +185,22 @@ func (cmd *SliceCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type StatusCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val string
|
||||
}
|
||||
|
||||
func NewStatusCmd(args ...string) *StatusCmd {
|
||||
return &StatusCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &StatusCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func newKeylessStatusCmd(args ...string) *StatusCmd {
|
||||
return &StatusCmd{baseCmd: baseCmd{_args: args}}
|
||||
}
|
||||
|
||||
func (cmd *StatusCmd) reset() {
|
||||
cmd.val = ""
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *StatusCmd) Val() string {
|
||||
|
@ -209,15 +228,18 @@ func (cmd *StatusCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type IntCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val int64
|
||||
}
|
||||
|
||||
func NewIntCmd(args ...string) *IntCmd {
|
||||
return &IntCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &IntCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *IntCmd) reset() {
|
||||
cmd.val = 0
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *IntCmd) Val() int64 {
|
||||
|
@ -245,7 +267,7 @@ func (cmd *IntCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type DurationCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val time.Duration
|
||||
precision time.Duration
|
||||
|
@ -253,11 +275,16 @@ type DurationCmd struct {
|
|||
|
||||
func NewDurationCmd(precision time.Duration, args ...string) *DurationCmd {
|
||||
return &DurationCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
precision: precision,
|
||||
baseCmd: baseCmd{_args: args, _clusterKeyPos: 1},
|
||||
}
|
||||
}
|
||||
|
||||
func (cmd *DurationCmd) reset() {
|
||||
cmd.val = 0
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *DurationCmd) Val() time.Duration {
|
||||
return cmd.val
|
||||
}
|
||||
|
@ -283,15 +310,18 @@ func (cmd *DurationCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type BoolCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val bool
|
||||
}
|
||||
|
||||
func NewBoolCmd(args ...string) *BoolCmd {
|
||||
return &BoolCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &BoolCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *BoolCmd) reset() {
|
||||
cmd.val = false
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *BoolCmd) Val() bool {
|
||||
|
@ -319,15 +349,18 @@ func (cmd *BoolCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type StringCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val string
|
||||
}
|
||||
|
||||
func NewStringCmd(args ...string) *StringCmd {
|
||||
return &StringCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &StringCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *StringCmd) reset() {
|
||||
cmd.val = ""
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *StringCmd) Val() string {
|
||||
|
@ -376,15 +409,18 @@ func (cmd *StringCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type FloatCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val float64
|
||||
}
|
||||
|
||||
func NewFloatCmd(args ...string) *FloatCmd {
|
||||
return &FloatCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &FloatCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *FloatCmd) reset() {
|
||||
cmd.val = 0
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *FloatCmd) Val() float64 {
|
||||
|
@ -408,15 +444,18 @@ func (cmd *FloatCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type StringSliceCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val []string
|
||||
}
|
||||
|
||||
func NewStringSliceCmd(args ...string) *StringSliceCmd {
|
||||
return &StringSliceCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &StringSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *StringSliceCmd) reset() {
|
||||
cmd.val = nil
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *StringSliceCmd) Val() []string {
|
||||
|
@ -444,15 +483,18 @@ func (cmd *StringSliceCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type BoolSliceCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val []bool
|
||||
}
|
||||
|
||||
func NewBoolSliceCmd(args ...string) *BoolSliceCmd {
|
||||
return &BoolSliceCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &BoolSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *BoolSliceCmd) reset() {
|
||||
cmd.val = nil
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *BoolSliceCmd) Val() []bool {
|
||||
|
@ -480,15 +522,18 @@ func (cmd *BoolSliceCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type StringStringMapCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val map[string]string
|
||||
}
|
||||
|
||||
func NewStringStringMapCmd(args ...string) *StringStringMapCmd {
|
||||
return &StringStringMapCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &StringStringMapCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *StringStringMapCmd) reset() {
|
||||
cmd.val = nil
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *StringStringMapCmd) Val() map[string]string {
|
||||
|
@ -516,15 +561,13 @@ func (cmd *StringStringMapCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type StringIntMapCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val map[string]int64
|
||||
}
|
||||
|
||||
func NewStringIntMapCmd(args ...string) *StringIntMapCmd {
|
||||
return &StringIntMapCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
}
|
||||
return &StringIntMapCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *StringIntMapCmd) Val() map[string]int64 {
|
||||
|
@ -539,6 +582,11 @@ func (cmd *StringIntMapCmd) String() string {
|
|||
return cmdString(cmd, cmd.val)
|
||||
}
|
||||
|
||||
func (cmd *StringIntMapCmd) reset() {
|
||||
cmd.val = nil
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *StringIntMapCmd) parseReply(rd *bufio.Reader) error {
|
||||
v, err := parseReply(rd, parseStringIntMap)
|
||||
if err != nil {
|
||||
|
@ -552,15 +600,18 @@ func (cmd *StringIntMapCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type ZSliceCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
val []Z
|
||||
}
|
||||
|
||||
func NewZSliceCmd(args ...string) *ZSliceCmd {
|
||||
return &ZSliceCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &ZSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *ZSliceCmd) reset() {
|
||||
cmd.val = nil
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *ZSliceCmd) Val() []Z {
|
||||
|
@ -588,16 +639,20 @@ func (cmd *ZSliceCmd) parseReply(rd *bufio.Reader) error {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type ScanCmd struct {
|
||||
*baseCmd
|
||||
baseCmd
|
||||
|
||||
cursor int64
|
||||
keys []string
|
||||
}
|
||||
|
||||
func NewScanCmd(args ...string) *ScanCmd {
|
||||
return &ScanCmd{
|
||||
baseCmd: newBaseCmd(args...),
|
||||
return &ScanCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *ScanCmd) reset() {
|
||||
cmd.cursor = 0
|
||||
cmd.keys = nil
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *ScanCmd) Val() (int64, []string) {
|
||||
|
@ -632,3 +687,47 @@ func (cmd *ScanCmd) parseReply(rd *bufio.Reader) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type ClusterSlotInfo struct {
|
||||
Start, End int
|
||||
Addrs []string
|
||||
}
|
||||
|
||||
type ClusterSlotCmd struct {
|
||||
baseCmd
|
||||
|
||||
val []ClusterSlotInfo
|
||||
}
|
||||
|
||||
func NewClusterSlotCmd(args ...string) *ClusterSlotCmd {
|
||||
return &ClusterSlotCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||
}
|
||||
|
||||
func (cmd *ClusterSlotCmd) Val() []ClusterSlotInfo {
|
||||
return cmd.val
|
||||
}
|
||||
|
||||
func (cmd *ClusterSlotCmd) Result() ([]ClusterSlotInfo, error) {
|
||||
return cmd.Val(), cmd.Err()
|
||||
}
|
||||
|
||||
func (cmd *ClusterSlotCmd) String() string {
|
||||
return cmdString(cmd, cmd.val)
|
||||
}
|
||||
|
||||
func (cmd *ClusterSlotCmd) reset() {
|
||||
cmd.val = nil
|
||||
cmd.err = nil
|
||||
}
|
||||
|
||||
func (cmd *ClusterSlotCmd) parseReply(rd *bufio.Reader) error {
|
||||
v, err := parseReply(rd, parseClusterSlotInfoSlice)
|
||||
if err != nil {
|
||||
cmd.err = err
|
||||
return err
|
||||
}
|
||||
cmd.val = v.([]ClusterSlotInfo)
|
||||
return nil
|
||||
}
|
||||
|
|
432
commands.go
432
commands.go
File diff suppressed because it is too large
Load Diff
|
@ -65,7 +65,7 @@ var _ = Describe("Commands", func() {
|
|||
// workaround for "ERR Can't BGSAVE while AOF log rewriting is in progress"
|
||||
Eventually(func() string {
|
||||
return client.BgSave().Val()
|
||||
}).Should(Equal("Background saving started"))
|
||||
}, "10s").Should(Equal("Background saving started"))
|
||||
})
|
||||
|
||||
It("should ClientKill", func() {
|
||||
|
@ -119,7 +119,7 @@ var _ = Describe("Commands", func() {
|
|||
// workaround for "ERR Background save already in progress"
|
||||
Eventually(func() string {
|
||||
return client.Save().Val()
|
||||
}).Should(Equal("OK"))
|
||||
}, "10s").Should(Equal("OK"))
|
||||
})
|
||||
|
||||
It("should SlaveOf", func() {
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
package redis
|
||||
|
||||
// CRC16 implementation according to CCITT standards.
|
||||
// Copyright 2001-2010 Georges Menie (www.menie.org)
|
||||
// Copyright 2013 The Go Authors. All rights reserved.
|
||||
// http://redis.io/topics/cluster-spec#appendix-a-crc16-reference-implementation-in-ansi-c
|
||||
var crc16tab = [256]uint16{
|
||||
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
|
||||
0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
|
||||
0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
|
||||
0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
|
||||
0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
|
||||
0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
|
||||
0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
|
||||
0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
|
||||
0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
|
||||
0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
|
||||
0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
|
||||
0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
|
||||
0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
|
||||
0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
|
||||
0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
|
||||
0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
|
||||
0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
|
||||
0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
|
||||
0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
|
||||
0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
|
||||
0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
|
||||
0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
|
||||
0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
|
||||
0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
|
||||
0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
|
||||
0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
|
||||
0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
|
||||
0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
|
||||
0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
|
||||
0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
|
||||
0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
|
||||
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
|
||||
}
|
||||
|
||||
func crc16sum(key string) (crc uint16) {
|
||||
for i := 0; i < len(key); i++ {
|
||||
crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff]
|
||||
}
|
||||
return
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("CRC16", func() {
|
||||
|
||||
// http://redis.io/topics/cluster-spec#keys-distribution-model
|
||||
It("should calculate CRC16", func() {
|
||||
tests := []struct {
|
||||
s string
|
||||
n uint16
|
||||
}{
|
||||
{"123456789", 0x31C3},
|
||||
{string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), 21847},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
Expect(crc16sum(test.s)).To(Equal(test.n), "for %s", test.s)
|
||||
}
|
||||
})
|
||||
|
||||
})
|
34
multi.go
34
multi.go
|
@ -9,17 +9,25 @@ var errDiscard = errors.New("redis: Discard can be used only inside Exec")
|
|||
|
||||
// Not thread-safe.
|
||||
type Multi struct {
|
||||
*Client
|
||||
commandable
|
||||
|
||||
base *baseClient
|
||||
cmds []Cmder
|
||||
}
|
||||
|
||||
func (c *Client) Multi() *Multi {
|
||||
return &Multi{
|
||||
Client: &Client{
|
||||
baseClient: &baseClient{
|
||||
opt: c.opt,
|
||||
connPool: newSingleConnPool(c.connPool, true),
|
||||
},
|
||||
},
|
||||
multi := &Multi{
|
||||
base: &baseClient{opt: c.opt, connPool: newSingleConnPool(c.connPool, true)},
|
||||
}
|
||||
multi.commandable.process = multi.process
|
||||
return multi
|
||||
}
|
||||
|
||||
func (c *Multi) process(cmd Cmder) {
|
||||
if c.cmds == nil {
|
||||
c.base.process(cmd)
|
||||
} else {
|
||||
c.cmds = append(c.cmds, cmd)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,7 +35,7 @@ func (c *Multi) Close() error {
|
|||
if err := c.Unwatch().Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.Client.Close()
|
||||
return c.base.Close()
|
||||
}
|
||||
|
||||
func (c *Multi) Watch(keys ...string) *StatusCmd {
|
||||
|
@ -69,7 +77,7 @@ func (c *Multi) Exec(f func() error) ([]Cmder, error) {
|
|||
return []Cmder{}, nil
|
||||
}
|
||||
|
||||
cn, err := c.conn()
|
||||
cn, err := c.base.conn()
|
||||
if err != nil {
|
||||
setCmdsErr(cmds[1:len(cmds)-1], err)
|
||||
return cmds[1 : len(cmds)-1], err
|
||||
|
@ -77,16 +85,16 @@ func (c *Multi) Exec(f func() error) ([]Cmder, error) {
|
|||
|
||||
err = c.execCmds(cn, cmds)
|
||||
if err != nil {
|
||||
c.freeConn(cn, err)
|
||||
c.base.freeConn(cn, err)
|
||||
return cmds[1 : len(cmds)-1], err
|
||||
}
|
||||
|
||||
c.putConn(cn)
|
||||
c.base.putConn(cn)
|
||||
return cmds[1 : len(cmds)-1], nil
|
||||
}
|
||||
|
||||
func (c *Multi) execCmds(cn *conn, cmds []Cmder) error {
|
||||
err := c.writeCmd(cn, cmds...)
|
||||
err := cn.writeCmds(cmds...)
|
||||
if err != nil {
|
||||
setCmdsErr(cmds[1:len(cmds)-1], err)
|
||||
return err
|
||||
|
|
48
parser.go
48
parser.go
|
@ -3,6 +3,7 @@ package redis
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"gopkg.in/bufio.v1"
|
||||
|
@ -292,3 +293,50 @@ func parseZSlice(rd *bufio.Reader, n int64) (interface{}, error) {
|
|||
}
|
||||
return zz, nil
|
||||
}
|
||||
|
||||
func parseClusterSlotInfoSlice(rd *bufio.Reader, n int64) (interface{}, error) {
|
||||
infos := make([]ClusterSlotInfo, 0, n)
|
||||
for i := int64(0); i < n; i++ {
|
||||
viface, err := parseReply(rd, parseSlice)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
item, ok := viface.([]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("got %T, expected []interface{}", viface)
|
||||
} else if len(item) < 3 {
|
||||
return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item)
|
||||
}
|
||||
|
||||
start, ok := item[0].(int64)
|
||||
if !ok || start < 0 || start > hashSlots {
|
||||
return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item)
|
||||
}
|
||||
end, ok := item[1].(int64)
|
||||
if !ok || end < 0 || end > hashSlots {
|
||||
return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item)
|
||||
}
|
||||
|
||||
info := ClusterSlotInfo{int(start), int(end), make([]string, len(item)-2)}
|
||||
for n, ipair := range item[2:] {
|
||||
pair, ok := ipair.([]interface{})
|
||||
if !ok || len(pair) != 2 {
|
||||
return nil, fmt.Errorf("got %v, expected []interface{host, port}", viface)
|
||||
}
|
||||
|
||||
ip, ok := pair[0].(string)
|
||||
if !ok || len(ip) < 1 {
|
||||
return nil, fmt.Errorf("got %v, expected IP PORT pair", pair)
|
||||
}
|
||||
port, ok := pair[1].(int64)
|
||||
if !ok || port < 1 {
|
||||
return nil, fmt.Errorf("got %v, expected IP PORT pair", pair)
|
||||
}
|
||||
|
||||
info.Addrs[n] = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
|
||||
}
|
||||
infos = append(infos, info)
|
||||
}
|
||||
return infos, nil
|
||||
}
|
||||
|
|
29
pipeline.go
29
pipeline.go
|
@ -2,22 +2,23 @@ package redis
|
|||
|
||||
// Not thread-safe.
|
||||
type Pipeline struct {
|
||||
*Client
|
||||
commandable
|
||||
|
||||
cmds []Cmder
|
||||
client *baseClient
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (c *Client) Pipeline() *Pipeline {
|
||||
return &Pipeline{
|
||||
Client: &Client{
|
||||
baseClient: &baseClient{
|
||||
pipe := &Pipeline{
|
||||
client: &baseClient{
|
||||
opt: c.opt,
|
||||
connPool: c.connPool,
|
||||
|
||||
cmds: make([]Cmder, 0),
|
||||
},
|
||||
},
|
||||
cmds: make([]Cmder, 0, 10),
|
||||
}
|
||||
pipe.commandable.process = pipe.process
|
||||
return pipe
|
||||
}
|
||||
|
||||
func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) {
|
||||
|
@ -30,6 +31,10 @@ func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) {
|
|||
return cmds, err
|
||||
}
|
||||
|
||||
func (c *Pipeline) process(cmd Cmder) {
|
||||
c.cmds = append(c.cmds, cmd)
|
||||
}
|
||||
|
||||
func (c *Pipeline) Close() error {
|
||||
c.closed = true
|
||||
return nil
|
||||
|
@ -51,29 +56,29 @@ func (c *Pipeline) Exec() ([]Cmder, error) {
|
|||
}
|
||||
|
||||
cmds := c.cmds
|
||||
c.cmds = make([]Cmder, 0)
|
||||
c.cmds = make([]Cmder, 0, 0)
|
||||
|
||||
if len(cmds) == 0 {
|
||||
return []Cmder{}, nil
|
||||
}
|
||||
|
||||
cn, err := c.conn()
|
||||
cn, err := c.client.conn()
|
||||
if err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
return cmds, err
|
||||
}
|
||||
|
||||
if err := c.execCmds(cn, cmds); err != nil {
|
||||
c.freeConn(cn, err)
|
||||
c.client.freeConn(cn, err)
|
||||
return cmds, err
|
||||
}
|
||||
|
||||
c.putConn(cn)
|
||||
c.client.putConn(cn)
|
||||
return cmds, nil
|
||||
}
|
||||
|
||||
func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {
|
||||
if err := c.writeCmd(cn, cmds...); err != nil {
|
||||
if err := cn.writeCmds(cmds...); err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
return err
|
||||
}
|
||||
|
|
10
pool.go
10
pool.go
|
@ -58,6 +58,16 @@ func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (cn *conn) writeCmds(cmds ...Cmder) error {
|
||||
buf := cn.buf[:0]
|
||||
for _, cmd := range cmds {
|
||||
buf = appendArgs(buf, cmd.args())
|
||||
}
|
||||
|
||||
_, err := cn.Write(buf)
|
||||
return err
|
||||
}
|
||||
|
||||
func (cn *conn) Read(b []byte) (int, error) {
|
||||
if cn.readTimeout != 0 {
|
||||
cn.netcn.SetReadDeadline(time.Now().Add(cn.readTimeout))
|
||||
|
|
|
@ -103,7 +103,7 @@ func (c *PubSub) subscribe(cmd string, channels ...string) error {
|
|||
|
||||
args := append([]string{cmd}, channels...)
|
||||
req := NewSliceCmd(args...)
|
||||
return c.writeCmd(cn, req)
|
||||
return cn.writeCmds(req)
|
||||
}
|
||||
|
||||
func (c *PubSub) Subscribe(channels ...string) error {
|
||||
|
@ -122,7 +122,7 @@ func (c *PubSub) unsubscribe(cmd string, channels ...string) error {
|
|||
|
||||
args := append([]string{cmd}, channels...)
|
||||
req := NewSliceCmd(args...)
|
||||
return c.writeCmd(cn, req)
|
||||
return cn.writeCmds(req)
|
||||
}
|
||||
|
||||
func (c *PubSub) Unsubscribe(channels ...string) error {
|
||||
|
|
48
redis.go
48
redis.go
|
@ -9,17 +9,6 @@ import (
|
|||
type baseClient struct {
|
||||
connPool pool
|
||||
opt *options
|
||||
cmds []Cmder
|
||||
}
|
||||
|
||||
func (c *baseClient) writeCmd(cn *conn, cmds ...Cmder) error {
|
||||
buf := cn.buf[:0]
|
||||
for _, cmd := range cmds {
|
||||
buf = appendArgs(buf, cmd.args())
|
||||
}
|
||||
|
||||
_, err := cn.Write(buf)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *baseClient) conn() (*conn, error) {
|
||||
|
@ -47,12 +36,7 @@ func (c *baseClient) initConn(cn *conn) error {
|
|||
pool.SetConn(cn)
|
||||
|
||||
// Client is not closed because we want to reuse underlying connection.
|
||||
client := &Client{
|
||||
baseClient: &baseClient{
|
||||
opt: c.opt,
|
||||
connPool: pool,
|
||||
},
|
||||
}
|
||||
client := newClient(c.opt, pool)
|
||||
|
||||
if c.opt.Password != "" {
|
||||
if err := client.Auth(c.opt.Password).Err(); err != nil {
|
||||
|
@ -91,15 +75,7 @@ func (c *baseClient) putConn(cn *conn) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *baseClient) Process(cmd Cmder) {
|
||||
if c.cmds == nil {
|
||||
c.run(cmd)
|
||||
} else {
|
||||
c.cmds = append(c.cmds, cmd)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *baseClient) run(cmd Cmder) {
|
||||
func (c *baseClient) process(cmd Cmder) {
|
||||
cn, err := c.conn()
|
||||
if err != nil {
|
||||
cmd.setErr(err)
|
||||
|
@ -118,7 +94,7 @@ func (c *baseClient) run(cmd Cmder) {
|
|||
cn.readTimeout = c.opt.ReadTimeout
|
||||
}
|
||||
|
||||
if err := c.writeCmd(cn, cmd); err != nil {
|
||||
if err := cn.writeCmds(cmd); err != nil {
|
||||
c.freeConn(cn, err)
|
||||
cmd.setErr(err)
|
||||
return
|
||||
|
@ -237,8 +213,19 @@ func (opt *Options) options() *options {
|
|||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type Client struct {
|
||||
*baseClient
|
||||
commandable
|
||||
}
|
||||
|
||||
func newClient(opt *options, pool pool) *Client {
|
||||
base := &baseClient{opt: opt, connPool: pool}
|
||||
return &Client{
|
||||
baseClient: base,
|
||||
commandable: commandable{process: base.process},
|
||||
}
|
||||
}
|
||||
|
||||
func NewClient(clOpt *Options) *Client {
|
||||
|
@ -249,12 +236,7 @@ func NewClient(clOpt *Options) *Client {
|
|||
return net.DialTimeout(clOpt.getNetwork(), clOpt.Addr, opt.DialTimeout)
|
||||
}
|
||||
}
|
||||
return &Client{
|
||||
baseClient: &baseClient{
|
||||
opt: opt,
|
||||
connPool: newConnPool(newConnFunc(dialer), opt),
|
||||
},
|
||||
}
|
||||
return newClient(opt, newConnPool(newConnFunc(dialer), opt))
|
||||
}
|
||||
|
||||
// Deprecated. Use NewClient instead.
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -126,7 +127,7 @@ func TestGinkgoSuite(t *testing.T) {
|
|||
|
||||
func execCmd(name string, args ...string) (*os.Process, error) {
|
||||
cmd := exec.Command(name, args...)
|
||||
if true {
|
||||
if false {
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
}
|
||||
|
@ -138,12 +139,12 @@ func connectTo(port string) (client *redis.Client, err error) {
|
|||
Addr: ":" + port,
|
||||
})
|
||||
|
||||
deadline := time.Now().Add(time.Second)
|
||||
deadline := time.Now().Add(3 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
if err = client.Ping().Err(); err == nil {
|
||||
return client, nil
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
|
||||
return nil, err
|
||||
|
@ -159,11 +160,38 @@ func (p *redisProcess) Close() error {
|
|||
return p.Kill()
|
||||
}
|
||||
|
||||
var (
|
||||
redisServerBin, _ = filepath.Abs(filepath.Join(".test", "redis", "src", "redis-server"))
|
||||
redisServerConf, _ = filepath.Abs(filepath.Join(".test", "redis.conf"))
|
||||
)
|
||||
|
||||
func redisDir(port string) (string, error) {
|
||||
dir, err := filepath.Abs(filepath.Join(".test", "instances", port))
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if err = os.RemoveAll(dir); err != nil {
|
||||
return "", err
|
||||
} else if err = os.MkdirAll(dir, 0775); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return dir, nil
|
||||
}
|
||||
|
||||
func startRedis(port string, args ...string) (*redisProcess, error) {
|
||||
process, err := execCmd("redis-server", append([]string{"--port", port}, args...)...)
|
||||
dir, err := redisDir(port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir}
|
||||
process, err := execCmd(redisServerBin, append(baseArgs, args...)...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client, err := connectTo(port)
|
||||
if err != nil {
|
||||
process.Kill()
|
||||
|
@ -173,7 +201,11 @@ func startRedis(port string, args ...string) (*redisProcess, error) {
|
|||
}
|
||||
|
||||
func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
|
||||
process, err := execCmd("redis-server", os.DevNull, "--sentinel", "--port", port)
|
||||
dir, err := redisDir(port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
16
sentinel.go
16
sentinel.go
|
@ -92,17 +92,13 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
|
|||
|
||||
opt: opt,
|
||||
}
|
||||
return &Client{
|
||||
baseClient: &baseClient{
|
||||
opt: opt,
|
||||
connPool: failover.Pool(),
|
||||
},
|
||||
}
|
||||
return newClient(opt, failover.Pool())
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type sentinelClient struct {
|
||||
commandable
|
||||
*baseClient
|
||||
}
|
||||
|
||||
|
@ -113,11 +109,13 @@ func newSentinel(clOpt *Options) *sentinelClient {
|
|||
dialer := func() (net.Conn, error) {
|
||||
return net.DialTimeout("tcp", clOpt.Addr, opt.DialTimeout)
|
||||
}
|
||||
return &sentinelClient{
|
||||
baseClient: &baseClient{
|
||||
base := &baseClient{
|
||||
opt: opt,
|
||||
connPool: newConnPool(newConnFunc(dialer), opt),
|
||||
},
|
||||
}
|
||||
return &sentinelClient{
|
||||
baseClient: base,
|
||||
commandable: commandable{process: base.process},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue