commit ffcf13732b36d0952cf0ec2127c98f5760731e0b Author: Josh Baker Date: Thu Jul 28 07:54:02 2016 -0700 first commit diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..58f5819 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2016 Josh Baker + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a5b01a3 --- /dev/null +++ b/README.md @@ -0,0 +1,110 @@ +Redcon [GoDoc](https://godoc.org/github.com/tidwall/redcon?status.svg)](https://godoc.org/github.com/tidwall/redcon) +====== +Super fast Redis RESP server implementation. +Supports pipelining and telnet commands. + +There is only one function `ListenAndServe`, and one type `Conn`. + +Here's a full example of a Redis clone that accepts: + +- SET key value +- GET key +- DEL key +- PING +- QUIT + +You can run this example from a terminal: + +```sh +go run examples/clone.go +``` + +```go +package main + +import ( + "log" + "strings" + "sync" + + "github.com/tidwall/redcon" +) + +var addr = ":6380" + +func main() { + var mu sync.RWMutex + var items = make(map[string]string) + go log.Printf("started server at %s", addr) + err := redcon.ListenAndServe(addr, + func(conn redcon.Conn, commands [][]string) { + for _, args := range commands { + switch strings.ToLower(args[0]) { + default: + conn.WriteError("ERR unknown command '" + args[0] + "'") + case "ping": + conn.WriteString("PONG") + case "quit": + conn.WriteString("OK") + conn.Close() + case "set": + if len(args) != 3 { + conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command") + continue + } + mu.Lock() + items[args[1]] = args[2] + mu.Unlock() + conn.WriteString("OK") + case "get": + if len(args) != 2 { + conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command") + continue + } + mu.RLock() + val, ok := items[args[1]] + mu.RUnlock() + if !ok { + conn.WriteNull() + } else { + conn.WriteBulk(val) + } + case "del": + if len(args) != 2 { + conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command") + continue + } + mu.Lock() + _, ok := items[args[1]] + delete(items, args[1]) + mu.Unlock() + if !ok { + conn.WriteInt(0) + } else { + conn.WriteInt(1) + } + } + } + }, + func(conn redcon.Conn) bool { + log.Printf("accept: %s", conn.RemoteAddr()) + return true + }, + func(conn redcon.Conn, err error) { + log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err) + }, + ) + if err != nil { + log.Fatal(err) + } +} +``` + +Contact +------- +Josh Baker [@tidwall](http://twitter.com/tidwall) + +License +------- +Redcon source code is available under the MIT [License](/LICENSE). + diff --git a/example/clone.go b/example/clone.go new file mode 100644 index 0000000..dcfe484 --- /dev/null +++ b/example/clone.go @@ -0,0 +1,78 @@ +package main + +import ( + "log" + "strings" + "sync" + + "github.com/tidwall/redcon" +) + +var addr = ":6380" + +func main() { + var mu sync.RWMutex + var items = make(map[string]string) + go log.Printf("started server at %s", addr) + err := redcon.ListenAndServe(addr, + func(conn redcon.Conn, commands [][]string) { + for _, args := range commands { + switch strings.ToLower(args[0]) { + default: + conn.WriteError("ERR unknown command '" + args[0] + "'") + case "ping": + conn.WriteString("PONG") + case "quit": + conn.WriteString("OK") + conn.Close() + case "set": + if len(args) != 3 { + conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command") + continue + } + mu.Lock() + items[args[1]] = args[2] + mu.Unlock() + conn.WriteString("OK") + case "get": + if len(args) != 2 { + conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command") + continue + } + mu.RLock() + val, ok := items[args[1]] + mu.RUnlock() + if !ok { + conn.WriteNull() + } else { + conn.WriteBulk(val) + } + case "del": + if len(args) != 2 { + conn.WriteError("ERR wrong number of arguments for '" + args[0] + "' command") + continue + } + mu.Lock() + _, ok := items[args[1]] + delete(items, args[1]) + mu.Unlock() + if !ok { + conn.WriteInt(0) + } else { + conn.WriteInt(1) + } + } + } + }, + func(conn redcon.Conn) bool { + log.Printf("accept: %s", conn.RemoteAddr()) + return true + }, + func(conn redcon.Conn, err error) { + log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err) + }, + ) + if err != nil { + log.Fatal(err) + } +} diff --git a/redcon.go b/redcon.go new file mode 100644 index 0000000..18187ad --- /dev/null +++ b/redcon.go @@ -0,0 +1,424 @@ +package redcon + +import ( + "bytes" + "errors" + "io" + "net" + "strconv" + "sync" +) + +type Conn interface { + RemoteAddr() string + Close() error + WriteError(msg string) + WriteString(str string) + WriteBulk(bulk string) + WriteInt(num int) + WriteArray(count int) + WriteNull() +} + +var ( + errUnbalancedQuotes = &errProtocol{"unbalanced quotes in request"} + errInvalidBulkLength = &errProtocol{"invalid bulk length"} + errInvalidMultiBulkLength = &errProtocol{"invalid multibulk length"} +) + +type errProtocol struct { + msg string +} + +func (err *errProtocol) Error() string { + return "Protocol error: " + err.msg +} + +func ListenAndServe( + addr string, handler func(conn Conn, cmds [][]string), + accept func(conn Conn) bool, closed func(conn Conn, err error), +) error { + ln, err := net.Listen("tcp", addr) + if err != nil { + return err + } + defer ln.Close() + var mu sync.Mutex + for { + conn, err := ln.Accept() + if err != nil { + return err + } + wr := newWriter(conn) + wrc := &connWriter{wr, conn.RemoteAddr().String()} + if accept != nil && !accept(wrc) { + conn.Close() + continue + } + go func() { + var err error + defer func() { + conn.Close() + if closed != nil { + mu.Lock() + defer mu.Unlock() + if err == io.EOF { + err = nil + } + closed(wrc, err) + } + }() + rd := newReader(conn) + err = func() error { + for { + cmds, err := rd.ReadCommands() + if err != nil { + if err, ok := err.(*errProtocol); ok { + // All protocol errors should attempt a response to + // the client. Ignore errors. + wr.WriteError("ERR " + err.Error()) + wr.Flush() + } + return err + } + if len(cmds) > 0 { + handler(wrc, cmds) + } + if wr.err != nil { + if wr.err == errClosed { + return nil + } + return wr.err + } + if err := wr.Flush(); err != nil { + return err + } + } + }() + }() + } +} + +type connWriter struct { + wr *respWriter + addr string +} + +func (wrc *connWriter) Close() error { + return wrc.wr.Close() +} +func (wrc *connWriter) WriteString(str string) { + wrc.wr.WriteString(str) +} +func (wrc *connWriter) WriteBulk(bulk string) { + wrc.wr.WriteBulk(bulk) +} +func (wrc *connWriter) WriteInt(num int) { + wrc.wr.WriteInt(num) +} +func (wrc *connWriter) WriteError(msg string) { + wrc.wr.WriteError(msg) +} +func (wrc *connWriter) WriteArray(count int) { + wrc.wr.WriteMultiBulkStart(count) +} +func (wrc *connWriter) WriteNull() { + wrc.wr.WriteNull() +} +func (wrc *connWriter) RemoteAddr() string { + return wrc.addr +} + +// Reader represents a RESP command reader. +type respReader struct { + r io.Reader // base reader + b []byte // unprocessed bytes + a []byte // static read buffer +} + +// NewReader returns a RESP command reader. +func newReader(r io.Reader) *respReader { + return &respReader{ + r: r, + a: make([]byte, 8192), + } +} + +// ReadCommands reads one or more commands from the reader. +func (r *respReader) ReadCommands() ([][]string, error) { + if len(r.b) > 0 { + // we have some potential commands. + var cmds [][]string + next: + switch r.b[0] { + default: + // just a plain text command + for i := 0; i < len(r.b); i++ { + if r.b[i] == '\n' { + var line []byte + if i > 0 && r.b[i-1] == '\r' { + line = r.b[:i-1] + } else { + line = r.b[:i] + } + var args []string + var quote bool + var escape bool + outer: + for { + nline := make([]byte, 0, len(line)) + for i := 0; i < len(line); i++ { + c := line[i] + if !quote { + if c == ' ' { + if len(nline) > 0 { + args = append(args, string(nline)) + } + line = line[i+1:] + continue outer + } + if c == '"' { + if i != 0 { + return nil, errUnbalancedQuotes + } + quote = true + line = line[i+1:] + continue outer + } + } else { + if escape { + escape = false + switch c { + case 'n': + c = '\n' + case 'r': + c = '\r' + case 't': + c = '\t' + } + } else if c == '"' { + quote = false + args = append(args, string(nline)) + line = line[i+1:] + if len(line) > 0 && line[0] != ' ' { + return nil, errUnbalancedQuotes + } + continue outer + } else if c == '\\' { + escape = true + continue + } + } + nline = append(nline, c) + } + if quote { + return nil, errUnbalancedQuotes + } + if len(line) > 0 { + args = append(args, string(line)) + } + break + } + if len(args) > 0 { + cmds = append(cmds, args) + } + r.b = r.b[i+1:] + if len(r.b) > 0 { + goto next + } else { + goto done + } + } + } + case '*': + // resp formatted command + var si int + outer2: + for i := 0; i < len(r.b); i++ { + var args []string + if r.b[i] == '\n' { + if r.b[i-1] != '\r' { + return nil, errInvalidMultiBulkLength + } + ni, err := strconv.ParseInt(string(r.b[si+1:i-1]), 10, 64) + if err != nil || ni <= 0 { + return nil, errInvalidMultiBulkLength + } + args = make([]string, 0, int(ni)) + for j := 0; j < int(ni); j++ { + // read bulk length + i++ + if i < len(r.b) { + if r.b[i] != '$' { + return nil, &errProtocol{"expected '$', got '" + + string(r.b[i]) + "'"} + } + si = i + for ; i < len(r.b); i++ { + if r.b[i] == '\n' { + if r.b[i-1] != '\r' { + return nil, errInvalidBulkLength + } + s := string(r.b[si+1 : i-1]) + ni2, err := strconv.ParseInt(s, 10, 64) + if err != nil || ni2 < 0 { + return nil, errInvalidBulkLength + } + if i+int(ni2)+2 >= len(r.b) { + // not ready + break outer2 + } + if r.b[i+int(ni2)+2] != '\n' || + r.b[i+int(ni2)+1] != '\r' { + return nil, errInvalidBulkLength + } + arg := string(r.b[i+1 : i+1+int(ni2)]) + i += int(ni2) + 2 + args = append(args, arg) + break + } + } + } + } + if len(args) == cap(args) { + cmds = append(cmds, args) + r.b = r.b[i+1:] + if len(r.b) > 0 { + goto next + } else { + goto done + } + } + } + } + } + done: + if len(r.b) == 0 { + r.b = nil + } + if len(cmds) > 0 { + return cmds, nil + } + } + n, err := r.r.Read(r.a[:]) + if err != nil { + if err == io.EOF { + if len(r.b) > 0 { + return nil, io.ErrUnexpectedEOF + } + } + return nil, err + } + r.b = append(r.b, r.a[:n]...) + return r.ReadCommands() +} + +var errClosed = errors.New("closed") + +type respWriter struct { + w io.Writer + b *bytes.Buffer + err error +} + +func newWriter(w io.Writer) *respWriter { + return &respWriter{w: w, b: &bytes.Buffer{}} +} + +func (w *respWriter) WriteNull() error { + if w.err != nil { + return w.err + } + w.b.WriteString("$-1\r\n") + return nil +} +func (w *respWriter) WriteMultiBulkStart(count int) error { + if w.err != nil { + return w.err + } + w.b.WriteByte('*') + w.b.WriteString(strconv.FormatInt(int64(count), 10)) + w.b.WriteString("\r\n") + return nil +} + +func (w *respWriter) WriteBulk(bulk string) error { + if w.err != nil { + return w.err + } + w.b.WriteByte('$') + w.b.WriteString(strconv.FormatInt(int64(len(bulk)), 10)) + w.b.WriteString("\r\n") + w.b.WriteString(bulk) + w.b.WriteString("\r\n") + return nil +} + +func (w *respWriter) Flush() error { + if w.err != nil { + return w.err + } + if w.b.Len() == 0 { + return nil + } + if _, err := w.b.WriteTo(w.w); err != nil { + w.err = err + return err + } + w.b.Reset() + return nil +} + +func (w *respWriter) WriteMultiBulk(bulks []string) error { + if err := w.WriteMultiBulkStart(len(bulks)); err != nil { + return err + } + for _, bulk := range bulks { + if err := w.WriteBulk(bulk); err != nil { + return err + } + } + return nil +} + +func (w *respWriter) WriteError(msg string) error { + if w.err != nil { + return w.err + } + w.b.WriteByte('-') + w.b.WriteString(msg) + w.b.WriteString("\r\n") + return nil +} + +func (w *respWriter) WriteString(msg string) error { + if w.err != nil { + return w.err + } + w.b.WriteByte('+') + w.b.WriteString(msg) + w.b.WriteString("\r\n") + return nil +} + +func (w *respWriter) WriteInt(num int) error { + if w.err != nil { + return w.err + } + w.b.WriteByte(':') + w.b.WriteString(strconv.FormatInt(int64(num), 10)) + w.b.WriteString("\r\n") + return nil +} + +func (w *respWriter) Close() error { + if w.err != nil { + return w.err + } + if err := w.Flush(); err != nil { + w.err = err + return err + } + w.err = errClosed + return nil +} diff --git a/redcon_test.go b/redcon_test.go new file mode 100644 index 0000000..8f88239 --- /dev/null +++ b/redcon_test.go @@ -0,0 +1,209 @@ +package redcon + +import ( + "fmt" + "io" + "log" + "math/rand" + "testing" + "time" +) + +// TestRandomCommands fills a bunch of random commands and test various +// ways that the reader may receive data. +func TestRandomCommands(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + // build random commands. + gcmds := make([][]string, 10000) + for i := 0; i < len(gcmds); i++ { + args := make([]string, (rand.Int()%50)+1) // 1-50 args + for j := 0; j < len(args); j++ { + n := rand.Int() % 10 + if j == 0 { + n++ + } + arg := make([]byte, n) + for k := 0; k < len(arg); k++ { + arg[k] = byte(rand.Int() % 0xFF) + } + args[j] = string(arg) + } + gcmds[i] = args + } + // create a list of a buffers + var bufs []string + + // pipe valid RESP commands + for i := 0; i < len(gcmds); i++ { + args := gcmds[i] + msg := fmt.Sprintf("*%d\r\n", len(args)) + for j := 0; j < len(args); j++ { + msg += fmt.Sprintf("$%d\r\n%s\r\n", len(args[j]), args[j]) + } + bufs = append(bufs, msg) + } + bufs = append(bufs, "RESET THE INDEX\r\n") + + // pipe valid plain commands + for i := 0; i < len(gcmds); i++ { + args := gcmds[i] + var msg string + for j := 0; j < len(args); j++ { + quotes := false + var narg []byte + arg := args[j] + if len(arg) == 0 { + quotes = true + } + for k := 0; k < len(arg); k++ { + switch arg[k] { + default: + narg = append(narg, arg[k]) + case ' ': + quotes = true + narg = append(narg, arg[k]) + case '\\', '"', '*': + quotes = true + narg = append(narg, '\\', arg[k]) + case '\r': + quotes = true + narg = append(narg, '\\', 'r') + case '\n': + quotes = true + narg = append(narg, '\\', 'n') + } + } + msg += " " + if quotes { + msg += "\"" + } + msg += string(narg) + if quotes { + msg += "\"" + } + } + if msg != "" { + msg = msg[1:] + } + msg += "\r\n" + bufs = append(bufs, msg) + } + bufs = append(bufs, "RESET THE INDEX\r\n") + + // pipe valid RESP commands in broken chunks + lmsg := "" + for i := 0; i < len(gcmds); i++ { + args := gcmds[i] + msg := fmt.Sprintf("*%d\r\n", len(args)) + for j := 0; j < len(args); j++ { + msg += fmt.Sprintf("$%d\r\n%s\r\n", len(args[j]), args[j]) + } + msg = lmsg + msg + if len(msg) > 0 { + lmsg = msg[len(msg)/2:] + msg = msg[:len(msg)/2] + } + bufs = append(bufs, msg) + } + bufs = append(bufs, lmsg) + bufs = append(bufs, "RESET THE INDEX\r\n") + + // pipe valid RESP commands in large broken chunks + lmsg = "" + for i := 0; i < len(gcmds); i++ { + args := gcmds[i] + msg := fmt.Sprintf("*%d\r\n", len(args)) + for j := 0; j < len(args); j++ { + msg += fmt.Sprintf("$%d\r\n%s\r\n", len(args[j]), args[j]) + } + if len(lmsg) < 1500 { + lmsg += msg + continue + } + msg = lmsg + msg + if len(msg) > 0 { + lmsg = msg[len(msg)/2:] + msg = msg[:len(msg)/2] + } + bufs = append(bufs, msg) + } + bufs = append(bufs, lmsg) + bufs = append(bufs, "RESET THE INDEX\r\n") + + // Pipe the buffers in a background routine + rd, wr := io.Pipe() + go func() { + defer wr.Close() + for _, msg := range bufs { + io.WriteString(wr, msg) + } + }() + defer rd.Close() + cnt := 0 + idx := 0 + start := time.Now() + r := newReader(rd) + for { + cmds, err := r.ReadCommands() + if err != nil { + if err == io.EOF { + break + } + log.Fatal(err) + } + for _, cmd := range cmds { + if len(cmd) == 3 && cmd[0] == "RESET" && cmd[1] == "THE" && cmd[2] == "INDEX" { + if idx != len(gcmds) { + t.Fatalf("did not process all commands") + } + idx = 0 + break + } + if len(cmd) != len(gcmds[idx]) { + t.Fatalf("len not equal for index %d -- %d != %d", idx, len(cmd), len(gcmds[idx])) + } + for i := 0; i < len(cmd); i++ { + if cmd[i] != gcmds[idx][i] { + t.Fatalf("not equal for index %d/%d", idx, i) + } + } + idx++ + cnt++ + } + } + if false { + dur := time.Now().Sub(start) + fmt.Printf("%d commands in %s - %.0f ops/sec\n", cnt, dur, float64(cnt)/(float64(dur)/float64(time.Second))) + } +} + +/* +func TestServer(t *testing.T) { + err := ListenAndServe(":11111", + func(conn Conn, cmds [][]string) { + for _, cmd := range cmds { + switch strings.ToLower(cmd[0]) { + default: + conn.WriteError("ERR unknown command '" + cmd[0] + "'") + case "ping": + conn.WriteString("PONG") + case "quit": + conn.WriteString("OK") + conn.Close() + } + } + }, + func(conn Conn) bool { + log.Printf("accept: %s", conn.RemoteAddr()) + return true + }, + func(conn Conn, err error) { + log.Printf("closed: %s [%v]", conn.RemoteAddr(), err) + }, + ) + if err != nil { + log.Fatal(err) + } +} +*/