diff --git a/.travis.yml b/.travis.yml index 632feca0..6b110b4c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,8 +5,6 @@ services: - redis-server go: - - 1.7.x - - 1.8.x - 1.9.x - 1.10.x - 1.11.x diff --git a/cluster.go b/cluster.go index 3f140a6d..4c1fd73c 100644 --- a/cluster.go +++ b/cluster.go @@ -50,6 +50,9 @@ type ClusterOptions struct { // and Cluster.ReloadState to manually trigger state reloading. ClusterSlots func() ([]ClusterSlot, error) + // Optional hook that is called when a new node is created. + OnNewNode func(*Client) + // Following options are copied from Options struct. OnConnect func(*Conn) error @@ -166,6 +169,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { go node.updateLatency() } + if clOpt.OnNewNode != nil { + clOpt.OnNewNode(node.Client) + } + return &node } diff --git a/commands_test.go b/commands_test.go index 5bea31f6..2dba77c9 100644 --- a/commands_test.go +++ b/commands_test.go @@ -270,11 +270,11 @@ var _ = Describe("Commands", func() { n, err := client.MemoryUsage("foo").Result() Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(52))) + Expect(n).To(Equal(int64(50))) n, err = client.MemoryUsage("foo", 0).Result() Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(52))) + Expect(n).To(Equal(int64(50))) }) }) diff --git a/options.go b/options.go index 088f6977..16dbd5eb 100644 --- a/options.go +++ b/options.go @@ -14,6 +14,17 @@ import ( "github.com/go-redis/redis/internal/pool" ) +// Limiter is the interface of a rate limiter or a circuit breaker. +type Limiter interface { + // Allow returns a nil if operation is allowed or an error otherwise. + // If operation is allowed client must report the result of operation + // whether is a success or a failure. + Allow() error + // ReportResult reports the result of previously allowed operation. + // nil indicates a success, non-nil error indicates a failure. + ReportResult(result error) +} + type Options struct { // The network type, either tcp or unix. // Default is tcp. diff --git a/redis.go b/redis.go index c8b14ab6..f692ce52 100644 --- a/redis.go +++ b/redis.go @@ -26,6 +26,7 @@ func SetLogger(logger *log.Logger) { type baseClient struct { opt *Options connPool pool.Pooler + limiter Limiter process func(Cmder) error processPipeline func([]Cmder) error @@ -61,6 +62,24 @@ func (c *baseClient) newConn() (*pool.Conn, error) { } func (c *baseClient) getConn() (*pool.Conn, error) { + if c.limiter != nil { + err := c.limiter.Allow() + if err != nil { + return nil, err + } + } + + cn, err := c._getConn() + if err != nil { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + return nil, err + } + return cn, nil +} + +func (c *baseClient) _getConn() (*pool.Conn, error) { cn, err := c.connPool.Get() if err != nil { return nil, err @@ -78,6 +97,10 @@ func (c *baseClient) getConn() (*pool.Conn, error) { } func (c *baseClient) releaseConn(cn *pool.Conn, err error) { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + if internal.IsBadConn(err, false) { c.connPool.Remove(cn) } else { @@ -86,6 +109,10 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) { } func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + if err == nil || internal.IsRedisError(err) { c.connPool.Put(cn) } else { @@ -132,7 +159,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error { // Do creates a Cmd from the args and processes the cmd. func (c *baseClient) Do(args ...interface{}) *Cmd { cmd := NewCmd(args...) - c.Process(cmd) + _ = c.Process(cmd) return cmd } @@ -362,7 +389,8 @@ type Client struct { baseClient cmdable - ctx context.Context + limiter Limiter + ctx context.Context } // NewClient returns a client to the Redis Server specified by Options. @@ -396,12 +424,12 @@ func (c *Client) WithContext(ctx context.Context) *Client { if ctx == nil { panic("nil context") } - c2 := c.copy() + c2 := c.clone() c2.ctx = ctx return c2 } -func (c *Client) copy() *Client { +func (c *Client) clone() *Client { cp := *c cp.init() return &cp @@ -412,6 +440,11 @@ func (c *Client) Options() *Options { return c.opt } +func (c *Client) SetLimiter(l Limiter) *Client { + c.limiter = l + return c +} + type PoolStats pool.Stats // PoolStats returns connection pool stats.