diff --git a/.travis.yml b/.travis.yml
index 169ccd0..3af4dd9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -10,6 +10,7 @@ go:
 install:
   - go get gopkg.in/bufio.v1
   - go get gopkg.in/bsm/ratelimit.v1
+  - go get github.com/golang/groupcache/consistenthash
   - go get github.com/onsi/ginkgo
   - go get github.com/onsi/gomega
   - mkdir -p $HOME/gopath/src/gopkg.in
diff --git a/cluster.go b/cluster.go
index 2d46ca7..99d3eeb 100644
--- a/cluster.go
+++ b/cluster.go
@@ -307,7 +307,6 @@ func (opt *ClusterOptions) getMaxRedirects() int {
 
 func (opt *ClusterOptions) clientOptions() *Options {
 	return &Options{
-		DB:       0,
 		Password: opt.Password,
 
 		DialTimeout:  opt.DialTimeout,
@@ -324,14 +323,19 @@ func (opt *ClusterOptions) clientOptions() *Options {
 
 const hashSlots = 16384
 
-// hashSlot returns a consistent slot number between 0 and 16383
-// for any given string key.
-func hashSlot(key string) int {
+func hashKey(key string) string {
 	if s := strings.IndexByte(key, '{'); s > -1 {
 		if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
 			key = key[s+1 : s+e+1]
 		}
 	}
+	return key
+}
+
+// hashSlot returns a consistent slot number between 0 and 16383
+// for any given string key.
+func hashSlot(key string) int {
+	key = hashKey(key)
 	if key == "" {
 		return rand.Intn(hashSlots)
 	}
diff --git a/main_test.go b/main_test.go
index 57eb493..ed2f7de 100644
--- a/main_test.go
+++ b/main_test.go
@@ -23,6 +23,11 @@ const (
 	redisSecondaryPort = "6381"
 )
 
+const (
+	ringShard1Port = "6390"
+	ringShard2Port = "6391"
+)
+
 const (
 	sentinelName       = "mymaster"
 	sentinelMasterPort = "8123"
@@ -31,7 +36,11 @@ const (
 	sentinelPort       = "8126"
 )
 
-var redisMain, sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
+var (
+	redisMain                                                *redisProcess
+	ringShard1, ringShard2                                   *redisProcess
+	sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
+)
 
 var cluster = &clusterScenario{
 	ports:     []string{"8220", "8221", "8222", "8223", "8224", "8225"},
@@ -46,6 +55,12 @@ var _ = BeforeSuite(func() {
 	redisMain, err = startRedis(redisPort)
 	Expect(err).NotTo(HaveOccurred())
 
+	ringShard1, err = startRedis(ringShard1Port)
+	Expect(err).NotTo(HaveOccurred())
+
+	ringShard2, err = startRedis(ringShard2Port)
+	Expect(err).NotTo(HaveOccurred())
+
 	sentinelMaster, err = startRedis(sentinelMasterPort)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -65,6 +80,8 @@ var _ = BeforeSuite(func() {
 
 var _ = AfterSuite(func() {
 	Expect(redisMain.Close()).NotTo(HaveOccurred())
+	Expect(ringShard1.Close()).NotTo(HaveOccurred())
+	Expect(ringShard2.Close()).NotTo(HaveOccurred())
 
 	Expect(sentinel.Close()).NotTo(HaveOccurred())
 	Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
diff --git a/ring.go b/ring.go
new file mode 100644
index 0000000..0735b16
--- /dev/null
+++ b/ring.go
@@ -0,0 +1,237 @@
+package redis
+
+import (
+	"errors"
+	"fmt"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/golang/groupcache/consistenthash"
+)
+
+var (
+	errRingShardsDown = errors.New("redis: all ring shards are down")
+)
+
+// RingOptions are used to configure a ring client and should be
+// passed to NewRing.
+type RingOptions struct {
+	// A map of name => host:port addresses of ring shards.
+	Addrs map[string]string
+
+	// The following options are copied from the Options struct.
+
+	DB       int64
+	Password string
+
+	DialTimeout  time.Duration
+	ReadTimeout  time.Duration
+	WriteTimeout time.Duration
+
+	PoolSize    int
+	PoolTimeout time.Duration
+	IdleTimeout time.Duration
+}
+
+func (opt *RingOptions) clientOptions() *Options {
+	return &Options{
+		DB:       opt.DB,
+		Password: opt.Password,
+
+		DialTimeout:  opt.DialTimeout,
+		ReadTimeout:  opt.ReadTimeout,
+		WriteTimeout: opt.WriteTimeout,
+
+		PoolSize:    opt.PoolSize,
+		PoolTimeout: opt.PoolTimeout,
+		IdleTimeout: opt.IdleTimeout,
+	}
+}
+
+type ringShard struct {
+	Client *Client
+	down   int
+}
+
+func (shard *ringShard) String() string {
+	var state string
+	if shard.IsUp() {
+		state = "up"
+	} else {
+		state = "down"
+	}
+	return fmt.Sprintf("%s is %s", shard.Client, state)
+}
+
+func (shard *ringShard) IsDown() bool {
+	const threshold = 5
+	return shard.down >= threshold
+}
+
+func (shard *ringShard) IsUp() bool {
+	return !shard.IsDown()
+}
+
+// Vote votes to set shard state and returns true if state was changed.
+func (shard *ringShard) Vote(up bool) bool {
+	if up {
+		changed := shard.IsDown()
+		shard.down = 0
+		return changed
+	}
+
+	if shard.IsDown() {
+		return false
+	}
+
+	shard.down++
+	return shard.IsDown()
+}
+
+// Ring is a Redis client that uses consistent hashing to distribute
+// keys across multiple Redis servers (shards).
+//
+// It monitors the state of each shard and removes dead shards from
+// the ring. When a shard comes online it is added back to the ring. This
+// gives you maximum availability and partition tolerance, but no
+// consistency between different shards or even clients. Each client
+// uses shards that are available to the client and does not do any
+// coordination when shard state is changed.
+//
+// Ring should be used when you use multiple Redis servers for caching
+// and can tolerate losing data when one of the servers dies.
+// Otherwise you should use Redis Cluster.
+type Ring struct {
+	commandable
+
+	nreplicas int
+
+	mx     sync.RWMutex
+	hash   *consistenthash.Map
+	shards map[string]*ringShard
+
+	closed bool
+}
+
+func NewRing(opt *RingOptions) *Ring {
+	const nreplicas = 100
+	ring := &Ring{
+		nreplicas: nreplicas,
+		hash:      consistenthash.New(nreplicas, nil),
+		shards:    make(map[string]*ringShard),
+	}
+	ring.commandable.process = ring.process
+	for name, addr := range opt.Addrs {
+		clopt := opt.clientOptions()
+		clopt.Addr = addr
+		ring.addClient(name, NewClient(clopt))
+	}
+	go ring.heartbeat()
+	return ring
+}
+
+func (ring *Ring) addClient(name string, cl *Client) {
+	ring.mx.Lock()
+	ring.hash.Add(name)
+	ring.shards[name] = &ringShard{Client: cl}
+	ring.mx.Unlock()
+}
+
+func (ring *Ring) getClient(key string) (*Client, error) {
+	ring.mx.RLock()
+
+	if ring.closed {
+		return nil, errClosed
+	}
+
+	name := ring.hash.Get(key)
+	if name == "" {
+		ring.mx.RUnlock()
+		return nil, errRingShardsDown
+	}
+
+	if shard, ok := ring.shards[name]; ok {
+		ring.mx.RUnlock()
+		return shard.Client, nil
+	}
+
+	ring.mx.RUnlock()
+	return nil, errRingShardsDown
+}
+
+func (ring *Ring) process(cmd Cmder) {
+	cl, err := ring.getClient(hashKey(cmd.clusterKey()))
+	if err != nil {
+		cmd.setErr(err)
+		return
+	}
+	cl.baseClient.process(cmd)
+}
+
+// rebalance removes dead shards from the ring.
+func (ring *Ring) rebalance() {
+	defer ring.mx.Unlock()
+	ring.mx.Lock()
+
+	ring.hash = consistenthash.New(ring.nreplicas, nil)
+	for name, shard := range ring.shards {
+		if shard.IsUp() {
+			ring.hash.Add(name)
+		}
+	}
+}
+
+// heartbeat monitors the state of each shard in the ring.
+func (ring *Ring) heartbeat() {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	for _ = range ticker.C {
+		var rebalance bool
+
+		ring.mx.RLock()
+
+		if ring.closed {
+			ring.mx.RUnlock()
+			break
+		}
+
+		for _, shard := range ring.shards {
+			err := shard.Client.Ping().Err()
+			if shard.Vote(err == nil) {
+				log.Printf("redis: ring shard state changed: %s", shard)
+				rebalance = true
+			}
+		}
+
+		ring.mx.RUnlock()
+
+		if rebalance {
+			ring.rebalance()
+		}
+	}
+}
+
+// Close closes the ring client, releasing any open resources.
+//
+// It is rare to Close a Client, as the Client is meant to be
+// long-lived and shared between many goroutines.
+func (ring *Ring) Close() (retErr error) {
+	defer ring.mx.Unlock()
+	ring.mx.Lock()
+
+	if ring.closed {
+		return nil
+	}
+	ring.closed = true
+
+	for _, shard := range ring.shards {
+		if err := shard.Client.Close(); err != nil {
+			retErr = err
+		}
+	}
+	ring.hash = nil
+	ring.shards = nil
+
+	return retErr
+}
diff --git a/ring_test.go b/ring_test.go
new file mode 100644
index 0000000..35212c3
--- /dev/null
+++ b/ring_test.go
@@ -0,0 +1,84 @@
+package redis_test
+
+import (
+	"fmt"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	"gopkg.in/redis.v3"
+)
+
+var _ = Describe("Redis ring", func() {
+	var ring *redis.Ring
+
+	setRingKeys := func() {
+		for i := 0; i < 100; i++ {
+			err := ring.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
+			Expect(err).NotTo(HaveOccurred())
+		}
+	}
+
+	BeforeEach(func() {
+		ring = redis.NewRing(&redis.RingOptions{
+			Addrs: map[string]string{
+				"ringShard1": ":" + ringShard1Port,
+				"ringShard2": ":" + ringShard2Port,
+			},
+		})
+
+		// Shards should not have any keys.
+		Expect(ringShard1.FlushDb().Err()).NotTo(HaveOccurred())
+		Expect(ringShard1.Info().Val()).NotTo(ContainSubstring("keys="))
+
+		Expect(ringShard2.FlushDb().Err()).NotTo(HaveOccurred())
+		Expect(ringShard2.Info().Val()).NotTo(ContainSubstring("keys="))
+	})
+
+	AfterEach(func() {
+		Expect(ring.Close()).NotTo(HaveOccurred())
+	})
+
+	It("uses both shards", func() {
+		setRingKeys()
+
+		// Both shards should have some keys now.
+		Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57"))
+		Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
+	})
+
+	It("uses one shard when other shard is down", func() {
+		// Stop ringShard2.
+		Expect(ringShard2.Close()).NotTo(HaveOccurred())
+
+		// Ring needs 5 * heartbeat time to detect that a node is down.
+		// Give it more to be sure.
+		heartbeat := 100 * time.Millisecond
+		time.Sleep(5*heartbeat + heartbeat)
+
+		setRingKeys()
+
+		// RingShard1 should have all keys.
+		Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=100"))
+
+		// Start ringShard2.
+		var err error
+		ringShard2, err = startRedis(ringShard2Port)
+		Expect(err).NotTo(HaveOccurred())
+
+		// Wait for ringShard2 to come up.
+		Eventually(func() error {
+			return ringShard2.Ping().Err()
+		}, "1s").ShouldNot(HaveOccurred())
+
+		// Ring needs heartbeat time to detect that a node is up.
+		// Give it more to be sure.
+		time.Sleep(heartbeat + heartbeat)
+
+		setRingKeys()
+
+		// RingShard2 should have its keys.
+		Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
+	})
+})
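Below is a minimal usage sketch of the new Ring client. It is not part of the patch: the shard names and ports are placeholders that mirror the test setup in ring_test.go, and the program assumes Redis servers are already listening on those ports.

package main

import (
	"fmt"

	"gopkg.in/redis.v3"
)

func main() {
	// Placeholder shard names and addresses; any reachable Redis servers work.
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"shard1": ":6390",
			"shard2": ":6391",
		},
	})
	defer ring.Close()

	// Keys are distributed across the shards by consistent hashing.
	for i := 0; i < 10; i++ {
		if err := ring.Set(fmt.Sprintf("key%d", i), "value", 0).Err(); err != nil {
			fmt.Println("set failed:", err)
		}
	}

	// Keys that share a {hash tag} are routed to the same shard.
	ring.Set("{user1}.name", "alice", 0)
	ring.Set("{user1}.age", "33", 0)

	name, err := ring.Get("{user1}.name").Result()
	fmt.Println(name, err)
}

Because Ring routes every command through hashKey, the helper factored out of cluster.go above, keys sharing a {hash tag} such as {user1} always map to the same shard, mirroring the Redis Cluster slot semantics.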