diff --git a/go.sum b/go.sum index 720fe160..44f58978 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/golang/protobuf v0.0.0-20170920220647-130e6b02ab05 h1:Kesru7U6Mhpf/x7 github.com/golang/protobuf v0.0.0-20170920220647-130e6b02ab05/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk= github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gomodule/redigo v1.7.0 h1:ZKld1VOtsGhAe37E7wMxEDgAlGM5dvFY+DiOhSkhP9Y= +github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= github.com/gomodule/redigo v2.0.1-0.20181026001555-e8fc0692a7e2+incompatible h1:H4S5GVLXZxCnS6q3+HrRBu/ObgobnAHg92tWG8cLfX8= github.com/gomodule/redigo v2.0.1-0.20181026001555-e8fc0692a7e2+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= diff --git a/internal/server/fence.go b/internal/server/fence.go index 1f875dbc..5674c302 100644 --- a/internal/server/fence.go +++ b/internal/server/fence.go @@ -2,6 +2,7 @@ package server import ( "math" + "sort" "strconv" "time" @@ -61,6 +62,7 @@ func fenceMatch( if details.command == "drop" { return []string{ `{"command":"drop"` + hookJSONString(hookName, metas) + + `,"key":` + jsonString(details.key) + `,"time":` + jsonTimeFormat(details.timestamp) + `}`, } } @@ -82,14 +84,6 @@ func fenceMatch( } } if details.command == "del" { - if fence.roam.on { - if fence.roam.nearbys != nil { - delete(fence.roam.nearbys, details.id) - if len(fence.roam.nearbys) == 0 { - fence.roam.nearbys = nil - } - } - } return []string{ `{"command":"del"` + hookJSONString(hookName, metas) + `,"key":` + jsonString(details.key) + @@ -103,8 +97,8 @@ func fenceMatch( if fence.roam.on { if details.command == "set" { roamNearbys, roamFaraways = - fenceMatchRoam(sw.s, fence, details.key, - details.id, details.obj) + fenceMatchRoam(sw.s, fence, details.id, + details.oldObj, details.obj) } if len(roamNearbys) == 0 && len(roamFaraways) == 0 { return nil @@ -354,16 +348,17 @@ func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool { return false } -func fenceMatchRoam( +func fenceMatchNearbys( s *Server, fence *liveFenceSwitches, - tkey, tid string, obj geojson.Object, -) (nearbys, faraways []roamMatch) { + id string, obj geojson.Object, +) (nearbys []roamMatch) { + if obj == nil { + return nil + } col := s.getCol(fence.roam.key) if col == nil { - return + return nil } - prevNearbys := fence.roam.nearbys[tid] - var newNearbys map[string]bool center := obj.Center() minLat, minLon, maxLat, maxLon := geo.RectFromCenter(center.Y, center.X, fence.roam.meters) @@ -372,66 +367,75 @@ func fenceMatchRoam( Max: geometry.Point{X: maxLon, Y: maxLat}, } col.Intersects(geojson.NewRect(rect), 0, nil, nil, func( - id string, obj2 geojson.Object, fields []float64, + id2 string, obj2 geojson.Object, fields []float64, ) bool { - if s.hasExpired(fence.roam.key, id) { - return true + if s.hasExpired(fence.roam.key, id2) { + return true // skip expired } var idMatch bool - if id == tid { + if id2 == id { return true // skip self } + meters := obj.Distance(obj2) + if meters > fence.roam.meters { + return true // skip outside radius + } if fence.roam.pattern { - idMatch, _ = glob.Match(fence.roam.id, id) + idMatch, _ = glob.Match(fence.roam.id, id2) } else { - idMatch = fence.roam.id == id + idMatch = fence.roam.id == id2 } if !idMatch { - return true - } - if newNearbys == nil { - newNearbys = make(map[string]bool) - } - newNearbys[id] = true - prev := prevNearbys[id] - if prev { - delete(prevNearbys, id) + return true // skip non-id match } match := roamMatch{ - id: id, + id: id2, obj: obj2, meters: obj.Distance(obj2), } - - if !prev || !fence.nodwell { - // brand new "nearby" - nearbys = append(nearbys, match) - } + nearbys = append(nearbys, match) return true }) - for id := range prevNearbys { - obj2, _, ok := col.Get(id) - if ok && !s.hasExpired(fence.roam.key, id) { - faraways = append(faraways, roamMatch{ - id: id, - obj: obj2, - meters: obj.Distance(obj2), - }) - } - } + return nearbys +} - if len(newNearbys) == 0 { - if fence.roam.nearbys != nil { - delete(fence.roam.nearbys, tid) - if len(fence.roam.nearbys) == 0 { - fence.roam.nearbys = nil +func fenceMatchRoam( + s *Server, fence *liveFenceSwitches, + id string, old, obj geojson.Object, +) (nearbys, faraways []roamMatch) { + oldNearbys := fenceMatchNearbys(s, fence, id, old) + newNearbys := fenceMatchNearbys(s, fence, id, obj) + + // Go through all matching objects in new-nearbys and old-nearbys. + for i := 0; i < len(oldNearbys); i++ { + var match bool + var j int + for ; j < len(newNearbys); j++ { + if newNearbys[i].id == oldNearbys[i].id { + match = true + break } } - } else { - if fence.roam.nearbys == nil { - fence.roam.nearbys = make(map[string]map[string]bool) + if match { + // dwelling, more from old-nearbys + oldNearbys[i] = oldNearbys[len(oldNearbys)-1] + oldNearbys = oldNearbys[:len(oldNearbys)-1] + if fence.nodwell { + // no dwelling allowed, remove from both lists + newNearbys[j] = newNearbys[len(newNearbys)-1] + newNearbys = newNearbys[:len(newNearbys)-1] + } } - fence.roam.nearbys[tid] = newNearbys } - return + faraways, nearbys = oldNearbys, newNearbys + for i := 0; i < len(faraways); i++ { + faraways[i].meters = faraways[i].obj.Distance(obj) + } + sort.Slice(faraways, func(i, j int) bool { + return faraways[i].meters < faraways[j].meters + }) + sort.Slice(nearbys, func(i, j int) bool { + return nearbys[i].meters < nearbys[j].meters + }) + return nearbys, faraways } diff --git a/internal/server/search.go b/internal/server/search.go index f9c4eda2..96f9d461 100644 --- a/internal/server/search.go +++ b/internal/server/search.go @@ -35,7 +35,6 @@ type roamSwitches struct { pattern bool meters float64 scan string - nearbys map[string]map[string]bool } type roamMatch struct { diff --git a/tests/fence_roaming_test.go b/tests/fence_roaming_test.go index 8a6e68ba..e96084a3 100644 --- a/tests/fence_roaming_test.go +++ b/tests/fence_roaming_test.go @@ -1,11 +1,12 @@ package tests import ( - "errors" "fmt" "io/ioutil" "net/http" "net/http/httptest" + "sync" + "time" "github.com/gomodule/redigo/redis" "github.com/tidwall/pretty" @@ -82,78 +83,95 @@ func fence_roaming_webhook_test(mc *mockServer) error { return <-finalErr } +func goMultiFunc(mc *mockServer, fns ...func() error) error { + errs := make([]error, len(fns)) + var wg sync.WaitGroup + wg.Add(len(fns)) + for i := 0; i < len(fns); i++ { + go func(i int) { + defer wg.Done() + errs[i] = fns[i]() + }(i) + } + wg.Wait() + var ferrs []error + for i := 0; i < len(errs); i++ { + if errs[i] != nil { + ferrs = append(ferrs, errs[i]) + } + } + if len(ferrs) == 0 { + return nil + } + if len(ferrs) == 1 { + return ferrs[0] + } + return fmt.Errorf("%v", ferrs) +} + func fence_roaming_live_test(mc *mockServer) error { car1, car2, expected := roamingTestData() - finalErr := make(chan error) - - go func() { - // Create a connection for subscribing to geofence notifications - sc, err := redis.Dial("tcp", fmt.Sprintf(":%d", mc.port)) - if err != nil { - finalErr <- err - return - } - defer sc.Close() - - // Set up a live geofence stream - if _, err := sc.Do("NEARBY", "cars", "FENCE", "ROAM", "cars", "*", 1000); err != nil { - finalErr <- err - return - } - - actual := []string{} - for sc.Err() == nil { - if err := func() error { - bodyi, err := sc.Receive() + var liveReady sync.WaitGroup + liveReady.Add(1) + return goMultiFunc(mc, + func() error { + sc, err := redis.DialTimeout("tcp", fmt.Sprintf(":%d", mc.port), + 0, time.Second*5, time.Second*5) + if err != nil { + liveReady.Done() + return err + } + defer sc.Close() + // Set up a live geofence stream + reply, err := redis.String( + sc.Do("NEARBY", "cars", "FENCE", "ROAM", "cars", "*", 1000), + ) + if err != nil { + liveReady.Done() + return err + } + if reply != "OK" { + liveReady.Done() + return fmt.Errorf("expected 'OK', got '%v'", reply) + } + liveReady.Done() + for i := 0; i < len(expected); i++ { + reply, err := redis.String(sc.Receive()) if err != nil { return err } - body, ok := bodyi.([]byte) - if !ok { - return errors.New("Non byte-slice received") + reply = cleanMessage([]byte(reply)) + if reply != expected[i] { + return fmt.Errorf("Expected '%s' but got '%s'", + expected[i], reply) } - - // If the new message doesn't match whats expected an error - // should be returned - actual = append(actual, cleanMessage(body)) - pos := len(actual) - 1 - if len(expected) < pos+1 { - return fmt.Errorf("More messages than expected were received : '%s'", actual[pos]) - } - if actual[pos] != expected[pos] { - return fmt.Errorf("Expected '%s' but got '%s'", expected[pos], - actual[pos]) - } - if len(actual) == len(expected) { - finalErr <- nil - } - return nil - }(); err != nil { - finalErr <- err } - } - }() + return nil + }, + func() error { + liveReady.Wait() + bc, err := redis.Dial("tcp", fmt.Sprintf(":%d", mc.port)) + if err != nil { + return err + } + defer bc.Close() - // Create the base connection for setting up points and geofences - bc, err := redis.Dial("tcp", fmt.Sprintf(":%d", mc.port)) - if err != nil { - return err - } - defer bc.Close() + // Fire all car movement commands on the base client + for i := range car1 { - // Fire all car movement commands on the base client - for i := range car1 { - if _, err := bc.Do("SET", "cars", "car1", "POINT", car1[i][1], - car1[i][0]); err != nil { - return err - } - if _, err := bc.Do("SET", "cars", "car2", "POINT", car2[i][1], - car2[i][0]); err != nil { - return err - } - } + if _, err := bc.Do("SET", "cars", "car1", "POINT", car1[i][1], + car1[i][0]); err != nil { + return err + } + if _, err := bc.Do("SET", "cars", "car2", "POINT", car2[i][1], + car2[i][0]); err != nil { + return err + } + } - return <-finalErr + return nil + }, + ) } func fence_roaming_channel_test(mc *mockServer) error { @@ -271,7 +289,6 @@ func roamingTestData() (car1 [][]float64, car2 [][]float64, output []string) { `{"command":"set","detect":"roam","key":"cars","id":"car2","object":{"type":"Point","coordinates":[-111.91781044006346,33.414750027566235]},"nearby":{"key":"cars","id":"car1","object":{"type":"Point","coordinates":[-111.91789627075195,33.414750027566235]},"meters":7.966}}`, `{"command":"set","detect":"roam","key":"cars","id":"car1","object":{"type":"Point","coordinates":[-111.9111156463623,33.414750027566235]},"nearby":{"key":"cars","id":"car2","object":{"type":"Point","coordinates":[-111.91781044006346,33.414750027566235]},"meters":621.377}}`, `{"command":"set","detect":"roam","key":"cars","id":"car2","object":{"type":"Point","coordinates":[-111.92416191101074,33.414750027566235]},"faraway":{"key":"cars","id":"car1","object":{"type":"Point","coordinates":[-111.9111156463623,33.414750027566235]},"meters":1210.89}}`, - `{"command":"set","detect":"roam","key":"cars","id":"car1","object":{"type":"Point","coordinates":[-111.90510749816895,33.414750027566235]},"faraway":{"key":"cars","id":"car2","object":{"type":"Point","coordinates":[-111.92416191101074,33.414750027566235]},"meters":1768.536}}`, } return }