faster aof loading

About 30% faster loading of AOF file during server restart.
This commit is contained in:
Josh Baker 2016-11-09 13:43:26 -07:00
parent 1ac6ad9ebd
commit fbeecaacd9
3 changed files with 128 additions and 31 deletions

View File

@ -1,6 +1,7 @@
package controller
import (
"bufio"
"errors"
"fmt"
"io"
@ -27,6 +28,8 @@ func (err errAOFHook) Error() string {
return fmt.Sprintf("hook: %v", err.err)
}
var errInvalidAOF = errors.New("invalid aof file")
func (c *Controller) loadAOF() error {
fi, err := c.f.Stat()
if err != nil {
@ -50,32 +53,102 @@ func (c *Controller) loadAOF() error {
log.Infof("AOF loaded %d commands: %.2fs, %.0f/s, %s",
count, float64(d)/float64(time.Second), ps, byteSpeed)
}()
rd := resp.NewReader(c.f)
var msg server.Message
rd := bufio.NewReader(c.f)
for {
v, _, n, err := rd.ReadMultiBulk()
var nn int
ch, err := rd.ReadByte()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
values := v.Array()
if len(values) == 0 {
return errors.New("multibulk missing command component")
nn += 1
if ch != '*' {
return errInvalidAOF
}
msg := &server.Message{
Command: strings.ToLower(values[0].String()),
Values: values,
ns, err := rd.ReadString('\n')
if err != nil {
return err
}
if _, _, err := c.command(msg, nil); err != nil {
nn += len(ns)
if len(ns) < 2 || ns[len(ns)-2] != '\r' {
return errInvalidAOF
}
n, err := strconv.ParseUint(ns[:len(ns)-2], 10, 64)
if err != nil {
return err
}
if int(n) == 0 {
continue
}
msg.Values = msg.Values[:0]
for i := 0; i < int(n); i++ {
ch, err := rd.ReadByte()
if err != nil {
return err
}
if ch != '$' {
return errInvalidAOF
}
ns, err := rd.ReadString('\n')
if err != nil {
return err
}
if len(ns) < 2 || ns[len(ns)-2] != '\r' {
return errInvalidAOF
}
n, err := strconv.ParseUint(ns[:len(ns)-2], 10, 64)
if err != nil {
return err
}
b := make([]byte, int(n))
_, err = io.ReadFull(rd, b)
if err != nil {
return err
}
if ch, err := rd.ReadByte(); err != nil {
return err
} else if ch != '\r' {
return errInvalidAOF
}
if ch, err := rd.ReadByte(); err != nil {
return err
} else if ch != '\n' {
return errInvalidAOF
}
msg.Values = append(msg.Values, resp.BytesValue(b))
if i == 0 {
msg.Command = qlower(b)
}
nn += 1 + len(ns) + int(n) + 2
}
if _, _, err := c.command(&msg, nil); err != nil {
if commandErrIsFatal(err) {
return err
}
}
c.aofsz += n
c.aofsz += nn
count++
}
}
func qlower(s []byte) string {
if len(s) == 3 {
if s[0] == 'S' && s[1] == 'E' && s[2] == 'T' {
return "set"
}
if s[0] == 'D' && s[1] == 'E' && s[2] == 'L' {
return "del"
}
}
for i := 0; i < len(s); i++ {
if s[i] >= 'A' || s[i] <= 'Z' {
return strings.ToLower(string(s))
}
}
return string(s)
}
func commandErrIsFatal(err error) bool {
// FSET (and other writable commands) may return errors that we need

View File

@ -403,10 +403,10 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai
func (c *Controller) parseSetArgs(vs []resp.Value) (
d commandDetailsT, fields []string, values []float64,
xx, nx bool,
expires *float64, etype string, evs []resp.Value, err error,
expires *float64, etype []byte, evs []resp.Value, err error,
) {
var ok bool
var typ string
var typ []byte
if vs, d.key, ok = tokenval(vs); !ok || d.key == "" {
err = errInvalidNumberOfArguments
return
@ -415,16 +415,14 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (
err = errInvalidNumberOfArguments
return
}
var arg string
var arg []byte
var nvs []resp.Value
fields = make([]string, 0, 8)
values = make([]float64, 0, 8)
for {
if nvs, arg, ok = tokenval(vs); !ok || arg == "" {
if nvs, arg, ok = tokenvalbytes(vs); !ok || len(arg) == 0 {
err = errInvalidNumberOfArguments
return
}
if lc(arg, "field") {
if lcb(arg, "field") {
vs = nvs
var name string
var svalue string
@ -450,10 +448,10 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (
values = append(values, value)
continue
}
if lc(arg, "ex") {
if lcb(arg, "ex") {
vs = nvs
if expires != nil {
err = errInvalidArgument(arg)
err = errInvalidArgument(string(arg))
return
}
var s string
@ -470,19 +468,19 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (
expires = &v
continue
}
if lc(arg, "xx") {
if lcb(arg, "xx") {
vs = nvs
if nx {
err = errInvalidArgument(arg)
err = errInvalidArgument(string(arg))
return
}
xx = true
continue
}
if lc(arg, "nx") {
if lcb(arg, "nx") {
vs = nvs
if xx {
err = errInvalidArgument(arg)
err = errInvalidArgument(string(arg))
return
}
nx = true
@ -490,7 +488,7 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (
}
break
}
if vs, typ, ok = tokenval(vs); !ok || typ == "" {
if vs, typ, ok = tokenvalbytes(vs); !ok || len(typ) == 0 {
err = errInvalidNumberOfArguments
return
}
@ -502,16 +500,16 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (
evs = vs
switch {
default:
err = errInvalidArgument(typ)
err = errInvalidArgument(string(typ))
return
case lc(typ, "string"):
case lcb(typ, "string"):
var str string
if vs, str, ok = tokenval(vs); !ok {
err = errInvalidNumberOfArguments
return
}
d.obj = geojson.String(str)
case lc(typ, "point"):
case lcb(typ, "point"):
var slat, slon, sz string
if vs, slat, ok = tokenval(vs); !ok || slat == "" {
err = errInvalidNumberOfArguments
@ -554,7 +552,7 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (
}
d.obj = sp
}
case lc(typ, "bounds"):
case lcb(typ, "bounds"):
var sminlat, sminlon, smaxlat, smaxlon string
if vs, sminlat, ok = tokenval(vs); !ok || sminlat == "" {
err = errInvalidNumberOfArguments
@ -605,7 +603,7 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (
},
}
d.obj = g
case lc(typ, "hash"):
case lcb(typ, "hash"):
var sp geojson.SimplePoint
var shash string
if vs, shash, ok = tokenval(vs); !ok || shash == "" {
@ -620,7 +618,7 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (
sp.X = lon
sp.Y = lat
d.obj = sp
case lc(typ, "object"):
case lcb(typ, "object"):
var object string
if vs, object, ok = tokenval(vs); !ok || object == "" {
err = errInvalidNumberOfArguments
@ -682,6 +680,7 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT,
c.expireAt(d.key, d.id, d.timestamp.Add(time.Duration(float64(time.Second)*(*ex))))
}
switch msg.OutputType {
default:
case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
case server.RESP:
@ -690,6 +689,7 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT,
return
notok:
switch msg.OutputType {
default:
case server.JSON:
res = `{"ok":false,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
case server.RESP:

View File

@ -40,6 +40,15 @@ func tokenval(vs []resp.Value) (nvs []resp.Value, token string, ok bool) {
return
}
func tokenvalbytes(vs []resp.Value) (nvs []resp.Value, token []byte, ok bool) {
if len(vs) > 0 {
token = vs[0].Bytes()
nvs = vs[1:]
ok = true
}
return
}
func tokenlc(line string) (newLine, token string) {
for i := 0; i < len(line); i++ {
ch := line[i]
@ -69,7 +78,22 @@ func tokenlc(line string) (newLine, token string) {
}
return "", line
}
func lcb(s1 []byte, s2 string) bool {
if len(s1) != len(s2) {
return false
}
for i := 0; i < len(s1); i++ {
ch := s1[i]
if ch >= 'A' && ch <= 'Z' {
if ch+32 != s2[i] {
return false
}
} else if ch != s2[i] {
return false
}
}
return true
}
func lc(s1, s2 string) bool {
if len(s1) != len(s2) {
return false