redis/redis.go

800 lines
19 KiB
Go
Raw Normal View History

2017-02-18 17:42:34 +03:00
package redis
2012-07-25 17:00:50 +04:00
import (
"context"
2020-11-17 17:08:15 +03:00
"errors"
2015-05-15 15:21:28 +03:00
"fmt"
"net"
"strings"
2020-12-06 12:05:42 +03:00
"sync/atomic"
"time"
2022-06-04 17:39:21 +03:00
"github.com/go-redis/redis/v9/internal"
"github.com/go-redis/redis/v9/internal/hscan"
2022-06-04 17:39:21 +03:00
"github.com/go-redis/redis/v9/internal/pool"
"github.com/go-redis/redis/v9/internal/proto"
2012-08-09 14:12:41 +04:00
)
2012-07-25 17:00:50 +04:00
// Scanner internal/hscan.Scanner exposed interface.
type Scanner = hscan.Scanner
2019-08-09 16:23:56 +03:00
// Nil reply returned by Redis when key does not exist.
2018-02-22 15:14:30 +03:00
const Nil = proto.Nil
2016-07-02 15:52:10 +03:00
chore: sync master (#2051) * Upgrade redis-server version (#1833) * Upgrade redis-server version Signed-off-by: monkey <golang@88.com> * XAutoClaim changed the return value Signed-off-by: monkey <golang@88.com> * add cmd: geosearch, geosearchstore (#1836) * add cmd: geosearch, geosearchstore Signed-off-by: monkey92t <golang@88.com> * GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing Signed-off-by: monkey92t <golang@88.com> * Added missing method XInfoStreamFull to Cmdable interface * Run go mod tidy in redisotel Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com> * Revert "ConnPool check fd for bad conns (#1824)" (#1849) This reverts commit 346bfafddd36dd52d51b064033048de5552ee91e. * Automate release process (#1852) * Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 (#1832) * Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.10.5 to 1.14.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.10.5...v1.14.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * Upgrade gomega to v1.15.0 Signed-off-by: monkey92t <golang@88.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: monkey92t <golang@88.com> * Add version.go * Fix otel example * Fix package name in release script * More fixes for otel example * And more * Fix release.sh * Release v8.11.3 (release.sh) * Create an annotated tag to give release.yml chance to run * Tweak tag.sh * Add Cmd.Slice helper to cast to []interface{} (#1859) * after the connection pool is closed, no new connections should be added (#1863) * after the connection pool is closed, no new connections should be added Signed-off-by: monkey92t <golang@88.com> * remove runGoroutine Signed-off-by: monkey92t <golang@88.com> * pool.popIdle add p.closed check Signed-off-by: monkey92t <golang@88.com> * upgrade golangci-lint v1.42.0 Signed-off-by: monkey92t <golang@88.com> * Bump github.com/onsi/gomega from 1.15.0 to 1.16.0 (#1865) Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.15.0 to 1.16.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.15.0...v1.16.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Add go 1.17 to the build matrix * Remove go 1.15 from build matrix * Add scan struct example (#1870) * Replace release job * Bump github.com/cespare/xxhash/v2 from 2.1.1 to 2.1.2 (#1872) Bumps [github.com/cespare/xxhash/v2](https://github.com/cespare/xxhash) from 2.1.1 to 2.1.2. - [Release notes](https://github.com/cespare/xxhash/releases) - [Commits](https://github.com/cespare/xxhash/compare/v2.1.1...v2.1.2) --- updated-dependencies: - dependency-name: github.com/cespare/xxhash/v2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Fix tag script to push tag by tag * Fix releasing.md * Fix/pubsub ping mutex (#1878) * Fix PubSub.Ping to hold the lock * Fix PubSub.Ping to hold the lock * add write cmd data-race test Signed-off-by: monkey92t <golang@88.com> Co-authored-by: monkey92t <golang@88.com> * chore: cleanup OpenTelemetry example * chore: gofmt all code * Refactor TestParseURL This is in preparation for supporting query parameters in ParseURL: - use an expected *Options instance to execute assertions on - extract assertions into helper function - enable parallel testing - condense test table * Add query parameter parsing to ParseURL() Before this change, ParseURL would only accept a very restricted set of URLs (it returned an error, if it encountered any parameter). This commit introduces the ability to process URLs like redis://localhost/1?dial_timeout=10s and similar. Go programs which were providing a configuration tunable (e.g. CLI flag, config entry or environment variable) to configure the Redis connection now don't need to perform this task themselves. * chore: add links to readme * chore: fix discussions link * empty hooks.withContext removed * chore: gofmt * chore: use conventional commits and auto-generate changelog * feat: add acl auth support for sentinels * chore: swap to acl auth at the test-level * Add support for BLMove command * chore: update dependencies * chore: update link * feat: add SetVal method for each command * feat: add Cmd.{String,Int,Float,Bool}Slice helpers and an example * chore: tweak GH actions to run all jobs * chore: add Lua scripting example * Fix Redis Cluster issue during roll outs of new nodes with same addr (#1914) * fix: recycle connections in some Redis Cluster scenarios This issue was surfaced in a Cloud Provider solution that used for rolling out new nodes using the same address (hostname) of the nodes that will be replaced in a Redis Cluster, while the former ones once depromoted as Slaves would continue in service during some mintues for redirecting traffic. The solution basically identifies when the connection could be stale since a MOVED response will be returned using the same address (hostname) that is being used by the connection. At that moment we consider the connection as no longer usable forcing to recycle the connection. * chore: lazy reload when moved or ask * chore: use conv commit message * chore: release v8.11.4 (release.sh) * fix: add whitespace for avoid unlikely colisions * fix: format * chore: fix links * chore: use ctx parameter in cmdInfo * Bump github.com/onsi/ginkgo from 1.16.4 to 1.16.5 (#1925) Bumps [github.com/onsi/ginkgo](https://github.com/onsi/ginkgo) from 1.16.4 to 1.16.5. - [Release notes](https://github.com/onsi/ginkgo/releases) - [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/ginkgo/compare/v1.16.4...v1.16.5) --- updated-dependencies: - dependency-name: github.com/onsi/ginkgo dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * feat: add support for time.Duration write and scan * test: add test case for setting and scanning durations * chore: fix linter * fix(extra/redisotel): set span.kind attribute to client According to the opentelemetry specification this should always be set to client for database client libraries. I've also removed the SetAttributes call and instead set the attributes during creation of the span. This is what the library SHOULD be doing according to the opentelemetry api specification. * chore: update otel example * fix: update some argument counts in pre-allocs In some cases number of pre-allocated places in argument array is missing 1 or 2 elements, which results in re-allocation of twice as large array * chore: add example how to delete keys without a ttl * chore: don't enable all lints * chore(deps): bump github.com/onsi/gomega from 1.16.0 to 1.17.0 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.16.0 to 1.17.0. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.16.0...v1.17.0) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * feat: Add redis v7's NX, XX, GT, LT expire variants * chore: add missing readme * chore: tweak feature links * chore: remove Discord * fix: set timeout for WAIT command. Fixes #1963 * build: update `go` directive in `go.mod` to 1.17 This commit enables support for module graph pruning and lazy module loading for projects that are at Go 1.17 or higher. Reference: https://go.dev/ref/mod#go-mod-file-go Reference: https://go.dev/ref/mod#graph-pruning Reference: https://go.dev/ref/mod#lazy-loading Signed-off-by: Eng Zer Jun <engzerjun@gmail.com> * chore: update link * chore: export cmder.SetFirstKeyPos to support build module commands * feat(redisotel): ability to override TracerProvider (#1998) * fix: add missing Expire methods to Cmdable This is a followup to https://github.com/go-redis/redis/pull/1928 * chore(deps): bump github.com/onsi/gomega from 1.17.0 to 1.18.1 Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.17.0 to 1.18.1. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.17.0...v1.18.1) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * Update README.md (#2011) chore: add fmt library in example code * chore: instrumentation name and version (#2012) * fix: invalid type assert in stringArg * chore: cleanup * fix: example/otel compile error (#2028) * fix: rename Golang to Go (#2030) https://go.dev/doc/faq#go_or_golang * feat: add support for passing extra attributes added to spans * feat: set net.peer.name and net.peer.port in otel example * chore: tweak Uptrace copy * feat: add support for COPY command (#2016) * feat: add support for acl sentinel auth in universal client * chore(deps): bump actions/checkout from 2 to 3 Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v2...v3) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> * chore: add hll example * chore: tweak release script * chore: release v8.11.5 (release.sh) * chore: add discord back Co-authored-by: Eugene Ponizovsky <ponizovsky@gmail.com> Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com> Co-authored-by: Vladimir Mihailenco <vladimir.webdev@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Kishan B <kishancs46@gmail.com> Co-authored-by: Dominik Menke <dom@digineo.de> Co-authored-by: Gökhan Özeloğlu <gozeloglu@gmail.com> Co-authored-by: Justin Sievenpiper <justin@sievenpiper.co> Co-authored-by: Алексей Романовский <aromanovsky@epiphan.com> Co-authored-by: Stavros Panakakakis <stavrospanakakis@gmail.com> Co-authored-by: Pau Freixes <pfreixes@gmail.com> Co-authored-by: Ethan Hur <ethan0311@gmail.com> Co-authored-by: Jackie <18378976+Pyrodash@users.noreply.github.com> Co-authored-by: Kristinn Björgvin Árdal <kristinnardalsecondary@gmail.com> Co-authored-by: ffenix113 <razerer@bigmir.net> Co-authored-by: Bastien Penavayre <bastienPenava@gmail.com> Co-authored-by: James3 Li(李麒傑) <james3_li@asus.com> Co-authored-by: Eng Zer Jun <engzerjun@gmail.com> Co-authored-by: gzjiangtao2014 <gzjiangtao2014@corp.netease.com> Co-authored-by: Nelz <nelz9999@users.noreply.github.com> Co-authored-by: Daniel Richter <Nexyz9@gmail.com> Co-authored-by: Seyed Ali Ghaffari <ali.ghaffari@outlook.com> Co-authored-by: lintanghui <lintanghui@bilibili.com> Co-authored-by: hidu <duv123+github@gmail.com> Co-authored-by: Jonas Lergell <jonas.lergell@volvocars.com> Co-authored-by: Alex Kahn <alexanderkahn@gmail.com>
2022-03-19 07:40:31 +03:00
// SetLogger set custom log
2020-03-30 17:02:17 +03:00
func SetLogger(logger internal.Logging) {
2016-04-09 14:52:01 +03:00
internal.Logger = logger
}
//------------------------------------------------------------------------------
type Hook interface {
DialHook(next DialHook) DialHook
ProcessHook(next ProcessHook) ProcessHook
ProcessPipelineHook(next ProcessPipelineHook) ProcessPipelineHook
}
type (
DialHook func(ctx context.Context, network, addr string) (net.Conn, error)
ProcessHook func(ctx context.Context, cmd Cmder) error
ProcessPipelineHook func(ctx context.Context, cmds []Cmder) error
)
type hooks struct {
2022-10-12 15:00:06 +03:00
slice []Hook
dialHook DialHook
processHook ProcessHook
processPipelineHook ProcessPipelineHook
processTxPipelineHook ProcessPipelineHook
}
// AddHook is to add a hook to the queue.
// Hook is a function executed during network connection, command execution, and pipeline,
// it is a first-in-last-out stack queue (FILO).
// The first to be added to the queue is the execution function of the redis command (the last to be executed).
// You need to execute the next hook in each hook, unless you want to terminate the execution of the command.
// For example, you added hook-1, hook-2:
//
// client.AddHook(hook-1, hook-2)
//
// hook-1:
//
// func (Hook1) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
// return func(ctx context.Context, cmd Cmder) error {
// print("hook-1 start")
// next(ctx, cmd)
// print("hook-1 end")
// return nil
// }
// }
//
// hook-2:
//
// func (Hook2) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
// return func(ctx context.Context, cmd redis.Cmder) error {
// print("hook-2 start")
// next(ctx, cmd)
// print("hook-2 end")
// return nil
// }
// }
//
// The execution sequence is:
//
// hook-2 start -> hook-1 start -> exec redis cmd -> hook-1 end -> hook-2 end
//
// Please note: "next(ctx, cmd)" is very important, it will call the next hook,
// if "next(ctx, cmd)" is not executed in hook-1, the redis command will not be executed.
func (hs *hooks) AddHook(hook Hook) {
hs.slice = append(hs.slice, hook)
2022-10-12 15:00:06 +03:00
hs.dialHook = hook.DialHook(hs.dialHook)
hs.processHook = hook.ProcessHook(hs.processHook)
hs.processPipelineHook = hook.ProcessPipelineHook(hs.processPipelineHook)
hs.processTxPipelineHook = hook.ProcessPipelineHook(hs.processTxPipelineHook)
2019-12-29 13:06:43 +03:00
}
func (hs *hooks) clone() hooks {
clone := *hs
l := len(clone.slice)
clone.slice = clone.slice[:l:l]
return clone
}
func (hs *hooks) setDial(dial DialHook) {
2022-10-12 15:00:06 +03:00
hs.dialHook = dial
for _, h := range hs.slice {
2022-10-12 15:00:06 +03:00
if wrapped := h.DialHook(hs.dialHook); wrapped != nil {
hs.dialHook = wrapped
2020-09-10 11:06:15 +03:00
}
}
}
func (hs *hooks) setProcess(process ProcessHook) {
2022-10-12 15:00:06 +03:00
hs.processHook = process
for _, h := range hs.slice {
2022-10-12 15:00:06 +03:00
if wrapped := h.ProcessHook(hs.processHook); wrapped != nil {
hs.processHook = wrapped
}
}
}
func (hs *hooks) setProcessPipeline(processPipeline ProcessPipelineHook) {
2022-10-12 15:00:06 +03:00
hs.processPipelineHook = processPipeline
for _, h := range hs.slice {
2022-10-12 15:00:06 +03:00
if wrapped := h.ProcessPipelineHook(hs.processPipelineHook); wrapped != nil {
hs.processPipelineHook = wrapped
2020-09-10 11:06:15 +03:00
}
}
}
func (hs *hooks) setProcessTxPipeline(processTxPipeline ProcessPipelineHook) {
2022-10-12 15:00:06 +03:00
hs.processTxPipelineHook = processTxPipeline
for _, h := range hs.slice {
2022-10-12 15:00:06 +03:00
if wrapped := h.ProcessPipelineHook(hs.processTxPipelineHook); wrapped != nil {
hs.processTxPipelineHook = wrapped
}
}
}
func (hs *hooks) withProcessHook(ctx context.Context, cmd Cmder, hook ProcessHook) error {
for _, h := range hs.slice {
if wrapped := h.ProcessHook(hook); wrapped != nil {
hook = wrapped
}
}
return hook(ctx, cmd)
}
func (hs *hooks) withProcessPipelineHook(
ctx context.Context, cmds []Cmder, hook ProcessPipelineHook,
2020-02-14 16:37:35 +03:00
) error {
for _, h := range hs.slice {
if wrapped := h.ProcessPipelineHook(hook); wrapped != nil {
hook = wrapped
}
}
return hook(ctx, cmds)
2020-02-14 16:37:35 +03:00
}
2022-10-12 15:00:06 +03:00
func (hs *hooks) dial(ctx context.Context, network, addr string) (net.Conn, error) {
return hs.dialHook(ctx, network, addr)
}
func (hs *hooks) process(ctx context.Context, cmd Cmder) error {
return hs.processHook(ctx, cmd)
}
func (hs *hooks) processPipeline(ctx context.Context, cmds []Cmder) error {
return hs.processPipelineHook(ctx, cmds)
}
func (hs *hooks) processTxPipeline(ctx context.Context, cmds []Cmder) error {
return hs.processTxPipelineHook(ctx, cmds)
}
//------------------------------------------------------------------------------
type baseClient struct {
opt *Options
2018-03-07 14:50:14 +03:00
connPool pool.Pooler
onClose func() error // hook called when client is closed
}
2020-02-02 15:59:27 +03:00
func (c *baseClient) clone() *baseClient {
clone := *c
return &clone
}
func (c *baseClient) withTimeout(timeout time.Duration) *baseClient {
opt := c.opt.clone()
opt.ReadTimeout = timeout
opt.WriteTimeout = timeout
clone := c.clone()
clone.opt = opt
return clone
}
2015-05-15 15:21:28 +03:00
func (c *baseClient) String() string {
2016-10-05 23:20:05 +03:00
return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
2015-05-15 15:21:28 +03:00
}
2019-06-14 16:00:03 +03:00
func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
cn, err := c.connPool.NewConn(ctx)
2017-07-09 10:07:20 +03:00
if err != nil {
return nil, err
}
2019-07-25 13:28:15 +03:00
err = c.initConn(ctx, cn)
2019-03-25 14:02:31 +03:00
if err != nil {
_ = c.connPool.CloseConn(cn)
return nil, err
2017-07-09 10:07:20 +03:00
}
return cn, nil
}
2019-06-04 14:05:29 +03:00
func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
2020-02-02 12:09:27 +03:00
if c.opt.Limiter != nil {
err := c.opt.Limiter.Allow()
2018-10-14 10:53:48 +03:00
if err != nil {
return nil, err
}
}
2019-06-04 14:05:29 +03:00
cn, err := c._getConn(ctx)
2018-10-14 10:53:48 +03:00
if err != nil {
2020-02-02 12:09:27 +03:00
if c.opt.Limiter != nil {
c.opt.Limiter.ReportResult(err)
2018-10-14 10:53:48 +03:00
}
return nil, err
}
2020-03-11 17:26:42 +03:00
2018-10-14 10:53:48 +03:00
return cn, nil
}
2019-06-04 14:05:29 +03:00
func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
cn, err := c.connPool.Get(ctx)
2016-03-15 15:04:35 +03:00
if err != nil {
2018-05-28 17:27:24 +03:00
return nil, err
2016-03-15 15:04:35 +03:00
}
2020-03-11 17:26:42 +03:00
if cn.Inited {
return cn, nil
}
2021-03-20 11:01:48 +03:00
if err := c.initConn(ctx, cn); err != nil {
2020-08-15 15:36:02 +03:00
c.connPool.Remove(ctx, cn, err)
2020-11-17 17:08:15 +03:00
if err := errors.Unwrap(err); err != nil {
2019-08-08 10:36:13 +03:00
return nil, err
}
2019-03-25 14:02:31 +03:00
return nil, err
2016-03-12 13:41:02 +03:00
}
2017-04-18 13:12:38 +03:00
2018-05-28 17:27:24 +03:00
return cn, nil
2012-07-26 19:16:17 +04:00
}
2019-07-25 13:28:15 +03:00
func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
2019-03-25 14:02:31 +03:00
if cn.Inited {
return nil
}
cn.Inited = true
2016-03-15 15:04:35 +03:00
username, password := c.opt.Username, c.opt.Password
if c.opt.CredentialsProvider != nil {
username, password = c.opt.CredentialsProvider()
}
2020-08-15 15:36:02 +03:00
connPool := pool.NewSingleConnPool(c.connPool, cn)
conn := newConn(c.opt, connPool)
var auth bool
// For redis-server < 6.0 that does not support the Hello command,
// we continue to provide services with RESP2.
2022-06-04 16:07:28 +03:00
if err := conn.Hello(ctx, 3, username, password, "").Err(); err == nil {
auth = true
} else if !strings.HasPrefix(err.Error(), "ERR unknown command") {
return err
}
2020-03-11 17:26:42 +03:00
_, err := conn.Pipelined(ctx, func(pipe Pipeliner) error {
2022-06-04 14:28:10 +03:00
if !auth && password != "" {
if username != "" {
pipe.AuthACL(ctx, username, password)
2020-05-21 08:59:20 +03:00
} else {
pipe.Auth(ctx, password)
2020-05-21 08:59:20 +03:00
}
2016-03-12 13:41:02 +03:00
}
if c.opt.DB > 0 {
2020-03-11 17:26:42 +03:00
pipe.Select(ctx, c.opt.DB)
2016-03-12 13:41:02 +03:00
}
if c.opt.readOnly {
2020-03-11 17:26:42 +03:00
pipe.ReadOnly(ctx)
}
if c.opt.ClientName != "" {
pipe.ClientSetName(ctx, c.opt.ClientName)
}
return nil
})
2017-05-25 14:16:39 +03:00
if err != nil {
return err
}
if c.opt.OnConnect != nil {
2020-06-10 10:36:22 +03:00
return c.opt.OnConnect(ctx, conn)
2017-05-25 14:16:39 +03:00
}
return nil
2016-03-12 13:41:02 +03:00
}
2020-08-15 15:36:02 +03:00
func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error) {
2020-02-02 12:09:27 +03:00
if c.opt.Limiter != nil {
c.opt.Limiter.ReportResult(err)
2019-08-07 16:57:48 +03:00
}
if isBadConn(err, false, c.opt.Addr) {
2020-08-15 15:36:02 +03:00
c.connPool.Remove(ctx, cn, err)
2019-08-07 16:57:48 +03:00
} else {
2020-08-15 15:36:02 +03:00
c.connPool.Put(ctx, cn)
2019-08-07 16:57:48 +03:00
}
}
func (c *baseClient) withConn(
ctx context.Context, fn func(context.Context, *pool.Conn) error,
) error {
2021-03-20 11:01:48 +03:00
cn, err := c.getConn(ctx)
if err != nil {
return err
}
err = fn(ctx, cn)
c.releaseConn(ctx, cn, err)
return err
}
func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn, error) {
return c.opt.Dialer(ctx, network, addr)
}
2019-06-04 13:30:47 +03:00
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
var lastErr error
2017-07-09 13:10:07 +03:00
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
2020-03-11 17:26:42 +03:00
attempt := attempt
2021-03-20 11:01:48 +03:00
retry, err := c._process(ctx, cmd, attempt)
2020-03-11 17:26:42 +03:00
if err == nil || !retry {
return err
}
2021-03-20 11:01:48 +03:00
2020-03-11 17:26:42 +03:00
lastErr = err
2012-07-26 19:16:17 +04:00
}
return lastErr
2014-05-11 11:42:40 +04:00
}
2021-03-20 11:01:48 +03:00
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
return false, err
}
}
retryTimeout := uint32(0)
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
2021-03-20 11:01:48 +03:00
return writeCmd(wr, cmd)
}); err != nil {
2022-10-06 10:06:02 +03:00
atomic.StoreUint32(&retryTimeout, 1)
2021-03-20 11:01:48 +03:00
return err
}
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil {
2021-03-20 11:01:48 +03:00
if cmd.readTimeout() == nil {
atomic.StoreUint32(&retryTimeout, 1)
2022-10-06 10:06:02 +03:00
} else {
atomic.StoreUint32(&retryTimeout, 0)
2021-03-20 11:01:48 +03:00
}
return err
}
return nil
}); err != nil {
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
return retry, err
2021-03-20 11:01:48 +03:00
}
return false, nil
2021-03-20 11:01:48 +03:00
}
2017-07-09 13:10:07 +03:00
func (c *baseClient) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
if timeout := cmd.readTimeout(); timeout != nil {
2018-09-13 09:14:52 +03:00
t := *timeout
if t == 0 {
return 0
}
return t + 10*time.Second
}
2018-01-24 21:38:47 +03:00
return c.opt.ReadTimeout
}
2014-05-11 11:42:40 +04:00
// Close closes the client, releasing any open resources.
2015-09-12 09:36:03 +03:00
//
// It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines.
2014-05-11 11:42:40 +04:00
func (c *baseClient) Close() error {
var firstErr error
if c.onClose != nil {
if err := c.onClose(); err != nil {
firstErr = err
}
}
if err := c.connPool.Close(); err != nil && firstErr == nil {
firstErr = err
}
return firstErr
2012-07-26 19:16:17 +04:00
}
2016-10-05 23:20:05 +03:00
func (c *baseClient) getAddr() string {
return c.opt.Addr
}
2019-06-04 13:30:47 +03:00
func (c *baseClient) processPipeline(ctx context.Context, cmds []Cmder) error {
if err := c.generalProcessPipeline(ctx, cmds, c.pipelineProcessCmds); err != nil {
return err
}
return cmdsFirstErr(cmds)
2018-01-20 13:26:33 +03:00
}
2019-06-04 13:30:47 +03:00
func (c *baseClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
if err := c.generalProcessPipeline(ctx, cmds, c.txPipelineProcessCmds); err != nil {
return err
}
return cmdsFirstErr(cmds)
}
type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error)
func (c *baseClient) generalProcessPipeline(
ctx context.Context, cmds []Cmder, p pipelineProcessor,
) error {
var lastErr error
2018-01-20 13:26:33 +03:00
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
2019-07-30 12:13:00 +03:00
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
setCmdsErr(cmds, err)
2019-07-30 12:13:00 +03:00
return err
}
2018-01-20 13:26:33 +03:00
}
2017-08-31 15:22:47 +03:00
2022-10-11 09:38:10 +03:00
// Enable retries by default to retry dial errors returned by withConn.
canRetry := true
lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
var err error
canRetry, err = p(ctx, cn, cmds)
2018-01-20 13:26:33 +03:00
return err
})
2020-07-24 14:57:12 +03:00
if lastErr == nil || !canRetry || !shouldRetry(lastErr, true) {
return lastErr
2016-12-13 18:28:39 +03:00
}
}
return lastErr
2016-12-13 18:28:39 +03:00
}
func (c *baseClient) pipelineProcessCmds(
ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) {
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds)
}); err != nil {
setCmdsErr(cmds, err)
2016-12-13 18:28:39 +03:00
return true, err
}
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
2018-08-15 11:53:15 +03:00
return pipelineReadCmds(rd, cmds)
}); err != nil {
return true, err
}
return false, nil
2016-12-13 18:28:39 +03:00
}
2018-08-17 13:56:37 +03:00
func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
for i, cmd := range cmds {
2018-08-15 09:38:58 +03:00
err := cmd.readReply(rd)
2020-09-17 12:27:16 +03:00
cmd.SetErr(err)
if err != nil && !isRedisError(err) {
setCmdsErr(cmds[i+1:], err)
2017-08-31 15:22:47 +03:00
return err
2016-12-13 18:28:39 +03:00
}
}
// Retry errors like "LOADING redis is loading the dataset in memory".
return cmds[0].Err()
2016-12-13 18:28:39 +03:00
}
func (c *baseClient) txPipelineProcessCmds(
ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) {
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
2020-02-14 16:37:35 +03:00
return writeCmds(wr, cmds)
}); err != nil {
setCmdsErr(cmds, err)
2016-12-13 18:28:39 +03:00
return true, err
}
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
2020-02-14 16:37:35 +03:00
statusCmd := cmds[0].(*StatusCmd)
// Trim multi and exec.
trimmedCmds := cmds[1 : len(cmds)-1]
2020-02-14 16:37:35 +03:00
if err := txPipelineReadQueued(rd, statusCmd, trimmedCmds); err != nil {
setCmdsErr(cmds, err)
2018-08-15 11:53:15 +03:00
return err
}
2020-02-14 16:37:35 +03:00
return pipelineReadCmds(rd, trimmedCmds)
}); err != nil {
return false, err
}
return false, nil
2016-12-13 18:28:39 +03:00
}
2020-02-14 16:37:35 +03:00
func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
2022-06-04 16:07:28 +03:00
// Parse +OK.
2020-02-14 16:37:35 +03:00
if err := statusCmd.readReply(rd); err != nil {
2017-08-31 15:22:47 +03:00
return err
2016-12-13 18:28:39 +03:00
}
2022-06-04 16:07:28 +03:00
// Parse +QUEUED.
2018-10-11 13:53:40 +03:00
for range cmds {
2020-02-14 16:37:35 +03:00
if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
2017-08-31 15:22:47 +03:00
return err
2016-12-13 18:28:39 +03:00
}
}
// Parse number of replies.
2018-08-15 09:38:58 +03:00
line, err := rd.ReadLine()
2016-12-13 18:28:39 +03:00
if err != nil {
if err == Nil {
err = TxFailedErr
}
return err
}
if line[0] != proto.RespArray {
return fmt.Errorf("redis: expected '*', but got line %q", line)
2016-12-13 18:28:39 +03:00
}
return nil
}
func (c *baseClient) context(ctx context.Context) context.Context {
if c.opt.ContextTimeoutEnabled {
return ctx
}
return context.Background()
}
2014-05-11 11:42:40 +04:00
//------------------------------------------------------------------------------
2021-05-13 09:25:36 +03:00
// Client is a Redis client representing a pool of zero or more underlying connections.
// It's safe for concurrent use by multiple goroutines.
//
// Client creates and frees connections automatically; it also maintains a free pool
// of idle connections. You can control the pool size with Config.PoolSize option.
type Client struct {
2020-02-02 15:59:27 +03:00
*baseClient
2019-08-24 12:22:52 +03:00
cmdable
hooks
2015-01-24 15:12:48 +03:00
}
// NewClient returns a client to the Redis Server specified by Options.
func NewClient(opt *Options) *Client {
opt.init()
2018-01-20 13:26:33 +03:00
c := Client{
baseClient: &baseClient{
opt: opt,
},
2015-01-24 15:12:48 +03:00
}
c.init()
2022-10-12 15:00:06 +03:00
c.connPool = newConnPool(opt, c.hooks.dial)
2018-01-20 13:26:33 +03:00
return &c
2012-08-05 16:09:43 +04:00
}
func (c *Client) init() {
c.cmdable = c.Process
c.hooks.setDial(c.baseClient.dial)
c.hooks.setProcess(c.baseClient.process)
c.hooks.setProcessPipeline(c.baseClient.processPipeline)
c.hooks.setProcessTxPipeline(c.baseClient.processTxPipeline)
2020-02-02 15:59:27 +03:00
}
func (c *Client) WithTimeout(timeout time.Duration) *Client {
clone := *c
2020-02-02 15:59:27 +03:00
clone.baseClient = c.baseClient.withTimeout(timeout)
clone.init()
return &clone
2020-02-02 15:59:27 +03:00
}
func (c *Client) Conn() *Conn {
return newConn(c.opt, pool.NewStickyConnPool(c.connPool))
}
// Do create a Cmd from the args and processes the cmd.
2020-03-11 17:26:42 +03:00
func (c *Client) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args...)
_ = c.Process(ctx, cmd)
2019-06-04 13:30:47 +03:00
return cmd
}
2020-03-11 17:26:42 +03:00
func (c *Client) Process(ctx context.Context, cmd Cmder) error {
err := c.hooks.process(ctx, cmd)
cmd.SetErr(err)
return err
2017-01-11 06:32:10 +03:00
}
2017-05-25 14:16:39 +03:00
// Options returns read-only Options that were used to create the client.
func (c *Client) Options() *Options {
return c.opt
}
2017-09-11 10:12:00 +03:00
type PoolStats pool.Stats
// PoolStats returns connection pool stats.
2016-01-19 19:36:40 +03:00
func (c *Client) PoolStats() *PoolStats {
2017-09-11 10:12:00 +03:00
stats := c.connPool.Stats()
return (*PoolStats)(stats)
2016-01-19 19:36:40 +03:00
}
2020-03-11 17:26:42 +03:00
func (c *Client) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(ctx, fn)
2016-12-13 18:28:39 +03:00
}
2017-05-02 18:00:53 +03:00
func (c *Client) Pipeline() Pipeliner {
pipe := Pipeline{
exec: pipelineExecer(c.hooks.processPipeline),
}
2019-05-31 17:03:20 +03:00
pipe.init()
return &pipe
}
2020-03-11 17:26:42 +03:00
func (c *Client) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
return c.TxPipeline().Pipelined(ctx, fn)
}
2016-12-16 15:19:53 +03:00
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Client) TxPipeline() Pipeliner {
2016-12-13 18:28:39 +03:00
pipe := Pipeline{
exec: func(ctx context.Context, cmds []Cmder) error {
cmds = wrapMultiExec(ctx, cmds)
return c.hooks.processTxPipeline(ctx, cmds)
},
}
2019-05-31 17:03:20 +03:00
pipe.init()
2016-12-13 18:28:39 +03:00
return &pipe
}
func (c *Client) pubSub() *PubSub {
2018-07-23 15:55:13 +03:00
pubsub := &PubSub{
2017-07-09 10:07:20 +03:00
opt: c.opt,
2020-03-11 17:26:42 +03:00
newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
return c.newConn(ctx)
},
2017-07-09 10:07:20 +03:00
closeConn: c.connPool.CloseConn,
}
2018-07-23 15:55:13 +03:00
pubsub.init()
return pubsub
}
// Subscribe subscribes the client to the specified channels.
2017-06-17 12:43:19 +03:00
// Channels can be omitted to create empty subscription.
// Note that this method does not wait on a response from Redis, so the
// subscription may not be active immediately. To force the connection to wait,
// you may call the Receive() method on the returned *PubSub like so:
//
// sub := client.Subscribe(queryResp)
// iface, err := sub.Receive()
// if err != nil {
// // handle error
// }
2018-12-13 11:31:02 +03:00
//
// // Should be *Subscription, but others are possible if other actions have been
// // taken on sub since it was created.
// switch iface.(type) {
// case *Subscription:
// // subscribe succeeded
// case *Message:
// // received first message
// case *Pong:
// // pong received
// default:
// // handle error
// }
//
// ch := sub.Channel()
2020-03-11 17:26:42 +03:00
func (c *Client) Subscribe(ctx context.Context, channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
2020-03-11 17:26:42 +03:00
_ = pubsub.Subscribe(ctx, channels...)
}
2017-04-11 16:53:55 +03:00
return pubsub
}
// PSubscribe subscribes the client to the given patterns.
2017-06-17 12:43:19 +03:00
// Patterns can be omitted to create empty subscription.
2020-03-11 17:26:42 +03:00
func (c *Client) PSubscribe(ctx context.Context, channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
2020-03-11 17:26:42 +03:00
_ = pubsub.PSubscribe(ctx, channels...)
}
2017-04-11 16:53:55 +03:00
return pubsub
}
2017-05-25 14:16:39 +03:00
2022-08-03 18:10:03 +03:00
// SSubscribe Subscribes the client to the specified shard channels.
// Channels can be omitted to create empty subscription.
func (c *Client) SSubscribe(ctx context.Context, channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.SSubscribe(ctx, channels...)
}
return pubsub
}
2017-05-25 14:16:39 +03:00
//------------------------------------------------------------------------------
2021-10-04 13:16:33 +03:00
// Conn represents a single Redis connection rather than a pool of connections.
// Prefer running commands from Client unless there is a specific need
// for a continuous single Redis connection.
2019-07-25 13:28:15 +03:00
type Conn struct {
baseClient
cmdable
statefulCmdable
hooks
2019-07-25 13:28:15 +03:00
}
func newConn(opt *Options, connPool pool.Pooler) *Conn {
c := Conn{
baseClient: baseClient{
opt: opt,
connPool: connPool,
},
}
2019-05-31 17:03:20 +03:00
c.cmdable = c.Process
c.statefulCmdable = c.Process
c.hooks.setDial(c.baseClient.dial)
c.hooks.setProcess(c.baseClient.process)
c.hooks.setProcessPipeline(c.baseClient.processPipeline)
c.hooks.setProcessTxPipeline(c.baseClient.processTxPipeline)
return &c
}
2020-03-11 17:26:42 +03:00
func (c *Conn) Process(ctx context.Context, cmd Cmder) error {
err := c.hooks.process(ctx, cmd)
cmd.SetErr(err)
return err
2020-10-17 15:21:09 +03:00
}
2020-03-11 17:26:42 +03:00
func (c *Conn) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(ctx, fn)
2017-05-25 14:16:39 +03:00
}
func (c *Conn) Pipeline() Pipeliner {
pipe := Pipeline{
2022-11-22 15:28:39 +03:00
exec: c.hooks.processPipeline,
2017-05-25 14:16:39 +03:00
}
2019-05-31 17:03:20 +03:00
pipe.init()
2017-05-25 14:16:39 +03:00
return &pipe
}
2020-03-11 17:26:42 +03:00
func (c *Conn) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
return c.TxPipeline().Pipelined(ctx, fn)
2017-05-25 14:16:39 +03:00
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Conn) TxPipeline() Pipeliner {
pipe := Pipeline{
2022-11-22 15:28:39 +03:00
exec: func(ctx context.Context, cmds []Cmder) error {
cmds = wrapMultiExec(ctx, cmds)
return c.hooks.processTxPipeline(ctx, cmds)
},
2017-05-25 14:16:39 +03:00
}
2019-05-31 17:03:20 +03:00
pipe.init()
2017-05-25 14:16:39 +03:00
return &pipe
}