Use consistent cluster state when executing pipeline.

This commit is contained in:
Vladimir Mihailenco 2016-11-08 11:46:44 +02:00
parent 80cf5d1652
commit 83208a1d9b
3 changed files with 268 additions and 318 deletions

View File

@ -85,189 +85,132 @@ type clusterNode struct {
loading time.Time
}
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
opt := clOpt.clientOptions()
opt.Addr = addr
node := clusterNode{
Client: NewClient(opt),
}
if clOpt.RouteByLatency {
const probes = 10
for i := 0; i < probes; i++ {
t1 := time.Now()
node.Client.Ping()
node.Latency += time.Since(t1)
}
node.Latency = node.Latency / probes
}
return &node
}
func (n *clusterNode) Loading() bool {
return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
}
// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
cmdable
//------------------------------------------------------------------------------
type clusterNodes struct {
opt *ClusterOptions
mu sync.RWMutex
addrs []string
nodes map[string]*clusterNode
slots [][]*clusterNode
closed bool
cmdsInfoOnce *sync.Once
cmdsInfo map[string]*CommandInfo
// Reports where slots reloading is in progress.
reloading uint32
}
var _ Cmdable = (*ClusterClient)(nil)
// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt.init()
c := &ClusterClient{
func newClusterNodes(opt *ClusterOptions) *clusterNodes {
return &clusterNodes{
opt: opt,
nodes: make(map[string]*clusterNode),
cmdsInfoOnce: new(sync.Once),
}
c.cmdable.process = c.Process
for _, addr := range opt.Addrs {
_, _ = c.nodeByAddr(addr)
}
c.reloadSlots()
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
return c
}
func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
c.cmdsInfoOnce.Do(func() {
for _, node := range c.nodes {
cmdsInfo, err := node.Client.Command().Result()
if err == nil {
c.cmdsInfo = cmdsInfo
return
}
}
c.cmdsInfoOnce = &sync.Once{}
})
if c.cmdsInfo == nil {
func (c *clusterNodes) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil
}
return c.cmdsInfo[name]
}
c.closed = true
func (c *ClusterClient) getNodes() map[string]*clusterNode {
var nodes map[string]*clusterNode
c.mu.RLock()
if !c.closed {
nodes = make(map[string]*clusterNode, len(c.nodes))
for addr, node := range c.nodes {
nodes[addr] = node
var firstErr error
for _, node := range c.nodes {
if err := node.Client.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
c.mu.RUnlock()
return nodes
c.addrs = nil
c.nodes = nil
return firstErr
}
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
node, err := c.slotMasterNode(hashtag.Slot(keys[0]))
if err != nil {
return err
}
return node.Client.Watch(fn, keys...)
}
// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
var acc PoolStats
for _, node := range c.getNodes() {
s := node.Client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
}
return &acc
}
// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
c.mu.Lock()
if !c.closed {
c.closeClients()
c.addrs = nil
c.nodes = nil
c.slots = nil
c.cmdsInfo = nil
}
c.closed = true
c.mu.Unlock()
return nil
}
func (c *ClusterClient) nodeByAddr(addr string) (*clusterNode, error) {
func (c *clusterNodes) All() ([]*clusterNode, error) {
c.mu.RLock()
node, ok := c.nodes[addr]
defer c.mu.RUnlock()
if c.closed {
return nil, pool.ErrClosed
}
var nodes []*clusterNode
for _, node := range c.nodes {
nodes = append(nodes, node)
}
return nodes, nil
}
func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
var node *clusterNode
var ok bool
c.mu.RLock()
if !c.closed {
node, ok = c.nodes[addr]
}
c.mu.RUnlock()
if ok {
return node, nil
}
defer c.mu.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil, pool.ErrClosed
}
node, ok = c.nodes[addr]
if !ok {
node = c.newNode(addr)
c.nodes[addr] = node
c.addrs = append(c.addrs, addr)
if ok {
return node, nil
}
c.addrs = append(c.addrs, addr)
node = newClusterNode(c.opt, addr)
c.nodes[addr] = node
return node, nil
}
func (c *ClusterClient) newNode(addr string) *clusterNode {
opt := c.opt.clientOptions()
opt.Addr = addr
return &clusterNode{
Client: NewClient(opt),
}
}
func (c *ClusterClient) slotNodes(slot int) (nodes []*clusterNode) {
func (c *clusterNodes) Random() (*clusterNode, error) {
c.mu.RLock()
if slot < len(c.slots) {
nodes = c.slots[slot]
}
closed := c.closed
addrs := c.addrs
c.mu.RUnlock()
return nodes
}
// randomNode returns random live node.
func (c *ClusterClient) randomNode() (*clusterNode, error) {
if closed {
return nil, pool.ErrClosed
}
if len(addrs) == 0 {
return nil, errClusterNoNodes
}
var nodeErr error
for i := 0; i < 10; i++ {
c.mu.RLock()
closed := c.closed
addrs := c.addrs
c.mu.RUnlock()
if closed {
return nil, pool.ErrClosed
}
if len(addrs) == 0 {
return nil, errClusterNoNodes
}
n := rand.Intn(len(addrs))
node, err := c.nodeByAddr(addrs[n])
node, err := c.Get(addrs[n])
if err != nil {
return nil, err
}
@ -280,19 +223,50 @@ func (c *ClusterClient) randomNode() (*clusterNode, error) {
return nil, nodeErr
}
func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
//------------------------------------------------------------------------------
type clusterState struct {
nodes *clusterNodes
slots [][]*clusterNode
}
func newClusterState(nodes *clusterNodes, slots []ClusterSlot) (*clusterState, error) {
c := clusterState{
nodes: nodes,
slots: make([][]*clusterNode, hashtag.SlotNumber),
}
for _, slot := range slots {
var nodes []*clusterNode
for _, slotNode := range slot.Nodes {
node, err := c.nodes.Get(slotNode.Addr)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
for i := slot.Start; i <= slot.End; i++ {
c.slots[i] = nodes
}
}
return &c, nil
}
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
if len(nodes) == 0 {
return c.randomNode()
return c.nodes.Random()
}
return nodes[0], nil
}
func (c *ClusterClient) slotSlaveNode(slot int) (*clusterNode, error) {
func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
switch len(nodes) {
case 0:
return c.randomNode()
return c.nodes.Random()
case 1:
return nodes[0], nil
case 2:
@ -313,10 +287,10 @@ func (c *ClusterClient) slotSlaveNode(slot int) (*clusterNode, error) {
}
}
func (c *ClusterClient) slotClosestNode(slot int) (*clusterNode, error) {
func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
if len(nodes) == 0 {
return c.randomNode()
return c.nodes.Random()
}
var node *clusterNode
@ -328,31 +302,108 @@ func (c *ClusterClient) slotClosestNode(slot int) (*clusterNode, error) {
return node, nil
}
func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
cmdInfo := c.cmdInfo(cmd.arg(0))
func (c *clusterState) slotNodes(slot int) []*clusterNode {
if slot < len(c.slots) {
return c.slots[slot]
}
return nil
}
//------------------------------------------------------------------------------
// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
cmdable
opt *ClusterOptions
cmds map[string]*CommandInfo
nodes *clusterNodes
_state atomic.Value
// Reports where slots reloading is in progress.
reloading uint32
closed bool
}
var _ Cmdable = (*ClusterClient)(nil)
// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt.init()
c := &ClusterClient{
opt: opt,
nodes: newClusterNodes(opt),
}
c.cmdable.process = c.Process
// Add initial nodes.
for _, addr := range opt.Addrs {
_, _ = c.nodes.Get(addr)
}
c.reloadSlots()
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
return c
}
func (c *ClusterClient) state() *clusterState {
v := c._state.Load()
if v == nil {
return nil
}
return v.(*clusterState)
}
func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
cmdInfo := c.cmds[cmd.arg(0)]
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
if firstKey == "" {
node, err := c.randomNode()
node, err := c.nodes.Random()
return -1, node, err
}
slot := hashtag.Slot(firstKey)
if cmdInfo.ReadOnly && c.opt.ReadOnly {
if c.opt.RouteByLatency {
node, err := c.slotClosestNode(slot)
node, err := state.slotClosestNode(slot)
return slot, node, err
}
node, err := c.slotSlaveNode(slot)
node, err := state.slotSlaveNode(slot)
return slot, node, err
}
node, err := c.slotMasterNode(slot)
node, err := state.slotMasterNode(slot)
return slot, node, err
}
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
node, err := c.state().slotMasterNode(hashtag.Slot(keys[0]))
if err != nil {
return err
}
return node.Client.Watch(fn, keys...)
}
// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
return c.nodes.Close()
}
func (c *ClusterClient) Process(cmd Cmder) error {
slot, node, err := c.cmdSlotAndNode(cmd)
slot, node, err := c.cmdSlotAndNode(c.state(), cmd)
if err != nil {
cmd.setErr(err)
return err
@ -388,7 +439,7 @@ func (c *ClusterClient) Process(cmd Cmder) error {
// On network errors try random node.
if internal.IsRetryableError(err) {
node, err = c.randomNode()
node, err = c.nodes.Random()
continue
}
@ -397,17 +448,18 @@ func (c *ClusterClient) Process(cmd Cmder) error {
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
if slot >= 0 {
master, _ := c.slotMasterNode(slot)
master, _ := c.state().slotMasterNode(slot)
if moved && (master == nil || master.Client.getAddr() != addr) {
c.lazyReloadSlots()
}
}
node, err = c.nodeByAddr(addr)
node, err = c.nodes.Get(addr)
if err != nil {
cmd.setErr(err)
return err
}
continue
}
@ -420,14 +472,15 @@ func (c *ClusterClient) Process(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
c.mu.RLock()
slots := c.slots
c.mu.RUnlock()
state := c.state()
if state == nil {
return nil
}
var wg sync.WaitGroup
visited := make(map[*clusterNode]struct{})
errCh := make(chan error, 1)
for _, nodes := range slots {
for _, nodes := range state.slots {
if len(nodes) == 0 {
continue
}
@ -460,77 +513,59 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
}
}
// closeClients closes all clients and returns the first error if there are any.
func (c *ClusterClient) closeClients() error {
var retErr error
for _, node := range c.nodes {
if err := node.Client.Close(); err != nil && retErr == nil {
retErr = err
}
}
return retErr
}
func (c *ClusterClient) setSlots(cs []ClusterSlot) {
slots := make([][]*clusterNode, hashtag.SlotNumber)
for _, s := range cs {
var nodes []*clusterNode
for _, n := range s.Nodes {
node, err := c.nodeByAddr(n.Addr)
if err == nil {
nodes = append(nodes, node)
}
}
for i := s.Start; i <= s.End; i++ {
slots[i] = nodes
}
// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
nodes, err := c.nodes.All()
if err != nil {
return nil
}
c.mu.Lock()
if !c.closed {
c.slots = slots
var acc PoolStats
for _, node := range nodes {
s := node.Client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
}
c.mu.Unlock()
return &acc
}
func (c *ClusterClient) lazyReloadSlots() {
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
return
}
go c.reloadSlots()
go func() {
c.reloadSlots()
atomic.StoreUint32(&c.reloading, 0)
}()
}
func (c *ClusterClient) reloadSlots() {
defer atomic.StoreUint32(&c.reloading, 0)
node, err := c.randomNode()
if err != nil {
return
}
slots, err := node.Client.ClusterSlots().Result()
if err != nil {
internal.Logf("ClusterSlots on addr=%q failed: %s", node.Client.getAddr(), err)
return
}
c.setSlots(slots)
if c.opt.RouteByLatency {
c.setNodesLatency()
}
}
func (c *ClusterClient) setNodesLatency() {
const n = 10
for _, node := range c.getNodes() {
var latency time.Duration
for i := 0; i < n; i++ {
t1 := time.Now()
node.Client.Ping()
latency += time.Since(t1)
for i := 0; i < 10; i++ {
node, err := c.nodes.Random()
if err != nil {
return
}
node.Latency = latency / n
if c.cmds == nil {
cmds, err := node.Client.Command().Result()
if err == nil {
c.cmds = cmds
}
}
slots, err := node.Client.ClusterSlots().Result()
if err != nil {
continue
}
state, err := newClusterState(c.nodes, slots)
if err != nil {
return
}
c._state.Store(state)
}
}
@ -540,8 +575,8 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
defer ticker.Stop()
for _ = range ticker.C {
nodes := c.getNodes()
if nodes == nil {
nodes, err := c.nodes.All()
if err != nil {
break
}
@ -584,9 +619,10 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
}
}
state := c.state()
cmdsMap := make(map[*clusterNode][]Cmder)
for _, cmd := range cmds {
_, node, err := c.cmdSlotAndNode(cmd)
_, node, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
cmd.setErr(err)
setRetErr(err)
@ -599,16 +635,6 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
if node == nil {
var err error
node, err = c.randomNode()
if err != nil {
setCmdsErr(cmds, err)
setRetErr(err)
continue
}
}
cn, _, err := node.Client.conn()
if err != nil {
setCmdsErr(cmds, err)
@ -660,7 +686,7 @@ func (c *ClusterClient) execClusterCmds(
if moved {
c.lazyReloadSlots()
node, err := c.nodeByAddr(addr)
node, err := c.nodes.Get(addr)
if err != nil {
setRetErr(err)
continue
@ -669,7 +695,7 @@ func (c *ClusterClient) execClusterCmds(
cmd.reset()
failedCmds[node] = append(failedCmds[node], cmd)
} else if ask {
node, err := c.nodeByAddr(addr)
node, err := c.nodes.Get(addr)
if err != nil {
setRetErr(err)
continue

View File

@ -1,91 +0,0 @@
package redis
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func (c *ClusterClient) SlotAddrs(slot int) []string {
var addrs []string
for _, n := range c.slotNodes(slot) {
addrs = append(addrs, n.Client.getAddr())
}
return addrs
}
// SwapSlot swaps a slot's master/slave address
// for testing MOVED redirects
func (c *ClusterClient) SwapSlotNodes(slot int) []string {
c.mu.Lock()
nodes := c.slots[slot]
nodes[0], nodes[1] = nodes[1], nodes[0]
c.mu.Unlock()
return c.SlotAddrs(slot)
}
var _ = Describe("ClusterClient", func() {
var subject *ClusterClient
var populate = func() {
subject.setSlots([]ClusterSlot{
{0, 4095, []ClusterNode{{"", "127.0.0.1:7000"}, {"", "127.0.0.1:7004"}}},
{12288, 16383, []ClusterNode{{"", "127.0.0.1:7003"}, {"", "127.0.0.1:7007"}}},
{4096, 8191, []ClusterNode{{"", "127.0.0.1:7001"}, {"", "127.0.0.1:7005"}}},
{8192, 12287, []ClusterNode{{"", "127.0.0.1:7002"}, {"", "127.0.0.1:7006"}}},
})
}
BeforeEach(func() {
subject = NewClusterClient(&ClusterOptions{
Addrs: []string{"127.0.0.1:6379", "127.0.0.1:7003", "127.0.0.1:7006"},
})
})
AfterEach(func() {
_ = subject.Close()
})
It("should initialize", func() {
Expect(subject.addrs).To(HaveLen(3))
})
It("should update slots cache", func() {
populate()
Expect(subject.slots[0][0].Client.getAddr()).To(Equal("127.0.0.1:7000"))
Expect(subject.slots[0][1].Client.getAddr()).To(Equal("127.0.0.1:7004"))
Expect(subject.slots[4095][0].Client.getAddr()).To(Equal("127.0.0.1:7000"))
Expect(subject.slots[4095][1].Client.getAddr()).To(Equal("127.0.0.1:7004"))
Expect(subject.slots[4096][0].Client.getAddr()).To(Equal("127.0.0.1:7001"))
Expect(subject.slots[4096][1].Client.getAddr()).To(Equal("127.0.0.1:7005"))
Expect(subject.slots[8191][0].Client.getAddr()).To(Equal("127.0.0.1:7001"))
Expect(subject.slots[8191][1].Client.getAddr()).To(Equal("127.0.0.1:7005"))
Expect(subject.slots[8192][0].Client.getAddr()).To(Equal("127.0.0.1:7002"))
Expect(subject.slots[8192][1].Client.getAddr()).To(Equal("127.0.0.1:7006"))
Expect(subject.slots[12287][0].Client.getAddr()).To(Equal("127.0.0.1:7002"))
Expect(subject.slots[12287][1].Client.getAddr()).To(Equal("127.0.0.1:7006"))
Expect(subject.slots[12288][0].Client.getAddr()).To(Equal("127.0.0.1:7003"))
Expect(subject.slots[12288][1].Client.getAddr()).To(Equal("127.0.0.1:7007"))
Expect(subject.slots[16383][0].Client.getAddr()).To(Equal("127.0.0.1:7003"))
Expect(subject.slots[16383][1].Client.getAddr()).To(Equal("127.0.0.1:7007"))
Expect(subject.addrs).To(Equal([]string{
"127.0.0.1:6379",
"127.0.0.1:7003",
"127.0.0.1:7006",
"127.0.0.1:7000",
"127.0.0.1:7004",
"127.0.0.1:7007",
"127.0.0.1:7001",
"127.0.0.1:7005",
"127.0.0.1:7002",
}))
})
It("should close", func() {
populate()
Expect(subject.Close()).NotTo(HaveOccurred())
Expect(subject.addrs).To(BeEmpty())
Expect(subject.nodes).To(BeEmpty())
Expect(subject.slots).To(BeEmpty())
Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed"))
})
})

View File

@ -17,3 +17,18 @@ func (c *PubSub) Pool() pool.Pooler {
func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error) {
return c.receiveMessage(timeout)
}
func (c *ClusterClient) SlotAddrs(slot int) []string {
var addrs []string
for _, n := range c.state().slotNodes(slot) {
addrs = append(addrs, n.Client.getAddr())
}
return addrs
}
// SwapSlot swaps a slot's master/slave address for testing MOVED redirects.
func (c *ClusterClient) SwapSlotNodes(slot int) []string {
nodes := c.state().slots[slot]
nodes[0], nodes[1] = nodes[1], nodes[0]
return c.SlotAddrs(slot)
}