This commit is contained in:
Josh Baker 2016-07-15 12:22:48 -07:00
parent a5069d5fe8
commit 7455c52cb5
4 changed files with 277 additions and 44 deletions

View File

@ -17,7 +17,6 @@ import (
) )
// AsyncHooks indicates that the hooks should happen in the background. // AsyncHooks indicates that the hooks should happen in the background.
const AsyncHooks = true
type errAOFHook struct { type errAOFHook struct {
err error err error
@ -128,16 +127,7 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error {
func (c *Controller) processHooks(d *commandDetailsT) error { func (c *Controller) processHooks(d *commandDetailsT) error {
if hm, ok := c.hookcols[d.key]; ok { if hm, ok := c.hookcols[d.key]; ok {
for _, hook := range hm { for _, hook := range hm {
if AsyncHooks {
go hook.Do(d) go hook.Do(d)
} else {
if err := hook.Do(d); err != nil {
if d.revert != nil {
d.revert()
}
return errAOFHook{err}
}
}
} }
} }
return nil return nil

View File

@ -41,7 +41,6 @@ type commandDetailsT struct {
oldObj geojson.Object oldObj geojson.Object
oldFields []float64 oldFields []float64
updated bool updated bool
revert func()
timestamp time.Time timestamp time.Time
} }
@ -70,7 +69,9 @@ type Controller struct {
hooks map[string]*Hook // hook name hooks map[string]*Hook // hook name
hookcols map[string]map[string]*Hook // col key hookcols map[string]map[string]*Hook // col key
aofconnM map[net.Conn]bool aofconnM map[net.Conn]bool
expires map[string]map[string]time.Time
stopBackgroundExpiring bool
stopWatchingMemory bool stopWatchingMemory bool
outOfMemory bool outOfMemory bool
} }
@ -90,6 +91,7 @@ func ListenAndServe(host string, port int, dir string) error {
hooks: make(map[string]*Hook), hooks: make(map[string]*Hook),
hookcols: make(map[string]map[string]*Hook), hookcols: make(map[string]map[string]*Hook),
aofconnM: make(map[net.Conn]bool), aofconnM: make(map[net.Conn]bool),
expires: make(map[string]map[string]time.Time),
} }
if err := os.MkdirAll(dir, 0700); err != nil { if err := os.MkdirAll(dir, 0700); err != nil {
return err return err
@ -120,8 +122,10 @@ func ListenAndServe(host string, port int, dir string) error {
}() }()
go c.processLives() go c.processLives()
go c.watchMemory() go c.watchMemory()
go c.backgroundExpiring()
defer func() { defer func() {
c.mu.Lock() c.mu.Lock()
c.stopBackgroundExpiring = true
c.stopWatchingMemory = true c.stopWatchingMemory = true
c.mu.Unlock() c.mu.Unlock()
}() }()
@ -318,7 +322,7 @@ func (c *Controller) handleInputCommand(conn *server.Conn, msg *server.Message,
default: default:
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
case "set", "del", "drop", "fset", "flushdb", "sethook", "delhook": case "set", "del", "drop", "fset", "flushdb", "sethook", "delhook", "expire", "persist":
// write operations // write operations
write = true write = true
c.mu.Lock() c.mu.Lock()
@ -329,7 +333,7 @@ func (c *Controller) handleInputCommand(conn *server.Conn, msg *server.Message,
if c.config.ReadOnly { if c.config.ReadOnly {
return writeErr(errors.New("read only")) return writeErr(errors.New("read only"))
} }
case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search": case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search", "ttl":
// read operations // read operations
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -391,11 +395,9 @@ func (c *Controller) reset() {
} }
func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d commandDetailsT, err error) { func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d commandDetailsT, err error) {
switch msg.Command { switch msg.Command {
default: default:
err = fmt.Errorf("unknown command '%s'", msg.Values[0]) err = fmt.Errorf("unknown command '%s'", msg.Values[0])
// lock
case "set": case "set":
res, d, err = c.cmdSet(msg) res, d, err = c.cmdSet(msg)
case "fset": case "fset":
@ -410,6 +412,12 @@ func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d co
res, d, err = c.cmdSetHook(msg) res, d, err = c.cmdSetHook(msg)
case "delhook": case "delhook":
res, d, err = c.cmdDelHook(msg) res, d, err = c.cmdDelHook(msg)
case "expire":
res, d, err = c.cmdExpire(msg)
case "persist":
res, d, err = c.cmdPersist(msg)
case "ttl":
res, d, err = c.cmdTTL(msg)
case "hooks": case "hooks":
res, err = c.cmdHooks(msg) res, err = c.cmdHooks(msg)
case "massinsert": case "massinsert":

View File

@ -231,18 +231,11 @@ func (c *Controller) cmdDel(msg *server.Message) (res string, d commandDetailsT,
if ok { if ok {
if col.Count() == 0 { if col.Count() == 0 {
c.deleteCol(d.key) c.deleteCol(d.key)
d.revert = func() {
c.setCol(d.key, col)
col.ReplaceOrInsert(d.id, d.obj, nil, d.fields)
}
} else {
d.revert = func() {
col.ReplaceOrInsert(d.id, d.obj, nil, d.fields)
}
} }
found = true found = true
} }
} }
c.clearIDExpires(d.key, d.id)
d.command = "del" d.command = "del"
d.updated = found d.updated = found
d.timestamp = time.Now() d.timestamp = time.Now()
@ -274,9 +267,6 @@ func (c *Controller) cmdDrop(msg *server.Message) (res string, d commandDetailsT
col := c.getCol(d.key) col := c.getCol(d.key)
if col != nil { if col != nil {
c.deleteCol(d.key) c.deleteCol(d.key)
d.revert = func() {
c.setCol(d.key, col)
}
d.updated = true d.updated = true
} else { } else {
d.key = "" // ignore the details d.key = "" // ignore the details
@ -284,6 +274,7 @@ func (c *Controller) cmdDrop(msg *server.Message) (res string, d commandDetailsT
} }
d.command = "drop" d.command = "drop"
d.timestamp = time.Now() d.timestamp = time.Now()
c.clearKeyExpires(d.key)
switch msg.OutputType { switch msg.OutputType {
case server.JSON: case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
@ -305,6 +296,7 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai
return return
} }
c.cols = btree.New(16, 0) c.cols = btree.New(16, 0)
c.clearAllExpires()
c.hooks = make(map[string]*Hook) c.hooks = make(map[string]*Hook)
c.hookcols = make(map[string]map[string]*Hook) c.hookcols = make(map[string]map[string]*Hook)
d.command = "flushdb" d.command = "flushdb"
@ -319,7 +311,10 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai
return return
} }
func (c *Controller) parseSetArgs(vs []resp.Value) (d commandDetailsT, fields []string, values []float64, etype string, evs []resp.Value, err error) { func (c *Controller) parseSetArgs(vs []resp.Value) (
d commandDetailsT, fields []string, values []float64,
expires *float64, etype string, evs []resp.Value, err error,
) {
var ok bool var ok bool
var typ string var typ string
if vs, d.key, ok = tokenval(vs); !ok || d.key == "" { if vs, d.key, ok = tokenval(vs); !ok || d.key == "" {
@ -330,7 +325,6 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (d commandDetailsT, fields []
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
var arg string var arg string
var nvs []resp.Value var nvs []resp.Value
fields = make([]string, 0, 8) fields = make([]string, 0, 8)
@ -366,6 +360,26 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (d commandDetailsT, fields []
values = append(values, value) values = append(values, value)
continue continue
} }
if lc(arg, "ex") {
vs = nvs
if expires != nil {
err = errInvalidArgument(arg)
return
}
var s string
var v float64
if vs, s, ok = tokenval(vs); !ok || s == "" {
err = errInvalidNumberOfArguments
return
}
v, err = strconv.ParseFloat(s, 64)
if err != nil {
err = errInvalidArgument(s)
return
}
expires = &v
continue
}
break break
} }
if vs, typ, ok = tokenval(vs); !ok || typ == "" { if vs, typ, ok = tokenval(vs); !ok || typ == "" {
@ -378,7 +392,6 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (d commandDetailsT, fields []
} }
etype = typ etype = typ
evs = vs evs = vs
switch { switch {
default: default:
err = errInvalidArgument(typ) err = errInvalidArgument(typ)
@ -525,27 +538,19 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT,
vs := msg.Values[1:] vs := msg.Values[1:]
var fields []string var fields []string
var values []float64 var values []float64
d, fields, values, _, _, err = c.parseSetArgs(vs) var ex *float64
d, fields, values, ex, _, _, err = c.parseSetArgs(vs)
if err != nil { if err != nil {
return return
} }
addedcol := false ex = ex
col := c.getCol(d.key) col := c.getCol(d.key)
if col == nil { if col == nil {
col = collection.New() col = collection.New()
c.setCol(d.key, col) c.setCol(d.key, col)
addedcol = true
} }
c.clearIDExpires(d.key, d.id)
d.oldObj, d.oldFields, d.fields = col.ReplaceOrInsert(d.id, d.obj, fields, values) d.oldObj, d.oldFields, d.fields = col.ReplaceOrInsert(d.id, d.obj, fields, values)
d.revert = func() {
if addedcol {
c.deleteCol(d.key)
} else if d.oldObj != nil {
col.ReplaceOrInsert(d.id, d.oldObj, nil, d.oldFields)
} else {
col.Remove(d.id)
}
}
d.command = "set" d.command = "set"
d.updated = true // perhaps we should do a diff on the previous object? d.updated = true // perhaps we should do a diff on the previous object?
fmap := col.FieldMap() fmap := col.FieldMap()
@ -554,6 +559,9 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT,
d.fmap[key] = idx d.fmap[key] = idx
} }
d.timestamp = time.Now() d.timestamp = time.Now()
if ex != nil {
c.expireAt(d.key, d.id, d.timestamp.Add(time.Duration(float64(time.Second)*(*ex))))
}
switch msg.OutputType { switch msg.OutputType {
case server.JSON: case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
@ -633,3 +641,140 @@ func (c *Controller) cmdFset(msg *server.Message) (res string, d commandDetailsT
} }
return return
} }
func (c *Controller) cmdExpire(msg *server.Message) (res string, d commandDetailsT, err error) {
start := time.Now()
vs := msg.Values[1:]
var key, id, svalue string
var ok bool
if vs, key, ok = tokenval(vs); !ok || key == "" {
err = errInvalidNumberOfArguments
return
}
if vs, id, ok = tokenval(vs); !ok || id == "" {
err = errInvalidNumberOfArguments
return
}
if vs, svalue, ok = tokenval(vs); !ok || svalue == "" {
err = errInvalidNumberOfArguments
return
}
if len(vs) != 0 {
err = errInvalidNumberOfArguments
return
}
var value float64
value, err = strconv.ParseFloat(svalue, 64)
if err != nil {
err = errInvalidArgument(svalue)
return
}
ok = false
col := c.getCol(key)
if col != nil {
_, _, ok = col.Get(id)
if ok {
c.expireAt(key, id, time.Now().Add(time.Duration(float64(time.Second)*value)))
}
}
switch msg.OutputType {
case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
case server.RESP:
if ok {
res = ":1\r\n"
} else {
res = ":0\r\n"
}
}
return
}
func (c *Controller) cmdPersist(msg *server.Message) (res string, d commandDetailsT, err error) {
start := time.Now()
vs := msg.Values[1:]
var key, id string
var ok bool
if vs, key, ok = tokenval(vs); !ok || key == "" {
err = errInvalidNumberOfArguments
return
}
if vs, id, ok = tokenval(vs); !ok || id == "" {
err = errInvalidNumberOfArguments
return
}
if len(vs) != 0 {
err = errInvalidNumberOfArguments
return
}
ok = false
col := c.getCol(key)
if col != nil {
_, _, ok = col.Get(id)
if ok {
c.clearIDExpires(key, id)
}
}
switch msg.OutputType {
case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
case server.RESP:
if ok {
res = ":1\r\n"
} else {
res = ":0\r\n"
}
}
return
}
func (c *Controller) cmdTTL(msg *server.Message) (res string, d commandDetailsT, err error) {
start := time.Now()
vs := msg.Values[1:]
var key, id string
var ok bool
if vs, key, ok = tokenval(vs); !ok || key == "" {
err = errInvalidNumberOfArguments
return
}
if vs, id, ok = tokenval(vs); !ok || id == "" {
err = errInvalidNumberOfArguments
return
}
if len(vs) != 0 {
err = errInvalidNumberOfArguments
return
}
var v float64
ok = false
var ok2 bool
col := c.getCol(key)
if col != nil {
_, _, ok = col.Get(id)
if ok {
var at time.Time
at, ok2 = c.getExpires(key, id)
if ok2 {
v = float64(at.Sub(time.Now())) / float64(time.Second)
if v < 0 {
v = 0
}
}
}
}
switch msg.OutputType {
case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
case server.RESP:
if ok {
if ok2 {
res = ":" + strconv.FormatFloat(v, 'f', 0, 64) + "\r\n"
} else {
res = ":-1\r\n"
}
} else {
res = ":-2\r\n"
}
}
return
}

90
controller/expire.go Normal file
View File

@ -0,0 +1,90 @@
package controller
import (
"time"
"github.com/tidwall/resp"
"github.com/tidwall/tile38/controller/log"
"github.com/tidwall/tile38/controller/server"
)
// clearAllExpires removes all items that are marked at expires.
func (c *Controller) clearAllExpires() {
c.expires = make(map[string]map[string]time.Time)
}
// clearIDExpires will clear a single item from the expires list.
func (c *Controller) clearIDExpires(key, id string) {
m := c.expires[key]
if m == nil {
return
}
delete(m, id)
if len(m) == 0 {
delete(c.expires, key)
}
}
// clearKeyExpires will clear all items that are marked as expires from a single key.
func (c *Controller) clearKeyExpires(key string) {
delete(c.expires, key)
}
// expireAt will mark an item as expires at a specific time.
func (c *Controller) expireAt(key, id string, at time.Time) {
m := c.expires[key]
if m == nil {
m = make(map[string]time.Time)
c.expires[key] = m
}
m[id] = at
}
// getExpires will return the when the item expires.
func (c *Controller) getExpires(key, id string) (at time.Time, ok bool) {
m := c.expires[key]
if m == nil {
ok = false
return
}
at, ok = m[id]
return
}
// backgroundExpiring watches for when items must expire from the database.
// It's runs through every item that has been marked as expires five times
// per second.
func (c *Controller) backgroundExpiring() {
for {
c.mu.Lock()
if c.stopBackgroundExpiring {
c.mu.Unlock()
return
}
// Only excute for leaders. Followers should ignore.
if c.config.FollowHost == "" {
now := time.Now()
for key, m := range c.expires {
for id, at := range m {
if now.After(at) {
// issue a DEL command
msg := &server.Message{}
msg.Values = resp.MultiBulkValue("del", key, id).Array()
msg.Command = "del"
_, d, err := c.cmdDel(msg)
if err != nil {
log.Fatal(err)
continue
}
if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil {
log.Fatal(err)
continue
}
}
}
}
}
c.mu.Unlock()
time.Sleep(time.Second / 5)
}
}