Add WrapProcessPipeline

This commit is contained in:
yifei.huang 2018-01-20 18:26:33 +08:00 committed by Vladimir Mihailenco
parent 2c11cbf01a
commit 8b4fa6d443
8 changed files with 174 additions and 109 deletions

View File

@ -445,6 +445,10 @@ type ClusterClient struct {
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
process func(Cmder) error
processPipeline func([]Cmder) error
processTxPipeline func([]Cmder) error
// Reports whether slots reloading is in progress.
reloading uint32
}
@ -458,7 +462,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt: opt,
nodes: newClusterNodes(opt),
}
c.setProcessor(c.Process)
c.process = c.defaultProcess
c.processPipeline = c.defaultProcessPipeline
c.processTxPipeline = c.defaultProcessTxPipeline
c.cmdable.setProcessor(c.Process)
// Add initial nodes.
for _, addr := range opt.Addrs {
@ -628,7 +637,20 @@ func (c *ClusterClient) Close() error {
return c.nodes.Close()
}
func (c *ClusterClient) WrapProcess(
fn func(oldProcess func(Cmder) error) func(Cmder) error,
) {
c.process = fn(c.process)
}
func (c *ClusterClient) Process(cmd Cmder) error {
if c.process != nil {
return c.process(cmd)
}
return c.defaultProcess(cmd)
}
func (c *ClusterClient) defaultProcess(cmd Cmder) error {
state, err := c.state()
if err != nil {
cmd.setErr(err)
@ -910,9 +932,9 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
func (c *ClusterClient) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.pipelineExec,
exec: c.processPipeline,
}
pipe.setProcessor(pipe.Process)
pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@ -920,7 +942,13 @@ func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}
func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
func (c *ClusterClient) WrapProcessPipeline(
fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) {
c.processPipeline = fn(c.processPipeline)
}
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsByNode(cmds)
if err != nil {
setCmdsErr(cmds, err)
@ -1064,9 +1092,9 @@ func (c *ClusterClient) checkMovedErr(
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
pipe := Pipeline{
exec: c.txPipelineExec,
exec: c.processTxPipeline,
}
pipe.setProcessor(pipe.Process)
pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@ -1074,7 +1102,7 @@ func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.TxPipeline().Pipelined(fn)
}
func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
state, err := c.state()
if err != nil {
return err

View File

@ -2,58 +2,47 @@ package redis_test
import (
"fmt"
"sync/atomic"
"time"
"github.com/go-redis/redis"
)
func Example_instrumentation() {
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"shard1": ":6379",
},
cl := redis.NewClient(&redis.Options{
Addr: ":6379",
})
ring.ForEachShard(func(client *redis.Client) error {
wrapRedisProcess(client)
return nil
})
for {
ring.Ping()
}
}
func wrapRedisProcess(client *redis.Client) {
const precision = time.Microsecond
var count, avgDur uint32
go func() {
for range time.Tick(3 * time.Second) {
n := atomic.LoadUint32(&count)
dur := time.Duration(atomic.LoadUint32(&avgDur)) * precision
fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur)
}
}()
client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error {
cl.WrapProcess(func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(cmd redis.Cmder) error {
start := time.Now()
err := oldProcess(cmd)
dur := time.Since(start)
const decay = float64(1) / 100
ms := float64(dur / precision)
for {
avg := atomic.LoadUint32(&avgDur)
newAvg := uint32((1-decay)*float64(avg) + decay*ms)
if atomic.CompareAndSwapUint32(&avgDur, avg, newAvg) {
break
}
}
atomic.AddUint32(&count, 1)
fmt.Printf("starting processing: <%s>\n", cmd)
err := old(cmd)
fmt.Printf("finished processing: <%s>\n", cmd)
return err
}
})
cl.Ping()
// Output: starting processing: <ping: >
// finished processing: <ping: PONG>
}
func Example_Pipeline_instrumentation() {
client := redis.NewClient(&redis.Options{
Addr: ":6379",
})
client.WrapProcessPipeline(func(old func([]redis.Cmder) error) func([]redis.Cmder) error {
return func(cmds []redis.Cmder) error {
fmt.Printf("pipeline starting processing: %v\n", cmds)
err := old(cmds)
fmt.Printf("pipeline finished processing: %v\n", cmds)
return err
}
})
client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
pipe.Ping()
return nil
})
// Output: pipeline starting processing: [ping: ping: ]
// pipeline finished processing: [ping: PONG ping: PONG]
}

