From 487feebef16306e9bc31cbedff76be1840f092a2 Mon Sep 17 00:00:00 2001 From: Joris Minjat Date: Fri, 6 May 2016 11:12:31 -0700 Subject: [PATCH] Add latency based routing to Redis Cluster client. --- cluster.go | 402 +++++++++++++++++++------------- cluster_client_test.go | 50 ++-- cluster_test.go | 10 +- command.go | 133 ++++++++--- command_test.go | 4 +- commands.go | 509 +++++++++++++++++++---------------------- commands_test.go | 21 +- options.go | 3 + parser.go | 69 +++++- redis.go | 8 +- ring.go | 60 +++-- 11 files changed, 760 insertions(+), 509 deletions(-) diff --git a/cluster.go b/cluster.go index 5d56a4f..cd433e7 100644 --- a/cluster.go +++ b/cluster.go @@ -11,6 +11,12 @@ import ( "gopkg.in/redis.v4/internal/pool" ) +type clusterNode struct { + Addr string + Latency int + Client *Client +} + // ClusterClient is a Redis Cluster client representing a pool of zero // or more underlying connections. It's safe for concurrent use by // multiple goroutines. @@ -19,14 +25,14 @@ type ClusterClient struct { opt *ClusterOptions - slotsMx sync.RWMutex // protects slots and addrs - addrs []string - slots [][]string + mu sync.RWMutex + addrs []string + nodes map[string]*clusterNode + slots [][]*clusterNode + closed bool - clientsMx sync.RWMutex // protects clients and closed - clients map[string]*Client - - _closed int32 // atomic + cmdsInfo map[string]*CommandInfo + cmdsInfoOnce *sync.Once // Reports where slots reloading is in progress. reloading uint32 @@ -35,44 +41,63 @@ type ClusterClient struct { // NewClusterClient returns a Redis Cluster client as described in // http://redis.io/topics/cluster-spec. func NewClusterClient(opt *ClusterOptions) *ClusterClient { + if opt.RouteByLatency { + opt.ReadOnly = true + } + client := &ClusterClient{ - opt: opt, - addrs: opt.Addrs, - slots: make([][]string, hashtag.SlotNumber), - clients: make(map[string]*Client), + opt: opt, + nodes: make(map[string]*clusterNode), + + cmdsInfoOnce: new(sync.Once), } client.commandable.process = client.process + + for _, addr := range opt.Addrs { + _ = client.nodeByAddr(addr) + } client.reloadSlots() + return client } -// getClients returns a snapshot of clients for cluster nodes -// this ClusterClient has been working with recently. -// Note that snapshot can contain closed clients. -func (c *ClusterClient) getClients() map[string]*Client { - c.clientsMx.RLock() - clients := make(map[string]*Client, len(c.clients)) - for addr, client := range c.clients { - clients[addr] = client +func (c *ClusterClient) cmdInfo(name string) *CommandInfo { + c.cmdsInfoOnce.Do(func() { + for _, node := range c.nodes { + cmdsInfo, err := node.Client.Command().Result() + if err == nil { + c.cmdsInfo = cmdsInfo + return + } + } + c.cmdsInfoOnce = &sync.Once{} + }) + return c.cmdsInfo[name] +} + +func (c *ClusterClient) getNodes() map[string]*clusterNode { + c.mu.RLock() + var nodes map[string]*clusterNode + if !c.closed { + nodes = make(map[string]*clusterNode, len(c.nodes)) + for addr, node := range c.nodes { + nodes[addr] = node + } } - c.clientsMx.RUnlock() - return clients + c.mu.RUnlock() + return nodes } func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { - addr := c.slotMasterAddr(hashtag.Slot(keys[0])) - client, err := c.getClient(addr) - if err != nil { - return err - } - return client.Watch(fn, keys...) + node := c.slotMasterNode(hashtag.Slot(keys[0])) + return node.Client.Watch(fn, keys...) } // PoolStats returns accumulated connection pool stats. func (c *ClusterClient) PoolStats() *PoolStats { acc := PoolStats{} - for _, client := range c.getClients() { - s := client.connPool.Stats() + for _, node := range c.getNodes() { + s := node.Client.connPool.Stats() acc.Requests += s.Requests acc.Hits += s.Hits acc.Timeouts += s.Timeouts @@ -82,113 +107,163 @@ func (c *ClusterClient) PoolStats() *PoolStats { return &acc } -func (c *ClusterClient) closed() bool { - return atomic.LoadInt32(&c._closed) == 1 -} - // Close closes the cluster client, releasing any open resources. // // It is rare to Close a ClusterClient, as the ClusterClient is meant // to be long-lived and shared between many goroutines. func (c *ClusterClient) Close() error { - if !atomic.CompareAndSwapInt32(&c._closed, 0, 1) { - return pool.ErrClosed + c.mu.Lock() + if !c.closed { + c.closeClients() + c.addrs = nil + c.nodes = nil + c.slots = nil + c.cmdsInfo = nil } - - c.clientsMx.Lock() - c.resetClients() - c.clientsMx.Unlock() - c.setSlots(nil) + c.closed = true + c.mu.Unlock() return nil } -// getClient returns a Client for a given address. -func (c *ClusterClient) getClient(addr string) (*Client, error) { - if c.closed() { - return nil, pool.ErrClosed - } - - if addr == "" { - return c.randomClient() - } - - c.clientsMx.RLock() - client, ok := c.clients[addr] - c.clientsMx.RUnlock() +func (c *ClusterClient) nodeByAddr(addr string) *clusterNode { + c.mu.RLock() + node, ok := c.nodes[addr] + c.mu.RUnlock() if ok { - return client, nil + return node } - c.clientsMx.Lock() - client, ok = c.clients[addr] - if !ok { - opt := c.opt.clientOptions() - opt.Addr = addr - client = NewClient(opt) - c.clients[addr] = client + c.mu.Lock() + if !c.closed { + node, ok = c.nodes[addr] + if !ok { + node = c.newNode(addr) + c.nodes[addr] = node + c.addrs = append(c.addrs, node.Addr) + } } - c.clientsMx.Unlock() + c.mu.Unlock() - return client, nil + return node } -func (c *ClusterClient) slotAddrs(slot int) []string { - c.slotsMx.RLock() - addrs := c.slots[slot] - c.slotsMx.RUnlock() - return addrs -} - -func (c *ClusterClient) slotMasterAddr(slot int) string { - addrs := c.slotAddrs(slot) - if len(addrs) > 0 { - return addrs[0] +func (c *ClusterClient) newNode(addr string) *clusterNode { + opt := c.opt.clientOptions() + opt.Addr = addr + return &clusterNode{ + Addr: addr, + Client: NewClient(opt), } - return "" } -// randomClient returns a Client for the first live node. -func (c *ClusterClient) randomClient() (client *Client, err error) { +func (c *ClusterClient) slotNodes(slot int) []*clusterNode { + c.mu.RLock() + nodes := c.slots[slot] + c.mu.RUnlock() + return nodes +} + +// randomNode returns random live node. +func (c *ClusterClient) randomNode() *clusterNode { + var node *clusterNode for i := 0; i < 10; i++ { - n := rand.Intn(len(c.addrs)) - client, err = c.getClient(c.addrs[n]) - if err != nil { - continue + c.mu.RLock() + addrs := c.addrs + c.mu.RUnlock() + + if len(addrs) == 0 { + return nil } - err = client.ClusterInfo().Err() - if err == nil { - return client, nil + + n := rand.Intn(len(addrs)) + node = c.nodeByAddr(addrs[n]) + + if node.Client.ClusterInfo().Err() == nil { + return node } } - return nil, err + return node +} + +func (c *ClusterClient) slotMasterNode(slot int) *clusterNode { + nodes := c.slotNodes(slot) + if len(nodes) == 0 { + return c.randomNode() + } + return nodes[0] +} + +func (c *ClusterClient) slotSlaveNode(slot int) *clusterNode { + nodes := c.slotNodes(slot) + switch len(nodes) { + case 0: + return c.randomNode() + case 1: + return nodes[0] + case 2: + return nodes[1] + default: + n := rand.Intn(len(nodes)-1) + 1 + return nodes[n] + } + +} + +func (c *ClusterClient) slotClosestNode(slot int) *clusterNode { + nodes := c.slotNodes(slot) + var node *clusterNode + for _, n := range nodes { + if node == nil || n.Latency < node.Latency { + node = n + } + } + return node +} + +func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode) { + cmdInfo := c.cmdInfo(cmd.arg(0)) + if cmdInfo == nil { + return 0, c.randomNode() + } + + if cmdInfo.FirstKeyPos == -1 { + return 0, c.randomNode() + } + + firstKey := cmd.arg(int(cmdInfo.FirstKeyPos)) + slot := hashtag.Slot(firstKey) + + if cmdInfo.ReadOnly && c.opt.ReadOnly { + if c.opt.RouteByLatency { + return slot, c.slotClosestNode(slot) + } + return slot, c.slotSlaveNode(slot) + } + return slot, c.slotMasterNode(slot) } func (c *ClusterClient) process(cmd Cmder) { var ask bool - - slot := hashtag.Slot(cmd.clusterKey()) - - addr := c.slotMasterAddr(slot) - client, err := c.getClient(addr) - if err != nil { - cmd.setErr(err) - return - } + slot, node := c.cmdSlotAndNode(cmd) for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ { if attempt > 0 { cmd.reset() } + if node == nil { + cmd.setErr(pool.ErrClosed) + return + } if ask { - pipe := client.Pipeline() + pipe := node.Client.Pipeline() pipe.Process(NewCmd("ASKING")) pipe.Process(cmd) _, _ = pipe.Exec() pipe.Close() ask = false } else { - client.Process(cmd) + node.Client.Process(cmd) } // If there is no (real) error, we are done! @@ -199,10 +274,7 @@ func (c *ClusterClient) process(cmd Cmder) { // On network errors try random node. if shouldRetry(err) { - client, err = c.randomClient() - if err != nil { - return - } + node = c.randomNode() continue } @@ -210,13 +282,11 @@ func (c *ClusterClient) process(cmd Cmder) { var addr string moved, ask, addr = isMovedError(err) if moved || ask { - if moved && c.slotMasterAddr(slot) != addr { + if moved && c.slotMasterNode(slot).Addr != addr { c.lazyReloadSlots() } - client, err = c.getClient(addr) - if err != nil { - return - } + + node = c.nodeByAddr(addr) continue } @@ -224,64 +294,71 @@ func (c *ClusterClient) process(cmd Cmder) { } } -// Closes all clients and returns last error if there are any. -func (c *ClusterClient) resetClients() (retErr error) { - for addr, client := range c.clients { - if err := client.Close(); err != nil && retErr == nil { +// closeClients closes all clients and returns the first error if there are any. +func (c *ClusterClient) closeClients() error { + var retErr error + for _, node := range c.nodes { + if err := node.Client.Close(); err != nil && retErr == nil { retErr = err } - delete(c.clients, addr) } return retErr } -func (c *ClusterClient) setSlots(slots []ClusterSlot) { - c.slotsMx.Lock() - - seen := make(map[string]struct{}) - for _, addr := range c.addrs { - seen[addr] = struct{}{} - } - +func (c *ClusterClient) setSlots(cs []ClusterSlot) { + slots := make([][]*clusterNode, hashtag.SlotNumber) for i := 0; i < hashtag.SlotNumber; i++ { - c.slots[i] = c.slots[i][:0] + slots[i] = nil } - for _, slot := range slots { - var addrs []string - for _, node := range slot.Nodes { - addrs = append(addrs, node.Addr) + for _, s := range cs { + var nodes []*clusterNode + for _, n := range s.Nodes { + nodes = append(nodes, c.nodeByAddr(n.Addr)) } - for i := slot.Start; i <= slot.End; i++ { - c.slots[i] = addrs - } - - for _, node := range slot.Nodes { - if _, ok := seen[node.Addr]; !ok { - c.addrs = append(c.addrs, node.Addr) - seen[node.Addr] = struct{}{} - } + for i := s.Start; i <= s.End; i++ { + slots[i] = nodes } } - c.slotsMx.Unlock() + c.mu.Lock() + if !c.closed { + c.slots = slots + } + c.mu.Unlock() +} + +func (c *ClusterClient) setNodesLatency() { + nodes := c.getNodes() + for _, node := range nodes { + var latency int + for i := 0; i < 10; i++ { + t1 := time.Now() + node.Client.Ping() + latency += int(time.Since(t1) / time.Millisecond) + } + node.Latency = latency + } } func (c *ClusterClient) reloadSlots() { defer atomic.StoreUint32(&c.reloading, 0) - client, err := c.randomClient() - if err != nil { - internal.Logf("randomClient failed: %s", err) + node := c.randomNode() + if node == nil { return } - slots, err := client.ClusterSlots().Result() + slots, err := node.Client.ClusterSlots().Result() if err != nil { - internal.Logf("ClusterSlots failed: %s", err) + internal.Logf("ClusterSlots on addr=%q failed: %s", node.Addr, err) return } + c.setSlots(slots) + if c.opt.RouteByLatency { + c.setNodesLatency() + } } func (c *ClusterClient) lazyReloadSlots() { @@ -297,13 +374,14 @@ func (c *ClusterClient) reaper(frequency time.Duration) { defer ticker.Stop() for _ = range ticker.C { - if c.closed() { + nodes := c.getNodes() + if nodes == nil { break } var n int - for _, client := range c.getClients() { - nn, err := client.connPool.(*pool.ConnPool).ReapStaleConns() + for _, node := range nodes { + nn, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() if err != nil { internal.Logf("ReapStaleConns failed: %s", err) } else { @@ -334,25 +412,21 @@ func (c *ClusterClient) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (c *ClusterClient) pipelineExec(cmds []Cmder) error { var retErr error - cmdsMap := make(map[string][]Cmder) + cmdsMap := make(map[*clusterNode][]Cmder) for _, cmd := range cmds { - slot := hashtag.Slot(cmd.clusterKey()) - addr := c.slotMasterAddr(slot) - cmdsMap[addr] = append(cmdsMap[addr], cmd) + _, node := c.cmdSlotAndNode(cmd) + cmdsMap[node] = append(cmdsMap[node], cmd) } for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ { - failedCmds := make(map[string][]Cmder) + failedCmds := make(map[*clusterNode][]Cmder) - for addr, cmds := range cmdsMap { - client, err := c.getClient(addr) - if err != nil { - setCmdsErr(cmds, err) - retErr = err - continue + for node, cmds := range cmdsMap { + if node == nil { + node = c.randomNode() } - cn, err := client.conn() + cn, err := node.Client.conn() if err != nil { setCmdsErr(cmds, err) retErr = err @@ -363,7 +437,7 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { if err != nil { retErr = err } - client.putConn(cn, err, false) + node.Client.putConn(cn, err, false) } cmdsMap = failedCmds @@ -373,8 +447,8 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { } func (c *ClusterClient) execClusterCmds( - cn *pool.Conn, cmds []Cmder, failedCmds map[string][]Cmder, -) (map[string][]Cmder, error) { + cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, +) (map[*clusterNode][]Cmder, error) { if err := writeCmd(cn, cmds...); err != nil { setCmdsErr(cmds, err) return failedCmds, err @@ -388,15 +462,17 @@ func (c *ClusterClient) execClusterCmds( } if isNetworkError(err) { cmd.reset() - failedCmds[""] = append(failedCmds[""], cmds[i:]...) + failedCmds[nil] = append(failedCmds[nil], cmds[i:]...) break } else if moved, ask, addr := isMovedError(err); moved { c.lazyReloadSlots() cmd.reset() - failedCmds[addr] = append(failedCmds[addr], cmd) + node := c.nodeByAddr(addr) + failedCmds[node] = append(failedCmds[node], cmd) } else if ask { cmd.reset() - failedCmds[addr] = append(failedCmds[addr], NewCmd("ASKING"), cmd) + node := c.nodeByAddr(addr) + failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd) } else if firstCmdErr == nil { firstCmdErr = err } @@ -418,6 +494,12 @@ type ClusterOptions struct { // Default is 16. MaxRedirects int + // Enables read queries for a connection to a Redis Cluster slave node. + ReadOnly bool + + // Enables routing read-only queries to the closest master or slave node. + RouteByLatency bool + // Following options are copied from Options struct. Password string @@ -446,6 +528,7 @@ func (opt *ClusterOptions) getMaxRedirects() int { func (opt *ClusterOptions) clientOptions() *Options { return &Options{ Password: opt.Password, + ReadOnly: opt.ReadOnly, DialTimeout: opt.DialTimeout, ReadTimeout: opt.ReadTimeout, @@ -454,6 +537,7 @@ func (opt *ClusterOptions) clientOptions() *Options { PoolSize: opt.PoolSize, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, + // IdleCheckFrequency is not copied to disable reaper } } diff --git a/cluster_client_test.go b/cluster_client_test.go index 560c438..8277abd 100644 --- a/cluster_client_test.go +++ b/cluster_client_test.go @@ -6,16 +6,21 @@ import ( ) func (c *ClusterClient) SlotAddrs(slot int) []string { - return c.slotAddrs(slot) + var addrs []string + for _, n := range c.slotNodes(slot) { + addrs = append(addrs, n.Addr) + } + return addrs } // SwapSlot swaps a slot's master/slave address // for testing MOVED redirects -func (c *ClusterClient) SwapSlot(pos int) []string { - c.slotsMx.Lock() - defer c.slotsMx.Unlock() - c.slots[pos][0], c.slots[pos][1] = c.slots[pos][1], c.slots[pos][0] - return c.slots[pos] +func (c *ClusterClient) SwapSlotNodes(slot int) []string { + c.mu.Lock() + nodes := c.slots[slot] + nodes[0], nodes[1] = nodes[1], nodes[0] + c.mu.Unlock() + return c.SlotAddrs(slot) } var _ = Describe("ClusterClient", func() { @@ -42,19 +47,26 @@ var _ = Describe("ClusterClient", func() { It("should initialize", func() { Expect(subject.addrs).To(HaveLen(3)) - Expect(subject.slots).To(HaveLen(16384)) }) It("should update slots cache", func() { populate() - Expect(subject.slots[0]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"})) - Expect(subject.slots[4095]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"})) - Expect(subject.slots[4096]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"})) - Expect(subject.slots[8191]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"})) - Expect(subject.slots[8192]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"})) - Expect(subject.slots[12287]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"})) - Expect(subject.slots[12288]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"})) - Expect(subject.slots[16383]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"})) + Expect(subject.slots[0][0].Addr).To(Equal("127.0.0.1:7000")) + Expect(subject.slots[0][1].Addr).To(Equal("127.0.0.1:7004")) + Expect(subject.slots[4095][0].Addr).To(Equal("127.0.0.1:7000")) + Expect(subject.slots[4095][1].Addr).To(Equal("127.0.0.1:7004")) + Expect(subject.slots[4096][0].Addr).To(Equal("127.0.0.1:7001")) + Expect(subject.slots[4096][1].Addr).To(Equal("127.0.0.1:7005")) + Expect(subject.slots[8191][0].Addr).To(Equal("127.0.0.1:7001")) + Expect(subject.slots[8191][1].Addr).To(Equal("127.0.0.1:7005")) + Expect(subject.slots[8192][0].Addr).To(Equal("127.0.0.1:7002")) + Expect(subject.slots[8192][1].Addr).To(Equal("127.0.0.1:7006")) + Expect(subject.slots[12287][0].Addr).To(Equal("127.0.0.1:7002")) + Expect(subject.slots[12287][1].Addr).To(Equal("127.0.0.1:7006")) + Expect(subject.slots[12288][0].Addr).To(Equal("127.0.0.1:7003")) + Expect(subject.slots[12288][1].Addr).To(Equal("127.0.0.1:7007")) + Expect(subject.slots[16383][0].Addr).To(Equal("127.0.0.1:7003")) + Expect(subject.slots[16383][1].Addr).To(Equal("127.0.0.1:7007")) Expect(subject.addrs).To(Equal([]string{ "127.0.0.1:6379", "127.0.0.1:7003", @@ -71,11 +83,9 @@ var _ = Describe("ClusterClient", func() { It("should close", func() { populate() Expect(subject.Close()).NotTo(HaveOccurred()) - Expect(subject.clients).To(BeEmpty()) - Expect(subject.slots[0]).To(BeEmpty()) - Expect(subject.slots[8191]).To(BeEmpty()) - Expect(subject.slots[8192]).To(BeEmpty()) - Expect(subject.slots[16383]).To(BeEmpty()) + Expect(subject.addrs).To(BeEmpty()) + Expect(subject.nodes).To(BeEmpty()) + Expect(subject.slots).To(BeEmpty()) Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed")) }) }) diff --git a/cluster_test.go b/cluster_test.go index 69ffa7a..6d081df 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -301,7 +301,7 @@ var _ = Describe("Cluster", func() { }) It("should CLUSTER READONLY", func() { - res, err := cluster.primary().Readonly().Result() + res, err := cluster.primary().ReadOnly().Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal("OK")) }) @@ -353,7 +353,7 @@ var _ = Describe("Cluster", func() { Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred()) slot := hashtag.Slot("A") - Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) + Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) val, err := client.Get("A").Result() Expect(err).NotTo(HaveOccurred()) @@ -361,7 +361,7 @@ var _ = Describe("Cluster", func() { Eventually(func() []string { return client.SlotAddrs(slot) - }, "5s").Should(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"})) + }, "10s").Should(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"})) }) It("should return error when there are no attempts left", func() { @@ -371,7 +371,7 @@ var _ = Describe("Cluster", func() { }) slot := hashtag.Slot("A") - Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) + Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) err := client.Get("A").Err() Expect(err).To(HaveOccurred()) @@ -435,7 +435,7 @@ var _ = Describe("Cluster", func() { It("performs multi-pipelines", func() { slot := hashtag.Slot("A") - Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) + Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) pipe := client.Pipeline() defer pipe.Close() diff --git a/command.go b/command.go index 248625e..2db8fa2 100644 --- a/command.go +++ b/command.go @@ -30,12 +30,12 @@ var ( type Cmder interface { args() []interface{} + arg(int) string readReply(*pool.Conn) error setErr(error) reset() readTimeout() *time.Duration - clusterKey() string Err() error fmt.Stringer @@ -92,10 +92,7 @@ func cmdString(cmd Cmder, val interface{}) string { type baseCmd struct { _args []interface{} - - err error - - _clusterKeyPos int + err error _readTimeout *time.Duration } @@ -111,6 +108,15 @@ func (cmd *baseCmd) args() []interface{} { return cmd._args } +func (cmd *baseCmd) arg(pos int) string { + if len(cmd._args) > pos { + if s, ok := cmd._args[pos].(string); ok { + return s + } + } + return "" +} + func (cmd *baseCmd) readTimeout() *time.Duration { return cmd._readTimeout } @@ -119,17 +125,14 @@ func (cmd *baseCmd) setReadTimeout(d time.Duration) { cmd._readTimeout = &d } -func (cmd *baseCmd) clusterKey() string { - if cmd._clusterKeyPos > 0 && cmd._clusterKeyPos < len(cmd._args) { - return fmt.Sprint(cmd._args[cmd._clusterKeyPos]) - } - return "" -} - func (cmd *baseCmd) setErr(e error) { cmd.err = e } +func newBaseCmd(args []interface{}) baseCmd { + return baseCmd{_args: args} +} + //------------------------------------------------------------------------------ type Cmd struct { @@ -139,7 +142,7 @@ type Cmd struct { } func NewCmd(args ...interface{}) *Cmd { - return &Cmd{baseCmd: baseCmd{_args: args}} + return &Cmd{baseCmd: newBaseCmd(args)} } func (cmd *Cmd) reset() { @@ -183,7 +186,8 @@ type SliceCmd struct { } func NewSliceCmd(args ...interface{}) *SliceCmd { - return &SliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &SliceCmd{baseCmd: cmd} } func (cmd *SliceCmd) reset() { @@ -222,11 +226,8 @@ type StatusCmd struct { } func NewStatusCmd(args ...interface{}) *StatusCmd { - return &StatusCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} -} - -func newKeylessStatusCmd(args ...interface{}) *StatusCmd { - return &StatusCmd{baseCmd: baseCmd{_args: args}} + cmd := newBaseCmd(args) + return &StatusCmd{baseCmd: cmd} } func (cmd *StatusCmd) reset() { @@ -260,7 +261,8 @@ type IntCmd struct { } func NewIntCmd(args ...interface{}) *IntCmd { - return &IntCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &IntCmd{baseCmd: cmd} } func (cmd *IntCmd) reset() { @@ -295,9 +297,10 @@ type DurationCmd struct { } func NewDurationCmd(precision time.Duration, args ...interface{}) *DurationCmd { + cmd := newBaseCmd(args) return &DurationCmd{ precision: precision, - baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}, + baseCmd: cmd, } } @@ -337,7 +340,8 @@ type BoolCmd struct { } func NewBoolCmd(args ...interface{}) *BoolCmd { - return &BoolCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &BoolCmd{baseCmd: cmd} } func (cmd *BoolCmd) reset() { @@ -393,7 +397,8 @@ type StringCmd struct { } func NewStringCmd(args ...interface{}) *StringCmd { - return &StringCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &StringCmd{baseCmd: cmd} } func (cmd *StringCmd) reset() { @@ -468,7 +473,8 @@ type FloatCmd struct { } func NewFloatCmd(args ...interface{}) *FloatCmd { - return &FloatCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &FloatCmd{baseCmd: cmd} } func (cmd *FloatCmd) reset() { @@ -502,7 +508,8 @@ type StringSliceCmd struct { } func NewStringSliceCmd(args ...interface{}) *StringSliceCmd { - return &StringSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &StringSliceCmd{baseCmd: cmd} } func (cmd *StringSliceCmd) reset() { @@ -541,7 +548,8 @@ type BoolSliceCmd struct { } func NewBoolSliceCmd(args ...interface{}) *BoolSliceCmd { - return &BoolSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &BoolSliceCmd{baseCmd: cmd} } func (cmd *BoolSliceCmd) reset() { @@ -580,7 +588,8 @@ type StringStringMapCmd struct { } func NewStringStringMapCmd(args ...interface{}) *StringStringMapCmd { - return &StringStringMapCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &StringStringMapCmd{baseCmd: cmd} } func (cmd *StringStringMapCmd) reset() { @@ -619,7 +628,8 @@ type StringIntMapCmd struct { } func NewStringIntMapCmd(args ...interface{}) *StringIntMapCmd { - return &StringIntMapCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &StringIntMapCmd{baseCmd: cmd} } func (cmd *StringIntMapCmd) Val() map[string]int64 { @@ -658,7 +668,8 @@ type ZSliceCmd struct { } func NewZSliceCmd(args ...interface{}) *ZSliceCmd { - return &ZSliceCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &ZSliceCmd{baseCmd: cmd} } func (cmd *ZSliceCmd) reset() { @@ -698,8 +709,9 @@ type ScanCmd struct { } func NewScanCmd(args ...interface{}) *ScanCmd { + cmd := newBaseCmd(args) return &ScanCmd{ - baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}, + baseCmd: cmd, } } @@ -752,7 +764,8 @@ type ClusterSlotsCmd struct { } func NewClusterSlotsCmd(args ...interface{}) *ClusterSlotsCmd { - return &ClusterSlotsCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} + cmd := newBaseCmd(args) + return &ClusterSlotsCmd{baseCmd: cmd} } func (cmd *ClusterSlotsCmd) Val() []ClusterSlot { @@ -833,12 +846,10 @@ func NewGeoLocationCmd(q *GeoRadiusQuery, args ...interface{}) *GeoLocationCmd { if q.Sort != "" { args = append(args, q.Sort) } + cmd := newBaseCmd(args) return &GeoLocationCmd{ - baseCmd: baseCmd{ - _args: args, - _clusterKeyPos: 1, - }, - q: q, + baseCmd: cmd, + q: q, } } @@ -868,3 +879,53 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { cmd.locations = reply.([]GeoLocation) return nil } + +//------------------------------------------------------------------------------ + +type CommandInfo struct { + Name string + Arity int8 + Flags []string + FirstKeyPos int8 + LastKeyPos int8 + StepCount int8 + ReadOnly bool +} + +type CommandsInfoCmd struct { + baseCmd + + val map[string]*CommandInfo +} + +func NewCommandsInfoCmd(args ...interface{}) *CommandsInfoCmd { + cmd := newBaseCmd(args) + return &CommandsInfoCmd{baseCmd: cmd} +} + +func (cmd *CommandsInfoCmd) Val() map[string]*CommandInfo { + return cmd.val +} + +func (cmd *CommandsInfoCmd) Result() (map[string]*CommandInfo, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *CommandsInfoCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *CommandsInfoCmd) reset() { + cmd.val = nil + cmd.err = nil +} + +func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { + v, err := readArrayReply(cn, commandInfoSliceParser) + if err != nil { + cmd.err = err + return err + } + cmd.val = v.(map[string]*CommandInfo) + return nil +} diff --git a/command_test.go b/command_test.go index 77391e4..ab31f3f 100644 --- a/command_test.go +++ b/command_test.go @@ -21,10 +21,10 @@ var _ = Describe("Cmd", func() { It("implements Stringer", func() { set := client.Set("foo", "bar", 0) - Expect(set.String()).To(Equal("SET foo bar: OK")) + Expect(set.String()).To(Equal("set foo bar: OK")) get := client.Get("foo") - Expect(get.String()).To(Equal("GET foo: bar")) + Expect(get.String()).To(Equal("get foo: bar")) }) It("has val/err", func() { diff --git a/commands.go b/commands.go index e6c7e8a..00b047f 100644 --- a/commands.go +++ b/commands.go @@ -62,20 +62,19 @@ func (c *commandable) Process(cmd Cmder) { //------------------------------------------------------------------------------ func (c *commandable) Auth(password string) *StatusCmd { - cmd := newKeylessStatusCmd("AUTH", password) + cmd := NewStatusCmd("auth", password) c.Process(cmd) return cmd } func (c *commandable) Echo(message string) *StringCmd { - cmd := NewStringCmd("ECHO", message) - cmd._clusterKeyPos = 0 + cmd := NewStringCmd("echo", message) c.Process(cmd) return cmd } func (c *commandable) Ping() *StatusCmd { - cmd := newKeylessStatusCmd("PING") + cmd := NewStatusCmd("ping") c.Process(cmd) return cmd } @@ -85,7 +84,7 @@ func (c *commandable) Quit() *StatusCmd { } func (c *commandable) Select(index int64) *StatusCmd { - cmd := newKeylessStatusCmd("SELECT", index) + cmd := NewStatusCmd("select", index) c.Process(cmd) return cmd } @@ -104,110 +103,106 @@ func (c *commandable) Del(keys ...string) *IntCmd { } func (c *commandable) Dump(key string) *StringCmd { - cmd := NewStringCmd("DUMP", key) + cmd := NewStringCmd("dump", key) c.Process(cmd) return cmd } func (c *commandable) Exists(key string) *BoolCmd { - cmd := NewBoolCmd("EXISTS", key) + cmd := NewBoolCmd("exists", key) c.Process(cmd) return cmd } func (c *commandable) Expire(key string, expiration time.Duration) *BoolCmd { - cmd := NewBoolCmd("EXPIRE", key, formatSec(expiration)) + cmd := NewBoolCmd("expire", key, formatSec(expiration)) c.Process(cmd) return cmd } func (c *commandable) ExpireAt(key string, tm time.Time) *BoolCmd { - cmd := NewBoolCmd("EXPIREAT", key, tm.Unix()) + cmd := NewBoolCmd("expireat", key, tm.Unix()) c.Process(cmd) return cmd } func (c *commandable) Keys(pattern string) *StringSliceCmd { - cmd := NewStringSliceCmd("KEYS", pattern) + cmd := NewStringSliceCmd("keys", pattern) c.Process(cmd) return cmd } func (c *commandable) Migrate(host, port, key string, db int64, timeout time.Duration) *StatusCmd { cmd := NewStatusCmd( - "MIGRATE", + "migrate", host, port, key, db, formatMs(timeout), ) - cmd._clusterKeyPos = 3 cmd.setReadTimeout(readTimeout(timeout)) c.Process(cmd) return cmd } func (c *commandable) Move(key string, db int64) *BoolCmd { - cmd := NewBoolCmd("MOVE", key, db) + cmd := NewBoolCmd("move", key, db) c.Process(cmd) return cmd } func (c *commandable) ObjectRefCount(keys ...string) *IntCmd { args := make([]interface{}, 2+len(keys)) - args[0] = "OBJECT" - args[1] = "REFCOUNT" + args[0] = "object" + args[1] = "refcount" for i, key := range keys { args[2+i] = key } cmd := NewIntCmd(args...) - cmd._clusterKeyPos = 2 c.Process(cmd) return cmd } func (c *commandable) ObjectEncoding(keys ...string) *StringCmd { args := make([]interface{}, 2+len(keys)) - args[0] = "OBJECT" - args[1] = "ENCODING" + args[0] = "object" + args[1] = "encoding" for i, key := range keys { args[2+i] = key } cmd := NewStringCmd(args...) - cmd._clusterKeyPos = 2 c.Process(cmd) return cmd } func (c *commandable) ObjectIdleTime(keys ...string) *DurationCmd { args := make([]interface{}, 2+len(keys)) - args[0] = "OBJECT" - args[1] = "IDLETIME" + args[0] = "object" + args[1] = "idletime" for i, key := range keys { args[2+i] = key } cmd := NewDurationCmd(time.Second, args...) - cmd._clusterKeyPos = 2 c.Process(cmd) return cmd } func (c *commandable) Persist(key string) *BoolCmd { - cmd := NewBoolCmd("PERSIST", key) + cmd := NewBoolCmd("persist", key) c.Process(cmd) return cmd } func (c *commandable) PExpire(key string, expiration time.Duration) *BoolCmd { - cmd := NewBoolCmd("PEXPIRE", key, formatMs(expiration)) + cmd := NewBoolCmd("pexpire", key, formatMs(expiration)) c.Process(cmd) return cmd } func (c *commandable) PExpireAt(key string, tm time.Time) *BoolCmd { cmd := NewBoolCmd( - "PEXPIREAT", + "pexpireat", key, tm.UnixNano()/int64(time.Millisecond), ) @@ -216,32 +211,32 @@ func (c *commandable) PExpireAt(key string, tm time.Time) *BoolCmd { } func (c *commandable) PTTL(key string) *DurationCmd { - cmd := NewDurationCmd(time.Millisecond, "PTTL", key) + cmd := NewDurationCmd(time.Millisecond, "pttl", key) c.Process(cmd) return cmd } func (c *commandable) RandomKey() *StringCmd { - cmd := NewStringCmd("RANDOMKEY") + cmd := NewStringCmd("randomkey") c.Process(cmd) return cmd } func (c *commandable) Rename(key, newkey string) *StatusCmd { - cmd := NewStatusCmd("RENAME", key, newkey) + cmd := NewStatusCmd("rename", key, newkey) c.Process(cmd) return cmd } func (c *commandable) RenameNX(key, newkey string) *BoolCmd { - cmd := NewBoolCmd("RENAMENX", key, newkey) + cmd := NewBoolCmd("renamenx", key, newkey) c.Process(cmd) return cmd } func (c *commandable) Restore(key string, ttl time.Duration, value string) *StatusCmd { cmd := NewStatusCmd( - "RESTORE", + "restore", key, formatMs(ttl), value, @@ -252,11 +247,11 @@ func (c *commandable) Restore(key string, ttl time.Duration, value string) *Stat func (c *commandable) RestoreReplace(key string, ttl time.Duration, value string) *StatusCmd { cmd := NewStatusCmd( - "RESTORE", + "restore", key, formatMs(ttl), value, - "REPLACE", + "replace", ) c.Process(cmd) return cmd @@ -272,24 +267,24 @@ type Sort struct { } func (sort *Sort) args(key string) []interface{} { - args := []interface{}{"SORT", key} + args := []interface{}{"sort", key} if sort.By != "" { - args = append(args, "BY", sort.By) + args = append(args, "by", sort.By) } if sort.Offset != 0 || sort.Count != 0 { - args = append(args, "LIMIT", sort.Offset, sort.Count) + args = append(args, "limit", sort.Offset, sort.Count) } for _, get := range sort.Get { - args = append(args, "GET", get) + args = append(args, "get", get) } if sort.Order != "" { args = append(args, sort.Order) } if sort.IsAlpha { - args = append(args, "ALPHA") + args = append(args, "alpha") } if sort.Store != "" { - args = append(args, "STORE", sort.Store) + args = append(args, "store", sort.Store) } return args } @@ -307,24 +302,24 @@ func (c *commandable) SortInterfaces(key string, sort Sort) *SliceCmd { } func (c *commandable) TTL(key string) *DurationCmd { - cmd := NewDurationCmd(time.Second, "TTL", key) + cmd := NewDurationCmd(time.Second, "ttl", key) c.Process(cmd) return cmd } func (c *commandable) Type(key string) *StatusCmd { - cmd := NewStatusCmd("TYPE", key) + cmd := NewStatusCmd("type", key) c.Process(cmd) return cmd } func (c *commandable) Scan(cursor uint64, match string, count int64) Scanner { - args := []interface{}{"SCAN", cursor} + args := []interface{}{"scan", cursor} if match != "" { - args = append(args, "MATCH", match) + args = append(args, "match", match) } if count > 0 { - args = append(args, "COUNT", count) + args = append(args, "count", count) } cmd := NewScanCmd(args...) c.Process(cmd) @@ -335,12 +330,12 @@ func (c *commandable) Scan(cursor uint64, match string, count int64) Scanner { } func (c *commandable) SScan(key string, cursor uint64, match string, count int64) Scanner { - args := []interface{}{"SSCAN", key, cursor} + args := []interface{}{"sscan", key, cursor} if match != "" { - args = append(args, "MATCH", match) + args = append(args, "match", match) } if count > 0 { - args = append(args, "COUNT", count) + args = append(args, "count", count) } cmd := NewScanCmd(args...) c.Process(cmd) @@ -351,12 +346,12 @@ func (c *commandable) SScan(key string, cursor uint64, match string, count int64 } func (c *commandable) HScan(key string, cursor uint64, match string, count int64) Scanner { - args := []interface{}{"HSCAN", key, cursor} + args := []interface{}{"hscan", key, cursor} if match != "" { - args = append(args, "MATCH", match) + args = append(args, "match", match) } if count > 0 { - args = append(args, "COUNT", count) + args = append(args, "count", count) } cmd := NewScanCmd(args...) c.Process(cmd) @@ -367,12 +362,12 @@ func (c *commandable) HScan(key string, cursor uint64, match string, count int64 } func (c *commandable) ZScan(key string, cursor uint64, match string, count int64) Scanner { - args := []interface{}{"ZSCAN", key, cursor} + args := []interface{}{"zscan", key, cursor} if match != "" { - args = append(args, "MATCH", match) + args = append(args, "match", match) } if count > 0 { - args = append(args, "COUNT", count) + args = append(args, "count", count) } cmd := NewScanCmd(args...) c.Process(cmd) @@ -385,7 +380,7 @@ func (c *commandable) ZScan(key string, cursor uint64, match string, count int64 //------------------------------------------------------------------------------ func (c *commandable) Append(key, value string) *IntCmd { - cmd := NewIntCmd("APPEND", key, value) + cmd := NewIntCmd("append", key, value) c.Process(cmd) return cmd } @@ -395,7 +390,7 @@ type BitCount struct { } func (c *commandable) BitCount(key string, bitCount *BitCount) *IntCmd { - args := []interface{}{"BITCOUNT", key} + args := []interface{}{"bitcount", key} if bitCount != nil { args = append( args, @@ -410,7 +405,7 @@ func (c *commandable) BitCount(key string, bitCount *BitCount) *IntCmd { func (c *commandable) bitOp(op, destKey string, keys ...string) *IntCmd { args := make([]interface{}, 3+len(keys)) - args[0] = "BITOP" + args[0] = "bitop" args[1] = op args[2] = destKey for i, key := range keys { @@ -422,24 +417,24 @@ func (c *commandable) bitOp(op, destKey string, keys ...string) *IntCmd { } func (c *commandable) BitOpAnd(destKey string, keys ...string) *IntCmd { - return c.bitOp("AND", destKey, keys...) + return c.bitOp("and", destKey, keys...) } func (c *commandable) BitOpOr(destKey string, keys ...string) *IntCmd { - return c.bitOp("OR", destKey, keys...) + return c.bitOp("or", destKey, keys...) } func (c *commandable) BitOpXor(destKey string, keys ...string) *IntCmd { - return c.bitOp("XOR", destKey, keys...) + return c.bitOp("xor", destKey, keys...) } func (c *commandable) BitOpNot(destKey string, key string) *IntCmd { - return c.bitOp("NOT", destKey, key) + return c.bitOp("not", destKey, key) } func (c *commandable) BitPos(key string, bit int64, pos ...int64) *IntCmd { args := make([]interface{}, 3+len(pos)) - args[0] = "BITPOS" + args[0] = "bitpos" args[1] = key args[2] = bit switch len(pos) { @@ -458,62 +453,62 @@ func (c *commandable) BitPos(key string, bit int64, pos ...int64) *IntCmd { } func (c *commandable) Decr(key string) *IntCmd { - cmd := NewIntCmd("DECR", key) + cmd := NewIntCmd("decr", key) c.Process(cmd) return cmd } func (c *commandable) DecrBy(key string, decrement int64) *IntCmd { - cmd := NewIntCmd("DECRBY", key, decrement) + cmd := NewIntCmd("decrby", key, decrement) c.Process(cmd) return cmd } func (c *commandable) Get(key string) *StringCmd { - cmd := NewStringCmd("GET", key) + cmd := NewStringCmd("get", key) c.Process(cmd) return cmd } func (c *commandable) GetBit(key string, offset int64) *IntCmd { - cmd := NewIntCmd("GETBIT", key, offset) + cmd := NewIntCmd("getbit", key, offset) c.Process(cmd) return cmd } func (c *commandable) GetRange(key string, start, end int64) *StringCmd { - cmd := NewStringCmd("GETRANGE", key, start, end) + cmd := NewStringCmd("getrange", key, start, end) c.Process(cmd) return cmd } func (c *commandable) GetSet(key string, value interface{}) *StringCmd { - cmd := NewStringCmd("GETSET", key, value) + cmd := NewStringCmd("getset", key, value) c.Process(cmd) return cmd } func (c *commandable) Incr(key string) *IntCmd { - cmd := NewIntCmd("INCR", key) + cmd := NewIntCmd("incr", key) c.Process(cmd) return cmd } func (c *commandable) IncrBy(key string, value int64) *IntCmd { - cmd := NewIntCmd("INCRBY", key, value) + cmd := NewIntCmd("incrby", key, value) c.Process(cmd) return cmd } func (c *commandable) IncrByFloat(key string, value float64) *FloatCmd { - cmd := NewFloatCmd("INCRBYFLOAT", key, value) + cmd := NewFloatCmd("incrbyfloat", key, value) c.Process(cmd) return cmd } func (c *commandable) MGet(keys ...string) *SliceCmd { args := make([]interface{}, 1+len(keys)) - args[0] = "MGET" + args[0] = "mget" for i, key := range keys { args[1+i] = key } @@ -524,7 +519,7 @@ func (c *commandable) MGet(keys ...string) *SliceCmd { func (c *commandable) MSet(pairs ...string) *StatusCmd { args := make([]interface{}, 1+len(pairs)) - args[0] = "MSET" + args[0] = "mset" for i, pair := range pairs { args[1+i] = pair } @@ -535,7 +530,7 @@ func (c *commandable) MSet(pairs ...string) *StatusCmd { func (c *commandable) MSetNX(pairs ...string) *BoolCmd { args := make([]interface{}, 1+len(pairs)) - args[0] = "MSETNX" + args[0] = "msetnx" for i, pair := range pairs { args[1+i] = pair } @@ -548,15 +543,15 @@ func (c *commandable) MSetNX(pairs ...string) *BoolCmd { // // Zero expiration means the key has no expiration time. func (c *commandable) Set(key string, value interface{}, expiration time.Duration) *StatusCmd { - args := make([]interface{}, 3, 5) - args[0] = "SET" + args := make([]interface{}, 3, 4) + args[0] = "set" args[1] = key args[2] = value if expiration > 0 { if usePrecise(expiration) { - args = append(args, "PX", formatMs(expiration)) + args = append(args, "px", formatMs(expiration)) } else { - args = append(args, "EX", formatSec(expiration)) + args = append(args, "ex", formatSec(expiration)) } } cmd := NewStatusCmd(args...) @@ -582,12 +577,12 @@ func (c *commandable) SetNX(key string, value interface{}, expiration time.Durat var cmd *BoolCmd if expiration == 0 { // Use old `SETNX` to support old Redis versions. - cmd = NewBoolCmd("SETNX", key, value) + cmd = NewBoolCmd("setnx", key, value) } else { if usePrecise(expiration) { - cmd = NewBoolCmd("SET", key, value, "PX", formatMs(expiration), "NX") + cmd = NewBoolCmd("set", key, value, "px", formatMs(expiration), "nx") } else { - cmd = NewBoolCmd("SET", key, value, "EX", formatSec(expiration), "NX") + cmd = NewBoolCmd("set", key, value, "ex", formatSec(expiration), "nx") } } c.Process(cmd) @@ -600,22 +595,22 @@ func (c *commandable) SetNX(key string, value interface{}, expiration time.Durat func (c *commandable) SetXX(key string, value interface{}, expiration time.Duration) *BoolCmd { var cmd *BoolCmd if usePrecise(expiration) { - cmd = NewBoolCmd("SET", key, value, "PX", formatMs(expiration), "XX") + cmd = NewBoolCmd("set", key, value, "px", formatMs(expiration), "xx") } else { - cmd = NewBoolCmd("SET", key, value, "EX", formatSec(expiration), "XX") + cmd = NewBoolCmd("set", key, value, "ex", formatSec(expiration), "xx") } c.Process(cmd) return cmd } func (c *commandable) SetRange(key string, offset int64, value string) *IntCmd { - cmd := NewIntCmd("SETRANGE", key, offset, value) + cmd := NewIntCmd("setrange", key, offset, value) c.Process(cmd) return cmd } func (c *commandable) StrLen(key string) *IntCmd { - cmd := NewIntCmd("STRLEN", key) + cmd := NewIntCmd("strlen", key) c.Process(cmd) return cmd } @@ -624,7 +619,7 @@ func (c *commandable) StrLen(key string) *IntCmd { func (c *commandable) HDel(key string, fields ...string) *IntCmd { args := make([]interface{}, 2+len(fields)) - args[0] = "HDEL" + args[0] = "hdel" args[1] = key for i, field := range fields { args[2+i] = field @@ -635,50 +630,50 @@ func (c *commandable) HDel(key string, fields ...string) *IntCmd { } func (c *commandable) HExists(key, field string) *BoolCmd { - cmd := NewBoolCmd("HEXISTS", key, field) + cmd := NewBoolCmd("hexists", key, field) c.Process(cmd) return cmd } func (c *commandable) HGet(key, field string) *StringCmd { - cmd := NewStringCmd("HGET", key, field) + cmd := NewStringCmd("hget", key, field) c.Process(cmd) return cmd } func (c *commandable) HGetAll(key string) *StringStringMapCmd { - cmd := NewStringStringMapCmd("HGETALL", key) + cmd := NewStringStringMapCmd("hgetall", key) c.Process(cmd) return cmd } func (c *commandable) HIncrBy(key, field string, incr int64) *IntCmd { - cmd := NewIntCmd("HINCRBY", key, field, incr) + cmd := NewIntCmd("hincrby", key, field, incr) c.Process(cmd) return cmd } func (c *commandable) HIncrByFloat(key, field string, incr float64) *FloatCmd { - cmd := NewFloatCmd("HINCRBYFLOAT", key, field, incr) + cmd := NewFloatCmd("hincrbyfloat", key, field, incr) c.Process(cmd) return cmd } func (c *commandable) HKeys(key string) *StringSliceCmd { - cmd := NewStringSliceCmd("HKEYS", key) + cmd := NewStringSliceCmd("hkeys", key) c.Process(cmd) return cmd } func (c *commandable) HLen(key string) *IntCmd { - cmd := NewIntCmd("HLEN", key) + cmd := NewIntCmd("hlen", key) c.Process(cmd) return cmd } func (c *commandable) HMGet(key string, fields ...string) *SliceCmd { args := make([]interface{}, 2+len(fields)) - args[0] = "HMGET" + args[0] = "hmget" args[1] = key for i, field := range fields { args[2+i] = field @@ -690,7 +685,7 @@ func (c *commandable) HMGet(key string, fields ...string) *SliceCmd { func (c *commandable) HMSet(key string, fields map[string]string) *StatusCmd { args := make([]interface{}, 2+len(fields)*2) - args[0] = "HMSET" + args[0] = "hmset" args[1] = key i := 2 for k, v := range fields { @@ -704,19 +699,19 @@ func (c *commandable) HMSet(key string, fields map[string]string) *StatusCmd { } func (c *commandable) HSet(key, field, value string) *BoolCmd { - cmd := NewBoolCmd("HSET", key, field, value) + cmd := NewBoolCmd("hset", key, field, value) c.Process(cmd) return cmd } func (c *commandable) HSetNX(key, field, value string) *BoolCmd { - cmd := NewBoolCmd("HSETNX", key, field, value) + cmd := NewBoolCmd("hsetnx", key, field, value) c.Process(cmd) return cmd } func (c *commandable) HVals(key string) *StringSliceCmd { - cmd := NewStringSliceCmd("HVALS", key) + cmd := NewStringSliceCmd("hvals", key) c.Process(cmd) return cmd } @@ -724,8 +719,8 @@ func (c *commandable) HVals(key string) *StringSliceCmd { //------------------------------------------------------------------------------ func (c *commandable) BLPop(timeout time.Duration, keys ...string) *StringSliceCmd { - args := make([]interface{}, 2+len(keys)) - args[0] = "BLPOP" + args := make([]interface{}, 1+len(keys)+1) + args[0] = "blpop" for i, key := range keys { args[1+i] = key } @@ -737,12 +732,12 @@ func (c *commandable) BLPop(timeout time.Duration, keys ...string) *StringSliceC } func (c *commandable) BRPop(timeout time.Duration, keys ...string) *StringSliceCmd { - args := make([]interface{}, 2+len(keys)) - args[0] = "BRPOP" + args := make([]interface{}, 1+len(keys)+1) + args[0] = "brpop" for i, key := range keys { args[1+i] = key } - args[len(args)-1] = formatSec(timeout) + args[len(keys)+1] = formatSec(timeout) cmd := NewStringSliceCmd(args...) cmd.setReadTimeout(readTimeout(timeout)) c.Process(cmd) @@ -751,7 +746,7 @@ func (c *commandable) BRPop(timeout time.Duration, keys ...string) *StringSliceC func (c *commandable) BRPopLPush(source, destination string, timeout time.Duration) *StringCmd { cmd := NewStringCmd( - "BRPOPLPUSH", + "brpoplpush", source, destination, formatSec(timeout), @@ -762,32 +757,32 @@ func (c *commandable) BRPopLPush(source, destination string, timeout time.Durati } func (c *commandable) LIndex(key string, index int64) *StringCmd { - cmd := NewStringCmd("LINDEX", key, index) + cmd := NewStringCmd("lindex", key, index) c.Process(cmd) return cmd } func (c *commandable) LInsert(key, op, pivot, value string) *IntCmd { - cmd := NewIntCmd("LINSERT", key, op, pivot, value) + cmd := NewIntCmd("linsert", key, op, pivot, value) c.Process(cmd) return cmd } func (c *commandable) LLen(key string) *IntCmd { - cmd := NewIntCmd("LLEN", key) + cmd := NewIntCmd("llen", key) c.Process(cmd) return cmd } func (c *commandable) LPop(key string) *StringCmd { - cmd := NewStringCmd("LPOP", key) + cmd := NewStringCmd("lpop", key) c.Process(cmd) return cmd } func (c *commandable) LPush(key string, values ...string) *IntCmd { args := make([]interface{}, 2+len(values)) - args[0] = "LPUSH" + args[0] = "lpush" args[1] = key for i, value := range values { args[2+i] = value @@ -798,14 +793,14 @@ func (c *commandable) LPush(key string, values ...string) *IntCmd { } func (c *commandable) LPushX(key, value interface{}) *IntCmd { - cmd := NewIntCmd("LPUSHX", key, value) + cmd := NewIntCmd("lpushx", key, value) c.Process(cmd) return cmd } func (c *commandable) LRange(key string, start, stop int64) *StringSliceCmd { cmd := NewStringSliceCmd( - "LRANGE", + "lrange", key, start, stop, @@ -815,20 +810,20 @@ func (c *commandable) LRange(key string, start, stop int64) *StringSliceCmd { } func (c *commandable) LRem(key string, count int64, value interface{}) *IntCmd { - cmd := NewIntCmd("LREM", key, count, value) + cmd := NewIntCmd("lrem", key, count, value) c.Process(cmd) return cmd } func (c *commandable) LSet(key string, index int64, value interface{}) *StatusCmd { - cmd := NewStatusCmd("LSET", key, index, value) + cmd := NewStatusCmd("lset", key, index, value) c.Process(cmd) return cmd } func (c *commandable) LTrim(key string, start, stop int64) *StatusCmd { cmd := NewStatusCmd( - "LTRIM", + "ltrim", key, start, stop, @@ -838,20 +833,20 @@ func (c *commandable) LTrim(key string, start, stop int64) *StatusCmd { } func (c *commandable) RPop(key string) *StringCmd { - cmd := NewStringCmd("RPOP", key) + cmd := NewStringCmd("rpop", key) c.Process(cmd) return cmd } func (c *commandable) RPopLPush(source, destination string) *StringCmd { - cmd := NewStringCmd("RPOPLPUSH", source, destination) + cmd := NewStringCmd("rpoplpush", source, destination) c.Process(cmd) return cmd } func (c *commandable) RPush(key string, values ...string) *IntCmd { args := make([]interface{}, 2+len(values)) - args[0] = "RPUSH" + args[0] = "rpush" args[1] = key for i, value := range values { args[2+i] = value @@ -862,7 +857,7 @@ func (c *commandable) RPush(key string, values ...string) *IntCmd { } func (c *commandable) RPushX(key string, value interface{}) *IntCmd { - cmd := NewIntCmd("RPUSHX", key, value) + cmd := NewIntCmd("rpushx", key, value) c.Process(cmd) return cmd } @@ -871,7 +866,7 @@ func (c *commandable) RPushX(key string, value interface{}) *IntCmd { func (c *commandable) SAdd(key string, members ...string) *IntCmd { args := make([]interface{}, 2+len(members)) - args[0] = "SADD" + args[0] = "sadd" args[1] = key for i, member := range members { args[2+i] = member @@ -882,14 +877,14 @@ func (c *commandable) SAdd(key string, members ...string) *IntCmd { } func (c *commandable) SCard(key string) *IntCmd { - cmd := NewIntCmd("SCARD", key) + cmd := NewIntCmd("scard", key) c.Process(cmd) return cmd } func (c *commandable) SDiff(keys ...string) *StringSliceCmd { args := make([]interface{}, 1+len(keys)) - args[0] = "SDIFF" + args[0] = "sdiff" for i, key := range keys { args[1+i] = key } @@ -900,7 +895,7 @@ func (c *commandable) SDiff(keys ...string) *StringSliceCmd { func (c *commandable) SDiffStore(destination string, keys ...string) *IntCmd { args := make([]interface{}, 2+len(keys)) - args[0] = "SDIFFSTORE" + args[0] = "sdiffstore" args[1] = destination for i, key := range keys { args[2+i] = key @@ -912,7 +907,7 @@ func (c *commandable) SDiffStore(destination string, keys ...string) *IntCmd { func (c *commandable) SInter(keys ...string) *StringSliceCmd { args := make([]interface{}, 1+len(keys)) - args[0] = "SINTER" + args[0] = "sinter" for i, key := range keys { args[1+i] = key } @@ -923,7 +918,7 @@ func (c *commandable) SInter(keys ...string) *StringSliceCmd { func (c *commandable) SInterStore(destination string, keys ...string) *IntCmd { args := make([]interface{}, 2+len(keys)) - args[0] = "SINTERSTORE" + args[0] = "sinterstore" args[1] = destination for i, key := range keys { args[2+i] = key @@ -934,46 +929,46 @@ func (c *commandable) SInterStore(destination string, keys ...string) *IntCmd { } func (c *commandable) SIsMember(key string, member interface{}) *BoolCmd { - cmd := NewBoolCmd("SISMEMBER", key, member) + cmd := NewBoolCmd("sismember", key, member) c.Process(cmd) return cmd } func (c *commandable) SMembers(key string) *StringSliceCmd { - cmd := NewStringSliceCmd("SMEMBERS", key) + cmd := NewStringSliceCmd("smembers", key) c.Process(cmd) return cmd } func (c *commandable) SMove(source, destination string, member interface{}) *BoolCmd { - cmd := NewBoolCmd("SMOVE", source, destination, member) + cmd := NewBoolCmd("smove", source, destination, member) c.Process(cmd) return cmd } func (c *commandable) SPop(key string) *StringCmd { - cmd := NewStringCmd("SPOP", key) + cmd := NewStringCmd("spop", key) c.Process(cmd) return cmd } // Redis `SRANDMEMBER key` command. func (c *commandable) SRandMember(key string) *StringCmd { - cmd := NewStringCmd("SRANDMEMBER", key) + cmd := NewStringCmd("srandmember", key) c.Process(cmd) return cmd } // Redis `SRANDMEMBER key count` command. func (c *commandable) SRandMemberN(key string, count int64) *StringSliceCmd { - cmd := NewStringSliceCmd("SRANDMEMBER", key, count) + cmd := NewStringSliceCmd("srandmember", key, count) c.Process(cmd) return cmd } func (c *commandable) SRem(key string, members ...string) *IntCmd { args := make([]interface{}, 2+len(members)) - args[0] = "SREM" + args[0] = "srem" args[1] = key for i, member := range members { args[2+i] = member @@ -985,7 +980,7 @@ func (c *commandable) SRem(key string, members ...string) *IntCmd { func (c *commandable) SUnion(keys ...string) *StringSliceCmd { args := make([]interface{}, 1+len(keys)) - args[0] = "SUNION" + args[0] = "sunion" for i, key := range keys { args[1+i] = key } @@ -996,7 +991,7 @@ func (c *commandable) SUnion(keys ...string) *StringSliceCmd { func (c *commandable) SUnionStore(destination string, keys ...string) *IntCmd { args := make([]interface{}, 2+len(keys)) - args[0] = "SUNIONSTORE" + args[0] = "sunionstore" args[1] = destination for i, key := range keys { args[2+i] = key @@ -1035,7 +1030,7 @@ func (c *commandable) zAdd(a []interface{}, n int, members ...Z) *IntCmd { func (c *commandable) ZAdd(key string, members ...Z) *IntCmd { const n = 2 a := make([]interface{}, n+2*len(members)) - a[0], a[1] = "ZADD", key + a[0], a[1] = "zadd", key return c.zAdd(a, n, members...) } @@ -1043,7 +1038,7 @@ func (c *commandable) ZAdd(key string, members ...Z) *IntCmd { func (c *commandable) ZAddNX(key string, members ...Z) *IntCmd { const n = 3 a := make([]interface{}, n+2*len(members)) - a[0], a[1], a[2] = "ZADD", key, "NX" + a[0], a[1], a[2] = "zadd", key, "nx" return c.zAdd(a, n, members...) } @@ -1051,7 +1046,7 @@ func (c *commandable) ZAddNX(key string, members ...Z) *IntCmd { func (c *commandable) ZAddXX(key string, members ...Z) *IntCmd { const n = 3 a := make([]interface{}, n+2*len(members)) - a[0], a[1], a[2] = "ZADD", key, "XX" + a[0], a[1], a[2] = "zadd", key, "xx" return c.zAdd(a, n, members...) } @@ -1059,7 +1054,7 @@ func (c *commandable) ZAddXX(key string, members ...Z) *IntCmd { func (c *commandable) ZAddCh(key string, members ...Z) *IntCmd { const n = 3 a := make([]interface{}, n+2*len(members)) - a[0], a[1], a[2] = "ZADD", key, "CH" + a[0], a[1], a[2] = "zadd", key, "ch" return c.zAdd(a, n, members...) } @@ -1067,7 +1062,7 @@ func (c *commandable) ZAddCh(key string, members ...Z) *IntCmd { func (c *commandable) ZAddNXCh(key string, members ...Z) *IntCmd { const n = 4 a := make([]interface{}, n+2*len(members)) - a[0], a[1], a[2], a[3] = "ZADD", key, "NX", "CH" + a[0], a[1], a[2], a[3] = "zadd", key, "nx", "ch" return c.zAdd(a, n, members...) } @@ -1075,7 +1070,7 @@ func (c *commandable) ZAddNXCh(key string, members ...Z) *IntCmd { func (c *commandable) ZAddXXCh(key string, members ...Z) *IntCmd { const n = 4 a := make([]interface{}, n+2*len(members)) - a[0], a[1], a[2], a[3] = "ZADD", key, "XX", "CH" + a[0], a[1], a[2], a[3] = "zadd", key, "xx", "ch" return c.zAdd(a, n, members...) } @@ -1093,7 +1088,7 @@ func (c *commandable) zIncr(a []interface{}, n int, members ...Z) *FloatCmd { func (c *commandable) ZIncr(key string, member Z) *FloatCmd { const n = 3 a := make([]interface{}, n+2) - a[0], a[1], a[2] = "ZADD", key, "INCR" + a[0], a[1], a[2] = "zadd", key, "incr" return c.zIncr(a, n, member) } @@ -1101,7 +1096,7 @@ func (c *commandable) ZIncr(key string, member Z) *FloatCmd { func (c *commandable) ZIncrNX(key string, member Z) *FloatCmd { const n = 4 a := make([]interface{}, n+2) - a[0], a[1], a[2], a[3] = "ZADD", key, "INCR", "NX" + a[0], a[1], a[2], a[3] = "zadd", key, "incr", "nx" return c.zIncr(a, n, member) } @@ -1109,44 +1104,44 @@ func (c *commandable) ZIncrNX(key string, member Z) *FloatCmd { func (c *commandable) ZIncrXX(key string, member Z) *FloatCmd { const n = 4 a := make([]interface{}, n+2) - a[0], a[1], a[2], a[3] = "ZADD", key, "INCR", "XX" + a[0], a[1], a[2], a[3] = "zadd", key, "incr", "xx" return c.zIncr(a, n, member) } func (c *commandable) ZCard(key string) *IntCmd { - cmd := NewIntCmd("ZCARD", key) + cmd := NewIntCmd("zcard", key) c.Process(cmd) return cmd } func (c *commandable) ZCount(key, min, max string) *IntCmd { - cmd := NewIntCmd("ZCOUNT", key, min, max) + cmd := NewIntCmd("zcount", key, min, max) c.Process(cmd) return cmd } func (c *commandable) ZIncrBy(key string, increment float64, member string) *FloatCmd { - cmd := NewFloatCmd("ZINCRBY", key, increment, member) + cmd := NewFloatCmd("zincrby", key, increment, member) c.Process(cmd) return cmd } func (c *commandable) ZInterStore(destination string, store ZStore, keys ...string) *IntCmd { args := make([]interface{}, 3+len(keys)) - args[0] = "ZINTERSTORE" + args[0] = "zinterstore" args[1] = destination args[2] = strconv.Itoa(len(keys)) for i, key := range keys { args[3+i] = key } if len(store.Weights) > 0 { - args = append(args, "WEIGHTS") + args = append(args, "weights") for _, weight := range store.Weights { args = append(args, weight) } } if store.Aggregate != "" { - args = append(args, "AGGREGATE", store.Aggregate) + args = append(args, "aggregate", store.Aggregate) } cmd := NewIntCmd(args...) c.Process(cmd) @@ -1155,13 +1150,13 @@ func (c *commandable) ZInterStore(destination string, store ZStore, keys ...stri func (c *commandable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd { args := []interface{}{ - "ZRANGE", + "zrange", key, start, stop, } if withScores { - args = append(args, "WITHSCORES") + args = append(args, "withscores") } cmd := NewStringSliceCmd(args...) c.Process(cmd) @@ -1173,7 +1168,7 @@ func (c *commandable) ZRange(key string, start, stop int64) *StringSliceCmd { } func (c *commandable) ZRangeWithScores(key string, start, stop int64) *ZSliceCmd { - cmd := NewZSliceCmd("ZRANGE", key, start, stop, "WITHSCORES") + cmd := NewZSliceCmd("zrange", key, start, stop, "withscores") c.Process(cmd) return cmd } @@ -1186,12 +1181,12 @@ type ZRangeBy struct { func (c *commandable) zRangeBy(zcmd, key string, opt ZRangeBy, withScores bool) *StringSliceCmd { args := []interface{}{zcmd, key, opt.Min, opt.Max} if withScores { - args = append(args, "WITHSCORES") + args = append(args, "withscores") } if opt.Offset != 0 || opt.Count != 0 { args = append( args, - "LIMIT", + "limit", opt.Offset, opt.Count, ) @@ -1202,19 +1197,19 @@ func (c *commandable) zRangeBy(zcmd, key string, opt ZRangeBy, withScores bool) } func (c *commandable) ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd { - return c.zRangeBy("ZRANGEBYSCORE", key, opt, false) + return c.zRangeBy("zrangebyscore", key, opt, false) } func (c *commandable) ZRangeByLex(key string, opt ZRangeBy) *StringSliceCmd { - return c.zRangeBy("ZRANGEBYLEX", key, opt, false) + return c.zRangeBy("zrangebylex", key, opt, false) } func (c *commandable) ZRangeByScoreWithScores(key string, opt ZRangeBy) *ZSliceCmd { - args := []interface{}{"ZRANGEBYSCORE", key, opt.Min, opt.Max, "WITHSCORES"} + args := []interface{}{"zrangebyscore", key, opt.Min, opt.Max, "withscores"} if opt.Offset != 0 || opt.Count != 0 { args = append( args, - "LIMIT", + "limit", opt.Offset, opt.Count, ) @@ -1225,14 +1220,14 @@ func (c *commandable) ZRangeByScoreWithScores(key string, opt ZRangeBy) *ZSliceC } func (c *commandable) ZRank(key, member string) *IntCmd { - cmd := NewIntCmd("ZRANK", key, member) + cmd := NewIntCmd("zrank", key, member) c.Process(cmd) return cmd } func (c *commandable) ZRem(key string, members ...string) *IntCmd { args := make([]interface{}, 2+len(members)) - args[0] = "ZREM" + args[0] = "zrem" args[1] = key for i, member := range members { args[2+i] = member @@ -1244,7 +1239,7 @@ func (c *commandable) ZRem(key string, members ...string) *IntCmd { func (c *commandable) ZRemRangeByRank(key string, start, stop int64) *IntCmd { cmd := NewIntCmd( - "ZREMRANGEBYRANK", + "zremrangebyrank", key, start, stop, @@ -1254,19 +1249,19 @@ func (c *commandable) ZRemRangeByRank(key string, start, stop int64) *IntCmd { } func (c *commandable) ZRemRangeByScore(key, min, max string) *IntCmd { - cmd := NewIntCmd("ZREMRANGEBYSCORE", key, min, max) + cmd := NewIntCmd("zremrangebyscore", key, min, max) c.Process(cmd) return cmd } func (c *commandable) ZRevRange(key string, start, stop int64) *StringSliceCmd { - cmd := NewStringSliceCmd("ZREVRANGE", key, start, stop) + cmd := NewStringSliceCmd("zrevrange", key, start, stop) c.Process(cmd) return cmd } func (c *commandable) ZRevRangeWithScores(key string, start, stop int64) *ZSliceCmd { - cmd := NewZSliceCmd("ZREVRANGE", key, start, stop, "WITHSCORES") + cmd := NewZSliceCmd("zrevrange", key, start, stop, "withscores") c.Process(cmd) return cmd } @@ -1276,7 +1271,7 @@ func (c *commandable) zRevRangeBy(zcmd, key string, opt ZRangeBy) *StringSliceCm if opt.Offset != 0 || opt.Count != 0 { args = append( args, - "LIMIT", + "limit", opt.Offset, opt.Count, ) @@ -1287,19 +1282,19 @@ func (c *commandable) zRevRangeBy(zcmd, key string, opt ZRangeBy) *StringSliceCm } func (c *commandable) ZRevRangeByScore(key string, opt ZRangeBy) *StringSliceCmd { - return c.zRevRangeBy("ZREVRANGEBYSCORE", key, opt) + return c.zRevRangeBy("zrevrangebyscore", key, opt) } func (c *commandable) ZRevRangeByLex(key string, opt ZRangeBy) *StringSliceCmd { - return c.zRevRangeBy("ZREVRANGEBYLEX", key, opt) + return c.zRevRangeBy("zrevrangebylex", key, opt) } func (c *commandable) ZRevRangeByScoreWithScores(key string, opt ZRangeBy) *ZSliceCmd { - args := []interface{}{"ZREVRANGEBYSCORE", key, opt.Max, opt.Min, "WITHSCORES"} + args := []interface{}{"zrevrangebyscore", key, opt.Max, opt.Min, "withscores"} if opt.Offset != 0 || opt.Count != 0 { args = append( args, - "LIMIT", + "limit", opt.Offset, opt.Count, ) @@ -1310,33 +1305,33 @@ func (c *commandable) ZRevRangeByScoreWithScores(key string, opt ZRangeBy) *ZSli } func (c *commandable) ZRevRank(key, member string) *IntCmd { - cmd := NewIntCmd("ZREVRANK", key, member) + cmd := NewIntCmd("zrevrank", key, member) c.Process(cmd) return cmd } func (c *commandable) ZScore(key, member string) *FloatCmd { - cmd := NewFloatCmd("ZSCORE", key, member) + cmd := NewFloatCmd("zscore", key, member) c.Process(cmd) return cmd } func (c *commandable) ZUnionStore(dest string, store ZStore, keys ...string) *IntCmd { args := make([]interface{}, 3+len(keys)) - args[0] = "ZUNIONSTORE" + args[0] = "zunionstore" args[1] = dest args[2] = strconv.Itoa(len(keys)) for i, key := range keys { args[3+i] = key } if len(store.Weights) > 0 { - args = append(args, "WEIGHTS") + args = append(args, "weights") for _, weight := range store.Weights { args = append(args, weight) } } if store.Aggregate != "" { - args = append(args, "AGGREGATE", store.Aggregate) + args = append(args, "aggregate", store.Aggregate) } cmd := NewIntCmd(args...) c.Process(cmd) @@ -1347,7 +1342,7 @@ func (c *commandable) ZUnionStore(dest string, store ZStore, keys ...string) *In func (c *commandable) PFAdd(key string, fields ...string) *IntCmd { args := make([]interface{}, 2+len(fields)) - args[0] = "PFADD" + args[0] = "pfadd" args[1] = key for i, field := range fields { args[2+i] = field @@ -1359,7 +1354,7 @@ func (c *commandable) PFAdd(key string, fields ...string) *IntCmd { func (c *commandable) PFCount(keys ...string) *IntCmd { args := make([]interface{}, 1+len(keys)) - args[0] = "PFCOUNT" + args[0] = "pfcount" for i, key := range keys { args[1+i] = key } @@ -1370,7 +1365,7 @@ func (c *commandable) PFCount(keys ...string) *IntCmd { func (c *commandable) PFMerge(dest string, keys ...string) *StatusCmd { args := make([]interface{}, 2+len(keys)) - args[0] = "PFMERGE" + args[0] = "pfmerge" args[1] = dest for i, key := range keys { args[2+i] = key @@ -1383,96 +1378,87 @@ func (c *commandable) PFMerge(dest string, keys ...string) *StatusCmd { //------------------------------------------------------------------------------ func (c *commandable) BgRewriteAOF() *StatusCmd { - cmd := NewStatusCmd("BGREWRITEAOF") - cmd._clusterKeyPos = 0 + cmd := NewStatusCmd("bgrewriteaof") c.Process(cmd) return cmd } func (c *commandable) BgSave() *StatusCmd { - cmd := NewStatusCmd("BGSAVE") - cmd._clusterKeyPos = 0 + cmd := NewStatusCmd("bgsave") c.Process(cmd) return cmd } func (c *commandable) ClientKill(ipPort string) *StatusCmd { - cmd := NewStatusCmd("CLIENT", "KILL", ipPort) - cmd._clusterKeyPos = 0 + cmd := NewStatusCmd("client", "kill", ipPort) c.Process(cmd) return cmd } func (c *commandable) ClientList() *StringCmd { - cmd := NewStringCmd("CLIENT", "LIST") - cmd._clusterKeyPos = 0 + cmd := NewStringCmd("client", "list") c.Process(cmd) return cmd } func (c *commandable) ClientPause(dur time.Duration) *BoolCmd { - cmd := NewBoolCmd("CLIENT", "PAUSE", formatMs(dur)) - cmd._clusterKeyPos = 0 + cmd := NewBoolCmd("client", "pause", formatMs(dur)) c.Process(cmd) return cmd } // ClientSetName assigns a name to the one of many connections in the pool. func (c *commandable) ClientSetName(name string) *BoolCmd { - cmd := NewBoolCmd("CLIENT", "SETNAME", name) + cmd := NewBoolCmd("client", "setname", name) c.Process(cmd) return cmd } // ClientGetName returns the name of the one of many connections in the pool. func (c *Client) ClientGetName() *StringCmd { - cmd := NewStringCmd("CLIENT", "GETNAME") + cmd := NewStringCmd("client", "getname") c.Process(cmd) return cmd } func (c *commandable) ConfigGet(parameter string) *SliceCmd { - cmd := NewSliceCmd("CONFIG", "GET", parameter) - cmd._clusterKeyPos = 0 + cmd := NewSliceCmd("config", "get", parameter) c.Process(cmd) return cmd } func (c *commandable) ConfigResetStat() *StatusCmd { - cmd := NewStatusCmd("CONFIG", "RESETSTAT") - cmd._clusterKeyPos = 0 + cmd := NewStatusCmd("config", "resetstat") c.Process(cmd) return cmd } func (c *commandable) ConfigSet(parameter, value string) *StatusCmd { - cmd := NewStatusCmd("CONFIG", "SET", parameter, value) - cmd._clusterKeyPos = 0 + cmd := NewStatusCmd("config", "set", parameter, value) c.Process(cmd) return cmd } func (c *commandable) DbSize() *IntCmd { - cmd := NewIntCmd("DBSIZE") - cmd._clusterKeyPos = 0 + cmd := NewIntCmd("dbsize") c.Process(cmd) return cmd } func (c *commandable) FlushAll() *StatusCmd { - cmd := newKeylessStatusCmd("FLUSHALL") + cmd := NewStatusCmd("flushall") c.Process(cmd) return cmd } func (c *commandable) FlushDb() *StatusCmd { - cmd := newKeylessStatusCmd("FLUSHDB") + cmd := NewStatusCmd("flushdb") c.Process(cmd) return cmd } func (c *commandable) Info(section ...string) *StringCmd { - args := []interface{}{"INFO"} + args := []interface{}{"info"} if len(section) > 0 { args = append(args, section[0]) } @@ -1482,14 +1468,13 @@ func (c *commandable) Info(section ...string) *StringCmd { } func (c *commandable) LastSave() *IntCmd { - cmd := NewIntCmd("LASTSAVE") - cmd._clusterKeyPos = 0 + cmd := NewIntCmd("lastsave") c.Process(cmd) return cmd } func (c *commandable) Save() *StatusCmd { - cmd := newKeylessStatusCmd("SAVE") + cmd := NewStatusCmd("save") c.Process(cmd) return cmd } @@ -1497,11 +1482,11 @@ func (c *commandable) Save() *StatusCmd { func (c *commandable) shutdown(modifier string) *StatusCmd { var args []interface{} if modifier == "" { - args = []interface{}{"SHUTDOWN"} + args = []interface{}{"shutdown"} } else { - args = []interface{}{"SHUTDOWN", modifier} + args = []interface{}{"shutdown", modifier} } - cmd := newKeylessStatusCmd(args...) + cmd := NewStatusCmd(args...) c.Process(cmd) if err := cmd.Err(); err != nil { if err == io.EOF { @@ -1521,15 +1506,15 @@ func (c *commandable) Shutdown() *StatusCmd { } func (c *commandable) ShutdownSave() *StatusCmd { - return c.shutdown("SAVE") + return c.shutdown("save") } func (c *commandable) ShutdownNoSave() *StatusCmd { - return c.shutdown("NOSAVE") + return c.shutdown("nosave") } func (c *commandable) SlaveOf(host, port string) *StatusCmd { - cmd := newKeylessStatusCmd("SLAVEOF", host, port) + cmd := NewStatusCmd("slaveof", host, port) c.Process(cmd) return cmd } @@ -1543,8 +1528,7 @@ func (c *commandable) Sync() { } func (c *commandable) Time() *StringSliceCmd { - cmd := NewStringSliceCmd("TIME") - cmd._clusterKeyPos = 0 + cmd := NewStringSliceCmd("time") c.Process(cmd) return cmd } @@ -1553,7 +1537,7 @@ func (c *commandable) Time() *StringSliceCmd { func (c *commandable) Eval(script string, keys []string, args ...interface{}) *Cmd { cmdArgs := make([]interface{}, 3+len(keys)+len(args)) - cmdArgs[0] = "EVAL" + cmdArgs[0] = "eval" cmdArgs[1] = script cmdArgs[2] = strconv.Itoa(len(keys)) for i, key := range keys { @@ -1564,16 +1548,13 @@ func (c *commandable) Eval(script string, keys []string, args ...interface{}) *C cmdArgs[pos+i] = arg } cmd := NewCmd(cmdArgs...) - if len(keys) > 0 { - cmd._clusterKeyPos = 3 - } c.Process(cmd) return cmd } func (c *commandable) EvalSha(sha1 string, keys []string, args ...interface{}) *Cmd { cmdArgs := make([]interface{}, 3+len(keys)+len(args)) - cmdArgs[0] = "EVALSHA" + cmdArgs[0] = "evalsha" cmdArgs[1] = sha1 cmdArgs[2] = strconv.Itoa(len(keys)) for i, key := range keys { @@ -1584,41 +1565,36 @@ func (c *commandable) EvalSha(sha1 string, keys []string, args ...interface{}) * cmdArgs[pos+i] = arg } cmd := NewCmd(cmdArgs...) - if len(keys) > 0 { - cmd._clusterKeyPos = 3 - } c.Process(cmd) return cmd } func (c *commandable) ScriptExists(scripts ...string) *BoolSliceCmd { args := make([]interface{}, 2+len(scripts)) - args[0] = "SCRIPT" - args[1] = "EXISTS" + args[0] = "script" + args[1] = "exists" for i, script := range scripts { args[2+i] = script } cmd := NewBoolSliceCmd(args...) - cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } func (c *commandable) ScriptFlush() *StatusCmd { - cmd := newKeylessStatusCmd("SCRIPT", "FLUSH") + cmd := NewStatusCmd("script", "flush") c.Process(cmd) return cmd } func (c *commandable) ScriptKill() *StatusCmd { - cmd := newKeylessStatusCmd("SCRIPT", "KILL") + cmd := NewStatusCmd("script", "kill") c.Process(cmd) return cmd } func (c *commandable) ScriptLoad(script string) *StringCmd { - cmd := NewStringCmd("SCRIPT", "LOAD", script) - cmd._clusterKeyPos = 0 + cmd := NewStringCmd("script", "load", script) c.Process(cmd) return cmd } @@ -1626,8 +1602,7 @@ func (c *commandable) ScriptLoad(script string) *StringCmd { //------------------------------------------------------------------------------ func (c *commandable) DebugObject(key string) *StringCmd { - cmd := NewStringCmd("DEBUG", "OBJECT", key) - cmd._clusterKeyPos = 2 + cmd := NewStringCmd("debug", "object", key) c.Process(cmd) return cmd } @@ -1635,32 +1610,29 @@ func (c *commandable) DebugObject(key string) *StringCmd { //------------------------------------------------------------------------------ func (c *commandable) PubSubChannels(pattern string) *StringSliceCmd { - args := []interface{}{"PUBSUB", "CHANNELS"} + args := []interface{}{"pubsub", "channels"} if pattern != "*" { args = append(args, pattern) } cmd := NewStringSliceCmd(args...) - cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } func (c *commandable) PubSubNumSub(channels ...string) *StringIntMapCmd { args := make([]interface{}, 2+len(channels)) - args[0] = "PUBSUB" - args[1] = "NUMSUB" + args[0] = "pubsub" + args[1] = "numsub" for i, channel := range channels { args[2+i] = channel } cmd := NewStringIntMapCmd(args...) - cmd._clusterKeyPos = 0 c.Process(cmd) return cmd } func (c *commandable) PubSubNumPat() *IntCmd { - cmd := NewIntCmd("PUBSUB", "NUMPAT") - cmd._clusterKeyPos = 0 + cmd := NewIntCmd("pubsub", "numpat") c.Process(cmd) return cmd } @@ -1668,85 +1640,79 @@ func (c *commandable) PubSubNumPat() *IntCmd { //------------------------------------------------------------------------------ func (c *commandable) ClusterSlots() *ClusterSlotsCmd { - cmd := NewClusterSlotsCmd("CLUSTER", "slots") - cmd._clusterKeyPos = 0 + cmd := NewClusterSlotsCmd("cluster", "slots") c.Process(cmd) return cmd } func (c *commandable) ClusterNodes() *StringCmd { - cmd := NewStringCmd("CLUSTER", "nodes") - cmd._clusterKeyPos = 0 + cmd := NewStringCmd("cluster", "nodes") c.Process(cmd) return cmd } func (c *commandable) ClusterMeet(host, port string) *StatusCmd { - cmd := newKeylessStatusCmd("CLUSTER", "meet", host, port) + cmd := NewStatusCmd("cluster", "meet", host, port) c.Process(cmd) return cmd } func (c *commandable) ClusterForget(nodeID string) *StatusCmd { - cmd := newKeylessStatusCmd("CLUSTER", "forget", nodeID) + cmd := NewStatusCmd("cluster", "forget", nodeID) c.Process(cmd) return cmd } func (c *commandable) ClusterReplicate(nodeID string) *StatusCmd { - cmd := newKeylessStatusCmd("CLUSTER", "replicate", nodeID) + cmd := NewStatusCmd("cluster", "replicate", nodeID) c.Process(cmd) return cmd } func (c *commandable) ClusterResetSoft() *StatusCmd { - cmd := newKeylessStatusCmd("CLUSTER", "reset", "soft") + cmd := NewStatusCmd("cluster", "reset", "soft") c.Process(cmd) return cmd } func (c *commandable) ClusterResetHard() *StatusCmd { - cmd := newKeylessStatusCmd("CLUSTER", "reset", "hard") + cmd := NewStatusCmd("cluster", "reset", "hard") c.Process(cmd) return cmd } func (c *commandable) ClusterInfo() *StringCmd { - cmd := NewStringCmd("CLUSTER", "info") - cmd._clusterKeyPos = 0 + cmd := NewStringCmd("cluster", "info") c.Process(cmd) return cmd } func (c *commandable) ClusterKeySlot(key string) *IntCmd { - cmd := NewIntCmd("CLUSTER", "keyslot", key) - cmd._clusterKeyPos = 2 + cmd := NewIntCmd("cluster", "keyslot", key) c.Process(cmd) return cmd } func (c *commandable) ClusterCountFailureReports(nodeID string) *IntCmd { - cmd := NewIntCmd("CLUSTER", "count-failure-reports", nodeID) - cmd._clusterKeyPos = 2 + cmd := NewIntCmd("cluster", "count-failure-reports", nodeID) c.Process(cmd) return cmd } func (c *commandable) ClusterCountKeysInSlot(slot int) *IntCmd { - cmd := NewIntCmd("CLUSTER", "countkeysinslot", slot) - cmd._clusterKeyPos = 2 + cmd := NewIntCmd("cluster", "countkeysinslot", slot) c.Process(cmd) return cmd } func (c *commandable) ClusterDelSlots(slots ...int) *StatusCmd { args := make([]interface{}, 2+len(slots)) - args[0] = "CLUSTER" - args[1] = "DELSLOTS" + args[0] = "cluster" + args[1] = "delslots" for i, slot := range slots { args[2+i] = slot } - cmd := newKeylessStatusCmd(args...) + cmd := NewStatusCmd(args...) c.Process(cmd) return cmd } @@ -1761,46 +1727,43 @@ func (c *commandable) ClusterDelSlotsRange(min, max int) *StatusCmd { } func (c *commandable) ClusterSaveConfig() *StatusCmd { - cmd := newKeylessStatusCmd("CLUSTER", "saveconfig") + cmd := NewStatusCmd("cluster", "saveconfig") c.Process(cmd) return cmd } func (c *commandable) ClusterSlaves(nodeID string) *StringSliceCmd { - cmd := NewStringSliceCmd("CLUSTER", "SLAVES", nodeID) - cmd._clusterKeyPos = 2 + cmd := NewStringSliceCmd("cluster", "slaves", nodeID) c.Process(cmd) return cmd } -func (c *commandable) Readonly() *StatusCmd { - cmd := newKeylessStatusCmd("READONLY") - cmd._clusterKeyPos = 0 +func (c *commandable) ReadOnly() *StatusCmd { + cmd := NewStatusCmd("readonly") c.Process(cmd) return cmd } func (c *commandable) ReadWrite() *StatusCmd { - cmd := newKeylessStatusCmd("READWRITE") - cmd._clusterKeyPos = 0 + cmd := NewStatusCmd("readwrite") c.Process(cmd) return cmd } func (c *commandable) ClusterFailover() *StatusCmd { - cmd := newKeylessStatusCmd("CLUSTER", "failover") + cmd := NewStatusCmd("cluster", "failover") c.Process(cmd) return cmd } func (c *commandable) ClusterAddSlots(slots ...int) *StatusCmd { args := make([]interface{}, 2+len(slots)) - args[0] = "CLUSTER" - args[1] = "ADDSLOTS" + args[0] = "cluster" + args[1] = "addslots" for i, num := range slots { args[2+i] = strconv.Itoa(num) } - cmd := newKeylessStatusCmd(args...) + cmd := NewStatusCmd(args...) c.Process(cmd) return cmd } @@ -1818,7 +1781,7 @@ func (c *commandable) ClusterAddSlotsRange(min, max int) *StatusCmd { func (c *commandable) GeoAdd(key string, geoLocation ...*GeoLocation) *IntCmd { args := make([]interface{}, 2+3*len(geoLocation)) - args[0] = "GEOADD" + args[0] = "geoadd" args[1] = key for i, eachLoc := range geoLocation { args[2+3*i] = eachLoc.Longitude @@ -1831,13 +1794,13 @@ func (c *commandable) GeoAdd(key string, geoLocation ...*GeoLocation) *IntCmd { } func (c *commandable) GeoRadius(key string, longitude, latitude float64, query *GeoRadiusQuery) *GeoLocationCmd { - cmd := NewGeoLocationCmd(query, "GEORADIUS", key, longitude, latitude) + cmd := NewGeoLocationCmd(query, "georadius", key, longitude, latitude) c.Process(cmd) return cmd } func (c *commandable) GeoRadiusByMember(key, member string, query *GeoRadiusQuery) *GeoLocationCmd { - cmd := NewGeoLocationCmd(query, "GEORADIUSBYMEMBER", key, member) + cmd := NewGeoLocationCmd(query, "georadiusbymember", key, member) c.Process(cmd) return cmd } @@ -1846,14 +1809,14 @@ func (c *commandable) GeoDist(key string, member1, member2, unit string) *FloatC if unit == "" { unit = "km" } - cmd := NewFloatCmd("GEODIST", key, member1, member2, unit) + cmd := NewFloatCmd("geodist", key, member1, member2, unit) c.Process(cmd) return cmd } func (c *commandable) GeoHash(key string, members ...string) *StringSliceCmd { args := make([]interface{}, 2+len(members)) - args[0] = "GEOHASH" + args[0] = "geohash" args[1] = key for i, member := range members { args[2+i] = member @@ -1862,3 +1825,11 @@ func (c *commandable) GeoHash(key string, members ...string) *StringSliceCmd { c.Process(cmd) return cmd } + +//------------------------------------------------------------------------------ + +func (c *commandable) Command() *CommandsInfoCmd { + cmd := NewCommandsInfoCmd("command") + c.Process(cmd) + return cmd +} diff --git a/commands_test.go b/commands_test.go index d4f25ff..9d65f96 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2729,6 +2729,7 @@ var _ = Describe("Commands", func() { }) Describe("json marshaling/unmarshaling", func() { + BeforeEach(func() { value := &numberStruct{Number: 42} err := client.Set("key", value, 0).Err() @@ -2744,12 +2745,30 @@ var _ = Describe("Commands", func() { It("should scan custom values using json", func() { value := &numberStruct{} err := client.Get("key").Scan(value) - Expect(err).To(BeNil()) + Expect(err).NotTo(HaveOccurred()) Expect(value.Number).To(Equal(42)) }) }) + Describe("Command", func() { + + It("returns map of commands", func() { + cmds, err := client.Command().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(cmds)).To(BeNumerically("~", 173, 5)) + + cmd := cmds["mget"] + Expect(cmd.Name).To(Equal("mget")) + Expect(cmd.Arity).To(Equal(int8(-2))) + Expect(cmd.Flags).To(Equal([]string{"readonly"})) + Expect(cmd.FirstKeyPos).To(Equal(int8(1))) + Expect(cmd.LastKeyPos).To(Equal(int8(-1))) + Expect(cmd.StepCount).To(Equal(int8(1))) + }) + + }) + }) type numberStruct struct { diff --git a/options.go b/options.go index dea2784..e1c0403 100644 --- a/options.go +++ b/options.go @@ -53,6 +53,9 @@ type Options struct { // The frequency of idle checks. // Default is 1 minute. IdleCheckFrequency time.Duration + + // Enables read queries for a connection to a Redis Cluster slave node. + ReadOnly bool } func (opt *Options) getNetwork() string { diff --git a/parser.go b/parser.go index 7a7b6f3..a58d3b1 100644 --- a/parser.go +++ b/parser.go @@ -627,8 +627,8 @@ func clusterSlotsParser(cn *pool.Conn, slotNum int64) (interface{}, error) { func newGeoLocationParser(q *GeoRadiusQuery) multiBulkParser { return func(cn *pool.Conn, n int64) (interface{}, error) { var loc GeoLocation - var err error + loc.Name, err = readStringReply(cn) if err != nil { return nil, err @@ -690,3 +690,70 @@ func newGeoLocationSliceParser(q *GeoRadiusQuery) multiBulkParser { return locs, nil } } + +func commandInfoParser(cn *pool.Conn, n int64) (interface{}, error) { + var cmd CommandInfo + var err error + + if n != 6 { + return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6") + } + + cmd.Name, err = readStringReply(cn) + if err != nil { + return nil, err + } + + arity, err := readIntReply(cn) + if err != nil { + return nil, err + } + cmd.Arity = int8(arity) + + flags, err := readReply(cn, stringSliceParser) + if err != nil { + return nil, err + } + cmd.Flags = flags.([]string) + + firstKeyPos, err := readIntReply(cn) + if err != nil { + return nil, err + } + cmd.FirstKeyPos = int8(firstKeyPos) + + lastKeyPos, err := readIntReply(cn) + if err != nil { + return nil, err + } + cmd.LastKeyPos = int8(lastKeyPos) + + stepCount, err := readIntReply(cn) + if err != nil { + return nil, err + } + cmd.StepCount = int8(stepCount) + + for _, flag := range cmd.Flags { + if flag == "readonly" { + cmd.ReadOnly = true + break + } + } + + return &cmd, nil +} + +func commandInfoSliceParser(cn *pool.Conn, n int64) (interface{}, error) { + m := make(map[string]*CommandInfo, n) + for i := int64(0); i < n; i++ { + v, err := readReply(cn, commandInfoParser) + if err != nil { + return nil, err + } + vv := v.(*CommandInfo) + m[vv.Name] = vv + + } + return m, nil +} diff --git a/redis.go b/redis.go index 9222db9..e2c173b 100644 --- a/redis.go +++ b/redis.go @@ -52,7 +52,7 @@ func (c *baseClient) putConn(cn *pool.Conn, err error, allowTimeout bool) bool { func (c *baseClient) initConn(cn *pool.Conn) error { cn.Inited = true - if c.opt.Password == "" && c.opt.DB == 0 { + if c.opt.Password == "" && c.opt.DB == 0 && !c.opt.ReadOnly { return nil } @@ -71,6 +71,12 @@ func (c *baseClient) initConn(cn *pool.Conn) error { } } + if c.opt.ReadOnly { + if err := client.ReadOnly().Err(); err != nil { + return err + } + } + return nil } diff --git a/ring.go b/ring.go index cac25d4..31559e6 100644 --- a/ring.go +++ b/ring.go @@ -115,10 +115,13 @@ type Ring struct { opt *RingOptions nreplicas int - mx sync.RWMutex + mu sync.RWMutex hash *consistenthash.Map shards map[string]*ringShard + cmdsInfo map[string]*CommandInfo + cmdsInfoOnce *sync.Once + closed bool } @@ -130,6 +133,8 @@ func NewRing(opt *RingOptions) *Ring { hash: consistenthash.New(nreplicas, nil), shards: make(map[string]*ringShard), + + cmdsInfoOnce: new(sync.Once), } ring.commandable.process = ring.process for name, addr := range opt.Addrs { @@ -141,15 +146,40 @@ func NewRing(opt *RingOptions) *Ring { return ring } +func (ring *Ring) cmdInfo(name string) *CommandInfo { + ring.cmdsInfoOnce.Do(func() { + for _, shard := range ring.shards { + cmdsInfo, err := shard.Client.Command().Result() + if err == nil { + ring.cmdsInfo = cmdsInfo + return + } + } + ring.cmdsInfoOnce = &sync.Once{} + }) + if ring.cmdsInfo == nil { + return nil + } + return ring.cmdsInfo[name] +} + +func (ring *Ring) cmdFirstKey(cmd Cmder) string { + cmdInfo := ring.cmdInfo(cmd.arg(0)) + if cmdInfo == nil { + return "" + } + return cmd.arg(int(cmdInfo.FirstKeyPos)) +} + func (ring *Ring) addClient(name string, cl *Client) { - ring.mx.Lock() + ring.mu.Lock() ring.hash.Add(name) ring.shards[name] = &ringShard{Client: cl} - ring.mx.Unlock() + ring.mu.Unlock() } func (ring *Ring) getClient(key string) (*Client, error) { - ring.mx.RLock() + ring.mu.RLock() if ring.closed { return nil, pool.ErrClosed @@ -157,17 +187,17 @@ func (ring *Ring) getClient(key string) (*Client, error) { name := ring.hash.Get(hashtag.Key(key)) if name == "" { - ring.mx.RUnlock() + ring.mu.RUnlock() return nil, errRingShardsDown } cl := ring.shards[name].Client - ring.mx.RUnlock() + ring.mu.RUnlock() return cl, nil } func (ring *Ring) process(cmd Cmder) { - cl, err := ring.getClient(cmd.clusterKey()) + cl, err := ring.getClient(ring.cmdFirstKey(cmd)) if err != nil { cmd.setErr(err) return @@ -177,8 +207,8 @@ func (ring *Ring) process(cmd Cmder) { // rebalance removes dead shards from the ring. func (ring *Ring) rebalance() { - defer ring.mx.Unlock() - ring.mx.Lock() + defer ring.mu.Unlock() + ring.mu.Lock() ring.hash = consistenthash.New(ring.nreplicas, nil) for name, shard := range ring.shards { @@ -195,10 +225,10 @@ func (ring *Ring) heartbeat() { for _ = range ticker.C { var rebalance bool - ring.mx.RLock() + ring.mu.RLock() if ring.closed { - ring.mx.RUnlock() + ring.mu.RUnlock() break } @@ -210,7 +240,7 @@ func (ring *Ring) heartbeat() { } } - ring.mx.RUnlock() + ring.mu.RUnlock() if rebalance { ring.rebalance() @@ -223,8 +253,8 @@ func (ring *Ring) heartbeat() { // It is rare to Close a Ring, as the Ring is meant to be long-lived // and shared between many goroutines. func (ring *Ring) Close() (retErr error) { - defer ring.mx.Unlock() - ring.mx.Lock() + defer ring.mu.Unlock() + ring.mu.Lock() if ring.closed { return nil @@ -259,7 +289,7 @@ func (ring *Ring) pipelineExec(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { - name := ring.hash.Get(hashtag.Key(cmd.clusterKey())) + name := ring.hash.Get(hashtag.Key(ring.cmdFirstKey(cmd))) if name == "" { cmd.setErr(errRingShardsDown) if retErr == nil {