redis/sentinel.go

763 lines
20 KiB
Go
Raw Normal View History

2014-05-11 18:11:55 +04:00
package redis
import (
"context"
"crypto/tls"
2014-05-11 18:11:55 +04:00
"errors"
"net"
"strings"
"sync"
"time"
2022-06-04 17:39:21 +03:00
"github.com/go-redis/redis/v9/internal"
"github.com/go-redis/redis/v9/internal/pool"
"github.com/go-redis/redis/v9/internal/rand"
2014-05-11 18:11:55 +04:00
)
//------------------------------------------------------------------------------
// FailoverOptions are used to configure a failover client and should
// be passed to NewFailoverClient.
2014-05-11 18:11:55 +04:00
type FailoverOptions struct {
2015-01-31 17:54:37 +03:00
// The master name.
MasterName string
// A seed list of host:port addresses of sentinel nodes.
SentinelAddrs []string
chore: sync master (#2051) * Upgrade redis-server version (#1833) * Upgrade redis-server version Signed-off-by: monkey <golang@88.com> * XAutoClaim changed the return value Signed-off-by: monkey <golang@88.com> * add cmd: geosearch, geosearchstore (#1836) * add cmd: geosearch, geosearchstore Signed-off-by: monkey92t <golang@88.com> * GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing Signed-off-by: monkey92t <golang@88.com> * Added missing method XInfoStreamFull to Cmdable interface * Run go mod tidy in redisotel Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com> * Revert "ConnPool check fd for bad conns (#1824)" (#1849) This reverts commit 346bfafddd36dd52d51b064033048de5552ee91e. * Automate release process (#1852) * Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 (#1832) * Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.10.5 to 1.14.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.10.5...v1.14.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... 
Signed-off-by: dependabot[bot] <support@github.com> * Upgrade gomega to v1.15.0 Signed-off-by: monkey92t <golang@88.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: monkey92t <golang@88.com> * Add version.go * Fix otel example * Fix package name in release script * More fixes for otel example * And more * Fix release.sh * Release v8.11.3 (release.sh) * Create an annotated tag to give release.yml chance to run * Tweak tag.sh * Add Cmd.Slice helper to cast to []interface{} (#1859) * after the connection pool is closed, no new connections should be added (#1863) * after the connection pool is closed, no new connections should be added Signed-off-by: monkey92t <golang@88.com> * remove runGoroutine Signed-off-by: monkey92t <golang@88.com> * pool.popIdle add p.closed check Signed-off-by: monkey92t <golang@88.com> * upgrade golangci-lint v1.42.0 Signed-off-by: monkey92t <golang@88.com> * Bump github.com/onsi/gomega from 1.15.0 to 1.16.0 (#1865) Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.15.0 to 1.16.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.15.0...v1.16.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Add go 1.17 to the build matrix * Remove go 1.15 from build matrix * Add scan struct example (#1870) * Replace release job * Bump github.com/cespare/xxhash/v2 from 2.1.1 to 2.1.2 (#1872) Bumps [github.com/cespare/xxhash/v2](https://github.com/cespare/xxhash) from 2.1.1 to 2.1.2. 
- [Release notes](https://github.com/cespare/xxhash/releases) - [Commits](https://github.com/cespare/xxhash/compare/v2.1.1...v2.1.2) --- updated-dependencies: - dependency-name: github.com/cespare/xxhash/v2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Fix tag script to push tag by tag * Fix releasing.md * Fix/pubsub ping mutex (#1878) * Fix PubSub.Ping to hold the lock * Fix PubSub.Ping to hold the lock * add write cmd data-race test Signed-off-by: monkey92t <golang@88.com> Co-authored-by: monkey92t <golang@88.com> * chore: cleanup OpenTelemetry example * chore: gofmt all code * Refactor TestParseURL This is in preparation for supporting query parameters in ParseURL: - use an expected *Options instance to execute assertions on - extract assertions into helper function - enable parallel testing - condense test table * Add query parameter parsing to ParseURL() Before this change, ParseURL would only accept a very restricted set of URLs (it returned an error, if it encountered any parameter). This commit introduces the ability to process URLs like redis://localhost/1?dial_timeout=10s and similar. Go programs which were providing a configuration tunable (e.g. CLI flag, config entry or environment variable) to configure the Redis connection now don't need to perform this task themselves. 
* chore: add links to readme * chore: fix discussions link * empty hooks.withContext removed * chore: gofmt * chore: use conventional commits and auto-generate changelog * feat: add acl auth support for sentinels * chore: swap to acl auth at the test-level * Add support for BLMove command * chore: update dependencies * chore: update link * feat: add SetVal method for each command * feat: add Cmd.{String,Int,Float,Bool}Slice helpers and an example * chore: tweak GH actions to run all jobs * chore: add Lua scripting example * Fix Redis Cluster issue during roll outs of new nodes with same addr (#1914) * fix: recycle connections in some Redis Cluster scenarios This issue was surfaced in a Cloud Provider solution that used for rolling out new nodes using the same address (hostname) of the nodes that will be replaced in a Redis Cluster, while the former ones once depromoted as Slaves would continue in service during some mintues for redirecting traffic. The solution basically identifies when the connection could be stale since a MOVED response will be returned using the same address (hostname) that is being used by the connection. At that moment we consider the connection as no longer usable forcing to recycle the connection. * chore: lazy reload when moved or ask * chore: use conv commit message * chore: release v8.11.4 (release.sh) * fix: add whitespace for avoid unlikely colisions * fix: format * chore: fix links * chore: use ctx parameter in cmdInfo * Bump github.com/onsi/ginkgo from 1.16.4 to 1.16.5 (#1925) Bumps [github.com/onsi/ginkgo](https://github.com/onsi/ginkgo) from 1.16.4 to 1.16.5. - [Release notes](https://github.com/onsi/ginkgo/releases) - [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/ginkgo/compare/v1.16.4...v1.16.5) --- updated-dependencies: - dependency-name: github.com/onsi/ginkgo dependency-type: direct:production update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * feat: add support for time.Duration write and scan * test: add test case for setting and scanning durations * chore: fix linter * fix(extra/redisotel): set span.kind attribute to client According to the opentelemetry specification this should always be set to client for database client libraries. I've also removed the SetAttributes call and instead set the attributes during creation of the span. This is what the library SHOULD be doing according to the opentelemetry api specification. * chore: update otel example * fix: update some argument counts in pre-allocs In some cases number of pre-allocated places in argument array is missing 1 or 2 elements, which results in re-allocation of twice as large array * chore: add example how to delete keys without a ttl * chore: don't enable all lints * chore(deps): bump github.com/onsi/gomega from 1.16.0 to 1.17.0 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.16.0 to 1.17.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.16.0...v1.17.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * feat: Add redis v7's NX, XX, GT, LT expire variants * chore: add missing readme * chore: tweak feature links * chore: remove Discord * fix: set timeout for WAIT command. Fixes #1963 * build: update `go` directive in `go.mod` to 1.17 This commit enables support for module graph pruning and lazy module loading for projects that are at Go 1.17 or higher. 
Reference: https://go.dev/ref/mod#go-mod-file-go Reference: https://go.dev/ref/mod#graph-pruning Reference: https://go.dev/ref/mod#lazy-loading Signed-off-by: Eng Zer Jun <engzerjun@gmail.com> * chore: update link * chore: export cmder.SetFirstKeyPos to support build module commands * feat(redisotel): ability to override TracerProvider (#1998) * fix: add missing Expire methods to Cmdable This is a followup to https://github.com/go-redis/redis/pull/1928 * chore(deps): bump github.com/onsi/gomega from 1.17.0 to 1.18.1 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.17.0 to 1.18.1. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.17.0...v1.18.1) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * Update README.md (#2011) chore: add fmt library in example code * chore: instrumentation name and version (#2012) * fix: invalid type assert in stringArg * chore: cleanup * fix: example/otel compile error (#2028) * fix: rename Golang to Go (#2030) https://go.dev/doc/faq#go_or_golang * feat: add support for passing extra attributes added to spans * feat: set net.peer.name and net.peer.port in otel example * chore: tweak Uptrace copy * feat: add support for COPY command (#2016) * feat: add support for acl sentinel auth in universal client * chore(deps): bump actions/checkout from 2 to 3 Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3. 
- [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v2...v3) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> * chore: add hll example * chore: tweak release script * chore: release v8.11.5 (release.sh) * chore: add discord back Co-authored-by: Eugene Ponizovsky <ponizovsky@gmail.com> Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com> Co-authored-by: Vladimir Mihailenco <vladimir.webdev@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Kishan B <kishancs46@gmail.com> Co-authored-by: Dominik Menke <dom@digineo.de> Co-authored-by: Gökhan Özeloğlu <gozeloglu@gmail.com> Co-authored-by: Justin Sievenpiper <justin@sievenpiper.co> Co-authored-by: Алексей Романовский <aromanovsky@epiphan.com> Co-authored-by: Stavros Panakakakis <stavrospanakakis@gmail.com> Co-authored-by: Pau Freixes <pfreixes@gmail.com> Co-authored-by: Ethan Hur <ethan0311@gmail.com> Co-authored-by: Jackie <18378976+Pyrodash@users.noreply.github.com> Co-authored-by: Kristinn Björgvin Árdal <kristinnardalsecondary@gmail.com> Co-authored-by: ffenix113 <razerer@bigmir.net> Co-authored-by: Bastien Penavayre <bastienPenava@gmail.com> Co-authored-by: James3 Li(李麒傑) <james3_li@asus.com> Co-authored-by: Eng Zer Jun <engzerjun@gmail.com> Co-authored-by: gzjiangtao2014 <gzjiangtao2014@corp.netease.com> Co-authored-by: Nelz <nelz9999@users.noreply.github.com> Co-authored-by: Daniel Richter <Nexyz9@gmail.com> Co-authored-by: Seyed Ali Ghaffari <ali.ghaffari@outlook.com> Co-authored-by: lintanghui <lintanghui@bilibili.com> Co-authored-by: hidu <duv123+github@gmail.com> Co-authored-by: Jonas Lergell <jonas.lergell@volvocars.com> Co-authored-by: Alex Kahn 
<alexanderkahn@gmail.com>
2022-03-19 07:40:31 +03:00
// If specified with SentinelPassword, enables ACL-based authentication (via
// AUTH <user> <pass>).
SentinelUsername string
// Sentinel password from "requirepass <password>" (if enabled) in Sentinel
// configuration, or, if SentinelUsername is also supplied, used for ACL-based
// authentication.
2019-06-04 14:05:29 +03:00
SentinelPassword string
2014-05-11 18:11:55 +04:00
2022-06-04 17:25:12 +03:00
// Allows routing read-only commands to the closest master or replica node.
2020-09-11 15:52:38 +03:00
// This option only works with NewFailoverClusterClient.
RouteByLatency bool
2022-06-04 17:25:12 +03:00
// Allows routing read-only commands to the random master or replica node.
2020-09-11 15:52:38 +03:00
// This option only works with NewFailoverClusterClient.
RouteRandomly bool
2022-06-04 17:25:12 +03:00
// Route all commands to replica read-only nodes.
ReplicaOnly bool
2020-09-05 17:39:26 +03:00
2022-06-04 17:25:12 +03:00
// Use replicas disconnected with master when cannot get connected replicas
// Now, this option only works in RandomReplicaAddr function.
UseDisconnectedReplicas bool
// Following options are copied from Options struct.
2019-06-04 14:05:29 +03:00
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
2020-06-10 10:36:22 +03:00
OnConnect func(ctx context.Context, cn *Conn) error
2017-05-25 14:16:39 +03:00
2020-05-21 08:59:20 +03:00
Username string
2019-06-04 14:05:29 +03:00
Password string
DB int
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
2016-03-17 19:00:47 +03:00
DialTimeout time.Duration
ReadTimeout time.Duration
2015-01-31 17:54:37 +03:00
WriteTimeout time.Duration
2014-05-11 18:11:55 +04:00
// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
PoolFIFO bool
2016-03-17 19:00:47 +03:00
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
2016-03-17 19:00:47 +03:00
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
TLSConfig *tls.Config
2014-05-11 18:11:55 +04:00
}
2020-09-11 11:24:38 +03:00
func (opt *FailoverOptions) clientOptions() *Options {
return &Options{
2020-06-10 10:36:22 +03:00
Addr: "FailoverClient",
2019-05-18 14:00:07 +03:00
Dialer: opt.Dialer,
2017-05-25 14:16:39 +03:00
OnConnect: opt.OnConnect,
2014-05-11 18:11:55 +04:00
DB: opt.DB,
2020-05-21 08:59:20 +03:00
Username: opt.Username,
2014-05-11 18:11:55 +04:00
Password: opt.Password,
MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff,
MaxRetryBackoff: opt.MaxRetryBackoff,
2016-03-17 19:00:47 +03:00
DialTimeout: opt.DialTimeout,
2014-05-11 18:11:55 +04:00
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolFIFO: opt.PoolFIFO,
2016-03-17 19:00:47 +03:00
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
TLSConfig: opt.TLSConfig,
2014-05-11 18:11:55 +04:00
}
2020-09-11 11:24:38 +03:00
}
func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
return &Options{
Addr: addr,
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,
DB: 0,
chore: sync master (#2051) * Upgrade redis-server version (#1833) * Upgrade redis-server version Signed-off-by: monkey <golang@88.com> * XAutoClaim changed the return value Signed-off-by: monkey <golang@88.com> * add cmd: geosearch, geosearchstore (#1836) * add cmd: geosearch, geosearchstore Signed-off-by: monkey92t <golang@88.com> * GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing Signed-off-by: monkey92t <golang@88.com> * Added missing method XInfoStreamFull to Cmdable interface * Run go mod tidy in redisotel Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com> * Revert "ConnPool check fd for bad conns (#1824)" (#1849) This reverts commit 346bfafddd36dd52d51b064033048de5552ee91e. * Automate release process (#1852) * Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 (#1832) * Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.10.5 to 1.14.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.10.5...v1.14.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... 
Signed-off-by: dependabot[bot] <support@github.com> * Upgrade gomega to v1.15.0 Signed-off-by: monkey92t <golang@88.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: monkey92t <golang@88.com> * Add version.go * Fix otel example * Fix package name in release script * More fixes for otel example * And more * Fix release.sh * Release v8.11.3 (release.sh) * Create an annotated tag to give release.yml chance to run * Tweak tag.sh * Add Cmd.Slice helper to cast to []interface{} (#1859) * after the connection pool is closed, no new connections should be added (#1863) * after the connection pool is closed, no new connections should be added Signed-off-by: monkey92t <golang@88.com> * remove runGoroutine Signed-off-by: monkey92t <golang@88.com> * pool.popIdle add p.closed check Signed-off-by: monkey92t <golang@88.com> * upgrade golangci-lint v1.42.0 Signed-off-by: monkey92t <golang@88.com> * Bump github.com/onsi/gomega from 1.15.0 to 1.16.0 (#1865) Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.15.0 to 1.16.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.15.0...v1.16.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Add go 1.17 to the build matrix * Remove go 1.15 from build matrix * Add scan struct example (#1870) * Replace release job * Bump github.com/cespare/xxhash/v2 from 2.1.1 to 2.1.2 (#1872) Bumps [github.com/cespare/xxhash/v2](https://github.com/cespare/xxhash) from 2.1.1 to 2.1.2. 
- [Release notes](https://github.com/cespare/xxhash/releases) - [Commits](https://github.com/cespare/xxhash/compare/v2.1.1...v2.1.2) --- updated-dependencies: - dependency-name: github.com/cespare/xxhash/v2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Fix tag script to push tag by tag * Fix releasing.md * Fix/pubsub ping mutex (#1878) * Fix PubSub.Ping to hold the lock * Fix PubSub.Ping to hold the lock * add write cmd data-race test Signed-off-by: monkey92t <golang@88.com> Co-authored-by: monkey92t <golang@88.com> * chore: cleanup OpenTelemetry example * chore: gofmt all code * Refactor TestParseURL This is in preparation for supporting query parameters in ParseURL: - use an expected *Options instance to execute assertions on - extract assertions into helper function - enable parallel testing - condense test table * Add query parameter parsing to ParseURL() Before this change, ParseURL would only accept a very restricted set of URLs (it returned an error, if it encountered any parameter). This commit introduces the ability to process URLs like redis://localhost/1?dial_timeout=10s and similar. Go programs which were providing a configuration tunable (e.g. CLI flag, config entry or environment variable) to configure the Redis connection now don't need to perform this task themselves. 
* chore: add links to readme * chore: fix discussions link * empty hooks.withContext removed * chore: gofmt * chore: use conventional commits and auto-generate changelog * feat: add acl auth support for sentinels * chore: swap to acl auth at the test-level * Add support for BLMove command * chore: update dependencies * chore: update link * feat: add SetVal method for each command * feat: add Cmd.{String,Int,Float,Bool}Slice helpers and an example * chore: tweak GH actions to run all jobs * chore: add Lua scripting example * Fix Redis Cluster issue during roll outs of new nodes with same addr (#1914) * fix: recycle connections in some Redis Cluster scenarios This issue was surfaced in a Cloud Provider solution that used for rolling out new nodes using the same address (hostname) of the nodes that will be replaced in a Redis Cluster, while the former ones once depromoted as Slaves would continue in service during some mintues for redirecting traffic. The solution basically identifies when the connection could be stale since a MOVED response will be returned using the same address (hostname) that is being used by the connection. At that moment we consider the connection as no longer usable forcing to recycle the connection. * chore: lazy reload when moved or ask * chore: use conv commit message * chore: release v8.11.4 (release.sh) * fix: add whitespace for avoid unlikely colisions * fix: format * chore: fix links * chore: use ctx parameter in cmdInfo * Bump github.com/onsi/ginkgo from 1.16.4 to 1.16.5 (#1925) Bumps [github.com/onsi/ginkgo](https://github.com/onsi/ginkgo) from 1.16.4 to 1.16.5. - [Release notes](https://github.com/onsi/ginkgo/releases) - [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/ginkgo/compare/v1.16.4...v1.16.5) --- updated-dependencies: - dependency-name: github.com/onsi/ginkgo dependency-type: direct:production update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * feat: add support for time.Duration write and scan * test: add test case for setting and scanning durations * chore: fix linter * fix(extra/redisotel): set span.kind attribute to client According to the opentelemetry specification this should always be set to client for database client libraries. I've also removed the SetAttributes call and instead set the attributes during creation of the span. This is what the library SHOULD be doing according to the opentelemetry api specification. * chore: update otel example * fix: update some argument counts in pre-allocs In some cases number of pre-allocated places in argument array is missing 1 or 2 elements, which results in re-allocation of twice as large array * chore: add example how to delete keys without a ttl * chore: don't enable all lints * chore(deps): bump github.com/onsi/gomega from 1.16.0 to 1.17.0 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.16.0 to 1.17.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.16.0...v1.17.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * feat: Add redis v7's NX, XX, GT, LT expire variants * chore: add missing readme * chore: tweak feature links * chore: remove Discord * fix: set timeout for WAIT command. Fixes #1963 * build: update `go` directive in `go.mod` to 1.17 This commit enables support for module graph pruning and lazy module loading for projects that are at Go 1.17 or higher. 
Reference: https://go.dev/ref/mod#go-mod-file-go Reference: https://go.dev/ref/mod#graph-pruning Reference: https://go.dev/ref/mod#lazy-loading Signed-off-by: Eng Zer Jun <engzerjun@gmail.com> * chore: update link * chore: export cmder.SetFirstKeyPos to support build module commands * feat(redisotel): ability to override TracerProvider (#1998) * fix: add missing Expire methods to Cmdable This is a followup to https://github.com/go-redis/redis/pull/1928 * chore(deps): bump github.com/onsi/gomega from 1.17.0 to 1.18.1 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.17.0 to 1.18.1. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.17.0...v1.18.1) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * Update README.md (#2011) chore: add fmt library in example code * chore: instrumentation name and version (#2012) * fix: invalid type assert in stringArg * chore: cleanup * fix: example/otel compile error (#2028) * fix: rename Golang to Go (#2030) https://go.dev/doc/faq#go_or_golang * feat: add support for passing extra attributes added to spans * feat: set net.peer.name and net.peer.port in otel example * chore: tweak Uptrace copy * feat: add support for COPY command (#2016) * feat: add support for acl sentinel auth in universal client * chore(deps): bump actions/checkout from 2 to 3 Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3. 
- [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v2...v3) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> * chore: add hll example * chore: tweak release script * chore: release v8.11.5 (release.sh) * chore: add discord back Co-authored-by: Eugene Ponizovsky <ponizovsky@gmail.com> Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com> Co-authored-by: Vladimir Mihailenco <vladimir.webdev@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Kishan B <kishancs46@gmail.com> Co-authored-by: Dominik Menke <dom@digineo.de> Co-authored-by: Gökhan Özeloğlu <gozeloglu@gmail.com> Co-authored-by: Justin Sievenpiper <justin@sievenpiper.co> Co-authored-by: Алексей Романовский <aromanovsky@epiphan.com> Co-authored-by: Stavros Panakakakis <stavrospanakakis@gmail.com> Co-authored-by: Pau Freixes <pfreixes@gmail.com> Co-authored-by: Ethan Hur <ethan0311@gmail.com> Co-authored-by: Jackie <18378976+Pyrodash@users.noreply.github.com> Co-authored-by: Kristinn Björgvin Árdal <kristinnardalsecondary@gmail.com> Co-authored-by: ffenix113 <razerer@bigmir.net> Co-authored-by: Bastien Penavayre <bastienPenava@gmail.com> Co-authored-by: James3 Li(李麒傑) <james3_li@asus.com> Co-authored-by: Eng Zer Jun <engzerjun@gmail.com> Co-authored-by: gzjiangtao2014 <gzjiangtao2014@corp.netease.com> Co-authored-by: Nelz <nelz9999@users.noreply.github.com> Co-authored-by: Daniel Richter <Nexyz9@gmail.com> Co-authored-by: Seyed Ali Ghaffari <ali.ghaffari@outlook.com> Co-authored-by: lintanghui <lintanghui@bilibili.com> Co-authored-by: hidu <duv123+github@gmail.com> Co-authored-by: Jonas Lergell <jonas.lergell@volvocars.com> Co-authored-by: Alex Kahn 
<alexanderkahn@gmail.com>
2022-03-19 07:40:31 +03:00
Username: opt.SentinelUsername,
2020-09-11 11:24:38 +03:00
Password: opt.SentinelPassword,
MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff,
MaxRetryBackoff: opt.MaxRetryBackoff,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolFIFO: opt.PoolFIFO,
2020-09-11 11:24:38 +03:00
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
TLSConfig: opt.TLSConfig,
}
2014-05-11 18:11:55 +04:00
}
2020-09-09 15:27:17 +03:00
func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
2020-09-11 11:24:38 +03:00
return &ClusterOptions{
2020-09-09 15:27:17 +03:00
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,
Username: opt.Username,
Password: opt.Password,
2020-09-11 15:52:38 +03:00
MaxRedirects: opt.MaxRetries,
RouteByLatency: opt.RouteByLatency,
RouteRandomly: opt.RouteRandomly,
2020-09-09 15:27:17 +03:00
MinRetryBackoff: opt.MinRetryBackoff,
MaxRetryBackoff: opt.MaxRetryBackoff,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolFIFO: opt.PoolFIFO,
2020-09-09 15:27:17 +03:00
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
TLSConfig: opt.TLSConfig,
}
}
2015-09-12 09:36:03 +03:00
// NewFailoverClient returns a Redis client that uses Redis Sentinel
// for automatic failover. It's safe for concurrent use by multiple
// goroutines.
2014-05-11 18:11:55 +04:00
func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
2020-09-11 15:52:38 +03:00
if failoverOpt.RouteByLatency {
panic("to route commands by latency, use NewFailoverClusterClient")
}
if failoverOpt.RouteRandomly {
panic("to route commands randomly, use NewFailoverClusterClient")
}
2020-09-11 11:24:38 +03:00
sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
copy(sentinelAddrs, failoverOpt.SentinelAddrs)
2014-05-11 18:11:55 +04:00
2021-02-10 18:25:09 +03:00
rand.Shuffle(len(sentinelAddrs), func(i, j int) {
sentinelAddrs[i], sentinelAddrs[j] = sentinelAddrs[j], sentinelAddrs[i]
})
2020-09-11 11:24:38 +03:00
failover := &sentinelFailover{
opt: failoverOpt,
sentinelAddrs: sentinelAddrs,
2014-05-11 18:11:55 +04:00
}
2016-06-05 14:10:30 +03:00
2020-09-11 11:24:38 +03:00
opt := failoverOpt.clientOptions()
2022-06-04 17:25:12 +03:00
opt.Dialer = masterReplicaDialer(failover)
2020-09-11 11:24:38 +03:00
opt.init()
2020-09-09 15:27:17 +03:00
connPool := newConnPool(opt)
2020-12-06 11:05:26 +03:00
failover.mu.Lock()
2020-09-09 15:27:17 +03:00
failover.onFailover = func(ctx context.Context, addr string) {
_ = connPool.Filter(func(cn *pool.Conn) bool {
return cn.RemoteAddr().String() != addr
})
}
failover.mu.Unlock()
2020-09-09 15:27:17 +03:00
2018-01-20 13:26:33 +03:00
c := Client{
2020-09-09 15:27:17 +03:00
baseClient: newBaseClient(opt, connPool),
2020-02-02 15:59:27 +03:00
ctx: context.Background(),
}
2019-08-24 12:22:52 +03:00
c.cmdable = c.Process
2020-02-02 15:59:27 +03:00
c.onClose = failover.Close
2016-06-05 14:10:30 +03:00
2018-01-20 13:26:33 +03:00
return &c
2014-05-11 18:11:55 +04:00
}
2022-06-04 17:25:12 +03:00
func masterReplicaDialer(
2020-09-11 11:24:38 +03:00
failover *sentinelFailover,
) func(ctx context.Context, network, addr string) (net.Conn, error) {
return func(ctx context.Context, network, _ string) (net.Conn, error) {
var addr string
var err error
2022-06-04 17:25:12 +03:00
if failover.opt.ReplicaOnly {
addr, err = failover.RandomReplicaAddr(ctx)
} else {
addr, err = failover.MasterAddr(ctx)
if err == nil {
failover.trySwitchMaster(ctx, addr)
}
}
if err != nil {
return nil, err
}
if failover.opt.Dialer != nil {
return failover.opt.Dialer(ctx, network, addr)
}
netDialer := &net.Dialer{
Timeout: failover.opt.DialTimeout,
KeepAlive: 5 * time.Minute,
}
if failover.opt.TLSConfig == nil {
return netDialer.DialContext(ctx, network, addr)
}
return tls.DialWithDialer(netDialer, network, addr, failover.opt.TLSConfig)
}
}
2014-05-11 18:11:55 +04:00
//------------------------------------------------------------------------------
// SentinelClient is a client for a Redis Sentinel.
2018-05-31 13:15:52 +03:00
type SentinelClient struct {
2019-05-31 17:37:34 +03:00
*baseClient
2020-09-17 12:27:16 +03:00
hooks
2014-05-11 18:11:55 +04:00
}
2018-05-31 13:15:52 +03:00
func NewSentinelClient(opt *Options) *SentinelClient {
2016-06-05 14:10:30 +03:00
opt.init()
2018-05-31 13:15:52 +03:00
c := &SentinelClient{
2019-05-31 17:37:34 +03:00
baseClient: &baseClient{
opt: opt,
connPool: newConnPool(opt),
},
2014-05-11 18:11:55 +04:00
}
2018-05-31 13:15:52 +03:00
return c
2014-05-11 18:11:55 +04:00
}
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) Process(ctx context.Context, cmd Cmder) error {
2020-09-17 12:27:16 +03:00
return c.hooks.process(ctx, cmd, c.baseClient.process)
}
func (c *SentinelClient) pubSub() *PubSub {
2018-07-23 15:55:13 +03:00
pubsub := &PubSub{
2017-07-09 10:07:20 +03:00
opt: c.opt,
2020-03-11 17:26:42 +03:00
newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
return c.newConn(ctx)
2014-05-11 18:11:55 +04:00
},
2017-07-09 10:07:20 +03:00
closeConn: c.connPool.CloseConn,
2014-05-11 18:11:55 +04:00
}
2018-07-23 15:55:13 +03:00
pubsub.init()
return pubsub
2014-05-11 18:11:55 +04:00
}
// Ping is used to test if a connection is still alive, or to
// measure latency.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) Ping(ctx context.Context) *StringCmd {
cmd := NewStringCmd(ctx, "ping")
_ = c.Process(ctx, cmd)
return cmd
}
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
2020-03-11 17:26:42 +03:00
_ = pubsub.Subscribe(ctx, channels...)
}
return pubsub
}
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
2020-03-11 17:26:42 +03:00
_ = pubsub.PSubscribe(ctx, channels...)
}
return pubsub
}
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *StringSliceCmd {
cmd := NewStringSliceCmd(ctx, "sentinel", "get-master-addr-by-name", name)
_ = c.Process(ctx, cmd)
2014-05-11 18:11:55 +04:00
return cmd
}
func (c *SentinelClient) Sentinels(ctx context.Context, name string) *MapStringStringSliceCmd {
cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "sentinels", name)
2020-03-11 17:26:42 +03:00
_ = c.Process(ctx, cmd)
2014-05-11 18:11:55 +04:00
return cmd
}
2019-02-20 13:39:33 +03:00
// Failover forces a failover as if the master was not reachable, and without
// asking for agreement to other Sentinels.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) Failover(ctx context.Context, name string) *StatusCmd {
cmd := NewStatusCmd(ctx, "sentinel", "failover", name)
_ = c.Process(ctx, cmd)
2019-02-20 13:39:33 +03:00
return cmd
}
2019-02-20 18:19:42 +03:00
// Reset resets all the masters with matching name. The pattern argument is a
// glob-style pattern. The reset process clears any previous state in a master
2022-06-04 17:25:12 +03:00
// (including a failover in progress), and removes every replica and sentinel
2019-02-20 18:19:42 +03:00
// already discovered and associated with the master.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) Reset(ctx context.Context, pattern string) *IntCmd {
cmd := NewIntCmd(ctx, "sentinel", "reset", pattern)
_ = c.Process(ctx, cmd)
2019-02-20 18:19:42 +03:00
return cmd
}
2019-02-21 13:28:23 +03:00
// FlushConfig forces Sentinel to rewrite its configuration on disk, including
// the current Sentinel state.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) FlushConfig(ctx context.Context) *StatusCmd {
cmd := NewStatusCmd(ctx, "sentinel", "flushconfig")
_ = c.Process(ctx, cmd)
2019-02-21 13:28:23 +03:00
return cmd
}
2019-02-21 20:13:04 +03:00
// Master shows the state and info of the specified master.
func (c *SentinelClient) Master(ctx context.Context, name string) *MapStringStringCmd {
cmd := NewMapStringStringCmd(ctx, "sentinel", "master", name)
2020-03-11 17:26:42 +03:00
_ = c.Process(ctx, cmd)
2019-02-21 20:13:04 +03:00
return cmd
}
2019-05-25 23:31:06 +03:00
// Masters shows a list of monitored masters and their state.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
cmd := NewSliceCmd(ctx, "sentinel", "masters")
_ = c.Process(ctx, cmd)
2019-05-25 23:31:06 +03:00
return cmd
}
2022-06-04 17:25:12 +03:00
// Replicas shows a list of replicas for the specified master and their state.
func (c *SentinelClient) Replicas(ctx context.Context, name string) *MapStringStringSliceCmd {
cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "replicas", name)
2020-03-11 17:26:42 +03:00
_ = c.Process(ctx, cmd)
2019-05-25 23:31:06 +03:00
return cmd
}
// CkQuorum checks if the current Sentinel configuration is able to reach the
// quorum needed to failover a master, and the majority needed to authorize the
// failover. This command should be used in monitoring systems to check if a
// Sentinel deployment is ok.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) CkQuorum(ctx context.Context, name string) *StringCmd {
cmd := NewStringCmd(ctx, "sentinel", "ckquorum", name)
_ = c.Process(ctx, cmd)
2019-05-25 23:31:06 +03:00
return cmd
}
// Monitor tells the Sentinel to start monitoring a new master with the specified
2019-05-25 23:58:27 +03:00
// name, ip, port, and quorum.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) Monitor(ctx context.Context, name, ip, port, quorum string) *StringCmd {
cmd := NewStringCmd(ctx, "sentinel", "monitor", name, ip, port, quorum)
_ = c.Process(ctx, cmd)
return cmd
}
// Set is used in order to change configuration parameters of a specific master.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) Set(ctx context.Context, name, option, value string) *StringCmd {
cmd := NewStringCmd(ctx, "sentinel", "set", name, option, value)
_ = c.Process(ctx, cmd)
return cmd
}
// Remove is used in order to remove the specified master: the master will no
// longer be monitored, and will totally be removed from the internal state of
2019-05-25 23:58:27 +03:00
// the Sentinel.
2020-03-11 17:26:42 +03:00
func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd {
cmd := NewStringCmd(ctx, "sentinel", "remove", name)
_ = c.Process(ctx, cmd)
return cmd
}
2020-09-09 15:27:17 +03:00
//------------------------------------------------------------------------------
2014-05-11 18:11:55 +04:00
type sentinelFailover struct {
2020-09-11 11:24:38 +03:00
opt *FailoverOptions
2014-05-11 18:11:55 +04:00
2020-09-11 11:24:38 +03:00
sentinelAddrs []string
2020-09-11 15:52:38 +03:00
onFailover func(ctx context.Context, addr string)
onUpdate func(ctx context.Context)
2014-05-11 18:11:55 +04:00
mu sync.RWMutex
_masterAddr string
2018-05-31 13:15:52 +03:00
sentinel *SentinelClient
pubsub *PubSub
}
func (c *sentinelFailover) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
2018-10-25 09:42:56 +03:00
if c.sentinel != nil {
return c.closeSentinel()
}
return nil
2014-05-11 18:11:55 +04:00
}
2019-10-08 12:43:00 +03:00
// closeSentinel closes the subscription and then the sentinel client, and
// clears both fields. It returns the subscription's close error if there is
// one, otherwise the sentinel client's. It does not take c.mu itself; the
// visible callers hold the write lock.
func (c *sentinelFailover) closeSentinel() error {
	pubsubErr := c.pubsub.Close()
	sentinelErr := c.sentinel.Close()
	c.pubsub, c.sentinel = nil, nil

	if pubsubErr != nil {
		return pubsubErr
	}
	return sentinelErr
}
2022-06-04 17:25:12 +03:00
func (c *sentinelFailover) RandomReplicaAddr(ctx context.Context) (string, error) {
if c.opt == nil {
return "", errors.New("opt is nil")
}
2022-06-04 17:25:12 +03:00
addresses, err := c.replicaAddrs(ctx, false)
if err != nil {
return "", err
}
2022-06-04 17:25:12 +03:00
if len(addresses) == 0 && c.opt.UseDisconnectedReplicas {
addresses, err = c.replicaAddrs(ctx, true)
if err != nil {
return "", err
}
}
if len(addresses) == 0 {
return c.MasterAddr(ctx)
}
return addresses[rand.Intn(len(addresses))], nil
}
2020-09-09 15:27:17 +03:00
func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
2019-10-08 12:43:00 +03:00
c.mu.RLock()
sentinel := c.sentinel
c.mu.RUnlock()
if sentinel != nil {
2020-03-11 17:26:42 +03:00
addr := c.getMasterAddr(ctx, sentinel)
2019-10-08 12:43:00 +03:00
if addr != "" {
return addr, nil
}
2014-05-11 18:11:55 +04:00
}
c.mu.Lock()
defer c.mu.Unlock()
2019-10-08 12:43:00 +03:00
if c.sentinel != nil {
2020-03-11 17:26:42 +03:00
addr := c.getMasterAddr(ctx, c.sentinel)
2019-10-08 12:43:00 +03:00
if addr != "" {
return addr, nil
}
_ = c.closeSentinel()
}
for i, sentinelAddr := range c.sentinelAddrs {
2020-09-11 11:24:38 +03:00
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
2014-05-11 18:11:55 +04:00
2020-09-11 11:24:38 +03:00
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
2014-05-11 18:11:55 +04:00
if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
2020-09-11 11:24:38 +03:00
c.opt.MasterName, err)
2018-10-25 09:42:56 +03:00
_ = sentinel.Close()
continue
2014-05-11 18:11:55 +04:00
}
// Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
2020-03-11 17:26:42 +03:00
c.setSentinel(ctx, sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil
2014-05-11 18:11:55 +04:00
}
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
2014-05-11 18:11:55 +04:00
}
2022-06-04 17:25:12 +03:00
func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
c.mu.RLock()
sentinel := c.sentinel
c.mu.RUnlock()
if sentinel != nil {
2022-06-04 17:25:12 +03:00
addrs := c.getReplicaAddrs(ctx, sentinel)
if len(addrs) > 0 {
return addrs, nil
}
}
c.mu.Lock()
defer c.mu.Unlock()
if c.sentinel != nil {
2022-06-04 17:25:12 +03:00
addrs := c.getReplicaAddrs(ctx, c.sentinel)
if len(addrs) > 0 {
return addrs, nil
}
_ = c.closeSentinel()
}
var sentinelReachable bool
for i, sentinelAddr := range c.sentinelAddrs {
2020-09-11 11:24:38 +03:00
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
2022-06-04 17:25:12 +03:00
replicas, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
if err != nil {
2022-06-04 17:25:12 +03:00
internal.Logger.Printf(ctx, "sentinel: Replicas master=%q failed: %s",
2020-09-11 11:24:38 +03:00
c.opt.MasterName, err)
_ = sentinel.Close()
continue
}
sentinelReachable = true
2022-06-04 17:25:12 +03:00
addrs := parseReplicaAddrs(replicas, useDisconnected)
if len(addrs) == 0 {
continue
}
// Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinel)
return addrs, nil
}
if sentinelReachable {
return []string{}, nil
}
return []string{}, errors.New("redis: all sentinels specified in configuration are unreachable")
}
2020-03-11 17:26:42 +03:00
func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
2020-09-11 11:24:38 +03:00
addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
2020-09-11 11:24:38 +03:00
c.opt.MasterName, err)
return ""
}
return net.JoinHostPort(addr[0], addr[1])
2018-07-23 15:55:13 +03:00
}
2022-06-04 17:25:12 +03:00
func (c *sentinelFailover) getReplicaAddrs(ctx context.Context, sentinel *SentinelClient) []string {
addrs, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
if err != nil {
2022-06-04 17:25:12 +03:00
internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
2020-09-11 11:24:38 +03:00
c.opt.MasterName, err)
return []string{}
}
2022-06-04 17:25:12 +03:00
return parseReplicaAddrs(addrs, false)
}
2022-06-04 17:25:12 +03:00
// parseReplicaAddrs converts replica descriptions into host:port addresses.
//
// Each entry of addrs is read through its "flags", "ip" and "port" keys.
// An entry is skipped when its comma-separated flags contain "s_down" or
// "o_down", when they contain "disconnected" and keepDisconnected is false,
// or when its ip or port is empty. The result is never nil and keeps the
// order of addrs.
func parseReplicaAddrs(addrs []map[string]string, keepDisconnected bool) []string {
	nodes := make([]string, 0, len(addrs))

nextNode:
	for _, node := range addrs {
		// A missing "flags" key yields "", which splits into a single empty
		// flag that matches none of the cases below.
		for _, flag := range strings.Split(node["flags"], ",") {
			if flag == "s_down" || flag == "o_down" {
				continue nextNode
			}
			if flag == "disconnected" && !keepDisconnected {
				continue nextNode
			}
		}

		host, port := node["ip"], node["port"]
		if host == "" || port == "" {
			continue
		}
		nodes = append(nodes, net.JoinHostPort(host, port))
	}

	return nodes
}
2020-09-09 15:27:17 +03:00
func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
c.mu.RLock()
2021-03-23 11:55:14 +03:00
currentAddr := c._masterAddr //nolint:ifshort
c.mu.RUnlock()
2020-09-09 15:27:17 +03:00
if addr == currentAddr {
2018-07-23 15:55:13 +03:00
return
}
c.mu.Lock()
defer c.mu.Unlock()
2020-09-09 15:27:17 +03:00
if addr == c._masterAddr {
2019-10-08 12:43:00 +03:00
return
}
2020-09-09 15:27:17 +03:00
c._masterAddr = addr
2019-10-08 12:43:00 +03:00
internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
2020-09-11 11:24:38 +03:00
c.opt.MasterName, addr)
2020-09-11 15:52:38 +03:00
if c.onFailover != nil {
c.onFailover(ctx, addr)
}
}
2020-03-11 17:26:42 +03:00
func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) {
2019-10-08 12:43:00 +03:00
if c.sentinel != nil {
panic("not reached")
}
c.sentinel = sentinel
2020-03-11 17:26:42 +03:00
c.discoverSentinels(ctx)
2018-11-11 13:13:00 +03:00
2022-06-04 17:25:12 +03:00
c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+replica-reconf-done")
2018-11-11 13:13:00 +03:00
go c.listen(c.pubsub)
}
2020-03-11 17:26:42 +03:00
func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
2020-09-11 11:24:38 +03:00
sentinels, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result()
2014-05-11 18:11:55 +04:00
if err != nil {
2020-09-11 11:24:38 +03:00
internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
2014-05-11 18:11:55 +04:00
return
}
for _, sentinel := range sentinels {
ip, ok := sentinel["ip"]
if !ok {
continue
}
port, ok := sentinel["port"]
if !ok {
continue
}
if ip != "" && port != "" {
sentinelAddr := net.JoinHostPort(ip, port)
if !contains(c.sentinelAddrs, sentinelAddr) {
internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
sentinelAddr, c.opt.MasterName)
c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
2014-05-11 18:11:55 +04:00
}
}
}
}
2018-11-11 13:13:00 +03:00
func (c *sentinelFailover) listen(pubsub *PubSub) {
2020-09-11 15:52:38 +03:00
ctx := context.TODO()
2020-12-06 11:05:26 +03:00
2020-09-11 15:52:38 +03:00
if c.onUpdate != nil {
c.onUpdate(ctx)
}
2014-05-11 18:11:55 +04:00
2020-09-11 15:52:38 +03:00
ch := pubsub.Channel()
for msg := range ch {
if msg.Channel == "+switch-master" {
parts := strings.Split(msg.Payload, " ")
2020-09-11 11:24:38 +03:00
if parts[0] != c.opt.MasterName {
internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
continue
2014-05-11 18:11:55 +04:00
}
addr := net.JoinHostPort(parts[3], parts[4])
2020-09-09 15:27:17 +03:00
c.trySwitchMaster(pubsub.getContext(), addr)
2014-05-11 18:11:55 +04:00
}
2020-09-11 15:52:38 +03:00
if c.onUpdate != nil {
c.onUpdate(ctx)
}
2014-05-11 18:11:55 +04:00
}
}
// contains reports whether str is an element of slice.
func contains(slice []string, str string) bool {
	for i := range slice {
		if slice[i] == str {
			return true
		}
	}
	return false
}
2020-09-09 15:27:17 +03:00
//------------------------------------------------------------------------------
2020-09-11 15:52:38 +03:00
// NewFailoverClusterClient returns a client that supports routing read-only commands
2022-06-04 17:25:12 +03:00
// to a replica node.
2020-09-09 15:27:17 +03:00
func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
2020-09-11 11:24:38 +03:00
sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
copy(sentinelAddrs, failoverOpt.SentinelAddrs)
2020-09-09 15:27:17 +03:00
2020-09-11 11:24:38 +03:00
failover := &sentinelFailover{
opt: failoverOpt,
sentinelAddrs: sentinelAddrs,
2020-09-09 15:27:17 +03:00
}
opt := failoverOpt.clusterOptions()
opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
masterAddr, err := failover.MasterAddr(ctx)
if err != nil {
return nil, err
}
nodes := []ClusterNode{{
Addr: masterAddr,
}}
2022-06-04 17:25:12 +03:00
replicaAddrs, err := failover.replicaAddrs(ctx, false)
2020-09-09 15:27:17 +03:00
if err != nil {
return nil, err
}
2022-06-04 17:25:12 +03:00
for _, replicaAddr := range replicaAddrs {
2020-09-09 15:27:17 +03:00
nodes = append(nodes, ClusterNode{
2022-06-04 17:25:12 +03:00
Addr: replicaAddr,
2020-09-09 15:27:17 +03:00
})
}
slots := []ClusterSlot{
{
Start: 0,
End: 16383,
Nodes: nodes,
},
}
return slots, nil
}
c := NewClusterClient(opt)
2020-12-06 11:05:26 +03:00
failover.mu.Lock()
2020-09-11 15:52:38 +03:00
failover.onUpdate = func(ctx context.Context) {
c.ReloadState(ctx)
2020-09-09 15:27:17 +03:00
}
2020-12-06 11:05:26 +03:00
failover.mu.Unlock()
2020-09-09 15:27:17 +03:00
return c
}