Merge pull request #524 from go-redis/fix/nil-state

Don't panic if cluster state is nil.
This commit is contained in:
Vladimir Mihailenco 2017-03-04 14:38:25 +03:00 committed by GitHub
commit a16aeec10f
2 changed files with 65 additions and 38 deletions

View File

@ -14,6 +14,7 @@ import (
) )
var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes") var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes")
var errNilClusterState = internal.RedisError("redis: cannot load cluster slots")
// ClusterOptions are used to configure a cluster client and should be // ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient. // passed to NewClusterClient.
@ -355,7 +356,14 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
_, _ = c.nodes.Get(addr) _, _ = c.nodes.Get(addr)
} }
c.reloadSlots() // Preload cluster slots.
for i := 0; i < 10; i++ {
state, err := c.reloadSlots()
if err == nil {
c._state.Store(state)
break
}
}
if opt.IdleCheckFrequency > 0 { if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency) go c.reaper(opt.IdleCheckFrequency)
@ -366,10 +374,11 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
func (c *ClusterClient) state() *clusterState { func (c *ClusterClient) state() *clusterState {
v := c._state.Load() v := c._state.Load()
if v == nil { if v != nil {
return nil return v.(*clusterState)
} }
return v.(*clusterState) c.lazyReloadSlots()
return nil
} }
func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) { func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
@ -397,10 +406,12 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl
} }
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
state := c.state()
var node *clusterNode var node *clusterNode
var err error var err error
if len(keys) > 0 { if state != nil && len(keys) > 0 {
node, err = c.state().slotMasterNode(hashtag.Slot(keys[0])) node, err = state.slotMasterNode(hashtag.Slot(keys[0]))
} else { } else {
node, err = c.nodes.Random() node, err = c.nodes.Random()
} }
@ -463,8 +474,9 @@ func (c *ClusterClient) Process(cmd Cmder) error {
var addr string var addr string
moved, ask, addr = internal.IsMovedError(err) moved, ask, addr = internal.IsMovedError(err)
if moved || ask { if moved || ask {
if slot >= 0 { state := c.state()
master, _ := c.state().slotMasterNode(slot) if state != nil && slot >= 0 {
master, _ := state.slotMasterNode(slot)
if moved && (master == nil || master.Client.getAddr() != addr) { if moved && (master == nil || master.Client.getAddr() != addr) {
c.lazyReloadSlots() c.lazyReloadSlots()
} }
@ -523,7 +535,7 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
state := c.state() state := c.state()
if state == nil { if state == nil {
return nil return errNilClusterState
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -564,12 +576,13 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// PoolStats returns accumulated connection pool stats. // PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats { func (c *ClusterClient) PoolStats() *PoolStats {
var acc PoolStats
nodes, err := c.nodes.All() nodes, err := c.nodes.All()
if err != nil { if err != nil {
return nil return &acc
} }
var acc PoolStats
for _, node := range nodes { for _, node := range nodes {
s := node.Client.connPool.Stats() s := node.Client.connPool.Stats()
acc.Requests += s.Requests acc.Requests += s.Requests
@ -585,37 +598,46 @@ func (c *ClusterClient) lazyReloadSlots() {
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
return return
} }
go func() { go func() {
c.reloadSlots() for i := 0; i < 1000; i++ {
state, err := c.reloadSlots()
if err == pool.ErrClosed {
break
}
if err == nil {
c._state.Store(state)
break
}
time.Sleep(time.Millisecond)
}
time.Sleep(3 * time.Second)
atomic.StoreUint32(&c.reloading, 0) atomic.StoreUint32(&c.reloading, 0)
}() }()
} }
func (c *ClusterClient) reloadSlots() { func (c *ClusterClient) reloadSlots() (*clusterState, error) {
for i := 0; i < 10; i++ { node, err := c.nodes.Random()
node, err := c.nodes.Random() if err != nil {
if err != nil { return nil, err
return
}
if c.cmds == nil {
cmds, err := node.Client.Command().Result()
if err == nil {
c.cmds = cmds
}
}
slots, err := node.Client.ClusterSlots().Result()
if err != nil {
continue
}
state, err := newClusterState(c.nodes, slots)
if err != nil {
return
}
c._state.Store(state)
} }
// TODO: fix race
if c.cmds == nil {
cmds, err := node.Client.Command().Result()
if err != nil {
return nil, err
}
c.cmds = cmds
}
slots, err := node.Client.ClusterSlots().Result()
if err != nil {
return nil, err
}
return newClusterState(c.nodes, slots)
} }
// reaper closes idle connections to the cluster. // reaper closes idle connections to the cluster.
@ -789,8 +811,13 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
return err return err
} }
state := c.state()
if state == nil {
return errNilClusterState
}
for slot, cmds := range cmdsMap { for slot, cmds := range cmdsMap {
node, err := c.state().slotMasterNode(slot) node, err := state.slotMasterNode(slot)
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
continue continue

View File

@ -578,7 +578,7 @@ var _ = Describe("ClusterClient timeout", func() {
var client *redis.ClusterClient var client *redis.ClusterClient
AfterEach(func() { AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred()) _ = client.Close()
}) })
testTimeout := func() { testTimeout := func() {