mirror of https://github.com/go-redis/redis.git
Allow for tracking state in Limiter
Extend Allow and ReportResult functions to handle a Context. Allow can override the passed in Context. The returned Context is then further passed down to ReportResult. Using this Context it is then possible to store values/track state between Allow and ReportResult calls. Without this tracking HalfOpen/Generation state is hard to implement efficiently for Circuit Breakers.
This commit is contained in:
parent
7d56a2c38d
commit
69bab38007
|
@ -21,10 +21,12 @@ type Limiter interface {
|
||||||
// Allow returns nil if operation is allowed or an error otherwise.
|
// Allow returns nil if operation is allowed or an error otherwise.
|
||||||
// If operation is allowed client must ReportResult of the operation
|
// If operation is allowed client must ReportResult of the operation
|
||||||
// whether it is a success or a failure.
|
// whether it is a success or a failure.
|
||||||
Allow() error
|
// The returned context will be passed to ReportResult.
|
||||||
|
Allow(ctx context.Context) (context.Context, error)
|
||||||
// ReportResult reports the result of the previously allowed operation.
|
// ReportResult reports the result of the previously allowed operation.
|
||||||
// nil indicates a success, non-nil error usually indicates a failure.
|
// nil indicates a success, non-nil error usually indicates a failure.
|
||||||
ReportResult(result error)
|
// Context can be used to access state tracked by previous Allow call.
|
||||||
|
ReportResult(ctx context.Context, result error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Options keeps the settings to set up redis connection.
|
// Options keeps the settings to set up redis connection.
|
||||||
|
|
|
@ -1319,7 +1319,7 @@ func (c *ClusterClient) processPipelineNode(
|
||||||
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
|
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
|
||||||
) {
|
) {
|
||||||
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
|
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
|
||||||
cn, err := node.Client.getConn(ctx)
|
ctx, cn, err := node.Client.getConn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
node.MarkAsFailing()
|
node.MarkAsFailing()
|
||||||
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
|
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
|
||||||
|
@ -1504,7 +1504,7 @@ func (c *ClusterClient) processTxPipelineNode(
|
||||||
) {
|
) {
|
||||||
cmds = wrapMultiExec(ctx, cmds)
|
cmds = wrapMultiExec(ctx, cmds)
|
||||||
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
|
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
|
||||||
cn, err := node.Client.getConn(ctx)
|
ctx, cn, err := node.Client.getConn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
|
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
|
||||||
setCmdsErr(cmds, err)
|
setCmdsErr(cmds, err)
|
||||||
|
|
17
redis.go
17
redis.go
|
@ -237,23 +237,24 @@ func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
|
||||||
return cn, nil
|
return cn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
|
func (c *baseClient) getConn(ctx context.Context) (context.Context, *pool.Conn, error) {
|
||||||
|
var err error
|
||||||
if c.opt.Limiter != nil {
|
if c.opt.Limiter != nil {
|
||||||
err := c.opt.Limiter.Allow()
|
ctx, err = c.opt.Limiter.Allow(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return ctx, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cn, err := c._getConn(ctx)
|
cn, err := c._getConn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if c.opt.Limiter != nil {
|
if c.opt.Limiter != nil {
|
||||||
c.opt.Limiter.ReportResult(err)
|
c.opt.Limiter.ReportResult(ctx, err)
|
||||||
}
|
}
|
||||||
return nil, err
|
return ctx, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return cn, nil
|
return ctx, cn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
|
func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
|
||||||
|
@ -365,7 +366,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
||||||
|
|
||||||
func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error) {
|
func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error) {
|
||||||
if c.opt.Limiter != nil {
|
if c.opt.Limiter != nil {
|
||||||
c.opt.Limiter.ReportResult(err)
|
c.opt.Limiter.ReportResult(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if isBadConn(err, false, c.opt.Addr) {
|
if isBadConn(err, false, c.opt.Addr) {
|
||||||
|
@ -378,7 +379,7 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
|
||||||
func (c *baseClient) withConn(
|
func (c *baseClient) withConn(
|
||||||
ctx context.Context, fn func(context.Context, *pool.Conn) error,
|
ctx context.Context, fn func(context.Context, *pool.Conn) error,
|
||||||
) error {
|
) error {
|
||||||
cn, err := c.getConn(ctx)
|
ctx, cn, err := c.getConn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue