Merge pull request #1591 from signalfx/clusterslots-ro

ClusterSlots Config and ReadOnly
This commit is contained in:
Vladimir Mihailenco 2021-01-09 08:50:35 +02:00 committed by GitHub
commit 71e90de07c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 3 deletions

View File

@ -86,7 +86,7 @@ func (opt *ClusterOptions) init() {
opt.MaxRedirects = 3 opt.MaxRedirects = 3
} }
if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil { if opt.RouteByLatency || opt.RouteRandomly {
opt.ReadOnly = true opt.ReadOnly = true
} }
@ -153,9 +153,13 @@ func (opt *ClusterOptions) clientOptions() *Options {
IdleTimeout: opt.IdleTimeout, IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: disableIdleCheck, IdleCheckFrequency: disableIdleCheck,
readOnly: opt.ReadOnly,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
// If ClusterSlots is populated, then we probably have an artificial
// cluster whose nodes are not in clustering mode (otherwise there isn't
// much use for ClusterSlots config). This means we cannot execute the
// READONLY command against that node -- setting readOnly to false in such
// situations in the options below will prevent that from happening.
readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
} }
} }

View File

@ -1020,6 +1020,66 @@ var _ = Describe("ClusterClient", func() {
assertClusterClient() assertClusterClient()
}) })
Describe("ClusterClient with ClusterSlots with multiple nodes per slot", func() {
BeforeEach(func() {
failover = true
opt = redisClusterOptions()
opt.ReadOnly = true
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{{
Start: 0,
End: 4999,
Nodes: []redis.ClusterNode{{
Addr: ":8220",
}, {
Addr: ":8223",
}},
}, {
Start: 5000,
End: 9999,
Nodes: []redis.ClusterNode{{
Addr: ":8221",
}, {
Addr: ":8224",
}},
}, {
Start: 10000,
End: 16383,
Nodes: []redis.ClusterNode{{
Addr: ":8222",
}, {
Addr: ":8225",
}},
}}
return slots, nil
}
client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
Eventually(func() int64 {
return client.DBSize(ctx).Val()
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
failover = false
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
assertClusterClient()
})
}) })
var _ = Describe("ClusterClient without nodes", func() { var _ = Describe("ClusterClient without nodes", func() {