// Source: tile38/internal/server/aofshrink.go (Go, 308 lines, 7.9 KiB)

package server
import (
"math"
"os"
"sort"
"strconv"
"strings"
"time"
"github.com/tidwall/geojson"
"github.com/tidwall/tile38/core"
"github.com/tidwall/tile38/internal/collection"
"github.com/tidwall/tile38/internal/log"
)
// Batch limits used by aofshrink while it rewrites the append-only file.
const (
	// maxkeys caps how many collection keys are gathered per locked scan.
	maxkeys = 8
	// maxids caps how many objects are encoded per locked collection scan.
	maxids = 32
	// maxchunk is the buffered byte count (4 MiB) above which the pending
	// output is written to the shrink file.
	maxchunk = 4 << 20
)
// appendShrinkCommand appends one command to buf and returns the extended
// buffer. The command is encoded as "*<count>\r\n" followed by
// "$<len>\r\n<value>\r\n" for every entry of values.
func appendShrinkCommand(buf []byte, values []string) []byte {
	buf = append(buf, '*')
	buf = strconv.AppendInt(buf, int64(len(values)), 10)
	buf = append(buf, '\r', '\n')
	for _, value := range values {
		buf = append(buf, '$')
		buf = strconv.AppendInt(buf, int64(len(value)), 10)
		buf = append(buf, '\r', '\n')
		buf = append(buf, value...)
		buf = append(buf, '\r', '\n')
	}
	return buf
}

