Add Redis Cluster support.

This commit is contained in:
Dimitrij Denissenko 2015-01-24 12:12:48 +00:00 committed by Vladimir Mihailenco
parent 78cf6f5eae
commit c21e5f3255
19 changed files with 1262 additions and 325 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
*.rdb
.test/

9
.test/redis.conf Normal file
View File

@ -0,0 +1,9 @@
# Minimal redis.conf
port 6379
daemonize no
dir .
save ""
appendonly yes
cluster-config-file nodes.conf
cluster-node-timeout 30000

View File

@ -1,4 +1,17 @@
all:
go test ./...
go test ./... -cpu=2
go test ./... -short -race
all: testdeps
go test ./... -v 1 -ginkgo.slowSpecThreshold=10 -cpu=1,2,4
go test ./... -ginkgo.slowSpecThreshold=10 -short -race
test: testdeps
go test ./... -v 1 -ginkgo.slowSpecThreshold=10
testdeps: .test/redis/src/redis-server
.PHONY: all test testdeps
.test/redis:
mkdir -p $@
wget -qO- https://github.com/antirez/redis/archive/3.0.tar.gz | tar xvz --strip-components=1 -C $@
.test/redis/src/redis-server: .test/redis
cd $< && make all

303
cluster.go Normal file
View File

@ -0,0 +1,303 @@
package redis
import (
"errors"
"io"
"math/rand"
"net"
"strings"
"sync"
"sync/atomic"
"time"
)
type ClusterClient struct {
commandable
addrs map[string]struct{}
slots [][]string
conns map[string]*Client
opt *ClusterOptions
// Protect addrs, slots and conns cache
cachemx sync.RWMutex
_reload uint32
}
// NewClusterClient initializes a new cluster-aware client using given options.
// A list of seed addresses must be provided.
func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) {
addrs, err := opt.getAddrSet()
if err != nil {
return nil, err
}
client := &ClusterClient{
addrs: addrs,
conns: make(map[string]*Client),
opt: opt,
_reload: 1,
}
client.commandable.process = client.process
client.reloadIfDue()
return client, nil
}
// Close closes the cluster connection
func (c *ClusterClient) Close() error {
c.cachemx.Lock()
defer c.cachemx.Unlock()
return c.reset()
}
// ------------------------------------------------------------------------
// Finds the current master address for a given hash slot
func (c *ClusterClient) getMasterAddrBySlot(hashSlot int) string {
if addrs := c.slots[hashSlot]; len(addrs) > 0 {
return addrs[0]
}
return ""
}
// Returns a node's client for a given address
func (c *ClusterClient) getNodeClientByAddr(addr string) *Client {
client, ok := c.conns[addr]
if !ok {
opt := c.opt.clientOptions()
opt.Addr = addr
client = NewTCPClient(opt)
c.conns[addr] = client
}
return client
}
// Process a command
func (c *ClusterClient) process(cmd Cmder) {
var ask bool
c.reloadIfDue()
hashSlot := HashSlot(cmd.clusterKey())
c.cachemx.RLock()
defer c.cachemx.RUnlock()
tried := make(map[string]struct{}, len(c.addrs))
addr := c.getMasterAddrBySlot(hashSlot)
for attempt := 0; attempt < c.opt.getMaxRedirects(); attempt++ {
tried[addr] = struct{}{}
// Pick the connection, process request
conn := c.getNodeClientByAddr(addr)
if ask {
pipe := conn.Pipeline()
pipe.Process(NewCmd("ASKING"))
pipe.Process(cmd)
_, _ = pipe.Exec()
ask = false
} else {
conn.Process(cmd)
}
// If there is no (real) error, we are done!
err := cmd.Err()
if err == nil || err == Nil {
return
}
// On connection errors, pick a random, previosuly untried connection
// and request again.
if _, ok := err.(*net.OpError); ok || err == io.EOF {
if addr = c.findNextAddr(tried); addr == "" {
return
}
cmd.reset()
continue
}
// Check the error message, return if unexpected
parts := strings.SplitN(err.Error(), " ", 3)
if len(parts) != 3 {
return
}
// Handle MOVE and ASK redirections, return on any other error
switch parts[0] {
case "MOVED":
c.forceReload()
addr = parts[2]
case "ASK":
ask = true
addr = parts[2]
default:
return
}
cmd.reset()
}
}
// Closes all connections and reloads slot cache, if due
func (c *ClusterClient) reloadIfDue() (err error) {
if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) {
return
}
var infos []ClusterSlotInfo
c.cachemx.Lock()
defer c.cachemx.Unlock()
// Try known addresses in random order (map interation order is random in Go)
// http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections
// https://github.com/antirez/redis-rb-cluster/blob/fd931ed/cluster.rb#L157
for addr := range c.addrs {
c.reset()
infos, err = c.fetchClusterSlots(addr)
if err == nil {
c.update(infos)
break
}
}
return
}
// Closes all connections and flushes slots cache
func (c *ClusterClient) reset() (err error) {
for addr, client := range c.conns {
if e := client.Close(); e != nil {
err = e
}
delete(c.conns, addr)
}
c.slots = make([][]string, hashSlots)
return
}
// Forces a cache reload on next request
func (c *ClusterClient) forceReload() {
atomic.StoreUint32(&c._reload, 1)
}
// Find the next untried address
func (c *ClusterClient) findNextAddr(tried map[string]struct{}) string {
for addr := range c.addrs {
if _, ok := tried[addr]; !ok {
return addr
}
}
return ""
}
// Fetch slot information
func (c *ClusterClient) fetchClusterSlots(addr string) ([]ClusterSlotInfo, error) {
opt := c.opt.clientOptions()
opt.Addr = addr
client := NewClient(opt)
defer client.Close()
return client.ClusterSlots().Result()
}
// Update slot information, populate slots
func (c *ClusterClient) update(infos []ClusterSlotInfo) {
for _, info := range infos {
for i := info.Start; i <= info.End; i++ {
c.slots[i] = info.Addrs
}
for _, addr := range info.Addrs {
c.addrs[addr] = struct{}{}
}
}
}
//------------------------------------------------------------------------------
var errNoAddrs = errors.New("redis: no addresses")
type ClusterOptions struct {
// A seed-list of host:port addresses of known cluster nodes
Addrs []string
// An optional password
Password string
// The maximum number of MOVED/ASK redirects to follow, before
// giving up. Default: 16
MaxRedirects int
// The maximum number of TCP sockets per connection. Default: 5
PoolSize int
// Timeout settings
DialTimeout, ReadTimeout, WriteTimeout, IdleTimeout time.Duration
}
func (opt *ClusterOptions) getPoolSize() int {
if opt.PoolSize < 1 {
return 5
}
return opt.PoolSize
}
func (opt *ClusterOptions) getDialTimeout() time.Duration {
if opt.DialTimeout == 0 {
return 5 * time.Second
}
return opt.DialTimeout
}
func (opt *ClusterOptions) getMaxRedirects() int {
if opt.MaxRedirects < 1 {
return 16
}
return opt.MaxRedirects
}
func (opt *ClusterOptions) getAddrSet() (map[string]struct{}, error) {
size := len(opt.Addrs)
if size < 1 {
return nil, errNoAddrs
}
addrs := make(map[string]struct{}, size)
for _, addr := range opt.Addrs {
addrs[addr] = struct{}{}
}
return addrs, nil
}
func (opt *ClusterOptions) clientOptions() *Options {
return &Options{
DB: 0,
Password: opt.Password,
DialTimeout: opt.getDialTimeout(),
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.getPoolSize(),
IdleTimeout: opt.IdleTimeout,
}
}
//------------------------------------------------------------------------------
const hashSlots = 16384
// HashSlot returns a consistent slot number between 0 and 16383
// for any given string key
func HashSlot(key string) int {
if s := strings.IndexByte(key, '{'); s > -1 {
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
key = key[s+1 : s+e+1]
}
}
if key == "" {
return rand.Intn(hashSlots)
}
return int(crc16sum(key)) % hashSlots
}

