diff --git a/cluster.go b/cluster.go index fe50257..84a026c 100644 --- a/cluster.go +++ b/cluster.go @@ -8,7 +8,7 @@ import ( "math" "math/rand" "net" - "strings" + "sort" "sync" "sync/atomic" "time" @@ -387,12 +387,31 @@ func (c *clusterNodes) Random() (*clusterNode, error) { //------------------------------------------------------------------------------ +type clusterSlot struct { + start, end int + nodes []*clusterNode +} + +type clusterSlotSlice []*clusterSlot + +func (p clusterSlotSlice) Len() int { + return len(p) +} + +func (p clusterSlotSlice) Less(i, j int) bool { + return p[i].start < p[j].start +} + +func (p clusterSlotSlice) Swap(i, j int) { + p[i], p[j] = p[j], p[i] +} + type clusterState struct { nodes *clusterNodes Masters []*clusterNode Slaves []*clusterNode - slots [][]*clusterNode + slots []*clusterSlot generation uint32 createdAt time.Time @@ -404,7 +423,7 @@ func newClusterState( c := clusterState{ nodes: nodes, - slots: make([][]*clusterNode, hashtag.SlotNumber), + slots: make([]*clusterSlot, 0, len(slots)), generation: nodes.NextGeneration(), createdAt: time.Now(), @@ -434,11 +453,15 @@ func newClusterState( } } - for i := slot.Start; i <= slot.End; i++ { - c.slots[i] = nodes - } + c.slots = append(c.slots, &clusterSlot{ + start: slot.Start, + end: slot.End, + nodes: nodes, + }) } + sort.Sort(clusterSlotSlice(c.slots)) + time.AfterFunc(time.Minute, func() { nodes.GC(c.generation) }) @@ -506,8 +529,15 @@ func (c *clusterState) slotRandomNode(slot int) *clusterNode { } func (c *clusterState) slotNodes(slot int) []*clusterNode { - if slot >= 0 && slot < len(c.slots) { - return c.slots[slot] + i := sort.Search(len(c.slots), func(i int) bool { + return c.slots[i].end >= slot + }) + if i >= len(c.slots) { + return nil + } + x := c.slots[i] + if slot >= x.start && slot <= x.end { + return x.nodes } return nil } @@ -516,26 +546,7 @@ func (c *clusterState) IsConsistent() bool { if c.nodes.opt.ClusterSlots != nil { return true } - - if len(c.Masters) > len(c.Slaves) { - return false - } - - for _, master := range c.Masters { - s := master.Client.Info("replication").Val() - if !strings.Contains(s, "role:master") { - return false - } - } - - for _, slave := range c.Slaves { - s := slave.Client.Info("replication").Val() - if !strings.Contains(s, "role:slave") { - return false - } - } - - return true + return len(c.Masters) <= len(c.Slaves) } //------------------------------------------------------------------------------ @@ -563,7 +574,7 @@ func (c *clusterStateHolder) Reload() (*clusterState, error) { return nil, err } if !state.IsConsistent() { - c.LazyReload() + time.AfterFunc(time.Second, c.LazyReload) } return state, nil } @@ -843,6 +854,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { } if internal.IsRetryableError(err, true) { + c.state.LazyReload() continue } @@ -929,12 +941,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { } if internal.IsRetryableError(err, true) { - // Firstly retry the same node. + c.state.LazyReload() + + // First retry the same node. if attempt == 0 { continue } - // Secondly try random node. + // Second try random node. node, err = c.nodes.Random() if err != nil { break diff --git a/cluster_test.go b/cluster_test.go index 3bedff3..a64a0f6 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -51,13 +51,19 @@ func (s *clusterScenario) addrs() []string { func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient { opt.Addrs = s.addrs() client := redis.NewClusterClient(opt) - Eventually(func() bool { + err := eventually(func() error { state, err := client.GetState() if err != nil { - return false + return err } - return state.IsConsistent() - }, 30*time.Second).Should(BeTrue()) + if !state.IsConsistent() { + return fmt.Errorf("cluster state is not conistent") + } + return nil + }, 30*time.Second) + if err != nil { + panic(err) + } return client } @@ -935,18 +941,21 @@ var _ = Describe("ClusterClient timeout", func() { //------------------------------------------------------------------------------ -func BenchmarkRedisClusterPing(b *testing.B) { - if testing.Short() { - b.Skip("skipping in short mode") - } - - cluster := &clusterScenario{ +func newClusterScenario() *clusterScenario { + return &clusterScenario{ ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, nodeIds: make([]string, 6), processes: make(map[string]*redisProcess, 6), clients: make(map[string]*redis.Client, 6), } +} +func BenchmarkRedisClusterPing(b *testing.B) { + if testing.Short() { + b.Skip("skipping in short mode") + } + + cluster := newClusterScenario() if err := startCluster(cluster); err != nil { b.Fatal(err) } @@ -959,7 +968,8 @@ func BenchmarkRedisClusterPing(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - if err := client.Ping().Err(); err != nil { + err := client.Ping().Err() + if err != nil { b.Fatal(err) } } @@ -971,13 +981,7 @@ func BenchmarkRedisClusterSetString(b *testing.B) { b.Skip("skipping in short mode") } - cluster := &clusterScenario{ - ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, - nodeIds: make([]string, 6), - processes: make(map[string]*redisProcess, 6), - clients: make(map[string]*redis.Client, 6), - } - + cluster := newClusterScenario() if err := startCluster(cluster); err != nil { b.Fatal(err) } @@ -992,9 +996,34 @@ func BenchmarkRedisClusterSetString(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - if err := client.Set("key", value, 0).Err(); err != nil { + err := client.Set("key", value, 0).Err() + if err != nil { b.Fatal(err) } } }) } + +func BenchmarkRedisClusterReloadState(b *testing.B) { + if testing.Short() { + b.Skip("skipping in short mode") + } + + cluster := newClusterScenario() + if err := startCluster(cluster); err != nil { + b.Fatal(err) + } + defer stopCluster(cluster) + + client := cluster.clusterClient(redisClusterOptions()) + defer client.Close() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := client.ReloadState() + if err != nil { + b.Fatal(err) + } + } +} diff --git a/export_test.go b/export_test.go index fcb7fa0..060df10 100644 --- a/export_test.go +++ b/export_test.go @@ -49,7 +49,7 @@ func (c *ClusterClient) Nodes(key string) ([]*clusterNode, error) { } slot := hashtag.Slot(key) - nodes := state.slots[slot] + nodes := state.slotNodes(slot) if len(nodes) != 2 { return nil, fmt.Errorf("slot=%d does not have enough nodes: %v", slot, nodes) } diff --git a/internal/hashtag/hashtag.go b/internal/hashtag/hashtag.go index 8c7ebbf..22f5b39 100644 --- a/internal/hashtag/hashtag.go +++ b/internal/hashtag/hashtag.go @@ -5,7 +5,7 @@ import ( "strings" ) -const SlotNumber = 16384 +const slotNumber = 16384 // CRC16 implementation according to CCITT standards. // Copyright 2001-2010 Georges Menie (www.menie.org) @@ -56,7 +56,7 @@ func Key(key string) string { } func RandomSlot() int { - return rand.Intn(SlotNumber) + return rand.Intn(slotNumber) } // hashSlot returns a consistent slot number between 0 and 16383 @@ -66,7 +66,7 @@ func Slot(key string) int { return RandomSlot() } key = Key(key) - return int(crc16sum(key)) % SlotNumber + return int(crc16sum(key)) % slotNumber } func crc16sum(key string) (crc uint16) { diff --git a/main_test.go b/main_test.go index 299cd70..e49d954 100644 --- a/main_test.go +++ b/main_test.go @@ -8,7 +8,6 @@ import ( "os/exec" "path/filepath" "sync" - "sync/atomic" "testing" "time" @@ -169,24 +168,28 @@ func perform(n int, cbs ...func(int)) { } func eventually(fn func() error, timeout time.Duration) error { - var exit int32 errCh := make(chan error) done := make(chan struct{}) + exit := make(chan struct{}) go func() { - defer GinkgoRecover() - - for atomic.LoadInt32(&exit) == 0 { + for { err := fn() if err == nil { close(done) return } + select { case errCh <- err: default: } - time.Sleep(timeout / 100) + + select { + case <-exit: + return + case <-time.After(timeout / 100): + } } }() @@ -194,7 +197,7 @@ func eventually(fn func() error, timeout time.Duration) error { case <-done: return nil case <-time.After(timeout): - atomic.StoreInt32(&exit, 1) + close(exit) select { case err := <-errCh: return err diff --git a/ring.go b/ring.go index 5cbfb9b..8b20d47 100644 --- a/ring.go +++ b/ring.go @@ -170,6 +170,8 @@ type ringShards struct { func newRingShards(opt *RingOptions) *ringShards { return &ringShards{ + opt: opt, + hash: newConsistentHash(opt), shards: make(map[string]*ringShard), }