From c21e5f325511e88f6f7fe97efdc69112d6c96ab6 Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Sat, 24 Jan 2015 12:12:48 +0000 Subject: [PATCH] Add Redis Cluster support. --- .gitignore | 1 + .test/redis.conf | 9 + Makefile | 21 +- cluster.go | 303 +++++++++++++++++++++++++++++ cluster_client_test.go | 95 +++++++++ cluster_test.go | 232 ++++++++++++++++++++++ command.go | 181 ++++++++++------- commands.go | 432 +++++++++++++++++++++++++---------------- commands_test.go | 4 +- crc16.go | 47 +++++ crc16_test.go | 25 +++ multi.go | 34 ++-- parser.go | 48 +++++ pipeline.go | 33 ++-- pool.go | 10 + pubsub.go | 4 +- redis.go | 48 ++--- redis_test.go | 42 +++- sentinel.go | 18 +- 19 files changed, 1262 insertions(+), 325 deletions(-) create mode 100644 .test/redis.conf create mode 100644 cluster.go create mode 100644 cluster_client_test.go create mode 100644 cluster_test.go create mode 100644 crc16.go create mode 100644 crc16_test.go diff --git a/.gitignore b/.gitignore index 58c2a9d6..5959942e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.rdb +.test/ diff --git a/.test/redis.conf b/.test/redis.conf new file mode 100644 index 00000000..016fa0a7 --- /dev/null +++ b/.test/redis.conf @@ -0,0 +1,9 @@ +# Minimal redis.conf + +port 6379 +daemonize no +dir . +save "" +appendonly yes +cluster-config-file nodes.conf +cluster-node-timeout 30000 diff --git a/Makefile b/Makefile index 62a51b4c..33dc9733 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,17 @@ -all: - go test ./... - go test ./... -cpu=2 - go test ./... -short -race +all: testdeps + go test ./... -v 1 -ginkgo.slowSpecThreshold=10 -cpu=1,2,4 + go test ./... -ginkgo.slowSpecThreshold=10 -short -race + +test: testdeps + go test ./... -v 1 -ginkgo.slowSpecThreshold=10 + +testdeps: .test/redis/src/redis-server + +.PHONY: all test testdeps + +.test/redis: + mkdir -p $@ + wget -qO- https://github.com/antirez/redis/archive/3.0.tar.gz | tar xvz --strip-components=1 -C $@ + +.test/redis/src/redis-server: .test/redis + cd $< && make all diff --git a/cluster.go b/cluster.go new file mode 100644 index 00000000..e5e1475c --- /dev/null +++ b/cluster.go @@ -0,0 +1,303 @@ +package redis + +import ( + "errors" + "io" + "math/rand" + "net" + "strings" + "sync" + "sync/atomic" + "time" +) + +type ClusterClient struct { + commandable + + addrs map[string]struct{} + slots [][]string + conns map[string]*Client + opt *ClusterOptions + + // Protect addrs, slots and conns cache + cachemx sync.RWMutex + _reload uint32 +} + +// NewClusterClient initializes a new cluster-aware client using given options. +// A list of seed addresses must be provided. +func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) { + addrs, err := opt.getAddrSet() + if err != nil { + return nil, err + } + + client := &ClusterClient{ + addrs: addrs, + conns: make(map[string]*Client), + opt: opt, + _reload: 1, + } + client.commandable.process = client.process + client.reloadIfDue() + return client, nil +} + +// Close closes the cluster connection +func (c *ClusterClient) Close() error { + c.cachemx.Lock() + defer c.cachemx.Unlock() + + return c.reset() +} + +// ------------------------------------------------------------------------ + +// Finds the current master address for a given hash slot +func (c *ClusterClient) getMasterAddrBySlot(hashSlot int) string { + if addrs := c.slots[hashSlot]; len(addrs) > 0 { + return addrs[0] + } + return "" +} + +// Returns a node's client for a given address +func (c *ClusterClient) getNodeClientByAddr(addr string) *Client { + client, ok := c.conns[addr] + if !ok { + opt := c.opt.clientOptions() + opt.Addr = addr + client = NewTCPClient(opt) + c.conns[addr] = client + } + return client +} + +// Process a command +func (c *ClusterClient) process(cmd Cmder) { + var ask bool + + c.reloadIfDue() + + hashSlot := HashSlot(cmd.clusterKey()) + + c.cachemx.RLock() + defer c.cachemx.RUnlock() + + tried := make(map[string]struct{}, len(c.addrs)) + addr := c.getMasterAddrBySlot(hashSlot) + for attempt := 0; attempt < c.opt.getMaxRedirects(); attempt++ { + tried[addr] = struct{}{} + + // Pick the connection, process request + conn := c.getNodeClientByAddr(addr) + if ask { + pipe := conn.Pipeline() + pipe.Process(NewCmd("ASKING")) + pipe.Process(cmd) + _, _ = pipe.Exec() + ask = false + } else { + conn.Process(cmd) + } + + // If there is no (real) error, we are done! + err := cmd.Err() + if err == nil || err == Nil { + return + } + + // On connection errors, pick a random, previosuly untried connection + // and request again. + if _, ok := err.(*net.OpError); ok || err == io.EOF { + if addr = c.findNextAddr(tried); addr == "" { + return + } + cmd.reset() + continue + } + + // Check the error message, return if unexpected + parts := strings.SplitN(err.Error(), " ", 3) + if len(parts) != 3 { + return + } + + // Handle MOVE and ASK redirections, return on any other error + switch parts[0] { + case "MOVED": + c.forceReload() + addr = parts[2] + case "ASK": + ask = true + addr = parts[2] + default: + return + } + cmd.reset() + } +} + +// Closes all connections and reloads slot cache, if due +func (c *ClusterClient) reloadIfDue() (err error) { + if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) { + return + } + + var infos []ClusterSlotInfo + + c.cachemx.Lock() + defer c.cachemx.Unlock() + + // Try known addresses in random order (map interation order is random in Go) + // http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections + // https://github.com/antirez/redis-rb-cluster/blob/fd931ed/cluster.rb#L157 + for addr := range c.addrs { + c.reset() + + infos, err = c.fetchClusterSlots(addr) + if err == nil { + c.update(infos) + break + } + } + return +} + +// Closes all connections and flushes slots cache +func (c *ClusterClient) reset() (err error) { + for addr, client := range c.conns { + if e := client.Close(); e != nil { + err = e + } + delete(c.conns, addr) + } + c.slots = make([][]string, hashSlots) + return +} + +// Forces a cache reload on next request +func (c *ClusterClient) forceReload() { + atomic.StoreUint32(&c._reload, 1) +} + +// Find the next untried address +func (c *ClusterClient) findNextAddr(tried map[string]struct{}) string { + for addr := range c.addrs { + if _, ok := tried[addr]; !ok { + return addr + } + } + return "" +} + +// Fetch slot information +func (c *ClusterClient) fetchClusterSlots(addr string) ([]ClusterSlotInfo, error) { + opt := c.opt.clientOptions() + opt.Addr = addr + client := NewClient(opt) + defer client.Close() + + return client.ClusterSlots().Result() +} + +// Update slot information, populate slots +func (c *ClusterClient) update(infos []ClusterSlotInfo) { + for _, info := range infos { + for i := info.Start; i <= info.End; i++ { + c.slots[i] = info.Addrs + } + + for _, addr := range info.Addrs { + c.addrs[addr] = struct{}{} + } + } +} + +//------------------------------------------------------------------------------ + +var errNoAddrs = errors.New("redis: no addresses") + +type ClusterOptions struct { + // A seed-list of host:port addresses of known cluster nodes + Addrs []string + + // An optional password + Password string + + // The maximum number of MOVED/ASK redirects to follow, before + // giving up. Default: 16 + MaxRedirects int + + // The maximum number of TCP sockets per connection. Default: 5 + PoolSize int + + // Timeout settings + DialTimeout, ReadTimeout, WriteTimeout, IdleTimeout time.Duration +} + +func (opt *ClusterOptions) getPoolSize() int { + if opt.PoolSize < 1 { + return 5 + } + return opt.PoolSize +} + +func (opt *ClusterOptions) getDialTimeout() time.Duration { + if opt.DialTimeout == 0 { + return 5 * time.Second + } + return opt.DialTimeout +} + +func (opt *ClusterOptions) getMaxRedirects() int { + if opt.MaxRedirects < 1 { + return 16 + } + return opt.MaxRedirects +} + +func (opt *ClusterOptions) getAddrSet() (map[string]struct{}, error) { + size := len(opt.Addrs) + if size < 1 { + return nil, errNoAddrs + } + + addrs := make(map[string]struct{}, size) + for _, addr := range opt.Addrs { + addrs[addr] = struct{}{} + } + return addrs, nil +} + +func (opt *ClusterOptions) clientOptions() *Options { + return &Options{ + DB: 0, + Password: opt.Password, + + DialTimeout: opt.getDialTimeout(), + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + + PoolSize: opt.getPoolSize(), + IdleTimeout: opt.IdleTimeout, + } +} + +//------------------------------------------------------------------------------ + +const hashSlots = 16384 + +// HashSlot returns a consistent slot number between 0 and 16383 +// for any given string key +func HashSlot(key string) int { + if s := strings.IndexByte(key, '{'); s > -1 { + if e := strings.IndexByte(key[s+1:], '}'); e > 0 { + key = key[s+1 : s+e+1] + } + } + if key == "" { + return rand.Intn(hashSlots) + } + return int(crc16sum(key)) % hashSlots +} diff --git a/cluster_client_test.go b/cluster_client_test.go new file mode 100644 index 00000000..dbeb1b2f --- /dev/null +++ b/cluster_client_test.go @@ -0,0 +1,95 @@ +package redis + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("ClusterClient", func() { + + var subject *ClusterClient + var populate = func() { + subject.reset() + subject.update([]ClusterSlotInfo{ + {0, 4095, []string{"127.0.0.1:7000", "127.0.0.1:7004"}}, + {12288, 16383, []string{"127.0.0.1:7003", "127.0.0.1:7007"}}, + {4096, 8191, []string{"127.0.0.1:7001", "127.0.0.1:7005"}}, + {8192, 12287, []string{"127.0.0.1:7002", "127.0.0.1:7006"}}, + }) + } + + BeforeEach(func() { + var err error + subject, err = NewClusterClient(&ClusterOptions{ + Addrs: []string{"127.0.0.1:6379", "127.0.0.1:7003", "127.0.0.1:7006"}, + }) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + subject.Close() + }) + + It("should initialize", func() { + Expect(subject.addrs).To(HaveLen(3)) + Expect(subject.slots).To(HaveLen(hashSlots)) + Expect(subject._reload).To(Equal(uint32(0))) + }) + + It("should update slots cache", func() { + populate() + Expect(subject.slots[0]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"})) + Expect(subject.slots[4095]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"})) + Expect(subject.slots[4096]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"})) + Expect(subject.slots[8191]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"})) + Expect(subject.slots[8192]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"})) + Expect(subject.slots[12287]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"})) + Expect(subject.slots[12288]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"})) + Expect(subject.slots[16383]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"})) + Expect(subject.addrs).To(Equal(map[string]struct{}{ + "127.0.0.1:6379": struct{}{}, + "127.0.0.1:7000": struct{}{}, + "127.0.0.1:7001": struct{}{}, + "127.0.0.1:7002": struct{}{}, + "127.0.0.1:7003": struct{}{}, + "127.0.0.1:7004": struct{}{}, + "127.0.0.1:7005": struct{}{}, + "127.0.0.1:7006": struct{}{}, + "127.0.0.1:7007": struct{}{}, + })) + }) + + It("should find next addresses", func() { + populate() + seen := map[string]struct{}{ + "127.0.0.1:7000": struct{}{}, + "127.0.0.1:7001": struct{}{}, + "127.0.0.1:7003": struct{}{}, + } + + addr := subject.findNextAddr(seen) + for addr != "" { + seen[addr] = struct{}{} + addr = subject.findNextAddr(seen) + } + Expect(subject.findNextAddr(seen)).To(Equal("")) + Expect(seen).To(Equal(map[string]struct{}{ + "127.0.0.1:6379": struct{}{}, + "127.0.0.1:7000": struct{}{}, + "127.0.0.1:7001": struct{}{}, + "127.0.0.1:7002": struct{}{}, + "127.0.0.1:7003": struct{}{}, + "127.0.0.1:7004": struct{}{}, + "127.0.0.1:7005": struct{}{}, + "127.0.0.1:7006": struct{}{}, + "127.0.0.1:7007": struct{}{}, + })) + }) + + It("should check if reload is due", func() { + subject._reload = 0 + Expect(subject._reload).To(Equal(uint32(0))) + subject.forceReload() + Expect(subject._reload).To(Equal(uint32(1))) + }) +}) diff --git a/cluster_test.go b/cluster_test.go new file mode 100644 index 00000000..803338c7 --- /dev/null +++ b/cluster_test.go @@ -0,0 +1,232 @@ +package redis_test + +import ( + "math/rand" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "gopkg.in/redis.v2" +) + +var _ = Describe("Cluster", func() { + var scenario = &clusterScenario{ + ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, + nodeIDs: make([]string, 6), + processes: make(map[string]*redisProcess, 6), + clients: make(map[string]*redis.Client, 6), + } + + BeforeSuite(func() { + // Start processes, connect individual clients + for pos, port := range scenario.ports { + process, err := startRedis(port, "--cluster-enabled", "yes") + Expect(err).NotTo(HaveOccurred()) + + client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:" + port}) + info, err := client.ClusterNodes().Result() + Expect(err).NotTo(HaveOccurred()) + + scenario.processes[port] = process + scenario.clients[port] = client + scenario.nodeIDs[pos] = info[:40] + } + + // Meet cluster nodes + for _, client := range scenario.clients { + err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err() + Expect(err).NotTo(HaveOccurred()) + } + + // Bootstrap masters + slots := []int{0, 5000, 10000, 16384} + for pos, client := range scenario.masters() { + err := client.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err() + Expect(err).NotTo(HaveOccurred()) + } + + // Bootstrap slaves + for pos, client := range scenario.slaves() { + masterID := scenario.nodeIDs[pos] + + Eventually(func() string { // Wait for masters + return client.ClusterNodes().Val() + }, "10s").Should(ContainSubstring(masterID)) + + err := client.ClusterReplicate(masterID).Err() + Expect(err).NotTo(HaveOccurred()) + + Eventually(func() string { // Wait for slaves + return scenario.primary().ClusterNodes().Val() + }, "10s").Should(ContainSubstring("slave " + masterID)) + } + + // Wait for cluster state to turn OK + for _, client := range scenario.clients { + Eventually(func() string { + return client.ClusterInfo().Val() + }, "10s").Should(ContainSubstring("cluster_state:ok")) + } + }) + + AfterSuite(func() { + for _, client := range scenario.clients { + client.Close() + } + for _, process := range scenario.processes { + process.Close() + } + }) + + Describe("HashSlot", func() { + + It("should calculate hash slots", func() { + tests := []struct { + key string + slot int + }{ + {"123456789", 12739}, + {"{}foo", 9500}, + {"foo{}", 5542}, + {"foo{}{bar}", 8363}, + {"", 10503}, + {"", 5176}, + {string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), 5463}, + } + rand.Seed(100) + + for _, test := range tests { + Expect(redis.HashSlot(test.key)).To(Equal(test.slot), "for %s", test.key) + } + }) + + It("should extract keys from tags", func() { + tests := []struct { + one, two string + }{ + {"foo{bar}", "bar"}, + {"{foo}bar", "foo"}, + {"{user1000}.following", "{user1000}.followers"}, + {"foo{{bar}}zap", "{bar"}, + {"foo{bar}{zap}", "bar"}, + } + + for _, test := range tests { + Expect(redis.HashSlot(test.one)).To(Equal(redis.HashSlot(test.two)), "for %s <-> %s", test.one, test.two) + } + }) + + }) + + Describe("Commands", func() { + + It("should CLUSTER SLOTS", func() { + res, err := scenario.primary().ClusterSlots().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(3)) + Expect(res).To(ConsistOf([]redis.ClusterSlotInfo{ + {0, 4999, []string{"127.0.0.1:8220", "127.0.0.1:8223"}}, + {5000, 9999, []string{"127.0.0.1:8221", "127.0.0.1:8224"}}, + {10000, 16383, []string{"127.0.0.1:8222", "127.0.0.1:8225"}}, + })) + }) + + It("should CLUSTER NODES", func() { + res, err := scenario.primary().ClusterNodes().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(res)).To(BeNumerically(">", 400)) + }) + + It("should CLUSTER INFO", func() { + res, err := scenario.primary().ClusterInfo().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(ContainSubstring("cluster_known_nodes:6")) + }) + + }) + + Describe("Client", func() { + var client *redis.ClusterClient + + BeforeEach(func() { + var err error + client, err = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{"127.0.0.1:8220", "127.0.0.1:8221", "127.0.0.1:8222", "127.0.0.1:8223", "127.0.0.1:8224", "127.0.0.1:8225"}, + }) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + for _, client := range scenario.clients { + client.FlushDb() + } + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("should GET/SET/DEL", func() { + val, err := client.Get("A").Result() + Expect(err).To(Equal(redis.Nil)) + Expect(val).To(Equal("")) + + val, err = client.Set("A", "VALUE").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("OK")) + + val, err = client.Get("A").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("VALUE")) + + cnt, err := client.Del("A").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(cnt).To(Equal(int64(1))) + }) + + It("should follow redirects", func() { + Expect(client.Set("A", "VALUE").Err()).NotTo(HaveOccurred()) + Expect(redis.HashSlot("A")).To(Equal(6373)) + + // Slot 6373 is stored on the second node + defer func() { + scenario.masters()[1].ClusterFailover() + }() + + slave := scenario.slaves()[1] + Expect(slave.ClusterFailover().Err()).NotTo(HaveOccurred()) + Eventually(func() string { + return slave.Info().Val() + }, "10s", "200ms").Should(ContainSubstring("role:master")) + + val, err := client.Get("A").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("VALUE")) + }) + }) +}) + +// -------------------------------------------------------------------- + +type clusterScenario struct { + ports []string + nodeIDs []string + processes map[string]*redisProcess + clients map[string]*redis.Client +} + +func (s *clusterScenario) primary() *redis.Client { + return s.clients[s.ports[0]] +} + +func (s *clusterScenario) masters() []*redis.Client { + result := make([]*redis.Client, 3) + for pos, port := range s.ports[:3] { + result[pos] = s.clients[port] + } + return result +} + +func (s *clusterScenario) slaves() []*redis.Client { + result := make([]*redis.Client, 3) + for pos, port := range s.ports[3:] { + result[pos] = s.clients[port] + } + return result +} diff --git a/command.go b/command.go index 4636fd9c..a4bfcf8d 100644 --- a/command.go +++ b/command.go @@ -24,18 +24,19 @@ var ( _ Cmder = (*StringIntMapCmd)(nil) _ Cmder = (*ZSliceCmd)(nil) _ Cmder = (*ScanCmd)(nil) + _ Cmder = (*ClusterSlotCmd)(nil) ) type Cmder interface { args() []string parseReply(*bufio.Reader) error setErr(error) + reset() writeTimeout() *time.Duration readTimeout() *time.Duration + clusterKey() string - // Reset resets internal state of the command. - Reset() Err() error String() string } @@ -65,13 +66,9 @@ type baseCmd struct { err error - _writeTimeout, _readTimeout *time.Duration -} + _clusterKeyPos int -func newBaseCmd(args ...string) *baseCmd { - return &baseCmd{ - _args: args, - } + _writeTimeout, _readTimeout *time.Duration } func (cmd *baseCmd) Err() error { @@ -97,6 +94,13 @@ func (cmd *baseCmd) writeTimeout() *time.Duration { return cmd._writeTimeout } +func (cmd *baseCmd) clusterKey() string { + if cmd._clusterKeyPos > 0 && cmd._clusterKeyPos < len(cmd._args) { + return cmd._args[cmd._clusterKeyPos] + } + return "" +} + func (cmd *baseCmd) setWriteTimeout(d time.Duration) { cmd._writeTimeout = &d } @@ -108,18 +112,16 @@ func (cmd *baseCmd) setErr(e error) { //------------------------------------------------------------------------------ type Cmd struct { - *baseCmd + baseCmd val interface{} } func NewCmd(args ...string) *Cmd { - return &Cmd{ - baseCmd: newBaseCmd(args...), - } + return &Cmd{baseCmd: baseCmd{_args: args}} } -func (cmd *Cmd) Reset() { +func (cmd *Cmd) reset() { cmd.val = nil cmd.err = nil } @@ -144,18 +146,16 @@ func (cmd *Cmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type SliceCmd struct { - *baseCmd + baseCmd val []interface{} } func NewSliceCmd(args ...string) *SliceCmd { - return &SliceCmd{ - baseCmd: newBaseCmd(args...), - } + return &SliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *SliceCmd) Reset() { +func (cmd *SliceCmd) reset() { cmd.val = nil cmd.err = nil } @@ -185,18 +185,20 @@ func (cmd *SliceCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type StatusCmd struct { - *baseCmd + baseCmd val string } func NewStatusCmd(args ...string) *StatusCmd { - return &StatusCmd{ - baseCmd: newBaseCmd(args...), - } + return &StatusCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *StatusCmd) Reset() { +func newKeylessStatusCmd(args ...string) *StatusCmd { + return &StatusCmd{baseCmd: baseCmd{_args: args}} +} + +func (cmd *StatusCmd) reset() { cmd.val = "" cmd.err = nil } @@ -226,18 +228,16 @@ func (cmd *StatusCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type IntCmd struct { - *baseCmd + baseCmd val int64 } func NewIntCmd(args ...string) *IntCmd { - return &IntCmd{ - baseCmd: newBaseCmd(args...), - } + return &IntCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *IntCmd) Reset() { +func (cmd *IntCmd) reset() { cmd.val = 0 cmd.err = nil } @@ -267,7 +267,7 @@ func (cmd *IntCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type DurationCmd struct { - *baseCmd + baseCmd val time.Duration precision time.Duration @@ -275,12 +275,12 @@ type DurationCmd struct { func NewDurationCmd(precision time.Duration, args ...string) *DurationCmd { return &DurationCmd{ - baseCmd: newBaseCmd(args...), precision: precision, + baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}, } } -func (cmd *DurationCmd) Reset() { +func (cmd *DurationCmd) reset() { cmd.val = 0 cmd.err = nil } @@ -310,18 +310,16 @@ func (cmd *DurationCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type BoolCmd struct { - *baseCmd + baseCmd val bool } func NewBoolCmd(args ...string) *BoolCmd { - return &BoolCmd{ - baseCmd: newBaseCmd(args...), - } + return &BoolCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *BoolCmd) Reset() { +func (cmd *BoolCmd) reset() { cmd.val = false cmd.err = nil } @@ -351,18 +349,16 @@ func (cmd *BoolCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type StringCmd struct { - *baseCmd + baseCmd val string } func NewStringCmd(args ...string) *StringCmd { - return &StringCmd{ - baseCmd: newBaseCmd(args...), - } + return &StringCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *StringCmd) Reset() { +func (cmd *StringCmd) reset() { cmd.val = "" cmd.err = nil } @@ -413,18 +409,16 @@ func (cmd *StringCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type FloatCmd struct { - *baseCmd + baseCmd val float64 } func NewFloatCmd(args ...string) *FloatCmd { - return &FloatCmd{ - baseCmd: newBaseCmd(args...), - } + return &FloatCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *FloatCmd) Reset() { +func (cmd *FloatCmd) reset() { cmd.val = 0 cmd.err = nil } @@ -450,18 +444,16 @@ func (cmd *FloatCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type StringSliceCmd struct { - *baseCmd + baseCmd val []string } func NewStringSliceCmd(args ...string) *StringSliceCmd { - return &StringSliceCmd{ - baseCmd: newBaseCmd(args...), - } + return &StringSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *StringSliceCmd) Reset() { +func (cmd *StringSliceCmd) reset() { cmd.val = nil cmd.err = nil } @@ -491,18 +483,16 @@ func (cmd *StringSliceCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type BoolSliceCmd struct { - *baseCmd + baseCmd val []bool } func NewBoolSliceCmd(args ...string) *BoolSliceCmd { - return &BoolSliceCmd{ - baseCmd: newBaseCmd(args...), - } + return &BoolSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *BoolSliceCmd) Reset() { +func (cmd *BoolSliceCmd) reset() { cmd.val = nil cmd.err = nil } @@ -532,18 +522,16 @@ func (cmd *BoolSliceCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type StringStringMapCmd struct { - *baseCmd + baseCmd val map[string]string } func NewStringStringMapCmd(args ...string) *StringStringMapCmd { - return &StringStringMapCmd{ - baseCmd: newBaseCmd(args...), - } + return &StringStringMapCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *StringStringMapCmd) Reset() { +func (cmd *StringStringMapCmd) reset() { cmd.val = nil cmd.err = nil } @@ -573,15 +561,13 @@ func (cmd *StringStringMapCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type StringIntMapCmd struct { - *baseCmd + baseCmd val map[string]int64 } func NewStringIntMapCmd(args ...string) *StringIntMapCmd { - return &StringIntMapCmd{ - baseCmd: newBaseCmd(args...), - } + return &StringIntMapCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } func (cmd *StringIntMapCmd) Val() map[string]int64 { @@ -596,6 +582,11 @@ func (cmd *StringIntMapCmd) String() string { return cmdString(cmd, cmd.val) } +func (cmd *StringIntMapCmd) reset() { + cmd.val = nil + cmd.err = nil +} + func (cmd *StringIntMapCmd) parseReply(rd *bufio.Reader) error { v, err := parseReply(rd, parseStringIntMap) if err != nil { @@ -609,18 +600,16 @@ func (cmd *StringIntMapCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type ZSliceCmd struct { - *baseCmd + baseCmd val []Z } func NewZSliceCmd(args ...string) *ZSliceCmd { - return &ZSliceCmd{ - baseCmd: newBaseCmd(args...), - } + return &ZSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *ZSliceCmd) Reset() { +func (cmd *ZSliceCmd) reset() { cmd.val = nil cmd.err = nil } @@ -650,19 +639,17 @@ func (cmd *ZSliceCmd) parseReply(rd *bufio.Reader) error { //------------------------------------------------------------------------------ type ScanCmd struct { - *baseCmd + baseCmd cursor int64 keys []string } func NewScanCmd(args ...string) *ScanCmd { - return &ScanCmd{ - baseCmd: newBaseCmd(args...), - } + return &ScanCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *ScanCmd) Reset() { +func (cmd *ScanCmd) reset() { cmd.cursor = 0 cmd.keys = nil cmd.err = nil @@ -700,3 +687,47 @@ func (cmd *ScanCmd) parseReply(rd *bufio.Reader) error { return nil } + +//------------------------------------------------------------------------------ + +type ClusterSlotInfo struct { + Start, End int + Addrs []string +} + +type ClusterSlotCmd struct { + baseCmd + + val []ClusterSlotInfo +} + +func NewClusterSlotCmd(args ...string) *ClusterSlotCmd { + return &ClusterSlotCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} +} + +func (cmd *ClusterSlotCmd) Val() []ClusterSlotInfo { + return cmd.val +} + +func (cmd *ClusterSlotCmd) Result() ([]ClusterSlotInfo, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *ClusterSlotCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *ClusterSlotCmd) reset() { + cmd.val = nil + cmd.err = nil +} + +func (cmd *ClusterSlotCmd) parseReply(rd *bufio.Reader) error { + v, err := parseReply(rd, parseClusterSlotInfoSlice) + if err != nil { + cmd.err = err + return err + } + cmd.val = v.([]ClusterSlotInfo) + return nil +} diff --git a/commands.go b/commands.go index 796b04a4..f2a22e99 100644 --- a/commands.go +++ b/commands.go @@ -17,76 +17,85 @@ func readTimeout(sec int64) time.Duration { return time.Duration(sec+1) * time.Second } +type commandable struct { + process func(cmd Cmder) +} + +func (c *commandable) Process(cmd Cmder) { + c.process(cmd) +} + //------------------------------------------------------------------------------ -func (c *Client) Auth(password string) *StatusCmd { - cmd := NewStatusCmd("AUTH", password) +func (c *commandable) Auth(password string) *StatusCmd { + cmd := newKeylessStatusCmd("AUTH", password) c.Process(cmd) return cmd } -func (c *Client) Echo(message string) *StringCmd { +func (c *commandable) Echo(message string) *StringCmd { cmd := NewStringCmd("ECHO", message) + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) Ping() *StatusCmd { - cmd := NewStatusCmd("PING") +func (c *commandable) Ping() *StatusCmd { + cmd := newKeylessStatusCmd("PING") c.Process(cmd) return cmd } -func (c *Client) Quit() *StatusCmd { +func (c *commandable) Quit() *StatusCmd { panic("not implemented") } -func (c *Client) Select(index int64) *StatusCmd { - cmd := NewStatusCmd("SELECT", strconv.FormatInt(index, 10)) +func (c *commandable) Select(index int64) *StatusCmd { + cmd := newKeylessStatusCmd("SELECT", strconv.FormatInt(index, 10)) c.Process(cmd) return cmd } //------------------------------------------------------------------------------ -func (c *Client) Del(keys ...string) *IntCmd { +func (c *commandable) Del(keys ...string) *IntCmd { args := append([]string{"DEL"}, keys...) cmd := NewIntCmd(args...) c.Process(cmd) return cmd } -func (c *Client) Dump(key string) *StringCmd { +func (c *commandable) Dump(key string) *StringCmd { cmd := NewStringCmd("DUMP", key) c.Process(cmd) return cmd } -func (c *Client) Exists(key string) *BoolCmd { +func (c *commandable) Exists(key string) *BoolCmd { cmd := NewBoolCmd("EXISTS", key) c.Process(cmd) return cmd } -func (c *Client) Expire(key string, dur time.Duration) *BoolCmd { +func (c *commandable) Expire(key string, dur time.Duration) *BoolCmd { cmd := NewBoolCmd("EXPIRE", key, strconv.FormatInt(int64(dur/time.Second), 10)) c.Process(cmd) return cmd } -func (c *Client) ExpireAt(key string, tm time.Time) *BoolCmd { +func (c *commandable) ExpireAt(key string, tm time.Time) *BoolCmd { cmd := NewBoolCmd("EXPIREAT", key, strconv.FormatInt(tm.Unix(), 10)) c.Process(cmd) return cmd } -func (c *Client) Keys(pattern string) *StringSliceCmd { +func (c *commandable) Keys(pattern string) *StringSliceCmd { cmd := NewStringSliceCmd("KEYS", pattern) c.Process(cmd) return cmd } -func (c *Client) Migrate(host, port, key string, db, timeout int64) *StatusCmd { +func (c *commandable) Migrate(host, port, key string, db, timeout int64) *StatusCmd { cmd := NewStatusCmd( "MIGRATE", host, @@ -95,51 +104,55 @@ func (c *Client) Migrate(host, port, key string, db, timeout int64) *StatusCmd { strconv.FormatInt(db, 10), strconv.FormatInt(timeout, 10), ) + cmd._clusterKeyPos = 3 cmd.setReadTimeout(readTimeout(timeout)) c.Process(cmd) return cmd } -func (c *Client) Move(key string, db int64) *BoolCmd { +func (c *commandable) Move(key string, db int64) *BoolCmd { cmd := NewBoolCmd("MOVE", key, strconv.FormatInt(db, 10)) c.Process(cmd) return cmd } -func (c *Client) ObjectRefCount(keys ...string) *IntCmd { +func (c *commandable) ObjectRefCount(keys ...string) *IntCmd { args := append([]string{"OBJECT", "REFCOUNT"}, keys...) cmd := NewIntCmd(args...) + cmd._clusterKeyPos = 2 c.Process(cmd) return cmd } -func (c *Client) ObjectEncoding(keys ...string) *StringCmd { +func (c *commandable) ObjectEncoding(keys ...string) *StringCmd { args := append([]string{"OBJECT", "ENCODING"}, keys...) cmd := NewStringCmd(args...) + cmd._clusterKeyPos = 2 c.Process(cmd) return cmd } -func (c *Client) ObjectIdleTime(keys ...string) *DurationCmd { +func (c *commandable) ObjectIdleTime(keys ...string) *DurationCmd { args := append([]string{"OBJECT", "IDLETIME"}, keys...) cmd := NewDurationCmd(time.Second, args...) + cmd._clusterKeyPos = 2 c.Process(cmd) return cmd } -func (c *Client) Persist(key string) *BoolCmd { +func (c *commandable) Persist(key string) *BoolCmd { cmd := NewBoolCmd("PERSIST", key) c.Process(cmd) return cmd } -func (c *Client) PExpire(key string, dur time.Duration) *BoolCmd { +func (c *commandable) PExpire(key string, dur time.Duration) *BoolCmd { cmd := NewBoolCmd("PEXPIRE", key, strconv.FormatInt(int64(dur/time.Millisecond), 10)) c.Process(cmd) return cmd } -func (c *Client) PExpireAt(key string, tm time.Time) *BoolCmd { +func (c *commandable) PExpireAt(key string, tm time.Time) *BoolCmd { cmd := NewBoolCmd( "PEXPIREAT", key, @@ -149,31 +162,31 @@ func (c *Client) PExpireAt(key string, tm time.Time) *BoolCmd { return cmd } -func (c *Client) PTTL(key string) *DurationCmd { +func (c *commandable) PTTL(key string) *DurationCmd { cmd := NewDurationCmd(time.Millisecond, "PTTL", key) c.Process(cmd) return cmd } -func (c *Client) RandomKey() *StringCmd { +func (c *commandable) RandomKey() *StringCmd { cmd := NewStringCmd("RANDOMKEY") c.Process(cmd) return cmd } -func (c *Client) Rename(key, newkey string) *StatusCmd { +func (c *commandable) Rename(key, newkey string) *StatusCmd { cmd := NewStatusCmd("RENAME", key, newkey) c.Process(cmd) return cmd } -func (c *Client) RenameNX(key, newkey string) *BoolCmd { +func (c *commandable) RenameNX(key, newkey string) *BoolCmd { cmd := NewBoolCmd("RENAMENX", key, newkey) c.Process(cmd) return cmd } -func (c *Client) Restore(key string, ttl int64, value string) *StatusCmd { +func (c *commandable) Restore(key string, ttl int64, value string) *StatusCmd { cmd := NewStatusCmd( "RESTORE", key, @@ -193,7 +206,7 @@ type Sort struct { Store string } -func (c *Client) Sort(key string, sort Sort) *StringSliceCmd { +func (c *commandable) Sort(key string, sort Sort) *StringSliceCmd { args := []string{"SORT", key} if sort.By != "" { args = append(args, "BY", sort.By) @@ -218,19 +231,19 @@ func (c *Client) Sort(key string, sort Sort) *StringSliceCmd { return cmd } -func (c *Client) TTL(key string) *DurationCmd { +func (c *commandable) TTL(key string) *DurationCmd { cmd := NewDurationCmd(time.Second, "TTL", key) c.Process(cmd) return cmd } -func (c *Client) Type(key string) *StatusCmd { +func (c *commandable) Type(key string) *StatusCmd { cmd := NewStatusCmd("TYPE", key) c.Process(cmd) return cmd } -func (c *Client) Scan(cursor int64, match string, count int64) *ScanCmd { +func (c *commandable) Scan(cursor int64, match string, count int64) *ScanCmd { args := []string{"SCAN", strconv.FormatInt(cursor, 10)} if match != "" { args = append(args, "MATCH", match) @@ -243,7 +256,7 @@ func (c *Client) Scan(cursor int64, match string, count int64) *ScanCmd { return cmd } -func (c *Client) SScan(key string, cursor int64, match string, count int64) *ScanCmd { +func (c *commandable) SScan(key string, cursor int64, match string, count int64) *ScanCmd { args := []string{"SSCAN", key, strconv.FormatInt(cursor, 10)} if match != "" { args = append(args, "MATCH", match) @@ -256,7 +269,7 @@ func (c *Client) SScan(key string, cursor int64, match string, count int64) *Sca return cmd } -func (c *Client) HScan(key string, cursor int64, match string, count int64) *ScanCmd { +func (c *commandable) HScan(key string, cursor int64, match string, count int64) *ScanCmd { args := []string{"HSCAN", key, strconv.FormatInt(cursor, 10)} if match != "" { args = append(args, "MATCH", match) @@ -269,7 +282,7 @@ func (c *Client) HScan(key string, cursor int64, match string, count int64) *Sca return cmd } -func (c *Client) ZScan(key string, cursor int64, match string, count int64) *ScanCmd { +func (c *commandable) ZScan(key string, cursor int64, match string, count int64) *ScanCmd { args := []string{"ZSCAN", key, strconv.FormatInt(cursor, 10)} if match != "" { args = append(args, "MATCH", match) @@ -284,7 +297,7 @@ func (c *Client) ZScan(key string, cursor int64, match string, count int64) *Sca //------------------------------------------------------------------------------ -func (c *Client) Append(key, value string) *IntCmd { +func (c *commandable) Append(key, value string) *IntCmd { cmd := NewIntCmd("APPEND", key, value) c.Process(cmd) return cmd @@ -294,7 +307,7 @@ type BitCount struct { Start, End int64 } -func (c *Client) BitCount(key string, bitCount *BitCount) *IntCmd { +func (c *commandable) BitCount(key string, bitCount *BitCount) *IntCmd { args := []string{"BITCOUNT", key} if bitCount != nil { args = append( @@ -308,7 +321,7 @@ func (c *Client) BitCount(key string, bitCount *BitCount) *IntCmd { return cmd } -func (c *Client) bitOp(op, destKey string, keys ...string) *IntCmd { +func (c *commandable) bitOp(op, destKey string, keys ...string) *IntCmd { args := []string{"BITOP", op, destKey} args = append(args, keys...) cmd := NewIntCmd(args...) @@ -316,47 +329,47 @@ func (c *Client) bitOp(op, destKey string, keys ...string) *IntCmd { return cmd } -func (c *Client) BitOpAnd(destKey string, keys ...string) *IntCmd { +func (c *commandable) BitOpAnd(destKey string, keys ...string) *IntCmd { return c.bitOp("AND", destKey, keys...) } -func (c *Client) BitOpOr(destKey string, keys ...string) *IntCmd { +func (c *commandable) BitOpOr(destKey string, keys ...string) *IntCmd { return c.bitOp("OR", destKey, keys...) } -func (c *Client) BitOpXor(destKey string, keys ...string) *IntCmd { +func (c *commandable) BitOpXor(destKey string, keys ...string) *IntCmd { return c.bitOp("XOR", destKey, keys...) } -func (c *Client) BitOpNot(destKey string, key string) *IntCmd { +func (c *commandable) BitOpNot(destKey string, key string) *IntCmd { return c.bitOp("NOT", destKey, key) } -func (c *Client) Decr(key string) *IntCmd { +func (c *commandable) Decr(key string) *IntCmd { cmd := NewIntCmd("DECR", key) c.Process(cmd) return cmd } -func (c *Client) DecrBy(key string, decrement int64) *IntCmd { +func (c *commandable) DecrBy(key string, decrement int64) *IntCmd { cmd := NewIntCmd("DECRBY", key, strconv.FormatInt(decrement, 10)) c.Process(cmd) return cmd } -func (c *Client) Get(key string) *StringCmd { +func (c *commandable) Get(key string) *StringCmd { cmd := NewStringCmd("GET", key) c.Process(cmd) return cmd } -func (c *Client) GetBit(key string, offset int64) *IntCmd { +func (c *commandable) GetBit(key string, offset int64) *IntCmd { cmd := NewIntCmd("GETBIT", key, strconv.FormatInt(offset, 10)) c.Process(cmd) return cmd } -func (c *Client) GetRange(key string, start, end int64) *StringCmd { +func (c *commandable) GetRange(key string, start, end int64) *StringCmd { cmd := NewStringCmd( "GETRANGE", key, @@ -367,52 +380,52 @@ func (c *Client) GetRange(key string, start, end int64) *StringCmd { return cmd } -func (c *Client) GetSet(key, value string) *StringCmd { +func (c *commandable) GetSet(key, value string) *StringCmd { cmd := NewStringCmd("GETSET", key, value) c.Process(cmd) return cmd } -func (c *Client) Incr(key string) *IntCmd { +func (c *commandable) Incr(key string) *IntCmd { cmd := NewIntCmd("INCR", key) c.Process(cmd) return cmd } -func (c *Client) IncrBy(key string, value int64) *IntCmd { +func (c *commandable) IncrBy(key string, value int64) *IntCmd { cmd := NewIntCmd("INCRBY", key, strconv.FormatInt(value, 10)) c.Process(cmd) return cmd } -func (c *Client) IncrByFloat(key string, value float64) *FloatCmd { +func (c *commandable) IncrByFloat(key string, value float64) *FloatCmd { cmd := NewFloatCmd("INCRBYFLOAT", key, formatFloat(value)) c.Process(cmd) return cmd } -func (c *Client) MGet(keys ...string) *SliceCmd { +func (c *commandable) MGet(keys ...string) *SliceCmd { args := append([]string{"MGET"}, keys...) cmd := NewSliceCmd(args...) c.Process(cmd) return cmd } -func (c *Client) MSet(pairs ...string) *StatusCmd { +func (c *commandable) MSet(pairs ...string) *StatusCmd { args := append([]string{"MSET"}, pairs...) cmd := NewStatusCmd(args...) c.Process(cmd) return cmd } -func (c *Client) MSetNX(pairs ...string) *BoolCmd { +func (c *commandable) MSetNX(pairs ...string) *BoolCmd { args := append([]string{"MSETNX"}, pairs...) cmd := NewBoolCmd(args...) c.Process(cmd) return cmd } -func (c *Client) PSetEx(key string, dur time.Duration, value string) *StatusCmd { +func (c *commandable) PSetEx(key string, dur time.Duration, value string) *StatusCmd { cmd := NewStatusCmd( "PSETEX", key, @@ -423,13 +436,13 @@ func (c *Client) PSetEx(key string, dur time.Duration, value string) *StatusCmd return cmd } -func (c *Client) Set(key, value string) *StatusCmd { +func (c *commandable) Set(key, value string) *StatusCmd { cmd := NewStatusCmd("SET", key, value) c.Process(cmd) return cmd } -func (c *Client) SetBit(key string, offset int64, value int) *IntCmd { +func (c *commandable) SetBit(key string, offset int64, value int) *IntCmd { cmd := NewIntCmd( "SETBIT", key, @@ -440,25 +453,25 @@ func (c *Client) SetBit(key string, offset int64, value int) *IntCmd { return cmd } -func (c *Client) SetEx(key string, dur time.Duration, value string) *StatusCmd { +func (c *commandable) SetEx(key string, dur time.Duration, value string) *StatusCmd { cmd := NewStatusCmd("SETEX", key, strconv.FormatInt(int64(dur/time.Second), 10), value) c.Process(cmd) return cmd } -func (c *Client) SetNX(key, value string) *BoolCmd { +func (c *commandable) SetNX(key, value string) *BoolCmd { cmd := NewBoolCmd("SETNX", key, value) c.Process(cmd) return cmd } -func (c *Client) SetRange(key string, offset int64, value string) *IntCmd { +func (c *commandable) SetRange(key string, offset int64, value string) *IntCmd { cmd := NewIntCmd("SETRANGE", key, strconv.FormatInt(offset, 10), value) c.Process(cmd) return cmd } -func (c *Client) StrLen(key string) *IntCmd { +func (c *commandable) StrLen(key string) *IntCmd { cmd := NewIntCmd("STRLEN", key) c.Process(cmd) return cmd @@ -466,88 +479,88 @@ func (c *Client) StrLen(key string) *IntCmd { //------------------------------------------------------------------------------ -func (c *Client) HDel(key string, fields ...string) *IntCmd { +func (c *commandable) HDel(key string, fields ...string) *IntCmd { args := append([]string{"HDEL", key}, fields...) cmd := NewIntCmd(args...) c.Process(cmd) return cmd } -func (c *Client) HExists(key, field string) *BoolCmd { +func (c *commandable) HExists(key, field string) *BoolCmd { cmd := NewBoolCmd("HEXISTS", key, field) c.Process(cmd) return cmd } -func (c *Client) HGet(key, field string) *StringCmd { +func (c *commandable) HGet(key, field string) *StringCmd { cmd := NewStringCmd("HGET", key, field) c.Process(cmd) return cmd } -func (c *Client) HGetAll(key string) *StringSliceCmd { +func (c *commandable) HGetAll(key string) *StringSliceCmd { cmd := NewStringSliceCmd("HGETALL", key) c.Process(cmd) return cmd } -func (c *Client) HGetAllMap(key string) *StringStringMapCmd { +func (c *commandable) HGetAllMap(key string) *StringStringMapCmd { cmd := NewStringStringMapCmd("HGETALL", key) c.Process(cmd) return cmd } -func (c *Client) HIncrBy(key, field string, incr int64) *IntCmd { +func (c *commandable) HIncrBy(key, field string, incr int64) *IntCmd { cmd := NewIntCmd("HINCRBY", key, field, strconv.FormatInt(incr, 10)) c.Process(cmd) return cmd } -func (c *Client) HIncrByFloat(key, field string, incr float64) *FloatCmd { +func (c *commandable) HIncrByFloat(key, field string, incr float64) *FloatCmd { cmd := NewFloatCmd("HINCRBYFLOAT", key, field, formatFloat(incr)) c.Process(cmd) return cmd } -func (c *Client) HKeys(key string) *StringSliceCmd { +func (c *commandable) HKeys(key string) *StringSliceCmd { cmd := NewStringSliceCmd("HKEYS", key) c.Process(cmd) return cmd } -func (c *Client) HLen(key string) *IntCmd { +func (c *commandable) HLen(key string) *IntCmd { cmd := NewIntCmd("HLEN", key) c.Process(cmd) return cmd } -func (c *Client) HMGet(key string, fields ...string) *SliceCmd { +func (c *commandable) HMGet(key string, fields ...string) *SliceCmd { args := append([]string{"HMGET", key}, fields...) cmd := NewSliceCmd(args...) c.Process(cmd) return cmd } -func (c *Client) HMSet(key, field, value string, pairs ...string) *StatusCmd { +func (c *commandable) HMSet(key, field, value string, pairs ...string) *StatusCmd { args := append([]string{"HMSET", key, field, value}, pairs...) cmd := NewStatusCmd(args...) c.Process(cmd) return cmd } -func (c *Client) HSet(key, field, value string) *BoolCmd { +func (c *commandable) HSet(key, field, value string) *BoolCmd { cmd := NewBoolCmd("HSET", key, field, value) c.Process(cmd) return cmd } -func (c *Client) HSetNX(key, field, value string) *BoolCmd { +func (c *commandable) HSetNX(key, field, value string) *BoolCmd { cmd := NewBoolCmd("HSETNX", key, field, value) c.Process(cmd) return cmd } -func (c *Client) HVals(key string) *StringSliceCmd { +func (c *commandable) HVals(key string) *StringSliceCmd { cmd := NewStringSliceCmd("HVALS", key) c.Process(cmd) return cmd @@ -555,7 +568,7 @@ func (c *Client) HVals(key string) *StringSliceCmd { //------------------------------------------------------------------------------ -func (c *Client) BLPop(timeout int64, keys ...string) *StringSliceCmd { +func (c *commandable) BLPop(timeout int64, keys ...string) *StringSliceCmd { args := append([]string{"BLPOP"}, keys...) args = append(args, strconv.FormatInt(timeout, 10)) cmd := NewStringSliceCmd(args...) @@ -564,7 +577,7 @@ func (c *Client) BLPop(timeout int64, keys ...string) *StringSliceCmd { return cmd } -func (c *Client) BRPop(timeout int64, keys ...string) *StringSliceCmd { +func (c *commandable) BRPop(timeout int64, keys ...string) *StringSliceCmd { args := append([]string{"BRPOP"}, keys...) args = append(args, strconv.FormatInt(timeout, 10)) cmd := NewStringSliceCmd(args...) @@ -573,7 +586,7 @@ func (c *Client) BRPop(timeout int64, keys ...string) *StringSliceCmd { return cmd } -func (c *Client) BRPopLPush(source, destination string, timeout int64) *StringCmd { +func (c *commandable) BRPopLPush(source, destination string, timeout int64) *StringCmd { cmd := NewStringCmd( "BRPOPLPUSH", source, @@ -585,44 +598,44 @@ func (c *Client) BRPopLPush(source, destination string, timeout int64) *StringCm return cmd } -func (c *Client) LIndex(key string, index int64) *StringCmd { +func (c *commandable) LIndex(key string, index int64) *StringCmd { cmd := NewStringCmd("LINDEX", key, strconv.FormatInt(index, 10)) c.Process(cmd) return cmd } -func (c *Client) LInsert(key, op, pivot, value string) *IntCmd { +func (c *commandable) LInsert(key, op, pivot, value string) *IntCmd { cmd := NewIntCmd("LINSERT", key, op, pivot, value) c.Process(cmd) return cmd } -func (c *Client) LLen(key string) *IntCmd { +func (c *commandable) LLen(key string) *IntCmd { cmd := NewIntCmd("LLEN", key) c.Process(cmd) return cmd } -func (c *Client) LPop(key string) *StringCmd { +func (c *commandable) LPop(key string) *StringCmd { cmd := NewStringCmd("LPOP", key) c.Process(cmd) return cmd } -func (c *Client) LPush(key string, values ...string) *IntCmd { +func (c *commandable) LPush(key string, values ...string) *IntCmd { args := append([]string{"LPUSH", key}, values...) cmd := NewIntCmd(args...) c.Process(cmd) return cmd } -func (c *Client) LPushX(key, value string) *IntCmd { +func (c *commandable) LPushX(key, value string) *IntCmd { cmd := NewIntCmd("LPUSHX", key, value) c.Process(cmd) return cmd } -func (c *Client) LRange(key string, start, stop int64) *StringSliceCmd { +func (c *commandable) LRange(key string, start, stop int64) *StringSliceCmd { cmd := NewStringSliceCmd( "LRANGE", key, @@ -633,19 +646,19 @@ func (c *Client) LRange(key string, start, stop int64) *StringSliceCmd { return cmd } -func (c *Client) LRem(key string, count int64, value string) *IntCmd { +func (c *commandable) LRem(key string, count int64, value string) *IntCmd { cmd := NewIntCmd("LREM", key, strconv.FormatInt(count, 10), value) c.Process(cmd) return cmd } -func (c *Client) LSet(key string, index int64, value string) *StatusCmd { +func (c *commandable) LSet(key string, index int64, value string) *StatusCmd { cmd := NewStatusCmd("LSET", key, strconv.FormatInt(index, 10), value) c.Process(cmd) return cmd } -func (c *Client) LTrim(key string, start, stop int64) *StatusCmd { +func (c *commandable) LTrim(key string, start, stop int64) *StatusCmd { cmd := NewStatusCmd( "LTRIM", key, @@ -656,26 +669,26 @@ func (c *Client) LTrim(key string, start, stop int64) *StatusCmd { return cmd } -func (c *Client) RPop(key string) *StringCmd { +func (c *commandable) RPop(key string) *StringCmd { cmd := NewStringCmd("RPOP", key) c.Process(cmd) return cmd } -func (c *Client) RPopLPush(source, destination string) *StringCmd { +func (c *commandable) RPopLPush(source, destination string) *StringCmd { cmd := NewStringCmd("RPOPLPUSH", source, destination) c.Process(cmd) return cmd } -func (c *Client) RPush(key string, values ...string) *IntCmd { +func (c *commandable) RPush(key string, values ...string) *IntCmd { args := append([]string{"RPUSH", key}, values...) cmd := NewIntCmd(args...) c.Process(cmd) return cmd } -func (c *Client) RPushX(key string, value string) *IntCmd { +func (c *commandable) RPushX(key string, value string) *IntCmd { cmd := NewIntCmd("RPUSHX", key, value) c.Process(cmd) return cmd @@ -683,92 +696,92 @@ func (c *Client) RPushX(key string, value string) *IntCmd { //------------------------------------------------------------------------------ -func (c *Client) SAdd(key string, members ...string) *IntCmd { +func (c *commandable) SAdd(key string, members ...string) *IntCmd { args := append([]string{"SADD", key}, members...) cmd := NewIntCmd(args...) c.Process(cmd) return cmd } -func (c *Client) SCard(key string) *IntCmd { +func (c *commandable) SCard(key string) *IntCmd { cmd := NewIntCmd("SCARD", key) c.Process(cmd) return cmd } -func (c *Client) SDiff(keys ...string) *StringSliceCmd { +func (c *commandable) SDiff(keys ...string) *StringSliceCmd { args := append([]string{"SDIFF"}, keys...) cmd := NewStringSliceCmd(args...) c.Process(cmd) return cmd } -func (c *Client) SDiffStore(destination string, keys ...string) *IntCmd { +func (c *commandable) SDiffStore(destination string, keys ...string) *IntCmd { args := append([]string{"SDIFFSTORE", destination}, keys...) cmd := NewIntCmd(args...) c.Process(cmd) return cmd } -func (c *Client) SInter(keys ...string) *StringSliceCmd { +func (c *commandable) SInter(keys ...string) *StringSliceCmd { args := append([]string{"SINTER"}, keys...) cmd := NewStringSliceCmd(args...) c.Process(cmd) return cmd } -func (c *Client) SInterStore(destination string, keys ...string) *IntCmd { +func (c *commandable) SInterStore(destination string, keys ...string) *IntCmd { args := append([]string{"SINTERSTORE", destination}, keys...) cmd := NewIntCmd(args...) c.Process(cmd) return cmd } -func (c *Client) SIsMember(key, member string) *BoolCmd { +func (c *commandable) SIsMember(key, member string) *BoolCmd { cmd := NewBoolCmd("SISMEMBER", key, member) c.Process(cmd) return cmd } -func (c *Client) SMembers(key string) *StringSliceCmd { +func (c *commandable) SMembers(key string) *StringSliceCmd { cmd := NewStringSliceCmd("SMEMBERS", key) c.Process(cmd) return cmd } -func (c *Client) SMove(source, destination, member string) *BoolCmd { +func (c *commandable) SMove(source, destination, member string) *BoolCmd { cmd := NewBoolCmd("SMOVE", source, destination, member) c.Process(cmd) return cmd } -func (c *Client) SPop(key string) *StringCmd { +func (c *commandable) SPop(key string) *StringCmd { cmd := NewStringCmd("SPOP", key) c.Process(cmd) return cmd } -func (c *Client) SRandMember(key string) *StringCmd { +func (c *commandable) SRandMember(key string) *StringCmd { cmd := NewStringCmd("SRANDMEMBER", key) c.Process(cmd) return cmd } -func (c *Client) SRem(key string, members ...string) *IntCmd { +func (c *commandable) SRem(key string, members ...string) *IntCmd { args := append([]string{"SREM", key}, members...) cmd := NewIntCmd(args...) c.Process(cmd) return cmd } -func (c *Client) SUnion(keys ...string) *StringSliceCmd { +func (c *commandable) SUnion(keys ...string) *StringSliceCmd { args := append([]string{"SUNION"}, keys...) cmd := NewStringSliceCmd(args...) c.Process(cmd) return cmd } -func (c *Client) SUnionStore(destination string, keys ...string) *IntCmd { +func (c *commandable) SUnionStore(destination string, keys ...string) *IntCmd { args := append([]string{"SUNIONSTORE", destination}, keys...) cmd := NewIntCmd(args...) c.Process(cmd) @@ -787,7 +800,7 @@ type ZStore struct { Aggregate string } -func (c *Client) ZAdd(key string, members ...Z) *IntCmd { +func (c *commandable) ZAdd(key string, members ...Z) *IntCmd { args := []string{"ZADD", key} for _, m := range members { args = append(args, formatFloat(m.Score), m.Member) @@ -797,25 +810,25 @@ func (c *Client) ZAdd(key string, members ...Z) *IntCmd { return cmd } -func (c *Client) ZCard(key string) *IntCmd { +func (c *commandable) ZCard(key string) *IntCmd { cmd := NewIntCmd("ZCARD", key) c.Process(cmd) return cmd } -func (c *Client) ZCount(key, min, max string) *IntCmd { +func (c *commandable) ZCount(key, min, max string) *IntCmd { cmd := NewIntCmd("ZCOUNT", key, min, max) c.Process(cmd) return cmd } -func (c *Client) ZIncrBy(key string, increment float64, member string) *FloatCmd { +func (c *commandable) ZIncrBy(key string, increment float64, member string) *FloatCmd { cmd := NewFloatCmd("ZINCRBY", key, formatFloat(increment), member) c.Process(cmd) return cmd } -func (c *Client) ZInterStore( +func (c *commandable) ZInterStore( destination string, store ZStore, keys ...string, @@ -836,7 +849,7 @@ func (c *Client) ZInterStore( return cmd } -func (c *Client) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd { +func (c *commandable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd { args := []string{ "ZRANGE", key, @@ -851,11 +864,11 @@ func (c *Client) zRange(key string, start, stop int64, withScores bool) *StringS return cmd } -func (c *Client) ZRange(key string, start, stop int64) *StringSliceCmd { +func (c *commandable) ZRange(key string, start, stop int64) *StringSliceCmd { return c.zRange(key, start, stop, false) } -func (c *Client) ZRangeWithScores(key string, start, stop int64) *ZSliceCmd { +func (c *commandable) ZRangeWithScores(key string, start, stop int64) *ZSliceCmd { args := []string{ "ZRANGE", key, @@ -874,7 +887,7 @@ type ZRangeByScore struct { Offset, Count int64 } -func (c *Client) zRangeByScore(key string, opt ZRangeByScore, withScores bool) *StringSliceCmd { +func (c *commandable) zRangeByScore(key string, opt ZRangeByScore, withScores bool) *StringSliceCmd { args := []string{"ZRANGEBYSCORE", key, opt.Min, opt.Max} if withScores { args = append(args, "WITHSCORES") @@ -892,11 +905,11 @@ func (c *Client) zRangeByScore(key string, opt ZRangeByScore, withScores bool) * return cmd } -func (c *Client) ZRangeByScore(key string, opt ZRangeByScore) *StringSliceCmd { +func (c *commandable) ZRangeByScore(key string, opt ZRangeByScore) *StringSliceCmd { return c.zRangeByScore(key, opt, false) } -func (c *Client) ZRangeByScoreWithScores(key string, opt ZRangeByScore) *ZSliceCmd { +func (c *commandable) ZRangeByScoreWithScores(key string, opt ZRangeByScore) *ZSliceCmd { args := []string{"ZRANGEBYSCORE", key, opt.Min, opt.Max, "WITHSCORES"} if opt.Offset != 0 || opt.Count != 0 { args = append( @@ -911,20 +924,20 @@ func (c *Client) ZRangeByScoreWithScores(key string, opt ZRangeByScore) *ZSliceC return cmd } -func (c *Client) ZRank(key, member string) *IntCmd { +func (c *commandable) ZRank(key, member string) *IntCmd { cmd := NewIntCmd("ZRANK", key, member) c.Process(cmd) return cmd } -func (c *Client) ZRem(key string, members ...string) *IntCmd { +func (c *commandable) ZRem(key string, members ...string) *IntCmd { args := append([]string{"ZREM", key}, members...) cmd := NewIntCmd(args...) c.Process(cmd) return cmd } -func (c *Client) ZRemRangeByRank(key string, start, stop int64) *IntCmd { +func (c *commandable) ZRemRangeByRank(key string, start, stop int64) *IntCmd { cmd := NewIntCmd( "ZREMRANGEBYRANK", key, @@ -935,13 +948,13 @@ func (c *Client) ZRemRangeByRank(key string, start, stop int64) *IntCmd { return cmd } -func (c *Client) ZRemRangeByScore(key, min, max string) *IntCmd { +func (c *commandable) ZRemRangeByScore(key, min, max string) *IntCmd { cmd := NewIntCmd("ZREMRANGEBYSCORE", key, min, max) c.Process(cmd) return cmd } -func (c *Client) zRevRange(key, start, stop string, withScores bool) *StringSliceCmd { +func (c *commandable) zRevRange(key, start, stop string, withScores bool) *StringSliceCmd { args := []string{"ZREVRANGE", key, start, stop} if withScores { args = append(args, "WITHSCORES") @@ -951,18 +964,18 @@ func (c *Client) zRevRange(key, start, stop string, withScores bool) *StringSlic return cmd } -func (c *Client) ZRevRange(key, start, stop string) *StringSliceCmd { +func (c *commandable) ZRevRange(key, start, stop string) *StringSliceCmd { return c.zRevRange(key, start, stop, false) } -func (c *Client) ZRevRangeWithScores(key, start, stop string) *ZSliceCmd { +func (c *commandable) ZRevRangeWithScores(key, start, stop string) *ZSliceCmd { args := []string{"ZREVRANGE", key, start, stop, "WITHSCORES"} cmd := NewZSliceCmd(args...) c.Process(cmd) return cmd } -func (c *Client) zRevRangeByScore(key string, opt ZRangeByScore, withScores bool) *StringSliceCmd { +func (c *commandable) zRevRangeByScore(key string, opt ZRangeByScore, withScores bool) *StringSliceCmd { args := []string{"ZREVRANGEBYSCORE", key, opt.Max, opt.Min} if withScores { args = append(args, "WITHSCORES") @@ -980,11 +993,11 @@ func (c *Client) zRevRangeByScore(key string, opt ZRangeByScore, withScores bool return cmd } -func (c *Client) ZRevRangeByScore(key string, opt ZRangeByScore) *StringSliceCmd { +func (c *commandable) ZRevRangeByScore(key string, opt ZRangeByScore) *StringSliceCmd { return c.zRevRangeByScore(key, opt, false) } -func (c *Client) ZRevRangeByScoreWithScores(key string, opt ZRangeByScore) *ZSliceCmd { +func (c *commandable) ZRevRangeByScoreWithScores(key string, opt ZRangeByScore) *ZSliceCmd { args := []string{"ZREVRANGEBYSCORE", key, opt.Max, opt.Min, "WITHSCORES"} if opt.Offset != 0 || opt.Count != 0 { args = append( @@ -999,19 +1012,19 @@ func (c *Client) ZRevRangeByScoreWithScores(key string, opt ZRangeByScore) *ZSli return cmd } -func (c *Client) ZRevRank(key, member string) *IntCmd { +func (c *commandable) ZRevRank(key, member string) *IntCmd { cmd := NewIntCmd("ZREVRANK", key, member) c.Process(cmd) return cmd } -func (c *Client) ZScore(key, member string) *FloatCmd { +func (c *commandable) ZScore(key, member string) *FloatCmd { cmd := NewFloatCmd("ZSCORE", key, member) c.Process(cmd) return cmd } -func (c *Client) ZUnionStore( +func (c *commandable) ZUnionStore( destination string, store ZStore, keys ...string, @@ -1034,92 +1047,102 @@ func (c *Client) ZUnionStore( //------------------------------------------------------------------------------ -func (c *Client) BgRewriteAOF() *StatusCmd { +func (c *commandable) BgRewriteAOF() *StatusCmd { cmd := NewStatusCmd("BGREWRITEAOF") + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) BgSave() *StatusCmd { +func (c *commandable) BgSave() *StatusCmd { cmd := NewStatusCmd("BGSAVE") + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) ClientKill(ipPort string) *StatusCmd { +func (c *commandable) ClientKill(ipPort string) *StatusCmd { cmd := NewStatusCmd("CLIENT", "KILL", ipPort) + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) ClientList() *StringCmd { +func (c *commandable) ClientList() *StringCmd { cmd := NewStringCmd("CLIENT", "LIST") + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) ConfigGet(parameter string) *SliceCmd { +func (c *commandable) ConfigGet(parameter string) *SliceCmd { cmd := NewSliceCmd("CONFIG", "GET", parameter) + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) ConfigResetStat() *StatusCmd { +func (c *commandable) ConfigResetStat() *StatusCmd { cmd := NewStatusCmd("CONFIG", "RESETSTAT") + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) ConfigSet(parameter, value string) *StatusCmd { +func (c *commandable) ConfigSet(parameter, value string) *StatusCmd { cmd := NewStatusCmd("CONFIG", "SET", parameter, value) + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) DbSize() *IntCmd { +func (c *commandable) DbSize() *IntCmd { cmd := NewIntCmd("DBSIZE") + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) FlushAll() *StatusCmd { - cmd := NewStatusCmd("FLUSHALL") +func (c *commandable) FlushAll() *StatusCmd { + cmd := newKeylessStatusCmd("FLUSHALL") c.Process(cmd) return cmd } -func (c *Client) FlushDb() *StatusCmd { - cmd := NewStatusCmd("FLUSHDB") +func (c *commandable) FlushDb() *StatusCmd { + cmd := newKeylessStatusCmd("FLUSHDB") c.Process(cmd) return cmd } -func (c *Client) Info() *StringCmd { +func (c *commandable) Info() *StringCmd { cmd := NewStringCmd("INFO") + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) LastSave() *IntCmd { +func (c *commandable) LastSave() *IntCmd { cmd := NewIntCmd("LASTSAVE") + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) Save() *StatusCmd { - cmd := NewStatusCmd("SAVE") +func (c *commandable) Save() *StatusCmd { + cmd := newKeylessStatusCmd("SAVE") c.Process(cmd) return cmd } -func (c *Client) shutdown(modifier string) *StatusCmd { +func (c *commandable) shutdown(modifier string) *StatusCmd { var args []string if modifier == "" { args = []string{"SHUTDOWN"} } else { args = []string{"SHUTDOWN", modifier} } - cmd := NewStatusCmd(args...) + cmd := newKeylessStatusCmd(args...) c.Process(cmd) if err := cmd.Err(); err != nil { if err == io.EOF { @@ -1134,113 +1157,188 @@ func (c *Client) shutdown(modifier string) *StatusCmd { return cmd } -func (c *Client) Shutdown() *StatusCmd { +func (c *commandable) Shutdown() *StatusCmd { return c.shutdown("") } -func (c *Client) ShutdownSave() *StatusCmd { +func (c *commandable) ShutdownSave() *StatusCmd { return c.shutdown("SAVE") } -func (c *Client) ShutdownNoSave() *StatusCmd { +func (c *commandable) ShutdownNoSave() *StatusCmd { return c.shutdown("NOSAVE") } -func (c *Client) SlaveOf(host, port string) *StatusCmd { - cmd := NewStatusCmd("SLAVEOF", host, port) +func (c *commandable) SlaveOf(host, port string) *StatusCmd { + cmd := newKeylessStatusCmd("SLAVEOF", host, port) c.Process(cmd) return cmd } -func (c *Client) SlowLog() { +func (c *commandable) SlowLog() { panic("not implemented") } -func (c *Client) Sync() { +func (c *commandable) Sync() { panic("not implemented") } -func (c *Client) Time() *StringSliceCmd { +func (c *commandable) Time() *StringSliceCmd { cmd := NewStringSliceCmd("TIME") + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } //------------------------------------------------------------------------------ -func (c *Client) Eval(script string, keys []string, args []string) *Cmd { +func (c *commandable) Eval(script string, keys []string, args []string) *Cmd { cmdArgs := []string{"EVAL", script, strconv.FormatInt(int64(len(keys)), 10)} cmdArgs = append(cmdArgs, keys...) cmdArgs = append(cmdArgs, args...) cmd := NewCmd(cmdArgs...) + if len(keys) > 0 { + cmd._clusterKeyPos = 3 + } c.Process(cmd) return cmd } -func (c *Client) EvalSha(sha1 string, keys []string, args []string) *Cmd { +func (c *commandable) EvalSha(sha1 string, keys []string, args []string) *Cmd { cmdArgs := []string{"EVALSHA", sha1, strconv.FormatInt(int64(len(keys)), 10)} cmdArgs = append(cmdArgs, keys...) cmdArgs = append(cmdArgs, args...) cmd := NewCmd(cmdArgs...) + if len(keys) > 0 { + cmd._clusterKeyPos = 3 + } c.Process(cmd) return cmd } -func (c *Client) ScriptExists(scripts ...string) *BoolSliceCmd { +func (c *commandable) ScriptExists(scripts ...string) *BoolSliceCmd { args := append([]string{"SCRIPT", "EXISTS"}, scripts...) cmd := NewBoolSliceCmd(args...) + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) ScriptFlush() *StatusCmd { - cmd := NewStatusCmd("SCRIPT", "FLUSH") +func (c *commandable) ScriptFlush() *StatusCmd { + cmd := newKeylessStatusCmd("SCRIPT", "FLUSH") c.Process(cmd) return cmd } -func (c *Client) ScriptKill() *StatusCmd { - cmd := NewStatusCmd("SCRIPT", "KILL") +func (c *commandable) ScriptKill() *StatusCmd { + cmd := newKeylessStatusCmd("SCRIPT", "KILL") c.Process(cmd) return cmd } -func (c *Client) ScriptLoad(script string) *StringCmd { +func (c *commandable) ScriptLoad(script string) *StringCmd { cmd := NewStringCmd("SCRIPT", "LOAD", script) + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } //------------------------------------------------------------------------------ -func (c *Client) DebugObject(key string) *StringCmd { +func (c *commandable) DebugObject(key string) *StringCmd { cmd := NewStringCmd("DEBUG", "OBJECT", key) + cmd._clusterKeyPos = 2 c.Process(cmd) return cmd } //------------------------------------------------------------------------------ -func (c *Client) PubSubChannels(pattern string) *StringSliceCmd { +func (c *commandable) PubSubChannels(pattern string) *StringSliceCmd { args := []string{"PUBSUB", "CHANNELS"} if pattern != "*" { args = append(args, pattern) } cmd := NewStringSliceCmd(args...) + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) PubSubNumSub(channels ...string) *StringIntMapCmd { +func (c *commandable) PubSubNumSub(channels ...string) *StringIntMapCmd { args := []string{"PUBSUB", "NUMSUB"} args = append(args, channels...) cmd := NewStringIntMapCmd(args...) + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } -func (c *Client) PubSubNumPat() *IntCmd { +func (c *commandable) PubSubNumPat() *IntCmd { cmd := NewIntCmd("PUBSUB", "NUMPAT") + cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } + +//------------------------------------------------------------------------------ + +func (c *commandable) ClusterSlots() *ClusterSlotCmd { + cmd := NewClusterSlotCmd("CLUSTER", "slots") + cmd._clusterKeyPos = 0 + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterNodes() *StringCmd { + cmd := NewStringCmd("CLUSTER", "nodes") + cmd._clusterKeyPos = 0 + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterMeet(host, port string) *StatusCmd { + cmd := newKeylessStatusCmd("CLUSTER", "meet", host, port) + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterReplicate(nodeID string) *StatusCmd { + cmd := newKeylessStatusCmd("CLUSTER", "replicate", nodeID) + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterInfo() *StringCmd { + cmd := NewStringCmd("CLUSTER", "info") + cmd._clusterKeyPos = 0 + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterFailover() *StatusCmd { + cmd := newKeylessStatusCmd("CLUSTER", "failover") + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterAddSlots(slots ...int) *StatusCmd { + args := make([]string, len(slots)+2) + args[0] = "CLUSTER" + args[1] = "addslots" + for i, num := range slots { + args[i+2] = strconv.Itoa(num) + } + cmd := newKeylessStatusCmd(args...) + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterAddSlotsRange(min, max int) *StatusCmd { + size := max - min + 1 + slots := make([]int, size) + for i := 0; i < size; i++ { + slots[i] = min + i + } + return c.ClusterAddSlots(slots...) +} diff --git a/commands_test.go b/commands_test.go index d56518f3..dad515a0 100644 --- a/commands_test.go +++ b/commands_test.go @@ -65,7 +65,7 @@ var _ = Describe("Commands", func() { // workaround for "ERR Can't BGSAVE while AOF log rewriting is in progress" Eventually(func() string { return client.BgSave().Val() - }).Should(Equal("Background saving started")) + }, "10s").Should(Equal("Background saving started")) }) It("should ClientKill", func() { @@ -119,7 +119,7 @@ var _ = Describe("Commands", func() { // workaround for "ERR Background save already in progress" Eventually(func() string { return client.Save().Val() - }).Should(Equal("OK")) + }, "10s").Should(Equal("OK")) }) It("should SlaveOf", func() { diff --git a/crc16.go b/crc16.go new file mode 100644 index 00000000..a7f3b569 --- /dev/null +++ b/crc16.go @@ -0,0 +1,47 @@ +package redis + +// CRC16 implementation according to CCITT standards. +// Copyright 2001-2010 Georges Menie (www.menie.org) +// Copyright 2013 The Go Authors. All rights reserved. +// http://redis.io/topics/cluster-spec#appendix-a-crc16-reference-implementation-in-ansi-c +var crc16tab = [256]uint16{ + 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, + 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, + 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, + 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, + 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, + 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, + 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, + 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, + 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, + 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, + 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, + 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, + 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, + 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, + 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, + 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, + 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, + 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, + 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, + 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, + 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, + 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, + 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, + 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, + 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, + 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, + 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, + 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, + 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, + 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, + 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, + 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, +} + +func crc16sum(key string) (crc uint16) { + for i := 0; i < len(key); i++ { + crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff] + } + return +} diff --git a/crc16_test.go b/crc16_test.go new file mode 100644 index 00000000..a6b34162 --- /dev/null +++ b/crc16_test.go @@ -0,0 +1,25 @@ +package redis + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("CRC16", func() { + + // http://redis.io/topics/cluster-spec#keys-distribution-model + It("should calculate CRC16", func() { + tests := []struct { + s string + n uint16 + }{ + {"123456789", 0x31C3}, + {string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), 21847}, + } + + for _, test := range tests { + Expect(crc16sum(test.s)).To(Equal(test.n), "for %s", test.s) + } + }) + +}) diff --git a/multi.go b/multi.go index bff38dfa..3557ac57 100644 --- a/multi.go +++ b/multi.go @@ -9,17 +9,25 @@ var errDiscard = errors.New("redis: Discard can be used only inside Exec") // Not thread-safe. type Multi struct { - *Client + commandable + + base *baseClient + cmds []Cmder } func (c *Client) Multi() *Multi { - return &Multi{ - Client: &Client{ - baseClient: &baseClient{ - opt: c.opt, - connPool: newSingleConnPool(c.connPool, true), - }, - }, + multi := &Multi{ + base: &baseClient{opt: c.opt, connPool: newSingleConnPool(c.connPool, true)}, + } + multi.commandable.process = multi.process + return multi +} + +func (c *Multi) process(cmd Cmder) { + if c.cmds == nil { + c.base.process(cmd) + } else { + c.cmds = append(c.cmds, cmd) } } @@ -27,7 +35,7 @@ func (c *Multi) Close() error { if err := c.Unwatch().Err(); err != nil { return err } - return c.Client.Close() + return c.base.Close() } func (c *Multi) Watch(keys ...string) *StatusCmd { @@ -69,7 +77,7 @@ func (c *Multi) Exec(f func() error) ([]Cmder, error) { return []Cmder{}, nil } - cn, err := c.conn() + cn, err := c.base.conn() if err != nil { setCmdsErr(cmds[1:len(cmds)-1], err) return cmds[1 : len(cmds)-1], err @@ -77,16 +85,16 @@ func (c *Multi) Exec(f func() error) ([]Cmder, error) { err = c.execCmds(cn, cmds) if err != nil { - c.freeConn(cn, err) + c.base.freeConn(cn, err) return cmds[1 : len(cmds)-1], err } - c.putConn(cn) + c.base.putConn(cn) return cmds[1 : len(cmds)-1], nil } func (c *Multi) execCmds(cn *conn, cmds []Cmder) error { - err := c.writeCmd(cn, cmds...) + err := cn.writeCmds(cmds...) if err != nil { setCmdsErr(cmds[1:len(cmds)-1], err) return err diff --git a/parser.go b/parser.go index b5e30a4d..5a9c79f4 100644 --- a/parser.go +++ b/parser.go @@ -3,6 +3,7 @@ package redis import ( "errors" "fmt" + "net" "strconv" "gopkg.in/bufio.v1" @@ -292,3 +293,50 @@ func parseZSlice(rd *bufio.Reader, n int64) (interface{}, error) { } return zz, nil } + +func parseClusterSlotInfoSlice(rd *bufio.Reader, n int64) (interface{}, error) { + infos := make([]ClusterSlotInfo, 0, n) + for i := int64(0); i < n; i++ { + viface, err := parseReply(rd, parseSlice) + if err != nil { + return nil, err + } + + item, ok := viface.([]interface{}) + if !ok { + return nil, fmt.Errorf("got %T, expected []interface{}", viface) + } else if len(item) < 3 { + return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item) + } + + start, ok := item[0].(int64) + if !ok || start < 0 || start > hashSlots { + return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item) + } + end, ok := item[1].(int64) + if !ok || end < 0 || end > hashSlots { + return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item) + } + + info := ClusterSlotInfo{int(start), int(end), make([]string, len(item)-2)} + for n, ipair := range item[2:] { + pair, ok := ipair.([]interface{}) + if !ok || len(pair) != 2 { + return nil, fmt.Errorf("got %v, expected []interface{host, port}", viface) + } + + ip, ok := pair[0].(string) + if !ok || len(ip) < 1 { + return nil, fmt.Errorf("got %v, expected IP PORT pair", pair) + } + port, ok := pair[1].(int64) + if !ok || port < 1 { + return nil, fmt.Errorf("got %v, expected IP PORT pair", pair) + } + + info.Addrs[n] = net.JoinHostPort(ip, strconv.FormatInt(port, 10)) + } + infos = append(infos, info) + } + return infos, nil +} diff --git a/pipeline.go b/pipeline.go index 540d6c51..33e51b4f 100644 --- a/pipeline.go +++ b/pipeline.go @@ -2,22 +2,23 @@ package redis // Not thread-safe. type Pipeline struct { - *Client + commandable + cmds []Cmder + client *baseClient closed bool } func (c *Client) Pipeline() *Pipeline { - return &Pipeline{ - Client: &Client{ - baseClient: &baseClient{ - opt: c.opt, - connPool: c.connPool, - - cmds: make([]Cmder, 0), - }, + pipe := &Pipeline{ + client: &baseClient{ + opt: c.opt, + connPool: c.connPool, }, + cmds: make([]Cmder, 0, 10), } + pipe.commandable.process = pipe.process + return pipe } func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) { @@ -30,6 +31,10 @@ func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) { return cmds, err } +func (c *Pipeline) process(cmd Cmder) { + c.cmds = append(c.cmds, cmd) +} + func (c *Pipeline) Close() error { c.closed = true return nil @@ -51,29 +56,29 @@ func (c *Pipeline) Exec() ([]Cmder, error) { } cmds := c.cmds - c.cmds = make([]Cmder, 0) + c.cmds = make([]Cmder, 0, 0) if len(cmds) == 0 { return []Cmder{}, nil } - cn, err := c.conn() + cn, err := c.client.conn() if err != nil { setCmdsErr(cmds, err) return cmds, err } if err := c.execCmds(cn, cmds); err != nil { - c.freeConn(cn, err) + c.client.freeConn(cn, err) return cmds, err } - c.putConn(cn) + c.client.putConn(cn) return cmds, nil } func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error { - if err := c.writeCmd(cn, cmds...); err != nil { + if err := cn.writeCmds(cmds...); err != nil { setCmdsErr(cmds, err) return err } diff --git a/pool.go b/pool.go index 4a674a36..bae173da 100644 --- a/pool.go +++ b/pool.go @@ -58,6 +58,16 @@ func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) { } } +func (cn *conn) writeCmds(cmds ...Cmder) error { + buf := cn.buf[:0] + for _, cmd := range cmds { + buf = appendArgs(buf, cmd.args()) + } + + _, err := cn.Write(buf) + return err +} + func (cn *conn) Read(b []byte) (int, error) { if cn.readTimeout != 0 { cn.netcn.SetReadDeadline(time.Now().Add(cn.readTimeout)) diff --git a/pubsub.go b/pubsub.go index 6ac130ba..1f4e9117 100644 --- a/pubsub.go +++ b/pubsub.go @@ -103,7 +103,7 @@ func (c *PubSub) subscribe(cmd string, channels ...string) error { args := append([]string{cmd}, channels...) req := NewSliceCmd(args...) - return c.writeCmd(cn, req) + return cn.writeCmds(req) } func (c *PubSub) Subscribe(channels ...string) error { @@ -122,7 +122,7 @@ func (c *PubSub) unsubscribe(cmd string, channels ...string) error { args := append([]string{cmd}, channels...) req := NewSliceCmd(args...) - return c.writeCmd(cn, req) + return cn.writeCmds(req) } func (c *PubSub) Unsubscribe(channels ...string) error { diff --git a/redis.go b/redis.go index 4f907d93..ef659a18 100644 --- a/redis.go +++ b/redis.go @@ -9,17 +9,6 @@ import ( type baseClient struct { connPool pool opt *options - cmds []Cmder -} - -func (c *baseClient) writeCmd(cn *conn, cmds ...Cmder) error { - buf := cn.buf[:0] - for _, cmd := range cmds { - buf = appendArgs(buf, cmd.args()) - } - - _, err := cn.Write(buf) - return err } func (c *baseClient) conn() (*conn, error) { @@ -47,12 +36,7 @@ func (c *baseClient) initConn(cn *conn) error { pool.SetConn(cn) // Client is not closed because we want to reuse underlying connection. - client := &Client{ - baseClient: &baseClient{ - opt: c.opt, - connPool: pool, - }, - } + client := newClient(c.opt, pool) if c.opt.Password != "" { if err := client.Auth(c.opt.Password).Err(); err != nil { @@ -91,15 +75,7 @@ func (c *baseClient) putConn(cn *conn) { } } -func (c *baseClient) Process(cmd Cmder) { - if c.cmds == nil { - c.run(cmd) - } else { - c.cmds = append(c.cmds, cmd) - } -} - -func (c *baseClient) run(cmd Cmder) { +func (c *baseClient) process(cmd Cmder) { cn, err := c.conn() if err != nil { cmd.setErr(err) @@ -118,7 +94,7 @@ func (c *baseClient) run(cmd Cmder) { cn.readTimeout = c.opt.ReadTimeout } - if err := c.writeCmd(cn, cmd); err != nil { + if err := cn.writeCmds(cmd); err != nil { c.freeConn(cn, err) cmd.setErr(err) return @@ -237,8 +213,19 @@ func (opt *Options) options() *options { } } +//------------------------------------------------------------------------------ + type Client struct { *baseClient + commandable +} + +func newClient(opt *options, pool pool) *Client { + base := &baseClient{opt: opt, connPool: pool} + return &Client{ + baseClient: base, + commandable: commandable{process: base.process}, + } } func NewClient(clOpt *Options) *Client { @@ -249,12 +236,7 @@ func NewClient(clOpt *Options) *Client { return net.DialTimeout(clOpt.getNetwork(), clOpt.Addr, opt.DialTimeout) } } - return &Client{ - baseClient: &baseClient{ - opt: opt, - connPool: newConnPool(newConnFunc(dialer), opt), - }, - } + return newClient(opt, newConnPool(newConnFunc(dialer), opt)) } // Deprecated. Use NewClient instead. diff --git a/redis_test.go b/redis_test.go index f9736df1..fc1f0b38 100644 --- a/redis_test.go +++ b/redis_test.go @@ -4,6 +4,7 @@ import ( "net" "os" "os/exec" + "path/filepath" "testing" "time" @@ -126,7 +127,7 @@ func TestGinkgoSuite(t *testing.T) { func execCmd(name string, args ...string) (*os.Process, error) { cmd := exec.Command(name, args...) - if true { + if false { cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr } @@ -138,12 +139,12 @@ func connectTo(port string) (client *redis.Client, err error) { Addr: ":" + port, }) - deadline := time.Now().Add(time.Second) + deadline := time.Now().Add(3 * time.Second) for time.Now().Before(deadline) { if err = client.Ping().Err(); err == nil { return client, nil } - time.Sleep(100 * time.Millisecond) + time.Sleep(250 * time.Millisecond) } return nil, err @@ -159,11 +160,38 @@ func (p *redisProcess) Close() error { return p.Kill() } +var ( + redisServerBin, _ = filepath.Abs(filepath.Join(".test", "redis", "src", "redis-server")) + redisServerConf, _ = filepath.Abs(filepath.Join(".test", "redis.conf")) +) + +func redisDir(port string) (string, error) { + dir, err := filepath.Abs(filepath.Join(".test", "instances", port)) + if err != nil { + return "", err + } else if err = os.RemoveAll(dir); err != nil { + return "", err + } else if err = os.MkdirAll(dir, 0775); err != nil { + return "", err + } + return dir, nil +} + func startRedis(port string, args ...string) (*redisProcess, error) { - process, err := execCmd("redis-server", append([]string{"--port", port}, args...)...) + dir, err := redisDir(port) if err != nil { return nil, err } + if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil { + return nil, err + } + + baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir} + process, err := execCmd(redisServerBin, append(baseArgs, args...)...) + if err != nil { + return nil, err + } + client, err := connectTo(port) if err != nil { process.Kill() @@ -173,7 +201,11 @@ func startRedis(port string, args ...string) (*redisProcess, error) { } func startSentinel(port, masterName, masterPort string) (*redisProcess, error) { - process, err := execCmd("redis-server", os.DevNull, "--sentinel", "--port", port) + dir, err := redisDir(port) + if err != nil { + return nil, err + } + process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir) if err != nil { return nil, err } diff --git a/sentinel.go b/sentinel.go index 936621a6..cacf52cf 100644 --- a/sentinel.go +++ b/sentinel.go @@ -92,17 +92,13 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { opt: opt, } - return &Client{ - baseClient: &baseClient{ - opt: opt, - connPool: failover.Pool(), - }, - } + return newClient(opt, failover.Pool()) } //------------------------------------------------------------------------------ type sentinelClient struct { + commandable *baseClient } @@ -113,11 +109,13 @@ func newSentinel(clOpt *Options) *sentinelClient { dialer := func() (net.Conn, error) { return net.DialTimeout("tcp", clOpt.Addr, opt.DialTimeout) } + base := &baseClient{ + opt: opt, + connPool: newConnPool(newConnFunc(dialer), opt), + } return &sentinelClient{ - baseClient: &baseClient{ - opt: opt, - connPool: newConnPool(newConnFunc(dialer), opt), - }, + baseClient: base, + commandable: commandable{process: base.process}, } }