Merge pull request #1581 from prathik/replica-cmd

Add support to get cluster replica node for a given key
This commit is contained in:
Vladimir Mihailenco 2020-12-16 17:29:41 +02:00 committed by GitHub
commit e9fef17b52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 0 deletions

View File

@ -1655,6 +1655,35 @@ func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterN
return state.slotMasterNode(slot)
}
// ReplicaForKey gets a client for a replica node to run any command on it.
// This is especially useful if we want to run a particular lua script which has
// only read only commands on the replica.
// This is because other redis commands generally have a flag that points that
// they are read only and automatically run on the replica nodes
// if ClusterOptions.ReadOnly flag is set to true.
func (c *ClusterClient) ReplicaForKey(ctx context.Context, key string) (*Client, error) {
state, err := c.state.Get(ctx)
if err != nil {
return nil, err
}
slot := hashtag.Slot(key)
node, err := c.slotReadOnlyNode(state, slot)
if err != nil {
return nil, err
}
return node.Client, err
}
// MasterForKey return a client to the master node for a particular key.
func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) {
slot := hashtag.Slot(key)
node, err := c.slotMasterNode(ctx, slot)
if err != nil {
return nil, err
}
return node.Client, err
}
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
for _, n := range nodes {
if n == node {

View File

@ -864,6 +864,20 @@ var _ = Describe("ClusterClient", func() {
}))
})
It("should return correct replica for key", func() {
client, err := client.ReplicaForKey(ctx, "test")
Expect(err).ToNot(HaveOccurred())
info := client.Info(ctx, "server")
Expect(info.Val()).Should(ContainSubstring("tcp_port:8224"))
})
It("should return correct master for key", func() {
client, err := client.MasterForKey(ctx, "test")
Expect(err).ToNot(HaveOccurred())
info := client.Info(ctx, "server")
Expect(info.Val()).Should(ContainSubstring("tcp_port:8221"))
})
assertClusterClient()
})