// aofshrink rewrites the append-only file so that it contains one "set"
// command per stored object and one "sethook"/"setchan" command per hook,
// then swaps the rewritten file in place of the live one.
//
// It is a no-op when the server has no aof or when a shrink is already in
// progress. The dataset is walked in small batches (maxkeys keys, maxids
// objects) with server.mu held only for the duration of each batch. Once the
// snapshot is on disk, server.mu is taken one last time to append everything
// recorded in server.shrinklog, swap the files, and reopen the aof.
//
// Failures before the swap are logged and the live aof is left untouched.
// Failures during the swap are reported through log.Fatalf.
func (server *Server) aofshrink() {
	if server.aof == nil {
		return
	}
	start := time.Now()

	// Claim the shrinking flag, or bail out if another shrink holds it.
	server.mu.Lock()
	if server.shrinking {
		server.mu.Unlock()
		return
	}
	server.shrinking = true
	server.shrinklog = nil
	server.mu.Unlock()

	defer func() {
		server.mu.Lock()
		server.shrinking = false
		server.shrinklog = nil
		server.mu.Unlock()
		log.Infof("aof shrink ended %v", time.Since(start))
	}()

	err := func() error {
		f, err := os.Create(core.AppendFileName + "-shrink")
		if err != nil {
			return err
		}
		// On the success path f is closed explicitly before the rename; this
		// deferred close covers the early error returns.
		defer f.Close()

		var aofbuf []byte   // pending output, written once it exceeds maxchunk
		var values []string // scratch command arguments, reused per object
		var keys []string   // current batch of collection keys
		var nextkey string  // key at which the next batch resumes
		var keysdone bool   // true once the key scan ran to completion
		for {
			if len(keys) == 0 {
				// load more keys
				if keysdone {
					break
				}
				keysdone = true
				func() {
					server.mu.Lock()
					defer server.mu.Unlock()
					server.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool {
						if len(keys) == maxkeys {
							// batch is full; remember where to resume
							keysdone = false
							nextkey = key
							return false
						}
						keys = append(keys, key)
						return true
					})
				}()
				continue
			}

			var idsdone bool
			var nextid string
			for {
				if idsdone {
					keys = keys[1:]
					break
				}
				// load more objects
				func() {
					idsdone = true
					server.mu.Lock()
					defer server.mu.Unlock()
					col := server.getCol(keys[0])
					if col == nil {
						// no collection is registered under this key anymore
						return
					}
					var fnames = col.FieldArr()       // reload an array of field names to match each object
					var exm = server.expires[keys[0]] // the expiration map
					var now = time.Now()              // used for expiration
					var count = 0                     // the object count
					col.ScanGreaterOrEqual(nextid, false, nil,
						func(id string, obj geojson.Object, fields *collection.Fields) bool {
							if count == maxids {
								// we reached the max number of ids for one batch
								nextid = id
								idsdone = false
								return false
							}
							// here we fill the values array with a new command
							values = values[:0]
							values = append(values, "set", keys[0], id)
							var fidx int
							fields.ForEach(len(fnames), func(fvalue float64) bool {
								// zero-valued fields are not written out
								if fvalue != 0 {
									values = append(values, "field", fnames[fidx],
										strconv.FormatFloat(fvalue, 'f', -1, 64))
								}
								fidx++
								return true
							})
							if exm != nil {
								if at, ok := exm[id]; ok {
									expires := at.Sub(now)
									if expires > 0 {
										// remaining seconds, floored to one decimal place
										secs := math.Floor(float64(expires)/float64(time.Second)*10) / 10
										values = append(values, "ex",
											strconv.FormatFloat(secs, 'f', -1, 64))
									}
								}
							}
							if objIsSpatial(obj) {
								values = append(values, "object", string(obj.AppendJSON(nil)))
							} else {
								values = append(values, "string", obj.String())
							}
							// append the values to the aof buffer
							aofbuf = appendShrinkCommand(aofbuf, values)
							// increment the object count
							count++
							return true
						},
					)
				}()
				if len(aofbuf) > maxchunk {
					if _, err := f.Write(aofbuf); err != nil {
						return err
					}
					aofbuf = aofbuf[:0]
				}
			}
		}

		// load hooks
		// first load the names of the hooks
		var hnames []string
		func() {
			server.mu.Lock()
			defer server.mu.Unlock()
			for name := range server.hooks {
				hnames = append(hnames, name)
			}
		}()
		// sort the names for consistency
		sort.Strings(hnames)
		for _, name := range hnames {
			func() {
				server.mu.Lock()
				defer server.mu.Unlock()
				hook := server.hooks[name]
				if hook == nil {
					// no hook is registered under this name anymore
					return
				}
				hook.cond.L.Lock()
				defer hook.cond.L.Unlock()
				var values []string
				if hook.channel {
					values = append(values, "setchan", name)
				} else {
					values = append(values, "sethook", name,
						strings.Join(hook.Endpoints, ","))
				}
				for _, meta := range hook.Metas {
					values = append(values, "meta", meta.Name, meta.Value)
				}
				if !hook.expires.IsZero() {
					ex := float64(time.Until(hook.expires)) / float64(time.Second)
					values = append(values, "ex",
						strconv.FormatFloat(ex, 'f', 1, 64))
				}
				values = append(values, hook.Message.Args...)
				// append the values to the aof buffer
				aofbuf = appendShrinkCommand(aofbuf, values)
			}()
		}
		if len(aofbuf) > 0 {
			if _, err := f.Write(aofbuf); err != nil {
				return err
			}
		}
		if err := f.Sync(); err != nil {
			return err
		}

		// finally grab any new data that may have been written since
		// the aofshrink has started and swap out the files.
		return func() error {
			server.mu.Lock()
			defer server.mu.Unlock()

			// flush the aof buffer
			server.flushAOF()

			// everything buffered so far is already on disk; reuse the buffer
			// for the commands recorded in the shrink log
			aofbuf = aofbuf[:0]
			for _, values := range server.shrinklog {
				// append the values to the aof buffer
				aofbuf = appendShrinkCommand(aofbuf, values)
			}
			if _, err := f.Write(aofbuf); err != nil {
				return err
			}
			if err := f.Sync(); err != nil {
				return err
			}

			// we now have a shrunken aof file that is fully in-sync with
			// the current dataset. let's swap out the on disk files and
			// point to the new file.

			// anything below this point is unrecoverable. just log and exit process
			// back up the live aof, just in case of fatal error
			if err := server.aof.Close(); err != nil {
				log.Fatalf("shrink live aof close fatal operation: %v", err)
			}
			if err := f.Close(); err != nil {
				log.Fatalf("shrink new aof close fatal operation: %v", err)
			}
			if err := os.Rename(core.AppendFileName, core.AppendFileName+"-bak"); err != nil {
				log.Fatalf("shrink backup fatal operation: %v", err)
			}
			if err := os.Rename(core.AppendFileName+"-shrink", core.AppendFileName); err != nil {
				log.Fatalf("shrink rename fatal operation: %v", err)
			}
			server.aof, err = os.OpenFile(core.AppendFileName, os.O_CREATE|os.O_RDWR, 0600)
			if err != nil {
				log.Fatalf("shrink openfile fatal operation: %v", err)
			}
			// whence 2 is io.SeekEnd: position at the end of the file and
			// record the resulting offset as the aof size
			var n int64
			n, err = server.aof.Seek(0, 2)
			if err != nil {
				log.Fatalf("shrink seek end fatal operation: %v", err)
			}
			server.aofsz = int(n)

			os.Remove(core.AppendFileName + "-bak") // ignore error

			// kill all followers connections
			for conn := range server.aofconnM {
				conn.Close()
			}
			return nil
		}()
	}()
	if err != nil {
		log.Errorf("aof shrink failed: %v", err)
	}
}