From 51f0a7b0a71815800e586de8307fa4253d6ac8f8 Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Mon, 13 Apr 2015 14:33:44 +0100 Subject: [PATCH] Ensure slots are initialised. Return non-failing connections to pool --- cluster.go | 8 ++++++-- cluster_client_test.go | 13 ++++++++++++- redis.go | 5 +++-- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/cluster.go b/cluster.go index 0c405c0..43498ee 100644 --- a/cluster.go +++ b/cluster.go @@ -28,11 +28,14 @@ type ClusterClient struct { func NewClusterClient(opt *ClusterOptions) *ClusterClient { client := &ClusterClient{ addrs: opt.Addrs, + slots: make([][]string, hashSlots), clients: make(map[string]*Client), opt: opt, _reload: 1, } client.commandable.process = client.process + client.reloadIfDue() + go client.reaper(time.NewTicker(5 * time.Minute)) return client } @@ -176,14 +179,15 @@ func (c *ClusterClient) resetClients() (err error) { func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { c.slotsMx.Lock() - c.slots = make([][]string, hashSlots) c.resetClients() - seen := make(map[string]struct{}) for _, addr := range c.addrs { seen[addr] = struct{}{} } + for i := 0; i < hashSlots; i++ { + c.slots[i] = c.slots[i][:0] + } for _, info := range slots { for slot := info.Start; slot <= info.End; slot++ { c.slots[slot] = info.Addrs diff --git a/cluster_client_test.go b/cluster_client_test.go index 6e44b10..84a996d 100644 --- a/cluster_client_test.go +++ b/cluster_client_test.go @@ -48,7 +48,8 @@ var _ = Describe("ClusterClient", func() { It("should initialize", func() { Expect(subject.addrs).To(HaveLen(3)) - Expect(subject._reload).To(Equal(uint32(1))) + Expect(subject.slots).To(HaveLen(16384)) + Expect(subject._reload).To(Equal(uint32(0))) }) It("should update slots cache", func() { @@ -74,6 +75,16 @@ var _ = Describe("ClusterClient", func() { })) }) + It("should close", func() { + populate() + Expect(subject.Close()).NotTo(HaveOccurred()) + Expect(subject.clients).To(BeEmpty()) + Expect(subject.slots[0]).To(BeEmpty()) + Expect(subject.slots[8191]).To(BeEmpty()) + Expect(subject.slots[8192]).To(BeEmpty()) + Expect(subject.slots[16383]).To(BeEmpty()) + }) + It("should check if reload is due", func() { subject._reload = 0 Expect(subject._reload).To(Equal(uint32(0))) diff --git a/redis.go b/redis.go index ef659a1..b8fd3e8 100644 --- a/redis.go +++ b/redis.go @@ -56,8 +56,9 @@ func (c *baseClient) initConn(cn *conn) error { func (c *baseClient) freeConn(cn *conn, ei error) error { if cn.rd.Buffered() > 0 { return c.connPool.Remove(cn) - } - if _, ok := ei.(redisError); ok { + } else if ei == nil { + return c.connPool.Put(cn) + } else if _, ok := ei.(redisError); ok { return c.connPool.Put(cn) } return c.connPool.Remove(cn)