forked from mirror/redis
Compare commits
10 Commits
f70c952806
...
09fcdbe6e4
Author | SHA1 | Date |
---|---|---|
Monkey | 09fcdbe6e4 | |
monkey92t | d42dd1007c | |
Monkey | af893143ae | |
monkey92t | 2fd84242f0 | |
Monkey | 7828fa10a8 | |
monkey92t | 443f0ec113 | |
Monkey | c7bc54b4d0 | |
monkey92t | a872c35b1a | |
Monkey | a4336cbd43 | |
Scott | 7c4b924350 |
2
Makefile
2
Makefile
|
@ -18,7 +18,7 @@ bench: testdeps
|
||||||
|
|
||||||
testdata/redis:
|
testdata/redis:
|
||||||
mkdir -p $@
|
mkdir -p $@
|
||||||
wget -qO- https://download.redis.io/releases/redis-7.0.6.tar.gz | tar xvz --strip-components=1 -C $@
|
wget -qO- https://download.redis.io/releases/redis-7.0.7.tar.gz | tar xvz --strip-components=1 -C $@
|
||||||
|
|
||||||
testdata/redis/src/redis-server: testdata/redis
|
testdata/redis/src/redis-server: testdata/redis
|
||||||
cd $< && make all
|
cd $< && make all
|
||||||
|
|
|
@ -18,8 +18,8 @@
|
||||||
- [Documentation](https://redis.uptrace.dev)
|
- [Documentation](https://redis.uptrace.dev)
|
||||||
- [Discussions](https://github.com/go-redis/redis/discussions)
|
- [Discussions](https://github.com/go-redis/redis/discussions)
|
||||||
- [Chat](https://discord.gg/rWtp5Aj)
|
- [Chat](https://discord.gg/rWtp5Aj)
|
||||||
- [Reference](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc)
|
- [Reference](https://pkg.go.dev/github.com/go-redis/redis/v9)
|
||||||
- [Examples](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#pkg-examples)
|
- [Examples](https://pkg.go.dev/github.com/go-redis/redis/v9#pkg-examples)
|
||||||
|
|
||||||
## Ecosystem
|
## Ecosystem
|
||||||
|
|
||||||
|
|
64
cluster.go
64
cluster.go
|
@ -29,6 +29,9 @@ type ClusterOptions struct {
|
||||||
// A seed list of host:port addresses of cluster nodes.
|
// A seed list of host:port addresses of cluster nodes.
|
||||||
Addrs []string
|
Addrs []string
|
||||||
|
|
||||||
|
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
|
||||||
|
ClientName string
|
||||||
|
|
||||||
// NewClient creates a cluster node client with provided name and options.
|
// NewClient creates a cluster node client with provided name and options.
|
||||||
NewClient func(opt *Options) *Client
|
NewClient func(opt *Options) *Client
|
||||||
|
|
||||||
|
@ -133,34 +136,39 @@ func (opt *ClusterOptions) init() {
|
||||||
|
|
||||||
// ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis.
|
// ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis.
|
||||||
// The URL must be in the form:
|
// The URL must be in the form:
|
||||||
// redis://<user>:<password>@<host>:<port>
|
//
|
||||||
// or
|
// redis://<user>:<password>@<host>:<port>
|
||||||
// rediss://<user>:<password>@<host>:<port>
|
// or
|
||||||
|
// rediss://<user>:<password>@<host>:<port>
|
||||||
|
//
|
||||||
// To add additional addresses, specify the query parameter, "addr" one or more times. e.g:
|
// To add additional addresses, specify the query parameter, "addr" one or more times. e.g:
|
||||||
// redis://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
|
//
|
||||||
// or
|
// redis://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
|
||||||
// rediss://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
|
// or
|
||||||
|
// rediss://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
|
||||||
//
|
//
|
||||||
// Most Option fields can be set using query parameters, with the following restrictions:
|
// Most Option fields can be set using query parameters, with the following restrictions:
|
||||||
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
|
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
|
||||||
// - only scalar type fields are supported (bool, int, time.Duration)
|
// - only scalar type fields are supported (bool, int, time.Duration)
|
||||||
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
|
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
|
||||||
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
|
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
|
||||||
// - to disable a duration field, use value less than or equal to 0; to use the default
|
// - to disable a duration field, use value less than or equal to 0; to use the default
|
||||||
// value, leave the value blank or remove the parameter
|
// value, leave the value blank or remove the parameter
|
||||||
// - only the last value is interpreted if a parameter is given multiple times
|
// - only the last value is interpreted if a parameter is given multiple times
|
||||||
// - fields "network", "addr", "username" and "password" can only be set using other
|
// - fields "network", "addr", "username" and "password" can only be set using other
|
||||||
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
|
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
|
||||||
// names will be treated as unknown parameters
|
// names will be treated as unknown parameters
|
||||||
// - unknown parameter names will result in an error
|
// - unknown parameter names will result in an error
|
||||||
|
//
|
||||||
// Example:
|
// Example:
|
||||||
// redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791
|
//
|
||||||
// is equivalent to:
|
// redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791
|
||||||
// &ClusterOptions{
|
// is equivalent to:
|
||||||
// Addr: ["localhost:6789", "localhost:6790", "localhost:6791"]
|
// &ClusterOptions{
|
||||||
// DialTimeout: 3 * time.Second, // no time unit = seconds
|
// Addr: ["localhost:6789", "localhost:6790", "localhost:6791"]
|
||||||
// ReadTimeout: 6 * time.Second,
|
// DialTimeout: 3 * time.Second, // no time unit = seconds
|
||||||
// }
|
// ReadTimeout: 6 * time.Second,
|
||||||
|
// }
|
||||||
func ParseClusterURL(redisURL string) (*ClusterOptions, error) {
|
func ParseClusterURL(redisURL string) (*ClusterOptions, error) {
|
||||||
o := &ClusterOptions{}
|
o := &ClusterOptions{}
|
||||||
|
|
||||||
|
@ -208,6 +216,7 @@ func setupClusterConn(u *url.URL, host string, o *ClusterOptions) (*ClusterOptio
|
||||||
func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) {
|
func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) {
|
||||||
q := queryOptions{q: u.Query()}
|
q := queryOptions{q: u.Query()}
|
||||||
|
|
||||||
|
o.ClientName = q.string("client_name")
|
||||||
o.MaxRedirects = q.int("max_redirects")
|
o.MaxRedirects = q.int("max_redirects")
|
||||||
o.ReadOnly = q.bool("read_only")
|
o.ReadOnly = q.bool("read_only")
|
||||||
o.RouteByLatency = q.bool("route_by_latency")
|
o.RouteByLatency = q.bool("route_by_latency")
|
||||||
|
@ -250,8 +259,9 @@ func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, er
|
||||||
|
|
||||||
func (opt *ClusterOptions) clientOptions() *Options {
|
func (opt *ClusterOptions) clientOptions() *Options {
|
||||||
return &Options{
|
return &Options{
|
||||||
Dialer: opt.Dialer,
|
ClientName: opt.ClientName,
|
||||||
OnConnect: opt.OnConnect,
|
Dialer: opt.Dialer,
|
||||||
|
OnConnect: opt.OnConnect,
|
||||||
|
|
||||||
Username: opt.Username,
|
Username: opt.Username,
|
||||||
Password: opt.Password,
|
Password: opt.Password,
|
||||||
|
@ -871,7 +881,7 @@ func (c *ClusterClient) Close() error {
|
||||||
return c.nodes.Close()
|
return c.nodes.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do creates a Cmd from the args and processes the cmd.
|
// Do create a Cmd from the args and processes the cmd.
|
||||||
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
|
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
|
||||||
cmd := NewCmd(ctx, args...)
|
cmd := NewCmd(ctx, args...)
|
||||||
_ = c.Process(ctx, cmd)
|
_ = c.Process(ctx, cmd)
|
||||||
|
|
|
@ -589,6 +589,7 @@ var _ = Describe("ClusterClient", func() {
|
||||||
Describe("ClusterClient", func() {
|
Describe("ClusterClient", func() {
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
opt = redisClusterOptions()
|
opt = redisClusterOptions()
|
||||||
|
opt.ClientName = "cluster_hi"
|
||||||
client = cluster.newClusterClient(ctx, opt)
|
client = cluster.newClusterClient(ctx, opt)
|
||||||
|
|
||||||
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
||||||
|
@ -679,6 +680,20 @@ var _ = Describe("ClusterClient", func() {
|
||||||
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
|
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should cluster client setname", func() {
|
||||||
|
err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
return c.Ping(ctx).Err()
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
val, err := c.ClientList(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(ContainSubstring("name=cluster_hi"))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
It("should CLUSTER NODES", func() {
|
It("should CLUSTER NODES", func() {
|
||||||
res, err := client.ClusterNodes(ctx).Result()
|
res, err := client.ClusterNodes(ctx).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -1408,6 +1423,10 @@ func TestParseClusterURL(t *testing.T) {
|
||||||
test: "UseDefault",
|
test: "UseDefault",
|
||||||
url: "redis://localhost:123?conn_max_idle_time=",
|
url: "redis://localhost:123?conn_max_idle_time=",
|
||||||
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
|
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
|
||||||
|
}, {
|
||||||
|
test: "ClientName",
|
||||||
|
url: "redis://localhost:123?client_name=cluster_hi",
|
||||||
|
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ClientName: "cluster_hi"},
|
||||||
}, {
|
}, {
|
||||||
test: "UseDefaultMissing=",
|
test: "UseDefaultMissing=",
|
||||||
url: "redis://localhost:123?conn_max_idle_time",
|
url: "redis://localhost:123?conn_max_idle_time",
|
||||||
|
|
17
command.go
17
command.go
|
@ -1110,15 +1110,16 @@ func (cmd *KeyValueSliceCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Many commands will respond to two formats:
|
// Many commands will respond to two formats:
|
||||||
// 1) 1) "one"
|
// 1. 1) "one"
|
||||||
// 2) (double) 1
|
// 2. (double) 1
|
||||||
// 2) 1) "two"
|
// 2. 1) "two"
|
||||||
// 2) (double) 2
|
// 2. (double) 2
|
||||||
|
//
|
||||||
// OR:
|
// OR:
|
||||||
// 1) "two"
|
// 1. "two"
|
||||||
// 2) (double) 2
|
// 2. (double) 2
|
||||||
// 3) "one"
|
// 3. "one"
|
||||||
// 4) (double) 1
|
// 4. (double) 1
|
||||||
func (cmd *KeyValueSliceCmd) readReply(rd *proto.Reader) error { // nolint:dupl
|
func (cmd *KeyValueSliceCmd) readReply(rd *proto.Reader) error { // nolint:dupl
|
||||||
n, err := rd.ReadArrayLen()
|
n, err := rd.ReadArrayLen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -14,6 +14,15 @@ import (
|
||||||
"github.com/go-redis/redis/v9/internal/proto"
|
"github.com/go-redis/redis/v9/internal/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type TimeValue struct {
|
||||||
|
time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeValue) ScanRedis(s string) (err error) {
|
||||||
|
t.Time, err = time.Parse(time.RFC3339Nano, s)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var _ = Describe("Commands", func() {
|
var _ = Describe("Commands", func() {
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
var client *redis.Client
|
var client *redis.Client
|
||||||
|
@ -1192,19 +1201,28 @@ var _ = Describe("Commands", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should scan Mget", func() {
|
It("should scan Mget", func() {
|
||||||
err := client.MSet(ctx, "key1", "hello1", "key2", 123).Err()
|
now := time.Now()
|
||||||
|
|
||||||
|
err := client.MSet(ctx, "key1", "hello1", "key2", 123, "time", now.Format(time.RFC3339Nano)).Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
res := client.MGet(ctx, "key1", "key2", "_")
|
res := client.MGet(ctx, "key1", "key2", "_", "time")
|
||||||
Expect(res.Err()).NotTo(HaveOccurred())
|
Expect(res.Err()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
type data struct {
|
type data struct {
|
||||||
Key1 string `redis:"key1"`
|
Key1 string `redis:"key1"`
|
||||||
Key2 int `redis:"key2"`
|
Key2 int `redis:"key2"`
|
||||||
|
Time TimeValue `redis:"time"`
|
||||||
}
|
}
|
||||||
var d data
|
var d data
|
||||||
Expect(res.Scan(&d)).NotTo(HaveOccurred())
|
Expect(res.Scan(&d)).NotTo(HaveOccurred())
|
||||||
Expect(d).To(Equal(data{Key1: "hello1", Key2: 123}))
|
Expect(d.Time.UnixNano()).To(Equal(now.UnixNano()))
|
||||||
|
d.Time.Time = time.Time{}
|
||||||
|
Expect(d).To(Equal(data{
|
||||||
|
Key1: "hello1",
|
||||||
|
Key2: 123,
|
||||||
|
Time: TimeValue{Time: time.Time{}},
|
||||||
|
}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should MSetNX", func() {
|
It("should MSetNX", func() {
|
||||||
|
@ -1732,19 +1750,28 @@ var _ = Describe("Commands", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should scan", func() {
|
It("should scan", func() {
|
||||||
err := client.HMSet(ctx, "hash", "key1", "hello1", "key2", 123).Err()
|
now := time.Now()
|
||||||
|
|
||||||
|
err := client.HMSet(ctx, "hash", "key1", "hello1", "key2", 123, "time", now.Format(time.RFC3339Nano)).Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
res := client.HGetAll(ctx, "hash")
|
res := client.HGetAll(ctx, "hash")
|
||||||
Expect(res.Err()).NotTo(HaveOccurred())
|
Expect(res.Err()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
type data struct {
|
type data struct {
|
||||||
Key1 string `redis:"key1"`
|
Key1 string `redis:"key1"`
|
||||||
Key2 int `redis:"key2"`
|
Key2 int `redis:"key2"`
|
||||||
|
Time TimeValue `redis:"time"`
|
||||||
}
|
}
|
||||||
var d data
|
var d data
|
||||||
Expect(res.Scan(&d)).NotTo(HaveOccurred())
|
Expect(res.Scan(&d)).NotTo(HaveOccurred())
|
||||||
Expect(d).To(Equal(data{Key1: "hello1", Key2: 123}))
|
Expect(d.Time.UnixNano()).To(Equal(now.UnixNano()))
|
||||||
|
d.Time.Time = time.Time{}
|
||||||
|
Expect(d).To(Equal(data{
|
||||||
|
Key1: "hello1",
|
||||||
|
Key2: 123,
|
||||||
|
Time: TimeValue{Time: time.Time{}},
|
||||||
|
}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should HIncrBy", func() {
|
It("should HIncrBy", func() {
|
||||||
|
|
|
@ -193,7 +193,11 @@ func (mh *metricsHook) DialHook(hook redis.DialHook) redis.DialHook {
|
||||||
|
|
||||||
conn, err := hook(ctx, network, addr)
|
conn, err := hook(ctx, network, addr)
|
||||||
|
|
||||||
mh.createTime.Record(ctx, milliseconds(time.Since(start)), mh.attrs...)
|
attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+1)
|
||||||
|
attrs = append(attrs, mh.attrs...)
|
||||||
|
attrs = append(attrs, statusAttr(err))
|
||||||
|
|
||||||
|
mh.createTime.Record(ctx, milliseconds(time.Since(start)), attrs...)
|
||||||
return conn, err
|
return conn, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,12 @@ import (
|
||||||
// decoderFunc represents decoding functions for default built-in types.
|
// decoderFunc represents decoding functions for default built-in types.
|
||||||
type decoderFunc func(reflect.Value, string) error
|
type decoderFunc func(reflect.Value, string) error
|
||||||
|
|
||||||
|
// Scanner is the interface implemented by themselves,
|
||||||
|
// which will override the decoding behavior of decoderFunc.
|
||||||
|
type Scanner interface {
|
||||||
|
ScanRedis(s string) error
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// List of built-in decoders indexed by their numeric constant values (eg: reflect.Bool = 1).
|
// List of built-in decoders indexed by their numeric constant values (eg: reflect.Bool = 1).
|
||||||
decoders = []decoderFunc{
|
decoders = []decoderFunc{
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
@ -30,6 +31,20 @@ type data struct {
|
||||||
Bool bool `redis:"bool"`
|
Bool bool `redis:"bool"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TimeRFC3339Nano struct {
|
||||||
|
time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeRFC3339Nano) ScanRedis(s string) (err error) {
|
||||||
|
t.Time, err = time.Parse(time.RFC3339Nano, s)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
type TimeData struct {
|
||||||
|
Name string `redis:"name"`
|
||||||
|
Time *TimeRFC3339Nano `redis:"login"`
|
||||||
|
}
|
||||||
|
|
||||||
type i []interface{}
|
type i []interface{}
|
||||||
|
|
||||||
func TestGinkgoSuite(t *testing.T) {
|
func TestGinkgoSuite(t *testing.T) {
|
||||||
|
@ -175,4 +190,14 @@ var _ = Describe("Scan", func() {
|
||||||
Expect(Scan(&d, i{"bool"}, i{""})).To(HaveOccurred())
|
Expect(Scan(&d, i{"bool"}, i{""})).To(HaveOccurred())
|
||||||
Expect(Scan(&d, i{"bool"}, i{"123"})).To(HaveOccurred())
|
Expect(Scan(&d, i{"bool"}, i{"123"})).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("Implements Scanner", func() {
|
||||||
|
var td TimeData
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
Expect(Scan(&td, i{"name", "login"}, i{"hello", now.Format(time.RFC3339Nano)})).NotTo(HaveOccurred())
|
||||||
|
Expect(td.Name).To(Equal("hello"))
|
||||||
|
Expect(td.Time.UnixNano()).To(Equal(now.UnixNano()))
|
||||||
|
Expect(td.Time.Format(time.RFC3339Nano)).To(Equal(now.Format(time.RFC3339Nano)))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -84,7 +84,29 @@ func (s StructValue) Scan(key string, value string) error {
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := field.fn(s.value.Field(field.index), value); err != nil {
|
|
||||||
|
v := s.value.Field(field.index)
|
||||||
|
isPtr := v.Kind() == reflect.Pointer
|
||||||
|
|
||||||
|
if isPtr && v.IsNil() {
|
||||||
|
v.Set(reflect.New(v.Type().Elem()))
|
||||||
|
}
|
||||||
|
if !isPtr && v.Type().Name() != "" && v.CanAddr() {
|
||||||
|
v = v.Addr()
|
||||||
|
isPtr = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if isPtr && v.Type().NumMethod() > 0 && v.CanInterface() {
|
||||||
|
if scan, ok := v.Interface().(Scanner); ok {
|
||||||
|
return scan.ScanRedis(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if isPtr {
|
||||||
|
v = v.Elem()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := field.fn(v, value); err != nil {
|
||||||
t := s.value.Type()
|
t := s.value.Type()
|
||||||
return fmt.Errorf("cannot scan redis.result %s into struct field %s.%s of type %s, error-%s",
|
return fmt.Errorf("cannot scan redis.result %s into struct field %s.%s of type %s, error-%s",
|
||||||
value, t.Name(), t.Field(field.index).Name, t.Field(field.index).Type, err.Error())
|
value, t.Name(), t.Field(field.index).Name, t.Field(field.index).Type, err.Error())
|
||||||
|
|
|
@ -32,7 +32,9 @@ type Once struct {
|
||||||
|
|
||||||
// Do calls the function f if and only if Do has not been invoked
|
// Do calls the function f if and only if Do has not been invoked
|
||||||
// without error for this instance of Once. In other words, given
|
// without error for this instance of Once. In other words, given
|
||||||
// var once Once
|
//
|
||||||
|
// var once Once
|
||||||
|
//
|
||||||
// if once.Do(f) is called multiple times, only the first call will
|
// if once.Do(f) is called multiple times, only the first call will
|
||||||
// invoke f, even if f has a different value in each invocation unless
|
// invoke f, even if f has a different value in each invocation unless
|
||||||
// f returns an error. A new instance of Once is required for each
|
// f returns an error. A new instance of Once is required for each
|
||||||
|
@ -41,7 +43,8 @@ type Once struct {
|
||||||
// Do is intended for initialization that must be run exactly once. Since f
|
// Do is intended for initialization that must be run exactly once. Since f
|
||||||
// is niladic, it may be necessary to use a function literal to capture the
|
// is niladic, it may be necessary to use a function literal to capture the
|
||||||
// arguments to a function to be invoked by Do:
|
// arguments to a function to be invoked by Do:
|
||||||
// err := config.once.Do(func() error { return config.init(filename) })
|
//
|
||||||
|
// err := config.once.Do(func() error { return config.init(filename) })
|
||||||
func (o *Once) Do(f func() error) error {
|
func (o *Once) Do(f func() error) error {
|
||||||
if atomic.LoadUint32(&o.done) == 1 {
|
if atomic.LoadUint32(&o.done) == 1 {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Scan parses bytes `b` to `v` with appropriate type.
|
// Scan parses bytes `b` to `v` with appropriate type.
|
||||||
|
//
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func Scan(b []byte, v interface{}) error {
|
func Scan(b []byte, v interface{}) error {
|
||||||
switch v := v.(type) {
|
switch v := v.(type) {
|
||||||
|
|
58
options.go
58
options.go
|
@ -27,7 +27,7 @@ type Limiter interface {
|
||||||
ReportResult(result error)
|
ReportResult(result error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Options keeps the settings to setup redis connection.
|
// Options keeps the settings to set up redis connection.
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// The network type, either tcp or unix.
|
// The network type, either tcp or unix.
|
||||||
// Default is tcp.
|
// Default is tcp.
|
||||||
|
@ -35,6 +35,9 @@ type Options struct {
|
||||||
// host:port address.
|
// host:port address.
|
||||||
Addr string
|
Addr string
|
||||||
|
|
||||||
|
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
|
||||||
|
ClientName string
|
||||||
|
|
||||||
// Dialer creates new network connection and has priority over
|
// Dialer creates new network connection and has priority over
|
||||||
// Network and Addr options.
|
// Network and Addr options.
|
||||||
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
|
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
|
||||||
|
@ -220,32 +223,38 @@ func NewDialer(opt *Options) func(context.Context, string, string) (net.Conn, er
|
||||||
// Scheme is required.
|
// Scheme is required.
|
||||||
// There are two connection types: by tcp socket and by unix socket.
|
// There are two connection types: by tcp socket and by unix socket.
|
||||||
// Tcp connection:
|
// Tcp connection:
|
||||||
// redis://<user>:<password>@<host>:<port>/<db_number>
|
//
|
||||||
|
// redis://<user>:<password>@<host>:<port>/<db_number>
|
||||||
|
//
|
||||||
// Unix connection:
|
// Unix connection:
|
||||||
// unix://<user>:<password>@</path/to/redis.sock>?db=<db_number>
|
//
|
||||||
|
// unix://<user>:<password>@</path/to/redis.sock>?db=<db_number>
|
||||||
|
//
|
||||||
// Most Option fields can be set using query parameters, with the following restrictions:
|
// Most Option fields can be set using query parameters, with the following restrictions:
|
||||||
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
|
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
|
||||||
// - only scalar type fields are supported (bool, int, time.Duration)
|
// - only scalar type fields are supported (bool, int, time.Duration)
|
||||||
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
|
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
|
||||||
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
|
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
|
||||||
// - to disable a duration field, use value less than or equal to 0; to use the default
|
// - to disable a duration field, use value less than or equal to 0; to use the default
|
||||||
// value, leave the value blank or remove the parameter
|
// value, leave the value blank or remove the parameter
|
||||||
// - only the last value is interpreted if a parameter is given multiple times
|
// - only the last value is interpreted if a parameter is given multiple times
|
||||||
// - fields "network", "addr", "username" and "password" can only be set using other
|
// - fields "network", "addr", "username" and "password" can only be set using other
|
||||||
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
|
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
|
||||||
// names will be treated as unknown parameters
|
// names will be treated as unknown parameters
|
||||||
// - unknown parameter names will result in an error
|
// - unknown parameter names will result in an error
|
||||||
|
//
|
||||||
// Examples:
|
// Examples:
|
||||||
// redis://user:password@localhost:6789/3?dial_timeout=3&db=1&read_timeout=6s&max_retries=2
|
//
|
||||||
// is equivalent to:
|
// redis://user:password@localhost:6789/3?dial_timeout=3&db=1&read_timeout=6s&max_retries=2
|
||||||
// &Options{
|
// is equivalent to:
|
||||||
// Network: "tcp",
|
// &Options{
|
||||||
// Addr: "localhost:6789",
|
// Network: "tcp",
|
||||||
// DB: 1, // path "/3" was overridden by "&db=1"
|
// Addr: "localhost:6789",
|
||||||
// DialTimeout: 3 * time.Second, // no time unit = seconds
|
// DB: 1, // path "/3" was overridden by "&db=1"
|
||||||
// ReadTimeout: 6 * time.Second,
|
// DialTimeout: 3 * time.Second, // no time unit = seconds
|
||||||
// MaxRetries: 2,
|
// ReadTimeout: 6 * time.Second,
|
||||||
// }
|
// MaxRetries: 2,
|
||||||
|
// }
|
||||||
func ParseURL(redisURL string) (*Options, error) {
|
func ParseURL(redisURL string) (*Options, error) {
|
||||||
u, err := url.Parse(redisURL)
|
u, err := url.Parse(redisURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -426,6 +435,7 @@ func setupConnParams(u *url.URL, o *Options) (*Options, error) {
|
||||||
o.DB = db
|
o.DB = db
|
||||||
}
|
}
|
||||||
|
|
||||||
|
o.ClientName = q.string("client_name")
|
||||||
o.MaxRetries = q.int("max_retries")
|
o.MaxRetries = q.int("max_retries")
|
||||||
o.MinRetryBackoff = q.duration("min_retry_backoff")
|
o.MinRetryBackoff = q.duration("min_retry_backoff")
|
||||||
o.MaxRetryBackoff = q.duration("max_retry_backoff")
|
o.MaxRetryBackoff = q.duration("max_retry_backoff")
|
||||||
|
|
|
@ -59,6 +59,9 @@ func TestParseURL(t *testing.T) {
|
||||||
}, {
|
}, {
|
||||||
url: "redis://localhost:123/?db=2&conn_max_idle_time", // missing "=" at the end
|
url: "redis://localhost:123/?db=2&conn_max_idle_time", // missing "=" at the end
|
||||||
o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: 0},
|
o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: 0},
|
||||||
|
}, {
|
||||||
|
url: "redis://localhost:123/?db=2&client_name=hi", // client name
|
||||||
|
o: &Options{Addr: "localhost:123", DB: 2, ClientName: "hi"},
|
||||||
}, {
|
}, {
|
||||||
url: "unix:///tmp/redis.sock",
|
url: "unix:///tmp/redis.sock",
|
||||||
o: &Options{Addr: "/tmp/redis.sock"},
|
o: &Options{Addr: "/tmp/redis.sock"},
|
||||||
|
|
89
redis.go
89
redis.go
|
@ -10,10 +10,14 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v9/internal"
|
"github.com/go-redis/redis/v9/internal"
|
||||||
|
"github.com/go-redis/redis/v9/internal/hscan"
|
||||||
"github.com/go-redis/redis/v9/internal/pool"
|
"github.com/go-redis/redis/v9/internal/pool"
|
||||||
"github.com/go-redis/redis/v9/internal/proto"
|
"github.com/go-redis/redis/v9/internal/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Scanner internal/hscan.Scanner exposed interface.
|
||||||
|
type Scanner = hscan.Scanner
|
||||||
|
|
||||||
// Nil reply returned by Redis when key does not exist.
|
// Nil reply returned by Redis when key does not exist.
|
||||||
const Nil = proto.Nil
|
const Nil = proto.Nil
|
||||||
|
|
||||||
|
@ -25,9 +29,9 @@ func SetLogger(logger internal.Logging) {
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type Hook interface {
|
type Hook interface {
|
||||||
DialHook(hook DialHook) DialHook
|
DialHook(next DialHook) DialHook
|
||||||
ProcessHook(hook ProcessHook) ProcessHook
|
ProcessHook(next ProcessHook) ProcessHook
|
||||||
ProcessPipelineHook(hook ProcessPipelineHook) ProcessPipelineHook
|
ProcessPipelineHook(next ProcessPipelineHook) ProcessPipelineHook
|
||||||
}
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
@ -44,6 +48,43 @@ type hooks struct {
|
||||||
processTxPipelineHook ProcessPipelineHook
|
processTxPipelineHook ProcessPipelineHook
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddHook is to add a hook to the queue.
|
||||||
|
// Hook is a function executed during network connection, command execution, and pipeline,
|
||||||
|
// it is a first-in-last-out stack queue (FILO).
|
||||||
|
// The first to be added to the queue is the execution function of the redis command (the last to be executed).
|
||||||
|
// You need to execute the next hook in each hook, unless you want to terminate the execution of the command.
|
||||||
|
// For example, you added hook-1, hook-2:
|
||||||
|
//
|
||||||
|
// client.AddHook(hook-1, hook-2)
|
||||||
|
//
|
||||||
|
// hook-1:
|
||||||
|
//
|
||||||
|
// func (Hook1) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
|
||||||
|
// return func(ctx context.Context, cmd Cmder) error {
|
||||||
|
// print("hook-1 start")
|
||||||
|
// next(ctx, cmd)
|
||||||
|
// print("hook-1 end")
|
||||||
|
// return nil
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// hook-2:
|
||||||
|
//
|
||||||
|
// func (Hook2) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
|
||||||
|
// return func(ctx context.Context, cmd redis.Cmder) error {
|
||||||
|
// print("hook-2 start")
|
||||||
|
// next(ctx, cmd)
|
||||||
|
// print("hook-2 end")
|
||||||
|
// return nil
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// The execution sequence is:
|
||||||
|
//
|
||||||
|
// hook-2 start -> hook-1 start -> exec redis cmd -> hook-1 end -> hook-2 end
|
||||||
|
//
|
||||||
|
// Please note: "next(ctx, cmd)" is very important, it will call the next hook,
|
||||||
|
// if "next(ctx, cmd)" is not executed in hook-1, the redis command will not be executed.
|
||||||
func (hs *hooks) AddHook(hook Hook) {
|
func (hs *hooks) AddHook(hook Hook) {
|
||||||
hs.slice = append(hs.slice, hook)
|
hs.slice = append(hs.slice, hook)
|
||||||
hs.dialHook = hook.DialHook(hs.dialHook)
|
hs.dialHook = hook.DialHook(hs.dialHook)
|
||||||
|
@ -256,6 +297,10 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
||||||
pipe.ReadOnly(ctx)
|
pipe.ReadOnly(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.opt.ClientName != "" {
|
||||||
|
pipe.ClientSetName(ctx, c.opt.ClientName)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -567,7 +612,7 @@ func (c *Client) Conn() *Conn {
|
||||||
return newConn(c.opt, pool.NewStickyConnPool(c.connPool))
|
return newConn(c.opt, pool.NewStickyConnPool(c.connPool))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do creates a Cmd from the args and processes the cmd.
|
// Do create a Cmd from the args and processes the cmd.
|
||||||
func (c *Client) Do(ctx context.Context, args ...interface{}) *Cmd {
|
func (c *Client) Do(ctx context.Context, args ...interface{}) *Cmd {
|
||||||
cmd := NewCmd(ctx, args...)
|
cmd := NewCmd(ctx, args...)
|
||||||
_ = c.Process(ctx, cmd)
|
_ = c.Process(ctx, cmd)
|
||||||
|
@ -640,26 +685,26 @@ func (c *Client) pubSub() *PubSub {
|
||||||
// subscription may not be active immediately. To force the connection to wait,
|
// subscription may not be active immediately. To force the connection to wait,
|
||||||
// you may call the Receive() method on the returned *PubSub like so:
|
// you may call the Receive() method on the returned *PubSub like so:
|
||||||
//
|
//
|
||||||
// sub := client.Subscribe(queryResp)
|
// sub := client.Subscribe(queryResp)
|
||||||
// iface, err := sub.Receive()
|
// iface, err := sub.Receive()
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
// // handle error
|
// // handle error
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// // Should be *Subscription, but others are possible if other actions have been
|
// // Should be *Subscription, but others are possible if other actions have been
|
||||||
// // taken on sub since it was created.
|
// // taken on sub since it was created.
|
||||||
// switch iface.(type) {
|
// switch iface.(type) {
|
||||||
// case *Subscription:
|
// case *Subscription:
|
||||||
// // subscribe succeeded
|
// // subscribe succeeded
|
||||||
// case *Message:
|
// case *Message:
|
||||||
// // received first message
|
// // received first message
|
||||||
// case *Pong:
|
// case *Pong:
|
||||||
// // pong received
|
// // pong received
|
||||||
// default:
|
// default:
|
||||||
// // handle error
|
// // handle error
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// ch := sub.Channel()
|
// ch := sub.Channel()
|
||||||
func (c *Client) Subscribe(ctx context.Context, channels ...string) *PubSub {
|
func (c *Client) Subscribe(ctx context.Context, channels ...string) *PubSub {
|
||||||
pubsub := c.pubSub()
|
pubsub := c.pubSub()
|
||||||
if len(channels) > 0 {
|
if len(channels) > 0 {
|
||||||
|
|
|
@ -169,6 +169,21 @@ var _ = Describe("Client", func() {
|
||||||
Expect(db2.Close()).NotTo(HaveOccurred())
|
Expect(db2.Close()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should client setname", func() {
|
||||||
|
opt := redisOptions()
|
||||||
|
opt.ClientName = "hi"
|
||||||
|
db := redis.NewClient(opt)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
Expect(db.Close()).NotTo(HaveOccurred())
|
||||||
|
}()
|
||||||
|
|
||||||
|
Expect(db.Ping(ctx).Err()).NotTo(HaveOccurred())
|
||||||
|
val, err := db.ClientList(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(ContainSubstring("name=hi"))
|
||||||
|
})
|
||||||
|
|
||||||
It("processes custom commands", func() {
|
It("processes custom commands", func() {
|
||||||
cmd := redis.NewCmd(ctx, "PING")
|
cmd := redis.NewCmd(ctx, "PING")
|
||||||
_ = client.Process(ctx, cmd)
|
_ = client.Process(ctx, cmd)
|
||||||
|
|
10
ring.go
10
ring.go
|
@ -51,6 +51,9 @@ type RingOptions struct {
|
||||||
// NewClient creates a shard client with provided options.
|
// NewClient creates a shard client with provided options.
|
||||||
NewClient func(opt *Options) *Client
|
NewClient func(opt *Options) *Client
|
||||||
|
|
||||||
|
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
|
||||||
|
ClientName string
|
||||||
|
|
||||||
// Frequency of PING commands sent to check shards availability.
|
// Frequency of PING commands sent to check shards availability.
|
||||||
// Shard is considered down after 3 subsequent failed checks.
|
// Shard is considered down after 3 subsequent failed checks.
|
||||||
HeartbeatFrequency time.Duration
|
HeartbeatFrequency time.Duration
|
||||||
|
@ -129,8 +132,9 @@ func (opt *RingOptions) init() {
|
||||||
|
|
||||||
func (opt *RingOptions) clientOptions() *Options {
|
func (opt *RingOptions) clientOptions() *Options {
|
||||||
return &Options{
|
return &Options{
|
||||||
Dialer: opt.Dialer,
|
ClientName: opt.ClientName,
|
||||||
OnConnect: opt.OnConnect,
|
Dialer: opt.Dialer,
|
||||||
|
OnConnect: opt.OnConnect,
|
||||||
|
|
||||||
Username: opt.Username,
|
Username: opt.Username,
|
||||||
Password: opt.Password,
|
Password: opt.Password,
|
||||||
|
@ -522,7 +526,7 @@ func (c *Ring) SetAddrs(addrs map[string]string) {
|
||||||
c.sharding.SetAddrs(addrs)
|
c.sharding.SetAddrs(addrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do creates a Cmd from the args and processes the cmd.
|
// Do create a Cmd from the args and processes the cmd.
|
||||||
func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
|
func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
|
||||||
cmd := NewCmd(ctx, args...)
|
cmd := NewCmd(ctx, args...)
|
||||||
_ = c.Process(ctx, cmd)
|
_ = c.Process(ctx, cmd)
|
||||||
|
|
15
ring_test.go
15
ring_test.go
|
@ -29,6 +29,7 @@ var _ = Describe("Redis Ring", func() {
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
opt := redisRingOptions()
|
opt := redisRingOptions()
|
||||||
|
opt.ClientName = "ring_hi"
|
||||||
opt.HeartbeatFrequency = heartbeat
|
opt.HeartbeatFrequency = heartbeat
|
||||||
ring = redis.NewRing(opt)
|
ring = redis.NewRing(opt)
|
||||||
|
|
||||||
|
@ -50,6 +51,20 @@ var _ = Describe("Redis Ring", func() {
|
||||||
Expect(err).To(MatchError("context canceled"))
|
Expect(err).To(MatchError("context canceled"))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should ring client setname", func() {
|
||||||
|
err := ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
return c.Ping(ctx).Err()
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
val, err := c.ClientList(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(ContainSubstring("name=ring_hi"))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
It("distributes keys", func() {
|
It("distributes keys", func() {
|
||||||
setRingKeys()
|
setRingKeys()
|
||||||
|
|
||||||
|
|
11
sentinel.go
11
sentinel.go
|
@ -24,6 +24,9 @@ type FailoverOptions struct {
|
||||||
// A seed list of host:port addresses of sentinel nodes.
|
// A seed list of host:port addresses of sentinel nodes.
|
||||||
SentinelAddrs []string
|
SentinelAddrs []string
|
||||||
|
|
||||||
|
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
|
||||||
|
ClientName string
|
||||||
|
|
||||||
// If specified with SentinelPassword, enables ACL-based authentication (via
|
// If specified with SentinelPassword, enables ACL-based authentication (via
|
||||||
// AUTH <user> <pass>).
|
// AUTH <user> <pass>).
|
||||||
SentinelUsername string
|
SentinelUsername string
|
||||||
|
@ -78,7 +81,8 @@ type FailoverOptions struct {
|
||||||
|
|
||||||
func (opt *FailoverOptions) clientOptions() *Options {
|
func (opt *FailoverOptions) clientOptions() *Options {
|
||||||
return &Options{
|
return &Options{
|
||||||
Addr: "FailoverClient",
|
Addr: "FailoverClient",
|
||||||
|
ClientName: opt.ClientName,
|
||||||
|
|
||||||
Dialer: opt.Dialer,
|
Dialer: opt.Dialer,
|
||||||
OnConnect: opt.OnConnect,
|
OnConnect: opt.OnConnect,
|
||||||
|
@ -110,7 +114,8 @@ func (opt *FailoverOptions) clientOptions() *Options {
|
||||||
|
|
||||||
func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
|
func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
|
||||||
return &Options{
|
return &Options{
|
||||||
Addr: addr,
|
Addr: addr,
|
||||||
|
ClientName: opt.ClientName,
|
||||||
|
|
||||||
Dialer: opt.Dialer,
|
Dialer: opt.Dialer,
|
||||||
OnConnect: opt.OnConnect,
|
OnConnect: opt.OnConnect,
|
||||||
|
@ -141,6 +146,8 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
|
||||||
|
|
||||||
func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
|
func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
|
||||||
return &ClusterOptions{
|
return &ClusterOptions{
|
||||||
|
ClientName: opt.ClientName,
|
||||||
|
|
||||||
Dialer: opt.Dialer,
|
Dialer: opt.Dialer,
|
||||||
OnConnect: opt.OnConnect,
|
OnConnect: opt.OnConnect,
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package redis_test
|
package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
|
@ -17,6 +18,7 @@ var _ = Describe("Sentinel", func() {
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
client = redis.NewFailoverClient(&redis.FailoverOptions{
|
client = redis.NewFailoverClient(&redis.FailoverOptions{
|
||||||
|
ClientName: "sentinel_hi",
|
||||||
MasterName: sentinelName,
|
MasterName: sentinelName,
|
||||||
SentinelAddrs: sentinelAddrs,
|
SentinelAddrs: sentinelAddrs,
|
||||||
MaxRetries: -1,
|
MaxRetries: -1,
|
||||||
|
@ -125,6 +127,13 @@ var _ = Describe("Sentinel", func() {
|
||||||
err := client.Ping(ctx).Err()
|
err := client.Ping(ctx).Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should sentinel client setname", func() {
|
||||||
|
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
|
||||||
|
val, err := client.ClientList(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(ContainSubstring("name=sentinel_hi"))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
var _ = Describe("NewFailoverClusterClient", func() {
|
var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
|
@ -134,6 +143,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
|
client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
|
||||||
|
ClientName: "sentinel_cluster_hi",
|
||||||
MasterName: sentinelName,
|
MasterName: sentinelName,
|
||||||
SentinelAddrs: sentinelAddrs,
|
SentinelAddrs: sentinelAddrs,
|
||||||
|
|
||||||
|
@ -213,6 +223,20 @@ var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
_, err = startRedis(masterPort)
|
_, err = startRedis(masterPort)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should sentinel cluster client setname", func() {
|
||||||
|
err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
return c.Ping(ctx).Err()
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
val, err := c.ClientList(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(ContainSubstring("name=sentinel_cluster_hi"))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
var _ = Describe("SentinelAclAuth", func() {
|
var _ = Describe("SentinelAclAuth", func() {
|
||||||
|
|
18
universal.go
18
universal.go
|
@ -14,6 +14,9 @@ type UniversalOptions struct {
|
||||||
// of cluster/sentinel nodes.
|
// of cluster/sentinel nodes.
|
||||||
Addrs []string
|
Addrs []string
|
||||||
|
|
||||||
|
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
|
||||||
|
ClientName string
|
||||||
|
|
||||||
// Database to be selected after connecting to the server.
|
// Database to be selected after connecting to the server.
|
||||||
// Only single-node and failover clients.
|
// Only single-node and failover clients.
|
||||||
DB int
|
DB int
|
||||||
|
@ -69,9 +72,10 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ClusterOptions{
|
return &ClusterOptions{
|
||||||
Addrs: o.Addrs,
|
Addrs: o.Addrs,
|
||||||
Dialer: o.Dialer,
|
ClientName: o.ClientName,
|
||||||
OnConnect: o.OnConnect,
|
Dialer: o.Dialer,
|
||||||
|
OnConnect: o.OnConnect,
|
||||||
|
|
||||||
Username: o.Username,
|
Username: o.Username,
|
||||||
Password: o.Password,
|
Password: o.Password,
|
||||||
|
@ -112,6 +116,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
|
||||||
return &FailoverOptions{
|
return &FailoverOptions{
|
||||||
SentinelAddrs: o.Addrs,
|
SentinelAddrs: o.Addrs,
|
||||||
MasterName: o.MasterName,
|
MasterName: o.MasterName,
|
||||||
|
ClientName: o.ClientName,
|
||||||
|
|
||||||
Dialer: o.Dialer,
|
Dialer: o.Dialer,
|
||||||
OnConnect: o.OnConnect,
|
OnConnect: o.OnConnect,
|
||||||
|
@ -151,9 +156,10 @@ func (o *UniversalOptions) Simple() *Options {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Options{
|
return &Options{
|
||||||
Addr: addr,
|
Addr: addr,
|
||||||
Dialer: o.Dialer,
|
ClientName: o.ClientName,
|
||||||
OnConnect: o.OnConnect,
|
Dialer: o.Dialer,
|
||||||
|
OnConnect: o.OnConnect,
|
||||||
|
|
||||||
DB: o.DB,
|
DB: o.DB,
|
||||||
Username: o.Username,
|
Username: o.Username,
|
||||||
|
|
Loading…
Reference in New Issue