From 736fa2865992feedee4f744ff3e6e5d2c27a17f3 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 10 Jun 2020 15:04:12 +0300 Subject: [PATCH] Cleanup --- CHANGELOG.md | 69 +++++++++++++++++++++++-------------------------- README.md | 28 +++++++++++--------- cluster.go | 17 ++++-------- cluster_test.go | 18 ++++++------- example_test.go | 31 +++++++++++++--------- ring.go | 16 ++++++------ 6 files changed, 89 insertions(+), 90 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc4dc03..8617ca7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,13 +2,13 @@ ## v8 (unreleased) -- All commands require `context.Context` as a first argument, e.g. - `rdb.Ping(ctx)`. If you are not using `context.Context` yet, the simplest - option is to define package variable `var ctx = context.TODO()` and use it - when `ctx` is expected. -- Ring uses Rendezvous Hashing by default which provides better distribution. - This means that existing keys must be moved to a new location or key will be - inaccessible / lost. To use old hashing scheme: +- All commands require `context.Context` as a first argument, e.g. `rdb.Ping(ctx)`. If you are not + using `context.Context` yet, the simplest option is to define package variable + `var ctx = context.TODO()` and use it when `ctx` is expected. +- Ring uses Rendezvous Hashing by default which provides better distribution. This means that + existing keys must be moved to a new location or key will be inaccessible / lost. To use old + hashing scheme: +- `Cluster.ForEachNode` is renamed to `ForEachShard` for consistency with `Ring`. ```go import "github.com/golang/groupcache/consistenthash" @@ -20,45 +20,43 @@ ring := redis.NewRing(&redis.RingOptions{ }) ``` -- Basic support for OpenTelemetry instrumentation. +- Added `redisext.OpenTemetryHook` that adds + [Redis OpenTelemetry instrumentation](https://redis.uptrace.dev/tracing/). ## v7.3 -- New option `Options.Username` which causes client to use `AuthACL`. Be aware - if your connection URL contains username. +- New option `Options.Username` which causes client to use `AuthACL`. Be aware if your connection + URL contains username. ## v7.2 -- Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored - for Redis 3 users. +- Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored for Redis 3 users. ## v7.1 -- Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements - `fmt.Stringer` interface. +- Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements `fmt.Stringer` + interface. ## v7 -- _Important_. Tx.Pipeline now returns a non-transactional pipeline. Use - Tx.TxPipeline for a transactional pipeline. -- WrapProcess is replaced with more convenient AddHook that has access to - context.Context. +- _Important_. Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a + transactional pipeline. +- WrapProcess is replaced with more convenient AddHook that has access to context.Context. - WithContext now can not be used to create a shallow copy of the client. - New methods ProcessContext, DoContext, and ExecContext. - Client respects Context.Deadline when setting net.Conn deadline. -- Client listens on Context.Done while waiting for a connection from the pool - and returns an error when context context is cancelled. -- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to - `*Message` to allow detecting reconnections. -- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()` - helper is added to parse the time. +- Client listens on Context.Done while waiting for a connection from the pool and returns an error + when context context is cancelled. +- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to `*Message` to allow + detecting reconnections. +- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()` helper is added to parse + the time. - `SetLimiter` is removed and added `Options.Limiter` instead. - `HMSet` is deprecated as of Redis v4. ## v6.15 -- Cluster and Ring pipelines process commands for each node in its own - goroutine. +- Cluster and Ring pipelines process commands for each node in its own goroutine. ## 6.14 @@ -66,23 +64,20 @@ ring := redis.NewRing(&redis.RingOptions{ - Added Options.MaxConnAge. - PoolStats.FreeConns is renamed to PoolStats.IdleConns. - Add Client.Do to simplify creating custom commands. -- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool - helpers. +- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers. - Lower memory usage. ## v6.13 -- Ring got new options called `HashReplicas` and `Hash`. It is recommended to - set `HashReplicas = 1000` for better keys distribution between shards. -- Cluster client was optimized to use much less memory when reloading cluster - state. -- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not - lose data when timeout occurres. In most cases it is recommended to use - PubSub.Channel instead. +- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set + `HashReplicas = 1000` for better keys distribution between shards. +- Cluster client was optimized to use much less memory when reloading cluster state. +- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout + occurres. In most cases it is recommended to use PubSub.Channel instead. - Dialer.KeepAlive is set to 5 minutes by default. ## v6.12 -- ClusterClient got new option called `ClusterSlots` which allows to build - cluster of normal Redis Servers that don't have cluster mode enabled. See +- ClusterClient got new option called `ClusterSlots` which allows to build cluster of normal Redis + Servers that don't have cluster mode enabled. See https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup diff --git a/README.md b/README.md index 5955320..fa31478 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,10 @@ [![Build Status](https://travis-ci.org/go-redis/redis.png?branch=master)](https://travis-ci.org/go-redis/redis) [![GoDoc](https://godoc.org/github.com/go-redis/redis?status.svg)](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc) +- [Docs](https://redis.uptrace.dev) +- [Reference](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc) +- [Examples](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#pkg-examples) + ## Sponsors - [**Uptrace.dev** - distributed traces and metrics](https://uptrace.dev) @@ -54,35 +58,35 @@ import "github.com/go-redis/redis/v8" ```go func ExampleNewClient() { - client := redis.NewClient(&redis.Options{ + rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // no password set DB: 0, // use default DB }) - pong, err := client.Ping(ctx).Result() + pong, err := rdb.Ping(ctx).Result() fmt.Println(pong, err) // Output: PONG } func ExampleClient() { - client := redis.NewClient(&redis.Options{ + rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // no password set DB: 0, // use default DB }) - err := client.Set(ctx, "key", "value", 0).Err() + err := rdb.Set(ctx, "key", "value", 0).Err() if err != nil { panic(err) } - val, err := client.Get(ctx, "key").Result() + val, err := rdb.Get(ctx, "key").Result() if err != nil { panic(err) } fmt.Println("key", val) - val2, err := client.Get(ctx, "key2").Result() + val2, err := rdb.Get(ctx, "key2").Result() if err == redis.Nil { fmt.Println("key2 does not exist") } else if err != nil { @@ -106,13 +110,13 @@ Some corner cases: ```go // SET key value EX 10 NX -set, err := client.SetNX(ctx, "key", "value", 10*time.Second).Result() +set, err := rdb.SetNX(ctx, "key", "value", 10*time.Second).Result() // SORT list LIMIT 0 2 ASC -vals, err := client.Sort(ctx, "list", &redis.Sort{Offset: 0, Count: 2, Order: "ASC"}).Result() +vals, err := rdb.Sort(ctx, "list", &redis.Sort{Offset: 0, Count: 2, Order: "ASC"}).Result() // ZRANGEBYSCORE zset -inf +inf WITHSCORES LIMIT 0 2 -vals, err := client.ZRangeByScoreWithScores(ctx, "zset", &redis.ZRangeBy{ +vals, err := rdb.ZRangeByScoreWithScores(ctx, "zset", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", Offset: 0, @@ -120,16 +124,16 @@ vals, err := client.ZRangeByScoreWithScores(ctx, "zset", &redis.ZRangeBy{ }).Result() // ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 AGGREGATE SUM -vals, err := client.ZInterStore(ctx, "out", &redis.ZStore{ +vals, err := rdb.ZInterStore(ctx, "out", &redis.ZStore{ Keys: []string{"zset1", "zset2"}, Weights: []int64{2, 3} }).Result() // EVAL "return {KEYS[1],ARGV[1]}" 1 "key" "hello" -vals, err := client.Eval(ctx, "return {KEYS[1],ARGV[1]}", []string{"key"}, "hello").Result() +vals, err := rdb.Eval(ctx, "return {KEYS[1],ARGV[1]}", []string{"key"}, "hello").Result() // custom command -res, err := client.Do(ctx, "set", "key", "value").Result() +res, err := rdb.Do(ctx, "set", "key", "value").Result() ``` ## See also diff --git a/cluster.go b/cluster.go index 6f25c0c..edc4007 100644 --- a/cluster.go +++ b/cluster.go @@ -27,6 +27,9 @@ type ClusterOptions struct { // A seed list of host:port addresses of cluster nodes. Addrs []string + // NewClient creates a cluster node client with provided name and options. + NewClient func(opt *Options) *Client + // The maximum number of retries before giving up. Command is retried // on network errors and MOVED/ASK redirects. // Default is 8 retries. @@ -48,9 +51,6 @@ type ClusterOptions struct { // and Cluster.ReloadState to manually trigger state reloading. ClusterSlots func() ([]ClusterSlot, error) - // Optional hook that is called when a new node is created. - OnNewNode func(*Client) - // Following options are copied from Options struct. Dialer func(ctx context.Context, network, addr string) (net.Conn, error) @@ -68,9 +68,6 @@ type ClusterOptions struct { ReadTimeout time.Duration WriteTimeout time.Duration - // NewClient creates a cluster node client with provided name and options. - NewClient func(opt *Options) *Client - // PoolSize applies per cluster node and not for the whole cluster. PoolSize int MinIdleConns int @@ -179,10 +176,6 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { go node.updateLatency() } - if clOpt.OnNewNode != nil { - clOpt.OnNewNode(node.Client) - } - return &node } @@ -907,9 +900,9 @@ func (c *ClusterClient) ForEachSlave( } } -// ForEachNode concurrently calls the fn on each known node in the cluster. +// ForEachShard concurrently calls the fn on each known node in the cluster. // It returns the first error if any. -func (c *ClusterClient) ForEachNode( +func (c *ClusterClient) ForEachShard( ctx context.Context, fn func(ctx context.Context, client *Client) error, ) error { diff --git a/cluster_test.go b/cluster_test.go index d05ec6b..46948a8 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -415,7 +415,7 @@ var _ = Describe("ClusterClient", func() { Expect(err).NotTo(HaveOccurred()) Expect(cmds).To(HaveLen(14)) - _ = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error { + _ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error { defer GinkgoRecover() Eventually(func() int64 { return node.DBSize(ctx).Val() @@ -534,7 +534,7 @@ var _ = Describe("ClusterClient", func() { err := client.Ping(ctx).Err() Expect(err).NotTo(HaveOccurred()) - err = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error { + err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error { return node.Ping(ctx).Err() }) Expect(err).NotTo(HaveOccurred()) @@ -568,7 +568,7 @@ var _ = Describe("ClusterClient", func() { }, } - _ = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error { + _ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error { node.AddHook(nodeHook) return nil }) @@ -592,7 +592,7 @@ var _ = Describe("ClusterClient", func() { err := client.Ping(ctx).Err() Expect(err).NotTo(HaveOccurred()) - err = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error { + err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error { return node.Ping(ctx).Err() }) Expect(err).NotTo(HaveOccurred()) @@ -614,7 +614,7 @@ var _ = Describe("ClusterClient", func() { }, }) - _ = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error { + _ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error { node.AddHook(&hook{ beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { Expect(cmds).To(HaveLen(1)) @@ -649,7 +649,7 @@ var _ = Describe("ClusterClient", func() { err := client.Ping(ctx).Err() Expect(err).NotTo(HaveOccurred()) - err = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error { + err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error { return node.Ping(ctx).Err() }) Expect(err).NotTo(HaveOccurred()) @@ -671,7 +671,7 @@ var _ = Describe("ClusterClient", func() { }, }) - _ = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error { + _ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error { node.AddHook(&hook{ beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { Expect(cmds).To(HaveLen(3)) @@ -1204,14 +1204,14 @@ var _ = Describe("ClusterClient timeout", func() { opt.MaxRedirects = 1 client = cluster.newClusterClient(ctx, opt) - err := client.ForEachNode(ctx, func(ctx context.Context, client *redis.Client) error { + err := client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error { return client.ClientPause(ctx, pause).Err() }) Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { - _ = client.ForEachNode(ctx, func(ctx context.Context, client *redis.Client) error { + _ = client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error { defer GinkgoRecover() Eventually(func() error { return client.Ping(ctx).Err() diff --git a/example_test.go b/example_test.go index 32154a4..afb2a75 100644 --- a/example_test.go +++ b/example_test.go @@ -301,42 +301,49 @@ func ExampleClient_TxPipeline() { } func ExampleClient_Watch() { - const routineCount = 100 + const maxRetries = 1000 - // Transactionally increments key using GET and SET commands. + // Increment transactionally increments key using GET and SET commands. increment := func(key string) error { + // Transactional function. txf := func(tx *redis.Tx) error { - // get current value or zero + // Get current value or zero. n, err := tx.Get(ctx, key).Int() if err != nil && err != redis.Nil { return err } - // actual opperation (local in optimistic lock) + // Actual opperation (local in optimistic lock). n++ - // runs only if the watched keys remain unchanged + // Operation is commited only if the watched keys remain unchanged. _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - // pipe handles the error case pipe.Set(ctx, key, n, 0) return nil }) return err } - for retries := routineCount; retries > 0; retries-- { + for i := 0; i < maxRetries; i++ { err := rdb.Watch(ctx, txf, key) - if err != redis.TxFailedErr { - return err + if err == nil { + // Success. + return nil } - // optimistic lock lost + if err == redis.TxFailedErr { + // Optimistic lock lost. Retry. + continue + } + // Return any other error. + return err } + return errors.New("increment reached maximum number of retries") } var wg sync.WaitGroup - wg.Add(routineCount) - for i := 0; i < routineCount; i++ { + for i := 0; i < 100; i++ { + wg.Add(1) go func() { defer wg.Done() diff --git a/ring.go b/ring.go index 99a3466..7955e50 100644 --- a/ring.go +++ b/ring.go @@ -685,14 +685,6 @@ func (c *Ring) processShardPipeline( return err } -// Close closes the ring client, releasing any open resources. -// -// It is rare to Close a Ring, as the Ring is meant to be long-lived -// and shared between many goroutines. -func (c *Ring) Close() error { - return c.shards.Close() -} - func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error { if len(keys) == 0 { return fmt.Errorf("redis: Watch requires at least one key") @@ -725,3 +717,11 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er return shards[0].Client.Watch(ctx, fn, keys...) } + +// Close closes the ring client, releasing any open resources. +// +// It is rare to Close a Ring, as the Ring is meant to be long-lived +// and shared between many goroutines. +func (c *Ring) Close() error { + return c.shards.Close() +}