mirror of https://github.com/go-redis/redis.git
feat: add protocol option (#2598)
This commit is contained in:
parent
31ba855dde
commit
391798880c
|
@ -62,6 +62,7 @@ type ClusterOptions struct {
|
||||||
|
|
||||||
OnConnect func(ctx context.Context, cn *Conn) error
|
OnConnect func(ctx context.Context, cn *Conn) error
|
||||||
|
|
||||||
|
Protocol int
|
||||||
Username string
|
Username string
|
||||||
Password string
|
Password string
|
||||||
|
|
||||||
|
@ -263,6 +264,7 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
||||||
Dialer: opt.Dialer,
|
Dialer: opt.Dialer,
|
||||||
OnConnect: opt.OnConnect,
|
OnConnect: opt.OnConnect,
|
||||||
|
|
||||||
|
Protocol: opt.Protocol,
|
||||||
Username: opt.Username,
|
Username: opt.Username,
|
||||||
Password: opt.Password,
|
Password: opt.Password,
|
||||||
|
|
||||||
|
|
|
@ -583,6 +583,35 @@ var _ = Describe("ClusterClient", func() {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Describe("ClusterClient PROTO 2", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
opt = redisClusterOptions()
|
||||||
|
opt.Protocol = 2
|
||||||
|
client = cluster.newClusterClient(ctx, opt)
|
||||||
|
|
||||||
|
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
||||||
|
return master.FlushDB(ctx).Err()
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
||||||
|
return master.FlushDB(ctx).Err()
|
||||||
|
})
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER PROTO 2", func() {
|
||||||
|
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
val, err := c.Do(ctx, "HELLO").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(ContainElements("proto", int64(2)))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
Describe("ClusterClient", func() {
|
Describe("ClusterClient", func() {
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
opt = redisClusterOptions()
|
opt = redisClusterOptions()
|
||||||
|
@ -746,6 +775,15 @@ var _ = Describe("ClusterClient", func() {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER PROTO 3", func() {
|
||||||
|
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
val, err := c.Do(ctx, "HELLO").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
It("should CLUSTER MYSHARDID", func() {
|
It("should CLUSTER MYSHARDID", func() {
|
||||||
shardID, err := client.ClusterMyShardID(ctx).Result()
|
shardID, err := client.ClusterMyShardID(ctx).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
|
@ -45,6 +45,9 @@ type Options struct {
|
||||||
// Hook that is called when new connection is established.
|
// Hook that is called when new connection is established.
|
||||||
OnConnect func(ctx context.Context, cn *Conn) error
|
OnConnect func(ctx context.Context, cn *Conn) error
|
||||||
|
|
||||||
|
// Protocol 2 or 3. Use the version to negotiate RESP version with redis-server.
|
||||||
|
// Default is 3.
|
||||||
|
Protocol int
|
||||||
// Use the specified Username to authenticate the current connection
|
// Use the specified Username to authenticate the current connection
|
||||||
// with one of the connections defined in the ACL list when connecting
|
// with one of the connections defined in the ACL list when connecting
|
||||||
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
|
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
|
||||||
|
@ -437,6 +440,7 @@ func setupConnParams(u *url.URL, o *Options) (*Options, error) {
|
||||||
o.DB = db
|
o.DB = db
|
||||||
}
|
}
|
||||||
|
|
||||||
|
o.Protocol = q.int("protocol")
|
||||||
o.ClientName = q.string("client_name")
|
o.ClientName = q.string("client_name")
|
||||||
o.MaxRetries = q.int("max_retries")
|
o.MaxRetries = q.int("max_retries")
|
||||||
o.MinRetryBackoff = q.duration("min_retry_backoff")
|
o.MinRetryBackoff = q.duration("min_retry_backoff")
|
||||||
|
|
|
@ -62,6 +62,9 @@ func TestParseURL(t *testing.T) {
|
||||||
}, {
|
}, {
|
||||||
url: "redis://localhost:123/?db=2&client_name=hi", // client name
|
url: "redis://localhost:123/?db=2&client_name=hi", // client name
|
||||||
o: &Options{Addr: "localhost:123", DB: 2, ClientName: "hi"},
|
o: &Options{Addr: "localhost:123", DB: 2, ClientName: "hi"},
|
||||||
|
}, {
|
||||||
|
url: "redis://localhost:123/?db=2&protocol=2", // RESP Protocol
|
||||||
|
o: &Options{Addr: "localhost:123", DB: 2, Protocol: 2},
|
||||||
}, {
|
}, {
|
||||||
url: "unix:///tmp/redis.sock",
|
url: "unix:///tmp/redis.sock",
|
||||||
o: &Options{Addr: "/tmp/redis.sock"},
|
o: &Options{Addr: "/tmp/redis.sock"},
|
||||||
|
|
7
redis.go
7
redis.go
|
@ -279,10 +279,15 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
||||||
conn := newConn(c.opt, connPool)
|
conn := newConn(c.opt, connPool)
|
||||||
|
|
||||||
var auth bool
|
var auth bool
|
||||||
|
protocol := c.opt.Protocol
|
||||||
|
// By default, use RESP3 in current version.
|
||||||
|
if protocol < 2 {
|
||||||
|
protocol = 3
|
||||||
|
}
|
||||||
|
|
||||||
// for redis-server versions that do not support the HELLO command,
|
// for redis-server versions that do not support the HELLO command,
|
||||||
// RESP2 will continue to be used.
|
// RESP2 will continue to be used.
|
||||||
if err := conn.Hello(ctx, 3, username, password, "").Err(); err == nil {
|
if err := conn.Hello(ctx, protocol, username, password, "").Err(); err == nil {
|
||||||
auth = true
|
auth = true
|
||||||
} else if !isRedisError(err) {
|
} else if !isRedisError(err) {
|
||||||
// When the server responds with the RESP protocol and the result is not a normal
|
// When the server responds with the RESP protocol and the result is not a normal
|
||||||
|
|
|
@ -185,6 +185,33 @@ var _ = Describe("Client", func() {
|
||||||
Expect(val).Should(ContainSubstring("name=hi"))
|
Expect(val).Should(ContainSubstring("name=hi"))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should client PROTO 2", func() {
|
||||||
|
opt := redisOptions()
|
||||||
|
opt.Protocol = 2
|
||||||
|
db := redis.NewClient(opt)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
Expect(db.Close()).NotTo(HaveOccurred())
|
||||||
|
}()
|
||||||
|
|
||||||
|
val, err := db.Do(ctx, "HELLO").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(ContainElements("proto", int64(2)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should client PROTO 3", func() {
|
||||||
|
opt := redisOptions()
|
||||||
|
db := redis.NewClient(opt)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
Expect(db.Close()).NotTo(HaveOccurred())
|
||||||
|
}()
|
||||||
|
|
||||||
|
val, err := db.Do(ctx, "HELLO").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
|
||||||
|
})
|
||||||
|
|
||||||
It("processes custom commands", func() {
|
It("processes custom commands", func() {
|
||||||
cmd := redis.NewCmd(ctx, "PING")
|
cmd := redis.NewCmd(ctx, "PING")
|
||||||
_ = client.Process(ctx, cmd)
|
_ = client.Process(ctx, cmd)
|
||||||
|
|
4
ring.go
4
ring.go
|
@ -12,7 +12,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cespare/xxhash/v2"
|
"github.com/cespare/xxhash/v2"
|
||||||
rendezvous "github.com/dgryski/go-rendezvous" //nolint
|
"github.com/dgryski/go-rendezvous" //nolint
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9/internal"
|
"github.com/redis/go-redis/v9/internal"
|
||||||
"github.com/redis/go-redis/v9/internal/hashtag"
|
"github.com/redis/go-redis/v9/internal/hashtag"
|
||||||
|
@ -70,6 +70,7 @@ type RingOptions struct {
|
||||||
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
|
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
|
||||||
OnConnect func(ctx context.Context, cn *Conn) error
|
OnConnect func(ctx context.Context, cn *Conn) error
|
||||||
|
|
||||||
|
Protocol int
|
||||||
Username string
|
Username string
|
||||||
Password string
|
Password string
|
||||||
DB int
|
DB int
|
||||||
|
@ -136,6 +137,7 @@ func (opt *RingOptions) clientOptions() *Options {
|
||||||
Dialer: opt.Dialer,
|
Dialer: opt.Dialer,
|
||||||
OnConnect: opt.OnConnect,
|
OnConnect: opt.OnConnect,
|
||||||
|
|
||||||
|
Protocol: opt.Protocol,
|
||||||
Username: opt.Username,
|
Username: opt.Username,
|
||||||
Password: opt.Password,
|
Password: opt.Password,
|
||||||
DB: opt.DB,
|
DB: opt.DB,
|
||||||
|
|
40
ring_test.go
40
ring_test.go
|
@ -15,6 +15,37 @@ import (
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Redis Ring PROTO 2", func() {
|
||||||
|
const heartbeat = 100 * time.Millisecond
|
||||||
|
|
||||||
|
var ring *redis.Ring
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
opt := redisRingOptions()
|
||||||
|
opt.Protocol = 2
|
||||||
|
opt.HeartbeatFrequency = heartbeat
|
||||||
|
ring = redis.NewRing(opt)
|
||||||
|
|
||||||
|
err := ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
|
||||||
|
return cl.FlushDB(ctx).Err()
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
Expect(ring.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should ring PROTO 2", func() {
|
||||||
|
_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
val, err := c.Do(ctx, "HELLO").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(ContainElements("proto", int64(2)))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
var _ = Describe("Redis Ring", func() {
|
var _ = Describe("Redis Ring", func() {
|
||||||
const heartbeat = 100 * time.Millisecond
|
const heartbeat = 100 * time.Millisecond
|
||||||
|
|
||||||
|
@ -65,6 +96,15 @@ var _ = Describe("Redis Ring", func() {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should ring PROTO 3", func() {
|
||||||
|
_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
val, err := c.Do(ctx, "HELLO").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
It("distributes keys", func() {
|
It("distributes keys", func() {
|
||||||
setRingKeys()
|
setRingKeys()
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,7 @@ type FailoverOptions struct {
|
||||||
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
|
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
|
||||||
OnConnect func(ctx context.Context, cn *Conn) error
|
OnConnect func(ctx context.Context, cn *Conn) error
|
||||||
|
|
||||||
|
Protocol int
|
||||||
Username string
|
Username string
|
||||||
Password string
|
Password string
|
||||||
DB int
|
DB int
|
||||||
|
@ -88,6 +89,7 @@ func (opt *FailoverOptions) clientOptions() *Options {
|
||||||
OnConnect: opt.OnConnect,
|
OnConnect: opt.OnConnect,
|
||||||
|
|
||||||
DB: opt.DB,
|
DB: opt.DB,
|
||||||
|
Protocol: opt.Protocol,
|
||||||
Username: opt.Username,
|
Username: opt.Username,
|
||||||
Password: opt.Password,
|
Password: opt.Password,
|
||||||
|
|
||||||
|
@ -151,6 +153,7 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
|
||||||
Dialer: opt.Dialer,
|
Dialer: opt.Dialer,
|
||||||
OnConnect: opt.OnConnect,
|
OnConnect: opt.OnConnect,
|
||||||
|
|
||||||
|
Protocol: opt.Protocol,
|
||||||
Username: opt.Username,
|
Username: opt.Username,
|
||||||
Password: opt.Password,
|
Password: opt.Password,
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,30 @@ import (
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Sentinel PROTO 2", func() {
|
||||||
|
var client *redis.Client
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
client = redis.NewFailoverClient(&redis.FailoverOptions{
|
||||||
|
MasterName: sentinelName,
|
||||||
|
SentinelAddrs: sentinelAddrs,
|
||||||
|
MaxRetries: -1,
|
||||||
|
Protocol: 2,
|
||||||
|
})
|
||||||
|
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
_ = client.Close()
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should sentinel client PROTO 2", func() {
|
||||||
|
val, err := client.Do(ctx, "HELLO").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(ContainElements("proto", int64(2)))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
var _ = Describe("Sentinel", func() {
|
var _ = Describe("Sentinel", func() {
|
||||||
var client *redis.Client
|
var client *redis.Client
|
||||||
var master *redis.Client
|
var master *redis.Client
|
||||||
|
@ -134,6 +158,40 @@ var _ = Describe("Sentinel", func() {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(val).Should(ContainSubstring("name=sentinel_hi"))
|
Expect(val).Should(ContainSubstring("name=sentinel_hi"))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should sentinel client PROTO 3", func() {
|
||||||
|
val, err := client.Do(ctx, "HELLO").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
var _ = Describe("NewFailoverClusterClient PROTO 2", func() {
|
||||||
|
var client *redis.ClusterClient
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
|
||||||
|
MasterName: sentinelName,
|
||||||
|
SentinelAddrs: sentinelAddrs,
|
||||||
|
Protocol: 2,
|
||||||
|
|
||||||
|
RouteRandomly: true,
|
||||||
|
})
|
||||||
|
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
_ = client.Close()
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should sentinel cluster PROTO 2", func() {
|
||||||
|
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
val, err := client.Do(ctx, "HELLO").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(ContainElements("proto", int64(2)))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
var _ = Describe("NewFailoverClusterClient", func() {
|
var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
|
@ -237,6 +295,15 @@ var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should sentinel cluster PROTO 3", func() {
|
||||||
|
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
|
||||||
|
val, err := client.Do(ctx, "HELLO").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
var _ = Describe("SentinelAclAuth", func() {
|
var _ = Describe("SentinelAclAuth", func() {
|
||||||
|
|
|
@ -26,6 +26,7 @@ type UniversalOptions struct {
|
||||||
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
|
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
|
||||||
OnConnect func(ctx context.Context, cn *Conn) error
|
OnConnect func(ctx context.Context, cn *Conn) error
|
||||||
|
|
||||||
|
Protocol int
|
||||||
Username string
|
Username string
|
||||||
Password string
|
Password string
|
||||||
SentinelUsername string
|
SentinelUsername string
|
||||||
|
@ -77,6 +78,7 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
|
||||||
Dialer: o.Dialer,
|
Dialer: o.Dialer,
|
||||||
OnConnect: o.OnConnect,
|
OnConnect: o.OnConnect,
|
||||||
|
|
||||||
|
Protocol: o.Protocol,
|
||||||
Username: o.Username,
|
Username: o.Username,
|
||||||
Password: o.Password,
|
Password: o.Password,
|
||||||
|
|
||||||
|
@ -122,6 +124,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
|
||||||
OnConnect: o.OnConnect,
|
OnConnect: o.OnConnect,
|
||||||
|
|
||||||
DB: o.DB,
|
DB: o.DB,
|
||||||
|
Protocol: o.Protocol,
|
||||||
Username: o.Username,
|
Username: o.Username,
|
||||||
Password: o.Password,
|
Password: o.Password,
|
||||||
SentinelUsername: o.SentinelUsername,
|
SentinelUsername: o.SentinelUsername,
|
||||||
|
@ -162,6 +165,7 @@ func (o *UniversalOptions) Simple() *Options {
|
||||||
OnConnect: o.OnConnect,
|
OnConnect: o.OnConnect,
|
||||||
|
|
||||||
DB: o.DB,
|
DB: o.DB,
|
||||||
|
Protocol: o.Protocol,
|
||||||
Username: o.Username,
|
Username: o.Username,
|
||||||
Password: o.Password,
|
Password: o.Password,
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue