Merge pull request #96 from go-redis/fix/reload-slots-in-background

Reload slots in background goroutine.
This commit is contained in:
Vladimir Mihailenco 2015-05-05 12:44:43 +03:00
commit 2507be6cd2
4 changed files with 27 additions and 43 deletions

View File

@ -1,6 +1,7 @@
package redis package redis
import ( import (
"log"
"math/rand" "math/rand"
"strings" "strings"
"sync" "sync"
@ -21,7 +22,8 @@ type ClusterClient struct {
opt *ClusterOptions opt *ClusterOptions
_reload uint32 // Reports where slots reloading is in progress.
reloading uint32
} }
// NewClusterClient initializes a new cluster-aware client using given options. // NewClusterClient initializes a new cluster-aware client using given options.
@ -32,10 +34,9 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
slots: make([][]string, hashSlots), slots: make([][]string, hashSlots),
clients: make(map[string]*Client), clients: make(map[string]*Client),
opt: opt, opt: opt,
_reload: 1,
} }
client.commandable.process = client.process client.commandable.process = client.process
client.reloadIfDue() client.reloadSlots()
go client.reaper() go client.reaper()
return client return client
} }
@ -115,8 +116,6 @@ func (c *ClusterClient) randomClient() (client *Client, err error) {
func (c *ClusterClient) process(cmd Cmder) { func (c *ClusterClient) process(cmd Cmder) {
var ask bool var ask bool
c.reloadIfDue()
slot := hashSlot(cmd.clusterKey()) slot := hashSlot(cmd.clusterKey())
var addr string var addr string
@ -162,7 +161,7 @@ func (c *ClusterClient) process(cmd Cmder) {
moved, ask, addr = isMovedError(err) moved, ask, addr = isMovedError(err)
if moved || ask { if moved || ask {
if moved { if moved {
c.scheduleReload() c.lazyReloadSlots()
} }
client, err = c.getClient(addr) client, err = c.getClient(addr)
if err != nil { if err != nil {
@ -214,29 +213,28 @@ func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
c.slotsMx.Unlock() c.slotsMx.Unlock()
} }
// Closes all connections and reloads slot cache, if due. func (c *ClusterClient) reloadSlots() {
func (c *ClusterClient) reloadIfDue() (err error) { defer atomic.StoreUint32(&c.reloading, 0)
if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) {
return
}
client, err := c.randomClient() client, err := c.randomClient()
if err != nil { if err != nil {
return err log.Printf("redis: randomClient failed: %s", err)
return
} }
slots, err := client.ClusterSlots().Result() slots, err := client.ClusterSlots().Result()
if err != nil { if err != nil {
return err log.Printf("redis: ClusterSlots failed: %s", err)
return
} }
c.setSlots(slots) c.setSlots(slots)
return nil
} }
// Schedules slots reload on next request. func (c *ClusterClient) lazyReloadSlots() {
func (c *ClusterClient) scheduleReload() { if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
atomic.StoreUint32(&c._reload, 1) return
}
go c.reloadSlots()
} }
// reaper closes idle connections to the cluster. // reaper closes idle connections to the cluster.

View File

@ -5,12 +5,8 @@ import (
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )
// GetSlot returns the cached slot addresses func (c *ClusterClient) SlotAddrs(slot int) []string {
func (c *ClusterClient) GetSlot(pos int) []string { return c.slotAddrs(slot)
c.slotsMx.RLock()
defer c.slotsMx.RUnlock()
return c.slots[pos]
} }
// SwapSlot swaps a slot's master/slave address // SwapSlot swaps a slot's master/slave address
@ -49,7 +45,6 @@ var _ = Describe("ClusterClient", func() {
It("should initialize", func() { It("should initialize", func() {
Expect(subject.addrs).To(HaveLen(3)) Expect(subject.addrs).To(HaveLen(3))
Expect(subject.slots).To(HaveLen(16384)) Expect(subject.slots).To(HaveLen(16384))
Expect(subject._reload).To(Equal(uint32(0)))
}) })
It("should update slots cache", func() { It("should update slots cache", func() {
@ -85,11 +80,4 @@ var _ = Describe("ClusterClient", func() {
Expect(subject.slots[16383]).To(BeEmpty()) Expect(subject.slots[16383]).To(BeEmpty())
Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed")) Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed"))
}) })
It("should check if reload is due", func() {
subject._reload = 0
Expect(subject._reload).To(Equal(uint32(0)))
subject.scheduleReload()
Expect(subject._reload).To(Equal(uint32(1)))
})
}) })

View File

@ -113,7 +113,7 @@ func (c *ClusterPipeline) execClusterCmds(
failedCmds[""] = append(failedCmds[""], cmds[i:]...) failedCmds[""] = append(failedCmds[""], cmds[i:]...)
break break
} else if moved, ask, addr := isMovedError(err); moved { } else if moved, ask, addr := isMovedError(err); moved {
c.cluster.scheduleReload() c.cluster.lazyReloadSlots()
cmd.reset() cmd.reset()
failedCmds[addr] = append(failedCmds[addr], cmd) failedCmds[addr] = append(failedCmds[addr], cmd)
} else if ask { } else if ask {

View File

@ -258,25 +258,23 @@ var _ = Describe("Cluster", func() {
It("should follow redirects", func() { It("should follow redirects", func() {
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred()) Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
Expect(redis.HashSlot("A")).To(Equal(6373))
Expect(client.SwapSlot(6373)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) slot := redis.HashSlot("A")
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
val, err := client.Get("A").Result() val, err := client.Get("A").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("VALUE")) Expect(val).To(Equal("VALUE"))
Expect(client.GetSlot(6373)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) Expect(client.SlotAddrs(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
val, err = client.Get("A").Result() Eventually(func() []string {
Expect(err).NotTo(HaveOccurred()) return client.SlotAddrs(slot)
Expect(val).To(Equal("VALUE")) }).Should(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"}))
Expect(client.GetSlot(6373)).To(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"}))
}) })
It("should perform multi-pipelines", func() { It("should perform multi-pipelines", func() {
// Dummy command to load slots info.
Expect(client.Ping().Err()).NotTo(HaveOccurred())
slot := redis.HashSlot("A") slot := redis.HashSlot("A")
Expect(client.SlotAddrs(slot)).To(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"}))
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
pipe := client.Pipeline() pipe := client.Pipeline()