Add Limiter interface

This commit is contained in:
Vladimir Mihailenco 2018-10-14 10:53:48 +03:00
parent 78a66f0e5f
commit 5d8c5c5c37
3 changed files with 55 additions and 4 deletions

View File

@ -50,6 +50,9 @@ type ClusterOptions struct {
// and Cluster.ReloadState to manually trigger state reloading.
ClusterSlots func() ([]ClusterSlot, error)
// Optional hook that is called when a new node is created.
OnNewNode func(*Client)
// Following options are copied from Options struct.
OnConnect func(*Conn) error
@ -166,6 +169,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
go node.updateLatency()
}
if clOpt.OnNewNode != nil {
clOpt.OnNewNode(node.Client)
}
return &node
}

View File

@ -14,6 +14,17 @@ import (
"github.com/go-redis/redis/internal/pool"
)
// Limiter is the interface of a rate limiter or a circuit breaker.
type Limiter interface {
// Allow returns a nil if operation is allowed or an error otherwise.
// If operation is allowed client must report the result of operation
// whether is a success or a failure.
Allow() error
// ReportResult reports the result of previously allowed operation.
// nil indicates a success, non-nil error indicates a failure.
ReportResult(result error)
}
type Options struct {
// The network type, either tcp or unix.
// Default is tcp.

View File

@ -26,6 +26,7 @@ func SetLogger(logger *log.Logger) {
type baseClient struct {
opt *Options
connPool pool.Pooler
limiter Limiter
process func(Cmder) error
processPipeline func([]Cmder) error
@ -61,6 +62,24 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
}
func (c *baseClient) getConn() (*pool.Conn, error) {
if c.limiter != nil {
err := c.limiter.Allow()
if err != nil {
return nil, err
}
}
cn, err := c._getConn()
if err != nil {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
return nil, err
}
return cn, nil
}
func (c *baseClient) _getConn() (*pool.Conn, error) {
cn, err := c.connPool.Get()
if err != nil {
return nil, err
@ -78,6 +97,10 @@ func (c *baseClient) getConn() (*pool.Conn, error) {
}
func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
if internal.IsBadConn(err, false) {
c.connPool.Remove(cn)
} else {
@ -86,6 +109,10 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
}
func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
if err == nil || internal.IsRedisError(err) {
c.connPool.Put(cn)
} else {
@ -132,7 +159,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
// Do creates a Cmd from the args and processes the cmd.
func (c *baseClient) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...)
c.Process(cmd)
_ = c.Process(cmd)
return cmd
}
@ -362,7 +389,8 @@ type Client struct {
baseClient
cmdable
ctx context.Context
limiter Limiter
ctx context.Context
}
// NewClient returns a client to the Redis Server specified by Options.
@ -396,12 +424,12 @@ func (c *Client) WithContext(ctx context.Context) *Client {
if ctx == nil {
panic("nil context")
}
c2 := c.copy()
c2 := c.clone()
c2.ctx = ctx
return c2
}
func (c *Client) copy() *Client {
func (c *Client) clone() *Client {
cp := *c
cp.init()
return &cp
@ -412,6 +440,11 @@ func (c *Client) Options() *Options {
return c.opt
}
func (c *Client) SetLimiter(l Limiter) *Client {
c.limiter = l
return c
}
type PoolStats pool.Stats
// PoolStats returns connection pool stats.