Don't enable ReadOnly in custom cluster mode

This commit is contained in:
Vladimir Mihailenco 2018-09-20 12:49:43 +03:00
parent 40dbb03d62
commit 69445c6e87
2 changed files with 56 additions and 2 deletions

View File

@ -82,7 +82,7 @@ func (opt *ClusterOptions) init() {
opt.MaxRedirects = 8 opt.MaxRedirects = 8
} }
if opt.RouteByLatency || opt.RouteRandomly { if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
opt.ReadOnly = true opt.ReadOnly = true
} }
@ -831,7 +831,7 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
cmdInfo := c.cmdInfo(cmd.Name()) cmdInfo := c.cmdInfo(cmd.Name())
slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly { if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
if c.opt.RouteByLatency { if c.opt.RouteByLatency {
node, err := state.slotClosestNode(slot) node, err := state.slotClosestNode(slot)
return slot, node, err return slot, node, err

View File

@ -846,6 +846,60 @@ var _ = Describe("ClusterClient", func() {
assertClusterClient() assertClusterClient()
}) })
Describe("ClusterClient with RouteRandomly and ClusterSlots", func() {
BeforeEach(func() {
failover = true
opt = redisClusterOptions()
opt.RouteRandomly = true
opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{{
Start: 0,
End: 4999,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard1Port,
}},
}, {
Start: 5000,
End: 9999,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard2Port,
}},
}, {
Start: 10000,
End: 16383,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard3Port,
}},
}}
return slots, nil
}
client = cluster.clusterClient(opt)
err := client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
Expect(err).NotTo(HaveOccurred())
err = client.ForEachSlave(func(slave *redis.Client) error {
Eventually(func() int64 {
return client.DBSize().Val()
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
failover = false
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
assertClusterClient()
})
}) })
var _ = Describe("ClusterClient without nodes", func() { var _ = Describe("ClusterClient without nodes", func() {