Merge pull request #768 from go-redis/fix/for-each-reload-state

cluster: reload state for ForEach functions and every 1 minute
This commit is contained in:
Vladimir Mihailenco 2018-05-21 17:18:30 +03:00 committed by GitHub
commit e3d9f9d1c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 17 deletions

View File

@ -391,6 +391,7 @@ type clusterState struct {
slots [][]*clusterNode
generation uint32
createdAt time.Time
}
func newClusterState(
@ -398,9 +399,11 @@ func newClusterState(
) (*clusterState, error) {
c := clusterState{
nodes: nodes,
generation: nodes.NextGeneration(),
slots: make([][]*clusterNode, hashtag.SlotNumber),
generation: nodes.NextGeneration(),
createdAt: time.Now(),
}
isLoopbackOrigin := isLoopbackAddr(origin)
@ -534,8 +537,8 @@ type clusterStateHolder struct {
state atomic.Value
lastErrMu sync.RWMutex
lastErr error
firstErrMu sync.RWMutex
firstErr error
reloading uint32 // atomic
}
@ -560,9 +563,11 @@ func (c *clusterStateHolder) Reload() (*clusterState, error) {
func (c *clusterStateHolder) reload() (*clusterState, error) {
state, err := c.load()
if err != nil {
c.lastErrMu.Lock()
c.lastErr = err
c.lastErrMu.Unlock()
c.firstErrMu.Lock()
if c.firstErr == nil {
c.firstErr = err
}
c.firstErrMu.Unlock()
return nil, err
}
c.state.Store(state)
@ -592,12 +597,16 @@ func (c *clusterStateHolder) LazyReload() {
func (c *clusterStateHolder) Get() (*clusterState, error) {
v := c.state.Load()
if v != nil {
return v.(*clusterState), nil
state := v.(*clusterState)
if time.Since(state.createdAt) > time.Minute {
c.LazyReload()
}
return state, nil
}
c.lastErrMu.RLock()
err := c.lastErr
c.lastErrMu.RUnlock()
c.firstErrMu.RLock()
err := c.firstErr
c.firstErrMu.RUnlock()
if err != nil {
return nil, err
}
@ -930,10 +939,13 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
state, err := c.state.Get()
state, err := c.state.Reload()
if err != nil {
state, err = c.state.Get()
if err != nil {
return err
}
}
var wg sync.WaitGroup
errCh := make(chan error, 1)
@ -963,10 +975,13 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
state, err := c.state.Get()
state, err := c.state.Reload()
if err != nil {
state, err = c.state.Get()
if err != nil {
return err
}
}
var wg sync.WaitGroup
errCh := make(chan error, 1)
@ -996,10 +1011,13 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
state, err := c.state.Get()
state, err := c.state.Reload()
if err != nil {
state, err = c.state.Get()
if err != nil {
return err
}
}
var wg sync.WaitGroup
errCh := make(chan error, 1)