forked from mirror/redis
ClusterSlots Config and ReadOnly
You have to set the ReadOnly flag on the ClusterOptions to make it fall back to non-master nodes on reads. For non-clustered situations where you have multiple nodes in a single ClusterSlot, it made the non-master node useless because it would never route requests to it and if you set ReadOnly=true it would issue a READONLY command to it, which would fail. See comment in code for more details.
This commit is contained in:
parent
b3e0aa270a
commit
74fdad907c
10
cluster.go
10
cluster.go
|
@ -86,7 +86,7 @@ func (opt *ClusterOptions) init() {
|
||||||
opt.MaxRedirects = 3
|
opt.MaxRedirects = 3
|
||||||
}
|
}
|
||||||
|
|
||||||
if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
|
if opt.RouteByLatency || opt.RouteRandomly {
|
||||||
opt.ReadOnly = true
|
opt.ReadOnly = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,9 +153,13 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
||||||
IdleTimeout: opt.IdleTimeout,
|
IdleTimeout: opt.IdleTimeout,
|
||||||
IdleCheckFrequency: disableIdleCheck,
|
IdleCheckFrequency: disableIdleCheck,
|
||||||
|
|
||||||
readOnly: opt.ReadOnly,
|
|
||||||
|
|
||||||
TLSConfig: opt.TLSConfig,
|
TLSConfig: opt.TLSConfig,
|
||||||
|
// If ClusterSlots is populated, then we probably have an artificial
|
||||||
|
// cluster whose nodes are not in clustering mode (otherwise there isn't
|
||||||
|
// much use for ClusterSlots config). This means we cannot execute the
|
||||||
|
// READONLY command against that node -- setting readOnly to false in such
|
||||||
|
// situations in the options below will prevent that from happening.
|
||||||
|
readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1020,6 +1020,66 @@ var _ = Describe("ClusterClient", func() {
|
||||||
|
|
||||||
assertClusterClient()
|
assertClusterClient()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Describe("ClusterClient with ClusterSlots with multiple nodes per slot", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
failover = true
|
||||||
|
|
||||||
|
opt = redisClusterOptions()
|
||||||
|
opt.ReadOnly = true
|
||||||
|
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
|
||||||
|
slots := []redis.ClusterSlot{{
|
||||||
|
Start: 0,
|
||||||
|
End: 4999,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
Addr: ":8220",
|
||||||
|
}, {
|
||||||
|
Addr: ":8223",
|
||||||
|
}},
|
||||||
|
}, {
|
||||||
|
Start: 5000,
|
||||||
|
End: 9999,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
Addr: ":8221",
|
||||||
|
}, {
|
||||||
|
Addr: ":8224",
|
||||||
|
}},
|
||||||
|
}, {
|
||||||
|
Start: 10000,
|
||||||
|
End: 16383,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
Addr: ":8222",
|
||||||
|
}, {
|
||||||
|
Addr: ":8225",
|
||||||
|
}},
|
||||||
|
}}
|
||||||
|
return slots, nil
|
||||||
|
}
|
||||||
|
client = cluster.newClusterClient(ctx, opt)
|
||||||
|
|
||||||
|
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
||||||
|
return master.FlushDB(ctx).Err()
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
|
||||||
|
Eventually(func() int64 {
|
||||||
|
return client.DBSize(ctx).Val()
|
||||||
|
}, 30*time.Second).Should(Equal(int64(0)))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
failover = false
|
||||||
|
|
||||||
|
err := client.Close()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
assertClusterClient()
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
var _ = Describe("ClusterClient without nodes", func() {
|
var _ = Describe("ClusterClient without nodes", func() {
|
||||||
|
|
Loading…
Reference in New Issue