forked from mirror/redis
Merge pull request #416 from go-redis/fix/cluster-state
Use consistent cluster state when executing pipeline.
This commit is contained in:
commit
b6bfe529a8
458
cluster.go
458
cluster.go
|
@ -85,174 +85,116 @@ type clusterNode struct {
|
||||||
loading time.Time
|
loading time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
||||||
|
opt := clOpt.clientOptions()
|
||||||
|
opt.Addr = addr
|
||||||
|
node := clusterNode{
|
||||||
|
Client: NewClient(opt),
|
||||||
|
}
|
||||||
|
|
||||||
|
if clOpt.RouteByLatency {
|
||||||
|
const probes = 10
|
||||||
|
for i := 0; i < probes; i++ {
|
||||||
|
t1 := time.Now()
|
||||||
|
node.Client.Ping()
|
||||||
|
node.Latency += time.Since(t1)
|
||||||
|
}
|
||||||
|
node.Latency = node.Latency / probes
|
||||||
|
}
|
||||||
|
|
||||||
|
return &node
|
||||||
|
}
|
||||||
|
|
||||||
func (n *clusterNode) Loading() bool {
|
func (n *clusterNode) Loading() bool {
|
||||||
return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
|
return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClusterClient is a Redis Cluster client representing a pool of zero
|
//------------------------------------------------------------------------------
|
||||||
// or more underlying connections. It's safe for concurrent use by
|
|
||||||
// multiple goroutines.
|
|
||||||
type ClusterClient struct {
|
|
||||||
cmdable
|
|
||||||
|
|
||||||
|
type clusterNodes struct {
|
||||||
opt *ClusterOptions
|
opt *ClusterOptions
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
addrs []string
|
addrs []string
|
||||||
nodes map[string]*clusterNode
|
nodes map[string]*clusterNode
|
||||||
slots [][]*clusterNode
|
|
||||||
closed bool
|
closed bool
|
||||||
|
|
||||||
cmdsInfoOnce *sync.Once
|
|
||||||
cmdsInfo map[string]*CommandInfo
|
|
||||||
|
|
||||||
// Reports where slots reloading is in progress.
|
|
||||||
reloading uint32
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Cmdable = (*ClusterClient)(nil)
|
func newClusterNodes(opt *ClusterOptions) *clusterNodes {
|
||||||
|
return &clusterNodes{
|
||||||
// NewClusterClient returns a Redis Cluster client as described in
|
|
||||||
// http://redis.io/topics/cluster-spec.
|
|
||||||
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
|
||||||
opt.init()
|
|
||||||
|
|
||||||
c := &ClusterClient{
|
|
||||||
opt: opt,
|
opt: opt,
|
||||||
nodes: make(map[string]*clusterNode),
|
nodes: make(map[string]*clusterNode),
|
||||||
|
|
||||||
cmdsInfoOnce: new(sync.Once),
|
|
||||||
}
|
}
|
||||||
c.cmdable.process = c.Process
|
|
||||||
|
|
||||||
for _, addr := range opt.Addrs {
|
|
||||||
_, _ = c.nodeByAddr(addr)
|
|
||||||
}
|
|
||||||
c.reloadSlots()
|
|
||||||
|
|
||||||
if opt.IdleCheckFrequency > 0 {
|
|
||||||
go c.reaper(opt.IdleCheckFrequency)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return c
|
func (c *clusterNodes) Close() error {
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
|
|
||||||
c.cmdsInfoOnce.Do(func() {
|
|
||||||
for _, node := range c.nodes {
|
|
||||||
cmdsInfo, err := node.Client.Command().Result()
|
|
||||||
if err == nil {
|
|
||||||
c.cmdsInfo = cmdsInfo
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.cmdsInfoOnce = &sync.Once{}
|
|
||||||
})
|
|
||||||
if c.cmdsInfo == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return c.cmdsInfo[name]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClusterClient) getNodes() map[string]*clusterNode {
|
|
||||||
var nodes map[string]*clusterNode
|
|
||||||
c.mu.RLock()
|
|
||||||
if !c.closed {
|
|
||||||
nodes = make(map[string]*clusterNode, len(c.nodes))
|
|
||||||
for addr, node := range c.nodes {
|
|
||||||
nodes[addr] = node
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.mu.RUnlock()
|
|
||||||
return nodes
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
|
|
||||||
node, err := c.slotMasterNode(hashtag.Slot(keys[0]))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return node.Client.Watch(fn, keys...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PoolStats returns accumulated connection pool stats.
|
|
||||||
func (c *ClusterClient) PoolStats() *PoolStats {
|
|
||||||
var acc PoolStats
|
|
||||||
for _, node := range c.getNodes() {
|
|
||||||
s := node.Client.connPool.Stats()
|
|
||||||
acc.Requests += s.Requests
|
|
||||||
acc.Hits += s.Hits
|
|
||||||
acc.Timeouts += s.Timeouts
|
|
||||||
acc.TotalConns += s.TotalConns
|
|
||||||
acc.FreeConns += s.FreeConns
|
|
||||||
}
|
|
||||||
return &acc
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes the cluster client, releasing any open resources.
|
|
||||||
//
|
|
||||||
// It is rare to Close a ClusterClient, as the ClusterClient is meant
|
|
||||||
// to be long-lived and shared between many goroutines.
|
|
||||||
func (c *ClusterClient) Close() error {
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
if !c.closed {
|
defer c.mu.Unlock()
|
||||||
c.closeClients()
|
|
||||||
c.addrs = nil
|
if c.closed {
|
||||||
c.nodes = nil
|
return nil
|
||||||
c.slots = nil
|
|
||||||
c.cmdsInfo = nil
|
|
||||||
}
|
}
|
||||||
c.closed = true
|
c.closed = true
|
||||||
c.mu.Unlock()
|
|
||||||
return nil
|
var firstErr error
|
||||||
|
for _, node := range c.nodes {
|
||||||
|
if err := node.Client.Close(); err != nil && firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.addrs = nil
|
||||||
|
c.nodes = nil
|
||||||
|
|
||||||
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) nodeByAddr(addr string) (*clusterNode, error) {
|
func (c *clusterNodes) All() ([]*clusterNode, error) {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
node, ok := c.nodes[addr]
|
defer c.mu.RUnlock()
|
||||||
|
|
||||||
|
if c.closed {
|
||||||
|
return nil, pool.ErrClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
var nodes []*clusterNode
|
||||||
|
for _, node := range c.nodes {
|
||||||
|
nodes = append(nodes, node)
|
||||||
|
}
|
||||||
|
return nodes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
|
||||||
|
var node *clusterNode
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
c.mu.RLock()
|
||||||
|
if !c.closed {
|
||||||
|
node, ok = c.nodes[addr]
|
||||||
|
}
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
if ok {
|
if ok {
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
if c.closed {
|
if c.closed {
|
||||||
return nil, pool.ErrClosed
|
return nil, pool.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
node, ok = c.nodes[addr]
|
node, ok = c.nodes[addr]
|
||||||
if !ok {
|
if ok {
|
||||||
node = c.newNode(addr)
|
|
||||||
c.nodes[addr] = node
|
|
||||||
c.addrs = append(c.addrs, addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) newNode(addr string) *clusterNode {
|
c.addrs = append(c.addrs, addr)
|
||||||
opt := c.opt.clientOptions()
|
node = newClusterNode(c.opt, addr)
|
||||||
opt.Addr = addr
|
c.nodes[addr] = node
|
||||||
return &clusterNode{
|
return node, nil
|
||||||
Client: NewClient(opt),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) slotNodes(slot int) (nodes []*clusterNode) {
|
func (c *clusterNodes) Random() (*clusterNode, error) {
|
||||||
c.mu.RLock()
|
|
||||||
if slot < len(c.slots) {
|
|
||||||
nodes = c.slots[slot]
|
|
||||||
}
|
|
||||||
c.mu.RUnlock()
|
|
||||||
return nodes
|
|
||||||
}
|
|
||||||
|
|
||||||
// randomNode returns random live node.
|
|
||||||
func (c *ClusterClient) randomNode() (*clusterNode, error) {
|
|
||||||
var nodeErr error
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
closed := c.closed
|
closed := c.closed
|
||||||
addrs := c.addrs
|
addrs := c.addrs
|
||||||
|
@ -261,13 +203,14 @@ func (c *ClusterClient) randomNode() (*clusterNode, error) {
|
||||||
if closed {
|
if closed {
|
||||||
return nil, pool.ErrClosed
|
return nil, pool.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(addrs) == 0 {
|
if len(addrs) == 0 {
|
||||||
return nil, errClusterNoNodes
|
return nil, errClusterNoNodes
|
||||||
}
|
}
|
||||||
n := rand.Intn(len(addrs))
|
|
||||||
|
|
||||||
node, err := c.nodeByAddr(addrs[n])
|
var nodeErr error
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
n := rand.Intn(len(addrs))
|
||||||
|
node, err := c.Get(addrs[n])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -280,19 +223,50 @@ func (c *ClusterClient) randomNode() (*clusterNode, error) {
|
||||||
return nil, nodeErr
|
return nil, nodeErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type clusterState struct {
|
||||||
|
nodes *clusterNodes
|
||||||
|
slots [][]*clusterNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClusterState(nodes *clusterNodes, slots []ClusterSlot) (*clusterState, error) {
|
||||||
|
c := clusterState{
|
||||||
|
nodes: nodes,
|
||||||
|
slots: make([][]*clusterNode, hashtag.SlotNumber),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, slot := range slots {
|
||||||
|
var nodes []*clusterNode
|
||||||
|
for _, slotNode := range slot.Nodes {
|
||||||
|
node, err := c.nodes.Get(slotNode.Addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
nodes = append(nodes, node)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := slot.Start; i <= slot.End; i++ {
|
||||||
|
c.slots[i] = nodes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
|
||||||
nodes := c.slotNodes(slot)
|
nodes := c.slotNodes(slot)
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
return c.randomNode()
|
return c.nodes.Random()
|
||||||
}
|
}
|
||||||
return nodes[0], nil
|
return nodes[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) slotSlaveNode(slot int) (*clusterNode, error) {
|
func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
|
||||||
nodes := c.slotNodes(slot)
|
nodes := c.slotNodes(slot)
|
||||||
switch len(nodes) {
|
switch len(nodes) {
|
||||||
case 0:
|
case 0:
|
||||||
return c.randomNode()
|
return c.nodes.Random()
|
||||||
case 1:
|
case 1:
|
||||||
return nodes[0], nil
|
return nodes[0], nil
|
||||||
case 2:
|
case 2:
|
||||||
|
@ -313,10 +287,10 @@ func (c *ClusterClient) slotSlaveNode(slot int) (*clusterNode, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) slotClosestNode(slot int) (*clusterNode, error) {
|
func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
|
||||||
nodes := c.slotNodes(slot)
|
nodes := c.slotNodes(slot)
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
return c.randomNode()
|
return c.nodes.Random()
|
||||||
}
|
}
|
||||||
|
|
||||||
var node *clusterNode
|
var node *clusterNode
|
||||||
|
@ -328,31 +302,108 @@ func (c *ClusterClient) slotClosestNode(slot int) (*clusterNode, error) {
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
|
func (c *clusterState) slotNodes(slot int) []*clusterNode {
|
||||||
cmdInfo := c.cmdInfo(cmd.arg(0))
|
if slot < len(c.slots) {
|
||||||
|
return c.slots[slot]
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// ClusterClient is a Redis Cluster client representing a pool of zero
|
||||||
|
// or more underlying connections. It's safe for concurrent use by
|
||||||
|
// multiple goroutines.
|
||||||
|
type ClusterClient struct {
|
||||||
|
cmdable
|
||||||
|
|
||||||
|
opt *ClusterOptions
|
||||||
|
cmds map[string]*CommandInfo
|
||||||
|
nodes *clusterNodes
|
||||||
|
_state atomic.Value
|
||||||
|
|
||||||
|
// Reports where slots reloading is in progress.
|
||||||
|
reloading uint32
|
||||||
|
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Cmdable = (*ClusterClient)(nil)
|
||||||
|
|
||||||
|
// NewClusterClient returns a Redis Cluster client as described in
|
||||||
|
// http://redis.io/topics/cluster-spec.
|
||||||
|
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
||||||
|
opt.init()
|
||||||
|
|
||||||
|
c := &ClusterClient{
|
||||||
|
opt: opt,
|
||||||
|
nodes: newClusterNodes(opt),
|
||||||
|
}
|
||||||
|
c.cmdable.process = c.Process
|
||||||
|
|
||||||
|
// Add initial nodes.
|
||||||
|
for _, addr := range opt.Addrs {
|
||||||
|
_, _ = c.nodes.Get(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.reloadSlots()
|
||||||
|
|
||||||
|
if opt.IdleCheckFrequency > 0 {
|
||||||
|
go c.reaper(opt.IdleCheckFrequency)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) state() *clusterState {
|
||||||
|
v := c._state.Load()
|
||||||
|
if v == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return v.(*clusterState)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
|
||||||
|
cmdInfo := c.cmds[cmd.arg(0)]
|
||||||
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
|
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
|
||||||
if firstKey == "" {
|
if firstKey == "" {
|
||||||
node, err := c.randomNode()
|
node, err := c.nodes.Random()
|
||||||
return -1, node, err
|
return -1, node, err
|
||||||
}
|
}
|
||||||
slot := hashtag.Slot(firstKey)
|
slot := hashtag.Slot(firstKey)
|
||||||
|
|
||||||
if cmdInfo.ReadOnly && c.opt.ReadOnly {
|
if cmdInfo.ReadOnly && c.opt.ReadOnly {
|
||||||
if c.opt.RouteByLatency {
|
if c.opt.RouteByLatency {
|
||||||
node, err := c.slotClosestNode(slot)
|
node, err := state.slotClosestNode(slot)
|
||||||
return slot, node, err
|
return slot, node, err
|
||||||
}
|
}
|
||||||
|
|
||||||
node, err := c.slotSlaveNode(slot)
|
node, err := state.slotSlaveNode(slot)
|
||||||
return slot, node, err
|
return slot, node, err
|
||||||
}
|
}
|
||||||
|
|
||||||
node, err := c.slotMasterNode(slot)
|
node, err := state.slotMasterNode(slot)
|
||||||
return slot, node, err
|
return slot, node, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
|
||||||
|
node, err := c.state().slotMasterNode(hashtag.Slot(keys[0]))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return node.Client.Watch(fn, keys...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the cluster client, releasing any open resources.
|
||||||
|
//
|
||||||
|
// It is rare to Close a ClusterClient, as the ClusterClient is meant
|
||||||
|
// to be long-lived and shared between many goroutines.
|
||||||
|
func (c *ClusterClient) Close() error {
|
||||||
|
return c.nodes.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) Process(cmd Cmder) error {
|
func (c *ClusterClient) Process(cmd Cmder) error {
|
||||||
slot, node, err := c.cmdSlotAndNode(cmd)
|
slot, node, err := c.cmdSlotAndNode(c.state(), cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.setErr(err)
|
cmd.setErr(err)
|
||||||
return err
|
return err
|
||||||
|
@ -388,7 +439,7 @@ func (c *ClusterClient) Process(cmd Cmder) error {
|
||||||
|
|
||||||
// On network errors try random node.
|
// On network errors try random node.
|
||||||
if internal.IsRetryableError(err) {
|
if internal.IsRetryableError(err) {
|
||||||
node, err = c.randomNode()
|
node, err = c.nodes.Random()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -397,17 +448,18 @@ func (c *ClusterClient) Process(cmd Cmder) error {
|
||||||
moved, ask, addr = internal.IsMovedError(err)
|
moved, ask, addr = internal.IsMovedError(err)
|
||||||
if moved || ask {
|
if moved || ask {
|
||||||
if slot >= 0 {
|
if slot >= 0 {
|
||||||
master, _ := c.slotMasterNode(slot)
|
master, _ := c.state().slotMasterNode(slot)
|
||||||
if moved && (master == nil || master.Client.getAddr() != addr) {
|
if moved && (master == nil || master.Client.getAddr() != addr) {
|
||||||
c.lazyReloadSlots()
|
c.lazyReloadSlots()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
node, err = c.nodeByAddr(addr)
|
node, err = c.nodes.Get(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.setErr(err)
|
cmd.setErr(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -420,14 +472,15 @@ func (c *ClusterClient) Process(cmd Cmder) error {
|
||||||
// ForEachMaster concurrently calls the fn on each master node in the cluster.
|
// ForEachMaster concurrently calls the fn on each master node in the cluster.
|
||||||
// It returns the first error if any.
|
// It returns the first error if any.
|
||||||
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
|
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
|
||||||
c.mu.RLock()
|
state := c.state()
|
||||||
slots := c.slots
|
if state == nil {
|
||||||
c.mu.RUnlock()
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
visited := make(map[*clusterNode]struct{})
|
visited := make(map[*clusterNode]struct{})
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
for _, nodes := range slots {
|
for _, nodes := range state.slots {
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -460,77 +513,59 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// closeClients closes all clients and returns the first error if there are any.
|
// PoolStats returns accumulated connection pool stats.
|
||||||
func (c *ClusterClient) closeClients() error {
|
func (c *ClusterClient) PoolStats() *PoolStats {
|
||||||
var retErr error
|
nodes, err := c.nodes.All()
|
||||||
for _, node := range c.nodes {
|
if err != nil {
|
||||||
if err := node.Client.Close(); err != nil && retErr == nil {
|
return nil
|
||||||
retErr = err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return retErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) setSlots(cs []ClusterSlot) {
|
var acc PoolStats
|
||||||
slots := make([][]*clusterNode, hashtag.SlotNumber)
|
for _, node := range nodes {
|
||||||
for _, s := range cs {
|
s := node.Client.connPool.Stats()
|
||||||
var nodes []*clusterNode
|
acc.Requests += s.Requests
|
||||||
for _, n := range s.Nodes {
|
acc.Hits += s.Hits
|
||||||
node, err := c.nodeByAddr(n.Addr)
|
acc.Timeouts += s.Timeouts
|
||||||
if err == nil {
|
acc.TotalConns += s.TotalConns
|
||||||
nodes = append(nodes, node)
|
acc.FreeConns += s.FreeConns
|
||||||
}
|
}
|
||||||
}
|
return &acc
|
||||||
|
|
||||||
for i := s.Start; i <= s.End; i++ {
|
|
||||||
slots[i] = nodes
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.mu.Lock()
|
|
||||||
if !c.closed {
|
|
||||||
c.slots = slots
|
|
||||||
}
|
|
||||||
c.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) lazyReloadSlots() {
|
func (c *ClusterClient) lazyReloadSlots() {
|
||||||
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
|
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
go c.reloadSlots()
|
go func() {
|
||||||
|
c.reloadSlots()
|
||||||
|
atomic.StoreUint32(&c.reloading, 0)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) reloadSlots() {
|
func (c *ClusterClient) reloadSlots() {
|
||||||
defer atomic.StoreUint32(&c.reloading, 0)
|
for i := 0; i < 10; i++ {
|
||||||
|
node, err := c.nodes.Random()
|
||||||
node, err := c.randomNode()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.cmds == nil {
|
||||||
|
cmds, err := node.Client.Command().Result()
|
||||||
|
if err == nil {
|
||||||
|
c.cmds = cmds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
slots, err := node.Client.ClusterSlots().Result()
|
slots, err := node.Client.ClusterSlots().Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logf("ClusterSlots on addr=%q failed: %s", node.Client.getAddr(), err)
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
state, err := newClusterState(c.nodes, slots)
|
||||||
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
c._state.Store(state)
|
||||||
c.setSlots(slots)
|
|
||||||
if c.opt.RouteByLatency {
|
|
||||||
c.setNodesLatency()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClusterClient) setNodesLatency() {
|
|
||||||
const n = 10
|
|
||||||
for _, node := range c.getNodes() {
|
|
||||||
var latency time.Duration
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
t1 := time.Now()
|
|
||||||
node.Client.Ping()
|
|
||||||
latency += time.Since(t1)
|
|
||||||
}
|
|
||||||
node.Latency = latency / n
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -540,8 +575,8 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for _ = range ticker.C {
|
for _ = range ticker.C {
|
||||||
nodes := c.getNodes()
|
nodes, err := c.nodes.All()
|
||||||
if nodes == nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -584,9 +619,10 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
state := c.state()
|
||||||
cmdsMap := make(map[*clusterNode][]Cmder)
|
cmdsMap := make(map[*clusterNode][]Cmder)
|
||||||
for _, cmd := range cmds {
|
for _, cmd := range cmds {
|
||||||
_, node, err := c.cmdSlotAndNode(cmd)
|
_, node, err := c.cmdSlotAndNode(state, cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.setErr(err)
|
cmd.setErr(err)
|
||||||
setRetErr(err)
|
setRetErr(err)
|
||||||
|
@ -599,16 +635,6 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
|
||||||
failedCmds := make(map[*clusterNode][]Cmder)
|
failedCmds := make(map[*clusterNode][]Cmder)
|
||||||
|
|
||||||
for node, cmds := range cmdsMap {
|
for node, cmds := range cmdsMap {
|
||||||
if node == nil {
|
|
||||||
var err error
|
|
||||||
node, err = c.randomNode()
|
|
||||||
if err != nil {
|
|
||||||
setCmdsErr(cmds, err)
|
|
||||||
setRetErr(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cn, _, err := node.Client.conn()
|
cn, _, err := node.Client.conn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setCmdsErr(cmds, err)
|
setCmdsErr(cmds, err)
|
||||||
|
@ -660,7 +686,7 @@ func (c *ClusterClient) execClusterCmds(
|
||||||
if moved {
|
if moved {
|
||||||
c.lazyReloadSlots()
|
c.lazyReloadSlots()
|
||||||
|
|
||||||
node, err := c.nodeByAddr(addr)
|
node, err := c.nodes.Get(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setRetErr(err)
|
setRetErr(err)
|
||||||
continue
|
continue
|
||||||
|
@ -669,7 +695,7 @@ func (c *ClusterClient) execClusterCmds(
|
||||||
cmd.reset()
|
cmd.reset()
|
||||||
failedCmds[node] = append(failedCmds[node], cmd)
|
failedCmds[node] = append(failedCmds[node], cmd)
|
||||||
} else if ask {
|
} else if ask {
|
||||||
node, err := c.nodeByAddr(addr)
|
node, err := c.nodes.Get(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setRetErr(err)
|
setRetErr(err)
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -1,91 +0,0 @@
|
||||||
package redis
|
|
||||||
|
|
||||||
import (
|
|
||||||
. "github.com/onsi/ginkgo"
|
|
||||||
. "github.com/onsi/gomega"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (c *ClusterClient) SlotAddrs(slot int) []string {
|
|
||||||
var addrs []string
|
|
||||||
for _, n := range c.slotNodes(slot) {
|
|
||||||
addrs = append(addrs, n.Client.getAddr())
|
|
||||||
}
|
|
||||||
return addrs
|
|
||||||
}
|
|
||||||
|
|
||||||
// SwapSlot swaps a slot's master/slave address
|
|
||||||
// for testing MOVED redirects
|
|
||||||
func (c *ClusterClient) SwapSlotNodes(slot int) []string {
|
|
||||||
c.mu.Lock()
|
|
||||||
nodes := c.slots[slot]
|
|
||||||
nodes[0], nodes[1] = nodes[1], nodes[0]
|
|
||||||
c.mu.Unlock()
|
|
||||||
return c.SlotAddrs(slot)
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ = Describe("ClusterClient", func() {
|
|
||||||
var subject *ClusterClient
|
|
||||||
|
|
||||||
var populate = func() {
|
|
||||||
subject.setSlots([]ClusterSlot{
|
|
||||||
{0, 4095, []ClusterNode{{"", "127.0.0.1:7000"}, {"", "127.0.0.1:7004"}}},
|
|
||||||
{12288, 16383, []ClusterNode{{"", "127.0.0.1:7003"}, {"", "127.0.0.1:7007"}}},
|
|
||||||
{4096, 8191, []ClusterNode{{"", "127.0.0.1:7001"}, {"", "127.0.0.1:7005"}}},
|
|
||||||
{8192, 12287, []ClusterNode{{"", "127.0.0.1:7002"}, {"", "127.0.0.1:7006"}}},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
BeforeEach(func() {
|
|
||||||
subject = NewClusterClient(&ClusterOptions{
|
|
||||||
Addrs: []string{"127.0.0.1:6379", "127.0.0.1:7003", "127.0.0.1:7006"},
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
AfterEach(func() {
|
|
||||||
_ = subject.Close()
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should initialize", func() {
|
|
||||||
Expect(subject.addrs).To(HaveLen(3))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should update slots cache", func() {
|
|
||||||
populate()
|
|
||||||
Expect(subject.slots[0][0].Client.getAddr()).To(Equal("127.0.0.1:7000"))
|
|
||||||
Expect(subject.slots[0][1].Client.getAddr()).To(Equal("127.0.0.1:7004"))
|
|
||||||
Expect(subject.slots[4095][0].Client.getAddr()).To(Equal("127.0.0.1:7000"))
|
|
||||||
Expect(subject.slots[4095][1].Client.getAddr()).To(Equal("127.0.0.1:7004"))
|
|
||||||
Expect(subject.slots[4096][0].Client.getAddr()).To(Equal("127.0.0.1:7001"))
|
|
||||||
Expect(subject.slots[4096][1].Client.getAddr()).To(Equal("127.0.0.1:7005"))
|
|
||||||
Expect(subject.slots[8191][0].Client.getAddr()).To(Equal("127.0.0.1:7001"))
|
|
||||||
Expect(subject.slots[8191][1].Client.getAddr()).To(Equal("127.0.0.1:7005"))
|
|
||||||
Expect(subject.slots[8192][0].Client.getAddr()).To(Equal("127.0.0.1:7002"))
|
|
||||||
Expect(subject.slots[8192][1].Client.getAddr()).To(Equal("127.0.0.1:7006"))
|
|
||||||
Expect(subject.slots[12287][0].Client.getAddr()).To(Equal("127.0.0.1:7002"))
|
|
||||||
Expect(subject.slots[12287][1].Client.getAddr()).To(Equal("127.0.0.1:7006"))
|
|
||||||
Expect(subject.slots[12288][0].Client.getAddr()).To(Equal("127.0.0.1:7003"))
|
|
||||||
Expect(subject.slots[12288][1].Client.getAddr()).To(Equal("127.0.0.1:7007"))
|
|
||||||
Expect(subject.slots[16383][0].Client.getAddr()).To(Equal("127.0.0.1:7003"))
|
|
||||||
Expect(subject.slots[16383][1].Client.getAddr()).To(Equal("127.0.0.1:7007"))
|
|
||||||
Expect(subject.addrs).To(Equal([]string{
|
|
||||||
"127.0.0.1:6379",
|
|
||||||
"127.0.0.1:7003",
|
|
||||||
"127.0.0.1:7006",
|
|
||||||
"127.0.0.1:7000",
|
|
||||||
"127.0.0.1:7004",
|
|
||||||
"127.0.0.1:7007",
|
|
||||||
"127.0.0.1:7001",
|
|
||||||
"127.0.0.1:7005",
|
|
||||||
"127.0.0.1:7002",
|
|
||||||
}))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should close", func() {
|
|
||||||
populate()
|
|
||||||
Expect(subject.Close()).NotTo(HaveOccurred())
|
|
||||||
Expect(subject.addrs).To(BeEmpty())
|
|
||||||
Expect(subject.nodes).To(BeEmpty())
|
|
||||||
Expect(subject.slots).To(BeEmpty())
|
|
||||||
Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed"))
|
|
||||||
})
|
|
||||||
})
|
|
|
@ -17,3 +17,18 @@ func (c *PubSub) Pool() pool.Pooler {
|
||||||
func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error) {
|
func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error) {
|
||||||
return c.receiveMessage(timeout)
|
return c.receiveMessage(timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) SlotAddrs(slot int) []string {
|
||||||
|
var addrs []string
|
||||||
|
for _, n := range c.state().slotNodes(slot) {
|
||||||
|
addrs = append(addrs, n.Client.getAddr())
|
||||||
|
}
|
||||||
|
return addrs
|
||||||
|
}
|
||||||
|
|
||||||
|
// SwapSlot swaps a slot's master/slave address for testing MOVED redirects.
|
||||||
|
func (c *ClusterClient) SwapSlotNodes(slot int) []string {
|
||||||
|
nodes := c.state().slots[slot]
|
||||||
|
nodes[0], nodes[1] = nodes[1], nodes[0]
|
||||||
|
return c.SlotAddrs(slot)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue