Vladimir Mihailenco 2020-06-10 15:04:12 +03:00
parent ef82e3705c
commit 736fa28659
6 changed files with 89 additions and 90 deletions

CHANGELOG.md

@@ -2,13 +2,13 @@
 ## v8 (unreleased)

-- All commands require `context.Context` as a first argument, e.g.
-  `rdb.Ping(ctx)`. If you are not using `context.Context` yet, the simplest
-  option is to define package variable `var ctx = context.TODO()` and use it
-  when `ctx` is expected.
-- Ring uses Rendezvous Hashing by default which provides better distribution.
-  This means that existing keys must be moved to a new location or key will be
-  inaccessible / lost. To use old hashing scheme:
+- All commands require `context.Context` as a first argument, e.g. `rdb.Ping(ctx)`. If you are not
+  using `context.Context` yet, the simplest option is to define package variable
+  `var ctx = context.TODO()` and use it when `ctx` is expected.
+- Ring uses Rendezvous Hashing by default which provides better distribution. This means that
+  existing keys must be moved to a new location or keys will be inaccessible / lost. To use the old
+  hashing scheme:
+- `Cluster.ForEachNode` is renamed to `ForEachShard` for consistency with `Ring`.

 ```go
 import "github.com/golang/groupcache/consistenthash"
@@ -20,45 +20,43 @@ ring := redis.NewRing(&redis.RingOptions{
 })
 ```

-- Basic support for OpenTelemetry instrumentation.
+- Added `redisext.OpenTemetryHook` that adds
+  [Redis OpenTelemetry instrumentation](https://redis.uptrace.dev/tracing/).

 ## v7.3

-- New option `Options.Username` which causes client to use `AuthACL`. Be aware
-  if your connection URL contains username.
+- New option `Options.Username` which causes client to use `AuthACL`. Be aware if your connection
+  URL contains a username.

 ## v7.2

-- Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored
-  for Redis 3 users.
+- Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored for Redis 3 users.

 ## v7.1

-- Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements
-  `fmt.Stringer` interface.
+- Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements `fmt.Stringer`
+  interface.

 ## v7

-- _Important_. Tx.Pipeline now returns a non-transactional pipeline. Use
-  Tx.TxPipeline for a transactional pipeline.
-- WrapProcess is replaced with more convenient AddHook that has access to
-  context.Context.
+- _Important_. Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a
+  transactional pipeline.
+- WrapProcess is replaced with more convenient AddHook that has access to context.Context.
 - WithContext now can not be used to create a shallow copy of the client.
 - New methods ProcessContext, DoContext, and ExecContext.
 - Client respects Context.Deadline when setting net.Conn deadline.
-- Client listens on Context.Done while waiting for a connection from the pool
-  and returns an error when context context is cancelled.
-- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to
-  `*Message` to allow detecting reconnections.
-- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()`
-  helper is added to parse the time.
+- Client listens on Context.Done while waiting for a connection from the pool and returns an error
+  when the context is cancelled.
+- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to `*Message` to allow
+  detecting reconnections.
+- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()` helper is added to parse
+  the time.
 - `SetLimiter` is removed and added `Options.Limiter` instead.
 - `HMSet` is deprecated as of Redis v4.

 ## v6.15

-- Cluster and Ring pipelines process commands for each node in its own
-  goroutine.
+- Cluster and Ring pipelines process commands for each node in its own goroutine.

 ## 6.14
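The v7 notes above mention the RFC3339 `time.Time` marshalling and the `.Time()` helper; here is a
minimal sketch of that round-trip under the v8 calling convention used elsewhere in this commit
(the client setup and key name are illustrative, not part of the diff):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

var ctx = context.TODO()

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Since v7, time.Time values are marshalled as RFC3339 strings.
	if err := rdb.Set(ctx, "foo", time.Now(), 0).Err(); err != nil {
		panic(err)
	}

	// Time parses the stored value back into a time.Time.
	t, err := rdb.Get(ctx, "foo").Time()
	fmt.Println(t, err)
}
```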
@@ -66,23 +64,20 @@ ring := redis.NewRing(&redis.RingOptions{
 - Added Options.MaxConnAge.
 - PoolStats.FreeConns is renamed to PoolStats.IdleConns.
 - Add Client.Do to simplify creating custom commands.
-- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool
-  helpers.
+- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers.
 - Lower memory usage.

 ## v6.13

-- Ring got new options called `HashReplicas` and `Hash`. It is recommended to
-  set `HashReplicas = 1000` for better keys distribution between shards.
-- Cluster client was optimized to use much less memory when reloading cluster
-  state.
-- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not
-  lose data when timeout occurres. In most cases it is recommended to use
-  PubSub.Channel instead.
+- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set
+  `HashReplicas = 1000` for better key distribution between shards.
+- Cluster client was optimized to use much less memory when reloading cluster state.
+- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when a
+  timeout occurs. In most cases it is recommended to use PubSub.Channel instead.
 - Dialer.KeepAlive is set to 5 minutes by default.

 ## v6.12

-- ClusterClient got new option called `ClusterSlots` which allows to build
-  cluster of normal Redis Servers that don't have cluster mode enabled. See
+- ClusterClient got a new option called `ClusterSlots` which allows building a cluster of normal
+  Redis Servers that don't have cluster mode enabled. See
   https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup
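For the first v8 bullet, a minimal sketch of what a migrated call site looks like (the `ping`
helper is illustrative; only the argument convention comes from the changelog):

```go
import (
	"context"

	"github.com/go-redis/redis/v8"
)

// Package-level placeholder context, as the changelog suggests for code
// that does not thread context.Context through yet.
var ctx = context.TODO()

func ping(rdb *redis.Client) error {
	// v7: rdb.Ping()  →  v8: the context comes first.
	return rdb.Ping(ctx).Err()
}
```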

README.md

@@ -3,6 +3,10 @@
 [![Build Status](https://travis-ci.org/go-redis/redis.png?branch=master)](https://travis-ci.org/go-redis/redis)
 [![GoDoc](https://godoc.org/github.com/go-redis/redis?status.svg)](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc)

+- [Docs](https://redis.uptrace.dev)
+- [Reference](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc)
+- [Examples](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#pkg-examples)
+
 ## Sponsors

 - [**Uptrace.dev** - distributed traces and metrics](https://uptrace.dev)
@@ -54,35 +58,35 @@ import "github.com/go-redis/redis/v8"
 ```go
 func ExampleNewClient() {
-	client := redis.NewClient(&redis.Options{
+	rdb := redis.NewClient(&redis.Options{
 		Addr:     "localhost:6379",
 		Password: "", // no password set
 		DB:       0,  // use default DB
 	})

-	pong, err := client.Ping(ctx).Result()
+	pong, err := rdb.Ping(ctx).Result()
 	fmt.Println(pong, err)
 	// Output: PONG <nil>
 }

 func ExampleClient() {
-	client := redis.NewClient(&redis.Options{
+	rdb := redis.NewClient(&redis.Options{
 		Addr:     "localhost:6379",
 		Password: "", // no password set
 		DB:       0,  // use default DB
 	})

-	err := client.Set(ctx, "key", "value", 0).Err()
+	err := rdb.Set(ctx, "key", "value", 0).Err()
 	if err != nil {
 		panic(err)
 	}

-	val, err := client.Get(ctx, "key").Result()
+	val, err := rdb.Get(ctx, "key").Result()
 	if err != nil {
 		panic(err)
 	}
 	fmt.Println("key", val)

-	val2, err := client.Get(ctx, "key2").Result()
+	val2, err := rdb.Get(ctx, "key2").Result()
 	if err == redis.Nil {
 		fmt.Println("key2 does not exist")
 	} else if err != nil {
@@ -106,13 +110,13 @@ Some corner cases:
 ```go
 // SET key value EX 10 NX
-set, err := client.SetNX(ctx, "key", "value", 10*time.Second).Result()
+set, err := rdb.SetNX(ctx, "key", "value", 10*time.Second).Result()

 // SORT list LIMIT 0 2 ASC
-vals, err := client.Sort(ctx, "list", &redis.Sort{Offset: 0, Count: 2, Order: "ASC"}).Result()
+vals, err := rdb.Sort(ctx, "list", &redis.Sort{Offset: 0, Count: 2, Order: "ASC"}).Result()

 // ZRANGEBYSCORE zset -inf +inf WITHSCORES LIMIT 0 2
-vals, err := client.ZRangeByScoreWithScores(ctx, "zset", &redis.ZRangeBy{
+vals, err := rdb.ZRangeByScoreWithScores(ctx, "zset", &redis.ZRangeBy{
 	Min:    "-inf",
 	Max:    "+inf",
 	Offset: 0,
@@ -120,16 +124,16 @@ vals, err := client.ZRangeByScoreWithScores(ctx, "zset", &redis.ZRangeBy{
 }).Result()

 // ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 AGGREGATE SUM
-vals, err := client.ZInterStore(ctx, "out", &redis.ZStore{
+vals, err := rdb.ZInterStore(ctx, "out", &redis.ZStore{
 	Keys:    []string{"zset1", "zset2"},
 	Weights: []int64{2, 3},
 }).Result()

 // EVAL "return {KEYS[1],ARGV[1]}" 1 "key" "hello"
-vals, err := client.Eval(ctx, "return {KEYS[1],ARGV[1]}", []string{"key"}, "hello").Result()
+vals, err := rdb.Eval(ctx, "return {KEYS[1],ARGV[1]}", []string{"key"}, "hello").Result()

 // custom command
-res, err := client.Do(ctx, "set", "key", "value").Result()
+res, err := rdb.Do(ctx, "set", "key", "value").Result()
 ```

 ## See also

cluster.go

@@ -27,6 +27,9 @@ type ClusterOptions struct {
 	// A seed list of host:port addresses of cluster nodes.
 	Addrs []string

+	// NewClient creates a cluster node client with provided name and options.
+	NewClient func(opt *Options) *Client
+
 	// The maximum number of retries before giving up. Command is retried
 	// on network errors and MOVED/ASK redirects.
 	// Default is 8 retries.
@@ -48,9 +51,6 @@ type ClusterOptions struct {
 	// and Cluster.ReloadState to manually trigger state reloading.
 	ClusterSlots func() ([]ClusterSlot, error)

-	// Optional hook that is called when a new node is created.
-	OnNewNode func(*Client)
-
 	// Following options are copied from Options struct.

 	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
@@ -68,9 +68,6 @@ type ClusterOptions struct {
 	ReadTimeout  time.Duration
 	WriteTimeout time.Duration

-	// NewClient creates a cluster node client with provided name and options.
-	NewClient func(opt *Options) *Client
-
 	// PoolSize applies per cluster node and not for the whole cluster.
 	PoolSize     int
 	MinIdleConns int
@@ -179,10 +176,6 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
 		go node.updateLatency()
 	}

-	if clOpt.OnNewNode != nil {
-		clOpt.OnNewNode(node.Client)
-	}
-
 	return &node
 }
@@ -907,9 +900,9 @@ func (c *ClusterClient) ForEachSlave(
 	}
 }

-// ForEachNode concurrently calls the fn on each known node in the cluster.
+// ForEachShard concurrently calls the fn on each known node in the cluster.
 // It returns the first error if any.
-func (c *ClusterClient) ForEachNode(
+func (c *ClusterClient) ForEachShard(
 	ctx context.Context,
 	fn func(ctx context.Context, client *Client) error,
 ) error {
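With `OnNewNode` removed here, the `NewClient` option (moved to the top of `ClusterOptions`) is
the remaining place for per-node setup, since it is used to create the client for each cluster
node. A sketch of that migration; the addresses and the hook body are illustrative, not from this
commit:

```go
rdb := redis.NewClusterClient(&redis.ClusterOptions{
	Addrs: []string{":7000", ":7001", ":7002"},

	// NewClient now plays the role the removed OnNewNode hook used to:
	// it is invoked when a node client is created, so per-node setup
	// (hooks, logging, etc.) can happen here.
	NewClient: func(opt *redis.Options) *redis.Client {
		node := redis.NewClient(opt)
		// node.AddHook(...) // hypothetical per-node instrumentation
		return node
	},
})
```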

cluster_test.go

@@ -415,7 +415,7 @@ var _ = Describe("ClusterClient", func() {
 			Expect(err).NotTo(HaveOccurred())
 			Expect(cmds).To(HaveLen(14))

-			_ = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error {
+			_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
 				defer GinkgoRecover()
 				Eventually(func() int64 {
 					return node.DBSize(ctx).Val()
@@ -534,7 +534,7 @@ var _ = Describe("ClusterClient", func() {
 			err := client.Ping(ctx).Err()
 			Expect(err).NotTo(HaveOccurred())

-			err = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error {
+			err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
 				return node.Ping(ctx).Err()
 			})
 			Expect(err).NotTo(HaveOccurred())
@@ -568,7 +568,7 @@ var _ = Describe("ClusterClient", func() {
 				},
 			}

-			_ = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error {
+			_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
 				node.AddHook(nodeHook)
 				return nil
 			})
@@ -592,7 +592,7 @@ var _ = Describe("ClusterClient", func() {
 			err := client.Ping(ctx).Err()
 			Expect(err).NotTo(HaveOccurred())

-			err = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error {
+			err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
 				return node.Ping(ctx).Err()
 			})
 			Expect(err).NotTo(HaveOccurred())
@@ -614,7 +614,7 @@ var _ = Describe("ClusterClient", func() {
 				},
 			})

-			_ = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error {
+			_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
 				node.AddHook(&hook{
 					beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
 						Expect(cmds).To(HaveLen(1))
@@ -649,7 +649,7 @@ var _ = Describe("ClusterClient", func() {
 			err := client.Ping(ctx).Err()
 			Expect(err).NotTo(HaveOccurred())

-			err = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error {
+			err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
 				return node.Ping(ctx).Err()
 			})
 			Expect(err).NotTo(HaveOccurred())
@@ -671,7 +671,7 @@ var _ = Describe("ClusterClient", func() {
 				},
 			})

-			_ = client.ForEachNode(ctx, func(ctx context.Context, node *redis.Client) error {
+			_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
 				node.AddHook(&hook{
 					beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
 						Expect(cmds).To(HaveLen(3))
@@ -1204,14 +1204,14 @@ var _ = Describe("ClusterClient timeout", func() {
 			opt.MaxRedirects = 1
 			client = cluster.newClusterClient(ctx, opt)

-			err := client.ForEachNode(ctx, func(ctx context.Context, client *redis.Client) error {
+			err := client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
 				return client.ClientPause(ctx, pause).Err()
 			})
 			Expect(err).NotTo(HaveOccurred())
 		})

 		AfterEach(func() {
-			_ = client.ForEachNode(ctx, func(ctx context.Context, client *redis.Client) error {
+			_ = client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
 				defer GinkgoRecover()
 				Eventually(func() error {
 					return client.Ping(ctx).Err()

example_test.go

@@ -301,42 +301,49 @@ func ExampleClient_TxPipeline() {
 }

 func ExampleClient_Watch() {
-	const routineCount = 100
+	const maxRetries = 1000

-	// Transactionally increments key using GET and SET commands.
+	// Increment transactionally increments key using GET and SET commands.
 	increment := func(key string) error {
-		// Transactional function.
 		txf := func(tx *redis.Tx) error {
-			// get current value or zero
+			// Get current value or zero.
 			n, err := tx.Get(ctx, key).Int()
 			if err != nil && err != redis.Nil {
 				return err
 			}

-			// actual opperation (local in optimistic lock)
+			// Actual operation (local in optimistic lock).
 			n++

-			// runs only if the watched keys remain unchanged
+			// Operation is committed only if the watched keys remain unchanged.
 			_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
-				// pipe handles the error case
 				pipe.Set(ctx, key, n, 0)
 				return nil
 			})
 			return err
 		}

-		for retries := routineCount; retries > 0; retries-- {
+		for i := 0; i < maxRetries; i++ {
 			err := rdb.Watch(ctx, txf, key)
-			if err != redis.TxFailedErr {
-				return err
+			if err == nil {
+				// Success.
+				return nil
 			}
-			// optimistic lock lost
+			if err == redis.TxFailedErr {
+				// Optimistic lock lost. Retry.
+				continue
+			}
+			// Return any other error.
+			return err
 		}

 		return errors.New("increment reached maximum number of retries")
 	}

 	var wg sync.WaitGroup
-	wg.Add(routineCount)
-	for i := 0; i < routineCount; i++ {
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
 		go func() {
 			defer wg.Done()
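The hunk ends mid-goroutine. For readers trying the example, a sketch of how the driver loop
plausibly continues; the key name `counter3` and the final read-back are assumptions, not shown in
the diff:

```go
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Each goroutine retries until its optimistic transaction wins.
			if err := increment("counter3"); err != nil {
				fmt.Println("increment error:", err)
			}
		}()
	}
	wg.Wait()

	// Read back the final counter value.
	n, err := rdb.Get(ctx, "counter3").Int()
	fmt.Println("ended with", n, err)
```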

ring.go

@@ -685,14 +685,6 @@ func (c *Ring) processShardPipeline(
 	return err
 }

-// Close closes the ring client, releasing any open resources.
-//
-// It is rare to Close a Ring, as the Ring is meant to be long-lived
-// and shared between many goroutines.
-func (c *Ring) Close() error {
-	return c.shards.Close()
-}
-
 func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
 	if len(keys) == 0 {
 		return fmt.Errorf("redis: Watch requires at least one key")
@@ -725,3 +717,11 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er
 	return shards[0].Client.Watch(ctx, fn, keys...)
 }
+
+// Close closes the ring client, releasing any open resources.
+//
+// It is rare to Close a Ring, as the Ring is meant to be long-lived
+// and shared between many goroutines.
+func (c *Ring) Close() error {
+	return c.shards.Close()
+}