Merge pull request #459 from tidwall/unsubscribe

Added unsubscribing from active channel
This commit is contained in:
Josh Baker 2019-06-04 10:01:52 -07:00
commit acb3695523
1 changed files with 20 additions and 9 deletions

View File

@ -202,7 +202,7 @@ func (c *Server) liveSubscription(
write([]byte(`{"ok":true` + write([]byte(`{"ok":true` +
`,"elapsed":"` + time.Now().Sub(start).String() + `"}`)) `,"elapsed":"` + time.Now().Sub(start).String() + `"}`))
case RESP: case RESP:
write([]byte(`+OK\r\n`)) write([]byte("+OK\r\n"))
} }
} }
writeWrongNumberOfArgsErr := func(command string) { writeWrongNumberOfArgsErr := func(command string) {
@ -211,8 +211,8 @@ func (c *Server) liveSubscription(
write([]byte(`{"ok":false,"err":"invalid number of arguments"` + write([]byte(`{"ok":false,"err":"invalid number of arguments"` +
`,"elapsed":"` + time.Now().Sub(start).String() + `"}`)) `,"elapsed":"` + time.Now().Sub(start).String() + `"}`))
case RESP: case RESP:
write([]byte(`-ERR wrong number of arguments ` + write([]byte("-ERR wrong number of arguments " +
`for '` + command + `' command\r\n`)) "for '" + command + "' command\r\n"))
} }
} }
writeOnlyPubsubErr := func() { writeOnlyPubsubErr := func() {
@ -284,8 +284,8 @@ func (c *Server) liveSubscription(
} }
m := [2]map[string]bool{ m := [2]map[string]bool{
make(map[string]bool), make(map[string]bool), // pubsubChannel
make(map[string]bool), make(map[string]bool), // pubsubPattern
} }
target := newSubtarget() target := newSubtarget()
@ -330,24 +330,35 @@ func (c *Server) liveSubscription(
for _, msg := range msgs { for _, msg := range msgs {
start = time.Now() start = time.Now()
var kind int var kind int
var un bool
switch msg.Command() { switch msg.Command() {
case "quit": case "quit":
writeOK() writeOK()
return nil return nil
case "psubscribe": case "psubscribe":
kind = pubsubPattern kind, un = pubsubPattern, false
case "punsubscribe":
kind, un = pubsubPattern, true
case "subscribe": case "subscribe":
kind = pubsubChannel kind, un = pubsubChannel, false
case "unsubscribe":
kind, un = pubsubChannel, true
default: default:
writeOnlyPubsubErr() writeOnlyPubsubErr()
continue
} }
if len(msg.Args) < 2 { if len(msg.Args) < 2 {
writeWrongNumberOfArgsErr(msg.Command()) writeWrongNumberOfArgsErr(msg.Command())
} }
for i := 1; i < len(msg.Args); i++ { for i := 1; i < len(msg.Args); i++ {
channel := msg.Args[i] channel := msg.Args[i]
m[kind][channel] = true if un {
c.pubsub.register(kind, channel, target) delete(m[kind], channel)
c.pubsub.unregister(kind, channel, target)
} else {
m[kind][channel] = true
c.pubsub.register(kind, channel, target)
}
writeSubscribe(msg.Command(), channel, len(m[0])+len(m[1])) writeSubscribe(msg.Command(), channel, len(m[0])+len(m[1]))
} }
} }