diff --git a/cluster.go b/cluster.go index baecb8f5..9443a9f3 100644 --- a/cluster.go +++ b/cluster.go @@ -237,7 +237,7 @@ func (c *ClusterClient) resetClients() (retErr error) { return retErr } -func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { +func (c *ClusterClient) setSlots(slots []ClusterSlot) { c.slotsMx.Lock() seen := make(map[string]struct{}) @@ -248,15 +248,20 @@ func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { for i := 0; i < hashtag.SlotNumber; i++ { c.slots[i] = c.slots[i][:0] } - for _, info := range slots { - for slot := info.Start; slot <= info.End; slot++ { - c.slots[slot] = info.Addrs + for _, slot := range slots { + var addrs []string + for _, node := range slot.Nodes { + addrs = append(addrs, node.Addr) } - for _, addr := range info.Addrs { - if _, ok := seen[addr]; !ok { - c.addrs = append(c.addrs, addr) - seen[addr] = struct{}{} + for i := slot.Start; i <= slot.End; i++ { + c.slots[i] = addrs + } + + for _, node := range slot.Nodes { + if _, ok := seen[node.Addr]; !ok { + c.addrs = append(c.addrs, node.Addr) + seen[node.Addr] = struct{}{} } } } diff --git a/cluster_client_test.go b/cluster_client_test.go index 9502ba05..560c4387 100644 --- a/cluster_client_test.go +++ b/cluster_client_test.go @@ -22,11 +22,11 @@ var _ = Describe("ClusterClient", func() { var subject *ClusterClient var populate = func() { - subject.setSlots([]ClusterSlotInfo{ - {0, 4095, []string{"127.0.0.1:7000", "127.0.0.1:7004"}}, - {12288, 16383, []string{"127.0.0.1:7003", "127.0.0.1:7007"}}, - {4096, 8191, []string{"127.0.0.1:7001", "127.0.0.1:7005"}}, - {8192, 12287, []string{"127.0.0.1:7002", "127.0.0.1:7006"}}, + subject.setSlots([]ClusterSlot{ + {0, 4095, []ClusterNode{{"", "127.0.0.1:7000"}, {"", "127.0.0.1:7004"}}}, + {12288, 16383, []ClusterNode{{"", "127.0.0.1:7003"}, {"", "127.0.0.1:7007"}}}, + {4096, 8191, []ClusterNode{{"", "127.0.0.1:7001"}, {"", "127.0.0.1:7005"}}}, + {8192, 12287, []ClusterNode{{"", "127.0.0.1:7002"}, {"", "127.0.0.1:7006"}}}, }) } diff --git a/cluster_test.go b/cluster_test.go index e7aa9b96..24d516bf 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4,7 +4,6 @@ import ( "fmt" "math/rand" "net" - "reflect" "strconv" "strings" "sync" @@ -135,21 +134,12 @@ func startCluster(scenario *clusterScenario) error { if err != nil { return err } - wanted := []redis.ClusterSlotInfo{ - {0, 4999, []string{"127.0.0.1:8220", "127.0.0.1:8223"}}, - {5000, 9999, []string{"127.0.0.1:8221", "127.0.0.1:8224"}}, - {10000, 16383, []string{"127.0.0.1:8222", "127.0.0.1:8225"}}, + wanted := []redis.ClusterSlot{ + {0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}}, + {5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}}, + {10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}}, } - loop: - for _, info := range res { - for _, info2 := range wanted { - if reflect.DeepEqual(info, info2) { - continue loop - } - } - return fmt.Errorf("cluster did not reach consistent state (%v)", res) - } - return nil + return assertSlotsEqual(res, wanted) }, 30*time.Second) if err != nil { return err @@ -159,6 +149,34 @@ func startCluster(scenario *clusterScenario) error { return nil } +func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error { +outer_loop: + for _, s2 := range wanted { + for _, s1 := range slots { + if slotEqual(s1, s2) { + continue outer_loop + } + } + return fmt.Errorf("%v not found in %v", s2, slots) + } + return nil +} + +func slotEqual(s1, s2 redis.ClusterSlot) bool { + if s1.Start != s2.Start { + return false + } + if s1.End != s2.End { + return false + } + for i, n1 := range s1.Nodes { + if n1.Addr != s2.Nodes[i].Addr { + return false + } + } + return true +} + func stopCluster(scenario *clusterScenario) error { for _, client := range scenario.clients { if err := client.Close(); err != nil { @@ -223,11 +241,13 @@ var _ = Describe("Cluster", func() { res, err := cluster.primary().ClusterSlots().Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(HaveLen(3)) - Expect(res).To(ConsistOf([]redis.ClusterSlotInfo{ - {0, 4999, []string{"127.0.0.1:8220", "127.0.0.1:8223"}}, - {5000, 9999, []string{"127.0.0.1:8221", "127.0.0.1:8224"}}, - {10000, 16383, []string{"127.0.0.1:8222", "127.0.0.1:8225"}}, - })) + + wanted := []redis.ClusterSlot{ + {0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}}, + {5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}}, + {10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}}, + } + Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred()) }) It("should CLUSTER NODES", func() { diff --git a/command.go b/command.go index 0c30bcb5..b6a9b195 100644 --- a/command.go +++ b/command.go @@ -25,7 +25,7 @@ var ( _ Cmder = (*StringIntMapCmd)(nil) _ Cmder = (*ZSliceCmd)(nil) _ Cmder = (*ScanCmd)(nil) - _ Cmder = (*ClusterSlotCmd)(nil) + _ Cmder = (*ClusterSlotsCmd)(nil) ) type Cmder interface { @@ -730,48 +730,51 @@ func (cmd *ScanCmd) readReply(cn *pool.Conn) error { //------------------------------------------------------------------------------ -// TODO: rename to ClusterSlot -type ClusterSlotInfo struct { +type ClusterNode struct { + Id string + Addr string +} + +type ClusterSlot struct { Start int End int - Addrs []string + Nodes []ClusterNode } -// TODO: rename to ClusterSlotsCmd -type ClusterSlotCmd struct { +type ClusterSlotsCmd struct { baseCmd - val []ClusterSlotInfo + val []ClusterSlot } -func NewClusterSlotCmd(args ...interface{}) *ClusterSlotCmd { - return &ClusterSlotCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} +func NewClusterSlotsCmd(args ...interface{}) *ClusterSlotsCmd { + return &ClusterSlotsCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}} } -func (cmd *ClusterSlotCmd) Val() []ClusterSlotInfo { +func (cmd *ClusterSlotsCmd) Val() []ClusterSlot { return cmd.val } -func (cmd *ClusterSlotCmd) Result() ([]ClusterSlotInfo, error) { +func (cmd *ClusterSlotsCmd) Result() ([]ClusterSlot, error) { return cmd.Val(), cmd.Err() } -func (cmd *ClusterSlotCmd) String() string { +func (cmd *ClusterSlotsCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ClusterSlotCmd) reset() { +func (cmd *ClusterSlotsCmd) reset() { cmd.val = nil cmd.err = nil } -func (cmd *ClusterSlotCmd) readReply(cn *pool.Conn) error { - v, err := readArrayReply(cn, clusterSlotInfoSliceParser) +func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { + v, err := readArrayReply(cn, clusterSlotsParser) if err != nil { cmd.err = err return err } - cmd.val = v.([]ClusterSlotInfo) + cmd.val = v.([]ClusterSlot) return nil } diff --git a/commands.go b/commands.go index dc6dd08f..41c13019 100644 --- a/commands.go +++ b/commands.go @@ -1659,8 +1659,8 @@ func (c *commandable) PubSubNumPat() *IntCmd { //------------------------------------------------------------------------------ -func (c *commandable) ClusterSlots() *ClusterSlotCmd { - cmd := NewClusterSlotCmd("CLUSTER", "slots") +func (c *commandable) ClusterSlots() *ClusterSlotsCmd { + cmd := NewClusterSlotsCmd("CLUSTER", "slots") cmd._clusterKeyPos = 0 c.Process(cmd) return cmd diff --git a/parser.go b/parser.go index 07988579..d96b5e86 100644 --- a/parser.go +++ b/parser.go @@ -562,9 +562,9 @@ func zSliceParser(cn *pool.Conn, n int64) (interface{}, error) { return zz, nil } -func clusterSlotInfoSliceParser(cn *pool.Conn, n int64) (interface{}, error) { - infos := make([]ClusterSlotInfo, 0, n) - for i := int64(0); i < n; i++ { +func clusterSlotsParser(cn *pool.Conn, slotNum int64) (interface{}, error) { + slots := make([]ClusterSlot, slotNum) + for slotInd := 0; slotInd < len(slots); slotInd++ { n, err := readArrayHeader(cn) if err != nil { return nil, err @@ -584,14 +584,8 @@ func clusterSlotInfoSliceParser(cn *pool.Conn, n int64) (interface{}, error) { return nil, err } - addrsn := n - 2 - info := ClusterSlotInfo{ - Start: int(start), - End: int(end), - Addrs: make([]string, addrsn), - } - - for i := int64(0); i < addrsn; i++ { + nodes := make([]ClusterNode, n-2) + for nodeInd := 0; nodeInd < len(nodes); nodeInd++ { n, err := readArrayHeader(cn) if err != nil { return nil, err @@ -610,21 +604,24 @@ func clusterSlotInfoSliceParser(cn *pool.Conn, n int64) (interface{}, error) { if err != nil { return nil, err } + nodes[nodeInd].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10)) if n == 3 { - // TODO: expose id in ClusterSlotInfo - _, err := readStringReply(cn) + id, err := readStringReply(cn) if err != nil { return nil, err } + nodes[nodeInd].Id = id } - - info.Addrs[i] = net.JoinHostPort(ip, strconv.FormatInt(port, 10)) } - infos = append(infos, info) + slots[slotInd] = ClusterSlot{ + Start: int(start), + End: int(end), + Nodes: nodes, + } } - return infos, nil + return slots, nil } func newGeoLocationParser(q *GeoRadiusQuery) multiBulkParser {