diff --git a/cluster.go b/cluster.go index a8115b4..d584423 100644 --- a/cluster.go +++ b/cluster.go @@ -30,7 +30,7 @@ type ClusterOptions struct { // The maximum number of retries before giving up. Command is retried // on network errors and MOVED/ASK redirects. - // Default is 8. + // Default is 8 retries. MaxRedirects int // Enables read-only commands on slave nodes. @@ -39,8 +39,14 @@ type ClusterOptions struct { // It automatically enables ReadOnly. RouteByLatency bool // Allows routing read-only commands to the random master or slave node. + // It automatically enables ReadOnly. RouteRandomly bool + // Optional function that is used to load cluster slots information. + // It is useful to manually create cluster of standalone Redis servers + // or load-balance read/write operations between master and slaves. + ClusterSlots func() ([]ClusterSlot, error) + // Following options are copied from Options struct. OnConnect func(*Conn) error @@ -70,7 +76,7 @@ func (opt *ClusterOptions) init() { opt.MaxRedirects = 8 } - if opt.RouteByLatency { + if opt.RouteByLatency || opt.RouteRandomly { opt.ReadOnly = true } @@ -160,10 +166,6 @@ func (n *clusterNode) Close() error { return n.Client.Close() } -func (n *clusterNode) Test() error { - return n.Client.ClusterInfo().Err() -} - func (n *clusterNode) updateLatency() { const probes = 10 @@ -330,7 +332,7 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) { node := newClusterNode(c.opt, addr) - return node, node.Test() + return node, nil }) c.mu.Lock() @@ -509,6 +511,10 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode { } func (c *clusterState) IsConsistent() bool { + if c.nodes.opt.ClusterSlots != nil { + return true + } + if len(c.Masters) > len(c.Slaves) { return false } @@ -614,6 +620,14 @@ func (c *clusterStateHolder) Get() (*clusterState, error) { return nil, errors.New("redis: cluster has no state") } +func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) { + state, err := c.Reload() + if err == nil { + return state, nil + } + return c.Get() +} + //------------------------------------------------------------------------------ // ClusterClient is a Redis Cluster client representing a pool of zero @@ -662,6 +676,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return c } +// ReloadState loads cluster slots information to update cluster topography. +func (c *ClusterClient) ReloadState() error { + _, err := c.state.Reload() + return err +} + func (c *ClusterClient) init() { c.cmdable.setProcessor(c.Process) } @@ -946,12 +966,9 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { // ForEachMaster concurrently calls the fn on each master node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { - state, err := c.state.Reload() + state, err := c.state.ReloadOrGet() if err != nil { - state, err = c.state.Get() - if err != nil { - return err - } + return err } var wg sync.WaitGroup @@ -982,12 +999,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { // ForEachSlave concurrently calls the fn on each slave node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { - state, err := c.state.Reload() + state, err := c.state.ReloadOrGet() if err != nil { - state, err = c.state.Get() - if err != nil { - return err - } + return err } var wg sync.WaitGroup @@ -1018,12 +1032,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { // ForEachNode concurrently calls the fn on each known node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { - state, err := c.state.Reload() + state, err := c.state.ReloadOrGet() if err != nil { - state, err = c.state.Get() - if err != nil { - return err - } + return err } var wg sync.WaitGroup @@ -1092,6 +1103,14 @@ func (c *ClusterClient) PoolStats() *PoolStats { } func (c *ClusterClient) loadState() (*clusterState, error) { + if c.opt.ClusterSlots != nil { + slots, err := c.opt.ClusterSlots() + if err != nil { + return nil, err + } + return newClusterState(c.nodes, slots, "") + } + addrs, err := c.nodes.Addrs() if err != nil { return nil, err diff --git a/cluster_test.go b/cluster_test.go index 80b4d02..3bedff3 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -310,7 +310,8 @@ var _ = Describe("ClusterClient", func() { Expect(err).NotTo(HaveOccurred()) } - for _, master := range cluster.masters() { + client.ForEachMaster(func(master *redis.Client) error { + defer GinkgoRecover() Eventually(func() string { return master.Info("keyspace").Val() }, 30*time.Second).Should(Or( @@ -318,7 +319,8 @@ var _ = Describe("ClusterClient", func() { ContainSubstring("keys=29"), ContainSubstring("keys=40"), )) - } + return nil + }) }) It("supports Watch", func() { @@ -750,6 +752,59 @@ var _ = Describe("ClusterClient", func() { assertClusterClient() }) + + Describe("ClusterClient with ClusterSlots", func() { + BeforeEach(func() { + failover = true + + opt = redisClusterOptions() + opt.ClusterSlots = func() ([]redis.ClusterSlot, error) { + slots := []redis.ClusterSlot{{ + Start: 0, + End: 4999, + Nodes: []redis.ClusterNode{{ + Addr: ":" + ringShard1Port, + }}, + }, { + Start: 5000, + End: 9999, + Nodes: []redis.ClusterNode{{ + Addr: ":" + ringShard2Port, + }}, + }, { + Start: 10000, + End: 16383, + Nodes: []redis.ClusterNode{{ + Addr: ":" + ringShard3Port, + }}, + }} + return slots, nil + } + client = cluster.clusterClient(opt) + + err := client.ForEachMaster(func(master *redis.Client) error { + return master.FlushDB().Err() + }) + Expect(err).NotTo(HaveOccurred()) + + err = client.ForEachSlave(func(slave *redis.Client) error { + Eventually(func() int64 { + return client.DBSize().Val() + }, 30*time.Second).Should(Equal(int64(0))) + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + failover = false + + err := client.Close() + Expect(err).NotTo(HaveOccurred()) + }) + + assertClusterClient() + }) }) var _ = Describe("ClusterClient without nodes", func() { diff --git a/example_test.go b/example_test.go index 4d18ddb..eb2bb95 100644 --- a/example_test.go +++ b/example_test.go @@ -71,6 +71,48 @@ func ExampleNewClusterClient() { client.Ping() } +// Following example creates a cluster from 2 master nodes and 2 slave nodes +// without using cluster mode or Redis Sentinel. +func ExampleNewClusterClient_manualSetup() { + loadClusterSlots := func() ([]redis.ClusterSlot, error) { + slots := []redis.ClusterSlot{ + // First node with 1 master and 1 slave. + { + Start: 0, + End: 8191, + Nodes: []redis.ClusterNode{{ + Addr: ":7000", // master + }, { + Addr: ":8000", // 1st slave + }}, + }, + // Second node with 1 master and 1 slave. + { + Start: 8192, + End: 16383, + Nodes: []redis.ClusterNode{{ + Addr: ":7001", // master + }, { + Addr: ":8001", // 1st slave + }}, + }, + } + return slots, nil + } + + client := redis.NewClusterClient(&redis.ClusterOptions{ + ClusterSlots: loadClusterSlots, + RouteRandomly: true, + }) + client.Ping() + + // ReloadState can be used to update cluster topography. + err := client.ReloadState() + if err != nil { + panic(err) + } +} + func ExampleNewRing() { client := redis.NewRing(&redis.RingOptions{ Addrs: map[string]string{ diff --git a/main_test.go b/main_test.go index 4eddc1e..299cd70 100644 --- a/main_test.go +++ b/main_test.go @@ -27,6 +27,7 @@ const ( const ( ringShard1Port = "6390" ringShard2Port = "6391" + ringShard3Port = "6392" ) const ( @@ -39,7 +40,7 @@ const ( var ( redisMain *redisProcess - ringShard1, ringShard2 *redisProcess + ringShard1, ringShard2, ringShard3 *redisProcess sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess ) @@ -62,6 +63,9 @@ var _ = BeforeSuite(func() { ringShard2, err = startRedis(ringShard2Port) Expect(err).NotTo(HaveOccurred()) + ringShard3, err = startRedis(ringShard3Port) + Expect(err).NotTo(HaveOccurred()) + sentinelMaster, err = startRedis(sentinelMasterPort) Expect(err).NotTo(HaveOccurred()) @@ -84,6 +88,7 @@ var _ = AfterSuite(func() { Expect(ringShard1.Close()).NotTo(HaveOccurred()) Expect(ringShard2.Close()).NotTo(HaveOccurred()) + Expect(ringShard3.Close()).NotTo(HaveOccurred()) Expect(sentinel.Close()).NotTo(HaveOccurred()) Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())