diff --git a/append.go b/append.go deleted file mode 100644 index a30038a..0000000 --- a/append.go +++ /dev/null @@ -1,478 +0,0 @@ -package redcon - -import ( - "fmt" - "reflect" - "sort" - "strconv" - "strings" -) - -// Kind is the kind of command -type Kind int - -const ( - // Redis is returned for Redis protocol commands - Redis Kind = iota - // Tile38 is returnd for Tile38 native protocol commands - Tile38 - // Telnet is returnd for plain telnet commands - Telnet -) - -var errInvalidMessage = &errProtocol{"invalid message"} - -// ReadNextCommand reads the next command from the provided packet. It's -// possible that the packet contains multiple commands, or zero commands -// when the packet is incomplete. -// 'argsbuf' is an optional reusable buffer and it can be nil. -// 'complete' indicates that a command was read. false means no more commands. -// 'args' are the output arguments for the command. -// 'kind' is the type of command that was read. -// 'leftover' is any remaining unused bytes which belong to the next command. -// 'err' is returned when a protocol error was encountered. -func ReadNextCommand(packet []byte, argsbuf [][]byte) ( - complete bool, args [][]byte, kind Kind, leftover []byte, err error, -) { - args = argsbuf[:0] - if len(packet) > 0 { - if packet[0] != '*' { - if packet[0] == '$' { - return readTile38Command(packet, args) - } - return readTelnetCommand(packet, args) - } - // standard redis command - for s, i := 1, 1; i < len(packet); i++ { - if packet[i] == '\n' { - if packet[i-1] != '\r' { - return false, args[:0], Redis, packet, errInvalidMultiBulkLength - } - count, ok := parseInt(packet[s : i-1]) - if !ok || count < 0 { - return false, args[:0], Redis, packet, errInvalidMultiBulkLength - } - i++ - if count == 0 { - return true, args[:0], Redis, packet[i:], nil - } - nextArg: - for j := 0; j < count; j++ { - if i == len(packet) { - break - } - if packet[i] != '$' { - return false, args[:0], Redis, packet, - &errProtocol{"expected '$', got '" + - string(packet[i]) + "'"} - } - for s := i + 1; i < len(packet); i++ { - if packet[i] == '\n' { - if packet[i-1] != '\r' { - return false, args[:0], Redis, packet, errInvalidBulkLength - } - n, ok := parseInt(packet[s : i-1]) - if !ok || count <= 0 { - return false, args[:0], Redis, packet, errInvalidBulkLength - } - i++ - if len(packet)-i >= n+2 { - if packet[i+n] != '\r' || packet[i+n+1] != '\n' { - return false, args[:0], Redis, packet, errInvalidBulkLength - } - args = append(args, packet[i:i+n]) - i += n + 2 - if j == count-1 { - // done reading - return true, args, Redis, packet[i:], nil - } - continue nextArg - } - break - } - } - break - } - break - } - } - } - return false, args[:0], Redis, packet, nil -} - -func readTile38Command(packet []byte, argsbuf [][]byte) ( - complete bool, args [][]byte, kind Kind, leftover []byte, err error, -) { - for i := 1; i < len(packet); i++ { - if packet[i] == ' ' { - n, ok := parseInt(packet[1:i]) - if !ok || n < 0 { - return false, args[:0], Tile38, packet, errInvalidMessage - } - i++ - if len(packet) >= i+n+2 { - if packet[i+n] != '\r' || packet[i+n+1] != '\n' { - return false, args[:0], Tile38, packet, errInvalidMessage - } - line := packet[i : i+n] - reading: - for len(line) != 0 { - if line[0] == '{' { - // The native protocol cannot understand json boundaries so it assumes that - // a json element must be at the end of the line. - args = append(args, line) - break - } - if line[0] == '"' && line[len(line)-1] == '"' { - if len(args) > 0 && - strings.ToLower(string(args[0])) == "set" && - strings.ToLower(string(args[len(args)-1])) == "string" { - // Setting a string value that is contained inside double quotes. - // This is only because of the boundary issues of the native protocol. - args = append(args, line[1:len(line)-1]) - break - } - } - i := 0 - for ; i < len(line); i++ { - if line[i] == ' ' { - value := line[:i] - if len(value) > 0 { - args = append(args, value) - } - line = line[i+1:] - continue reading - } - } - args = append(args, line) - break - } - return true, args, Tile38, packet[i+n+2:], nil - } - break - } - } - return false, args[:0], Tile38, packet, nil -} -func readTelnetCommand(packet []byte, argsbuf [][]byte) ( - complete bool, args [][]byte, kind Kind, leftover []byte, err error, -) { - // just a plain text command - for i := 0; i < len(packet); i++ { - if packet[i] == '\n' { - var line []byte - if i > 0 && packet[i-1] == '\r' { - line = packet[:i-1] - } else { - line = packet[:i] - } - var quote bool - var quotech byte - var escape bool - outer: - for { - nline := make([]byte, 0, len(line)) - for i := 0; i < len(line); i++ { - c := line[i] - if !quote { - if c == ' ' { - if len(nline) > 0 { - args = append(args, nline) - } - line = line[i+1:] - continue outer - } - if c == '"' || c == '\'' { - if i != 0 { - return false, args[:0], Telnet, packet, errUnbalancedQuotes - } - quotech = c - quote = true - line = line[i+1:] - continue outer - } - } else { - if escape { - escape = false - switch c { - case 'n': - c = '\n' - case 'r': - c = '\r' - case 't': - c = '\t' - } - } else if c == quotech { - quote = false - quotech = 0 - args = append(args, nline) - line = line[i+1:] - if len(line) > 0 && line[0] != ' ' { - return false, args[:0], Telnet, packet, errUnbalancedQuotes - } - continue outer - } else if c == '\\' { - escape = true - continue - } - } - nline = append(nline, c) - } - if quote { - return false, args[:0], Telnet, packet, errUnbalancedQuotes - } - if len(line) > 0 { - args = append(args, line) - } - break - } - return true, args, Telnet, packet[i+1:], nil - } - } - return false, args[:0], Telnet, packet, nil -} - -// appendPrefix will append a "$3\r\n" style redis prefix for a message. -func appendPrefix(b []byte, c byte, n int64) []byte { - if n >= 0 && n <= 9 { - return append(b, c, byte('0'+n), '\r', '\n') - } - b = append(b, c) - b = strconv.AppendInt(b, n, 10) - return append(b, '\r', '\n') -} - -// AppendUint appends a Redis protocol uint64 to the input bytes. -func AppendUint(b []byte, n uint64) []byte { - b = append(b, ':') - b = strconv.AppendUint(b, n, 10) - return append(b, '\r', '\n') -} - -// AppendInt appends a Redis protocol int64 to the input bytes. -func AppendInt(b []byte, n int64) []byte { - return appendPrefix(b, ':', n) -} - -// AppendArray appends a Redis protocol array to the input bytes. -func AppendArray(b []byte, n int) []byte { - return appendPrefix(b, '*', int64(n)) -} - -// AppendBulk appends a Redis protocol bulk byte slice to the input bytes. -func AppendBulk(b []byte, bulk []byte) []byte { - b = appendPrefix(b, '$', int64(len(bulk))) - b = append(b, bulk...) - return append(b, '\r', '\n') -} - -// AppendBulkString appends a Redis protocol bulk string to the input bytes. -func AppendBulkString(b []byte, bulk string) []byte { - b = appendPrefix(b, '$', int64(len(bulk))) - b = append(b, bulk...) - return append(b, '\r', '\n') -} - -// AppendString appends a Redis protocol string to the input bytes. -func AppendString(b []byte, s string) []byte { - b = append(b, '+') - b = append(b, stripNewlines(s)...) - return append(b, '\r', '\n') -} - -// AppendError appends a Redis protocol error to the input bytes. -func AppendError(b []byte, s string) []byte { - b = append(b, '-') - b = append(b, stripNewlines(s)...) - return append(b, '\r', '\n') -} - -// AppendOK appends a Redis protocol OK to the input bytes. -func AppendOK(b []byte) []byte { - return append(b, '+', 'O', 'K', '\r', '\n') -} -func stripNewlines(s string) string { - for i := 0; i < len(s); i++ { - if s[i] == '\r' || s[i] == '\n' { - s = strings.Replace(s, "\r", " ", -1) - s = strings.Replace(s, "\n", " ", -1) - break - } - } - return s -} - -// AppendTile38 appends a Tile38 message to the input bytes. -func AppendTile38(b []byte, data []byte) []byte { - b = append(b, '$') - b = strconv.AppendInt(b, int64(len(data)), 10) - b = append(b, ' ') - b = append(b, data...) - return append(b, '\r', '\n') -} - -// AppendNull appends a Redis protocol null to the input bytes. -func AppendNull(b []byte) []byte { - return append(b, '$', '-', '1', '\r', '\n') -} - -// AppendBulkFloat appends a float64, as bulk bytes. -func AppendBulkFloat(dst []byte, f float64) []byte { - return AppendBulk(dst, strconv.AppendFloat(nil, f, 'f', -1, 64)) -} - -// AppendBulkInt appends an int64, as bulk bytes. -func AppendBulkInt(dst []byte, x int64) []byte { - return AppendBulk(dst, strconv.AppendInt(nil, x, 10)) -} - -// AppendBulkUint appends an uint64, as bulk bytes. -func AppendBulkUint(dst []byte, x uint64) []byte { - return AppendBulk(dst, strconv.AppendUint(nil, x, 10)) -} - -func prefixERRIfNeeded(msg string) string { - msg = strings.TrimSpace(msg) - firstWord := strings.Split(msg, " ")[0] - addERR := len(firstWord) == 0 - for i := 0; i < len(firstWord); i++ { - if firstWord[i] < 'A' || firstWord[i] > 'Z' { - addERR = true - break - } - } - if addERR { - msg = strings.TrimSpace("ERR " + msg) - } - return msg -} - -// SimpleString is for representing a non-bulk representation of a string -// from an *Any call. -type SimpleString string - -// SimpleInt is for representing a non-bulk representation of a int -// from an *Any call. -type SimpleInt int - -// Marshaler is the interface implemented by types that -// can marshal themselves into a Redis response type from an *Any call. -// The return value is not check for validity. -type Marshaler interface { - MarshalRESP() []byte -} - -// AppendAny appends any type to valid Redis type. -// nil -> null -// error -> error (adds "ERR " when first word is not uppercase) -// string -> bulk-string -// numbers -> bulk-string -// []byte -> bulk-string -// bool -> bulk-string ("0" or "1") -// slice -> array -// map -> array with key/value pairs -// SimpleString -> string -// SimpleInt -> integer -// Marshaler -> raw bytes -// everything-else -> bulk-string representation using fmt.Sprint() -func AppendAny(b []byte, v interface{}) []byte { - switch v := v.(type) { - case SimpleString: - b = AppendString(b, string(v)) - case SimpleInt: - b = AppendInt(b, int64(v)) - case nil: - b = AppendNull(b) - case error: - b = AppendError(b, prefixERRIfNeeded(v.Error())) - case string: - b = AppendBulkString(b, v) - case []byte: - b = AppendBulk(b, v) - case bool: - if v { - b = AppendBulkString(b, "1") - } else { - b = AppendBulkString(b, "0") - } - case int: - b = AppendBulkInt(b, int64(v)) - case int8: - b = AppendBulkInt(b, int64(v)) - case int16: - b = AppendBulkInt(b, int64(v)) - case int32: - b = AppendBulkInt(b, int64(v)) - case int64: - b = AppendBulkInt(b, int64(v)) - case uint: - b = AppendBulkUint(b, uint64(v)) - case uint8: - b = AppendBulkUint(b, uint64(v)) - case uint16: - b = AppendBulkUint(b, uint64(v)) - case uint32: - b = AppendBulkUint(b, uint64(v)) - case uint64: - b = AppendBulkUint(b, uint64(v)) - case float32: - b = AppendBulkFloat(b, float64(v)) - case float64: - b = AppendBulkFloat(b, float64(v)) - case Marshaler: - b = append(b, v.MarshalRESP()...) - default: - vv := reflect.ValueOf(v) - switch vv.Kind() { - case reflect.Slice: - n := vv.Len() - b = AppendArray(b, n) - for i := 0; i < n; i++ { - b = AppendAny(b, vv.Index(i).Interface()) - } - case reflect.Map: - n := vv.Len() - b = AppendArray(b, n*2) - var i int - var strKey bool - var strsKeyItems []strKeyItem - - iter := vv.MapRange() - for iter.Next() { - key := iter.Key().Interface() - if i == 0 { - if _, ok := key.(string); ok { - strKey = true - strsKeyItems = make([]strKeyItem, n) - } - } - if strKey { - strsKeyItems[i] = strKeyItem{ - key.(string), iter.Value().Interface(), - } - } else { - b = AppendAny(b, key) - b = AppendAny(b, iter.Value().Interface()) - } - i++ - } - if strKey { - sort.Slice(strsKeyItems, func(i, j int) bool { - return strsKeyItems[i].key < strsKeyItems[j].key - }) - for _, item := range strsKeyItems { - b = AppendBulkString(b, item.key) - b = AppendAny(b, item.value) - } - } - default: - b = AppendBulkString(b, fmt.Sprint(v)) - } - } - return b -} - -type strKeyItem struct { - key string - value interface{} -} diff --git a/append_test.go b/append_test.go deleted file mode 100644 index bfa8d86..0000000 --- a/append_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package redcon - -import ( - "bytes" - "math/rand" - "testing" - "time" -) - -func TestNextCommand(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - start := time.Now() - for time.Since(start) < time.Second { - // keep copy of pipeline args for final compare - var plargs [][][]byte - - // create a pipeline of random number of commands with random data. - N := rand.Int() % 10000 - var data []byte - for i := 0; i < N; i++ { - nargs := rand.Int() % 10 - data = AppendArray(data, nargs) - var args [][]byte - for j := 0; j < nargs; j++ { - arg := make([]byte, rand.Int()%100) - if _, err := rand.Read(arg); err != nil { - t.Fatal(err) - } - data = AppendBulk(data, arg) - args = append(args, arg) - } - plargs = append(plargs, args) - } - - // break data into random number of chunks - chunkn := rand.Int() % 100 - if chunkn == 0 { - chunkn = 1 - } - if len(data) < chunkn { - continue - } - var chunks [][]byte - var chunksz int - for i := 0; i < len(data); i += chunksz { - chunksz = rand.Int() % (len(data) / chunkn) - var chunk []byte - if i+chunksz < len(data) { - chunk = data[i : i+chunksz] - } else { - chunk = data[i:] - } - chunks = append(chunks, chunk) - } - - // process chunks - var rbuf []byte - var fargs [][][]byte - for _, chunk := range chunks { - var data []byte - if len(rbuf) > 0 { - data = append(rbuf, chunk...) - } else { - data = chunk - } - for { - complete, args, _, leftover, err := ReadNextCommand(data, nil) - data = leftover - if err != nil { - t.Fatal(err) - } - if !complete { - break - } - fargs = append(fargs, args) - } - rbuf = append(rbuf[:0], data...) - } - // compare final args to original - if len(plargs) != len(fargs) { - t.Fatalf("not equal size: %v != %v", len(plargs), len(fargs)) - } - for i := 0; i < len(plargs); i++ { - if len(plargs[i]) != len(fargs[i]) { - t.Fatalf("not equal size for item %v: %v != %v", i, len(plargs[i]), len(fargs[i])) - } - for j := 0; j < len(plargs[i]); j++ { - if !bytes.Equal(plargs[i][j], plargs[i][j]) { - t.Fatalf("not equal for item %v:%v: %v != %v", i, j, len(plargs[i][j]), len(fargs[i][j])) - } - } - } - } -} - -func TestAppendBulkFloat(t *testing.T) { - var b []byte - b = AppendString(b, "HELLO") - b = AppendBulkFloat(b, 9.123192839) - b = AppendString(b, "HELLO") - exp := "+HELLO\r\n$11\r\n9.123192839\r\n+HELLO\r\n" - if string(b) != exp { - t.Fatalf("expected '%s', got '%s'", exp, b) - } -} - -func TestAppendBulkInt(t *testing.T) { - var b []byte - b = AppendString(b, "HELLO") - b = AppendBulkInt(b, -9182739137) - b = AppendString(b, "HELLO") - exp := "+HELLO\r\n$11\r\n-9182739137\r\n+HELLO\r\n" - if string(b) != exp { - t.Fatalf("expected '%s', got '%s'", exp, b) - } -} - -func TestAppendBulkUint(t *testing.T) { - var b []byte - b = AppendString(b, "HELLO") - b = AppendBulkInt(b, 91827391370) - b = AppendString(b, "HELLO") - exp := "+HELLO\r\n$11\r\n91827391370\r\n+HELLO\r\n" - if string(b) != exp { - t.Fatalf("expected '%s', got '%s'", exp, b) - } -} diff --git a/example/clone.go b/example/clone.go index f32bc1c..cac6440 100644 --- a/example/clone.go +++ b/example/clone.go @@ -13,12 +13,33 @@ var addr = ":6380" func main() { var mu sync.RWMutex var items = make(map[string][]byte) + var ps redcon.PubSub go log.Printf("started server at %s", addr) err := redcon.ListenAndServe(addr, func(conn redcon.Conn, cmd redcon.Command) { switch strings.ToLower(string(cmd.Args[0])) { default: conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'") + case "publish": + if len(cmd.Args) != 3 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + count := ps.Publish(string(cmd.Args[1]), string(cmd.Args[2])) + conn.WriteInt(count) + case "subscribe", "psubscribe": + if len(cmd.Args) < 2 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + command := strings.ToLower(string(cmd.Args[0])) + for i := 1; i < len(cmd.Args); i++ { + if command == "psubscribe" { + ps.Psubscribe(conn, string(cmd.Args[i])) + } else { + ps.Subscribe(conn, string(cmd.Args[i])) + } + } case "detach": hconn := conn.Detach() log.Printf("connection has been detached") @@ -27,7 +48,6 @@ func main() { hconn.WriteString("OK") hconn.Flush() }() - return case "ping": conn.WriteString("PONG") case "quit": diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..bb6ad5a --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module github.com/tidwall/redcon + +go 1.15 + +require ( + github.com/tidwall/btree v0.2.2 + github.com/tidwall/match v1.0.1 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..34a771c --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/tidwall/btree v0.2.2 h1:VVo0JW/tdidNdQzNsDR4wMbL3heaxA1DGleyzQ3/niY= +github.com/tidwall/btree v0.2.2/go.mod h1:huei1BkDWJ3/sLXmO+bsCNELL+Bp2Kks9OLyQFkzvA8= +github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= +github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= diff --git a/redcon.go b/redcon.go index dc38b78..74b5464 100644 --- a/redcon.go +++ b/redcon.go @@ -5,10 +5,14 @@ import ( "bufio" "crypto/tls" "errors" + "fmt" "io" "net" "strings" "sync" + + "github.com/tidwall/btree" + "github.com/tidwall/match" ) var ( @@ -1005,3 +1009,340 @@ func (m *ServeMux) ServeRESP(conn Conn, cmd Command) { conn.WriteError("ERR unknown command '" + command + "'") } } + +// PubSub is a Redis compatible pub/sub server +type PubSub struct { + mu sync.RWMutex + nextid uint64 + initd bool + chans *btree.BTree + conns map[Conn]*pubSubConn +} + +// Subscribe a connection to PubSub +func (ps *PubSub) Subscribe(conn Conn, channel string) { + ps.subscribe(conn, false, channel) +} + +// Psubscribe a connection to PubSub +func (ps *PubSub) Psubscribe(conn Conn, channel string) { + ps.subscribe(conn, true, channel) +} + +// Publish a message to subscribers +func (ps *PubSub) Publish(channel, message string) int { + ps.mu.RLock() + defer ps.mu.RUnlock() + if !ps.initd { + return 0 + } + var sent int + // write messages to all clients that are subscribed on the channel + pivot := &pubSubEntry{pattern: false, channel: channel} + ps.chans.Ascend(pivot, func(item interface{}) bool { + entry := item.(*pubSubEntry) + if entry.channel != pivot.channel || entry.pattern != pivot.pattern { + return false + } + entry.sconn.writeMessage(entry.pattern, "", channel, message) + sent++ + return true + }) + + // match on and write all psubscribe clients + pivot = &pubSubEntry{pattern: true} + ps.chans.Ascend(pivot, func(item interface{}) bool { + entry := item.(*pubSubEntry) + if match.Match(channel, entry.channel) { + entry.sconn.writeMessage(entry.pattern, entry.channel, channel, + message) + } + sent++ + return true + }) + + return sent +} + +type pubSubConn struct { + id uint64 + mu sync.Mutex + conn Conn + dconn DetachedConn + entries map[*pubSubEntry]bool +} + +type pubSubEntry struct { + pattern bool + sconn *pubSubConn + channel string +} + +func (sconn *pubSubConn) writeMessage(pat bool, pchan, channel, msg string) { + sconn.mu.Lock() + defer sconn.mu.Unlock() + if pat { + sconn.dconn.WriteArray(4) + sconn.dconn.WriteBulkString("pmessage") + sconn.dconn.WriteBulkString(pchan) + sconn.dconn.WriteBulkString(channel) + sconn.dconn.WriteBulkString(msg) + } else { + sconn.dconn.WriteArray(3) + sconn.dconn.WriteBulkString("message") + sconn.dconn.WriteBulkString(channel) + sconn.dconn.WriteBulkString(msg) + } + sconn.dconn.Flush() +} + +// bgrunner runs in the background and reads incoming commands from the +// detached client. +func (sconn *pubSubConn) bgrunner(ps *PubSub) { + defer func() { + // client connection has ended, disconnect from the PubSub instances + // and close the network connection. + ps.mu.Lock() + defer ps.mu.Unlock() + for entry := range sconn.entries { + ps.chans.Delete(entry) + } + delete(ps.conns, sconn.conn) + sconn.mu.Lock() + defer sconn.mu.Unlock() + sconn.dconn.Close() + }() + for { + cmd, err := sconn.dconn.ReadCommand() + if err != nil { + return + } + if len(cmd.Args) == 0 { + continue + } + switch strings.ToLower(string(cmd.Args[0])) { + case "psubscribe", "subscribe": + if len(cmd.Args) < 2 { + func() { + sconn.mu.Lock() + defer sconn.mu.Unlock() + sconn.dconn.WriteError(fmt.Sprintf("ERR wrong number of "+ + "arguments for '%s'", cmd.Args[0])) + sconn.dconn.Flush() + }() + continue + } + command := strings.ToLower(string(cmd.Args[0])) + for i := 1; i < len(cmd.Args); i++ { + if command == "psubscribe" { + ps.Psubscribe(sconn.conn, string(cmd.Args[i])) + } else { + ps.Subscribe(sconn.conn, string(cmd.Args[i])) + } + } + case "unsubscribe", "punsubscribe": + pattern := strings.ToLower(string(cmd.Args[0])) == "punsubscribe" + if len(cmd.Args) == 1 { + ps.unsubscribe(sconn.conn, pattern, true, "") + } else { + for i := 1; i < len(cmd.Args); i++ { + channel := string(cmd.Args[i]) + ps.unsubscribe(sconn.conn, pattern, false, channel) + } + } + case "quit": + func() { + sconn.mu.Lock() + defer sconn.mu.Unlock() + sconn.dconn.WriteString("OK") + sconn.dconn.Flush() + sconn.dconn.Close() + }() + return + case "ping": + var msg string + switch len(cmd.Args) { + case 1: + case 2: + msg = string(cmd.Args[1]) + default: + func() { + sconn.mu.Lock() + defer sconn.mu.Unlock() + sconn.dconn.WriteError(fmt.Sprintf("ERR wrong number of "+ + "arguments for '%s'", cmd.Args[0])) + sconn.dconn.Flush() + }() + continue + } + func() { + sconn.mu.Lock() + defer sconn.mu.Unlock() + sconn.dconn.WriteArray(2) + sconn.dconn.WriteBulkString("pong") + sconn.dconn.WriteBulkString(msg) + sconn.dconn.Flush() + }() + default: + func() { + sconn.mu.Lock() + defer sconn.mu.Unlock() + sconn.dconn.WriteError(fmt.Sprintf("ERR Can't execute '%s': "+ + "only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT are "+ + "allowed in this context", cmd.Args[0])) + sconn.dconn.Flush() + }() + } + } +} + +// byEntry is a "less" function that sorts the entries in a btree. The tree +// is sorted be (pattern, channel, conn.id). All pattern=true entries are at +// the end (right) of the tree. +func byEntry(a, b interface{}) bool { + aa := a.(*pubSubEntry) + bb := b.(*pubSubEntry) + if !aa.pattern && bb.pattern { + return true + } + if aa.pattern && !bb.pattern { + return false + } + if aa.channel < bb.channel { + return true + } + if aa.channel > bb.channel { + return false + } + var aid uint64 + var bid uint64 + if aa.sconn != nil { + aid = aa.sconn.id + } + if bb.sconn != nil { + bid = bb.sconn.id + } + return aid < bid +} + +func (ps *PubSub) subscribe(conn Conn, pattern bool, channel string) { + ps.mu.Lock() + defer ps.mu.Unlock() + + // initialize the PubSub instance + if !ps.initd { + ps.conns = make(map[Conn]*pubSubConn) + ps.chans = btree.New(byEntry) + ps.initd = true + } + + // fetch the pubSubConn + sconn, ok := ps.conns[conn] + if !ok { + // initialize a new pubSubConn, which runs on a detached connection, + // and attach it to the PubSub channels/conn btree + ps.nextid++ + dconn := conn.Detach() + sconn = &pubSubConn{ + id: ps.nextid, + conn: conn, + dconn: dconn, + entries: make(map[*pubSubEntry]bool), + } + ps.conns[conn] = sconn + } + sconn.mu.Lock() + defer sconn.mu.Unlock() + + // add an entry to the pubsub btree + entry := &pubSubEntry{ + pattern: pattern, + channel: channel, + sconn: sconn, + } + ps.chans.Set(entry) + sconn.entries[entry] = true + + // send a message to the client + sconn.dconn.WriteArray(3) + if pattern { + sconn.dconn.WriteBulkString("psubscribe") + } else { + sconn.dconn.WriteBulkString("subscribe") + } + sconn.dconn.WriteBulkString(channel) + var count int + for ient := range sconn.entries { + if ient.pattern == pattern { + count++ + } + } + sconn.dconn.WriteInt(count) + sconn.dconn.Flush() + + // start the background client operation + if !ok { + go sconn.bgrunner(ps) + } +} + +func (ps *PubSub) unsubscribe(conn Conn, pattern, all bool, channel string) { + ps.mu.Lock() + defer ps.mu.Unlock() + // fetch the pubSubConn. This must exist + sconn := ps.conns[conn] + sconn.mu.Lock() + defer sconn.mu.Unlock() + + removeEntry := func(entry *pubSubEntry) { + if entry != nil { + ps.chans.Delete(entry) + delete(sconn.entries, entry) + } + sconn.dconn.WriteArray(3) + if pattern { + sconn.dconn.WriteBulkString("punsubscribe") + } else { + sconn.dconn.WriteBulkString("unsubscribe") + } + if entry != nil { + sconn.dconn.WriteBulkString(entry.channel) + } else { + sconn.dconn.WriteNull() + } + var count int + for ient := range sconn.entries { + if ient.pattern == pattern { + count++ + } + } + sconn.dconn.WriteInt(count) + } + if all { + // unsubscribe from all (p)subscribe entries + var entries []*pubSubEntry + for ient := range sconn.entries { + if ient.pattern == pattern { + entries = append(entries, ient) + } + } + if len(entries) == 0 { + removeEntry(nil) + } else { + for _, entry := range entries { + removeEntry(entry) + } + } + } else { + // unsubscribe single channel from (p)subscribe. + var entry *pubSubEntry + for ient := range sconn.entries { + if ient.pattern == pattern && ient.channel == channel { + removeEntry(entry) + break + } + } + removeEntry(entry) + } + sconn.dconn.Flush() +} diff --git a/redcon_test.go b/redcon_test.go index c93d511..3dd0d7c 100644 --- a/redcon_test.go +++ b/redcon_test.go @@ -1,6 +1,7 @@ package redcon import ( + "bufio" "bytes" "fmt" "io" @@ -10,6 +11,7 @@ import ( "os" "strconv" "strings" + "sync" "testing" "time" ) @@ -554,3 +556,185 @@ func TestParse(t *testing.T) { t.Fatalf("expected '%v', got '%v'", "A", string(cmd.Args[0])) } } + +func TestPubSub(t *testing.T) { + addr := ":12346" + done := make(chan bool) + go func() { + var ps PubSub + go func() { + tch := time.NewTicker(time.Millisecond * 5) + defer tch.Stop() + channels := []string{"achan1", "bchan2", "cchan3", "dchan4"} + for i := 0; ; i++ { + select { + case <-tch.C: + case <-done: + for { + var empty bool + ps.mu.Lock() + if len(ps.conns) == 0 { + if ps.chans.Len() != 0 { + panic("chans not empty") + } + empty = true + } + ps.mu.Unlock() + if empty { + break + } + time.Sleep(time.Millisecond * 10) + } + done <- true + return + } + channel := channels[i%len(channels)] + message := fmt.Sprintf("message %d", i) + ps.Publish(channel, message) + } + }() + t.Fatal(ListenAndServe(addr, func(conn Conn, cmd Command) { + switch strings.ToLower(string(cmd.Args[0])) { + default: + conn.WriteError("ERR unknown command '" + + string(cmd.Args[0]) + "'") + case "publish": + if len(cmd.Args) != 3 { + conn.WriteError("ERR wrong number of arguments for '" + + string(cmd.Args[0]) + "' command") + return + } + count := ps.Publish(string(cmd.Args[1]), string(cmd.Args[2])) + conn.WriteInt(count) + case "subscribe", "psubscribe": + if len(cmd.Args) < 2 { + conn.WriteError("ERR wrong number of arguments for '" + + string(cmd.Args[0]) + "' command") + return + } + command := strings.ToLower(string(cmd.Args[0])) + for i := 1; i < len(cmd.Args); i++ { + if command == "psubscribe" { + ps.Psubscribe(conn, string(cmd.Args[i])) + } else { + ps.Subscribe(conn, string(cmd.Args[i])) + } + } + } + }, nil, nil)) + }() + + final := make(chan bool) + go func() { + select { + case <-time.Tick(time.Second * 30): + panic("timeout") + case <-final: + return + } + }() + + // create 10 connections + var wg sync.WaitGroup + wg.Add(10) + for i := 0; i < 10; i++ { + go func(i int) { + defer wg.Done() + var conn net.Conn + for i := 0; i < 5; i++ { + var err error + conn, err = net.Dial("tcp", addr) + if err != nil { + time.Sleep(time.Second / 10) + continue + } + } + if conn == nil { + panic("could not connect to server") + } + defer conn.Close() + + regs := make(map[string]int) + var maxp int + var maxs int + fmt.Fprintf(conn, "subscribe achan1\r\n") + fmt.Fprintf(conn, "subscribe bchan2 cchan3\r\n") + fmt.Fprintf(conn, "psubscribe a*1\r\n") + fmt.Fprintf(conn, "psubscribe b*2 c*3\r\n") + + // collect 50 messages from each channel + rd := bufio.NewReader(conn) + var buf []byte + for { + line, err := rd.ReadBytes('\n') + if err != nil { + panic(err) + } + buf = append(buf, line...) + n, resp := ReadNextRESP(buf) + if n == 0 { + continue + } + buf = nil + if resp.Type != Array { + panic("expected array") + } + var vals []RESP + resp.ForEach(func(item RESP) bool { + vals = append(vals, item) + return true + }) + + name := string(vals[0].Data) + switch name { + case "subscribe": + if len(vals) != 3 { + panic("invalid count") + } + ch := string(vals[1].Data) + regs[ch] = 0 + maxs, _ = strconv.Atoi(string(vals[2].Data)) + case "psubscribe": + if len(vals) != 3 { + panic("invalid count") + } + ch := string(vals[1].Data) + regs[ch] = 0 + maxp, _ = strconv.Atoi(string(vals[2].Data)) + case "message": + if len(vals) != 3 { + panic("invalid count") + } + ch := string(vals[1].Data) + regs[ch] = regs[ch] + 1 + case "pmessage": + if len(vals) != 4 { + panic("invalid count") + } + ch := string(vals[1].Data) + regs[ch] = regs[ch] + 1 + } + if len(regs) == 6 && maxp == 3 && maxs == 3 { + ready := true + for _, count := range regs { + if count < 50 { + ready = false + break + } + } + if ready { + // all messages have been received + return + } + } + } + }(i) + } + wg.Wait() + // notify sender + done <- true + // wait for sender + <-done + // stop the timeout + final <- true +} diff --git a/resp.go b/resp.go index b69d99c..4f58dad 100644 --- a/resp.go +++ b/resp.go @@ -1,7 +1,11 @@ package redcon import ( + "fmt" + "reflect" + "sort" "strconv" + "strings" ) // Type of RESP @@ -128,3 +132,472 @@ func ReadNextRESP(b []byte) (n int, resp RESP) { resp.Raw = b[0 : i+tn] return len(resp.Raw), resp } + +// Kind is the kind of command +type Kind int + +const ( + // Redis is returned for Redis protocol commands + Redis Kind = iota + // Tile38 is returnd for Tile38 native protocol commands + Tile38 + // Telnet is returnd for plain telnet commands + Telnet +) + +var errInvalidMessage = &errProtocol{"invalid message"} + +// ReadNextCommand reads the next command from the provided packet. It's +// possible that the packet contains multiple commands, or zero commands +// when the packet is incomplete. +// 'argsbuf' is an optional reusable buffer and it can be nil. +// 'complete' indicates that a command was read. false means no more commands. +// 'args' are the output arguments for the command. +// 'kind' is the type of command that was read. +// 'leftover' is any remaining unused bytes which belong to the next command. +// 'err' is returned when a protocol error was encountered. +func ReadNextCommand(packet []byte, argsbuf [][]byte) ( + complete bool, args [][]byte, kind Kind, leftover []byte, err error, +) { + args = argsbuf[:0] + if len(packet) > 0 { + if packet[0] != '*' { + if packet[0] == '$' { + return readTile38Command(packet, args) + } + return readTelnetCommand(packet, args) + } + // standard redis command + for s, i := 1, 1; i < len(packet); i++ { + if packet[i] == '\n' { + if packet[i-1] != '\r' { + return false, args[:0], Redis, packet, errInvalidMultiBulkLength + } + count, ok := parseInt(packet[s : i-1]) + if !ok || count < 0 { + return false, args[:0], Redis, packet, errInvalidMultiBulkLength + } + i++ + if count == 0 { + return true, args[:0], Redis, packet[i:], nil + } + nextArg: + for j := 0; j < count; j++ { + if i == len(packet) { + break + } + if packet[i] != '$' { + return false, args[:0], Redis, packet, + &errProtocol{"expected '$', got '" + + string(packet[i]) + "'"} + } + for s := i + 1; i < len(packet); i++ { + if packet[i] == '\n' { + if packet[i-1] != '\r' { + return false, args[:0], Redis, packet, errInvalidBulkLength + } + n, ok := parseInt(packet[s : i-1]) + if !ok || count <= 0 { + return false, args[:0], Redis, packet, errInvalidBulkLength + } + i++ + if len(packet)-i >= n+2 { + if packet[i+n] != '\r' || packet[i+n+1] != '\n' { + return false, args[:0], Redis, packet, errInvalidBulkLength + } + args = append(args, packet[i:i+n]) + i += n + 2 + if j == count-1 { + // done reading + return true, args, Redis, packet[i:], nil + } + continue nextArg + } + break + } + } + break + } + break + } + } + } + return false, args[:0], Redis, packet, nil +} + +func readTile38Command(packet []byte, argsbuf [][]byte) ( + complete bool, args [][]byte, kind Kind, leftover []byte, err error, +) { + for i := 1; i < len(packet); i++ { + if packet[i] == ' ' { + n, ok := parseInt(packet[1:i]) + if !ok || n < 0 { + return false, args[:0], Tile38, packet, errInvalidMessage + } + i++ + if len(packet) >= i+n+2 { + if packet[i+n] != '\r' || packet[i+n+1] != '\n' { + return false, args[:0], Tile38, packet, errInvalidMessage + } + line := packet[i : i+n] + reading: + for len(line) != 0 { + if line[0] == '{' { + // The native protocol cannot understand json boundaries so it assumes that + // a json element must be at the end of the line. + args = append(args, line) + break + } + if line[0] == '"' && line[len(line)-1] == '"' { + if len(args) > 0 && + strings.ToLower(string(args[0])) == "set" && + strings.ToLower(string(args[len(args)-1])) == "string" { + // Setting a string value that is contained inside double quotes. + // This is only because of the boundary issues of the native protocol. + args = append(args, line[1:len(line)-1]) + break + } + } + i := 0 + for ; i < len(line); i++ { + if line[i] == ' ' { + value := line[:i] + if len(value) > 0 { + args = append(args, value) + } + line = line[i+1:] + continue reading + } + } + args = append(args, line) + break + } + return true, args, Tile38, packet[i+n+2:], nil + } + break + } + } + return false, args[:0], Tile38, packet, nil +} +func readTelnetCommand(packet []byte, argsbuf [][]byte) ( + complete bool, args [][]byte, kind Kind, leftover []byte, err error, +) { + // just a plain text command + for i := 0; i < len(packet); i++ { + if packet[i] == '\n' { + var line []byte + if i > 0 && packet[i-1] == '\r' { + line = packet[:i-1] + } else { + line = packet[:i] + } + var quote bool + var quotech byte + var escape bool + outer: + for { + nline := make([]byte, 0, len(line)) + for i := 0; i < len(line); i++ { + c := line[i] + if !quote { + if c == ' ' { + if len(nline) > 0 { + args = append(args, nline) + } + line = line[i+1:] + continue outer + } + if c == '"' || c == '\'' { + if i != 0 { + return false, args[:0], Telnet, packet, errUnbalancedQuotes + } + quotech = c + quote = true + line = line[i+1:] + continue outer + } + } else { + if escape { + escape = false + switch c { + case 'n': + c = '\n' + case 'r': + c = '\r' + case 't': + c = '\t' + } + } else if c == quotech { + quote = false + quotech = 0 + args = append(args, nline) + line = line[i+1:] + if len(line) > 0 && line[0] != ' ' { + return false, args[:0], Telnet, packet, errUnbalancedQuotes + } + continue outer + } else if c == '\\' { + escape = true + continue + } + } + nline = append(nline, c) + } + if quote { + return false, args[:0], Telnet, packet, errUnbalancedQuotes + } + if len(line) > 0 { + args = append(args, line) + } + break + } + return true, args, Telnet, packet[i+1:], nil + } + } + return false, args[:0], Telnet, packet, nil +} + +// appendPrefix will append a "$3\r\n" style redis prefix for a message. +func appendPrefix(b []byte, c byte, n int64) []byte { + if n >= 0 && n <= 9 { + return append(b, c, byte('0'+n), '\r', '\n') + } + b = append(b, c) + b = strconv.AppendInt(b, n, 10) + return append(b, '\r', '\n') +} + +// AppendUint appends a Redis protocol uint64 to the input bytes. +func AppendUint(b []byte, n uint64) []byte { + b = append(b, ':') + b = strconv.AppendUint(b, n, 10) + return append(b, '\r', '\n') +} + +// AppendInt appends a Redis protocol int64 to the input bytes. +func AppendInt(b []byte, n int64) []byte { + return appendPrefix(b, ':', n) +} + +// AppendArray appends a Redis protocol array to the input bytes. +func AppendArray(b []byte, n int) []byte { + return appendPrefix(b, '*', int64(n)) +} + +// AppendBulk appends a Redis protocol bulk byte slice to the input bytes. +func AppendBulk(b []byte, bulk []byte) []byte { + b = appendPrefix(b, '$', int64(len(bulk))) + b = append(b, bulk...) + return append(b, '\r', '\n') +} + +// AppendBulkString appends a Redis protocol bulk string to the input bytes. +func AppendBulkString(b []byte, bulk string) []byte { + b = appendPrefix(b, '$', int64(len(bulk))) + b = append(b, bulk...) + return append(b, '\r', '\n') +} + +// AppendString appends a Redis protocol string to the input bytes. +func AppendString(b []byte, s string) []byte { + b = append(b, '+') + b = append(b, stripNewlines(s)...) + return append(b, '\r', '\n') +} + +// AppendError appends a Redis protocol error to the input bytes. +func AppendError(b []byte, s string) []byte { + b = append(b, '-') + b = append(b, stripNewlines(s)...) + return append(b, '\r', '\n') +} + +// AppendOK appends a Redis protocol OK to the input bytes. +func AppendOK(b []byte) []byte { + return append(b, '+', 'O', 'K', '\r', '\n') +} +func stripNewlines(s string) string { + for i := 0; i < len(s); i++ { + if s[i] == '\r' || s[i] == '\n' { + s = strings.Replace(s, "\r", " ", -1) + s = strings.Replace(s, "\n", " ", -1) + break + } + } + return s +} + +// AppendTile38 appends a Tile38 message to the input bytes. +func AppendTile38(b []byte, data []byte) []byte { + b = append(b, '$') + b = strconv.AppendInt(b, int64(len(data)), 10) + b = append(b, ' ') + b = append(b, data...) + return append(b, '\r', '\n') +} + +// AppendNull appends a Redis protocol null to the input bytes. +func AppendNull(b []byte) []byte { + return append(b, '$', '-', '1', '\r', '\n') +} + +// AppendBulkFloat appends a float64, as bulk bytes. +func AppendBulkFloat(dst []byte, f float64) []byte { + return AppendBulk(dst, strconv.AppendFloat(nil, f, 'f', -1, 64)) +} + +// AppendBulkInt appends an int64, as bulk bytes. +func AppendBulkInt(dst []byte, x int64) []byte { + return AppendBulk(dst, strconv.AppendInt(nil, x, 10)) +} + +// AppendBulkUint appends an uint64, as bulk bytes. +func AppendBulkUint(dst []byte, x uint64) []byte { + return AppendBulk(dst, strconv.AppendUint(nil, x, 10)) +} + +func prefixERRIfNeeded(msg string) string { + msg = strings.TrimSpace(msg) + firstWord := strings.Split(msg, " ")[0] + addERR := len(firstWord) == 0 + for i := 0; i < len(firstWord); i++ { + if firstWord[i] < 'A' || firstWord[i] > 'Z' { + addERR = true + break + } + } + if addERR { + msg = strings.TrimSpace("ERR " + msg) + } + return msg +} + +// SimpleString is for representing a non-bulk representation of a string +// from an *Any call. +type SimpleString string + +// SimpleInt is for representing a non-bulk representation of a int +// from an *Any call. +type SimpleInt int + +// Marshaler is the interface implemented by types that +// can marshal themselves into a Redis response type from an *Any call. +// The return value is not check for validity. +type Marshaler interface { + MarshalRESP() []byte +} + +// AppendAny appends any type to valid Redis type. +// nil -> null +// error -> error (adds "ERR " when first word is not uppercase) +// string -> bulk-string +// numbers -> bulk-string +// []byte -> bulk-string +// bool -> bulk-string ("0" or "1") +// slice -> array +// map -> array with key/value pairs +// SimpleString -> string +// SimpleInt -> integer +// Marshaler -> raw bytes +// everything-else -> bulk-string representation using fmt.Sprint() +func AppendAny(b []byte, v interface{}) []byte { + switch v := v.(type) { + case SimpleString: + b = AppendString(b, string(v)) + case SimpleInt: + b = AppendInt(b, int64(v)) + case nil: + b = AppendNull(b) + case error: + b = AppendError(b, prefixERRIfNeeded(v.Error())) + case string: + b = AppendBulkString(b, v) + case []byte: + b = AppendBulk(b, v) + case bool: + if v { + b = AppendBulkString(b, "1") + } else { + b = AppendBulkString(b, "0") + } + case int: + b = AppendBulkInt(b, int64(v)) + case int8: + b = AppendBulkInt(b, int64(v)) + case int16: + b = AppendBulkInt(b, int64(v)) + case int32: + b = AppendBulkInt(b, int64(v)) + case int64: + b = AppendBulkInt(b, int64(v)) + case uint: + b = AppendBulkUint(b, uint64(v)) + case uint8: + b = AppendBulkUint(b, uint64(v)) + case uint16: + b = AppendBulkUint(b, uint64(v)) + case uint32: + b = AppendBulkUint(b, uint64(v)) + case uint64: + b = AppendBulkUint(b, uint64(v)) + case float32: + b = AppendBulkFloat(b, float64(v)) + case float64: + b = AppendBulkFloat(b, float64(v)) + case Marshaler: + b = append(b, v.MarshalRESP()...) + default: + vv := reflect.ValueOf(v) + switch vv.Kind() { + case reflect.Slice: + n := vv.Len() + b = AppendArray(b, n) + for i := 0; i < n; i++ { + b = AppendAny(b, vv.Index(i).Interface()) + } + case reflect.Map: + n := vv.Len() + b = AppendArray(b, n*2) + var i int + var strKey bool + var strsKeyItems []strKeyItem + + iter := vv.MapRange() + for iter.Next() { + key := iter.Key().Interface() + if i == 0 { + if _, ok := key.(string); ok { + strKey = true + strsKeyItems = make([]strKeyItem, n) + } + } + if strKey { + strsKeyItems[i] = strKeyItem{ + key.(string), iter.Value().Interface(), + } + } else { + b = AppendAny(b, key) + b = AppendAny(b, iter.Value().Interface()) + } + i++ + } + if strKey { + sort.Slice(strsKeyItems, func(i, j int) bool { + return strsKeyItems[i].key < strsKeyItems[j].key + }) + for _, item := range strsKeyItems { + b = AppendBulkString(b, item.key) + b = AppendAny(b, item.value) + } + } + default: + b = AppendBulkString(b, fmt.Sprint(v)) + } + } + return b +} + +type strKeyItem struct { + key string + value interface{} +} diff --git a/resp_test.go b/resp_test.go index cf45f56..462fcda 100644 --- a/resp_test.go +++ b/resp_test.go @@ -1,9 +1,12 @@ package redcon import ( + "bytes" "fmt" + "math/rand" "strconv" "testing" + "time" ) func isEmptyRESP(resp RESP) bool { @@ -128,3 +131,122 @@ func TestRESP(t *testing.T) { t.Fatalf("expected %v, got %v", 3, xx) } } + +func TestNextCommand(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + start := time.Now() + for time.Since(start) < time.Second { + // keep copy of pipeline args for final compare + var plargs [][][]byte + + // create a pipeline of random number of commands with random data. + N := rand.Int() % 10000 + var data []byte + for i := 0; i < N; i++ { + nargs := rand.Int() % 10 + data = AppendArray(data, nargs) + var args [][]byte + for j := 0; j < nargs; j++ { + arg := make([]byte, rand.Int()%100) + if _, err := rand.Read(arg); err != nil { + t.Fatal(err) + } + data = AppendBulk(data, arg) + args = append(args, arg) + } + plargs = append(plargs, args) + } + + // break data into random number of chunks + chunkn := rand.Int() % 100 + if chunkn == 0 { + chunkn = 1 + } + if len(data) < chunkn { + continue + } + var chunks [][]byte + var chunksz int + for i := 0; i < len(data); i += chunksz { + chunksz = rand.Int() % (len(data) / chunkn) + var chunk []byte + if i+chunksz < len(data) { + chunk = data[i : i+chunksz] + } else { + chunk = data[i:] + } + chunks = append(chunks, chunk) + } + + // process chunks + var rbuf []byte + var fargs [][][]byte + for _, chunk := range chunks { + var data []byte + if len(rbuf) > 0 { + data = append(rbuf, chunk...) + } else { + data = chunk + } + for { + complete, args, _, leftover, err := ReadNextCommand(data, nil) + data = leftover + if err != nil { + t.Fatal(err) + } + if !complete { + break + } + fargs = append(fargs, args) + } + rbuf = append(rbuf[:0], data...) + } + // compare final args to original + if len(plargs) != len(fargs) { + t.Fatalf("not equal size: %v != %v", len(plargs), len(fargs)) + } + for i := 0; i < len(plargs); i++ { + if len(plargs[i]) != len(fargs[i]) { + t.Fatalf("not equal size for item %v: %v != %v", i, len(plargs[i]), len(fargs[i])) + } + for j := 0; j < len(plargs[i]); j++ { + if !bytes.Equal(plargs[i][j], plargs[i][j]) { + t.Fatalf("not equal for item %v:%v: %v != %v", i, j, len(plargs[i][j]), len(fargs[i][j])) + } + } + } + } +} + +func TestAppendBulkFloat(t *testing.T) { + var b []byte + b = AppendString(b, "HELLO") + b = AppendBulkFloat(b, 9.123192839) + b = AppendString(b, "HELLO") + exp := "+HELLO\r\n$11\r\n9.123192839\r\n+HELLO\r\n" + if string(b) != exp { + t.Fatalf("expected '%s', got '%s'", exp, b) + } +} + +func TestAppendBulkInt(t *testing.T) { + var b []byte + b = AppendString(b, "HELLO") + b = AppendBulkInt(b, -9182739137) + b = AppendString(b, "HELLO") + exp := "+HELLO\r\n$11\r\n-9182739137\r\n+HELLO\r\n" + if string(b) != exp { + t.Fatalf("expected '%s', got '%s'", exp, b) + } +} + +func TestAppendBulkUint(t *testing.T) { + var b []byte + b = AppendString(b, "HELLO") + b = AppendBulkInt(b, 91827391370) + b = AppendString(b, "HELLO") + exp := "+HELLO\r\n$11\r\n91827391370\r\n+HELLO\r\n" + if string(b) != exp { + t.Fatalf("expected '%s', got '%s'", exp, b) + } +}