From 46f49a17a54456682c79b3f4e06c08800a2defb8 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Mon, 25 May 2015 16:22:27 +0300 Subject: [PATCH 1/8] Add Redis Ring. --- .travis.yml | 1 + cluster.go | 12 ++- main_test.go | 19 ++++- ring.go | 237 +++++++++++++++++++++++++++++++++++++++++++++++++++ ring_test.go | 84 ++++++++++++++++++ 5 files changed, 348 insertions(+), 5 deletions(-) create mode 100644 ring.go create mode 100644 ring_test.go diff --git a/.travis.yml b/.travis.yml index 169ccd0a..3af4dd98 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,7 @@ go: install: - go get gopkg.in/bufio.v1 - go get gopkg.in/bsm/ratelimit.v1 + - go get github.com/golang/groupcache/consistenthash - go get github.com/onsi/ginkgo - go get github.com/onsi/gomega - mkdir -p $HOME/gopath/src/gopkg.in diff --git a/cluster.go b/cluster.go index 2d46ca73..99d3eebf 100644 --- a/cluster.go +++ b/cluster.go @@ -307,7 +307,6 @@ func (opt *ClusterOptions) getMaxRedirects() int { func (opt *ClusterOptions) clientOptions() *Options { return &Options{ - DB: 0, Password: opt.Password, DialTimeout: opt.DialTimeout, @@ -324,14 +323,19 @@ func (opt *ClusterOptions) clientOptions() *Options { const hashSlots = 16384 -// hashSlot returns a consistent slot number between 0 and 16383 -// for any given string key. -func hashSlot(key string) int { +func hashKey(key string) string { if s := strings.IndexByte(key, '{'); s > -1 { if e := strings.IndexByte(key[s+1:], '}'); e > 0 { key = key[s+1 : s+e+1] } } + return key +} + +// hashSlot returns a consistent slot number between 0 and 16383 +// for any given string key. +func hashSlot(key string) int { + key = hashKey(key) if key == "" { return rand.Intn(hashSlots) } diff --git a/main_test.go b/main_test.go index 57eb4933..ed2f7de9 100644 --- a/main_test.go +++ b/main_test.go @@ -23,6 +23,11 @@ const ( redisSecondaryPort = "6381" ) +const ( + ringShard1Port = "6390" + ringShard2Port = "6391" +) + const ( sentinelName = "mymaster" sentinelMasterPort = "8123" @@ -31,7 +36,11 @@ const ( sentinelPort = "8126" ) -var redisMain, sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess +var ( + redisMain *redisProcess + ringShard1, ringShard2 *redisProcess + sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess +) var cluster = &clusterScenario{ ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, @@ -46,6 +55,12 @@ var _ = BeforeSuite(func() { redisMain, err = startRedis(redisPort) Expect(err).NotTo(HaveOccurred()) + ringShard1, err = startRedis(ringShard1Port) + Expect(err).NotTo(HaveOccurred()) + + ringShard2, err = startRedis(ringShard2Port) + Expect(err).NotTo(HaveOccurred()) + sentinelMaster, err = startRedis(sentinelMasterPort) Expect(err).NotTo(HaveOccurred()) @@ -65,6 +80,8 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { Expect(redisMain.Close()).NotTo(HaveOccurred()) + Expect(ringShard1.Close()).NotTo(HaveOccurred()) + Expect(ringShard2.Close()).NotTo(HaveOccurred()) Expect(sentinel.Close()).NotTo(HaveOccurred()) Expect(sentinelSlave1.Close()).NotTo(HaveOccurred()) diff --git a/ring.go b/ring.go new file mode 100644 index 00000000..0735b16b --- /dev/null +++ b/ring.go @@ -0,0 +1,237 @@ +package redis + +import ( + "errors" + "fmt" + "log" + "sync" + "time" + + "github.com/golang/groupcache/consistenthash" +) + +var ( + errRingShardsDown = errors.New("redis: all ring shards are down") +) + +// RingOptions are used to configure a ring client and should be +// passed to NewRing. +type RingOptions struct { + // A map of name => host:port addresses of ring shards. + Addrs map[string]string + + // Following options are copied from Options struct. + + DB int64 + Password string + + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + + PoolSize int + PoolTimeout time.Duration + IdleTimeout time.Duration +} + +func (opt *RingOptions) clientOptions() *Options { + return &Options{ + DB: opt.DB, + Password: opt.Password, + + DialTimeout: opt.DialTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + } +} + +type ringShard struct { + Client *Client + down int +} + +func (shard *ringShard) String() string { + var state string + if shard.IsUp() { + state = "up" + } else { + state = "down" + } + return fmt.Sprintf("%s is %s", shard.Client, state) +} + +func (shard *ringShard) IsDown() bool { + const threshold = 5 + return shard.down >= threshold +} + +func (shard *ringShard) IsUp() bool { + return !shard.IsDown() +} + +// Vote votes to set shard state and returns true if state was changed. +func (shard *ringShard) Vote(up bool) bool { + if up { + changed := shard.IsDown() + shard.down = 0 + return changed + } + + if shard.IsDown() { + return false + } + + shard.down++ + return shard.IsDown() +} + +// Ring is a Redis client that uses constistent hashing to distribute +// keys across multiple Redis servers (shards). +// +// It monitors the state of each shard and removes dead shards from +// the ring. When shard comes online it is added back to the ring. This +// gives you maximum availability and partition tolerance, but no +// consistency between different shards or even clients. Each client +// uses shards that are available to the client and does not do any +// coordination when shard state is changed. +// +// Ring should be used when you use multiple Redis servers for caching +// and can tolerate losing data when one of the servers dies. +// Otherwise you should use Redis Cluster. +type Ring struct { + commandable + + nreplicas int + + mx sync.RWMutex + hash *consistenthash.Map + shards map[string]*ringShard + + closed bool +} + +func NewRing(opt *RingOptions) *Ring { + const nreplicas = 100 + ring := &Ring{ + nreplicas: nreplicas, + hash: consistenthash.New(nreplicas, nil), + shards: make(map[string]*ringShard), + } + ring.commandable.process = ring.process + for name, addr := range opt.Addrs { + clopt := opt.clientOptions() + clopt.Addr = addr + ring.addClient(name, NewClient(clopt)) + } + go ring.heartbeat() + return ring +} + +func (ring *Ring) addClient(name string, cl *Client) { + ring.mx.Lock() + ring.hash.Add(name) + ring.shards[name] = &ringShard{Client: cl} + ring.mx.Unlock() +} + +func (ring *Ring) getClient(key string) (*Client, error) { + ring.mx.RLock() + + if ring.closed { + return nil, errClosed + } + + name := ring.hash.Get(key) + if name == "" { + ring.mx.RUnlock() + return nil, errRingShardsDown + } + + if shard, ok := ring.shards[name]; ok { + ring.mx.RUnlock() + return shard.Client, nil + } + + ring.mx.RUnlock() + return nil, errRingShardsDown +} + +func (ring *Ring) process(cmd Cmder) { + cl, err := ring.getClient(hashKey(cmd.clusterKey())) + if err != nil { + cmd.setErr(err) + return + } + cl.baseClient.process(cmd) +} + +// rebalance removes dead shards from the ring. +func (ring *Ring) rebalance() { + defer ring.mx.Unlock() + ring.mx.Lock() + + ring.hash = consistenthash.New(ring.nreplicas, nil) + for name, shard := range ring.shards { + if shard.IsUp() { + ring.hash.Add(name) + } + } +} + +// heartbeat monitors state of each shard in the ring. +func (ring *Ring) heartbeat() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for _ = range ticker.C { + var rebalance bool + + ring.mx.RLock() + + if ring.closed { + ring.mx.RUnlock() + break + } + + for _, shard := range ring.shards { + err := shard.Client.Ping().Err() + if shard.Vote(err == nil) { + log.Printf("redis: ring shard state changed: %s", shard) + rebalance = true + } + } + + ring.mx.RUnlock() + + if rebalance { + ring.rebalance() + } + } +} + +// Close closes the ring client, releasing any open resources. +// +// It is rare to Close a Client, as the Client is meant to be +// long-lived and shared between many goroutines. +func (ring *Ring) Close() (retErr error) { + defer ring.mx.Unlock() + ring.mx.Lock() + + if ring.closed { + return nil + } + ring.closed = true + + for _, shard := range ring.shards { + if err := shard.Client.Close(); err != nil { + retErr = err + } + } + ring.hash = nil + ring.shards = nil + + return retErr +} diff --git a/ring_test.go b/ring_test.go new file mode 100644 index 00000000..35212c3f --- /dev/null +++ b/ring_test.go @@ -0,0 +1,84 @@ +package redis_test + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "gopkg.in/redis.v3" +) + +var _ = Describe("Redis ring", func() { + var ring *redis.Ring + + setRingKeys := func() { + for i := 0; i < 100; i++ { + err := ring.Set(fmt.Sprintf("key%d", i), "value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + } + } + + BeforeEach(func() { + ring = redis.NewRing(&redis.RingOptions{ + Addrs: map[string]string{ + "ringShard1": ":" + ringShard1Port, + "ringShard2": ":" + ringShard2Port, + }, + }) + + // Shards should not have any keys. + Expect(ringShard1.FlushDb().Err()).NotTo(HaveOccurred()) + Expect(ringShard1.Info().Val()).NotTo(ContainSubstring("keys=")) + + Expect(ringShard2.FlushDb().Err()).NotTo(HaveOccurred()) + Expect(ringShard2.Info().Val()).NotTo(ContainSubstring("keys=")) + }) + + AfterEach(func() { + Expect(ring.Close()).NotTo(HaveOccurred()) + }) + + It("uses both shards", func() { + setRingKeys() + + // Both shards should have some keys now. + Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57")) + Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43")) + }) + + It("uses one shard when other shard is down", func() { + // Stop ringShard2. + Expect(ringShard2.Close()).NotTo(HaveOccurred()) + + // Ring needs 5 * heartbeat time to detect that node is down. + // Give it more to be sure. + heartbeat := 100 * time.Millisecond + time.Sleep(5*heartbeat + heartbeat) + + setRingKeys() + + // RingShard1 should have all keys. + Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=100")) + + // Start ringShard2. + var err error + ringShard2, err = startRedis(ringShard2Port) + Expect(err).NotTo(HaveOccurred()) + + // Wait for ringShard2 to come up. + Eventually(func() error { + return ringShard2.Ping().Err() + }, "1s").ShouldNot(HaveOccurred()) + + // Ring needs heartbeat time to detect that node is up. + // Give it more to be sure. + time.Sleep(heartbeat + heartbeat) + + setRingKeys() + + // RingShard2 should have its keys. + Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43")) + }) +}) From 2103d88732e47fbe86aa70ba4e270bd96f2b5007 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 2 Jun 2015 17:19:51 +0300 Subject: [PATCH 2/8] Embed consistenthash package. --- .travis.yml | 1 - internal/consistenthash/consistenthash.go | 81 +++++++++++++ .../consistenthash/consistenthash_test.go | 110 ++++++++++++++++++ ring.go | 2 +- 4 files changed, 192 insertions(+), 2 deletions(-) create mode 100644 internal/consistenthash/consistenthash.go create mode 100644 internal/consistenthash/consistenthash_test.go diff --git a/.travis.yml b/.travis.yml index 3af4dd98..169ccd0a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,6 @@ go: install: - go get gopkg.in/bufio.v1 - go get gopkg.in/bsm/ratelimit.v1 - - go get github.com/golang/groupcache/consistenthash - go get github.com/onsi/ginkgo - go get github.com/onsi/gomega - mkdir -p $HOME/gopath/src/gopkg.in diff --git a/internal/consistenthash/consistenthash.go b/internal/consistenthash/consistenthash.go new file mode 100644 index 00000000..a9c56f07 --- /dev/null +++ b/internal/consistenthash/consistenthash.go @@ -0,0 +1,81 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package consistenthash provides an implementation of a ring hash. +package consistenthash + +import ( + "hash/crc32" + "sort" + "strconv" +) + +type Hash func(data []byte) uint32 + +type Map struct { + hash Hash + replicas int + keys []int // Sorted + hashMap map[int]string +} + +func New(replicas int, fn Hash) *Map { + m := &Map{ + replicas: replicas, + hash: fn, + hashMap: make(map[int]string), + } + if m.hash == nil { + m.hash = crc32.ChecksumIEEE + } + return m +} + +// Returns true if there are no items available. +func (m *Map) IsEmpty() bool { + return len(m.keys) == 0 +} + +// Adds some keys to the hash. +func (m *Map) Add(keys ...string) { + for _, key := range keys { + for i := 0; i < m.replicas; i++ { + hash := int(m.hash([]byte(strconv.Itoa(i) + key))) + m.keys = append(m.keys, hash) + m.hashMap[hash] = key + } + } + sort.Ints(m.keys) +} + +// Gets the closest item in the hash to the provided key. +func (m *Map) Get(key string) string { + if m.IsEmpty() { + return "" + } + + hash := int(m.hash([]byte(key))) + + // Binary search for appropriate replica. + idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash }) + + // Means we have cycled back to the first replica. + if idx == len(m.keys) { + idx = 0 + } + + return m.hashMap[m.keys[idx]] +} diff --git a/internal/consistenthash/consistenthash_test.go b/internal/consistenthash/consistenthash_test.go new file mode 100644 index 00000000..1a37fd7f --- /dev/null +++ b/internal/consistenthash/consistenthash_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package consistenthash + +import ( + "fmt" + "strconv" + "testing" +) + +func TestHashing(t *testing.T) { + + // Override the hash function to return easier to reason about values. Assumes + // the keys can be converted to an integer. + hash := New(3, func(key []byte) uint32 { + i, err := strconv.Atoi(string(key)) + if err != nil { + panic(err) + } + return uint32(i) + }) + + // Given the above hash function, this will give replicas with "hashes": + // 2, 4, 6, 12, 14, 16, 22, 24, 26 + hash.Add("6", "4", "2") + + testCases := map[string]string{ + "2": "2", + "11": "2", + "23": "4", + "27": "2", + } + + for k, v := range testCases { + if hash.Get(k) != v { + t.Errorf("Asking for %s, should have yielded %s", k, v) + } + } + + // Adds 8, 18, 28 + hash.Add("8") + + // 27 should now map to 8. + testCases["27"] = "8" + + for k, v := range testCases { + if hash.Get(k) != v { + t.Errorf("Asking for %s, should have yielded %s", k, v) + } + } + +} + +func TestConsistency(t *testing.T) { + hash1 := New(1, nil) + hash2 := New(1, nil) + + hash1.Add("Bill", "Bob", "Bonny") + hash2.Add("Bob", "Bonny", "Bill") + + if hash1.Get("Ben") != hash2.Get("Ben") { + t.Errorf("Fetching 'Ben' from both hashes should be the same") + } + + hash2.Add("Becky", "Ben", "Bobby") + + if hash1.Get("Ben") != hash2.Get("Ben") || + hash1.Get("Bob") != hash2.Get("Bob") || + hash1.Get("Bonny") != hash2.Get("Bonny") { + t.Errorf("Direct matches should always return the same entry") + } + +} + +func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8) } +func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32) } +func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128) } +func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512) } + +func benchmarkGet(b *testing.B, shards int) { + + hash := New(50, nil) + + var buckets []string + for i := 0; i < shards; i++ { + buckets = append(buckets, fmt.Sprintf("shard-%d", i)) + } + + hash.Add(buckets...) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + hash.Get(buckets[i&(shards-1)]) + } +} diff --git a/ring.go b/ring.go index 0735b16b..4772581f 100644 --- a/ring.go +++ b/ring.go @@ -7,7 +7,7 @@ import ( "sync" "time" - "github.com/golang/groupcache/consistenthash" + "gopkg.in/redis.v3/internal/consistenthash" ) var ( From 90aaae2ba2f19f5e2f68b28becae53c56fc1edba Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 2 Jun 2015 18:24:14 +0300 Subject: [PATCH 3/8] makefile: remove ginkgo flags. --- Makefile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 33dc9733..d3763d61 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,9 @@ all: testdeps - go test ./... -v 1 -ginkgo.slowSpecThreshold=10 -cpu=1,2,4 - go test ./... -ginkgo.slowSpecThreshold=10 -short -race + go test ./... -v=1 -cpu=1,2,4 + go test ./... -short -race test: testdeps - go test ./... -v 1 -ginkgo.slowSpecThreshold=10 + go test ./... -v=1 testdeps: .test/redis/src/redis-server From fc04a09033c98e5c3eff38c214c4c34dd40e980e Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 3 Jun 2015 14:36:48 +0300 Subject: [PATCH 4/8] Fix flaky test. --- pool_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pool_test.go b/pool_test.go index 3b0d00af..a4fa76e2 100644 --- a/pool_test.go +++ b/pool_test.go @@ -101,8 +101,9 @@ var _ = Describe("Pool", func() { }) pool := client.Pool() - Expect(pool.Len()).To(Equal(10)) - Expect(pool.FreeLen()).To(Equal(10)) + Expect(pool.Len()).To(BeNumerically("<=", 10)) + Expect(pool.FreeLen()).To(BeNumerically("<=", 10)) + Expect(pool.Len()).To(Equal(pool.FreeLen())) }) It("should remove broken connections", func() { From 7d886330f114b726d3464c59a62e82b2cdfa7adc Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 3 Jun 2015 14:50:43 +0300 Subject: [PATCH 5/8] pool: close all connections at once. --- pool.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pool.go b/pool.go index 090270b5..301b9e00 100644 --- a/pool.go +++ b/pool.go @@ -278,17 +278,13 @@ func (p *connPool) Close() (retErr error) { if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) { return errClosed } - // First close free connections. - for p.Len() > 0 { - cn := p.wait() - if cn == nil { + // Wait for app to free connections, but don't close them immediately. + for i := 0; i < p.Len(); i++ { + if cn := p.wait(); cn == nil { break } - if err := p.conns.Remove(cn); err != nil { - retErr = err - } } - // Then close the rest. + // Close all connections. if err := p.conns.Close(); err != nil { retErr = err } From 3fc16811b5b8a77c9c5b541c3c1d6525bb38f2d9 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 3 Jun 2015 15:09:56 +0300 Subject: [PATCH 6/8] Fix flaky tests by using better matcher. --- commands_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/commands_test.go b/commands_test.go index 0d19b2e5..ef593e1e 100644 --- a/commands_test.go +++ b/commands_test.go @@ -369,8 +369,7 @@ var _ = Describe("Commands", func() { pttl := client.PTTL("key") Expect(pttl.Err()).NotTo(HaveOccurred()) - Expect(pttl.Val() <= expiration).To(Equal(true)) - Expect(pttl.Val() >= expiration-time.Millisecond).To(Equal(true)) + Expect(pttl.Val()).To(BeNumerically("~", expiration, 10*time.Millisecond)) }) It("should PExpireAt", func() { @@ -389,8 +388,7 @@ var _ = Describe("Commands", func() { pttl := client.PTTL("key") Expect(pttl.Err()).NotTo(HaveOccurred()) - Expect(pttl.Val() <= expiration).To(Equal(true)) - Expect(pttl.Val() >= expiration-time.Millisecond).To(Equal(true)) + Expect(pttl.Val()).To(BeNumerically("~", expiration, 10*time.Millisecond)) }) It("should PTTL", func() { @@ -405,8 +403,7 @@ var _ = Describe("Commands", func() { pttl := client.PTTL("key") Expect(pttl.Err()).NotTo(HaveOccurred()) - Expect(pttl.Val() <= expiration).To(Equal(true)) - Expect(pttl.Val() >= expiration-time.Millisecond).To(Equal(true)) + Expect(pttl.Val()).To(BeNumerically("~", expiration, 10*time.Millisecond)) }) It("should RandomKey", func() { From a8fe55571b1d68ed68d1e7f19e2fe8ed0570412b Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 3 Jun 2015 16:45:46 +0300 Subject: [PATCH 7/8] pool: put connection to the list before returning it. --- main_test.go | 1 + multi.go | 8 ++++++-- pool.go | 56 +++++++++++++++++++++++++++------------------------ redis_test.go | 18 +++++++++++++++++ 4 files changed, 55 insertions(+), 28 deletions(-) diff --git a/main_test.go b/main_test.go index ed2f7de9..c4b5a597 100644 --- a/main_test.go +++ b/main_test.go @@ -80,6 +80,7 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { Expect(redisMain.Close()).NotTo(HaveOccurred()) + Expect(ringShard1.Close()).NotTo(HaveOccurred()) Expect(ringShard2.Close()).NotTo(HaveOccurred()) diff --git a/multi.go b/multi.go index edd5ce3a..9d87de9a 100644 --- a/multi.go +++ b/multi.go @@ -3,6 +3,7 @@ package redis import ( "errors" "fmt" + "log" ) var errDiscard = errors.New("redis: Discard can be used only inside Exec") @@ -18,7 +19,10 @@ type Multi struct { func (c *Client) Multi() *Multi { multi := &Multi{ - base: &baseClient{opt: c.opt, connPool: newSingleConnPool(c.connPool, true)}, + base: &baseClient{ + opt: c.opt, + connPool: newSingleConnPool(c.connPool, true), + }, } multi.commandable.process = multi.process return multi @@ -34,7 +38,7 @@ func (c *Multi) process(cmd Cmder) { func (c *Multi) Close() error { if err := c.Unwatch().Err(); err != nil { - return err + log.Printf("redis: Unwatch failed: %s", err) } return c.base.Close() } diff --git a/pool.go b/pool.go index 301b9e00..714cbe5b 100644 --- a/pool.go +++ b/pool.go @@ -258,10 +258,12 @@ func (p *connPool) Remove(cn *conn) error { // Replace existing connection with new one and unblock waiter. newcn, err := p.new() if err != nil { + log.Printf("redis: new failed: %s", err) return p.conns.Remove(cn) } + err = p.conns.Replace(cn, newcn) p.freeConns <- newcn - return p.conns.Replace(cn, newcn) + return err } // Len returns total number of connections. @@ -312,14 +314,12 @@ func (p *connPool) reaper() { //------------------------------------------------------------------------------ type singleConnPool struct { - pool pool - - cnMtx sync.Mutex - cn *conn - + pool pool reusable bool + cn *conn closed bool + mx sync.Mutex } func newSingleConnPool(pool pool, reusable bool) *singleConnPool { @@ -330,20 +330,24 @@ func newSingleConnPool(pool pool, reusable bool) *singleConnPool { } func (p *singleConnPool) SetConn(cn *conn) { - p.cnMtx.Lock() + p.mx.Lock() + if p.cn != nil { + panic("p.cn != nil") + } p.cn = cn - p.cnMtx.Unlock() + p.mx.Unlock() } func (p *singleConnPool) First() *conn { - defer p.cnMtx.Unlock() - p.cnMtx.Lock() - return p.cn + p.mx.Lock() + cn := p.cn + p.mx.Unlock() + return cn } func (p *singleConnPool) Get() (*conn, error) { - defer p.cnMtx.Unlock() - p.cnMtx.Lock() + defer p.mx.Unlock() + p.mx.Lock() if p.closed { return nil, errClosed @@ -362,8 +366,8 @@ func (p *singleConnPool) Get() (*conn, error) { } func (p *singleConnPool) Put(cn *conn) error { - defer p.cnMtx.Unlock() - p.cnMtx.Lock() + defer p.mx.Unlock() + p.mx.Lock() if p.cn != cn { panic("p.cn != cn") } @@ -374,8 +378,8 @@ func (p *singleConnPool) Put(cn *conn) error { } func (p *singleConnPool) Remove(cn *conn) error { - defer p.cnMtx.Unlock() - p.cnMtx.Lock() + defer p.mx.Unlock() + p.mx.Lock() if p.cn == nil { panic("p.cn == nil") } @@ -395,8 +399,8 @@ func (p *singleConnPool) remove() error { } func (p *singleConnPool) Len() int { - defer p.cnMtx.Unlock() - p.cnMtx.Lock() + defer p.mx.Unlock() + p.mx.Lock() if p.cn == nil { return 0 } @@ -404,19 +408,19 @@ func (p *singleConnPool) Len() int { } func (p *singleConnPool) FreeLen() int { - defer p.cnMtx.Unlock() - p.cnMtx.Lock() + defer p.mx.Unlock() + p.mx.Lock() if p.cn == nil { - return 0 + return 1 } - return 1 + return 0 } func (p *singleConnPool) Close() error { - defer p.cnMtx.Unlock() - p.cnMtx.Lock() + defer p.mx.Unlock() + p.mx.Lock() if p.closed { - return nil + return errClosed } p.closed = true var err error diff --git a/redis_test.go b/redis_test.go index 4ad44866..8a8663e1 100644 --- a/redis_test.go +++ b/redis_test.go @@ -88,6 +88,24 @@ var _ = Describe("Client", func() { Expect(client.Ping().Err()).NotTo(HaveOccurred()) }) + It("should close pubsub when client is closed", func() { + pubsub := client.PubSub() + Expect(client.Close()).NotTo(HaveOccurred()) + Expect(pubsub.Close()).NotTo(HaveOccurred()) + }) + + It("should close multi when client is closed", func() { + multi := client.Multi() + Expect(client.Close()).NotTo(HaveOccurred()) + Expect(multi.Close()).NotTo(HaveOccurred()) + }) + + It("should close pipeline when client is closed", func() { + pipeline := client.Pipeline() + Expect(client.Close()).NotTo(HaveOccurred()) + Expect(pipeline.Close()).NotTo(HaveOccurred()) + }) + It("should support idle-timeouts", func() { idle := redis.NewClient(&redis.Options{ Addr: redisAddr, From c64b7819b9edd393bdde0a9dd206bc57b051d464 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 3 Jun 2015 17:08:27 +0300 Subject: [PATCH 8/8] Fix flaky test. --- cluster_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cluster_test.go b/cluster_test.go index e8e0802a..9173faea 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -252,7 +252,6 @@ var _ = Describe("Cluster", func() { val, err := client.Get("A").Result() Expect(err).NotTo(HaveOccurred()) Expect(val).To(Equal("VALUE")) - Expect(client.SlotAddrs(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) Eventually(func() []string { return client.SlotAddrs(slot)