test app for #107

This commit is contained in:
Josh Baker 2016-12-28 11:16:28 -07:00
parent b8a0f59b81
commit 3e3d364911
8 changed files with 653 additions and 49 deletions

View File

@ -56,12 +56,15 @@ func (c *Controller) getExpires(key, id string) (at time.Time, ok bool) {
// It's runs through every item that has been marked as expires five times // It's runs through every item that has been marked as expires five times
// per second. // per second.
func (c *Controller) backgroundExpiring() { func (c *Controller) backgroundExpiring() {
const stop = 0
const delay = 1
const nodelay = 2
for { for {
ok := func() bool { op := func() int {
c.mu.Lock() c.mu.RLock()
defer c.mu.Unlock() defer c.mu.RUnlock()
if c.stopBackgroundExpiring { if c.stopBackgroundExpiring {
return false return stop
} }
// Only excute for leaders. Followers should ignore. // Only excute for leaders. Followers should ignore.
if c.config.FollowHost == "" { if c.config.FollowHost == "" {
@ -70,28 +73,52 @@ func (c *Controller) backgroundExpiring() {
for id, at := range m { for id, at := range m {
if now.After(at) { if now.After(at) {
// issue a DEL command // issue a DEL command
c.mu.RUnlock()
c.mu.Lock()
// double check because locks were swapped
var del bool
if m2, ok := c.expires[key]; ok {
if at2, ok := m2[id]; ok {
if now.After(at2) {
del = true
}
}
}
if !del {
return nodelay
}
c.statsExpired++ c.statsExpired++
msg := &server.Message{} msg := &server.Message{}
msg.Values = resp.MultiBulkValue("del", key, id).Array() msg.Values = resp.MultiBulkValue("del", key, id).Array()
msg.Command = "del" msg.Command = "del"
_, d, err := c.cmdDel(msg) _, d, err := c.cmdDel(msg)
if err != nil { if err != nil {
c.mu.Unlock()
log.Fatal(err) log.Fatal(err)
continue continue
} }
if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil { if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil {
c.mu.Unlock()
log.Fatal(err) log.Fatal(err)
continue continue
} }
c.mu.Unlock()
c.mu.RLock()
return nodelay
} }
} }
} }
} }
return true return delay
}() }()
if !ok { switch op {
case stop:
return return
case delay:
time.Sleep(time.Millisecond * 100)
case nodelay:
time.Sleep(time.Microsecond)
} }
time.Sleep(time.Second / 5)
} }
} }

View File

