Merge pull request #1448 from rueian/pubsub-message-payload-slice

Support string array in pubsub message payload
This commit is contained in:
Vladimir Mihailenco 2020-09-08 18:02:19 +03:00 committed by GitHub
commit 8a25ceadae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 7 deletions

View File

@ -290,6 +290,7 @@ type Message struct {
Channel string Channel string
Pattern string Pattern string
Payload string Payload string
PayloadSlice []string
} }
func (m *Message) String() string { func (m *Message) String() string {
@ -325,10 +326,24 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
Count: int(reply[2].(int64)), Count: int(reply[2].(int64)),
}, nil }, nil
case "message": case "message":
switch payload := reply[2].(type) {
case string:
return &Message{ return &Message{
Channel: reply[1].(string), Channel: reply[1].(string),
Payload: reply[2].(string), Payload: payload,
}, nil }, nil
case []interface{}:
ss := make([]string, len(payload))
for i, s := range payload {
ss[i] = s.(string)
}
return &Message{
Channel: reply[1].(string),
PayloadSlice: ss,
}, nil
default:
return nil, fmt.Errorf("redis: unsupported pubsub message payload: %T", payload)
}
case "pmessage": case "pmessage":
return &Message{ return &Message{
Pattern: reply[1].(string), Pattern: reply[1].(string),

View File

@ -1,6 +1,7 @@
package redis_test package redis_test
import ( import (
"context"
"io" "io"
"net" "net"
"sync" "sync"
@ -14,11 +15,16 @@ import (
var _ = Describe("PubSub", func() { var _ = Describe("PubSub", func() {
var client *redis.Client var client *redis.Client
var clientID int64
BeforeEach(func() { BeforeEach(func() {
opt := redisOptions() opt := redisOptions()
opt.MinIdleConns = 0 opt.MinIdleConns = 0
opt.MaxConnAge = 0 opt.MaxConnAge = 0
opt.OnConnect = func(ctx context.Context, cn *redis.Conn) (err error) {
clientID, err = cn.ClientID(ctx).Result()
return err
}
client = redis.NewClient(opt) client = redis.NewClient(opt)
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
}) })
@ -415,6 +421,30 @@ var _ = Describe("PubSub", func() {
Expect(msg.Payload).To(Equal(string(bigVal))) Expect(msg.Payload).To(Equal(string(bigVal)))
}) })
It("handles message payload slice with server-assisted client-size caching", func() {
pubsub := client.Subscribe(ctx, "__redis__:invalidate")
defer pubsub.Close()
client2 := redis.NewClient(redisOptions())
defer client2.Close()
err := client2.Do(ctx, "CLIENT", "TRACKING", "on", "REDIRECT", clientID).Err()
Expect(err).NotTo(HaveOccurred())
err = client2.Do(ctx, "GET", "mykey").Err()
Expect(err).To(Equal(redis.Nil))
err = client2.Do(ctx, "SET", "mykey", "myvalue").Err()
Expect(err).NotTo(HaveOccurred())
ch := pubsub.Channel()
var msg *redis.Message
Eventually(ch).Should(Receive(&msg))
Expect(msg.Channel).To(Equal("__redis__:invalidate"))
Expect(msg.PayloadSlice).To(Equal([]string{"mykey"}))
})
It("supports concurrent Ping and Receive", func() { It("supports concurrent Ping and Receive", func() {
const N = 100 const N = 100