From f9fa48db21bcd14a3a0f94be2106b9407697bfd9 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Wed, 29 Mar 2017 12:50:04 -0700 Subject: [PATCH] use redis-style expires Updated the Tile38 expires to match the Redis implmentation at https://redis.io/commands/expire#how-redis-expires-keys. It now supports passive and active expires with sub-millisecond accuracy. This addresses issue #156 --- controller/controller.go | 2 + controller/crud.go | 49 +++++++--- controller/expire.go | 192 ++++++++++++++++++++++----------------- controller/search.go | 11 ++- tests/keys_test.go | 77 +++++----------- 5 files changed, 181 insertions(+), 150 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index eb1ce533..41b89b50 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -90,6 +90,7 @@ type Controller struct { hookcols map[string]map[string]*Hook // col key aofconnM map[net.Conn]bool expires map[string]map[string]time.Time + exlist []exitem conns map[*server.Conn]*clientConn started time.Time http bool @@ -176,6 +177,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http return err } c.mu.Lock() + c.fillExpiresList() if c.config.FollowHost != "" { go c.follow(c.config.FollowHost, c.config.FollowPort, c.followc) } diff --git a/controller/crud.go b/controller/crud.go index aa1325b6..deaca8e5 100644 --- a/controller/crud.go +++ b/controller/crud.go @@ -165,6 +165,7 @@ func (c *Controller) cmdGet(msg *server.Message) (string, error) { return "", errKeyNotFound } o, fields, ok := col.Get(id) + ok = ok && !c.hasExpired(key, id) if !ok { if msg.OutputType == server.RESP { return "$-1\r\n", nil @@ -372,6 +373,7 @@ func (c *Controller) cmdPdel(msg *server.Message) (res string, d commandDetailsT return true } + var expired int col := c.getCol(d.key) if col != nil { g := glob.Parse(d.pattern, false) @@ -412,7 +414,11 @@ func (c *Controller) cmdPdel(msg *server.Message) (res string, d commandDetailsT case server.JSON: res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" case server.RESP: - res = ":" + strconv.FormatInt(int64(len(d.children)), 10) + "\r\n" + total := len(d.children) - expired + if total < 0 { + total = 0 + } + res = ":" + strconv.FormatInt(int64(total), 10) + "\r\n" } return } @@ -884,13 +890,19 @@ func (c *Controller) cmdExpire(msg *server.Message) (res string, d commandDetail col := c.getCol(key) if col != nil { _, _, ok = col.Get(id) - if ok { - c.expireAt(key, id, time.Now().Add(time.Duration(float64(time.Second)*value))) - } + ok = ok && !c.hasExpired(key, id) + } + if ok { + c.expireAt(key, id, time.Now().Add(time.Duration(float64(time.Second)*value))) + d.updated = true } switch msg.OutputType { case server.JSON: - res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" + if ok { + res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" + } else { + return "", d, errIDNotFound + } case server.RESP: if ok { res = ":1\r\n" @@ -918,20 +930,30 @@ func (c *Controller) cmdPersist(msg *server.Message) (res string, d commandDetai err = errInvalidNumberOfArguments return } - var bit int + var cleared bool ok = false col := c.getCol(key) if col != nil { _, _, ok = col.Get(id) + ok = ok && !c.hasExpired(key, id) if ok { - bit = c.clearIDExpires(key, id) + cleared = c.clearIDExpires(key, id) } } + if !ok { + if msg.OutputType == server.RESP { + return ":0\r\n", d, nil + } + return "", d, errIDNotFound + } + d.command = "persist" + d.updated = cleared + d.timestamp = time.Now() switch msg.OutputType { case server.JSON: res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" case server.RESP: - if ok && bit == 1 { + if cleared { res = ":1\r\n" } else { res = ":0\r\n" @@ -963,13 +985,18 @@ func (c *Controller) cmdTTL(msg *server.Message) (res string, err error) { col := c.getCol(key) if col != nil { _, _, ok = col.Get(id) + ok = ok && !c.hasExpired(key, id) if ok { var at time.Time at, ok2 = c.getExpires(key, id) if ok2 { - v = float64(at.Sub(time.Now())) / float64(time.Second) - if v < 0 { - v = 0 + if time.Now().After(at) { + ok2 = false + } else { + v = float64(at.Sub(time.Now())) / float64(time.Second) + if v < 0 { + v = 0 + } } } } diff --git a/controller/expire.go b/controller/expire.go index 7c2197d5..186a5c77 100644 --- a/controller/expire.go +++ b/controller/expire.go @@ -1,37 +1,65 @@ package controller import ( + "log" + "math/rand" "time" + "github.com/tidwall/btree" "github.com/tidwall/resp" - "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/controller/server" ) +type exitem struct { + key, id string + at time.Time +} + +func (a *exitem) Less(v btree.Item, ctx interface{}) bool { + b := v.(*exitem) + if a.at.Before(b.at) { + return true + } + if a.at.After(b.at) { + return false + } + if a.key < b.key { + return true + } + if a.key > b.key { + return false + } + return a.id < b.id +} + // clearAllExpires removes all items that are marked at expires. func (c *Controller) clearAllExpires() { c.expires = make(map[string]map[string]time.Time) } -// clearIDExpires will clear a single item from the expires list. -func (c *Controller) clearIDExpires(key, id string) int { - m := c.expires[key] - if m == nil { - return 0 +// clearIDExpires clears a single item from the expires list. +func (c *Controller) clearIDExpires(key, id string) (cleared bool) { + if len(c.expires) == 0 { + return false + } + m, ok := c.expires[key] + if !ok { + return false + } + _, ok = m[id] + if !ok { + return false } delete(m, id) - if len(m) == 0 { - delete(c.expires, key) - } - return 1 + return true } -// clearKeyExpires will clear all items that are marked as expires from a single key. +// clearKeyExpires clears all items that are marked as expires from a single key. func (c *Controller) clearKeyExpires(key string) { delete(c.expires, key) } -// expireAt will mark an item as expires at a specific time. +// expireAt marks an item as expires at a specific time. func (c *Controller) expireAt(key, id string, at time.Time) { m := c.expires[key] if m == nil { @@ -39,86 +67,82 @@ func (c *Controller) expireAt(key, id string, at time.Time) { c.expires[key] = m } m[id] = at + if c.exlist != nil { + c.exlist = append(c.exlist, exitem{key, id, at}) + } } -// getExpires will return the when the item expires. +// getExpires returns the when an item expires. func (c *Controller) getExpires(key, id string) (at time.Time, ok bool) { - m := c.expires[key] - if m == nil { - ok = false - return + if len(c.expires) == 0 { + return at, false + } + m, ok := c.expires[key] + if !ok { + return at, false } at, ok = m[id] - return + return at, ok } -// backgroundExpiring watches for when items must expire from the database. -// It's runs through every item that has been marked as expires five times -// per second. -func (c *Controller) backgroundExpiring() { - const stop = 0 - const delay = 1 - const nodelay = 2 - for { - op := func() int { - c.mu.RLock() - defer c.mu.RUnlock() - if c.stopBackgroundExpiring { - return stop - } - // Only excute for leaders. Followers should ignore. - if c.config.FollowHost == "" { - now := time.Now() - for key, m := range c.expires { - for id, at := range m { - if now.After(at) { - // issue a DEL command - c.mu.RUnlock() - c.mu.Lock() +// hasExpired returns true if an item has expired. +func (c *Controller) hasExpired(key, id string) bool { + at, ok := c.getExpires(key, id) + if !ok { + return false + } + return time.Now().After(at) +} - // double check because locks were swapped - var del bool - if m2, ok := c.expires[key]; ok { - if at2, ok := m2[id]; ok { - if now.After(at2) { - del = true - } - } - } - if !del { - return nodelay - } - c.statsExpired++ - msg := &server.Message{} - msg.Values = resp.MultiBulkValue("del", key, id).Array() - msg.Command = "del" - _, d, err := c.cmdDel(msg) - if err != nil { - c.mu.Unlock() - log.Fatal(err) - continue - } - if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil { - c.mu.Unlock() - log.Fatal(err) - continue - } - c.mu.Unlock() - c.mu.RLock() - return nodelay - } - } - } - } - return delay - }() - switch op { - case stop: - return - case delay: - time.Sleep(time.Millisecond * 100) - case nodelay: - time.Sleep(time.Microsecond) +func (c *Controller) fillExpiresList() { + c.exlist = make([]exitem, 0) + for key, m := range c.expires { + for id, at := range m { + c.exlist = append(c.exlist, exitem{key, id, at}) } } } + +// backgroundExpiring watches for when items that have expired must be purged +// from the database. It's executes 10 times a seconds. +func (c *Controller) backgroundExpiring() { + rand.Seed(time.Now().UnixNano()) + for { + c.mu.Lock() + if c.stopBackgroundExpiring { + c.mu.Unlock() + return + } + now := time.Now() + var purged int + for i := 0; i < 20 && len(c.exlist) > 0; i++ { + ix := rand.Int() % len(c.exlist) + if now.After(c.exlist[ix].at) { + if c.hasExpired(c.exlist[ix].key, c.exlist[ix].id) { + msg := &server.Message{} + msg.Values = resp.MultiBulkValue("del", c.exlist[ix].key, c.exlist[ix].id).Array() + msg.Command = "del" + _, d, err := c.cmdDel(msg) + if err != nil { + c.mu.Unlock() + log.Fatal(err) + continue + } + if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil { + c.mu.Unlock() + log.Fatal(err) + continue + } + purged++ + } + c.exlist[ix] = c.exlist[len(c.exlist)-1] + c.exlist = c.exlist[:len(c.exlist)-1] + } + } + c.mu.Unlock() + if purged > 5 { + continue + } + time.Sleep(time.Second / 10) + } +} diff --git a/controller/search.go b/controller/search.go index 3430eeef..18902943 100644 --- a/controller/search.go +++ b/controller/search.go @@ -295,7 +295,6 @@ func (c *Controller) cmdNearby(msg *server.Message) (res string, err error) { if s.fence { return "", s } - minZ, maxZ := zMinMaxFromWheres(s.wheres) sw, err := c.newScanWriter(wr, msg, s.key, s.output, s.precision, s.glob, false, s.limit, s.wheres, s.nofields) if err != nil { @@ -307,12 +306,14 @@ func (c *Controller) cmdNearby(msg *server.Message) (res string, err error) { sw.writeHead() if sw.col != nil { iter := func(id string, o geojson.Object, fields []float64) bool { + if c.hasExpired(s.key, id) { + return true + } // Calculate distance if we need to distance := 0.0 if s.distance { distance = o.CalculatedPoint().DistanceTo(geojson.Position{X: s.lon, Y: s.lat, Z: 0}) } - return sw.writeObject(ScanWriterParams{ id: id, o: o, @@ -369,6 +370,9 @@ func (c *Controller) cmdWithinOrIntersects(cmd string, msg *server.Message) (res if cmd == "within" { s.cursor = sw.col.Within(s.cursor, s.sparse, s.o, s.minLat, s.minLon, s.maxLat, s.maxLon, minZ, maxZ, func(id string, o geojson.Object, fields []float64) bool { + if c.hasExpired(s.key, id) { + return true + } return sw.writeObject(ScanWriterParams{ id: id, o: o, @@ -379,6 +383,9 @@ func (c *Controller) cmdWithinOrIntersects(cmd string, msg *server.Message) (res } else if cmd == "intersects" { s.cursor = sw.col.Intersects(s.cursor, s.sparse, s.o, s.minLat, s.minLon, s.maxLat, s.maxLon, minZ, maxZ, func(id string, o geojson.Object, fields []float64) bool { + if c.hasExpired(s.key, id) { + return true + } return sw.writeObject(ScanWriterParams{ id: id, o: o, diff --git a/tests/keys_test.go b/tests/keys_test.go index 8e2085be..5f207ef3 100644 --- a/tests/keys_test.go +++ b/tests/keys_test.go @@ -1,13 +1,12 @@ package tests import ( + "errors" "fmt" - "math" "math/rand" "os/exec" "strconv" "strings" - "sync" "testing" "time" @@ -264,63 +263,35 @@ func psaux(pid int) PSAUX { } func keys_SET_EX_test(mc *mockServer) (err error) { rand.Seed(time.Now().UnixNano()) - mc.conn.Do("GC") - mc.conn.Do("OUTPUT", "json") - var json string - json, err = redis.String(mc.conn.Do("SERVER")) - if err != nil { - return - } - heap := gjson.Get(json, "stats.heap_size").Int() - //released := gjson.Get(json, "stats.heap_released").Int() - //fmt.Printf("%v %v %v\n", heap, released, psaux(int(gjson.Get(json, "stats.pid").Int())).VSZ) - mc.conn.Do("OUTPUT", "resp") - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < 20000; i++ { - val := fmt.Sprintf("val:%d", i) - // fmt.Printf("id: %s\n", val) - var resp string - var lat, lon float64 - lat = rand.Float64()*180 - 90 - lon = rand.Float64()*360 - 180 - resp, err = redis.String(mc.conn.Do("SET", "mykey", val, "EX", 1+rand.Float64(), "POINT", lat, lon)) - if err != nil { - return - } - if resp != "OK" { - err = fmt.Errorf("expected 'OK', got '%s'", resp) - return - } - } - }() - wg.Wait() - time.Sleep(time.Second * 3) - wg.Add(1) - go func() { - defer wg.Done() - mc.conn.Do("GC") - mc.conn.Do("OUTPUT", "json") - var json string - json, err = redis.String(mc.conn.Do("SERVER")) + // add a bunch of points + for i := 0; i < 20000; i++ { + val := fmt.Sprintf("val:%d", i) + var resp string + var lat, lon float64 + lat = rand.Float64()*180 - 90 + lon = rand.Float64()*360 - 180 + resp, err = redis.String(mc.conn.Do("SET", + fmt.Sprintf("mykey%d", i%3), val, + "EX", 1+rand.Float64(), + "POINT", lat, lon)) if err != nil { return } - mc.conn.Do("OUTPUT", "resp") - heap2 := gjson.Get(json, "stats.heap_size").Int() - //released := gjson.Get(json, "stats.heap_released").Int() - //fmt.Printf("%v %v %v\n", heap2, released, psaux(int(gjson.Get(json, "stats.pid").Int())).VSZ) - if math.Abs(float64(heap)-float64(heap2)) > 100000 { - err = fmt.Errorf("garbage not collecting, possible leak") + if resp != "OK" { + err = fmt.Errorf("expected 'OK', got '%s'", resp) return } - }() - wg.Wait() - if err != nil { - return + time.Sleep(time.Nanosecond) + } + time.Sleep(time.Second * 3) + mc.conn.Do("OUTPUT", "json") + json, _ := redis.String(mc.conn.Do("SERVER")) + if !gjson.Get(json, "ok").Bool() { + return errors.New("not ok") + } + if gjson.Get(json, "stats.num_objects").Int() > 0 { + return errors.New("items left in database") } mc.conn.Do("FLUSHDB") return nil