Merge pull request #819 from go-redis/fix/optimize-reload-state

cluster: optimize newClusterState
This commit is contained in:
Vladimir Mihailenco 2018-07-22 14:22:31 +03:00 committed by GitHub
commit 9748bb5648
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 109 additions and 61 deletions

View File

@ -8,7 +8,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"net" "net"
"strings" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -387,12 +387,31 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type clusterSlot struct {
start, end int
nodes []*clusterNode
}
type clusterSlotSlice []*clusterSlot
func (p clusterSlotSlice) Len() int {
return len(p)
}
func (p clusterSlotSlice) Less(i, j int) bool {
return p[i].start < p[j].start
}
func (p clusterSlotSlice) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
type clusterState struct { type clusterState struct {
nodes *clusterNodes nodes *clusterNodes
Masters []*clusterNode Masters []*clusterNode
Slaves []*clusterNode Slaves []*clusterNode
slots [][]*clusterNode slots []*clusterSlot
generation uint32 generation uint32
createdAt time.Time createdAt time.Time
@ -404,7 +423,7 @@ func newClusterState(
c := clusterState{ c := clusterState{
nodes: nodes, nodes: nodes,
slots: make([][]*clusterNode, hashtag.SlotNumber), slots: make([]*clusterSlot, 0, len(slots)),
generation: nodes.NextGeneration(), generation: nodes.NextGeneration(),
createdAt: time.Now(), createdAt: time.Now(),
@ -434,11 +453,15 @@ func newClusterState(
} }
} }
for i := slot.Start; i <= slot.End; i++ { c.slots = append(c.slots, &clusterSlot{
c.slots[i] = nodes start: slot.Start,
} end: slot.End,
nodes: nodes,
})
} }
sort.Sort(clusterSlotSlice(c.slots))
time.AfterFunc(time.Minute, func() { time.AfterFunc(time.Minute, func() {
nodes.GC(c.generation) nodes.GC(c.generation)
}) })
@ -506,8 +529,15 @@ func (c *clusterState) slotRandomNode(slot int) *clusterNode {
} }
func (c *clusterState) slotNodes(slot int) []*clusterNode { func (c *clusterState) slotNodes(slot int) []*clusterNode {
if slot >= 0 && slot < len(c.slots) { i := sort.Search(len(c.slots), func(i int) bool {
return c.slots[slot] return c.slots[i].end >= slot
})
if i >= len(c.slots) {
return nil
}
x := c.slots[i]
if slot >= x.start && slot <= x.end {
return x.nodes
} }
return nil return nil
} }
@ -516,26 +546,7 @@ func (c *clusterState) IsConsistent() bool {
if c.nodes.opt.ClusterSlots != nil { if c.nodes.opt.ClusterSlots != nil {
return true return true
} }
return len(c.Masters) <= len(c.Slaves)
if len(c.Masters) > len(c.Slaves) {
return false
}
for _, master := range c.Masters {
s := master.Client.Info("replication").Val()
if !strings.Contains(s, "role:master") {
return false
}
}
for _, slave := range c.Slaves {
s := slave.Client.Info("replication").Val()
if !strings.Contains(s, "role:slave") {
return false
}
}
return true
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -563,7 +574,7 @@ func (c *clusterStateHolder) Reload() (*clusterState, error) {
return nil, err return nil, err
} }
if !state.IsConsistent() { if !state.IsConsistent() {
c.LazyReload() time.AfterFunc(time.Second, c.LazyReload)
} }
return state, nil return state, nil
} }
@ -843,6 +854,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
} }
if internal.IsRetryableError(err, true) { if internal.IsRetryableError(err, true) {
c.state.LazyReload()
continue continue
} }
@ -929,12 +941,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
} }
if internal.IsRetryableError(err, true) { if internal.IsRetryableError(err, true) {
// Firstly retry the same node. c.state.LazyReload()
// First retry the same node.
if attempt == 0 { if attempt == 0 {
continue continue
} }
// Secondly try random node. // Second try random node.
node, err = c.nodes.Random() node, err = c.nodes.Random()
if err != nil { if err != nil {
break break

View File

@ -51,13 +51,19 @@ func (s *clusterScenario) addrs() []string {
func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient { func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient {
opt.Addrs = s.addrs() opt.Addrs = s.addrs()
client := redis.NewClusterClient(opt) client := redis.NewClusterClient(opt)
Eventually(func() bool { err := eventually(func() error {
state, err := client.GetState() state, err := client.GetState()
if err != nil { if err != nil {
return false return err
} }
return state.IsConsistent() if !state.IsConsistent() {
}, 30*time.Second).Should(BeTrue()) return fmt.Errorf("cluster state is not conistent")
}
return nil
}, 30*time.Second)
if err != nil {
panic(err)
}
return client return client
} }
@ -935,18 +941,21 @@ var _ = Describe("ClusterClient timeout", func() {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func BenchmarkRedisClusterPing(b *testing.B) { func newClusterScenario() *clusterScenario {
if testing.Short() { return &clusterScenario{
b.Skip("skipping in short mode")
}
cluster := &clusterScenario{
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
nodeIds: make([]string, 6), nodeIds: make([]string, 6),
processes: make(map[string]*redisProcess, 6), processes: make(map[string]*redisProcess, 6),
clients: make(map[string]*redis.Client, 6), clients: make(map[string]*redis.Client, 6),
} }
}
func BenchmarkRedisClusterPing(b *testing.B) {
if testing.Short() {
b.Skip("skipping in short mode")
}
cluster := newClusterScenario()
if err := startCluster(cluster); err != nil { if err := startCluster(cluster); err != nil {
b.Fatal(err) b.Fatal(err)
} }
@ -959,7 +968,8 @@ func BenchmarkRedisClusterPing(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
if err := client.Ping().Err(); err != nil { err := client.Ping().Err()
if err != nil {
b.Fatal(err) b.Fatal(err)
} }
} }
@ -971,13 +981,7 @@ func BenchmarkRedisClusterSetString(b *testing.B) {
b.Skip("skipping in short mode") b.Skip("skipping in short mode")
} }
cluster := &clusterScenario{ cluster := newClusterScenario()
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
nodeIds: make([]string, 6),
processes: make(map[string]*redisProcess, 6),
clients: make(map[string]*redis.Client, 6),
}
if err := startCluster(cluster); err != nil { if err := startCluster(cluster); err != nil {
b.Fatal(err) b.Fatal(err)
} }
@ -992,9 +996,34 @@ func BenchmarkRedisClusterSetString(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
if err := client.Set("key", value, 0).Err(); err != nil { err := client.Set("key", value, 0).Err()
if err != nil {
b.Fatal(err) b.Fatal(err)
} }
} }
}) })
} }
func BenchmarkRedisClusterReloadState(b *testing.B) {
if testing.Short() {
b.Skip("skipping in short mode")
}
cluster := newClusterScenario()
if err := startCluster(cluster); err != nil {
b.Fatal(err)
}
defer stopCluster(cluster)
client := cluster.clusterClient(redisClusterOptions())
defer client.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := client.ReloadState()
if err != nil {
b.Fatal(err)
}
}
}

