Merge pull request #117 from go-redis/feature/redis-ring

Add Redis Ring.
Vladimir Mihailenco 2015-06-04 10:12:04 +03:00
commit 7ce4387ff8
13 changed files with 606 additions and 53 deletions

Makefile

@@ -1,9 +1,9 @@
 all: testdeps
-	go test ./... -v=1 -ginkgo.slowSpecThreshold=10 -cpu=1,2,4
-	go test ./... -ginkgo.slowSpecThreshold=10 -short -race
+	go test ./... -v=1 -cpu=1,2,4
+	go test ./... -short -race
 
 test: testdeps
-	go test ./... -v=1 -ginkgo.slowSpecThreshold=10
+	go test ./... -v=1
 
 testdeps: .test/redis/src/redis-server

cluster.go

@@ -307,7 +307,6 @@ func (opt *ClusterOptions) getMaxRedirects() int {
 func (opt *ClusterOptions) clientOptions() *Options {
 	return &Options{
-		DB:       0,
 		Password: opt.Password,
 
 		DialTimeout: opt.DialTimeout,
@@ -324,14 +323,19 @@ func (opt *ClusterOptions) clientOptions() *Options {
 
 const hashSlots = 16384
 
-// hashSlot returns a consistent slot number between 0 and 16383
-// for any given string key.
-func hashSlot(key string) int {
+func hashKey(key string) string {
 	if s := strings.IndexByte(key, '{'); s > -1 {
 		if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
 			key = key[s+1 : s+e+1]
 		}
 	}
+	return key
+}
+
+// hashSlot returns a consistent slot number between 0 and 16383
+// for any given string key.
+func hashSlot(key string) int {
+	key = hashKey(key)
 	if key == "" {
 		return rand.Intn(hashSlots)
 	}
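The extracted hashKey helper implements Redis hash tags: when a key contains a `{...}` section, only that section is hashed, so related keys can be pinned to the same slot (or, for Ring below, the same shard). A minimal standalone sketch — hashKey is unexported, so this reimplements it for illustration:

```go
package main

import (
	"fmt"
	"strings"
)

// hashKey mirrors the helper above: if the key contains a {...}
// section, only that section is hashed (Redis "hash tags").
func hashKey(key string) string {
	if s := strings.IndexByte(key, '{'); s > -1 {
		if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
			key = key[s+1 : s+e+1]
		}
	}
	return key
}

func main() {
	fmt.Println(hashKey("{user1000}.following")) // user1000
	fmt.Println(hashKey("{user1000}.followers")) // user1000 (same slot)
	fmt.Println(hashKey("plain-key"))            // plain-key (unchanged)
}
```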

cluster_test.go

@@ -252,7 +252,6 @@ var _ = Describe("Cluster", func() {
 			val, err := client.Get("A").Result()
 			Expect(err).NotTo(HaveOccurred())
 			Expect(val).To(Equal("VALUE"))
 
-			Expect(client.SlotAddrs(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
+			Eventually(func() []string {
+				return client.SlotAddrs(slot)

commands_test.go

@@ -369,8 +369,7 @@ var _ = Describe("Commands", func() {
 			pttl := client.PTTL("key")
 			Expect(pttl.Err()).NotTo(HaveOccurred())
-			Expect(pttl.Val() <= expiration).To(Equal(true))
-			Expect(pttl.Val() >= expiration-time.Millisecond).To(Equal(true))
+			Expect(pttl.Val()).To(BeNumerically("~", expiration, 10*time.Millisecond))
 		})
 
 		It("should PExpireAt", func() {
@@ -389,8 +388,7 @@ var _ = Describe("Commands", func() {
 			pttl := client.PTTL("key")
 			Expect(pttl.Err()).NotTo(HaveOccurred())
-			Expect(pttl.Val() <= expiration).To(Equal(true))
-			Expect(pttl.Val() >= expiration-time.Millisecond).To(Equal(true))
+			Expect(pttl.Val()).To(BeNumerically("~", expiration, 10*time.Millisecond))
 		})
 
 		It("should PTTL", func() {
@@ -405,8 +403,7 @@ var _ = Describe("Commands", func() {
 			pttl := client.PTTL("key")
 			Expect(pttl.Err()).NotTo(HaveOccurred())
-			Expect(pttl.Val() <= expiration).To(Equal(true))
-			Expect(pttl.Val() >= expiration-time.Millisecond).To(Equal(true))
+			Expect(pttl.Val()).To(BeNumerically("~", expiration, 10*time.Millisecond))
 		})
 
 		It("should RandomKey", func() {
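For reference, gomega's `BeNumerically("~", expected, threshold)` asserts that the actual value is within `threshold` of `expected`, so each pair of hand-written bounds collapses into one symmetric check:

```go
// "~" is gomega's proximity comparator: |actual - expected| <= threshold.
Expect(pttl.Val()).To(BeNumerically("~", expiration, 10*time.Millisecond))
// e.g. with expiration = 900ms, any PTTL in [890ms, 910ms] passes.
```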

internal/consistenthash/consistenthash.go Normal file

@@ -0,0 +1,81 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package consistenthash provides an implementation of a ring hash.
package consistenthash
import (
"hash/crc32"
"sort"
"strconv"
)
type Hash func(data []byte) uint32
type Map struct {
hash Hash
replicas int
keys []int // Sorted
hashMap map[int]string
}
func New(replicas int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}
// Returns true if there are no items available.
func (m *Map) IsEmpty() bool {
return len(m.keys) == 0
}
// Adds some keys to the hash.
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}
// Gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
if m.IsEmpty() {
return ""
}
hash := int(m.hash([]byte(key)))
// Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
// Means we have cycled back to the first replica.
if idx == len(m.keys) {
idx = 0
}
return m.hashMap[m.keys[idx]]
}
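A quick usage sketch of this Map. Note the package lives under `internal/`, so it is only importable from within the redis.v3 tree itself; the shard names here are illustrative:

```go
package main

import (
	"fmt"

	// "internal" packages are only importable from within
	// gopkg.in/redis.v3 itself; this sketch assumes the same API.
	"gopkg.in/redis.v3/internal/consistenthash"
)

func main() {
	// 100 virtual nodes (replicas) per shard, default crc32 hash —
	// the same parameters NewRing uses below.
	m := consistenthash.New(100, nil)
	m.Add("shard1", "shard2", "shard3")

	// Lookups are deterministic; adding or removing a shard only
	// remaps keys whose ring points fall near the changed shard.
	for _, key := range []string{"alpha", "beta", "gamma"} {
		fmt.Printf("%s -> %s\n", key, m.Get(key))
	}
}
```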

internal/consistenthash/consistenthash_test.go Normal file

@@ -0,0 +1,110 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consistenthash
import (
"fmt"
"strconv"
"testing"
)
func TestHashing(t *testing.T) {
// Override the hash function to return easier to reason about values. Assumes
// the keys can be converted to an integer.
hash := New(3, func(key []byte) uint32 {
i, err := strconv.Atoi(string(key))
if err != nil {
panic(err)
}
return uint32(i)
})
// Given the above hash function, this will give replicas with "hashes":
// 2, 4, 6, 12, 14, 16, 22, 24, 26
hash.Add("6", "4", "2")
testCases := map[string]string{
"2": "2",
"11": "2",
"23": "4",
"27": "2",
}
for k, v := range testCases {
if hash.Get(k) != v {
t.Errorf("Asking for %s, should have yielded %s", k, v)
}
}
// Adds 8, 18, 28
hash.Add("8")
// 27 should now map to 8.
testCases["27"] = "8"
for k, v := range testCases {
if hash.Get(k) != v {
t.Errorf("Asking for %s, should have yielded %s", k, v)
}
}
}
func TestConsistency(t *testing.T) {
hash1 := New(1, nil)
hash2 := New(1, nil)
hash1.Add("Bill", "Bob", "Bonny")
hash2.Add("Bob", "Bonny", "Bill")
if hash1.Get("Ben") != hash2.Get("Ben") {
t.Errorf("Fetching 'Ben' from both hashes should be the same")
}
hash2.Add("Becky", "Ben", "Bobby")
if hash1.Get("Ben") != hash2.Get("Ben") ||
hash1.Get("Bob") != hash2.Get("Bob") ||
hash1.Get("Bonny") != hash2.Get("Bonny") {
t.Errorf("Direct matches should always return the same entry")
}
}
func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8) }
func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32) }
func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128) }
func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512) }
func benchmarkGet(b *testing.B, shards int) {
hash := New(50, nil)
var buckets []string
for i := 0; i < shards; i++ {
buckets = append(buckets, fmt.Sprintf("shard-%d", i))
}
hash.Add(buckets...)
b.ResetTimer()
for i := 0; i < b.N; i++ {
hash.Get(buckets[i&(shards-1)])
}
}
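A worked trace of the wrap-around case in TestHashing: with the identity hash, hash.Add("6", "4", "2") places ring points 2, 4, 6, 12, 14, 16, 22, 24, 26. Get("27") hashes to 27; sort.Search finds no ring point >= 27 and returns len(keys), so the lookup wraps to index 0 (ring point 2) and yields shard "2". Once hash.Add("8") adds points 8, 18, 28, the first ring point >= 27 is 28, which is why testCases["27"] flips to "8".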

main_test.go

@@ -23,6 +23,11 @@ const (
 	redisSecondaryPort = "6381"
 )
 
+const (
+	ringShard1Port = "6390"
+	ringShard2Port = "6391"
+)
+
 const (
 	sentinelName       = "mymaster"
 	sentinelMasterPort = "8123"
@@ -31,7 +36,11 @@ const (
 	sentinelPort = "8126"
 )
 
-var redisMain, sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
+var (
+	redisMain              *redisProcess
+	ringShard1, ringShard2 *redisProcess
+	sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
+)
 
 var cluster = &clusterScenario{
 	ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
@@ -46,6 +55,12 @@ var _ = BeforeSuite(func() {
 	redisMain, err = startRedis(redisPort)
 	Expect(err).NotTo(HaveOccurred())
 
+	ringShard1, err = startRedis(ringShard1Port)
+	Expect(err).NotTo(HaveOccurred())
+
+	ringShard2, err = startRedis(ringShard2Port)
+	Expect(err).NotTo(HaveOccurred())
+
 	sentinelMaster, err = startRedis(sentinelMasterPort)
 	Expect(err).NotTo(HaveOccurred())
@@ -66,6 +81,9 @@ var _ = BeforeSuite(func() {
 var _ = AfterSuite(func() {
 	Expect(redisMain.Close()).NotTo(HaveOccurred())
 
+	Expect(ringShard1.Close()).NotTo(HaveOccurred())
+	Expect(ringShard2.Close()).NotTo(HaveOccurred())
+
 	Expect(sentinel.Close()).NotTo(HaveOccurred())
 	Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
 	Expect(sentinelSlave2.Close()).NotTo(HaveOccurred())

multi.go

@@ -3,6 +3,7 @@ package redis
 import (
 	"errors"
 	"fmt"
+	"log"
 )
 
 var errDiscard = errors.New("redis: Discard can be used only inside Exec")
@@ -18,7 +19,10 @@ type Multi struct {
 
 func (c *Client) Multi() *Multi {
 	multi := &Multi{
-		base: &baseClient{opt: c.opt, connPool: newSingleConnPool(c.connPool, true)},
+		base: &baseClient{
+			opt:      c.opt,
+			connPool: newSingleConnPool(c.connPool, true),
+		},
 	}
 	multi.commandable.process = multi.process
 	return multi
@@ -34,7 +38,7 @@ func (c *Multi) process(cmd Cmder) {
 func (c *Multi) Close() error {
 	if err := c.Unwatch().Err(); err != nil {
-		return err
+		log.Printf("redis: Unwatch failed: %s", err)
 	}
 	return c.base.Close()
 }
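For context, a sketch of where Close (and thus the UNWATCH it sends) fits in typical Multi usage. The API shapes are from redis.v3; the helper name and key are illustrative:

```go
package main

import (
	"log"
	"strconv"

	"gopkg.in/redis.v3"
)

// incrSafely increments a counter with optimistic locking
// (WATCH/MULTI/EXEC). multi.Close() sends UNWATCH; after this change
// an UNWATCH failure is logged instead of masking Close's result.
func incrSafely(client *redis.Client, key string) error {
	multi := client.Multi()
	defer multi.Close()

	if err := multi.Watch(key).Err(); err != nil {
		return err
	}
	n, err := multi.Get(key).Int64()
	if err != nil && err != redis.Nil {
		return err
	}
	// Commands issued inside Exec are queued and run atomically;
	// Exec returns redis.TxFailedErr if the watched key changed.
	_, err = multi.Exec(func() error {
		multi.Set(key, strconv.FormatInt(n+1, 10), 0)
		return nil
	})
	return err
}

func main() {
	client := redis.NewClient(&redis.Options{Addr: ":6379"})
	if err := incrSafely(client, "counter"); err != nil {
		log.Fatal(err)
	}
}
```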

pool.go

@@ -258,10 +258,12 @@ func (p *connPool) Remove(cn *conn) error {
 	// Replace existing connection with new one and unblock waiter.
 	newcn, err := p.new()
 	if err != nil {
+		log.Printf("redis: new failed: %s", err)
 		return p.conns.Remove(cn)
 	}
+	err = p.conns.Replace(cn, newcn)
 	p.freeConns <- newcn
-	return p.conns.Replace(cn, newcn)
+	return err
 }
// Len returns total number of connections.
@@ -278,17 +280,13 @@ func (p *connPool) Close() (retErr error) {
 	if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) {
 		return errClosed
 	}
-	// First close free connections.
-	for p.Len() > 0 {
-		cn := p.wait()
-		if cn == nil {
+	// Wait for app to free connections, but don't close them immediately.
+	for i := 0; i < p.Len(); i++ {
+		if cn := p.wait(); cn == nil {
 			break
 		}
-		if err := p.conns.Remove(cn); err != nil {
-			retErr = err
-		}
 	}
-	// Then close the rest.
+	// Close all connections.
 	if err := p.conns.Close(); err != nil {
 		retErr = err
 	}
@@ -317,13 +315,11 @@
 type singleConnPool struct {
 	pool pool
 
-	cnMtx sync.Mutex
-	cn    *conn
-
 	reusable bool
+
+	cn     *conn
+	closed bool
+	mx     sync.Mutex
 }
 func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
@@ -334,20 +330,24 @@ func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
 }
 
 func (p *singleConnPool) SetConn(cn *conn) {
-	p.cnMtx.Lock()
+	p.mx.Lock()
 	if p.cn != nil {
 		panic("p.cn != nil")
 	}
 	p.cn = cn
-	p.cnMtx.Unlock()
+	p.mx.Unlock()
 }
 
 func (p *singleConnPool) First() *conn {
-	defer p.cnMtx.Unlock()
-	p.cnMtx.Lock()
-	return p.cn
+	p.mx.Lock()
+	cn := p.cn
+	p.mx.Unlock()
+	return cn
 }
 
 func (p *singleConnPool) Get() (*conn, error) {
-	defer p.cnMtx.Unlock()
-	p.cnMtx.Lock()
+	defer p.mx.Unlock()
+	p.mx.Lock()
 
 	if p.closed {
 		return nil, errClosed
@@ -366,8 +366,8 @@ func (p *singleConnPool) Get() (*conn, error) {
 }
 
 func (p *singleConnPool) Put(cn *conn) error {
-	defer p.cnMtx.Unlock()
-	p.cnMtx.Lock()
+	defer p.mx.Unlock()
+	p.mx.Lock()
 	if p.cn != cn {
 		panic("p.cn != cn")
 	}
@@ -378,8 +378,8 @@ func (p *singleConnPool) Put(cn *conn) error {
 }
 
 func (p *singleConnPool) Remove(cn *conn) error {
-	defer p.cnMtx.Unlock()
-	p.cnMtx.Lock()
+	defer p.mx.Unlock()
+	p.mx.Lock()
 	if p.cn == nil {
 		panic("p.cn == nil")
 	}
@@ -399,8 +399,8 @@ func (p *singleConnPool) remove() error {
 }
 
 func (p *singleConnPool) Len() int {
-	defer p.cnMtx.Unlock()
-	p.cnMtx.Lock()
+	defer p.mx.Unlock()
+	p.mx.Lock()
 	if p.cn == nil {
 		return 0
 	}
@@ -408,19 +408,19 @@ func (p *singleConnPool) Len() int {
 }
 func (p *singleConnPool) FreeLen() int {
-	defer p.cnMtx.Unlock()
-	p.cnMtx.Lock()
+	defer p.mx.Unlock()
+	p.mx.Lock()
 	if p.cn == nil {
-		return 0
-	}
-	return 1
+		return 1
+	}
+	return 0
 }
 
 func (p *singleConnPool) Close() error {
-	defer p.cnMtx.Unlock()
-	p.cnMtx.Lock()
+	defer p.mx.Unlock()
+	p.mx.Lock()
 	if p.closed {
-		return nil
+		return errClosed
 	}
 	p.closed = true
 	var err error

pool_test.go

@@ -101,8 +101,9 @@ var _ = Describe("Pool", func() {
 		})
 
 		pool := client.Pool()
-		Expect(pool.Len()).To(Equal(10))
-		Expect(pool.FreeLen()).To(Equal(10))
+		Expect(pool.Len()).To(BeNumerically("<=", 10))
+		Expect(pool.FreeLen()).To(BeNumerically("<=", 10))
+		Expect(pool.Len()).To(Equal(pool.FreeLen()))
 	})
 
 	It("should remove broken connections", func() {

redis_test.go

@@ -88,6 +88,24 @@ var _ = Describe("Client", func() {
 		Expect(client.Ping().Err()).NotTo(HaveOccurred())
 	})
 
+	It("should close pubsub when client is closed", func() {
+		pubsub := client.PubSub()
+		Expect(client.Close()).NotTo(HaveOccurred())
+		Expect(pubsub.Close()).NotTo(HaveOccurred())
+	})
+
+	It("should close multi when client is closed", func() {
+		multi := client.Multi()
+		Expect(client.Close()).NotTo(HaveOccurred())
+		Expect(multi.Close()).NotTo(HaveOccurred())
+	})
+
+	It("should close pipeline when client is closed", func() {
+		pipeline := client.Pipeline()
+		Expect(client.Close()).NotTo(HaveOccurred())
+		Expect(pipeline.Close()).NotTo(HaveOccurred())
+	})
+
 	It("should support idle-timeouts", func() {
 		idle := redis.NewClient(&redis.Options{
 			Addr: redisAddr,

ring.go Normal file

@@ -0,0 +1,237 @@
package redis
import (
"errors"
"fmt"
"log"
"sync"
"time"
"gopkg.in/redis.v3/internal/consistenthash"
)
var (
errRingShardsDown = errors.New("redis: all ring shards are down")
)
// RingOptions are used to configure a ring client and should be
// passed to NewRing.
type RingOptions struct {
// A map of name => host:port addresses of ring shards.
Addrs map[string]string
// Following options are copied from Options struct.
DB int64
Password string
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
PoolTimeout time.Duration
IdleTimeout time.Duration
}
func (opt *RingOptions) clientOptions() *Options {
return &Options{
DB: opt.DB,
Password: opt.Password,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
}
}
type ringShard struct {
Client *Client
down int
}
func (shard *ringShard) String() string {
var state string
if shard.IsUp() {
state = "up"
} else {
state = "down"
}
return fmt.Sprintf("%s is %s", shard.Client, state)
}
func (shard *ringShard) IsDown() bool {
const threshold = 5
return shard.down >= threshold
}
func (shard *ringShard) IsUp() bool {
return !shard.IsDown()
}
// Vote votes to set shard state and returns true if state was changed.
func (shard *ringShard) Vote(up bool) bool {
if up {
changed := shard.IsDown()
shard.down = 0
return changed
}
if shard.IsDown() {
return false
}
shard.down++
return shard.IsDown()
}
// Ring is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards).
//
// It monitors the state of each shard and removes dead shards from
// the ring. When a shard comes back online it is added back to the
// ring. This gives you maximum availability and partition tolerance,
// but no consistency between different shards or even different
// clients: each client uses the shards that are available to it and
// does not coordinate with other clients when shard state changes.
//
// Ring should be used when you use multiple Redis servers for caching
// and can tolerate losing data when one of the servers dies.
// Otherwise you should use Redis Cluster.
type Ring struct {
commandable
nreplicas int
mx sync.RWMutex
hash *consistenthash.Map
shards map[string]*ringShard
closed bool
}
func NewRing(opt *RingOptions) *Ring {
const nreplicas = 100
ring := &Ring{
nreplicas: nreplicas,
hash: consistenthash.New(nreplicas, nil),
shards: make(map[string]*ringShard),
}
ring.commandable.process = ring.process
for name, addr := range opt.Addrs {
clopt := opt.clientOptions()
clopt.Addr = addr
ring.addClient(name, NewClient(clopt))
}
go ring.heartbeat()
return ring
}
func (ring *Ring) addClient(name string, cl *Client) {
ring.mx.Lock()
ring.hash.Add(name)
ring.shards[name] = &ringShard{Client: cl}
ring.mx.Unlock()
}
func (ring *Ring) getClient(key string) (*Client, error) {
	ring.mx.RLock()
	if ring.closed {
		ring.mx.RUnlock()
		return nil, errClosed
	}
name := ring.hash.Get(key)
if name == "" {
ring.mx.RUnlock()
return nil, errRingShardsDown
}
if shard, ok := ring.shards[name]; ok {
ring.mx.RUnlock()
return shard.Client, nil
}
ring.mx.RUnlock()
return nil, errRingShardsDown
}
func (ring *Ring) process(cmd Cmder) {
cl, err := ring.getClient(hashKey(cmd.clusterKey()))
if err != nil {
cmd.setErr(err)
return
}
cl.baseClient.process(cmd)
}
// rebalance removes dead shards from the ring.
func (ring *Ring) rebalance() {
defer ring.mx.Unlock()
ring.mx.Lock()
ring.hash = consistenthash.New(ring.nreplicas, nil)
for name, shard := range ring.shards {
if shard.IsUp() {
ring.hash.Add(name)
}
}
}
// heartbeat monitors state of each shard in the ring.
func (ring *Ring) heartbeat() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for _ = range ticker.C {
var rebalance bool
ring.mx.RLock()
if ring.closed {
ring.mx.RUnlock()
break
}
for _, shard := range ring.shards {
err := shard.Client.Ping().Err()
if shard.Vote(err == nil) {
log.Printf("redis: ring shard state changed: %s", shard)
rebalance = true
}
}
ring.mx.RUnlock()
if rebalance {
ring.rebalance()
}
}
}
// Close closes the ring client, releasing any open resources.
//
// It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines.
func (ring *Ring) Close() (retErr error) {
defer ring.mx.Unlock()
ring.mx.Lock()
if ring.closed {
return nil
}
ring.closed = true
for _, shard := range ring.shards {
if err := shard.Client.Close(); err != nil {
retErr = err
}
}
ring.hash = nil
ring.shards = nil
return retErr
}
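A minimal usage sketch of Ring (shard names and ports are illustrative, not part of the commit):

```go
package main

import (
	"fmt"
	"log"

	"gopkg.in/redis.v3"
)

func main() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"shard1": ":6390",
			"shard2": ":6391",
		},
	})
	defer ring.Close()

	// Each command is routed to a shard by consistent-hashing the key.
	// Hash tags apply here too (process calls hashKey above), so
	// {user1}.name and {user1}.age always land on the same shard.
	if err := ring.Set("{user1}.name", "Alice", 0).Err(); err != nil {
		log.Fatal(err)
	}
	name, err := ring.Get("{user1}.name").Result()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(name)
}
```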

ring_test.go Normal file

@@ -0,0 +1,84 @@
package redis_test
import (
"fmt"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"gopkg.in/redis.v3"
)
var _ = Describe("Redis ring", func() {
var ring *redis.Ring
setRingKeys := func() {
for i := 0; i < 100; i++ {
err := ring.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
Expect(err).NotTo(HaveOccurred())
}
}
BeforeEach(func() {
ring = redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"ringShard1": ":" + ringShard1Port,
"ringShard2": ":" + ringShard2Port,
},
})
// Shards should not have any keys.
Expect(ringShard1.FlushDb().Err()).NotTo(HaveOccurred())
Expect(ringShard1.Info().Val()).NotTo(ContainSubstring("keys="))
Expect(ringShard2.FlushDb().Err()).NotTo(HaveOccurred())
Expect(ringShard2.Info().Val()).NotTo(ContainSubstring("keys="))
})
AfterEach(func() {
Expect(ring.Close()).NotTo(HaveOccurred())
})
It("uses both shards", func() {
setRingKeys()
// Both shards should have some keys now.
Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57"))
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
})
It("uses one shard when other shard is down", func() {
// Stop ringShard2.
Expect(ringShard2.Close()).NotTo(HaveOccurred())
// Ring needs 5 * heartbeat time to detect that node is down.
// Give it more to be sure.
heartbeat := 100 * time.Millisecond
time.Sleep(5*heartbeat + heartbeat)
setRingKeys()
// RingShard1 should have all keys.
Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=100"))
// Start ringShard2.
var err error
ringShard2, err = startRedis(ringShard2Port)
Expect(err).NotTo(HaveOccurred())
// Wait for ringShard2 to come up.
Eventually(func() error {
return ringShard2.Ping().Err()
}, "1s").ShouldNot(HaveOccurred())
// Ring needs heartbeat time to detect that node is up.
// Give it more to be sure.
time.Sleep(heartbeat + heartbeat)
setRingKeys()
// RingShard2 should have its keys.
Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
})
})
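The exact keys=57 / keys=43 split asserted above is deterministic: Ring routes with the same crc32-based consistent-hash ring (100 replicas per shard) keyed by the shard names. A sketch that reproduces the split, assuming access to the internal consistenthash package:

```go
m := consistenthash.New(100, nil) // same replica count NewRing uses
m.Add("ringShard1", "ringShard2")

counts := make(map[string]int)
for i := 0; i < 100; i++ {
	counts[m.Get(fmt.Sprintf("key%d", i))]++
}
fmt.Println(counts) // map[ringShard1:57 ringShard2:43], matching the test
```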