tile38/controller/aofshrink.go

300 lines
8.0 KiB
Go
Raw Normal View History

2016-03-30 19:32:02 +03:00
package controller
import (
2016-07-15 23:01:15 +03:00
"math"
2016-03-30 19:32:02 +03:00
"os"
"path"
"sort"
"strconv"
2016-03-30 19:32:02 +03:00
"strings"
"time"
"github.com/tidwall/tile38/controller/collection"
"github.com/tidwall/tile38/controller/log"
"github.com/tidwall/tile38/geojson"
)
const maxkeys = 8
const maxids = 32
const maxchunk = 4 * 1024 * 1024
2016-03-30 19:32:02 +03:00
func (c *Controller) aofshrink() {
start := time.Now()
c.mu.Lock()
if c.shrinking {
c.mu.Unlock()
return
}
c.shrinking = true
c.shrinklog = nil
2016-03-30 19:32:02 +03:00
c.mu.Unlock()
2016-03-30 19:32:02 +03:00
defer func() {
c.mu.Lock()
c.shrinking = false
c.shrinklog = nil
c.mu.Unlock()
log.Infof("aof shrink ended %v", time.Now().Sub(start))
return
}()
2016-03-30 19:32:02 +03:00
err := func() error {
f, err := os.Create(path.Join(c.dir, "shrink"))
2016-03-30 19:32:02 +03:00
if err != nil {
return err
2016-03-30 19:32:02 +03:00
}
defer f.Close()
var aofbuf []byte
var values []string
var keys []string
var nextkey string
var keysdone bool
2016-03-30 19:32:02 +03:00
for {
if len(keys) == 0 {
// load more keys
if keysdone {
2016-03-30 19:32:02 +03:00
break
}
keysdone = true
func() {
c.mu.Lock()
defer c.mu.Unlock()
c.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool {
if len(keys) == maxkeys {
keysdone = false
nextkey = key
return false
}
keys = append(keys, key)
return true
})
}()
continue
2016-03-30 19:32:02 +03:00
}
var idsdone bool
var nextid string
for {
if idsdone {
keys = keys[1:]
break
}
2016-03-30 19:32:02 +03:00
// load more objects
func() {
idsdone = true
c.mu.Lock()
defer c.mu.Unlock()
col := c.getCol(keys[0])
if col == nil {
return
}
var fnames = col.FieldArr() // reload an array of field names to match each object
var exm = c.expires[keys[0]] // the expiration map
var now = time.Now() // used for expiration
var count = 0 // the object count
col.ScanGreaterOrEqual(nextid, false,
func(id string, obj geojson.Object, fields []float64) bool {
if count == maxids {
// we reached the max number of ids for one batch
nextid = id
idsdone = false
return false
}
2016-03-30 19:32:02 +03:00
// here we fill the values array with a new command
values = values[:0]
values = append(values, "set")
values = append(values, keys[0])
values = append(values, id)
for i, fvalue := range fields {
if fvalue != 0 {
values = append(values, "field")
values = append(values, fnames[i])
values = append(values, strconv.FormatFloat(fvalue, 'f', -1, 64))
}
}
2016-07-15 23:01:15 +03:00
if exm != nil {
at, ok := exm[id]
if ok {
expires := at.Sub(now)
if expires > 0 {
values = append(values, "ex")
values = append(values, strconv.FormatFloat(math.Floor(float64(expires)/float64(time.Second)*10)/10, 'f', -1, 64))
}
}
}
switch obj := obj.(type) {
default:
if obj.IsGeometry() {
values = append(values, "object")
values = append(values, obj.JSON())
} else {
values = append(values, "string")
values = append(values, obj.String())
}
case geojson.SimplePoint:
values = append(values, "point")
values = append(values, strconv.FormatFloat(obj.Y, 'f', -1, 64))
values = append(values, strconv.FormatFloat(obj.X, 'f', -1, 64))
case geojson.Point:
if obj.Coordinates.Z == 0 {
values = append(values, "point")
values = append(values, strconv.FormatFloat(obj.Coordinates.Y, 'f', -1, 64))
values = append(values, strconv.FormatFloat(obj.Coordinates.X, 'f', -1, 64))
values = append(values, strconv.FormatFloat(obj.Coordinates.Z, 'f', -1, 64))
} else {
values = append(values, "point")
values = append(values, strconv.FormatFloat(obj.Coordinates.Y, 'f', -1, 64))
values = append(values, strconv.FormatFloat(obj.Coordinates.X, 'f', -1, 64))
2016-07-15 23:01:15 +03:00
}
}
2016-03-30 19:32:02 +03:00
// append the values to the aof buffer
aofbuf = append(aofbuf, '*')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
for _, value := range values {
aofbuf = append(aofbuf, '$')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
aofbuf = append(aofbuf, value...)
aofbuf = append(aofbuf, '\r', '\n')
}
2016-03-30 19:32:02 +03:00
// increment the object count
count++
return true
},
)
}()
}
if len(aofbuf) > maxchunk {
if _, err := f.Write(aofbuf); err != nil {
return err
2016-03-30 19:32:02 +03:00
}
aofbuf = aofbuf[:0]
}
}
// load hooks
// first load the names of the hooks
var hnames []string
func() {
c.mu.Lock()
defer c.mu.Unlock()
for name := range c.hooks {
hnames = append(hnames, name)
}
}()
// sort the names for consistency
sort.Strings(hnames)
for _, name := range hnames {
func() {
c.mu.Lock()
defer c.mu.Unlock()
hook := c.hooks[name]
if hook == nil {
2016-03-30 19:32:02 +03:00
return
}
hook.mu.Lock()
defer hook.mu.Unlock()
var values []string
values = append(values, "sethook")
values = append(values, name)
values = append(values, strings.Join(hook.Endpoints, ","))
for _, value := range hook.Message.Values {
values = append(values, value.String())
2016-03-30 19:32:02 +03:00
}
// append the values to the aof buffer
aofbuf = append(aofbuf, '*')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
for _, value := range values {
aofbuf = append(aofbuf, '$')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
aofbuf = append(aofbuf, value...)
aofbuf = append(aofbuf, '\r', '\n')
}
}()
2016-03-30 19:32:02 +03:00
}
if len(aofbuf) > 0 {
if _, err := f.Write(aofbuf); err != nil {
return err
}
aofbuf = aofbuf[:0]
2016-03-30 19:32:02 +03:00
}
if err := f.Sync(); err != nil {
return err
2016-03-30 19:32:02 +03:00
}
// finally grab any new data that may have been written since
// the aofshrink has started and swap out the files.
return func() error {
c.mu.Lock()
defer c.mu.Unlock()
aofbuf = aofbuf[:0]
for _, values := range c.shrinklog {
// append the values to the aof buffer
aofbuf = append(aofbuf, '*')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
for _, value := range values {
aofbuf = append(aofbuf, '$')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
aofbuf = append(aofbuf, value...)
aofbuf = append(aofbuf, '\r', '\n')
}
}
if _, err := f.Write(aofbuf); err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
// we now have a shrunken aof file that is fully in-sync with
// the current dataset. let's swap out the on disk files and
// point to the new file.
// anything below this point is unrecoverable. just log and exit process
// back up the live aof, just in case of fatal error
if err := os.Rename(path.Join(c.dir, "appendonly.aof"), path.Join(c.dir, "appendonly.bak")); err != nil {
log.Fatalf("shink backup fatal operation: %v", err)
}
if err := os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "appendonly.aof")); err != nil {
log.Fatalf("shink rename fatal operation: %v", err)
}
if err := c.f.Close(); err != nil {
log.Fatalf("shink live aof close fatal operation: %v", err)
}
c.f, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
log.Fatalf("shink openfile fatal operation: %v", err)
}
var n int64
n, err = c.f.Seek(0, 2)
if err != nil {
log.Fatalf("shink seek end fatal operation: %v", err)
}
c.aofsz = int(n)
os.Remove(path.Join(c.dir, "appendonly.bak")) // ignore error
// kill all followers connections
for conn := range c.aofconnM {
conn.Close()
}
return nil
}()
}()
if err != nil {
log.Errorf("aof shrink failed: %v", err)
return
}
2016-03-30 19:32:02 +03:00
}