From e514a0287fb7e4c6ec9dc8d7c24550151039842a Mon Sep 17 00:00:00 2001 From: Alex Roitman Date: Wed, 24 Apr 2019 12:02:39 -0700 Subject: [PATCH] Add timeout subcommand to scan/search commands. Use per-query timeout for those commands, if it was given. --- internal/deadline/deadline.go | 5 +++++ internal/server/scan.go | 3 +++ internal/server/search.go | 9 +++++++++ internal/server/server.go | 2 +- internal/server/token.go | 16 ++++++++++++++++ 5 files changed, 34 insertions(+), 1 deletion(-) diff --git a/internal/deadline/deadline.go b/internal/deadline/deadline.go index 7acf2577..cb22525e 100644 --- a/internal/deadline/deadline.go +++ b/internal/deadline/deadline.go @@ -13,6 +13,11 @@ func New(deadline time.Time) *Deadline { return &Deadline{unixNano: deadline.UnixNano()} } +// Update the deadline from a given time object +func (deadline *Deadline) Update(newDeadline time.Time) { + deadline.unixNano = newDeadline.UnixNano() +} + // Check the deadline and panic when reached //go:noinline func (deadline *Deadline) Check() { diff --git a/internal/server/scan.go b/internal/server/scan.go index 064fca6b..bded9749 100644 --- a/internal/server/scan.go +++ b/internal/server/scan.go @@ -55,6 +55,9 @@ func (c *Server) cmdScan(msg *Message) (res resp.Value, err error) { wr.WriteString(`{"ok":true`) } sw.writeHead() + if s.timeout != 0 { + msg.Deadline.Update(start.Add(s.timeout)) + } if sw.col != nil { if sw.output == outputCount && len(sw.wheres) == 0 && len(sw.whereins) == 0 && sw.globEverything == true { diff --git a/internal/server/search.go b/internal/server/search.go index 1b7f9af0..d4083a74 100644 --- a/internal/server/search.go +++ b/internal/server/search.go @@ -370,6 +370,9 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) { wr.WriteString(`{"ok":true`) } sw.writeHead() + if s.timeout != 0 { + msg.Deadline.Update(start.Add(s.timeout)) + } if sw.col != nil { iter := func(id string, o geojson.Object, fields []float64, dist float64) bool { meters := 0.0 @@ -480,6 +483,9 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp. wr.WriteString(`{"ok":true`) } sw.writeHead() + if s.timeout != 0 { + msg.Deadline.Update(start.Add(s.timeout)) + } if sw.col != nil { if cmd == "within" { sw.col.Within(s.obj, s.sparse, sw, msg.Deadline, func( @@ -570,6 +576,9 @@ func (server *Server) cmdSearch(msg *Message) (res resp.Value, err error) { wr.WriteString(`{"ok":true`) } sw.writeHead() + if s.timeout != 0 { + msg.Deadline.Update(start.Add(s.timeout)) + } if sw.col != nil { if sw.output == outputCount && len(sw.wheres) == 0 && sw.globEverything == true { count := sw.col.Count() - int(s.cursor) diff --git a/internal/server/server.go b/internal/server/server.go index fd60b582..bea3564b 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1024,7 +1024,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { // No locking for pubsub } res, d, err := func() (res resp.Value, d commandDetails, err error) { - if client.timeout != 0 && !write { + if !write { msg.Deadline = deadline.New(start.Add(client.timeout)) defer func() { if msg.Deadline.Hit() { diff --git a/internal/server/token.go b/internal/server/token.go index 71a81044..8b46ef01 100644 --- a/internal/server/token.go +++ b/internal/server/token.go @@ -6,6 +6,7 @@ import ( "math" "strconv" "strings" + "time" "github.com/yuin/gopher-lua" ) @@ -247,6 +248,7 @@ type searchScanBaseTokens struct { sparse uint8 desc bool clip bool + timeout time.Duration } func (c *Server) parseSearchScanBaseTokens( @@ -579,6 +581,20 @@ func (c *Server) parseSearchScanBaseTokens( } t.clip = true continue + case "timeout": + vs = nvs + var valStr string + if vs, valStr, ok = tokenval(vs); !ok || valStr == "" { + err = errInvalidNumberOfArguments + return + } + timeout, _err := strconv.ParseFloat(valStr, 64) + if _err != nil || timeout < 0 { + err = errInvalidArgument(valStr) + return + } + t.timeout = time.Duration(timeout * float64(time.Second)) + continue } } break