diff --git a/commands.go b/commands.go index 45112842..0f06652d 100644 --- a/commands.go +++ b/commands.go @@ -185,7 +185,6 @@ type Cmdable interface { ClientKill(ipPort string) *StatusCmd ClientList() *StringCmd ClientPause(dur time.Duration) *BoolCmd - ClientSetName(name string) *BoolCmd ConfigGet(parameter string) *SliceCmd ConfigResetStat() *StatusCmd ConfigSet(parameter, value string) *StatusCmd @@ -241,14 +240,6 @@ type cmdable struct { process func(cmd Cmder) error } -// WrapProcess replaces the process func. It takes a function createWrapper -// which is supplied by the user. createWrapper takes the old process func as -// an input and returns the new wrapper process func. createWrapper should -// use call the old process func within the new process func. -func (c *cmdable) WrapProcess(createWrapper func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { - c.process = createWrapper(c.process) -} - type statefulCmdable struct { process func(cmd Cmder) error } @@ -1625,15 +1616,15 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd { return cmd } -// ClientSetName assigns a name to the one of many connections in the pool. -func (c *cmdable) ClientSetName(name string) *BoolCmd { +// ClientSetName assigns a name to the connection. +func (c *statefulCmdable) ClientSetName(name string) *BoolCmd { cmd := NewBoolCmd("client", "setname", name) c.process(cmd) return cmd } -// ClientGetName returns the name of the one of many connections in the pool. -func (c *Client) ClientGetName() *StringCmd { +// ClientGetName returns the name of the connection. +func (c *statefulCmdable) ClientGetName() *StringCmd { cmd := NewStringCmd("client", "getname") c.process(cmd) return cmd diff --git a/commands_test.go b/commands_test.go index 7da5f2a8..dba37aac 100644 --- a/commands_test.go +++ b/commands_test.go @@ -26,14 +26,20 @@ var _ = Describe("Commands", func() { Describe("server", func() { - // It("should Auth", func() { - // auth := client.Auth("password") - // Expect(auth.Err()).To(MatchError("ERR Client sent AUTH, but no password is set")) - // Expect(auth.Val()).To(Equal("")) - // }) + It("should Auth", func() { + _, err := client.Pipelined(func(pipe *redis.Pipeline) error { + pipe.Auth("password") + return nil + }) + Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set")) + }) It("should Echo", func() { - echo := client.Echo("hello") + pipe := client.Pipeline() + echo := pipe.Echo("hello") + _, err := pipe.Exec() + Expect(err).NotTo(HaveOccurred()) + Expect(echo.Err()).NotTo(HaveOccurred()) Expect(echo.Val()).To(Equal("hello")) }) @@ -44,11 +50,15 @@ var _ = Describe("Commands", func() { Expect(ping.Val()).To(Equal("PONG")) }) - // It("should Select", func() { - // sel := client.Select(1) - // Expect(sel.Err()).NotTo(HaveOccurred()) - // Expect(sel.Val()).To(Equal("OK")) - // }) + It("should Select", func() { + pipe := client.Pipeline() + sel := pipe.Select(1) + _, err := pipe.Exec() + Expect(err).NotTo(HaveOccurred()) + + Expect(sel.Err()).NotTo(HaveOccurred()) + Expect(sel.Val()).To(Equal("OK")) + }) It("should BgRewriteAOF", func() { Skip("flaky test") @@ -84,13 +94,18 @@ var _ = Describe("Commands", func() { }) It("should ClientSetName and ClientGetName", func() { - isSet, err := client.ClientSetName("theclientname").Result() + pipe := client.Pipeline() + set := pipe.ClientSetName("theclientname") + get := pipe.ClientGetName() + _, err := pipe.Exec() Expect(err).NotTo(HaveOccurred()) - Expect(isSet).To(BeTrue()) - val, err := client.ClientGetName().Result() - Expect(err).NotTo(HaveOccurred()) - Expect(val).To(Equal("theclientname")) + Expect(set.Err()).NotTo(HaveOccurred()) + Expect(set.Val()).To(BeTrue()) + + Expect(get.Err()).NotTo(HaveOccurred()) + Expect(get.Val()).To(Equal("theclientname")) + }) It("should ConfigGet", func() { diff --git a/example_instrumentation_test.go b/example_instrumentation_test.go new file mode 100644 index 00000000..009138eb --- /dev/null +++ b/example_instrumentation_test.go @@ -0,0 +1,59 @@ +package redis_test + +import ( + "fmt" + "sync/atomic" + "time" + + redis "gopkg.in/redis.v5" +) + +func Example_instrumentation() { + ring := redis.NewRing(&redis.RingOptions{ + Addrs: map[string]string{ + "shard1": ":6379", + }, + }) + ring.ForEachShard(func(client *redis.Client) error { + wrapRedisProcess(client) + return nil + }) + + for { + ring.Ping() + } +} + +func wrapRedisProcess(client *redis.Client) { + const precision = time.Microsecond + var count, avgDur uint32 + + go func() { + for _ = range time.Tick(3 * time.Second) { + n := atomic.LoadUint32(&count) + dur := time.Duration(atomic.LoadUint32(&avgDur)) * precision + fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur) + } + }() + + client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error { + return func(cmd redis.Cmder) error { + start := time.Now() + err := oldProcess(cmd) + dur := time.Since(start) + + const decay = float64(1) / 100 + ms := float64(dur / precision) + for { + avg := atomic.LoadUint32(&avgDur) + newAvg := uint32((1-decay)*float64(avg) + decay*ms) + if atomic.CompareAndSwapUint32(&avgDur, avg, newAvg) { + break + } + } + atomic.AddUint32(&count, 1) + + return err + } + }) +} diff --git a/example_test.go b/example_test.go index 8f35f2f5..0769aa05 100644 --- a/example_test.go +++ b/example_test.go @@ -331,21 +331,3 @@ func ExampleScanCmd_Iterator() { panic(err) } } - -func ExampleClient_instrumentation() { - client := redis.NewClient(&redis.Options{ - Addr: ":6379", - }) - client.WrapProcess(func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error { - return func(cmd redis.Cmder) error { - start := time.Now() - err := oldProcess(cmd) - if err != nil { - fmt.Printf("command %s failed: %s", cmd, err) - } else { - fmt.Printf("command %q took %s", cmd, time.Since(start)) - } - return err - } - }) -} diff --git a/pipeline_test.go b/pipeline_test.go index 74cdd385..01fcc091 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -152,7 +152,7 @@ var _ = Describe("Pipelining", func() { const N = 1000 pipeline := client.Pipeline() - wg := &sync.WaitGroup{} + var wg sync.WaitGroup wg.Add(N) for i := 0; i < N; i++ { go func() { diff --git a/redis.go b/redis.go index 72b8ee6e..1c5fdd1b 100644 --- a/redis.go +++ b/redis.go @@ -19,6 +19,7 @@ type baseClient struct { connPool pool.Pooler opt *Options + process func(Cmder) error onClose func() error // hook called when client is closed } @@ -78,6 +79,21 @@ func (c *baseClient) initConn(cn *pool.Conn) error { } func (c *baseClient) Process(cmd Cmder) error { + if c.process != nil { + return c.process(cmd) + } + return c.defaultProcess(cmd) +} + +// WrapProcess replaces the process func. It takes a function createWrapper +// which is supplied by the user. createWrapper takes the old process func as +// an input and returns the new wrapper process func. createWrapper should +// use call the old process func within the new process func. +func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { + c.process = fn(c.defaultProcess) +} + +func (c *baseClient) defaultProcess(cmd Cmder) error { for i := 0; i <= c.opt.MaxRetries; i++ { if i > 0 { cmd.reset() diff --git a/ring.go b/ring.go index 96085346..53c3f113 100644 --- a/ring.go +++ b/ring.go @@ -230,7 +230,7 @@ func (c *Ring) addClient(name string, cl *Client) { c.mu.Unlock() } -func (c *Ring) shardByKey(key string) (*Client, error) { +func (c *Ring) shardByKey(key string) (*ringShard, error) { key = hashtag.Key(key) c.mu.RLock() @@ -246,27 +246,27 @@ func (c *Ring) shardByKey(key string) (*Client, error) { return nil, errRingShardsDown } - cl := c.shards[name].Client + shard := c.shards[name] c.mu.RUnlock() - return cl, nil + return shard, nil } -func (c *Ring) randomShard() (*Client, error) { +func (c *Ring) randomShard() (*ringShard, error) { return c.shardByKey(strconv.Itoa(rand.Int())) } -func (c *Ring) shardByName(name string) (*Client, error) { +func (c *Ring) shardByName(name string) (*ringShard, error) { if name == "" { return c.randomShard() } c.mu.RLock() - cl := c.shards[name].Client + shard := c.shards[name] c.mu.RUnlock() - return cl, nil + return shard, nil } -func (c *Ring) cmdShard(cmd Cmder) (*Client, error) { +func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { cmdInfo := c.cmdInfo(cmd.arg(0)) firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) if firstKey == "" { @@ -276,12 +276,12 @@ func (c *Ring) cmdShard(cmd Cmder) (*Client, error) { } func (c *Ring) Process(cmd Cmder) error { - cl, err := c.cmdShard(cmd) + shard, err := c.cmdShard(cmd) if err != nil { cmd.setErr(err) return err } - return cl.baseClient.Process(cmd) + return shard.Client.Process(cmd) } // rebalance removes dead shards from the Ring. @@ -384,7 +384,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { resetCmds(cmds) } - client, err := c.shardByName(name) + shard, err := c.shardByName(name) if err != nil { setCmdsErr(cmds, err) if firstErr == nil { @@ -393,7 +393,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { continue } - cn, _, err := client.conn() + cn, _, err := shard.Client.conn() if err != nil { setCmdsErr(cmds, err) if firstErr == nil { @@ -403,7 +403,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { } retry, err := execCmds(cn, cmds) - client.putConn(cn, err, false) + shard.Client.putConn(cn, err, false) if err == nil { continue }