From d7c44c7899316631fdd417940d8ca18d0a018d43 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 22 Dec 2015 15:45:03 +0200 Subject: [PATCH 1/4] Better rate limited message. --- .travis.yml | 1 - multi.go | 15 +++++++-------- pool.go | 45 ++++++++++++++++++++++++++++++--------------- pool_test.go | 26 ++++++++++++++++++++++---- pubsub.go | 6 +++--- redis.go | 7 +++---- sentinel.go | 6 ++++-- 7 files changed, 69 insertions(+), 37 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1d3148f7..dc4191ac 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,6 @@ services: - redis-server go: - - 1.3 - 1.4 - 1.5 - tip diff --git a/multi.go b/multi.go index 320e78eb..0e0281d3 100644 --- a/multi.go +++ b/multi.go @@ -46,16 +46,15 @@ func (c *Client) Multi() *Multi { return multi } -func (c *Multi) putConn(cn *conn, ei error) { - var err error - if isBadConn(cn, ei) { +func (c *Multi) putConn(cn *conn, err error) { + if isBadConn(cn, err) { // Close current connection. - c.base.connPool.(*stickyConnPool).Reset() + c.base.connPool.(*stickyConnPool).Reset(err) } else { - err = c.base.connPool.Put(cn) - } - if err != nil { - log.Printf("redis: putConn failed: %s", err) + err := c.base.connPool.Put(cn) + if err != nil { + log.Printf("redis: putConn failed: %s", err) + } } } diff --git a/pool.go b/pool.go index 9968bd7e..5ed72097 100644 --- a/pool.go +++ b/pool.go @@ -20,7 +20,7 @@ type pool interface { First() *conn Get() (*conn, bool, error) Put(*conn) error - Remove(*conn) error + Remove(*conn, error) error Len() int FreeLen() int Close() error @@ -130,7 +130,7 @@ type connPool struct { _closed int32 - lastDialErr error + lastErr atomic.Value } func newConnPool(opt *Options) *connPool { @@ -204,15 +204,15 @@ func (p *connPool) wait() *conn { func (p *connPool) new() (*conn, error) { if p.rl.Limit() { err := fmt.Errorf( - "redis: you open connections too fast (last error: %v)", - p.lastDialErr, + "redis: you open connections too fast (last_error=%q)", + p.loadLastErr(), ) return nil, err } cn, err := p.dialer() if err != nil { - p.lastDialErr = err + p.storeLastErr(err.Error()) return nil, err } @@ -255,8 +255,9 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) { func (p *connPool) Put(cn *conn) error { if cn.rd.Buffered() != 0 { b, _ := cn.rd.Peek(cn.rd.Buffered()) - log.Printf("redis: connection has unread data: %q", b) - return p.Remove(cn) + err := fmt.Errorf("redis: connection has unread data: %q", b) + log.Print(err) + return p.Remove(cn, err) } if p.opt.getIdleTimeout() > 0 { cn.usedAt = time.Now() @@ -275,7 +276,9 @@ func (p *connPool) replace(cn *conn) (*conn, error) { return newcn, nil } -func (p *connPool) Remove(cn *conn) error { +func (p *connPool) Remove(cn *conn, reason error) error { + p.storeLastErr(reason.Error()) + // Replace existing connection with new one and unblock waiter. newcn, err := p.replace(cn) if err != nil { @@ -330,6 +333,17 @@ func (p *connPool) reaper() { } } +func (p *connPool) storeLastErr(err string) { + p.lastErr.Store(err) +} + +func (p *connPool) loadLastErr() string { + if v := p.lastErr.Load(); v != nil { + return v.(string) + } + return "" +} + //------------------------------------------------------------------------------ type singleConnPool struct { @@ -357,7 +371,7 @@ func (p *singleConnPool) Put(cn *conn) error { return nil } -func (p *singleConnPool) Remove(cn *conn) error { +func (p *singleConnPool) Remove(cn *conn, _ error) error { if p.cn != cn { panic("p.cn != cn") } @@ -440,13 +454,13 @@ func (p *stickyConnPool) Put(cn *conn) error { return nil } -func (p *stickyConnPool) remove() (err error) { - err = p.pool.Remove(p.cn) +func (p *stickyConnPool) remove(reason error) (err error) { + err = p.pool.Remove(p.cn, reason) p.cn = nil return err } -func (p *stickyConnPool) Remove(cn *conn) error { +func (p *stickyConnPool) Remove(cn *conn, _ error) error { defer p.mx.Unlock() p.mx.Lock() if p.closed { @@ -479,10 +493,10 @@ func (p *stickyConnPool) FreeLen() int { return 0 } -func (p *stickyConnPool) Reset() (err error) { +func (p *stickyConnPool) Reset(reason error) (err error) { p.mx.Lock() if p.cn != nil { - err = p.remove() + err = p.remove(reason) } p.mx.Unlock() return err @@ -500,7 +514,8 @@ func (p *stickyConnPool) Close() error { if p.reusable { err = p.put() } else { - err = p.remove() + reason := errors.New("redis: sticky not reusable connection") + err = p.remove(reason) } } return err diff --git a/pool_test.go b/pool_test.go index 9eb2c990..4d787a68 100644 --- a/pool_test.go +++ b/pool_test.go @@ -1,6 +1,7 @@ package redis_test import ( + "errors" "sync" "testing" "time" @@ -36,7 +37,6 @@ var _ = Describe("pool", func() { }) AfterEach(func() { - Expect(client.FlushDb().Err()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred()) }) @@ -141,12 +141,12 @@ var _ = Describe("pool", func() { pool := client.Pool() // Reserve one connection. - cn, _, err := client.Pool().Get() + cn, _, err := pool.Get() Expect(err).NotTo(HaveOccurred()) // Reserve the rest of connections. for i := 0; i < 9; i++ { - _, _, err := client.Pool().Get() + _, _, err := pool.Get() Expect(err).NotTo(HaveOccurred()) } @@ -168,7 +168,8 @@ var _ = Describe("pool", func() { // ok } - Expect(pool.Remove(cn)).NotTo(HaveOccurred()) + err = pool.Remove(cn, errors.New("test")) + Expect(err).NotTo(HaveOccurred()) // Check that Ping is unblocked. select { @@ -179,6 +180,23 @@ var _ = Describe("pool", func() { } Expect(ping.Err()).NotTo(HaveOccurred()) }) + + It("should rate limit dial", func() { + pool := client.Pool() + + var rateErr error + for i := 0; i < 1000; i++ { + cn, _, err := pool.Get() + if err != nil { + rateErr = err + break + } + + _ = pool.Remove(cn, errors.New("test")) + } + + Expect(rateErr).To(MatchError(`redis: you open connections too fast (last_error="test")`)) + }) }) func BenchmarkPool(b *testing.B) { diff --git a/pubsub.go b/pubsub.go index 3e20fe72..aea2bed7 100644 --- a/pubsub.go +++ b/pubsub.go @@ -233,9 +233,9 @@ func (c *PubSub) Receive() (interface{}, error) { return c.ReceiveTimeout(0) } -func (c *PubSub) reconnect() { +func (c *PubSub) reconnect(reason error) { // Close current connection. - c.connPool.(*stickyConnPool).Reset() + c.connPool.(*stickyConnPool).Reset(reason) if len(c.channels) > 0 { if err := c.Subscribe(c.channels...); err != nil { @@ -276,7 +276,7 @@ func (c *PubSub) ReceiveMessage() (*Message, error) { if errNum > 2 { time.Sleep(time.Second) } - c.reconnect() + c.reconnect(err) continue } diff --git a/redis.go b/redis.go index cd88cefd..087564fd 100644 --- a/redis.go +++ b/redis.go @@ -20,10 +20,9 @@ func (c *baseClient) conn() (*conn, bool, error) { return c.connPool.Get() } -func (c *baseClient) putConn(cn *conn, ei error) { - var err error - if isBadConn(cn, ei) { - err = c.connPool.Remove(cn) +func (c *baseClient) putConn(cn *conn, err error) { + if isBadConn(cn, err) { + err = c.connPool.Remove(cn, err) } else { err = c.connPool.Put(cn) } diff --git a/sentinel.go b/sentinel.go index 63c011d4..7edd75f2 100644 --- a/sentinel.go +++ b/sentinel.go @@ -2,6 +2,7 @@ package redis import ( "errors" + "fmt" "log" "net" "strings" @@ -227,11 +228,12 @@ func (d *sentinelFailover) closeOldConns(newMaster string) { break } if cn.RemoteAddr().String() != newMaster { - log.Printf( + err := fmt.Errorf( "redis-sentinel: closing connection to the old master %s", cn.RemoteAddr(), ) - d.pool.Remove(cn) + log.Print(err) + d.pool.Remove(cn, err) } else { cnsToPut = append(cnsToPut, cn) } From a6da93713abbcc558cb4a23435f7d2108f9c2c5f Mon Sep 17 00:00:00 2001 From: Rich Hong Date: Wed, 23 Dec 2015 17:24:42 -0500 Subject: [PATCH 2/4] Support multiple keys for the PFCOUNT command --- commands.go | 9 +++++++-- commands_test.go | 4 ++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/commands.go b/commands.go index d5350022..231d4721 100644 --- a/commands.go +++ b/commands.go @@ -1340,8 +1340,13 @@ func (c *commandable) PFAdd(key string, fields ...string) *IntCmd { return cmd } -func (c *commandable) PFCount(key string) *IntCmd { - cmd := NewIntCmd("PFCOUNT", key) +func (c *commandable) PFCount(keys ...string) *IntCmd { + args := make([]interface{}, 1+len(keys)) + args[0] = "PFCOUNT" + for i, key := range keys { + args[1+i] = key + } + cmd := NewIntCmd(args...) c.Process(cmd) return cmd } diff --git a/commands_test.go b/commands_test.go index 9796a37d..c592090a 100644 --- a/commands_test.go +++ b/commands_test.go @@ -1203,6 +1203,10 @@ var _ = Describe("Commands", func() { pfCount = client.PFCount("hllMerged") Expect(pfCount.Err()).NotTo(HaveOccurred()) Expect(pfCount.Val()).To(Equal(int64(10))) + + pfCount = client.PFCount("hll1", "hll2") + Expect(pfCount.Err()).NotTo(HaveOccurred()) + Expect(pfCount.Val()).To(Equal(int64(10))) }) }) From 9b1148903efb856265efb32d3a562d21c06d928b Mon Sep 17 00:00:00 2001 From: Anatolii Mihailenco Date: Mon, 28 Dec 2015 18:58:04 +0200 Subject: [PATCH 3/4] commands.go: Add ClusterKeySlot function. --- cluster_test.go | 5 +++++ commands.go | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/cluster_test.go b/cluster_test.go index d409dcb5..a585ad78 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -229,6 +229,11 @@ var _ = Describe("Cluster", func() { Expect(res).To(ContainSubstring("cluster_known_nodes:6")) }) + It("should CLUSTER KEYSLOT", func() { + res, err := cluster.primary().ClusterKeySlot("somekey").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(11058))) + }) }) Describe("Client", func() { diff --git a/commands.go b/commands.go index 231d4721..ef615b0c 100644 --- a/commands.go +++ b/commands.go @@ -1698,6 +1698,13 @@ func (c *commandable) ClusterInfo() *StringCmd { return cmd } +func (c *commandable) ClusterKeySlot(key string) *IntCmd { + cmd := NewIntCmd("CLUSTER", "keyslot", key) + cmd._clusterKeyPos = 2 + c.Process(cmd) + return cmd +} + func (c *commandable) ClusterFailover() *StatusCmd { cmd := newKeylessStatusCmd("CLUSTER", "failover") c.Process(cmd) From cbc5360e78f2d61bacb68d5e336493d6ff6ae491 Mon Sep 17 00:00:00 2001 From: Anatolii Mihailenco Date: Tue, 29 Dec 2015 18:16:14 +0200 Subject: [PATCH 4/4] commands.go: Add new functions to cluster. --- cluster_test.go | 48 ++++++++++++++++++++++++++++++++++++-- commands.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 2 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index a585ad78..fe004c96 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -230,9 +230,53 @@ var _ = Describe("Cluster", func() { }) It("should CLUSTER KEYSLOT", func() { - res, err := cluster.primary().ClusterKeySlot("somekey").Result() + hashSlot, err := cluster.primary().ClusterKeySlot("somekey").Result() Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(int64(11058))) + Expect(hashSlot).To(Equal(int64(11058))) + }) + + It("should CLUSTER COUNT-FAILURE-REPORTS", func() { + n, err := cluster.primary().ClusterCountFailureReports(cluster.nodeIds[0]).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + }) + + It("should CLUSTER COUNTKEYSINSLOT", func() { + n, err := cluster.primary().ClusterCountKeysInSlot(10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + }) + + It("should CLUSTER DELSLOTS", func() { + res, err := cluster.primary().ClusterDelSlotsRange(16000, 16384-1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal("OK")) + cluster.primary().ClusterAddSlotsRange(16000, 16384-1) + }) + + It("should CLUSTER SAVECONFIG", func() { + res, err := cluster.primary().ClusterSaveConfig().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal("OK")) + }) + + It("should CLUSTER SLAVES", func() { + nodesList, err := cluster.primary().ClusterSlaves(cluster.nodeIds[0]).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(nodesList).Should(ContainElement(ContainSubstring("slave"))) + Expect(nodesList).Should(HaveLen(1)) + }) + + It("should CLUSTER READONLY", func() { + res, err := cluster.primary().Readonly().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal("OK")) + }) + + It("should CLUSTER READWRITE", func() { + res, err := cluster.primary().ReadWrite().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal("OK")) }) }) diff --git a/commands.go b/commands.go index ef615b0c..215f414c 100644 --- a/commands.go +++ b/commands.go @@ -1705,6 +1705,68 @@ func (c *commandable) ClusterKeySlot(key string) *IntCmd { return cmd } +func (c *commandable) ClusterCountFailureReports(nodeID string) *IntCmd { + cmd := NewIntCmd("CLUSTER", "count-failure-reports", nodeID) + cmd._clusterKeyPos = 2 + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterCountKeysInSlot(slot int) *IntCmd { + cmd := NewIntCmd("CLUSTER", "countkeysinslot", slot) + cmd._clusterKeyPos = 2 + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterDelSlots(slots ...int) *StatusCmd { + args := make([]interface{}, 2+len(slots)) + args[0] = "CLUSTER" + args[1] = "DELSLOTS" + for i, slot := range slots { + args[2+i] = slot + } + cmd := newKeylessStatusCmd(args...) + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterDelSlotsRange(min, max int) *StatusCmd { + size := max - min + 1 + slots := make([]int, size) + for i := 0; i < size; i++ { + slots[i] = min + i + } + return c.ClusterDelSlots(slots...) +} + +func (c *commandable) ClusterSaveConfig() *StatusCmd { + cmd := newKeylessStatusCmd("CLUSTER", "saveconfig") + c.Process(cmd) + return cmd +} + +func (c *commandable) ClusterSlaves(nodeID string) *StringSliceCmd { + cmd := NewStringSliceCmd("CLUSTER", "SLAVES", nodeID) + cmd._clusterKeyPos = 2 + c.Process(cmd) + return cmd +} + +func (c *commandable) Readonly() *StatusCmd { + cmd := newKeylessStatusCmd("READONLY") + cmd._clusterKeyPos = 0 + c.Process(cmd) + return cmd +} + +func (c *commandable) ReadWrite() *StatusCmd { + cmd := newKeylessStatusCmd("READWRITE") + cmd._clusterKeyPos = 0 + c.Process(cmd) + return cmd +} + func (c *commandable) ClusterFailover() *StatusCmd { cmd := newKeylessStatusCmd("CLUSTER", "failover") c.Process(cmd)