cluster: reload state for ForEach functions and every 1 minute

This commit is contained in:
Jeffrey Hutchins 2018-05-21 16:33:41 +03:00 committed by Vladimir Mihailenco
parent 5c742fff78
commit 471caa3d91
1 changed files with 35 additions and 17 deletions

View File

@ -391,16 +391,19 @@ type clusterState struct {
slots [][]*clusterNode
generation uint32
createdAt time.Time
}
func newClusterState(
nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
c := clusterState{
nodes: nodes,
generation: nodes.NextGeneration(),
nodes: nodes,
slots: make([][]*clusterNode, hashtag.SlotNumber),
generation: nodes.NextGeneration(),
createdAt: time.Now(),
}
isLoopbackOrigin := isLoopbackAddr(origin)
@ -534,8 +537,8 @@ type clusterStateHolder struct {
state atomic.Value
lastErrMu sync.RWMutex
lastErr error
firstErrMu sync.RWMutex
firstErr error
reloading uint32 // atomic
}
@ -560,9 +563,11 @@ func (c *clusterStateHolder) Reload() (*clusterState, error) {
func (c *clusterStateHolder) reload() (*clusterState, error) {
state, err := c.load()
if err != nil {
c.lastErrMu.Lock()
c.lastErr = err
c.lastErrMu.Unlock()
c.firstErrMu.Lock()
if c.firstErr == nil {
c.firstErr = err
}
c.firstErrMu.Unlock()
return nil, err
}
c.state.Store(state)
@ -592,12 +597,16 @@ func (c *clusterStateHolder) LazyReload() {
func (c *clusterStateHolder) Get() (*clusterState, error) {
v := c.state.Load()
if v != nil {
return v.(*clusterState), nil
state := v.(*clusterState)
if time.Since(state.createdAt) > time.Minute {
c.LazyReload()
}
return state, nil
}
c.lastErrMu.RLock()
err := c.lastErr
c.lastErrMu.RUnlock()
c.firstErrMu.RLock()
err := c.firstErr
c.firstErrMu.RUnlock()
if err != nil {
return nil, err
}
@ -930,9 +939,12 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
state, err := c.state.Get()
state, err := c.state.Reload()
if err != nil {
return err
state, err = c.state.Get()
if err != nil {
return err
}
}
var wg sync.WaitGroup
@ -963,9 +975,12 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
state, err := c.state.Get()
state, err := c.state.Reload()
if err != nil {
return err
state, err = c.state.Get()
if err != nil {
return err
}
}
var wg sync.WaitGroup
@ -996,9 +1011,12 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
state, err := c.state.Get()
state, err := c.state.Reload()
if err != nil {
return err
state, err = c.state.Get()
if err != nil {
return err
}
}
var wg sync.WaitGroup