diff --git a/controller/aof.go b/controller/aof.go index 79a220fb..33d21f0e 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -168,8 +168,18 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { } if c.config.FollowHost == "" { // process hooks, for leader only - if err := c.queueHooks(d); err != nil { - return err + if d.parent { + // process children only + for _, d := range d.children { + if err := c.queueHooks(d); err != nil { + return err + } + } + } else { + // process parent + if err := c.queueHooks(d); err != nil { + return err + } } } } @@ -198,7 +208,13 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { if d != nil { // write to live connection streams c.lcond.L.Lock() - c.lstack = append(c.lstack, d) + if d.parent { + for _, d := range d.children { + c.lstack = append(c.lstack, d) + } + } else { + c.lstack = append(c.lstack, d) + } c.lcond.Broadcast() c.lcond.L.Unlock() } diff --git a/controller/controller.go b/controller/controller.go index 9023f9d1..29f62fe2 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -47,6 +47,10 @@ type commandDetailsT struct { oldFields []float64 updated bool timestamp time.Time + + parent bool // when true, only children are forwarded + pattern string // PDEL key pattern + children []*commandDetailsT // for multi actions such as "PDEL" } func (col *collectionT) Less(item btree.Item, ctx interface{}) bool { @@ -392,7 +396,7 @@ func (c *Controller) handleInputCommand(conn *server.Conn, msg *server.Message, c.mu.RLock() defer c.mu.RUnlock() case "set", "del", "drop", "fset", "flushdb", "sethook", "pdelhook", "delhook", - "expire", "persist", "jset": + "expire", "persist", "jset", "pdel": // write operations write = true c.mu.Lock() @@ -482,6 +486,8 @@ func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d co res, d, err = c.cmdFset(msg) case "del": res, d, err = c.cmdDel(msg) + case "pdel": + res, d, err = c.cmdPdel(msg) case "drop": res, d, err = c.cmdDrop(msg) case "flushdb": diff --git a/controller/crud.go b/controller/crud.go index 5242cfcc..eb525e76 100644 --- a/controller/crud.go +++ b/controller/crud.go @@ -10,6 +10,7 @@ import ( "github.com/tidwall/btree" "github.com/tidwall/resp" "github.com/tidwall/tile38/controller/collection" + "github.com/tidwall/tile38/controller/glob" "github.com/tidwall/tile38/controller/server" "github.com/tidwall/tile38/geojson" "github.com/tidwall/tile38/geojson/geohash" @@ -341,6 +342,81 @@ func (c *Controller) cmdDel(msg *server.Message) (res string, d commandDetailsT, return } +func (c *Controller) cmdPdel(msg *server.Message) (res string, d commandDetailsT, err error) { + start := time.Now() + vs := msg.Values[1:] + var ok bool + if vs, d.key, ok = tokenval(vs); !ok || d.key == "" { + err = errInvalidNumberOfArguments + return + } + if vs, d.pattern, ok = tokenval(vs); !ok || d.pattern == "" { + err = errInvalidNumberOfArguments + return + } + if len(vs) != 0 { + err = errInvalidNumberOfArguments + return + } + now := time.Now() + iter := func(id string, o geojson.Object, fields []float64) bool { + if match, _ := glob.Match(d.pattern, id); match { + d.children = append(d.children, &commandDetailsT{ + command: "del", + updated: true, + timestamp: now, + key: d.key, + id: id, + }) + } + return true + } + + col := c.getCol(d.key) + if col != nil { + g := glob.Parse(d.pattern, false) + if g.Limits[0] == "" && g.Limits[1] == "" { + col.Scan(0, false, iter) + } else { + col.ScanRange(0, g.Limits[0], g.Limits[1], false, iter) + } + var atLeastOneNotDeleted bool + for i, dc := range d.children { + dc.obj, dc.fields, ok = col.Remove(dc.id) + if !ok { + d.children[i].command = "?" + atLeastOneNotDeleted = true + } else { + d.children[i] = dc + } + c.clearIDExpires(d.key, dc.id) + } + if atLeastOneNotDeleted { + var nchildren []*commandDetailsT + for _, dc := range d.children { + if dc.command == "del" { + nchildren = append(nchildren, dc) + } + } + d.children = nchildren + } + if col.Count() == 0 { + c.deleteCol(d.key) + } + } + d.command = "pdel" + d.updated = len(d.children) > 0 + d.timestamp = now + d.parent = true + switch msg.OutputType { + case server.JSON: + res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" + case server.RESP: + res = ":" + strconv.FormatInt(int64(len(d.children)), 10) + "\r\n" + } + return +} + func (c *Controller) cmdDrop(msg *server.Message) (res string, d commandDetailsT, err error) { start := time.Now() vs := msg.Values[1:] diff --git a/core/commands.json b/core/commands.json index e12fa6e4..f622252f 100644 --- a/core/commands.json +++ b/core/commands.json @@ -508,6 +508,12 @@ "type": ["string"], "optional": true }, + { + "command": "COMMAND", + "name": ["which"], + "type": ["string"], + "optional": true + }, { "name": "type", "optional": true, @@ -638,6 +644,12 @@ "type": ["string"], "optional": true }, + { + "command": "COMMAND", + "name": ["which"], + "type": ["string"], + "optional": true + }, { "name": "type", "optional": true, @@ -812,6 +824,12 @@ "type": ["string"], "optional": true }, + { + "command": "COMMAND", + "name": ["which"], + "type": ["string"], + "optional": true + }, { "name": "type", "optional": true, @@ -1104,6 +1122,12 @@ "type": ["string"], "optional": true }, + { + "command": "COMMAND", + "name": ["which"], + "type": ["string"], + "optional": true + }, { "name": "param", "type": "string", @@ -1142,5 +1166,19 @@ } ], "group": "webhook" + }, + "PDEL": { + "summary": "Removes all objects matching a pattern", + "arguments":[ + { + "name": "key", + "type": "string" + }, + { + "name": "pattern", + "type": "pattern" + } + ], + "group": "keys" } } diff --git a/core/commands_gen.go b/core/commands_gen.go index ba4e90de..0ac996f0 100644 --- a/core/commands_gen.go +++ b/core/commands_gen.go @@ -670,6 +670,12 @@ var commandsJSON = `{ "type": ["string"], "optional": true }, + { + "command": "COMMAND", + "name": ["which"], + "type": ["string"], + "optional": true + }, { "name": "type", "optional": true, @@ -800,6 +806,12 @@ var commandsJSON = `{ "type": ["string"], "optional": true }, + { + "command": "COMMAND", + "name": ["which"], + "type": ["string"], + "optional": true + }, { "name": "type", "optional": true, @@ -974,6 +986,12 @@ var commandsJSON = `{ "type": ["string"], "optional": true }, + { + "command": "COMMAND", + "name": ["which"], + "type": ["string"], + "optional": true + }, { "name": "type", "optional": true, @@ -1266,6 +1284,12 @@ var commandsJSON = `{ "type": ["string"], "optional": true }, + { + "command": "COMMAND", + "name": ["which"], + "type": ["string"], + "optional": true + }, { "name": "param", "type": "string", @@ -1304,5 +1328,19 @@ var commandsJSON = `{ } ], "group": "webhook" + }, + "PDEL": { + "summary": "Removes all objects matching a pattern", + "arguments":[ + { + "name": "key", + "type": "string" + }, + { + "name": "pattern", + "type": "pattern" + } + ], + "group": "keys" } }`