Merge pull request #228 from go-redis/fix/better-rate-limited-message

Better rate limited message.
This commit is contained in:
Vladimir Mihailenco 2015-12-24 11:03:48 +02:00
commit 8319126d93
7 changed files with 69 additions and 37 deletions

View File

@ -5,7 +5,6 @@ services:
- redis-server - redis-server
go: go:
- 1.3
- 1.4 - 1.4
- 1.5 - 1.5
- tip - tip

View File

@ -46,17 +46,16 @@ func (c *Client) Multi() *Multi {
return multi return multi
} }
func (c *Multi) putConn(cn *conn, ei error) { func (c *Multi) putConn(cn *conn, err error) {
var err error if isBadConn(cn, err) {
if isBadConn(cn, ei) {
// Close current connection. // Close current connection.
c.base.connPool.(*stickyConnPool).Reset() c.base.connPool.(*stickyConnPool).Reset(err)
} else { } else {
err = c.base.connPool.Put(cn) err := c.base.connPool.Put(cn)
}
if err != nil { if err != nil {
log.Printf("redis: putConn failed: %s", err) log.Printf("redis: putConn failed: %s", err)
} }
}
} }
func (c *Multi) process(cmd Cmder) { func (c *Multi) process(cmd Cmder) {

45
pool.go
View File

@ -20,7 +20,7 @@ type pool interface {
First() *conn First() *conn
Get() (*conn, bool, error) Get() (*conn, bool, error)
Put(*conn) error Put(*conn) error
Remove(*conn) error Remove(*conn, error) error
Len() int Len() int
FreeLen() int FreeLen() int
Close() error Close() error
@ -130,7 +130,7 @@ type connPool struct {
_closed int32 _closed int32
lastDialErr error lastErr atomic.Value
} }
func newConnPool(opt *Options) *connPool { func newConnPool(opt *Options) *connPool {
@ -204,15 +204,15 @@ func (p *connPool) wait() *conn {
func (p *connPool) new() (*conn, error) { func (p *connPool) new() (*conn, error) {
if p.rl.Limit() { if p.rl.Limit() {
err := fmt.Errorf( err := fmt.Errorf(
"redis: you open connections too fast (last error: %v)", "redis: you open connections too fast (last_error=%q)",
p.lastDialErr, p.loadLastErr(),
) )
return nil, err return nil, err
} }
cn, err := p.dialer() cn, err := p.dialer()
if err != nil { if err != nil {
p.lastDialErr = err p.storeLastErr(err.Error())
return nil, err return nil, err
} }
@ -255,8 +255,9 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) {
func (p *connPool) Put(cn *conn) error { func (p *connPool) Put(cn *conn) error {
if cn.rd.Buffered() != 0 { if cn.rd.Buffered() != 0 {
b, _ := cn.rd.Peek(cn.rd.Buffered()) b, _ := cn.rd.Peek(cn.rd.Buffered())
log.Printf("redis: connection has unread data: %q", b) err := fmt.Errorf("redis: connection has unread data: %q", b)
return p.Remove(cn) log.Print(err)
return p.Remove(cn, err)
} }
if p.opt.getIdleTimeout() > 0 { if p.opt.getIdleTimeout() > 0 {
cn.usedAt = time.Now() cn.usedAt = time.Now()
@ -275,7 +276,9 @@ func (p *connPool) replace(cn *conn) (*conn, error) {
return newcn, nil return newcn, nil
} }
func (p *connPool) Remove(cn *conn) error { func (p *connPool) Remove(cn *conn, reason error) error {
p.storeLastErr(reason.Error())
// Replace existing connection with new one and unblock waiter. // Replace existing connection with new one and unblock waiter.
newcn, err := p.replace(cn) newcn, err := p.replace(cn)
if err != nil { if err != nil {
@ -330,6 +333,17 @@ func (p *connPool) reaper() {
} }
} }
func (p *connPool) storeLastErr(err string) {
p.lastErr.Store(err)
}
func (p *connPool) loadLastErr() string {
if v := p.lastErr.Load(); v != nil {
return v.(string)
}
return ""
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type singleConnPool struct { type singleConnPool struct {
@ -357,7 +371,7 @@ func (p *singleConnPool) Put(cn *conn) error {
return nil return nil
} }
func (p *singleConnPool) Remove(cn *conn) error { func (p *singleConnPool) Remove(cn *conn, _ error) error {
if p.cn != cn { if p.cn != cn {
panic("p.cn != cn") panic("p.cn != cn")
} }
@ -440,13 +454,13 @@ func (p *stickyConnPool) Put(cn *conn) error {
return nil return nil
} }
func (p *stickyConnPool) remove() (err error) { func (p *stickyConnPool) remove(reason error) (err error) {
err = p.pool.Remove(p.cn) err = p.pool.Remove(p.cn, reason)
p.cn = nil p.cn = nil
return err return err
} }
func (p *stickyConnPool) Remove(cn *conn) error { func (p *stickyConnPool) Remove(cn *conn, _ error) error {
defer p.mx.Unlock() defer p.mx.Unlock()
p.mx.Lock() p.mx.Lock()
if p.closed { if p.closed {
@ -479,10 +493,10 @@ func (p *stickyConnPool) FreeLen() int {
return 0 return 0
} }
func (p *stickyConnPool) Reset() (err error) { func (p *stickyConnPool) Reset(reason error) (err error) {
p.mx.Lock() p.mx.Lock()
if p.cn != nil { if p.cn != nil {
err = p.remove() err = p.remove(reason)
} }
p.mx.Unlock() p.mx.Unlock()
return err return err
@ -500,7 +514,8 @@ func (p *stickyConnPool) Close() error {
if p.reusable { if p.reusable {
err = p.put() err = p.put()
} else { } else {
err = p.remove() reason := errors.New("redis: sticky not reusable connection")
err = p.remove(reason)
} }
} }
return err return err

View File

@ -1,6 +1,7 @@
package redis_test package redis_test
import ( import (
"errors"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -36,7 +37,6 @@ var _ = Describe("pool", func() {
}) })
AfterEach(func() { AfterEach(func() {
Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
Expect(client.Close()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred())
}) })
@ -141,12 +141,12 @@ var _ = Describe("pool", func() {
pool := client.Pool() pool := client.Pool()
// Reserve one connection. // Reserve one connection.
cn, _, err := client.Pool().Get() cn, _, err := pool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// Reserve the rest of connections. // Reserve the rest of connections.
for i := 0; i < 9; i++ { for i := 0; i < 9; i++ {
_, _, err := client.Pool().Get() _, _, err := pool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
} }
@ -168,7 +168,8 @@ var _ = Describe("pool", func() {
// ok // ok
} }
Expect(pool.Remove(cn)).NotTo(HaveOccurred()) err = pool.Remove(cn, errors.New("test"))
Expect(err).NotTo(HaveOccurred())
// Check that Ping is unblocked. // Check that Ping is unblocked.
select { select {
@ -179,6 +180,23 @@ var _ = Describe("pool", func() {
} }
Expect(ping.Err()).NotTo(HaveOccurred()) Expect(ping.Err()).NotTo(HaveOccurred())
}) })
It("should rate limit dial", func() {
pool := client.Pool()
var rateErr error
for i := 0; i < 1000; i++ {
cn, _, err := pool.Get()
if err != nil {
rateErr = err
break
}
_ = pool.Remove(cn, errors.New("test"))
}
Expect(rateErr).To(MatchError(`redis: you open connections too fast (last_error="test")`))
})
}) })
func BenchmarkPool(b *testing.B) { func BenchmarkPool(b *testing.B) {

View File

@ -233,9 +233,9 @@ func (c *PubSub) Receive() (interface{}, error) {
return c.ReceiveTimeout(0) return c.ReceiveTimeout(0)
} }
func (c *PubSub) reconnect() { func (c *PubSub) reconnect(reason error) {
// Close current connection. // Close current connection.
c.connPool.(*stickyConnPool).Reset() c.connPool.(*stickyConnPool).Reset(reason)
if len(c.channels) > 0 { if len(c.channels) > 0 {
if err := c.Subscribe(c.channels...); err != nil { if err := c.Subscribe(c.channels...); err != nil {
@ -276,7 +276,7 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
if errNum > 2 { if errNum > 2 {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
c.reconnect() c.reconnect(err)
continue continue
} }

View File

@ -20,10 +20,9 @@ func (c *baseClient) conn() (*conn, bool, error) {
return c.connPool.Get() return c.connPool.Get()
} }
func (c *baseClient) putConn(cn *conn, ei error) { func (c *baseClient) putConn(cn *conn, err error) {
var err error if isBadConn(cn, err) {
if isBadConn(cn, ei) { err = c.connPool.Remove(cn, err)
err = c.connPool.Remove(cn)
} else { } else {
err = c.connPool.Put(cn) err = c.connPool.Put(cn)
} }

View File

@ -2,6 +2,7 @@ package redis
import ( import (
"errors" "errors"
"fmt"
"log" "log"
"net" "net"
"strings" "strings"
@ -227,11 +228,12 @@ func (d *sentinelFailover) closeOldConns(newMaster string) {
break break
} }
if cn.RemoteAddr().String() != newMaster { if cn.RemoteAddr().String() != newMaster {
log.Printf( err := fmt.Errorf(
"redis-sentinel: closing connection to the old master %s", "redis-sentinel: closing connection to the old master %s",
cn.RemoteAddr(), cn.RemoteAddr(),
) )
d.pool.Remove(cn) log.Print(err)
d.pool.Remove(cn, err)
} else { } else {
cnsToPut = append(cnsToPut, cn) cnsToPut = append(cnsToPut, cn)
} }