Merge pull request #2333 from monkey92t/fix_2312

feat: add ClientName option
This commit is contained in:
Monkey 2022-12-28 22:31:25 +08:00 committed by GitHub
commit c7bc54b4d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 121 additions and 15 deletions

View File

@ -29,6 +29,9 @@ type ClusterOptions struct {
// A seed list of host:port addresses of cluster nodes. // A seed list of host:port addresses of cluster nodes.
Addrs []string Addrs []string
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// NewClient creates a cluster node client with provided name and options. // NewClient creates a cluster node client with provided name and options.
NewClient func(opt *Options) *Client NewClient func(opt *Options) *Client
@ -208,6 +211,7 @@ func setupClusterConn(u *url.URL, host string, o *ClusterOptions) (*ClusterOptio
func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) { func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) {
q := queryOptions{q: u.Query()} q := queryOptions{q: u.Query()}
o.ClientName = q.string("client_name")
o.MaxRedirects = q.int("max_redirects") o.MaxRedirects = q.int("max_redirects")
o.ReadOnly = q.bool("read_only") o.ReadOnly = q.bool("read_only")
o.RouteByLatency = q.bool("route_by_latency") o.RouteByLatency = q.bool("route_by_latency")
@ -250,6 +254,7 @@ func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, er
func (opt *ClusterOptions) clientOptions() *Options { func (opt *ClusterOptions) clientOptions() *Options {
return &Options{ return &Options{
ClientName: opt.ClientName,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
@ -871,7 +876,7 @@ func (c *ClusterClient) Close() error {
return c.nodes.Close() return c.nodes.Close()
} }
// Do creates a Cmd from the args and processes the cmd. // Do create a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd { func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args...) cmd := NewCmd(ctx, args...)
_ = c.Process(ctx, cmd) _ = c.Process(ctx, cmd)

View File

@ -589,6 +589,7 @@ var _ = Describe("ClusterClient", func() {
Describe("ClusterClient", func() { Describe("ClusterClient", func() {
BeforeEach(func() { BeforeEach(func() {
opt = redisClusterOptions() opt = redisClusterOptions()
opt.ClientName = "cluster_hi"
client = cluster.newClusterClient(ctx, opt) client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
@ -679,6 +680,20 @@ var _ = Describe("ClusterClient", func() {
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred()) Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
}) })
It("should cluster client setname", func() {
err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
return c.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=cluster_hi"))
return nil
})
})
It("should CLUSTER NODES", func() { It("should CLUSTER NODES", func() {
res, err := client.ClusterNodes(ctx).Result() res, err := client.ClusterNodes(ctx).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -1408,6 +1423,10 @@ func TestParseClusterURL(t *testing.T) {
test: "UseDefault", test: "UseDefault",
url: "redis://localhost:123?conn_max_idle_time=", url: "redis://localhost:123?conn_max_idle_time=",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0}, o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
}, {
test: "ClientName",
url: "redis://localhost:123?client_name=cluster_hi",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ClientName: "cluster_hi"},
}, { }, {
test: "UseDefaultMissing=", test: "UseDefaultMissing=",
url: "redis://localhost:123?conn_max_idle_time", url: "redis://localhost:123?conn_max_idle_time",

View File

@ -35,6 +35,9 @@ type Options struct {
// host:port address. // host:port address.
Addr string Addr string
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// Dialer creates new network connection and has priority over // Dialer creates new network connection and has priority over
// Network and Addr options. // Network and Addr options.
Dialer func(ctx context.Context, network, addr string) (net.Conn, error) Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
@ -426,6 +429,7 @@ func setupConnParams(u *url.URL, o *Options) (*Options, error) {
o.DB = db o.DB = db
} }
o.ClientName = q.string("client_name")
o.MaxRetries = q.int("max_retries") o.MaxRetries = q.int("max_retries")
o.MinRetryBackoff = q.duration("min_retry_backoff") o.MinRetryBackoff = q.duration("min_retry_backoff")
o.MaxRetryBackoff = q.duration("max_retry_backoff") o.MaxRetryBackoff = q.duration("max_retry_backoff")

View File

@ -59,6 +59,9 @@ func TestParseURL(t *testing.T) {
}, { }, {
url: "redis://localhost:123/?db=2&conn_max_idle_time", // missing "=" at the end url: "redis://localhost:123/?db=2&conn_max_idle_time", // missing "=" at the end
o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: 0}, o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: 0},
}, {
url: "redis://localhost:123/?db=2&client_name=hi", // client name
o: &Options{Addr: "localhost:123", DB: 2, ClientName: "hi"},
}, { }, {
url: "unix:///tmp/redis.sock", url: "unix:///tmp/redis.sock",
o: &Options{Addr: "/tmp/redis.sock"}, o: &Options{Addr: "/tmp/redis.sock"},

View File

@ -260,6 +260,10 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
pipe.ReadOnly(ctx) pipe.ReadOnly(ctx)
} }
if c.opt.ClientName != "" {
pipe.ClientSetName(ctx, c.opt.ClientName)
}
return nil return nil
}) })
if err != nil { if err != nil {

View File

@ -169,6 +169,21 @@ var _ = Describe("Client", func() {
Expect(db2.Close()).NotTo(HaveOccurred()) Expect(db2.Close()).NotTo(HaveOccurred())
}) })
It("should client setname", func() {
opt := redisOptions()
opt.ClientName = "hi"
db := redis.NewClient(opt)
defer func() {
Expect(db.Close()).NotTo(HaveOccurred())
}()
Expect(db.Ping(ctx).Err()).NotTo(HaveOccurred())
val, err := db.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=hi"))
})
It("processes custom commands", func() { It("processes custom commands", func() {
cmd := redis.NewCmd(ctx, "PING") cmd := redis.NewCmd(ctx, "PING")
_ = client.Process(ctx, cmd) _ = client.Process(ctx, cmd)

View File

@ -51,6 +51,9 @@ type RingOptions struct {
// NewClient creates a shard client with provided options. // NewClient creates a shard client with provided options.
NewClient func(opt *Options) *Client NewClient func(opt *Options) *Client
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// Frequency of PING commands sent to check shards availability. // Frequency of PING commands sent to check shards availability.
// Shard is considered down after 3 subsequent failed checks. // Shard is considered down after 3 subsequent failed checks.
HeartbeatFrequency time.Duration HeartbeatFrequency time.Duration
@ -129,6 +132,7 @@ func (opt *RingOptions) init() {
func (opt *RingOptions) clientOptions() *Options { func (opt *RingOptions) clientOptions() *Options {
return &Options{ return &Options{
ClientName: opt.ClientName,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
@ -522,7 +526,7 @@ func (c *Ring) SetAddrs(addrs map[string]string) {
c.sharding.SetAddrs(addrs) c.sharding.SetAddrs(addrs)
} }
// Do creates a Cmd from the args and processes the cmd. // Do create a Cmd from the args and processes the cmd.
func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd { func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args...) cmd := NewCmd(ctx, args...)
_ = c.Process(ctx, cmd) _ = c.Process(ctx, cmd)

View File

@ -29,6 +29,7 @@ var _ = Describe("Redis Ring", func() {
BeforeEach(func() { BeforeEach(func() {
opt := redisRingOptions() opt := redisRingOptions()
opt.ClientName = "ring_hi"
opt.HeartbeatFrequency = heartbeat opt.HeartbeatFrequency = heartbeat
ring = redis.NewRing(opt) ring = redis.NewRing(opt)
@ -50,6 +51,20 @@ var _ = Describe("Redis Ring", func() {
Expect(err).To(MatchError("context canceled")) Expect(err).To(MatchError("context canceled"))
}) })
It("should ring client setname", func() {
err := ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
return c.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=ring_hi"))
return nil
})
})
It("distributes keys", func() { It("distributes keys", func() {
setRingKeys() setRingKeys()

View File

@ -24,6 +24,9 @@ type FailoverOptions struct {
// A seed list of host:port addresses of sentinel nodes. // A seed list of host:port addresses of sentinel nodes.
SentinelAddrs []string SentinelAddrs []string
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// If specified with SentinelPassword, enables ACL-based authentication (via // If specified with SentinelPassword, enables ACL-based authentication (via
// AUTH <user> <pass>). // AUTH <user> <pass>).
SentinelUsername string SentinelUsername string
@ -79,6 +82,7 @@ type FailoverOptions struct {
func (opt *FailoverOptions) clientOptions() *Options { func (opt *FailoverOptions) clientOptions() *Options {
return &Options{ return &Options{
Addr: "FailoverClient", Addr: "FailoverClient",
ClientName: opt.ClientName,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
@ -111,6 +115,7 @@ func (opt *FailoverOptions) clientOptions() *Options {
func (opt *FailoverOptions) sentinelOptions(addr string) *Options { func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
return &Options{ return &Options{
Addr: addr, Addr: addr,
ClientName: opt.ClientName,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
@ -141,6 +146,8 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
func (opt *FailoverOptions) clusterOptions() *ClusterOptions { func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
return &ClusterOptions{ return &ClusterOptions{
ClientName: opt.ClientName,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,

View File

@ -1,6 +1,7 @@
package redis_test package redis_test
import ( import (
"context"
"net" "net"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -17,6 +18,7 @@ var _ = Describe("Sentinel", func() {
BeforeEach(func() { BeforeEach(func() {
client = redis.NewFailoverClient(&redis.FailoverOptions{ client = redis.NewFailoverClient(&redis.FailoverOptions{
ClientName: "sentinel_hi",
MasterName: sentinelName, MasterName: sentinelName,
SentinelAddrs: sentinelAddrs, SentinelAddrs: sentinelAddrs,
MaxRetries: -1, MaxRetries: -1,
@ -125,6 +127,13 @@ var _ = Describe("Sentinel", func() {
err := client.Ping(ctx).Err() err := client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}) })
It("should sentinel client setname", func() {
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
val, err := client.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=sentinel_hi"))
})
}) })
var _ = Describe("NewFailoverClusterClient", func() { var _ = Describe("NewFailoverClusterClient", func() {
@ -134,6 +143,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
BeforeEach(func() { BeforeEach(func() {
client = redis.NewFailoverClusterClient(&redis.FailoverOptions{ client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
ClientName: "sentinel_cluster_hi",
MasterName: sentinelName, MasterName: sentinelName,
SentinelAddrs: sentinelAddrs, SentinelAddrs: sentinelAddrs,
@ -213,6 +223,20 @@ var _ = Describe("NewFailoverClusterClient", func() {
_, err = startRedis(masterPort) _, err = startRedis(masterPort)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}) })
It("should sentinel cluster client setname", func() {
err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
return c.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=sentinel_cluster_hi"))
return nil
})
})
}) })
var _ = Describe("SentinelAclAuth", func() { var _ = Describe("SentinelAclAuth", func() {

View File

@ -14,6 +14,9 @@ type UniversalOptions struct {
// of cluster/sentinel nodes. // of cluster/sentinel nodes.
Addrs []string Addrs []string
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// Database to be selected after connecting to the server. // Database to be selected after connecting to the server.
// Only single-node and failover clients. // Only single-node and failover clients.
DB int DB int
@ -70,6 +73,7 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
return &ClusterOptions{ return &ClusterOptions{
Addrs: o.Addrs, Addrs: o.Addrs,
ClientName: o.ClientName,
Dialer: o.Dialer, Dialer: o.Dialer,
OnConnect: o.OnConnect, OnConnect: o.OnConnect,
@ -112,6 +116,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
return &FailoverOptions{ return &FailoverOptions{
SentinelAddrs: o.Addrs, SentinelAddrs: o.Addrs,
MasterName: o.MasterName, MasterName: o.MasterName,
ClientName: o.ClientName,
Dialer: o.Dialer, Dialer: o.Dialer,
OnConnect: o.OnConnect, OnConnect: o.OnConnect,
@ -152,6 +157,7 @@ func (o *UniversalOptions) Simple() *Options {
return &Options{ return &Options{
Addr: addr, Addr: addr,
ClientName: o.ClientName,
Dialer: o.Dialer, Dialer: o.Dialer,
OnConnect: o.OnConnect, OnConnect: o.OnConnect,