Fix AOF corruption during the AOFSHRINK command.

This addresses issue #73, which @huangpeizhi discovered while using
Tile38 in production: AOFSHRINK sometimes corrupted the database,
preventing the server from starting the next time.
This commit is contained in:
Josh Baker 2016-12-05 16:24:26 -07:00
parent 196016688b
commit 0afdf67c90
6 changed files with 303 additions and 236 deletions

View File

@ -173,6 +173,13 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error {
}
}
}
if c.shrinking {
var values []string
for _, value := range value.Array() {
values = append(values, value.String())
}
c.shrinklog = append(c.shrinklog, values)
}
data, err := value.MarshalRESP()
if err != nil {
return err

View File

@ -1,258 +1,299 @@
package controller
import (
"bytes"
"errors"
"io"
"math"
"os"
"path"
"sort"
"strconv"
"strings"
"time"
"github.com/tidwall/resp"
"github.com/tidwall/tile38/controller/collection"
"github.com/tidwall/tile38/controller/log"
"github.com/tidwall/tile38/geojson"
)
// objFields is a snapshot of a single collection object captured while
// holding the controller lock, so it can be serialized to the shrunken
// AOF after the lock is released.
type objFields struct {
	obj     geojson.Object // the geometry or string value being persisted
	fields  []float64      // field values, positionally matched to the collection's field-name array
	expires time.Duration  // remaining TTL relative to capture time; <= 0 means no expiration is written
}
// Batch-size tuning knobs for aofshrink. Work is done in small batches so
// the controller lock is only held briefly on each pass.
// NOTE(review): this diff shows constants from both the old implementation
// (maxKeyGroup/maxIDGroup) and the new one (maxkeys/maxids) — confirm only
// one set remains in the final file.
const maxkeys = 8                // max collection keys loaded per locked batch
const maxids = 32                // max object ids scanned per locked batch
const maxchunk = 4 * 1024 * 1024 // flush the AOF buffer to disk once it exceeds this many bytes
const maxKeyGroup = 10           // (old version) keys per batch
const maxIDGroup = 10            // (old version) ids per batch
// aofshrink shrinks the aof file to its minimum size.
// There are some pauses, but each pause should not take more than 100ms on a busy server.
func (c *Controller) aofshrink() {
start := time.Now()
c.mu.Lock()
c.f.Sync()
if c.shrinking {
c.mu.Unlock()
return
}
f, err := os.Create(path.Join(c.dir, "shrink"))
if err != nil {
log.Errorf("aof shrink failed: %s\n", err.Error())
return
}
defer func() {
f.Close()
//os.RemoveAll(rewritePath)
}()
var ferr error // stores the final error
c.shrinking = true
c.currentShrinkStart = start
endpos := int64(c.aofsz) // 1) Log the aofsize at start. Locked
c.shrinklog = nil
c.mu.Unlock()
defer func() {
c.mu.Lock()
defer c.mu.Unlock()
c.shrinking = false
c.lastShrinkDuration = time.Now().Sub(start)
c.currentShrinkStart = time.Time{}
defer func() {
if ferr != nil {
log.Errorf("aof shrink failed: %s\n", ferr.Error())
} else {
log.Printf("aof shrink completed in %s", c.lastShrinkDuration)
}
}()
if ferr != nil {
return
}
c.shrinklog = nil
c.mu.Unlock()
log.Infof("aof shrink ended %v", time.Now().Sub(start))
return
}()
of, err := os.Open(c.f.Name())
err := func() error {
f, err := os.Create(path.Join(c.dir, "shrink"))
if err != nil {
ferr = err
return
return err
}
defer of.Close()
if _, err := of.Seek(endpos, 0); err != nil {
ferr = err
return
}
rd := resp.NewReader(of)
defer f.Close()
var aofbuf []byte
var values []string
var keys []string
var nextkey string
var keysdone bool
for {
v, telnet, _, err := rd.ReadMultiBulk()
if err != nil {
if err == io.EOF {
if len(keys) == 0 {
// load more keys
if keysdone {
break
}
ferr = err
return
keysdone = true
func() {
c.mu.Lock()
defer c.mu.Unlock()
c.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool {
if len(keys) == maxkeys {
keysdone = false
nextkey = key
return false
}
keys = append(keys, key)
return true
})
}()
continue
}
if telnet {
ferr = errors.New("invalid RESP message")
return
}
data, err := v.MarshalRESP()
if err != nil {
ferr = err
return
}
if _, err := f.Write(data); err != nil {
ferr = err
return
}
break
}
of.Close()
// swap files
f.Close()
c.f.Close()
err = os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "appendonly.aof"))
if err != nil {
log.Fatal("shink rename fatal operation")
}
c.f, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
log.Fatal("shink openfile fatal operation")
}
var n int64
n, err = c.f.Seek(0, 2)
if err != nil {
log.Fatal("shink seek end fatal operation")
}
c.aofsz = int(n)
// kill all followers connections
for conn := range c.aofconnM {
conn.Close()
}
}()
log.Infof("aof shrink started at pos %d", endpos)
// Ascend collections. Load maxKeyGroup at a time.
nextKey := ""
for {
cols := make(map[string]*collection.Collection)
c.mu.Lock()
c.scanGreaterOrEqual(nextKey, func(key string, col *collection.Collection) bool {
if key != nextKey {
cols[key] = col
nextKey = key
}
return len(cols) < maxKeyGroup
})
c.mu.Unlock()
keys := make([]string, 0, maxKeyGroup)
for key := range cols {
keys = append(keys, key)
}
sort.Strings(keys)
for _, key := range keys {
col := cols[key]
// Ascend objects. Load maxIDGroup at a time.
nextID := ""
var idsdone bool
var nextid string
for {
objs := make(map[string]objFields)
c.mu.Lock()
now := time.Now()
exm := c.expires[key]
fnames := col.FieldArr() // reload an array of field names to match each object
col.ScanGreaterOrEqual(nextID, 0, false,
func(id string, obj geojson.Object, fields []float64) bool {
if id != nextID {
o := objFields{obj: obj, fields: fields}
if idsdone {
keys = keys[1:]
break
}
// load more objects
func() {
idsdone = true
c.mu.Lock()
defer c.mu.Unlock()
col := c.getCol(keys[0])
if col == nil {
return
}
var fnames = col.FieldArr() // reload an array of field names to match each object
var exm = c.expires[keys[0]] // the expiration map
var now = time.Now() // used for expiration
var count = 0 // the object count
col.ScanGreaterOrEqual(nextid, 0, false,
func(id string, obj geojson.Object, fields []float64) bool {
if count == maxids {
// we reached the max number of ids for one batch
nextid = id
idsdone = false
return false
}
// here we fill the values array with a new command
values = values[:0]
values = append(values, "set")
values = append(values, keys[0])
values = append(values, id)
for i, fvalue := range fields {
if fvalue != 0 {
values = append(values, "field")
values = append(values, fnames[i])
values = append(values, strconv.FormatFloat(fvalue, 'f', -1, 64))
}
}
if exm != nil {
at, ok := exm[id]
if ok {
o.expires = at.Sub(now)
expires := at.Sub(now)
if expires > 0 {
values = append(values, "ex")
values = append(values, strconv.FormatFloat(math.Floor(float64(expires)/float64(time.Second)*10)/10, 'f', -1, 64))
}
}
}
switch obj := obj.(type) {
default:
if obj.IsGeometry() {
values = append(values, "object")
values = append(values, obj.JSON())
} else {
values = append(values, "string")
values = append(values, obj.String())
}
case geojson.SimplePoint:
values = append(values, "point")
values = append(values, strconv.FormatFloat(obj.Y, 'f', -1, 64))
values = append(values, strconv.FormatFloat(obj.X, 'f', -1, 64))
case geojson.Point:
if obj.Coordinates.Z == 0 {
values = append(values, "point")
values = append(values, strconv.FormatFloat(obj.Coordinates.Y, 'f', -1, 64))
values = append(values, strconv.FormatFloat(obj.Coordinates.X, 'f', -1, 64))
values = append(values, strconv.FormatFloat(obj.Coordinates.Z, 'f', -1, 64))
} else {
values = append(values, "point")
values = append(values, strconv.FormatFloat(obj.Coordinates.Y, 'f', -1, 64))
values = append(values, strconv.FormatFloat(obj.Coordinates.X, 'f', -1, 64))
}
}
objs[id] = o
nextID = id
}
return len(objs) < maxIDGroup
},
)
c.mu.Unlock()
ids := make([]string, 0, maxIDGroup)
for id := range objs {
ids = append(ids, id)
}
sort.Strings(ids)
// append the values to the aof buffer
aofbuf = append(aofbuf, '*')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
for _, value := range values {
aofbuf = append(aofbuf, '$')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
aofbuf = append(aofbuf, value...)
aofbuf = append(aofbuf, '\r', '\n')
}
linebuf := &bytes.Buffer{}
for _, id := range ids {
obj := objs[id]
values := make([]resp.Value, 0, len(obj.fields)*3+16)
values = append(values, resp.StringValue("set"), resp.StringValue(key), resp.StringValue(id))
for i, fvalue := range obj.fields {
if fvalue != 0 {
values = append(values, resp.StringValue("field"), resp.StringValue(fnames[i]), resp.FloatValue(fvalue))
}
}
if obj.expires > 0 {
values = append(values, resp.StringValue("ex"), resp.FloatValue(math.Floor(float64(obj.expires)/float64(time.Second)*10)/10))
}
switch obj := obj.obj.(type) {
default:
if obj.IsGeometry() {
values = append(values, resp.StringValue("object"), resp.StringValue(obj.JSON()))
} else {
values = append(values, resp.StringValue("string"), resp.StringValue(obj.String()))
}
case geojson.SimplePoint:
values = append(values, resp.StringValue("point"), resp.FloatValue(obj.Y), resp.FloatValue(obj.X))
case geojson.Point:
if obj.Coordinates.Z == 0 {
values = append(values, resp.StringValue("point"), resp.FloatValue(obj.Coordinates.Y), resp.FloatValue(obj.Coordinates.X))
} else {
values = append(values, resp.StringValue("point"), resp.FloatValue(obj.Coordinates.Y), resp.FloatValue(obj.Coordinates.X), resp.FloatValue(obj.Coordinates.Z))
}
}
data, err := resp.ArrayValue(values).MarshalRESP()
if err != nil {
ferr = err
return
}
linebuf.Write(data)
}
if _, err := f.Write(linebuf.Bytes()); err != nil {
ferr = err
return
}
if len(objs) < maxIDGroup {
break
// increment the object count
count++
return true
},
)
}()
}
if len(aofbuf) > maxchunk {
if _, err := f.Write(aofbuf); err != nil {
return err
}
aofbuf = aofbuf[:0]
}
}
if len(cols) < maxKeyGroup {
break
}
}
// load hooks
c.mu.Lock()
for name, hook := range c.hooks {
values := make([]resp.Value, 0, 3+len(hook.Message.Values))
endpoints := make([]string, len(hook.Endpoints))
for i, endpoint := range hook.Endpoints {
endpoints[i] = endpoint
}
values = append(values, resp.StringValue("sethook"), resp.StringValue(name), resp.StringValue(strings.Join(endpoints, ",")))
values = append(values, hook.Message.Values...)
data, err := resp.ArrayValue(values).MarshalRESP()
if err != nil {
c.mu.Unlock()
ferr = err
return
}
if _, err := f.Write(data); err != nil {
c.mu.Unlock()
ferr = err
return
}
}
c.mu.Unlock()
// load hooks
// first load the names of the hooks
var hnames []string
func() {
c.mu.Lock()
defer c.mu.Unlock()
for name := range c.hooks {
hnames = append(hnames, name)
}
}()
// sort the names for consistency
sort.Strings(hnames)
for _, name := range hnames {
func() {
c.mu.Lock()
defer c.mu.Unlock()
hook := c.hooks[name]
if hook == nil {
return
}
hook.mu.Lock()
defer hook.mu.Unlock()
var values []string
values = append(values, "sethook")
values = append(values, name)
values = append(values, strings.Join(hook.Endpoints, ","))
for _, value := range hook.Message.Values {
values = append(values, value.String())
}
// append the values to the aof buffer
aofbuf = append(aofbuf, '*')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
for _, value := range values {
aofbuf = append(aofbuf, '$')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
aofbuf = append(aofbuf, value...)
aofbuf = append(aofbuf, '\r', '\n')
}
}()
}
if len(aofbuf) > 0 {
if _, err := f.Write(aofbuf); err != nil {
return err
}
aofbuf = aofbuf[:0]
}
if err := f.Sync(); err != nil {
return err
}
// finally grab any new data that may have been written since
// the aofshrink has started and swap out the files.
return func() error {
c.mu.Lock()
defer c.mu.Unlock()
aofbuf = aofbuf[:0]
for _, values := range c.shrinklog {
// append the values to the aof buffer
aofbuf = append(aofbuf, '*')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
for _, value := range values {
aofbuf = append(aofbuf, '$')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
aofbuf = append(aofbuf, value...)
aofbuf = append(aofbuf, '\r', '\n')
}
}
if _, err := f.Write(aofbuf); err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
// we now have a shrunken aof file that is fully in-sync with
// the current dataset. let's swap out the on disk files and
// point to the new file.
// anything below this point is unrecoverable. just log and exit process
// back up the live aof, just in case of fatal error
if err := os.Rename(path.Join(c.dir, "appendonly.aof"), path.Join(c.dir, "appendonly.bak")); err != nil {
log.Fatalf("shink backup fatal operation: %v", err)
}
if err := os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "appendonly.aof")); err != nil {
log.Fatalf("shink rename fatal operation: %v", err)
}
if err := c.f.Close(); err != nil {
log.Fatalf("shink live aof close fatal operation: %v", err)
}
c.f, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
log.Fatalf("shink openfile fatal operation: %v", err)
}
var n int64
n, err = c.f.Seek(0, 2)
if err != nil {
log.Fatalf("shink seek end fatal operation: %v", err)
}
c.aofsz = int(n)
os.Remove(path.Join(c.dir, "appendonly.bak")) // ignore error
// kill all followers connections
for conn := range c.aofconnM {
conn.Close()
}
return nil
}()
}()
if err != nil {
log.Errorf("aof shrink failed: %v", err)
return
}
}

