From 5d8c5c5c37620fcb38ba114d9307eb96ae0c77ea Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sun, 14 Oct 2018 10:53:48 +0300 Subject: [PATCH] Add Limiter interface --- cluster.go | 7 +++++++ options.go | 11 +++++++++++ redis.go | 41 +++++++++++++++++++++++++++++++++++++---- 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/cluster.go b/cluster.go index 03c186cf..19185dea 100644 --- a/cluster.go +++ b/cluster.go @@ -50,6 +50,9 @@ type ClusterOptions struct { // and Cluster.ReloadState to manually trigger state reloading. ClusterSlots func() ([]ClusterSlot, error) + // Optional hook that is called when a new node is created. + OnNewNode func(*Client) + // Following options are copied from Options struct. OnConnect func(*Conn) error @@ -166,6 +169,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { go node.updateLatency() } + if clOpt.OnNewNode != nil { + clOpt.OnNewNode(node.Client) + } + return &node } diff --git a/options.go b/options.go index 088f6977..16dbd5eb 100644 --- a/options.go +++ b/options.go @@ -14,6 +14,17 @@ import ( "github.com/go-redis/redis/internal/pool" ) +// Limiter is the interface of a rate limiter or a circuit breaker. +type Limiter interface { + // Allow returns a nil if operation is allowed or an error otherwise. + // If operation is allowed client must report the result of operation + // whether is a success or a failure. + Allow() error + // ReportResult reports the result of previously allowed operation. + // nil indicates a success, non-nil error indicates a failure. + ReportResult(result error) +} + type Options struct { // The network type, either tcp or unix. // Default is tcp. diff --git a/redis.go b/redis.go index 92a07fc6..8a1f4f35 100644 --- a/redis.go +++ b/redis.go @@ -26,6 +26,7 @@ func SetLogger(logger *log.Logger) { type baseClient struct { opt *Options connPool pool.Pooler + limiter Limiter process func(Cmder) error processPipeline func([]Cmder) error @@ -61,6 +62,24 @@ func (c *baseClient) newConn() (*pool.Conn, error) { } func (c *baseClient) getConn() (*pool.Conn, error) { + if c.limiter != nil { + err := c.limiter.Allow() + if err != nil { + return nil, err + } + } + + cn, err := c._getConn() + if err != nil { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + return nil, err + } + return cn, nil +} + +func (c *baseClient) _getConn() (*pool.Conn, error) { cn, err := c.connPool.Get() if err != nil { return nil, err @@ -78,6 +97,10 @@ func (c *baseClient) getConn() (*pool.Conn, error) { } func (c *baseClient) releaseConn(cn *pool.Conn, err error) { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + if internal.IsBadConn(err, false) { c.connPool.Remove(cn) } else { @@ -86,6 +109,10 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) { } func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + if err == nil || internal.IsRedisError(err) { c.connPool.Put(cn) } else { @@ -132,7 +159,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error { // Do creates a Cmd from the args and processes the cmd. func (c *baseClient) Do(args ...interface{}) *Cmd { cmd := NewCmd(args...) - c.Process(cmd) + _ = c.Process(cmd) return cmd } @@ -362,7 +389,8 @@ type Client struct { baseClient cmdable - ctx context.Context + limiter Limiter + ctx context.Context } // NewClient returns a client to the Redis Server specified by Options. @@ -396,12 +424,12 @@ func (c *Client) WithContext(ctx context.Context) *Client { if ctx == nil { panic("nil context") } - c2 := c.copy() + c2 := c.clone() c2.ctx = ctx return c2 } -func (c *Client) copy() *Client { +func (c *Client) clone() *Client { cp := *c cp.init() return &cp @@ -412,6 +440,11 @@ func (c *Client) Options() *Options { return c.opt } +func (c *Client) SetLimiter(l Limiter) *Client { + c.limiter = l + return c +} + type PoolStats pool.Stats // PoolStats returns connection pool stats.