From 8b4fa6d443e35ca8b1c37be877285252430b06a3 Mon Sep 17 00:00:00 2001 From: "yifei.huang" Date: Sat, 20 Jan 2018 18:26:33 +0800 Subject: [PATCH] Add WrapProcessPipeline --- cluster.go | 42 ++++++++++--- example_instrumentation_test.go | 77 ++++++++++------------- redis.go | 104 +++++++++++++++++++------------- redis_context.go | 5 +- redis_no_context.go | 5 +- ring.go | 29 +++++++-- sentinel.go | 14 +++-- tx.go | 7 ++- 8 files changed, 174 insertions(+), 109 deletions(-) diff --git a/cluster.go b/cluster.go index accdb3d..a2c18b3 100644 --- a/cluster.go +++ b/cluster.go @@ -445,6 +445,10 @@ type ClusterClient struct { cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + // Reports whether slots reloading is in progress. reloading uint32 } @@ -458,7 +462,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { opt: opt, nodes: newClusterNodes(opt), } - c.setProcessor(c.Process) + + c.process = c.defaultProcess + c.processPipeline = c.defaultProcessPipeline + c.processTxPipeline = c.defaultProcessTxPipeline + + c.cmdable.setProcessor(c.Process) // Add initial nodes. for _, addr := range opt.Addrs { @@ -628,7 +637,20 @@ func (c *ClusterClient) Close() error { return c.nodes.Close() } +func (c *ClusterClient) WrapProcess( + fn func(oldProcess func(Cmder) error) func(Cmder) error, +) { + c.process = fn(c.process) +} + func (c *ClusterClient) Process(cmd Cmder) error { + if c.process != nil { + return c.process(cmd) + } + return c.defaultProcess(cmd) +} + +func (c *ClusterClient) defaultProcess(cmd Cmder) error { state, err := c.state() if err != nil { cmd.setErr(err) @@ -910,9 +932,9 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { func (c *ClusterClient) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExec, + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -920,7 +942,13 @@ func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().Pipelined(fn) } -func (c *ClusterClient) pipelineExec(cmds []Cmder) error { +func (c *ClusterClient) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { cmdsMap, err := c.mapCmdsByNode(cmds) if err != nil { setCmdsErr(cmds, err) @@ -1064,9 +1092,9 @@ func (c *ClusterClient) checkMovedErr( // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *ClusterClient) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.txPipelineExec, + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -1074,7 +1102,7 @@ func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.TxPipeline().Pipelined(fn) } -func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { +func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { state, err := c.state() if err != nil { return err diff --git a/example_instrumentation_test.go b/example_instrumentation_test.go index 02051f9..85abbd7 100644 --- a/example_instrumentation_test.go +++ b/example_instrumentation_test.go @@ -2,58 +2,47 @@ package redis_test import ( "fmt" - "sync/atomic" - "time" "github.com/go-redis/redis" ) func Example_instrumentation() { - ring := redis.NewRing(&redis.RingOptions{ - Addrs: map[string]string{ - "shard1": ":6379", - }, + cl := redis.NewClient(&redis.Options{ + Addr: ":6379", }) - ring.ForEachShard(func(client *redis.Client) error { - wrapRedisProcess(client) - return nil - }) - - for { - ring.Ping() - } -} - -func wrapRedisProcess(client *redis.Client) { - const precision = time.Microsecond - var count, avgDur uint32 - - go func() { - for range time.Tick(3 * time.Second) { - n := atomic.LoadUint32(&count) - dur := time.Duration(atomic.LoadUint32(&avgDur)) * precision - fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur) - } - }() - - client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error { + cl.WrapProcess(func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error { return func(cmd redis.Cmder) error { - start := time.Now() - err := oldProcess(cmd) - dur := time.Since(start) - - const decay = float64(1) / 100 - ms := float64(dur / precision) - for { - avg := atomic.LoadUint32(&avgDur) - newAvg := uint32((1-decay)*float64(avg) + decay*ms) - if atomic.CompareAndSwapUint32(&avgDur, avg, newAvg) { - break - } - } - atomic.AddUint32(&count, 1) - + fmt.Printf("starting processing: <%s>\n", cmd) + err := old(cmd) + fmt.Printf("finished processing: <%s>\n", cmd) return err } }) + + cl.Ping() + // Output: starting processing: + // finished processing: +} + +func Example_Pipeline_instrumentation() { + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + }) + + client.WrapProcessPipeline(func(old func([]redis.Cmder) error) func([]redis.Cmder) error { + return func(cmds []redis.Cmder) error { + fmt.Printf("pipeline starting processing: %v\n", cmds) + err := old(cmds) + fmt.Printf("pipeline finished processing: %v\n", cmds) + return err + } + }) + + client.Pipelined(func(pipe redis.Pipeliner) error { + pipe.Ping() + pipe.Ping() + return nil + }) + // Output: pipeline starting processing: [ping: ping: ] + // pipeline finished processing: [ping: PONG ping: PONG] } diff --git a/redis.go b/redis.go index ab7beb7..cf40298 100644 --- a/redis.go +++ b/redis.go @@ -22,6 +22,12 @@ func SetLogger(logger *log.Logger) { internal.Logger = logger } +func (c *baseClient) init() { + c.process = c.defaultProcess + c.processPipeline = c.defaultProcessPipeline + c.processTxPipeline = c.defaultProcessTxPipeline +} + func (c *baseClient) String() string { return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB) } @@ -85,7 +91,8 @@ func (c *baseClient) initConn(cn *pool.Conn) error { connPool: pool.NewSingleConnPool(cn), }, } - conn.setProcessor(conn.Process) + conn.baseClient.init() + conn.statefulCmdable.setProcessor(conn.Process) _, err := conn.Pipelined(func(pipe Pipeliner) error { if c.opt.Password != "" { @@ -117,14 +124,11 @@ func (c *baseClient) initConn(cn *pool.Conn) error { // an input and returns the new wrapper process func. createWrapper should // use call the old process func within the new process func. func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { - c.process = fn(c.defaultProcess) + c.process = fn(c.process) } func (c *baseClient) Process(cmd Cmder) error { - if c.process != nil { - return c.process(cmd) - } - return c.defaultProcess(cmd) + return c.process(cmd) } func (c *baseClient) defaultProcess(cmd Cmder) error { @@ -198,35 +202,48 @@ func (c *baseClient) getAddr() string { return c.opt.Addr } +func (c *baseClient) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) + c.processTxPipeline = fn(c.processTxPipeline) +} + +func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error { + return c.generalProcessPipeline(cmds, c.pipelineProcessCmds) +} + +func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error { + return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds) +} + type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error) -func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer { - return func(cmds []Cmder) error { - for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { - if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) - } - - cn, _, err := c.getConn() - if err != nil { - setCmdsErr(cmds, err) - return err - } - - canRetry, err := p(cn, cmds) - - if err == nil || internal.IsRedisError(err) { - _ = c.connPool.Put(cn) - break - } - _ = c.connPool.Remove(cn) - - if !canRetry || !internal.IsRetryableError(err, true) { - break - } +func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + cn, _, err := c.getConn() + if err != nil { + setCmdsErr(cmds, err) + return err + } + + canRetry, err := p(cn, cmds) + + if err == nil || internal.IsRedisError(err) { + _ = c.connPool.Put(cn) + break + } + _ = c.connPool.Remove(cn) + + if !canRetry || !internal.IsRetryableError(err, true) { + break } - return firstCmdsErr(cmds) } + return firstCmdsErr(cmds) } func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { @@ -324,14 +341,15 @@ type Client struct { } func newClient(opt *Options, pool pool.Pooler) *Client { - client := Client{ + c := Client{ baseClient: baseClient{ opt: opt, connPool: pool, }, } - client.setProcessor(client.Process) - return &client + c.baseClient.init() + c.cmdable.setProcessor(c.Process) + return &c } // NewClient returns a client to the Redis Server specified by Options. @@ -343,7 +361,7 @@ func NewClient(opt *Options) *Client { func (c *Client) copy() *Client { c2 := new(Client) *c2 = *c - c2.setProcessor(c2.Process) + c2.cmdable.setProcessor(c2.Process) return c2 } @@ -366,9 +384,9 @@ func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Client) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.pipelineProcessCmds), + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -379,9 +397,9 @@ func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *Client) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -430,9 +448,9 @@ func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Conn) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.pipelineProcessCmds), + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -443,8 +461,8 @@ func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *Conn) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } diff --git a/redis_context.go b/redis_context.go index 6ec811c..c00e505 100644 --- a/redis_context.go +++ b/redis_context.go @@ -12,7 +12,10 @@ type baseClient struct { connPool pool.Pooler opt *Options - process func(Cmder) error + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + onClose func() error // hook called when client is closed ctx context.Context diff --git a/redis_no_context.go b/redis_no_context.go index 0752192..8555c5c 100644 --- a/redis_no_context.go +++ b/redis_no_context.go @@ -10,6 +10,9 @@ type baseClient struct { connPool pool.Pooler opt *Options - process func(Cmder) error + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + onClose func() error // hook called when client is closed } diff --git a/ring.go b/ring.go index c11ef6b..10f33ed 100644 --- a/ring.go +++ b/ring.go @@ -150,6 +150,8 @@ type Ring struct { shards map[string]*ringShard shardsList []*ringShard + processPipeline func([]Cmder) error + cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo @@ -158,7 +160,9 @@ type Ring struct { func NewRing(opt *RingOptions) *Ring { const nreplicas = 100 + opt.init() + ring := &Ring{ opt: opt, nreplicas: nreplicas, @@ -166,13 +170,17 @@ func NewRing(opt *RingOptions) *Ring { hash: consistenthash.New(nreplicas, nil), shards: make(map[string]*ringShard), } - ring.setProcessor(ring.Process) + ring.processPipeline = ring.defaultProcessPipeline + ring.cmdable.setProcessor(ring.Process) + for name, addr := range opt.Addrs { clopt := opt.clientOptions() clopt.Addr = addr ring.addShard(name, NewClient(clopt)) } + go ring.heartbeat() + return ring } @@ -354,6 +362,13 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { return c.shardByKey(firstKey) } +func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { + c.ForEachShard(func(c *Client) error { + c.WrapProcess(fn) + return nil + }) +} + func (c *Ring) Process(cmd Cmder) error { shard, err := c.cmdShard(cmd) if err != nil { @@ -436,9 +451,9 @@ func (c *Ring) Close() error { func (c *Ring) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExec, + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.cmdable.setProcessor(pipe.Process) return &pipe } @@ -446,7 +461,13 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().Pipelined(fn) } -func (c *Ring) pipelineExec(cmds []Cmder) error { +func (c *Ring) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.Name()) diff --git a/sentinel.go b/sentinel.go index 37d06b4..3f56f08 100644 --- a/sentinel.go +++ b/sentinel.go @@ -76,7 +76,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { opt: opt, } - client := Client{ + c := Client{ baseClient: baseClient{ opt: opt, connPool: failover.Pool(), @@ -86,9 +86,10 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { }, }, } - client.setProcessor(client.Process) + c.baseClient.init() + c.setProcessor(c.Process) - return &client + return &c } //------------------------------------------------------------------------------ @@ -100,14 +101,15 @@ type sentinelClient struct { func newSentinel(opt *Options) *sentinelClient { opt.init() - client := sentinelClient{ + c := sentinelClient{ baseClient: baseClient{ opt: opt, connPool: newConnPool(opt), }, } - client.cmdable = cmdable{client.Process} - return &client + c.baseClient.init() + c.cmdable.setProcessor(c.Process) + return &c } func (c *sentinelClient) PubSub() *PubSub { diff --git a/tx.go b/tx.go index eaea66d..26c29be 100644 --- a/tx.go +++ b/tx.go @@ -24,7 +24,8 @@ func (c *Client) newTx() *Tx { connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true), }, } - tx.setProcessor(tx.Process) + tx.baseClient.init() + tx.statefulCmdable.setProcessor(tx.Process) return &tx } @@ -75,9 +76,9 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd { func (c *Tx) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe }