memory optimizations

This commit is contained in:
Josh Baker 2016-09-06 22:56:44 -07:00
parent 349b9006f1
commit 4276409b25
4 changed files with 145 additions and 62 deletions

View File

@ -147,8 +147,8 @@ $ GOMAXPROCS=1 go run example/clone.go
``` ```
``` ```
redis-benchmark -p 6380 -t set,get -n 10000000 -q -P 512 -c 512 redis-benchmark -p 6380 -t set,get -n 10000000 -q -P 512 -c 512
SET: 1320655.00 requests per second SET: 3119151.50 requests per second
GET: 1552354.25 requests per second GET: 4142502.25 requests per second
``` ```
**Redcon**: Multi-threaded, no disk persistence. **Redcon**: Multi-threaded, no disk persistence.
@ -158,8 +158,8 @@ $ GOMAXPROCS=0 go run example/clone.go
``` ```
``` ```
$ redis-benchmark -p 6380 -t set,get -n 10000000 -q -P 512 -c 512 $ redis-benchmark -p 6380 -t set,get -n 10000000 -q -P 512 -c 512
SET: 2740477.00 requests per second SET: 3637686.25 requests per second
GET: 3210272.75 requests per second GET: 4249894.00 requests per second
``` ```
*Running on a MacBook Pro 15" 2.8 GHz Intel Core i7 using Go 1.7* *Running on a MacBook Pro 15" 2.8 GHz Intel Core i7 using Go 1.7*

View File

@ -2,7 +2,6 @@ package main
import ( import (
"log" "log"
"strings"
"sync" "sync"
"github.com/tidwall/redcon" "github.com/tidwall/redcon"
@ -17,7 +16,7 @@ func main() {
err := redcon.ListenAndServe(addr, err := redcon.ListenAndServe(addr,
func(conn redcon.Conn, commands [][]string) { func(conn redcon.Conn, commands [][]string) {
for _, args := range commands { for _, args := range commands {
switch strings.ToLower(args[0]) { switch args[0] {
default: default:
conn.WriteError("ERR unknown command '" + args[0] + "'") conn.WriteError("ERR unknown command '" + args[0] + "'")
case "ping": case "ping":

171
redcon.go
View File

@ -49,7 +49,10 @@ var (
errInvalidMultiBulkLength = &errProtocol{"invalid multibulk length"} errInvalidMultiBulkLength = &errProtocol{"invalid multibulk length"}
) )
const defaultBufLen = 1024 * 64 const (
defaultBufLen = 2 * 1024
defaultPoolSize = 3
)
type errProtocol struct { type errProtocol struct {
msg string msg string
@ -69,6 +72,8 @@ type Server struct {
ln *net.TCPListener ln *net.TCPListener
done bool done bool
conns map[*conn]bool conns map[*conn]bool
rdpool [][]byte
wrpool [][]byte
} }
// NewServer returns a new server // NewServer returns a new server
@ -155,13 +160,24 @@ func (s *Server) ListenServeAndSignal(signal chan error) error {
tcpc.RemoteAddr().String(), tcpc.RemoteAddr().String(),
nil, nil,
} }
s.mu.Lock()
if len(s.rdpool) > 0 {
c.rd.buf = s.rdpool[len(s.rdpool)-1]
s.rdpool = s.rdpool[:len(s.rdpool)-1]
}
if len(s.wrpool) > 0 {
c.wr.b = s.wrpool[len(s.wrpool)-1]
s.wrpool = s.wrpool[:len(s.wrpool)-1]
}
s.conns[c] = true
s.mu.Unlock()
if accept != nil && !accept(c) { if accept != nil && !accept(c) {
s.mu.Lock()
delete(s.conns, c)
s.mu.Unlock()
c.Close() c.Close()
continue continue
} }
s.mu.Lock()
s.conns[c] = true
s.mu.Unlock()
go handle(s, c, handler, closed) go handle(s, c, handler, closed)
} }
} }
@ -191,6 +207,12 @@ func handle(
} }
closed(c, err) closed(c, err)
} }
if len(s.rdpool) < defaultPoolSize {
s.rdpool = append(s.rdpool, c.rd.buf)
}
if len(s.wrpool) < defaultPoolSize {
s.wrpool = append(s.wrpool, c.wr.b)
}
}() }()
}() }()
err = func() error { err = func() error {
@ -271,9 +293,10 @@ func (c *conn) SetReadBuffer(bytes int) {
// Reader represents a RESP command reader. // Reader represents a RESP command reader.
type reader struct { type reader struct {
r io.Reader // base reader r io.Reader // base reader
b []byte // unprocessed bytes buf []byte
a []byte // static read buffer start int
buflen int // buffer len end int
buflen int
} }
// NewReader returns a RESP command reader. // NewReader returns a RESP command reader.
@ -283,23 +306,29 @@ func newReader(r io.Reader) *reader {
buflen: defaultBufLen, buflen: defaultBufLen,
} }
} }
func (rd *reader) reassign(r io.Reader) {
rd.r = r
rd.start = 0
rd.end = 0
}
// ReadCommands reads one or more commands from the reader. // ReadCommands reads one or more commands from the reader.
func (r *reader) ReadCommands() ([][]string, error) { func (r *reader) ReadCommands() ([][]string, error) {
if len(r.b) > 0 { if r.end-r.start > 0 {
b := r.buf[r.start:r.end]
// we have some potential commands. // we have some potential commands.
var cmds [][]string var cmds [][]string
next: next:
switch r.b[0] { switch b[0] {
default: default:
// just a plain text command // just a plain text command
for i := 0; i < len(r.b); i++ { for i := 0; i < len(b); i++ {
if r.b[i] == '\n' { if b[i] == '\n' {
var line []byte var line []byte
if i > 0 && r.b[i-1] == '\r' { if i > 0 && b[i-1] == '\r' {
line = r.b[:i-1] line = b[:i-1]
} else { } else {
line = r.b[:i] line = b[:i]
} }
var args []string var args []string
var quote bool var quote bool
@ -362,8 +391,8 @@ func (r *reader) ReadCommands() ([][]string, error) {
if len(args) > 0 { if len(args) > 0 {
cmds = append(cmds, args) cmds = append(cmds, args)
} }
r.b = r.b[i+1:] b = b[i+1:]
if len(r.b) > 0 { if len(b) > 0 {
goto next goto next
} else { } else {
goto done goto done
@ -374,47 +403,54 @@ func (r *reader) ReadCommands() ([][]string, error) {
// resp formatted command // resp formatted command
var si int var si int
outer2: outer2:
for i := 0; i < len(r.b); i++ { for i := 0; i < len(b); i++ {
var args []string var args []string
if r.b[i] == '\n' { if b[i] == '\n' {
if r.b[i-1] != '\r' { if b[i-1] != '\r' {
return nil, errInvalidMultiBulkLength return nil, errInvalidMultiBulkLength
} }
ni, err := strconv.ParseInt(string(r.b[si+1:i-1]), 10, 64) ni, err := parseInt(b[si+1 : i-1])
if err != nil || ni <= 0 { if err != nil || ni <= 0 {
return nil, errInvalidMultiBulkLength return nil, errInvalidMultiBulkLength
} }
args = make([]string, 0, int(ni)) args = make([]string, 0, ni)
for j := 0; j < int(ni); j++ { for j := 0; j < ni; j++ {
// read bulk length // read bulk length
i++ i++
if i < len(r.b) { if i < len(b) {
if r.b[i] != '$' { if b[i] != '$' {
return nil, &errProtocol{"expected '$', got '" + return nil, &errProtocol{"expected '$', got '" +
string(r.b[i]) + "'"} string(b[i]) + "'"}
} }
si = i si = i
for ; i < len(r.b); i++ { for ; i < len(b); i++ {
if r.b[i] == '\n' { if b[i] == '\n' {
if r.b[i-1] != '\r' { if b[i-1] != '\r' {
return nil, errInvalidBulkLength return nil, errInvalidBulkLength
} }
s := string(r.b[si+1 : i-1]) ni2, err := parseInt(b[si+1 : i-1])
ni2, err := strconv.ParseInt(s, 10, 64)
if err != nil || ni2 < 0 { if err != nil || ni2 < 0 {
return nil, errInvalidBulkLength return nil, errInvalidBulkLength
} }
if i+int(ni2)+2 >= len(r.b) { if i+ni2+2 >= len(b) {
// not ready // not ready
break outer2 break outer2
} }
if r.b[i+int(ni2)+2] != '\n' || if b[i+ni2+2] != '\n' ||
r.b[i+int(ni2)+1] != '\r' { b[i+ni2+1] != '\r' {
return nil, errInvalidBulkLength return nil, errInvalidBulkLength
} }
arg := string(r.b[i+1 : i+1+int(ni2)]) i++
i += int(ni2) + 2 arg := b[i : i+ni2]
args = append(args, arg) if len(args) == 0 {
for j := 0; j < len(arg); j++ {
if arg[j] >= 'A' && arg[j] <= 'Z' {
arg[j] += 32
}
}
}
i += ni2 + 1
args = append(args, string(arg))
break break
} }
} }
@ -422,8 +458,8 @@ func (r *reader) ReadCommands() ([][]string, error) {
} }
if len(args) == cap(args) { if len(args) == cap(args) {
cmds = append(cmds, args) cmds = append(cmds, args)
r.b = r.b[i+1:] b = b[i+1:]
if len(r.b) > 0 { if len(b) > 0 {
goto next goto next
} else { } else {
goto done goto done
@ -433,34 +469,57 @@ func (r *reader) ReadCommands() ([][]string, error) {
} }
} }
done: done:
if len(r.b) == 0 { if len(b) == 0 {
r.b = nil r.start = 0
r.end = 0
} else {
r.start = r.end - len(b)
} }
if len(cmds) > 0 { if len(cmds) > 0 {
return cmds, nil return cmds, nil
} }
} }
if len(r.a) == 0 { if r.end == len(r.buf) {
r.a = make([]byte, r.buflen) if len(r.buf) == 0 {
r.buf = make([]byte, r.buflen)
} else {
nbuf := make([]byte, len(r.buf)*2)
copy(nbuf, r.buf)
r.buf = nbuf
}
} }
n, err := r.r.Read(r.a) n, err := r.r.Read(r.buf[r.end:])
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
if len(r.b) > 0 { if r.end > 0 {
return nil, io.ErrUnexpectedEOF return nil, io.ErrUnexpectedEOF
} }
} }
return nil, err return nil, err
} }
if len(r.b) == 0 { r.end += n
r.b = r.a[:n]
} else {
r.b = append(r.b, r.a[:n]...)
}
r.a = r.a[n:]
return r.ReadCommands() return r.ReadCommands()
} }
func parseInt(b []byte) (int, error) {
switch len(b) {
case 1:
if b[0] >= '0' && b[0] <= '9' {
return int(b[0] - '0'), nil
}
case 2:
if b[0] >= '0' && b[0] <= '9' && b[1] >= '0' && b[1] <= '9' {
return int(b[0]-'0')*10 + int(b[1]-'0'), nil
}
}
var n int
for i := 0; i < len(b); i++ {
if b[i] < '0' || b[i] > '9' {
return 0, errors.New("invalid number")
}
n = n*10 + int(b[i]-'0')
}
return n, nil
}
var errClosed = errors.New("closed") var errClosed = errors.New("closed")
@ -544,9 +603,13 @@ func (w *writer) WriteString(msg string) error {
if w.err != nil { if w.err != nil {
return w.err return w.err
} }
w.b = append(w.b, '+') if msg == "OK" {
w.b = append(w.b, []byte(msg)...) w.b = append(w.b, '+', 'O', 'K', '\r', '\n')
w.b = append(w.b, '\r', '\n') } else {
w.b = append(w.b, '+')
w.b = append(w.b, []byte(msg)...)
w.b = append(w.b, '\r', '\n')
}
return nil return nil
} }

View File

@ -166,9 +166,30 @@ func TestRandomCommands(t *testing.T) {
t.Fatalf("len not equal for index %d -- %d != %d", idx, len(cmd), len(gcmds[idx])) t.Fatalf("len not equal for index %d -- %d != %d", idx, len(cmd), len(gcmds[idx]))
} }
for i := 0; i < len(cmd); i++ { for i := 0; i < len(cmd); i++ {
if cmd[i] != gcmds[idx][i] { if i == 0 {
t.Fatalf("not equal for index %d/%d", idx, i) if len(cmd[i]) == len(gcmds[idx][i]) {
ok := true
for j := 0; j < len(cmd[i]); j++ {
c1, c2 := cmd[i][j], gcmds[idx][i][j]
if c1 >= 'A' && c1 <= 'Z' {
c1 += 32
}
if c2 >= 'A' && c2 <= 'Z' {
c2 += 32
}
if c1 != c2 {
ok = false
break
}
}
if ok {
continue
}
}
} else if cmd[i] == gcmds[idx][i] {
continue
} }
t.Fatalf("not equal for index %d/%d", idx, i)
} }
idx++ idx++
cnt++ cnt++