forked from mirror/redis
Merge pull request #800 from go-redis/fix/cluster-client-manual-setup
cluster: add manual setup
This commit is contained in:
commit
ca660cd915
57
cluster.go
57
cluster.go
|
@ -30,7 +30,7 @@ type ClusterOptions struct {
|
||||||
|
|
||||||
// The maximum number of retries before giving up. Command is retried
|
// The maximum number of retries before giving up. Command is retried
|
||||||
// on network errors and MOVED/ASK redirects.
|
// on network errors and MOVED/ASK redirects.
|
||||||
// Default is 8.
|
// Default is 8 retries.
|
||||||
MaxRedirects int
|
MaxRedirects int
|
||||||
|
|
||||||
// Enables read-only commands on slave nodes.
|
// Enables read-only commands on slave nodes.
|
||||||
|
@ -39,8 +39,14 @@ type ClusterOptions struct {
|
||||||
// It automatically enables ReadOnly.
|
// It automatically enables ReadOnly.
|
||||||
RouteByLatency bool
|
RouteByLatency bool
|
||||||
// Allows routing read-only commands to the random master or slave node.
|
// Allows routing read-only commands to the random master or slave node.
|
||||||
|
// It automatically enables ReadOnly.
|
||||||
RouteRandomly bool
|
RouteRandomly bool
|
||||||
|
|
||||||
|
// Optional function that is used to load cluster slots information.
|
||||||
|
// It is useful to manually create cluster of standalone Redis servers
|
||||||
|
// or load-balance read/write operations between master and slaves.
|
||||||
|
ClusterSlots func() ([]ClusterSlot, error)
|
||||||
|
|
||||||
// Following options are copied from Options struct.
|
// Following options are copied from Options struct.
|
||||||
|
|
||||||
OnConnect func(*Conn) error
|
OnConnect func(*Conn) error
|
||||||
|
@ -70,7 +76,7 @@ func (opt *ClusterOptions) init() {
|
||||||
opt.MaxRedirects = 8
|
opt.MaxRedirects = 8
|
||||||
}
|
}
|
||||||
|
|
||||||
if opt.RouteByLatency {
|
if opt.RouteByLatency || opt.RouteRandomly {
|
||||||
opt.ReadOnly = true
|
opt.ReadOnly = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,10 +166,6 @@ func (n *clusterNode) Close() error {
|
||||||
return n.Client.Close()
|
return n.Client.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *clusterNode) Test() error {
|
|
||||||
return n.Client.ClusterInfo().Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *clusterNode) updateLatency() {
|
func (n *clusterNode) updateLatency() {
|
||||||
const probes = 10
|
const probes = 10
|
||||||
|
|
||||||
|
@ -330,7 +332,7 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
|
||||||
|
|
||||||
v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
|
v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
|
||||||
node := newClusterNode(c.opt, addr)
|
node := newClusterNode(c.opt, addr)
|
||||||
return node, node.Test()
|
return node, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
@ -509,6 +511,10 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clusterState) IsConsistent() bool {
|
func (c *clusterState) IsConsistent() bool {
|
||||||
|
if c.nodes.opt.ClusterSlots != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
if len(c.Masters) > len(c.Slaves) {
|
if len(c.Masters) > len(c.Slaves) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -614,6 +620,14 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
|
||||||
return nil, errors.New("redis: cluster has no state")
|
return nil, errors.New("redis: cluster has no state")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
|
||||||
|
state, err := c.Reload()
|
||||||
|
if err == nil {
|
||||||
|
return state, nil
|
||||||
|
}
|
||||||
|
return c.Get()
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
// ClusterClient is a Redis Cluster client representing a pool of zero
|
// ClusterClient is a Redis Cluster client representing a pool of zero
|
||||||
|
@ -662,6 +676,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReloadState loads cluster slots information to update cluster topography.
|
||||||
|
func (c *ClusterClient) ReloadState() error {
|
||||||
|
_, err := c.state.Reload()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) init() {
|
func (c *ClusterClient) init() {
|
||||||
c.cmdable.setProcessor(c.Process)
|
c.cmdable.setProcessor(c.Process)
|
||||||
}
|
}
|
||||||
|
@ -946,13 +966,10 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
|
||||||
// ForEachMaster concurrently calls the fn on each master node in the cluster.
|
// ForEachMaster concurrently calls the fn on each master node in the cluster.
|
||||||
// It returns the first error if any.
|
// It returns the first error if any.
|
||||||
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
|
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
|
||||||
state, err := c.state.Reload()
|
state, err := c.state.ReloadOrGet()
|
||||||
if err != nil {
|
|
||||||
state, err = c.state.Get()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
|
@ -982,13 +999,10 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
|
||||||
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
|
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
|
||||||
// It returns the first error if any.
|
// It returns the first error if any.
|
||||||
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
|
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
|
||||||
state, err := c.state.Reload()
|
state, err := c.state.ReloadOrGet()
|
||||||
if err != nil {
|
|
||||||
state, err = c.state.Get()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
|
@ -1018,13 +1032,10 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
|
||||||
// ForEachNode concurrently calls the fn on each known node in the cluster.
|
// ForEachNode concurrently calls the fn on each known node in the cluster.
|
||||||
// It returns the first error if any.
|
// It returns the first error if any.
|
||||||
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
|
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
|
||||||
state, err := c.state.Reload()
|
state, err := c.state.ReloadOrGet()
|
||||||
if err != nil {
|
|
||||||
state, err = c.state.Get()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
|
@ -1092,6 +1103,14 @@ func (c *ClusterClient) PoolStats() *PoolStats {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) loadState() (*clusterState, error) {
|
func (c *ClusterClient) loadState() (*clusterState, error) {
|
||||||
|
if c.opt.ClusterSlots != nil {
|
||||||
|
slots, err := c.opt.ClusterSlots()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return newClusterState(c.nodes, slots, "")
|
||||||
|
}
|
||||||
|
|
||||||
addrs, err := c.nodes.Addrs()
|
addrs, err := c.nodes.Addrs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -310,7 +310,8 @@ var _ = Describe("ClusterClient", func() {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, master := range cluster.masters() {
|
client.ForEachMaster(func(master *redis.Client) error {
|
||||||
|
defer GinkgoRecover()
|
||||||
Eventually(func() string {
|
Eventually(func() string {
|
||||||
return master.Info("keyspace").Val()
|
return master.Info("keyspace").Val()
|
||||||
}, 30*time.Second).Should(Or(
|
}, 30*time.Second).Should(Or(
|
||||||
|
@ -318,7 +319,8 @@ var _ = Describe("ClusterClient", func() {
|
||||||
ContainSubstring("keys=29"),
|
ContainSubstring("keys=29"),
|
||||||
ContainSubstring("keys=40"),
|
ContainSubstring("keys=40"),
|
||||||
))
|
))
|
||||||
}
|
return nil
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
It("supports Watch", func() {
|
It("supports Watch", func() {
|
||||||
|
@ -750,6 +752,59 @@ var _ = Describe("ClusterClient", func() {
|
||||||
|
|
||||||
assertClusterClient()
|
assertClusterClient()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Describe("ClusterClient with ClusterSlots", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
failover = true
|
||||||
|
|
||||||
|
opt = redisClusterOptions()
|
||||||
|
opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
|
||||||
|
slots := []redis.ClusterSlot{{
|
||||||
|
Start: 0,
|
||||||
|
End: 4999,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
Addr: ":" + ringShard1Port,
|
||||||
|
}},
|
||||||
|
}, {
|
||||||
|
Start: 5000,
|
||||||
|
End: 9999,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
Addr: ":" + ringShard2Port,
|
||||||
|
}},
|
||||||
|
}, {
|
||||||
|
Start: 10000,
|
||||||
|
End: 16383,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
Addr: ":" + ringShard3Port,
|
||||||
|
}},
|
||||||
|
}}
|
||||||
|
return slots, nil
|
||||||
|
}
|
||||||
|
client = cluster.clusterClient(opt)
|
||||||
|
|
||||||
|
err := client.ForEachMaster(func(master *redis.Client) error {
|
||||||
|
return master.FlushDB().Err()
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
err = client.ForEachSlave(func(slave *redis.Client) error {
|
||||||
|
Eventually(func() int64 {
|
||||||
|
return client.DBSize().Val()
|
||||||
|
}, 30*time.Second).Should(Equal(int64(0)))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
failover = false
|
||||||
|
|
||||||
|
err := client.Close()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
assertClusterClient()
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
var _ = Describe("ClusterClient without nodes", func() {
|
var _ = Describe("ClusterClient without nodes", func() {
|
||||||
|
|
|
@ -71,6 +71,48 @@ func ExampleNewClusterClient() {
|
||||||
client.Ping()
|
client.Ping()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Following example creates a cluster from 2 master nodes and 2 slave nodes
|
||||||
|
// without using cluster mode or Redis Sentinel.
|
||||||
|
func ExampleNewClusterClient_manualSetup() {
|
||||||
|
loadClusterSlots := func() ([]redis.ClusterSlot, error) {
|
||||||
|
slots := []redis.ClusterSlot{
|
||||||
|
// First node with 1 master and 1 slave.
|
||||||
|
{
|
||||||
|
Start: 0,
|
||||||
|
End: 8191,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
Addr: ":7000", // master
|
||||||
|
}, {
|
||||||
|
Addr: ":8000", // 1st slave
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
// Second node with 1 master and 1 slave.
|
||||||
|
{
|
||||||
|
Start: 8192,
|
||||||
|
End: 16383,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
Addr: ":7001", // master
|
||||||
|
}, {
|
||||||
|
Addr: ":8001", // 1st slave
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return slots, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
client := redis.NewClusterClient(&redis.ClusterOptions{
|
||||||
|
ClusterSlots: loadClusterSlots,
|
||||||
|
RouteRandomly: true,
|
||||||
|
})
|
||||||
|
client.Ping()
|
||||||
|
|
||||||
|
// ReloadState can be used to update cluster topography.
|
||||||
|
err := client.ReloadState()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func ExampleNewRing() {
|
func ExampleNewRing() {
|
||||||
client := redis.NewRing(&redis.RingOptions{
|
client := redis.NewRing(&redis.RingOptions{
|
||||||
Addrs: map[string]string{
|
Addrs: map[string]string{
|
||||||
|
|
|
@ -27,6 +27,7 @@ const (
|
||||||
const (
|
const (
|
||||||
ringShard1Port = "6390"
|
ringShard1Port = "6390"
|
||||||
ringShard2Port = "6391"
|
ringShard2Port = "6391"
|
||||||
|
ringShard3Port = "6392"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -39,7 +40,7 @@ const (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
redisMain *redisProcess
|
redisMain *redisProcess
|
||||||
ringShard1, ringShard2 *redisProcess
|
ringShard1, ringShard2, ringShard3 *redisProcess
|
||||||
sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
|
sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -62,6 +63,9 @@ var _ = BeforeSuite(func() {
|
||||||
ringShard2, err = startRedis(ringShard2Port)
|
ringShard2, err = startRedis(ringShard2Port)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
ringShard3, err = startRedis(ringShard3Port)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
sentinelMaster, err = startRedis(sentinelMasterPort)
|
sentinelMaster, err = startRedis(sentinelMasterPort)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
@ -84,6 +88,7 @@ var _ = AfterSuite(func() {
|
||||||
|
|
||||||
Expect(ringShard1.Close()).NotTo(HaveOccurred())
|
Expect(ringShard1.Close()).NotTo(HaveOccurred())
|
||||||
Expect(ringShard2.Close()).NotTo(HaveOccurred())
|
Expect(ringShard2.Close()).NotTo(HaveOccurred())
|
||||||
|
Expect(ringShard3.Close()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
Expect(sentinel.Close()).NotTo(HaveOccurred())
|
Expect(sentinel.Close()).NotTo(HaveOccurred())
|
||||||
Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
|
Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
|
||||||
|
|
Loading…
Reference in New Issue