View File

@ -72,6 +72,7 @@ type Controller struct {
lcond *sync.Cond
fcup bool // follow caught up
shrinking bool // aof shrinking flag
shrinklog [][]string // aof shrinking log
hooks map[string]*Hook // hook name
hookcols map[string]map[string]*Hook // col key
aofconnM map[net.Conn]bool
@ -416,8 +417,15 @@ func (c *Controller) handleInputCommand(conn *server.Conn, msg *server.Message,
// this is local connection operation. Locks not needed.
case "massinsert":
// dev operation
// ** danger zone **
// no locks! DEV MODE ONLY
c.mu.Lock()
defer c.mu.Unlock()
case "shutdown":
// dev operation
c.mu.Lock()
defer c.mu.Unlock()
case "aofshrink":
c.mu.RLock()
defer c.mu.RUnlock()
}
res, d, err := c.command(msg, w)
@ -489,6 +497,12 @@ func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d co
res, d, err = c.cmdTTL(msg)
case "hooks":
res, err = c.cmdHooks(msg)
case "shutdown":
if !core.DevMode {
err = fmt.Errorf("unknown command '%s'", msg.Values[0])
return
}
log.Fatal("shutdown requested by developer")
case "massinsert":
if !core.DevMode {
err = fmt.Errorf("unknown command '%s'", msg.Values[0])

View File

@ -670,12 +670,15 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT,
d.oldObj, d.oldFields, d.fields = col.ReplaceOrInsert(d.id, d.obj, fields, values)
d.command = "set"
d.updated = true // perhaps we should do a diff on the previous object?
fmap = col.FieldMap()
d.fmap = make(map[string]int)
for key, idx := range fmap {
d.fmap[key] = idx
}
d.timestamp = time.Now()
if msg.ConnType != server.Null || msg.OutputType != server.Null {
// likely loaded from aof at server startup, ignore field remapping.
fmap = col.FieldMap()
d.fmap = make(map[string]int)
for key, idx := range fmap {
d.fmap[key] = idx
}
}
if ex != nil {
c.expireAt(d.key, d.id, d.timestamp.Add(time.Duration(float64(time.Second)*(*ex))))
}

View File

@ -2,10 +2,10 @@ package controller
import (
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -79,14 +79,12 @@ func (c *Controller) cmdMassInsert(msg *server.Message) (res string, err error)
return "", errInvalidArgument(snumPoints)
}
docmd := func(values []resp.Value) error {
c.mu.Lock()
defer c.mu.Unlock()
nmsg := &server.Message{}
*nmsg = *msg
nmsg.Values = values
nmsg.Command = strings.ToLower(values[0].String())
_, d, err := c.command(nmsg, nil)
var d commandDetailsT
_, d, err = c.command(nmsg, nil)
if err != nil {
return err
}
@ -97,37 +95,38 @@ func (c *Controller) cmdMassInsert(msg *server.Message) (res string, err error)
}
rand.Seed(time.Now().UnixNano())
objs = int(n)
var wg sync.WaitGroup
var k uint64
wg.Add(cols)
for i := 0; i < cols; i++ {
key := "mi:" + strconv.FormatInt(int64(i), 10)
go func(key string) {
defer func() {
wg.Done()
}()
func(key string) {
// lock cycle
for j := 0; j < objs; j++ {
id := strconv.FormatInt(int64(j), 10)
lat, lon := randMassInsertPosition(minLat, minLon, maxLat, maxLon)
values := make([]resp.Value, 0, 16)
values = append(values, resp.StringValue("set"), resp.StringValue(key), resp.StringValue(id))
if useRandField {
values = append(values, resp.StringValue("FIELD"), resp.StringValue("field"), resp.FloatValue(rand.Float64()*10))
var values []resp.Value
if j%8 == 0 {
lat, lon := randMassInsertPosition(minLat, minLon, maxLat, maxLon)
values = make([]resp.Value, 0, 16)
values = append(values, resp.StringValue("set"), resp.StringValue(key), resp.StringValue(id))
if useRandField {
values = append(values, resp.StringValue("FIELD"), resp.StringValue("fname"), resp.FloatValue(rand.Float64()*10))
}
values = append(values, resp.StringValue("POINT"), resp.FloatValue(lat), resp.FloatValue(lon))
} else {
values = append(values, resp.StringValue("set"),
resp.StringValue(key), resp.StringValue(id),
resp.StringValue("STRING"), resp.StringValue(fmt.Sprintf("str%v", j)))
}
values = append(values, resp.StringValue("POINT"), resp.FloatValue(lat), resp.FloatValue(lon))
if err := docmd(values); err != nil {
log.Fatal(err)
return
}
atomic.AddUint64(&k, 1)
if j%10000 == 10000-1 {
if j%1000 == 1000-1 {
log.Infof("massinsert: %s %d/%d", key, atomic.LoadUint64(&k), cols*objs)
}
}
}(key)
}
wg.Wait()
log.Infof("massinsert: done %d objects", atomic.LoadUint64(&k))
return server.OKMessage(msg, start), nil
}

View File

@ -93,7 +93,10 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai
cmsg := &server.Message{}
*cmsg = *msg
cmsg.Values = commandvs
cmsg.Values = make([]resp.Value, len(commandvs))
for i := 0; i < len(commandvs); i++ {
cmsg.Values[i] = commandvs[i]
}
cmsg.Command = strings.ToLower(cmsg.Values[0].String())
hook := &Hook{