From 4dcf45bac0ae093f58590ca1073e73f6d19fe9b8 Mon Sep 17 00:00:00 2001 From: tidwall Date: Sat, 31 Oct 2020 07:30:48 -0700 Subject: [PATCH] Added pub/sub documentation --- README.md | 42 ++++++++++++++++++++++++++++++++++++++++-- example/clone.go | 10 ++++++++-- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 6909e33..7da477f 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Features - Support for pipelining and telnet commands - Works with Redis clients such as [redigo](https://github.com/garyburd/redigo), [redis-py](https://github.com/andymccurdy/redis-py), [node_redis](https://github.com/NodeRedis/node_redis), and [jedis](https://github.com/xetorthio/jedis) - [TLS Support](#tls-example) +- Compatible [pub/sub](#pub_sub) - Multithreaded Installing @@ -35,6 +36,8 @@ Here's a full example of a Redis clone that accepts: - SET key value - GET key - DEL key +- PUBLISH channel message +- (P)SUBSCRIBE channel - PING - QUIT @@ -60,12 +63,47 @@ var addr = ":6380" func main() { var mu sync.RWMutex var items = make(map[string][]byte) + var ps redcon.PubSub go log.Printf("started server at %s", addr) err := redcon.ListenAndServe(addr, func(conn redcon.Conn, cmd redcon.Command) { switch strings.ToLower(string(cmd.Args[0])) { default: conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'") + case "publish": + // Publish to all pub/sub subscribers and return the number of + // messages that were sent. + if len(cmd.Args) != 3 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + count := ps.Publish(string(cmd.Args[1]), string(cmd.Args[2])) + conn.WriteInt(count) + case "subscribe", "psubscribe": + // Subscribe to a pub/sub channel. The `Psubscribe` and + // `Subscribe` operations will detach the connection from the + // event handler and manage all network I/O for this connection + // in the background. + if len(cmd.Args) < 2 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + command := strings.ToLower(string(cmd.Args[0])) + for i := 1; i < len(cmd.Args); i++ { + if command == "psubscribe" { + ps.Psubscribe(conn, string(cmd.Args[i])) + } else { + ps.Subscribe(conn, string(cmd.Args[i])) + } + } + case "detach": + hconn := conn.Detach() + log.Printf("connection has been detached") + go func() { + defer hconn.Close() + hconn.WriteString("OK") + hconn.Flush() + }() case "ping": conn.WriteString("PONG") case "quit": @@ -110,12 +148,12 @@ func main() { } }, func(conn redcon.Conn) bool { - // use this function to accept or deny the connection. + // Use this function to accept or deny the connection. // log.Printf("accept: %s", conn.RemoteAddr()) return true }, func(conn redcon.Conn, err error) { - // this is called when the connection has been closed + // This is called when the connection has been closed // log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err) }, ) diff --git a/example/clone.go b/example/clone.go index cac6440..ae97e1d 100644 --- a/example/clone.go +++ b/example/clone.go @@ -21,6 +21,8 @@ func main() { default: conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'") case "publish": + // Publish to all pub/sub subscribers and return the number of + // messages that were sent. if len(cmd.Args) != 3 { conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return @@ -28,6 +30,10 @@ func main() { count := ps.Publish(string(cmd.Args[1]), string(cmd.Args[2])) conn.WriteInt(count) case "subscribe", "psubscribe": + // Subscribe to a pub/sub channel. The `Psubscribe` and + // `Subscribe` operations will detach the connection from the + // event handler and manage all network I/O for this connection + // in the background. if len(cmd.Args) < 2 { conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return @@ -92,12 +98,12 @@ func main() { } }, func(conn redcon.Conn) bool { - // use this function to accept or deny the connection. + // Use this function to accept or deny the connection. // log.Printf("accept: %s", conn.RemoteAddr()) return true }, func(conn redcon.Conn, err error) { - // this is called when the connection has been closed + // This is called when the connection has been closed // log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err) }, )