forked from mirror/redcon
Refactor code
This commit is contained in:
parent
d5576bf8a3
commit
a6a8495429
469
append.go
469
append.go
|
@ -1,469 +0,0 @@
|
||||||
package redcon
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"reflect"
|
|
||||||
"sort"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Kind is the kind of command
|
|
||||||
type Kind int
|
|
||||||
|
|
||||||
const (
|
|
||||||
// Redis is returned for Redis protocol commands
|
|
||||||
Redis Kind = iota
|
|
||||||
// Tile38 is returnd for Tile38 native protocol commands
|
|
||||||
Tile38
|
|
||||||
// Telnet is returnd for plain telnet commands
|
|
||||||
Telnet
|
|
||||||
)
|
|
||||||
|
|
||||||
var errInvalidMessage = &errProtocol{"invalid message"}
|
|
||||||
|
|
||||||
// ReadNextCommand reads the next command from the provided packet. It's
|
|
||||||
// possible that the packet contains multiple commands, or zero commands
|
|
||||||
// when the packet is incomplete.
|
|
||||||
// 'argsbuf' is an optional reusable buffer and it can be nil.
|
|
||||||
// 'complete' indicates that a command was read. false means no more commands.
|
|
||||||
// 'args' are the output arguments for the command.
|
|
||||||
// 'kind' is the type of command that was read.
|
|
||||||
// 'leftover' is any remaining unused bytes which belong to the next command.
|
|
||||||
// 'err' is returned when a protocol error was encountered.
|
|
||||||
func ReadNextCommand(packet []byte, argsbuf [][]byte) (
|
|
||||||
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
|
|
||||||
) {
|
|
||||||
args = argsbuf[:0]
|
|
||||||
if len(packet) > 0 {
|
|
||||||
if packet[0] != '*' {
|
|
||||||
if packet[0] == '$' {
|
|
||||||
return readTile38Command(packet, args)
|
|
||||||
}
|
|
||||||
return readTelnetCommand(packet, args)
|
|
||||||
}
|
|
||||||
// standard redis command
|
|
||||||
for s, i := 1, 1; i < len(packet); i++ {
|
|
||||||
if packet[i] == '\n' {
|
|
||||||
if packet[i-1] != '\r' {
|
|
||||||
return false, args[:0], Redis, packet, errInvalidMultiBulkLength
|
|
||||||
}
|
|
||||||
count, ok := parseInt(packet[s : i-1])
|
|
||||||
if !ok || count < 0 {
|
|
||||||
return false, args[:0], Redis, packet, errInvalidMultiBulkLength
|
|
||||||
}
|
|
||||||
i++
|
|
||||||
if count == 0 {
|
|
||||||
return true, args[:0], Redis, packet[i:], nil
|
|
||||||
}
|
|
||||||
nextArg:
|
|
||||||
for j := 0; j < count; j++ {
|
|
||||||
if i == len(packet) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if packet[i] != '$' {
|
|
||||||
return false, args[:0], Redis, packet,
|
|
||||||
&errProtocol{"expected '$', got '" +
|
|
||||||
string(packet[i]) + "'"}
|
|
||||||
}
|
|
||||||
for s := i + 1; i < len(packet); i++ {
|
|
||||||
if packet[i] == '\n' {
|
|
||||||
if packet[i-1] != '\r' {
|
|
||||||
return false, args[:0], Redis, packet, errInvalidBulkLength
|
|
||||||
}
|
|
||||||
n, ok := parseInt(packet[s : i-1])
|
|
||||||
if !ok || count <= 0 {
|
|
||||||
return false, args[:0], Redis, packet, errInvalidBulkLength
|
|
||||||
}
|
|
||||||
i++
|
|
||||||
if len(packet)-i >= n+2 {
|
|
||||||
if packet[i+n] != '\r' || packet[i+n+1] != '\n' {
|
|
||||||
return false, args[:0], Redis, packet, errInvalidBulkLength
|
|
||||||
}
|
|
||||||
args = append(args, packet[i:i+n])
|
|
||||||
i += n + 2
|
|
||||||
if j == count-1 {
|
|
||||||
// done reading
|
|
||||||
return true, args, Redis, packet[i:], nil
|
|
||||||
}
|
|
||||||
continue nextArg
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, args[:0], Redis, packet, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func readTile38Command(packet []byte, argsbuf [][]byte) (
|
|
||||||
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
|
|
||||||
) {
|
|
||||||
for i := 1; i < len(packet); i++ {
|
|
||||||
if packet[i] == ' ' {
|
|
||||||
n, ok := parseInt(packet[1:i])
|
|
||||||
if !ok || n < 0 {
|
|
||||||
return false, args[:0], Tile38, packet, errInvalidMessage
|
|
||||||
}
|
|
||||||
i++
|
|
||||||
if len(packet) >= i+n+2 {
|
|
||||||
if packet[i+n] != '\r' || packet[i+n+1] != '\n' {
|
|
||||||
return false, args[:0], Tile38, packet, errInvalidMessage
|
|
||||||
}
|
|
||||||
line := packet[i : i+n]
|
|
||||||
reading:
|
|
||||||
for len(line) != 0 {
|
|
||||||
if line[0] == '{' {
|
|
||||||
// The native protocol cannot understand json boundaries so it assumes that
|
|
||||||
// a json element must be at the end of the line.
|
|
||||||
args = append(args, line)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if line[0] == '"' && line[len(line)-1] == '"' {
|
|
||||||
if len(args) > 0 &&
|
|
||||||
strings.ToLower(string(args[0])) == "set" &&
|
|
||||||
strings.ToLower(string(args[len(args)-1])) == "string" {
|
|
||||||
// Setting a string value that is contained inside double quotes.
|
|
||||||
// This is only because of the boundary issues of the native protocol.
|
|
||||||
args = append(args, line[1:len(line)-1])
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
i := 0
|
|
||||||
for ; i < len(line); i++ {
|
|
||||||
if line[i] == ' ' {
|
|
||||||
value := line[:i]
|
|
||||||
if len(value) > 0 {
|
|
||||||
args = append(args, value)
|
|
||||||
}
|
|
||||||
line = line[i+1:]
|
|
||||||
continue reading
|
|
||||||
}
|
|
||||||
}
|
|
||||||
args = append(args, line)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
return true, args, Tile38, packet[i+n+2:], nil
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, args[:0], Tile38, packet, nil
|
|
||||||
}
|
|
||||||
func readTelnetCommand(packet []byte, argsbuf [][]byte) (
|
|
||||||
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
|
|
||||||
) {
|
|
||||||
// just a plain text command
|
|
||||||
for i := 0; i < len(packet); i++ {
|
|
||||||
if packet[i] == '\n' {
|
|
||||||
var line []byte
|
|
||||||
if i > 0 && packet[i-1] == '\r' {
|
|
||||||
line = packet[:i-1]
|
|
||||||
} else {
|
|
||||||
line = packet[:i]
|
|
||||||
}
|
|
||||||
var quote bool
|
|
||||||
var quotech byte
|
|
||||||
var escape bool
|
|
||||||
outer:
|
|
||||||
for {
|
|
||||||
nline := make([]byte, 0, len(line))
|
|
||||||
for i := 0; i < len(line); i++ {
|
|
||||||
c := line[i]
|
|
||||||
if !quote {
|
|
||||||
if c == ' ' {
|
|
||||||
if len(nline) > 0 {
|
|
||||||
args = append(args, nline)
|
|
||||||
}
|
|
||||||
line = line[i+1:]
|
|
||||||
continue outer
|
|
||||||
}
|
|
||||||
if c == '"' || c == '\'' {
|
|
||||||
if i != 0 {
|
|
||||||
return false, args[:0], Telnet, packet, errUnbalancedQuotes
|
|
||||||
}
|
|
||||||
quotech = c
|
|
||||||
quote = true
|
|
||||||
line = line[i+1:]
|
|
||||||
continue outer
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if escape {
|
|
||||||
escape = false
|
|
||||||
switch c {
|
|
||||||
case 'n':
|
|
||||||
c = '\n'
|
|
||||||
case 'r':
|
|
||||||
c = '\r'
|
|
||||||
case 't':
|
|
||||||
c = '\t'
|
|
||||||
}
|
|
||||||
} else if c == quotech {
|
|
||||||
quote = false
|
|
||||||
quotech = 0
|
|
||||||
args = append(args, nline)
|
|
||||||
line = line[i+1:]
|
|
||||||
if len(line) > 0 && line[0] != ' ' {
|
|
||||||
return false, args[:0], Telnet, packet, errUnbalancedQuotes
|
|
||||||
}
|
|
||||||
continue outer
|
|
||||||
} else if c == '\\' {
|
|
||||||
escape = true
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
nline = append(nline, c)
|
|
||||||
}
|
|
||||||
if quote {
|
|
||||||
return false, args[:0], Telnet, packet, errUnbalancedQuotes
|
|
||||||
}
|
|
||||||
if len(line) > 0 {
|
|
||||||
args = append(args, line)
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
return true, args, Telnet, packet[i+1:], nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, args[:0], Telnet, packet, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// appendPrefix will append a "$3\r\n" style redis prefix for a message.
|
|
||||||
func appendPrefix(b []byte, c byte, n int64) []byte {
|
|
||||||
if n >= 0 && n <= 9 {
|
|
||||||
return append(b, c, byte('0'+n), '\r', '\n')
|
|
||||||
}
|
|
||||||
b = append(b, c)
|
|
||||||
b = strconv.AppendInt(b, n, 10)
|
|
||||||
return append(b, '\r', '\n')
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendUint appends a Redis protocol uint64 to the input bytes.
|
|
||||||
func AppendUint(b []byte, n uint64) []byte {
|
|
||||||
b = append(b, ':')
|
|
||||||
b = strconv.AppendUint(b, n, 10)
|
|
||||||
return append(b, '\r', '\n')
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendInt appends a Redis protocol int64 to the input bytes.
|
|
||||||
func AppendInt(b []byte, n int64) []byte {
|
|
||||||
return appendPrefix(b, ':', n)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendArray appends a Redis protocol array to the input bytes.
|
|
||||||
func AppendArray(b []byte, n int) []byte {
|
|
||||||
return appendPrefix(b, '*', int64(n))
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendBulk appends a Redis protocol bulk byte slice to the input bytes.
|
|
||||||
func AppendBulk(b []byte, bulk []byte) []byte {
|
|
||||||
b = appendPrefix(b, '$', int64(len(bulk)))
|
|
||||||
b = append(b, bulk...)
|
|
||||||
return append(b, '\r', '\n')
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendBulkString appends a Redis protocol bulk string to the input bytes.
|
|
||||||
func AppendBulkString(b []byte, bulk string) []byte {
|
|
||||||
b = appendPrefix(b, '$', int64(len(bulk)))
|
|
||||||
b = append(b, bulk...)
|
|
||||||
return append(b, '\r', '\n')
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendString appends a Redis protocol string to the input bytes.
|
|
||||||
func AppendString(b []byte, s string) []byte {
|
|
||||||
b = append(b, '+')
|
|
||||||
b = append(b, stripNewlines(s)...)
|
|
||||||
return append(b, '\r', '\n')
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendError appends a Redis protocol error to the input bytes.
|
|
||||||
func AppendError(b []byte, s string) []byte {
|
|
||||||
b = append(b, '-')
|
|
||||||
b = append(b, stripNewlines(s)...)
|
|
||||||
return append(b, '\r', '\n')
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendOK appends a Redis protocol OK to the input bytes.
|
|
||||||
func AppendOK(b []byte) []byte {
|
|
||||||
return append(b, '+', 'O', 'K', '\r', '\n')
|
|
||||||
}
|
|
||||||
func stripNewlines(s string) string {
|
|
||||||
for i := 0; i < len(s); i++ {
|
|
||||||
if s[i] == '\r' || s[i] == '\n' {
|
|
||||||
s = strings.Replace(s, "\r", " ", -1)
|
|
||||||
s = strings.Replace(s, "\n", " ", -1)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendTile38 appends a Tile38 message to the input bytes.
|
|
||||||
func AppendTile38(b []byte, data []byte) []byte {
|
|
||||||
b = append(b, '$')
|
|
||||||
b = strconv.AppendInt(b, int64(len(data)), 10)
|
|
||||||
b = append(b, ' ')
|
|
||||||
b = append(b, data...)
|
|
||||||
return append(b, '\r', '\n')
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendNull appends a Redis protocol null to the input bytes.
|
|
||||||
func AppendNull(b []byte) []byte {
|
|
||||||
return append(b, '$', '-', '1', '\r', '\n')
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendBulkFloat appends a float64, as bulk bytes.
|
|
||||||
func AppendBulkFloat(dst []byte, f float64) []byte {
|
|
||||||
return AppendBulk(dst, strconv.AppendFloat(nil, f, 'f', -1, 64))
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendBulkInt appends an int64, as bulk bytes.
|
|
||||||
func AppendBulkInt(dst []byte, x int64) []byte {
|
|
||||||
return AppendBulk(dst, strconv.AppendInt(nil, x, 10))
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendBulkUint appends an uint64, as bulk bytes.
|
|
||||||
func AppendBulkUint(dst []byte, x uint64) []byte {
|
|
||||||
return AppendBulk(dst, strconv.AppendUint(nil, x, 10))
|
|
||||||
}
|
|
||||||
|
|
||||||
func prefixERRIfNeeded(msg string) string {
|
|
||||||
msg = strings.TrimSpace(msg)
|
|
||||||
firstWord := strings.Split(msg, " ")[0]
|
|
||||||
addERR := len(firstWord) == 0
|
|
||||||
for i := 0; i < len(firstWord); i++ {
|
|
||||||
if firstWord[i] < 'A' || firstWord[i] > 'Z' {
|
|
||||||
addERR = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if addERR {
|
|
||||||
msg = strings.TrimSpace("ERR " + msg)
|
|
||||||
}
|
|
||||||
return msg
|
|
||||||
}
|
|
||||||
|
|
||||||
// SimpleString is for representing a non-bulk representation of a string
|
|
||||||
// from an *Any call.
|
|
||||||
type SimpleString string
|
|
||||||
|
|
||||||
// SimpleInt is for representing a non-bulk representation of a int
|
|
||||||
// from an *Any call.
|
|
||||||
type SimpleInt int
|
|
||||||
|
|
||||||
// AppendAny appends any type to valid Redis type.
|
|
||||||
// nil -> null
|
|
||||||
// error -> error (adds "ERR " when first word is not uppercase)
|
|
||||||
// string -> bulk-string
|
|
||||||
// numbers -> bulk-string
|
|
||||||
// []byte -> bulk-string
|
|
||||||
// bool -> bulk-string ("0" or "1")
|
|
||||||
// slice -> array
|
|
||||||
// map -> array with key/value pairs
|
|
||||||
// SimpleString -> string
|
|
||||||
// SimpleInt -> integer
|
|
||||||
// everything-else -> bulk-string representation using fmt.Sprint()
|
|
||||||
func AppendAny(b []byte, v interface{}) []byte {
|
|
||||||
switch v := v.(type) {
|
|
||||||
case SimpleString:
|
|
||||||
b = AppendString(b, string(v))
|
|
||||||
case SimpleInt:
|
|
||||||
b = AppendInt(b, int64(v))
|
|
||||||
case nil:
|
|
||||||
b = AppendNull(b)
|
|
||||||
case error:
|
|
||||||
b = AppendError(b, prefixERRIfNeeded(v.Error()))
|
|
||||||
|
|
||||||
case string:
|
|
||||||
b = AppendBulkString(b, v)
|
|
||||||
case []byte:
|
|
||||||
b = AppendBulk(b, v)
|
|
||||||
case bool:
|
|
||||||
if v {
|
|
||||||
b = AppendBulkString(b, "1")
|
|
||||||
} else {
|
|
||||||
b = AppendBulkString(b, "0")
|
|
||||||
}
|
|
||||||
case int:
|
|
||||||
b = AppendBulkInt(b, int64(v))
|
|
||||||
case int8:
|
|
||||||
b = AppendBulkInt(b, int64(v))
|
|
||||||
case int16:
|
|
||||||
b = AppendBulkInt(b, int64(v))
|
|
||||||
case int32:
|
|
||||||
b = AppendBulkInt(b, int64(v))
|
|
||||||
case int64:
|
|
||||||
b = AppendBulkInt(b, int64(v))
|
|
||||||
case uint:
|
|
||||||
b = AppendBulkUint(b, uint64(v))
|
|
||||||
case uint8:
|
|
||||||
b = AppendBulkUint(b, uint64(v))
|
|
||||||
case uint16:
|
|
||||||
b = AppendBulkUint(b, uint64(v))
|
|
||||||
case uint32:
|
|
||||||
b = AppendBulkUint(b, uint64(v))
|
|
||||||
case uint64:
|
|
||||||
b = AppendBulkUint(b, uint64(v))
|
|
||||||
case float32:
|
|
||||||
b = AppendBulkFloat(b, float64(v))
|
|
||||||
case float64:
|
|
||||||
b = AppendBulkFloat(b, float64(v))
|
|
||||||
default:
|
|
||||||
vv := reflect.ValueOf(v)
|
|
||||||
switch vv.Kind() {
|
|
||||||
case reflect.Slice:
|
|
||||||
n := vv.Len()
|
|
||||||
b = AppendArray(b, n)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
b = AppendAny(b, vv.Index(i).Interface())
|
|
||||||
}
|
|
||||||
case reflect.Map:
|
|
||||||
n := vv.Len()
|
|
||||||
b = AppendArray(b, n*2)
|
|
||||||
var i int
|
|
||||||
var strKey bool
|
|
||||||
var strsKeyItems []strKeyItem
|
|
||||||
|
|
||||||
iter := vv.MapRange()
|
|
||||||
for iter.Next() {
|
|
||||||
key := iter.Key().Interface()
|
|
||||||
if i == 0 {
|
|
||||||
if _, ok := key.(string); ok {
|
|
||||||
strKey = true
|
|
||||||
strsKeyItems = make([]strKeyItem, n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if strKey {
|
|
||||||
strsKeyItems[i] = strKeyItem{
|
|
||||||
key.(string), iter.Value().Interface(),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
b = AppendAny(b, key)
|
|
||||||
b = AppendAny(b, iter.Value().Interface())
|
|
||||||
}
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
if strKey {
|
|
||||||
sort.Slice(strsKeyItems, func(i, j int) bool {
|
|
||||||
return strsKeyItems[i].key < strsKeyItems[j].key
|
|
||||||
})
|
|
||||||
for _, item := range strsKeyItems {
|
|
||||||
b = AppendBulkString(b, item.key)
|
|
||||||
b = AppendAny(b, item.value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
b = AppendBulkString(b, fmt.Sprint(v))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return b
|
|
||||||
}
|
|
||||||
|
|
||||||
type strKeyItem struct {
|
|
||||||
key string
|
|
||||||
value interface{}
|
|
||||||
}
|
|
127
append_test.go
127
append_test.go
|
@ -1,127 +0,0 @@
|
||||||
package redcon
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"math/rand"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestNextCommand(t *testing.T) {
|
|
||||||
rand.Seed(time.Now().UnixNano())
|
|
||||||
start := time.Now()
|
|
||||||
for time.Since(start) < time.Second {
|
|
||||||
// keep copy of pipeline args for final compare
|
|
||||||
var plargs [][][]byte
|
|
||||||
|
|
||||||
// create a pipeline of random number of commands with random data.
|
|
||||||
N := rand.Int() % 10000
|
|
||||||
var data []byte
|
|
||||||
for i := 0; i < N; i++ {
|
|
||||||
nargs := rand.Int() % 10
|
|
||||||
data = AppendArray(data, nargs)
|
|
||||||
var args [][]byte
|
|
||||||
for j := 0; j < nargs; j++ {
|
|
||||||
arg := make([]byte, rand.Int()%100)
|
|
||||||
if _, err := rand.Read(arg); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
data = AppendBulk(data, arg)
|
|
||||||
args = append(args, arg)
|
|
||||||
}
|
|
||||||
plargs = append(plargs, args)
|
|
||||||
}
|
|
||||||
|
|
||||||
// break data into random number of chunks
|
|
||||||
chunkn := rand.Int() % 100
|
|
||||||
if chunkn == 0 {
|
|
||||||
chunkn = 1
|
|
||||||
}
|
|
||||||
if len(data) < chunkn {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var chunks [][]byte
|
|
||||||
var chunksz int
|
|
||||||
for i := 0; i < len(data); i += chunksz {
|
|
||||||
chunksz = rand.Int() % (len(data) / chunkn)
|
|
||||||
var chunk []byte
|
|
||||||
if i+chunksz < len(data) {
|
|
||||||
chunk = data[i : i+chunksz]
|
|
||||||
} else {
|
|
||||||
chunk = data[i:]
|
|
||||||
}
|
|
||||||
chunks = append(chunks, chunk)
|
|
||||||
}
|
|
||||||
|
|
||||||
// process chunks
|
|
||||||
var rbuf []byte
|
|
||||||
var fargs [][][]byte
|
|
||||||
for _, chunk := range chunks {
|
|
||||||
var data []byte
|
|
||||||
if len(rbuf) > 0 {
|
|
||||||
data = append(rbuf, chunk...)
|
|
||||||
} else {
|
|
||||||
data = chunk
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
complete, args, _, leftover, err := ReadNextCommand(data, nil)
|
|
||||||
data = leftover
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if !complete {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
fargs = append(fargs, args)
|
|
||||||
}
|
|
||||||
rbuf = append(rbuf[:0], data...)
|
|
||||||
}
|
|
||||||
// compare final args to original
|
|
||||||
if len(plargs) != len(fargs) {
|
|
||||||
t.Fatalf("not equal size: %v != %v", len(plargs), len(fargs))
|
|
||||||
}
|
|
||||||
for i := 0; i < len(plargs); i++ {
|
|
||||||
if len(plargs[i]) != len(fargs[i]) {
|
|
||||||
t.Fatalf("not equal size for item %v: %v != %v", i, len(plargs[i]), len(fargs[i]))
|
|
||||||
}
|
|
||||||
for j := 0; j < len(plargs[i]); j++ {
|
|
||||||
if !bytes.Equal(plargs[i][j], plargs[i][j]) {
|
|
||||||
t.Fatalf("not equal for item %v:%v: %v != %v", i, j, len(plargs[i][j]), len(fargs[i][j]))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAppendBulkFloat(t *testing.T) {
|
|
||||||
var b []byte
|
|
||||||
b = AppendString(b, "HELLO")
|
|
||||||
b = AppendBulkFloat(b, 9.123192839)
|
|
||||||
b = AppendString(b, "HELLO")
|
|
||||||
exp := "+HELLO\r\n$11\r\n9.123192839\r\n+HELLO\r\n"
|
|
||||||
if string(b) != exp {
|
|
||||||
t.Fatalf("expected '%s', got '%s'", exp, b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAppendBulkInt(t *testing.T) {
|
|
||||||
var b []byte
|
|
||||||
b = AppendString(b, "HELLO")
|
|
||||||
b = AppendBulkInt(b, -9182739137)
|
|
||||||
b = AppendString(b, "HELLO")
|
|
||||||
exp := "+HELLO\r\n$11\r\n-9182739137\r\n+HELLO\r\n"
|
|
||||||
if string(b) != exp {
|
|
||||||
t.Fatalf("expected '%s', got '%s'", exp, b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAppendBulkUint(t *testing.T) {
|
|
||||||
var b []byte
|
|
||||||
b = AppendString(b, "HELLO")
|
|
||||||
b = AppendBulkInt(b, 91827391370)
|
|
||||||
b = AppendString(b, "HELLO")
|
|
||||||
exp := "+HELLO\r\n$11\r\n91827391370\r\n+HELLO\r\n"
|
|
||||||
if string(b) != exp {
|
|
||||||
t.Fatalf("expected '%s', got '%s'", exp, b)
|
|
||||||
}
|
|
||||||
}
|
|
1
go.mod
1
go.mod
|
@ -3,6 +3,7 @@ module github.com/tidwall/redcon
|
||||||
go 1.15
|
go 1.15
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/google/btree v1.0.0
|
||||||
github.com/tidwall/btree v0.2.2
|
github.com/tidwall/btree v0.2.2
|
||||||
github.com/tidwall/match v1.0.1
|
github.com/tidwall/match v1.0.1
|
||||||
)
|
)
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -1,3 +1,5 @@
|
||||||
|
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
|
||||||
|
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||||
github.com/tidwall/btree v0.2.2 h1:VVo0JW/tdidNdQzNsDR4wMbL3heaxA1DGleyzQ3/niY=
|
github.com/tidwall/btree v0.2.2 h1:VVo0JW/tdidNdQzNsDR4wMbL3heaxA1DGleyzQ3/niY=
|
||||||
github.com/tidwall/btree v0.2.2/go.mod h1:huei1BkDWJ3/sLXmO+bsCNELL+Bp2Kks9OLyQFkzvA8=
|
github.com/tidwall/btree v0.2.2/go.mod h1:huei1BkDWJ3/sLXmO+bsCNELL+Bp2Kks9OLyQFkzvA8=
|
||||||
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
|
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
|
||||||
|
|
347
pubsub.go
347
pubsub.go
|
@ -1,347 +0,0 @@
|
||||||
package redcon
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/tidwall/btree"
|
|
||||||
"github.com/tidwall/match"
|
|
||||||
)
|
|
||||||
|
|
||||||
// PubSub is a Redis compatible pub/sub server
|
|
||||||
type PubSub struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
nextid uint64
|
|
||||||
initd bool
|
|
||||||
chans *btree.BTree
|
|
||||||
conns map[Conn]*pubSubConn
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribe a connection to PubSub
|
|
||||||
func (ps *PubSub) Subscribe(conn Conn, channel string) {
|
|
||||||
ps.subscribe(conn, false, channel)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Psubscribe a connection to PubSub
|
|
||||||
func (ps *PubSub) Psubscribe(conn Conn, channel string) {
|
|
||||||
ps.subscribe(conn, true, channel)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish a message to subscribers
|
|
||||||
func (ps *PubSub) Publish(channel, message string) int {
|
|
||||||
ps.mu.RLock()
|
|
||||||
defer ps.mu.RUnlock()
|
|
||||||
if !ps.initd {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
var sent int
|
|
||||||
// write messages to all clients that are subscribed on the channel
|
|
||||||
pivot := &pubSubEntry{pattern: false, channel: channel}
|
|
||||||
ps.chans.Ascend(pivot, func(item interface{}) bool {
|
|
||||||
entry := item.(*pubSubEntry)
|
|
||||||
if entry.channel != pivot.channel || entry.pattern != pivot.pattern {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
entry.sconn.writeMessage(entry.pattern, "", channel, message)
|
|
||||||
sent++
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
// match on and write all psubscribe clients
|
|
||||||
pivot = &pubSubEntry{pattern: true}
|
|
||||||
ps.chans.Ascend(pivot, func(item interface{}) bool {
|
|
||||||
entry := item.(*pubSubEntry)
|
|
||||||
if match.Match(channel, entry.channel) {
|
|
||||||
entry.sconn.writeMessage(entry.pattern, entry.channel, channel,
|
|
||||||
message)
|
|
||||||
}
|
|
||||||
sent++
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
return sent
|
|
||||||
}
|
|
||||||
|
|
||||||
type pubSubConn struct {
|
|
||||||
id uint64
|
|
||||||
mu sync.Mutex
|
|
||||||
conn Conn
|
|
||||||
dconn DetachedConn
|
|
||||||
entries map[*pubSubEntry]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type pubSubEntry struct {
|
|
||||||
pattern bool
|
|
||||||
sconn *pubSubConn
|
|
||||||
channel string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sconn *pubSubConn) writeMessage(pat bool, pchan, channel, msg string) {
|
|
||||||
sconn.mu.Lock()
|
|
||||||
defer sconn.mu.Unlock()
|
|
||||||
if pat {
|
|
||||||
sconn.dconn.WriteArray(4)
|
|
||||||
sconn.dconn.WriteBulkString("pmessage")
|
|
||||||
sconn.dconn.WriteBulkString(pchan)
|
|
||||||
sconn.dconn.WriteBulkString(channel)
|
|
||||||
sconn.dconn.WriteBulkString(msg)
|
|
||||||
} else {
|
|
||||||
sconn.dconn.WriteArray(3)
|
|
||||||
sconn.dconn.WriteBulkString("message")
|
|
||||||
sconn.dconn.WriteBulkString(channel)
|
|
||||||
sconn.dconn.WriteBulkString(msg)
|
|
||||||
}
|
|
||||||
sconn.dconn.Flush()
|
|
||||||
}
|
|
||||||
|
|
||||||
// bgrunner runs in the background and reads incoming commands from the
|
|
||||||
// detached client.
|
|
||||||
func (sconn *pubSubConn) bgrunner(ps *PubSub) {
|
|
||||||
defer func() {
|
|
||||||
// client connection has ended, disconnect from the PubSub instances
|
|
||||||
// and close the network connection.
|
|
||||||
ps.mu.Lock()
|
|
||||||
defer ps.mu.Unlock()
|
|
||||||
for entry := range sconn.entries {
|
|
||||||
ps.chans.Delete(entry)
|
|
||||||
}
|
|
||||||
delete(ps.conns, sconn.conn)
|
|
||||||
sconn.mu.Lock()
|
|
||||||
defer sconn.mu.Unlock()
|
|
||||||
sconn.dconn.Close()
|
|
||||||
}()
|
|
||||||
for {
|
|
||||||
cmd, err := sconn.dconn.ReadCommand()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(cmd.Args) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
switch strings.ToLower(string(cmd.Args[0])) {
|
|
||||||
case "psubscribe", "subscribe":
|
|
||||||
if len(cmd.Args) < 2 {
|
|
||||||
func() {
|
|
||||||
sconn.mu.Lock()
|
|
||||||
defer sconn.mu.Unlock()
|
|
||||||
sconn.dconn.WriteError(fmt.Sprintf("ERR wrong number of "+
|
|
||||||
"arguments for '%s'", cmd.Args[0]))
|
|
||||||
sconn.dconn.Flush()
|
|
||||||
}()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
command := strings.ToLower(string(cmd.Args[0]))
|
|
||||||
for i := 1; i < len(cmd.Args); i++ {
|
|
||||||
if command == "psubscribe" {
|
|
||||||
ps.Psubscribe(sconn.conn, string(cmd.Args[i]))
|
|
||||||
} else {
|
|
||||||
ps.Subscribe(sconn.conn, string(cmd.Args[i]))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "unsubscribe", "punsubscribe":
|
|
||||||
pattern := strings.ToLower(string(cmd.Args[0])) == "punsubscribe"
|
|
||||||
if len(cmd.Args) == 1 {
|
|
||||||
ps.unsubscribe(sconn.conn, pattern, true, "")
|
|
||||||
} else {
|
|
||||||
for i := 1; i < len(cmd.Args); i++ {
|
|
||||||
channel := string(cmd.Args[i])
|
|
||||||
ps.unsubscribe(sconn.conn, pattern, false, channel)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "quit":
|
|
||||||
func() {
|
|
||||||
sconn.mu.Lock()
|
|
||||||
defer sconn.mu.Unlock()
|
|
||||||
sconn.dconn.WriteString("OK")
|
|
||||||
sconn.dconn.Flush()
|
|
||||||
sconn.dconn.Close()
|
|
||||||
}()
|
|
||||||
return
|
|
||||||
case "ping":
|
|
||||||
var msg string
|
|
||||||
switch len(cmd.Args) {
|
|
||||||
case 1:
|
|
||||||
case 2:
|
|
||||||
msg = string(cmd.Args[1])
|
|
||||||
default:
|
|
||||||
func() {
|
|
||||||
sconn.mu.Lock()
|
|
||||||
defer sconn.mu.Unlock()
|
|
||||||
sconn.dconn.WriteError(fmt.Sprintf("ERR wrong number of "+
|
|
||||||
"arguments for '%s'", cmd.Args[0]))
|
|
||||||
sconn.dconn.Flush()
|
|
||||||
}()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
func() {
|
|
||||||
sconn.mu.Lock()
|
|
||||||
defer sconn.mu.Unlock()
|
|
||||||
sconn.dconn.WriteArray(2)
|
|
||||||
sconn.dconn.WriteBulkString("pong")
|
|
||||||
sconn.dconn.WriteBulkString(msg)
|
|
||||||
sconn.dconn.Flush()
|
|
||||||
}()
|
|
||||||
default:
|
|
||||||
func() {
|
|
||||||
sconn.mu.Lock()
|
|
||||||
defer sconn.mu.Unlock()
|
|
||||||
sconn.dconn.WriteError(fmt.Sprintf("ERR Can't execute '%s': "+
|
|
||||||
"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT are "+
|
|
||||||
"allowed in this context", cmd.Args[0]))
|
|
||||||
sconn.dconn.Flush()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// byEntry is a "less" function that sorts the entries in a btree. The tree
|
|
||||||
// is sorted be (pattern, channel, conn.id). All pattern=true entries are at
|
|
||||||
// the end (right) of the tree.
|
|
||||||
func byEntry(a, b interface{}) bool {
|
|
||||||
aa := a.(*pubSubEntry)
|
|
||||||
bb := b.(*pubSubEntry)
|
|
||||||
if !aa.pattern && bb.pattern {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if aa.pattern && !bb.pattern {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if aa.channel < bb.channel {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if aa.channel > bb.channel {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
var aid uint64
|
|
||||||
var bid uint64
|
|
||||||
if aa.sconn != nil {
|
|
||||||
aid = aa.sconn.id
|
|
||||||
}
|
|
||||||
if bb.sconn != nil {
|
|
||||||
bid = bb.sconn.id
|
|
||||||
}
|
|
||||||
return aid < bid
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *PubSub) subscribe(conn Conn, pattern bool, channel string) {
|
|
||||||
ps.mu.Lock()
|
|
||||||
defer ps.mu.Unlock()
|
|
||||||
|
|
||||||
// initialize the PubSub instance
|
|
||||||
if !ps.initd {
|
|
||||||
ps.conns = make(map[Conn]*pubSubConn)
|
|
||||||
ps.chans = btree.New(byEntry)
|
|
||||||
ps.initd = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetch the pubSubConn
|
|
||||||
sconn, ok := ps.conns[conn]
|
|
||||||
if !ok {
|
|
||||||
// initialize a new pubSubConn, which runs on a detached connection,
|
|
||||||
// and attach it to the PubSub channels/conn btree
|
|
||||||
ps.nextid++
|
|
||||||
dconn := conn.Detach()
|
|
||||||
sconn = &pubSubConn{
|
|
||||||
id: ps.nextid,
|
|
||||||
conn: conn,
|
|
||||||
dconn: dconn,
|
|
||||||
entries: make(map[*pubSubEntry]bool),
|
|
||||||
}
|
|
||||||
ps.conns[conn] = sconn
|
|
||||||
}
|
|
||||||
sconn.mu.Lock()
|
|
||||||
defer sconn.mu.Unlock()
|
|
||||||
|
|
||||||
// add an entry to the pubsub btree
|
|
||||||
entry := &pubSubEntry{
|
|
||||||
pattern: pattern,
|
|
||||||
channel: channel,
|
|
||||||
sconn: sconn,
|
|
||||||
}
|
|
||||||
ps.chans.Set(entry)
|
|
||||||
sconn.entries[entry] = true
|
|
||||||
|
|
||||||
// send a message to the client
|
|
||||||
sconn.dconn.WriteArray(3)
|
|
||||||
if pattern {
|
|
||||||
sconn.dconn.WriteBulkString("psubscribe")
|
|
||||||
} else {
|
|
||||||
sconn.dconn.WriteBulkString("subscribe")
|
|
||||||
}
|
|
||||||
sconn.dconn.WriteBulkString(channel)
|
|
||||||
var count int
|
|
||||||
for ient := range sconn.entries {
|
|
||||||
if ient.pattern == pattern {
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sconn.dconn.WriteInt(count)
|
|
||||||
sconn.dconn.Flush()
|
|
||||||
|
|
||||||
// start the background client operation
|
|
||||||
if !ok {
|
|
||||||
go sconn.bgrunner(ps)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *PubSub) unsubscribe(conn Conn, pattern, all bool, channel string) {
|
|
||||||
ps.mu.Lock()
|
|
||||||
defer ps.mu.Unlock()
|
|
||||||
// fetch the pubSubConn. This must exist
|
|
||||||
sconn := ps.conns[conn]
|
|
||||||
sconn.mu.Lock()
|
|
||||||
defer sconn.mu.Unlock()
|
|
||||||
|
|
||||||
removeEntry := func(entry *pubSubEntry) {
|
|
||||||
if entry != nil {
|
|
||||||
ps.chans.Delete(entry)
|
|
||||||
delete(sconn.entries, entry)
|
|
||||||
}
|
|
||||||
sconn.dconn.WriteArray(3)
|
|
||||||
if pattern {
|
|
||||||
sconn.dconn.WriteBulkString("punsubscribe")
|
|
||||||
} else {
|
|
||||||
sconn.dconn.WriteBulkString("unsubscribe")
|
|
||||||
}
|
|
||||||
if entry != nil {
|
|
||||||
sconn.dconn.WriteBulkString(entry.channel)
|
|
||||||
} else {
|
|
||||||
sconn.dconn.WriteNull()
|
|
||||||
}
|
|
||||||
var count int
|
|
||||||
for ient := range sconn.entries {
|
|
||||||
if ient.pattern == pattern {
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sconn.dconn.WriteInt(count)
|
|
||||||
}
|
|
||||||
if all {
|
|
||||||
// unsubscribe from all (p)subscribe entries
|
|
||||||
var entries []*pubSubEntry
|
|
||||||
for ient := range sconn.entries {
|
|
||||||
if ient.pattern == pattern {
|
|
||||||
entries = append(entries, ient)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(entries) == 0 {
|
|
||||||
removeEntry(nil)
|
|
||||||
} else {
|
|
||||||
for _, entry := range entries {
|
|
||||||
removeEntry(entry)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// unsubscribe single channel from (p)subscribe.
|
|
||||||
var entry *pubSubEntry
|
|
||||||
for ient := range sconn.entries {
|
|
||||||
if ient.pattern == pattern && ient.channel == channel {
|
|
||||||
removeEntry(entry)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
removeEntry(entry)
|
|
||||||
}
|
|
||||||
sconn.dconn.Flush()
|
|
||||||
}
|
|
194
pubsub_test.go
194
pubsub_test.go
|
@ -1,194 +0,0 @@
|
||||||
package redcon
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestPubSub(t *testing.T) {
|
|
||||||
addr := ":12346"
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
var ps PubSub
|
|
||||||
go func() {
|
|
||||||
tch := time.NewTicker(time.Millisecond * 5)
|
|
||||||
defer tch.Stop()
|
|
||||||
channels := []string{"achan1", "bchan2", "cchan3", "dchan4"}
|
|
||||||
for i := 0; ; i++ {
|
|
||||||
select {
|
|
||||||
case <-tch.C:
|
|
||||||
case <-done:
|
|
||||||
for {
|
|
||||||
var empty bool
|
|
||||||
ps.mu.Lock()
|
|
||||||
if len(ps.conns) == 0 {
|
|
||||||
if ps.chans.Len() != 0 {
|
|
||||||
panic("chans not empty")
|
|
||||||
}
|
|
||||||
empty = true
|
|
||||||
}
|
|
||||||
ps.mu.Unlock()
|
|
||||||
if empty {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
time.Sleep(time.Millisecond * 10)
|
|
||||||
}
|
|
||||||
done <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
channel := channels[i%len(channels)]
|
|
||||||
message := fmt.Sprintf("message %d", i)
|
|
||||||
ps.Publish(channel, message)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
t.Fatal(ListenAndServe(addr, func(conn Conn, cmd Command) {
|
|
||||||
switch strings.ToLower(string(cmd.Args[0])) {
|
|
||||||
default:
|
|
||||||
conn.WriteError("ERR unknown command '" +
|
|
||||||
string(cmd.Args[0]) + "'")
|
|
||||||
case "publish":
|
|
||||||
if len(cmd.Args) != 3 {
|
|
||||||
conn.WriteError("ERR wrong number of arguments for '" +
|
|
||||||
string(cmd.Args[0]) + "' command")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
count := ps.Publish(string(cmd.Args[1]), string(cmd.Args[2]))
|
|
||||||
conn.WriteInt(count)
|
|
||||||
case "subscribe", "psubscribe":
|
|
||||||
if len(cmd.Args) < 2 {
|
|
||||||
conn.WriteError("ERR wrong number of arguments for '" +
|
|
||||||
string(cmd.Args[0]) + "' command")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
command := strings.ToLower(string(cmd.Args[0]))
|
|
||||||
for i := 1; i < len(cmd.Args); i++ {
|
|
||||||
if command == "psubscribe" {
|
|
||||||
ps.Psubscribe(conn, string(cmd.Args[i]))
|
|
||||||
} else {
|
|
||||||
ps.Subscribe(conn, string(cmd.Args[i]))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, nil, nil))
|
|
||||||
}()
|
|
||||||
|
|
||||||
final := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-time.Tick(time.Second * 30):
|
|
||||||
panic("timeout")
|
|
||||||
case <-final:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// create 10 connections
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(10)
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
go func(i int) {
|
|
||||||
defer wg.Done()
|
|
||||||
var conn net.Conn
|
|
||||||
for i := 0; i < 5; i++ {
|
|
||||||
var err error
|
|
||||||
conn, err = net.Dial("tcp", addr)
|
|
||||||
if err != nil {
|
|
||||||
time.Sleep(time.Second / 10)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if conn == nil {
|
|
||||||
panic("could not connect to server")
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
regs := make(map[string]int)
|
|
||||||
var maxp int
|
|
||||||
var maxs int
|
|
||||||
fmt.Fprintf(conn, "subscribe achan1\r\n")
|
|
||||||
fmt.Fprintf(conn, "subscribe bchan2 cchan3\r\n")
|
|
||||||
fmt.Fprintf(conn, "psubscribe a*1\r\n")
|
|
||||||
fmt.Fprintf(conn, "psubscribe b*2 c*3\r\n")
|
|
||||||
|
|
||||||
// collect 50 messages from each channel
|
|
||||||
rd := bufio.NewReader(conn)
|
|
||||||
var buf []byte
|
|
||||||
for {
|
|
||||||
line, err := rd.ReadBytes('\n')
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
buf = append(buf, line...)
|
|
||||||
n, resp := ReadNextRESP(buf)
|
|
||||||
if n == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
buf = nil
|
|
||||||
if resp.Type != Array {
|
|
||||||
panic("expected array")
|
|
||||||
}
|
|
||||||
var vals []RESP
|
|
||||||
resp.ForEach(func(item RESP) bool {
|
|
||||||
vals = append(vals, item)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
name := string(vals[0].Data)
|
|
||||||
switch name {
|
|
||||||
case "subscribe":
|
|
||||||
if len(vals) != 3 {
|
|
||||||
panic("invalid count")
|
|
||||||
}
|
|
||||||
ch := string(vals[1].Data)
|
|
||||||
regs[ch] = 0
|
|
||||||
maxs, _ = strconv.Atoi(string(vals[2].Data))
|
|
||||||
case "psubscribe":
|
|
||||||
if len(vals) != 3 {
|
|
||||||
panic("invalid count")
|
|
||||||
}
|
|
||||||
ch := string(vals[1].Data)
|
|
||||||
regs[ch] = 0
|
|
||||||
maxp, _ = strconv.Atoi(string(vals[2].Data))
|
|
||||||
case "message":
|
|
||||||
if len(vals) != 3 {
|
|
||||||
panic("invalid count")
|
|
||||||
}
|
|
||||||
ch := string(vals[1].Data)
|
|
||||||
regs[ch] = regs[ch] + 1
|
|
||||||
case "pmessage":
|
|
||||||
if len(vals) != 4 {
|
|
||||||
panic("invalid count")
|
|
||||||
}
|
|
||||||
ch := string(vals[1].Data)
|
|
||||||
regs[ch] = regs[ch] + 1
|
|
||||||
}
|
|
||||||
if len(regs) == 6 && maxp == 3 && maxs == 3 {
|
|
||||||
ready := true
|
|
||||||
for _, count := range regs {
|
|
||||||
if count < 50 {
|
|
||||||
ready = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ready {
|
|
||||||
// all messages have been received
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
// notify sender
|
|
||||||
done <- true
|
|
||||||
// wait for sender
|
|
||||||
<-done
|
|
||||||
// stop the timeout
|
|
||||||
final <- true
|
|
||||||
}
|
|
341
redcon.go
341
redcon.go
|
@ -5,10 +5,14 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/tidwall/btree"
|
||||||
|
"github.com/tidwall/match"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -1005,3 +1009,340 @@ func (m *ServeMux) ServeRESP(conn Conn, cmd Command) {
|
||||||
conn.WriteError("ERR unknown command '" + command + "'")
|
conn.WriteError("ERR unknown command '" + command + "'")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PubSub is a Redis compatible pub/sub server
|
||||||
|
type PubSub struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
nextid uint64
|
||||||
|
initd bool
|
||||||
|
chans *btree.BTree
|
||||||
|
conns map[Conn]*pubSubConn
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe a connection to PubSub
|
||||||
|
func (ps *PubSub) Subscribe(conn Conn, channel string) {
|
||||||
|
ps.subscribe(conn, false, channel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Psubscribe a connection to PubSub
|
||||||
|
func (ps *PubSub) Psubscribe(conn Conn, channel string) {
|
||||||
|
ps.subscribe(conn, true, channel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish a message to subscribers
|
||||||
|
func (ps *PubSub) Publish(channel, message string) int {
|
||||||
|
ps.mu.RLock()
|
||||||
|
defer ps.mu.RUnlock()
|
||||||
|
if !ps.initd {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
var sent int
|
||||||
|
// write messages to all clients that are subscribed on the channel
|
||||||
|
pivot := &pubSubEntry{pattern: false, channel: channel}
|
||||||
|
ps.chans.Ascend(pivot, func(item interface{}) bool {
|
||||||
|
entry := item.(*pubSubEntry)
|
||||||
|
if entry.channel != pivot.channel || entry.pattern != pivot.pattern {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
entry.sconn.writeMessage(entry.pattern, "", channel, message)
|
||||||
|
sent++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// match on and write all psubscribe clients
|
||||||
|
pivot = &pubSubEntry{pattern: true}
|
||||||
|
ps.chans.Ascend(pivot, func(item interface{}) bool {
|
||||||
|
entry := item.(*pubSubEntry)
|
||||||
|
if match.Match(channel, entry.channel) {
|
||||||
|
entry.sconn.writeMessage(entry.pattern, entry.channel, channel,
|
||||||
|
message)
|
||||||
|
}
|
||||||
|
sent++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
return sent
|
||||||
|
}
|
||||||
|
|
||||||
|
type pubSubConn struct {
|
||||||
|
id uint64
|
||||||
|
mu sync.Mutex
|
||||||
|
conn Conn
|
||||||
|
dconn DetachedConn
|
||||||
|
entries map[*pubSubEntry]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type pubSubEntry struct {
|
||||||
|
pattern bool
|
||||||
|
sconn *pubSubConn
|
||||||
|
channel string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sconn *pubSubConn) writeMessage(pat bool, pchan, channel, msg string) {
|
||||||
|
sconn.mu.Lock()
|
||||||
|
defer sconn.mu.Unlock()
|
||||||
|
if pat {
|
||||||
|
sconn.dconn.WriteArray(4)
|
||||||
|
sconn.dconn.WriteBulkString("pmessage")
|
||||||
|
sconn.dconn.WriteBulkString(pchan)
|
||||||
|
sconn.dconn.WriteBulkString(channel)
|
||||||
|
sconn.dconn.WriteBulkString(msg)
|
||||||
|
} else {
|
||||||
|
sconn.dconn.WriteArray(3)
|
||||||
|
sconn.dconn.WriteBulkString("message")
|
||||||
|
sconn.dconn.WriteBulkString(channel)
|
||||||
|
sconn.dconn.WriteBulkString(msg)
|
||||||
|
}
|
||||||
|
sconn.dconn.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
// bgrunner runs in the background and reads incoming commands from the
|
||||||
|
// detached client.
|
||||||
|
func (sconn *pubSubConn) bgrunner(ps *PubSub) {
|
||||||
|
defer func() {
|
||||||
|
// client connection has ended, disconnect from the PubSub instances
|
||||||
|
// and close the network connection.
|
||||||
|
ps.mu.Lock()
|
||||||
|
defer ps.mu.Unlock()
|
||||||
|
for entry := range sconn.entries {
|
||||||
|
ps.chans.Delete(entry)
|
||||||
|
}
|
||||||
|
delete(ps.conns, sconn.conn)
|
||||||
|
sconn.mu.Lock()
|
||||||
|
defer sconn.mu.Unlock()
|
||||||
|
sconn.dconn.Close()
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
cmd, err := sconn.dconn.ReadCommand()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(cmd.Args) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch strings.ToLower(string(cmd.Args[0])) {
|
||||||
|
case "psubscribe", "subscribe":
|
||||||
|
if len(cmd.Args) < 2 {
|
||||||
|
func() {
|
||||||
|
sconn.mu.Lock()
|
||||||
|
defer sconn.mu.Unlock()
|
||||||
|
sconn.dconn.WriteError(fmt.Sprintf("ERR wrong number of "+
|
||||||
|
"arguments for '%s'", cmd.Args[0]))
|
||||||
|
sconn.dconn.Flush()
|
||||||
|
}()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
command := strings.ToLower(string(cmd.Args[0]))
|
||||||
|
for i := 1; i < len(cmd.Args); i++ {
|
||||||
|
if command == "psubscribe" {
|
||||||
|
ps.Psubscribe(sconn.conn, string(cmd.Args[i]))
|
||||||
|
} else {
|
||||||
|
ps.Subscribe(sconn.conn, string(cmd.Args[i]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case "unsubscribe", "punsubscribe":
|
||||||
|
pattern := strings.ToLower(string(cmd.Args[0])) == "punsubscribe"
|
||||||
|
if len(cmd.Args) == 1 {
|
||||||
|
ps.unsubscribe(sconn.conn, pattern, true, "")
|
||||||
|
} else {
|
||||||
|
for i := 1; i < len(cmd.Args); i++ {
|
||||||
|
channel := string(cmd.Args[i])
|
||||||
|
ps.unsubscribe(sconn.conn, pattern, false, channel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case "quit":
|
||||||
|
func() {
|
||||||
|
sconn.mu.Lock()
|
||||||
|
defer sconn.mu.Unlock()
|
||||||
|
sconn.dconn.WriteString("OK")
|
||||||
|
sconn.dconn.Flush()
|
||||||
|
sconn.dconn.Close()
|
||||||
|
}()
|
||||||
|
return
|
||||||
|
case "ping":
|
||||||
|
var msg string
|
||||||
|
switch len(cmd.Args) {
|
||||||
|
case 1:
|
||||||
|
case 2:
|
||||||
|
msg = string(cmd.Args[1])
|
||||||
|
default:
|
||||||
|
func() {
|
||||||
|
sconn.mu.Lock()
|
||||||
|
defer sconn.mu.Unlock()
|
||||||
|
sconn.dconn.WriteError(fmt.Sprintf("ERR wrong number of "+
|
||||||
|
"arguments for '%s'", cmd.Args[0]))
|
||||||
|
sconn.dconn.Flush()
|
||||||
|
}()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
func() {
|
||||||
|
sconn.mu.Lock()
|
||||||
|
defer sconn.mu.Unlock()
|
||||||
|
sconn.dconn.WriteArray(2)
|
||||||
|
sconn.dconn.WriteBulkString("pong")
|
||||||
|
sconn.dconn.WriteBulkString(msg)
|
||||||
|
sconn.dconn.Flush()
|
||||||
|
}()
|
||||||
|
default:
|
||||||
|
func() {
|
||||||
|
sconn.mu.Lock()
|
||||||
|
defer sconn.mu.Unlock()
|
||||||
|
sconn.dconn.WriteError(fmt.Sprintf("ERR Can't execute '%s': "+
|
||||||
|
"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT are "+
|
||||||
|
"allowed in this context", cmd.Args[0]))
|
||||||
|
sconn.dconn.Flush()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// byEntry is a "less" function that sorts the entries in a btree. The tree
|
||||||
|
// is sorted be (pattern, channel, conn.id). All pattern=true entries are at
|
||||||
|
// the end (right) of the tree.
|
||||||
|
func byEntry(a, b interface{}) bool {
|
||||||
|
aa := a.(*pubSubEntry)
|
||||||
|
bb := b.(*pubSubEntry)
|
||||||
|
if !aa.pattern && bb.pattern {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if aa.pattern && !bb.pattern {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if aa.channel < bb.channel {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if aa.channel > bb.channel {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
var aid uint64
|
||||||
|
var bid uint64
|
||||||
|
if aa.sconn != nil {
|
||||||
|
aid = aa.sconn.id
|
||||||
|
}
|
||||||
|
if bb.sconn != nil {
|
||||||
|
bid = bb.sconn.id
|
||||||
|
}
|
||||||
|
return aid < bid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *PubSub) subscribe(conn Conn, pattern bool, channel string) {
|
||||||
|
ps.mu.Lock()
|
||||||
|
defer ps.mu.Unlock()
|
||||||
|
|
||||||
|
// initialize the PubSub instance
|
||||||
|
if !ps.initd {
|
||||||
|
ps.conns = make(map[Conn]*pubSubConn)
|
||||||
|
ps.chans = btree.New(byEntry)
|
||||||
|
ps.initd = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch the pubSubConn
|
||||||
|
sconn, ok := ps.conns[conn]
|
||||||
|
if !ok {
|
||||||
|
// initialize a new pubSubConn, which runs on a detached connection,
|
||||||
|
// and attach it to the PubSub channels/conn btree
|
||||||
|
ps.nextid++
|
||||||
|
dconn := conn.Detach()
|
||||||
|
sconn = &pubSubConn{
|
||||||
|
id: ps.nextid,
|
||||||
|
conn: conn,
|
||||||
|
dconn: dconn,
|
||||||
|
entries: make(map[*pubSubEntry]bool),
|
||||||
|
}
|
||||||
|
ps.conns[conn] = sconn
|
||||||
|
}
|
||||||
|
sconn.mu.Lock()
|
||||||
|
defer sconn.mu.Unlock()
|
||||||
|
|
||||||
|
// add an entry to the pubsub btree
|
||||||
|
entry := &pubSubEntry{
|
||||||
|
pattern: pattern,
|
||||||
|
channel: channel,
|
||||||
|
sconn: sconn,
|
||||||
|
}
|
||||||
|
ps.chans.Set(entry)
|
||||||
|
sconn.entries[entry] = true
|
||||||
|
|
||||||
|
// send a message to the client
|
||||||
|
sconn.dconn.WriteArray(3)
|
||||||
|
if pattern {
|
||||||
|
sconn.dconn.WriteBulkString("psubscribe")
|
||||||
|
} else {
|
||||||
|
sconn.dconn.WriteBulkString("subscribe")
|
||||||
|
}
|
||||||
|
sconn.dconn.WriteBulkString(channel)
|
||||||
|
var count int
|
||||||
|
for ient := range sconn.entries {
|
||||||
|
if ient.pattern == pattern {
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sconn.dconn.WriteInt(count)
|
||||||
|
sconn.dconn.Flush()
|
||||||
|
|
||||||
|
// start the background client operation
|
||||||
|
if !ok {
|
||||||
|
go sconn.bgrunner(ps)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *PubSub) unsubscribe(conn Conn, pattern, all bool, channel string) {
|
||||||
|
ps.mu.Lock()
|
||||||
|
defer ps.mu.Unlock()
|
||||||
|
// fetch the pubSubConn. This must exist
|
||||||
|
sconn := ps.conns[conn]
|
||||||
|
sconn.mu.Lock()
|
||||||
|
defer sconn.mu.Unlock()
|
||||||
|
|
||||||
|
removeEntry := func(entry *pubSubEntry) {
|
||||||
|
if entry != nil {
|
||||||
|
ps.chans.Delete(entry)
|
||||||
|
delete(sconn.entries, entry)
|
||||||
|
}
|
||||||
|
sconn.dconn.WriteArray(3)
|
||||||
|
if pattern {
|
||||||
|
sconn.dconn.WriteBulkString("punsubscribe")
|
||||||
|
} else {
|
||||||
|
sconn.dconn.WriteBulkString("unsubscribe")
|
||||||
|
}
|
||||||
|
if entry != nil {
|
||||||
|
sconn.dconn.WriteBulkString(entry.channel)
|
||||||
|
} else {
|
||||||
|
sconn.dconn.WriteNull()
|
||||||
|
}
|
||||||
|
var count int
|
||||||
|
for ient := range sconn.entries {
|
||||||
|
if ient.pattern == pattern {
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sconn.dconn.WriteInt(count)
|
||||||
|
}
|
||||||
|
if all {
|
||||||
|
// unsubscribe from all (p)subscribe entries
|
||||||
|
var entries []*pubSubEntry
|
||||||
|
for ient := range sconn.entries {
|
||||||
|
if ient.pattern == pattern {
|
||||||
|
entries = append(entries, ient)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(entries) == 0 {
|
||||||
|
removeEntry(nil)
|
||||||
|
} else {
|
||||||
|
for _, entry := range entries {
|
||||||
|
removeEntry(entry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// unsubscribe single channel from (p)subscribe.
|
||||||
|
var entry *pubSubEntry
|
||||||
|
for ient := range sconn.entries {
|
||||||
|
if ient.pattern == pattern && ient.channel == channel {
|
||||||
|
removeEntry(entry)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
removeEntry(entry)
|
||||||
|
}
|
||||||
|
sconn.dconn.Flush()
|
||||||
|
}
|
||||||
|
|
184
redcon_test.go
184
redcon_test.go
|
@ -1,6 +1,7 @@
|
||||||
package redcon
|
package redcon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -10,6 +11,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -554,3 +556,185 @@ func TestParse(t *testing.T) {
|
||||||
t.Fatalf("expected '%v', got '%v'", "A", string(cmd.Args[0]))
|
t.Fatalf("expected '%v', got '%v'", "A", string(cmd.Args[0]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPubSub(t *testing.T) {
|
||||||
|
addr := ":12346"
|
||||||
|
done := make(chan bool)
|
||||||
|
go func() {
|
||||||
|
var ps PubSub
|
||||||
|
go func() {
|
||||||
|
tch := time.NewTicker(time.Millisecond * 5)
|
||||||
|
defer tch.Stop()
|
||||||
|
channels := []string{"achan1", "bchan2", "cchan3", "dchan4"}
|
||||||
|
for i := 0; ; i++ {
|
||||||
|
select {
|
||||||
|
case <-tch.C:
|
||||||
|
case <-done:
|
||||||
|
for {
|
||||||
|
var empty bool
|
||||||
|
ps.mu.Lock()
|
||||||
|
if len(ps.conns) == 0 {
|
||||||
|
if ps.chans.Len() != 0 {
|
||||||
|
panic("chans not empty")
|
||||||
|
}
|
||||||
|
empty = true
|
||||||
|
}
|
||||||
|
ps.mu.Unlock()
|
||||||
|
if empty {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
}
|
||||||
|
done <- true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
channel := channels[i%len(channels)]
|
||||||
|
message := fmt.Sprintf("message %d", i)
|
||||||
|
ps.Publish(channel, message)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
t.Fatal(ListenAndServe(addr, func(conn Conn, cmd Command) {
|
||||||
|
switch strings.ToLower(string(cmd.Args[0])) {
|
||||||
|
default:
|
||||||
|
conn.WriteError("ERR unknown command '" +
|
||||||
|
string(cmd.Args[0]) + "'")
|
||||||
|
case "publish":
|
||||||
|
if len(cmd.Args) != 3 {
|
||||||
|
conn.WriteError("ERR wrong number of arguments for '" +
|
||||||
|
string(cmd.Args[0]) + "' command")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
count := ps.Publish(string(cmd.Args[1]), string(cmd.Args[2]))
|
||||||
|
conn.WriteInt(count)
|
||||||
|
case "subscribe", "psubscribe":
|
||||||
|
if len(cmd.Args) < 2 {
|
||||||
|
conn.WriteError("ERR wrong number of arguments for '" +
|
||||||
|
string(cmd.Args[0]) + "' command")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
command := strings.ToLower(string(cmd.Args[0]))
|
||||||
|
for i := 1; i < len(cmd.Args); i++ {
|
||||||
|
if command == "psubscribe" {
|
||||||
|
ps.Psubscribe(conn, string(cmd.Args[i]))
|
||||||
|
} else {
|
||||||
|
ps.Subscribe(conn, string(cmd.Args[i]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, nil, nil))
|
||||||
|
}()
|
||||||
|
|
||||||
|
final := make(chan bool)
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-time.Tick(time.Second * 30):
|
||||||
|
panic("timeout")
|
||||||
|
case <-final:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// create 10 connections
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
var conn net.Conn
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
var err error
|
||||||
|
conn, err = net.Dial("tcp", addr)
|
||||||
|
if err != nil {
|
||||||
|
time.Sleep(time.Second / 10)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if conn == nil {
|
||||||
|
panic("could not connect to server")
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
regs := make(map[string]int)
|
||||||
|
var maxp int
|
||||||
|
var maxs int
|
||||||
|
fmt.Fprintf(conn, "subscribe achan1\r\n")
|
||||||
|
fmt.Fprintf(conn, "subscribe bchan2 cchan3\r\n")
|
||||||
|
fmt.Fprintf(conn, "psubscribe a*1\r\n")
|
||||||
|
fmt.Fprintf(conn, "psubscribe b*2 c*3\r\n")
|
||||||
|
|
||||||
|
// collect 50 messages from each channel
|
||||||
|
rd := bufio.NewReader(conn)
|
||||||
|
var buf []byte
|
||||||
|
for {
|
||||||
|
line, err := rd.ReadBytes('\n')
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
buf = append(buf, line...)
|
||||||
|
n, resp := ReadNextRESP(buf)
|
||||||
|
if n == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
buf = nil
|
||||||
|
if resp.Type != Array {
|
||||||
|
panic("expected array")
|
||||||
|
}
|
||||||
|
var vals []RESP
|
||||||
|
resp.ForEach(func(item RESP) bool {
|
||||||
|
vals = append(vals, item)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
name := string(vals[0].Data)
|
||||||
|
switch name {
|
||||||
|
case "subscribe":
|
||||||
|
if len(vals) != 3 {
|
||||||
|
panic("invalid count")
|
||||||
|
}
|
||||||
|
ch := string(vals[1].Data)
|
||||||
|
regs[ch] = 0
|
||||||
|
maxs, _ = strconv.Atoi(string(vals[2].Data))
|
||||||
|
case "psubscribe":
|
||||||
|
if len(vals) != 3 {
|
||||||
|
panic("invalid count")
|
||||||
|
}
|
||||||
|
ch := string(vals[1].Data)
|
||||||
|
regs[ch] = 0
|
||||||
|
maxp, _ = strconv.Atoi(string(vals[2].Data))
|
||||||
|
case "message":
|
||||||
|
if len(vals) != 3 {
|
||||||
|
panic("invalid count")
|
||||||
|
}
|
||||||
|
ch := string(vals[1].Data)
|
||||||
|
regs[ch] = regs[ch] + 1
|
||||||
|
case "pmessage":
|
||||||
|
if len(vals) != 4 {
|
||||||
|
panic("invalid count")
|
||||||
|
}
|
||||||
|
ch := string(vals[1].Data)
|
||||||
|
regs[ch] = regs[ch] + 1
|
||||||
|
}
|
||||||
|
if len(regs) == 6 && maxp == 3 && maxs == 3 {
|
||||||
|
ready := true
|
||||||
|
for _, count := range regs {
|
||||||
|
if count < 50 {
|
||||||
|
ready = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ready {
|
||||||
|
// all messages have been received
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
// notify sender
|
||||||
|
done <- true
|
||||||
|
// wait for sender
|
||||||
|
<-done
|
||||||
|
// stop the timeout
|
||||||
|
final <- true
|
||||||
|
}
|
||||||
|
|
464
resp.go
464
resp.go
|
@ -1,7 +1,11 @@
|
||||||
package redcon
|
package redcon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Type of RESP
|
// Type of RESP
|
||||||
|
@ -128,3 +132,463 @@ func ReadNextRESP(b []byte) (n int, resp RESP) {
|
||||||
resp.Raw = b[0 : i+tn]
|
resp.Raw = b[0 : i+tn]
|
||||||
return len(resp.Raw), resp
|
return len(resp.Raw), resp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Kind is the kind of command
|
||||||
|
type Kind int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Redis is returned for Redis protocol commands
|
||||||
|
Redis Kind = iota
|
||||||
|
// Tile38 is returnd for Tile38 native protocol commands
|
||||||
|
Tile38
|
||||||
|
// Telnet is returnd for plain telnet commands
|
||||||
|
Telnet
|
||||||
|
)
|
||||||
|
|
||||||
|
var errInvalidMessage = &errProtocol{"invalid message"}
|
||||||
|
|
||||||
|
// ReadNextCommand reads the next command from the provided packet. It's
|
||||||
|
// possible that the packet contains multiple commands, or zero commands
|
||||||
|
// when the packet is incomplete.
|
||||||
|
// 'argsbuf' is an optional reusable buffer and it can be nil.
|
||||||
|
// 'complete' indicates that a command was read. false means no more commands.
|
||||||
|
// 'args' are the output arguments for the command.
|
||||||
|
// 'kind' is the type of command that was read.
|
||||||
|
// 'leftover' is any remaining unused bytes which belong to the next command.
|
||||||
|
// 'err' is returned when a protocol error was encountered.
|
||||||
|
func ReadNextCommand(packet []byte, argsbuf [][]byte) (
|
||||||
|
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
|
||||||
|
) {
|
||||||
|
args = argsbuf[:0]
|
||||||
|
if len(packet) > 0 {
|
||||||
|
if packet[0] != '*' {
|
||||||
|
if packet[0] == '$' {
|
||||||
|
return readTile38Command(packet, args)
|
||||||
|
}
|
||||||
|
return readTelnetCommand(packet, args)
|
||||||
|
}
|
||||||
|
// standard redis command
|
||||||
|
for s, i := 1, 1; i < len(packet); i++ {
|
||||||
|
if packet[i] == '\n' {
|
||||||
|
if packet[i-1] != '\r' {
|
||||||
|
return false, args[:0], Redis, packet, errInvalidMultiBulkLength
|
||||||
|
}
|
||||||
|
count, ok := parseInt(packet[s : i-1])
|
||||||
|
if !ok || count < 0 {
|
||||||
|
return false, args[:0], Redis, packet, errInvalidMultiBulkLength
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
if count == 0 {
|
||||||
|
return true, args[:0], Redis, packet[i:], nil
|
||||||
|
}
|
||||||
|
nextArg:
|
||||||
|
for j := 0; j < count; j++ {
|
||||||
|
if i == len(packet) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if packet[i] != '$' {
|
||||||
|
return false, args[:0], Redis, packet,
|
||||||
|
&errProtocol{"expected '$', got '" +
|
||||||
|
string(packet[i]) + "'"}
|
||||||
|
}
|
||||||
|
for s := i + 1; i < len(packet); i++ {
|
||||||
|
if packet[i] == '\n' {
|
||||||
|
if packet[i-1] != '\r' {
|
||||||
|
return false, args[:0], Redis, packet, errInvalidBulkLength
|
||||||
|
}
|
||||||
|
n, ok := parseInt(packet[s : i-1])
|
||||||
|
if !ok || count <= 0 {
|
||||||
|
return false, args[:0], Redis, packet, errInvalidBulkLength
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
if len(packet)-i >= n+2 {
|
||||||
|
if packet[i+n] != '\r' || packet[i+n+1] != '\n' {
|
||||||
|
return false, args[:0], Redis, packet, errInvalidBulkLength
|
||||||
|
}
|
||||||
|
args = append(args, packet[i:i+n])
|
||||||
|
i += n + 2
|
||||||
|
if j == count-1 {
|
||||||
|
// done reading
|
||||||
|
return true, args, Redis, packet[i:], nil
|
||||||
|
}
|
||||||
|
continue nextArg
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, args[:0], Redis, packet, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readTile38Command(packet []byte, argsbuf [][]byte) (
|
||||||
|
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
|
||||||
|
) {
|
||||||
|
for i := 1; i < len(packet); i++ {
|
||||||
|
if packet[i] == ' ' {
|
||||||
|
n, ok := parseInt(packet[1:i])
|
||||||
|
if !ok || n < 0 {
|
||||||
|
return false, args[:0], Tile38, packet, errInvalidMessage
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
if len(packet) >= i+n+2 {
|
||||||
|
if packet[i+n] != '\r' || packet[i+n+1] != '\n' {
|
||||||
|
return false, args[:0], Tile38, packet, errInvalidMessage
|
||||||
|
}
|
||||||
|
line := packet[i : i+n]
|
||||||
|
reading:
|
||||||
|
for len(line) != 0 {
|
||||||
|
if line[0] == '{' {
|
||||||
|
// The native protocol cannot understand json boundaries so it assumes that
|
||||||
|
// a json element must be at the end of the line.
|
||||||
|
args = append(args, line)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if line[0] == '"' && line[len(line)-1] == '"' {
|
||||||
|
if len(args) > 0 &&
|
||||||
|
strings.ToLower(string(args[0])) == "set" &&
|
||||||
|
strings.ToLower(string(args[len(args)-1])) == "string" {
|
||||||
|
// Setting a string value that is contained inside double quotes.
|
||||||
|
// This is only because of the boundary issues of the native protocol.
|
||||||
|
args = append(args, line[1:len(line)-1])
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
i := 0
|
||||||
|
for ; i < len(line); i++ {
|
||||||
|
if line[i] == ' ' {
|
||||||
|
value := line[:i]
|
||||||
|
if len(value) > 0 {
|
||||||
|
args = append(args, value)
|
||||||
|
}
|
||||||
|
line = line[i+1:]
|
||||||
|
continue reading
|
||||||
|
}
|
||||||
|
}
|
||||||
|
args = append(args, line)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return true, args, Tile38, packet[i+n+2:], nil
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, args[:0], Tile38, packet, nil
|
||||||
|
}
|
||||||
|
func readTelnetCommand(packet []byte, argsbuf [][]byte) (
|
||||||
|
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
|
||||||
|
) {
|
||||||
|
// just a plain text command
|
||||||
|
for i := 0; i < len(packet); i++ {
|
||||||
|
if packet[i] == '\n' {
|
||||||
|
var line []byte
|
||||||
|
if i > 0 && packet[i-1] == '\r' {
|
||||||
|
line = packet[:i-1]
|
||||||
|
} else {
|
||||||
|
line = packet[:i]
|
||||||
|
}
|
||||||
|
var quote bool
|
||||||
|
var quotech byte
|
||||||
|
var escape bool
|
||||||
|
outer:
|
||||||
|
for {
|
||||||
|
nline := make([]byte, 0, len(line))
|
||||||
|
for i := 0; i < len(line); i++ {
|
||||||
|
c := line[i]
|
||||||
|
if !quote {
|
||||||
|
if c == ' ' {
|
||||||
|
if len(nline) > 0 {
|
||||||
|
args = append(args, nline)
|
||||||
|
}
|
||||||
|
line = line[i+1:]
|
||||||
|
continue outer
|
||||||
|
}
|
||||||
|
if c == '"' || c == '\'' {
|
||||||
|
if i != 0 {
|
||||||
|
return false, args[:0], Telnet, packet, errUnbalancedQuotes
|
||||||
|
}
|
||||||
|
quotech = c
|
||||||
|
quote = true
|
||||||
|
line = line[i+1:]
|
||||||
|
continue outer
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if escape {
|
||||||
|
escape = false
|
||||||
|
switch c {
|
||||||
|
case 'n':
|
||||||
|
c = '\n'
|
||||||
|
case 'r':
|
||||||
|
c = '\r'
|
||||||
|
case 't':
|
||||||
|
c = '\t'
|
||||||
|
}
|
||||||
|
} else if c == quotech {
|
||||||
|
quote = false
|
||||||
|
quotech = 0
|
||||||
|
args = append(args, nline)
|
||||||
|
line = line[i+1:]
|
||||||
|
if len(line) > 0 && line[0] != ' ' {
|
||||||
|
return false, args[:0], Telnet, packet, errUnbalancedQuotes
|
||||||
|
}
|
||||||
|
continue outer
|
||||||
|
} else if c == '\\' {
|
||||||
|
escape = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nline = append(nline, c)
|
||||||
|
}
|
||||||
|
if quote {
|
||||||
|
return false, args[:0], Telnet, packet, errUnbalancedQuotes
|
||||||
|
}
|
||||||
|
if len(line) > 0 {
|
||||||
|
args = append(args, line)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return true, args, Telnet, packet[i+1:], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, args[:0], Telnet, packet, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// appendPrefix will append a "$3\r\n" style redis prefix for a message.
|
||||||
|
func appendPrefix(b []byte, c byte, n int64) []byte {
|
||||||
|
if n >= 0 && n <= 9 {
|
||||||
|
return append(b, c, byte('0'+n), '\r', '\n')
|
||||||
|
}
|
||||||
|
b = append(b, c)
|
||||||
|
b = strconv.AppendInt(b, n, 10)
|
||||||
|
return append(b, '\r', '\n')
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendUint appends a Redis protocol uint64 to the input bytes.
|
||||||
|
func AppendUint(b []byte, n uint64) []byte {
|
||||||
|
b = append(b, ':')
|
||||||
|
b = strconv.AppendUint(b, n, 10)
|
||||||
|
return append(b, '\r', '\n')
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendInt appends a Redis protocol int64 to the input bytes.
|
||||||
|
func AppendInt(b []byte, n int64) []byte {
|
||||||
|
return appendPrefix(b, ':', n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendArray appends a Redis protocol array to the input bytes.
|
||||||
|
func AppendArray(b []byte, n int) []byte {
|
||||||
|
return appendPrefix(b, '*', int64(n))
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendBulk appends a Redis protocol bulk byte slice to the input bytes.
|
||||||
|
func AppendBulk(b []byte, bulk []byte) []byte {
|
||||||
|
b = appendPrefix(b, '$', int64(len(bulk)))
|
||||||
|
b = append(b, bulk...)
|
||||||
|
return append(b, '\r', '\n')
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendBulkString appends a Redis protocol bulk string to the input bytes.
|
||||||
|
func AppendBulkString(b []byte, bulk string) []byte {
|
||||||
|
b = appendPrefix(b, '$', int64(len(bulk)))
|
||||||
|
b = append(b, bulk...)
|
||||||
|
return append(b, '\r', '\n')
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendString appends a Redis protocol string to the input bytes.
|
||||||
|
func AppendString(b []byte, s string) []byte {
|
||||||
|
b = append(b, '+')
|
||||||
|
b = append(b, stripNewlines(s)...)
|
||||||
|
return append(b, '\r', '\n')
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendError appends a Redis protocol error to the input bytes.
|
||||||
|
func AppendError(b []byte, s string) []byte {
|
||||||
|
b = append(b, '-')
|
||||||
|
b = append(b, stripNewlines(s)...)
|
||||||
|
return append(b, '\r', '\n')
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendOK appends a Redis protocol OK to the input bytes.
|
||||||
|
func AppendOK(b []byte) []byte {
|
||||||
|
return append(b, '+', 'O', 'K', '\r', '\n')
|
||||||
|
}
|
||||||
|
func stripNewlines(s string) string {
|
||||||
|
for i := 0; i < len(s); i++ {
|
||||||
|
if s[i] == '\r' || s[i] == '\n' {
|
||||||
|
s = strings.Replace(s, "\r", " ", -1)
|
||||||
|
s = strings.Replace(s, "\n", " ", -1)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendTile38 appends a Tile38 message to the input bytes.
|
||||||
|
func AppendTile38(b []byte, data []byte) []byte {
|
||||||
|
b = append(b, '$')
|
||||||
|
b = strconv.AppendInt(b, int64(len(data)), 10)
|
||||||
|
b = append(b, ' ')
|
||||||
|
b = append(b, data...)
|
||||||
|
return append(b, '\r', '\n')
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendNull appends a Redis protocol null to the input bytes.
|
||||||
|
func AppendNull(b []byte) []byte {
|
||||||
|
return append(b, '$', '-', '1', '\r', '\n')
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendBulkFloat appends a float64, as bulk bytes.
|
||||||
|
func AppendBulkFloat(dst []byte, f float64) []byte {
|
||||||
|
return AppendBulk(dst, strconv.AppendFloat(nil, f, 'f', -1, 64))
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendBulkInt appends an int64, as bulk bytes.
|
||||||
|
func AppendBulkInt(dst []byte, x int64) []byte {
|
||||||
|
return AppendBulk(dst, strconv.AppendInt(nil, x, 10))
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendBulkUint appends an uint64, as bulk bytes.
|
||||||
|
func AppendBulkUint(dst []byte, x uint64) []byte {
|
||||||
|
return AppendBulk(dst, strconv.AppendUint(nil, x, 10))
|
||||||
|
}
|
||||||
|
|
||||||
|
func prefixERRIfNeeded(msg string) string {
|
||||||
|
msg = strings.TrimSpace(msg)
|
||||||
|
firstWord := strings.Split(msg, " ")[0]
|
||||||
|
addERR := len(firstWord) == 0
|
||||||
|
for i := 0; i < len(firstWord); i++ {
|
||||||
|
if firstWord[i] < 'A' || firstWord[i] > 'Z' {
|
||||||
|
addERR = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if addERR {
|
||||||
|
msg = strings.TrimSpace("ERR " + msg)
|
||||||
|
}
|
||||||
|
return msg
|
||||||
|
}
|
||||||
|
|
||||||
|
// SimpleString is for representing a non-bulk representation of a string
|
||||||
|
// from an *Any call.
|
||||||
|
type SimpleString string
|
||||||
|
|
||||||
|
// SimpleInt is for representing a non-bulk representation of a int
|
||||||
|
// from an *Any call.
|
||||||
|
type SimpleInt int
|
||||||
|
|
||||||
|
// AppendAny appends any type to valid Redis type.
|
||||||
|
// nil -> null
|
||||||
|
// error -> error (adds "ERR " when first word is not uppercase)
|
||||||
|
// string -> bulk-string
|
||||||
|
// numbers -> bulk-string
|
||||||
|
// []byte -> bulk-string
|
||||||
|
// bool -> bulk-string ("0" or "1")
|
||||||
|
// slice -> array
|
||||||
|
// map -> array with key/value pairs
|
||||||
|
// SimpleString -> string
|
||||||
|
// SimpleInt -> integer
|
||||||
|
// everything-else -> bulk-string representation using fmt.Sprint()
|
||||||
|
func AppendAny(b []byte, v interface{}) []byte {
|
||||||
|
switch v := v.(type) {
|
||||||
|
case SimpleString:
|
||||||
|
b = AppendString(b, string(v))
|
||||||
|
case SimpleInt:
|
||||||
|
b = AppendInt(b, int64(v))
|
||||||
|
case nil:
|
||||||
|
b = AppendNull(b)
|
||||||
|
case error:
|
||||||
|
b = AppendError(b, prefixERRIfNeeded(v.Error()))
|
||||||
|
|
||||||
|
case string:
|
||||||
|
b = AppendBulkString(b, v)
|
||||||
|
case []byte:
|
||||||
|
b = AppendBulk(b, v)
|
||||||
|
case bool:
|
||||||
|
if v {
|
||||||
|
b = AppendBulkString(b, "1")
|
||||||
|
} else {
|
||||||
|
b = AppendBulkString(b, "0")
|
||||||
|
}
|
||||||
|
case int:
|
||||||
|
b = AppendBulkInt(b, int64(v))
|
||||||
|
case int8:
|
||||||
|
b = AppendBulkInt(b, int64(v))
|
||||||
|
case int16:
|
||||||
|
b = AppendBulkInt(b, int64(v))
|
||||||
|
case int32:
|
||||||
|
b = AppendBulkInt(b, int64(v))
|
||||||
|
case int64:
|
||||||
|
b = AppendBulkInt(b, int64(v))
|
||||||
|
case uint:
|
||||||
|
b = AppendBulkUint(b, uint64(v))
|
||||||
|
case uint8:
|
||||||
|
b = AppendBulkUint(b, uint64(v))
|
||||||
|
case uint16:
|
||||||
|
b = AppendBulkUint(b, uint64(v))
|
||||||
|
case uint32:
|
||||||
|
b = AppendBulkUint(b, uint64(v))
|
||||||
|
case uint64:
|
||||||
|
b = AppendBulkUint(b, uint64(v))
|
||||||
|
case float32:
|
||||||
|
b = AppendBulkFloat(b, float64(v))
|
||||||
|
case float64:
|
||||||
|
b = AppendBulkFloat(b, float64(v))
|
||||||
|
default:
|
||||||
|
vv := reflect.ValueOf(v)
|
||||||
|
switch vv.Kind() {
|
||||||
|
case reflect.Slice:
|
||||||
|
n := vv.Len()
|
||||||
|
b = AppendArray(b, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
b = AppendAny(b, vv.Index(i).Interface())
|
||||||
|
}
|
||||||
|
case reflect.Map:
|
||||||
|
n := vv.Len()
|
||||||
|
b = AppendArray(b, n*2)
|
||||||
|
var i int
|
||||||
|
var strKey bool
|
||||||
|
var strsKeyItems []strKeyItem
|
||||||
|
|
||||||
|
iter := vv.MapRange()
|
||||||
|
for iter.Next() {
|
||||||
|
key := iter.Key().Interface()
|
||||||
|
if i == 0 {
|
||||||
|
if _, ok := key.(string); ok {
|
||||||
|
strKey = true
|
||||||
|
strsKeyItems = make([]strKeyItem, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if strKey {
|
||||||
|
strsKeyItems[i] = strKeyItem{
|
||||||
|
key.(string), iter.Value().Interface(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
b = AppendAny(b, key)
|
||||||
|
b = AppendAny(b, iter.Value().Interface())
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
if strKey {
|
||||||
|
sort.Slice(strsKeyItems, func(i, j int) bool {
|
||||||
|
return strsKeyItems[i].key < strsKeyItems[j].key
|
||||||
|
})
|
||||||
|
for _, item := range strsKeyItems {
|
||||||
|
b = AppendBulkString(b, item.key)
|
||||||
|
b = AppendAny(b, item.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
b = AppendBulkString(b, fmt.Sprint(v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
type strKeyItem struct {
|
||||||
|
key string
|
||||||
|
value interface{}
|
||||||
|
}
|
||||||
|
|
122
resp_test.go
122
resp_test.go
|
@ -1,9 +1,12 @@
|
||||||
package redcon
|
package redcon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func isEmptyRESP(resp RESP) bool {
|
func isEmptyRESP(resp RESP) bool {
|
||||||
|
@ -128,3 +131,122 @@ func TestRESP(t *testing.T) {
|
||||||
t.Fatalf("expected %v, got %v", 3, xx)
|
t.Fatalf("expected %v, got %v", 3, xx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNextCommand(t *testing.T) {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
start := time.Now()
|
||||||
|
for time.Since(start) < time.Second {
|
||||||
|
// keep copy of pipeline args for final compare
|
||||||
|
var plargs [][][]byte
|
||||||
|
|
||||||
|
// create a pipeline of random number of commands with random data.
|
||||||
|
N := rand.Int() % 10000
|
||||||
|
var data []byte
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
nargs := rand.Int() % 10
|
||||||
|
data = AppendArray(data, nargs)
|
||||||
|
var args [][]byte
|
||||||
|
for j := 0; j < nargs; j++ {
|
||||||
|
arg := make([]byte, rand.Int()%100)
|
||||||
|
if _, err := rand.Read(arg); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
data = AppendBulk(data, arg)
|
||||||
|
args = append(args, arg)
|
||||||
|
}
|
||||||
|
plargs = append(plargs, args)
|
||||||
|
}
|
||||||
|
|
||||||
|
// break data into random number of chunks
|
||||||
|
chunkn := rand.Int() % 100
|
||||||
|
if chunkn == 0 {
|
||||||
|
chunkn = 1
|
||||||
|
}
|
||||||
|
if len(data) < chunkn {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var chunks [][]byte
|
||||||
|
var chunksz int
|
||||||
|
for i := 0; i < len(data); i += chunksz {
|
||||||
|
chunksz = rand.Int() % (len(data) / chunkn)
|
||||||
|
var chunk []byte
|
||||||
|
if i+chunksz < len(data) {
|
||||||
|
chunk = data[i : i+chunksz]
|
||||||
|
} else {
|
||||||
|
chunk = data[i:]
|
||||||
|
}
|
||||||
|
chunks = append(chunks, chunk)
|
||||||
|
}
|
||||||
|
|
||||||
|
// process chunks
|
||||||
|
var rbuf []byte
|
||||||
|
var fargs [][][]byte
|
||||||
|
for _, chunk := range chunks {
|
||||||
|
var data []byte
|
||||||
|
if len(rbuf) > 0 {
|
||||||
|
data = append(rbuf, chunk...)
|
||||||
|
} else {
|
||||||
|
data = chunk
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
complete, args, _, leftover, err := ReadNextCommand(data, nil)
|
||||||
|
data = leftover
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !complete {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fargs = append(fargs, args)
|
||||||
|
}
|
||||||
|
rbuf = append(rbuf[:0], data...)
|
||||||
|
}
|
||||||
|
// compare final args to original
|
||||||
|
if len(plargs) != len(fargs) {
|
||||||
|
t.Fatalf("not equal size: %v != %v", len(plargs), len(fargs))
|
||||||
|
}
|
||||||
|
for i := 0; i < len(plargs); i++ {
|
||||||
|
if len(plargs[i]) != len(fargs[i]) {
|
||||||
|
t.Fatalf("not equal size for item %v: %v != %v", i, len(plargs[i]), len(fargs[i]))
|
||||||
|
}
|
||||||
|
for j := 0; j < len(plargs[i]); j++ {
|
||||||
|
if !bytes.Equal(plargs[i][j], plargs[i][j]) {
|
||||||
|
t.Fatalf("not equal for item %v:%v: %v != %v", i, j, len(plargs[i][j]), len(fargs[i][j]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAppendBulkFloat(t *testing.T) {
|
||||||
|
var b []byte
|
||||||
|
b = AppendString(b, "HELLO")
|
||||||
|
b = AppendBulkFloat(b, 9.123192839)
|
||||||
|
b = AppendString(b, "HELLO")
|
||||||
|
exp := "+HELLO\r\n$11\r\n9.123192839\r\n+HELLO\r\n"
|
||||||
|
if string(b) != exp {
|
||||||
|
t.Fatalf("expected '%s', got '%s'", exp, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAppendBulkInt(t *testing.T) {
|
||||||
|
var b []byte
|
||||||
|
b = AppendString(b, "HELLO")
|
||||||
|
b = AppendBulkInt(b, -9182739137)
|
||||||
|
b = AppendString(b, "HELLO")
|
||||||
|
exp := "+HELLO\r\n$11\r\n-9182739137\r\n+HELLO\r\n"
|
||||||
|
if string(b) != exp {
|
||||||
|
t.Fatalf("expected '%s', got '%s'", exp, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAppendBulkUint(t *testing.T) {
|
||||||
|
var b []byte
|
||||||
|
b = AppendString(b, "HELLO")
|
||||||
|
b = AppendBulkInt(b, 91827391370)
|
||||||
|
b = AppendString(b, "HELLO")
|
||||||
|
exp := "+HELLO\r\n$11\r\n91827391370\r\n+HELLO\r\n"
|
||||||
|
if string(b) != exp {
|
||||||
|
t.Fatalf("expected '%s', got '%s'", exp, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue