Merge branch 'master' of github.com:go-redis/redis

This commit is contained in:
Vladimir Mihailenco 2018-06-29 11:14:17 +03:00
commit 5db3ab16e5
4 changed files with 146 additions and 25 deletions

View File

@ -30,7 +30,7 @@ type ClusterOptions struct {
// The maximum number of retries before giving up. Command is retried // The maximum number of retries before giving up. Command is retried
// on network errors and MOVED/ASK redirects. // on network errors and MOVED/ASK redirects.
// Default is 8. // Default is 8 retries.
MaxRedirects int MaxRedirects int
// Enables read-only commands on slave nodes. // Enables read-only commands on slave nodes.
@ -39,8 +39,14 @@ type ClusterOptions struct {
// It automatically enables ReadOnly. // It automatically enables ReadOnly.
RouteByLatency bool RouteByLatency bool
// Allows routing read-only commands to the random master or slave node. // Allows routing read-only commands to the random master or slave node.
// It automatically enables ReadOnly.
RouteRandomly bool RouteRandomly bool
// Optional function that is used to load cluster slots information.
// It is useful to manually create cluster of standalone Redis servers
// or load-balance read/write operations between master and slaves.
ClusterSlots func() ([]ClusterSlot, error)
// Following options are copied from Options struct. // Following options are copied from Options struct.
OnConnect func(*Conn) error OnConnect func(*Conn) error
@ -70,7 +76,7 @@ func (opt *ClusterOptions) init() {
opt.MaxRedirects = 8 opt.MaxRedirects = 8
} }
if opt.RouteByLatency { if opt.RouteByLatency || opt.RouteRandomly {
opt.ReadOnly = true opt.ReadOnly = true
} }
@ -160,10 +166,6 @@ func (n *clusterNode) Close() error {
return n.Client.Close() return n.Client.Close()
} }
func (n *clusterNode) Test() error {
return n.Client.ClusterInfo().Err()
}
func (n *clusterNode) updateLatency() { func (n *clusterNode) updateLatency() {
const probes = 10 const probes = 10
@ -330,7 +332,7 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) { v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
node := newClusterNode(c.opt, addr) node := newClusterNode(c.opt, addr)
return node, node.Test() return node, nil
}) })
c.mu.Lock() c.mu.Lock()
@ -509,6 +511,10 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
} }
func (c *clusterState) IsConsistent() bool { func (c *clusterState) IsConsistent() bool {
if c.nodes.opt.ClusterSlots != nil {
return true
}
if len(c.Masters) > len(c.Slaves) { if len(c.Masters) > len(c.Slaves) {
return false return false
} }
@ -614,6 +620,14 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
return nil, errors.New("redis: cluster has no state") return nil, errors.New("redis: cluster has no state")
} }
func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
state, err := c.Reload()
if err == nil {
return state, nil
}
return c.Get()
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// ClusterClient is a Redis Cluster client representing a pool of zero // ClusterClient is a Redis Cluster client representing a pool of zero
@ -662,6 +676,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return c return c
} }
// ReloadState loads cluster slots information to update cluster topography.
func (c *ClusterClient) ReloadState() error {
_, err := c.state.Reload()
return err
}
func (c *ClusterClient) init() { func (c *ClusterClient) init() {
c.cmdable.setProcessor(c.Process) c.cmdable.setProcessor(c.Process)
} }
@ -946,12 +966,9 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster. // ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any. // It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
state, err := c.state.Reload() state, err := c.state.ReloadOrGet()
if err != nil { if err != nil {
state, err = c.state.Get() return err
if err != nil {
return err
}
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -982,12 +999,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// ForEachSlave concurrently calls the fn on each slave node in the cluster. // ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any. // It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
state, err := c.state.Reload() state, err := c.state.ReloadOrGet()
if err != nil { if err != nil {
state, err = c.state.Get() return err
if err != nil {
return err
}
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -1018,12 +1032,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
// ForEachNode concurrently calls the fn on each known node in the cluster. // ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any. // It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
state, err := c.state.Reload() state, err := c.state.ReloadOrGet()
if err != nil { if err != nil {
state, err = c.state.Get() return err
if err != nil {
return err
}
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -1092,6 +1103,14 @@ func (c *ClusterClient) PoolStats() *PoolStats {
} }
func (c *ClusterClient) loadState() (*clusterState, error) { func (c *ClusterClient) loadState() (*clusterState, error) {
if c.opt.ClusterSlots != nil {
slots, err := c.opt.ClusterSlots()
if err != nil {
return nil, err
}
return newClusterState(c.nodes, slots, "")
}
addrs, err := c.nodes.Addrs() addrs, err := c.nodes.Addrs()
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -310,7 +310,8 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
} }
for _, master := range cluster.masters() { client.ForEachMaster(func(master *redis.Client) error {
defer GinkgoRecover()
Eventually(func() string { Eventually(func() string {
return master.Info("keyspace").Val() return master.Info("keyspace").Val()
}, 30*time.Second).Should(Or( }, 30*time.Second).Should(Or(
@ -318,7 +319,8 @@ var _ = Describe("ClusterClient", func() {
ContainSubstring("keys=29"), ContainSubstring("keys=29"),
ContainSubstring("keys=40"), ContainSubstring("keys=40"),
)) ))
} return nil
})
}) })
It("supports Watch", func() { It("supports Watch", func() {
@ -750,6 +752,59 @@ var _ = Describe("ClusterClient", func() {
assertClusterClient() assertClusterClient()
}) })
Describe("ClusterClient with ClusterSlots", func() {
BeforeEach(func() {
failover = true
opt = redisClusterOptions()
opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{{
Start: 0,
End: 4999,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard1Port,
}},
}, {
Start: 5000,
End: 9999,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard2Port,
}},
}, {
Start: 10000,
End: 16383,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard3Port,
}},
}}
return slots, nil
}
client = cluster.clusterClient(opt)
err := client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
Expect(err).NotTo(HaveOccurred())
err = client.ForEachSlave(func(slave *redis.Client) error {
Eventually(func() int64 {
return client.DBSize().Val()
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
failover = false
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
assertClusterClient()
})
}) })
var _ = Describe("ClusterClient without nodes", func() { var _ = Describe("ClusterClient without nodes", func() {

View File

@ -71,6 +71,48 @@ func ExampleNewClusterClient() {
client.Ping() client.Ping()
} }
// Following example creates a cluster from 2 master nodes and 2 slave nodes
// without using cluster mode or Redis Sentinel.
func ExampleNewClusterClient_manualSetup() {
loadClusterSlots := func() ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{
// First node with 1 master and 1 slave.
{
Start: 0,
End: 8191,
Nodes: []redis.ClusterNode{{
Addr: ":7000", // master
}, {
Addr: ":8000", // 1st slave
}},
},
// Second node with 1 master and 1 slave.
{
Start: 8192,
End: 16383,
Nodes: []redis.ClusterNode{{
Addr: ":7001", // master
}, {
Addr: ":8001", // 1st slave
}},
},
}
return slots, nil
}
client := redis.NewClusterClient(&redis.ClusterOptions{
ClusterSlots: loadClusterSlots,
RouteRandomly: true,
})
client.Ping()
// ReloadState can be used to update cluster topography.
err := client.ReloadState()
if err != nil {
panic(err)
}
}
func ExampleNewRing() { func ExampleNewRing() {
client := redis.NewRing(&redis.RingOptions{ client := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{ Addrs: map[string]string{

View File

@ -27,6 +27,7 @@ const (
const ( const (
ringShard1Port = "6390" ringShard1Port = "6390"
ringShard2Port = "6391" ringShard2Port = "6391"
ringShard3Port = "6392"
) )
const ( const (
@ -39,7 +40,7 @@ const (
var ( var (
redisMain *redisProcess redisMain *redisProcess
ringShard1, ringShard2 *redisProcess ringShard1, ringShard2, ringShard3 *redisProcess
sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
) )
@ -62,6 +63,9 @@ var _ = BeforeSuite(func() {
ringShard2, err = startRedis(ringShard2Port) ringShard2, err = startRedis(ringShard2Port)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
ringShard3, err = startRedis(ringShard3Port)
Expect(err).NotTo(HaveOccurred())
sentinelMaster, err = startRedis(sentinelMasterPort) sentinelMaster, err = startRedis(sentinelMasterPort)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -84,6 +88,7 @@ var _ = AfterSuite(func() {
Expect(ringShard1.Close()).NotTo(HaveOccurred()) Expect(ringShard1.Close()).NotTo(HaveOccurred())
Expect(ringShard2.Close()).NotTo(HaveOccurred()) Expect(ringShard2.Close()).NotTo(HaveOccurred())
Expect(ringShard3.Close()).NotTo(HaveOccurred())
Expect(sentinel.Close()).NotTo(HaveOccurred()) Expect(sentinel.Close()).NotTo(HaveOccurred())
Expect(sentinelSlave1.Close()).NotTo(HaveOccurred()) Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())