first commit

This commit is contained in:
Josh Baker 2016-07-28 07:54:02 -07:00
commit ffcf13732b
5 changed files with 841 additions and 0 deletions

20
LICENSE Normal file
View File

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2016 Josh Baker
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

110
README.md Normal file
View File

@ -0,0 +1,110 @@
Redcon [GoDoc](https://godoc.org/github.com/tidwall/redcon?status.svg)](https://godoc.org/github.com/tidwall/redcon)
======
Super fast Redis RESP server implementation.
Supports pipelining and telnet commands.
There is only one function `ListenAndServe`, and one type `Conn`.
Here's a full example of a Redis clone that accepts:
- SET key value
- GET key
- DEL key
- PING
- QUIT
You can run this example from a terminal:
```sh
go run examples/clone.go
```
```go
package main
import (
"log"
"strings"
"sync"
"github.com/tidwall/redcon"
)
var addr = ":6380"
func main() {
var mu sync.RWMutex
var items = make(map[string]string)
go log.Printf("started server at %s", addr)
err := redcon.ListenAndServe(addr,
func(conn redcon.Conn, commands [][]string) {
for _, args := range commands {
switch strings.ToLower(args[0]) {
default:
conn.WriteError("ERR unknown command '" + args[0] + "'")
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
case "set":
if len(args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command")
continue
}
mu.Lock()
items[args[1]] = args[2]
mu.Unlock()
conn.WriteString("OK")
case "get":
if len(args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command")
continue
}
mu.RLock()
val, ok := items[args[1]]
mu.RUnlock()
if !ok {
conn.WriteNull()
} else {
conn.WriteBulk(val)
}
case "del":
if len(args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command")
continue
}
mu.Lock()
_, ok := items[args[1]]
delete(items, args[1])
mu.Unlock()
if !ok {
conn.WriteInt(0)
} else {
conn.WriteInt(1)
}
}
}
},
func(conn redcon.Conn) bool {
log.Printf("accept: %s", conn.RemoteAddr())
return true
},
func(conn redcon.Conn, err error) {
log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err)
},
)
if err != nil {
log.Fatal(err)
}
}
```
Contact
-------
Josh Baker [@tidwall](http://twitter.com/tidwall)
License
-------
Redcon source code is available under the MIT [License](/LICENSE).

78
example/clone.go Normal file
View File

@ -0,0 +1,78 @@
package main
import (
"log"
"strings"
"sync"
"github.com/tidwall/redcon"
)
var addr = ":6380"
func main() {
var mu sync.RWMutex
var items = make(map[string]string)
go log.Printf("started server at %s", addr)
err := redcon.ListenAndServe(addr,
func(conn redcon.Conn, commands [][]string) {
for _, args := range commands {
switch strings.ToLower(args[0]) {
default:
conn.WriteError("ERR unknown command '" + args[0] + "'")
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
case "set":
if len(args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command")
continue
}
mu.Lock()
items[args[1]] = args[2]
mu.Unlock()
conn.WriteString("OK")
case "get":
if len(args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command")
continue
}
mu.RLock()
val, ok := items[args[1]]
mu.RUnlock()
if !ok {
conn.WriteNull()
} else {
conn.WriteBulk(val)
}
case "del":
if len(args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command")
continue
}
mu.Lock()
_, ok := items[args[1]]
delete(items, args[1])
mu.Unlock()
if !ok {
conn.WriteInt(0)
} else {
conn.WriteInt(1)
}
}
}
},
func(conn redcon.Conn) bool {
log.Printf("accept: %s", conn.RemoteAddr())
return true
},
func(conn redcon.Conn, err error) {
log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err)
},
)
if err != nil {
log.Fatal(err)
}
}

424
redcon.go Normal file
View File

@ -0,0 +1,424 @@
package redcon
import (
"bytes"
"errors"
"io"
"net"
"strconv"
"sync"
)
type Conn interface {
RemoteAddr() string
Close() error
WriteError(msg string)
WriteString(str string)
WriteBulk(bulk string)
WriteInt(num int)
WriteArray(count int)
WriteNull()
}
var (
errUnbalancedQuotes = &errProtocol{"unbalanced quotes in request"}
errInvalidBulkLength = &errProtocol{"invalid bulk length"}
errInvalidMultiBulkLength = &errProtocol{"invalid multibulk length"}
)
type errProtocol struct {
msg string
}
func (err *errProtocol) Error() string {
return "Protocol error: " + err.msg
}
func ListenAndServe(
addr string, handler func(conn Conn, cmds [][]string),
accept func(conn Conn) bool, closed func(conn Conn, err error),
) error {
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
defer ln.Close()
var mu sync.Mutex
for {
conn, err := ln.Accept()
if err != nil {
return err
}
wr := newWriter(conn)
wrc := &connWriter{wr, conn.RemoteAddr().String()}
if accept != nil && !accept(wrc) {
conn.Close()
continue
}
go func() {
var err error
defer func() {
conn.Close()
if closed != nil {
mu.Lock()
defer mu.Unlock()
if err == io.EOF {
err = nil
}
closed(wrc, err)
}
}()
rd := newReader(conn)
err = func() error {
for {
cmds, err := rd.ReadCommands()
if err != nil {
if err, ok := err.(*errProtocol); ok {
// All protocol errors should attempt a response to
// the client. Ignore errors.
wr.WriteError("ERR " + err.Error())
wr.Flush()
}
return err
}
if len(cmds) > 0 {
handler(wrc, cmds)
}
if wr.err != nil {
if wr.err == errClosed {
return nil
}
return wr.err
}
if err := wr.Flush(); err != nil {
return err
}
}
}()
}()
}
}
type connWriter struct {
wr *respWriter
addr string
}
func (wrc *connWriter) Close() error {
return wrc.wr.Close()
}
func (wrc *connWriter) WriteString(str string) {
wrc.wr.WriteString(str)
}
func (wrc *connWriter) WriteBulk(bulk string) {
wrc.wr.WriteBulk(bulk)
}
func (wrc *connWriter) WriteInt(num int) {
wrc.wr.WriteInt(num)
}
func (wrc *connWriter) WriteError(msg string) {
wrc.wr.WriteError(msg)
}
func (wrc *connWriter) WriteArray(count int) {
wrc.wr.WriteMultiBulkStart(count)
}
func (wrc *connWriter) WriteNull() {
wrc.wr.WriteNull()
}
func (wrc *connWriter) RemoteAddr() string {
return wrc.addr
}
// Reader represents a RESP command reader.
type respReader struct {
r io.Reader // base reader
b []byte // unprocessed bytes
a []byte // static read buffer
}
// NewReader returns a RESP command reader.
func newReader(r io.Reader) *respReader {
return &respReader{
r: r,
a: make([]byte, 8192),
}
}
// ReadCommands reads one or more commands from the reader.
func (r *respReader) ReadCommands() ([][]string, error) {
if len(r.b) > 0 {
// we have some potential commands.
var cmds [][]string
next:
switch r.b[0] {
default:
// just a plain text command
for i := 0; i < len(r.b); i++ {
if r.b[i] == '\n' {
var line []byte
if i > 0 && r.b[i-1] == '\r' {
line = r.b[:i-1]
} else {
line = r.b[:i]
}
var args []string
var quote bool
var escape bool
outer:
for {
nline := make([]byte, 0, len(line))
for i := 0; i < len(line); i++ {
c := line[i]
if !quote {
if c == ' ' {
if len(nline) > 0 {
args = append(args, string(nline))
}
line = line[i+1:]
continue outer
}
if c == '"' {
if i != 0 {
return nil, errUnbalancedQuotes
}
quote = true
line = line[i+1:]
continue outer
}
} else {
if escape {
escape = false
switch c {
case 'n':
c = '\n'
case 'r':
c = '\r'
case 't':
c = '\t'
}
} else if c == '"' {
quote = false
args = append(args, string(nline))
line = line[i+1:]
if len(line) > 0 && line[0] != ' ' {
return nil, errUnbalancedQuotes
}
continue outer
} else if c == '\\' {
escape = true
continue
}
}
nline = append(nline, c)
}
if quote {
return nil, errUnbalancedQuotes
}
if len(line) > 0 {
args = append(args, string(line))
}
break
}
if len(args) > 0 {
cmds = append(cmds, args)
}
r.b = r.b[i+1:]
if len(r.b) > 0 {
goto next
} else {
goto done
}
}
}
case '*':
// resp formatted command
var si int
outer2:
for i := 0; i < len(r.b); i++ {
var args []string
if r.b[i] == '\n' {
if r.b[i-1] != '\r' {
return nil, errInvalidMultiBulkLength
}
ni, err := strconv.ParseInt(string(r.b[si+1:i-1]), 10, 64)
if err != nil || ni <= 0 {
return nil, errInvalidMultiBulkLength
}
args = make([]string, 0, int(ni))
for j := 0; j < int(ni); j++ {
// read bulk length
i++
if i < len(r.b) {
if r.b[i] != '$' {
return nil, &errProtocol{"expected '$', got '" +
string(r.b[i]) + "'"}
}
si = i
for ; i < len(r.b); i++ {
if r.b[i] == '\n' {
if r.b[i-1] != '\r' {
return nil, errInvalidBulkLength
}
s := string(r.b[si+1 : i-1])
ni2, err := strconv.ParseInt(s, 10, 64)
if err != nil || ni2 < 0 {
return nil, errInvalidBulkLength
}
if i+int(ni2)+2 >= len(r.b) {
// not ready
break outer2
}
if r.b[i+int(ni2)+2] != '\n' ||
r.b[i+int(ni2)+1] != '\r' {
return nil, errInvalidBulkLength
}
arg := string(r.b[i+1 : i+1+int(ni2)])
i += int(ni2) + 2
args = append(args, arg)
break
}
}
}
}
if len(args) == cap(args) {
cmds = append(cmds, args)
r.b = r.b[i+1:]
if len(r.b) > 0 {
goto next
} else {
goto done
}
}
}
}
}
done:
if len(r.b) == 0 {
r.b = nil
}
if len(cmds) > 0 {
return cmds, nil
}
}
n, err := r.r.Read(r.a[:])
if err != nil {
if err == io.EOF {
if len(r.b) > 0 {
return nil, io.ErrUnexpectedEOF
}
}
return nil, err
}
r.b = append(r.b, r.a[:n]...)
return r.ReadCommands()
}
var errClosed = errors.New("closed")
type respWriter struct {
w io.Writer
b *bytes.Buffer
err error
}
func newWriter(w io.Writer) *respWriter {
return &respWriter{w: w, b: &bytes.Buffer{}}
}
func (w *respWriter) WriteNull() error {
if w.err != nil {
return w.err
}
w.b.WriteString("$-1\r\n")
return nil
}
func (w *respWriter) WriteMultiBulkStart(count int) error {
if w.err != nil {
return w.err
}
w.b.WriteByte('*')
w.b.WriteString(strconv.FormatInt(int64(count), 10))
w.b.WriteString("\r\n")
return nil
}
func (w *respWriter) WriteBulk(bulk string) error {
if w.err != nil {
return w.err
}
w.b.WriteByte('$')
w.b.WriteString(strconv.FormatInt(int64(len(bulk)), 10))
w.b.WriteString("\r\n")
w.b.WriteString(bulk)
w.b.WriteString("\r\n")
return nil
}
func (w *respWriter) Flush() error {
if w.err != nil {
return w.err
}
if w.b.Len() == 0 {
return nil
}
if _, err := w.b.WriteTo(w.w); err != nil {
w.err = err
return err
}
w.b.Reset()
return nil
}
func (w *respWriter) WriteMultiBulk(bulks []string) error {
if err := w.WriteMultiBulkStart(len(bulks)); err != nil {
return err
}
for _, bulk := range bulks {
if err := w.WriteBulk(bulk); err != nil {
return err
}
}
return nil
}
func (w *respWriter) WriteError(msg string) error {
if w.err != nil {
return w.err
}
w.b.WriteByte('-')
w.b.WriteString(msg)
w.b.WriteString("\r\n")
return nil
}
func (w *respWriter) WriteString(msg string) error {
if w.err != nil {
return w.err
}
w.b.WriteByte('+')
w.b.WriteString(msg)
w.b.WriteString("\r\n")
return nil
}
func (w *respWriter) WriteInt(num int) error {
if w.err != nil {
return w.err
}
w.b.WriteByte(':')
w.b.WriteString(strconv.FormatInt(int64(num), 10))
w.b.WriteString("\r\n")
return nil
}
func (w *respWriter) Close() error {
if w.err != nil {
return w.err
}
if err := w.Flush(); err != nil {
w.err = err
return err
}
w.err = errClosed
return nil
}

209
redcon_test.go Normal file
View File

@ -0,0 +1,209 @@
package redcon
import (
"fmt"
"io"
"log"
"math/rand"
"testing"
"time"
)
// TestRandomCommands fills a bunch of random commands and test various
// ways that the reader may receive data.
func TestRandomCommands(t *testing.T) {
rand.Seed(time.Now().UnixNano())
// build random commands.
gcmds := make([][]string, 10000)
for i := 0; i < len(gcmds); i++ {
args := make([]string, (rand.Int()%50)+1) // 1-50 args
for j := 0; j < len(args); j++ {
n := rand.Int() % 10
if j == 0 {
n++
}
arg := make([]byte, n)
for k := 0; k < len(arg); k++ {
arg[k] = byte(rand.Int() % 0xFF)
}
args[j] = string(arg)
}
gcmds[i] = args
}
// create a list of a buffers
var bufs []string
// pipe valid RESP commands
for i := 0; i < len(gcmds); i++ {
args := gcmds[i]
msg := fmt.Sprintf("*%d\r\n", len(args))
for j := 0; j < len(args); j++ {
msg += fmt.Sprintf("$%d\r\n%s\r\n", len(args[j]), args[j])
}
bufs = append(bufs, msg)
}
bufs = append(bufs, "RESET THE INDEX\r\n")
// pipe valid plain commands
for i := 0; i < len(gcmds); i++ {
args := gcmds[i]
var msg string
for j := 0; j < len(args); j++ {
quotes := false
var narg []byte
arg := args[j]
if len(arg) == 0 {
quotes = true
}
for k := 0; k < len(arg); k++ {
switch arg[k] {
default:
narg = append(narg, arg[k])
case ' ':
quotes = true
narg = append(narg, arg[k])
case '\\', '"', '*':
quotes = true
narg = append(narg, '\\', arg[k])
case '\r':
quotes = true
narg = append(narg, '\\', 'r')
case '\n':
quotes = true
narg = append(narg, '\\', 'n')
}
}
msg += " "
if quotes {
msg += "\""
}
msg += string(narg)
if quotes {
msg += "\""
}
}
if msg != "" {
msg = msg[1:]
}
msg += "\r\n"
bufs = append(bufs, msg)
}
bufs = append(bufs, "RESET THE INDEX\r\n")
// pipe valid RESP commands in broken chunks
lmsg := ""
for i := 0; i < len(gcmds); i++ {
args := gcmds[i]
msg := fmt.Sprintf("*%d\r\n", len(args))
for j := 0; j < len(args); j++ {
msg += fmt.Sprintf("$%d\r\n%s\r\n", len(args[j]), args[j])
}
msg = lmsg + msg
if len(msg) > 0 {
lmsg = msg[len(msg)/2:]
msg = msg[:len(msg)/2]
}
bufs = append(bufs, msg)
}
bufs = append(bufs, lmsg)
bufs = append(bufs, "RESET THE INDEX\r\n")
// pipe valid RESP commands in large broken chunks
lmsg = ""
for i := 0; i < len(gcmds); i++ {
args := gcmds[i]
msg := fmt.Sprintf("*%d\r\n", len(args))
for j := 0; j < len(args); j++ {
msg += fmt.Sprintf("$%d\r\n%s\r\n", len(args[j]), args[j])
}
if len(lmsg) < 1500 {
lmsg += msg
continue
}
msg = lmsg + msg
if len(msg) > 0 {
lmsg = msg[len(msg)/2:]
msg = msg[:len(msg)/2]
}
bufs = append(bufs, msg)
}
bufs = append(bufs, lmsg)
bufs = append(bufs, "RESET THE INDEX\r\n")
// Pipe the buffers in a background routine
rd, wr := io.Pipe()
go func() {
defer wr.Close()
for _, msg := range bufs {
io.WriteString(wr, msg)
}
}()
defer rd.Close()
cnt := 0
idx := 0
start := time.Now()
r := newReader(rd)
for {
cmds, err := r.ReadCommands()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
for _, cmd := range cmds {
if len(cmd) == 3 && cmd[0] == "RESET" && cmd[1] == "THE" && cmd[2] == "INDEX" {
if idx != len(gcmds) {
t.Fatalf("did not process all commands")
}
idx = 0
break
}
if len(cmd) != len(gcmds[idx]) {
t.Fatalf("len not equal for index %d -- %d != %d", idx, len(cmd), len(gcmds[idx]))
}
for i := 0; i < len(cmd); i++ {
if cmd[i] != gcmds[idx][i] {
t.Fatalf("not equal for index %d/%d", idx, i)
}
}
idx++
cnt++
}
}
if false {
dur := time.Now().Sub(start)
fmt.Printf("%d commands in %s - %.0f ops/sec\n", cnt, dur, float64(cnt)/(float64(dur)/float64(time.Second)))
}
}
/*
func TestServer(t *testing.T) {
err := ListenAndServe(":11111",
func(conn Conn, cmds [][]string) {
for _, cmd := range cmds {
switch strings.ToLower(cmd[0]) {
default:
conn.WriteError("ERR unknown command '" + cmd[0] + "'")
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
}
}
},
func(conn Conn) bool {
log.Printf("accept: %s", conn.RemoteAddr())
return true
},
func(conn Conn, err error) {
log.Printf("closed: %s [%v]", conn.RemoteAddr(), err)
},
)
if err != nil {
log.Fatal(err)
}
}
*/