Attempt to cleanup cluster logic.

This commit is contained in:
Vladimir Mihailenco 2015-04-04 16:46:57 +03:00
parent 8096f43489
commit 94a31f499f
4 changed files with 145 additions and 229 deletions

View File

@ -1,25 +1,36 @@
package redis package redis
import ( import (
"errors"
"io"
"math/rand" "math/rand"
"net"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
) )
func removeDuplicates(slice []string) []string {
seen := make(map[string]struct{}, len(slice))
for i := 0; i < len(slice); {
addr := slice[i]
if _, ok := seen[addr]; ok {
slice = append(slice[:i], slice[i+1:]...)
} else {
seen[addr] = struct{}{}
i++
}
}
return slice
}
type ClusterClient struct { type ClusterClient struct {
commandable commandable
addrs map[string]struct{} addrs []string
slots [][]string slots [][]string
slotsMx sync.RWMutex // protects slots & addrs cache slotsMx sync.RWMutex // protects slots & addrs cache
conns map[string]*Client clients map[string]*Client
connsMx sync.Mutex // protects conns clientsMx sync.RWMutex // protects clients
opt *ClusterOptions opt *ClusterOptions
@ -28,96 +39,123 @@ type ClusterClient struct {
// NewClusterClient initializes a new cluster-aware client using given options. // NewClusterClient initializes a new cluster-aware client using given options.
// A list of seed addresses must be provided. // A list of seed addresses must be provided.
func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) { func NewClusterClient(opt *ClusterOptions) *ClusterClient {
addrs, err := opt.getAddrSet()
if err != nil {
return nil, err
}
client := &ClusterClient{ client := &ClusterClient{
addrs: addrs, addrs: opt.getAddrs(),
conns: make(map[string]*Client), clients: make(map[string]*Client),
opt: opt, opt: opt,
_reload: 1, _reload: 1,
} }
client.commandable.process = client.process client.commandable.process = client.process
client.reloadIfDue()
go client.reaper(time.NewTicker(5 * time.Minute)) go client.reaper(time.NewTicker(5 * time.Minute))
return client, nil return client
} }
// Close closes the cluster connection // Close closes the cluster client.
func (c *ClusterClient) Close() error { func (c *ClusterClient) Close() error {
c.slotsMx.Lock() // TODO: close should make client unusable
defer c.slotsMx.Unlock() c.setSlots(nil)
return nil
return c.reset()
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Finds the current master address for a given hash slot // getClient returns a Client for a given address.
func (c *ClusterClient) getMasterAddrBySlot(hashSlot int) string { func (c *ClusterClient) getClient(addr string) *Client {
if addrs := c.slots[hashSlot]; len(addrs) > 0 { c.clientsMx.RLock()
return addrs[0] client, ok := c.clients[addr]
if ok {
c.clientsMx.RUnlock()
return client
} }
return "" c.clientsMx.RUnlock()
}
// Returns a node's client for a given address c.clientsMx.Lock()
func (c *ClusterClient) getNodeClientByAddr(addr string) *Client { client, ok = c.clients[addr]
c.connsMx.Lock()
client, ok := c.conns[addr]
if !ok { if !ok {
opt := c.opt.clientOptions() opt := c.opt.clientOptions()
opt.Addr = addr opt.Addr = addr
client = NewTCPClient(opt) client = NewTCPClient(opt)
c.conns[addr] = client c.clients[addr] = client
} }
c.connsMx.Unlock() c.clientsMx.Unlock()
return client return client
} }
// randomClient returns a Client for the first live node.
func (c *ClusterClient) randomClient() (client *Client, err error) {
for i := 0; i < 10; i++ {
n := rand.Intn(len(c.addrs))
client = c.getClient(c.addrs[n])
err = client.Ping().Err()
if err == nil {
return client, nil
}
}
return nil, err
}
// Process a command // Process a command
func (c *ClusterClient) process(cmd Cmder) { func (c *ClusterClient) process(cmd Cmder) {
var client *Client
var ask bool var ask bool
c.reloadIfDue() c.reloadIfDue()
hashSlot := hashSlot(cmd.clusterKey()) slot := hashSlot(cmd.clusterKey())
c.slotsMx.RLock() c.slotsMx.RLock()
defer c.slotsMx.RUnlock() defer c.slotsMx.RUnlock()
tried := make(map[string]struct{}, len(c.addrs)) addrs := c.slots[slot]
addr := c.getMasterAddrBySlot(hashSlot) if len(addrs) > 0 {
for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ { // First address is master.
tried[addr] = struct{}{} client = c.getClient(addrs[0])
} else {
var err error
client, err = c.randomClient()
if err != nil {
cmd.setErr(err)
return
}
}
// Pick the connection, process request // Index in the addrs slice pointing to the next replica.
conn := c.getNodeClientByAddr(addr) replicaIndex := 1
for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ {
if ask { if ask {
pipe := conn.Pipeline() pipe := client.Pipeline()
pipe.Process(NewCmd("ASKING")) pipe.Process(NewCmd("ASKING"))
pipe.Process(cmd) pipe.Process(cmd)
_, _ = pipe.Exec() _, _ = pipe.Exec()
ask = false ask = false
} else { } else {
conn.Process(cmd) client.Process(cmd)
} }
// If there is no (real) error, we are done! // If there is no (real) error, we are done!
err := cmd.Err() err := cmd.Err()
if err == nil || err == Nil { if err == nil || err == Nil || err == TxFailedErr {
return return
} }
// On connection errors, pick a random, previosuly untried connection // On network errors try another node.
// and request again. if isNetworkError(err) {
if _, ok := err.(*net.OpError); ok || err == io.EOF { if replicaIndex < len(addrs) {
if addr = c.findNextAddr(tried); addr == "" { // Try next available replica.
client = c.getClient(addrs[replicaIndex])
replicaIndex++
cmd.reset()
continue
} else {
// Otherwise try random node.
client, err = c.randomClient()
if err != nil {
return return
} }
}
cmd.reset() cmd.reset()
continue continue
} }
@ -131,11 +169,11 @@ func (c *ClusterClient) process(cmd Cmder) {
// Handle MOVE and ASK redirections, return on any other error // Handle MOVE and ASK redirections, return on any other error
switch parts[0] { switch parts[0] {
case "MOVED": case "MOVED":
c.forceReload() c.scheduleReload()
addr = parts[2] client = c.getClient(parts[2])
case "ASK": case "ASK":
ask = true ask = true
addr = parts[2] client = c.getClient(parts[2])
default: default:
return return
} }
@ -143,84 +181,60 @@ func (c *ClusterClient) process(cmd Cmder) {
} }
} }
// Closes all connections and reloads slot cache, if due // Closes all clients and returns last error if there are any.
func (c *ClusterClient) resetClients() (err error) {
c.clientsMx.Lock()
for addr, client := range c.clients {
if e := client.Close(); e != nil {
err = e
}
delete(c.clients, addr)
}
c.clientsMx.Unlock()
return err
}
func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
c.slotsMx.Lock()
c.slots = make([][]string, hashSlots)
for _, info := range slots {
for i := info.Start; i <= info.End; i++ {
c.slots[i] = info.Addrs
}
c.addrs = append(c.addrs, info.Addrs...)
}
c.addrs = removeDuplicates(c.addrs)
c.resetClients()
c.slotsMx.Unlock()
}
// Closes all connections and reloads slot cache, if due.
func (c *ClusterClient) reloadIfDue() (err error) { func (c *ClusterClient) reloadIfDue() (err error) {
if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) { if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) {
return return
} }
var infos []ClusterSlotInfo client, err := c.randomClient()
if err != nil {
c.slotsMx.Lock() return err
defer c.slotsMx.Unlock()
// Try known addresses in random order (map interation order is random in Go)
// http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections
// https://github.com/antirez/redis-rb-cluster/blob/fd931ed/cluster.rb#L157
for addr := range c.addrs {
c.reset()
infos, err = c.fetchClusterSlots(addr)
if err == nil {
c.update(infos)
break
} }
slots, err := client.ClusterSlots().Result()
if err != nil {
return err
} }
return c.setSlots(slots)
return nil
} }
// Closes all connections and flushes slots cache // Schedules slots reload on next request.
func (c *ClusterClient) reset() (err error) { func (c *ClusterClient) scheduleReload() {
c.connsMx.Lock()
for addr, client := range c.conns {
if e := client.Close(); e != nil {
err = e
}
delete(c.conns, addr)
}
c.connsMx.Unlock()
c.slots = make([][]string, hashSlots)
return
}
// Forces a cache reload on next request
func (c *ClusterClient) forceReload() {
atomic.StoreUint32(&c._reload, 1) atomic.StoreUint32(&c._reload, 1)
} }
// Find the next untried address
func (c *ClusterClient) findNextAddr(tried map[string]struct{}) string {
for addr := range c.addrs {
if _, ok := tried[addr]; !ok {
return addr
}
}
return ""
}
// Fetch slot information
func (c *ClusterClient) fetchClusterSlots(addr string) ([]ClusterSlotInfo, error) {
opt := c.opt.clientOptions()
opt.Addr = addr
client := NewClient(opt)
defer client.Close()
return client.ClusterSlots().Result()
}
// Update slot information, populate slots
func (c *ClusterClient) update(infos []ClusterSlotInfo) {
for _, info := range infos {
for i := info.Start; i <= info.End; i++ {
c.slots[i] = info.Addrs
}
for _, addr := range info.Addrs {
c.addrs[addr] = struct{}{}
}
}
}
// reaper closes idle connections to the cluster. // reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(ticker *time.Ticker) { func (c *ClusterClient) reaper(ticker *time.Ticker) {
for _ = range ticker.C { for _ = range ticker.C {
@ -237,8 +251,6 @@ func (c *ClusterClient) reaper(ticker *time.Ticker) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
var errNoAddrs = errors.New("redis: no addresses")
type ClusterOptions struct { type ClusterOptions struct {
// A seed-list of host:port addresses of known cluster nodes // A seed-list of host:port addresses of known cluster nodes
Addrs []string Addrs []string
@ -278,17 +290,9 @@ func (opt *ClusterOptions) getMaxRedirects() int {
return opt.MaxRedirects return opt.MaxRedirects
} }
func (opt *ClusterOptions) getAddrSet() (map[string]struct{}, error) { func (opt *ClusterOptions) getAddrs() []string {
size := len(opt.Addrs) opt.Addrs = removeDuplicates(opt.Addrs)
if size < 1 { return opt.Addrs
return nil, errNoAddrs
}
addrs := make(map[string]struct{}, size)
for _, addr := range opt.Addrs {
addrs[addr] = struct{}{}
}
return addrs, nil
} }
func (opt *ClusterOptions) clientOptions() *Options { func (opt *ClusterOptions) clientOptions() *Options {

View File

@ -1,95 +0,0 @@
package redis
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("ClusterClient", func() {
var subject *ClusterClient
var populate = func() {
subject.reset()
subject.update([]ClusterSlotInfo{
{0, 4095, []string{"127.0.0.1:7000", "127.0.0.1:7004"}},
{12288, 16383, []string{"127.0.0.1:7003", "127.0.0.1:7007"}},
{4096, 8191, []string{"127.0.0.1:7001", "127.0.0.1:7005"}},
{8192, 12287, []string{"127.0.0.1:7002", "127.0.0.1:7006"}},
})
}
BeforeEach(func() {
var err error
subject, err = NewClusterClient(&ClusterOptions{
Addrs: []string{"127.0.0.1:6379", "127.0.0.1:7003", "127.0.0.1:7006"},
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
subject.Close()
})
It("should initialize", func() {
Expect(subject.addrs).To(HaveLen(3))
Expect(subject.slots).To(HaveLen(hashSlots))
Expect(subject._reload).To(Equal(uint32(0)))
})
It("should update slots cache", func() {
populate()
Expect(subject.slots[0]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"}))
Expect(subject.slots[4095]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"}))
Expect(subject.slots[4096]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"}))
Expect(subject.slots[8191]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"}))
Expect(subject.slots[8192]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"}))
Expect(subject.slots[12287]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"}))
Expect(subject.slots[12288]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"}))
Expect(subject.slots[16383]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"}))
Expect(subject.addrs).To(Equal(map[string]struct{}{
"127.0.0.1:6379": struct{}{},
"127.0.0.1:7000": struct{}{},
"127.0.0.1:7001": struct{}{},
"127.0.0.1:7002": struct{}{},
"127.0.0.1:7003": struct{}{},
"127.0.0.1:7004": struct{}{},
"127.0.0.1:7005": struct{}{},
"127.0.0.1:7006": struct{}{},
"127.0.0.1:7007": struct{}{},
}))
})
It("should find next addresses", func() {
populate()
seen := map[string]struct{}{
"127.0.0.1:7000": struct{}{},
"127.0.0.1:7001": struct{}{},
"127.0.0.1:7003": struct{}{},
}
addr := subject.findNextAddr(seen)
for addr != "" {
seen[addr] = struct{}{}
addr = subject.findNextAddr(seen)
}
Expect(subject.findNextAddr(seen)).To(Equal(""))
Expect(seen).To(Equal(map[string]struct{}{
"127.0.0.1:6379": struct{}{},
"127.0.0.1:7000": struct{}{},
"127.0.0.1:7001": struct{}{},
"127.0.0.1:7002": struct{}{},
"127.0.0.1:7003": struct{}{},
"127.0.0.1:7004": struct{}{},
"127.0.0.1:7005": struct{}{},
"127.0.0.1:7006": struct{}{},
"127.0.0.1:7007": struct{}{},
}))
})
It("should check if reload is due", func() {
subject._reload = 0
Expect(subject._reload).To(Equal(uint32(0)))
subject.forceReload()
Expect(subject._reload).To(Equal(uint32(1)))
})
})

View File

@ -148,11 +148,9 @@ var _ = Describe("Cluster", func() {
var client *redis.ClusterClient var client *redis.ClusterClient
BeforeEach(func() { BeforeEach(func() {
var err error client = redis.NewClusterClient(&redis.ClusterOptions{
client, err = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{"127.0.0.1:8220", "127.0.0.1:8221", "127.0.0.1:8222", "127.0.0.1:8223", "127.0.0.1:8224", "127.0.0.1:8225"}, Addrs: []string{"127.0.0.1:8220", "127.0.0.1:8221", "127.0.0.1:8222", "127.0.0.1:8223", "127.0.0.1:8224", "127.0.0.1:8225"},
}) })
Expect(err).NotTo(HaveOccurred())
}) })
AfterEach(func() { AfterEach(func() {

View File

@ -2,6 +2,8 @@ package redis
import ( import (
"fmt" "fmt"
"io"
"net"
) )
// Redis nil reply. // Redis nil reply.
@ -21,3 +23,10 @@ func errorf(s string, args ...interface{}) redisError {
func (err redisError) Error() string { func (err redisError) Error() string {
return err.s return err.s
} }
func isNetworkError(err error) bool {
if _, ok := err.(*net.OpError); ok || err == io.EOF {
return true
}
return false
}