95
cluster_client_test.go Normal file
View File

@ -0,0 +1,95 @@
package redis
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("ClusterClient", func() {
var subject *ClusterClient
var populate = func() {
subject.reset()
subject.update([]ClusterSlotInfo{
{0, 4095, []string{"127.0.0.1:7000", "127.0.0.1:7004"}},
{12288, 16383, []string{"127.0.0.1:7003", "127.0.0.1:7007"}},
{4096, 8191, []string{"127.0.0.1:7001", "127.0.0.1:7005"}},
{8192, 12287, []string{"127.0.0.1:7002", "127.0.0.1:7006"}},
})
}
BeforeEach(func() {
var err error
subject, err = NewClusterClient(&ClusterOptions{
Addrs: []string{"127.0.0.1:6379", "127.0.0.1:7003", "127.0.0.1:7006"},
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
subject.Close()
})
It("should initialize", func() {
Expect(subject.addrs).To(HaveLen(3))
Expect(subject.slots).To(HaveLen(hashSlots))
Expect(subject._reload).To(Equal(uint32(0)))
})
It("should update slots cache", func() {
populate()
Expect(subject.slots[0]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"}))
Expect(subject.slots[4095]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"}))
Expect(subject.slots[4096]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"}))
Expect(subject.slots[8191]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"}))
Expect(subject.slots[8192]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"}))
Expect(subject.slots[12287]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"}))
Expect(subject.slots[12288]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"}))
Expect(subject.slots[16383]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"}))
Expect(subject.addrs).To(Equal(map[string]struct{}{
"127.0.0.1:6379": struct{}{},
"127.0.0.1:7000": struct{}{},
"127.0.0.1:7001": struct{}{},
"127.0.0.1:7002": struct{}{},
"127.0.0.1:7003": struct{}{},
"127.0.0.1:7004": struct{}{},
"127.0.0.1:7005": struct{}{},
"127.0.0.1:7006": struct{}{},
"127.0.0.1:7007": struct{}{},
}))
})
It("should find next addresses", func() {
populate()
seen := map[string]struct{}{
"127.0.0.1:7000": struct{}{},
"127.0.0.1:7001": struct{}{},
"127.0.0.1:7003": struct{}{},
}
addr := subject.findNextAddr(seen)
for addr != "" {
seen[addr] = struct{}{}
addr = subject.findNextAddr(seen)
}
Expect(subject.findNextAddr(seen)).To(Equal(""))
Expect(seen).To(Equal(map[string]struct{}{
"127.0.0.1:6379": struct{}{},
"127.0.0.1:7000": struct{}{},
"127.0.0.1:7001": struct{}{},
"127.0.0.1:7002": struct{}{},
"127.0.0.1:7003": struct{}{},
"127.0.0.1:7004": struct{}{},
"127.0.0.1:7005": struct{}{},
"127.0.0.1:7006": struct{}{},
"127.0.0.1:7007": struct{}{},
}))
})
It("should check if reload is due", func() {
subject._reload = 0
Expect(subject._reload).To(Equal(uint32(0)))
subject.forceReload()
Expect(subject._reload).To(Equal(uint32(1)))
})
})

232
cluster_test.go Normal file
View File

@ -0,0 +1,232 @@
package redis_test
import (
"math/rand"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"gopkg.in/redis.v2"
)
var _ = Describe("Cluster", func() {
var scenario = &clusterScenario{
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
nodeIDs: make([]string, 6),
processes: make(map[string]*redisProcess, 6),
clients: make(map[string]*redis.Client, 6),
}
BeforeSuite(func() {
// Start processes, connect individual clients
for pos, port := range scenario.ports {
process, err := startRedis(port, "--cluster-enabled", "yes")
Expect(err).NotTo(HaveOccurred())
client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:" + port})
info, err := client.ClusterNodes().Result()
Expect(err).NotTo(HaveOccurred())
scenario.processes[port] = process
scenario.clients[port] = client
scenario.nodeIDs[pos] = info[:40]
}
// Meet cluster nodes
for _, client := range scenario.clients {
err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err()
Expect(err).NotTo(HaveOccurred())
}
// Bootstrap masters
slots := []int{0, 5000, 10000, 16384}
for pos, client := range scenario.masters() {
err := client.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
Expect(err).NotTo(HaveOccurred())
}
// Bootstrap slaves
for pos, client := range scenario.slaves() {
masterID := scenario.nodeIDs[pos]
Eventually(func() string { // Wait for masters
return client.ClusterNodes().Val()
}, "10s").Should(ContainSubstring(masterID))
err := client.ClusterReplicate(masterID).Err()
Expect(err).NotTo(HaveOccurred())
Eventually(func() string { // Wait for slaves
return scenario.primary().ClusterNodes().Val()
}, "10s").Should(ContainSubstring("slave " + masterID))
}
// Wait for cluster state to turn OK
for _, client := range scenario.clients {
Eventually(func() string {
return client.ClusterInfo().Val()
}, "10s").Should(ContainSubstring("cluster_state:ok"))
}
})
AfterSuite(func() {
for _, client := range scenario.clients {
client.Close()
}
for _, process := range scenario.processes {
process.Close()
}
})
Describe("HashSlot", func() {
It("should calculate hash slots", func() {
tests := []struct {
key string
slot int
}{
{"123456789", 12739},
{"{}foo", 9500},
{"foo{}", 5542},
{"foo{}{bar}", 8363},
{"", 10503},
{"", 5176},
{string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), 5463},
}
rand.Seed(100)
for _, test := range tests {
Expect(redis.HashSlot(test.key)).To(Equal(test.slot), "for %s", test.key)
}
})
It("should extract keys from tags", func() {
tests := []struct {
one, two string
}{
{"foo{bar}", "bar"},
{"{foo}bar", "foo"},
{"{user1000}.following", "{user1000}.followers"},
{"foo{{bar}}zap", "{bar"},
{"foo{bar}{zap}", "bar"},
}
for _, test := range tests {
Expect(redis.HashSlot(test.one)).To(Equal(redis.HashSlot(test.two)), "for %s <-> %s", test.one, test.two)
}
})
})
Describe("Commands", func() {
It("should CLUSTER SLOTS", func() {
res, err := scenario.primary().ClusterSlots().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(3))
Expect(res).To(ConsistOf([]redis.ClusterSlotInfo{
{0, 4999, []string{"127.0.0.1:8220", "127.0.0.1:8223"}},
{5000, 9999, []string{"127.0.0.1:8221", "127.0.0.1:8224"}},
{10000, 16383, []string{"127.0.0.1:8222", "127.0.0.1:8225"}},
}))
})
It("should CLUSTER NODES", func() {
res, err := scenario.primary().ClusterNodes().Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(res)).To(BeNumerically(">", 400))
})
It("should CLUSTER INFO", func() {
res, err := scenario.primary().ClusterInfo().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
})
})
Describe("Client", func() {
var client *redis.ClusterClient
BeforeEach(func() {
var err error
client, err = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{"127.0.0.1:8220", "127.0.0.1:8221", "127.0.0.1:8222", "127.0.0.1:8223", "127.0.0.1:8224", "127.0.0.1:8225"},
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
for _, client := range scenario.clients {
client.FlushDb()
}
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should GET/SET/DEL", func() {
val, err := client.Get("A").Result()
Expect(err).To(Equal(redis.Nil))
Expect(val).To(Equal(""))
val, err = client.Set("A", "VALUE").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("OK"))
val, err = client.Get("A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("VALUE"))
cnt, err := client.Del("A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(cnt).To(Equal(int64(1)))
})
It("should follow redirects", func() {
Expect(client.Set("A", "VALUE").Err()).NotTo(HaveOccurred())
Expect(redis.HashSlot("A")).To(Equal(6373))
// Slot 6373 is stored on the second node
defer func() {
scenario.masters()[1].ClusterFailover()
}()
slave := scenario.slaves()[1]
Expect(slave.ClusterFailover().Err()).NotTo(HaveOccurred())
Eventually(func() string {
return slave.Info().Val()
}, "10s", "200ms").Should(ContainSubstring("role:master"))
val, err := client.Get("A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("VALUE"))
})
})
})
// --------------------------------------------------------------------
type clusterScenario struct {
ports []string
nodeIDs []string
processes map[string]*redisProcess
clients map[string]*redis.Client
}
func (s *clusterScenario) primary() *redis.Client {
return s.clients[s.ports[0]]
}
func (s *clusterScenario) masters() []*redis.Client {
result := make([]*redis.Client, 3)
for pos, port := range s.ports[:3] {
result[pos] = s.clients[port]
}
return result
}
func (s *clusterScenario) slaves() []*redis.Client {
result := make([]*redis.Client, 3)
for pos, port := range s.ports[3:] {
result[pos] = s.clients[port]
}
return result
}

View File

@ -24,18 +24,19 @@ var (
_ Cmder = (*StringIntMapCmd)(nil)
_ Cmder = (*ZSliceCmd)(nil)
_ Cmder = (*ScanCmd)(nil)
_ Cmder = (*ClusterSlotCmd)(nil)
)
type Cmder interface {
args() []string
parseReply(*bufio.Reader) error
setErr(error)
reset()
writeTimeout() *time.Duration
readTimeout() *time.Duration
clusterKey() string
// Reset resets internal state of the command.
Reset()
Err() error
String() string
}
@ -65,13 +66,9 @@ type baseCmd struct {
err error
_writeTimeout, _readTimeout *time.Duration
}
_clusterKeyPos int
func newBaseCmd(args ...string) *baseCmd {
return &baseCmd{
_args: args,
}
_writeTimeout, _readTimeout *time.Duration
}
func (cmd *baseCmd) Err() error {
@ -97,6 +94,13 @@ func (cmd *baseCmd) writeTimeout() *time.Duration {
return cmd._writeTimeout
}
func (cmd *baseCmd) clusterKey() string {
if cmd._clusterKeyPos > 0 && cmd._clusterKeyPos < len(cmd._args) {
return cmd._args[cmd._clusterKeyPos]
}
return ""
}
func (cmd *baseCmd) setWriteTimeout(d time.Duration) {
cmd._writeTimeout = &d
}
@ -108,18 +112,16 @@ func (cmd *baseCmd) setErr(e error) {
//------------------------------------------------------------------------------
type Cmd struct {
*baseCmd
baseCmd
val interface{}
}
func NewCmd(args ...string) *Cmd {
return &Cmd{
baseCmd: newBaseCmd(args...),
}
return &Cmd{baseCmd: baseCmd{_args: args}}
}
func (cmd *Cmd) Reset() {
func (cmd *Cmd) reset() {
cmd.val = nil
cmd.err = nil
}
@ -144,18 +146,16 @@ func (cmd *Cmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type SliceCmd struct {
*baseCmd
baseCmd
val []interface{}
}
func NewSliceCmd(args ...string) *SliceCmd {
return &SliceCmd{
baseCmd: newBaseCmd(args...),
}
return &SliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *SliceCmd) Reset() {
func (cmd *SliceCmd) reset() {
cmd.val = nil
cmd.err = nil
}
@ -185,18 +185,20 @@ func (cmd *SliceCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type StatusCmd struct {
*baseCmd
baseCmd
val string
}
func NewStatusCmd(args ...string) *StatusCmd {
return &StatusCmd{
baseCmd: newBaseCmd(args...),
}
return &StatusCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *StatusCmd) Reset() {
func newKeylessStatusCmd(args ...string) *StatusCmd {
return &StatusCmd{baseCmd: baseCmd{_args: args}}
}
func (cmd *StatusCmd) reset() {
cmd.val = ""
cmd.err = nil
}
@ -226,18 +228,16 @@ func (cmd *StatusCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type IntCmd struct {
*baseCmd
baseCmd
val int64
}
func NewIntCmd(args ...string) *IntCmd {
return &IntCmd{
baseCmd: newBaseCmd(args...),
}
return &IntCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *IntCmd) Reset() {
func (cmd *IntCmd) reset() {
cmd.val = 0
cmd.err = nil
}
@ -267,7 +267,7 @@ func (cmd *IntCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type DurationCmd struct {
*baseCmd
baseCmd
val time.Duration
precision time.Duration
@ -275,12 +275,12 @@ type DurationCmd struct {
func NewDurationCmd(precision time.Duration, args ...string) *DurationCmd {
return &DurationCmd{
baseCmd: newBaseCmd(args...),
precision: precision,
baseCmd: baseCmd{_args: args, _clusterKeyPos: 1},
}
}
func (cmd *DurationCmd) Reset() {
func (cmd *DurationCmd) reset() {
cmd.val = 0
cmd.err = nil
}
@ -310,18 +310,16 @@ func (cmd *DurationCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type BoolCmd struct {
*baseCmd
baseCmd
val bool
}
func NewBoolCmd(args ...string) *BoolCmd {
return &BoolCmd{
baseCmd: newBaseCmd(args...),
}
return &BoolCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *BoolCmd) Reset() {
func (cmd *BoolCmd) reset() {
cmd.val = false
cmd.err = nil
}
@ -351,18 +349,16 @@ func (cmd *BoolCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type StringCmd struct {
*baseCmd
baseCmd
val string
}
func NewStringCmd(args ...string) *StringCmd {
return &StringCmd{
baseCmd: newBaseCmd(args...),
}
return &StringCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *StringCmd) Reset() {
func (cmd *StringCmd) reset() {
cmd.val = ""
cmd.err = nil
}
@ -413,18 +409,16 @@ func (cmd *StringCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type FloatCmd struct {
*baseCmd
baseCmd
val float64
}
func NewFloatCmd(args ...string) *FloatCmd {
return &FloatCmd{
baseCmd: newBaseCmd(args...),
}
return &FloatCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *FloatCmd) Reset() {
func (cmd *FloatCmd) reset() {
cmd.val = 0
cmd.err = nil
}
@ -450,18 +444,16 @@ func (cmd *FloatCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type StringSliceCmd struct {
*baseCmd
baseCmd
val []string
}
func NewStringSliceCmd(args ...string) *StringSliceCmd {
return &StringSliceCmd{
baseCmd: newBaseCmd(args...),
}
return &StringSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *StringSliceCmd) Reset() {
func (cmd *StringSliceCmd) reset() {
cmd.val = nil
cmd.err = nil
}
@ -491,18 +483,16 @@ func (cmd *StringSliceCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type BoolSliceCmd struct {
*baseCmd
baseCmd
val []bool
}
func NewBoolSliceCmd(args ...string) *BoolSliceCmd {
return &BoolSliceCmd{
baseCmd: newBaseCmd(args...),
}
return &BoolSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *BoolSliceCmd) Reset() {
func (cmd *BoolSliceCmd) reset() {
cmd.val = nil
cmd.err = nil
}
@ -532,18 +522,16 @@ func (cmd *BoolSliceCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type StringStringMapCmd struct {
*baseCmd
baseCmd
val map[string]string
}
func NewStringStringMapCmd(args ...string) *StringStringMapCmd {
return &StringStringMapCmd{
baseCmd: newBaseCmd(args...),
}
return &StringStringMapCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *StringStringMapCmd) Reset() {
func (cmd *StringStringMapCmd) reset() {
cmd.val = nil
cmd.err = nil
}
@ -573,15 +561,13 @@ func (cmd *StringStringMapCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type StringIntMapCmd struct {
*baseCmd
baseCmd
val map[string]int64
}
func NewStringIntMapCmd(args ...string) *StringIntMapCmd {
return &StringIntMapCmd{
baseCmd: newBaseCmd(args...),
}
return &StringIntMapCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *StringIntMapCmd) Val() map[string]int64 {
@ -596,6 +582,11 @@ func (cmd *StringIntMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StringIntMapCmd) reset() {
cmd.val = nil
cmd.err = nil
}
func (cmd *StringIntMapCmd) parseReply(rd *bufio.Reader) error {
v, err := parseReply(rd, parseStringIntMap)
if err != nil {
@ -609,18 +600,16 @@ func (cmd *StringIntMapCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type ZSliceCmd struct {
*baseCmd
baseCmd
val []Z
}
func NewZSliceCmd(args ...string) *ZSliceCmd {
return &ZSliceCmd{
baseCmd: newBaseCmd(args...),
}
return &ZSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *ZSliceCmd) Reset() {
func (cmd *ZSliceCmd) reset() {
cmd.val = nil
cmd.err = nil
}
@ -650,19 +639,17 @@ func (cmd *ZSliceCmd) parseReply(rd *bufio.Reader) error {
//------------------------------------------------------------------------------
type ScanCmd struct {
*baseCmd
baseCmd
cursor int64
keys []string
}
func NewScanCmd(args ...string) *ScanCmd {
return &ScanCmd{
baseCmd: newBaseCmd(args...),
}
return &ScanCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *ScanCmd) Reset() {
func (cmd *ScanCmd) reset() {
cmd.cursor = 0
cmd.keys = nil
cmd.err = nil
@ -700,3 +687,47 @@ func (cmd *ScanCmd) parseReply(rd *bufio.Reader) error {
return nil
}
//------------------------------------------------------------------------------
type ClusterSlotInfo struct {
Start, End int
Addrs []string
}
type ClusterSlotCmd struct {
baseCmd
val []ClusterSlotInfo
}
func NewClusterSlotCmd(args ...string) *ClusterSlotCmd {
return &ClusterSlotCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *ClusterSlotCmd) Val() []ClusterSlotInfo {
return cmd.val
}
func (cmd *ClusterSlotCmd) Result() ([]ClusterSlotInfo, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *ClusterSlotCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *ClusterSlotCmd) reset() {
cmd.val = nil
cmd.err = nil
}
func (cmd *ClusterSlotCmd) parseReply(rd *bufio.Reader) error {
v, err := parseReply(rd, parseClusterSlotInfoSlice)
if err != nil {
cmd.err = err
return err
}
cmd.val = v.([]ClusterSlotInfo)
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -65,7 +65,7 @@ var _ = Describe("Commands", func() {
// workaround for "ERR Can't BGSAVE while AOF log rewriting is in progress"
Eventually(func() string {
return client.BgSave().Val()
}).Should(Equal("Background saving started"))
}, "10s").Should(Equal("Background saving started"))
})
It("should ClientKill", func() {
@ -119,7 +119,7 @@ var _ = Describe("Commands", func() {
// workaround for "ERR Background save already in progress"
Eventually(func() string {
return client.Save().Val()
}).Should(Equal("OK"))
}, "10s").Should(Equal("OK"))
})
It("should SlaveOf", func() {

47
crc16.go Normal file
View File

@ -0,0 +1,47 @@
package redis
// CRC16 implementation according to CCITT standards.
// Copyright 2001-2010 Georges Menie (www.menie.org)
// Copyright 2013 The Go Authors. All rights reserved.
// http://redis.io/topics/cluster-spec#appendix-a-crc16-reference-implementation-in-ansi-c
var crc16tab = [256]uint16{
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
}
func crc16sum(key string) (crc uint16) {
for i := 0; i < len(key); i++ {
crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff]
}
return
}

25
crc16_test.go Normal file
View File

@ -0,0 +1,25 @@
package redis
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("CRC16", func() {
// http://redis.io/topics/cluster-spec#keys-distribution-model
It("should calculate CRC16", func() {
tests := []struct {
s string
n uint16
}{
{"123456789", 0x31C3},
{string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), 21847},
}
for _, test := range tests {
Expect(crc16sum(test.s)).To(Equal(test.n), "for %s", test.s)
}
})
})

View File

@ -9,17 +9,25 @@ var errDiscard = errors.New("redis: Discard can be used only inside Exec")
// Not thread-safe.
type Multi struct {
*Client
commandable
base *baseClient
cmds []Cmder
}
func (c *Client) Multi() *Multi {
return &Multi{
Client: &Client{
baseClient: &baseClient{
opt: c.opt,
connPool: newSingleConnPool(c.connPool, true),
},
},
multi := &Multi{
base: &baseClient{opt: c.opt, connPool: newSingleConnPool(c.connPool, true)},
}
multi.commandable.process = multi.process
return multi
}
func (c *Multi) process(cmd Cmder) {
if c.cmds == nil {
c.base.process(cmd)
} else {
c.cmds = append(c.cmds, cmd)
}
}
@ -27,7 +35,7 @@ func (c *Multi) Close() error {
if err := c.Unwatch().Err(); err != nil {
return err
}
return c.Client.Close()
return c.base.Close()
}
func (c *Multi) Watch(keys ...string) *StatusCmd {
@ -69,7 +77,7 @@ func (c *Multi) Exec(f func() error) ([]Cmder, error) {
return []Cmder{}, nil
}
cn, err := c.conn()
cn, err := c.base.conn()
if err != nil {
setCmdsErr(cmds[1:len(cmds)-1], err)
return cmds[1 : len(cmds)-1], err
@ -77,16 +85,16 @@ func (c *Multi) Exec(f func() error) ([]Cmder, error) {
err = c.execCmds(cn, cmds)
if err != nil {
c.freeConn(cn, err)
c.base.freeConn(cn, err)
return cmds[1 : len(cmds)-1], err
}
c.putConn(cn)
c.base.putConn(cn)
return cmds[1 : len(cmds)-1], nil
}
func (c *Multi) execCmds(cn *conn, cmds []Cmder) error {
err := c.writeCmd(cn, cmds...)
err := cn.writeCmds(cmds...)
if err != nil {
setCmdsErr(cmds[1:len(cmds)-1], err)
return err

View File

@ -3,6 +3,7 @@ package redis
import (
"errors"
"fmt"
"net"
"strconv"
"gopkg.in/bufio.v1"
@ -292,3 +293,50 @@ func parseZSlice(rd *bufio.Reader, n int64) (interface{}, error) {
}
return zz, nil
}
func parseClusterSlotInfoSlice(rd *bufio.Reader, n int64) (interface{}, error) {
infos := make([]ClusterSlotInfo, 0, n)
for i := int64(0); i < n; i++ {
viface, err := parseReply(rd, parseSlice)
if err != nil {
return nil, err
}
item, ok := viface.([]interface{})
if !ok {
return nil, fmt.Errorf("got %T, expected []interface{}", viface)
} else if len(item) < 3 {
return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item)
}
start, ok := item[0].(int64)
if !ok || start < 0 || start > hashSlots {
return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item)
}
end, ok := item[1].(int64)
if !ok || end < 0 || end > hashSlots {
return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item)
}
info := ClusterSlotInfo{int(start), int(end), make([]string, len(item)-2)}
for n, ipair := range item[2:] {
pair, ok := ipair.([]interface{})
if !ok || len(pair) != 2 {
return nil, fmt.Errorf("got %v, expected []interface{host, port}", viface)
}
ip, ok := pair[0].(string)
if !ok || len(ip) < 1 {
return nil, fmt.Errorf("got %v, expected IP PORT pair", pair)
}
port, ok := pair[1].(int64)
if !ok || port < 1 {
return nil, fmt.Errorf("got %v, expected IP PORT pair", pair)
}
info.Addrs[n] = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
}
infos = append(infos, info)
}
return infos, nil
}

View File

@ -2,22 +2,23 @@ package redis
// Not thread-safe.
type Pipeline struct {
*Client
commandable
cmds []Cmder
client *baseClient
closed bool
}
func (c *Client) Pipeline() *Pipeline {
return &Pipeline{
Client: &Client{
baseClient: &baseClient{
opt: c.opt,
connPool: c.connPool,
cmds: make([]Cmder, 0),
},
pipe := &Pipeline{
client: &baseClient{
opt: c.opt,
connPool: c.connPool,
},
cmds: make([]Cmder, 0, 10),
}
pipe.commandable.process = pipe.process
return pipe
}
func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) {
@ -30,6 +31,10 @@ func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) {
return cmds, err
}
func (c *Pipeline) process(cmd Cmder) {
c.cmds = append(c.cmds, cmd)
}
func (c *Pipeline) Close() error {
c.closed = true
return nil
@ -51,29 +56,29 @@ func (c *Pipeline) Exec() ([]Cmder, error) {
}
cmds := c.cmds
c.cmds = make([]Cmder, 0)
c.cmds = make([]Cmder, 0, 0)
if len(cmds) == 0 {
return []Cmder{}, nil
}
cn, err := c.conn()
cn, err := c.client.conn()
if err != nil {
setCmdsErr(cmds, err)
return cmds, err
}
if err := c.execCmds(cn, cmds); err != nil {
c.freeConn(cn, err)
c.client.freeConn(cn, err)
return cmds, err
}
c.putConn(cn)
c.client.putConn(cn)
return cmds, nil
}
func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {
if err := c.writeCmd(cn, cmds...); err != nil {
if err := cn.writeCmds(cmds...); err != nil {
setCmdsErr(cmds, err)
return err
}

10
pool.go
View File

@ -58,6 +58,16 @@ func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {
}
}
func (cn *conn) writeCmds(cmds ...Cmder) error {
buf := cn.buf[:0]
for _, cmd := range cmds {
buf = appendArgs(buf, cmd.args())
}
_, err := cn.Write(buf)
return err
}
func (cn *conn) Read(b []byte) (int, error) {
if cn.readTimeout != 0 {
cn.netcn.SetReadDeadline(time.Now().Add(cn.readTimeout))

View File

@ -103,7 +103,7 @@ func (c *PubSub) subscribe(cmd string, channels ...string) error {
args := append([]string{cmd}, channels...)
req := NewSliceCmd(args...)
return c.writeCmd(cn, req)
return cn.writeCmds(req)
}
func (c *PubSub) Subscribe(channels ...string) error {
@ -122,7 +122,7 @@ func (c *PubSub) unsubscribe(cmd string, channels ...string) error {
args := append([]string{cmd}, channels...)
req := NewSliceCmd(args...)
return c.writeCmd(cn, req)
return cn.writeCmds(req)
}
func (c *PubSub) Unsubscribe(channels ...string) error {

View File

@ -9,17 +9,6 @@ import (
type baseClient struct {
connPool pool
opt *options
cmds []Cmder
}
func (c *baseClient) writeCmd(cn *conn, cmds ...Cmder) error {
buf := cn.buf[:0]
for _, cmd := range cmds {
buf = appendArgs(buf, cmd.args())
}
_, err := cn.Write(buf)
return err
}
func (c *baseClient) conn() (*conn, error) {
@ -47,12 +36,7 @@ func (c *baseClient) initConn(cn *conn) error {
pool.SetConn(cn)
// Client is not closed because we want to reuse underlying connection.
client := &Client{
baseClient: &baseClient{
opt: c.opt,
connPool: pool,
},
}
client := newClient(c.opt, pool)
if c.opt.Password != "" {
if err := client.Auth(c.opt.Password).Err(); err != nil {
@ -91,15 +75,7 @@ func (c *baseClient) putConn(cn *conn) {
}
}
func (c *baseClient) Process(cmd Cmder) {
if c.cmds == nil {
c.run(cmd)
} else {
c.cmds = append(c.cmds, cmd)
}
}
func (c *baseClient) run(cmd Cmder) {
func (c *baseClient) process(cmd Cmder) {
cn, err := c.conn()
if err != nil {
cmd.setErr(err)
@ -118,7 +94,7 @@ func (c *baseClient) run(cmd Cmder) {
cn.readTimeout = c.opt.ReadTimeout
}
if err := c.writeCmd(cn, cmd); err != nil {
if err := cn.writeCmds(cmd); err != nil {
c.freeConn(cn, err)
cmd.setErr(err)
return
@ -237,8 +213,19 @@ func (opt *Options) options() *options {
}
}
//------------------------------------------------------------------------------
type Client struct {
*baseClient
commandable
}
func newClient(opt *options, pool pool) *Client {
base := &baseClient{opt: opt, connPool: pool}
return &Client{
baseClient: base,
commandable: commandable{process: base.process},
}
}
func NewClient(clOpt *Options) *Client {
@ -249,12 +236,7 @@ func NewClient(clOpt *Options) *Client {
return net.DialTimeout(clOpt.getNetwork(), clOpt.Addr, opt.DialTimeout)
}
}
return &Client{
baseClient: &baseClient{
opt: opt,
connPool: newConnPool(newConnFunc(dialer), opt),
},
}
return newClient(opt, newConnPool(newConnFunc(dialer), opt))
}
// Deprecated. Use NewClient instead.

View File

@ -4,6 +4,7 @@ import (
"net"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
@ -126,7 +127,7 @@ func TestGinkgoSuite(t *testing.T) {
func execCmd(name string, args ...string) (*os.Process, error) {
cmd := exec.Command(name, args...)
if true {
if false {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
@ -138,12 +139,12 @@ func connectTo(port string) (client *redis.Client, err error) {
Addr: ":" + port,
})
deadline := time.Now().Add(time.Second)
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
if err = client.Ping().Err(); err == nil {
return client, nil
}
time.Sleep(100 * time.Millisecond)
time.Sleep(250 * time.Millisecond)
}
return nil, err
@ -159,11 +160,38 @@ func (p *redisProcess) Close() error {
return p.Kill()
}
var (
redisServerBin, _ = filepath.Abs(filepath.Join(".test", "redis", "src", "redis-server"))
redisServerConf, _ = filepath.Abs(filepath.Join(".test", "redis.conf"))
)
func redisDir(port string) (string, error) {
dir, err := filepath.Abs(filepath.Join(".test", "instances", port))
if err != nil {
return "", err
} else if err = os.RemoveAll(dir); err != nil {
return "", err
} else if err = os.MkdirAll(dir, 0775); err != nil {
return "", err
}
return dir, nil
}
func startRedis(port string, args ...string) (*redisProcess, error) {
process, err := execCmd("redis-server", append([]string{"--port", port}, args...)...)
dir, err := redisDir(port)
if err != nil {
return nil, err
}
if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
return nil, err
}
baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir}
process, err := execCmd(redisServerBin, append(baseArgs, args...)...)
if err != nil {
return nil, err
}
client, err := connectTo(port)
if err != nil {
process.Kill()
@ -173,7 +201,11 @@ func startRedis(port string, args ...string) (*redisProcess, error) {
}
func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
process, err := execCmd("redis-server", os.DevNull, "--sentinel", "--port", port)
dir, err := redisDir(port)
if err != nil {
return nil, err
}
process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
if err != nil {
return nil, err
}

View File

@ -92,17 +92,13 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
opt: opt,
}
return &Client{
baseClient: &baseClient{
opt: opt,
connPool: failover.Pool(),
},
}
return newClient(opt, failover.Pool())
}
//------------------------------------------------------------------------------
type sentinelClient struct {
commandable
*baseClient
}
@ -113,11 +109,13 @@ func newSentinel(clOpt *Options) *sentinelClient {
dialer := func() (net.Conn, error) {
return net.DialTimeout("tcp", clOpt.Addr, opt.DialTimeout)
}
base := &baseClient{
opt: opt,
connPool: newConnPool(newConnFunc(dialer), opt),
}
return &sentinelClient{
baseClient: &baseClient{
opt: opt,
connPool: newConnPool(newConnFunc(dialer), opt),
},
baseClient: base,
commandable: commandable{process: base.process},
}
}