forked from mirror/redis
Better rate limited message.
This commit is contained in:
parent
f6d6826d82
commit
d7c44c7899
|
@ -5,7 +5,6 @@ services:
|
||||||
- redis-server
|
- redis-server
|
||||||
|
|
||||||
go:
|
go:
|
||||||
- 1.3
|
|
||||||
- 1.4
|
- 1.4
|
||||||
- 1.5
|
- 1.5
|
||||||
- tip
|
- tip
|
||||||
|
|
11
multi.go
11
multi.go
|
@ -46,17 +46,16 @@ func (c *Client) Multi() *Multi {
|
||||||
return multi
|
return multi
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Multi) putConn(cn *conn, ei error) {
|
func (c *Multi) putConn(cn *conn, err error) {
|
||||||
var err error
|
if isBadConn(cn, err) {
|
||||||
if isBadConn(cn, ei) {
|
|
||||||
// Close current connection.
|
// Close current connection.
|
||||||
c.base.connPool.(*stickyConnPool).Reset()
|
c.base.connPool.(*stickyConnPool).Reset(err)
|
||||||
} else {
|
} else {
|
||||||
err = c.base.connPool.Put(cn)
|
err := c.base.connPool.Put(cn)
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("redis: putConn failed: %s", err)
|
log.Printf("redis: putConn failed: %s", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Multi) process(cmd Cmder) {
|
func (c *Multi) process(cmd Cmder) {
|
||||||
|
|
45
pool.go
45
pool.go
|
@ -20,7 +20,7 @@ type pool interface {
|
||||||
First() *conn
|
First() *conn
|
||||||
Get() (*conn, bool, error)
|
Get() (*conn, bool, error)
|
||||||
Put(*conn) error
|
Put(*conn) error
|
||||||
Remove(*conn) error
|
Remove(*conn, error) error
|
||||||
Len() int
|
Len() int
|
||||||
FreeLen() int
|
FreeLen() int
|
||||||
Close() error
|
Close() error
|
||||||
|
@ -130,7 +130,7 @@ type connPool struct {
|
||||||
|
|
||||||
_closed int32
|
_closed int32
|
||||||
|
|
||||||
lastDialErr error
|
lastErr atomic.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConnPool(opt *Options) *connPool {
|
func newConnPool(opt *Options) *connPool {
|
||||||
|
@ -204,15 +204,15 @@ func (p *connPool) wait() *conn {
|
||||||
func (p *connPool) new() (*conn, error) {
|
func (p *connPool) new() (*conn, error) {
|
||||||
if p.rl.Limit() {
|
if p.rl.Limit() {
|
||||||
err := fmt.Errorf(
|
err := fmt.Errorf(
|
||||||
"redis: you open connections too fast (last error: %v)",
|
"redis: you open connections too fast (last_error=%q)",
|
||||||
p.lastDialErr,
|
p.loadLastErr(),
|
||||||
)
|
)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cn, err := p.dialer()
|
cn, err := p.dialer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.lastDialErr = err
|
p.storeLastErr(err.Error())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,8 +255,9 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) {
|
||||||
func (p *connPool) Put(cn *conn) error {
|
func (p *connPool) Put(cn *conn) error {
|
||||||
if cn.rd.Buffered() != 0 {
|
if cn.rd.Buffered() != 0 {
|
||||||
b, _ := cn.rd.Peek(cn.rd.Buffered())
|
b, _ := cn.rd.Peek(cn.rd.Buffered())
|
||||||
log.Printf("redis: connection has unread data: %q", b)
|
err := fmt.Errorf("redis: connection has unread data: %q", b)
|
||||||
return p.Remove(cn)
|
log.Print(err)
|
||||||
|
return p.Remove(cn, err)
|
||||||
}
|
}
|
||||||
if p.opt.getIdleTimeout() > 0 {
|
if p.opt.getIdleTimeout() > 0 {
|
||||||
cn.usedAt = time.Now()
|
cn.usedAt = time.Now()
|
||||||
|
@ -275,7 +276,9 @@ func (p *connPool) replace(cn *conn) (*conn, error) {
|
||||||
return newcn, nil
|
return newcn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *connPool) Remove(cn *conn) error {
|
func (p *connPool) Remove(cn *conn, reason error) error {
|
||||||
|
p.storeLastErr(reason.Error())
|
||||||
|
|
||||||
// Replace existing connection with new one and unblock waiter.
|
// Replace existing connection with new one and unblock waiter.
|
||||||
newcn, err := p.replace(cn)
|
newcn, err := p.replace(cn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -330,6 +333,17 @@ func (p *connPool) reaper() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *connPool) storeLastErr(err string) {
|
||||||
|
p.lastErr.Store(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *connPool) loadLastErr() string {
|
||||||
|
if v := p.lastErr.Load(); v != nil {
|
||||||
|
return v.(string)
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type singleConnPool struct {
|
type singleConnPool struct {
|
||||||
|
@ -357,7 +371,7 @@ func (p *singleConnPool) Put(cn *conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *singleConnPool) Remove(cn *conn) error {
|
func (p *singleConnPool) Remove(cn *conn, _ error) error {
|
||||||
if p.cn != cn {
|
if p.cn != cn {
|
||||||
panic("p.cn != cn")
|
panic("p.cn != cn")
|
||||||
}
|
}
|
||||||
|
@ -440,13 +454,13 @@ func (p *stickyConnPool) Put(cn *conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *stickyConnPool) remove() (err error) {
|
func (p *stickyConnPool) remove(reason error) (err error) {
|
||||||
err = p.pool.Remove(p.cn)
|
err = p.pool.Remove(p.cn, reason)
|
||||||
p.cn = nil
|
p.cn = nil
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *stickyConnPool) Remove(cn *conn) error {
|
func (p *stickyConnPool) Remove(cn *conn, _ error) error {
|
||||||
defer p.mx.Unlock()
|
defer p.mx.Unlock()
|
||||||
p.mx.Lock()
|
p.mx.Lock()
|
||||||
if p.closed {
|
if p.closed {
|
||||||
|
@ -479,10 +493,10 @@ func (p *stickyConnPool) FreeLen() int {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *stickyConnPool) Reset() (err error) {
|
func (p *stickyConnPool) Reset(reason error) (err error) {
|
||||||
p.mx.Lock()
|
p.mx.Lock()
|
||||||
if p.cn != nil {
|
if p.cn != nil {
|
||||||
err = p.remove()
|
err = p.remove(reason)
|
||||||
}
|
}
|
||||||
p.mx.Unlock()
|
p.mx.Unlock()
|
||||||
return err
|
return err
|
||||||
|
@ -500,7 +514,8 @@ func (p *stickyConnPool) Close() error {
|
||||||
if p.reusable {
|
if p.reusable {
|
||||||
err = p.put()
|
err = p.put()
|
||||||
} else {
|
} else {
|
||||||
err = p.remove()
|
reason := errors.New("redis: sticky not reusable connection")
|
||||||
|
err = p.remove(reason)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
26
pool_test.go
26
pool_test.go
|
@ -1,6 +1,7 @@
|
||||||
package redis_test
|
package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -36,7 +37,6 @@ var _ = Describe("pool", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
|
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -141,12 +141,12 @@ var _ = Describe("pool", func() {
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
|
|
||||||
// Reserve one connection.
|
// Reserve one connection.
|
||||||
cn, _, err := client.Pool().Get()
|
cn, _, err := pool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
// Reserve the rest of connections.
|
// Reserve the rest of connections.
|
||||||
for i := 0; i < 9; i++ {
|
for i := 0; i < 9; i++ {
|
||||||
_, _, err := client.Pool().Get()
|
_, _, err := pool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,7 +168,8 @@ var _ = Describe("pool", func() {
|
||||||
// ok
|
// ok
|
||||||
}
|
}
|
||||||
|
|
||||||
Expect(pool.Remove(cn)).NotTo(HaveOccurred())
|
err = pool.Remove(cn, errors.New("test"))
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
// Check that Ping is unblocked.
|
// Check that Ping is unblocked.
|
||||||
select {
|
select {
|
||||||
|
@ -179,6 +180,23 @@ var _ = Describe("pool", func() {
|
||||||
}
|
}
|
||||||
Expect(ping.Err()).NotTo(HaveOccurred())
|
Expect(ping.Err()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should rate limit dial", func() {
|
||||||
|
pool := client.Pool()
|
||||||
|
|
||||||
|
var rateErr error
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
cn, _, err := pool.Get()
|
||||||
|
if err != nil {
|
||||||
|
rateErr = err
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = pool.Remove(cn, errors.New("test"))
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(rateErr).To(MatchError(`redis: you open connections too fast (last_error="test")`))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
func BenchmarkPool(b *testing.B) {
|
func BenchmarkPool(b *testing.B) {
|
||||||
|
|
|
@ -233,9 +233,9 @@ func (c *PubSub) Receive() (interface{}, error) {
|
||||||
return c.ReceiveTimeout(0)
|
return c.ReceiveTimeout(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PubSub) reconnect() {
|
func (c *PubSub) reconnect(reason error) {
|
||||||
// Close current connection.
|
// Close current connection.
|
||||||
c.connPool.(*stickyConnPool).Reset()
|
c.connPool.(*stickyConnPool).Reset(reason)
|
||||||
|
|
||||||
if len(c.channels) > 0 {
|
if len(c.channels) > 0 {
|
||||||
if err := c.Subscribe(c.channels...); err != nil {
|
if err := c.Subscribe(c.channels...); err != nil {
|
||||||
|
@ -276,7 +276,7 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
|
||||||
if errNum > 2 {
|
if errNum > 2 {
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
}
|
}
|
||||||
c.reconnect()
|
c.reconnect(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
7
redis.go
7
redis.go
|
@ -20,10 +20,9 @@ func (c *baseClient) conn() (*conn, bool, error) {
|
||||||
return c.connPool.Get()
|
return c.connPool.Get()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) putConn(cn *conn, ei error) {
|
func (c *baseClient) putConn(cn *conn, err error) {
|
||||||
var err error
|
if isBadConn(cn, err) {
|
||||||
if isBadConn(cn, ei) {
|
err = c.connPool.Remove(cn, err)
|
||||||
err = c.connPool.Remove(cn)
|
|
||||||
} else {
|
} else {
|
||||||
err = c.connPool.Put(cn)
|
err = c.connPool.Put(cn)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package redis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -227,11 +228,12 @@ func (d *sentinelFailover) closeOldConns(newMaster string) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if cn.RemoteAddr().String() != newMaster {
|
if cn.RemoteAddr().String() != newMaster {
|
||||||
log.Printf(
|
err := fmt.Errorf(
|
||||||
"redis-sentinel: closing connection to the old master %s",
|
"redis-sentinel: closing connection to the old master %s",
|
||||||
cn.RemoteAddr(),
|
cn.RemoteAddr(),
|
||||||
)
|
)
|
||||||
d.pool.Remove(cn)
|
log.Print(err)
|
||||||
|
d.pool.Remove(cn, err)
|
||||||
} else {
|
} else {
|
||||||
cnsToPut = append(cnsToPut, cn)
|
cnsToPut = append(cnsToPut, cn)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue