Compare commits

..

10 Commits

Author SHA1 Message Date
Monkey 09fcdbe6e4
Merge pull request #2344 from monkey92t/hook
docs: add a description of the hook
2023-01-07 16:52:12 +08:00
monkey92t d42dd1007c docs: add a description of the hook
Signed-off-by: monkey92t <golang@88.com>
2023-01-07 16:30:56 +08:00
Monkey af893143ae
Merge pull request #2338 from monkey92t/readme
docs(README.md): update doc addr, to v9
2023-01-03 18:40:12 +08:00
monkey92t 2fd84242f0 docs(README.md): update doc addr, to v9
Signed-off-by: monkey92t <golang@88.com>
2023-01-03 18:37:13 +08:00
Monkey 7828fa10a8
Merge pull request #2334 from monkey92t/version
test: upgrade redis version(7.0.7)
2022-12-28 22:57:06 +08:00
monkey92t 443f0ec113 test: upgrade redis version(7.0.7)
Signed-off-by: monkey92t <golang@88.com>
2022-12-28 22:38:59 +08:00
Monkey c7bc54b4d0
Merge pull request #2333 from monkey92t/fix_2312
feat: add ClientName option
2022-12-28 22:31:25 +08:00
monkey92t a872c35b1a feat: add ClientName option
Signed-off-by: monkey92t <golang@88.com>
2022-12-28 22:14:52 +08:00
Monkey a4336cbd43
feat(scan): add Scanner interface (#2317)
Signed-off-by: monkey92t <golang@88.com>
2022-12-24 22:29:45 +08:00
Scott 7c4b924350
fix(redisotel): correct metrics.DialHook attrs (#2331) 2022-12-23 20:40:07 +08:00
21 changed files with 355 additions and 108 deletions

View File

@ -18,7 +18,7 @@ bench: testdeps
testdata/redis: testdata/redis:
mkdir -p $@ mkdir -p $@
wget -qO- https://download.redis.io/releases/redis-7.0.6.tar.gz | tar xvz --strip-components=1 -C $@ wget -qO- https://download.redis.io/releases/redis-7.0.7.tar.gz | tar xvz --strip-components=1 -C $@
testdata/redis/src/redis-server: testdata/redis testdata/redis/src/redis-server: testdata/redis
cd $< && make all cd $< && make all

View File

@ -18,8 +18,8 @@
- [Documentation](https://redis.uptrace.dev) - [Documentation](https://redis.uptrace.dev)
- [Discussions](https://github.com/go-redis/redis/discussions) - [Discussions](https://github.com/go-redis/redis/discussions)
- [Chat](https://discord.gg/rWtp5Aj) - [Chat](https://discord.gg/rWtp5Aj)
- [Reference](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc) - [Reference](https://pkg.go.dev/github.com/go-redis/redis/v9)
- [Examples](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#pkg-examples) - [Examples](https://pkg.go.dev/github.com/go-redis/redis/v9#pkg-examples)
## Ecosystem ## Ecosystem

View File

@ -29,6 +29,9 @@ type ClusterOptions struct {
// A seed list of host:port addresses of cluster nodes. // A seed list of host:port addresses of cluster nodes.
Addrs []string Addrs []string
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// NewClient creates a cluster node client with provided name and options. // NewClient creates a cluster node client with provided name and options.
NewClient func(opt *Options) *Client NewClient func(opt *Options) *Client
@ -133,10 +136,13 @@ func (opt *ClusterOptions) init() {
// ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis. // ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis.
// The URL must be in the form: // The URL must be in the form:
//
// redis://<user>:<password>@<host>:<port> // redis://<user>:<password>@<host>:<port>
// or // or
// rediss://<user>:<password>@<host>:<port> // rediss://<user>:<password>@<host>:<port>
//
// To add additional addresses, specify the query parameter, "addr" one or more times. e.g: // To add additional addresses, specify the query parameter, "addr" one or more times. e.g:
//
// redis://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3> // redis://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
// or // or
// rediss://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3> // rediss://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
@ -153,7 +159,9 @@ func (opt *ClusterOptions) init() {
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these // URL attributes (scheme, host, userinfo, resp.), query paremeters using these
// names will be treated as unknown parameters // names will be treated as unknown parameters
// - unknown parameter names will result in an error // - unknown parameter names will result in an error
//
// Example: // Example:
//
// redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791 // redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791
// is equivalent to: // is equivalent to:
// &ClusterOptions{ // &ClusterOptions{
@ -208,6 +216,7 @@ func setupClusterConn(u *url.URL, host string, o *ClusterOptions) (*ClusterOptio
func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) { func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) {
q := queryOptions{q: u.Query()} q := queryOptions{q: u.Query()}
o.ClientName = q.string("client_name")
o.MaxRedirects = q.int("max_redirects") o.MaxRedirects = q.int("max_redirects")
o.ReadOnly = q.bool("read_only") o.ReadOnly = q.bool("read_only")
o.RouteByLatency = q.bool("route_by_latency") o.RouteByLatency = q.bool("route_by_latency")
@ -250,6 +259,7 @@ func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, er
func (opt *ClusterOptions) clientOptions() *Options { func (opt *ClusterOptions) clientOptions() *Options {
return &Options{ return &Options{
ClientName: opt.ClientName,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
@ -871,7 +881,7 @@ func (c *ClusterClient) Close() error {
return c.nodes.Close() return c.nodes.Close()
} }
// Do creates a Cmd from the args and processes the cmd. // Do create a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd { func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args...) cmd := NewCmd(ctx, args...)
_ = c.Process(ctx, cmd) _ = c.Process(ctx, cmd)

View File

@ -589,6 +589,7 @@ var _ = Describe("ClusterClient", func() {
Describe("ClusterClient", func() { Describe("ClusterClient", func() {
BeforeEach(func() { BeforeEach(func() {
opt = redisClusterOptions() opt = redisClusterOptions()
opt.ClientName = "cluster_hi"
client = cluster.newClusterClient(ctx, opt) client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
@ -679,6 +680,20 @@ var _ = Describe("ClusterClient", func() {
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred()) Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
}) })
It("should cluster client setname", func() {
err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
return c.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=cluster_hi"))
return nil
})
})
It("should CLUSTER NODES", func() { It("should CLUSTER NODES", func() {
res, err := client.ClusterNodes(ctx).Result() res, err := client.ClusterNodes(ctx).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -1408,6 +1423,10 @@ func TestParseClusterURL(t *testing.T) {
test: "UseDefault", test: "UseDefault",
url: "redis://localhost:123?conn_max_idle_time=", url: "redis://localhost:123?conn_max_idle_time=",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0}, o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
}, {
test: "ClientName",
url: "redis://localhost:123?client_name=cluster_hi",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ClientName: "cluster_hi"},
}, { }, {
test: "UseDefaultMissing=", test: "UseDefaultMissing=",
url: "redis://localhost:123?conn_max_idle_time", url: "redis://localhost:123?conn_max_idle_time",

View File

@ -1110,15 +1110,16 @@ func (cmd *KeyValueSliceCmd) String() string {
} }
// Many commands will respond to two formats: // Many commands will respond to two formats:
// 1) 1) "one" // 1. 1) "one"
// 2) (double) 1 // 2. (double) 1
// 2) 1) "two" // 2. 1) "two"
// 2) (double) 2 // 2. (double) 2
//
// OR: // OR:
// 1) "two" // 1. "two"
// 2) (double) 2 // 2. (double) 2
// 3) "one" // 3. "one"
// 4) (double) 1 // 4. (double) 1
func (cmd *KeyValueSliceCmd) readReply(rd *proto.Reader) error { // nolint:dupl func (cmd *KeyValueSliceCmd) readReply(rd *proto.Reader) error { // nolint:dupl
n, err := rd.ReadArrayLen() n, err := rd.ReadArrayLen()
if err != nil { if err != nil {

View File

@ -14,6 +14,15 @@ import (
"github.com/go-redis/redis/v9/internal/proto" "github.com/go-redis/redis/v9/internal/proto"
) )
type TimeValue struct {
time.Time
}
func (t *TimeValue) ScanRedis(s string) (err error) {
t.Time, err = time.Parse(time.RFC3339Nano, s)
return
}
var _ = Describe("Commands", func() { var _ = Describe("Commands", func() {
ctx := context.TODO() ctx := context.TODO()
var client *redis.Client var client *redis.Client
@ -1192,19 +1201,28 @@ var _ = Describe("Commands", func() {
}) })
It("should scan Mget", func() { It("should scan Mget", func() {
err := client.MSet(ctx, "key1", "hello1", "key2", 123).Err() now := time.Now()
err := client.MSet(ctx, "key1", "hello1", "key2", 123, "time", now.Format(time.RFC3339Nano)).Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
res := client.MGet(ctx, "key1", "key2", "_") res := client.MGet(ctx, "key1", "key2", "_", "time")
Expect(res.Err()).NotTo(HaveOccurred()) Expect(res.Err()).NotTo(HaveOccurred())
type data struct { type data struct {
Key1 string `redis:"key1"` Key1 string `redis:"key1"`
Key2 int `redis:"key2"` Key2 int `redis:"key2"`
Time TimeValue `redis:"time"`
} }
var d data var d data
Expect(res.Scan(&d)).NotTo(HaveOccurred()) Expect(res.Scan(&d)).NotTo(HaveOccurred())
Expect(d).To(Equal(data{Key1: "hello1", Key2: 123})) Expect(d.Time.UnixNano()).To(Equal(now.UnixNano()))
d.Time.Time = time.Time{}
Expect(d).To(Equal(data{
Key1: "hello1",
Key2: 123,
Time: TimeValue{Time: time.Time{}},
}))
}) })
It("should MSetNX", func() { It("should MSetNX", func() {
@ -1732,7 +1750,9 @@ var _ = Describe("Commands", func() {
}) })
It("should scan", func() { It("should scan", func() {
err := client.HMSet(ctx, "hash", "key1", "hello1", "key2", 123).Err() now := time.Now()
err := client.HMSet(ctx, "hash", "key1", "hello1", "key2", 123, "time", now.Format(time.RFC3339Nano)).Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
res := client.HGetAll(ctx, "hash") res := client.HGetAll(ctx, "hash")
@ -1741,10 +1761,17 @@ var _ = Describe("Commands", func() {
type data struct { type data struct {
Key1 string `redis:"key1"` Key1 string `redis:"key1"`
Key2 int `redis:"key2"` Key2 int `redis:"key2"`
Time TimeValue `redis:"time"`
} }
var d data var d data
Expect(res.Scan(&d)).NotTo(HaveOccurred()) Expect(res.Scan(&d)).NotTo(HaveOccurred())
Expect(d).To(Equal(data{Key1: "hello1", Key2: 123})) Expect(d.Time.UnixNano()).To(Equal(now.UnixNano()))
d.Time.Time = time.Time{}
Expect(d).To(Equal(data{
Key1: "hello1",
Key2: 123,
Time: TimeValue{Time: time.Time{}},
}))
}) })
It("should HIncrBy", func() { It("should HIncrBy", func() {

View File

@ -193,7 +193,11 @@ func (mh *metricsHook) DialHook(hook redis.DialHook) redis.DialHook {
conn, err := hook(ctx, network, addr) conn, err := hook(ctx, network, addr)
mh.createTime.Record(ctx, milliseconds(time.Since(start)), mh.attrs...) attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+1)
attrs = append(attrs, mh.attrs...)
attrs = append(attrs, statusAttr(err))
mh.createTime.Record(ctx, milliseconds(time.Since(start)), attrs...)
return conn, err return conn, err
} }
} }

View File

@ -10,6 +10,12 @@ import (
// decoderFunc represents decoding functions for default built-in types. // decoderFunc represents decoding functions for default built-in types.
type decoderFunc func(reflect.Value, string) error type decoderFunc func(reflect.Value, string) error
// Scanner is the interface implemented by themselves,
// which will override the decoding behavior of decoderFunc.
type Scanner interface {
ScanRedis(s string) error
}
var ( var (
// List of built-in decoders indexed by their numeric constant values (eg: reflect.Bool = 1). // List of built-in decoders indexed by their numeric constant values (eg: reflect.Bool = 1).
decoders = []decoderFunc{ decoders = []decoderFunc{

View File

@ -4,6 +4,7 @@ import (
"math" "math"
"strconv" "strconv"
"testing" "testing"
"time"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -30,6 +31,20 @@ type data struct {
Bool bool `redis:"bool"` Bool bool `redis:"bool"`
} }
type TimeRFC3339Nano struct {
time.Time
}
func (t *TimeRFC3339Nano) ScanRedis(s string) (err error) {
t.Time, err = time.Parse(time.RFC3339Nano, s)
return
}
type TimeData struct {
Name string `redis:"name"`
Time *TimeRFC3339Nano `redis:"login"`
}
type i []interface{} type i []interface{}
func TestGinkgoSuite(t *testing.T) { func TestGinkgoSuite(t *testing.T) {
@ -175,4 +190,14 @@ var _ = Describe("Scan", func() {
Expect(Scan(&d, i{"bool"}, i{""})).To(HaveOccurred()) Expect(Scan(&d, i{"bool"}, i{""})).To(HaveOccurred())
Expect(Scan(&d, i{"bool"}, i{"123"})).To(HaveOccurred()) Expect(Scan(&d, i{"bool"}, i{"123"})).To(HaveOccurred())
}) })
It("Implements Scanner", func() {
var td TimeData
now := time.Now()
Expect(Scan(&td, i{"name", "login"}, i{"hello", now.Format(time.RFC3339Nano)})).NotTo(HaveOccurred())
Expect(td.Name).To(Equal("hello"))
Expect(td.Time.UnixNano()).To(Equal(now.UnixNano()))
Expect(td.Time.Format(time.RFC3339Nano)).To(Equal(now.Format(time.RFC3339Nano)))
})
}) })

View File

@ -84,7 +84,29 @@ func (s StructValue) Scan(key string, value string) error {
if !ok { if !ok {
return nil return nil
} }
if err := field.fn(s.value.Field(field.index), value); err != nil {
v := s.value.Field(field.index)
isPtr := v.Kind() == reflect.Pointer
if isPtr && v.IsNil() {
v.Set(reflect.New(v.Type().Elem()))
}
if !isPtr && v.Type().Name() != "" && v.CanAddr() {
v = v.Addr()
isPtr = true
}
if isPtr && v.Type().NumMethod() > 0 && v.CanInterface() {
if scan, ok := v.Interface().(Scanner); ok {
return scan.ScanRedis(value)
}
}
if isPtr {
v = v.Elem()
}
if err := field.fn(v, value); err != nil {
t := s.value.Type() t := s.value.Type()
return fmt.Errorf("cannot scan redis.result %s into struct field %s.%s of type %s, error-%s", return fmt.Errorf("cannot scan redis.result %s into struct field %s.%s of type %s, error-%s",
value, t.Name(), t.Field(field.index).Name, t.Field(field.index).Type, err.Error()) value, t.Name(), t.Field(field.index).Name, t.Field(field.index).Type, err.Error())

View File

@ -32,7 +32,9 @@ type Once struct {
// Do calls the function f if and only if Do has not been invoked // Do calls the function f if and only if Do has not been invoked
// without error for this instance of Once. In other words, given // without error for this instance of Once. In other words, given
//
// var once Once // var once Once
//
// if once.Do(f) is called multiple times, only the first call will // if once.Do(f) is called multiple times, only the first call will
// invoke f, even if f has a different value in each invocation unless // invoke f, even if f has a different value in each invocation unless
// f returns an error. A new instance of Once is required for each // f returns an error. A new instance of Once is required for each
@ -41,6 +43,7 @@ type Once struct {
// Do is intended for initialization that must be run exactly once. Since f // Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the // is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do: // arguments to a function to be invoked by Do:
//
// err := config.once.Do(func() error { return config.init(filename) }) // err := config.once.Do(func() error { return config.init(filename) })
func (o *Once) Do(f func() error) error { func (o *Once) Do(f func() error) error {
if atomic.LoadUint32(&o.done) == 1 { if atomic.LoadUint32(&o.done) == 1 {

View File

@ -11,6 +11,7 @@ import (
) )
// Scan parses bytes `b` to `v` with appropriate type. // Scan parses bytes `b` to `v` with appropriate type.
//
//nolint:gocyclo //nolint:gocyclo
func Scan(b []byte, v interface{}) error { func Scan(b []byte, v interface{}) error {
switch v := v.(type) { switch v := v.(type) {

View File

@ -27,7 +27,7 @@ type Limiter interface {
ReportResult(result error) ReportResult(result error)
} }
// Options keeps the settings to setup redis connection. // Options keeps the settings to set up redis connection.
type Options struct { type Options struct {
// The network type, either tcp or unix. // The network type, either tcp or unix.
// Default is tcp. // Default is tcp.
@ -35,6 +35,9 @@ type Options struct {
// host:port address. // host:port address.
Addr string Addr string
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// Dialer creates new network connection and has priority over // Dialer creates new network connection and has priority over
// Network and Addr options. // Network and Addr options.
Dialer func(ctx context.Context, network, addr string) (net.Conn, error) Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
@ -220,9 +223,13 @@ func NewDialer(opt *Options) func(context.Context, string, string) (net.Conn, er
// Scheme is required. // Scheme is required.
// There are two connection types: by tcp socket and by unix socket. // There are two connection types: by tcp socket and by unix socket.
// Tcp connection: // Tcp connection:
//
// redis://<user>:<password>@<host>:<port>/<db_number> // redis://<user>:<password>@<host>:<port>/<db_number>
//
// Unix connection: // Unix connection:
//
// unix://<user>:<password>@</path/to/redis.sock>?db=<db_number> // unix://<user>:<password>@</path/to/redis.sock>?db=<db_number>
//
// Most Option fields can be set using query parameters, with the following restrictions: // Most Option fields can be set using query parameters, with the following restrictions:
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries // - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
// - only scalar type fields are supported (bool, int, time.Duration) // - only scalar type fields are supported (bool, int, time.Duration)
@ -235,7 +242,9 @@ func NewDialer(opt *Options) func(context.Context, string, string) (net.Conn, er
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these // URL attributes (scheme, host, userinfo, resp.), query paremeters using these
// names will be treated as unknown parameters // names will be treated as unknown parameters
// - unknown parameter names will result in an error // - unknown parameter names will result in an error
//
// Examples: // Examples:
//
// redis://user:password@localhost:6789/3?dial_timeout=3&db=1&read_timeout=6s&max_retries=2 // redis://user:password@localhost:6789/3?dial_timeout=3&db=1&read_timeout=6s&max_retries=2
// is equivalent to: // is equivalent to:
// &Options{ // &Options{
@ -426,6 +435,7 @@ func setupConnParams(u *url.URL, o *Options) (*Options, error) {
o.DB = db o.DB = db
} }
o.ClientName = q.string("client_name")
o.MaxRetries = q.int("max_retries") o.MaxRetries = q.int("max_retries")
o.MinRetryBackoff = q.duration("min_retry_backoff") o.MinRetryBackoff = q.duration("min_retry_backoff")
o.MaxRetryBackoff = q.duration("max_retry_backoff") o.MaxRetryBackoff = q.duration("max_retry_backoff")

View File

@ -59,6 +59,9 @@ func TestParseURL(t *testing.T) {
}, { }, {
url: "redis://localhost:123/?db=2&conn_max_idle_time", // missing "=" at the end url: "redis://localhost:123/?db=2&conn_max_idle_time", // missing "=" at the end
o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: 0}, o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: 0},
}, {
url: "redis://localhost:123/?db=2&client_name=hi", // client name
o: &Options{Addr: "localhost:123", DB: 2, ClientName: "hi"},
}, { }, {
url: "unix:///tmp/redis.sock", url: "unix:///tmp/redis.sock",
o: &Options{Addr: "/tmp/redis.sock"}, o: &Options{Addr: "/tmp/redis.sock"},

View File

@ -10,10 +10,14 @@ import (
"time" "time"
"github.com/go-redis/redis/v9/internal" "github.com/go-redis/redis/v9/internal"
"github.com/go-redis/redis/v9/internal/hscan"
"github.com/go-redis/redis/v9/internal/pool" "github.com/go-redis/redis/v9/internal/pool"
"github.com/go-redis/redis/v9/internal/proto" "github.com/go-redis/redis/v9/internal/proto"
) )
// Scanner internal/hscan.Scanner exposed interface.
type Scanner = hscan.Scanner
// Nil reply returned by Redis when key does not exist. // Nil reply returned by Redis when key does not exist.
const Nil = proto.Nil const Nil = proto.Nil
@ -25,9 +29,9 @@ func SetLogger(logger internal.Logging) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Hook interface { type Hook interface {
DialHook(hook DialHook) DialHook DialHook(next DialHook) DialHook
ProcessHook(hook ProcessHook) ProcessHook ProcessHook(next ProcessHook) ProcessHook
ProcessPipelineHook(hook ProcessPipelineHook) ProcessPipelineHook ProcessPipelineHook(next ProcessPipelineHook) ProcessPipelineHook
} }
type ( type (
@ -44,6 +48,43 @@ type hooks struct {
processTxPipelineHook ProcessPipelineHook processTxPipelineHook ProcessPipelineHook
} }
// AddHook is to add a hook to the queue.
// Hook is a function executed during network connection, command execution, and pipeline,
// it is a first-in-last-out stack queue (FILO).
// The first to be added to the queue is the execution function of the redis command (the last to be executed).
// You need to execute the next hook in each hook, unless you want to terminate the execution of the command.
// For example, you added hook-1, hook-2:
//
// client.AddHook(hook-1, hook-2)
//
// hook-1:
//
// func (Hook1) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
// return func(ctx context.Context, cmd Cmder) error {
// print("hook-1 start")
// next(ctx, cmd)
// print("hook-1 end")
// return nil
// }
// }
//
// hook-2:
//
// func (Hook2) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
// return func(ctx context.Context, cmd redis.Cmder) error {
// print("hook-2 start")
// next(ctx, cmd)
// print("hook-2 end")
// return nil
// }
// }
//
// The execution sequence is:
//
// hook-2 start -> hook-1 start -> exec redis cmd -> hook-1 end -> hook-2 end
//
// Please note: "next(ctx, cmd)" is very important, it will call the next hook,
// if "next(ctx, cmd)" is not executed in hook-1, the redis command will not be executed.
func (hs *hooks) AddHook(hook Hook) { func (hs *hooks) AddHook(hook Hook) {
hs.slice = append(hs.slice, hook) hs.slice = append(hs.slice, hook)
hs.dialHook = hook.DialHook(hs.dialHook) hs.dialHook = hook.DialHook(hs.dialHook)
@ -256,6 +297,10 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
pipe.ReadOnly(ctx) pipe.ReadOnly(ctx)
} }
if c.opt.ClientName != "" {
pipe.ClientSetName(ctx, c.opt.ClientName)
}
return nil return nil
}) })
if err != nil { if err != nil {
@ -567,7 +612,7 @@ func (c *Client) Conn() *Conn {
return newConn(c.opt, pool.NewStickyConnPool(c.connPool)) return newConn(c.opt, pool.NewStickyConnPool(c.connPool))
} }
// Do creates a Cmd from the args and processes the cmd. // Do create a Cmd from the args and processes the cmd.
func (c *Client) Do(ctx context.Context, args ...interface{}) *Cmd { func (c *Client) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args...) cmd := NewCmd(ctx, args...)
_ = c.Process(ctx, cmd) _ = c.Process(ctx, cmd)

View File

@ -169,6 +169,21 @@ var _ = Describe("Client", func() {
Expect(db2.Close()).NotTo(HaveOccurred()) Expect(db2.Close()).NotTo(HaveOccurred())
}) })
It("should client setname", func() {
opt := redisOptions()
opt.ClientName = "hi"
db := redis.NewClient(opt)
defer func() {
Expect(db.Close()).NotTo(HaveOccurred())
}()
Expect(db.Ping(ctx).Err()).NotTo(HaveOccurred())
val, err := db.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=hi"))
})
It("processes custom commands", func() { It("processes custom commands", func() {
cmd := redis.NewCmd(ctx, "PING") cmd := redis.NewCmd(ctx, "PING")
_ = client.Process(ctx, cmd) _ = client.Process(ctx, cmd)

View File

@ -51,6 +51,9 @@ type RingOptions struct {
// NewClient creates a shard client with provided options. // NewClient creates a shard client with provided options.
NewClient func(opt *Options) *Client NewClient func(opt *Options) *Client
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// Frequency of PING commands sent to check shards availability. // Frequency of PING commands sent to check shards availability.
// Shard is considered down after 3 subsequent failed checks. // Shard is considered down after 3 subsequent failed checks.
HeartbeatFrequency time.Duration HeartbeatFrequency time.Duration
@ -129,6 +132,7 @@ func (opt *RingOptions) init() {
func (opt *RingOptions) clientOptions() *Options { func (opt *RingOptions) clientOptions() *Options {
return &Options{ return &Options{
ClientName: opt.ClientName,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
@ -522,7 +526,7 @@ func (c *Ring) SetAddrs(addrs map[string]string) {
c.sharding.SetAddrs(addrs) c.sharding.SetAddrs(addrs)
} }
// Do creates a Cmd from the args and processes the cmd. // Do create a Cmd from the args and processes the cmd.
func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd { func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args...) cmd := NewCmd(ctx, args...)
_ = c.Process(ctx, cmd) _ = c.Process(ctx, cmd)

View File

@ -29,6 +29,7 @@ var _ = Describe("Redis Ring", func() {
BeforeEach(func() { BeforeEach(func() {
opt := redisRingOptions() opt := redisRingOptions()
opt.ClientName = "ring_hi"
opt.HeartbeatFrequency = heartbeat opt.HeartbeatFrequency = heartbeat
ring = redis.NewRing(opt) ring = redis.NewRing(opt)
@ -50,6 +51,20 @@ var _ = Describe("Redis Ring", func() {
Expect(err).To(MatchError("context canceled")) Expect(err).To(MatchError("context canceled"))
}) })
It("should ring client setname", func() {
err := ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
return c.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=ring_hi"))
return nil
})
})
It("distributes keys", func() { It("distributes keys", func() {
setRingKeys() setRingKeys()

View File

@ -24,6 +24,9 @@ type FailoverOptions struct {
// A seed list of host:port addresses of sentinel nodes. // A seed list of host:port addresses of sentinel nodes.
SentinelAddrs []string SentinelAddrs []string
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// If specified with SentinelPassword, enables ACL-based authentication (via // If specified with SentinelPassword, enables ACL-based authentication (via
// AUTH <user> <pass>). // AUTH <user> <pass>).
SentinelUsername string SentinelUsername string
@ -79,6 +82,7 @@ type FailoverOptions struct {
func (opt *FailoverOptions) clientOptions() *Options { func (opt *FailoverOptions) clientOptions() *Options {
return &Options{ return &Options{
Addr: "FailoverClient", Addr: "FailoverClient",
ClientName: opt.ClientName,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
@ -111,6 +115,7 @@ func (opt *FailoverOptions) clientOptions() *Options {
func (opt *FailoverOptions) sentinelOptions(addr string) *Options { func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
return &Options{ return &Options{
Addr: addr, Addr: addr,
ClientName: opt.ClientName,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
@ -141,6 +146,8 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
func (opt *FailoverOptions) clusterOptions() *ClusterOptions { func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
return &ClusterOptions{ return &ClusterOptions{
ClientName: opt.ClientName,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,

View File

@ -1,6 +1,7 @@
package redis_test package redis_test
import ( import (
"context"
"net" "net"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -17,6 +18,7 @@ var _ = Describe("Sentinel", func() {
BeforeEach(func() { BeforeEach(func() {
client = redis.NewFailoverClient(&redis.FailoverOptions{ client = redis.NewFailoverClient(&redis.FailoverOptions{
ClientName: "sentinel_hi",
MasterName: sentinelName, MasterName: sentinelName,
SentinelAddrs: sentinelAddrs, SentinelAddrs: sentinelAddrs,
MaxRetries: -1, MaxRetries: -1,
@ -125,6 +127,13 @@ var _ = Describe("Sentinel", func() {
err := client.Ping(ctx).Err() err := client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}) })
It("should sentinel client setname", func() {
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
val, err := client.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=sentinel_hi"))
})
}) })
var _ = Describe("NewFailoverClusterClient", func() { var _ = Describe("NewFailoverClusterClient", func() {
@ -134,6 +143,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
BeforeEach(func() { BeforeEach(func() {
client = redis.NewFailoverClusterClient(&redis.FailoverOptions{ client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
ClientName: "sentinel_cluster_hi",
MasterName: sentinelName, MasterName: sentinelName,
SentinelAddrs: sentinelAddrs, SentinelAddrs: sentinelAddrs,
@ -213,6 +223,20 @@ var _ = Describe("NewFailoverClusterClient", func() {
_, err = startRedis(masterPort) _, err = startRedis(masterPort)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}) })
It("should sentinel cluster client setname", func() {
err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
return c.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=sentinel_cluster_hi"))
return nil
})
})
}) })
var _ = Describe("SentinelAclAuth", func() { var _ = Describe("SentinelAclAuth", func() {

View File

@ -14,6 +14,9 @@ type UniversalOptions struct {
// of cluster/sentinel nodes. // of cluster/sentinel nodes.
Addrs []string Addrs []string
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// Database to be selected after connecting to the server. // Database to be selected after connecting to the server.
// Only single-node and failover clients. // Only single-node and failover clients.
DB int DB int
@ -70,6 +73,7 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
return &ClusterOptions{ return &ClusterOptions{
Addrs: o.Addrs, Addrs: o.Addrs,
ClientName: o.ClientName,
Dialer: o.Dialer, Dialer: o.Dialer,
OnConnect: o.OnConnect, OnConnect: o.OnConnect,
@ -112,6 +116,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
return &FailoverOptions{ return &FailoverOptions{
SentinelAddrs: o.Addrs, SentinelAddrs: o.Addrs,
MasterName: o.MasterName, MasterName: o.MasterName,
ClientName: o.ClientName,
Dialer: o.Dialer, Dialer: o.Dialer,
OnConnect: o.OnConnect, OnConnect: o.OnConnect,
@ -152,6 +157,7 @@ func (o *UniversalOptions) Simple() *Options {
return &Options{ return &Options{
Addr: addr, Addr: addr,
ClientName: o.ClientName,
Dialer: o.Dialer, Dialer: o.Dialer,
OnConnect: o.OnConnect, OnConnect: o.OnConnect,