From 3e3d3649119a846fe65b9baf2cfa0009f90a93be Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Wed, 28 Dec 2016 11:16:28 -0700 Subject: [PATCH] test app for #107 --- controller/expire.go | 41 ++++- controller/fence.go | 97 +++++----- controller/json.go | 13 ++ tests/107/.gitignore | 3 + tests/107/LINK | 1 + tests/107/main.go | 412 +++++++++++++++++++++++++++++++++++++++++++ tests/keys_test.go | 131 ++++++++++++++ tests/mock_test.go | 4 + 8 files changed, 653 insertions(+), 49 deletions(-) create mode 100644 tests/107/.gitignore create mode 100644 tests/107/LINK create mode 100644 tests/107/main.go diff --git a/controller/expire.go b/controller/expire.go index b068acca..7c2197d5 100644 --- a/controller/expire.go +++ b/controller/expire.go @@ -56,12 +56,15 @@ func (c *Controller) getExpires(key, id string) (at time.Time, ok bool) { // It's runs through every item that has been marked as expires five times // per second. func (c *Controller) backgroundExpiring() { + const stop = 0 + const delay = 1 + const nodelay = 2 for { - ok := func() bool { - c.mu.Lock() - defer c.mu.Unlock() + op := func() int { + c.mu.RLock() + defer c.mu.RUnlock() if c.stopBackgroundExpiring { - return false + return stop } // Only excute for leaders. Followers should ignore. if c.config.FollowHost == "" { @@ -70,28 +73,52 @@ func (c *Controller) backgroundExpiring() { for id, at := range m { if now.After(at) { // issue a DEL command + c.mu.RUnlock() + c.mu.Lock() + + // double check because locks were swapped + var del bool + if m2, ok := c.expires[key]; ok { + if at2, ok := m2[id]; ok { + if now.After(at2) { + del = true + } + } + } + if !del { + return nodelay + } c.statsExpired++ msg := &server.Message{} msg.Values = resp.MultiBulkValue("del", key, id).Array() msg.Command = "del" _, d, err := c.cmdDel(msg) if err != nil { + c.mu.Unlock() log.Fatal(err) continue } if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil { + c.mu.Unlock() log.Fatal(err) continue } + c.mu.Unlock() + c.mu.RLock() + return nodelay } } } } - return true + return delay }() - if !ok { + switch op { + case stop: return + case delay: + time.Sleep(time.Millisecond * 100) + case nodelay: + time.Sleep(time.Microsecond) } - time.Sleep(time.Second / 5) } } diff --git a/controller/fence.go b/controller/fence.go index 6fc8cbb2..b2b5cb4d 100644 --- a/controller/fence.go +++ b/controller/fence.go @@ -1,8 +1,10 @@ package controller import ( + "fmt" "math" "strconv" + "time" "github.com/tidwall/gjson" "github.com/tidwall/tile38/controller/glob" @@ -10,10 +12,13 @@ import ( "github.com/tidwall/tile38/geojson" ) -var tmfmt = "2006-01-02T15:04:05.999999999Z07:00" - // FenceMatch executes a fence match returns back json messages for fence detection. func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT) [][]byte { + overall := time.Now() + defer func() { + return + fmt.Printf(">> %v\n", time.Since(overall)) + }() msgs := fenceMatch(hookName, sw, fence, details) if len(fence.accept) == 0 { return msgs @@ -26,58 +31,64 @@ func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai } return nmsgs } +func appendJSONTimeFormat(b []byte, t time.Time) []byte { + b = append(b, '"') + b = t.AppendFormat(b, "2006-01-02T15:04:05.999999999Z07:00") + b = append(b, '"') + return b +} +func jsonTimeFormat(t time.Time) string { + var b []byte + b = appendJSONTimeFormat(b, t) + return string(b) +} func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT) [][]byte { - jshookName := jsonString(hookName) - jstime := jsonString(details.timestamp.Format(tmfmt)) - pattern := fence.glob if details.command == "drop" { - return [][]byte{[]byte(`{"command":"drop","hook":` + jshookName + `,"time":` + jstime + `}`)} + return [][]byte{[]byte(`{"command":"drop","hook":` + jsonString(hookName) + `,"time":` + jsonTimeFormat(details.timestamp) + `}`)} } - match := true - if pattern != "" && pattern != "*" { - match, _ = glob.Match(pattern, details.id) + if len(fence.glob) > 0 && !(len(fence.glob) == 1 && fence.glob[0] == '*') { + match, _ := glob.Match(fence.glob, details.id) + if !match { + return nil + } } - if !match { + if details.obj == nil || !details.obj.IsGeometry() { return nil } - - sw.mu.Lock() - nofields := sw.nofields - sw.mu.Unlock() - - if details.obj == nil || !details.obj.IsGeometry() || (details.command == "fset" && nofields) { - return nil + if details.command == "fset" { + sw.mu.Lock() + nofields := sw.nofields + sw.mu.Unlock() + if nofields { + return nil + } + } + if details.command == "del" { + return [][]byte{[]byte(`{"command":"del","hook":` + jsonString(hookName) + `,"id":` + jsonString(details.id) + `,"time":` + jsonTimeFormat(details.timestamp) + `}`)} } - match = false var roamkeys, roamids []string var roammeters []float64 - detect := "outside" + var detect string = "outside" if fence != nil { if fence.roam.on { if details.command == "set" { - // println("roam", fence.roam.key, fence.roam.id, strconv.FormatFloat(fence.roam.meters, 'f', -1, 64)) roamkeys, roamids, roammeters = fenceMatchRoam(sw.c, fence, details.key, details.id, details.obj) } if len(roamids) == 0 || len(roamids) != len(roamkeys) { return nil } - match = true detect = "roam" } else { - // not using roaming match1 := fenceMatchObject(fence, details.oldObj) match2 := fenceMatchObject(fence, details.obj) if match1 && match2 { - match = true detect = "inside" } else if match1 && !match2 { - match = true detect = "exit" } else if !match1 && match2 { - match = true detect = "enter" if details.command == "fset" { detect = "inside" @@ -101,7 +112,6 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai temp = true } if fenceMatchObject(fence, ls) { - //match = true detect = "cross" } if temp { @@ -112,12 +122,14 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai } } } - if details.command == "del" { - return [][]byte{[]byte(`{"command":"del","hook":` + jshookName + `,"id":` + jsonString(details.id) + `,"time":` + jstime + `}`)} - } + if details.fmap == nil { return nil } + if fence.detect != nil && !fence.detect[detect] { + return nil + } + sw.mu.Lock() sw.fmap = details.fmap sw.fullFields = true @@ -159,23 +171,21 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai } } - jskey := jsonString(details.key) - msgs := make([][]byte, 0, 4) if fence.detect == nil || fence.detect[detect] { if len(res) > 0 && res[0] == '{' { - res = makemsg(details.command, group, detect, jshookName, jskey, jstime, res[1:]) + res = makemsg(details.command, group, detect, hookName, details.key, details.timestamp, res[1:]) } msgs = append(msgs, res) } switch detect { case "enter": if fence.detect == nil || fence.detect["inside"] { - msgs = append(msgs, makemsg(details.command, group, "inside", jshookName, jskey, jstime, res[1:])) + msgs = append(msgs, makemsg(details.command, group, "inside", hookName, details.key, details.timestamp, res[1:])) } case "exit", "cross": if fence.detect == nil || fence.detect["outside"] { - msgs = append(msgs, makemsg(details.command, group, "outside", jshookName, jskey, jstime, res[1:])) + msgs = append(msgs, makemsg(details.command, group, "outside", hookName, details.key, details.timestamp, res[1:])) } case "roam": if len(msgs) > 0 { @@ -185,9 +195,9 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai nmsg := append([]byte(nil), msg...) nmsg = append(nmsg, `,"nearby":{"key":`...) - nmsg = append(nmsg, jsonString(roamkeys[i])...) + nmsg = appendJSONString(nmsg, roamkeys[i]) nmsg = append(nmsg, `,"id":`...) - nmsg = append(nmsg, jsonString(id)...) + nmsg = appendJSONString(nmsg, id) nmsg = append(nmsg, `,"meters":`...) nmsg = append(nmsg, strconv.FormatFloat(roammeters[i], 'f', -1, 64)...) @@ -234,14 +244,14 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai return msgs } -func makemsg(command, group, detect, jshookName, jskey, jstime string, tail []byte) []byte { +func makemsg(command, group, detect, hookName string, key string, t time.Time, tail []byte) []byte { var buf []byte buf = append(append(buf, `{"command":"`...), command...) buf = append(append(buf, `","group":"`...), group...) buf = append(append(buf, `","detect":"`...), detect...) - buf = append(append(buf, `","hook":`...), jshookName...) - buf = append(append(buf, `,"key":`...), jskey...) - buf = append(append(buf, `,"time":`...), jstime...) + buf = appendJSONString(append(buf, `","hook":`...), hookName) + buf = appendJSONString(append(buf, `,"key":`...), key) + buf = appendJSONTimeFormat(append(buf, `,"time":`...), t) buf = append(append(buf, ','), tail...) return buf } @@ -254,9 +264,11 @@ func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool { // we need to check this object against return false } + if fence.cmd == "nearby" { return obj.Nearby(geojson.Position{X: fence.lon, Y: fence.lat, Z: 0}, fence.meters) - } else if fence.cmd == "within" { + } + if fence.cmd == "within" { if fence.o != nil { return obj.Within(fence.o) } @@ -264,7 +276,8 @@ func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool { Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0}, Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0}, }) - } else if fence.cmd == "intersects" { + } + if fence.cmd == "intersects" { if fence.o != nil { return obj.Intersects(fence.o) } diff --git a/controller/json.go b/controller/json.go index 442a7ad7..91183f9f 100644 --- a/controller/json.go +++ b/controller/json.go @@ -15,6 +15,19 @@ import ( "github.com/tidwall/tile38/geojson" ) +func appendJSONString(b []byte, s string) []byte { + for i := 0; i < len(s); i++ { + if s[i] < ' ' || s[i] == '\\' || s[i] == '"' || s[i] > 126 { + d, _ := json.Marshal(s) + return append(b, string(d)...) + } + } + b = append(b, '"') + b = append(b, s...) + b = append(b, '"') + return b +} + func jsonString(s string) string { for i := 0; i < len(s); i++ { if s[i] < ' ' || s[i] == '\\' || s[i] == '"' || s[i] > 126 { diff --git a/tests/107/.gitignore b/tests/107/.gitignore new file mode 100644 index 00000000..ae2b2006 --- /dev/null +++ b/tests/107/.gitignore @@ -0,0 +1,3 @@ +appendonly.aof +log +data/ diff --git a/tests/107/LINK b/tests/107/LINK new file mode 100644 index 00000000..ce5b4d7f --- /dev/null +++ b/tests/107/LINK @@ -0,0 +1 @@ +https://github.com/tidwall/tile38/issues/107 diff --git a/tests/107/main.go b/tests/107/main.go new file mode 100644 index 00000000..71e20ab3 --- /dev/null +++ b/tests/107/main.go @@ -0,0 +1,412 @@ +package main + +import ( + "archive/zip" + "bytes" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "math/rand" + "net/http" + "os" + "os/exec" + "path" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/garyburd/redigo/redis" + "github.com/tidwall/gjson" + "github.com/tidwall/tile38/controller" +) + +const tile38Port = 9191 +const httpPort = 9292 +const dir = "data" + +var tile38Addr string +var httpAddr string + +var wd string +var server string + +var minX float64 +var minY float64 +var maxX float64 +var maxY float64 +var pool = &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", tile38Addr) + }, +} +var providedTile38 bool +var providedHTTP bool + +const blank = false +const hookServer = true + +var logf *os.File + +func main() { + flag.StringVar(&tile38Addr, "tile38", "", + "Tile38 address, leave blank to start a new server") + flag.StringVar(&httpAddr, "hook", "", + "Hook HTTP url, leave blank to start a new server") + flag.Parse() + log.Println("mockfill-107 (Github #107: Memory leak)") + + if tile38Addr == "" { + tile38Addr = "127.0.0.1:" + strconv.FormatInt(int64(tile38Port), 10) + } else { + providedTile38 = true + } + if httpAddr == "" { + httpAddr = "http://127.0.0.1:" + strconv.FormatInt(int64(httpPort), 10) + "/hook" + } else { + providedHTTP = true + } + var err error + wd, err = os.Getwd() + if err != nil { + log.Fatal(err) + } + logf, err = os.Create("log") + if err != nil { + log.Fatal(err) + } + defer logf.Close() + if !providedTile38 { + copyAOF() + go startTile38Server() + } + if !providedHTTP { + if hookServer { + go startHookServer() + } + } + go waitForServers(func() { + log.Printf("servers ready") + logServer("START") + setPoints() + logServer("DONE") + }) + select {} + return +} + +func startTile38Server() { + log.Println("start tile38 server") + err := controller.ListenAndServe("localhost", tile38Port, "data") + if err != nil { + log.Fatal(err) + } +} + +func startHookServer() { + log.Println("start hook server") + http.HandleFunc("/ping", func(w http.ResponseWriter, _ *http.Request) { + io.WriteString(w, "pong") + }) + http.HandleFunc("/hook", func(w http.ResponseWriter, req *http.Request) { + data, err := ioutil.ReadAll(req.Body) + if err != nil { + log.Fatal(err) + } + log.Println(string(data)) + }) + err := http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", httpPort), nil) + if err != nil { + log.Fatal(err) + } +} + +func waitForServers(cb func()) { + log.Println("wait for servers") + var err error + start := time.Now() + for { + if time.Since(start) > time.Second*5 { + log.Fatal("connection failed:", err) + } + func() { + conn := pool.Get() + defer conn.Close() + var s string + s, err = redis.String(conn.Do("PING")) + if err != nil { + return + } + if s != "PONG" { + log.Fatalf("expected '%v', got '%v'", "PONG", s) + } + }() + if err == nil { + break + } + time.Sleep(time.Second / 5) + } + if hookServer { + start = time.Now() + for { + if time.Since(start) > time.Second*5 { + log.Fatal("connection failed:", err) + } + func() { + var resp *http.Response + resp, err = http.Get(httpAddr + "/notreal") + if err != nil { + return + } + defer resp.Body.Close() + if resp.StatusCode != 200 && resp.StatusCode != 404 { + log.Fatalf("expected '%v', got '%v'", "200 or 404", + resp.StatusCode) + } + }() + if err == nil { + break + } + time.Sleep(time.Second / 5) + } + } + cb() +} + +func downloadAOF() { + log.Println("downloading aof") + resp, err := http.Get("https://github.com/tidwall/tile38/files/675225/appendonly.aof.zip") + if err != nil { + log.Fatal(err) + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Fatal(err) + } + rd, err := zip.NewReader(bytes.NewReader(body), int64(len(body))) + if err != nil { + log.Fatal(err) + } + for _, f := range rd.File { + if path.Ext(f.Name) == ".aof" { + rc, err := f.Open() + if err != nil { + log.Fatal(err) + } + defer rc.Close() + + data, err := ioutil.ReadAll(rc) + if err != nil { + log.Fatal(err) + } + err = ioutil.WriteFile(path.Join(wd, "appendonly.aof"), data, 0666) + if err != nil { + log.Fatal(err) + } + return + } + } + log.Fatal("invalid appendonly.aof.zip") +} + +func copyAOF() { + if err := os.RemoveAll(path.Join(wd, "data")); err != nil { + log.Fatal(err) + } + if err := os.MkdirAll(path.Join(wd, "data"), 0777); err != nil { + log.Fatal(err) + } + fin, err := os.Open(path.Join(wd, "appendonly.aof")) + if err != nil { + if os.IsNotExist(err) { + downloadAOF() + fin, err = os.Open(path.Join(wd, "appendonly.aof")) + if err != nil { + log.Fatal(err) + } + } else { + log.Fatal(err) + } + } + defer fin.Close() + + log.Println("load aof") + fout, err := os.Create(path.Join(wd, "data", "appendonly.aof")) + if err != nil { + log.Fatal(err) + } + defer fout.Close() + data, err := ioutil.ReadAll(fin) + if err != nil { + log.Fatal(err) + } + rep := httpAddr + rep = "$" + strconv.FormatInt(int64(len(rep)), 10) + "\r\n" + rep + "\r\n" + data = bytes.Replace(data, + []byte("$23\r\nhttp://172.17.0.1:9999/\r\n"), []byte(rep), -1) + if blank { + data = nil + } + if _, err := fout.Write(data); err != nil { + log.Fatal(err) + } +} + +func respGet(resp interface{}, idx ...int) interface{} { + for i := 0; i < len(idx); i++ { + arr, _ := redis.Values(resp, nil) + resp = arr[idx[i]] + } + return resp +} + +type PSAUX struct { + User string + PID int + CPU float64 + Mem float64 + VSZ int + RSS int + TTY string + Stat string + Start string + Time string + Command string +} + +func atoi(s string) int { + n, _ := strconv.ParseInt(s, 10, 64) + return int(n) +} +func atof(s string) float64 { + n, _ := strconv.ParseFloat(s, 64) + return float64(n) +} +func psaux(pid int) PSAUX { + var res []byte + res, err := exec.Command("ps", "ux", "-p", strconv.FormatInt(int64(pid), 10)).CombinedOutput() + if err != nil { + return PSAUX{} + } + pids := strconv.FormatInt(int64(pid), 10) + for _, line := range strings.Split(string(res), "\n") { + var words []string + for _, word := range strings.Split(line, " ") { + if word != "" { + words = append(words, word) + } + } + if len(words) >= 11 { + if words[1] == pids { + return PSAUX{ + User: words[0], + PID: atoi(words[1]), + CPU: atof(words[2]), + Mem: atof(words[3]), + VSZ: atoi(words[4]), + RSS: atoi(words[5]), + TTY: words[6], + Stat: words[7], + Start: words[8], + Time: words[9], + Command: words[10], + } + } + } + } + return PSAUX{} +} +func respGetFloat(resp interface{}, idx ...int) float64 { + resp = respGet(resp, idx...) + f, _ := redis.Float64(resp, nil) + return f +} +func logServer(tag string) { + conn := pool.Get() + defer conn.Close() + _, err := conn.Do("OUTPUT", "json") + if err != nil { + log.Fatal(err) + } + _, err = redis.String(conn.Do("GC")) + if err != nil { + log.Fatal(err) + } + json, err := redis.String(conn.Do("SERVER")) + if err != nil { + log.Fatal(err) + } + _, err = conn.Do("OUTPUT", "resp") + if err != nil { + log.Fatal(err) + } + rss := float64(psaux(int(gjson.Get(json, "stats.pid").Int())).RSS) / 1024 + heapSize := gjson.Get(json, "stats.heap_size").Float() / 1024 / 1024 + heapReleased := gjson.Get(json, "stats.heap_released").Float() / 1024 / 1024 + fmt.Fprintf(logf, "%s %10.2f MB (heap) %10.2f MB (released) %10.2f MB (system)\n", + time.Now().Format("2006-01-02T15:04:05Z07:00"), + heapSize, heapReleased, rss) +} +func setPoints() { + go func() { + var i int + for range time.NewTicker(time.Second * 1).C { + logServer(fmt.Sprintf("SECOND-%d", i*1)) + i++ + } + }() + + rand.Seed(time.Now().UnixNano()) + n := 1000000 + ex := time.Second * 10 + log.Printf("time to pump data (%d points, expires %s)", n, ex) + conn := pool.Get() + defer conn.Close() + if blank { + minX = -124.40959167480469 + minY = 32.53415298461914 + maxX = -114.13121032714844 + maxY = 42.009521484375 + } else { + resp, err := conn.Do("bounds", "boundies") + if err != nil { + log.Fatal(err) + } + minX = respGetFloat(resp, 0, 0) + minY = respGetFloat(resp, 0, 1) + maxX = respGetFloat(resp, 1, 0) + maxY = respGetFloat(resp, 1, 1) + } + log.Printf("bbox: [[%.4f,%.4f],[%.4f,%.4f]]\n", minX, minY, maxX, maxY) + var idx uint64 + for i := 0; i < 4; i++ { + go func() { + conn := pool.Get() + defer conn.Close() + for i := 0; i < n; i++ { + atomic.AddUint64(&idx, 1) + id := fmt.Sprintf("person:%d", idx) + x := rand.Float64()*(maxX-minX) + minX + y := rand.Float64()*(maxY-minY) + minY + ok, err := redis.String(conn.Do("SET", "people", id, + "EX", float64(ex/time.Second), + "POINT", y, x)) + if err != nil { + log.Fatal(err) + } + if ok != "OK" { + log.Fatalf("expected 'OK', got '%v", ok) + } + log.Printf("SET people %v EX %v POINT %v %v", + id, float64(ex/time.Second), y, x) + } + }() + } + select {} +} diff --git a/tests/keys_test.go b/tests/keys_test.go index 6fadacf6..c7950ea4 100644 --- a/tests/keys_test.go +++ b/tests/keys_test.go @@ -1,8 +1,18 @@ package tests import ( + "fmt" + "math" + "math/rand" + "os/exec" + "strconv" + "strings" + "sync" "testing" "time" + + "github.com/garyburd/redigo/redis" + "github.com/tidwall/gjson" ) func subTestKeys(t *testing.T, mc *mockServer) { @@ -17,6 +27,7 @@ func subTestKeys(t *testing.T, mc *mockServer) { runStep(t, mc, "SET", keys_SET_test) runStep(t, mc, "STATS", keys_STATS_test) runStep(t, mc, "TTL", keys_TTL_test) + runStep(t, mc, "SET EX", keys_SET_EX_test) } func keys_BOUNDS_test(mc *mockServer) error { @@ -192,3 +203,123 @@ func keys_TTL_test(mc *mockServer) error { {"TTL", "mykey", "myid"}, {1}, }) } + +type PSAUX struct { + User string + PID int + CPU float64 + Mem float64 + VSZ int + RSS int + TTY string + Stat string + Start string + Time string + Command string +} + +func atoi(s string) int { + n, _ := strconv.ParseInt(s, 10, 64) + return int(n) +} +func atof(s string) float64 { + n, _ := strconv.ParseFloat(s, 64) + return float64(n) +} +func psaux(pid int) PSAUX { + var res []byte + res, err := exec.Command("ps", "aux").CombinedOutput() + if err != nil { + return PSAUX{} + } + pids := strconv.FormatInt(int64(pid), 10) + for _, line := range strings.Split(string(res), "\n") { + var words []string + for _, word := range strings.Split(line, " ") { + if word != "" { + words = append(words, word) + } + if len(words) > 11 { + if words[1] == pids { + return PSAUX{ + User: words[0], + PID: atoi(words[1]), + CPU: atof(words[2]), + Mem: atof(words[3]), + VSZ: atoi(words[4]), + RSS: atoi(words[5]), + TTY: words[6], + Stat: words[7], + Start: words[8], + Time: words[9], + Command: words[10], + } + } + } + } + } + return PSAUX{} +} +func keys_SET_EX_test(mc *mockServer) (err error) { + rand.Seed(time.Now().UnixNano()) + mc.conn.Do("GC") + mc.conn.Do("OUTPUT", "json") + var json string + json, err = redis.String(mc.conn.Do("SERVER")) + if err != nil { + return + } + heap := gjson.Get(json, "stats.heap_size").Int() + //released := gjson.Get(json, "stats.heap_released").Int() + //fmt.Printf("%v %v %v\n", heap, released, psaux(int(gjson.Get(json, "stats.pid").Int())).VSZ) + mc.conn.Do("OUTPUT", "resp") + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 20000; i++ { + val := fmt.Sprintf("val:%d", i) + // fmt.Printf("id: %s\n", val) + var resp string + var lat, lon float64 + lat = rand.Float64()*180 - 90 + lon = rand.Float64()*360 - 180 + resp, err = redis.String(mc.conn.Do("SET", "mykey", val, "EX", 1+rand.Float64(), "POINT", lat, lon)) + if err != nil { + return + } + if resp != "OK" { + err = fmt.Errorf("expected 'OK', got '%s'", resp) + return + } + } + }() + wg.Wait() + time.Sleep(time.Second * 3) + wg.Add(1) + go func() { + defer wg.Done() + mc.conn.Do("GC") + mc.conn.Do("OUTPUT", "json") + var json string + json, err = redis.String(mc.conn.Do("SERVER")) + if err != nil { + return + } + mc.conn.Do("OUTPUT", "resp") + heap2 := gjson.Get(json, "stats.heap_size").Int() + //released := gjson.Get(json, "stats.heap_released").Int() + //fmt.Printf("%v %v %v\n", heap2, released, psaux(int(gjson.Get(json, "stats.pid").Int())).VSZ) + if math.Abs(float64(heap)-float64(heap2)) > 100000 { + err = fmt.Errorf("garbage not collecting, possible leak") + return + } + }() + wg.Wait() + if err != nil { + return + } + mc.conn.Do("FLUSHDB") + return nil +} diff --git a/tests/mock_test.go b/tests/mock_test.go index c7a01abb..3f387744 100644 --- a/tests/mock_test.go +++ b/tests/mock_test.go @@ -195,6 +195,7 @@ func (mc *mockServer) DoExpect(expect interface{}, commandName string, args ...i } return err } + oresp := resp resp = normalize(resp) if expect == nil && resp != nil { return fmt.Errorf("expected '%v', got '%v'", expect, resp) @@ -225,6 +226,9 @@ func (mc *mockServer) DoExpect(expect interface{}, commandName string, args ...i resp = string([]byte(b)) } } + if fn, ok := expect.(func(v, org interface{}) (resp, expect interface{})); ok { + resp, expect = fn(resp, oresp) + } if fn, ok := expect.(func(v interface{}) (resp, expect interface{})); ok { resp, expect = fn(resp) }