redis/cluster_test.go

1482 lines
38 KiB
Go
Raw Permalink Normal View History

2015-01-24 15:12:48 +03:00
package redis_test
import (
2019-07-04 11:18:06 +03:00
"context"
"crypto/tls"
"errors"
2015-11-14 16:54:16 +03:00
"fmt"
"net"
2015-12-16 17:11:52 +03:00
"strconv"
2015-11-14 16:54:16 +03:00
"strings"
2015-12-16 17:11:52 +03:00
"sync"
"testing"
2015-03-18 13:41:24 +03:00
"time"
2015-01-24 15:12:48 +03:00
2016-12-16 17:26:48 +03:00
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
chore: sync master (#2051) * Upgrade redis-server version (#1833) * Upgrade redis-server version Signed-off-by: monkey <golang@88.com> * XAutoClaim changed the return value Signed-off-by: monkey <golang@88.com> * add cmd: geosearch, geosearchstore (#1836) * add cmd: geosearch, geosearchstore Signed-off-by: monkey92t <golang@88.com> * GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing Signed-off-by: monkey92t <golang@88.com> * Added missing method XInfoStreamFull to Cmdable interface * Run go mod tidy in redisotel Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com> * Revert "ConnPool check fd for bad conns (#1824)" (#1849) This reverts commit 346bfafddd36dd52d51b064033048de5552ee91e. * Automate release process (#1852) * Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 (#1832) * Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.10.5 to 1.14.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.10.5...v1.14.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * Upgrade gomega to v1.15.0 Signed-off-by: monkey92t <golang@88.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: monkey92t <golang@88.com> * Add version.go * Fix otel example * Fix package name in release script * More fixes for otel example * And more * Fix release.sh * Release v8.11.3 (release.sh) * Create an annotated tag to give release.yml chance to run * Tweak tag.sh * Add Cmd.Slice helper to cast to []interface{} (#1859) * after the connection pool is closed, no new connections should be added (#1863) * after the connection pool is closed, no new connections should be added Signed-off-by: monkey92t <golang@88.com> * remove runGoroutine Signed-off-by: monkey92t <golang@88.com> * pool.popIdle add p.closed check Signed-off-by: monkey92t <golang@88.com> * upgrade golangci-lint v1.42.0 Signed-off-by: monkey92t <golang@88.com> * Bump github.com/onsi/gomega from 1.15.0 to 1.16.0 (#1865) Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.15.0 to 1.16.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.15.0...v1.16.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Add go 1.17 to the build matrix * Remove go 1.15 from build matrix * Add scan struct example (#1870) * Replace release job * Bump github.com/cespare/xxhash/v2 from 2.1.1 to 2.1.2 (#1872) Bumps [github.com/cespare/xxhash/v2](https://github.com/cespare/xxhash) from 2.1.1 to 2.1.2. - [Release notes](https://github.com/cespare/xxhash/releases) - [Commits](https://github.com/cespare/xxhash/compare/v2.1.1...v2.1.2) --- updated-dependencies: - dependency-name: github.com/cespare/xxhash/v2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Fix tag script to push tag by tag * Fix releasing.md * Fix/pubsub ping mutex (#1878) * Fix PubSub.Ping to hold the lock * Fix PubSub.Ping to hold the lock * add write cmd data-race test Signed-off-by: monkey92t <golang@88.com> Co-authored-by: monkey92t <golang@88.com> * chore: cleanup OpenTelemetry example * chore: gofmt all code * Refactor TestParseURL This is in preparation for supporting query parameters in ParseURL: - use an expected *Options instance to execute assertions on - extract assertions into helper function - enable parallel testing - condense test table * Add query parameter parsing to ParseURL() Before this change, ParseURL would only accept a very restricted set of URLs (it returned an error, if it encountered any parameter). This commit introduces the ability to process URLs like redis://localhost/1?dial_timeout=10s and similar. Go programs which were providing a configuration tunable (e.g. CLI flag, config entry or environment variable) to configure the Redis connection now don't need to perform this task themselves. * chore: add links to readme * chore: fix discussions link * empty hooks.withContext removed * chore: gofmt * chore: use conventional commits and auto-generate changelog * feat: add acl auth support for sentinels * chore: swap to acl auth at the test-level * Add support for BLMove command * chore: update dependencies * chore: update link * feat: add SetVal method for each command * feat: add Cmd.{String,Int,Float,Bool}Slice helpers and an example * chore: tweak GH actions to run all jobs * chore: add Lua scripting example * Fix Redis Cluster issue during roll outs of new nodes with same addr (#1914) * fix: recycle connections in some Redis Cluster scenarios This issue was surfaced in a Cloud Provider solution that used for rolling out new nodes using the same address (hostname) of the nodes that will be replaced in a Redis Cluster, while the former ones once depromoted as Slaves would continue in service during some mintues for redirecting traffic. The solution basically identifies when the connection could be stale since a MOVED response will be returned using the same address (hostname) that is being used by the connection. At that moment we consider the connection as no longer usable forcing to recycle the connection. * chore: lazy reload when moved or ask * chore: use conv commit message * chore: release v8.11.4 (release.sh) * fix: add whitespace for avoid unlikely colisions * fix: format * chore: fix links * chore: use ctx parameter in cmdInfo * Bump github.com/onsi/ginkgo from 1.16.4 to 1.16.5 (#1925) Bumps [github.com/onsi/ginkgo](https://github.com/onsi/ginkgo) from 1.16.4 to 1.16.5. - [Release notes](https://github.com/onsi/ginkgo/releases) - [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/ginkgo/compare/v1.16.4...v1.16.5) --- updated-dependencies: - dependency-name: github.com/onsi/ginkgo dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * feat: add support for time.Duration write and scan * test: add test case for setting and scanning durations * chore: fix linter * fix(extra/redisotel): set span.kind attribute to client According to the opentelemetry specification this should always be set to client for database client libraries. I've also removed the SetAttributes call and instead set the attributes during creation of the span. This is what the library SHOULD be doing according to the opentelemetry api specification. * chore: update otel example * fix: update some argument counts in pre-allocs In some cases number of pre-allocated places in argument array is missing 1 or 2 elements, which results in re-allocation of twice as large array * chore: add example how to delete keys without a ttl * chore: don't enable all lints * chore(deps): bump github.com/onsi/gomega from 1.16.0 to 1.17.0 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.16.0 to 1.17.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.16.0...v1.17.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * feat: Add redis v7's NX, XX, GT, LT expire variants * chore: add missing readme * chore: tweak feature links * chore: remove Discord * fix: set timeout for WAIT command. Fixes #1963 * build: update `go` directive in `go.mod` to 1.17 This commit enables support for module graph pruning and lazy module loading for projects that are at Go 1.17 or higher. Reference: https://go.dev/ref/mod#go-mod-file-go Reference: https://go.dev/ref/mod#graph-pruning Reference: https://go.dev/ref/mod#lazy-loading Signed-off-by: Eng Zer Jun <engzerjun@gmail.com> * chore: update link * chore: export cmder.SetFirstKeyPos to support build module commands * feat(redisotel): ability to override TracerProvider (#1998) * fix: add missing Expire methods to Cmdable This is a followup to https://github.com/go-redis/redis/pull/1928 * chore(deps): bump github.com/onsi/gomega from 1.17.0 to 1.18.1 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.17.0 to 1.18.1. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.17.0...v1.18.1) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * Update README.md (#2011) chore: add fmt library in example code * chore: instrumentation name and version (#2012) * fix: invalid type assert in stringArg * chore: cleanup * fix: example/otel compile error (#2028) * fix: rename Golang to Go (#2030) https://go.dev/doc/faq#go_or_golang * feat: add support for passing extra attributes added to spans * feat: set net.peer.name and net.peer.port in otel example * chore: tweak Uptrace copy * feat: add support for COPY command (#2016) * feat: add support for acl sentinel auth in universal client * chore(deps): bump actions/checkout from 2 to 3 Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v2...v3) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> * chore: add hll example * chore: tweak release script * chore: release v8.11.5 (release.sh) * chore: add discord back Co-authored-by: Eugene Ponizovsky <ponizovsky@gmail.com> Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com> Co-authored-by: Vladimir Mihailenco <vladimir.webdev@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Kishan B <kishancs46@gmail.com> Co-authored-by: Dominik Menke <dom@digineo.de> Co-authored-by: Gökhan Özeloğlu <gozeloglu@gmail.com> Co-authored-by: Justin Sievenpiper <justin@sievenpiper.co> Co-authored-by: Алексей Романовский <aromanovsky@epiphan.com> Co-authored-by: Stavros Panakakakis <stavrospanakakis@gmail.com> Co-authored-by: Pau Freixes <pfreixes@gmail.com> Co-authored-by: Ethan Hur <ethan0311@gmail.com> Co-authored-by: Jackie <18378976+Pyrodash@users.noreply.github.com> Co-authored-by: Kristinn Björgvin Árdal <kristinnardalsecondary@gmail.com> Co-authored-by: ffenix113 <razerer@bigmir.net> Co-authored-by: Bastien Penavayre <bastienPenava@gmail.com> Co-authored-by: James3 Li(李麒傑) <james3_li@asus.com> Co-authored-by: Eng Zer Jun <engzerjun@gmail.com> Co-authored-by: gzjiangtao2014 <gzjiangtao2014@corp.netease.com> Co-authored-by: Nelz <nelz9999@users.noreply.github.com> Co-authored-by: Daniel Richter <Nexyz9@gmail.com> Co-authored-by: Seyed Ali Ghaffari <ali.ghaffari@outlook.com> Co-authored-by: lintanghui <lintanghui@bilibili.com> Co-authored-by: hidu <duv123+github@gmail.com> Co-authored-by: Jonas Lergell <jonas.lergell@volvocars.com> Co-authored-by: Alex Kahn <alexanderkahn@gmail.com>
2022-03-19 07:40:31 +03:00
2022-06-04 17:39:21 +03:00
"github.com/go-redis/redis/v9"
"github.com/go-redis/redis/v9/internal/hashtag"
2015-01-24 15:12:48 +03:00
)
type clusterScenario struct {
ports []string
2019-07-25 13:53:00 +03:00
nodeIDs []string
processes map[string]*redisProcess
clients map[string]*redis.Client
}
func (s *clusterScenario) masters() []*redis.Client {
result := make([]*redis.Client, 3)
for pos, port := range s.ports[:3] {
result[pos] = s.clients[port]
2015-01-24 15:12:48 +03:00
}
return result
}
2015-01-24 15:12:48 +03:00
func (s *clusterScenario) slaves() []*redis.Client {
result := make([]*redis.Client, 3)
for pos, port := range s.ports[3:] {
result[pos] = s.clients[port]
}
return result
}
2015-01-24 15:12:48 +03:00
2017-02-17 13:12:06 +03:00
func (s *clusterScenario) addrs() []string {
addrs := make([]string, len(s.ports))
for i, port := range s.ports {
addrs[i] = net.JoinHostPort("127.0.0.1", port)
}
2017-02-17 13:12:06 +03:00
return addrs
}
2020-09-05 11:34:37 +03:00
func (s *clusterScenario) newClusterClientUnstable(opt *redis.ClusterOptions) *redis.ClusterClient {
2017-02-17 13:12:06 +03:00
opt.Addrs = s.addrs()
return redis.NewClusterClient(opt)
}
2020-03-11 17:26:42 +03:00
func (s *clusterScenario) newClusterClient(
ctx context.Context, opt *redis.ClusterOptions,
) *redis.ClusterClient {
2020-09-05 11:34:37 +03:00
client := s.newClusterClientUnstable(opt)
2018-07-22 10:50:26 +03:00
err := eventually(func() error {
if opt.ClusterSlots != nil {
return nil
}
2020-03-11 17:26:42 +03:00
state, err := client.LoadState(ctx)
2018-05-17 16:09:56 +03:00
if err != nil {
2018-07-22 10:50:26 +03:00
return err
}
2020-03-11 17:26:42 +03:00
if !state.IsConsistent(ctx) {
2018-11-24 14:16:21 +03:00
return fmt.Errorf("cluster state is not consistent")
}
2018-07-22 10:50:26 +03:00
return nil
}, 30*time.Second)
if err != nil {
panic(err)
}
2018-05-17 16:09:56 +03:00
return client
}
2015-01-24 15:12:48 +03:00
2020-09-09 17:39:13 +03:00
func (s *clusterScenario) Close() error {
for _, port := range s.ports {
if process, ok := processes[port]; ok {
process.Close()
delete(processes, port)
}
2020-09-09 17:39:13 +03:00
}
return nil
}
2020-03-11 17:26:42 +03:00
func startCluster(ctx context.Context, scenario *clusterScenario) error {
2015-11-14 16:54:16 +03:00
// Start processes and collect node ids
for pos, port := range scenario.ports {
process, err := startRedis(port, "--cluster-enabled", "yes")
if err != nil {
return err
2015-01-24 15:12:48 +03:00
}
client := redis.NewClient(&redis.Options{
Addr: ":" + port,
})
2020-03-11 17:26:42 +03:00
info, err := client.ClusterNodes(ctx).Result()
if err != nil {
return err
2015-01-24 15:12:48 +03:00
}
scenario.processes[port] = process
scenario.clients[port] = client
2019-07-25 13:53:00 +03:00
scenario.nodeIDs[pos] = info[:40]
}
2017-07-09 13:10:07 +03:00
// Meet cluster nodes.
for _, client := range scenario.clients {
2020-03-11 17:26:42 +03:00
err := client.ClusterMeet(ctx, "127.0.0.1", scenario.ports[0]).Err()
if err != nil {
return err
2015-01-24 15:12:48 +03:00
}
}
2015-01-24 15:12:48 +03:00
2017-07-09 13:10:07 +03:00
// Bootstrap masters.
slots := []int{0, 5000, 10000, 16384}
2015-11-14 16:54:16 +03:00
for pos, master := range scenario.masters() {
2020-03-11 17:26:42 +03:00
err := master.ClusterAddSlotsRange(ctx, slots[pos], slots[pos+1]-1).Err()
if err != nil {
return err
}
}
2015-01-24 15:12:48 +03:00
2017-07-09 13:10:07 +03:00
// Bootstrap slaves.
2015-11-14 16:54:16 +03:00
for idx, slave := range scenario.slaves() {
2019-07-25 13:53:00 +03:00
masterID := scenario.nodeIDs[idx]
2015-11-14 16:54:16 +03:00
// Wait until master is available
err := eventually(func() error {
2020-03-11 17:26:42 +03:00
s := slave.ClusterNodes(ctx).Val()
2019-07-25 13:53:00 +03:00
wanted := masterID
2015-11-14 16:54:16 +03:00
if !strings.Contains(s, wanted) {
return fmt.Errorf("%q does not contain %q", s, wanted)
}
return nil
}, 10*time.Second)
if err != nil {
return err
2015-01-24 15:12:48 +03:00
}
2020-03-11 17:26:42 +03:00
err = slave.ClusterReplicate(ctx, masterID).Err()
if err != nil {
return err
2015-01-24 15:12:48 +03:00
}
}
2015-01-24 15:12:48 +03:00
2017-07-09 13:10:07 +03:00
// Wait until all nodes have consistent info.
2018-05-17 16:09:56 +03:00
wanted := []redis.ClusterSlot{{
Start: 0,
End: 4999,
Nodes: []redis.ClusterNode{{
2019-07-25 13:53:00 +03:00
ID: "",
2018-05-17 16:09:56 +03:00
Addr: "127.0.0.1:8220",
}, {
2019-07-25 13:53:00 +03:00
ID: "",
2018-05-17 16:09:56 +03:00
Addr: "127.0.0.1:8223",
}},
}, {
Start: 5000,
End: 9999,
Nodes: []redis.ClusterNode{{
2019-07-25 13:53:00 +03:00
ID: "",
2018-05-17 16:09:56 +03:00
Addr: "127.0.0.1:8221",
}, {
2019-07-25 13:53:00 +03:00
ID: "",
2018-05-17 16:09:56 +03:00
Addr: "127.0.0.1:8224",
}},
}, {
Start: 10000,
End: 16383,
Nodes: []redis.ClusterNode{{
2019-07-25 13:53:00 +03:00
ID: "",
2018-05-17 16:09:56 +03:00
Addr: "127.0.0.1:8222",
}, {
2019-07-25 13:53:00 +03:00
ID: "",
2018-05-17 16:09:56 +03:00
Addr: "127.0.0.1:8225",
}},
}}
for _, client := range scenario.clients {
2015-11-14 16:54:16 +03:00
err := eventually(func() error {
2020-03-11 17:26:42 +03:00
res, err := client.ClusterSlots(ctx).Result()
2015-11-22 15:44:38 +03:00
if err != nil {
return err
2015-11-21 14:16:13 +03:00
}
return assertSlotsEqual(res, wanted)
2016-03-14 17:51:46 +03:00
}, 30*time.Second)
if err != nil {
return err
}
}
return nil
}
func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
2018-10-11 13:58:31 +03:00
outerLoop:
for _, s2 := range wanted {
for _, s1 := range slots {
if slotEqual(s1, s2) {
2018-10-11 13:58:31 +03:00
continue outerLoop
}
}
return fmt.Errorf("%v not found in %v", s2, slots)
}
return nil
}
func slotEqual(s1, s2 redis.ClusterSlot) bool {
if s1.Start != s2.Start {
return false
}
if s1.End != s2.End {
return false
}
2016-12-16 17:26:48 +03:00
if len(s1.Nodes) != len(s2.Nodes) {
return false
}
for i, n1 := range s1.Nodes {
if n1.Addr != s2.Nodes[i].Addr {
return false
}
}
return true
}
//------------------------------------------------------------------------------
var _ = Describe("ClusterClient", func() {
2018-05-17 16:09:56 +03:00
var failover bool
2016-12-16 17:26:48 +03:00
var opt *redis.ClusterOptions
var client *redis.ClusterClient
2015-01-24 15:12:48 +03:00
2016-12-16 17:26:48 +03:00
assertClusterClient := func() {
2015-01-24 15:12:48 +03:00
It("should GET/SET/DEL", func() {
2020-03-11 17:26:42 +03:00
err := client.Get(ctx, "A").Err()
2015-01-24 15:12:48 +03:00
Expect(err).To(Equal(redis.Nil))
2020-03-11 17:26:42 +03:00
err = client.Set(ctx, "A", "VALUE", 0).Err()
2015-01-24 15:12:48 +03:00
Expect(err).NotTo(HaveOccurred())
2017-07-09 13:10:07 +03:00
Eventually(func() string {
2020-03-11 17:26:42 +03:00
return client.Get(ctx, "A").Val()
2017-08-31 15:22:47 +03:00
}, 30*time.Second).Should(Equal("VALUE"))
2015-01-24 15:12:48 +03:00
2020-03-11 17:26:42 +03:00
cnt, err := client.Del(ctx, "A").Result()
2015-01-24 15:12:48 +03:00
Expect(err).NotTo(HaveOccurred())
Expect(cnt).To(Equal(int64(1)))
})
2018-05-17 16:09:56 +03:00
It("GET follows redirects", func() {
2020-03-11 17:26:42 +03:00
err := client.Set(ctx, "A", "VALUE", 0).Err()
2018-05-17 16:09:56 +03:00
Expect(err).NotTo(HaveOccurred())
2015-05-01 10:42:58 +03:00
2018-05-17 16:09:56 +03:00
if !failover {
Eventually(func() int64 {
2020-03-11 17:26:42 +03:00
nodes, err := client.Nodes(ctx, "A")
2018-05-17 16:09:56 +03:00
if err != nil {
return 0
}
2020-03-11 17:26:42 +03:00
return nodes[1].Client.DBSize(ctx).Val()
2018-05-17 16:09:56 +03:00
}, 30*time.Second).Should(Equal(int64(1)))
2015-01-24 15:12:48 +03:00
2018-05-17 16:09:56 +03:00
Eventually(func() error {
2020-03-11 17:26:42 +03:00
return client.SwapNodes(ctx, "A")
2018-05-17 16:09:56 +03:00
}, 30*time.Second).ShouldNot(HaveOccurred())
}
2020-03-11 17:26:42 +03:00
v, err := client.Get(ctx, "A").Result()
2018-05-17 16:09:56 +03:00
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal("VALUE"))
})
It("SET follows redirects", func() {
if !failover {
Eventually(func() error {
2020-03-11 17:26:42 +03:00
return client.SwapNodes(ctx, "A")
2018-05-17 16:09:56 +03:00
}, 30*time.Second).ShouldNot(HaveOccurred())
}
2020-03-11 17:26:42 +03:00
err := client.Set(ctx, "A", "VALUE", 0).Err()
2018-05-17 16:09:56 +03:00
Expect(err).NotTo(HaveOccurred())
2020-03-11 17:26:42 +03:00
v, err := client.Get(ctx, "A").Result()
2018-05-17 16:09:56 +03:00
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal("VALUE"))
})
2015-12-16 17:11:52 +03:00
2016-10-09 14:12:32 +03:00
It("distributes keys", func() {
for i := 0; i < 100; i++ {
2020-03-11 17:26:42 +03:00
err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
2016-10-09 14:12:32 +03:00
Expect(err).NotTo(HaveOccurred())
}
2020-03-11 17:26:42 +03:00
client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
2018-05-17 16:09:56 +03:00
defer GinkgoRecover()
2017-07-09 13:10:07 +03:00
Eventually(func() string {
2020-03-11 17:26:42 +03:00
return master.Info(ctx, "keyspace").Val()
2017-08-31 15:22:47 +03:00
}, 30*time.Second).Should(Or(
2017-07-09 13:10:07 +03:00
ContainSubstring("keys=31"),
ContainSubstring("keys=29"),
ContainSubstring("keys=40"),
))
2018-05-17 16:09:56 +03:00
return nil
})
2016-10-09 14:12:32 +03:00
})
It("distributes keys when using EVAL", func() {
script := redis.NewScript(`
local r = redis.call('SET', KEYS[1], ARGV[1])
return r
`)
var key string
for i := 0; i < 100; i++ {
key = fmt.Sprintf("key%d", i)
2020-03-11 17:26:42 +03:00
err := script.Run(ctx, client, []string{key}, "value").Err()
2016-10-09 14:12:32 +03:00
Expect(err).NotTo(HaveOccurred())
}
2020-03-11 17:26:42 +03:00
client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
2018-06-29 10:45:05 +03:00
defer GinkgoRecover()
2017-07-09 13:10:07 +03:00
Eventually(func() string {
2020-03-11 17:26:42 +03:00
return master.Info(ctx, "keyspace").Val()
2017-08-31 15:22:47 +03:00
}, 30*time.Second).Should(Or(
2017-07-09 13:10:07 +03:00
ContainSubstring("keys=31"),
ContainSubstring("keys=29"),
ContainSubstring("keys=40"),
))
2018-06-29 10:45:05 +03:00
return nil
})
2016-10-09 14:12:32 +03:00
})
It("distributes scripts when using Script Load", func() {
client.ScriptFlush(ctx)
script := redis.NewScript(`return 'Unique script'`)
script.Load(ctx, client)
client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
defer GinkgoRecover()
val, _ := script.Exists(ctx, shard).Result()
Expect(val[0]).To(Equal(true))
return nil
})
})
It("checks all shards when using Script Exists", func() {
client.ScriptFlush(ctx)
script := redis.NewScript(`return 'First script'`)
lostScriptSrc := `return 'Lost script'`
lostScript := redis.NewScript(lostScriptSrc)
script.Load(ctx, client)
client.Do(ctx, "script", "load", lostScriptSrc)
val, _ := client.ScriptExists(ctx, script.Hash(), lostScript.Hash()).Result()
Expect(val).To(Equal([]bool{true, false}))
})
It("flushes scripts from all shards when using ScriptFlush", func() {
script := redis.NewScript(`return 'Unnecessary script'`)
script.Load(ctx, client)
val, _ := client.ScriptExists(ctx, script.Hash()).Result()
Expect(val).To(Equal([]bool{true}))
client.ScriptFlush(ctx)
val, _ = client.ScriptExists(ctx, script.Hash()).Result()
Expect(val).To(Equal([]bool{false}))
})
2016-06-17 15:09:38 +03:00
It("supports Watch", func() {
2015-12-16 17:11:52 +03:00
var incr func(string) error
// Transactionally increments key using GET and SET commands.
incr = func(key string) error {
2020-03-11 17:26:42 +03:00
err := client.Watch(ctx, func(tx *redis.Tx) error {
n, err := tx.Get(ctx, key).Int64()
2016-05-02 15:54:15 +03:00
if err != nil && err != redis.Nil {
return err
}
2020-03-11 17:26:42 +03:00
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, key, strconv.FormatInt(n+1, 10), 0)
2016-05-02 15:54:15 +03:00
return nil
})
2015-12-16 17:11:52 +03:00
return err
2016-05-02 15:54:15 +03:00
}, key)
2015-12-16 17:11:52 +03:00
if err == redis.TxFailedErr {
return incr(key)
}
return err
}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
2016-07-02 15:52:10 +03:00
defer GinkgoRecover()
2015-12-16 17:11:52 +03:00
defer wg.Done()
err := incr("key")
Expect(err).NotTo(HaveOccurred())
}()
}
wg.Wait()
2018-02-15 14:00:54 +03:00
Eventually(func() string {
2020-03-11 17:26:42 +03:00
return client.Get(ctx, "key").Val()
2018-02-15 14:00:54 +03:00
}, 30*time.Second).Should(Equal("100"))
2015-12-16 17:11:52 +03:00
})
2016-04-06 14:01:08 +03:00
2016-12-13 18:28:39 +03:00
Describe("pipelining", func() {
var pipe *redis.Pipeline
2016-04-06 14:01:08 +03:00
2016-12-13 18:28:39 +03:00
assertPipeline := func() {
2016-12-16 17:26:48 +03:00
keys := []string{"A", "B", "C", "D", "E", "F", "G"}
2016-04-06 14:01:08 +03:00
2016-12-16 17:26:48 +03:00
It("follows redirects", func() {
2018-05-17 16:09:56 +03:00
if !failover {
for _, key := range keys {
Eventually(func() error {
2020-03-11 17:26:42 +03:00
return client.SwapNodes(ctx, key)
2018-05-17 16:09:56 +03:00
}, 30*time.Second).ShouldNot(HaveOccurred())
}
2016-12-16 17:26:48 +03:00
}
2016-12-13 18:28:39 +03:00
for i, key := range keys {
2020-03-11 17:26:42 +03:00
pipe.Set(ctx, key, key+"_value", 0)
pipe.Expire(ctx, key, time.Duration(i+1)*time.Hour)
2016-12-13 18:28:39 +03:00
}
2020-03-11 17:26:42 +03:00
cmds, err := pipe.Exec(ctx)
2016-12-13 18:28:39 +03:00
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(14))
2020-06-10 15:04:12 +03:00
_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
defer GinkgoRecover()
Eventually(func() int64 {
2020-03-11 17:26:42 +03:00
return node.DBSize(ctx).Val()
}, 30*time.Second).ShouldNot(BeZero())
return nil
})
2018-05-17 16:09:56 +03:00
if !failover {
for _, key := range keys {
Eventually(func() error {
2020-03-11 17:26:42 +03:00
return client.SwapNodes(ctx, key)
2018-05-17 16:09:56 +03:00
}, 30*time.Second).ShouldNot(HaveOccurred())
}
2016-12-16 17:26:48 +03:00
}
2016-12-13 18:28:39 +03:00
for _, key := range keys {
2020-03-11 17:26:42 +03:00
pipe.Get(ctx, key)
pipe.TTL(ctx, key)
2016-12-13 18:28:39 +03:00
}
2020-03-11 17:26:42 +03:00
cmds, err = pipe.Exec(ctx)
2016-12-13 18:28:39 +03:00
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(14))
2016-12-16 17:26:48 +03:00
for i, key := range keys {
get := cmds[i*2].(*redis.StringCmd)
Expect(get.Val()).To(Equal(key + "_value"))
ttl := cmds[(i*2)+1].(*redis.DurationCmd)
2017-08-15 10:34:05 +03:00
dur := time.Duration(i+1) * time.Hour
2018-07-23 15:55:13 +03:00
Expect(ttl.Val()).To(BeNumerically("~", dur, 30*time.Second))
2016-12-16 17:26:48 +03:00
}
2016-12-13 18:28:39 +03:00
})
2016-04-06 14:01:08 +03:00
2016-12-13 18:28:39 +03:00
It("works with missing keys", func() {
2020-03-11 17:26:42 +03:00
pipe.Set(ctx, "A", "A_value", 0)
pipe.Set(ctx, "C", "C_value", 0)
_, err := pipe.Exec(ctx)
2016-12-16 17:26:48 +03:00
Expect(err).NotTo(HaveOccurred())
2020-03-11 17:26:42 +03:00
a := pipe.Get(ctx, "A")
b := pipe.Get(ctx, "B")
c := pipe.Get(ctx, "C")
cmds, err := pipe.Exec(ctx)
2016-12-13 18:28:39 +03:00
Expect(err).To(Equal(redis.Nil))
Expect(cmds).To(HaveLen(3))
Expect(a.Err()).NotTo(HaveOccurred())
Expect(a.Val()).To(Equal("A_value"))
Expect(b.Err()).To(Equal(redis.Nil))
Expect(b.Val()).To(Equal(""))
Expect(c.Err()).NotTo(HaveOccurred())
Expect(c.Val()).To(Equal("C_value"))
})
}
2017-07-09 13:10:07 +03:00
Describe("with Pipeline", func() {
2016-12-13 18:28:39 +03:00
BeforeEach(func() {
pipe = client.Pipeline().(*redis.Pipeline)
2016-10-09 14:12:32 +03:00
})
2016-04-06 14:01:08 +03:00
AfterEach(func() {})
2016-12-13 18:28:39 +03:00
assertPipeline()
})
2016-04-06 14:01:08 +03:00
2017-07-09 13:10:07 +03:00
Describe("with TxPipeline", func() {
2016-12-13 18:28:39 +03:00
BeforeEach(func() {
pipe = client.TxPipeline().(*redis.Pipeline)
2016-12-13 18:28:39 +03:00
})
AfterEach(func() {})
2016-04-06 14:01:08 +03:00
2016-12-13 18:28:39 +03:00
assertPipeline()
2016-10-09 14:12:32 +03:00
})
2016-04-06 14:01:08 +03:00
})
2016-06-17 15:09:38 +03:00
2017-07-09 10:07:20 +03:00
It("supports PubSub", func() {
2020-03-11 17:26:42 +03:00
pubsub := client.Subscribe(ctx, "mychannel")
2017-07-09 10:07:20 +03:00
defer pubsub.Close()
2017-07-09 13:10:07 +03:00
Eventually(func() error {
2020-03-11 17:26:42 +03:00
_, err := client.Publish(ctx, "mychannel", "hello").Result()
2017-07-09 13:10:07 +03:00
if err != nil {
return err
}
2017-07-09 10:07:20 +03:00
2020-03-11 17:26:42 +03:00
msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
2017-07-09 13:10:07 +03:00
if err != nil {
return err
}
2017-07-09 10:07:20 +03:00
2017-07-09 13:10:07 +03:00
_, ok := msg.(*redis.Message)
if !ok {
return fmt.Errorf("got %T, wanted *redis.Message", msg)
}
2022-08-03 18:10:03 +03:00
return nil
}, 30*time.Second).ShouldNot(HaveOccurred())
})
It("supports sharded PubSub", func() {
pubsub := client.SSubscribe(ctx, "mychannel")
defer pubsub.Close()
Eventually(func() error {
_, err := client.SPublish(ctx, "mychannel", "hello").Result()
if err != nil {
return err
}
msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
if err != nil {
return err
}
_, ok := msg.(*redis.Message)
if !ok {
return fmt.Errorf("got %T, wanted *redis.Message", msg)
}
2017-07-09 13:10:07 +03:00
return nil
}, 30*time.Second).ShouldNot(HaveOccurred())
})
It("supports PubSub.Ping without channels", func() {
2020-03-11 17:26:42 +03:00
pubsub := client.Subscribe(ctx)
defer pubsub.Close()
2020-03-11 17:26:42 +03:00
err := pubsub.Ping(ctx)
Expect(err).NotTo(HaveOccurred())
})
2020-09-11 12:46:38 +03:00
}
Describe("ClusterClient", func() {
BeforeEach(func() {
opt = redisClusterOptions()
client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})
It("returns pool stats", func() {
stats := client.PoolStats()
Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
})
It("returns an error when there are no attempts left", func() {
opt := redisClusterOptions()
opt.MaxRedirects = -1
client := cluster.newClusterClient(ctx, opt)
Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())
err := client.Get(ctx, "A").Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
Expect(client.Close()).NotTo(HaveOccurred())
})
It("calls fn for every master node", func() {
for i := 0; i < 10; i++ {
Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
}
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
size, err := client.DBSize(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(size).To(Equal(int64(0)))
})
It("should CLUSTER SLOTS", func() {
res, err := client.ClusterSlots(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(3))
wanted := []redis.ClusterSlot{{
Start: 0,
End: 4999,
Nodes: []redis.ClusterNode{{
ID: "",
Addr: "127.0.0.1:8220",
}, {
ID: "",
Addr: "127.0.0.1:8223",
}},
}, {
Start: 5000,
End: 9999,
Nodes: []redis.ClusterNode{{
ID: "",
Addr: "127.0.0.1:8221",
}, {
ID: "",
Addr: "127.0.0.1:8224",
}},
}, {
Start: 10000,
End: 16383,
Nodes: []redis.ClusterNode{{
ID: "",
Addr: "127.0.0.1:8222",
}, {
ID: "",
Addr: "127.0.0.1:8225",
}},
}}
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
})
It("should CLUSTER NODES", func() {
res, err := client.ClusterNodes(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(res)).To(BeNumerically(">", 400))
})
It("should CLUSTER INFO", func() {
res, err := client.ClusterInfo(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
})
It("should CLUSTER KEYSLOT", func() {
hashSlot, err := client.ClusterKeySlot(ctx, "somekey").Result()
Expect(err).NotTo(HaveOccurred())
Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
})
It("should CLUSTER GETKEYSINSLOT", func() {
keys, err := client.ClusterGetKeysInSlot(ctx, hashtag.Slot("somekey"), 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(keys)).To(Equal(0))
})
It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
n, err := client.ClusterCountFailureReports(ctx, cluster.nodeIDs[0]).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(0)))
})
It("should CLUSTER COUNTKEYSINSLOT", func() {
n, err := client.ClusterCountKeysInSlot(ctx, 10).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(0)))
})
It("should CLUSTER SAVECONFIG", func() {
res, err := client.ClusterSaveConfig(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK"))
})
It("should CLUSTER SLAVES", func() {
nodesList, err := client.ClusterSlaves(ctx, cluster.nodeIDs[0]).Result()
Expect(err).NotTo(HaveOccurred())
Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
Expect(nodesList).Should(HaveLen(1))
})
It("should RANDOMKEY", func() {
const nkeys = 100
for i := 0; i < nkeys; i++ {
err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
Expect(err).NotTo(HaveOccurred())
}
var keys []string
addKey := func(key string) {
for _, k := range keys {
if k == key {
return
}
}
keys = append(keys, key)
}
for i := 0; i < nkeys*10; i++ {
key := client.RandomKey(ctx).Val()
addKey(key)
}
Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
})
2020-02-14 15:30:07 +03:00
It("supports Process hook", func() {
testCtx, cancel := context.WithCancel(ctx)
defer cancel()
2020-03-11 17:26:42 +03:00
err := client.Ping(ctx).Err()
2020-02-14 15:30:07 +03:00
Expect(err).NotTo(HaveOccurred())
2020-06-10 15:04:12 +03:00
err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
2020-03-11 17:26:42 +03:00
return node.Ping(ctx).Err()
2020-02-14 15:30:07 +03:00
})
Expect(err).NotTo(HaveOccurred())
var stack []string
clusterHook := &hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
select {
case <-testCtx.Done():
return hook(ctx, cmd)
default:
}
Expect(cmd.String()).To(Equal("ping: "))
stack = append(stack, "cluster.BeforeProcess")
err := hook(ctx, cmd)
Expect(cmd.String()).To(Equal("ping: PONG"))
stack = append(stack, "cluster.AfterProcess")
return err
}
2020-02-14 15:30:07 +03:00
},
}
client.AddHook(clusterHook)
2020-02-14 16:37:35 +03:00
nodeHook := &hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
select {
case <-testCtx.Done():
return hook(ctx, cmd)
default:
}
Expect(cmd.String()).To(Equal("ping: "))
stack = append(stack, "shard.BeforeProcess")
err := hook(ctx, cmd)
Expect(cmd.String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcess")
return err
}
2020-02-14 15:30:07 +03:00
},
}
2020-06-10 15:04:12 +03:00
_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
2020-02-14 16:37:35 +03:00
node.AddHook(nodeHook)
return nil
})
2020-02-14 15:30:07 +03:00
2020-03-11 17:26:42 +03:00
err = client.Ping(ctx).Err()
2020-02-14 15:30:07 +03:00
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"cluster.BeforeProcess",
"shard.BeforeProcess",
"shard.AfterProcess",
"cluster.AfterProcess",
}))
})
It("supports Pipeline hook", func() {
2020-03-11 17:26:42 +03:00
err := client.Ping(ctx).Err()
2020-02-14 15:30:07 +03:00
Expect(err).NotTo(HaveOccurred())
2020-06-10 15:04:12 +03:00
err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
2020-03-11 17:26:42 +03:00
return node.Ping(ctx).Err()
2020-02-14 15:30:07 +03:00
})
Expect(err).NotTo(HaveOccurred())
var stack []string
client.AddHook(&hook{
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: "))
stack = append(stack, "cluster.BeforeProcessPipeline")
err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: PONG"))
stack = append(stack, "cluster.AfterProcessPipeline")
return err
}
2020-02-14 15:30:07 +03:00
},
})
2020-06-10 15:04:12 +03:00
_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
2020-02-14 16:37:35 +03:00
node.AddHook(&hook{
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: "))
stack = append(stack, "shard.BeforeProcessPipeline")
err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcessPipeline")
return err
}
2020-02-14 15:30:07 +03:00
},
})
2020-02-14 16:37:35 +03:00
return nil
})
2020-02-14 15:30:07 +03:00
2020-03-11 17:26:42 +03:00
_, err = client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
2020-02-14 15:30:07 +03:00
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"cluster.BeforeProcessPipeline",
"shard.BeforeProcessPipeline",
"shard.AfterProcessPipeline",
"cluster.AfterProcessPipeline",
}))
})
It("supports TxPipeline hook", func() {
2020-03-11 17:26:42 +03:00
err := client.Ping(ctx).Err()
2020-02-14 15:30:07 +03:00
Expect(err).NotTo(HaveOccurred())
2020-06-10 15:04:12 +03:00
err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
2020-03-11 17:26:42 +03:00
return node.Ping(ctx).Err()
2020-02-14 15:30:07 +03:00
})
Expect(err).NotTo(HaveOccurred())
var stack []string
client.AddHook(&hook{
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: "))
stack = append(stack, "cluster.BeforeProcessPipeline")
err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: PONG"))
stack = append(stack, "cluster.AfterProcessPipeline")
return err
}
2020-02-14 15:30:07 +03:00
},
})
2020-06-10 15:04:12 +03:00
_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
2020-02-14 16:37:35 +03:00
node.AddHook(&hook{
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: "))
stack = append(stack, "shard.BeforeProcessPipeline")
err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcessPipeline")
return err
}
2020-02-14 15:30:07 +03:00
},
})
2020-02-14 16:37:35 +03:00
return nil
})
2020-02-14 15:30:07 +03:00
2020-03-11 17:26:42 +03:00
_, err = client.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
2020-02-14 15:30:07 +03:00
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"cluster.BeforeProcessPipeline",
"shard.BeforeProcessPipeline",
"shard.AfterProcessPipeline",
"cluster.AfterProcessPipeline",
}))
})
It("should return correct replica for key", func() {
2020-12-16 18:45:06 +03:00
client, err := client.SlaveForKey(ctx, "test")
Expect(err).ToNot(HaveOccurred())
info := client.Info(ctx, "server")
Expect(info.Val()).Should(ContainSubstring("tcp_port:8224"))
})
It("should return correct master for key", func() {
client, err := client.MasterForKey(ctx, "test")
Expect(err).ToNot(HaveOccurred())
info := client.Info(ctx, "server")
Expect(info.Val()).Should(ContainSubstring("tcp_port:8221"))
})
2017-07-09 13:10:07 +03:00
assertClusterClient()
})
Describe("ClusterClient with RouteByLatency", func() {
BeforeEach(func() {
2016-12-16 17:26:48 +03:00
opt = redisClusterOptions()
opt.RouteByLatency = true
2020-03-11 17:26:42 +03:00
client = cluster.newClusterClient(ctx, opt)
2016-06-17 15:09:38 +03:00
2020-03-11 17:26:42 +03:00
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
2016-06-17 15:09:38 +03:00
})
2018-05-17 16:09:56 +03:00
Expect(err).NotTo(HaveOccurred())
2017-08-31 15:22:47 +03:00
2020-03-11 17:26:42 +03:00
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
2017-08-31 15:22:47 +03:00
Eventually(func() int64 {
2020-03-11 17:26:42 +03:00
return client.DBSize(ctx).Val()
2017-08-31 15:22:47 +03:00
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
2018-05-17 16:09:56 +03:00
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
2020-03-11 17:26:42 +03:00
err := client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
return slave.ReadWrite(ctx).Err()
2017-08-31 15:22:47 +03:00
})
2018-05-17 16:09:56 +03:00
Expect(err).NotTo(HaveOccurred())
err = client.Close()
Expect(err).NotTo(HaveOccurred())
})
2016-12-16 17:26:48 +03:00
assertClusterClient()
2016-04-06 14:01:08 +03:00
})
2018-06-29 10:45:05 +03:00
Describe("ClusterClient with ClusterSlots", func() {
BeforeEach(func() {
failover = true
opt = redisClusterOptions()
2020-09-09 15:27:17 +03:00
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
2018-06-29 10:45:05 +03:00
slots := []redis.ClusterSlot{{
Start: 0,
End: 4999,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard1Port,
}},
}, {
Start: 5000,
End: 9999,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard2Port,
}},
}, {
Start: 10000,
End: 16383,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard3Port,
}},
}}
return slots, nil
}
2020-03-11 17:26:42 +03:00
client = cluster.newClusterClient(ctx, opt)
2018-06-29 10:45:05 +03:00
2020-03-11 17:26:42 +03:00
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
2018-06-29 10:45:05 +03:00
})
Expect(err).NotTo(HaveOccurred())
2020-03-11 17:26:42 +03:00
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
Eventually(func() int64 {
2020-03-11 17:26:42 +03:00
return client.DBSize(ctx).Val()
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
failover = false
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
assertClusterClient()
})
Describe("ClusterClient with RouteRandomly and ClusterSlots", func() {
BeforeEach(func() {
failover = true
opt = redisClusterOptions()
opt.RouteRandomly = true
2020-09-09 15:27:17 +03:00
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{{
Start: 0,
End: 4999,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard1Port,
}},
}, {
Start: 5000,
End: 9999,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard2Port,
}},
}, {
Start: 10000,
End: 16383,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard3Port,
}},
}}
return slots, nil
}
2020-03-11 17:26:42 +03:00
client = cluster.newClusterClient(ctx, opt)
2020-03-11 17:26:42 +03:00
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
2020-03-11 17:26:42 +03:00
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
2018-06-29 10:45:05 +03:00
Eventually(func() int64 {
2020-03-11 17:26:42 +03:00
return client.DBSize(ctx).Val()
2018-06-29 10:45:05 +03:00
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
failover = false
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
assertClusterClient()
})
Describe("ClusterClient with ClusterSlots with multiple nodes per slot", func() {
BeforeEach(func() {
failover = true
opt = redisClusterOptions()
opt.ReadOnly = true
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{{
Start: 0,
End: 4999,
Nodes: []redis.ClusterNode{{
Addr: ":8220",
}, {
Addr: ":8223",
}},
}, {
Start: 5000,
End: 9999,
Nodes: []redis.ClusterNode{{
Addr: ":8221",
}, {
Addr: ":8224",
}},
}, {
Start: 10000,
End: 16383,
Nodes: []redis.ClusterNode{{
Addr: ":8222",
}, {
Addr: ":8225",
}},
}}
return slots, nil
}
client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
Eventually(func() int64 {
return client.DBSize(ctx).Val()
}, 30*time.Second).Should(Equal(int64(0)))
2018-06-29 10:45:05 +03:00
return nil
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
failover = false
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
assertClusterClient()
})
})
var _ = Describe("ClusterClient without nodes", func() {
var client *redis.ClusterClient
BeforeEach(func() {
client = redis.NewClusterClient(&redis.ClusterOptions{})
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
2017-08-31 15:22:47 +03:00
It("Ping returns an error", func() {
2020-03-11 17:26:42 +03:00
err := client.Ping(ctx).Err()
Expect(err).To(MatchError("redis: cluster has no nodes"))
})
It("pipeline returns an error", func() {
2020-03-11 17:26:42 +03:00
_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
2016-10-09 14:12:32 +03:00
})
Expect(err).To(MatchError("redis: cluster has no nodes"))
})
})
var _ = Describe("ClusterClient without valid nodes", func() {
var client *redis.ClusterClient
BeforeEach(func() {
client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{redisAddr},
})
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("returns an error", func() {
2020-03-11 17:26:42 +03:00
err := client.Ping(ctx).Err()
2018-02-15 14:00:54 +03:00
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
})
It("pipeline returns an error", func() {
2020-03-11 17:26:42 +03:00
_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
})
2018-02-15 14:00:54 +03:00
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
})
})
var _ = Describe("ClusterClient with unavailable Cluster", func() {
var client *redis.ClusterClient
BeforeEach(func() {
opt := redisClusterOptions()
opt.ReadTimeout = 250 * time.Millisecond
opt.WriteTimeout = 250 * time.Millisecond
opt.MaxRedirects = 1
2020-09-05 11:34:37 +03:00
client = cluster.newClusterClientUnstable(opt)
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
for _, node := range cluster.clients {
err := node.ClientPause(ctx, 5*time.Second).Err()
Expect(err).NotTo(HaveOccurred())
}
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("recovers when Cluster recovers", func() {
2020-03-11 17:26:42 +03:00
err := client.Ping(ctx).Err()
Expect(err).To(HaveOccurred())
Eventually(func() error {
2020-03-11 17:26:42 +03:00
return client.Ping(ctx).Err()
}, "30s").ShouldNot(HaveOccurred())
})
})
var _ = Describe("ClusterClient timeout", func() {
var client *redis.ClusterClient
2016-10-09 14:12:32 +03:00
AfterEach(func() {
2017-03-04 14:04:27 +03:00
_ = client.Close()
})
testTimeout := func() {
It("Ping timeouts", func() {
2020-03-11 17:26:42 +03:00
err := client.Ping(ctx).Err()
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
2016-10-09 14:12:32 +03:00
})
It("Pipeline timeouts", func() {
2020-03-11 17:26:42 +03:00
_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
2016-10-09 14:12:32 +03:00
return nil
})
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
2016-10-09 14:12:32 +03:00
})
It("Tx timeouts", func() {
2020-03-11 17:26:42 +03:00
err := client.Watch(ctx, func(tx *redis.Tx) error {
return tx.Ping(ctx).Err()
2017-08-31 15:22:47 +03:00
}, "foo")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
It("Tx Pipeline timeouts", func() {
2020-03-11 17:26:42 +03:00
err := client.Watch(ctx, func(tx *redis.Tx) error {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
})
return err
2017-08-31 15:22:47 +03:00
}, "foo")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
}
2016-10-09 14:12:32 +03:00
const pause = 5 * time.Second
2017-08-31 15:22:47 +03:00
Context("read/write timeout", func() {
BeforeEach(func() {
opt := redisClusterOptions()
2020-03-11 17:26:42 +03:00
client = cluster.newClusterClient(ctx, opt)
2020-06-10 15:04:12 +03:00
err := client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
err := client.ClientPause(ctx, pause).Err()
opt := client.Options()
opt.ReadTimeout = time.Nanosecond
opt.WriteTimeout = time.Nanosecond
return err
})
Expect(err).NotTo(HaveOccurred())
// Overwrite timeouts after the client is initialized.
opt.ReadTimeout = time.Nanosecond
opt.WriteTimeout = time.Nanosecond
opt.MaxRedirects = 0
})
AfterEach(func() {
2020-06-10 15:04:12 +03:00
_ = client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
2018-02-15 14:00:54 +03:00
defer GinkgoRecover()
opt := client.Options()
opt.ReadTimeout = time.Second
opt.WriteTimeout = time.Second
2017-08-15 10:34:05 +03:00
Eventually(func() error {
2020-03-11 17:26:42 +03:00
return client.Ping(ctx).Err()
2017-08-15 10:34:05 +03:00
}, 2*pause).ShouldNot(HaveOccurred())
return nil
})
2022-11-21 12:31:38 +03:00
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
testTimeout()
})
2015-01-24 15:12:48 +03:00
})
func TestParseClusterURL(t *testing.T) {
cases := []struct {
test string
url string
o *redis.ClusterOptions // expected value
err error
}{
{
test: "ParseRedisURL",
url: "redis://localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}},
}, {
test: "ParseRedissURL",
url: "rediss://localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "MissingRedisPort",
url: "redis://localhost",
o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}},
}, {
test: "MissingRedissPort",
url: "rediss://localhost",
o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "MultipleRedisURLs",
url: "redis://localhost:123?addr=localhost:1234&addr=localhost:12345",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}},
}, {
test: "MultipleRedissURLs",
url: "rediss://localhost:123?addr=localhost:1234&addr=localhost:12345",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "OnlyPassword",
url: "redis://:bar@localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Password: "bar"},
}, {
test: "OnlyUser",
url: "redis://foo@localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo"},
}, {
test: "RedisUsernamePassword",
url: "redis://foo:bar@localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo", Password: "bar"},
}, {
test: "RedissUsernamePassword",
url: "rediss://foo:bar@localhost:123?addr=localhost:1234",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, Username: "foo", Password: "bar", TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "QueryParameters",
url: "redis://localhost:123?read_timeout=2&pool_fifo=true&addr=localhost:1234",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, ReadTimeout: 2 * time.Second, PoolFIFO: true},
}, {
test: "DisabledTimeout",
url: "redis://localhost:123?conn_max_idle_time=0",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
}, {
test: "DisabledTimeoutNeg",
url: "redis://localhost:123?conn_max_idle_time=-1",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
}, {
test: "UseDefault",
url: "redis://localhost:123?conn_max_idle_time=",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
}, {
test: "UseDefaultMissing=",
url: "redis://localhost:123?conn_max_idle_time",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
}, {
test: "InvalidQueryAddr",
url: "rediss://foo:bar@localhost:123?addr=rediss://foo:barr@localhost:1234",
err: errors.New(`redis: unable to parse addr param: rediss://foo:barr@localhost:1234`),
}, {
test: "InvalidInt",
url: "redis://localhost?pool_size=five",
err: errors.New(`redis: invalid pool_size number: strconv.Atoi: parsing "five": invalid syntax`),
}, {
test: "InvalidBool",
url: "redis://localhost?pool_fifo=yes",
err: errors.New(`redis: invalid pool_fifo boolean: expected true/false/1/0 or an empty string, got "yes"`),
}, {
test: "UnknownParam",
url: "redis://localhost?abc=123",
err: errors.New("redis: unexpected option: abc"),
}, {
test: "InvalidScheme",
url: "https://google.com",
err: errors.New("redis: invalid URL scheme: https"),
},
}
for i := range cases {
tc := cases[i]
t.Run(tc.test, func(t *testing.T) {
t.Parallel()
actual, err := redis.ParseClusterURL(tc.url)
if tc.err == nil && err != nil {
t.Fatalf("unexpected error: %q", err)
return
}
if tc.err != nil && err == nil {
t.Fatalf("expected error: got %+v", actual)
return
}
if tc.err != nil && err != nil {
if tc.err.Error() != err.Error() {
t.Fatalf("got %q, expected %q", err, tc.err)
}
return
}
comprareOptions(t, actual, tc.o)
})
}
}
func comprareOptions(t *testing.T, actual, expected *redis.ClusterOptions) {
t.Helper()
assert.Equal(t, expected.Addrs, actual.Addrs)
assert.Equal(t, expected.TLSConfig, actual.TLSConfig)
assert.Equal(t, expected.Username, actual.Username)
assert.Equal(t, expected.Password, actual.Password)
assert.Equal(t, expected.MaxRetries, actual.MaxRetries)
assert.Equal(t, expected.MinRetryBackoff, actual.MinRetryBackoff)
assert.Equal(t, expected.MaxRetryBackoff, actual.MaxRetryBackoff)
assert.Equal(t, expected.DialTimeout, actual.DialTimeout)
assert.Equal(t, expected.ReadTimeout, actual.ReadTimeout)
assert.Equal(t, expected.WriteTimeout, actual.WriteTimeout)
assert.Equal(t, expected.PoolFIFO, actual.PoolFIFO)
assert.Equal(t, expected.PoolSize, actual.PoolSize)
assert.Equal(t, expected.MinIdleConns, actual.MinIdleConns)
assert.Equal(t, expected.ConnMaxLifetime, actual.ConnMaxLifetime)
assert.Equal(t, expected.ConnMaxIdleTime, actual.ConnMaxIdleTime)
assert.Equal(t, expected.PoolTimeout, actual.PoolTimeout)
}