From 7455c52cb512ed166f28ed796b8685d1c5ffa5b9 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Fri, 15 Jul 2016 12:22:48 -0700 Subject: [PATCH] expires --- controller/aof.go | 12 +-- controller/controller.go | 22 +++-- controller/crud.go | 197 +++++++++++++++++++++++++++++++++------ controller/expire.go | 90 ++++++++++++++++++ 4 files changed, 277 insertions(+), 44 deletions(-) create mode 100644 controller/expire.go diff --git a/controller/aof.go b/controller/aof.go index e187562b..5f7361ba 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -17,7 +17,6 @@ import ( ) // AsyncHooks indicates that the hooks should happen in the background. -const AsyncHooks = true type errAOFHook struct { err error @@ -128,16 +127,7 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { func (c *Controller) processHooks(d *commandDetailsT) error { if hm, ok := c.hookcols[d.key]; ok { for _, hook := range hm { - if AsyncHooks { - go hook.Do(d) - } else { - if err := hook.Do(d); err != nil { - if d.revert != nil { - d.revert() - } - return errAOFHook{err} - } - } + go hook.Do(d) } } return nil diff --git a/controller/controller.go b/controller/controller.go index fe26142c..3d5a789f 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -41,7 +41,6 @@ type commandDetailsT struct { oldObj geojson.Object oldFields []float64 updated bool - revert func() timestamp time.Time } @@ -70,9 +69,11 @@ type Controller struct { hooks map[string]*Hook // hook name hookcols map[string]map[string]*Hook // col key aofconnM map[net.Conn]bool + expires map[string]map[string]time.Time - stopWatchingMemory bool - outOfMemory bool + stopBackgroundExpiring bool + stopWatchingMemory bool + outOfMemory bool } // ListenAndServe starts a new tile38 server @@ -90,6 +91,7 @@ func ListenAndServe(host string, port int, dir string) error { hooks: make(map[string]*Hook), hookcols: make(map[string]map[string]*Hook), aofconnM: make(map[net.Conn]bool), + expires: make(map[string]map[string]time.Time), } if err := os.MkdirAll(dir, 0700); err != nil { return err @@ -120,8 +122,10 @@ func ListenAndServe(host string, port int, dir string) error { }() go c.processLives() go c.watchMemory() + go c.backgroundExpiring() defer func() { c.mu.Lock() + c.stopBackgroundExpiring = true c.stopWatchingMemory = true c.mu.Unlock() }() @@ -318,7 +322,7 @@ func (c *Controller) handleInputCommand(conn *server.Conn, msg *server.Message, default: c.mu.RLock() defer c.mu.RUnlock() - case "set", "del", "drop", "fset", "flushdb", "sethook", "delhook": + case "set", "del", "drop", "fset", "flushdb", "sethook", "delhook", "expire", "persist": // write operations write = true c.mu.Lock() @@ -329,7 +333,7 @@ func (c *Controller) handleInputCommand(conn *server.Conn, msg *server.Message, if c.config.ReadOnly { return writeErr(errors.New("read only")) } - case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search": + case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search", "ttl": // read operations c.mu.RLock() defer c.mu.RUnlock() @@ -391,11 +395,9 @@ func (c *Controller) reset() { } func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d commandDetailsT, err error) { - switch msg.Command { default: err = fmt.Errorf("unknown command '%s'", msg.Values[0]) - // lock case "set": res, d, err = c.cmdSet(msg) case "fset": @@ -410,6 +412,12 @@ func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d co res, d, err = c.cmdSetHook(msg) case "delhook": res, d, err = c.cmdDelHook(msg) + case "expire": + res, d, err = c.cmdExpire(msg) + case "persist": + res, d, err = c.cmdPersist(msg) + case "ttl": + res, d, err = c.cmdTTL(msg) case "hooks": res, err = c.cmdHooks(msg) case "massinsert": diff --git a/controller/crud.go b/controller/crud.go index 28349593..e3ba9fa1 100644 --- a/controller/crud.go +++ b/controller/crud.go @@ -231,18 +231,11 @@ func (c *Controller) cmdDel(msg *server.Message) (res string, d commandDetailsT, if ok { if col.Count() == 0 { c.deleteCol(d.key) - d.revert = func() { - c.setCol(d.key, col) - col.ReplaceOrInsert(d.id, d.obj, nil, d.fields) - } - } else { - d.revert = func() { - col.ReplaceOrInsert(d.id, d.obj, nil, d.fields) - } } found = true } } + c.clearIDExpires(d.key, d.id) d.command = "del" d.updated = found d.timestamp = time.Now() @@ -274,9 +267,6 @@ func (c *Controller) cmdDrop(msg *server.Message) (res string, d commandDetailsT col := c.getCol(d.key) if col != nil { c.deleteCol(d.key) - d.revert = func() { - c.setCol(d.key, col) - } d.updated = true } else { d.key = "" // ignore the details @@ -284,6 +274,7 @@ func (c *Controller) cmdDrop(msg *server.Message) (res string, d commandDetailsT } d.command = "drop" d.timestamp = time.Now() + c.clearKeyExpires(d.key) switch msg.OutputType { case server.JSON: res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" @@ -305,6 +296,7 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai return } c.cols = btree.New(16, 0) + c.clearAllExpires() c.hooks = make(map[string]*Hook) c.hookcols = make(map[string]map[string]*Hook) d.command = "flushdb" @@ -319,7 +311,10 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai return } -func (c *Controller) parseSetArgs(vs []resp.Value) (d commandDetailsT, fields []string, values []float64, etype string, evs []resp.Value, err error) { +func (c *Controller) parseSetArgs(vs []resp.Value) ( + d commandDetailsT, fields []string, values []float64, + expires *float64, etype string, evs []resp.Value, err error, +) { var ok bool var typ string if vs, d.key, ok = tokenval(vs); !ok || d.key == "" { @@ -330,7 +325,6 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (d commandDetailsT, fields [] err = errInvalidNumberOfArguments return } - var arg string var nvs []resp.Value fields = make([]string, 0, 8) @@ -366,6 +360,26 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (d commandDetailsT, fields [] values = append(values, value) continue } + if lc(arg, "ex") { + vs = nvs + if expires != nil { + err = errInvalidArgument(arg) + return + } + var s string + var v float64 + if vs, s, ok = tokenval(vs); !ok || s == "" { + err = errInvalidNumberOfArguments + return + } + v, err = strconv.ParseFloat(s, 64) + if err != nil { + err = errInvalidArgument(s) + return + } + expires = &v + continue + } break } if vs, typ, ok = tokenval(vs); !ok || typ == "" { @@ -378,7 +392,6 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (d commandDetailsT, fields [] } etype = typ evs = vs - switch { default: err = errInvalidArgument(typ) @@ -525,27 +538,19 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT, vs := msg.Values[1:] var fields []string var values []float64 - d, fields, values, _, _, err = c.parseSetArgs(vs) + var ex *float64 + d, fields, values, ex, _, _, err = c.parseSetArgs(vs) if err != nil { return } - addedcol := false + ex = ex col := c.getCol(d.key) if col == nil { col = collection.New() c.setCol(d.key, col) - addedcol = true } + c.clearIDExpires(d.key, d.id) d.oldObj, d.oldFields, d.fields = col.ReplaceOrInsert(d.id, d.obj, fields, values) - d.revert = func() { - if addedcol { - c.deleteCol(d.key) - } else if d.oldObj != nil { - col.ReplaceOrInsert(d.id, d.oldObj, nil, d.oldFields) - } else { - col.Remove(d.id) - } - } d.command = "set" d.updated = true // perhaps we should do a diff on the previous object? fmap := col.FieldMap() @@ -554,6 +559,9 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT, d.fmap[key] = idx } d.timestamp = time.Now() + if ex != nil { + c.expireAt(d.key, d.id, d.timestamp.Add(time.Duration(float64(time.Second)*(*ex)))) + } switch msg.OutputType { case server.JSON: res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" @@ -633,3 +641,140 @@ func (c *Controller) cmdFset(msg *server.Message) (res string, d commandDetailsT } return } + +func (c *Controller) cmdExpire(msg *server.Message) (res string, d commandDetailsT, err error) { + start := time.Now() + vs := msg.Values[1:] + var key, id, svalue string + var ok bool + if vs, key, ok = tokenval(vs); !ok || key == "" { + err = errInvalidNumberOfArguments + return + } + if vs, id, ok = tokenval(vs); !ok || id == "" { + err = errInvalidNumberOfArguments + return + } + if vs, svalue, ok = tokenval(vs); !ok || svalue == "" { + err = errInvalidNumberOfArguments + return + } + if len(vs) != 0 { + err = errInvalidNumberOfArguments + return + } + var value float64 + value, err = strconv.ParseFloat(svalue, 64) + if err != nil { + err = errInvalidArgument(svalue) + return + } + ok = false + col := c.getCol(key) + if col != nil { + _, _, ok = col.Get(id) + if ok { + c.expireAt(key, id, time.Now().Add(time.Duration(float64(time.Second)*value))) + } + } + switch msg.OutputType { + case server.JSON: + res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" + case server.RESP: + if ok { + res = ":1\r\n" + } else { + res = ":0\r\n" + } + } + return +} + +func (c *Controller) cmdPersist(msg *server.Message) (res string, d commandDetailsT, err error) { + start := time.Now() + vs := msg.Values[1:] + var key, id string + var ok bool + if vs, key, ok = tokenval(vs); !ok || key == "" { + err = errInvalidNumberOfArguments + return + } + if vs, id, ok = tokenval(vs); !ok || id == "" { + err = errInvalidNumberOfArguments + return + } + if len(vs) != 0 { + err = errInvalidNumberOfArguments + return + } + ok = false + col := c.getCol(key) + if col != nil { + _, _, ok = col.Get(id) + if ok { + c.clearIDExpires(key, id) + } + } + switch msg.OutputType { + case server.JSON: + res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" + case server.RESP: + if ok { + res = ":1\r\n" + } else { + res = ":0\r\n" + } + } + return +} + +func (c *Controller) cmdTTL(msg *server.Message) (res string, d commandDetailsT, err error) { + start := time.Now() + vs := msg.Values[1:] + var key, id string + var ok bool + if vs, key, ok = tokenval(vs); !ok || key == "" { + err = errInvalidNumberOfArguments + return + } + if vs, id, ok = tokenval(vs); !ok || id == "" { + err = errInvalidNumberOfArguments + return + } + if len(vs) != 0 { + err = errInvalidNumberOfArguments + return + } + var v float64 + ok = false + var ok2 bool + col := c.getCol(key) + if col != nil { + _, _, ok = col.Get(id) + if ok { + var at time.Time + at, ok2 = c.getExpires(key, id) + if ok2 { + v = float64(at.Sub(time.Now())) / float64(time.Second) + if v < 0 { + v = 0 + } + } + } + } + switch msg.OutputType { + case server.JSON: + res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" + case server.RESP: + if ok { + if ok2 { + res = ":" + strconv.FormatFloat(v, 'f', 0, 64) + "\r\n" + } else { + res = ":-1\r\n" + } + } else { + res = ":-2\r\n" + } + } + return +} diff --git a/controller/expire.go b/controller/expire.go new file mode 100644 index 00000000..d1fb3f63 --- /dev/null +++ b/controller/expire.go @@ -0,0 +1,90 @@ +package controller + +import ( + "time" + + "github.com/tidwall/resp" + "github.com/tidwall/tile38/controller/log" + "github.com/tidwall/tile38/controller/server" +) + +// clearAllExpires removes all items that are marked at expires. +func (c *Controller) clearAllExpires() { + c.expires = make(map[string]map[string]time.Time) +} + +// clearIDExpires will clear a single item from the expires list. +func (c *Controller) clearIDExpires(key, id string) { + m := c.expires[key] + if m == nil { + return + } + delete(m, id) + if len(m) == 0 { + delete(c.expires, key) + } +} + +// clearKeyExpires will clear all items that are marked as expires from a single key. +func (c *Controller) clearKeyExpires(key string) { + delete(c.expires, key) +} + +// expireAt will mark an item as expires at a specific time. +func (c *Controller) expireAt(key, id string, at time.Time) { + m := c.expires[key] + if m == nil { + m = make(map[string]time.Time) + c.expires[key] = m + } + m[id] = at +} + +// getExpires will return the when the item expires. +func (c *Controller) getExpires(key, id string) (at time.Time, ok bool) { + m := c.expires[key] + if m == nil { + ok = false + return + } + at, ok = m[id] + return +} + +// backgroundExpiring watches for when items must expire from the database. +// It's runs through every item that has been marked as expires five times +// per second. +func (c *Controller) backgroundExpiring() { + for { + c.mu.Lock() + if c.stopBackgroundExpiring { + c.mu.Unlock() + return + } + // Only excute for leaders. Followers should ignore. + if c.config.FollowHost == "" { + now := time.Now() + for key, m := range c.expires { + for id, at := range m { + if now.After(at) { + // issue a DEL command + msg := &server.Message{} + msg.Values = resp.MultiBulkValue("del", key, id).Array() + msg.Command = "del" + _, d, err := c.cmdDel(msg) + if err != nil { + log.Fatal(err) + continue + } + if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil { + log.Fatal(err) + continue + } + } + } + } + } + c.mu.Unlock() + time.Sleep(time.Second / 5) + } +}