diff --git a/cluster.go b/cluster.go index 9d07d04..e294a1a 100644 --- a/cluster.go +++ b/cluster.go @@ -85,189 +85,132 @@ type clusterNode struct { loading time.Time } +func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { + opt := clOpt.clientOptions() + opt.Addr = addr + node := clusterNode{ + Client: NewClient(opt), + } + + if clOpt.RouteByLatency { + const probes = 10 + for i := 0; i < probes; i++ { + t1 := time.Now() + node.Client.Ping() + node.Latency += time.Since(t1) + } + node.Latency = node.Latency / probes + } + + return &node +} + func (n *clusterNode) Loading() bool { return !n.loading.IsZero() && time.Since(n.loading) < time.Minute } -// ClusterClient is a Redis Cluster client representing a pool of zero -// or more underlying connections. It's safe for concurrent use by -// multiple goroutines. -type ClusterClient struct { - cmdable +//------------------------------------------------------------------------------ +type clusterNodes struct { opt *ClusterOptions mu sync.RWMutex addrs []string nodes map[string]*clusterNode - slots [][]*clusterNode closed bool - - cmdsInfoOnce *sync.Once - cmdsInfo map[string]*CommandInfo - - // Reports where slots reloading is in progress. - reloading uint32 } -var _ Cmdable = (*ClusterClient)(nil) - -// NewClusterClient returns a Redis Cluster client as described in -// http://redis.io/topics/cluster-spec. -func NewClusterClient(opt *ClusterOptions) *ClusterClient { - opt.init() - - c := &ClusterClient{ +func newClusterNodes(opt *ClusterOptions) *clusterNodes { + return &clusterNodes{ opt: opt, nodes: make(map[string]*clusterNode), - - cmdsInfoOnce: new(sync.Once), } - c.cmdable.process = c.Process - - for _, addr := range opt.Addrs { - _, _ = c.nodeByAddr(addr) - } - c.reloadSlots() - - if opt.IdleCheckFrequency > 0 { - go c.reaper(opt.IdleCheckFrequency) - } - - return c } -func (c *ClusterClient) cmdInfo(name string) *CommandInfo { - c.cmdsInfoOnce.Do(func() { - for _, node := range c.nodes { - cmdsInfo, err := node.Client.Command().Result() - if err == nil { - c.cmdsInfo = cmdsInfo - return - } - } - c.cmdsInfoOnce = &sync.Once{} - }) - if c.cmdsInfo == nil { +func (c *clusterNodes) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { return nil } - return c.cmdsInfo[name] -} + c.closed = true -func (c *ClusterClient) getNodes() map[string]*clusterNode { - var nodes map[string]*clusterNode - c.mu.RLock() - if !c.closed { - nodes = make(map[string]*clusterNode, len(c.nodes)) - for addr, node := range c.nodes { - nodes[addr] = node + var firstErr error + for _, node := range c.nodes { + if err := node.Client.Close(); err != nil && firstErr == nil { + firstErr = err } } - c.mu.RUnlock() - return nodes + c.addrs = nil + c.nodes = nil + + return firstErr } -func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { - node, err := c.slotMasterNode(hashtag.Slot(keys[0])) - if err != nil { - return err - } - return node.Client.Watch(fn, keys...) -} - -// PoolStats returns accumulated connection pool stats. -func (c *ClusterClient) PoolStats() *PoolStats { - var acc PoolStats - for _, node := range c.getNodes() { - s := node.Client.connPool.Stats() - acc.Requests += s.Requests - acc.Hits += s.Hits - acc.Timeouts += s.Timeouts - acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns - } - return &acc -} - -// Close closes the cluster client, releasing any open resources. -// -// It is rare to Close a ClusterClient, as the ClusterClient is meant -// to be long-lived and shared between many goroutines. -func (c *ClusterClient) Close() error { - c.mu.Lock() - if !c.closed { - c.closeClients() - c.addrs = nil - c.nodes = nil - c.slots = nil - c.cmdsInfo = nil - } - c.closed = true - c.mu.Unlock() - return nil -} - -func (c *ClusterClient) nodeByAddr(addr string) (*clusterNode, error) { +func (c *clusterNodes) All() ([]*clusterNode, error) { c.mu.RLock() - node, ok := c.nodes[addr] + defer c.mu.RUnlock() + + if c.closed { + return nil, pool.ErrClosed + } + + var nodes []*clusterNode + for _, node := range c.nodes { + nodes = append(nodes, node) + } + return nodes, nil +} + +func (c *clusterNodes) Get(addr string) (*clusterNode, error) { + var node *clusterNode + var ok bool + + c.mu.RLock() + if !c.closed { + node, ok = c.nodes[addr] + } c.mu.RUnlock() if ok { return node, nil } - defer c.mu.Unlock() c.mu.Lock() + defer c.mu.Unlock() if c.closed { return nil, pool.ErrClosed } node, ok = c.nodes[addr] - if !ok { - node = c.newNode(addr) - c.nodes[addr] = node - c.addrs = append(c.addrs, addr) + if ok { + return node, nil } + c.addrs = append(c.addrs, addr) + node = newClusterNode(c.opt, addr) + c.nodes[addr] = node return node, nil } -func (c *ClusterClient) newNode(addr string) *clusterNode { - opt := c.opt.clientOptions() - opt.Addr = addr - return &clusterNode{ - Client: NewClient(opt), - } -} - -func (c *ClusterClient) slotNodes(slot int) (nodes []*clusterNode) { +func (c *clusterNodes) Random() (*clusterNode, error) { c.mu.RLock() - if slot < len(c.slots) { - nodes = c.slots[slot] - } + closed := c.closed + addrs := c.addrs c.mu.RUnlock() - return nodes -} -// randomNode returns random live node. -func (c *ClusterClient) randomNode() (*clusterNode, error) { + if closed { + return nil, pool.ErrClosed + } + if len(addrs) == 0 { + return nil, errClusterNoNodes + } + var nodeErr error for i := 0; i < 10; i++ { - c.mu.RLock() - closed := c.closed - addrs := c.addrs - c.mu.RUnlock() - - if closed { - return nil, pool.ErrClosed - } - - if len(addrs) == 0 { - return nil, errClusterNoNodes - } n := rand.Intn(len(addrs)) - - node, err := c.nodeByAddr(addrs[n]) + node, err := c.Get(addrs[n]) if err != nil { return nil, err } @@ -280,19 +223,50 @@ func (c *ClusterClient) randomNode() (*clusterNode, error) { return nil, nodeErr } -func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) { +//------------------------------------------------------------------------------ + +type clusterState struct { + nodes *clusterNodes + slots [][]*clusterNode +} + +func newClusterState(nodes *clusterNodes, slots []ClusterSlot) (*clusterState, error) { + c := clusterState{ + nodes: nodes, + slots: make([][]*clusterNode, hashtag.SlotNumber), + } + + for _, slot := range slots { + var nodes []*clusterNode + for _, slotNode := range slot.Nodes { + node, err := c.nodes.Get(slotNode.Addr) + if err != nil { + return nil, err + } + nodes = append(nodes, node) + } + + for i := slot.Start; i <= slot.End; i++ { + c.slots[i] = nodes + } + } + + return &c, nil +} + +func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) { nodes := c.slotNodes(slot) if len(nodes) == 0 { - return c.randomNode() + return c.nodes.Random() } return nodes[0], nil } -func (c *ClusterClient) slotSlaveNode(slot int) (*clusterNode, error) { +func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { nodes := c.slotNodes(slot) switch len(nodes) { case 0: - return c.randomNode() + return c.nodes.Random() case 1: return nodes[0], nil case 2: @@ -313,10 +287,10 @@ func (c *ClusterClient) slotSlaveNode(slot int) (*clusterNode, error) { } } -func (c *ClusterClient) slotClosestNode(slot int) (*clusterNode, error) { +func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { nodes := c.slotNodes(slot) if len(nodes) == 0 { - return c.randomNode() + return c.nodes.Random() } var node *clusterNode @@ -328,31 +302,108 @@ func (c *ClusterClient) slotClosestNode(slot int) (*clusterNode, error) { return node, nil } -func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) { - cmdInfo := c.cmdInfo(cmd.arg(0)) +func (c *clusterState) slotNodes(slot int) []*clusterNode { + if slot < len(c.slots) { + return c.slots[slot] + } + return nil +} + +//------------------------------------------------------------------------------ + +// ClusterClient is a Redis Cluster client representing a pool of zero +// or more underlying connections. It's safe for concurrent use by +// multiple goroutines. +type ClusterClient struct { + cmdable + + opt *ClusterOptions + cmds map[string]*CommandInfo + nodes *clusterNodes + _state atomic.Value + + // Reports where slots reloading is in progress. + reloading uint32 + + closed bool +} + +var _ Cmdable = (*ClusterClient)(nil) + +// NewClusterClient returns a Redis Cluster client as described in +// http://redis.io/topics/cluster-spec. +func NewClusterClient(opt *ClusterOptions) *ClusterClient { + opt.init() + + c := &ClusterClient{ + opt: opt, + nodes: newClusterNodes(opt), + } + c.cmdable.process = c.Process + + // Add initial nodes. + for _, addr := range opt.Addrs { + _, _ = c.nodes.Get(addr) + } + + c.reloadSlots() + + if opt.IdleCheckFrequency > 0 { + go c.reaper(opt.IdleCheckFrequency) + } + + return c +} + +func (c *ClusterClient) state() *clusterState { + v := c._state.Load() + if v == nil { + return nil + } + return v.(*clusterState) +} + +func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) { + cmdInfo := c.cmds[cmd.arg(0)] firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) if firstKey == "" { - node, err := c.randomNode() + node, err := c.nodes.Random() return -1, node, err } slot := hashtag.Slot(firstKey) if cmdInfo.ReadOnly && c.opt.ReadOnly { if c.opt.RouteByLatency { - node, err := c.slotClosestNode(slot) + node, err := state.slotClosestNode(slot) return slot, node, err } - node, err := c.slotSlaveNode(slot) + node, err := state.slotSlaveNode(slot) return slot, node, err } - node, err := c.slotMasterNode(slot) + node, err := state.slotMasterNode(slot) return slot, node, err } +func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { + node, err := c.state().slotMasterNode(hashtag.Slot(keys[0])) + if err != nil { + return err + } + return node.Client.Watch(fn, keys...) +} + +// Close closes the cluster client, releasing any open resources. +// +// It is rare to Close a ClusterClient, as the ClusterClient is meant +// to be long-lived and shared between many goroutines. +func (c *ClusterClient) Close() error { + return c.nodes.Close() +} + func (c *ClusterClient) Process(cmd Cmder) error { - slot, node, err := c.cmdSlotAndNode(cmd) + slot, node, err := c.cmdSlotAndNode(c.state(), cmd) if err != nil { cmd.setErr(err) return err @@ -388,7 +439,7 @@ func (c *ClusterClient) Process(cmd Cmder) error { // On network errors try random node. if internal.IsRetryableError(err) { - node, err = c.randomNode() + node, err = c.nodes.Random() continue } @@ -397,17 +448,18 @@ func (c *ClusterClient) Process(cmd Cmder) error { moved, ask, addr = internal.IsMovedError(err) if moved || ask { if slot >= 0 { - master, _ := c.slotMasterNode(slot) + master, _ := c.state().slotMasterNode(slot) if moved && (master == nil || master.Client.getAddr() != addr) { c.lazyReloadSlots() } } - node, err = c.nodeByAddr(addr) + node, err = c.nodes.Get(addr) if err != nil { cmd.setErr(err) return err } + continue } @@ -420,14 +472,15 @@ func (c *ClusterClient) Process(cmd Cmder) error { // ForEachMaster concurrently calls the fn on each master node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { - c.mu.RLock() - slots := c.slots - c.mu.RUnlock() + state := c.state() + if state == nil { + return nil + } var wg sync.WaitGroup visited := make(map[*clusterNode]struct{}) errCh := make(chan error, 1) - for _, nodes := range slots { + for _, nodes := range state.slots { if len(nodes) == 0 { continue } @@ -460,77 +513,59 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { } } -// closeClients closes all clients and returns the first error if there are any. -func (c *ClusterClient) closeClients() error { - var retErr error - for _, node := range c.nodes { - if err := node.Client.Close(); err != nil && retErr == nil { - retErr = err - } - } - return retErr -} - -func (c *ClusterClient) setSlots(cs []ClusterSlot) { - slots := make([][]*clusterNode, hashtag.SlotNumber) - for _, s := range cs { - var nodes []*clusterNode - for _, n := range s.Nodes { - node, err := c.nodeByAddr(n.Addr) - if err == nil { - nodes = append(nodes, node) - } - } - - for i := s.Start; i <= s.End; i++ { - slots[i] = nodes - } +// PoolStats returns accumulated connection pool stats. +func (c *ClusterClient) PoolStats() *PoolStats { + nodes, err := c.nodes.All() + if err != nil { + return nil } - c.mu.Lock() - if !c.closed { - c.slots = slots + var acc PoolStats + for _, node := range nodes { + s := node.Client.connPool.Stats() + acc.Requests += s.Requests + acc.Hits += s.Hits + acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns + acc.FreeConns += s.FreeConns } - c.mu.Unlock() + return &acc } func (c *ClusterClient) lazyReloadSlots() { if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { return } - go c.reloadSlots() + go func() { + c.reloadSlots() + atomic.StoreUint32(&c.reloading, 0) + }() } func (c *ClusterClient) reloadSlots() { - defer atomic.StoreUint32(&c.reloading, 0) - - node, err := c.randomNode() - if err != nil { - return - } - - slots, err := node.Client.ClusterSlots().Result() - if err != nil { - internal.Logf("ClusterSlots on addr=%q failed: %s", node.Client.getAddr(), err) - return - } - - c.setSlots(slots) - if c.opt.RouteByLatency { - c.setNodesLatency() - } -} - -func (c *ClusterClient) setNodesLatency() { - const n = 10 - for _, node := range c.getNodes() { - var latency time.Duration - for i := 0; i < n; i++ { - t1 := time.Now() - node.Client.Ping() - latency += time.Since(t1) + for i := 0; i < 10; i++ { + node, err := c.nodes.Random() + if err != nil { + return } - node.Latency = latency / n + + if c.cmds == nil { + cmds, err := node.Client.Command().Result() + if err == nil { + c.cmds = cmds + } + } + + slots, err := node.Client.ClusterSlots().Result() + if err != nil { + continue + } + + state, err := newClusterState(c.nodes, slots) + if err != nil { + return + } + c._state.Store(state) } } @@ -540,8 +575,8 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { defer ticker.Stop() for _ = range ticker.C { - nodes := c.getNodes() - if nodes == nil { + nodes, err := c.nodes.All() + if err != nil { break } @@ -584,9 +619,10 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { } } + state := c.state() cmdsMap := make(map[*clusterNode][]Cmder) for _, cmd := range cmds { - _, node, err := c.cmdSlotAndNode(cmd) + _, node, err := c.cmdSlotAndNode(state, cmd) if err != nil { cmd.setErr(err) setRetErr(err) @@ -599,16 +635,6 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { - if node == nil { - var err error - node, err = c.randomNode() - if err != nil { - setCmdsErr(cmds, err) - setRetErr(err) - continue - } - } - cn, _, err := node.Client.conn() if err != nil { setCmdsErr(cmds, err) @@ -660,7 +686,7 @@ func (c *ClusterClient) execClusterCmds( if moved { c.lazyReloadSlots() - node, err := c.nodeByAddr(addr) + node, err := c.nodes.Get(addr) if err != nil { setRetErr(err) continue @@ -669,7 +695,7 @@ func (c *ClusterClient) execClusterCmds( cmd.reset() failedCmds[node] = append(failedCmds[node], cmd) } else if ask { - node, err := c.nodeByAddr(addr) + node, err := c.nodes.Get(addr) if err != nil { setRetErr(err) continue diff --git a/cluster_client_test.go b/cluster_client_test.go deleted file mode 100644 index 0a30d5d..0000000 --- a/cluster_client_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package redis - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -func (c *ClusterClient) SlotAddrs(slot int) []string { - var addrs []string - for _, n := range c.slotNodes(slot) { - addrs = append(addrs, n.Client.getAddr()) - } - return addrs -} - -// SwapSlot swaps a slot's master/slave address -// for testing MOVED redirects -func (c *ClusterClient) SwapSlotNodes(slot int) []string { - c.mu.Lock() - nodes := c.slots[slot] - nodes[0], nodes[1] = nodes[1], nodes[0] - c.mu.Unlock() - return c.SlotAddrs(slot) -} - -var _ = Describe("ClusterClient", func() { - var subject *ClusterClient - - var populate = func() { - subject.setSlots([]ClusterSlot{ - {0, 4095, []ClusterNode{{"", "127.0.0.1:7000"}, {"", "127.0.0.1:7004"}}}, - {12288, 16383, []ClusterNode{{"", "127.0.0.1:7003"}, {"", "127.0.0.1:7007"}}}, - {4096, 8191, []ClusterNode{{"", "127.0.0.1:7001"}, {"", "127.0.0.1:7005"}}}, - {8192, 12287, []ClusterNode{{"", "127.0.0.1:7002"}, {"", "127.0.0.1:7006"}}}, - }) - } - - BeforeEach(func() { - subject = NewClusterClient(&ClusterOptions{ - Addrs: []string{"127.0.0.1:6379", "127.0.0.1:7003", "127.0.0.1:7006"}, - }) - }) - - AfterEach(func() { - _ = subject.Close() - }) - - It("should initialize", func() { - Expect(subject.addrs).To(HaveLen(3)) - }) - - It("should update slots cache", func() { - populate() - Expect(subject.slots[0][0].Client.getAddr()).To(Equal("127.0.0.1:7000")) - Expect(subject.slots[0][1].Client.getAddr()).To(Equal("127.0.0.1:7004")) - Expect(subject.slots[4095][0].Client.getAddr()).To(Equal("127.0.0.1:7000")) - Expect(subject.slots[4095][1].Client.getAddr()).To(Equal("127.0.0.1:7004")) - Expect(subject.slots[4096][0].Client.getAddr()).To(Equal("127.0.0.1:7001")) - Expect(subject.slots[4096][1].Client.getAddr()).To(Equal("127.0.0.1:7005")) - Expect(subject.slots[8191][0].Client.getAddr()).To(Equal("127.0.0.1:7001")) - Expect(subject.slots[8191][1].Client.getAddr()).To(Equal("127.0.0.1:7005")) - Expect(subject.slots[8192][0].Client.getAddr()).To(Equal("127.0.0.1:7002")) - Expect(subject.slots[8192][1].Client.getAddr()).To(Equal("127.0.0.1:7006")) - Expect(subject.slots[12287][0].Client.getAddr()).To(Equal("127.0.0.1:7002")) - Expect(subject.slots[12287][1].Client.getAddr()).To(Equal("127.0.0.1:7006")) - Expect(subject.slots[12288][0].Client.getAddr()).To(Equal("127.0.0.1:7003")) - Expect(subject.slots[12288][1].Client.getAddr()).To(Equal("127.0.0.1:7007")) - Expect(subject.slots[16383][0].Client.getAddr()).To(Equal("127.0.0.1:7003")) - Expect(subject.slots[16383][1].Client.getAddr()).To(Equal("127.0.0.1:7007")) - Expect(subject.addrs).To(Equal([]string{ - "127.0.0.1:6379", - "127.0.0.1:7003", - "127.0.0.1:7006", - "127.0.0.1:7000", - "127.0.0.1:7004", - "127.0.0.1:7007", - "127.0.0.1:7001", - "127.0.0.1:7005", - "127.0.0.1:7002", - })) - }) - - It("should close", func() { - populate() - Expect(subject.Close()).NotTo(HaveOccurred()) - Expect(subject.addrs).To(BeEmpty()) - Expect(subject.nodes).To(BeEmpty()) - Expect(subject.slots).To(BeEmpty()) - Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed")) - }) -}) diff --git a/export_test.go b/export_test.go index d36f867..a366dce 100644 --- a/export_test.go +++ b/export_test.go @@ -17,3 +17,18 @@ func (c *PubSub) Pool() pool.Pooler { func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error) { return c.receiveMessage(timeout) } + +func (c *ClusterClient) SlotAddrs(slot int) []string { + var addrs []string + for _, n := range c.state().slotNodes(slot) { + addrs = append(addrs, n.Client.getAddr()) + } + return addrs +} + +// SwapSlot swaps a slot's master/slave address for testing MOVED redirects. +func (c *ClusterClient) SwapSlotNodes(slot int) []string { + nodes := c.state().slots[slot] + nodes[0], nodes[1] = nodes[1], nodes[0] + return c.SlotAddrs(slot) +}