tile38/internal/server/aofshrink.go

package server

import (
	"math"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/tidwall/btree"
	"github.com/tidwall/tile38/internal/collection"
	"github.com/tidwall/tile38/internal/field"
	"github.com/tidwall/tile38/internal/log"
	"github.com/tidwall/tile38/internal/object"
)

const maxkeys = 8
const maxids = 32
const maxchunk = 4 * 1024 * 1024
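
// aofshrink rewrites the append-only file so that it contains only the
// commands needed to rebuild the current dataset: one SET per object and one
// SETHOOK/SETCHAN per hook. Work is done in small batches (maxkeys keys and
// maxids objects at a time) so the server mutex is only held briefly, and any
// commands that arrive while the shrink is running are captured in
// s.shrinklog and appended to the new file before it is swapped in.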
func (s *Server) aofshrink() {
	start := time.Now()
	s.mu.Lock()
	if s.aof == nil || s.shrinking {
		s.mu.Unlock()
		return
	}
	s.shrinking = true
	s.shrinklog = nil
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		s.shrinking = false
		s.shrinklog = nil
		s.mu.Unlock()
		log.Infof("aof shrink ended %v", time.Since(start))
	}()
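
	// The bulk of the work happens inside this closure so that any failure
	// can be logged in a single place below. The shrunken file is written
	// next to the live AOF with a "-shrink" suffix and only swapped in once
	// it is complete and synced.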
	err := func() error {
		f, err := os.Create(s.opts.AppendFileName + "-shrink")
		if err != nil {
			return err
		}
		defer f.Close()

		var aofbuf []byte
		var values []string
		var keys []string
		var nextkey string
		var keysdone bool
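
		// Walk the keyspace in batches of up to maxkeys collection names at a
		// time. The mutex is released between batches so that normal traffic
		// can continue while the shrink is in progress.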
		for {
			if len(keys) == 0 {
				// load more keys
				if keysdone {
					break
				}
				keysdone = true
				func() {
					s.mu.Lock()
					defer s.mu.Unlock()
					s.cols.Ascend(nextkey,
						func(key string, col *collection.Collection) bool {
							if len(keys) == maxkeys {
								keysdone = false
								nextkey = key
								return false
							}
							keys = append(keys, key)
							return true
						},
					)
				}()
				continue
			}
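
			// Stream the objects of keys[0] in batches of up to maxids,
			// rewriting each object as a single SET command.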
			var idsdone bool
			var nextid string
			for {
				if idsdone {
					keys = keys[1:]
					break
				}

				// load more objects
				func() {
					idsdone = true
					s.mu.Lock()
					defer s.mu.Unlock()
					col, ok := s.cols.Get(keys[0])
					if !ok {
						return
					}
					var now = time.Now().UnixNano() // used for expiration
					var count = 0                   // the object count
					col.ScanGreaterOrEqual(nextid, false, nil, nil,
						func(o *object.Object) bool {
							if count == maxids {
								// we reached the max number of ids for one batch
								nextid = o.ID()
								idsdone = false
								return false
							}

							// here we fill the values array with a new command
							values = values[:0]
							values = append(values, "set")
							values = append(values, keys[0])
							values = append(values, o.ID())
							o.Fields().Scan(func(f field.Field) bool {
								if !f.Value().IsZero() {
									values = append(values, "field")
									values = append(values, f.Name())
									values = append(values, f.Value().JSON())
								}
								return true
							})
							if o.Expires() != 0 {
								ttl := math.Floor(float64(o.Expires()-now)/float64(time.Second)*10) / 10
								if ttl < 0.1 {
									// always leave a little bit of ttl.
									ttl = 0.1
								}
								values = append(values, "ex")
								values = append(values, strconv.FormatFloat(ttl, 'f', -1, 64))
							}
							if objIsSpatial(o.Geo()) {
								values = append(values, "object")
								values = append(values, string(o.Geo().AppendJSON(nil)))
							} else {
								values = append(values, "string")
								values = append(values, o.Geo().String())
							}
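
							// Encode the command into the buffer using the
							// RESP wire format: an "*<argc>" array header
							// followed by a "$<len>" bulk string for each
							// argument, each line terminated by CRLF.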
							// append the values to the aof buffer
							aofbuf = append(aofbuf, '*')
							aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
							aofbuf = append(aofbuf, '\r', '\n')
							for _, value := range values {
								aofbuf = append(aofbuf, '$')
								aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
								aofbuf = append(aofbuf, '\r', '\n')
								aofbuf = append(aofbuf, value...)
								aofbuf = append(aofbuf, '\r', '\n')
							}

							// increment the object count
							count++
							return true
						},
					)
				}()
				if len(aofbuf) > maxchunk {
					if _, err := f.Write(aofbuf); err != nil {
						return err
					}
					aofbuf = aofbuf[:0]
				}
			}
		}

		// load hooks
		// first load the names of the hooks
		var hnames []string
		func() {
			s.mu.Lock()
			defer s.mu.Unlock()
			hnames = make([]string, 0, s.hooks.Len())
			s.hooks.Walk(func(v []interface{}) {
				for _, v := range v {
					hnames = append(hnames, v.(*Hook).Name)
				}
			})
		}()
		var hookHint btree.PathHint
		for _, name := range hnames {
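			// Re-fetch each hook under the mutex; it may have been removed
			// since the name list was taken, in which case it is skipped.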
			func() {
				s.mu.Lock()
				defer s.mu.Unlock()
				hook, _ := s.hooks.GetHint(&Hook{Name: name}, &hookHint).(*Hook)
				if hook == nil {
					return
				}
				hook.cond.L.Lock()
				defer hook.cond.L.Unlock()
				var values []string
				if hook.channel {
					values = append(values, "setchan", name)
				} else {
					values = append(values, "sethook", name,
						strings.Join(hook.Endpoints, ","))
				}
				for _, meta := range hook.Metas {
					values = append(values, "meta", meta.Name, meta.Value)
				}
				if !hook.expires.IsZero() {
					ex := float64(time.Until(hook.expires)) / float64(time.Second)
					values = append(values, "ex",
						strconv.FormatFloat(ex, 'f', 1, 64))
				}
				values = append(values, hook.Message.Args...)

				// append the values to the aof buffer
				aofbuf = append(aofbuf, '*')
				aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
				aofbuf = append(aofbuf, '\r', '\n')
				for _, value := range values {
					aofbuf = append(aofbuf, '$')
					aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
					aofbuf = append(aofbuf, '\r', '\n')
					aofbuf = append(aofbuf, value...)
					aofbuf = append(aofbuf, '\r', '\n')
				}
			}()
		}
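
		// Flush whatever remains in the buffer and make sure the new file is
		// durable on disk before swapping it in.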
		if len(aofbuf) > 0 {
			if _, err := f.Write(aofbuf); err != nil {
				return err
			}
			aofbuf = aofbuf[:0]
		}
		if err := f.Sync(); err != nil {
			return err
		}

		// finally grab any new data that may have been written since the
		// aofshrink started, then swap out the files.
		return func() error {
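			// Everything below runs while holding the server mutex, so no new
			// commands can be appended while the shrink log is replayed and
			// the files are swapped.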
			s.mu.Lock()
			defer s.mu.Unlock()

			// kill all follower connections and close their files. This
			// ensures that there is only one opened AOF at a time, which is
			// what Windows requires in order to perform the Rename function
			// below.
			for conn, f := range s.aofconnM {
				conn.Close()
				f.Close()
			}

			// send a broadcast to all sleeping followers
			s.fcond.Broadcast()

			// flush the aof buffer
			s.flushAOF(false)

			aofbuf = aofbuf[:0]
			for _, values := range s.shrinklog {
				// append the values to the aof buffer
				aofbuf = append(aofbuf, '*')
				aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
				aofbuf = append(aofbuf, '\r', '\n')
				for _, value := range values {
					aofbuf = append(aofbuf, '$')
					aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
					aofbuf = append(aofbuf, '\r', '\n')
					aofbuf = append(aofbuf, value...)
					aofbuf = append(aofbuf, '\r', '\n')
				}
			}
			if _, err := f.Write(aofbuf); err != nil {
				return err
			}
			if err := f.Sync(); err != nil {
				return err
			}

			// we now have a shrunken aof file that is fully in-sync with
			// the current dataset. let's swap out the on disk files and
			// point to the new file.
			// anything below this point is unrecoverable. just log and exit process.
			// back up the live aof, just in case of fatal error.
			if err := s.aof.Close(); err != nil {
				log.Fatalf("shrink live aof close fatal operation: %v", err)
			}
			if err := f.Close(); err != nil {
				log.Fatalf("shrink new aof close fatal operation: %v", err)
			}
			if err := os.Rename(s.opts.AppendFileName, s.opts.AppendFileName+"-bak"); err != nil {
				log.Fatalf("shrink backup fatal operation: %v", err)
			}
			if err := os.Rename(s.opts.AppendFileName+"-shrink", s.opts.AppendFileName); err != nil {
				log.Fatalf("shrink rename fatal operation: %v", err)
			}
			s.aof, err = os.OpenFile(s.opts.AppendFileName, os.O_CREATE|os.O_RDWR, 0600)
			if err != nil {
				log.Fatalf("shrink openfile fatal operation: %v", err)
			}
			var n int64
			n, err = s.aof.Seek(0, 2)
			if err != nil {
				log.Fatalf("shrink seek end fatal operation: %v", err)
			}
			s.aofsz = int(n)
			os.Remove(s.opts.AppendFileName + "-bak") // ignore error
			return nil
		}()
	}()
	if err != nil {
		log.Errorf("aof shrink failed: %v", err)
		return
	}
}