@ -1,8 +1,10 @@
package controller package controller
import ( import (
"fmt"
"math" "math"
"strconv" "strconv"
"time"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"github.com/tidwall/tile38/controller/glob" "github.com/tidwall/tile38/controller/glob"
@ -10,10 +12,13 @@ import (
"github.com/tidwall/tile38/geojson" "github.com/tidwall/tile38/geojson"
) )
var tmfmt = "2006-01-02T15:04:05.999999999Z07:00"
// FenceMatch executes a fence match returns back json messages for fence detection. // FenceMatch executes a fence match returns back json messages for fence detection.
func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT) [][]byte { func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT) [][]byte {
overall := time.Now()
defer func() {
return
fmt.Printf(">> %v\n", time.Since(overall))
}()
msgs := fenceMatch(hookName, sw, fence, details) msgs := fenceMatch(hookName, sw, fence, details)
if len(fence.accept) == 0 { if len(fence.accept) == 0 {
return msgs return msgs
@ -26,58 +31,64 @@ func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai
} }
return nmsgs return nmsgs
} }
func appendJSONTimeFormat(b []byte, t time.Time) []byte {
b = append(b, '"')
b = t.AppendFormat(b, "2006-01-02T15:04:05.999999999Z07:00")
b = append(b, '"')
return b
}
func jsonTimeFormat(t time.Time) string {
var b []byte
b = appendJSONTimeFormat(b, t)
return string(b)
}
func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT) [][]byte { func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT) [][]byte {
jshookName := jsonString(hookName)
jstime := jsonString(details.timestamp.Format(tmfmt))
pattern := fence.glob
if details.command == "drop" { if details.command == "drop" {
return [][]byte{[]byte(`{"command":"drop","hook":` + jshookName + `,"time":` + jstime + `}`)} return [][]byte{[]byte(`{"command":"drop","hook":` + jsonString(hookName) + `,"time":` + jsonTimeFormat(details.timestamp) + `}`)}
} }
match := true if len(fence.glob) > 0 && !(len(fence.glob) == 1 && fence.glob[0] == '*') {
if pattern != "" && pattern != "*" { match, _ := glob.Match(fence.glob, details.id)
match, _ = glob.Match(pattern, details.id) if !match {
return nil
}
} }
if !match { if details.obj == nil || !details.obj.IsGeometry() {
return nil return nil
} }
if details.command == "fset" {
sw.mu.Lock() sw.mu.Lock()
nofields := sw.nofields nofields := sw.nofields
sw.mu.Unlock() sw.mu.Unlock()
if nofields {
if details.obj == nil || !details.obj.IsGeometry() || (details.command == "fset" && nofields) { return nil
return nil }
}
if details.command == "del" {
return [][]byte{[]byte(`{"command":"del","hook":` + jsonString(hookName) + `,"id":` + jsonString(details.id) + `,"time":` + jsonTimeFormat(details.timestamp) + `}`)}
} }
match = false
var roamkeys, roamids []string var roamkeys, roamids []string
var roammeters []float64 var roammeters []float64
detect := "outside" var detect string = "outside"
if fence != nil { if fence != nil {
if fence.roam.on { if fence.roam.on {
if details.command == "set" { if details.command == "set" {
// println("roam", fence.roam.key, fence.roam.id, strconv.FormatFloat(fence.roam.meters, 'f', -1, 64))
roamkeys, roamids, roammeters = fenceMatchRoam(sw.c, fence, details.key, details.id, details.obj) roamkeys, roamids, roammeters = fenceMatchRoam(sw.c, fence, details.key, details.id, details.obj)
} }
if len(roamids) == 0 || len(roamids) != len(roamkeys) { if len(roamids) == 0 || len(roamids) != len(roamkeys) {
return nil return nil
} }
match = true
detect = "roam" detect = "roam"
} else { } else {
// not using roaming // not using roaming
match1 := fenceMatchObject(fence, details.oldObj) match1 := fenceMatchObject(fence, details.oldObj)
match2 := fenceMatchObject(fence, details.obj) match2 := fenceMatchObject(fence, details.obj)
if match1 && match2 { if match1 && match2 {
match = true
detect = "inside" detect = "inside"
} else if match1 && !match2 { } else if match1 && !match2 {
match = true
detect = "exit" detect = "exit"
} else if !match1 && match2 { } else if !match1 && match2 {
match = true
detect = "enter" detect = "enter"
if details.command == "fset" { if details.command == "fset" {
detect = "inside" detect = "inside"
@ -101,7 +112,6 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai
temp = true temp = true
} }
if fenceMatchObject(fence, ls) { if fenceMatchObject(fence, ls) {
//match = true
detect = "cross" detect = "cross"
} }
if temp { if temp {
@ -112,12 +122,14 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai
} }
} }
} }
if details.command == "del" {
return [][]byte{[]byte(`{"command":"del","hook":` + jshookName + `,"id":` + jsonString(details.id) + `,"time":` + jstime + `}`)}
}
if details.fmap == nil { if details.fmap == nil {
return nil return nil
} }
if fence.detect != nil && !fence.detect[detect] {
return nil
}
sw.mu.Lock() sw.mu.Lock()
sw.fmap = details.fmap sw.fmap = details.fmap
sw.fullFields = true sw.fullFields = true
@ -159,23 +171,21 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai
} }
} }
jskey := jsonString(details.key)
msgs := make([][]byte, 0, 4) msgs := make([][]byte, 0, 4)
if fence.detect == nil || fence.detect[detect] { if fence.detect == nil || fence.detect[detect] {
if len(res) > 0 && res[0] == '{' { if len(res) > 0 && res[0] == '{' {
res = makemsg(details.command, group, detect, jshookName, jskey, jstime, res[1:]) res = makemsg(details.command, group, detect, hookName, details.key, details.timestamp, res[1:])
} }
msgs = append(msgs, res) msgs = append(msgs, res)
} }
switch detect { switch detect {
case "enter": case "enter":
if fence.detect == nil || fence.detect["inside"] { if fence.detect == nil || fence.detect["inside"] {
msgs = append(msgs, makemsg(details.command, group, "inside", jshookName, jskey, jstime, res[1:])) msgs = append(msgs, makemsg(details.command, group, "inside", hookName, details.key, details.timestamp, res[1:]))
} }
case "exit", "cross": case "exit", "cross":
if fence.detect == nil || fence.detect["outside"] { if fence.detect == nil || fence.detect["outside"] {
msgs = append(msgs, makemsg(details.command, group, "outside", jshookName, jskey, jstime, res[1:])) msgs = append(msgs, makemsg(details.command, group, "outside", hookName, details.key, details.timestamp, res[1:]))
} }
case "roam": case "roam":
if len(msgs) > 0 { if len(msgs) > 0 {
@ -185,9 +195,9 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai
nmsg := append([]byte(nil), msg...) nmsg := append([]byte(nil), msg...)
nmsg = append(nmsg, `,"nearby":{"key":`...) nmsg = append(nmsg, `,"nearby":{"key":`...)
nmsg = append(nmsg, jsonString(roamkeys[i])...) nmsg = appendJSONString(nmsg, roamkeys[i])
nmsg = append(nmsg, `,"id":`...) nmsg = append(nmsg, `,"id":`...)
nmsg = append(nmsg, jsonString(id)...) nmsg = appendJSONString(nmsg, id)
nmsg = append(nmsg, `,"meters":`...) nmsg = append(nmsg, `,"meters":`...)
nmsg = append(nmsg, strconv.FormatFloat(roammeters[i], 'f', -1, 64)...) nmsg = append(nmsg, strconv.FormatFloat(roammeters[i], 'f', -1, 64)...)
@ -234,14 +244,14 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai
return msgs return msgs
} }
func makemsg(command, group, detect, jshookName, jskey, jstime string, tail []byte) []byte { func makemsg(command, group, detect, hookName string, key string, t time.Time, tail []byte) []byte {
var buf []byte var buf []byte
buf = append(append(buf, `{"command":"`...), command...) buf = append(append(buf, `{"command":"`...), command...)
buf = append(append(buf, `","group":"`...), group...) buf = append(append(buf, `","group":"`...), group...)
buf = append(append(buf, `","detect":"`...), detect...) buf = append(append(buf, `","detect":"`...), detect...)
buf = append(append(buf, `","hook":`...), jshookName...) buf = appendJSONString(append(buf, `","hook":`...), hookName)
buf = append(append(buf, `,"key":`...), jskey...) buf = appendJSONString(append(buf, `,"key":`...), key)
buf = append(append(buf, `,"time":`...), jstime...) buf = appendJSONTimeFormat(append(buf, `,"time":`...), t)
buf = append(append(buf, ','), tail...) buf = append(append(buf, ','), tail...)
return buf return buf
} }
@ -254,9 +264,11 @@ func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool {
// we need to check this object against // we need to check this object against
return false return false
} }
if fence.cmd == "nearby" { if fence.cmd == "nearby" {
return obj.Nearby(geojson.Position{X: fence.lon, Y: fence.lat, Z: 0}, fence.meters) return obj.Nearby(geojson.Position{X: fence.lon, Y: fence.lat, Z: 0}, fence.meters)
} else if fence.cmd == "within" { }
if fence.cmd == "within" {
if fence.o != nil { if fence.o != nil {
return obj.Within(fence.o) return obj.Within(fence.o)
} }
@ -264,7 +276,8 @@ func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool {
Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0}, Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0},
Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0}, Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0},
}) })
} else if fence.cmd == "intersects" { }
if fence.cmd == "intersects" {
if fence.o != nil { if fence.o != nil {
return obj.Intersects(fence.o) return obj.Intersects(fence.o)
} }

View File

@ -15,6 +15,19 @@ import (
"github.com/tidwall/tile38/geojson" "github.com/tidwall/tile38/geojson"
) )
func appendJSONString(b []byte, s string) []byte {
for i := 0; i < len(s); i++ {
if s[i] < ' ' || s[i] == '\\' || s[i] == '"' || s[i] > 126 {
d, _ := json.Marshal(s)
return append(b, string(d)...)
}
}
b = append(b, '"')
b = append(b, s...)
b = append(b, '"')
return b
}
func jsonString(s string) string { func jsonString(s string) string {
for i := 0; i < len(s); i++ { for i := 0; i < len(s); i++ {
if s[i] < ' ' || s[i] == '\\' || s[i] == '"' || s[i] > 126 { if s[i] < ' ' || s[i] == '\\' || s[i] == '"' || s[i] > 126 {

3
tests/107/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
appendonly.aof
log
data/

1
tests/107/LINK Normal file
View File

@ -0,0 +1 @@
https://github.com/tidwall/tile38/issues/107

412
tests/107/main.go Normal file
View File

@ -0,0 +1,412 @@
package main
import (
"archive/zip"
"bytes"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"os/exec"
"path"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/garyburd/redigo/redis"
"github.com/tidwall/gjson"
"github.com/tidwall/tile38/controller"
)
const tile38Port = 9191
const httpPort = 9292
const dir = "data"
var tile38Addr string
var httpAddr string
var wd string
var server string
var minX float64
var minY float64
var maxX float64
var maxY float64
var pool = &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", tile38Addr)
},
}
var providedTile38 bool
var providedHTTP bool
const blank = false
const hookServer = true
var logf *os.File
func main() {
flag.StringVar(&tile38Addr, "tile38", "",
"Tile38 address, leave blank to start a new server")
flag.StringVar(&httpAddr, "hook", "",
"Hook HTTP url, leave blank to start a new server")
flag.Parse()
log.Println("mockfill-107 (Github #107: Memory leak)")
if tile38Addr == "" {
tile38Addr = "127.0.0.1:" + strconv.FormatInt(int64(tile38Port), 10)
} else {
providedTile38 = true
}
if httpAddr == "" {
httpAddr = "http://127.0.0.1:" + strconv.FormatInt(int64(httpPort), 10) + "/hook"
} else {
providedHTTP = true
}
var err error
wd, err = os.Getwd()
if err != nil {
log.Fatal(err)
}
logf, err = os.Create("log")
if err != nil {
log.Fatal(err)
}
defer logf.Close()
if !providedTile38 {
copyAOF()
go startTile38Server()
}
if !providedHTTP {
if hookServer {
go startHookServer()
}
}
go waitForServers(func() {
log.Printf("servers ready")
logServer("START")
setPoints()
logServer("DONE")
})
select {}
return
}
func startTile38Server() {
log.Println("start tile38 server")
err := controller.ListenAndServe("localhost", tile38Port, "data")
if err != nil {
log.Fatal(err)
}
}
func startHookServer() {
log.Println("start hook server")
http.HandleFunc("/ping", func(w http.ResponseWriter, _ *http.Request) {
io.WriteString(w, "pong")
})
http.HandleFunc("/hook", func(w http.ResponseWriter, req *http.Request) {
data, err := ioutil.ReadAll(req.Body)
if err != nil {
log.Fatal(err)
}
log.Println(string(data))
})
err := http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", httpPort), nil)
if err != nil {
log.Fatal(err)
}
}
func waitForServers(cb func()) {
log.Println("wait for servers")
var err error
start := time.Now()
for {
if time.Since(start) > time.Second*5 {
log.Fatal("connection failed:", err)
}
func() {
conn := pool.Get()
defer conn.Close()
var s string
s, err = redis.String(conn.Do("PING"))
if err != nil {
return
}
if s != "PONG" {
log.Fatalf("expected '%v', got '%v'", "PONG", s)
}
}()
if err == nil {
break
}
time.Sleep(time.Second / 5)
}
if hookServer {
start = time.Now()
for {
if time.Since(start) > time.Second*5 {
log.Fatal("connection failed:", err)
}
func() {
var resp *http.Response
resp, err = http.Get(httpAddr + "/notreal")
if err != nil {
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 && resp.StatusCode != 404 {
log.Fatalf("expected '%v', got '%v'", "200 or 404",
resp.StatusCode)
}
}()
if err == nil {
break
}
time.Sleep(time.Second / 5)
}
}
cb()
}
func downloadAOF() {
log.Println("downloading aof")
resp, err := http.Get("https://github.com/tidwall/tile38/files/675225/appendonly.aof.zip")
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
rd, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
if err != nil {
log.Fatal(err)
}
for _, f := range rd.File {
if path.Ext(f.Name) == ".aof" {
rc, err := f.Open()
if err != nil {
log.Fatal(err)
}
defer rc.Close()
data, err := ioutil.ReadAll(rc)
if err != nil {
log.Fatal(err)
}
err = ioutil.WriteFile(path.Join(wd, "appendonly.aof"), data, 0666)
if err != nil {
log.Fatal(err)
}
return
}
}
log.Fatal("invalid appendonly.aof.zip")
}
func copyAOF() {
if err := os.RemoveAll(path.Join(wd, "data")); err != nil {
log.Fatal(err)
}
if err := os.MkdirAll(path.Join(wd, "data"), 0777); err != nil {
log.Fatal(err)
}
fin, err := os.Open(path.Join(wd, "appendonly.aof"))
if err != nil {
if os.IsNotExist(err) {
downloadAOF()
fin, err = os.Open(path.Join(wd, "appendonly.aof"))
if err != nil {
log.Fatal(err)
}
} else {
log.Fatal(err)
}
}
defer fin.Close()
log.Println("load aof")
fout, err := os.Create(path.Join(wd, "data", "appendonly.aof"))
if err != nil {
log.Fatal(err)
}
defer fout.Close()
data, err := ioutil.ReadAll(fin)
if err != nil {
log.Fatal(err)
}
rep := httpAddr
rep = "$" + strconv.FormatInt(int64(len(rep)), 10) + "\r\n" + rep + "\r\n"
data = bytes.Replace(data,
[]byte("$23\r\nhttp://172.17.0.1:9999/\r\n"), []byte(rep), -1)
if blank {
data = nil
}
if _, err := fout.Write(data); err != nil {
log.Fatal(err)
}
}
func respGet(resp interface{}, idx ...int) interface{} {
for i := 0; i < len(idx); i++ {
arr, _ := redis.Values(resp, nil)
resp = arr[idx[i]]
}
return resp
}
type PSAUX struct {
User string
PID int
CPU float64
Mem float64
VSZ int
RSS int
TTY string
Stat string
Start string
Time string
Command string
}
func atoi(s string) int {
n, _ := strconv.ParseInt(s, 10, 64)
return int(n)
}
func atof(s string) float64 {
n, _ := strconv.ParseFloat(s, 64)
return float64(n)
}
func psaux(pid int) PSAUX {
var res []byte
res, err := exec.Command("ps", "ux", "-p", strconv.FormatInt(int64(pid), 10)).CombinedOutput()
if err != nil {
return PSAUX{}
}
pids := strconv.FormatInt(int64(pid), 10)
for _, line := range strings.Split(string(res), "\n") {
var words []string
for _, word := range strings.Split(line, " ") {
if word != "" {
words = append(words, word)
}
}
if len(words) >= 11 {
if words[1] == pids {
return PSAUX{
User: words[0],
PID: atoi(words[1]),
CPU: atof(words[2]),
Mem: atof(words[3]),
VSZ: atoi(words[4]),
RSS: atoi(words[5]),
TTY: words[6],
Stat: words[7],
Start: words[8],
Time: words[9],
Command: words[10],
}
}
}
}
return PSAUX{}
}
func respGetFloat(resp interface{}, idx ...int) float64 {
resp = respGet(resp, idx...)
f, _ := redis.Float64(resp, nil)
return f
}
func logServer(tag string) {
conn := pool.Get()
defer conn.Close()
_, err := conn.Do("OUTPUT", "json")
if err != nil {
log.Fatal(err)
}
_, err = redis.String(conn.Do("GC"))
if err != nil {
log.Fatal(err)
}
json, err := redis.String(conn.Do("SERVER"))
if err != nil {
log.Fatal(err)
}
_, err = conn.Do("OUTPUT", "resp")
if err != nil {
log.Fatal(err)
}
rss := float64(psaux(int(gjson.Get(json, "stats.pid").Int())).RSS) / 1024
heapSize := gjson.Get(json, "stats.heap_size").Float() / 1024 / 1024
heapReleased := gjson.Get(json, "stats.heap_released").Float() / 1024 / 1024
fmt.Fprintf(logf, "%s %10.2f MB (heap) %10.2f MB (released) %10.2f MB (system)\n",
time.Now().Format("2006-01-02T15:04:05Z07:00"),
heapSize, heapReleased, rss)
}
func setPoints() {
go func() {
var i int
for range time.NewTicker(time.Second * 1).C {
logServer(fmt.Sprintf("SECOND-%d", i*1))
i++
}
}()
rand.Seed(time.Now().UnixNano())
n := 1000000
ex := time.Second * 10
log.Printf("time to pump data (%d points, expires %s)", n, ex)
conn := pool.Get()
defer conn.Close()
if blank {
minX = -124.40959167480469
minY = 32.53415298461914
maxX = -114.13121032714844
maxY = 42.009521484375
} else {
resp, err := conn.Do("bounds", "boundies")
if err != nil {
log.Fatal(err)
}
minX = respGetFloat(resp, 0, 0)
minY = respGetFloat(resp, 0, 1)
maxX = respGetFloat(resp, 1, 0)
maxY = respGetFloat(resp, 1, 1)
}
log.Printf("bbox: [[%.4f,%.4f],[%.4f,%.4f]]\n", minX, minY, maxX, maxY)
var idx uint64
for i := 0; i < 4; i++ {
go func() {
conn := pool.Get()
defer conn.Close()
for i := 0; i < n; i++ {
atomic.AddUint64(&idx, 1)
id := fmt.Sprintf("person:%d", idx)
x := rand.Float64()*(maxX-minX) + minX
y := rand.Float64()*(maxY-minY) + minY
ok, err := redis.String(conn.Do("SET", "people", id,
"EX", float64(ex/time.Second),
"POINT", y, x))
if err != nil {
log.Fatal(err)
}
if ok != "OK" {
log.Fatalf("expected 'OK', got '%v", ok)
}
log.Printf("SET people %v EX %v POINT %v %v",
id, float64(ex/time.Second), y, x)
}
}()
}
select {}
}

View File

@ -1,8 +1,18 @@
package tests package tests
import ( import (
"fmt"
"math"
"math/rand"
"os/exec"
"strconv"
"strings"
"sync"
"testing" "testing"
"time" "time"
"github.com/garyburd/redigo/redis"
"github.com/tidwall/gjson"
) )
func subTestKeys(t *testing.T, mc *mockServer) { func subTestKeys(t *testing.T, mc *mockServer) {
@ -17,6 +27,7 @@ func subTestKeys(t *testing.T, mc *mockServer) {
runStep(t, mc, "SET", keys_SET_test) runStep(t, mc, "SET", keys_SET_test)
runStep(t, mc, "STATS", keys_STATS_test) runStep(t, mc, "STATS", keys_STATS_test)
runStep(t, mc, "TTL", keys_TTL_test) runStep(t, mc, "TTL", keys_TTL_test)
runStep(t, mc, "SET EX", keys_SET_EX_test)
} }
func keys_BOUNDS_test(mc *mockServer) error { func keys_BOUNDS_test(mc *mockServer) error {
@ -192,3 +203,123 @@ func keys_TTL_test(mc *mockServer) error {
{"TTL", "mykey", "myid"}, {1}, {"TTL", "mykey", "myid"}, {1},
}) })
} }
type PSAUX struct {
User string
PID int
CPU float64
Mem float64
VSZ int
RSS int
TTY string
Stat string
Start string
Time string
Command string
}
func atoi(s string) int {
n, _ := strconv.ParseInt(s, 10, 64)
return int(n)
}
func atof(s string) float64 {
n, _ := strconv.ParseFloat(s, 64)
return float64(n)
}
func psaux(pid int) PSAUX {
var res []byte
res, err := exec.Command("ps", "aux").CombinedOutput()
if err != nil {
return PSAUX{}
}
pids := strconv.FormatInt(int64(pid), 10)
for _, line := range strings.Split(string(res), "\n") {
var words []string
for _, word := range strings.Split(line, " ") {
if word != "" {
words = append(words, word)
}
if len(words) > 11 {
if words[1] == pids {
return PSAUX{
User: words[0],
PID: atoi(words[1]),
CPU: atof(words[2]),
Mem: atof(words[3]),
VSZ: atoi(words[4]),
RSS: atoi(words[5]),
TTY: words[6],
Stat: words[7],
Start: words[8],
Time: words[9],
Command: words[10],
}
}
}
}
}
return PSAUX{}
}
func keys_SET_EX_test(mc *mockServer) (err error) {
rand.Seed(time.Now().UnixNano())
mc.conn.Do("GC")
mc.conn.Do("OUTPUT", "json")
var json string
json, err = redis.String(mc.conn.Do("SERVER"))
if err != nil {
return
}
heap := gjson.Get(json, "stats.heap_size").Int()
//released := gjson.Get(json, "stats.heap_released").Int()
//fmt.Printf("%v %v %v\n", heap, released, psaux(int(gjson.Get(json, "stats.pid").Int())).VSZ)
mc.conn.Do("OUTPUT", "resp")
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 20000; i++ {
val := fmt.Sprintf("val:%d", i)
// fmt.Printf("id: %s\n", val)
var resp string
var lat, lon float64
lat = rand.Float64()*180 - 90
lon = rand.Float64()*360 - 180
resp, err = redis.String(mc.conn.Do("SET", "mykey", val, "EX", 1+rand.Float64(), "POINT", lat, lon))
if err != nil {
return
}
if resp != "OK" {
err = fmt.Errorf("expected 'OK', got '%s'", resp)
return
}
}
}()
wg.Wait()
time.Sleep(time.Second * 3)
wg.Add(1)
go func() {
defer wg.Done()
mc.conn.Do("GC")
mc.conn.Do("OUTPUT", "json")
var json string
json, err = redis.String(mc.conn.Do("SERVER"))
if err != nil {
return
}
mc.conn.Do("OUTPUT", "resp")
heap2 := gjson.Get(json, "stats.heap_size").Int()
//released := gjson.Get(json, "stats.heap_released").Int()
//fmt.Printf("%v %v %v\n", heap2, released, psaux(int(gjson.Get(json, "stats.pid").Int())).VSZ)
if math.Abs(float64(heap)-float64(heap2)) > 100000 {
err = fmt.Errorf("garbage not collecting, possible leak")
return
}
}()
wg.Wait()
if err != nil {
return
}
mc.conn.Do("FLUSHDB")
return nil
}

View File

@ -195,6 +195,7 @@ func (mc *mockServer) DoExpect(expect interface{}, commandName string, args ...i
} }
return err return err
} }
oresp := resp
resp = normalize(resp) resp = normalize(resp)
if expect == nil && resp != nil { if expect == nil && resp != nil {
return fmt.Errorf("expected '%v', got '%v'", expect, resp) return fmt.Errorf("expected '%v', got '%v'", expect, resp)
@ -225,6 +226,9 @@ func (mc *mockServer) DoExpect(expect interface{}, commandName string, args ...i
resp = string([]byte(b)) resp = string([]byte(b))
} }
} }
if fn, ok := expect.(func(v, org interface{}) (resp, expect interface{})); ok {
resp, expect = fn(resp, oresp)
}
if fn, ok := expect.(func(v interface{}) (resp, expect interface{})); ok { if fn, ok := expect.(func(v interface{}) (resp, expect interface{})); ok {
resp, expect = fn(resp) resp, expect = fn(resp)
} }