mirror of https://github.com/tidwall/tile38.git
Merge pull request #442 from rshura/query-timeout
Add per-query timeouts.
This commit is contained in:
commit
5702789c2c
|
@ -359,6 +359,12 @@
|
||||||
"type": "integer",
|
"type": "integer",
|
||||||
"optional": true
|
"optional": true
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"command": "TIMEOUT",
|
||||||
|
"name": "seconds",
|
||||||
|
"type": "double",
|
||||||
|
"optional": true
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"command": "MATCH",
|
"command": "MATCH",
|
||||||
"name": "pattern",
|
"name": "pattern",
|
||||||
|
@ -450,6 +456,12 @@
|
||||||
"type": "integer",
|
"type": "integer",
|
||||||
"optional": true
|
"optional": true
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"command": "TIMEOUT",
|
||||||
|
"name": "seconds",
|
||||||
|
"type": "double",
|
||||||
|
"optional": true
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"command": "MATCH",
|
"command": "MATCH",
|
||||||
"name": "pattern",
|
"name": "pattern",
|
||||||
|
@ -559,6 +571,12 @@
|
||||||
"type": "integer",
|
"type": "integer",
|
||||||
"optional": true
|
"optional": true
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"command": "TIMEOUT",
|
||||||
|
"name": "seconds",
|
||||||
|
"type": "double",
|
||||||
|
"optional": true
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"command": "SPARSE",
|
"command": "SPARSE",
|
||||||
"name": "spread",
|
"name": "spread",
|
||||||
|
@ -725,6 +743,12 @@
|
||||||
"type": "integer",
|
"type": "integer",
|
||||||
"optional": true
|
"optional": true
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"command": "TIMEOUT",
|
||||||
|
"name": "seconds",
|
||||||
|
"type": "double",
|
||||||
|
"optional": true
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"command": "SPARSE",
|
"command": "SPARSE",
|
||||||
"name": "spread",
|
"name": "spread",
|
||||||
|
@ -946,6 +970,12 @@
|
||||||
"type": "integer",
|
"type": "integer",
|
||||||
"optional": true
|
"optional": true
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"command": "TIMEOUT",
|
||||||
|
"name": "seconds",
|
||||||
|
"type": "double",
|
||||||
|
"optional": true
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"command": "SPARSE",
|
"command": "SPARSE",
|
||||||
"name": "spread",
|
"name": "spread",
|
||||||
|
@ -1299,6 +1329,17 @@
|
||||||
],
|
],
|
||||||
"group": "connection"
|
"group": "connection"
|
||||||
},
|
},
|
||||||
|
"TIMEOUT": {
|
||||||
|
"summary": "Gets or sets the query timeout for the current connection.",
|
||||||
|
"arguments": [
|
||||||
|
{
|
||||||
|
"name": "seconds",
|
||||||
|
"optional": true,
|
||||||
|
"type": "double"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"group": "connection"
|
||||||
|
},
|
||||||
"SETHOOK": {
|
"SETHOOK": {
|
||||||
"summary": "Creates a webhook which points to geofenced search",
|
"summary": "Creates a webhook which points to geofenced search",
|
||||||
"arguments": [
|
"arguments": [
|
||||||
|
|
|
@ -13,6 +13,15 @@ func New(deadline time.Time) *Deadline {
|
||||||
return &Deadline{unixNano: deadline.UnixNano()}
|
return &Deadline{unixNano: deadline.UnixNano()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Empty() *Deadline {
|
||||||
|
return &Deadline{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the deadline from a given time object
|
||||||
|
func (deadline *Deadline) Update(newDeadline time.Time) {
|
||||||
|
deadline.unixNano = newDeadline.UnixNano()
|
||||||
|
}
|
||||||
|
|
||||||
// Check the deadline and panic when reached
|
// Check the deadline and panic when reached
|
||||||
//go:noinline
|
//go:noinline
|
||||||
func (deadline *Deadline) Check() {
|
func (deadline *Deadline) Check() {
|
||||||
|
|
|
@ -55,6 +55,9 @@ func (c *Server) cmdScan(msg *Message) (res resp.Value, err error) {
|
||||||
wr.WriteString(`{"ok":true`)
|
wr.WriteString(`{"ok":true`)
|
||||||
}
|
}
|
||||||
sw.writeHead()
|
sw.writeHead()
|
||||||
|
if s.timeout != 0 {
|
||||||
|
msg.Deadline.Update(start.Add(s.timeout))
|
||||||
|
}
|
||||||
if sw.col != nil {
|
if sw.col != nil {
|
||||||
if sw.output == outputCount && len(sw.wheres) == 0 &&
|
if sw.output == outputCount && len(sw.wheres) == 0 &&
|
||||||
len(sw.whereins) == 0 && sw.globEverything == true {
|
len(sw.whereins) == 0 && sw.globEverything == true {
|
||||||
|
|
|
@ -370,6 +370,9 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) {
|
||||||
wr.WriteString(`{"ok":true`)
|
wr.WriteString(`{"ok":true`)
|
||||||
}
|
}
|
||||||
sw.writeHead()
|
sw.writeHead()
|
||||||
|
if s.timeout != 0 {
|
||||||
|
msg.Deadline.Update(start.Add(s.timeout))
|
||||||
|
}
|
||||||
if sw.col != nil {
|
if sw.col != nil {
|
||||||
iter := func(id string, o geojson.Object, fields []float64, dist float64) bool {
|
iter := func(id string, o geojson.Object, fields []float64, dist float64) bool {
|
||||||
meters := 0.0
|
meters := 0.0
|
||||||
|
@ -480,6 +483,9 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp.
|
||||||
wr.WriteString(`{"ok":true`)
|
wr.WriteString(`{"ok":true`)
|
||||||
}
|
}
|
||||||
sw.writeHead()
|
sw.writeHead()
|
||||||
|
if s.timeout != 0 {
|
||||||
|
msg.Deadline.Update(start.Add(s.timeout))
|
||||||
|
}
|
||||||
if sw.col != nil {
|
if sw.col != nil {
|
||||||
if cmd == "within" {
|
if cmd == "within" {
|
||||||
sw.col.Within(s.obj, s.sparse, sw, msg.Deadline, func(
|
sw.col.Within(s.obj, s.sparse, sw, msg.Deadline, func(
|
||||||
|
@ -570,6 +576,9 @@ func (server *Server) cmdSearch(msg *Message) (res resp.Value, err error) {
|
||||||
wr.WriteString(`{"ok":true`)
|
wr.WriteString(`{"ok":true`)
|
||||||
}
|
}
|
||||||
sw.writeHead()
|
sw.writeHead()
|
||||||
|
if s.timeout != 0 {
|
||||||
|
msg.Deadline.Update(start.Add(s.timeout))
|
||||||
|
}
|
||||||
if sw.col != nil {
|
if sw.col != nil {
|
||||||
if sw.output == outputCount && len(sw.wheres) == 0 && sw.globEverything == true {
|
if sw.output == outputCount && len(sw.wheres) == 0 && sw.globEverything == true {
|
||||||
count := sw.col.Count() - int(s.cursor)
|
count := sw.col.Count() - int(s.cursor)
|
||||||
|
|
|
@ -1024,8 +1024,14 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
|
||||||
// No locking for pubsub
|
// No locking for pubsub
|
||||||
}
|
}
|
||||||
res, d, err := func() (res resp.Value, d commandDetails, err error) {
|
res, d, err := func() (res resp.Value, d commandDetails, err error) {
|
||||||
if client.timeout != 0 && !write {
|
if !write {
|
||||||
|
if client.timeout == 0 {
|
||||||
|
// the command itself might have a timeout,
|
||||||
|
// which will be used to update this trivial deadline.
|
||||||
|
msg.Deadline = deadline.Empty()
|
||||||
|
} else {
|
||||||
msg.Deadline = deadline.New(start.Add(client.timeout))
|
msg.Deadline = deadline.New(start.Add(client.timeout))
|
||||||
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if msg.Deadline.Hit() {
|
if msg.Deadline.Hit() {
|
||||||
v := recover()
|
v := recover()
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/yuin/gopher-lua"
|
"github.com/yuin/gopher-lua"
|
||||||
)
|
)
|
||||||
|
@ -247,6 +248,7 @@ type searchScanBaseTokens struct {
|
||||||
sparse uint8
|
sparse uint8
|
||||||
desc bool
|
desc bool
|
||||||
clip bool
|
clip bool
|
||||||
|
timeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Server) parseSearchScanBaseTokens(
|
func (c *Server) parseSearchScanBaseTokens(
|
||||||
|
@ -579,6 +581,20 @@ func (c *Server) parseSearchScanBaseTokens(
|
||||||
}
|
}
|
||||||
t.clip = true
|
t.clip = true
|
||||||
continue
|
continue
|
||||||
|
case "timeout":
|
||||||
|
vs = nvs
|
||||||
|
var valStr string
|
||||||
|
if vs, valStr, ok = tokenval(vs); !ok || valStr == "" {
|
||||||
|
err = errInvalidNumberOfArguments
|
||||||
|
return
|
||||||
|
}
|
||||||
|
timeout, _err := strconv.ParseFloat(valStr, 64)
|
||||||
|
if _err != nil || timeout < 0 {
|
||||||
|
err = errInvalidArgument(valStr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.timeout = time.Duration(timeout * float64(time.Second))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
|
|
|
@ -47,6 +47,7 @@ func TestAll(t *testing.T) {
|
||||||
runSubTest(t, "scripts", mc, subTestScripts)
|
runSubTest(t, "scripts", mc, subTestScripts)
|
||||||
runSubTest(t, "info", mc, subTestInfo)
|
runSubTest(t, "info", mc, subTestInfo)
|
||||||
runSubTest(t, "client", mc, subTestClient)
|
runSubTest(t, "client", mc, subTestClient)
|
||||||
|
runSubTest(t, "timeouts", mc, subTestTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runSubTest(t *testing.T, name string, mc *mockServer, test func(t *testing.T, mc *mockServer)) {
|
func runSubTest(t *testing.T, name string, mc *mockServer, test func(t *testing.T, mc *mockServer)) {
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
package tests
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gomodule/redigo/redis"
|
||||||
|
)
|
||||||
|
|
||||||
|
func subTestTimeout(t *testing.T, mc *mockServer) {
|
||||||
|
runStep(t, mc, "session set/unset", timeout_session_set_unset_test)
|
||||||
|
runStep(t, mc, "session spatial", timeout_session_spatial_test)
|
||||||
|
runStep(t, mc, "session search", timeout_session_search_test)
|
||||||
|
runStep(t, mc, "command spatial", timeout_command_spatial_test)
|
||||||
|
runStep(t, mc, "command search", timeout_command_search_test)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setup(mc *mockServer, count int, points bool) (err error) {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
// add a bunch of points
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
val := fmt.Sprintf("val:%d", i)
|
||||||
|
var resp string
|
||||||
|
var lat, lon, fval float64
|
||||||
|
fval = rand.Float64()
|
||||||
|
if points {
|
||||||
|
lat = rand.Float64()*180 - 90
|
||||||
|
lon = rand.Float64()*360 - 180
|
||||||
|
resp, err = redis.String(mc.conn.Do("SET",
|
||||||
|
"mykey", val,
|
||||||
|
"FIELD", "foo", fval,
|
||||||
|
"POINT", lat, lon))
|
||||||
|
} else {
|
||||||
|
resp, err = redis.String(mc.conn.Do("SET",
|
||||||
|
"mykey", val,
|
||||||
|
"FIELD", "foo", fval,
|
||||||
|
"STRING", val))
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if resp != "OK" {
|
||||||
|
err = fmt.Errorf("expected 'OK', got '%s'", resp)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(time.Nanosecond)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second * 3)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func timeout_session_set_unset_test(mc *mockServer) (err error) {
|
||||||
|
return mc.DoBatch([][]interface{}{
|
||||||
|
{"TIMEOUT"}, {"0"},
|
||||||
|
{"TIMEOUT", "0.25"}, {"OK"},
|
||||||
|
{"TIMEOUT"}, {"0.25"},
|
||||||
|
{"TIMEOUT", "0"}, {"OK"},
|
||||||
|
{"TIMEOUT"}, {"0"},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func timeout_session_spatial_test(mc *mockServer) (err error) {
|
||||||
|
err = setup(mc, 10000, true)
|
||||||
|
|
||||||
|
return mc.DoBatch([][]interface{}{
|
||||||
|
{"SCAN", "mykey", "WHERE", "foo", -1, 2, "COUNT"}, {"10000"},
|
||||||
|
{"INTERSECTS", "mykey", "WHERE", "foo", -1, 2, "COUNT", "BOUNDS", -90, -180, 90, 180}, {"10000"},
|
||||||
|
{"WITHIN", "mykey", "WHERE", "foo", -1, 2, "COUNT", "BOUNDS", -90, -180, 90, 180}, {"10000"},
|
||||||
|
|
||||||
|
{"TIMEOUT", "0.000001"}, {"OK"},
|
||||||
|
|
||||||
|
{"SCAN", "mykey", "WHERE", "foo", -1, 2, "COUNT"}, {"ERR timeout"},
|
||||||
|
{"INTERSECTS", "mykey", "WHERE", "foo", -1, 2, "COUNT", "BOUNDS", -90, -180, 90, 180}, {"ERR timeout"},
|
||||||
|
{"WITHIN", "mykey", "WHERE", "foo", -1, 2, "COUNT", "BOUNDS", -90, -180, 90, 180}, {"ERR timeout"},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func timeout_command_spatial_test(mc *mockServer) (err error) {
|
||||||
|
err = setup(mc, 10000, true)
|
||||||
|
|
||||||
|
return mc.DoBatch([][]interface{}{
|
||||||
|
{"TIMEOUT", "1"}, {"OK"},
|
||||||
|
{"SCAN", "mykey", "WHERE", "foo", -1, 2, "COUNT"}, {"10000"},
|
||||||
|
{"INTERSECTS", "mykey", "WHERE", "foo", -1, 2, "COUNT", "BOUNDS", -90, -180, 90, 180}, {"10000"},
|
||||||
|
{"WITHIN", "mykey", "WHERE", "foo", -1, 2, "COUNT", "BOUNDS", -90, -180, 90, 180}, {"10000"},
|
||||||
|
|
||||||
|
{"SCAN", "mykey", "TIMEOUT", "0.000001", "WHERE", "foo", -1, 2, "COUNT"}, {"ERR timeout"},
|
||||||
|
{"INTERSECTS", "mykey", "TIMEOUT", "0.000001", "WHERE", "foo", -1, 2, "COUNT", "BOUNDS", -90, -180, 90, 180}, {"ERR timeout"},
|
||||||
|
{"WITHIN", "mykey", "TIMEOUT", "0.000001", "WHERE", "foo", -1, 2, "COUNT", "BOUNDS", -90, -180, 90, 180}, {"ERR timeout"},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func timeout_session_search_test(mc *mockServer) (err error) {
|
||||||
|
err = setup(mc, 10000, false)
|
||||||
|
|
||||||
|
return mc.DoBatch([][]interface{}{
|
||||||
|
{"SEARCH", "mykey", "MATCH", "val:*", "COUNT"}, {"10000"},
|
||||||
|
{"TIMEOUT", "0.000001"}, {"OK"},
|
||||||
|
{"SEARCH", "mykey", "MATCH", "val:*", "COUNT"}, {"ERR timeout"},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func timeout_command_search_test(mc *mockServer) (err error) {
|
||||||
|
err = setup(mc, 10000, false)
|
||||||
|
|
||||||
|
return mc.DoBatch([][]interface{}{
|
||||||
|
{"TIMEOUT", "1"}, {"OK"},
|
||||||
|
{"SEARCH", "mykey", "MATCH", "val:*", "COUNT"}, {"10000"},
|
||||||
|
{"SEARCH", "mykey", "TIMEOUT", "0.000001", "MATCH", "val:*", "COUNT"}, {"ERR timeout"},
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue