forked from mirror/redis
clusterStateHolder.Get should load a state when there is none
This commit is contained in:
parent
f18a97fc94
commit
382feca784
46
cluster.go
46
cluster.go
|
@ -3,7 +3,6 @@ package redis
|
|||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
|
@ -594,11 +593,7 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
|
|||
type clusterStateHolder struct {
|
||||
load func() (*clusterState, error)
|
||||
|
||||
state atomic.Value
|
||||
|
||||
firstErrMu sync.RWMutex
|
||||
firstErr error
|
||||
|
||||
state atomic.Value
|
||||
reloading uint32 // atomic
|
||||
}
|
||||
|
||||
|
@ -609,21 +604,8 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
|
|||
}
|
||||
|
||||
func (c *clusterStateHolder) Reload() (*clusterState, error) {
|
||||
state, err := c.reload()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return state, nil
|
||||
}
|
||||
|
||||
func (c *clusterStateHolder) reload() (*clusterState, error) {
|
||||
state, err := c.load()
|
||||
if err != nil {
|
||||
c.firstErrMu.Lock()
|
||||
if c.firstErr == nil {
|
||||
c.firstErr = err
|
||||
}
|
||||
c.firstErrMu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
c.state.Store(state)
|
||||
|
@ -637,7 +619,7 @@ func (c *clusterStateHolder) LazyReload() {
|
|||
go func() {
|
||||
defer atomic.StoreUint32(&c.reloading, 0)
|
||||
|
||||
_, err := c.reload()
|
||||
_, err := c.Reload()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -654,15 +636,7 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
|
|||
}
|
||||
return state, nil
|
||||
}
|
||||
|
||||
c.firstErrMu.RLock()
|
||||
err := c.firstErr
|
||||
c.firstErrMu.RUnlock()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, errors.New("redis: cluster has no state")
|
||||
return c.Reload()
|
||||
}
|
||||
|
||||
func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
|
||||
|
@ -710,10 +684,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
|||
c.processTxPipeline = c.defaultProcessTxPipeline
|
||||
|
||||
c.init()
|
||||
|
||||
_, _ = c.state.Reload()
|
||||
_, _ = c.cmdsInfoCache.Get()
|
||||
|
||||
if opt.IdleCheckFrequency > 0 {
|
||||
go c.reaper(opt.IdleCheckFrequency)
|
||||
}
|
||||
|
@ -721,17 +691,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
|||
return c
|
||||
}
|
||||
|
||||
// ReloadState reloads cluster state. It calls ClusterSlots func
|
||||
func (c *ClusterClient) init() {
|
||||
c.cmdable.setProcessor(c.Process)
|
||||
}
|
||||
|
||||
// ReloadState reloads cluster state. If available it calls ClusterSlots func
|
||||
// to get cluster slots information.
|
||||
func (c *ClusterClient) ReloadState() error {
|
||||
_, err := c.state.Reload()
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *ClusterClient) init() {
|
||||
c.cmdable.setProcessor(c.Process)
|
||||
}
|
||||
|
||||
func (c *ClusterClient) Context() context.Context {
|
||||
if c.ctx != nil {
|
||||
return c.ctx
|
||||
|
|
|
@ -48,9 +48,14 @@ func (s *clusterScenario) addrs() []string {
|
|||
return addrs
|
||||
}
|
||||
|
||||
func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient {
|
||||
func (s *clusterScenario) clusterClientUnsafe(opt *redis.ClusterOptions) *redis.ClusterClient {
|
||||
opt.Addrs = s.addrs()
|
||||
client := redis.NewClusterClient(opt)
|
||||
return redis.NewClusterClient(opt)
|
||||
|
||||
}
|
||||
|
||||
func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient {
|
||||
client := s.clusterClientUnsafe(opt)
|
||||
|
||||
err := eventually(func() error {
|
||||
if opt.ClusterSlots != nil {
|
||||
|
@ -932,6 +937,36 @@ var _ = Describe("ClusterClient without valid nodes", func() {
|
|||
})
|
||||
})
|
||||
|
||||
var _ = Describe("ClusterClient with unavailable Cluster", func() {
|
||||
var client *redis.ClusterClient
|
||||
|
||||
BeforeEach(func() {
|
||||
for _, node := range cluster.clients {
|
||||
err := node.ClientPause(5 * time.Second).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
opt := redisClusterOptions()
|
||||
opt.ReadTimeout = 250 * time.Millisecond
|
||||
opt.WriteTimeout = 250 * time.Millisecond
|
||||
opt.MaxRedirects = 1
|
||||
client = cluster.clusterClientUnsafe(opt)
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("recovers when Cluster recovers", func() {
|
||||
err := client.Ping().Err()
|
||||
Expect(err).To(HaveOccurred())
|
||||
|
||||
Eventually(func() error {
|
||||
return client.Ping().Err()
|
||||
}, "30s").ShouldNot(HaveOccurred())
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("ClusterClient timeout", func() {
|
||||
var client *redis.ClusterClient
|
||||
|
||||
|
@ -976,7 +1011,7 @@ var _ = Describe("ClusterClient timeout", func() {
|
|||
})
|
||||
}
|
||||
|
||||
const pause = 3 * time.Second
|
||||
const pause = 5 * time.Second
|
||||
|
||||
Context("read/write timeout", func() {
|
||||
BeforeEach(func() {
|
||||
|
|
Loading…
Reference in New Issue