2012-07-25 17:00:50 +04:00
|
|
|
package redis
|
|
|
|
|
|
|
|
import (
|
2019-06-08 15:02:51 +03:00
|
|
|
"context"
|
2012-07-25 17:00:50 +04:00
|
|
|
"fmt"
|
2019-03-12 13:40:08 +03:00
|
|
|
"strings"
|
2017-04-17 15:43:58 +03:00
|
|
|
"sync"
|
2014-05-11 11:42:40 +04:00
|
|
|
"time"
|
2016-03-12 11:52:13 +03:00
|
|
|
|
2020-03-11 17:29:16 +03:00
|
|
|
"github.com/go-redis/redis/v8/internal"
|
|
|
|
"github.com/go-redis/redis/v8/internal/pool"
|
|
|
|
"github.com/go-redis/redis/v8/internal/proto"
|
2012-07-25 17:00:50 +04:00
|
|
|
)
|
|
|
|
|
2019-04-08 15:06:31 +03:00
|
|
|
// PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. Message receiving is NOT safe
// for concurrent use by multiple goroutines.
//
// PubSub automatically reconnects to Redis Server and resubscribes
// to the channels in case of network errors.
type PubSub struct {
	// opt holds the client options; this file reads WriteTimeout and Addr from it.
	opt *Options

	// newConn obtains a connection. conn passes it the names of the
	// currently subscribed channels followed by the ones being added.
	newConn func(ctx context.Context, channels []string) (*pool.Conn, error)
	// closeConn closes a connection previously returned by newConn.
	closeConn func(*pool.Conn) error

	// mu is locked by the exported methods and the *WithLock helpers
	// before they touch cn, channels, patterns or closed.
	mu sync.Mutex
	// cn is the current connection; it is nil until conn establishes one
	// and is reset to nil by closeTheCn.
	cn *pool.Conn
	// channels is the set of channel names that resubscribe re-issues
	// "subscribe" for on a new connection.
	channels map[string]struct{}
	// patterns is the set of patterns that resubscribe re-issues
	// "psubscribe" for on a new connection.
	patterns map[string]struct{}

	// closed is set by Close; afterwards conn returns pool.ErrClosed.
	closed bool
	// exit is created by init and closed by Close.
	exit chan struct{}

	// NOTE(review): cmd is not referenced in this part of the file;
	// presumably a reusable command for reading replies — confirm.
	cmd *Cmd

	// NOTE(review): chOnce, msgCh and allCh are not referenced in this part
	// of the file; presumably they back the Go-channel receive API — confirm.
	chOnce sync.Once
	msgCh  *channel
	allCh  *channel
}
|
|
|
|
|
|
|
|
// init allocates the exit channel, which Close later closes.
func (c *PubSub) init() {
	c.exit = make(chan struct{})
}
|
|
|
|
|
2019-03-12 13:40:08 +03:00
|
|
|
func (c *PubSub) String() string {
|
|
|
|
channels := mapKeys(c.channels)
|
|
|
|
channels = append(channels, mapKeys(c.patterns)...)
|
|
|
|
return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) connWithLock(ctx context.Context) (*pool.Conn, error) {
|
2017-04-24 12:43:15 +03:00
|
|
|
c.mu.Lock()
|
2020-03-11 17:26:42 +03:00
|
|
|
cn, err := c.conn(ctx, nil)
|
2017-06-29 17:05:08 +03:00
|
|
|
c.mu.Unlock()
|
|
|
|
return cn, err
|
|
|
|
}
|
2017-04-24 12:43:15 +03:00
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, error) {
|
2017-04-24 12:43:15 +03:00
|
|
|
if c.closed {
|
2017-06-29 17:05:08 +03:00
|
|
|
return nil, pool.ErrClosed
|
2017-04-24 12:43:15 +03:00
|
|
|
}
|
|
|
|
if c.cn != nil {
|
2017-06-29 17:05:08 +03:00
|
|
|
return c.cn, nil
|
2017-04-24 12:43:15 +03:00
|
|
|
}
|
|
|
|
|
2018-08-04 12:19:19 +03:00
|
|
|
channels := mapKeys(c.channels)
|
|
|
|
channels = append(channels, newChannels...)
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
cn, err := c.newConn(ctx, channels)
|
2016-09-29 15:07:04 +03:00
|
|
|
if err != nil {
|
2017-06-29 17:05:08 +03:00
|
|
|
return nil, err
|
2016-09-29 15:07:04 +03:00
|
|
|
}
|
2017-04-17 15:43:58 +03:00
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
if err := c.resubscribe(ctx, cn); err != nil {
|
2017-07-09 10:07:20 +03:00
|
|
|
_ = c.closeConn(cn)
|
2017-06-29 17:05:08 +03:00
|
|
|
return nil, err
|
2017-04-24 12:43:15 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
c.cn = cn
|
2017-06-29 17:05:08 +03:00
|
|
|
return cn, nil
|
2017-04-17 15:43:58 +03:00
|
|
|
}
|
|
|
|
|
2019-06-08 15:02:51 +03:00
|
|
|
func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error {
|
|
|
|
return cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
2018-08-17 13:56:37 +03:00
|
|
|
return writeCmd(wr, cmd)
|
2018-08-15 11:53:15 +03:00
|
|
|
})
|
2018-08-04 12:19:19 +03:00
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) resubscribe(ctx context.Context, cn *pool.Conn) error {
|
2017-04-17 16:05:01 +03:00
|
|
|
var firstErr error
|
2018-07-23 15:55:13 +03:00
|
|
|
|
2017-04-24 12:43:15 +03:00
|
|
|
if len(c.channels) > 0 {
|
2020-03-11 17:26:42 +03:00
|
|
|
firstErr = c._subscribe(ctx, cn, "subscribe", mapKeys(c.channels))
|
2017-04-17 15:43:58 +03:00
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
|
2017-04-24 12:43:15 +03:00
|
|
|
if len(c.patterns) > 0 {
|
2020-03-11 17:26:42 +03:00
|
|
|
err := c._subscribe(ctx, cn, "psubscribe", mapKeys(c.patterns))
|
2018-07-23 15:55:13 +03:00
|
|
|
if err != nil && firstErr == nil {
|
2017-04-17 16:05:01 +03:00
|
|
|
firstErr = err
|
2017-04-17 15:43:58 +03:00
|
|
|
}
|
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
|
2017-04-17 16:05:01 +03:00
|
|
|
return firstErr
|
2017-04-17 15:43:58 +03:00
|
|
|
}
|
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
// mapKeys returns the keys of m as a newly allocated slice. The order is
// unspecified (map iteration order). The result is never nil, even for a
// nil or empty map.
func mapKeys(m map[string]struct{}) []string {
	keys := make([]string, 0, len(m))
	for key := range m {
		keys = append(keys, key)
	}
	return keys
}
|
|
|
|
|
2018-08-04 12:19:19 +03:00
|
|
|
func (c *PubSub) _subscribe(
|
2020-03-11 17:26:42 +03:00
|
|
|
ctx context.Context, cn *pool.Conn, redisCmd string, channels []string,
|
2018-08-04 12:19:19 +03:00
|
|
|
) error {
|
|
|
|
args := make([]interface{}, 0, 1+len(channels))
|
|
|
|
args = append(args, redisCmd)
|
|
|
|
for _, channel := range channels {
|
|
|
|
args = append(args, channel)
|
2017-06-29 16:53:49 +03:00
|
|
|
}
|
2020-03-11 17:26:42 +03:00
|
|
|
cmd := NewSliceCmd(ctx, args...)
|
|
|
|
return c.writeCmd(ctx, cn, cmd)
|
2017-06-29 16:53:49 +03:00
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) releaseConnWithLock(
|
|
|
|
ctx context.Context,
|
|
|
|
cn *pool.Conn,
|
|
|
|
err error,
|
|
|
|
allowTimeout bool,
|
|
|
|
) {
|
2017-04-24 12:43:15 +03:00
|
|
|
c.mu.Lock()
|
2020-03-11 17:26:42 +03:00
|
|
|
c.releaseConn(ctx, cn, err, allowTimeout)
|
2017-04-24 12:43:15 +03:00
|
|
|
c.mu.Unlock()
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) releaseConn(ctx context.Context, cn *pool.Conn, err error, allowTimeout bool) {
|
2017-08-31 15:22:47 +03:00
|
|
|
if c.cn != cn {
|
|
|
|
return
|
|
|
|
}
|
chore: sync master (#2051)
* Upgrade redis-server version (#1833)
* Upgrade redis-server version
Signed-off-by: monkey <golang@88.com>
* XAutoClaim changed the return value
Signed-off-by: monkey <golang@88.com>
* add cmd: geosearch, geosearchstore (#1836)
* add cmd: geosearch, geosearchstore
Signed-off-by: monkey92t <golang@88.com>
* GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing
Signed-off-by: monkey92t <golang@88.com>
* Added missing method XInfoStreamFull to Cmdable interface
* Run go mod tidy in redisotel
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
* Revert "ConnPool check fd for bad conns (#1824)" (#1849)
This reverts commit 346bfafddd36dd52d51b064033048de5552ee91e.
* Automate release process (#1852)
* Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 (#1832)
* Bump github.com/onsi/gomega from 1.10.5 to 1.14.0
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.10.5 to 1.14.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.10.5...v1.14.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* Upgrade gomega to v1.15.0
Signed-off-by: monkey92t <golang@88.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: monkey92t <golang@88.com>
* Add version.go
* Fix otel example
* Fix package name in release script
* More fixes for otel example
* And more
* Fix release.sh
* Release v8.11.3 (release.sh)
* Create an annotated tag to give release.yml chance to run
* Tweak tag.sh
* Add Cmd.Slice helper to cast to []interface{} (#1859)
* after the connection pool is closed, no new connections should be added (#1863)
* after the connection pool is closed, no new connections should be added
Signed-off-by: monkey92t <golang@88.com>
* remove runGoroutine
Signed-off-by: monkey92t <golang@88.com>
* pool.popIdle add p.closed check
Signed-off-by: monkey92t <golang@88.com>
* upgrade golangci-lint v1.42.0
Signed-off-by: monkey92t <golang@88.com>
* Bump github.com/onsi/gomega from 1.15.0 to 1.16.0 (#1865)
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.15.0 to 1.16.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.15.0...v1.16.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Add go 1.17 to the build matrix
* Remove go 1.15 from build matrix
* Add scan struct example (#1870)
* Replace release job
* Bump github.com/cespare/xxhash/v2 from 2.1.1 to 2.1.2 (#1872)
Bumps [github.com/cespare/xxhash/v2](https://github.com/cespare/xxhash) from 2.1.1 to 2.1.2.
- [Release notes](https://github.com/cespare/xxhash/releases)
- [Commits](https://github.com/cespare/xxhash/compare/v2.1.1...v2.1.2)
---
updated-dependencies:
- dependency-name: github.com/cespare/xxhash/v2
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Fix tag script to push tag by tag
* Fix releasing.md
* Fix/pubsub ping mutex (#1878)
* Fix PubSub.Ping to hold the lock
* Fix PubSub.Ping to hold the lock
* add write cmd data-race test
Signed-off-by: monkey92t <golang@88.com>
Co-authored-by: monkey92t <golang@88.com>
* chore: cleanup OpenTelemetry example
* chore: gofmt all code
* Refactor TestParseURL
This is in preparation for supporting query parameters
in ParseURL:
- use an expected *Options instance to execute assertions on
- extract assertions into helper function
- enable parallel testing
- condense test table
* Add query parameter parsing to ParseURL()
Before this change, ParseURL would only accept a very restricted
set of URLs (it returned an error, if it encountered any parameter).
This commit introduces the ability to process URLs like
redis://localhost/1?dial_timeout=10s
and similar.
Go programs which were providing a configuration tunable (e.g.
CLI flag, config entry or environment variable) to configure
the Redis connection now don't need to perform this task
themselves.
* chore: add links to readme
* chore: fix discussions link
* empty hooks.withContext removed
* chore: gofmt
* chore: use conventional commits and auto-generate changelog
* feat: add acl auth support for sentinels
* chore: swap to acl auth at the test-level
* Add support for BLMove command
* chore: update dependencies
* chore: update link
* feat: add SetVal method for each command
* feat: add Cmd.{String,Int,Float,Bool}Slice helpers and an example
* chore: tweak GH actions to run all jobs
* chore: add Lua scripting example
* Fix Redis Cluster issue during roll outs of new nodes with same addr (#1914)
* fix: recycle connections in some Redis Cluster scenarios
This issue was surfaced in a Cloud Provider solution that used for
rolling out new nodes using the same address (hostname) of the nodes
that will be replaced in a Redis Cluster, while the former ones once
depromoted as Slaves would continue in service during some mintues
for redirecting traffic.
The solution basically identifies when the connection could be stale
since a MOVED response will be returned using the same address (hostname)
that is being used by the connection. At that moment we consider the
connection as no longer usable forcing to recycle the connection.
* chore: lazy reload when moved or ask
* chore: use conv commit message
* chore: release v8.11.4 (release.sh)
* fix: add whitespace for avoid unlikely colisions
* fix: format
* chore: fix links
* chore: use ctx parameter in cmdInfo
* Bump github.com/onsi/ginkgo from 1.16.4 to 1.16.5 (#1925)
Bumps [github.com/onsi/ginkgo](https://github.com/onsi/ginkgo) from 1.16.4 to 1.16.5.
- [Release notes](https://github.com/onsi/ginkgo/releases)
- [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/ginkgo/compare/v1.16.4...v1.16.5)
---
updated-dependencies:
- dependency-name: github.com/onsi/ginkgo
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* feat: add support for time.Duration write and scan
* test: add test case for setting and scanning durations
* chore: fix linter
* fix(extra/redisotel): set span.kind attribute to client
According to the opentelemetry specification this should always be set to client for database client
libraries.
I've also removed the SetAttributes call and instead set the attributes during creation of the span.
This is what the library SHOULD be doing according to the opentelemetry api specification.
* chore: update otel example
* fix: update some argument counts in pre-allocs
In some cases number of pre-allocated places in
argument array is missing 1 or 2 elements,
which results in re-allocation of twice as large array
* chore: add example how to delete keys without a ttl
* chore: don't enable all lints
* chore(deps): bump github.com/onsi/gomega from 1.16.0 to 1.17.0
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.16.0 to 1.17.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.16.0...v1.17.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* feat: Add redis v7's NX, XX, GT, LT expire variants
* chore: add missing readme
* chore: tweak feature links
* chore: remove Discord
* fix: set timeout for WAIT command. Fixes #1963
* build: update `go` directive in `go.mod` to 1.17
This commit enables support for module graph pruning and lazy module
loading for projects that are at Go 1.17 or higher.
Reference: https://go.dev/ref/mod#go-mod-file-go
Reference: https://go.dev/ref/mod#graph-pruning
Reference: https://go.dev/ref/mod#lazy-loading
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
* chore: update link
* chore: export cmder.SetFirstKeyPos to support build module commands
* feat(redisotel): ability to override TracerProvider (#1998)
* fix: add missing Expire methods to Cmdable
This is a followup to https://github.com/go-redis/redis/pull/1928
* chore(deps): bump github.com/onsi/gomega from 1.17.0 to 1.18.1
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.17.0 to 1.18.1.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.17.0...v1.18.1)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* Update README.md (#2011)
chore: add fmt library in example code
* chore: instrumentation name and version (#2012)
* fix: invalid type assert in stringArg
* chore: cleanup
* fix: example/otel compile error (#2028)
* fix: rename Golang to Go (#2030)
https://go.dev/doc/faq#go_or_golang
* feat: add support for passing extra attributes added to spans
* feat: set net.peer.name and net.peer.port in otel example
* chore: tweak Uptrace copy
* feat: add support for COPY command (#2016)
* feat: add support for acl sentinel auth in universal client
* chore(deps): bump actions/checkout from 2 to 3
Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v2...v3)
---
updated-dependencies:
- dependency-name: actions/checkout
dependency-type: direct:production
update-type: version-update:semver-major
...
Signed-off-by: dependabot[bot] <support@github.com>
* chore: add hll example
* chore: tweak release script
* chore: release v8.11.5 (release.sh)
* chore: add discord back
Co-authored-by: Eugene Ponizovsky <ponizovsky@gmail.com>
Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
Co-authored-by: Vladimir Mihailenco <vladimir.webdev@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Kishan B <kishancs46@gmail.com>
Co-authored-by: Dominik Menke <dom@digineo.de>
Co-authored-by: Gökhan Özeloğlu <gozeloglu@gmail.com>
Co-authored-by: Justin Sievenpiper <justin@sievenpiper.co>
Co-authored-by: Алексей Романовский <aromanovsky@epiphan.com>
Co-authored-by: Stavros Panakakakis <stavrospanakakis@gmail.com>
Co-authored-by: Pau Freixes <pfreixes@gmail.com>
Co-authored-by: Ethan Hur <ethan0311@gmail.com>
Co-authored-by: Jackie <18378976+Pyrodash@users.noreply.github.com>
Co-authored-by: Kristinn Björgvin Árdal <kristinnardalsecondary@gmail.com>
Co-authored-by: ffenix113 <razerer@bigmir.net>
Co-authored-by: Bastien Penavayre <bastienPenava@gmail.com>
Co-authored-by: James3 Li(李麒傑) <james3_li@asus.com>
Co-authored-by: Eng Zer Jun <engzerjun@gmail.com>
Co-authored-by: gzjiangtao2014 <gzjiangtao2014@corp.netease.com>
Co-authored-by: Nelz <nelz9999@users.noreply.github.com>
Co-authored-by: Daniel Richter <Nexyz9@gmail.com>
Co-authored-by: Seyed Ali Ghaffari <ali.ghaffari@outlook.com>
Co-authored-by: lintanghui <lintanghui@bilibili.com>
Co-authored-by: hidu <duv123+github@gmail.com>
Co-authored-by: Jonas Lergell <jonas.lergell@volvocars.com>
Co-authored-by: Alex Kahn <alexanderkahn@gmail.com>
2022-03-19 07:40:31 +03:00
|
|
|
if isBadConn(err, allowTimeout, c.opt.Addr) {
|
2020-03-11 17:26:42 +03:00
|
|
|
c.reconnect(ctx, err)
|
2017-08-01 14:21:26 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
// reconnect closes the cached connection (reason is passed on to closeTheCn)
// and immediately tries to establish a new one via conn.
// Both steps are best-effort: their errors are discarded here. If conn
// fails, c.cn stays nil, so the next call to conn attempts a new connection.
func (c *PubSub) reconnect(ctx context.Context, reason error) {
	_ = c.closeTheCn(reason)
	_, _ = c.conn(ctx, nil)
}
|
|
|
|
|
2019-06-17 12:32:40 +03:00
|
|
|
func (c *PubSub) closeTheCn(reason error) error {
|
2018-08-07 10:33:07 +03:00
|
|
|
if c.cn == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
if !c.closed {
|
2020-07-18 09:04:36 +03:00
|
|
|
internal.Logger.Printf(c.getContext(), "redis: discarding bad PubSub connection: %s", reason)
|
2018-08-07 10:33:07 +03:00
|
|
|
}
|
|
|
|
err := c.closeConn(c.cn)
|
|
|
|
c.cn = nil
|
|
|
|
return err
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
|
|
|
|
2017-04-24 12:43:15 +03:00
|
|
|
// Close marks the PubSub as closed, closes the exit channel and closes the
// cached connection, if any. It returns pool.ErrClosed when called on a
// PubSub that is already closed; otherwise it returns the result of closing
// the connection.
func (c *PubSub) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return pool.ErrClosed
	}
	// Set closed before closing the connection so closeTheCn does not log
	// the shutdown as a bad connection.
	c.closed = true
	close(c.exit)

	return c.closeTheCn(pool.ErrClosed)
}
|
2017-04-17 15:43:58 +03:00
|
|
|
|
2018-01-24 21:38:47 +03:00
|
|
|
// Subscribe the client to the specified channels. It returns
|
2017-05-11 17:02:26 +03:00
|
|
|
// empty subscription if there are no channels.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) Subscribe(ctx context.Context, channels ...string) error {
|
2017-06-29 16:53:49 +03:00
|
|
|
c.mu.Lock()
|
2018-07-23 15:55:13 +03:00
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err := c.subscribe(ctx, "subscribe", channels...)
|
2018-03-14 13:42:51 +03:00
|
|
|
if c.channels == nil {
|
|
|
|
c.channels = make(map[string]struct{})
|
|
|
|
}
|
2018-08-04 12:19:19 +03:00
|
|
|
for _, s := range channels {
|
|
|
|
c.channels[s] = struct{}{}
|
2018-03-14 13:42:51 +03:00
|
|
|
}
|
2017-06-29 17:05:08 +03:00
|
|
|
return err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
|
2018-01-24 21:38:47 +03:00
|
|
|
// PSubscribe the client to the given patterns. It returns
|
2017-05-11 17:02:26 +03:00
|
|
|
// empty subscription if there are no patterns.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) PSubscribe(ctx context.Context, patterns ...string) error {
|
2017-06-29 16:53:49 +03:00
|
|
|
c.mu.Lock()
|
2018-07-23 15:55:13 +03:00
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err := c.subscribe(ctx, "psubscribe", patterns...)
|
2018-03-14 13:42:51 +03:00
|
|
|
if c.patterns == nil {
|
|
|
|
c.patterns = make(map[string]struct{})
|
|
|
|
}
|
2018-08-04 12:19:19 +03:00
|
|
|
for _, s := range patterns {
|
|
|
|
c.patterns[s] = struct{}{}
|
2018-03-14 13:42:51 +03:00
|
|
|
}
|
2017-06-29 17:05:08 +03:00
|
|
|
return err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
|
2018-01-24 21:38:47 +03:00
|
|
|
// Unsubscribe the client from the given channels, or from all of
|
2015-09-06 13:50:16 +03:00
|
|
|
// them if none is given.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) Unsubscribe(ctx context.Context, channels ...string) error {
|
2017-06-29 16:53:49 +03:00
|
|
|
c.mu.Lock()
|
2018-07-23 15:55:13 +03:00
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2018-03-14 13:42:51 +03:00
|
|
|
for _, channel := range channels {
|
|
|
|
delete(c.channels, channel)
|
|
|
|
}
|
2020-03-11 17:26:42 +03:00
|
|
|
err := c.subscribe(ctx, "unsubscribe", channels...)
|
2017-06-29 17:05:08 +03:00
|
|
|
return err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
|
2018-01-24 21:38:47 +03:00
|
|
|
// PUnsubscribe the client from the given patterns, or from all of
|
2015-09-06 13:50:16 +03:00
|
|
|
// them if none is given.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) PUnsubscribe(ctx context.Context, patterns ...string) error {
|
2017-06-29 16:53:49 +03:00
|
|
|
c.mu.Lock()
|
2018-07-23 15:55:13 +03:00
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2018-03-14 13:42:51 +03:00
|
|
|
for _, pattern := range patterns {
|
|
|
|
delete(c.patterns, pattern)
|
|
|
|
}
|
2020-03-11 17:26:42 +03:00
|
|
|
err := c.subscribe(ctx, "punsubscribe", patterns...)
|
2017-06-29 17:05:08 +03:00
|
|
|
return err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) subscribe(ctx context.Context, redisCmd string, channels ...string) error {
|
|
|
|
cn, err := c.conn(ctx, channels)
|
2017-06-29 16:53:49 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err = c._subscribe(ctx, cn, redisCmd, channels)
|
|
|
|
c.releaseConn(ctx, cn, err, false)
|
2017-06-29 16:53:49 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) Ping(ctx context.Context, payload ...string) error {
|
2017-04-17 15:43:58 +03:00
|
|
|
args := []interface{}{"ping"}
|
2017-02-23 16:29:38 +03:00
|
|
|
if len(payload) == 1 {
|
|
|
|
args = append(args, payload[0])
|
2015-07-11 13:12:47 +03:00
|
|
|
}
|
2020-03-11 17:26:42 +03:00
|
|
|
cmd := NewCmd(ctx, args...)
|
2016-12-03 18:30:13 +03:00
|
|
|
|
chore: sync master (#2051)
* Upgrade redis-server version (#1833)
* Upgrade redis-server version
Signed-off-by: monkey <golang@88.com>
* XAutoClaim changed the return value
Signed-off-by: monkey <golang@88.com>
* add cmd: geosearch, geosearchstore (#1836)
* add cmd: geosearch, geosearchstore
Signed-off-by: monkey92t <golang@88.com>
* GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing
Signed-off-by: monkey92t <golang@88.com>
* Added missing method XInfoStreamFull to Cmdable interface
* Run go mod tidy in redisotel
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
* Revert "ConnPool check fd for bad conns (#1824)" (#1849)
This reverts commit 346bfafddd36dd52d51b064033048de5552ee91e.
* Automate release process (#1852)
* Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 (#1832)
* Bump github.com/onsi/gomega from 1.10.5 to 1.14.0
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.10.5 to 1.14.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.10.5...v1.14.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* Upgrade gomega to v1.15.0
Signed-off-by: monkey92t <golang@88.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: monkey92t <golang@88.com>
* Add version.go
* Fix otel example
* Fix package name in release script
* More fixes for otel example
* And more
* Fix release.sh
* Release v8.11.3 (release.sh)
* Create an annotated tag to give release.yml chance to run
* Tweak tag.sh
* Add Cmd.Slice helper to cast to []interface{} (#1859)
* after the connection pool is closed, no new connections should be added (#1863)
* after the connection pool is closed, no new connections should be added
Signed-off-by: monkey92t <golang@88.com>
* remove runGoroutine
Signed-off-by: monkey92t <golang@88.com>
* pool.popIdle add p.closed check
Signed-off-by: monkey92t <golang@88.com>
* upgrade golangci-lint v1.42.0
Signed-off-by: monkey92t <golang@88.com>
* Bump github.com/onsi/gomega from 1.15.0 to 1.16.0 (#1865)
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.15.0 to 1.16.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.15.0...v1.16.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Add go 1.17 to the build matrix
* Remove go 1.15 from build matrix
* Add scan struct example (#1870)
* Replace release job
* Bump github.com/cespare/xxhash/v2 from 2.1.1 to 2.1.2 (#1872)
Bumps [github.com/cespare/xxhash/v2](https://github.com/cespare/xxhash) from 2.1.1 to 2.1.2.
- [Release notes](https://github.com/cespare/xxhash/releases)
- [Commits](https://github.com/cespare/xxhash/compare/v2.1.1...v2.1.2)
---
updated-dependencies:
- dependency-name: github.com/cespare/xxhash/v2
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Fix tag script to push tag by tag
* Fix releasing.md
* Fix/pubsub ping mutex (#1878)
* Fix PubSub.Ping to hold the lock
* Fix PubSub.Ping to hold the lock
* add write cmd data-race test
Signed-off-by: monkey92t <golang@88.com>
Co-authored-by: monkey92t <golang@88.com>
* chore: cleanup OpenTelemetry example
* chore: gofmt all code
* Refactor TestParseURL
This is in preparation for supporting query parameters
in ParseURL:
- use an expected *Options instance to execute assertions on
- extract assertions into helper function
- enable parallel testing
- condense test table
* Add query parameter parsing to ParseURL()
Before this change, ParseURL would only accept a very restricted
set of URLs (it returned an error, if it encountered any parameter).
This commit introduces the ability to process URLs like
redis://localhost/1?dial_timeout=10s
and similar.
Go programs which were providing a configuration tunable (e.g.
CLI flag, config entry or environment variable) to configure
the Redis connection now don't need to perform this task
themselves.
* chore: add links to readme
* chore: fix discussions link
* empty hooks.withContext removed
* chore: gofmt
* chore: use conventional commits and auto-generate changelog
* feat: add acl auth support for sentinels
* chore: swap to acl auth at the test-level
* Add support for BLMove command
* chore: update dependencies
* chore: update link
* feat: add SetVal method for each command
* feat: add Cmd.{String,Int,Float,Bool}Slice helpers and an example
* chore: tweak GH actions to run all jobs
* chore: add Lua scripting example
* Fix Redis Cluster issue during roll outs of new nodes with same addr (#1914)
* fix: recycle connections in some Redis Cluster scenarios
This issue was surfaced in a Cloud Provider solution that used for
rolling out new nodes using the same address (hostname) of the nodes
that will be replaced in a Redis Cluster, while the former ones once
depromoted as Slaves would continue in service during some mintues
for redirecting traffic.
The solution basically identifies when the connection could be stale
since a MOVED response will be returned using the same address (hostname)
that is being used by the connection. At that moment we consider the
connection as no longer usable forcing to recycle the connection.
* chore: lazy reload when moved or ask
* chore: use conv commit message
* chore: release v8.11.4 (release.sh)
* fix: add whitespace for avoid unlikely colisions
* fix: format
* chore: fix links
* chore: use ctx parameter in cmdInfo
* Bump github.com/onsi/ginkgo from 1.16.4 to 1.16.5 (#1925)
Bumps [github.com/onsi/ginkgo](https://github.com/onsi/ginkgo) from 1.16.4 to 1.16.5.
- [Release notes](https://github.com/onsi/ginkgo/releases)
- [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/ginkgo/compare/v1.16.4...v1.16.5)
---
updated-dependencies:
- dependency-name: github.com/onsi/ginkgo
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* feat: add support for time.Duration write and scan
* test: add test case for setting and scanning durations
* chore: fix linter
* fix(extra/redisotel): set span.kind attribute to client
According to the opentelemetry specification this should always be set to client for database client
libraries.
I've also removed the SetAttributes call and instead set the attributes during creation of the span.
This is what the library SHOULD be doing according to the opentelemetry api specification.
* chore: update otel example
* fix: update some argument counts in pre-allocs
In some cases number of pre-allocated places in
argument array is missing 1 or 2 elements,
which results in re-allocation of twice as large array
* chore: add example how to delete keys without a ttl
* chore: don't enable all lints
* chore(deps): bump github.com/onsi/gomega from 1.16.0 to 1.17.0
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.16.0 to 1.17.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.16.0...v1.17.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* feat: Add redis v7's NX, XX, GT, LT expire variants
* chore: add missing readme
* chore: tweak feature links
* chore: remove Discord
* fix: set timeout for WAIT command. Fixes #1963
* build: update `go` directive in `go.mod` to 1.17
This commit enables support for module graph pruning and lazy module
loading for projects that are at Go 1.17 or higher.
Reference: https://go.dev/ref/mod#go-mod-file-go
Reference: https://go.dev/ref/mod#graph-pruning
Reference: https://go.dev/ref/mod#lazy-loading
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
* chore: update link
* chore: export cmder.SetFirstKeyPos to support build module commands
* feat(redisotel): ability to override TracerProvider (#1998)
* fix: add missing Expire methods to Cmdable
This is a followup to https://github.com/go-redis/redis/pull/1928
* chore(deps): bump github.com/onsi/gomega from 1.17.0 to 1.18.1
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.17.0 to 1.18.1.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.17.0...v1.18.1)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* Update README.md (#2011)
chore: add fmt library in example code
* chore: instrumentation name and version (#2012)
* fix: invalid type assert in stringArg
* chore: cleanup
* fix: example/otel compile error (#2028)
* fix: rename Golang to Go (#2030)
https://go.dev/doc/faq#go_or_golang
* feat: add support for passing extra attributes added to spans
* feat: set net.peer.name and net.peer.port in otel example
* chore: tweak Uptrace copy
* feat: add support for COPY command (#2016)
* feat: add support for acl sentinel auth in universal client
* chore(deps): bump actions/checkout from 2 to 3
Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v2...v3)
---
updated-dependencies:
- dependency-name: actions/checkout
dependency-type: direct:production
update-type: version-update:semver-major
...
Signed-off-by: dependabot[bot] <support@github.com>
* chore: add hll example
* chore: tweak release script
* chore: release v8.11.5 (release.sh)
* chore: add discord back
Co-authored-by: Eugene Ponizovsky <ponizovsky@gmail.com>
Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
Co-authored-by: Vladimir Mihailenco <vladimir.webdev@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Kishan B <kishancs46@gmail.com>
Co-authored-by: Dominik Menke <dom@digineo.de>
Co-authored-by: Gökhan Özeloğlu <gozeloglu@gmail.com>
Co-authored-by: Justin Sievenpiper <justin@sievenpiper.co>
Co-authored-by: Алексей Романовский <aromanovsky@epiphan.com>
Co-authored-by: Stavros Panakakakis <stavrospanakakis@gmail.com>
Co-authored-by: Pau Freixes <pfreixes@gmail.com>
Co-authored-by: Ethan Hur <ethan0311@gmail.com>
Co-authored-by: Jackie <18378976+Pyrodash@users.noreply.github.com>
Co-authored-by: Kristinn Björgvin Árdal <kristinnardalsecondary@gmail.com>
Co-authored-by: ffenix113 <razerer@bigmir.net>
Co-authored-by: Bastien Penavayre <bastienPenava@gmail.com>
Co-authored-by: James3 Li(李麒傑) <james3_li@asus.com>
Co-authored-by: Eng Zer Jun <engzerjun@gmail.com>
Co-authored-by: gzjiangtao2014 <gzjiangtao2014@corp.netease.com>
Co-authored-by: Nelz <nelz9999@users.noreply.github.com>
Co-authored-by: Daniel Richter <Nexyz9@gmail.com>
Co-authored-by: Seyed Ali Ghaffari <ali.ghaffari@outlook.com>
Co-authored-by: lintanghui <lintanghui@bilibili.com>
Co-authored-by: hidu <duv123+github@gmail.com>
Co-authored-by: Jonas Lergell <jonas.lergell@volvocars.com>
Co-authored-by: Alex Kahn <alexanderkahn@gmail.com>
2022-03-19 07:40:31 +03:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
cn, err := c.conn(ctx, nil)
|
2016-12-03 18:30:13 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err = c.writeCmd(ctx, cn, cmd)
|
chore: sync master (#2051)
* Upgrade redis-server version (#1833)
* Upgrade redis-server version
Signed-off-by: monkey <golang@88.com>
* XAutoClaim changed the return value
Signed-off-by: monkey <golang@88.com>
* add cmd: geosearch, geosearchstore (#1836)
* add cmd: geosearch, geosearchstore
Signed-off-by: monkey92t <golang@88.com>
* GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing
Signed-off-by: monkey92t <golang@88.com>
* Added missing method XInfoStreamFull to Cmdable interface
* Run go mod tidy in redisotel
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
* Revert "ConnPool check fd for bad conns (#1824)" (#1849)
This reverts commit 346bfafddd36dd52d51b064033048de5552ee91e.
* Automate release process (#1852)
* Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 (#1832)
* Bump github.com/onsi/gomega from 1.10.5 to 1.14.0
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.10.5 to 1.14.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.10.5...v1.14.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* Upgrade gomega to v1.15.0
Signed-off-by: monkey92t <golang@88.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: monkey92t <golang@88.com>
* Add version.go
* Fix otel example
* Fix package name in release script
* More fixes for otel example
* And more
* Fix release.sh
* Release v8.11.3 (release.sh)
* Create an annotated tag to give release.yml chance to run
* Tweak tag.sh
* Add Cmd.Slice helper to cast to []interface{} (#1859)
* after the connection pool is closed, no new connections should be added (#1863)
* after the connection pool is closed, no new connections should be added
Signed-off-by: monkey92t <golang@88.com>
* remove runGoroutine
Signed-off-by: monkey92t <golang@88.com>
* pool.popIdle add p.closed check
Signed-off-by: monkey92t <golang@88.com>
* upgrade golangci-lint v1.42.0
Signed-off-by: monkey92t <golang@88.com>
* Bump github.com/onsi/gomega from 1.15.0 to 1.16.0 (#1865)
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.15.0 to 1.16.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.15.0...v1.16.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Add go 1.17 to the build matrix
* Remove go 1.15 from build matrix
* Add scan struct example (#1870)
* Replace release job
* Bump github.com/cespare/xxhash/v2 from 2.1.1 to 2.1.2 (#1872)
Bumps [github.com/cespare/xxhash/v2](https://github.com/cespare/xxhash) from 2.1.1 to 2.1.2.
- [Release notes](https://github.com/cespare/xxhash/releases)
- [Commits](https://github.com/cespare/xxhash/compare/v2.1.1...v2.1.2)
---
updated-dependencies:
- dependency-name: github.com/cespare/xxhash/v2
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Fix tag script to push tag by tag
* Fix releasing.md
* Fix/pubsub ping mutex (#1878)
* Fix PubSub.Ping to hold the lock
* Fix PubSub.Ping to hold the lock
* add write cmd data-race test
Signed-off-by: monkey92t <golang@88.com>
Co-authored-by: monkey92t <golang@88.com>
* chore: cleanup OpenTelemetry example
* chore: gofmt all code
* Refactor TestParseURL
This is in preparation for supporting query parameters
in ParseURL:
- use an expected *Options instance to execute assertions on
- extract assertions into helper function
- enable parallel testing
- condense test table
* Add query parameter parsing to ParseURL()
Before this change, ParseURL would only accept a very restricted
set of URLs (it returned an error, if it encountered any parameter).
This commit introduces the ability to process URLs like
redis://localhost/1?dial_timeout=10s
and similar.
Go programs which were providing a configuration tunable (e.g.
CLI flag, config entry or environment variable) to configure
the Redis connection now don't need to perform this task
themselves.
* chore: add links to readme
* chore: fix discussions link
* empty hooks.withContext removed
* chore: gofmt
* chore: use conventional commits and auto-generate changelog
* feat: add acl auth support for sentinels
* chore: swap to acl auth at the test-level
* Add support for BLMove command
* chore: update dependencies
* chore: update link
* feat: add SetVal method for each command
* feat: add Cmd.{String,Int,Float,Bool}Slice helpers and an example
* chore: tweak GH actions to run all jobs
* chore: add Lua scripting example
* Fix Redis Cluster issue during roll outs of new nodes with same addr (#1914)
* fix: recycle connections in some Redis Cluster scenarios
This issue was surfaced in a Cloud Provider solution that used for
rolling out new nodes using the same address (hostname) of the nodes
that will be replaced in a Redis Cluster, while the former ones once
depromoted as Slaves would continue in service during some mintues
for redirecting traffic.
The solution basically identifies when the connection could be stale
since a MOVED response will be returned using the same address (hostname)
that is being used by the connection. At that moment we consider the
connection as no longer usable forcing to recycle the connection.
* chore: lazy reload when moved or ask
* chore: use conv commit message
* chore: release v8.11.4 (release.sh)
* fix: add whitespace for avoid unlikely colisions
* fix: format
* chore: fix links
* chore: use ctx parameter in cmdInfo
* Bump github.com/onsi/ginkgo from 1.16.4 to 1.16.5 (#1925)
Bumps [github.com/onsi/ginkgo](https://github.com/onsi/ginkgo) from 1.16.4 to 1.16.5.
- [Release notes](https://github.com/onsi/ginkgo/releases)
- [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/ginkgo/compare/v1.16.4...v1.16.5)
---
updated-dependencies:
- dependency-name: github.com/onsi/ginkgo
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* feat: add support for time.Duration write and scan
* test: add test case for setting and scanning durations
* chore: fix linter
* fix(extra/redisotel): set span.kind attribute to client
According to the opentelemetry specification this should always be set to client for database client
libraries.
I've also removed the SetAttributes call and instead set the attributes during creation of the span.
This is what the library SHOULD be doing according to the opentelemetry api specification.
* chore: update otel example
* fix: update some argument counts in pre-allocs
In some cases number of pre-allocated places in
argument array is missing 1 or 2 elements,
which results in re-allocation of twice as large array
* chore: add example how to delete keys without a ttl
* chore: don't enable all lints
* chore(deps): bump github.com/onsi/gomega from 1.16.0 to 1.17.0
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.16.0 to 1.17.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.16.0...v1.17.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* feat: Add redis v7's NX, XX, GT, LT expire variants
* chore: add missing readme
* chore: tweak feature links
* chore: remove Discord
* fix: set timeout for WAIT command. Fixes #1963
* build: update `go` directive in `go.mod` to 1.17
This commit enables support for module graph pruning and lazy module
loading for projects that are at Go 1.17 or higher.
Reference: https://go.dev/ref/mod#go-mod-file-go
Reference: https://go.dev/ref/mod#graph-pruning
Reference: https://go.dev/ref/mod#lazy-loading
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
* chore: update link
* chore: export cmder.SetFirstKeyPos to support build module commands
* feat(redisotel): ability to override TracerProvider (#1998)
* fix: add missing Expire methods to Cmdable
This is a followup to https://github.com/go-redis/redis/pull/1928
* chore(deps): bump github.com/onsi/gomega from 1.17.0 to 1.18.1
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.17.0 to 1.18.1.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.17.0...v1.18.1)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* Update README.md (#2011)
chore: add fmt library in example code
* chore: instrumentation name and version (#2012)
* fix: invalid type assert in stringArg
* chore: cleanup
* fix: example/otel compile error (#2028)
* fix: rename Golang to Go (#2030)
https://go.dev/doc/faq#go_or_golang
* feat: add support for passing extra attributes added to spans
* feat: set net.peer.name and net.peer.port in otel example
* chore: tweak Uptrace copy
* feat: add support for COPY command (#2016)
* feat: add support for acl sentinel auth in universal client
* chore(deps): bump actions/checkout from 2 to 3
Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v2...v3)
---
updated-dependencies:
- dependency-name: actions/checkout
dependency-type: direct:production
update-type: version-update:semver-major
...
Signed-off-by: dependabot[bot] <support@github.com>
* chore: add hll example
* chore: tweak release script
* chore: release v8.11.5 (release.sh)
* chore: add discord back
Co-authored-by: Eugene Ponizovsky <ponizovsky@gmail.com>
Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
Co-authored-by: Vladimir Mihailenco <vladimir.webdev@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Kishan B <kishancs46@gmail.com>
Co-authored-by: Dominik Menke <dom@digineo.de>
Co-authored-by: Gökhan Özeloğlu <gozeloglu@gmail.com>
Co-authored-by: Justin Sievenpiper <justin@sievenpiper.co>
Co-authored-by: Алексей Романовский <aromanovsky@epiphan.com>
Co-authored-by: Stavros Panakakakis <stavrospanakakis@gmail.com>
Co-authored-by: Pau Freixes <pfreixes@gmail.com>
Co-authored-by: Ethan Hur <ethan0311@gmail.com>
Co-authored-by: Jackie <18378976+Pyrodash@users.noreply.github.com>
Co-authored-by: Kristinn Björgvin Árdal <kristinnardalsecondary@gmail.com>
Co-authored-by: ffenix113 <razerer@bigmir.net>
Co-authored-by: Bastien Penavayre <bastienPenava@gmail.com>
Co-authored-by: James3 Li(李麒傑) <james3_li@asus.com>
Co-authored-by: Eng Zer Jun <engzerjun@gmail.com>
Co-authored-by: gzjiangtao2014 <gzjiangtao2014@corp.netease.com>
Co-authored-by: Nelz <nelz9999@users.noreply.github.com>
Co-authored-by: Daniel Richter <Nexyz9@gmail.com>
Co-authored-by: Seyed Ali Ghaffari <ali.ghaffari@outlook.com>
Co-authored-by: lintanghui <lintanghui@bilibili.com>
Co-authored-by: hidu <duv123+github@gmail.com>
Co-authored-by: Jonas Lergell <jonas.lergell@volvocars.com>
Co-authored-by: Alex Kahn <alexanderkahn@gmail.com>
2022-03-19 07:40:31 +03:00
|
|
|
c.releaseConn(ctx, cn, err, false)
|
2016-12-03 18:30:13 +03:00
|
|
|
return err
|
2015-07-11 13:12:47 +03:00
|
|
|
}
|
|
|
|
|
2018-01-24 21:38:47 +03:00
|
|
|
// Subscription is the confirmation received after a subscribe or
// unsubscribe style command has been processed.
type Subscription struct {
	// Kind is one of "subscribe", "unsubscribe", "psubscribe" or
	// "punsubscribe".
	Kind string
	// Channel is the name of the channel the confirmation refers to.
	Channel string
	// Count is the number of channels we are currently subscribed to.
	Count int
}

// String renders the subscription as "<Kind>: <Channel>". Count is not
// included in the output.
func (m *Subscription) String() string {
	return m.Kind + ": " + m.Channel
}
|
|
|
|
|
2015-05-23 18:17:45 +03:00
|
|
|
// Message is a message received as the result of a PUBLISH command issued by
// another client.
type Message struct {
	// Channel is the channel the message arrived on.
	Channel string
	// Pattern is filled in for "pmessage" replies and left empty for plain
	// "message" replies.
	Pattern string
	// Payload carries the message body when it arrives as a single string.
	Payload string
	// PayloadSlice carries the message body when a "message" reply delivers
	// it as an array of strings; Payload is left empty in that case.
	PayloadSlice []string
}

// String renders the message as "Message<Channel: Payload>". Pattern and
// PayloadSlice are not included in the output.
func (m *Message) String() string {
	return "Message<" + m.Channel + ": " + m.Payload + ">"
}
|
|
|
|
|
2015-07-11 13:12:47 +03:00
|
|
|
// Pong is the reply received in response to a PING command.
type Pong struct {
	// Payload is the text carried by the reply; it may be empty.
	Payload string
}

// String renders the reply as "Pong" when there is no payload and as
// "Pong<payload>" otherwise.
func (p *Pong) String() string {
	if p.Payload == "" {
		return "Pong"
	}
	return "Pong<" + p.Payload + ">"
}
|
|
|
|
|
2017-02-08 12:24:09 +03:00
|
|
|
func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
|
|
|
|
switch reply := reply.(type) {
|
|
|
|
case string:
|
2015-07-11 13:12:47 +03:00
|
|
|
return &Pong{
|
2017-02-08 12:24:09 +03:00
|
|
|
Payload: reply,
|
2015-07-11 13:12:47 +03:00
|
|
|
}, nil
|
2017-02-08 12:24:09 +03:00
|
|
|
case []interface{}:
|
|
|
|
switch kind := reply[0].(string); kind {
|
|
|
|
case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
|
2020-05-21 09:10:43 +03:00
|
|
|
// Can be nil in case of "unsubscribe".
|
|
|
|
channel, _ := reply[1].(string)
|
2017-02-08 12:24:09 +03:00
|
|
|
return &Subscription{
|
|
|
|
Kind: kind,
|
2020-05-21 09:10:43 +03:00
|
|
|
Channel: channel,
|
2017-02-08 12:24:09 +03:00
|
|
|
Count: int(reply[2].(int64)),
|
|
|
|
}, nil
|
|
|
|
case "message":
|
2020-08-21 12:19:31 +03:00
|
|
|
switch payload := reply[2].(type) {
|
|
|
|
case string:
|
|
|
|
return &Message{
|
|
|
|
Channel: reply[1].(string),
|
|
|
|
Payload: payload,
|
|
|
|
}, nil
|
|
|
|
case []interface{}:
|
|
|
|
ss := make([]string, len(payload))
|
|
|
|
for i, s := range payload {
|
|
|
|
ss[i] = s.(string)
|
|
|
|
}
|
|
|
|
return &Message{
|
|
|
|
Channel: reply[1].(string),
|
|
|
|
PayloadSlice: ss,
|
|
|
|
}, nil
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("redis: unsupported pubsub message payload: %T", payload)
|
|
|
|
}
|
2017-02-08 12:24:09 +03:00
|
|
|
case "pmessage":
|
|
|
|
return &Message{
|
|
|
|
Pattern: reply[1].(string),
|
|
|
|
Channel: reply[2].(string),
|
|
|
|
Payload: reply[3].(string),
|
|
|
|
}, nil
|
|
|
|
case "pong":
|
|
|
|
return &Pong{
|
|
|
|
Payload: reply[1].(string),
|
|
|
|
}, nil
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
|
|
|
|
}
|
2015-07-11 13:12:47 +03:00
|
|
|
default:
|
2017-02-08 12:24:09 +03:00
|
|
|
return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
|
2015-07-11 13:12:47 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-07-11 13:42:44 +03:00
|
|
|
// ReceiveTimeout acts like Receive but returns an error if message
|
2018-07-24 09:41:14 +03:00
|
|
|
// is not received in time. This is low-level API and in most cases
|
|
|
|
// Channel should be used instead.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (interface{}, error) {
|
2017-02-08 12:24:09 +03:00
|
|
|
if c.cmd == nil {
|
2020-03-11 17:26:42 +03:00
|
|
|
c.cmd = NewCmd(ctx)
|
2017-02-08 12:24:09 +03:00
|
|
|
}
|
2016-12-03 18:30:13 +03:00
|
|
|
|
chore: sync master (#2051)
* Upgrade redis-server version (#1833)
* Upgrade redis-server version
Signed-off-by: monkey <golang@88.com>
* XAutoClaim changed the return value
Signed-off-by: monkey <golang@88.com>
* add cmd: geosearch, geosearchstore (#1836)
* add cmd: geosearch, geosearchstore
Signed-off-by: monkey92t <golang@88.com>
* GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing
Signed-off-by: monkey92t <golang@88.com>
* Added missing method XInfoStreamFull to Cmdable interface
* Run go mod tidy in redisotel
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
* Revert "ConnPool check fd for bad conns (#1824)" (#1849)
This reverts commit 346bfafddd36dd52d51b064033048de5552ee91e.
* Automate release process (#1852)
* Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 (#1832)
* Bump github.com/onsi/gomega from 1.10.5 to 1.14.0
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.10.5 to 1.14.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.10.5...v1.14.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* Upgrade gomega to v1.15.0
Signed-off-by: monkey92t <golang@88.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: monkey92t <golang@88.com>
* Add version.go
* Fix otel example
* Fix package name in release script
* More fixes for otel example
* And more
* Fix release.sh
* Release v8.11.3 (release.sh)
* Create an annotated tag to give release.yml chance to run
* Tweak tag.sh
* Add Cmd.Slice helper to cast to []interface{} (#1859)
* after the connection pool is closed, no new connections should be added (#1863)
* after the connection pool is closed, no new connections should be added
Signed-off-by: monkey92t <golang@88.com>
* remove runGoroutine
Signed-off-by: monkey92t <golang@88.com>
* pool.popIdle add p.closed check
Signed-off-by: monkey92t <golang@88.com>
* upgrade golangci-lint v1.42.0
Signed-off-by: monkey92t <golang@88.com>
* Bump github.com/onsi/gomega from 1.15.0 to 1.16.0 (#1865)
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.15.0 to 1.16.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.15.0...v1.16.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Add go 1.17 to the build matrix
* Remove go 1.15 from build matrix
* Add scan struct example (#1870)
* Replace release job
* Bump github.com/cespare/xxhash/v2 from 2.1.1 to 2.1.2 (#1872)
Bumps [github.com/cespare/xxhash/v2](https://github.com/cespare/xxhash) from 2.1.1 to 2.1.2.
- [Release notes](https://github.com/cespare/xxhash/releases)
- [Commits](https://github.com/cespare/xxhash/compare/v2.1.1...v2.1.2)
---
updated-dependencies:
- dependency-name: github.com/cespare/xxhash/v2
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Fix tag script to push tag by tag
* Fix releasing.md
* Fix/pubsub ping mutex (#1878)
* Fix PubSub.Ping to hold the lock
* Fix PubSub.Ping to hold the lock
* add write cmd data-race test
Signed-off-by: monkey92t <golang@88.com>
Co-authored-by: monkey92t <golang@88.com>
* chore: cleanup OpenTelemetry example
* chore: gofmt all code
* Refactor TestParseURL
This is in preparation for supporting query parameters
in ParseURL:
- use an expected *Options instance to execute assertions on
- extract assertions into helper function
- enable parallel testing
- condense test table
* Add query parameter parsing to ParseURL()
Before this change, ParseURL would only accept a very restricted
set of URLs (it returned an error, if it encountered any parameter).
This commit introduces the ability to process URLs like
redis://localhost/1?dial_timeout=10s
and similar.
Go programs which were providing a configuration tunable (e.g.
CLI flag, config entry or environment variable) to configure
the Redis connection now don't need to perform this task
themselves.
* chore: add links to readme
* chore: fix discussions link
* empty hooks.withContext removed
* chore: gofmt
* chore: use conventional commits and auto-generate changelog
* feat: add acl auth support for sentinels
* chore: swap to acl auth at the test-level
* Add support for BLMove command
* chore: update dependencies
* chore: update link
* feat: add SetVal method for each command
* feat: add Cmd.{String,Int,Float,Bool}Slice helpers and an example
* chore: tweak GH actions to run all jobs
* chore: add Lua scripting example
* Fix Redis Cluster issue during roll outs of new nodes with same addr (#1914)
* fix: recycle connections in some Redis Cluster scenarios
This issue was surfaced in a Cloud Provider solution that used for
rolling out new nodes using the same address (hostname) of the nodes
that will be replaced in a Redis Cluster, while the former ones once
depromoted as Slaves would continue in service during some mintues
for redirecting traffic.
The solution basically identifies when the connection could be stale
since a MOVED response will be returned using the same address (hostname)
that is being used by the connection. At that moment we consider the
connection as no longer usable forcing to recycle the connection.
* chore: lazy reload when moved or ask
* chore: use conv commit message
* chore: release v8.11.4 (release.sh)
* fix: add whitespace for avoid unlikely colisions
* fix: format
* chore: fix links
* chore: use ctx parameter in cmdInfo
* Bump github.com/onsi/ginkgo from 1.16.4 to 1.16.5 (#1925)
Bumps [github.com/onsi/ginkgo](https://github.com/onsi/ginkgo) from 1.16.4 to 1.16.5.
- [Release notes](https://github.com/onsi/ginkgo/releases)
- [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/ginkgo/compare/v1.16.4...v1.16.5)
---
updated-dependencies:
- dependency-name: github.com/onsi/ginkgo
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* feat: add support for time.Duration write and scan
* test: add test case for setting and scanning durations
* chore: fix linter
* fix(extra/redisotel): set span.kind attribute to client
According to the opentelemetry specification this should always be set to client for database client
libraries.
I've also removed the SetAttributes call and instead set the attributes during creation of the span.
This is what the library SHOULD be doing according to the opentelemetry api specification.
* chore: update otel example
* fix: update some argument counts in pre-allocs
In some cases number of pre-allocated places in
argument array is missing 1 or 2 elements,
which results in re-allocation of twice as large array
* chore: add example how to delete keys without a ttl
* chore: don't enable all lints
* chore(deps): bump github.com/onsi/gomega from 1.16.0 to 1.17.0
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.16.0 to 1.17.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.16.0...v1.17.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* feat: Add redis v7's NX, XX, GT, LT expire variants
* chore: add missing readme
* chore: tweak feature links
* chore: remove Discord
* fix: set timeout for WAIT command. Fixes #1963
* build: update `go` directive in `go.mod` to 1.17
This commit enables support for module graph pruning and lazy module
loading for projects that are at Go 1.17 or higher.
Reference: https://go.dev/ref/mod#go-mod-file-go
Reference: https://go.dev/ref/mod#graph-pruning
Reference: https://go.dev/ref/mod#lazy-loading
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
* chore: update link
* chore: export cmder.SetFirstKeyPos to support build module commands
* feat(redisotel): ability to override TracerProvider (#1998)
* fix: add missing Expire methods to Cmdable
This is a followup to https://github.com/go-redis/redis/pull/1928
* chore(deps): bump github.com/onsi/gomega from 1.17.0 to 1.18.1
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.17.0 to 1.18.1.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.17.0...v1.18.1)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* Update README.md (#2011)
chore: add fmt library in example code
* chore: instrumentation name and version (#2012)
* fix: invalid type assert in stringArg
* chore: cleanup
* fix: example/otel compile error (#2028)
* fix: rename Golang to Go (#2030)
https://go.dev/doc/faq#go_or_golang
* feat: add support for passing extra attributes added to spans
* feat: set net.peer.name and net.peer.port in otel example
* chore: tweak Uptrace copy
* feat: add support for COPY command (#2016)
* feat: add support for acl sentinel auth in universal client
* chore(deps): bump actions/checkout from 2 to 3
Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v2...v3)
---
updated-dependencies:
- dependency-name: actions/checkout
dependency-type: direct:production
update-type: version-update:semver-major
...
Signed-off-by: dependabot[bot] <support@github.com>
* chore: add hll example
* chore: tweak release script
* chore: release v8.11.5 (release.sh)
* chore: add discord back
Co-authored-by: Eugene Ponizovsky <ponizovsky@gmail.com>
Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
Co-authored-by: Vladimir Mihailenco <vladimir.webdev@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Kishan B <kishancs46@gmail.com>
Co-authored-by: Dominik Menke <dom@digineo.de>
Co-authored-by: Gökhan Özeloğlu <gozeloglu@gmail.com>
Co-authored-by: Justin Sievenpiper <justin@sievenpiper.co>
Co-authored-by: Алексей Романовский <aromanovsky@epiphan.com>
Co-authored-by: Stavros Panakakakis <stavrospanakakis@gmail.com>
Co-authored-by: Pau Freixes <pfreixes@gmail.com>
Co-authored-by: Ethan Hur <ethan0311@gmail.com>
Co-authored-by: Jackie <18378976+Pyrodash@users.noreply.github.com>
Co-authored-by: Kristinn Björgvin Árdal <kristinnardalsecondary@gmail.com>
Co-authored-by: ffenix113 <razerer@bigmir.net>
Co-authored-by: Bastien Penavayre <bastienPenava@gmail.com>
Co-authored-by: James3 Li(李麒傑) <james3_li@asus.com>
Co-authored-by: Eng Zer Jun <engzerjun@gmail.com>
Co-authored-by: gzjiangtao2014 <gzjiangtao2014@corp.netease.com>
Co-authored-by: Nelz <nelz9999@users.noreply.github.com>
Co-authored-by: Daniel Richter <Nexyz9@gmail.com>
Co-authored-by: Seyed Ali Ghaffari <ali.ghaffari@outlook.com>
Co-authored-by: lintanghui <lintanghui@bilibili.com>
Co-authored-by: hidu <duv123+github@gmail.com>
Co-authored-by: Jonas Lergell <jonas.lergell@volvocars.com>
Co-authored-by: Alex Kahn <alexanderkahn@gmail.com>
2022-03-19 07:40:31 +03:00
|
|
|
// Don't hold the lock to allow subscriptions and pings.
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
cn, err := c.connWithLock(ctx)
|
2015-07-11 13:12:47 +03:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2014-05-11 11:42:40 +04:00
|
|
|
}
|
2015-05-10 15:33:04 +03:00
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error {
|
2018-08-15 11:53:15 +03:00
|
|
|
return c.cmd.readReply(rd)
|
|
|
|
})
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
c.releaseConnWithLock(ctx, cn, err, timeout > 0)
|
chore: sync master (#2051)
* Upgrade redis-server version (#1833)
* Upgrade redis-server version
Signed-off-by: monkey <golang@88.com>
* XAutoClaim changed the return value
Signed-off-by: monkey <golang@88.com>
* add cmd: geosearch, geosearchstore (#1836)
* add cmd: geosearch, geosearchstore
Signed-off-by: monkey92t <golang@88.com>
* GeoSearchQuery and GeoSearchLocationQuery changed to pointer passing
Signed-off-by: monkey92t <golang@88.com>
* Added missing method XInfoStreamFull to Cmdable interface
* Run go mod tidy in redisotel
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
* Revert "ConnPool check fd for bad conns (#1824)" (#1849)
This reverts commit 346bfafddd36dd52d51b064033048de5552ee91e.
* Automate release process (#1852)
* Bump github.com/onsi/gomega from 1.10.5 to 1.14.0 (#1832)
* Bump github.com/onsi/gomega from 1.10.5 to 1.14.0
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.10.5 to 1.14.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.10.5...v1.14.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* Upgrade gomega to v1.15.0
Signed-off-by: monkey92t <golang@88.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: monkey92t <golang@88.com>
* Add version.go
* Fix otel example
* Fix package name in release script
* More fixes for otel example
* And more
* Fix release.sh
* Release v8.11.3 (release.sh)
* Create an annotated tag to give release.yml chance to run
* Tweak tag.sh
* Add Cmd.Slice helper to cast to []interface{} (#1859)
* after the connection pool is closed, no new connections should be added (#1863)
* after the connection pool is closed, no new connections should be added
Signed-off-by: monkey92t <golang@88.com>
* remove runGoroutine
Signed-off-by: monkey92t <golang@88.com>
* pool.popIdle add p.closed check
Signed-off-by: monkey92t <golang@88.com>
* upgrade golangci-lint v1.42.0
Signed-off-by: monkey92t <golang@88.com>
* Bump github.com/onsi/gomega from 1.15.0 to 1.16.0 (#1865)
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.15.0 to 1.16.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.15.0...v1.16.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Add go 1.17 to the build matrix
* Remove go 1.15 from build matrix
* Add scan struct example (#1870)
* Replace release job
* Bump github.com/cespare/xxhash/v2 from 2.1.1 to 2.1.2 (#1872)
Bumps [github.com/cespare/xxhash/v2](https://github.com/cespare/xxhash) from 2.1.1 to 2.1.2.
- [Release notes](https://github.com/cespare/xxhash/releases)
- [Commits](https://github.com/cespare/xxhash/compare/v2.1.1...v2.1.2)
---
updated-dependencies:
- dependency-name: github.com/cespare/xxhash/v2
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Fix tag script to push tag by tag
* Fix releasing.md
* Fix/pubsub ping mutex (#1878)
* Fix PubSub.Ping to hold the lock
* Fix PubSub.Ping to hold the lock
* add write cmd data-race test
Signed-off-by: monkey92t <golang@88.com>
Co-authored-by: monkey92t <golang@88.com>
* chore: cleanup OpenTelemetry example
* chore: gofmt all code
* Refactor TestParseURL
This is in preparation for supporting query parameters
in ParseURL:
- use an expected *Options instance to execute assertions on
- extract assertions into helper function
- enable parallel testing
- condense test table
* Add query parameter parsing to ParseURL()
Before this change, ParseURL would only accept a very restricted
set of URLs (it returned an error, if it encountered any parameter).
This commit introduces the ability to process URLs like
redis://localhost/1?dial_timeout=10s
and similar.
Go programs which were providing a configuration tunable (e.g.
CLI flag, config entry or environment variable) to configure
the Redis connection now don't need to perform this task
themselves.
* chore: add links to readme
* chore: fix discussions link
* empty hooks.withContext removed
* chore: gofmt
* chore: use conventional commits and auto-generate changelog
* feat: add acl auth support for sentinels
* chore: swap to acl auth at the test-level
* Add support for BLMove command
* chore: update dependencies
* chore: update link
* feat: add SetVal method for each command
* feat: add Cmd.{String,Int,Float,Bool}Slice helpers and an example
* chore: tweak GH actions to run all jobs
* chore: add Lua scripting example
* Fix Redis Cluster issue during roll outs of new nodes with same addr (#1914)
* fix: recycle connections in some Redis Cluster scenarios
This issue was surfaced in a Cloud Provider solution that used for
rolling out new nodes using the same address (hostname) of the nodes
that will be replaced in a Redis Cluster, while the former ones once
depromoted as Slaves would continue in service during some mintues
for redirecting traffic.
The solution basically identifies when the connection could be stale
since a MOVED response will be returned using the same address (hostname)
that is being used by the connection. At that moment we consider the
connection as no longer usable forcing to recycle the connection.
* chore: lazy reload when moved or ask
* chore: use conv commit message
* chore: release v8.11.4 (release.sh)
* fix: add whitespace for avoid unlikely colisions
* fix: format
* chore: fix links
* chore: use ctx parameter in cmdInfo
* Bump github.com/onsi/ginkgo from 1.16.4 to 1.16.5 (#1925)
Bumps [github.com/onsi/ginkgo](https://github.com/onsi/ginkgo) from 1.16.4 to 1.16.5.
- [Release notes](https://github.com/onsi/ginkgo/releases)
- [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/ginkgo/compare/v1.16.4...v1.16.5)
---
updated-dependencies:
- dependency-name: github.com/onsi/ginkgo
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* feat: add support for time.Duration write and scan
* test: add test case for setting and scanning durations
* chore: fix linter
* fix(extra/redisotel): set span.kind attribute to client
According to the opentelemetry specification this should always be set to client for database client
libraries.
I've also removed the SetAttributes call and instead set the attributes during creation of the span.
This is what the library SHOULD be doing according to the opentelemetry api specification.
* chore: update otel example
* fix: update some argument counts in pre-allocs
In some cases number of pre-allocated places in
argument array is missing 1 or 2 elements,
which results in re-allocation of twice as large array
* chore: add example how to delete keys without a ttl
* chore: don't enable all lints
* chore(deps): bump github.com/onsi/gomega from 1.16.0 to 1.17.0
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.16.0 to 1.17.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.16.0...v1.17.0)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* feat: Add redis v7's NX, XX, GT, LT expire variants
* chore: add missing readme
* chore: tweak feature links
* chore: remove Discord
* fix: set timeout for WAIT command. Fixes #1963
* build: update `go` directive in `go.mod` to 1.17
This commit enables support for module graph pruning and lazy module
loading for projects that are at Go 1.17 or higher.
Reference: https://go.dev/ref/mod#go-mod-file-go
Reference: https://go.dev/ref/mod#graph-pruning
Reference: https://go.dev/ref/mod#lazy-loading
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
* chore: update link
* chore: export cmder.SetFirstKeyPos to support build module commands
* feat(redisotel): ability to override TracerProvider (#1998)
* fix: add missing Expire methods to Cmdable
This is a followup to https://github.com/go-redis/redis/pull/1928
* chore(deps): bump github.com/onsi/gomega from 1.17.0 to 1.18.1
Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.17.0 to 1.18.1.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/gomega/compare/v1.17.0...v1.18.1)
---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* Update README.md (#2011)
chore: add fmt library in example code
* chore: instrumentation name and version (#2012)
* fix: invalid type assert in stringArg
* chore: cleanup
* fix: example/otel compile error (#2028)
* fix: rename Golang to Go (#2030)
https://go.dev/doc/faq#go_or_golang
* feat: add support for passing extra attributes added to spans
* feat: set net.peer.name and net.peer.port in otel example
* chore: tweak Uptrace copy
* feat: add support for COPY command (#2016)
* feat: add support for acl sentinel auth in universal client
* chore(deps): bump actions/checkout from 2 to 3
Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v2...v3)
---
updated-dependencies:
- dependency-name: actions/checkout
dependency-type: direct:production
update-type: version-update:semver-major
...
Signed-off-by: dependabot[bot] <support@github.com>
* chore: add hll example
* chore: tweak release script
* chore: release v8.11.5 (release.sh)
* chore: add discord back
Co-authored-by: Eugene Ponizovsky <ponizovsky@gmail.com>
Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
Co-authored-by: Vladimir Mihailenco <vladimir.webdev@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Kishan B <kishancs46@gmail.com>
Co-authored-by: Dominik Menke <dom@digineo.de>
Co-authored-by: Gökhan Özeloğlu <gozeloglu@gmail.com>
Co-authored-by: Justin Sievenpiper <justin@sievenpiper.co>
Co-authored-by: Алексей Романовский <aromanovsky@epiphan.com>
Co-authored-by: Stavros Panakakakis <stavrospanakakis@gmail.com>
Co-authored-by: Pau Freixes <pfreixes@gmail.com>
Co-authored-by: Ethan Hur <ethan0311@gmail.com>
Co-authored-by: Jackie <18378976+Pyrodash@users.noreply.github.com>
Co-authored-by: Kristinn Björgvin Árdal <kristinnardalsecondary@gmail.com>
Co-authored-by: ffenix113 <razerer@bigmir.net>
Co-authored-by: Bastien Penavayre <bastienPenava@gmail.com>
Co-authored-by: James3 Li(李麒傑) <james3_li@asus.com>
Co-authored-by: Eng Zer Jun <engzerjun@gmail.com>
Co-authored-by: gzjiangtao2014 <gzjiangtao2014@corp.netease.com>
Co-authored-by: Nelz <nelz9999@users.noreply.github.com>
Co-authored-by: Daniel Richter <Nexyz9@gmail.com>
Co-authored-by: Seyed Ali Ghaffari <ali.ghaffari@outlook.com>
Co-authored-by: lintanghui <lintanghui@bilibili.com>
Co-authored-by: hidu <duv123+github@gmail.com>
Co-authored-by: Jonas Lergell <jonas.lergell@volvocars.com>
Co-authored-by: Alex Kahn <alexanderkahn@gmail.com>
2022-03-19 07:40:31 +03:00
|
|
|
|
2015-11-14 15:44:16 +03:00
|
|
|
if err != nil {
|
2015-07-11 13:12:47 +03:00
|
|
|
return nil, err
|
|
|
|
}
|
2016-03-01 13:31:06 +03:00
|
|
|
|
2017-02-08 12:24:09 +03:00
|
|
|
return c.newMessage(c.cmd.Val())
|
2014-05-11 11:42:40 +04:00
|
|
|
}
|
2012-07-25 17:00:50 +04:00
|
|
|
|
2016-04-09 11:45:56 +03:00
|
|
|
// Receive returns a message as a Subscription, Message, Pong or error.
|
2018-07-24 09:41:14 +03:00
|
|
|
// See PubSub example for details. This is low-level API and in most cases
|
|
|
|
// Channel should be used instead.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) Receive(ctx context.Context) (interface{}, error) {
|
|
|
|
return c.ReceiveTimeout(ctx, 0)
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
2014-05-11 11:42:40 +04:00
|
|
|
|
2018-07-24 09:41:14 +03:00
|
|
|
// ReceiveMessage returns a Message or error ignoring Subscription and Pong
|
|
|
|
// messages. This is low-level API and in most cases Channel should be used
|
|
|
|
// instead.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) ReceiveMessage(ctx context.Context) (*Message, error) {
|
2015-09-06 13:50:16 +03:00
|
|
|
for {
|
2020-03-11 17:26:42 +03:00
|
|
|
msg, err := c.Receive(ctx)
|
2015-09-06 13:50:16 +03:00
|
|
|
if err != nil {
|
2018-07-23 15:55:13 +03:00
|
|
|
return nil, err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
switch msg := msg.(type) {
|
2015-09-06 13:50:16 +03:00
|
|
|
case *Subscription:
|
|
|
|
// Ignore.
|
|
|
|
case *Pong:
|
|
|
|
// Ignore.
|
|
|
|
case *Message:
|
|
|
|
return msg, nil
|
|
|
|
default:
|
2018-07-23 15:55:13 +03:00
|
|
|
err := fmt.Errorf("redis: unknown message: %T", msg)
|
|
|
|
return nil, err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
}
|
2012-08-09 16:27:06 +04:00
|
|
|
}
|
2016-03-01 13:31:06 +03:00
|
|
|
|
2021-05-26 06:25:18 +03:00
|
|
|
func (c *PubSub) getContext() context.Context {
|
|
|
|
if c.cmd != nil {
|
|
|
|
return c.cmd.ctx
|
|
|
|
}
|
|
|
|
return context.Background()
|
|
|
|
}
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
2017-10-30 13:09:57 +03:00
|
|
|
// Channel returns a Go channel for concurrently receiving messages.
|
2019-07-01 17:21:32 +03:00
|
|
|
// The channel is closed together with the PubSub. If the Go channel
|
|
|
|
// is blocked full for 30 seconds the message is dropped.
|
|
|
|
// Receive* APIs can not be used after channel is created.
|
2019-03-12 13:40:08 +03:00
|
|
|
//
|
2019-07-01 17:21:32 +03:00
|
|
|
// go-redis periodically sends ping messages to test connection health
|
|
|
|
// and re-subscribes if ping can not not received for 30 seconds.
|
2021-05-26 06:25:18 +03:00
|
|
|
func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
|
2019-07-01 17:21:32 +03:00
|
|
|
c.chOnce.Do(func() {
|
2021-05-26 06:25:18 +03:00
|
|
|
c.msgCh = newChannel(c, opts...)
|
|
|
|
c.msgCh.initMsgChan()
|
2019-07-01 17:21:32 +03:00
|
|
|
})
|
|
|
|
if c.msgCh == nil {
|
|
|
|
err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
|
|
|
|
panic(err)
|
|
|
|
}
|
2021-05-26 06:25:18 +03:00
|
|
|
return c.msgCh.msgCh
|
|
|
|
}
|
|
|
|
|
|
|
|
// ChannelSize is like Channel, but creates a Go channel
|
|
|
|
// with specified buffer size.
|
|
|
|
//
|
|
|
|
// Deprecated: use Channel(WithChannelSize(size)), remove in v9.
|
|
|
|
func (c *PubSub) ChannelSize(size int) <-chan *Message {
|
|
|
|
return c.Channel(WithChannelSize(size))
|
2019-03-12 13:48:32 +03:00
|
|
|
}
|
|
|
|
|
2019-07-01 17:21:32 +03:00
|
|
|
// ChannelWithSubscriptions is like Channel, but message type can be either
|
|
|
|
// *Subscription or *Message. Subscription messages can be used to detect
|
|
|
|
// reconnections.
|
|
|
|
//
|
|
|
|
// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
|
2021-05-26 06:25:18 +03:00
|
|
|
func (c *PubSub) ChannelWithSubscriptions(_ context.Context, size int) <-chan interface{} {
|
2019-03-12 13:48:32 +03:00
|
|
|
c.chOnce.Do(func() {
|
2021-05-26 06:25:18 +03:00
|
|
|
c.allCh = newChannel(c, WithChannelSize(size))
|
|
|
|
c.allCh.initAllChan()
|
2019-03-12 13:48:32 +03:00
|
|
|
})
|
2019-07-01 17:21:32 +03:00
|
|
|
if c.allCh == nil {
|
|
|
|
err := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
|
|
|
|
panic(err)
|
|
|
|
}
|
2021-05-26 06:25:18 +03:00
|
|
|
return c.allCh.allCh
|
|
|
|
}
|
|
|
|
|
|
|
|
// ChannelOption configures the channel created by PubSub.Channel.
// Options are applied in order by newChannel, after the defaults are set.
type ChannelOption func(ch *channel)
|
|
|
|
|
|
|
|
// WithChannelSize specifies the Go chan size that is used to buffer incoming messages.
|
|
|
|
//
|
|
|
|
// The default is 100 messages.
|
|
|
|
func WithChannelSize(size int) ChannelOption {
|
|
|
|
return func(c *channel) {
|
|
|
|
c.chanSize = size
|
2019-03-12 13:48:32 +03:00
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
|
|
|
|
2021-05-26 06:25:18 +03:00
|
|
|
// WithChannelHealthCheckInterval specifies the health check interval.
|
|
|
|
// PubSub will ping Redis Server if it does not receive any messages within the interval.
|
|
|
|
// To disable health check, use zero interval.
|
|
|
|
//
|
|
|
|
// The default is 3 seconds.
|
|
|
|
func WithChannelHealthCheckInterval(d time.Duration) ChannelOption {
|
|
|
|
return func(c *channel) {
|
|
|
|
c.checkInterval = d
|
2020-07-18 09:04:36 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-26 15:12:25 +03:00
|
|
|
// WithChannelSendTimeout specifies the channel send timeout after which
|
2021-05-26 06:25:18 +03:00
|
|
|
// the message is dropped.
|
|
|
|
//
|
|
|
|
// The default is 60 seconds.
|
|
|
|
func WithChannelSendTimeout(d time.Duration) ChannelOption {
|
|
|
|
return func(c *channel) {
|
|
|
|
c.chanSendTimeout = d
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type channel struct {
|
|
|
|
pubSub *PubSub
|
|
|
|
|
|
|
|
msgCh chan *Message
|
|
|
|
allCh chan interface{}
|
|
|
|
ping chan struct{}
|
|
|
|
|
|
|
|
chanSize int
|
|
|
|
chanSendTimeout time.Duration
|
|
|
|
checkInterval time.Duration
|
|
|
|
}
|
|
|
|
|
|
|
|
func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
|
|
|
|
c := &channel{
|
|
|
|
pubSub: pubSub,
|
|
|
|
|
|
|
|
chanSize: 100,
|
|
|
|
chanSendTimeout: time.Minute,
|
|
|
|
checkInterval: 3 * time.Second,
|
|
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt(c)
|
|
|
|
}
|
|
|
|
if c.checkInterval > 0 {
|
|
|
|
c.initHealthCheck()
|
|
|
|
}
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *channel) initHealthCheck() {
|
2020-03-11 17:26:42 +03:00
|
|
|
ctx := context.TODO()
|
2019-03-12 13:40:08 +03:00
|
|
|
c.ping = make(chan struct{}, 1)
|
2021-05-26 06:25:18 +03:00
|
|
|
|
2019-07-01 17:21:32 +03:00
|
|
|
go func() {
|
2020-09-05 10:56:09 +03:00
|
|
|
timer := time.NewTimer(time.Minute)
|
2019-07-01 17:21:32 +03:00
|
|
|
timer.Stop()
|
2018-07-24 09:41:14 +03:00
|
|
|
|
2019-07-01 17:21:32 +03:00
|
|
|
for {
|
2021-05-26 06:25:18 +03:00
|
|
|
timer.Reset(c.checkInterval)
|
2019-07-01 17:21:32 +03:00
|
|
|
select {
|
|
|
|
case <-c.ping:
|
|
|
|
if !timer.Stop() {
|
|
|
|
<-timer.C
|
|
|
|
}
|
|
|
|
case <-timer.C:
|
2021-05-26 06:25:18 +03:00
|
|
|
if pingErr := c.pubSub.Ping(ctx); pingErr != nil {
|
|
|
|
c.pubSub.mu.Lock()
|
|
|
|
c.pubSub.reconnect(ctx, pingErr)
|
|
|
|
c.pubSub.mu.Unlock()
|
2019-07-01 17:21:32 +03:00
|
|
|
}
|
2021-05-26 06:25:18 +03:00
|
|
|
case <-c.pubSub.exit:
|
2019-07-01 17:21:32 +03:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
// initMsgChan must be in sync with initAllChan.
|
2021-05-26 06:25:18 +03:00
|
|
|
func (c *channel) initMsgChan() {
|
2020-03-11 17:26:42 +03:00
|
|
|
ctx := context.TODO()
|
2021-05-26 06:25:18 +03:00
|
|
|
c.msgCh = make(chan *Message, c.chanSize)
|
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
go func() {
|
2020-09-05 10:56:09 +03:00
|
|
|
timer := time.NewTimer(time.Minute)
|
2019-03-12 13:40:08 +03:00
|
|
|
timer.Stop()
|
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
var errCount int
|
|
|
|
for {
|
2021-05-26 06:25:18 +03:00
|
|
|
msg, err := c.pubSub.Receive(ctx)
|
2018-07-23 15:55:13 +03:00
|
|
|
if err != nil {
|
|
|
|
if err == pool.ErrClosed {
|
2019-07-01 17:21:32 +03:00
|
|
|
close(c.msgCh)
|
2018-07-23 15:55:13 +03:00
|
|
|
return
|
|
|
|
}
|
|
|
|
if errCount > 0 {
|
2020-09-17 12:54:48 +03:00
|
|
|
time.Sleep(100 * time.Millisecond)
|
2017-04-11 16:18:35 +03:00
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
errCount++
|
|
|
|
continue
|
2017-04-11 16:18:35 +03:00
|
|
|
}
|
2019-03-12 13:40:08 +03:00
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
errCount = 0
|
2018-07-24 09:41:14 +03:00
|
|
|
|
|
|
|
// Any message is as good as a ping.
|
|
|
|
select {
|
|
|
|
case c.ping <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
switch msg := msg.(type) {
|
|
|
|
case *Subscription:
|
|
|
|
// Ignore.
|
|
|
|
case *Pong:
|
|
|
|
// Ignore.
|
|
|
|
case *Message:
|
2021-05-26 06:25:18 +03:00
|
|
|
timer.Reset(c.chanSendTimeout)
|
2019-03-12 13:40:08 +03:00
|
|
|
select {
|
2019-07-01 17:21:32 +03:00
|
|
|
case c.msgCh <- msg:
|
2019-03-12 13:40:08 +03:00
|
|
|
if !timer.Stop() {
|
|
|
|
<-timer.C
|
|
|
|
}
|
|
|
|
case <-timer.C:
|
2019-06-17 12:32:40 +03:00
|
|
|
internal.Logger.Printf(
|
2021-05-26 06:25:18 +03:00
|
|
|
ctx, "redis: %s channel is full for %s (message is dropped)",
|
|
|
|
c, c.chanSendTimeout)
|
2019-03-12 13:40:08 +03:00
|
|
|
}
|
2018-07-24 09:41:14 +03:00
|
|
|
default:
|
2021-05-26 06:25:18 +03:00
|
|
|
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
|
2018-07-24 09:41:14 +03:00
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
|
|
|
}()
|
2019-07-01 17:21:32 +03:00
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
|
2019-07-01 17:21:32 +03:00
|
|
|
// initAllChan must be in sync with initMsgChan.
|
2021-05-26 06:25:18 +03:00
|
|
|
func (c *channel) initAllChan() {
|
2020-03-11 17:26:42 +03:00
|
|
|
ctx := context.TODO()
|
2021-05-26 06:25:18 +03:00
|
|
|
c.allCh = make(chan interface{}, c.chanSize)
|
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
go func() {
|
2021-05-26 06:25:18 +03:00
|
|
|
timer := time.NewTimer(time.Minute)
|
2018-07-23 15:55:13 +03:00
|
|
|
timer.Stop()
|
|
|
|
|
2019-07-01 17:21:32 +03:00
|
|
|
var errCount int
|
2018-07-23 15:55:13 +03:00
|
|
|
for {
|
2021-05-26 06:25:18 +03:00
|
|
|
msg, err := c.pubSub.Receive(ctx)
|
2019-07-01 17:21:32 +03:00
|
|
|
if err != nil {
|
|
|
|
if err == pool.ErrClosed {
|
|
|
|
close(c.allCh)
|
|
|
|
return
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
2019-07-01 17:21:32 +03:00
|
|
|
if errCount > 0 {
|
2020-09-17 12:54:48 +03:00
|
|
|
time.Sleep(100 * time.Millisecond)
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
2019-07-01 17:21:32 +03:00
|
|
|
errCount++
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
errCount = 0
|
|
|
|
|
|
|
|
// Any message is as good as a ping.
|
|
|
|
select {
|
|
|
|
case c.ping <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
switch msg := msg.(type) {
|
|
|
|
case *Pong:
|
|
|
|
// Ignore.
|
2021-05-26 06:25:18 +03:00
|
|
|
case *Subscription, *Message:
|
|
|
|
timer.Reset(c.chanSendTimeout)
|
|
|
|
select {
|
|
|
|
case c.allCh <- msg:
|
|
|
|
if !timer.Stop() {
|
|
|
|
<-timer.C
|
|
|
|
}
|
|
|
|
case <-timer.C:
|
|
|
|
internal.Logger.Printf(
|
|
|
|
ctx, "redis: %s channel is full for %s (message is dropped)",
|
|
|
|
c, c.chanSendTimeout)
|
|
|
|
}
|
2019-07-01 17:21:32 +03:00
|
|
|
default:
|
2021-05-26 06:25:18 +03:00
|
|
|
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|