diff --git a/controller/aof.go b/controller/aof.go index 7a337820..3f0805fb 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -173,6 +173,13 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { } } } + if c.shrinking { + var values []string + for _, value := range value.Array() { + values = append(values, value.String()) + } + c.shrinklog = append(c.shrinklog, values) + } data, err := value.MarshalRESP() if err != nil { return err diff --git a/controller/aofshrink.go b/controller/aofshrink.go index 45a13963..668ce204 100644 --- a/controller/aofshrink.go +++ b/controller/aofshrink.go @@ -1,258 +1,299 @@ package controller import ( - "bytes" - "errors" - "io" "math" "os" "path" "sort" + "strconv" "strings" "time" - "github.com/tidwall/resp" "github.com/tidwall/tile38/controller/collection" "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/geojson" ) -type objFields struct { - obj geojson.Object - fields []float64 - expires time.Duration -} +const maxkeys = 8 +const maxids = 32 +const maxchunk = 4 * 1024 * 1024 -const maxKeyGroup = 10 -const maxIDGroup = 10 - -// aofshrink shrinks the aof file to it's minimum size. -// There are some pauses but each pause should not take more that 100ms on a busy server. func (c *Controller) aofshrink() { start := time.Now() c.mu.Lock() - c.f.Sync() if c.shrinking { c.mu.Unlock() return } - f, err := os.Create(path.Join(c.dir, "shrink")) - if err != nil { - log.Errorf("aof shrink failed: %s\n", err.Error()) - return - } - defer func() { - f.Close() - //os.RemoveAll(rewritePath) - }() - var ferr error // stores the final error c.shrinking = true - c.currentShrinkStart = start - endpos := int64(c.aofsz) // 1) Log the aofsize at start. 
Locked + c.shrinklog = nil c.mu.Unlock() + defer func() { c.mu.Lock() - defer c.mu.Unlock() c.shrinking = false - c.lastShrinkDuration = time.Now().Sub(start) - c.currentShrinkStart = time.Time{} - defer func() { - if ferr != nil { - log.Errorf("aof shrink failed: %s\n", ferr.Error()) - } else { - log.Printf("aof shrink completed in %s", c.lastShrinkDuration) - } - }() - if ferr != nil { - return - } + c.shrinklog = nil + c.mu.Unlock() + log.Infof("aof shrink ended %v", time.Now().Sub(start)) + return + }() - of, err := os.Open(c.f.Name()) + err := func() error { + f, err := os.Create(path.Join(c.dir, "shrink")) if err != nil { - ferr = err - return + return err } - defer of.Close() - if _, err := of.Seek(endpos, 0); err != nil { - ferr = err - return - } - rd := resp.NewReader(of) + defer f.Close() + var aofbuf []byte + var values []string + var keys []string + var nextkey string + var keysdone bool for { - v, telnet, _, err := rd.ReadMultiBulk() - if err != nil { - if err == io.EOF { + if len(keys) == 0 { + // load more keys + if keysdone { break } - ferr = err - return + keysdone = true + func() { + c.mu.Lock() + defer c.mu.Unlock() + c.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool { + if len(keys) == maxkeys { + keysdone = false + nextkey = key + return false + } + keys = append(keys, key) + return true + }) + }() + continue } - if telnet { - ferr = errors.New("invalid RESP message") - return - } - data, err := v.MarshalRESP() - if err != nil { - ferr = err - return - } - if _, err := f.Write(data); err != nil { - ferr = err - return - } - break - } - of.Close() - // swap files - f.Close() - c.f.Close() - err = os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "appendonly.aof")) - if err != nil { - log.Fatal("shink rename fatal operation") - } - c.f, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600) - if err != nil { - log.Fatal("shink openfile fatal operation") - } - var n int64 - n, err = 
c.f.Seek(0, 2) - if err != nil { - log.Fatal("shink seek end fatal operation") - } - c.aofsz = int(n) - // kill all followers connections - for conn := range c.aofconnM { - conn.Close() - } - }() - log.Infof("aof shrink started at pos %d", endpos) - // Ascend collections. Load maxKeyGroup at a time. - nextKey := "" - for { - cols := make(map[string]*collection.Collection) - c.mu.Lock() - c.scanGreaterOrEqual(nextKey, func(key string, col *collection.Collection) bool { - if key != nextKey { - cols[key] = col - nextKey = key - } - return len(cols) < maxKeyGroup - }) - c.mu.Unlock() - - keys := make([]string, 0, maxKeyGroup) - for key := range cols { - keys = append(keys, key) - } - sort.Strings(keys) - - for _, key := range keys { - col := cols[key] - // Ascend objects. Load maxIDGroup at a time. - nextID := "" + var idsdone bool + var nextid string for { - objs := make(map[string]objFields) - c.mu.Lock() - now := time.Now() - exm := c.expires[key] - fnames := col.FieldArr() // reload an array of field names to match each object - col.ScanGreaterOrEqual(nextID, 0, false, - func(id string, obj geojson.Object, fields []float64) bool { - if id != nextID { - o := objFields{obj: obj, fields: fields} + if idsdone { + keys = keys[1:] + break + } + + // load more objects + func() { + idsdone = true + c.mu.Lock() + defer c.mu.Unlock() + col := c.getCol(keys[0]) + if col == nil { + return + } + var fnames = col.FieldArr() // reload an array of field names to match each object + var exm = c.expires[keys[0]] // the expiration map + var now = time.Now() // used for expiration + var count = 0 // the object count + col.ScanGreaterOrEqual(nextid, 0, false, + func(id string, obj geojson.Object, fields []float64) bool { + if count == maxids { + // we reached the max number of ids for one batch + nextid = id + idsdone = false + return false + } + + // here we fill the values array with a new command + values = values[:0] + values = append(values, "set") + values = append(values, 
keys[0]) + values = append(values, id) + for i, fvalue := range fields { + if fvalue != 0 { + values = append(values, "field") + values = append(values, fnames[i]) + values = append(values, strconv.FormatFloat(fvalue, 'f', -1, 64)) + } + } if exm != nil { at, ok := exm[id] if ok { - o.expires = at.Sub(now) + expires := at.Sub(now) + if expires > 0 { + values = append(values, "ex") + values = append(values, strconv.FormatFloat(math.Floor(float64(expires)/float64(time.Second)*10)/10, 'f', -1, 64)) + } + } + } + switch obj := obj.(type) { + default: + if obj.IsGeometry() { + values = append(values, "object") + values = append(values, obj.JSON()) + } else { + values = append(values, "string") + values = append(values, obj.String()) + } + case geojson.SimplePoint: + values = append(values, "point") + values = append(values, strconv.FormatFloat(obj.Y, 'f', -1, 64)) + values = append(values, strconv.FormatFloat(obj.X, 'f', -1, 64)) + case geojson.Point: + if obj.Coordinates.Z != 0 { // write the z coordinate only when it is non-zero, otherwise it would be dropped from the rewritten aof + values = append(values, "point") + values = append(values, strconv.FormatFloat(obj.Coordinates.Y, 'f', -1, 64)) + values = append(values, strconv.FormatFloat(obj.Coordinates.X, 'f', -1, 64)) + values = append(values, strconv.FormatFloat(obj.Coordinates.Z, 'f', -1, 64)) + } else { + values = append(values, "point") + values = append(values, strconv.FormatFloat(obj.Coordinates.Y, 'f', -1, 64)) + values = append(values, strconv.FormatFloat(obj.Coordinates.X, 'f', -1, 64)) } } - objs[id] = o - nextID = id - } - return len(objs) < maxIDGroup - }, - ) - c.mu.Unlock() - ids := make([]string, 0, maxIDGroup) - for id := range objs { - ids = append(ids, id) - } - sort.Strings(ids) + // append the values to the aof buffer + aofbuf = append(aofbuf, '*') + aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...) + aofbuf = append(aofbuf, '\r', '\n') + for _, value := range values { + aofbuf = append(aofbuf, '$') + aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...) 
+ aofbuf = append(aofbuf, '\r', '\n') + aofbuf = append(aofbuf, value...) + aofbuf = append(aofbuf, '\r', '\n') + } - linebuf := &bytes.Buffer{} - for _, id := range ids { - obj := objs[id] - values := make([]resp.Value, 0, len(obj.fields)*3+16) - values = append(values, resp.StringValue("set"), resp.StringValue(key), resp.StringValue(id)) - for i, fvalue := range obj.fields { - if fvalue != 0 { - values = append(values, resp.StringValue("field"), resp.StringValue(fnames[i]), resp.FloatValue(fvalue)) - } - } - if obj.expires > 0 { - values = append(values, resp.StringValue("ex"), resp.FloatValue(math.Floor(float64(obj.expires)/float64(time.Second)*10)/10)) - } - switch obj := obj.obj.(type) { - default: - if obj.IsGeometry() { - values = append(values, resp.StringValue("object"), resp.StringValue(obj.JSON())) - } else { - values = append(values, resp.StringValue("string"), resp.StringValue(obj.String())) - } - case geojson.SimplePoint: - values = append(values, resp.StringValue("point"), resp.FloatValue(obj.Y), resp.FloatValue(obj.X)) - case geojson.Point: - if obj.Coordinates.Z == 0 { - values = append(values, resp.StringValue("point"), resp.FloatValue(obj.Coordinates.Y), resp.FloatValue(obj.Coordinates.X)) - } else { - values = append(values, resp.StringValue("point"), resp.FloatValue(obj.Coordinates.Y), resp.FloatValue(obj.Coordinates.X), resp.FloatValue(obj.Coordinates.Z)) - } - } - data, err := resp.ArrayValue(values).MarshalRESP() - if err != nil { - ferr = err - return - } - linebuf.Write(data) - } - if _, err := f.Write(linebuf.Bytes()); err != nil { - ferr = err - return - } - if len(objs) < maxIDGroup { - break + // increment the object count + count++ + return true + }, + ) + + }() + } + if len(aofbuf) > maxchunk { + if _, err := f.Write(aofbuf); err != nil { + return err } + aofbuf = aofbuf[:0] } } - if len(cols) < maxKeyGroup { - break - } - } - // load hooks - c.mu.Lock() - for name, hook := range c.hooks { - values := make([]resp.Value, 0, 
3+len(hook.Message.Values)) - endpoints := make([]string, len(hook.Endpoints)) - for i, endpoint := range hook.Endpoints { - endpoints[i] = endpoint - } - values = append(values, resp.StringValue("sethook"), resp.StringValue(name), resp.StringValue(strings.Join(endpoints, ","))) - values = append(values, hook.Message.Values...) - data, err := resp.ArrayValue(values).MarshalRESP() - if err != nil { - c.mu.Unlock() - ferr = err - return - } - if _, err := f.Write(data); err != nil { - c.mu.Unlock() - ferr = err - return - } - } - c.mu.Unlock() + // load hooks + // first load the names of the hooks + var hnames []string + func() { + c.mu.Lock() + defer c.mu.Unlock() + for name := range c.hooks { + hnames = append(hnames, name) + } + }() + // sort the names for consistency + sort.Strings(hnames) + for _, name := range hnames { + func() { + c.mu.Lock() + defer c.mu.Unlock() + hook := c.hooks[name] + if hook == nil { + return + } + hook.mu.Lock() + defer hook.mu.Unlock() + var values []string + values = append(values, "sethook") + values = append(values, name) + values = append(values, strings.Join(hook.Endpoints, ",")) + for _, value := range hook.Message.Values { + values = append(values, value.String()) + } + + // append the values to the aof buffer + aofbuf = append(aofbuf, '*') + aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...) + aofbuf = append(aofbuf, '\r', '\n') + for _, value := range values { + aofbuf = append(aofbuf, '$') + aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...) + aofbuf = append(aofbuf, '\r', '\n') + aofbuf = append(aofbuf, value...) + aofbuf = append(aofbuf, '\r', '\n') + } + }() + } + if len(aofbuf) > 0 { + if _, err := f.Write(aofbuf); err != nil { + return err + } + aofbuf = aofbuf[:0] + } + if err := f.Sync(); err != nil { + return err + } + + // finally grab any new data that may have been written since + // the aofshrink has started and swap out the files. 
+ return func() error { + c.mu.Lock() + defer c.mu.Unlock() + aofbuf = aofbuf[:0] + for _, values := range c.shrinklog { + // append the values to the aof buffer + aofbuf = append(aofbuf, '*') + aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...) + aofbuf = append(aofbuf, '\r', '\n') + for _, value := range values { + aofbuf = append(aofbuf, '$') + aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...) + aofbuf = append(aofbuf, '\r', '\n') + aofbuf = append(aofbuf, value...) + aofbuf = append(aofbuf, '\r', '\n') + } + } + if _, err := f.Write(aofbuf); err != nil { + return err + } + if err := f.Sync(); err != nil { + return err + } + // we now have a shrunken aof file that is fully in-sync with + // the current dataset. let's swap out the on disk files and + // point to the new file. + + // anything below this point is unrecoverable. just log and exit process + // back up the live aof, just in case of fatal error + if err := os.Rename(path.Join(c.dir, "appendonly.aof"), path.Join(c.dir, "appendonly.bak")); err != nil { + log.Fatalf("shink backup fatal operation: %v", err) + } + if err := os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "appendonly.aof")); err != nil { + log.Fatalf("shink rename fatal operation: %v", err) + } + if err := c.f.Close(); err != nil { + log.Fatalf("shink live aof close fatal operation: %v", err) + } + c.f, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600) + if err != nil { + log.Fatalf("shink openfile fatal operation: %v", err) + } + var n int64 + n, err = c.f.Seek(0, 2) + if err != nil { + log.Fatalf("shink seek end fatal operation: %v", err) + } + c.aofsz = int(n) + + os.Remove(path.Join(c.dir, "appendonly.bak")) // ignore error + + // kill all followers connections + for conn := range c.aofconnM { + conn.Close() + } + return nil + }() + }() + if err != nil { + log.Errorf("aof shrink failed: %v", err) + return + } } diff --git a/controller/controller.go 
b/controller/controller.go index 5ff3f087..b700f974 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -72,6 +72,7 @@ type Controller struct { lcond *sync.Cond fcup bool // follow caught up shrinking bool // aof shrinking flag + shrinklog [][]string // aof shrinking log hooks map[string]*Hook // hook name hookcols map[string]map[string]*Hook // col key aofconnM map[net.Conn]bool @@ -416,8 +417,15 @@ func (c *Controller) handleInputCommand(conn *server.Conn, msg *server.Message, // this is local connection operation. Locks not needed. case "massinsert": // dev operation - // ** danger zone ** - // no locks! DEV MODE ONLY + c.mu.Lock() + defer c.mu.Unlock() + case "shutdown": + // dev operation + c.mu.Lock() + defer c.mu.Unlock() + case "aofshrink": + c.mu.RLock() + defer c.mu.RUnlock() } res, d, err := c.command(msg, w) @@ -489,6 +497,12 @@ func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d co res, d, err = c.cmdTTL(msg) case "hooks": res, err = c.cmdHooks(msg) + case "shutdown": + if !core.DevMode { + err = fmt.Errorf("unknown command '%s'", msg.Values[0]) + return + } + log.Fatal("shutdown requested by developer") case "massinsert": if !core.DevMode { err = fmt.Errorf("unknown command '%s'", msg.Values[0]) diff --git a/controller/crud.go b/controller/crud.go index fb696365..5242cfcc 100644 --- a/controller/crud.go +++ b/controller/crud.go @@ -670,12 +670,15 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT, d.oldObj, d.oldFields, d.fields = col.ReplaceOrInsert(d.id, d.obj, fields, values) d.command = "set" d.updated = true // perhaps we should do a diff on the previous object? - fmap = col.FieldMap() - d.fmap = make(map[string]int) - for key, idx := range fmap { - d.fmap[key] = idx - } d.timestamp = time.Now() + if msg.ConnType != server.Null || msg.OutputType != server.Null { + // likely loaded from aof at server startup, ignore field remapping. 
+ fmap = col.FieldMap() + d.fmap = make(map[string]int) + for key, idx := range fmap { + d.fmap[key] = idx + } + } if ex != nil { c.expireAt(d.key, d.id, d.timestamp.Add(time.Duration(float64(time.Second)*(*ex)))) } diff --git a/controller/dev.go b/controller/dev.go index c9f53db8..6743eac2 100644 --- a/controller/dev.go +++ b/controller/dev.go @@ -2,10 +2,10 @@ package controller import ( "errors" + "fmt" "math/rand" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -79,14 +79,12 @@ func (c *Controller) cmdMassInsert(msg *server.Message) (res string, err error) return "", errInvalidArgument(snumPoints) } docmd := func(values []resp.Value) error { - c.mu.Lock() - defer c.mu.Unlock() nmsg := &server.Message{} *nmsg = *msg nmsg.Values = values nmsg.Command = strings.ToLower(values[0].String()) - - _, d, err := c.command(nmsg, nil) + var d commandDetailsT + _, d, err = c.command(nmsg, nil) if err != nil { return err } @@ -97,37 +95,38 @@ func (c *Controller) cmdMassInsert(msg *server.Message) (res string, err error) } rand.Seed(time.Now().UnixNano()) objs = int(n) - var wg sync.WaitGroup var k uint64 - wg.Add(cols) for i := 0; i < cols; i++ { key := "mi:" + strconv.FormatInt(int64(i), 10) - go func(key string) { - defer func() { - wg.Done() - }() - + func(key string) { + // lock cycle for j := 0; j < objs; j++ { id := strconv.FormatInt(int64(j), 10) - lat, lon := randMassInsertPosition(minLat, minLon, maxLat, maxLon) - values := make([]resp.Value, 0, 16) - values = append(values, resp.StringValue("set"), resp.StringValue(key), resp.StringValue(id)) - if useRandField { - values = append(values, resp.StringValue("FIELD"), resp.StringValue("field"), resp.FloatValue(rand.Float64()*10)) + var values []resp.Value + if j%8 == 0 { + lat, lon := randMassInsertPosition(minLat, minLon, maxLat, maxLon) + values = make([]resp.Value, 0, 16) + values = append(values, resp.StringValue("set"), resp.StringValue(key), resp.StringValue(id)) + if useRandField { + values = 
append(values, resp.StringValue("FIELD"), resp.StringValue("fname"), resp.FloatValue(rand.Float64()*10)) + } + values = append(values, resp.StringValue("POINT"), resp.FloatValue(lat), resp.FloatValue(lon)) + } else { + values = append(values, resp.StringValue("set"), + resp.StringValue(key), resp.StringValue(id), + resp.StringValue("STRING"), resp.StringValue(fmt.Sprintf("str%v", j))) } - values = append(values, resp.StringValue("POINT"), resp.FloatValue(lat), resp.FloatValue(lon)) if err := docmd(values); err != nil { log.Fatal(err) return } atomic.AddUint64(&k, 1) - if j%10000 == 10000-1 { + if j%1000 == 1000-1 { log.Infof("massinsert: %s %d/%d", key, atomic.LoadUint64(&k), cols*objs) } } }(key) } - wg.Wait() log.Infof("massinsert: done %d objects", atomic.LoadUint64(&k)) return server.OKMessage(msg, start), nil } diff --git a/controller/hooks.go b/controller/hooks.go index 115960a7..403b600e 100644 --- a/controller/hooks.go +++ b/controller/hooks.go @@ -93,7 +93,10 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai cmsg := &server.Message{} *cmsg = *msg - cmsg.Values = commandvs + cmsg.Values = make([]resp.Value, len(commandvs)) + for i := 0; i < len(commandvs); i++ { + cmsg.Values[i] = commandvs[i] + } cmsg.Command = strings.ToLower(cmsg.Values[0].String()) hook := &Hook{