Reset wheres while geofencing

This commit is contained in:
tidwall 2022-08-30 16:50:19 -07:00
parent cc9320e246
commit 67916f38f8
2 changed files with 28 additions and 9 deletions

View File

@ -294,6 +294,7 @@ func (s *Server) queueHooks(d *commandDetails) error {
for _, hook := range candidates {
// Calculate all matching fence messages for all candidates and append
// them to the appropriate message slice
hook.ScanWriter.loadWheres()
msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, hook.Metas, d)
if len(msgs) > 0 {
if hook.channel {

View File

@ -33,6 +33,7 @@ type scanWriter struct {
mu sync.Mutex
s *Server
wr *bytes.Buffer
key string
msg *Message
col *collection.Collection
fmap map[string]int
@ -58,6 +59,8 @@ type scanWriter struct {
values []resp.Value
matchValues bool
respOut resp.Value
orgWheres []whereT
orgWhereins []whereinT
}
// ScanWriterParams ...
@ -97,6 +100,7 @@ func (s *Server) newScanWriter(
sw := &scanWriter{
s: s,
wr: wr,
key: key,
msg: msg,
limit: limit,
cursor: cursor,
@ -114,34 +118,48 @@ func (s *Server) newScanWriter(
sw.globSingle = true
}
}
sw.col = s.getCol(key)
sw.orgWheres = wheres
sw.orgWhereins = whereins
sw.loadWheres()
return sw, nil
}
func (sw *scanWriter) loadWheres() {
sw.fmap = nil
sw.farr = nil
sw.wheres = nil
sw.whereins = nil
sw.fvals = nil
sw.col = sw.s.getCol(sw.key)
if sw.col != nil {
sw.fmap = sw.col.FieldMap()
sw.farr = sw.col.FieldArr()
// This fills index value in wheres/whereins
// so we don't have to map string field names for each tested object
var ok bool
if len(wheres) > 0 {
sw.wheres = make([]whereT, len(wheres))
for i, where := range wheres {
if len(sw.orgWheres) > 0 {
sw.wheres = make([]whereT, len(sw.orgWheres))
for i, where := range sw.orgWheres {
if where.index, ok = sw.fmap[where.field]; !ok {
where.index = math.MaxInt32
}
sw.wheres[i] = where
}
}
if len(whereins) > 0 {
sw.whereins = make([]whereinT, len(whereins))
for i, wherein := range whereins {
if len(sw.orgWhereins) > 0 {
sw.whereins = make([]whereinT, len(sw.orgWhereins))
for i, wherein := range sw.orgWhereins {
if wherein.index, ok = sw.fmap[wherein.field]; !ok {
wherein.index = math.MaxInt32
}
sw.whereins[i] = wherein
}
}
if len(sw.farr) > 0 {
sw.fvals = make([]float64, len(sw.farr))
}
}
sw.fvals = make([]float64, len(sw.farr))
return sw, nil
}
func (sw *scanWriter) hasFieldsOutput() bool {