2017-02-12 17:58:03 +03:00
package tests
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
2019-01-09 10:23:53 +03:00
"log"
2017-02-12 17:58:03 +03:00
"net"
"strconv"
2019-01-09 10:23:53 +03:00
"strings"
2020-10-23 19:50:51 +03:00
"sync"
"sync/atomic"
2017-02-12 17:58:03 +03:00
"testing"
"time"
2018-10-29 15:00:54 +03:00
"github.com/gomodule/redigo/redis"
2017-02-12 17:58:03 +03:00
"github.com/tidwall/gjson"
)
func subTestFence ( t * testing . T , mc * mockServer ) {
2019-01-10 21:28:29 +03:00
// Standard
2017-02-24 16:03:11 +03:00
runStep ( t , mc , "basic" , fence_basic_test )
2019-01-09 10:23:53 +03:00
runStep ( t , mc , "channel message order" , fence_channel_message_order_test )
2017-02-24 16:03:11 +03:00
runStep ( t , mc , "detect inside,outside" , fence_detect_inside_test )
2019-01-10 21:28:29 +03:00
// Roaming
runStep ( t , mc , "roaming live" , fence_roaming_live_test )
runStep ( t , mc , "roaming channel" , fence_roaming_channel_test )
runStep ( t , mc , "roaming webhook" , fence_roaming_webhook_test )
2019-08-03 20:10:28 +03:00
// channel meta
runStep ( t , mc , "channel meta" , fence_channel_meta_test )
2020-10-23 19:50:51 +03:00
// various
runStep ( t , mc , "detect eecio" , fence_eecio_test )
2017-02-12 17:58:03 +03:00
}
type fenceReader struct {
conn net . Conn
rd * bufio . Reader
}
func ( fr * fenceReader ) receive ( ) ( string , error ) {
if err := fr . conn . SetReadDeadline ( time . Now ( ) . Add ( time . Second ) ) ; err != nil {
return "" , err
}
line , err := fr . rd . ReadBytes ( '\n' )
if err != nil {
return "" , err
}
if len ( line ) < 4 || line [ 0 ] != '$' || line [ len ( line ) - 2 ] != '\r' || line [ len ( line ) - 1 ] != '\n' {
return "" , errors . New ( "invalid message" )
}
n , err := strconv . ParseUint ( string ( line [ 1 : len ( line ) - 2 ] ) , 10 , 64 )
if err != nil {
return "" , err
}
buf := make ( [ ] byte , int ( n ) + 2 )
_ , err = io . ReadFull ( fr . rd , buf )
if err != nil {
return "" , err
}
if buf [ len ( buf ) - 2 ] != '\r' || buf [ len ( buf ) - 1 ] != '\n' {
return "" , errors . New ( "invalid message" )
}
js := buf [ : len ( buf ) - 2 ]
var m interface { }
if err := json . Unmarshal ( js , & m ) ; err != nil {
return "" , err
}
return string ( js ) , nil
}
func ( fr * fenceReader ) receiveExpect ( valex ... string ) error {
s , err := fr . receive ( )
if err != nil {
return err
}
for i := 0 ; i < len ( valex ) ; i += 2 {
if gjson . Get ( s , valex [ i ] ) . String ( ) != valex [ i + 1 ] {
return fmt . Errorf ( "expected '%s'='%s', got '%s'" , valex [ i ] , valex [ i + 1 ] , gjson . Get ( s , valex [ i ] ) . String ( ) )
}
}
return nil
}
2017-02-24 16:03:11 +03:00
func fence_basic_test ( mc * mockServer ) error {
2017-02-12 17:58:03 +03:00
conn , err := net . Dial ( "tcp" , fmt . Sprintf ( ":%d" , mc . port ) )
if err != nil {
return err
}
defer conn . Close ( )
_ , err = fmt . Fprintf ( conn , "NEARBY mykey FENCE POINT 33 -115 5000\r\n" )
if err != nil {
return err
}
buf := make ( [ ] byte , 4096 )
n , err := conn . Read ( buf )
if err != nil {
return err
}
res := string ( buf [ : n ] )
if res != "+OK\r\n" {
return fmt . Errorf ( "expected OK, got '%v'" , res )
}
rd := & fenceReader { conn , bufio . NewReader ( conn ) }
// send a point
c , err := redis . Dial ( "tcp" , fmt . Sprintf ( ":%d" , mc . port ) )
if err != nil {
return err
}
defer c . Close ( )
res , err = redis . String ( c . Do ( "SET" , "mykey" , "myid1" , "POINT" , 33 , - 115 ) )
if err != nil {
return err
}
if res != "OK" {
return fmt . Errorf ( "expected OK, got '%v'" , res )
}
// receive the message
if err := rd . receiveExpect ( "command" , "set" ,
"detect" , "enter" ,
"key" , "mykey" ,
"id" , "myid1" ,
"object.type" , "Point" ,
"object.coordinates" , "[-115,33]" ) ; err != nil {
return err
}
if err := rd . receiveExpect ( "command" , "set" ,
"detect" , "inside" ,
"key" , "mykey" ,
"id" , "myid1" ,
"object.type" , "Point" ,
"object.coordinates" , "[-115,33]" ) ; err != nil {
return err
}
res , err = redis . String ( c . Do ( "SET" , "mykey" , "myid1" , "POINT" , 34 , - 115 ) )
if err != nil {
return err
}
if res != "OK" {
return fmt . Errorf ( "expected OK, got '%v'" , res )
}
// receive the message
if err := rd . receiveExpect ( "command" , "set" ,
"detect" , "exit" ,
"key" , "mykey" ,
"id" , "myid1" ,
"object.type" , "Point" ,
"object.coordinates" , "[-115,34]" ) ; err != nil {
return err
}
if err := rd . receiveExpect ( "command" , "set" ,
"detect" , "outside" ,
"key" , "mykey" ,
"id" , "myid1" ,
"object.type" , "Point" ,
"object.coordinates" , "[-115,34]" ) ; err != nil {
return err
}
return nil
}
2019-01-09 10:23:53 +03:00
func fence_channel_message_order_test ( mc * mockServer ) error {
// Create a channel to store the goroutines error
finalErr := make ( chan error )
// Concurrently subscribe for notifications
go func ( ) {
// Create the subscription connection to Tile38 to subscribe for updates
sc , err := redis . Dial ( "tcp" , fmt . Sprintf ( ":%d" , mc . port ) )
if err != nil {
log . Println ( err )
return
}
defer sc . Close ( )
// Subscribe the subscription client to the * pattern
psc := redis . PubSubConn { Conn : sc }
if err := psc . PSubscribe ( "*" ) ; err != nil {
log . Println ( err )
return
}
var msgs [ ] string
// While not a permanent error on the connection.
loop :
for sc . Err ( ) == nil {
switch v := psc . Receive ( ) . ( type ) {
case redis . Message :
msgs = append ( msgs , string ( v . Data ) )
if len ( msgs ) == 8 {
break loop
}
case error :
fmt . Printf ( err . Error ( ) )
}
}
// Verify all messages
correctOrder := [ ] string { "exit:A" , "exit:B" , "outside:A" , "outside:B" , "enter:C" , "enter:D" , "inside:C" , "inside:D" }
for i := range msgs {
if gjson . Get ( msgs [ i ] , "detect" ) . String ( ) + ":" +
gjson . Get ( msgs [ i ] , "hook" ) . String ( ) != correctOrder [ i ] {
finalErr <- errors . New ( "INVALID MESSAGE ORDER" )
}
}
finalErr <- nil
} ( )
2019-01-10 21:28:29 +03:00
// Create the base connection for setting up points and geofences
2019-01-09 10:23:53 +03:00
bc , err := redis . Dial ( "tcp" , fmt . Sprintf ( ":%d" , mc . port ) )
if err != nil {
return err
}
defer bc . Close ( )
// Fire all setup commands on the base client
for _ , cmd := range [ ] string {
"SET points point POINT 33.412529053733444 -111.93368911743164" ,
fmt . Sprintf ( ` SETCHAN A WITHIN points FENCE OBJECT { "type":"Polygon","coordinates":[[[-111.95205688476562,33.400491820565236],[-111.92630767822266,33.400491820565236],[-111.92630767822266,33.422272258866045],[-111.95205688476562,33.422272258866045],[-111.95205688476562,33.400491820565236]]]} ` ) ,
fmt . Sprintf ( ` SETCHAN B WITHIN points FENCE OBJECT { "type":"Polygon","coordinates":[[[-111.93952560424803,33.403501285221594],[-111.92630767822266,33.403501285221594],[-111.92630767822266,33.41997983836345],[-111.93952560424803,33.41997983836345],[-111.93952560424803,33.403501285221594]]]} ` ) ,
fmt . Sprintf ( ` SETCHAN C WITHIN points FENCE OBJECT { "type":"Polygon","coordinates":[[[-111.9255781173706,33.40342963251261],[-111.91201686859131,33.40342963251261],[-111.91201686859131,33.41994401881284],[-111.9255781173706,33.41994401881284],[-111.9255781173706,33.40342963251261]]]} ` ) ,
fmt . Sprintf ( ` SETCHAN D WITHIN points FENCE OBJECT { "type":"Polygon","coordinates":[[[-111.92562103271484,33.40063513076968],[-111.90021514892578,33.40063513076968],[-111.90021514892578,33.42212898435788],[-111.92562103271484,33.42212898435788],[-111.92562103271484,33.40063513076968]]]} ` ) ,
"SET points point POINT 33.412529053733444 -111.91909790039062" ,
} {
if _ , err := do ( bc , cmd ) ; err != nil {
return err
}
}
return <- finalErr
}
2017-02-24 16:03:11 +03:00
func fence_detect_inside_test ( mc * mockServer ) error {
conn , err := net . Dial ( "tcp" , fmt . Sprintf ( ":%d" , mc . port ) )
if err != nil {
return err
}
defer conn . Close ( )
_ , err = fmt . Fprintf ( conn , "WITHIN users FENCE DETECT inside,outside POINTS BOUNDS 33.618824 -84.457973 33.654359 -84.399859\r\n" )
if err != nil {
return err
}
buf := make ( [ ] byte , 4096 )
n , err := conn . Read ( buf )
if err != nil {
return err
}
res := string ( buf [ : n ] )
if res != "+OK\r\n" {
return fmt . Errorf ( "expected OK, got '%v'" , res )
}
rd := & fenceReader { conn , bufio . NewReader ( conn ) }
// send a point
c , err := redis . Dial ( "tcp" , fmt . Sprintf ( ":%d" , mc . port ) )
if err != nil {
return err
}
defer c . Close ( )
res , err = redis . String ( c . Do ( "SET" , "users" , "200" , "POINT" , "33.642301" , "-84.43118" ) )
if err != nil {
return err
}
if res != "OK" {
return fmt . Errorf ( "expected OK, got '%v'" , res )
}
if err := rd . receiveExpect ( "command" , "set" ,
"detect" , "inside" ,
"key" , "users" ,
"id" , "200" ,
"point" , ` { "lat":33.642301,"lon":-84.43118} ` ) ; err != nil {
return err
}
res , err = redis . String ( c . Do ( "SET" , "users" , "200" , "POINT" , "34.642301" , "-84.43118" ) )
if err != nil {
return err
}
if res != "OK" {
return fmt . Errorf ( "expected OK, got '%v'" , res )
}
// receive the message
if err := rd . receiveExpect ( "command" , "set" ,
"detect" , "outside" ,
"key" , "users" ,
"id" , "200" ,
"point" , ` { "lat":34.642301,"lon":-84.43118} ` ) ; err != nil {
return err
}
return nil
}
2019-01-09 10:23:53 +03:00
// do performs the passed command on the passed redis client
func do ( c redis . Conn , cmd string ) ( interface { } , error ) {
// Split out all parameters
params := strings . Split ( cmd , " " )
// Produce a slice of interfaces for use in the arguments
var args [ ] interface { }
for _ , p := range params [ 1 : ] {
args = append ( args , p )
}
// Perform the request and return the response
return c . Do ( params [ 0 ] , args ... )
}
2019-08-03 20:10:28 +03:00
func fence_channel_meta_test ( mc * mockServer ) error {
return mc . DoBatch ( [ ] [ ] interface { } {
{ "SETCHAN" , "carbon" , "NEARBY" , "x" , "MATCH" , "carbon*" , "FENCE" , "NODWELL" , "points" , "ROAM" , "x" , "*" , "200000" } , { "1" } ,
{ "OUTPUT" , "json" } , { ` { "ok":true} ` } ,
// check for valid json on the chans command
{ "CHANS" , "*" } , {
func ( v interface { } ) ( resp , expect interface { } ) {
// v is the value as strings or slices of strings
// test will pass as long as `resp` and `expect` are the same.
if ! json . Valid ( [ ] byte ( v . ( string ) ) ) {
return v , "Valid JSON"
}
return true , true
} ,
} ,
} )
}
2020-10-23 19:50:51 +03:00
func dialTile38 ( port int ) ( redis . Conn , error ) {
conn , err := redis . Dial ( "tcp" , fmt . Sprintf ( ":%d" , port ) )
if err != nil {
return nil , err
}
if _ , err := conn . Do ( "OUTPUT" , "json" ) ; err != nil {
conn . Close ( )
return nil , err
}
return conn , nil
}
func doTile38 ( c redis . Conn , cmd string , args ... interface { } ) ( string , error ) {
js , err := redis . String ( c . Do ( cmd , args ... ) )
if ! gjson . Get ( js , "ok" ) . Bool ( ) {
return "" , errors . New ( gjson . Get ( js , "err" ) . String ( ) )
}
return js , err
}
func fence_eecio_test ( mc * mockServer ) error {
// simulates issue #578
var wg sync . WaitGroup
wg . Add ( 3 )
ch := make ( chan bool )
var err1 , err2 , err3 error
var msgs1 , msgs2 [ ] string
// terminal 1
go func ( ) {
defer wg . Done ( )
err1 = func ( ) error {
conn , err := dialTile38 ( mc . port )
if err != nil {
return err
}
defer conn . Close ( )
_ , err = doTile38 ( conn ,
"SETCHAN" , "test-eec" , "NEARBY" , "fleet" ,
"FENCE" , "DETECT" , "enter,exit,cross" ,
"POINT" , "10.000" , "10.000" , "10000" )
if err != nil {
return err
}
_ , err = doTile38 ( conn , "SUBSCRIBE" , "test-eec" )
if err != nil {
return err
}
ch <- true
for {
js , err := redis . String ( conn . Receive ( ) )
if err != nil {
return err
}
if js == ` "DONE" ` {
break
}
msgs1 = append ( msgs1 , js )
}
return nil
} ( )
} ( )
// terminal 2
go func ( ) {
defer wg . Done ( )
err2 = func ( ) error {
conn , err := dialTile38 ( mc . port )
if err != nil {
return err
}
defer conn . Close ( )
_ , err = doTile38 ( conn ,
"SETCHAN" , "test-eecio" , "NEARBY" , "fleet" ,
"FENCE" , "DETECT" , "enter,exit,cross,inside,outside" ,
"POINT" , "10.000" , "10.000" , "10000" )
if err != nil {
return err
}
_ , err = doTile38 ( conn , "SUBSCRIBE" , "test-eecio" )
if err != nil {
return err
}
ch <- true
for {
js , err := redis . String ( conn . Receive ( ) )
if err != nil {
return err
}
if js == ` "DONE" ` {
break
}
msgs2 = append ( msgs2 , js )
}
return nil
} ( )
} ( )
// terminal 3
var ok bool
go func ( ) {
defer wg . Done ( )
err3 = func ( ) error {
<- ch // terminal 1
<- ch // terminal 2
conn , err := dialTile38 ( mc . port )
if err != nil {
return err
}
defer conn . Close ( )
if _ , err = doTile38 ( conn ,
"SET" , "fleet" , "vehicle_1" ,
"POINT" , "10.0" , "10.0" ) ; err != nil {
return err
}
if _ , err = doTile38 ( conn ,
"SET" , "fleet" , "vehicle_1" ,
"POINT" , "0.0" , "0.0" ) ; err != nil {
return err
}
if _ , err = doTile38 ( conn ,
"SET" , "fleet" , "vehicle_1" ,
"POINT" , "20.0" , "20.0" ) ; err != nil {
return err
}
if _ , err = doTile38 ( conn , "PUBLISH" , "test-eecio" ,
"DONE" ) ; err != nil {
return err
}
if _ , err = doTile38 ( conn , "PUBLISH" , "test-eec" ,
"DONE" ) ; err != nil {
return err
}
ok = true
return nil
} ( )
} ( )
var timeok int32
go func ( ) {
time . Sleep ( time . Second * 10 )
if atomic . LoadInt32 ( & timeok ) == 0 {
panic ( "timeout" )
}
} ( )
wg . Wait ( )
atomic . StoreInt32 ( & timeok , 1 )
if err3 != nil {
return err3
}
if ! ok {
if err2 != nil {
return err2
}
if err1 != nil {
return err1
}
}
var detects [ ] string
for i := 0 ; i < len ( msgs1 ) ; i ++ {
detects = append ( detects , gjson . Get ( msgs1 [ i ] , "detect" ) . String ( ) )
}
if strings . Join ( detects , "," ) != "enter,exit,cross" {
errmsg := fmt . Sprintf ( "expected 'enter,exit,cross', got '%s'\n" ,
strings . Join ( detects , "," ) )
return errors . New ( errmsg )
}
detects = nil
for i := 0 ; i < len ( msgs2 ) ; i ++ {
detects = append ( detects , gjson . Get ( msgs2 [ i ] , "detect" ) . String ( ) )
}
if strings . Join ( detects , "," ) != "enter,inside,exit,outside,cross,outside" {
errmsg := fmt . Sprintf (
"expected 'enter,inside,exit,outside,cross,outside', got '%s'\n" ,
strings . Join ( detects , "," ) )
return errors . New ( errmsg )
}
return nil
}