Merge and refator

This commit is contained in:
tidwall 2020-10-31 07:18:14 -07:00
commit 7c780e8bd7
9 changed files with 1153 additions and 606 deletions

View File

@ -1,478 +0,0 @@
package redcon
import (
// Kind is the kind of command
type Kind int
const (
// Redis is returned for Redis protocol commands
Redis Kind = iota
// Tile38 is returnd for Tile38 native protocol commands
// Telnet is returnd for plain telnet commands
var errInvalidMessage = &errProtocol{"invalid message"}
// ReadNextCommand reads the next command from the provided packet. It's
// possible that the packet contains multiple commands, or zero commands
// when the packet is incomplete.
// 'argsbuf' is an optional reusable buffer and it can be nil.
// 'complete' indicates that a command was read. false means no more commands.
// 'args' are the output arguments for the command.
// 'kind' is the type of command that was read.
// 'leftover' is any remaining unused bytes which belong to the next command.
// 'err' is returned when a protocol error was encountered.
func ReadNextCommand(packet []byte, argsbuf [][]byte) (
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
) {
args = argsbuf[:0]
if len(packet) > 0 {
if packet[0] != '*' {
if packet[0] == '$' {
return readTile38Command(packet, args)
return readTelnetCommand(packet, args)
// standard redis command
for s, i := 1, 1; i < len(packet); i++ {
if packet[i] == '\n' {
if packet[i-1] != '\r' {
return false, args[:0], Redis, packet, errInvalidMultiBulkLength
count, ok := parseInt(packet[s : i-1])
if !ok || count < 0 {
return false, args[:0], Redis, packet, errInvalidMultiBulkLength
if count == 0 {
return true, args[:0], Redis, packet[i:], nil
for j := 0; j < count; j++ {
if i == len(packet) {
if packet[i] != '$' {
return false, args[:0], Redis, packet,
&errProtocol{"expected '$', got '" +
string(packet[i]) + "'"}
for s := i + 1; i < len(packet); i++ {
if packet[i] == '\n' {
if packet[i-1] != '\r' {
return false, args[:0], Redis, packet, errInvalidBulkLength
n, ok := parseInt(packet[s : i-1])
if !ok || count <= 0 {
return false, args[:0], Redis, packet, errInvalidBulkLength
if len(packet)-i >= n+2 {
if packet[i+n] != '\r' || packet[i+n+1] != '\n' {
return false, args[:0], Redis, packet, errInvalidBulkLength
args = append(args, packet[i:i+n])
i += n + 2
if j == count-1 {
// done reading
return true, args, Redis, packet[i:], nil
continue nextArg
return false, args[:0], Redis, packet, nil
func readTile38Command(packet []byte, argsbuf [][]byte) (
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
) {
for i := 1; i < len(packet); i++ {
if packet[i] == ' ' {
n, ok := parseInt(packet[1:i])
if !ok || n < 0 {
return false, args[:0], Tile38, packet, errInvalidMessage
if len(packet) >= i+n+2 {
if packet[i+n] != '\r' || packet[i+n+1] != '\n' {
return false, args[:0], Tile38, packet, errInvalidMessage
line := packet[i : i+n]
for len(line) != 0 {
if line[0] == '{' {
// The native protocol cannot understand json boundaries so it assumes that
// a json element must be at the end of the line.
args = append(args, line)
if line[0] == '"' && line[len(line)-1] == '"' {
if len(args) > 0 &&
strings.ToLower(string(args[0])) == "set" &&
strings.ToLower(string(args[len(args)-1])) == "string" {
// Setting a string value that is contained inside double quotes.
// This is only because of the boundary issues of the native protocol.
args = append(args, line[1:len(line)-1])
i := 0
for ; i < len(line); i++ {
if line[i] == ' ' {
value := line[:i]
if len(value) > 0 {
args = append(args, value)
line = line[i+1:]
continue reading
args = append(args, line)
return true, args, Tile38, packet[i+n+2:], nil
return false, args[:0], Tile38, packet, nil
func readTelnetCommand(packet []byte, argsbuf [][]byte) (
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
) {
// just a plain text command
for i := 0; i < len(packet); i++ {
if packet[i] == '\n' {
var line []byte
if i > 0 && packet[i-1] == '\r' {
line = packet[:i-1]
} else {
line = packet[:i]
var quote bool
var quotech byte
var escape bool
for {
nline := make([]byte, 0, len(line))
for i := 0; i < len(line); i++ {
c := line[i]
if !quote {
if c == ' ' {
if len(nline) > 0 {
args = append(args, nline)
line = line[i+1:]
continue outer
if c == '"' || c == '\'' {
if i != 0 {
return false, args[:0], Telnet, packet, errUnbalancedQuotes
quotech = c
quote = true
line = line[i+1:]
continue outer
} else {
if escape {
escape = false
switch c {
case 'n':
c = '\n'
case 'r':
c = '\r'
case 't':
c = '\t'
} else if c == quotech {
quote = false
quotech = 0
args = append(args, nline)
line = line[i+1:]
if len(line) > 0 && line[0] != ' ' {
return false, args[:0], Telnet, packet, errUnbalancedQuotes
continue outer
} else if c == '\\' {
escape = true
nline = append(nline, c)
if quote {
return false, args[:0], Telnet, packet, errUnbalancedQuotes
if len(line) > 0 {
args = append(args, line)
return true, args, Telnet, packet[i+1:], nil
return false, args[:0], Telnet, packet, nil
// appendPrefix will append a "$3\r\n" style redis prefix for a message.
func appendPrefix(b []byte, c byte, n int64) []byte {
if n >= 0 && n <= 9 {
return append(b, c, byte('0'+n), '\r', '\n')
b = append(b, c)
b = strconv.AppendInt(b, n, 10)
return append(b, '\r', '\n')
// AppendUint appends a Redis protocol uint64 to the input bytes.
func AppendUint(b []byte, n uint64) []byte {
b = append(b, ':')
b = strconv.AppendUint(b, n, 10)
return append(b, '\r', '\n')
// AppendInt appends a Redis protocol int64 to the input bytes.
func AppendInt(b []byte, n int64) []byte {
return appendPrefix(b, ':', n)
// AppendArray appends a Redis protocol array to the input bytes.
func AppendArray(b []byte, n int) []byte {
return appendPrefix(b, '*', int64(n))
// AppendBulk appends a Redis protocol bulk byte slice to the input bytes.
func AppendBulk(b []byte, bulk []byte) []byte {
b = appendPrefix(b, '$', int64(len(bulk)))
b = append(b, bulk...)
return append(b, '\r', '\n')
// AppendBulkString appends a Redis protocol bulk string to the input bytes.
func AppendBulkString(b []byte, bulk string) []byte {
b = appendPrefix(b, '$', int64(len(bulk)))
b = append(b, bulk...)
return append(b, '\r', '\n')
// AppendString appends a Redis protocol string to the input bytes.
func AppendString(b []byte, s string) []byte {
b = append(b, '+')
b = append(b, stripNewlines(s)...)
return append(b, '\r', '\n')
// AppendError appends a Redis protocol error to the input bytes.
func AppendError(b []byte, s string) []byte {
b = append(b, '-')
b = append(b, stripNewlines(s)...)
return append(b, '\r', '\n')
// AppendOK appends a Redis protocol OK to the input bytes.
func AppendOK(b []byte) []byte {
return append(b, '+', 'O', 'K', '\r', '\n')
func stripNewlines(s string) string {
for i := 0; i < len(s); i++ {
if s[i] == '\r' || s[i] == '\n' {
s = strings.Replace(s, "\r", " ", -1)
s = strings.Replace(s, "\n", " ", -1)
return s
// AppendTile38 appends a Tile38 message to the input bytes.
func AppendTile38(b []byte, data []byte) []byte {
b = append(b, '$')
b = strconv.AppendInt(b, int64(len(data)), 10)
b = append(b, ' ')
b = append(b, data...)
return append(b, '\r', '\n')
// AppendNull appends a Redis protocol null to the input bytes.
func AppendNull(b []byte) []byte {
return append(b, '$', '-', '1', '\r', '\n')
// AppendBulkFloat appends a float64, as bulk bytes.
func AppendBulkFloat(dst []byte, f float64) []byte {
return AppendBulk(dst, strconv.AppendFloat(nil, f, 'f', -1, 64))
// AppendBulkInt appends an int64, as bulk bytes.
func AppendBulkInt(dst []byte, x int64) []byte {
return AppendBulk(dst, strconv.AppendInt(nil, x, 10))
// AppendBulkUint appends an uint64, as bulk bytes.
func AppendBulkUint(dst []byte, x uint64) []byte {
return AppendBulk(dst, strconv.AppendUint(nil, x, 10))
func prefixERRIfNeeded(msg string) string {
msg = strings.TrimSpace(msg)
firstWord := strings.Split(msg, " ")[0]
addERR := len(firstWord) == 0
for i := 0; i < len(firstWord); i++ {
if firstWord[i] < 'A' || firstWord[i] > 'Z' {
addERR = true
if addERR {
msg = strings.TrimSpace("ERR " + msg)
return msg
// SimpleString is for representing a non-bulk representation of a string
// from an *Any call.
type SimpleString string
// SimpleInt is for representing a non-bulk representation of a int
// from an *Any call.
type SimpleInt int
// Marshaler is the interface implemented by types that
// can marshal themselves into a Redis response type from an *Any call.
// The return value is not check for validity.
type Marshaler interface {
MarshalRESP() []byte
// AppendAny appends any type to valid Redis type.
// nil -> null
// error -> error (adds "ERR " when first word is not uppercase)
// string -> bulk-string
// numbers -> bulk-string
// []byte -> bulk-string
// bool -> bulk-string ("0" or "1")
// slice -> array
// map -> array with key/value pairs
// SimpleString -> string
// SimpleInt -> integer
// Marshaler -> raw bytes
// everything-else -> bulk-string representation using fmt.Sprint()
func AppendAny(b []byte, v interface{}) []byte {
switch v := v.(type) {
case SimpleString:
b = AppendString(b, string(v))
case SimpleInt:
b = AppendInt(b, int64(v))
case nil:
b = AppendNull(b)
case error:
b = AppendError(b, prefixERRIfNeeded(v.Error()))
case string:
b = AppendBulkString(b, v)
case []byte:
b = AppendBulk(b, v)
case bool:
if v {
b = AppendBulkString(b, "1")
} else {
b = AppendBulkString(b, "0")
case int:
b = AppendBulkInt(b, int64(v))
case int8:
b = AppendBulkInt(b, int64(v))
case int16:
b = AppendBulkInt(b, int64(v))
case int32:
b = AppendBulkInt(b, int64(v))
case int64:
b = AppendBulkInt(b, int64(v))
case uint:
b = AppendBulkUint(b, uint64(v))
case uint8:
b = AppendBulkUint(b, uint64(v))
case uint16:
b = AppendBulkUint(b, uint64(v))
case uint32:
b = AppendBulkUint(b, uint64(v))
case uint64:
b = AppendBulkUint(b, uint64(v))
case float32:
b = AppendBulkFloat(b, float64(v))
case float64:
b = AppendBulkFloat(b, float64(v))
case Marshaler:
b = append(b, v.MarshalRESP()...)
vv := reflect.ValueOf(v)
switch vv.Kind() {
case reflect.Slice:
n := vv.Len()
b = AppendArray(b, n)
for i := 0; i < n; i++ {
b = AppendAny(b, vv.Index(i).Interface())
case reflect.Map:
n := vv.Len()
b = AppendArray(b, n*2)
var i int
var strKey bool
var strsKeyItems []strKeyItem
iter := vv.MapRange()
for iter.Next() {
key := iter.Key().Interface()
if i == 0 {
if _, ok := key.(string); ok {
strKey = true
strsKeyItems = make([]strKeyItem, n)
if strKey {
strsKeyItems[i] = strKeyItem{
key.(string), iter.Value().Interface(),
} else {
b = AppendAny(b, key)
b = AppendAny(b, iter.Value().Interface())
if strKey {
sort.Slice(strsKeyItems, func(i, j int) bool {
return strsKeyItems[i].key < strsKeyItems[j].key
for _, item := range strsKeyItems {
b = AppendBulkString(b, item.key)
b = AppendAny(b, item.value)
b = AppendBulkString(b, fmt.Sprint(v))
return b
type strKeyItem struct {
key string
value interface{}

View File

@ -1,127 +0,0 @@
package redcon
import (
func TestNextCommand(t *testing.T) {
start := time.Now()
for time.Since(start) < time.Second {
// keep copy of pipeline args for final compare
var plargs [][][]byte
// create a pipeline of random number of commands with random data.
N := rand.Int() % 10000
var data []byte
for i := 0; i < N; i++ {
nargs := rand.Int() % 10
data = AppendArray(data, nargs)
var args [][]byte
for j := 0; j < nargs; j++ {
arg := make([]byte, rand.Int()%100)
if _, err := rand.Read(arg); err != nil {
data = AppendBulk(data, arg)
args = append(args, arg)
plargs = append(plargs, args)
// break data into random number of chunks
chunkn := rand.Int() % 100
if chunkn == 0 {
chunkn = 1
if len(data) < chunkn {
var chunks [][]byte
var chunksz int
for i := 0; i < len(data); i += chunksz {
chunksz = rand.Int() % (len(data) / chunkn)
var chunk []byte
if i+chunksz < len(data) {
chunk = data[i : i+chunksz]
} else {
chunk = data[i:]
chunks = append(chunks, chunk)
// process chunks
var rbuf []byte
var fargs [][][]byte
for _, chunk := range chunks {
var data []byte
if len(rbuf) > 0 {
data = append(rbuf, chunk...)
} else {
data = chunk
for {
complete, args, _, leftover, err := ReadNextCommand(data, nil)
data = leftover
if err != nil {
if !complete {
fargs = append(fargs, args)
rbuf = append(rbuf[:0], data...)
// compare final args to original
if len(plargs) != len(fargs) {
t.Fatalf("not equal size: %v != %v", len(plargs), len(fargs))
for i := 0; i < len(plargs); i++ {
if len(plargs[i]) != len(fargs[i]) {
t.Fatalf("not equal size for item %v: %v != %v", i, len(plargs[i]), len(fargs[i]))
for j := 0; j < len(plargs[i]); j++ {
if !bytes.Equal(plargs[i][j], plargs[i][j]) {
t.Fatalf("not equal for item %v:%v: %v != %v", i, j, len(plargs[i][j]), len(fargs[i][j]))
func TestAppendBulkFloat(t *testing.T) {
var b []byte
b = AppendString(b, "HELLO")
b = AppendBulkFloat(b, 9.123192839)
b = AppendString(b, "HELLO")
exp := "+HELLO\r\n$11\r\n9.123192839\r\n+HELLO\r\n"
if string(b) != exp {
t.Fatalf("expected '%s', got '%s'", exp, b)
func TestAppendBulkInt(t *testing.T) {
var b []byte
b = AppendString(b, "HELLO")
b = AppendBulkInt(b, -9182739137)
b = AppendString(b, "HELLO")
exp := "+HELLO\r\n$11\r\n-9182739137\r\n+HELLO\r\n"
if string(b) != exp {
t.Fatalf("expected '%s', got '%s'", exp, b)
func TestAppendBulkUint(t *testing.T) {
var b []byte
b = AppendString(b, "HELLO")
b = AppendBulkInt(b, 91827391370)
b = AppendString(b, "HELLO")
exp := "+HELLO\r\n$11\r\n91827391370\r\n+HELLO\r\n"
if string(b) != exp {
t.Fatalf("expected '%s', got '%s'", exp, b)

View File

@ -13,12 +13,33 @@ var addr = ":6380"
func main() {
var mu sync.RWMutex
var items = make(map[string][]byte)
var ps redcon.PubSub
go log.Printf("started server at %s", addr)
err := redcon.ListenAndServe(addr,
func(conn redcon.Conn, cmd redcon.Command) {
switch strings.ToLower(string(cmd.Args[0])) {
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
case "publish":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
count := ps.Publish(string(cmd.Args[1]), string(cmd.Args[2]))
case "subscribe", "psubscribe":
if len(cmd.Args) < 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
command := strings.ToLower(string(cmd.Args[0]))
for i := 1; i < len(cmd.Args); i++ {
if command == "psubscribe" {
ps.Psubscribe(conn, string(cmd.Args[i]))
} else {
ps.Subscribe(conn, string(cmd.Args[i]))
case "detach":
hconn := conn.Detach()
log.Printf("connection has been detached")
@ -27,7 +48,6 @@ func main() {
case "ping":
case "quit":

go.mod Normal file
View File

@ -0,0 +1,8 @@
go 1.15
require ( v0.2.2 v1.0.1

go.sum Normal file
View File

@ -0,0 +1,4 @@ v0.2.2 h1:VVo0JW/tdidNdQzNsDR4wMbL3heaxA1DGleyzQ3/niY= v0.2.2/go.mod h1:huei1BkDWJ3/sLXmO+bsCNELL+Bp2Kks9OLyQFkzvA8= v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=

View File

@ -5,10 +5,14 @@ import (
var (
@ -1005,3 +1009,340 @@ func (m *ServeMux) ServeRESP(conn Conn, cmd Command) {
conn.WriteError("ERR unknown command '" + command + "'")
// PubSub is a Redis compatible pub/sub server
type PubSub struct {
mu sync.RWMutex
nextid uint64
initd bool
chans *btree.BTree
conns map[Conn]*pubSubConn
// Subscribe a connection to PubSub
func (ps *PubSub) Subscribe(conn Conn, channel string) {
ps.subscribe(conn, false, channel)
// Psubscribe a connection to PubSub
func (ps *PubSub) Psubscribe(conn Conn, channel string) {
ps.subscribe(conn, true, channel)
// Publish a message to subscribers
func (ps *PubSub) Publish(channel, message string) int {
if !ps.initd {
return 0
var sent int
// write messages to all clients that are subscribed on the channel
pivot := &pubSubEntry{pattern: false, channel: channel}
ps.chans.Ascend(pivot, func(item interface{}) bool {
entry := item.(*pubSubEntry)
if != || entry.pattern != pivot.pattern {
return false
entry.sconn.writeMessage(entry.pattern, "", channel, message)
return true
// match on and write all psubscribe clients
pivot = &pubSubEntry{pattern: true}
ps.chans.Ascend(pivot, func(item interface{}) bool {
entry := item.(*pubSubEntry)
if match.Match(channel, {
entry.sconn.writeMessage(entry.pattern,, channel,
return true
return sent
type pubSubConn struct {
id uint64
mu sync.Mutex
conn Conn
dconn DetachedConn
entries map[*pubSubEntry]bool
type pubSubEntry struct {
pattern bool
sconn *pubSubConn
channel string
func (sconn *pubSubConn) writeMessage(pat bool, pchan, channel, msg string) {
if pat {
} else {
// bgrunner runs in the background and reads incoming commands from the
// detached client.
func (sconn *pubSubConn) bgrunner(ps *PubSub) {
defer func() {
// client connection has ended, disconnect from the PubSub instances
// and close the network connection.
for entry := range sconn.entries {
delete(ps.conns, sconn.conn)
for {
cmd, err := sconn.dconn.ReadCommand()
if err != nil {
if len(cmd.Args) == 0 {
switch strings.ToLower(string(cmd.Args[0])) {
case "psubscribe", "subscribe":
if len(cmd.Args) < 2 {
func() {
sconn.dconn.WriteError(fmt.Sprintf("ERR wrong number of "+
"arguments for '%s'", cmd.Args[0]))
command := strings.ToLower(string(cmd.Args[0]))
for i := 1; i < len(cmd.Args); i++ {
if command == "psubscribe" {
ps.Psubscribe(sconn.conn, string(cmd.Args[i]))
} else {
ps.Subscribe(sconn.conn, string(cmd.Args[i]))
case "unsubscribe", "punsubscribe":
pattern := strings.ToLower(string(cmd.Args[0])) == "punsubscribe"
if len(cmd.Args) == 1 {
ps.unsubscribe(sconn.conn, pattern, true, "")
} else {
for i := 1; i < len(cmd.Args); i++ {
channel := string(cmd.Args[i])
ps.unsubscribe(sconn.conn, pattern, false, channel)
case "quit":
func() {
case "ping":
var msg string
switch len(cmd.Args) {
case 1:
case 2:
msg = string(cmd.Args[1])
func() {
sconn.dconn.WriteError(fmt.Sprintf("ERR wrong number of "+
"arguments for '%s'", cmd.Args[0]))
func() {
func() {
sconn.dconn.WriteError(fmt.Sprintf("ERR Can't execute '%s': "+
"allowed in this context", cmd.Args[0]))
// byEntry is a "less" function that sorts the entries in a btree. The tree
// is sorted be (pattern, channel, All pattern=true entries are at
// the end (right) of the tree.
func byEntry(a, b interface{}) bool {
aa := a.(*pubSubEntry)
bb := b.(*pubSubEntry)
if !aa.pattern && bb.pattern {
return true
if aa.pattern && !bb.pattern {
return false
if < {
return true
if > {
return false
var aid uint64
var bid uint64
if aa.sconn != nil {
aid =
if bb.sconn != nil {
bid =
return aid < bid
func (ps *PubSub) subscribe(conn Conn, pattern bool, channel string) {
// initialize the PubSub instance
if !ps.initd {
ps.conns = make(map[Conn]*pubSubConn)
ps.chans = btree.New(byEntry)
ps.initd = true
// fetch the pubSubConn
sconn, ok := ps.conns[conn]
if !ok {
// initialize a new pubSubConn, which runs on a detached connection,
// and attach it to the PubSub channels/conn btree
dconn := conn.Detach()
sconn = &pubSubConn{
id: ps.nextid,
conn: conn,
dconn: dconn,
entries: make(map[*pubSubEntry]bool),
ps.conns[conn] = sconn
// add an entry to the pubsub btree
entry := &pubSubEntry{
pattern: pattern,
channel: channel,
sconn: sconn,
sconn.entries[entry] = true
// send a message to the client
if pattern {
} else {
var count int
for ient := range sconn.entries {
if ient.pattern == pattern {
// start the background client operation
if !ok {
go sconn.bgrunner(ps)
func (ps *PubSub) unsubscribe(conn Conn, pattern, all bool, channel string) {
// fetch the pubSubConn. This must exist
sconn := ps.conns[conn]
removeEntry := func(entry *pubSubEntry) {
if entry != nil {
delete(sconn.entries, entry)
if pattern {
} else {
if entry != nil {
} else {
var count int
for ient := range sconn.entries {
if ient.pattern == pattern {
if all {
// unsubscribe from all (p)subscribe entries
var entries []*pubSubEntry
for ient := range sconn.entries {
if ient.pattern == pattern {
entries = append(entries, ient)
if len(entries) == 0 {
} else {
for _, entry := range entries {
} else {
// unsubscribe single channel from (p)subscribe.
var entry *pubSubEntry
for ient := range sconn.entries {
if ient.pattern == pattern && == channel {

View File

@ -1,6 +1,7 @@
package redcon
import (
@ -10,6 +11,7 @@ import (
@ -554,3 +556,185 @@ func TestParse(t *testing.T) {
t.Fatalf("expected '%v', got '%v'", "A", string(cmd.Args[0]))
func TestPubSub(t *testing.T) {
addr := ":12346"
done := make(chan bool)
go func() {
var ps PubSub
go func() {
tch := time.NewTicker(time.Millisecond * 5)
defer tch.Stop()
channels := []string{"achan1", "bchan2", "cchan3", "dchan4"}
for i := 0; ; i++ {
select {
case <-tch.C:
case <-done:
for {
var empty bool
if len(ps.conns) == 0 {
if ps.chans.Len() != 0 {
panic("chans not empty")
empty = true
if empty {
time.Sleep(time.Millisecond * 10)
done <- true
channel := channels[i%len(channels)]
message := fmt.Sprintf("message %d", i)
ps.Publish(channel, message)
t.Fatal(ListenAndServe(addr, func(conn Conn, cmd Command) {
switch strings.ToLower(string(cmd.Args[0])) {
conn.WriteError("ERR unknown command '" +
string(cmd.Args[0]) + "'")
case "publish":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" +
string(cmd.Args[0]) + "' command")
count := ps.Publish(string(cmd.Args[1]), string(cmd.Args[2]))
case "subscribe", "psubscribe":
if len(cmd.Args) < 2 {
conn.WriteError("ERR wrong number of arguments for '" +
string(cmd.Args[0]) + "' command")
command := strings.ToLower(string(cmd.Args[0]))
for i := 1; i < len(cmd.Args); i++ {
if command == "psubscribe" {
ps.Psubscribe(conn, string(cmd.Args[i]))
} else {
ps.Subscribe(conn, string(cmd.Args[i]))
}, nil, nil))
final := make(chan bool)
go func() {
select {
case <-time.Tick(time.Second * 30):
case <-final:
// create 10 connections
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
go func(i int) {
defer wg.Done()
var conn net.Conn
for i := 0; i < 5; i++ {
var err error
conn, err = net.Dial("tcp", addr)
if err != nil {
time.Sleep(time.Second / 10)
if conn == nil {
panic("could not connect to server")
defer conn.Close()
regs := make(map[string]int)
var maxp int
var maxs int
fmt.Fprintf(conn, "subscribe achan1\r\n")
fmt.Fprintf(conn, "subscribe bchan2 cchan3\r\n")
fmt.Fprintf(conn, "psubscribe a*1\r\n")
fmt.Fprintf(conn, "psubscribe b*2 c*3\r\n")
// collect 50 messages from each channel
rd := bufio.NewReader(conn)
var buf []byte
for {
line, err := rd.ReadBytes('\n')
if err != nil {
buf = append(buf, line...)
n, resp := ReadNextRESP(buf)
if n == 0 {
buf = nil
if resp.Type != Array {
panic("expected array")
var vals []RESP
resp.ForEach(func(item RESP) bool {
vals = append(vals, item)
return true
name := string(vals[0].Data)
switch name {
case "subscribe":
if len(vals) != 3 {
panic("invalid count")
ch := string(vals[1].Data)
regs[ch] = 0
maxs, _ = strconv.Atoi(string(vals[2].Data))
case "psubscribe":
if len(vals) != 3 {
panic("invalid count")
ch := string(vals[1].Data)
regs[ch] = 0
maxp, _ = strconv.Atoi(string(vals[2].Data))
case "message":
if len(vals) != 3 {
panic("invalid count")
ch := string(vals[1].Data)
regs[ch] = regs[ch] + 1
case "pmessage":
if len(vals) != 4 {
panic("invalid count")
ch := string(vals[1].Data)
regs[ch] = regs[ch] + 1
if len(regs) == 6 && maxp == 3 && maxs == 3 {
ready := true
for _, count := range regs {
if count < 50 {
ready = false
if ready {
// all messages have been received
// notify sender
done <- true
// wait for sender
// stop the timeout
final <- true

View File

@ -1,7 +1,11 @@
package redcon
import (
// Type of RESP
@ -128,3 +132,472 @@ func ReadNextRESP(b []byte) (n int, resp RESP) {
resp.Raw = b[0 : i+tn]
return len(resp.Raw), resp
// Kind is the kind of command
type Kind int
const (
// Redis is returned for Redis protocol commands
Redis Kind = iota
// Tile38 is returnd for Tile38 native protocol commands
// Telnet is returnd for plain telnet commands
var errInvalidMessage = &errProtocol{"invalid message"}
// ReadNextCommand reads the next command from the provided packet. It's
// possible that the packet contains multiple commands, or zero commands
// when the packet is incomplete.
// 'argsbuf' is an optional reusable buffer and it can be nil.
// 'complete' indicates that a command was read. false means no more commands.
// 'args' are the output arguments for the command.
// 'kind' is the type of command that was read.
// 'leftover' is any remaining unused bytes which belong to the next command.
// 'err' is returned when a protocol error was encountered.
func ReadNextCommand(packet []byte, argsbuf [][]byte) (
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
) {
args = argsbuf[:0]
if len(packet) > 0 {
if packet[0] != '*' {
if packet[0] == '$' {
return readTile38Command(packet, args)
return readTelnetCommand(packet, args)
// standard redis command
for s, i := 1, 1; i < len(packet); i++ {
if packet[i] == '\n' {
if packet[i-1] != '\r' {
return false, args[:0], Redis, packet, errInvalidMultiBulkLength
count, ok := parseInt(packet[s : i-1])
if !ok || count < 0 {
return false, args[:0], Redis, packet, errInvalidMultiBulkLength
if count == 0 {
return true, args[:0], Redis, packet[i:], nil
for j := 0; j < count; j++ {
if i == len(packet) {
if packet[i] != '$' {
return false, args[:0], Redis, packet,
&errProtocol{"expected '$', got '" +
string(packet[i]) + "'"}
for s := i + 1; i < len(packet); i++ {
if packet[i] == '\n' {
if packet[i-1] != '\r' {
return false, args[:0], Redis, packet, errInvalidBulkLength
n, ok := parseInt(packet[s : i-1])
if !ok || count <= 0 {
return false, args[:0], Redis, packet, errInvalidBulkLength
if len(packet)-i >= n+2 {
if packet[i+n] != '\r' || packet[i+n+1] != '\n' {
return false, args[:0], Redis, packet, errInvalidBulkLength
args = append(args, packet[i:i+n])
i += n + 2
if j == count-1 {
// done reading
return true, args, Redis, packet[i:], nil
continue nextArg
return false, args[:0], Redis, packet, nil
func readTile38Command(packet []byte, argsbuf [][]byte) (
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
) {
for i := 1; i < len(packet); i++ {
if packet[i] == ' ' {
n, ok := parseInt(packet[1:i])
if !ok || n < 0 {
return false, args[:0], Tile38, packet, errInvalidMessage
if len(packet) >= i+n+2 {
if packet[i+n] != '\r' || packet[i+n+1] != '\n' {
return false, args[:0], Tile38, packet, errInvalidMessage
line := packet[i : i+n]
for len(line) != 0 {
if line[0] == '{' {
// The native protocol cannot understand json boundaries so it assumes that
// a json element must be at the end of the line.
args = append(args, line)
if line[0] == '"' && line[len(line)-1] == '"' {
if len(args) > 0 &&
strings.ToLower(string(args[0])) == "set" &&
strings.ToLower(string(args[len(args)-1])) == "string" {
// Setting a string value that is contained inside double quotes.
// This is only because of the boundary issues of the native protocol.
args = append(args, line[1:len(line)-1])
i := 0
for ; i < len(line); i++ {
if line[i] == ' ' {
value := line[:i]
if len(value) > 0 {
args = append(args, value)
line = line[i+1:]
continue reading
args = append(args, line)
return true, args, Tile38, packet[i+n+2:], nil
return false, args[:0], Tile38, packet, nil
func readTelnetCommand(packet []byte, argsbuf [][]byte) (
complete bool, args [][]byte, kind Kind, leftover []byte, err error,
) {
// just a plain text command
for i := 0; i < len(packet); i++ {
if packet[i] == '\n' {
var line []byte
if i > 0 && packet[i-1] == '\r' {
line = packet[:i-1]
} else {
line = packet[:i]
var quote bool
var quotech byte
var escape bool
for {
nline := make([]byte, 0, len(line))
for i := 0; i < len(line); i++ {
c := line[i]
if !quote {
if c == ' ' {
if len(nline) > 0 {
args = append(args, nline)
line = line[i+1:]
continue outer
if c == '"' || c == '\'' {
if i != 0 {
return false, args[:0], Telnet, packet, errUnbalancedQuotes
quotech = c
quote = true
line = line[i+1:]
continue outer
} else {
if escape {
escape = false
switch c {
case 'n':
c = '\n'
case 'r':
c = '\r'
case 't':
c = '\t'
} else if c == quotech {
quote = false
quotech = 0
args = append(args, nline)
line = line[i+1:]
if len(line) > 0 && line[0] != ' ' {
return false, args[:0], Telnet, packet, errUnbalancedQuotes
continue outer
} else if c == '\\' {
escape = true
nline = append(nline, c)
if quote {
return false, args[:0], Telnet, packet, errUnbalancedQuotes
if len(line) > 0 {
args = append(args, line)
return true, args, Telnet, packet[i+1:], nil
return false, args[:0], Telnet, packet, nil
// appendPrefix will append a "$3\r\n" style redis prefix for a message.
func appendPrefix(b []byte, c byte, n int64) []byte {
if n >= 0 && n <= 9 {
return append(b, c, byte('0'+n), '\r', '\n')
b = append(b, c)
b = strconv.AppendInt(b, n, 10)
return append(b, '\r', '\n')
// AppendUint appends a Redis protocol uint64 to the input bytes.
func AppendUint(b []byte, n uint64) []byte {
b = append(b, ':')
b = strconv.AppendUint(b, n, 10)
return append(b, '\r', '\n')
// AppendInt appends a Redis protocol int64 to the input bytes.
func AppendInt(b []byte, n int64) []byte {
return appendPrefix(b, ':', n)
// AppendArray appends a Redis protocol array to the input bytes.
func AppendArray(b []byte, n int) []byte {
return appendPrefix(b, '*', int64(n))
// AppendBulk appends a Redis protocol bulk byte slice to the input bytes.
func AppendBulk(b []byte, bulk []byte) []byte {
b = appendPrefix(b, '$', int64(len(bulk)))
b = append(b, bulk...)
return append(b, '\r', '\n')
// AppendBulkString appends a Redis protocol bulk string to the input bytes.
func AppendBulkString(b []byte, bulk string) []byte {
b = appendPrefix(b, '$', int64(len(bulk)))
b = append(b, bulk...)
return append(b, '\r', '\n')
// AppendString appends a Redis protocol string to the input bytes.
func AppendString(b []byte, s string) []byte {
b = append(b, '+')
b = append(b, stripNewlines(s)...)
return append(b, '\r', '\n')
// AppendError appends a Redis protocol error to the input bytes.
func AppendError(b []byte, s string) []byte {
b = append(b, '-')
b = append(b, stripNewlines(s)...)
return append(b, '\r', '\n')
// AppendOK appends a Redis protocol OK to the input bytes.
func AppendOK(b []byte) []byte {
return append(b, '+', 'O', 'K', '\r', '\n')
func stripNewlines(s string) string {
for i := 0; i < len(s); i++ {
if s[i] == '\r' || s[i] == '\n' {
s = strings.Replace(s, "\r", " ", -1)
s = strings.Replace(s, "\n", " ", -1)
return s
// AppendTile38 appends a Tile38 message to the input bytes.
func AppendTile38(b []byte, data []byte) []byte {
b = append(b, '$')
b = strconv.AppendInt(b, int64(len(data)), 10)
b = append(b, ' ')
b = append(b, data...)
return append(b, '\r', '\n')
// AppendNull appends a Redis protocol null to the input bytes.
func AppendNull(b []byte) []byte {
return append(b, '$', '-', '1', '\r', '\n')
// AppendBulkFloat appends a float64, as bulk bytes.
func AppendBulkFloat(dst []byte, f float64) []byte {
return AppendBulk(dst, strconv.AppendFloat(nil, f, 'f', -1, 64))
// AppendBulkInt appends an int64, as bulk bytes.
func AppendBulkInt(dst []byte, x int64) []byte {
return AppendBulk(dst, strconv.AppendInt(nil, x, 10))
// AppendBulkUint appends an uint64, as bulk bytes.
func AppendBulkUint(dst []byte, x uint64) []byte {
return AppendBulk(dst, strconv.AppendUint(nil, x, 10))
func prefixERRIfNeeded(msg string) string {
msg = strings.TrimSpace(msg)
firstWord := strings.Split(msg, " ")[0]
addERR := len(firstWord) == 0
for i := 0; i < len(firstWord); i++ {
if firstWord[i] < 'A' || firstWord[i] > 'Z' {
addERR = true
if addERR {
msg = strings.TrimSpace("ERR " + msg)
return msg
// SimpleString is for representing a non-bulk representation of a string
// from an *Any call.
type SimpleString string
// SimpleInt is for representing a non-bulk representation of a int
// from an *Any call.
type SimpleInt int
// Marshaler is the interface implemented by types that
// can marshal themselves into a Redis response type from an *Any call.
// The return value is not check for validity.
type Marshaler interface {
MarshalRESP() []byte
// AppendAny appends any type to valid Redis type.
// nil -> null
// error -> error (adds "ERR " when first word is not uppercase)
// string -> bulk-string
// numbers -> bulk-string
// []byte -> bulk-string
// bool -> bulk-string ("0" or "1")
// slice -> array
// map -> array with key/value pairs
// SimpleString -> string
// SimpleInt -> integer
// Marshaler -> raw bytes
// everything-else -> bulk-string representation using fmt.Sprint()
func AppendAny(b []byte, v interface{}) []byte {
switch v := v.(type) {
case SimpleString:
b = AppendString(b, string(v))
case SimpleInt:
b = AppendInt(b, int64(v))
case nil:
b = AppendNull(b)
case error:
b = AppendError(b, prefixERRIfNeeded(v.Error()))
case string:
b = AppendBulkString(b, v)
case []byte:
b = AppendBulk(b, v)
case bool:
if v {
b = AppendBulkString(b, "1")
} else {
b = AppendBulkString(b, "0")
case int:
b = AppendBulkInt(b, int64(v))
case int8:
b = AppendBulkInt(b, int64(v))
case int16:
b = AppendBulkInt(b, int64(v))
case int32:
b = AppendBulkInt(b, int64(v))
case int64:
b = AppendBulkInt(b, int64(v))
case uint:
b = AppendBulkUint(b, uint64(v))
case uint8:
b = AppendBulkUint(b, uint64(v))
case uint16:
b = AppendBulkUint(b, uint64(v))
case uint32:
b = AppendBulkUint(b, uint64(v))
case uint64:
b = AppendBulkUint(b, uint64(v))
case float32:
b = AppendBulkFloat(b, float64(v))
case float64:
b = AppendBulkFloat(b, float64(v))
case Marshaler:
b = append(b, v.MarshalRESP()...)
vv := reflect.ValueOf(v)
switch vv.Kind() {
case reflect.Slice:
n := vv.Len()
b = AppendArray(b, n)
for i := 0; i < n; i++ {
b = AppendAny(b, vv.Index(i).Interface())
case reflect.Map:
n := vv.Len()
b = AppendArray(b, n*2)
var i int
var strKey bool
var strsKeyItems []strKeyItem
iter := vv.MapRange()
for iter.Next() {
key := iter.Key().Interface()
if i == 0 {
if _, ok := key.(string); ok {
strKey = true
strsKeyItems = make([]strKeyItem, n)
if strKey {
strsKeyItems[i] = strKeyItem{
key.(string), iter.Value().Interface(),
} else {
b = AppendAny(b, key)
b = AppendAny(b, iter.Value().Interface())
if strKey {
sort.Slice(strsKeyItems, func(i, j int) bool {
return strsKeyItems[i].key < strsKeyItems[j].key
for _, item := range strsKeyItems {
b = AppendBulkString(b, item.key)
b = AppendAny(b, item.value)
b = AppendBulkString(b, fmt.Sprint(v))
return b
type strKeyItem struct {
key string
value interface{}

View File

@ -1,9 +1,12 @@
package redcon
import (
func isEmptyRESP(resp RESP) bool {
@ -128,3 +131,122 @@ func TestRESP(t *testing.T) {
t.Fatalf("expected %v, got %v", 3, xx)
func TestNextCommand(t *testing.T) {
start := time.Now()
for time.Since(start) < time.Second {
// keep copy of pipeline args for final compare
var plargs [][][]byte
// create a pipeline of random number of commands with random data.
N := rand.Int() % 10000
var data []byte
for i := 0; i < N; i++ {
nargs := rand.Int() % 10
data = AppendArray(data, nargs)
var args [][]byte
for j := 0; j < nargs; j++ {
arg := make([]byte, rand.Int()%100)
if _, err := rand.Read(arg); err != nil {
data = AppendBulk(data, arg)
args = append(args, arg)
plargs = append(plargs, args)
// break data into random number of chunks
chunkn := rand.Int() % 100
if chunkn == 0 {
chunkn = 1
if len(data) < chunkn {
var chunks [][]byte
var chunksz int
for i := 0; i < len(data); i += chunksz {
chunksz = rand.Int() % (len(data) / chunkn)
var chunk []byte
if i+chunksz < len(data) {
chunk = data[i : i+chunksz]
} else {
chunk = data[i:]
chunks = append(chunks, chunk)
// process chunks
var rbuf []byte
var fargs [][][]byte
for _, chunk := range chunks {
var data []byte
if len(rbuf) > 0 {
data = append(rbuf, chunk...)
} else {
data = chunk
for {
complete, args, _, leftover, err := ReadNextCommand(data, nil)
data = leftover
if err != nil {
if !complete {
fargs = append(fargs, args)
rbuf = append(rbuf[:0], data...)
// compare final args to original
if len(plargs) != len(fargs) {
t.Fatalf("not equal size: %v != %v", len(plargs), len(fargs))
for i := 0; i < len(plargs); i++ {
if len(plargs[i]) != len(fargs[i]) {
t.Fatalf("not equal size for item %v: %v != %v", i, len(plargs[i]), len(fargs[i]))
for j := 0; j < len(plargs[i]); j++ {
if !bytes.Equal(plargs[i][j], plargs[i][j]) {
t.Fatalf("not equal for item %v:%v: %v != %v", i, j, len(plargs[i][j]), len(fargs[i][j]))
func TestAppendBulkFloat(t *testing.T) {
var b []byte
b = AppendString(b, "HELLO")
b = AppendBulkFloat(b, 9.123192839)
b = AppendString(b, "HELLO")
exp := "+HELLO\r\n$11\r\n9.123192839\r\n+HELLO\r\n"
if string(b) != exp {
t.Fatalf("expected '%s', got '%s'", exp, b)
func TestAppendBulkInt(t *testing.T) {
var b []byte
b = AppendString(b, "HELLO")
b = AppendBulkInt(b, -9182739137)
b = AppendString(b, "HELLO")
exp := "+HELLO\r\n$11\r\n-9182739137\r\n+HELLO\r\n"
if string(b) != exp {
t.Fatalf("expected '%s', got '%s'", exp, b)
func TestAppendBulkUint(t *testing.T) {
var b []byte
b = AppendString(b, "HELLO")
b = AppendBulkInt(b, 91827391370)
b = AppendString(b, "HELLO")
exp := "+HELLO\r\n$11\r\n91827391370\r\n+HELLO\r\n"
if string(b) != exp {
t.Fatalf("expected '%s', got '%s'", exp, b)