104
redis.go
View File

@ -22,6 +22,12 @@ func SetLogger(logger *log.Logger) {
internal.Logger = logger
}
func (c *baseClient) init() {
c.process = c.defaultProcess
c.processPipeline = c.defaultProcessPipeline
c.processTxPipeline = c.defaultProcessTxPipeline
}
func (c *baseClient) String() string {
return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
}
@ -85,7 +91,8 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
connPool: pool.NewSingleConnPool(cn),
},
}
conn.setProcessor(conn.Process)
conn.baseClient.init()
conn.statefulCmdable.setProcessor(conn.Process)
_, err := conn.Pipelined(func(pipe Pipeliner) error {
if c.opt.Password != "" {
@ -117,14 +124,11 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
// an input and returns the new wrapper process func. createWrapper should
// use call the old process func within the new process func.
func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
c.process = fn(c.defaultProcess)
c.process = fn(c.process)
}
func (c *baseClient) Process(cmd Cmder) error {
if c.process != nil {
return c.process(cmd)
}
return c.defaultProcess(cmd)
return c.process(cmd)
}
func (c *baseClient) defaultProcess(cmd Cmder) error {
@ -198,35 +202,48 @@ func (c *baseClient) getAddr() string {
return c.opt.Addr
}
func (c *baseClient) WrapProcessPipeline(
fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) {
c.processPipeline = fn(c.processPipeline)
c.processTxPipeline = fn(c.processTxPipeline)
}
func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
}
func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
}
type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
return func(cmds []Cmder) error {
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
cn, _, err := c.getConn()
if err != nil {
setCmdsErr(cmds, err)
return err
}
canRetry, err := p(cn, cmds)
if err == nil || internal.IsRedisError(err) {
_ = c.connPool.Put(cn)
break
}
_ = c.connPool.Remove(cn)
if !canRetry || !internal.IsRetryableError(err, true) {
break
}
func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
cn, _, err := c.getConn()
if err != nil {
setCmdsErr(cmds, err)
return err
}
canRetry, err := p(cn, cmds)
if err == nil || internal.IsRedisError(err) {
_ = c.connPool.Put(cn)
break
}
_ = c.connPool.Remove(cn)
if !canRetry || !internal.IsRetryableError(err, true) {
break
}
return firstCmdsErr(cmds)
}
return firstCmdsErr(cmds)
}
func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
@ -324,14 +341,15 @@ type Client struct {
}
func newClient(opt *Options, pool pool.Pooler) *Client {
client := Client{
c := Client{
baseClient: baseClient{
opt: opt,
connPool: pool,
},
}
client.setProcessor(client.Process)
return &client
c.baseClient.init()
c.cmdable.setProcessor(c.Process)
return &c
}
// NewClient returns a client to the Redis Server specified by Options.
@ -343,7 +361,7 @@ func NewClient(opt *Options) *Client {
func (c *Client) copy() *Client {
c2 := new(Client)
*c2 = *c
c2.setProcessor(c2.Process)
c2.cmdable.setProcessor(c2.Process)
return c2
}
@ -366,9 +384,9 @@ func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
func (c *Client) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.pipelineExecer(c.pipelineProcessCmds),
exec: c.processPipeline,
}
pipe.setProcessor(pipe.Process)
pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@ -379,9 +397,9 @@ func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Client) TxPipeline() Pipeliner {
pipe := Pipeline{
exec: c.pipelineExecer(c.txPipelineProcessCmds),
exec: c.processTxPipeline,
}
pipe.setProcessor(pipe.Process)
pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@ -430,9 +448,9 @@ func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
func (c *Conn) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.pipelineExecer(c.pipelineProcessCmds),
exec: c.processPipeline,
}
pipe.setProcessor(pipe.Process)
pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@ -443,8 +461,8 @@ func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Conn) TxPipeline() Pipeliner {
pipe := Pipeline{
exec: c.pipelineExecer(c.txPipelineProcessCmds),
exec: c.processTxPipeline,
}
pipe.setProcessor(pipe.Process)
pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}

View File

@ -12,7 +12,10 @@ type baseClient struct {
connPool pool.Pooler
opt *Options
process func(Cmder) error
process func(Cmder) error
processPipeline func([]Cmder) error
processTxPipeline func([]Cmder) error
onClose func() error // hook called when client is closed
ctx context.Context

View File

@ -10,6 +10,9 @@ type baseClient struct {
connPool pool.Pooler
opt *Options
process func(Cmder) error
process func(Cmder) error
processPipeline func([]Cmder) error
processTxPipeline func([]Cmder) error
onClose func() error // hook called when client is closed
}

29
ring.go
View File

@ -150,6 +150,8 @@ type Ring struct {
shards map[string]*ringShard
shardsList []*ringShard
processPipeline func([]Cmder) error
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
@ -158,7 +160,9 @@ type Ring struct {
func NewRing(opt *RingOptions) *Ring {
const nreplicas = 100
opt.init()
ring := &Ring{
opt: opt,
nreplicas: nreplicas,
@ -166,13 +170,17 @@ func NewRing(opt *RingOptions) *Ring {
hash: consistenthash.New(nreplicas, nil),
shards: make(map[string]*ringShard),
}
ring.setProcessor(ring.Process)
ring.processPipeline = ring.defaultProcessPipeline
ring.cmdable.setProcessor(ring.Process)
for name, addr := range opt.Addrs {
clopt := opt.clientOptions()
clopt.Addr = addr
ring.addShard(name, NewClient(clopt))
}
go ring.heartbeat()
return ring
}
@ -354,6 +362,13 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
return c.shardByKey(firstKey)
}
func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
c.ForEachShard(func(c *Client) error {
c.WrapProcess(fn)
return nil
})
}
func (c *Ring) Process(cmd Cmder) error {
shard, err := c.cmdShard(cmd)
if err != nil {
@ -436,9 +451,9 @@ func (c *Ring) Close() error {
func (c *Ring) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.pipelineExec,
exec: c.processPipeline,
}
pipe.setProcessor(pipe.Process)
pipe.cmdable.setProcessor(pipe.Process)
return &pipe
}
@ -446,7 +461,13 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}
func (c *Ring) pipelineExec(cmds []Cmder) error {
func (c *Ring) WrapProcessPipeline(
fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) {
c.processPipeline = fn(c.processPipeline)
}
func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
cmdInfo := c.cmdInfo(cmd.Name())

View File

@ -76,7 +76,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
opt: opt,
}
client := Client{
c := Client{
baseClient: baseClient{
opt: opt,
connPool: failover.Pool(),
@ -86,9 +86,10 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
},
},
}
client.setProcessor(client.Process)
c.baseClient.init()
c.setProcessor(c.Process)
return &client
return &c
}
//------------------------------------------------------------------------------
@ -100,14 +101,15 @@ type sentinelClient struct {
func newSentinel(opt *Options) *sentinelClient {
opt.init()
client := sentinelClient{
c := sentinelClient{
baseClient: baseClient{
opt: opt,
connPool: newConnPool(opt),
},
}
client.cmdable = cmdable{client.Process}
return &client
c.baseClient.init()
c.cmdable.setProcessor(c.Process)
return &c
}
func (c *sentinelClient) PubSub() *PubSub {

7
tx.go
View File

@ -24,7 +24,8 @@ func (c *Client) newTx() *Tx {
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true),
},
}
tx.setProcessor(tx.Process)
tx.baseClient.init()
tx.statefulCmdable.setProcessor(tx.Process)
return &tx
}
@ -75,9 +76,9 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd {
func (c *Tx) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.pipelineExecer(c.txPipelineProcessCmds),
exec: c.processTxPipeline,
}
pipe.setProcessor(pipe.Process)
pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}