mirror of https://github.com/go-redis/redis.git
Merge pull request #302 from go-redis/fix/export-cluster-node-id
Expose cluster node id in ClusterSlots.
This commit is contained in:
commit
96650c0e91
21
cluster.go
21
cluster.go
|
@ -236,7 +236,7 @@ func (c *ClusterClient) resetClients() (retErr error) {
|
||||||
return retErr
|
return retErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
|
func (c *ClusterClient) setSlots(slots []ClusterSlot) {
|
||||||
c.slotsMx.Lock()
|
c.slotsMx.Lock()
|
||||||
|
|
||||||
seen := make(map[string]struct{})
|
seen := make(map[string]struct{})
|
||||||
|
@ -247,15 +247,20 @@ func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
|
||||||
for i := 0; i < hashtag.SlotNumber; i++ {
|
for i := 0; i < hashtag.SlotNumber; i++ {
|
||||||
c.slots[i] = c.slots[i][:0]
|
c.slots[i] = c.slots[i][:0]
|
||||||
}
|
}
|
||||||
for _, info := range slots {
|
for _, slot := range slots {
|
||||||
for slot := info.Start; slot <= info.End; slot++ {
|
var addrs []string
|
||||||
c.slots[slot] = info.Addrs
|
for _, node := range slot.Nodes {
|
||||||
|
addrs = append(addrs, node.Addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, addr := range info.Addrs {
|
for i := slot.Start; i <= slot.End; i++ {
|
||||||
if _, ok := seen[addr]; !ok {
|
c.slots[i] = addrs
|
||||||
c.addrs = append(c.addrs, addr)
|
}
|
||||||
seen[addr] = struct{}{}
|
|
||||||
|
for _, node := range slot.Nodes {
|
||||||
|
if _, ok := seen[node.Addr]; !ok {
|
||||||
|
c.addrs = append(c.addrs, node.Addr)
|
||||||
|
seen[node.Addr] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,11 +22,11 @@ var _ = Describe("ClusterClient", func() {
|
||||||
var subject *ClusterClient
|
var subject *ClusterClient
|
||||||
|
|
||||||
var populate = func() {
|
var populate = func() {
|
||||||
subject.setSlots([]ClusterSlotInfo{
|
subject.setSlots([]ClusterSlot{
|
||||||
{0, 4095, []string{"127.0.0.1:7000", "127.0.0.1:7004"}},
|
{0, 4095, []ClusterNode{{"", "127.0.0.1:7000"}, {"", "127.0.0.1:7004"}}},
|
||||||
{12288, 16383, []string{"127.0.0.1:7003", "127.0.0.1:7007"}},
|
{12288, 16383, []ClusterNode{{"", "127.0.0.1:7003"}, {"", "127.0.0.1:7007"}}},
|
||||||
{4096, 8191, []string{"127.0.0.1:7001", "127.0.0.1:7005"}},
|
{4096, 8191, []ClusterNode{{"", "127.0.0.1:7001"}, {"", "127.0.0.1:7005"}}},
|
||||||
{8192, 12287, []string{"127.0.0.1:7002", "127.0.0.1:7006"}},
|
{8192, 12287, []ClusterNode{{"", "127.0.0.1:7002"}, {"", "127.0.0.1:7006"}}},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"reflect"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -135,21 +134,12 @@ func startCluster(scenario *clusterScenario) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
wanted := []redis.ClusterSlotInfo{
|
wanted := []redis.ClusterSlot{
|
||||||
{0, 4999, []string{"127.0.0.1:8220", "127.0.0.1:8223"}},
|
{0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}},
|
||||||
{5000, 9999, []string{"127.0.0.1:8221", "127.0.0.1:8224"}},
|
{5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}},
|
||||||
{10000, 16383, []string{"127.0.0.1:8222", "127.0.0.1:8225"}},
|
{10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}},
|
||||||
}
|
}
|
||||||
loop:
|
return assertSlotsEqual(res, wanted)
|
||||||
for _, info := range res {
|
|
||||||
for _, info2 := range wanted {
|
|
||||||
if reflect.DeepEqual(info, info2) {
|
|
||||||
continue loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fmt.Errorf("cluster did not reach consistent state (%v)", res)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}, 30*time.Second)
|
}, 30*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -159,6 +149,34 @@ func startCluster(scenario *clusterScenario) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
|
||||||
|
outer_loop:
|
||||||
|
for _, s2 := range wanted {
|
||||||
|
for _, s1 := range slots {
|
||||||
|
if slotEqual(s1, s2) {
|
||||||
|
continue outer_loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("%v not found in %v", s2, slots)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func slotEqual(s1, s2 redis.ClusterSlot) bool {
|
||||||
|
if s1.Start != s2.Start {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if s1.End != s2.End {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for i, n1 := range s1.Nodes {
|
||||||
|
if n1.Addr != s2.Nodes[i].Addr {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func stopCluster(scenario *clusterScenario) error {
|
func stopCluster(scenario *clusterScenario) error {
|
||||||
for _, client := range scenario.clients {
|
for _, client := range scenario.clients {
|
||||||
if err := client.Close(); err != nil {
|
if err := client.Close(); err != nil {
|
||||||
|
@ -223,11 +241,13 @@ var _ = Describe("Cluster", func() {
|
||||||
res, err := cluster.primary().ClusterSlots().Result()
|
res, err := cluster.primary().ClusterSlots().Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(res).To(HaveLen(3))
|
Expect(res).To(HaveLen(3))
|
||||||
Expect(res).To(ConsistOf([]redis.ClusterSlotInfo{
|
|
||||||
{0, 4999, []string{"127.0.0.1:8220", "127.0.0.1:8223"}},
|
wanted := []redis.ClusterSlot{
|
||||||
{5000, 9999, []string{"127.0.0.1:8221", "127.0.0.1:8224"}},
|
{0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}},
|
||||||
{10000, 16383, []string{"127.0.0.1:8222", "127.0.0.1:8225"}},
|
{5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}},
|
||||||
}))
|
{10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}},
|
||||||
|
}
|
||||||
|
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should CLUSTER NODES", func() {
|
It("should CLUSTER NODES", func() {
|
||||||
|
|
35
command.go
35
command.go
|
@ -25,7 +25,7 @@ var (
|
||||||
_ Cmder = (*StringIntMapCmd)(nil)
|
_ Cmder = (*StringIntMapCmd)(nil)
|
||||||
_ Cmder = (*ZSliceCmd)(nil)
|
_ Cmder = (*ZSliceCmd)(nil)
|
||||||
_ Cmder = (*ScanCmd)(nil)
|
_ Cmder = (*ScanCmd)(nil)
|
||||||
_ Cmder = (*ClusterSlotCmd)(nil)
|
_ Cmder = (*ClusterSlotsCmd)(nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
type Cmder interface {
|
type Cmder interface {
|
||||||
|
@ -730,48 +730,51 @@ func (cmd *ScanCmd) readReply(cn *pool.Conn) error {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
// TODO: rename to ClusterSlot
|
type ClusterNode struct {
|
||||||
type ClusterSlotInfo struct {
|
Id string
|
||||||
|
Addr string
|
||||||
|
}
|
||||||
|
|
||||||
|
type ClusterSlot struct {
|
||||||
Start int
|
Start int
|
||||||
End int
|
End int
|
||||||
Addrs []string
|
Nodes []ClusterNode
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: rename to ClusterSlotsCmd
|
type ClusterSlotsCmd struct {
|
||||||
type ClusterSlotCmd struct {
|
|
||||||
baseCmd
|
baseCmd
|
||||||
|
|
||||||
val []ClusterSlotInfo
|
val []ClusterSlot
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClusterSlotCmd(args ...interface{}) *ClusterSlotCmd {
|
func NewClusterSlotsCmd(args ...interface{}) *ClusterSlotsCmd {
|
||||||
return &ClusterSlotCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
return &ClusterSlotsCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ClusterSlotCmd) Val() []ClusterSlotInfo {
|
func (cmd *ClusterSlotsCmd) Val() []ClusterSlot {
|
||||||
return cmd.val
|
return cmd.val
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ClusterSlotCmd) Result() ([]ClusterSlotInfo, error) {
|
func (cmd *ClusterSlotsCmd) Result() ([]ClusterSlot, error) {
|
||||||
return cmd.Val(), cmd.Err()
|
return cmd.Val(), cmd.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ClusterSlotCmd) String() string {
|
func (cmd *ClusterSlotsCmd) String() string {
|
||||||
return cmdString(cmd, cmd.val)
|
return cmdString(cmd, cmd.val)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ClusterSlotCmd) reset() {
|
func (cmd *ClusterSlotsCmd) reset() {
|
||||||
cmd.val = nil
|
cmd.val = nil
|
||||||
cmd.err = nil
|
cmd.err = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ClusterSlotCmd) readReply(cn *pool.Conn) error {
|
func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
|
||||||
v, err := readArrayReply(cn, clusterSlotInfoSliceParser)
|
v, err := readArrayReply(cn, clusterSlotsParser)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.err = err
|
cmd.err = err
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cmd.val = v.([]ClusterSlotInfo)
|
cmd.val = v.([]ClusterSlot)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1659,8 +1659,8 @@ func (c *commandable) PubSubNumPat() *IntCmd {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
func (c *commandable) ClusterSlots() *ClusterSlotCmd {
|
func (c *commandable) ClusterSlots() *ClusterSlotsCmd {
|
||||||
cmd := NewClusterSlotCmd("CLUSTER", "slots")
|
cmd := NewClusterSlotsCmd("CLUSTER", "slots")
|
||||||
cmd._clusterKeyPos = 0
|
cmd._clusterKeyPos = 0
|
||||||
c.Process(cmd)
|
c.Process(cmd)
|
||||||
return cmd
|
return cmd
|
||||||
|
|
31
parser.go
31
parser.go
|
@ -562,9 +562,9 @@ func zSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
|
||||||
return zz, nil
|
return zz, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func clusterSlotInfoSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
|
func clusterSlotsParser(cn *pool.Conn, slotNum int64) (interface{}, error) {
|
||||||
infos := make([]ClusterSlotInfo, 0, n)
|
slots := make([]ClusterSlot, slotNum)
|
||||||
for i := int64(0); i < n; i++ {
|
for slotInd := 0; slotInd < len(slots); slotInd++ {
|
||||||
n, err := readArrayHeader(cn)
|
n, err := readArrayHeader(cn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -584,14 +584,8 @@ func clusterSlotInfoSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
addrsn := n - 2
|
nodes := make([]ClusterNode, n-2)
|
||||||
info := ClusterSlotInfo{
|
for nodeInd := 0; nodeInd < len(nodes); nodeInd++ {
|
||||||
Start: int(start),
|
|
||||||
End: int(end),
|
|
||||||
Addrs: make([]string, addrsn),
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := int64(0); i < addrsn; i++ {
|
|
||||||
n, err := readArrayHeader(cn)
|
n, err := readArrayHeader(cn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -610,21 +604,24 @@ func clusterSlotInfoSliceParser(cn *pool.Conn, n int64) (interface{}, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
nodes[nodeInd].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
|
||||||
|
|
||||||
if n == 3 {
|
if n == 3 {
|
||||||
// TODO: expose id in ClusterSlotInfo
|
id, err := readStringReply(cn)
|
||||||
_, err := readStringReply(cn)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
nodes[nodeInd].Id = id
|
||||||
}
|
}
|
||||||
|
|
||||||
info.Addrs[i] = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
infos = append(infos, info)
|
slots[slotInd] = ClusterSlot{
|
||||||
|
Start: int(start),
|
||||||
|
End: int(end),
|
||||||
|
Nodes: nodes,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return infos, nil
|
return slots, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGeoLocationParser(q *GeoRadiusQuery) multiBulkParser {
|
func newGeoLocationParser(q *GeoRadiusQuery) multiBulkParser {
|
||||||
|
|
Loading…
Reference in New Issue