From 382feca78479faa1e4ce1ea8b48d222db5eeb7de Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Thu, 13 Dec 2018 12:26:02 +0200 Subject: [PATCH] clusterStateHolder.Get should load a state when there is none --- cluster.go | 46 ++++++++-------------------------------------- cluster_test.go | 41 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 46 insertions(+), 41 deletions(-) diff --git a/cluster.go b/cluster.go index 4c1fd73..af2b057 100644 --- a/cluster.go +++ b/cluster.go @@ -3,7 +3,6 @@ package redis import ( "context" "crypto/tls" - "errors" "fmt" "math" "math/rand" @@ -594,11 +593,7 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode { type clusterStateHolder struct { load func() (*clusterState, error) - state atomic.Value - - firstErrMu sync.RWMutex - firstErr error - + state atomic.Value reloading uint32 // atomic } @@ -609,21 +604,8 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder } func (c *clusterStateHolder) Reload() (*clusterState, error) { - state, err := c.reload() - if err != nil { - return nil, err - } - return state, nil -} - -func (c *clusterStateHolder) reload() (*clusterState, error) { state, err := c.load() if err != nil { - c.firstErrMu.Lock() - if c.firstErr == nil { - c.firstErr = err - } - c.firstErrMu.Unlock() return nil, err } c.state.Store(state) @@ -637,7 +619,7 @@ func (c *clusterStateHolder) LazyReload() { go func() { defer atomic.StoreUint32(&c.reloading, 0) - _, err := c.reload() + _, err := c.Reload() if err != nil { return } @@ -654,15 +636,7 @@ func (c *clusterStateHolder) Get() (*clusterState, error) { } return state, nil } - - c.firstErrMu.RLock() - err := c.firstErr - c.firstErrMu.RUnlock() - if err != nil { - return nil, err - } - - return nil, errors.New("redis: cluster has no state") + return c.Reload() } func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) { @@ -710,10 +684,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { c.processTxPipeline = c.defaultProcessTxPipeline c.init() - - _, _ = c.state.Reload() - _, _ = c.cmdsInfoCache.Get() - if opt.IdleCheckFrequency > 0 { go c.reaper(opt.IdleCheckFrequency) } @@ -721,17 +691,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return c } -// ReloadState reloads cluster state. It calls ClusterSlots func +func (c *ClusterClient) init() { + c.cmdable.setProcessor(c.Process) +} + +// ReloadState reloads cluster state. If available it calls ClusterSlots func // to get cluster slots information. func (c *ClusterClient) ReloadState() error { _, err := c.state.Reload() return err } -func (c *ClusterClient) init() { - c.cmdable.setProcessor(c.Process) -} - func (c *ClusterClient) Context() context.Context { if c.ctx != nil { return c.ctx diff --git a/cluster_test.go b/cluster_test.go index cdbb733..d39ebd0 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -48,9 +48,14 @@ func (s *clusterScenario) addrs() []string { return addrs } -func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient { +func (s *clusterScenario) clusterClientUnsafe(opt *redis.ClusterOptions) *redis.ClusterClient { opt.Addrs = s.addrs() - client := redis.NewClusterClient(opt) + return redis.NewClusterClient(opt) + +} + +func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient { + client := s.clusterClientUnsafe(opt) err := eventually(func() error { if opt.ClusterSlots != nil { @@ -932,6 +937,36 @@ var _ = Describe("ClusterClient without valid nodes", func() { }) }) +var _ = Describe("ClusterClient with unavailable Cluster", func() { + var client *redis.ClusterClient + + BeforeEach(func() { + for _, node := range cluster.clients { + err := node.ClientPause(5 * time.Second).Err() + Expect(err).NotTo(HaveOccurred()) + } + + opt := redisClusterOptions() + opt.ReadTimeout = 250 * time.Millisecond + opt.WriteTimeout = 250 * time.Millisecond + opt.MaxRedirects = 1 + client = cluster.clusterClientUnsafe(opt) + }) + + AfterEach(func() { + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("recovers when Cluster recovers", func() { + err := client.Ping().Err() + Expect(err).To(HaveOccurred()) + + Eventually(func() error { + return client.Ping().Err() + }, "30s").ShouldNot(HaveOccurred()) + }) +}) + var _ = Describe("ClusterClient timeout", func() { var client *redis.ClusterClient @@ -976,7 +1011,7 @@ var _ = Describe("ClusterClient timeout", func() { }) } - const pause = 3 * time.Second + const pause = 5 * time.Second Context("read/write timeout", func() { BeforeEach(func() {