diff --git a/cluster.go b/cluster.go index d24438fc..c08c00db 100644 --- a/cluster.go +++ b/cluster.go @@ -391,16 +391,19 @@ type clusterState struct { slots [][]*clusterNode generation uint32 + createdAt time.Time } func newClusterState( nodes *clusterNodes, slots []ClusterSlot, origin string, ) (*clusterState, error) { c := clusterState{ - nodes: nodes, - generation: nodes.NextGeneration(), + nodes: nodes, slots: make([][]*clusterNode, hashtag.SlotNumber), + + generation: nodes.NextGeneration(), + createdAt: time.Now(), } isLoopbackOrigin := isLoopbackAddr(origin) @@ -534,8 +537,8 @@ type clusterStateHolder struct { state atomic.Value - lastErrMu sync.RWMutex - lastErr error + firstErrMu sync.RWMutex + firstErr error reloading uint32 // atomic } @@ -560,9 +563,11 @@ func (c *clusterStateHolder) Reload() (*clusterState, error) { func (c *clusterStateHolder) reload() (*clusterState, error) { state, err := c.load() if err != nil { - c.lastErrMu.Lock() - c.lastErr = err - c.lastErrMu.Unlock() + c.firstErrMu.Lock() + if c.firstErr == nil { + c.firstErr = err + } + c.firstErrMu.Unlock() return nil, err } c.state.Store(state) @@ -592,12 +597,16 @@ func (c *clusterStateHolder) LazyReload() { func (c *clusterStateHolder) Get() (*clusterState, error) { v := c.state.Load() if v != nil { - return v.(*clusterState), nil + state := v.(*clusterState) + if time.Since(state.createdAt) > time.Minute { + c.LazyReload() + } + return state, nil } - c.lastErrMu.RLock() - err := c.lastErr - c.lastErrMu.RUnlock() + c.firstErrMu.RLock() + err := c.firstErr + c.firstErrMu.RUnlock() if err != nil { return nil, err } @@ -930,9 +939,12 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { // ForEachMaster concurrently calls the fn on each master node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { - state, err := c.state.Get() + state, err := c.state.Reload() if err != nil { - return err + state, err = c.state.Get() + if err != nil { + return err + } } var wg sync.WaitGroup @@ -963,9 +975,12 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { // ForEachSlave concurrently calls the fn on each slave node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { - state, err := c.state.Get() + state, err := c.state.Reload() if err != nil { - return err + state, err = c.state.Get() + if err != nil { + return err + } } var wg sync.WaitGroup @@ -996,9 +1011,12 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { // ForEachNode concurrently calls the fn on each known node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { - state, err := c.state.Get() + state, err := c.state.Reload() if err != nil { - return err + state, err = c.state.Get() + if err != nil { + return err + } } var wg sync.WaitGroup