use redis-style expires

Updated the Tile38 expires to match the Redis implmentation at
https://redis.io/commands/expire#how-redis-expires-keys.

It now supports passive and active expires with sub-millisecond
accuracy.

This addresses issue #156
This commit is contained in:
Josh Baker 2017-03-29 12:50:04 -07:00
parent a2fe25865c
commit f9fa48db21
5 changed files with 181 additions and 150 deletions

View File

@ -90,6 +90,7 @@ type Controller struct {
hookcols map[string]map[string]*Hook // col key hookcols map[string]map[string]*Hook // col key
aofconnM map[net.Conn]bool aofconnM map[net.Conn]bool
expires map[string]map[string]time.Time expires map[string]map[string]time.Time
exlist []exitem
conns map[*server.Conn]*clientConn conns map[*server.Conn]*clientConn
started time.Time started time.Time
http bool http bool
@ -176,6 +177,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
return err return err
} }
c.mu.Lock() c.mu.Lock()
c.fillExpiresList()
if c.config.FollowHost != "" { if c.config.FollowHost != "" {
go c.follow(c.config.FollowHost, c.config.FollowPort, c.followc) go c.follow(c.config.FollowHost, c.config.FollowPort, c.followc)
} }

View File

@ -165,6 +165,7 @@ func (c *Controller) cmdGet(msg *server.Message) (string, error) {
return "", errKeyNotFound return "", errKeyNotFound
} }
o, fields, ok := col.Get(id) o, fields, ok := col.Get(id)
ok = ok && !c.hasExpired(key, id)
if !ok { if !ok {
if msg.OutputType == server.RESP { if msg.OutputType == server.RESP {
return "$-1\r\n", nil return "$-1\r\n", nil
@ -372,6 +373,7 @@ func (c *Controller) cmdPdel(msg *server.Message) (res string, d commandDetailsT
return true return true
} }
var expired int
col := c.getCol(d.key) col := c.getCol(d.key)
if col != nil { if col != nil {
g := glob.Parse(d.pattern, false) g := glob.Parse(d.pattern, false)
@ -412,7 +414,11 @@ func (c *Controller) cmdPdel(msg *server.Message) (res string, d commandDetailsT
case server.JSON: case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
case server.RESP: case server.RESP:
res = ":" + strconv.FormatInt(int64(len(d.children)), 10) + "\r\n" total := len(d.children) - expired
if total < 0 {
total = 0
}
res = ":" + strconv.FormatInt(int64(total), 10) + "\r\n"
} }
return return
} }
@ -884,13 +890,19 @@ func (c *Controller) cmdExpire(msg *server.Message) (res string, d commandDetail
col := c.getCol(key) col := c.getCol(key)
if col != nil { if col != nil {
_, _, ok = col.Get(id) _, _, ok = col.Get(id)
if ok { ok = ok && !c.hasExpired(key, id)
c.expireAt(key, id, time.Now().Add(time.Duration(float64(time.Second)*value))) }
} if ok {
c.expireAt(key, id, time.Now().Add(time.Duration(float64(time.Second)*value)))
d.updated = true
} }
switch msg.OutputType { switch msg.OutputType {
case server.JSON: case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" if ok {
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
} else {
return "", d, errIDNotFound
}
case server.RESP: case server.RESP:
if ok { if ok {
res = ":1\r\n" res = ":1\r\n"
@ -918,20 +930,30 @@ func (c *Controller) cmdPersist(msg *server.Message) (res string, d commandDetai
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
var bit int var cleared bool
ok = false ok = false
col := c.getCol(key) col := c.getCol(key)
if col != nil { if col != nil {
_, _, ok = col.Get(id) _, _, ok = col.Get(id)
ok = ok && !c.hasExpired(key, id)
if ok { if ok {
bit = c.clearIDExpires(key, id) cleared = c.clearIDExpires(key, id)
} }
} }
if !ok {
if msg.OutputType == server.RESP {
return ":0\r\n", d, nil
}
return "", d, errIDNotFound
}
d.command = "persist"
d.updated = cleared
d.timestamp = time.Now()
switch msg.OutputType { switch msg.OutputType {
case server.JSON: case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
case server.RESP: case server.RESP:
if ok && bit == 1 { if cleared {
res = ":1\r\n" res = ":1\r\n"
} else { } else {
res = ":0\r\n" res = ":0\r\n"
@ -963,13 +985,18 @@ func (c *Controller) cmdTTL(msg *server.Message) (res string, err error) {
col := c.getCol(key) col := c.getCol(key)
if col != nil { if col != nil {
_, _, ok = col.Get(id) _, _, ok = col.Get(id)
ok = ok && !c.hasExpired(key, id)
if ok { if ok {
var at time.Time var at time.Time
at, ok2 = c.getExpires(key, id) at, ok2 = c.getExpires(key, id)
if ok2 { if ok2 {
v = float64(at.Sub(time.Now())) / float64(time.Second) if time.Now().After(at) {
if v < 0 { ok2 = false
v = 0 } else {
v = float64(at.Sub(time.Now())) / float64(time.Second)
if v < 0 {
v = 0
}
} }
} }
} }

View File

@ -1,37 +1,65 @@
package controller package controller
import ( import (
"log"
"math/rand"
"time" "time"
"github.com/tidwall/btree"
"github.com/tidwall/resp" "github.com/tidwall/resp"
"github.com/tidwall/tile38/controller/log"
"github.com/tidwall/tile38/controller/server" "github.com/tidwall/tile38/controller/server"
) )
type exitem struct {
key, id string
at time.Time
}
func (a *exitem) Less(v btree.Item, ctx interface{}) bool {
b := v.(*exitem)
if a.at.Before(b.at) {
return true
}
if a.at.After(b.at) {
return false
}
if a.key < b.key {
return true
}
if a.key > b.key {
return false
}
return a.id < b.id
}
// clearAllExpires removes all items that are marked at expires. // clearAllExpires removes all items that are marked at expires.
func (c *Controller) clearAllExpires() { func (c *Controller) clearAllExpires() {
c.expires = make(map[string]map[string]time.Time) c.expires = make(map[string]map[string]time.Time)
} }
// clearIDExpires will clear a single item from the expires list. // clearIDExpires clears a single item from the expires list.
func (c *Controller) clearIDExpires(key, id string) int { func (c *Controller) clearIDExpires(key, id string) (cleared bool) {
m := c.expires[key] if len(c.expires) == 0 {
if m == nil { return false
return 0 }
m, ok := c.expires[key]
if !ok {
return false
}
_, ok = m[id]
if !ok {
return false
} }
delete(m, id) delete(m, id)
if len(m) == 0 { return true
delete(c.expires, key)
}
return 1
} }
// clearKeyExpires will clear all items that are marked as expires from a single key. // clearKeyExpires clears all items that are marked as expires from a single key.
func (c *Controller) clearKeyExpires(key string) { func (c *Controller) clearKeyExpires(key string) {
delete(c.expires, key) delete(c.expires, key)
} }
// expireAt will mark an item as expires at a specific time. // expireAt marks an item as expires at a specific time.
func (c *Controller) expireAt(key, id string, at time.Time) { func (c *Controller) expireAt(key, id string, at time.Time) {
m := c.expires[key] m := c.expires[key]
if m == nil { if m == nil {
@ -39,86 +67,82 @@ func (c *Controller) expireAt(key, id string, at time.Time) {
c.expires[key] = m c.expires[key] = m
} }
m[id] = at m[id] = at
if c.exlist != nil {
c.exlist = append(c.exlist, exitem{key, id, at})
}
} }
// getExpires will return the when the item expires. // getExpires returns the when an item expires.
func (c *Controller) getExpires(key, id string) (at time.Time, ok bool) { func (c *Controller) getExpires(key, id string) (at time.Time, ok bool) {
m := c.expires[key] if len(c.expires) == 0 {
if m == nil { return at, false
ok = false }
return m, ok := c.expires[key]
if !ok {
return at, false
} }
at, ok = m[id] at, ok = m[id]
return return at, ok
} }
// backgroundExpiring watches for when items must expire from the database. // hasExpired returns true if an item has expired.
// It's runs through every item that has been marked as expires five times func (c *Controller) hasExpired(key, id string) bool {
// per second. at, ok := c.getExpires(key, id)
func (c *Controller) backgroundExpiring() { if !ok {
const stop = 0 return false
const delay = 1 }
const nodelay = 2 return time.Now().After(at)
for { }
op := func() int {
c.mu.RLock()
defer c.mu.RUnlock()
if c.stopBackgroundExpiring {
return stop
}
// Only excute for leaders. Followers should ignore.
if c.config.FollowHost == "" {
now := time.Now()
for key, m := range c.expires {
for id, at := range m {
if now.After(at) {
// issue a DEL command
c.mu.RUnlock()
c.mu.Lock()
// double check because locks were swapped func (c *Controller) fillExpiresList() {
var del bool c.exlist = make([]exitem, 0)
if m2, ok := c.expires[key]; ok { for key, m := range c.expires {
if at2, ok := m2[id]; ok { for id, at := range m {
if now.After(at2) { c.exlist = append(c.exlist, exitem{key, id, at})
del = true
}
}
}
if !del {
return nodelay
}
c.statsExpired++
msg := &server.Message{}
msg.Values = resp.MultiBulkValue("del", key, id).Array()
msg.Command = "del"
_, d, err := c.cmdDel(msg)
if err != nil {
c.mu.Unlock()
log.Fatal(err)
continue
}
if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil {
c.mu.Unlock()
log.Fatal(err)
continue
}
c.mu.Unlock()
c.mu.RLock()
return nodelay
}
}
}
}
return delay
}()
switch op {
case stop:
return
case delay:
time.Sleep(time.Millisecond * 100)
case nodelay:
time.Sleep(time.Microsecond)
} }
} }
} }
// backgroundExpiring watches for when items that have expired must be purged
// from the database. It's executes 10 times a seconds.
func (c *Controller) backgroundExpiring() {
rand.Seed(time.Now().UnixNano())
for {
c.mu.Lock()
if c.stopBackgroundExpiring {
c.mu.Unlock()
return
}
now := time.Now()
var purged int
for i := 0; i < 20 && len(c.exlist) > 0; i++ {
ix := rand.Int() % len(c.exlist)
if now.After(c.exlist[ix].at) {
if c.hasExpired(c.exlist[ix].key, c.exlist[ix].id) {
msg := &server.Message{}
msg.Values = resp.MultiBulkValue("del", c.exlist[ix].key, c.exlist[ix].id).Array()
msg.Command = "del"
_, d, err := c.cmdDel(msg)
if err != nil {
c.mu.Unlock()
log.Fatal(err)
continue
}
if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil {
c.mu.Unlock()
log.Fatal(err)
continue
}
purged++
}
c.exlist[ix] = c.exlist[len(c.exlist)-1]
c.exlist = c.exlist[:len(c.exlist)-1]
}
}
c.mu.Unlock()
if purged > 5 {
continue
}
time.Sleep(time.Second / 10)
}
}

View File

@ -295,7 +295,6 @@ func (c *Controller) cmdNearby(msg *server.Message) (res string, err error) {
if s.fence { if s.fence {
return "", s return "", s
} }
minZ, maxZ := zMinMaxFromWheres(s.wheres) minZ, maxZ := zMinMaxFromWheres(s.wheres)
sw, err := c.newScanWriter(wr, msg, s.key, s.output, s.precision, s.glob, false, s.limit, s.wheres, s.nofields) sw, err := c.newScanWriter(wr, msg, s.key, s.output, s.precision, s.glob, false, s.limit, s.wheres, s.nofields)
if err != nil { if err != nil {
@ -307,12 +306,14 @@ func (c *Controller) cmdNearby(msg *server.Message) (res string, err error) {
sw.writeHead() sw.writeHead()
if sw.col != nil { if sw.col != nil {
iter := func(id string, o geojson.Object, fields []float64) bool { iter := func(id string, o geojson.Object, fields []float64) bool {
if c.hasExpired(s.key, id) {
return true
}
// Calculate distance if we need to // Calculate distance if we need to
distance := 0.0 distance := 0.0
if s.distance { if s.distance {
distance = o.CalculatedPoint().DistanceTo(geojson.Position{X: s.lon, Y: s.lat, Z: 0}) distance = o.CalculatedPoint().DistanceTo(geojson.Position{X: s.lon, Y: s.lat, Z: 0})
} }
return sw.writeObject(ScanWriterParams{ return sw.writeObject(ScanWriterParams{
id: id, id: id,
o: o, o: o,
@ -369,6 +370,9 @@ func (c *Controller) cmdWithinOrIntersects(cmd string, msg *server.Message) (res
if cmd == "within" { if cmd == "within" {
s.cursor = sw.col.Within(s.cursor, s.sparse, s.o, s.minLat, s.minLon, s.maxLat, s.maxLon, minZ, maxZ, s.cursor = sw.col.Within(s.cursor, s.sparse, s.o, s.minLat, s.minLon, s.maxLat, s.maxLon, minZ, maxZ,
func(id string, o geojson.Object, fields []float64) bool { func(id string, o geojson.Object, fields []float64) bool {
if c.hasExpired(s.key, id) {
return true
}
return sw.writeObject(ScanWriterParams{ return sw.writeObject(ScanWriterParams{
id: id, id: id,
o: o, o: o,
@ -379,6 +383,9 @@ func (c *Controller) cmdWithinOrIntersects(cmd string, msg *server.Message) (res
} else if cmd == "intersects" { } else if cmd == "intersects" {
s.cursor = sw.col.Intersects(s.cursor, s.sparse, s.o, s.minLat, s.minLon, s.maxLat, s.maxLon, minZ, maxZ, s.cursor = sw.col.Intersects(s.cursor, s.sparse, s.o, s.minLat, s.minLon, s.maxLat, s.maxLon, minZ, maxZ,
func(id string, o geojson.Object, fields []float64) bool { func(id string, o geojson.Object, fields []float64) bool {
if c.hasExpired(s.key, id) {
return true
}
return sw.writeObject(ScanWriterParams{ return sw.writeObject(ScanWriterParams{
id: id, id: id,
o: o, o: o,

View File

@ -1,13 +1,12 @@
package tests package tests
import ( import (
"errors"
"fmt" "fmt"
"math"
"math/rand" "math/rand"
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -264,63 +263,35 @@ func psaux(pid int) PSAUX {
} }
func keys_SET_EX_test(mc *mockServer) (err error) { func keys_SET_EX_test(mc *mockServer) (err error) {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
mc.conn.Do("GC")
mc.conn.Do("OUTPUT", "json")
var json string
json, err = redis.String(mc.conn.Do("SERVER"))
if err != nil {
return
}
heap := gjson.Get(json, "stats.heap_size").Int()
//released := gjson.Get(json, "stats.heap_released").Int()
//fmt.Printf("%v %v %v\n", heap, released, psaux(int(gjson.Get(json, "stats.pid").Int())).VSZ)
mc.conn.Do("OUTPUT", "resp")
var wg sync.WaitGroup // add a bunch of points
wg.Add(1) for i := 0; i < 20000; i++ {
go func() { val := fmt.Sprintf("val:%d", i)
defer wg.Done() var resp string
for i := 0; i < 20000; i++ { var lat, lon float64
val := fmt.Sprintf("val:%d", i) lat = rand.Float64()*180 - 90
// fmt.Printf("id: %s\n", val) lon = rand.Float64()*360 - 180
var resp string resp, err = redis.String(mc.conn.Do("SET",
var lat, lon float64 fmt.Sprintf("mykey%d", i%3), val,
lat = rand.Float64()*180 - 90 "EX", 1+rand.Float64(),
lon = rand.Float64()*360 - 180 "POINT", lat, lon))
resp, err = redis.String(mc.conn.Do("SET", "mykey", val, "EX", 1+rand.Float64(), "POINT", lat, lon))
if err != nil {
return
}
if resp != "OK" {
err = fmt.Errorf("expected 'OK', got '%s'", resp)
return
}
}
}()
wg.Wait()
time.Sleep(time.Second * 3)
wg.Add(1)
go func() {
defer wg.Done()
mc.conn.Do("GC")
mc.conn.Do("OUTPUT", "json")
var json string
json, err = redis.String(mc.conn.Do("SERVER"))
if err != nil { if err != nil {
return return
} }
mc.conn.Do("OUTPUT", "resp") if resp != "OK" {
heap2 := gjson.Get(json, "stats.heap_size").Int() err = fmt.Errorf("expected 'OK', got '%s'", resp)
//released := gjson.Get(json, "stats.heap_released").Int()
//fmt.Printf("%v %v %v\n", heap2, released, psaux(int(gjson.Get(json, "stats.pid").Int())).VSZ)
if math.Abs(float64(heap)-float64(heap2)) > 100000 {
err = fmt.Errorf("garbage not collecting, possible leak")
return return
} }
}() time.Sleep(time.Nanosecond)
wg.Wait() }
if err != nil { time.Sleep(time.Second * 3)
return mc.conn.Do("OUTPUT", "json")
json, _ := redis.String(mc.conn.Do("SERVER"))
if !gjson.Get(json, "ok").Bool() {
return errors.New("not ok")
}
if gjson.Get(json, "stats.num_objects").Int() > 0 {
return errors.New("items left in database")
} }
mc.conn.Do("FLUSHDB") mc.conn.Do("FLUSHDB")
return nil return nil