From f5f73f803307ac9dd9d90771569a1ad3c3973264 Mon Sep 17 00:00:00 2001 From: Rueian Date: Fri, 21 Aug 2020 17:19:31 +0800 Subject: [PATCH] Support string array in pubsub message payload --- pubsub.go | 29 ++++++++++++++++++++++------- pubsub_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/pubsub.go b/pubsub.go index 61735032..ce5aa43f 100644 --- a/pubsub.go +++ b/pubsub.go @@ -284,9 +284,10 @@ func (m *Subscription) String() string { // Message received as result of a PUBLISH command issued by another client. type Message struct { - Channel string - Pattern string - Payload string + Channel string + Pattern string + Payload string + PayloadSlice []string } func (m *Message) String() string { @@ -322,10 +323,24 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { Count: int(reply[2].(int64)), }, nil case "message": - return &Message{ - Channel: reply[1].(string), - Payload: reply[2].(string), - }, nil + switch payload := reply[2].(type) { + case string: + return &Message{ + Channel: reply[1].(string), + Payload: payload, + }, nil + case []interface{}: + ss := make([]string, len(payload)) + for i, s := range payload { + ss[i] = s.(string) + } + return &Message{ + Channel: reply[1].(string), + PayloadSlice: ss, + }, nil + default: + return nil, fmt.Errorf("redis: unsupported pubsub message payload: %T", payload) + } case "pmessage": return &Message{ Pattern: reply[1].(string), diff --git a/pubsub_test.go b/pubsub_test.go index b9633b2f..d32d5e0b 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -1,6 +1,7 @@ package redis_test import ( + "context" "io" "net" "sync" @@ -14,11 +15,16 @@ import ( var _ = Describe("PubSub", func() { var client *redis.Client + var clientID int64 BeforeEach(func() { opt := redisOptions() opt.MinIdleConns = 0 opt.MaxConnAge = 0 + opt.OnConnect = func(ctx context.Context, cn *redis.Conn) (err error) { + clientID, err = cn.ClientID(ctx).Result() + return err + } client = redis.NewClient(opt) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) }) @@ -415,6 +421,30 @@ var _ = Describe("PubSub", func() { Expect(msg.Payload).To(Equal(string(bigVal))) }) + It("handles message payload slice with server-assisted client-size caching", func() { + pubsub := client.Subscribe(ctx, "__redis__:invalidate") + defer pubsub.Close() + + client2 := redis.NewClient(redisOptions()) + defer client2.Close() + + err := client2.Do(ctx, "CLIENT", "TRACKING", "on", "REDIRECT", clientID).Err() + Expect(err).NotTo(HaveOccurred()) + + err = client2.Do(ctx, "GET", "mykey").Err() + Expect(err).To(Equal(redis.Nil)) + + err = client2.Do(ctx, "SET", "mykey", "myvalue").Err() + Expect(err).NotTo(HaveOccurred()) + + ch := pubsub.Channel() + + var msg *redis.Message + Eventually(ch).Should(Receive(&msg)) + Expect(msg.Channel).To(Equal("__redis__:invalidate")) + Expect(msg.PayloadSlice).To(Equal([]string{"mykey"})) + }) + It("supports concurrent Ping and Receive", func() { const N = 100