tile38/internal/server/aofshrink.go

308 lines
8.1 KiB
Go
Raw Normal View History

package server
2016-03-30 19:32:02 +03:00
import (
2016-07-15 23:01:15 +03:00
"math"
2016-03-30 19:32:02 +03:00
"os"
"sort"
"strconv"
2016-03-30 19:32:02 +03:00
"strings"
"time"
"github.com/tidwall/geojson"
"github.com/tidwall/tile38/core"
"github.com/tidwall/tile38/internal/collection"
"github.com/tidwall/tile38/internal/log"
2016-03-30 19:32:02 +03:00
)
const maxkeys = 8
const maxids = 32
const maxchunk = 4 * 1024 * 1024
2016-03-30 19:32:02 +03:00
func (server *Server) aofshrink() {
if server.aof == nil {
return
}
2016-03-30 19:32:02 +03:00
start := time.Now()
server.mu.Lock()
if server.shrinking {
server.mu.Unlock()
2016-03-30 19:32:02 +03:00
return
}
server.shrinking = true
server.shrinklog = nil
server.mu.Unlock()
2016-03-30 19:32:02 +03:00
defer func() {
server.mu.Lock()
server.shrinking = false
server.shrinklog = nil
server.mu.Unlock()
log.Infof("aof shrink ended %v", time.Since(start))
}()
2016-03-30 19:32:02 +03:00
err := func() error {
f, err := os.Create(core.AppendFileName + "-shrink")
2016-03-30 19:32:02 +03:00
if err != nil {
return err
2016-03-30 19:32:02 +03:00
}
defer f.Close()
var aofbuf []byte
var values []string
var keys []string
var nextkey string
var keysdone bool
2016-03-30 19:32:02 +03:00
for {
if len(keys) == 0 {
// load more keys
if keysdone {
2016-03-30 19:32:02 +03:00
break
}
keysdone = true
func() {
server.mu.Lock()
defer server.mu.Unlock()
server.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool {
if len(keys) == maxkeys {
keysdone = false
nextkey = key
return false
}
keys = append(keys, key)
return true
})
}()
continue
2016-03-30 19:32:02 +03:00
}
var idsdone bool
var nextid string
for {
if idsdone {
keys = keys[1:]
break
}
2016-03-30 19:32:02 +03:00
// load more objects
func() {
idsdone = true
server.mu.Lock()
defer server.mu.Unlock()
col := server.getCol(keys[0])
if col == nil {
return
}
var fnames = col.FieldArr() // reload an array of field names to match each object
var fmap = col.FieldMap() //
2019-11-17 17:25:25 +03:00
var now = time.Now().UnixNano() // used for expiration
var count = 0 // the object count
2019-04-24 15:09:41 +03:00
col.ScanGreaterOrEqual(nextid, false, nil, nil,
func(id string, obj geojson.Object, fields []float64, ex int64) bool {
if count == maxids {
// we reached the max number of ids for one batch
nextid = id
idsdone = false
return false
}
// here we fill the values array with a new command
values = values[:0]
values = append(values, "set")
values = append(values, keys[0])
values = append(values, id)
if len(fields) > 0 {
fvs := orderFields(fmap, fnames, fields)
for _, fv := range fvs {
if fv.value != 0 {
values = append(values, "field")
values = append(values, fv.field)
values = append(values, strconv.FormatFloat(fv.value, 'f', -1, 64))
}
}
}
if ex != 0 {
ttl := math.Floor(float64(ex-now)/float64(time.Second)*10) / 10
if ttl < 0.1 {
// always leave a little bit of ttl.
ttl = 0.1
}
values = append(values, "ex")
values = append(values, strconv.FormatFloat(ttl, 'f', -1, 64))
}
if objIsSpatial(obj) {
values = append(values, "object")
values = append(values, string(obj.AppendJSON(nil)))
} else {
values = append(values, "string")
values = append(values, obj.String())
2016-07-15 23:01:15 +03:00
}
2016-03-30 19:32:02 +03:00
// append the values to the aof buffer
aofbuf = append(aofbuf, '*')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
for _, value := range values {
aofbuf = append(aofbuf, '$')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
aofbuf = append(aofbuf, value...)
aofbuf = append(aofbuf, '\r', '\n')
}
2016-03-30 19:32:02 +03:00
// increment the object count
count++
return true
},
)
}()
2017-12-12 03:12:15 +03:00
if len(aofbuf) > maxchunk {
if _, err := f.Write(aofbuf); err != nil {
return err
}
aofbuf = aofbuf[:0]
2016-03-30 19:32:02 +03:00
}
}
}
// load hooks
// first load the names of the hooks
var hnames []string
func() {
server.mu.Lock()
defer server.mu.Unlock()
for name := range server.hooks {
hnames = append(hnames, name)
}
}()
// sort the names for consistency
sort.Strings(hnames)
for _, name := range hnames {
func() {
server.mu.Lock()
defer server.mu.Unlock()
hook := server.hooks[name]
if hook == nil {
2016-03-30 19:32:02 +03:00
return
}
2018-08-14 03:05:30 +03:00
hook.cond.L.Lock()
defer hook.cond.L.Unlock()
var values []string
2018-08-14 03:05:30 +03:00
if hook.channel {
values = append(values, "setchan", name)
} else {
values = append(values, "sethook", name,
strings.Join(hook.Endpoints, ","))
}
for _, meta := range hook.Metas {
values = append(values, "meta", meta.Name, meta.Value)
}
2018-08-14 06:27:22 +03:00
if !hook.expires.IsZero() {
ex := float64(time.Until(hook.expires)) / float64(time.Second)
2018-08-14 06:27:22 +03:00
values = append(values, "ex",
strconv.FormatFloat(ex, 'f', 1, 64))
}
values = append(values, hook.Message.Args...)
// append the values to the aof buffer
aofbuf = append(aofbuf, '*')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
for _, value := range values {
aofbuf = append(aofbuf, '$')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
aofbuf = append(aofbuf, value...)
aofbuf = append(aofbuf, '\r', '\n')
}
}()
2016-03-30 19:32:02 +03:00
}
if len(aofbuf) > 0 {
if _, err := f.Write(aofbuf); err != nil {
return err
}
aofbuf = aofbuf[:0]
2016-03-30 19:32:02 +03:00
}
if err := f.Sync(); err != nil {
return err
2016-03-30 19:32:02 +03:00
}
// finally grab any new data that may have been written since
// the aofshrink has started and swap out the files.
return func() error {
server.mu.Lock()
defer server.mu.Unlock()
// kill all followers connections and close their files. This
// ensures that there is only one opened AOF at a time which is
// what Windows requires in order to perform the Rename function
// below.
for conn, f := range server.aofconnM {
conn.Close()
f.Close()
}
// send a broadcast to all sleeping followers
server.fcond.Broadcast()
// flush the aof buffer
2019-03-10 20:48:14 +03:00
server.flushAOF(false)
aofbuf = aofbuf[:0]
for _, values := range server.shrinklog {
// append the values to the aof buffer
aofbuf = append(aofbuf, '*')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
for _, value := range values {
aofbuf = append(aofbuf, '$')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
aofbuf = append(aofbuf, '\r', '\n')
aofbuf = append(aofbuf, value...)
aofbuf = append(aofbuf, '\r', '\n')
}
}
if _, err := f.Write(aofbuf); err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
// we now have a shrunken aof file that is fully in-sync with
// the current dataset. let's swap out the on disk files and
// point to the new file.
// anything below this point is unrecoverable. just log and exit process
// back up the live aof, just in case of fatal error
if err := server.aof.Close(); err != nil {
2017-12-13 00:05:22 +03:00
log.Fatalf("shrink live aof close fatal operation: %v", err)
}
if err := f.Close(); err != nil {
log.Fatalf("shrink new aof close fatal operation: %v", err)
}
if err := os.Rename(core.AppendFileName, core.AppendFileName+"-bak"); err != nil {
2017-12-13 00:05:22 +03:00
log.Fatalf("shrink backup fatal operation: %v", err)
}
if err := os.Rename(core.AppendFileName+"-shrink", core.AppendFileName); err != nil {
2017-12-13 00:05:22 +03:00
log.Fatalf("shrink rename fatal operation: %v", err)
}
server.aof, err = os.OpenFile(core.AppendFileName, os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
2017-12-13 00:05:22 +03:00
log.Fatalf("shrink openfile fatal operation: %v", err)
}
var n int64
n, err = server.aof.Seek(0, 2)
if err != nil {
2017-12-13 00:05:22 +03:00
log.Fatalf("shrink seek end fatal operation: %v", err)
}
server.aofsz = int(n)
os.Remove(core.AppendFileName + "-bak") // ignore error
return nil
}()
}()
if err != nil {
log.Errorf("aof shrink failed: %v", err)
return
}
2016-03-30 19:32:02 +03:00
}