View File

@ -49,7 +49,7 @@ func (c *ClusterClient) Nodes(key string) ([]*clusterNode, error) {
} }
slot := hashtag.Slot(key) slot := hashtag.Slot(key)
nodes := state.slots[slot] nodes := state.slotNodes(slot)
if len(nodes) != 2 { if len(nodes) != 2 {
return nil, fmt.Errorf("slot=%d does not have enough nodes: %v", slot, nodes) return nil, fmt.Errorf("slot=%d does not have enough nodes: %v", slot, nodes)
} }

View File

@ -5,7 +5,7 @@ import (
"strings" "strings"
) )
const SlotNumber = 16384 const slotNumber = 16384
// CRC16 implementation according to CCITT standards. // CRC16 implementation according to CCITT standards.
// Copyright 2001-2010 Georges Menie (www.menie.org) // Copyright 2001-2010 Georges Menie (www.menie.org)
@ -56,7 +56,7 @@ func Key(key string) string {
} }
func RandomSlot() int { func RandomSlot() int {
return rand.Intn(SlotNumber) return rand.Intn(slotNumber)
} }
// hashSlot returns a consistent slot number between 0 and 16383 // hashSlot returns a consistent slot number between 0 and 16383
@ -66,7 +66,7 @@ func Slot(key string) int {
return RandomSlot() return RandomSlot()
} }
key = Key(key) key = Key(key)
return int(crc16sum(key)) % SlotNumber return int(crc16sum(key)) % slotNumber
} }
func crc16sum(key string) (crc uint16) { func crc16sum(key string) (crc uint16) {

View File

@ -8,7 +8,6 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -169,24 +168,28 @@ func perform(n int, cbs ...func(int)) {
} }
func eventually(fn func() error, timeout time.Duration) error { func eventually(fn func() error, timeout time.Duration) error {
var exit int32
errCh := make(chan error) errCh := make(chan error)
done := make(chan struct{}) done := make(chan struct{})
exit := make(chan struct{})
go func() { go func() {
defer GinkgoRecover() for {
for atomic.LoadInt32(&exit) == 0 {
err := fn() err := fn()
if err == nil { if err == nil {
close(done) close(done)
return return
} }
select { select {
case errCh <- err: case errCh <- err:
default: default:
} }
time.Sleep(timeout / 100)
select {
case <-exit:
return
case <-time.After(timeout / 100):
}
} }
}() }()
@ -194,7 +197,7 @@ func eventually(fn func() error, timeout time.Duration) error {
case <-done: case <-done:
return nil return nil
case <-time.After(timeout): case <-time.After(timeout):
atomic.StoreInt32(&exit, 1) close(exit)
select { select {
case err := <-errCh: case err := <-errCh:
return err return err

View File

@ -170,6 +170,8 @@ type ringShards struct {
func newRingShards(opt *RingOptions) *ringShards { func newRingShards(opt *RingOptions) *ringShards {
return &ringShards{ return &ringShards{
opt: opt,
hash: newConsistentHash(opt), hash: newConsistentHash(opt),
shards: make(map[string]*ringShard), shards: make(map[string]*ringShard),
} }