diff --git a/cluster.go b/cluster.go index f5fbaf7..feecf80 100644 --- a/cluster.go +++ b/cluster.go @@ -1,6 +1,7 @@ package redis import ( + "log" "math/rand" "strings" "sync" @@ -21,7 +22,8 @@ type ClusterClient struct { opt *ClusterOptions - _reload uint32 + // Reports where slots reloading is in progress. + reloading uint32 } // NewClusterClient initializes a new cluster-aware client using given options. @@ -32,10 +34,9 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { slots: make([][]string, hashSlots), clients: make(map[string]*Client), opt: opt, - _reload: 1, } client.commandable.process = client.process - client.reloadIfDue() + client.reloadSlots() go client.reaper() return client } @@ -115,8 +116,6 @@ func (c *ClusterClient) randomClient() (client *Client, err error) { func (c *ClusterClient) process(cmd Cmder) { var ask bool - c.reloadIfDue() - slot := hashSlot(cmd.clusterKey()) var addr string @@ -162,7 +161,7 @@ func (c *ClusterClient) process(cmd Cmder) { moved, ask, addr = isMovedError(err) if moved || ask { if moved { - c.scheduleReload() + c.lazyReloadSlots() } client, err = c.getClient(addr) if err != nil { @@ -214,29 +213,28 @@ func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { c.slotsMx.Unlock() } -// Closes all connections and reloads slot cache, if due. -func (c *ClusterClient) reloadIfDue() (err error) { - if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) { - return - } +func (c *ClusterClient) reloadSlots() { + defer atomic.StoreUint32(&c.reloading, 0) client, err := c.randomClient() if err != nil { - return err + log.Printf("redis: randomClient failed: %s", err) + return } slots, err := client.ClusterSlots().Result() if err != nil { - return err + log.Printf("redis: ClusterSlots failed: %s", err) + return } c.setSlots(slots) - - return nil } -// Schedules slots reload on next request. -func (c *ClusterClient) scheduleReload() { - atomic.StoreUint32(&c._reload, 1) +func (c *ClusterClient) lazyReloadSlots() { + if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { + return + } + go c.reloadSlots() } // reaper closes idle connections to the cluster. diff --git a/cluster_client_test.go b/cluster_client_test.go index 3f23368..e9ea16c 100644 --- a/cluster_client_test.go +++ b/cluster_client_test.go @@ -5,12 +5,8 @@ import ( . "github.com/onsi/gomega" ) -// GetSlot returns the cached slot addresses -func (c *ClusterClient) GetSlot(pos int) []string { - c.slotsMx.RLock() - defer c.slotsMx.RUnlock() - - return c.slots[pos] +func (c *ClusterClient) SlotAddrs(slot int) []string { + return c.slotAddrs(slot) } // SwapSlot swaps a slot's master/slave address @@ -49,7 +45,6 @@ var _ = Describe("ClusterClient", func() { It("should initialize", func() { Expect(subject.addrs).To(HaveLen(3)) Expect(subject.slots).To(HaveLen(16384)) - Expect(subject._reload).To(Equal(uint32(0))) }) It("should update slots cache", func() { @@ -85,11 +80,4 @@ var _ = Describe("ClusterClient", func() { Expect(subject.slots[16383]).To(BeEmpty()) Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed")) }) - - It("should check if reload is due", func() { - subject._reload = 0 - Expect(subject._reload).To(Equal(uint32(0))) - subject.scheduleReload() - Expect(subject._reload).To(Equal(uint32(1))) - }) }) diff --git a/cluster_pipeline.go b/cluster_pipeline.go index 780477c..2ddc064 100644 --- a/cluster_pipeline.go +++ b/cluster_pipeline.go @@ -113,7 +113,7 @@ func (c *ClusterPipeline) execClusterCmds( failedCmds[""] = append(failedCmds[""], cmds[i:]...) break } else if moved, ask, addr := isMovedError(err); moved { - c.cluster.scheduleReload() + c.cluster.lazyReloadSlots() cmd.reset() failedCmds[addr] = append(failedCmds[addr], cmd) } else if ask { diff --git a/cluster_test.go b/cluster_test.go index 32022ed..6b06687 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -258,25 +258,23 @@ var _ = Describe("Cluster", func() { It("should follow redirects", func() { Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred()) - Expect(redis.HashSlot("A")).To(Equal(6373)) - Expect(client.SwapSlot(6373)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) + + slot := redis.HashSlot("A") + Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) val, err := client.Get("A").Result() Expect(err).NotTo(HaveOccurred()) Expect(val).To(Equal("VALUE")) - Expect(client.GetSlot(6373)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) + Expect(client.SlotAddrs(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) - val, err = client.Get("A").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(val).To(Equal("VALUE")) - Expect(client.GetSlot(6373)).To(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"})) + Eventually(func() []string { + return client.SlotAddrs(slot) + }).Should(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"})) }) It("should perform multi-pipelines", func() { - // Dummy command to load slots info. - Expect(client.Ping().Err()).NotTo(HaveOccurred()) - slot := redis.HashSlot("A") + Expect(client.SlotAddrs(slot)).To(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"})) Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) pipe := client.Pipeline()