Merge pull request #884 from go-redis/feature/limiter

Add Limiter interface
This commit is contained in:
Vladimir Mihailenco 2018-11-29 11:21:03 +02:00 committed by GitHub
commit 1d1269e0cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 8 deletions

View File

@ -5,8 +5,6 @@ services:
- redis-server - redis-server
go: go:
- 1.7.x
- 1.8.x
- 1.9.x - 1.9.x
- 1.10.x - 1.10.x
- 1.11.x - 1.11.x

View File

@ -50,6 +50,9 @@ type ClusterOptions struct {
// and Cluster.ReloadState to manually trigger state reloading. // and Cluster.ReloadState to manually trigger state reloading.
ClusterSlots func() ([]ClusterSlot, error) ClusterSlots func() ([]ClusterSlot, error)
// Optional hook that is called when a new node is created.
OnNewNode func(*Client)
// Following options are copied from Options struct. // Following options are copied from Options struct.
OnConnect func(*Conn) error OnConnect func(*Conn) error
@ -166,6 +169,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
go node.updateLatency() go node.updateLatency()
} }
if clOpt.OnNewNode != nil {
clOpt.OnNewNode(node.Client)
}
return &node return &node
} }

View File

@ -270,11 +270,11 @@ var _ = Describe("Commands", func() {
n, err := client.MemoryUsage("foo").Result() n, err := client.MemoryUsage("foo").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(52))) Expect(n).To(Equal(int64(50)))
n, err = client.MemoryUsage("foo", 0).Result() n, err = client.MemoryUsage("foo", 0).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(52))) Expect(n).To(Equal(int64(50)))
}) })
}) })

View File

@ -14,6 +14,17 @@ import (
"github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/pool"
) )
// Limiter is the interface of a rate limiter or a circuit breaker.
type Limiter interface {
// Allow returns a nil if operation is allowed or an error otherwise.
// If operation is allowed client must report the result of operation
// whether is a success or a failure.
Allow() error
// ReportResult reports the result of previously allowed operation.
// nil indicates a success, non-nil error indicates a failure.
ReportResult(result error)
}
type Options struct { type Options struct {
// The network type, either tcp or unix. // The network type, either tcp or unix.
// Default is tcp. // Default is tcp.

View File

@ -26,6 +26,7 @@ func SetLogger(logger *log.Logger) {
type baseClient struct { type baseClient struct {
opt *Options opt *Options
connPool pool.Pooler connPool pool.Pooler
limiter Limiter
process func(Cmder) error process func(Cmder) error
processPipeline func([]Cmder) error processPipeline func([]Cmder) error
@ -61,6 +62,24 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
} }
func (c *baseClient) getConn() (*pool.Conn, error) { func (c *baseClient) getConn() (*pool.Conn, error) {
if c.limiter != nil {
err := c.limiter.Allow()
if err != nil {
return nil, err
}
}
cn, err := c._getConn()
if err != nil {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
return nil, err
}
return cn, nil
}
func (c *baseClient) _getConn() (*pool.Conn, error) {
cn, err := c.connPool.Get() cn, err := c.connPool.Get()
if err != nil { if err != nil {
return nil, err return nil, err
@ -78,6 +97,10 @@ func (c *baseClient) getConn() (*pool.Conn, error) {
} }
func (c *baseClient) releaseConn(cn *pool.Conn, err error) { func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
if internal.IsBadConn(err, false) { if internal.IsBadConn(err, false) {
c.connPool.Remove(cn) c.connPool.Remove(cn)
} else { } else {
@ -86,6 +109,10 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
} }
func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) { func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
if err == nil || internal.IsRedisError(err) { if err == nil || internal.IsRedisError(err) {
c.connPool.Put(cn) c.connPool.Put(cn)
} else { } else {
@ -132,7 +159,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
// Do creates a Cmd from the args and processes the cmd. // Do creates a Cmd from the args and processes the cmd.
func (c *baseClient) Do(args ...interface{}) *Cmd { func (c *baseClient) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...) cmd := NewCmd(args...)
c.Process(cmd) _ = c.Process(cmd)
return cmd return cmd
} }
@ -362,7 +389,8 @@ type Client struct {
baseClient baseClient
cmdable cmdable
ctx context.Context limiter Limiter
ctx context.Context
} }
// NewClient returns a client to the Redis Server specified by Options. // NewClient returns a client to the Redis Server specified by Options.
@ -396,12 +424,12 @@ func (c *Client) WithContext(ctx context.Context) *Client {
if ctx == nil { if ctx == nil {
panic("nil context") panic("nil context")
} }
c2 := c.copy() c2 := c.clone()
c2.ctx = ctx c2.ctx = ctx
return c2 return c2
} }
func (c *Client) copy() *Client { func (c *Client) clone() *Client {
cp := *c cp := *c
cp.init() cp.init()
return &cp return &cp
@ -412,6 +440,11 @@ func (c *Client) Options() *Options {
return c.opt return c.opt
} }
func (c *Client) SetLimiter(l Limiter) *Client {
c.limiter = l
return c
}
type PoolStats pool.Stats type PoolStats pool.Stats
// PoolStats returns connection pool stats. // PoolStats returns connection pool stats.