From aa73fcd17b94aaa50c88897ae6cc15a599cf6065 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Tue, 4 Jun 2019 09:19:12 -0700 Subject: [PATCH] Added unsubscribing from active channel related #448 --- internal/server/pubsub.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/internal/server/pubsub.go b/internal/server/pubsub.go index 2d5ee408..d6d8ff3e 100644 --- a/internal/server/pubsub.go +++ b/internal/server/pubsub.go @@ -202,7 +202,7 @@ func (c *Server) liveSubscription( write([]byte(`{"ok":true` + `,"elapsed":"` + time.Now().Sub(start).String() + `"}`)) case RESP: - write([]byte(`+OK\r\n`)) + write([]byte("+OK\r\n")) } } writeWrongNumberOfArgsErr := func(command string) { @@ -211,8 +211,8 @@ func (c *Server) liveSubscription( write([]byte(`{"ok":false,"err":"invalid number of arguments"` + `,"elapsed":"` + time.Now().Sub(start).String() + `"}`)) case RESP: - write([]byte(`-ERR wrong number of arguments ` + - `for '` + command + `' command\r\n`)) + write([]byte("-ERR wrong number of arguments " + + "for '" + command + "' command\r\n")) } } writeOnlyPubsubErr := func() { @@ -284,8 +284,8 @@ func (c *Server) liveSubscription( } m := [2]map[string]bool{ - make(map[string]bool), - make(map[string]bool), + make(map[string]bool), // pubsubChannel + make(map[string]bool), // pubsubPattern } target := newSubtarget() @@ -330,24 +330,35 @@ func (c *Server) liveSubscription( for _, msg := range msgs { start = time.Now() var kind int + var un bool switch msg.Command() { case "quit": writeOK() return nil case "psubscribe": - kind = pubsubPattern + kind, un = pubsubPattern, false + case "punsubscribe": + kind, un = pubsubPattern, true case "subscribe": - kind = pubsubChannel + kind, un = pubsubChannel, false + case "unsubscribe": + kind, un = pubsubChannel, true default: writeOnlyPubsubErr() + continue } if len(msg.Args) < 2 { writeWrongNumberOfArgsErr(msg.Command()) } for i := 1; i < len(msg.Args); i++ { channel := msg.Args[i] - m[kind][channel] = true - c.pubsub.register(kind, channel, target) + if un { + delete(m[kind], channel) + c.pubsub.unregister(kind, channel, target) + } else { + m[kind][channel] = true + c.pubsub.register(kind, channel, target) + } writeSubscribe(msg.Command(), channel, len(m[0])+len(m[1])) } }