From 94a31f499f5329fe852c965c32350b9ea12e82dd Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sat, 4 Apr 2015 16:46:57 +0300 Subject: [PATCH 1/6] Attempt to cleanup cluster logic. --- cluster.go | 266 +++++++++++++++++++++-------------------- cluster_client_test.go | 95 --------------- cluster_test.go | 4 +- error.go | 9 ++ 4 files changed, 145 insertions(+), 229 deletions(-) delete mode 100644 cluster_client_test.go diff --git a/cluster.go b/cluster.go index bbb505b..6fbc774 100644 --- a/cluster.go +++ b/cluster.go @@ -1,25 +1,36 @@ package redis import ( - "errors" - "io" "math/rand" - "net" "strings" "sync" "sync/atomic" "time" ) +func removeDuplicates(slice []string) []string { + seen := make(map[string]struct{}, len(slice)) + for i := 0; i < len(slice); { + addr := slice[i] + if _, ok := seen[addr]; ok { + slice = append(slice[:i], slice[i+1:]...) + } else { + seen[addr] = struct{}{} + i++ + } + } + return slice +} + type ClusterClient struct { commandable - addrs map[string]struct{} + addrs []string slots [][]string slotsMx sync.RWMutex // protects slots & addrs cache - conns map[string]*Client - connsMx sync.Mutex // protects conns + clients map[string]*Client + clientsMx sync.RWMutex // protects clients opt *ClusterOptions @@ -28,95 +39,122 @@ type ClusterClient struct { // NewClusterClient initializes a new cluster-aware client using given options. // A list of seed addresses must be provided. -func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) { - addrs, err := opt.getAddrSet() - if err != nil { - return nil, err - } - +func NewClusterClient(opt *ClusterOptions) *ClusterClient { client := &ClusterClient{ - addrs: addrs, - conns: make(map[string]*Client), + addrs: opt.getAddrs(), + clients: make(map[string]*Client), opt: opt, _reload: 1, } client.commandable.process = client.process - client.reloadIfDue() go client.reaper(time.NewTicker(5 * time.Minute)) - return client, nil + return client } -// Close closes the cluster connection +// Close closes the cluster client. func (c *ClusterClient) Close() error { - c.slotsMx.Lock() - defer c.slotsMx.Unlock() - - return c.reset() + // TODO: close should make client unusable + c.setSlots(nil) + return nil } // ------------------------------------------------------------------------ -// Finds the current master address for a given hash slot -func (c *ClusterClient) getMasterAddrBySlot(hashSlot int) string { - if addrs := c.slots[hashSlot]; len(addrs) > 0 { - return addrs[0] +// getClient returns a Client for a given address. +func (c *ClusterClient) getClient(addr string) *Client { + c.clientsMx.RLock() + client, ok := c.clients[addr] + if ok { + c.clientsMx.RUnlock() + return client } - return "" -} + c.clientsMx.RUnlock() -// Returns a node's client for a given address -func (c *ClusterClient) getNodeClientByAddr(addr string) *Client { - c.connsMx.Lock() - client, ok := c.conns[addr] + c.clientsMx.Lock() + client, ok = c.clients[addr] if !ok { opt := c.opt.clientOptions() opt.Addr = addr client = NewTCPClient(opt) - c.conns[addr] = client + c.clients[addr] = client } - c.connsMx.Unlock() + c.clientsMx.Unlock() + return client } +// randomClient returns a Client for the first live node. 
+func (c *ClusterClient) randomClient() (client *Client, err error) { + for i := 0; i < 10; i++ { + n := rand.Intn(len(c.addrs)) + client = c.getClient(c.addrs[n]) + err = client.Ping().Err() + if err == nil { + return client, nil + } + } + return nil, err +} + // Process a command func (c *ClusterClient) process(cmd Cmder) { + var client *Client var ask bool c.reloadIfDue() - hashSlot := hashSlot(cmd.clusterKey()) + slot := hashSlot(cmd.clusterKey()) c.slotsMx.RLock() defer c.slotsMx.RUnlock() - tried := make(map[string]struct{}, len(c.addrs)) - addr := c.getMasterAddrBySlot(hashSlot) - for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ { - tried[addr] = struct{}{} + addrs := c.slots[slot] + if len(addrs) > 0 { + // First address is master. + client = c.getClient(addrs[0]) + } else { + var err error + client, err = c.randomClient() + if err != nil { + cmd.setErr(err) + return + } + } - // Pick the connection, process request - conn := c.getNodeClientByAddr(addr) + // Index in the addrs slice pointing to the next replica. + replicaIndex := 1 + + for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ { if ask { - pipe := conn.Pipeline() + pipe := client.Pipeline() pipe.Process(NewCmd("ASKING")) pipe.Process(cmd) _, _ = pipe.Exec() ask = false } else { - conn.Process(cmd) + client.Process(cmd) } // If there is no (real) error, we are done! err := cmd.Err() - if err == nil || err == Nil { + if err == nil || err == Nil || err == TxFailedErr { return } - // On connection errors, pick a random, previosuly untried connection - // and request again. - if _, ok := err.(*net.OpError); ok || err == io.EOF { - if addr = c.findNextAddr(tried); addr == "" { - return + // On network errors try another node. + if isNetworkError(err) { + if replicaIndex < len(addrs) { + // Try next available replica. + client = c.getClient(addrs[replicaIndex]) + replicaIndex++ + cmd.reset() + continue + } else { + // Otherwise try random node. + client, err = c.randomClient() + if err != nil { + return + } } cmd.reset() continue @@ -131,11 +169,11 @@ func (c *ClusterClient) process(cmd Cmder) { // Handle MOVE and ASK redirections, return on any other error switch parts[0] { case "MOVED": - c.forceReload() - addr = parts[2] + c.scheduleReload() + client = c.getClient(parts[2]) case "ASK": ask = true - addr = parts[2] + client = c.getClient(parts[2]) default: return } @@ -143,84 +181,60 @@ func (c *ClusterClient) process(cmd Cmder) { } } -// Closes all connections and reloads slot cache, if due +// Closes all clients and returns last error if there are any. +func (c *ClusterClient) resetClients() (err error) { + c.clientsMx.Lock() + for addr, client := range c.clients { + if e := client.Close(); e != nil { + err = e + } + delete(c.clients, addr) + } + c.clientsMx.Unlock() + return err +} + +func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { + c.slotsMx.Lock() + + c.slots = make([][]string, hashSlots) + for _, info := range slots { + for i := info.Start; i <= info.End; i++ { + c.slots[i] = info.Addrs + } + c.addrs = append(c.addrs, info.Addrs...) + } + c.addrs = removeDuplicates(c.addrs) + c.resetClients() + + c.slotsMx.Unlock() +} + +// Closes all connections and reloads slot cache, if due. 
func (c *ClusterClient) reloadIfDue() (err error) { if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) { return } - var infos []ClusterSlotInfo - - c.slotsMx.Lock() - defer c.slotsMx.Unlock() - - // Try known addresses in random order (map interation order is random in Go) - // http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections - // https://github.com/antirez/redis-rb-cluster/blob/fd931ed/cluster.rb#L157 - for addr := range c.addrs { - c.reset() - - infos, err = c.fetchClusterSlots(addr) - if err == nil { - c.update(infos) - break - } + client, err := c.randomClient() + if err != nil { + return err } - return + + slots, err := client.ClusterSlots().Result() + if err != nil { + return err + } + c.setSlots(slots) + + return nil } -// Closes all connections and flushes slots cache -func (c *ClusterClient) reset() (err error) { - c.connsMx.Lock() - for addr, client := range c.conns { - if e := client.Close(); e != nil { - err = e - } - delete(c.conns, addr) - } - c.connsMx.Unlock() - c.slots = make([][]string, hashSlots) - return -} - -// Forces a cache reload on next request -func (c *ClusterClient) forceReload() { +// Schedules slots reload on next request. +func (c *ClusterClient) scheduleReload() { atomic.StoreUint32(&c._reload, 1) } -// Find the next untried address -func (c *ClusterClient) findNextAddr(tried map[string]struct{}) string { - for addr := range c.addrs { - if _, ok := tried[addr]; !ok { - return addr - } - } - return "" -} - -// Fetch slot information -func (c *ClusterClient) fetchClusterSlots(addr string) ([]ClusterSlotInfo, error) { - opt := c.opt.clientOptions() - opt.Addr = addr - client := NewClient(opt) - defer client.Close() - - return client.ClusterSlots().Result() -} - -// Update slot information, populate slots -func (c *ClusterClient) update(infos []ClusterSlotInfo) { - for _, info := range infos { - for i := info.Start; i <= info.End; i++ { - c.slots[i] = info.Addrs - } - - for _, addr := range info.Addrs { - c.addrs[addr] = struct{}{} - } - } -} - // reaper closes idle connections to the cluster. func (c *ClusterClient) reaper(ticker *time.Ticker) { for _ = range ticker.C { @@ -237,8 +251,6 @@ func (c *ClusterClient) reaper(ticker *time.Ticker) { //------------------------------------------------------------------------------ -var errNoAddrs = errors.New("redis: no addresses") - type ClusterOptions struct { // A seed-list of host:port addresses of known cluster nodes Addrs []string @@ -278,17 +290,9 @@ func (opt *ClusterOptions) getMaxRedirects() int { return opt.MaxRedirects } -func (opt *ClusterOptions) getAddrSet() (map[string]struct{}, error) { - size := len(opt.Addrs) - if size < 1 { - return nil, errNoAddrs - } - - addrs := make(map[string]struct{}, size) - for _, addr := range opt.Addrs { - addrs[addr] = struct{}{} - } - return addrs, nil +func (opt *ClusterOptions) getAddrs() []string { + opt.Addrs = removeDuplicates(opt.Addrs) + return opt.Addrs } func (opt *ClusterOptions) clientOptions() *Options { diff --git a/cluster_client_test.go b/cluster_client_test.go deleted file mode 100644 index dbeb1b2..0000000 --- a/cluster_client_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package redis - -import ( - . "github.com/onsi/ginkgo" - . 
"github.com/onsi/gomega" -) - -var _ = Describe("ClusterClient", func() { - - var subject *ClusterClient - var populate = func() { - subject.reset() - subject.update([]ClusterSlotInfo{ - {0, 4095, []string{"127.0.0.1:7000", "127.0.0.1:7004"}}, - {12288, 16383, []string{"127.0.0.1:7003", "127.0.0.1:7007"}}, - {4096, 8191, []string{"127.0.0.1:7001", "127.0.0.1:7005"}}, - {8192, 12287, []string{"127.0.0.1:7002", "127.0.0.1:7006"}}, - }) - } - - BeforeEach(func() { - var err error - subject, err = NewClusterClient(&ClusterOptions{ - Addrs: []string{"127.0.0.1:6379", "127.0.0.1:7003", "127.0.0.1:7006"}, - }) - Expect(err).NotTo(HaveOccurred()) - }) - - AfterEach(func() { - subject.Close() - }) - - It("should initialize", func() { - Expect(subject.addrs).To(HaveLen(3)) - Expect(subject.slots).To(HaveLen(hashSlots)) - Expect(subject._reload).To(Equal(uint32(0))) - }) - - It("should update slots cache", func() { - populate() - Expect(subject.slots[0]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"})) - Expect(subject.slots[4095]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"})) - Expect(subject.slots[4096]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"})) - Expect(subject.slots[8191]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"})) - Expect(subject.slots[8192]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"})) - Expect(subject.slots[12287]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"})) - Expect(subject.slots[12288]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"})) - Expect(subject.slots[16383]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"})) - Expect(subject.addrs).To(Equal(map[string]struct{}{ - "127.0.0.1:6379": struct{}{}, - "127.0.0.1:7000": struct{}{}, - "127.0.0.1:7001": struct{}{}, - "127.0.0.1:7002": struct{}{}, - "127.0.0.1:7003": struct{}{}, - "127.0.0.1:7004": struct{}{}, - "127.0.0.1:7005": struct{}{}, - "127.0.0.1:7006": struct{}{}, - "127.0.0.1:7007": struct{}{}, - })) - }) - - It("should find next addresses", func() { - populate() - seen := map[string]struct{}{ - "127.0.0.1:7000": struct{}{}, - "127.0.0.1:7001": struct{}{}, - "127.0.0.1:7003": struct{}{}, - } - - addr := subject.findNextAddr(seen) - for addr != "" { - seen[addr] = struct{}{} - addr = subject.findNextAddr(seen) - } - Expect(subject.findNextAddr(seen)).To(Equal("")) - Expect(seen).To(Equal(map[string]struct{}{ - "127.0.0.1:6379": struct{}{}, - "127.0.0.1:7000": struct{}{}, - "127.0.0.1:7001": struct{}{}, - "127.0.0.1:7002": struct{}{}, - "127.0.0.1:7003": struct{}{}, - "127.0.0.1:7004": struct{}{}, - "127.0.0.1:7005": struct{}{}, - "127.0.0.1:7006": struct{}{}, - "127.0.0.1:7007": struct{}{}, - })) - }) - - It("should check if reload is due", func() { - subject._reload = 0 - Expect(subject._reload).To(Equal(uint32(0))) - subject.forceReload() - Expect(subject._reload).To(Equal(uint32(1))) - }) -}) diff --git a/cluster_test.go b/cluster_test.go index ce583ad..5d12cd3 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -148,11 +148,9 @@ var _ = Describe("Cluster", func() { var client *redis.ClusterClient BeforeEach(func() { - var err error - client, err = redis.NewClusterClient(&redis.ClusterOptions{ + client = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: []string{"127.0.0.1:8220", "127.0.0.1:8221", "127.0.0.1:8222", "127.0.0.1:8223", "127.0.0.1:8224", "127.0.0.1:8225"}, }) - Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { diff --git a/error.go b/error.go index 667fffd..33159d5 100644 --- a/error.go +++ b/error.go @@ -2,6 +2,8 @@ package redis 
import ( "fmt" + "io" + "net" ) // Redis nil reply. @@ -21,3 +23,10 @@ func errorf(s string, args ...interface{}) redisError { func (err redisError) Error() string { return err.s } + +func isNetworkError(err error) bool { + if _, ok := err.(*net.OpError); ok || err == io.EOF { + return true + } + return false +} From 2511a1791d98b39ca017bec9cdc72e09cf0c067f Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 7 Apr 2015 12:30:06 +0300 Subject: [PATCH 2/6] Use only master node address. --- cluster.go | 56 ++++++++++++++++++++++++------------------------------ 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/cluster.go b/cluster.go index 6fbc774..8790e12 100644 --- a/cluster.go +++ b/cluster.go @@ -26,11 +26,11 @@ type ClusterClient struct { commandable addrs []string - slots [][]string + slots []string slotsMx sync.RWMutex // protects slots & addrs cache clients map[string]*Client - clientsMx sync.RWMutex // protects clients + clientsMx sync.RWMutex opt *ClusterOptions @@ -106,12 +106,11 @@ func (c *ClusterClient) process(cmd Cmder) { slot := hashSlot(cmd.clusterKey()) c.slotsMx.RLock() - defer c.slotsMx.RUnlock() + masterAddr := c.slots[slot] + c.slotsMx.RUnlock() - addrs := c.slots[slot] - if len(addrs) > 0 { - // First address is master. - client = c.getClient(addrs[0]) + if masterAddr != "" { + client = c.getClient(masterAddr) } else { var err error client, err = c.randomClient() @@ -121,9 +120,6 @@ func (c *ClusterClient) process(cmd Cmder) { } } - // Index in the addrs slice pointing to the next replica. - replicaIndex := 1 - for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ { if ask { pipe := client.Pipeline() @@ -141,20 +137,11 @@ func (c *ClusterClient) process(cmd Cmder) { return } - // On network errors try another node. + // On network errors try random node. if isNetworkError(err) { - if replicaIndex < len(addrs) { - // Try next available replica. - client = c.getClient(addrs[replicaIndex]) - replicaIndex++ - cmd.reset() - continue - } else { - // Otherwise try random node. - client, err = c.randomClient() - if err != nil { - return - } + client, err = c.randomClient() + if err != nil { + return } cmd.reset() continue @@ -197,16 +184,23 @@ func (c *ClusterClient) resetClients() (err error) { func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { c.slotsMx.Lock() - c.slots = make([][]string, hashSlots) - for _, info := range slots { - for i := info.Start; i <= info.End; i++ { - c.slots[i] = info.Addrs - } - c.addrs = append(c.addrs, info.Addrs...) - } - c.addrs = removeDuplicates(c.addrs) + c.addrs = c.addrs[:0] + c.slots = make([]string, hashSlots) c.resetClients() + seen := make(map[string]struct{}) + for _, info := range slots { + masterAddr := info.Addrs[0] + for slot := info.Start; slot <= info.End; slot++ { + c.slots[slot] = masterAddr + } + + if _, ok := seen[masterAddr]; !ok { + c.addrs = append(c.addrs, masterAddr) + seen[masterAddr] = struct{}{} + } + } + c.slotsMx.Unlock() } From dbfd75705c3a57fc256bad0b4e8cfb121aa6a6fd Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 7 Apr 2015 12:30:57 +0300 Subject: [PATCH 3/6] Remove unused func. 
--- cluster.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/cluster.go b/cluster.go index 8790e12..b68c60f 100644 --- a/cluster.go +++ b/cluster.go @@ -8,20 +8,6 @@ import ( "time" ) -func removeDuplicates(slice []string) []string { - seen := make(map[string]struct{}, len(slice)) - for i := 0; i < len(slice); { - addr := slice[i] - if _, ok := seen[addr]; ok { - slice = append(slice[:i], slice[i+1:]...) - } else { - seen[addr] = struct{}{} - i++ - } - } - return slice -} - type ClusterClient struct { commandable @@ -285,7 +271,6 @@ func (opt *ClusterOptions) getMaxRedirects() int { } func (opt *ClusterOptions) getAddrs() []string { - opt.Addrs = removeDuplicates(opt.Addrs) return opt.Addrs } From 52a9dd52b73c0e44e21ceb69098796e41e2e5e2f Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 8 Apr 2015 12:28:21 +0300 Subject: [PATCH 4/6] Store addresses of replicas again. --- cluster.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/cluster.go b/cluster.go index b68c60f..fdf0f84 100644 --- a/cluster.go +++ b/cluster.go @@ -12,7 +12,7 @@ type ClusterClient struct { commandable addrs []string - slots []string + slots [][]string slotsMx sync.RWMutex // protects slots & addrs cache clients map[string]*Client @@ -90,13 +90,12 @@ func (c *ClusterClient) process(cmd Cmder) { c.reloadIfDue() slot := hashSlot(cmd.clusterKey()) - c.slotsMx.RLock() - masterAddr := c.slots[slot] + addrs := c.slots[slot] c.slotsMx.RUnlock() - if masterAddr != "" { - client = c.getClient(masterAddr) + if len(addrs) > 0 { + client = c.getClient(addrs[0]) // First address is master. } else { var err error client, err = c.randomClient() @@ -170,20 +169,24 @@ func (c *ClusterClient) resetClients() (err error) { func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { c.slotsMx.Lock() - c.addrs = c.addrs[:0] - c.slots = make([]string, hashSlots) + c.slots = make([][]string, hashSlots) c.resetClients() seen := make(map[string]struct{}) + for _, addr := range c.addrs { + seen[addr] = struct{}{} + } + for _, info := range slots { - masterAddr := info.Addrs[0] for slot := info.Start; slot <= info.End; slot++ { - c.slots[slot] = masterAddr + c.slots[slot] = info.Addrs } - if _, ok := seen[masterAddr]; !ok { - c.addrs = append(c.addrs, masterAddr) - seen[masterAddr] = struct{}{} + for _, addr := range info.Addrs { + if _, ok := seen[addr]; !ok { + c.addrs = append(c.addrs, addr) + seen[addr] = struct{}{} + } } } @@ -218,7 +221,7 @@ func (c *ClusterClient) scheduleReload() { // reaper closes idle connections to the cluster. func (c *ClusterClient) reaper(ticker *time.Ticker) { for _ = range ticker.C { - for _, client := range c.conns { + for _, client := range c.clients { pool := client.connPool // pool.First removes idle connections from the pool for us. So // just put returned connection back. From fc0565a85b09e4e3223643e5a9f135b51a207029 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 8 Apr 2015 12:28:50 +0300 Subject: [PATCH 5/6] Remove getAddrs. --- cluster.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cluster.go b/cluster.go index fdf0f84..e0f2b40 100644 --- a/cluster.go +++ b/cluster.go @@ -27,7 +27,7 @@ type ClusterClient struct { // A list of seed addresses must be provided. 
func NewClusterClient(opt *ClusterOptions) *ClusterClient { client := &ClusterClient{ - addrs: opt.getAddrs(), + addrs: opt.Addrs, clients: make(map[string]*Client), opt: opt, _reload: 1, @@ -273,10 +273,6 @@ func (opt *ClusterOptions) getMaxRedirects() int { return opt.MaxRedirects } -func (opt *ClusterOptions) getAddrs() []string { - return opt.Addrs -} - func (opt *ClusterOptions) clientOptions() *Options { return &Options{ DB: 0, From 58c7dacf5be97bc5411a409ce3d77e12fdcd437e Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 8 Apr 2015 12:40:45 +0300 Subject: [PATCH 6/6] Restore cluster_client_test.go --- cluster_client_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 cluster_client_test.go diff --git a/cluster_client_test.go b/cluster_client_test.go new file mode 100644 index 0000000..ad5c101 --- /dev/null +++ b/cluster_client_test.go @@ -0,0 +1,66 @@ +package redis + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("ClusterClient", func() { + var subject *ClusterClient + + var populate = func() { + subject.setSlots([]ClusterSlotInfo{ + {0, 4095, []string{"127.0.0.1:7000", "127.0.0.1:7004"}}, + {12288, 16383, []string{"127.0.0.1:7003", "127.0.0.1:7007"}}, + {4096, 8191, []string{"127.0.0.1:7001", "127.0.0.1:7005"}}, + {8192, 12287, []string{"127.0.0.1:7002", "127.0.0.1:7006"}}, + }) + } + + BeforeEach(func() { + var err error + subject = NewClusterClient(&ClusterOptions{ + Addrs: []string{"127.0.0.1:6379", "127.0.0.1:7003", "127.0.0.1:7006"}, + }) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + subject.Close() + }) + + It("should initialize", func() { + Expect(subject.addrs).To(HaveLen(3)) + Expect(subject._reload).To(Equal(uint32(1))) + }) + + It("should update slots cache", func() { + populate() + Expect(subject.slots[0]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"})) + Expect(subject.slots[4095]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"})) + Expect(subject.slots[4096]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"})) + Expect(subject.slots[8191]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"})) + Expect(subject.slots[8192]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"})) + Expect(subject.slots[12287]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"})) + Expect(subject.slots[12288]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"})) + Expect(subject.slots[16383]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"})) + Expect(subject.addrs).To(Equal([]string{ + "127.0.0.1:6379", + "127.0.0.1:7003", + "127.0.0.1:7006", + "127.0.0.1:7000", + "127.0.0.1:7004", + "127.0.0.1:7007", + "127.0.0.1:7001", + "127.0.0.1:7005", + "127.0.0.1:7002", + })) + }) + + It("should check if reload is due", func() { + subject._reload = 0 + Expect(subject._reload).To(Equal(uint32(0))) + subject.scheduleReload() + Expect(subject._reload).To(Equal(uint32(1))) + }) +})
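
For context, a minimal usage sketch of the cluster client after this series. It is an illustration only, not part of the patches: the import path is an assumption (go-redis exposed cluster support on its v3 development branch around this time), while NewClusterClient, ClusterOptions.Addrs, Ping and Close are taken directly from the diffs above.

    package main

    import (
        "fmt"

        redis "gopkg.in/redis.v3" // assumed import path for this era of go-redis
    )

    func main() {
        // After PATCH 1/6 the constructor no longer returns an error;
        // the slot cache is loaded lazily on the first command.
        client := redis.NewClusterClient(&redis.ClusterOptions{
            Addrs: []string{"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"},
        })
        defer client.Close()

        // Any command goes through process(): reloadIfDue fetches CLUSTER SLOTS
        // from a random seed node, then the command is routed by hash slot.
        if err := client.Ping().Err(); err != nil {
            fmt.Println("cluster not reachable:", err)
        }
    }

On a MOVED error the client schedules a slot-cache reload and retries against the node named in the reply; on ASK it retries the command behind an ASKING prefix via a pipeline, up to MaxRedirects attempts, as implemented in process() in PATCH 1/6.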