From 4276409b25d9f64ac18471b41552e0274a81c020 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Tue, 6 Sep 2016 22:56:44 -0700 Subject: [PATCH] memory optimizations --- README.md | 8 +-- example/clone.go | 3 +- redcon.go | 171 ++++++++++++++++++++++++++++++++--------------- redcon_test.go | 25 ++++++- 4 files changed, 145 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index 1fc9728..464f832 100644 --- a/README.md +++ b/README.md @@ -147,8 +147,8 @@ $ GOMAXPROCS=1 go run example/clone.go ``` ``` redis-benchmark -p 6380 -t set,get -n 10000000 -q -P 512 -c 512 -SET: 1320655.00 requests per second -GET: 1552354.25 requests per second +SET: 3119151.50 requests per second +GET: 4142502.25 requests per second ``` **Redcon**: Multi-threaded, no disk persistence. @@ -158,8 +158,8 @@ $ GOMAXPROCS=0 go run example/clone.go ``` ``` $ redis-benchmark -p 6380 -t set,get -n 10000000 -q -P 512 -c 512 -SET: 2740477.00 requests per second -GET: 3210272.75 requests per second +SET: 3637686.25 requests per second +GET: 4249894.00 requests per second ``` *Running on a MacBook Pro 15" 2.8 GHz Intel Core i7 using Go 1.7* diff --git a/example/clone.go b/example/clone.go index 0c76182..60bf465 100644 --- a/example/clone.go +++ b/example/clone.go @@ -2,7 +2,6 @@ package main import ( "log" - "strings" "sync" "github.com/tidwall/redcon" @@ -17,7 +16,7 @@ func main() { err := redcon.ListenAndServe(addr, func(conn redcon.Conn, commands [][]string) { for _, args := range commands { - switch strings.ToLower(args[0]) { + switch args[0] { default: conn.WriteError("ERR unknown command '" + args[0] + "'") case "ping": diff --git a/redcon.go b/redcon.go index 3961c65..fdcc306 100644 --- a/redcon.go +++ b/redcon.go @@ -49,7 +49,10 @@ var ( errInvalidMultiBulkLength = &errProtocol{"invalid multibulk length"} ) -const defaultBufLen = 1024 * 64 +const ( + defaultBufLen = 2 * 1024 + defaultPoolSize = 3 +) type errProtocol struct { msg string @@ -69,6 +72,8 @@ type Server struct { ln *net.TCPListener done bool conns map[*conn]bool + rdpool [][]byte + wrpool [][]byte } // NewServer returns a new server @@ -155,13 +160,24 @@ func (s *Server) ListenServeAndSignal(signal chan error) error { tcpc.RemoteAddr().String(), nil, } + s.mu.Lock() + if len(s.rdpool) > 0 { + c.rd.buf = s.rdpool[len(s.rdpool)-1] + s.rdpool = s.rdpool[:len(s.rdpool)-1] + } + if len(s.wrpool) > 0 { + c.wr.b = s.wrpool[len(s.wrpool)-1] + s.wrpool = s.wrpool[:len(s.wrpool)-1] + } + s.conns[c] = true + s.mu.Unlock() if accept != nil && !accept(c) { + s.mu.Lock() + delete(s.conns, c) + s.mu.Unlock() c.Close() continue } - s.mu.Lock() - s.conns[c] = true - s.mu.Unlock() go handle(s, c, handler, closed) } } @@ -191,6 +207,12 @@ func handle( } closed(c, err) } + if len(s.rdpool) < defaultPoolSize { + s.rdpool = append(s.rdpool, c.rd.buf) + } + if len(s.wrpool) < defaultPoolSize { + s.wrpool = append(s.wrpool, c.wr.b) + } }() }() err = func() error { @@ -271,9 +293,10 @@ func (c *conn) SetReadBuffer(bytes int) { // Reader represents a RESP command reader. type reader struct { r io.Reader // base reader - b []byte // unprocessed bytes - a []byte // static read buffer - buflen int // buffer len + buf []byte + start int + end int + buflen int } // NewReader returns a RESP command reader. @@ -283,23 +306,29 @@ func newReader(r io.Reader) *reader { buflen: defaultBufLen, } } +func (rd *reader) reassign(r io.Reader) { + rd.r = r + rd.start = 0 + rd.end = 0 +} // ReadCommands reads one or more commands from the reader. func (r *reader) ReadCommands() ([][]string, error) { - if len(r.b) > 0 { + if r.end-r.start > 0 { + b := r.buf[r.start:r.end] // we have some potential commands. var cmds [][]string next: - switch r.b[0] { + switch b[0] { default: // just a plain text command - for i := 0; i < len(r.b); i++ { - if r.b[i] == '\n' { + for i := 0; i < len(b); i++ { + if b[i] == '\n' { var line []byte - if i > 0 && r.b[i-1] == '\r' { - line = r.b[:i-1] + if i > 0 && b[i-1] == '\r' { + line = b[:i-1] } else { - line = r.b[:i] + line = b[:i] } var args []string var quote bool @@ -362,8 +391,8 @@ func (r *reader) ReadCommands() ([][]string, error) { if len(args) > 0 { cmds = append(cmds, args) } - r.b = r.b[i+1:] - if len(r.b) > 0 { + b = b[i+1:] + if len(b) > 0 { goto next } else { goto done @@ -374,47 +403,54 @@ func (r *reader) ReadCommands() ([][]string, error) { // resp formatted command var si int outer2: - for i := 0; i < len(r.b); i++ { + for i := 0; i < len(b); i++ { var args []string - if r.b[i] == '\n' { - if r.b[i-1] != '\r' { + if b[i] == '\n' { + if b[i-1] != '\r' { return nil, errInvalidMultiBulkLength } - ni, err := strconv.ParseInt(string(r.b[si+1:i-1]), 10, 64) + ni, err := parseInt(b[si+1 : i-1]) if err != nil || ni <= 0 { return nil, errInvalidMultiBulkLength } - args = make([]string, 0, int(ni)) - for j := 0; j < int(ni); j++ { + args = make([]string, 0, ni) + for j := 0; j < ni; j++ { // read bulk length i++ - if i < len(r.b) { - if r.b[i] != '$' { + if i < len(b) { + if b[i] != '$' { return nil, &errProtocol{"expected '$', got '" + - string(r.b[i]) + "'"} + string(b[i]) + "'"} } si = i - for ; i < len(r.b); i++ { - if r.b[i] == '\n' { - if r.b[i-1] != '\r' { + for ; i < len(b); i++ { + if b[i] == '\n' { + if b[i-1] != '\r' { return nil, errInvalidBulkLength } - s := string(r.b[si+1 : i-1]) - ni2, err := strconv.ParseInt(s, 10, 64) + ni2, err := parseInt(b[si+1 : i-1]) if err != nil || ni2 < 0 { return nil, errInvalidBulkLength } - if i+int(ni2)+2 >= len(r.b) { + if i+ni2+2 >= len(b) { // not ready break outer2 } - if r.b[i+int(ni2)+2] != '\n' || - r.b[i+int(ni2)+1] != '\r' { + if b[i+ni2+2] != '\n' || + b[i+ni2+1] != '\r' { return nil, errInvalidBulkLength } - arg := string(r.b[i+1 : i+1+int(ni2)]) - i += int(ni2) + 2 - args = append(args, arg) + i++ + arg := b[i : i+ni2] + if len(args) == 0 { + for j := 0; j < len(arg); j++ { + if arg[j] >= 'A' && arg[j] <= 'Z' { + arg[j] += 32 + } + } + } + i += ni2 + 1 + args = append(args, string(arg)) break } } @@ -422,8 +458,8 @@ func (r *reader) ReadCommands() ([][]string, error) { } if len(args) == cap(args) { cmds = append(cmds, args) - r.b = r.b[i+1:] - if len(r.b) > 0 { + b = b[i+1:] + if len(b) > 0 { goto next } else { goto done @@ -433,34 +469,57 @@ func (r *reader) ReadCommands() ([][]string, error) { } } done: - if len(r.b) == 0 { - r.b = nil + if len(b) == 0 { + r.start = 0 + r.end = 0 + } else { + r.start = r.end - len(b) } if len(cmds) > 0 { return cmds, nil } } - if len(r.a) == 0 { - r.a = make([]byte, r.buflen) + if r.end == len(r.buf) { + if len(r.buf) == 0 { + r.buf = make([]byte, r.buflen) + } else { + nbuf := make([]byte, len(r.buf)*2) + copy(nbuf, r.buf) + r.buf = nbuf + } } - n, err := r.r.Read(r.a) + n, err := r.r.Read(r.buf[r.end:]) if err != nil { if err == io.EOF { - if len(r.b) > 0 { + if r.end > 0 { return nil, io.ErrUnexpectedEOF } } return nil, err } - if len(r.b) == 0 { - r.b = r.a[:n] - } else { - r.b = append(r.b, r.a[:n]...) - } - r.a = r.a[n:] - + r.end += n return r.ReadCommands() } +func parseInt(b []byte) (int, error) { + switch len(b) { + case 1: + if b[0] >= '0' && b[0] <= '9' { + return int(b[0] - '0'), nil + } + case 2: + if b[0] >= '0' && b[0] <= '9' && b[1] >= '0' && b[1] <= '9' { + return int(b[0]-'0')*10 + int(b[1]-'0'), nil + } + } + var n int + for i := 0; i < len(b); i++ { + if b[i] < '0' || b[i] > '9' { + return 0, errors.New("invalid number") + } + n = n*10 + int(b[i]-'0') + } + return n, nil +} var errClosed = errors.New("closed") @@ -544,9 +603,13 @@ func (w *writer) WriteString(msg string) error { if w.err != nil { return w.err } - w.b = append(w.b, '+') - w.b = append(w.b, []byte(msg)...) - w.b = append(w.b, '\r', '\n') + if msg == "OK" { + w.b = append(w.b, '+', 'O', 'K', '\r', '\n') + } else { + w.b = append(w.b, '+') + w.b = append(w.b, []byte(msg)...) + w.b = append(w.b, '\r', '\n') + } return nil } diff --git a/redcon_test.go b/redcon_test.go index 6f87153..73c2b4a 100644 --- a/redcon_test.go +++ b/redcon_test.go @@ -166,9 +166,30 @@ func TestRandomCommands(t *testing.T) { t.Fatalf("len not equal for index %d -- %d != %d", idx, len(cmd), len(gcmds[idx])) } for i := 0; i < len(cmd); i++ { - if cmd[i] != gcmds[idx][i] { - t.Fatalf("not equal for index %d/%d", idx, i) + if i == 0 { + if len(cmd[i]) == len(gcmds[idx][i]) { + ok := true + for j := 0; j < len(cmd[i]); j++ { + c1, c2 := cmd[i][j], gcmds[idx][i][j] + if c1 >= 'A' && c1 <= 'Z' { + c1 += 32 + } + if c2 >= 'A' && c2 <= 'Z' { + c2 += 32 + } + if c1 != c2 { + ok = false + break + } + } + if ok { + continue + } + } + } else if cmd[i] == gcmds[idx][i] { + continue } + t.Fatalf("not equal for index %d/%d", idx, i) } idx++ cnt++