From 9e68703841a99ba3c8dfbb6f9db438df63d62ff6 Mon Sep 17 00:00:00 2001 From: tidwall Date: Mon, 12 Jul 2021 13:37:50 -0700 Subject: [PATCH] Update expiration logic This commit changes the logic for managing the expiration of objects in the database. Before: There was a server-wide hashmap that stored the collection key, id, and expiration timestamp for all objects that had a TTL. The hashmap was occasionally probed at 20 random positions, looking for objects that have expired. Those expired objects were immediately deleted, and if there was 5 or more objects deleted, then the probe happened again, with no delay. If the number of objects was less than 5 then the there was a 1/10th of a second delay before the next probe. Now: Rather than a server-wide hashmap, each collection has its own ordered priority queue that stores objects with TTLs. Rather than probing, there is a background routine that executes every 1/10th of a second, which pops the expired objects from the collection queues, and deletes them. The collection/queue method is a more stable approach than the hashmap/probing method. With probing, we can run into major cache misses for some cases where there is wide TTL duration, such as in the hours or days. This may cause the system to occasionally fall behind, leaving should-be expired objects in memory. Using a queue, there is no cache misses, all objects that should be expired will be right away, regardless of the TTL durations. Fixes #616 --- go.mod | 1 - go.sum | 8 - internal/collection/collection.go | 92 +++++-- internal/collection/collection_test.go | 76 +++--- internal/server/aofshrink.go | 26 +- internal/server/crud.go | 63 ++--- internal/server/dev.go | 4 + internal/server/expire.go | 138 +++-------- internal/server/fence.go | 5 +- internal/server/json.go | 16 +- internal/server/scripts.go | 2 +- internal/server/search.go | 12 +- internal/server/server.go | 18 +- internal/server/test.go | 2 +- tests/616/main.go | 133 ++++++++++ vendor/github.com/cespare/xxhash/LICENSE.txt | 22 -- vendor/github.com/cespare/xxhash/README.md | 50 ---- vendor/github.com/cespare/xxhash/go.mod | 6 - vendor/github.com/cespare/xxhash/go.sum | 4 - vendor/github.com/cespare/xxhash/rotate.go | 14 -- vendor/github.com/cespare/xxhash/rotate19.go | 14 -- vendor/github.com/cespare/xxhash/xxhash.go | 168 ------------- .../github.com/cespare/xxhash/xxhash_amd64.go | 12 - .../github.com/cespare/xxhash/xxhash_amd64.s | 233 ------------------ .../github.com/cespare/xxhash/xxhash_other.go | 75 ------ .../github.com/cespare/xxhash/xxhash_safe.go | 10 - .../cespare/xxhash/xxhash_unsafe.go | 30 --- vendor/github.com/tidwall/rhh/LICENSE | 13 - vendor/github.com/tidwall/rhh/README.md | 51 ---- vendor/github.com/tidwall/rhh/go.mod | 8 - vendor/github.com/tidwall/rhh/go.sum | 6 - vendor/github.com/tidwall/rhh/map.go | 205 --------------- vendor/github.com/tidwall/rhh/u64.go | 185 -------------- vendor/modules.txt | 5 - 34 files changed, 342 insertions(+), 1365 deletions(-) create mode 100644 tests/616/main.go delete mode 100644 vendor/github.com/cespare/xxhash/LICENSE.txt delete mode 100644 vendor/github.com/cespare/xxhash/README.md delete mode 100644 vendor/github.com/cespare/xxhash/go.mod delete mode 100644 vendor/github.com/cespare/xxhash/go.sum delete mode 100644 vendor/github.com/cespare/xxhash/rotate.go delete mode 100644 vendor/github.com/cespare/xxhash/rotate19.go delete mode 100644 vendor/github.com/cespare/xxhash/xxhash.go delete mode 100644 vendor/github.com/cespare/xxhash/xxhash_amd64.go delete mode 100644 vendor/github.com/cespare/xxhash/xxhash_amd64.s delete mode 100644 vendor/github.com/cespare/xxhash/xxhash_other.go delete mode 100644 vendor/github.com/cespare/xxhash/xxhash_safe.go delete mode 100644 vendor/github.com/cespare/xxhash/xxhash_unsafe.go delete mode 100644 vendor/github.com/tidwall/rhh/LICENSE delete mode 100644 vendor/github.com/tidwall/rhh/README.md delete mode 100644 vendor/github.com/tidwall/rhh/go.mod delete mode 100644 vendor/github.com/tidwall/rhh/go.sum delete mode 100644 vendor/github.com/tidwall/rhh/map.go delete mode 100644 vendor/github.com/tidwall/rhh/u64.go diff --git a/go.mod b/go.mod index b454d8fd..1fa5ac86 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,6 @@ require ( github.com/tidwall/redbench v0.1.0 github.com/tidwall/redcon v1.4.1 github.com/tidwall/resp v0.1.0 - github.com/tidwall/rhh v1.1.1 github.com/tidwall/rtree v1.2.7 github.com/tidwall/sjson v1.1.6 github.com/xdg/scram v1.0.3 diff --git a/go.sum b/go.sum index 488afffc..543979fe 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= -github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= -github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/sarama v1.27.2 h1:1EyY1dsxNDUQEv0O/4TsjosHI2CgB1uo9H/v56xzTxc= github.com/Shopify/sarama v1.27.2/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II= @@ -64,8 +62,6 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= -github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -371,8 +367,6 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= -github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= -github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= @@ -416,8 +410,6 @@ github.com/tidwall/redcon v1.4.1 h1:oupK+lM1FeSGNhhZn85KvofEpboQReM1eIKNWTmD3K8= github.com/tidwall/redcon v1.4.1/go.mod h1:XwNPFbJ4ShWNNSA2Jazhbdje6jegTCcwFR6mfaADvHA= github.com/tidwall/resp v0.1.0 h1:zZ6Hq+2cY4QqhZ4LqrV05T5yLOSPspj+l+DgAoJ25Ak= github.com/tidwall/resp v0.1.0/go.mod h1:18xEj855iMY2bK6tNF2A4x+nZy5gWO1iO7OOl3jETKw= -github.com/tidwall/rhh v1.1.1 h1:8zDpMKcK1pA1zU+Jyuo1UdzTFvME8pH3Sx/MdYgM5sE= -github.com/tidwall/rhh v1.1.1/go.mod h1:DmqiIRtSnlVEi5CSKqNaX6m3YTa3YNSYrGB4FlfdLUU= github.com/tidwall/rtred v0.1.2 h1:exmoQtOLvDoO8ud++6LwVsAMTu0KPzLTUrMln8u1yu8= github.com/tidwall/rtred v0.1.2/go.mod h1:hd69WNXQ5RP9vHd7dqekAz+RIdtfBogmglkZSRxCHFQ= github.com/tidwall/rtree v1.2.7 h1:FlhF5kip3H8BSE4zjiZQYlk9sWvk6q/IU6p4vuPl9kc= diff --git a/internal/collection/collection.go b/internal/collection/collection.go index ee4ebe74..41344cfd 100644 --- a/internal/collection/collection.go +++ b/internal/collection/collection.go @@ -24,6 +24,7 @@ type Cursor interface { type itemT struct { id string obj geojson.Object + expires int64 // unix nano expiration fieldValuesSlot fieldValuesSlot } @@ -44,11 +45,25 @@ func byValue(a, b interface{}) bool { return byID(a, b) } +func byExpires(a, b interface{}) bool { + item1 := a.(*itemT) + item2 := b.(*itemT) + if item1.expires < item2.expires { + return true + } + if item1.expires > item2.expires { + return false + } + // the values match so we'll compare IDs, which are always unique. + return byID(a, b) +} + // Collection represents a collection of geojson objects. type Collection struct { - items *btree.BTree // items sorted by keys + items *btree.BTree // items sorted by id index *geoindex.Index // items geospatially indexed - values *btree.BTree // items sorted by value+key + values *btree.BTree // items sorted by value+id + expires *btree.BTree // items sorted by ex+id fieldMap map[string]int fieldArr []string fieldValues *fieldValues @@ -64,6 +79,7 @@ func New() *Collection { items: btree.New(byID), index: geoindex.Wrap(&rtree.RTree{}), values: btree.New(byValue), + expires: btree.New(byExpires), fieldMap: make(map[string]int), fieldArr: make([]string, 0), fieldValues: &fieldValues{}, @@ -141,11 +157,11 @@ func (c *Collection) indexInsert(item *itemT) { // The fields argument is optional. // The return values are the old object, the old fields, and the new fields func (c *Collection) Set( - id string, obj geojson.Object, fields []string, values []float64, + id string, obj geojson.Object, fields []string, values []float64, ex int64, ) ( oldObject geojson.Object, oldFieldValues []float64, newFieldValues []float64, ) { - newItem := &itemT{id: id, obj: obj, fieldValuesSlot: nilValuesSlot} + newItem := &itemT{id: id, obj: obj, fieldValuesSlot: nilValuesSlot, expires: ex} // add the new item to main btree and remove the old one if needed oldItem := c.items.Set(newItem) @@ -159,6 +175,10 @@ func (c *Collection) Set( c.values.Delete(oldItem) c.nobjects-- } + // delete old item from the expires queue + if oldItem.expires != 0 { + c.expires.Delete(oldItem) + } // decrement the point count c.points -= oldItem.obj.NumPoints() @@ -191,6 +211,10 @@ func (c *Collection) Set( c.values.Set(newItem) c.nobjects++ } + // insert item into expires queue. + if newItem.expires != 0 { + c.expires.Set(newItem) + } // increment the point count c.points += newItem.obj.NumPoints() @@ -206,11 +230,11 @@ func (c *Collection) Set( func (c *Collection) Delete(id string) ( obj geojson.Object, fields []float64, ok bool, ) { - oldItemV := c.items.Delete(&itemT{id: id}) - if oldItemV == nil { + v := c.items.Delete(&itemT{id: id}) + if v == nil { return nil, nil, false } - oldItem := oldItemV.(*itemT) + oldItem := v.(*itemT) if objIsSpatial(oldItem.obj) { if !oldItem.obj.Empty() { c.indexDelete(oldItem) @@ -220,6 +244,10 @@ func (c *Collection) Delete(id string) ( c.values.Delete(oldItem) c.nobjects-- } + // delete old item from expires queue + if oldItem.expires != 0 { + c.expires.Delete(oldItem) + } c.weight -= c.objWeight(oldItem) c.points -= oldItem.obj.NumPoints() @@ -231,14 +259,30 @@ func (c *Collection) Delete(id string) ( // Get returns an object. // If the object does not exist then the 'ok' return value will be false. func (c *Collection) Get(id string) ( - obj geojson.Object, fields []float64, ok bool, + obj geojson.Object, fields []float64, ex int64, ok bool, ) { itemV := c.items.Get(&itemT{id: id}) if itemV == nil { - return nil, nil, false + return nil, nil, 0, false } item := itemV.(*itemT) - return item.obj, c.fieldValues.get(item.fieldValuesSlot), true + return item.obj, c.fieldValues.get(item.fieldValuesSlot), item.expires, true +} + +func (c *Collection) SetExpires(id string, ex int64) bool { + v := c.items.Get(&itemT{id: id}) + if v == nil { + return false + } + item := v.(*itemT) + if item.expires != 0 { + c.expires.Delete(item) + } + item.expires = ex + if item.expires != 0 { + c.expires.Set(item) + } + return true } // SetField set a field value for an object and returns that object. @@ -487,7 +531,7 @@ func bGT(tr *btree.BTree, a, b interface{}) bool { return tr.Less(b, a) } func (c *Collection) ScanGreaterOrEqual(id string, desc bool, cursor Cursor, deadline *deadline.Deadline, - iterator func(id string, obj geojson.Object, fields []float64) bool, + iterator func(id string, obj geojson.Object, fields []float64, ex int64) bool, ) bool { var keepon = true var count uint64 @@ -496,14 +540,14 @@ func (c *Collection) ScanGreaterOrEqual(id string, desc bool, offset = cursor.Offset() cursor.Step(offset) } - iter := func(value interface{}) bool { + iter := func(v interface{}) bool { count++ if count <= offset { return true } nextStep(count, cursor, deadline) - iitm := value.(*itemT) - keepon = iterator(iitm.id, iitm.obj, c.fieldValues.get(iitm.fieldValuesSlot)) + item := v.(*itemT) + keepon = iterator(item.id, item.obj, c.fieldValues.get(item.fieldValuesSlot), item.expires) return keepon } if desc { @@ -757,3 +801,23 @@ func nextStep(step uint64, cursor Cursor, deadline *deadline.Deadline) { cursor.Step(1) } } + +type Expired struct { + ID string + Obj geojson.Object + Fields []float64 +} + +// Expired returns a list of all objects that have expired. +func (c *Collection) Expired(now int64, buffer []string) (ids []string) { + ids = buffer[:0] + c.expires.Ascend(nil, func(v interface{}) bool { + item := v.(*itemT) + if now < item.expires { + return false + } + ids = append(ids, item.id) + return true + }) + return ids +} diff --git a/internal/collection/collection_test.go b/internal/collection/collection_test.go index 70edd6a5..3f32c658 100644 --- a/internal/collection/collection_test.go +++ b/internal/collection/collection_test.go @@ -46,7 +46,7 @@ func TestCollectionNewCollection(t *testing.T) { id := strconv.FormatInt(int64(i), 10) obj := PO(rand.Float64()*360-180, rand.Float64()*180-90) objs[id] = obj - c.Set(id, obj, nil, nil) + c.Set(id, obj, nil, nil, 0) } count := 0 bbox := geometry.Rect{ @@ -71,7 +71,7 @@ func TestCollectionSet(t *testing.T) { t.Run("AddString", func(t *testing.T) { c := New() str1 := String("hello") - oldObject, oldFields, newFields := c.Set("str", str1, nil, nil) + oldObject, oldFields, newFields := c.Set("str", str1, nil, nil, 0) expect(t, oldObject == nil) expect(t, len(oldFields) == 0) expect(t, len(newFields) == 0) @@ -80,11 +80,11 @@ func TestCollectionSet(t *testing.T) { c := New() str1 := String("hello") str2 := String("world") - oldObject, oldFields, newFields := c.Set("str", str1, nil, nil) + oldObject, oldFields, newFields := c.Set("str", str1, nil, nil, 0) expect(t, oldObject == nil) expect(t, len(oldFields) == 0) expect(t, len(newFields) == 0) - oldObject, oldFields, newFields = c.Set("str", str2, nil, nil) + oldObject, oldFields, newFields = c.Set("str", str2, nil, nil, 0) expect(t, oldObject == str1) expect(t, len(oldFields) == 0) expect(t, len(newFields) == 0) @@ -92,7 +92,7 @@ func TestCollectionSet(t *testing.T) { t.Run("AddPoint", func(t *testing.T) { c := New() point1 := PO(-112.1, 33.1) - oldObject, oldFields, newFields := c.Set("point", point1, nil, nil) + oldObject, oldFields, newFields := c.Set("point", point1, nil, nil, 0) expect(t, oldObject == nil) expect(t, len(oldFields) == 0) expect(t, len(newFields) == 0) @@ -101,11 +101,11 @@ func TestCollectionSet(t *testing.T) { c := New() point1 := PO(-112.1, 33.1) point2 := PO(-112.2, 33.2) - oldObject, oldFields, newFields := c.Set("point", point1, nil, nil) + oldObject, oldFields, newFields := c.Set("point", point1, nil, nil, 0) expect(t, oldObject == nil) expect(t, len(oldFields) == 0) expect(t, len(newFields) == 0) - oldObject, oldFields, newFields = c.Set("point", point2, nil, nil) + oldObject, oldFields, newFields = c.Set("point", point2, nil, nil, 0) expect(t, oldObject == point1) expect(t, len(oldFields) == 0) expect(t, len(newFields) == 0) @@ -115,19 +115,19 @@ func TestCollectionSet(t *testing.T) { str1 := String("hello") fNames := []string{"a", "b", "c"} fValues := []float64{1, 2, 3} - oldObj, oldFlds, newFlds := c.Set("str", str1, fNames, fValues) + oldObj, oldFlds, newFlds := c.Set("str", str1, fNames, fValues, 0) expect(t, oldObj == nil) expect(t, len(oldFlds) == 0) expect(t, reflect.DeepEqual(newFlds, fValues)) str2 := String("hello") fNames = []string{"d", "e", "f"} fValues = []float64{4, 5, 6} - oldObj, oldFlds, newFlds = c.Set("str", str2, fNames, fValues) + oldObj, oldFlds, newFlds = c.Set("str", str2, fNames, fValues, 0) expect(t, oldObj == str1) expect(t, reflect.DeepEqual(oldFlds, []float64{1, 2, 3})) expect(t, reflect.DeepEqual(newFlds, []float64{1, 2, 3, 4, 5, 6})) fValues = []float64{7, 8, 9, 10, 11, 12} - oldObj, oldFlds, newFlds = c.Set("str", str1, nil, fValues) + oldObj, oldFlds, newFlds = c.Set("str", str1, nil, fValues, 0) expect(t, oldObj == str2) expect(t, reflect.DeepEqual(oldFlds, []float64{1, 2, 3, 4, 5, 6})) expect(t, reflect.DeepEqual(newFlds, []float64{7, 8, 9, 10, 11, 12})) @@ -135,9 +135,9 @@ func TestCollectionSet(t *testing.T) { t.Run("Delete", func(t *testing.T) { c := New() - c.Set("1", String("1"), nil, nil) - c.Set("2", String("2"), nil, nil) - c.Set("3", PO(1, 2), nil, nil) + c.Set("1", String("1"), nil, nil, 0) + c.Set("2", String("2"), nil, nil, 0) + c.Set("3", PO(1, 2), nil, nil, 0) expect(t, c.Count() == 3) expect(t, c.StringCount() == 2) @@ -203,7 +203,7 @@ func TestCollectionSet(t *testing.T) { expect(t, !ok) expect(t, c.Count() == 0) expect(t, bounds(c) == geometry.Rect{}) - v, _, ok = c.Get("3") + v, _, _, ok = c.Get("3") expect(t, v == nil) expect(t, !ok) _, _, _, ok = c.SetField("3", "hello", 123) @@ -225,7 +225,7 @@ func TestCollectionScan(t *testing.T) { c := New() for _, i := range rand.Perm(N) { id := fmt.Sprintf("%04d", i) - c.Set(id, String(id), []string{"ex"}, []float64{float64(i)}) + c.Set(id, String(id), []string{"ex"}, []float64{float64(i)}, 0) } var n int var prevID string @@ -279,7 +279,7 @@ func TestCollectionScan(t *testing.T) { n = 0 c.ScanGreaterOrEqual("0070", true, nil, nil, - func(id string, obj geojson.Object, fields []float64) bool { + func(id string, obj geojson.Object, fields []float64, ex int64) bool { if n > 0 { expect(t, id < prevID) } @@ -292,7 +292,7 @@ func TestCollectionScan(t *testing.T) { n = 0 c.ScanGreaterOrEqual("0070", false, nil, nil, - func(id string, obj geojson.Object, fields []float64) bool { + func(id string, obj geojson.Object, fields []float64, ex int64) bool { if n > 0 { expect(t, id > prevID) } @@ -312,7 +312,7 @@ func TestCollectionSearch(t *testing.T) { id := fmt.Sprintf("%04d", j) ex := fmt.Sprintf("%04d", i) c.Set(id, String(ex), []string{"i", "j"}, - []float64{float64(i), float64(j)}) + []float64{float64(i), float64(j)}, 0) } var n int var prevValue string @@ -367,13 +367,14 @@ func TestCollectionSearch(t *testing.T) { func TestCollectionWeight(t *testing.T) { c := New() - c.Set("1", String("1"), nil, nil) + c.Set("1", String("1"), nil, nil, 0) expect(t, c.TotalWeight() > 0) c.Delete("1") expect(t, c.TotalWeight() == 0) c.Set("1", String("1"), []string{"a", "b", "c"}, []float64{1, 2, 3}, + 0, ) expect(t, c.TotalWeight() > 0) c.Delete("1") @@ -381,14 +382,17 @@ func TestCollectionWeight(t *testing.T) { c.Set("1", String("1"), []string{"a", "b", "c"}, []float64{1, 2, 3}, + 0, ) c.Set("2", String("2"), []string{"d", "e", "f"}, []float64{4, 5, 6}, + 0, ) c.Set("1", String("1"), []string{"d", "e", "f"}, []float64{4, 5, 6}, + 0, ) c.Delete("1") c.Delete("2") @@ -424,13 +428,13 @@ func TestSpatialSearch(t *testing.T) { q4, _ := geojson.Parse(gjson.Get(json, `features.#[id=="q4"]`).Raw, nil) c := New() - c.Set("p1", p1, nil, nil) - c.Set("p2", p2, nil, nil) - c.Set("p3", p3, nil, nil) - c.Set("p4", p4, nil, nil) - c.Set("r1", r1, nil, nil) - c.Set("r2", r2, nil, nil) - c.Set("r3", r3, nil, nil) + c.Set("p1", p1, nil, nil, 0) + c.Set("p2", p2, nil, nil, 0) + c.Set("p3", p3, nil, nil, 0) + c.Set("p4", p4, nil, nil, 0) + c.Set("r1", r1, nil, nil, 0) + c.Set("r2", r2, nil, nil, 0) + c.Set("r3", r3, nil, nil, 0) var n int @@ -530,7 +534,7 @@ func TestCollectionSparse(t *testing.T) { x := (r.Max.X-r.Min.X)*rand.Float64() + r.Min.X y := (r.Max.Y-r.Min.Y)*rand.Float64() + r.Min.Y point := PO(x, y) - c.Set(fmt.Sprintf("%d", i), point, nil, nil) + c.Set(fmt.Sprintf("%d", i), point, nil, nil, 0) } var n int n = 0 @@ -591,7 +595,7 @@ func TestCollectionSparse(t *testing.T) { func testCollectionVerifyContents(t *testing.T, c *Collection, objs map[string]geojson.Object) { for id, o2 := range objs { - o1, _, ok := c.Get(id) + o1, _, _, ok := c.Get(id) if !ok { t.Fatalf("ok[%s] = false, expect true", id) } @@ -622,7 +626,7 @@ func TestManyCollections(t *testing.T) { col = New() colsM[key] = col } - col.Set(id, obj, nil, nil) + col.Set(id, obj, nil, nil, 0) k++ } } @@ -674,7 +678,7 @@ func benchmarkInsert(t *testing.B, nFields int) { col := New() t.ResetTimer() for i := 0; i < t.N; i++ { - col.Set(items[i].id, items[i].object, nil, items[i].fields) + col.Set(items[i].id, items[i].object, nil, items[i].fields, 0) } } @@ -698,11 +702,11 @@ func benchmarkReplace(t *testing.B, nFields int) { } col := New() for i := 0; i < t.N; i++ { - col.Set(items[i].id, items[i].object, nil, items[i].fields) + col.Set(items[i].id, items[i].object, nil, items[i].fields, 0) } t.ResetTimer() for _, i := range rand.Perm(t.N) { - o, _, _ := col.Set(items[i].id, items[i].object, nil, nil) + o, _, _ := col.Set(items[i].id, items[i].object, nil, nil, 0) if o != items[i].object { t.Fatal("shoot!") } @@ -729,11 +733,11 @@ func benchmarkGet(t *testing.B, nFields int) { } col := New() for i := 0; i < t.N; i++ { - col.Set(items[i].id, items[i].object, nil, items[i].fields) + col.Set(items[i].id, items[i].object, nil, items[i].fields, 0) } t.ResetTimer() for _, i := range rand.Perm(t.N) { - o, _, _ := col.Get(items[i].id) + o, _, _, _ := col.Get(items[i].id) if o != items[i].object { t.Fatal("shoot!") } @@ -760,7 +764,7 @@ func benchmarkRemove(t *testing.B, nFields int) { } col := New() for i := 0; i < t.N; i++ { - col.Set(items[i].id, items[i].object, nil, items[i].fields) + col.Set(items[i].id, items[i].object, nil, items[i].fields, 0) } t.ResetTimer() for _, i := range rand.Perm(t.N) { @@ -791,7 +795,7 @@ func benchmarkScan(t *testing.B, nFields int) { } col := New() for i := 0; i < t.N; i++ { - col.Set(items[i].id, items[i].object, nil, items[i].fields) + col.Set(items[i].id, items[i].object, nil, items[i].fields, 0) } t.ResetTimer() for i := 0; i < t.N; i++ { diff --git a/internal/server/aofshrink.go b/internal/server/aofshrink.go index 7aa7b331..f53db56e 100644 --- a/internal/server/aofshrink.go +++ b/internal/server/aofshrink.go @@ -9,7 +9,6 @@ import ( "time" "github.com/tidwall/geojson" - "github.com/tidwall/rhh" "github.com/tidwall/tile38/core" "github.com/tidwall/tile38/internal/collection" "github.com/tidwall/tile38/internal/log" @@ -92,23 +91,18 @@ func (server *Server) aofshrink() { if col == nil { return } - var fnames = col.FieldArr() // reload an array of field names to match each object - var fmap = col.FieldMap() // - var exm *rhh.Map // the expiration map - if value, ok := server.expires.Get(keys[0]); ok { - exm = value.(*rhh.Map) - } + var fnames = col.FieldArr() // reload an array of field names to match each object + var fmap = col.FieldMap() // var now = time.Now().UnixNano() // used for expiration var count = 0 // the object count col.ScanGreaterOrEqual(nextid, false, nil, nil, - func(id string, obj geojson.Object, fields []float64) bool { + func(id string, obj geojson.Object, fields []float64, ex int64) bool { if count == maxids { // we reached the max number of ids for one batch nextid = id idsdone = false return false } - // here we fill the values array with a new command values = values[:0] values = append(values, "set") @@ -124,14 +118,14 @@ func (server *Server) aofshrink() { } } } - if exm != nil { - if at, ok := exm.Get(id); ok { - expires := at.(int64) - now - if expires > 0 { - values = append(values, "ex") - values = append(values, strconv.FormatFloat(math.Floor(float64(expires)/float64(time.Second)*10)/10, 'f', -1, 64)) - } + if ex != 0 { + ttl := math.Floor(float64(ex-now)/float64(time.Second)*10) / 10 + if ttl < 0.1 { + // always leave a little bit of ttl. + ttl = 0.1 } + values = append(values, "ex") + values = append(values, strconv.FormatFloat(ttl, 'f', -1, 64)) } if objIsSpatial(obj) { values = append(values, "object") diff --git a/internal/server/crud.go b/internal/server/crud.go index 16fa38a0..e67243f7 100644 --- a/internal/server/crud.go +++ b/internal/server/crud.go @@ -11,7 +11,6 @@ import ( "github.com/tidwall/geojson" "github.com/tidwall/geojson/geometry" "github.com/tidwall/resp" - "github.com/tidwall/rhh" "github.com/tidwall/rtree" "github.com/tidwall/tile38/internal/collection" "github.com/tidwall/tile38/internal/glob" @@ -150,8 +149,7 @@ func (server *Server) cmdGet(msg *Message) (resp.Value, error) { } return NOMessage, errKeyNotFound } - o, fields, ok := col.Get(id) - ok = ok && !server.hasExpired(key, id) + o, fields, _, ok := col.Get(id) if !ok { if msg.OutputType == RESP { return resp.NullValue(), nil @@ -310,7 +308,6 @@ func (server *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, er found = true } } - server.clearIDExpires(d.key, d.id) d.command = "del" d.updated = found d.timestamp = time.Now() @@ -375,7 +372,6 @@ func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, e } else { d.children[i] = dc } - server.clearIDExpires(d.key, dc.id) } if atLeastOneNotDeleted { var nchildren []*commandDetails @@ -429,7 +425,6 @@ func (server *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, e } d.command = "drop" d.timestamp = time.Now() - server.clearKeyExpires(d.key) switch msg.OutputType { case JSON: res = resp.StringValue(`{"ok":true,"elapsed":"` + time.Since(start).String() + "\"}") @@ -478,13 +473,11 @@ func (server *Server) cmdRename(msg *Message, nx bool) (res resp.Value, d comman d.updated = false } else { server.deleteCol(d.newKey) - server.clearKeyExpires(d.newKey) d.updated = true } if d.updated { server.deleteCol(d.key) server.setCol(d.newKey, col) - server.moveKeyExpires(d.key, d.newKey) } d.timestamp = time.Now() switch msg.OutputType { @@ -510,7 +503,6 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails return } server.cols = btree.New(byCollectionKey) - server.expires = rhh.New(0) server.hooks = make(map[string]*Hook) server.hooksOut = make(map[string]*Hook) server.hookTree = rtree.RTree{} @@ -530,7 +522,7 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails func (server *Server) parseSetArgs(vs []string) ( d commandDetails, fields []string, values []float64, xx, nx bool, - expires *float64, etype []byte, evs []string, err error, + ex int64, etype []byte, evs []string, err error, ) { var ok bool var typ []byte @@ -577,7 +569,7 @@ func (server *Server) parseSetArgs(vs []string) ( } if lcb(arg, "ex") { vs = nvs - if expires != nil { + if ex != 0 { err = errInvalidArgument(string(arg)) return } @@ -592,7 +584,7 @@ func (server *Server) parseSetArgs(vs []string) ( err = errInvalidArgument(s) return } - expires = &v + ex = time.Now().UnixNano() + int64(float64(time.Second)*v) continue } if lcb(arg, "xx") { @@ -747,7 +739,7 @@ func (server *Server) parseSetArgs(vs []string) ( return } -func (server *Server) cmdSet(msg *Message, resetExpires bool) (res resp.Value, d commandDetails, err error) { +func (server *Server) cmdSet(msg *Message) (res resp.Value, d commandDetails, err error) { if server.config.maxMemory() > 0 && server.outOfMemory.on() { err = errOOM return @@ -758,7 +750,7 @@ func (server *Server) cmdSet(msg *Message, resetExpires bool) (res resp.Value, d var fields []string var values []float64 var xx, nx bool - var ex *float64 + var ex int64 d, fields, values, xx, nx, ex, _, _, err = server.parseSetArgs(vs) if err != nil { return @@ -772,15 +764,12 @@ func (server *Server) cmdSet(msg *Message, resetExpires bool) (res resp.Value, d server.setCol(d.key, col) } if xx || nx { - _, _, ok := col.Get(d.id) + _, _, _, ok := col.Get(d.id) if (nx && ok) || (xx && !ok) { goto notok } } - if resetExpires { - server.clearIDExpires(d.key, d.id) - } - d.oldObj, d.oldFields, d.fields = col.Set(d.id, d.obj, fields, values) + d.oldObj, d.oldFields, d.fields = col.Set(d.id, d.obj, fields, values, ex) d.command = "set" d.updated = true // perhaps we should do a diff on the previous object? d.timestamp = time.Now() @@ -792,9 +781,9 @@ func (server *Server) cmdSet(msg *Message, resetExpires bool) (res resp.Value, d d.fmap[key] = idx } } - if ex != nil { - server.expireAt(d.key, d.id, d.timestamp.Add(time.Duration(float64(time.Second)*(*ex)))) - } + // if ex != nil { + // server.expireAt(d.key, d.id, d.timestamp.Add(time.Duration(float64(time.Second)*(*ex)))) + // } switch msg.OutputType { default: case JSON: @@ -936,11 +925,10 @@ func (server *Server) cmdExpire(msg *Message) (res resp.Value, d commandDetails, ok = false col := server.getCol(key) if col != nil { - _, _, ok = col.Get(id) - ok = ok && !server.hasExpired(key, id) + ex := time.Now().Add(time.Duration(float64(time.Second) * value)).UnixNano() + ok = col.SetExpires(id, ex) } if ok { - server.expireAt(key, id, time.Now().Add(time.Duration(float64(time.Second)*value))) d.updated = true } switch msg.OutputType { @@ -981,10 +969,13 @@ func (server *Server) cmdPersist(msg *Message) (res resp.Value, d commandDetails ok = false col := server.getCol(key) if col != nil { - _, _, ok = col.Get(id) - ok = ok && !server.hasExpired(key, id) - if ok { - cleared = server.clearIDExpires(key, id) + var ex int64 + _, _, ex, ok = col.Get(id) + if ok && ex != 0 { + ok = col.SetExpires(id, 0) + if ok { + cleared = true + } } } if !ok { @@ -1031,19 +1022,19 @@ func (server *Server) cmdTTL(msg *Message) (res resp.Value, err error) { var ok2 bool col := server.getCol(key) if col != nil { - _, _, ok = col.Get(id) - ok = ok && !server.hasExpired(key, id) + var ex int64 + _, _, ex, ok = col.Get(id) if ok { - var at time.Time - at, ok2 = server.getExpires(key, id) - if ok2 { - if time.Now().After(at) { + if ex != 0 { + now := start.UnixNano() + if now > ex { ok2 = false } else { - v = float64(time.Until(at)) / float64(time.Second) + v = float64(ex-now) / float64(time.Second) if v < 0 { v = 0 } + ok2 = true } } } diff --git a/internal/server/dev.go b/internal/server/dev.go index 8b73d0cb..1f6e47f9 100644 --- a/internal/server/dev.go +++ b/internal/server/dev.go @@ -119,6 +119,10 @@ func (s *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { fmt.Sprintf("fname:%d", i), strconv.FormatFloat(fval, 'f', -1, 64)) } + if rand.Int()%2 == 0 { + values = append(values, "EX", fmt.Sprint(rand.Intn(25)+5)) + } + if j%8 == 0 { values = append(values, "STRING", fmt.Sprintf("str%v", j)) } else { diff --git a/internal/server/expire.go b/internal/server/expire.go index f892ad97..216cece3 100644 --- a/internal/server/expire.go +++ b/internal/server/expire.go @@ -1,123 +1,49 @@ package server import ( - "math/rand" "time" - "github.com/tidwall/rhh" "github.com/tidwall/tile38/internal/log" ) -// clearIDExpires clears a single item from the expires list. -func (s *Server) clearIDExpires(key, id string) (cleared bool) { - if s.expires.Len() > 0 { - if idm, ok := s.expires.Get(key); ok { - if _, ok := idm.(*rhh.Map).Delete(id); ok { - if idm.(*rhh.Map).Len() == 0 { - s.expires.Delete(key) - } - return true - } - } - } - return false -} - -// clearKeyExpires clears all items that are marked as expires from a single key. -func (s *Server) clearKeyExpires(key string) { - s.expires.Delete(key) -} - -// moveKeyExpires moves all items that are marked as expires from a key to a newKey. -func (s *Server) moveKeyExpires(key, newKey string) { - if idm, ok := s.expires.Delete(key); ok { - s.expires.Set(newKey, idm) - } -} - -// expireAt marks an item as expires at a specific time. -func (s *Server) expireAt(key, id string, at time.Time) { - idm, ok := s.expires.Get(key) - if !ok { - idm = rhh.New(0) - s.expires.Set(key, idm) - } - idm.(*rhh.Map).Set(id, at.UnixNano()) -} - -// getExpires returns the when an item expires. -func (s *Server) getExpires(key, id string) (at time.Time, ok bool) { - if s.expires.Len() > 0 { - if idm, ok := s.expires.Get(key); ok { - if atv, ok := idm.(*rhh.Map).Get(id); ok { - return time.Unix(0, atv.(int64)), true - } - } - } - return time.Time{}, false -} - -// hasExpired returns true if an item has expired. -func (s *Server) hasExpired(key, id string) bool { - if at, ok := s.getExpires(key, id); ok { - return time.Now().After(at) - } - return false -} - const bgExpireDelay = time.Second / 10 -const bgExpireSegmentSize = 20 -// expirePurgeSweep is ran from backgroundExpiring operation and performs -// segmented sweep of the expires list -func (s *Server) expirePurgeSweep(rng *rand.Rand) (purged int) { - now := time.Now().UnixNano() - s.mu.Lock() - defer s.mu.Unlock() - if s.expires.Len() == 0 { - return 0 - } - for i := 0; i < bgExpireSegmentSize; i++ { - if key, idm, ok := s.expires.GetPos(rng.Uint64()); ok { - id, atv, ok := idm.(*rhh.Map).GetPos(rng.Uint64()) - if ok { - if now > atv.(int64) { - // expired, purge from database - msg := &Message{} - msg.Args = []string{"del", key, id} - _, d, err := s.cmdDel(msg) - if err != nil { - log.Fatal(err) - } - if err := s.writeAOF(msg.Args, &d); err != nil { - log.Fatal(err) - } - purged++ - } - } - } - // recycle the lock - s.mu.Unlock() - s.mu.Lock() - } - return purged -} - -// backgroundExpiring watches for when items that have expired must be purged -// from the database. It's executes 10 times a seconds. +// backgroundExpiring deletes expired items from the database. +// It's executes every 1/10 of a second. func (s *Server) backgroundExpiring() { - rng := rand.New(rand.NewSource(time.Now().UnixNano())) for { if s.stopServer.on() { return } - purged := s.expirePurgeSweep(rng) - if purged > bgExpireSegmentSize/4 { - // do another purge immediately - continue - } else { - // back off - time.Sleep(bgExpireDelay) - } + func() { + s.mu.Lock() + defer s.mu.Unlock() + now := time.Now().UnixNano() + var ids []string + var msgs []*Message + s.cols.Ascend(nil, func(v interface{}) bool { + col := v.(*collectionKeyContainer) + ids = col.col.Expired(now, ids[:0]) + for _, id := range ids { + msgs = append(msgs, &Message{ + Args: []string{"del", col.key, id}, + }) + } + return true + }) + for _, msg := range msgs { + _, d, err := s.cmdDel(msg) + if err != nil { + log.Fatal(err) + } + if err := s.writeAOF(msg.Args, &d); err != nil { + log.Fatal(err) + } + } + if len(msgs) > 0 { + log.Debugf("Expired %d items\n", len(msgs)) + } + }() + time.Sleep(bgExpireDelay) } } diff --git a/internal/server/fence.go b/internal/server/fence.go index 2c3f71ec..d0596dcf 100644 --- a/internal/server/fence.go +++ b/internal/server/fence.go @@ -276,7 +276,7 @@ func extendRoamMessage( nmsg = append(nmsg, `,"scan":[`...) col := sw.s.getCol(fence.roam.key) if col != nil { - obj, _, ok := col.Get(match.id) + obj, _, _, ok := col.Get(match.id) if ok { nmsg = append(nmsg, `{"id":`...) nmsg = appendJSONString(nmsg, match.id) @@ -375,9 +375,6 @@ func fenceMatchNearbys( col.Intersects(geojson.NewRect(rect), 0, nil, nil, func( id2 string, obj2 geojson.Object, fields []float64, ) bool { - if s.hasExpired(fence.roam.key, id2) { - return true // skip expired - } var idMatch bool if id2 == id { return true // skip self diff --git a/internal/server/json.go b/internal/server/json.go index 7ccc9526..ab7fb1be 100644 --- a/internal/server/json.go +++ b/internal/server/json.go @@ -194,7 +194,7 @@ func (s *Server) cmdJget(msg *Message) (resp.Value, error) { } return NOMessage, errKeyNotFound } - o, _, ok := col.Get(id) + o, _, _, ok := col.Get(id) if !ok { if msg.OutputType == RESP { return resp.NullValue(), nil @@ -273,7 +273,7 @@ func (s *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err er } var json string var geoobj bool - o, _, ok := col.Get(id) + o, _, _, ok := col.Get(id) if ok { geoobj = objIsSpatial(o) json = o.String() @@ -293,7 +293,7 @@ func (s *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err er nmsg := *msg nmsg.Args = []string{"SET", key, id, "OBJECT", json} // SET key id OBJECT json - return s.cmdSet(&nmsg, false) + return s.cmdSet(&nmsg) } if createcol { s.setCol(key, col) @@ -305,8 +305,7 @@ func (s *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err er d.timestamp = time.Now() d.updated = true - s.clearIDExpires(key, id) - col.Set(d.id, d.obj, nil, nil) + col.Set(d.id, d.obj, nil, nil, 0) switch msg.OutputType { case JSON: var buf bytes.Buffer @@ -339,7 +338,7 @@ func (s *Server) cmdJdel(msg *Message) (res resp.Value, d commandDetails, err er var json string var geoobj bool - o, _, ok := col.Get(id) + o, _, _, ok := col.Get(id) if ok { geoobj = objIsSpatial(o) json = o.String() @@ -362,7 +361,7 @@ func (s *Server) cmdJdel(msg *Message) (res resp.Value, d commandDetails, err er nmsg := *msg nmsg.Args = []string{"SET", key, id, "OBJECT", json} // SET key id OBJECT json - return s.cmdSet(&nmsg, false) + return s.cmdSet(&nmsg) } d.key = key @@ -371,8 +370,7 @@ func (s *Server) cmdJdel(msg *Message) (res resp.Value, d commandDetails, err er d.timestamp = time.Now() d.updated = true - s.clearIDExpires(d.key, d.id) - col.Set(d.id, d.obj, nil, nil) + col.Set(d.id, d.obj, nil, nil, 0) switch msg.OutputType { case JSON: var buf bytes.Buffer diff --git a/internal/server/scripts.go b/internal/server/scripts.go index 053bb40c..98e16d94 100644 --- a/internal/server/scripts.go +++ b/internal/server/scripts.go @@ -592,7 +592,7 @@ func (s *Server) commandInScript(msg *Message) ( default: err = fmt.Errorf("unknown command '%s'", msg.Args[0]) case "set": - res, d, err = s.cmdSet(msg, true) + res, d, err = s.cmdSet(msg) case "fset": res, d, err = s.cmdFset(msg) case "del": diff --git a/internal/server/search.go b/internal/server/search.go index 9d647b5b..0aa66054 100644 --- a/internal/server/search.go +++ b/internal/server/search.go @@ -304,7 +304,7 @@ func (server *Server) cmdSearchArgs( err = errKeyNotFound return } - s.obj, _, ok = col.Get(id) + s.obj, _, _, ok = col.Get(id) if !ok { err = errIDNotFound return @@ -417,10 +417,6 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) { if sw.col != nil { maxDist := s.obj.(*geojson.Circle).Meters() iter := func(id string, o geojson.Object, fields []float64, dist float64) bool { - if server.hasExpired(s.key, id) { - return true - } - if maxDist > 0 && dist > maxDist { return false } @@ -496,9 +492,6 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp. sw.col.Within(s.obj, s.sparse, sw, msg.Deadline, func( id string, o geojson.Object, fields []float64, ) bool { - if server.hasExpired(s.key, id) { - return true - } return sw.writeObject(ScanWriterParams{ id: id, o: o, @@ -512,9 +505,6 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp. o geojson.Object, fields []float64, ) bool { - if server.hasExpired(s.key, id) { - return true - } params := ScanWriterParams{ id: id, o: o, diff --git a/internal/server/server.go b/internal/server/server.go index 11499a28..d7ac15fb 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -10,7 +10,7 @@ import ( "fmt" "io" "net" - net_http "net/http" + "net/http" "net/url" "os" "path" @@ -30,7 +30,6 @@ import ( "github.com/tidwall/gjson" "github.com/tidwall/redcon" "github.com/tidwall/resp" - "github.com/tidwall/rhh" "github.com/tidwall/rtree" "github.com/tidwall/tile38/core" "github.com/tidwall/tile38/internal/collection" @@ -108,7 +107,6 @@ type Server struct { qdb *buntdb.DB // hook queue log qidx uint64 // hook queue log last idx cols *btree.BTree // data collections - expires *rhh.Map // map[string]map[string]time.Time follows map[*bytes.Buffer]bool fcond *sync.Cond @@ -135,7 +133,7 @@ type Server struct { } // Serve starts a new tile38 server -func Serve(host string, port int, dir string, http bool, metricsAddr string) error { +func Serve(host string, port int, dir string, useHTTP bool, metricsAddr string) error { if core.AppendFileName == "" { core.AppendFileName = path.Join(dir, "appendonly.aof") } @@ -156,10 +154,9 @@ func Serve(host string, port int, dir string, http bool, metricsAddr string) err hooks: make(map[string]*Hook), hooksOut: make(map[string]*Hook), aofconnM: make(map[net.Conn]io.Closer), - expires: rhh.New(0), started: time.Now(), conns: make(map[int]*Client), - http: http, + http: useHTTP, pubsub: newPubsub(), monconns: make(map[net.Conn]bool), cols: btree.New(byCollectionKey), @@ -289,9 +286,9 @@ func Serve(host string, port int, dir string, http bool, metricsAddr string) err if metricsAddr != "" { log.Infof("Listening for metrics at: %s", metricsAddr) go func() { - net_http.HandleFunc("/", server.MetricsIndexHandler) - net_http.HandleFunc("/metrics", server.MetricsHandler) - log.Fatal(net_http.ListenAndServe(metricsAddr, nil)) + http.HandleFunc("/", server.MetricsIndexHandler) + http.HandleFunc("/metrics", server.MetricsHandler) + log.Fatal(http.ListenAndServe(metricsAddr, nil)) }() } @@ -993,7 +990,6 @@ func randomKey(n int) string { func (server *Server) reset() { server.aofsz = 0 server.cols = btree.New(byCollectionKey) - server.expires = rhh.New(0) } func (server *Server) command(msg *Message, client *Client) ( @@ -1003,7 +999,7 @@ func (server *Server) command(msg *Message, client *Client) ( default: err = fmt.Errorf("unknown command '%s'", msg.Args[0]) case "set": - res, d, err = server.cmdSet(msg, true) + res, d, err = server.cmdSet(msg) case "fset": res, d, err = server.cmdFset(msg) case "del": diff --git a/internal/server/test.go b/internal/server/test.go index 21c98869..520babd6 100644 --- a/internal/server/test.go +++ b/internal/server/test.go @@ -216,7 +216,7 @@ func (s *Server) parseArea(ovs []string, doClip bool) (vs []string, o geojson.Ob err = errKeyNotFound return } - o, _, ok = col.Get(id) + o, _, _, ok = col.Get(id) if !ok { err = errIDNotFound return diff --git a/tests/616/main.go b/tests/616/main.go new file mode 100644 index 00000000..1b0681e2 --- /dev/null +++ b/tests/616/main.go @@ -0,0 +1,133 @@ +// Test Tile38 for Expiration Drift +// Issue #616 + +package main + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/gomodule/redigo/redis" + "github.com/tidwall/btree" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" +) + +const exsecs = 10 +const key = "__issue_616__" + +func makeID() string { + const chars = "0123456789abcdefghijklmnopqrstuvwxyz-" + var buf [10]byte + rand.Read(buf[:]) + for i := 0; i < len(buf); i++ { + buf[i] = chars[int(buf[i])%len(chars)] + } + return string(buf[:]) +} + +func main() { + fmt.Printf( + "The SCAN and ACTUAL values should reach about 1850 and stay\n" + + "roughly the same from there on.\n") + var mu sync.Mutex + objs := btree.New(func(a, b interface{}) bool { + ajson := a.(string) + bjson := b.(string) + return gjson.Get(ajson, "id").String() < gjson.Get(bjson, "id").String() + }) + expires := btree.New(func(a, b interface{}) bool { + ajson := a.(string) + bjson := b.(string) + if gjson.Get(ajson, "properties.ex").Int() < gjson.Get(bjson, "properties.ex").Int() { + return true + } + if gjson.Get(ajson, "properties.ex").Int() > gjson.Get(bjson, "properties.ex").Int() { + return false + } + return gjson.Get(ajson, "id").String() < gjson.Get(bjson, "id").String() + }) + + conn := must(redis.Dial("tcp", ":9851")).(redis.Conn) + must(conn.Do("DROP", key)) + must(nil, conn.Close()) + + go func() { + conn := must(redis.Dial("tcp", ":9851")).(redis.Conn) + defer conn.Close() + for { + ex := time.Now().UnixNano() + int64(exsecs*time.Second) + for i := 0; i < 10; i++ { + id := makeID() + x := rand.Float64()*360 - 180 + y := rand.Float64()*180 - 90 + obj := fmt.Sprintf(`{"type":"Feature","geometry":{"type":"Point","coordinates":[%f,%f]},"properties":{}}`, x, y) + obj, _ = sjson.Set(obj, "properties.ex", ex) + obj, _ = sjson.Set(obj, "id", id) + res := must(redis.String(conn.Do("SET", key, id, "ex", exsecs, "OBJECT", obj))).(string) + if res != "OK" { + panic(fmt.Sprintf("expected 'OK', got '%s'", res)) + } + mu.Lock() + prev := objs.Set(obj) + if prev != nil { + expires.Delete(obj) + } + expires.Set(obj) + mu.Unlock() + } + time.Sleep(time.Second / 20) + } + }() + + go func() { + conn := must(redis.Dial("tcp", ":9851")).(redis.Conn) + defer conn.Close() + for { + time.Sleep(time.Second * 5) + must(conn.Do("AOFSHRINK")) + } + }() + + go func() { + conn := must(redis.Dial("tcp", ":9851")).(redis.Conn) + defer conn.Close() + must(conn.Do("OUTPUT", "JSON")) + for { + time.Sleep(time.Second / 10) + var ids []string + res := must(redis.String(conn.Do("SCAN", key, "LIMIT", 100000000))).(string) + gjson.Get(res, "objects").ForEach(func(_, res gjson.Result) bool { + ids = append(ids, res.Get("id").String()) + return true + }) + now := time.Now().UnixNano() + mu.Lock() + var exobjs []string + expires.Ascend(nil, func(v interface{}) bool { + ex := gjson.Get(v.(string), "properties.ex").Int() + if ex > now { + return false + } + exobjs = append(exobjs, v.(string)) + return true + }) + for _, obj := range exobjs { + objs.Delete(obj) + expires.Delete(obj) + } + fmt.Printf("\rSCAN: %d, ACTUAL: %d ", len(ids), objs.Len()) + mu.Unlock() + } + }() + select {} +} + +func must(v interface{}, err error) interface{} { + if err != nil { + panic(err) + } + return v +} diff --git a/vendor/github.com/cespare/xxhash/LICENSE.txt b/vendor/github.com/cespare/xxhash/LICENSE.txt deleted file mode 100644 index 24b53065..00000000 --- a/vendor/github.com/cespare/xxhash/LICENSE.txt +++ /dev/null @@ -1,22 +0,0 @@ -Copyright (c) 2016 Caleb Spare - -MIT License - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/cespare/xxhash/README.md b/vendor/github.com/cespare/xxhash/README.md deleted file mode 100644 index 0982fd25..00000000 --- a/vendor/github.com/cespare/xxhash/README.md +++ /dev/null @@ -1,50 +0,0 @@ -# xxhash - -[![GoDoc](https://godoc.org/github.com/cespare/xxhash?status.svg)](https://godoc.org/github.com/cespare/xxhash) - -xxhash is a Go implementation of the 64-bit -[xxHash](http://cyan4973.github.io/xxHash/) algorithm, XXH64. This is a -high-quality hashing algorithm that is much faster than anything in the Go -standard library. - -The API is very small, taking its cue from the other hashing packages in the -standard library: - - $ go doc github.com/cespare/xxhash ! - package xxhash // import "github.com/cespare/xxhash" - - Package xxhash implements the 64-bit variant of xxHash (XXH64) as described - at http://cyan4973.github.io/xxHash/. - - func New() hash.Hash64 - func Sum64(b []byte) uint64 - func Sum64String(s string) uint64 - -This implementation provides a fast pure-Go implementation and an even faster -assembly implementation for amd64. - -## Benchmarks - -Here are some quick benchmarks comparing the pure-Go and assembly -implementations of Sum64 against another popular Go XXH64 implementation, -[github.com/OneOfOne/xxhash](https://github.com/OneOfOne/xxhash): - -| input size | OneOfOne | cespare (purego) | cespare | -| --- | --- | --- | --- | -| 5 B | 416 MB/s | 720 MB/s | 872 MB/s | -| 100 B | 3980 MB/s | 5013 MB/s | 5252 MB/s | -| 4 KB | 12727 MB/s | 12999 MB/s | 13026 MB/s | -| 10 MB | 9879 MB/s | 10775 MB/s | 10913 MB/s | - -These numbers were generated with: - -``` -$ go test -benchtime 10s -bench '/OneOfOne,' -$ go test -tags purego -benchtime 10s -bench '/xxhash,' -$ go test -benchtime 10s -bench '/xxhash,' -``` - -## Projects using this package - -- [InfluxDB](https://github.com/influxdata/influxdb) -- [Prometheus](https://github.com/prometheus/prometheus) diff --git a/vendor/github.com/cespare/xxhash/go.mod b/vendor/github.com/cespare/xxhash/go.mod deleted file mode 100644 index 10605a6a..00000000 --- a/vendor/github.com/cespare/xxhash/go.mod +++ /dev/null @@ -1,6 +0,0 @@ -module github.com/cespare/xxhash - -require ( - github.com/OneOfOne/xxhash v1.2.2 - github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 -) diff --git a/vendor/github.com/cespare/xxhash/go.sum b/vendor/github.com/cespare/xxhash/go.sum deleted file mode 100644 index f6b55426..00000000 --- a/vendor/github.com/cespare/xxhash/go.sum +++ /dev/null @@ -1,4 +0,0 @@ -github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= -github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= -github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= diff --git a/vendor/github.com/cespare/xxhash/rotate.go b/vendor/github.com/cespare/xxhash/rotate.go deleted file mode 100644 index f3eac5eb..00000000 --- a/vendor/github.com/cespare/xxhash/rotate.go +++ /dev/null @@ -1,14 +0,0 @@ -// +build !go1.9 - -package xxhash - -// TODO(caleb): After Go 1.10 comes out, remove this fallback code. - -func rol1(x uint64) uint64 { return (x << 1) | (x >> (64 - 1)) } -func rol7(x uint64) uint64 { return (x << 7) | (x >> (64 - 7)) } -func rol11(x uint64) uint64 { return (x << 11) | (x >> (64 - 11)) } -func rol12(x uint64) uint64 { return (x << 12) | (x >> (64 - 12)) } -func rol18(x uint64) uint64 { return (x << 18) | (x >> (64 - 18)) } -func rol23(x uint64) uint64 { return (x << 23) | (x >> (64 - 23)) } -func rol27(x uint64) uint64 { return (x << 27) | (x >> (64 - 27)) } -func rol31(x uint64) uint64 { return (x << 31) | (x >> (64 - 31)) } diff --git a/vendor/github.com/cespare/xxhash/rotate19.go b/vendor/github.com/cespare/xxhash/rotate19.go deleted file mode 100644 index b99612ba..00000000 --- a/vendor/github.com/cespare/xxhash/rotate19.go +++ /dev/null @@ -1,14 +0,0 @@ -// +build go1.9 - -package xxhash - -import "math/bits" - -func rol1(x uint64) uint64 { return bits.RotateLeft64(x, 1) } -func rol7(x uint64) uint64 { return bits.RotateLeft64(x, 7) } -func rol11(x uint64) uint64 { return bits.RotateLeft64(x, 11) } -func rol12(x uint64) uint64 { return bits.RotateLeft64(x, 12) } -func rol18(x uint64) uint64 { return bits.RotateLeft64(x, 18) } -func rol23(x uint64) uint64 { return bits.RotateLeft64(x, 23) } -func rol27(x uint64) uint64 { return bits.RotateLeft64(x, 27) } -func rol31(x uint64) uint64 { return bits.RotateLeft64(x, 31) } diff --git a/vendor/github.com/cespare/xxhash/xxhash.go b/vendor/github.com/cespare/xxhash/xxhash.go deleted file mode 100644 index f896bd28..00000000 --- a/vendor/github.com/cespare/xxhash/xxhash.go +++ /dev/null @@ -1,168 +0,0 @@ -// Package xxhash implements the 64-bit variant of xxHash (XXH64) as described -// at http://cyan4973.github.io/xxHash/. -package xxhash - -import ( - "encoding/binary" - "hash" -) - -const ( - prime1 uint64 = 11400714785074694791 - prime2 uint64 = 14029467366897019727 - prime3 uint64 = 1609587929392839161 - prime4 uint64 = 9650029242287828579 - prime5 uint64 = 2870177450012600261 -) - -// NOTE(caleb): I'm using both consts and vars of the primes. Using consts where -// possible in the Go code is worth a small (but measurable) performance boost -// by avoiding some MOVQs. Vars are needed for the asm and also are useful for -// convenience in the Go code in a few places where we need to intentionally -// avoid constant arithmetic (e.g., v1 := prime1 + prime2 fails because the -// result overflows a uint64). -var ( - prime1v = prime1 - prime2v = prime2 - prime3v = prime3 - prime4v = prime4 - prime5v = prime5 -) - -type xxh struct { - v1 uint64 - v2 uint64 - v3 uint64 - v4 uint64 - total int - mem [32]byte - n int // how much of mem is used -} - -// New creates a new hash.Hash64 that implements the 64-bit xxHash algorithm. -func New() hash.Hash64 { - var x xxh - x.Reset() - return &x -} - -func (x *xxh) Reset() { - x.n = 0 - x.total = 0 - x.v1 = prime1v + prime2 - x.v2 = prime2 - x.v3 = 0 - x.v4 = -prime1v -} - -func (x *xxh) Size() int { return 8 } -func (x *xxh) BlockSize() int { return 32 } - -// Write adds more data to x. It always returns len(b), nil. -func (x *xxh) Write(b []byte) (n int, err error) { - n = len(b) - x.total += len(b) - - if x.n+len(b) < 32 { - // This new data doesn't even fill the current block. - copy(x.mem[x.n:], b) - x.n += len(b) - return - } - - if x.n > 0 { - // Finish off the partial block. - copy(x.mem[x.n:], b) - x.v1 = round(x.v1, u64(x.mem[0:8])) - x.v2 = round(x.v2, u64(x.mem[8:16])) - x.v3 = round(x.v3, u64(x.mem[16:24])) - x.v4 = round(x.v4, u64(x.mem[24:32])) - b = b[32-x.n:] - x.n = 0 - } - - if len(b) >= 32 { - // One or more full blocks left. - b = writeBlocks(x, b) - } - - // Store any remaining partial block. - copy(x.mem[:], b) - x.n = len(b) - - return -} - -func (x *xxh) Sum(b []byte) []byte { - s := x.Sum64() - return append( - b, - byte(s>>56), - byte(s>>48), - byte(s>>40), - byte(s>>32), - byte(s>>24), - byte(s>>16), - byte(s>>8), - byte(s), - ) -} - -func (x *xxh) Sum64() uint64 { - var h uint64 - - if x.total >= 32 { - v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4 - h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4) - h = mergeRound(h, v1) - h = mergeRound(h, v2) - h = mergeRound(h, v3) - h = mergeRound(h, v4) - } else { - h = x.v3 + prime5 - } - - h += uint64(x.total) - - i, end := 0, x.n - for ; i+8 <= end; i += 8 { - k1 := round(0, u64(x.mem[i:i+8])) - h ^= k1 - h = rol27(h)*prime1 + prime4 - } - if i+4 <= end { - h ^= uint64(u32(x.mem[i:i+4])) * prime1 - h = rol23(h)*prime2 + prime3 - i += 4 - } - for i < end { - h ^= uint64(x.mem[i]) * prime5 - h = rol11(h) * prime1 - i++ - } - - h ^= h >> 33 - h *= prime2 - h ^= h >> 29 - h *= prime3 - h ^= h >> 32 - - return h -} - -func u64(b []byte) uint64 { return binary.LittleEndian.Uint64(b) } -func u32(b []byte) uint32 { return binary.LittleEndian.Uint32(b) } - -func round(acc, input uint64) uint64 { - acc += input * prime2 - acc = rol31(acc) - acc *= prime1 - return acc -} - -func mergeRound(acc, val uint64) uint64 { - val = round(0, val) - acc ^= val - acc = acc*prime1 + prime4 - return acc -} diff --git a/vendor/github.com/cespare/xxhash/xxhash_amd64.go b/vendor/github.com/cespare/xxhash/xxhash_amd64.go deleted file mode 100644 index d6176526..00000000 --- a/vendor/github.com/cespare/xxhash/xxhash_amd64.go +++ /dev/null @@ -1,12 +0,0 @@ -// +build !appengine -// +build gc -// +build !purego - -package xxhash - -// Sum64 computes the 64-bit xxHash digest of b. -// -//go:noescape -func Sum64(b []byte) uint64 - -func writeBlocks(x *xxh, b []byte) []byte diff --git a/vendor/github.com/cespare/xxhash/xxhash_amd64.s b/vendor/github.com/cespare/xxhash/xxhash_amd64.s deleted file mode 100644 index 757f2011..00000000 --- a/vendor/github.com/cespare/xxhash/xxhash_amd64.s +++ /dev/null @@ -1,233 +0,0 @@ -// +build !appengine -// +build gc -// +build !purego - -#include "textflag.h" - -// Register allocation: -// AX h -// CX pointer to advance through b -// DX n -// BX loop end -// R8 v1, k1 -// R9 v2 -// R10 v3 -// R11 v4 -// R12 tmp -// R13 prime1v -// R14 prime2v -// R15 prime4v - -// round reads from and advances the buffer pointer in CX. -// It assumes that R13 has prime1v and R14 has prime2v. -#define round(r) \ - MOVQ (CX), R12 \ - ADDQ $8, CX \ - IMULQ R14, R12 \ - ADDQ R12, r \ - ROLQ $31, r \ - IMULQ R13, r - -// mergeRound applies a merge round on the two registers acc and val. -// It assumes that R13 has prime1v, R14 has prime2v, and R15 has prime4v. -#define mergeRound(acc, val) \ - IMULQ R14, val \ - ROLQ $31, val \ - IMULQ R13, val \ - XORQ val, acc \ - IMULQ R13, acc \ - ADDQ R15, acc - -// func Sum64(b []byte) uint64 -TEXT ·Sum64(SB), NOSPLIT, $0-32 - // Load fixed primes. - MOVQ ·prime1v(SB), R13 - MOVQ ·prime2v(SB), R14 - MOVQ ·prime4v(SB), R15 - - // Load slice. - MOVQ b_base+0(FP), CX - MOVQ b_len+8(FP), DX - LEAQ (CX)(DX*1), BX - - // The first loop limit will be len(b)-32. - SUBQ $32, BX - - // Check whether we have at least one block. - CMPQ DX, $32 - JLT noBlocks - - // Set up initial state (v1, v2, v3, v4). - MOVQ R13, R8 - ADDQ R14, R8 - MOVQ R14, R9 - XORQ R10, R10 - XORQ R11, R11 - SUBQ R13, R11 - - // Loop until CX > BX. -blockLoop: - round(R8) - round(R9) - round(R10) - round(R11) - - CMPQ CX, BX - JLE blockLoop - - MOVQ R8, AX - ROLQ $1, AX - MOVQ R9, R12 - ROLQ $7, R12 - ADDQ R12, AX - MOVQ R10, R12 - ROLQ $12, R12 - ADDQ R12, AX - MOVQ R11, R12 - ROLQ $18, R12 - ADDQ R12, AX - - mergeRound(AX, R8) - mergeRound(AX, R9) - mergeRound(AX, R10) - mergeRound(AX, R11) - - JMP afterBlocks - -noBlocks: - MOVQ ·prime5v(SB), AX - -afterBlocks: - ADDQ DX, AX - - // Right now BX has len(b)-32, and we want to loop until CX > len(b)-8. - ADDQ $24, BX - - CMPQ CX, BX - JG fourByte - -wordLoop: - // Calculate k1. - MOVQ (CX), R8 - ADDQ $8, CX - IMULQ R14, R8 - ROLQ $31, R8 - IMULQ R13, R8 - - XORQ R8, AX - ROLQ $27, AX - IMULQ R13, AX - ADDQ R15, AX - - CMPQ CX, BX - JLE wordLoop - -fourByte: - ADDQ $4, BX - CMPQ CX, BX - JG singles - - MOVL (CX), R8 - ADDQ $4, CX - IMULQ R13, R8 - XORQ R8, AX - - ROLQ $23, AX - IMULQ R14, AX - ADDQ ·prime3v(SB), AX - -singles: - ADDQ $4, BX - CMPQ CX, BX - JGE finalize - -singlesLoop: - MOVBQZX (CX), R12 - ADDQ $1, CX - IMULQ ·prime5v(SB), R12 - XORQ R12, AX - - ROLQ $11, AX - IMULQ R13, AX - - CMPQ CX, BX - JL singlesLoop - -finalize: - MOVQ AX, R12 - SHRQ $33, R12 - XORQ R12, AX - IMULQ R14, AX - MOVQ AX, R12 - SHRQ $29, R12 - XORQ R12, AX - IMULQ ·prime3v(SB), AX - MOVQ AX, R12 - SHRQ $32, R12 - XORQ R12, AX - - MOVQ AX, ret+24(FP) - RET - -// writeBlocks uses the same registers as above except that it uses AX to store -// the x pointer. - -// func writeBlocks(x *xxh, b []byte) []byte -TEXT ·writeBlocks(SB), NOSPLIT, $0-56 - // Load fixed primes needed for round. - MOVQ ·prime1v(SB), R13 - MOVQ ·prime2v(SB), R14 - - // Load slice. - MOVQ b_base+8(FP), CX - MOVQ CX, ret_base+32(FP) // initialize return base pointer; see NOTE below - MOVQ b_len+16(FP), DX - LEAQ (CX)(DX*1), BX - SUBQ $32, BX - - // Load vN from x. - MOVQ x+0(FP), AX - MOVQ 0(AX), R8 // v1 - MOVQ 8(AX), R9 // v2 - MOVQ 16(AX), R10 // v3 - MOVQ 24(AX), R11 // v4 - - // We don't need to check the loop condition here; this function is - // always called with at least one block of data to process. -blockLoop: - round(R8) - round(R9) - round(R10) - round(R11) - - CMPQ CX, BX - JLE blockLoop - - // Copy vN back to x. - MOVQ R8, 0(AX) - MOVQ R9, 8(AX) - MOVQ R10, 16(AX) - MOVQ R11, 24(AX) - - // Construct return slice. - // NOTE: It's important that we don't construct a slice that has a base - // pointer off the end of the original slice, as in Go 1.7+ this will - // cause runtime crashes. (See discussion in, for example, - // https://github.com/golang/go/issues/16772.) - // Therefore, we calculate the length/cap first, and if they're zero, we - // keep the old base. This is what the compiler does as well if you - // write code like - // b = b[len(b):] - - // New length is 32 - (CX - BX) -> BX+32 - CX. - ADDQ $32, BX - SUBQ CX, BX - JZ afterSetBase - - MOVQ CX, ret_base+32(FP) - -afterSetBase: - MOVQ BX, ret_len+40(FP) - MOVQ BX, ret_cap+48(FP) // set cap == len - - RET diff --git a/vendor/github.com/cespare/xxhash/xxhash_other.go b/vendor/github.com/cespare/xxhash/xxhash_other.go deleted file mode 100644 index c68d13f8..00000000 --- a/vendor/github.com/cespare/xxhash/xxhash_other.go +++ /dev/null @@ -1,75 +0,0 @@ -// +build !amd64 appengine !gc purego - -package xxhash - -// Sum64 computes the 64-bit xxHash digest of b. -func Sum64(b []byte) uint64 { - // A simpler version would be - // x := New() - // x.Write(b) - // return x.Sum64() - // but this is faster, particularly for small inputs. - - n := len(b) - var h uint64 - - if n >= 32 { - v1 := prime1v + prime2 - v2 := prime2 - v3 := uint64(0) - v4 := -prime1v - for len(b) >= 32 { - v1 = round(v1, u64(b[0:8:len(b)])) - v2 = round(v2, u64(b[8:16:len(b)])) - v3 = round(v3, u64(b[16:24:len(b)])) - v4 = round(v4, u64(b[24:32:len(b)])) - b = b[32:len(b):len(b)] - } - h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4) - h = mergeRound(h, v1) - h = mergeRound(h, v2) - h = mergeRound(h, v3) - h = mergeRound(h, v4) - } else { - h = prime5 - } - - h += uint64(n) - - i, end := 0, len(b) - for ; i+8 <= end; i += 8 { - k1 := round(0, u64(b[i:i+8:len(b)])) - h ^= k1 - h = rol27(h)*prime1 + prime4 - } - if i+4 <= end { - h ^= uint64(u32(b[i:i+4:len(b)])) * prime1 - h = rol23(h)*prime2 + prime3 - i += 4 - } - for ; i < end; i++ { - h ^= uint64(b[i]) * prime5 - h = rol11(h) * prime1 - } - - h ^= h >> 33 - h *= prime2 - h ^= h >> 29 - h *= prime3 - h ^= h >> 32 - - return h -} - -func writeBlocks(x *xxh, b []byte) []byte { - v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4 - for len(b) >= 32 { - v1 = round(v1, u64(b[0:8:len(b)])) - v2 = round(v2, u64(b[8:16:len(b)])) - v3 = round(v3, u64(b[16:24:len(b)])) - v4 = round(v4, u64(b[24:32:len(b)])) - b = b[32:len(b):len(b)] - } - x.v1, x.v2, x.v3, x.v4 = v1, v2, v3, v4 - return b -} diff --git a/vendor/github.com/cespare/xxhash/xxhash_safe.go b/vendor/github.com/cespare/xxhash/xxhash_safe.go deleted file mode 100644 index dfa15ab7..00000000 --- a/vendor/github.com/cespare/xxhash/xxhash_safe.go +++ /dev/null @@ -1,10 +0,0 @@ -// +build appengine - -// This file contains the safe implementations of otherwise unsafe-using code. - -package xxhash - -// Sum64String computes the 64-bit xxHash digest of s. -func Sum64String(s string) uint64 { - return Sum64([]byte(s)) -} diff --git a/vendor/github.com/cespare/xxhash/xxhash_unsafe.go b/vendor/github.com/cespare/xxhash/xxhash_unsafe.go deleted file mode 100644 index d2b64e8b..00000000 --- a/vendor/github.com/cespare/xxhash/xxhash_unsafe.go +++ /dev/null @@ -1,30 +0,0 @@ -// +build !appengine - -// This file encapsulates usage of unsafe. -// xxhash_safe.go contains the safe implementations. - -package xxhash - -import ( - "reflect" - "unsafe" -) - -// Sum64String computes the 64-bit xxHash digest of s. -// It may be faster than Sum64([]byte(s)) by avoiding a copy. -// -// TODO(caleb): Consider removing this if an optimization is ever added to make -// it unnecessary: https://golang.org/issue/2205. -// -// TODO(caleb): We still have a function call; we could instead write Go/asm -// copies of Sum64 for strings to squeeze out a bit more speed. -func Sum64String(s string) uint64 { - // See https://groups.google.com/d/msg/golang-nuts/dcjzJy-bSpw/tcZYBzQqAQAJ - // for some discussion about this unsafe conversion. - var b []byte - bh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - bh.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data - bh.Len = len(s) - bh.Cap = len(s) - return Sum64(b) -} diff --git a/vendor/github.com/tidwall/rhh/LICENSE b/vendor/github.com/tidwall/rhh/LICENSE deleted file mode 100644 index f251424f..00000000 --- a/vendor/github.com/tidwall/rhh/LICENSE +++ /dev/null @@ -1,13 +0,0 @@ -Copyright 2019, Joshua J Baker - -Permission to use, copy, modify, and/or distribute this software for any -purpose with or without fee is hereby granted, provided that the above -copyright notice and this permission notice appear in all copies. - -THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY -SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION -OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN -CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. \ No newline at end of file diff --git a/vendor/github.com/tidwall/rhh/README.md b/vendor/github.com/tidwall/rhh/README.md deleted file mode 100644 index c45309c0..00000000 --- a/vendor/github.com/tidwall/rhh/README.md +++ /dev/null @@ -1,51 +0,0 @@ -# `rhh` (Robin Hood Hashmap) - -[![GoDoc](https://img.shields.io/badge/api-reference-blue.svg?style=flat-square)](https://godoc.org/github.com/tidwall/rhh) - -A simple and efficient hashmap package for Go using the -[`xxhash`](http://www.xxhash.com) algorithm, -[open addressing](https://en.wikipedia.org/wiki/Hash_table#Open_addressing), and -[robin hood hashing](https://en.wikipedia.org/wiki/Hash_table#Robin_Hood_hashing). - -This is an alternative to the standard [Go map](https://golang.org/ref/spec#Map_types). - -# Getting Started - -## Installing - -To start using `rhh`, install Go and run `go get`: - -```sh -$ go get -u github.com/tidwall/rhh -``` - -This will retrieve the library. - -## Usage - -The `Map` type works similar to a standard Go map, and includes four methods: -`Set`, `Get`, `Delete`, `Len`. - -```go -var m rhh.Map -m.Set("Hello", "Dolly!") -val, _ := m.Get("Hello") -fmt.Printf("%v\n", val) -val, _ = m.Delete("Hello") -fmt.Printf("%v\n", val) -val, _ = m.Get("Hello") -fmt.Printf("%v\n", val) - -// Output: -// Dolly! -// Dolly! -// -``` - -## Contact - -Josh Baker [@tidwall](http://twitter.com/tidwall) - -## License - -`rhh` source code is available under the MIT [License](/LICENSE). diff --git a/vendor/github.com/tidwall/rhh/go.mod b/vendor/github.com/tidwall/rhh/go.mod deleted file mode 100644 index 30239354..00000000 --- a/vendor/github.com/tidwall/rhh/go.mod +++ /dev/null @@ -1,8 +0,0 @@ -module github.com/tidwall/rhh - -go 1.15 - -require ( - github.com/cespare/xxhash v1.1.0 - github.com/tidwall/lotsa v1.0.1 -) diff --git a/vendor/github.com/tidwall/rhh/go.sum b/vendor/github.com/tidwall/rhh/go.sum deleted file mode 100644 index 44b20b43..00000000 --- a/vendor/github.com/tidwall/rhh/go.sum +++ /dev/null @@ -1,6 +0,0 @@ -github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= -github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -github.com/tidwall/lotsa v1.0.1 h1:w4gpDvI7RdkgbMC0q5ndKqG2ffrwCgerUY/gM2TYkH4= -github.com/tidwall/lotsa v1.0.1/go.mod h1:X6NiU+4yHA3fE3Puvpnn1XMDrFZrE9JO2/w+UMuqgR8= diff --git a/vendor/github.com/tidwall/rhh/map.go b/vendor/github.com/tidwall/rhh/map.go deleted file mode 100644 index b443f7f9..00000000 --- a/vendor/github.com/tidwall/rhh/map.go +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2019 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an ISC-style -// license that can be found in the LICENSE file. - -package rhh - -import ( - "github.com/cespare/xxhash" -) - -const ( - loadFactor = 0.85 // must be above 50% - dibBitSize = 16 // 0xFFFF - hashBitSize = 64 - dibBitSize // 0xFFFFFFFFFFFF - maxHash = ^uint64(0) >> dibBitSize // max 28,147,497,671,0655 - maxDIB = ^uint64(0) >> hashBitSize // max 65,535 -) - -type entry struct { - hdib uint64 // bitfield { hash:48 dib:16 } - key string // user key - value interface{} // user value -} - -func (e *entry) dib() int { - return int(e.hdib & maxDIB) -} -func (e *entry) hash() int { - return int(e.hdib >> dibBitSize) -} -func (e *entry) setDIB(dib int) { - e.hdib = e.hdib>>dibBitSize<> dibBitSize) -} - -// Map is a hashmap. Like map[string]interface{} -type Map struct { - cap int - length int - mask int - growAt int - shrinkAt int - buckets []entry -} - -// New returns a new Map. Like map[string]interface{} -func New(cap int) *Map { - m := new(Map) - m.cap = cap - sz := 8 - for sz < m.cap { - sz *= 2 - } - m.buckets = make([]entry, sz) - m.mask = len(m.buckets) - 1 - m.growAt = int(float64(len(m.buckets)) * loadFactor) - m.shrinkAt = int(float64(len(m.buckets)) * (1 - loadFactor)) - return m -} - -func (m *Map) resize(newCap int) { - nmap := New(newCap) - for i := 0; i < len(m.buckets); i++ { - if m.buckets[i].dib() > 0 { - nmap.set(m.buckets[i].hash(), m.buckets[i].key, m.buckets[i].value) - } - } - cap := m.cap - *m = *nmap - m.cap = cap -} - -// Set assigns a value to a key. -// Returns the previous value, or false when no value was assigned. -func (m *Map) Set(key string, value interface{}) (interface{}, bool) { - if len(m.buckets) == 0 { - *m = *New(0) - } - if m.length >= m.growAt { - m.resize(len(m.buckets) * 2) - } - return m.set(m.hash(key), key, value) -} - -func (m *Map) set(hash int, key string, value interface{}) (interface{}, bool) { - e := entry{makeHDIB(hash, 1), key, value} - i := e.hash() & m.mask - for { - if m.buckets[i].dib() == 0 { - m.buckets[i] = e - m.length++ - return nil, false - } - if e.hash() == m.buckets[i].hash() && e.key == m.buckets[i].key { - old := m.buckets[i].value - m.buckets[i].value = e.value - return old, true - } - if m.buckets[i].dib() < e.dib() { - e, m.buckets[i] = m.buckets[i], e - } - i = (i + 1) & m.mask - e.setDIB(e.dib() + 1) - } -} - -// Get returns a value for a key. -// Returns false when no value has been assign for key. -func (m *Map) Get(key string) (interface{}, bool) { - if len(m.buckets) == 0 { - return nil, false - } - hash := m.hash(key) - i := hash & m.mask - for { - if m.buckets[i].dib() == 0 { - return nil, false - } - if m.buckets[i].hash() == hash && m.buckets[i].key == key { - return m.buckets[i].value, true - } - i = (i + 1) & m.mask - } -} - -// Len returns the number of values in map. -func (m *Map) Len() int { - return m.length -} - -// Delete deletes a value for a key. -// Returns the deleted value, or false when no value was assigned. -func (m *Map) Delete(key string) (interface{}, bool) { - if len(m.buckets) == 0 { - return nil, false - } - hash := m.hash(key) - i := hash & m.mask - for { - if m.buckets[i].dib() == 0 { - return nil, false - } - if m.buckets[i].hash() == hash && m.buckets[i].key == key { - old := m.buckets[i].value - m.remove(i) - return old, true - } - i = (i + 1) & m.mask - } -} - -func (m *Map) remove(i int) { - m.buckets[i].setDIB(0) - for { - pi := i - i = (i + 1) & m.mask - if m.buckets[i].dib() <= 1 { - m.buckets[pi] = entry{} - break - } - m.buckets[pi] = m.buckets[i] - m.buckets[pi].setDIB(m.buckets[pi].dib() - 1) - } - m.length-- - if len(m.buckets) > m.cap && m.length <= m.shrinkAt { - m.resize(m.length) - } -} - -// Range iterates over all key/values. -// It's not safe to call or Set or Delete while ranging. -func (m *Map) Range(iter func(key string, value interface{}) bool) { - for i := 0; i < len(m.buckets); i++ { - if m.buckets[i].dib() > 0 { - if !iter(m.buckets[i].key, m.buckets[i].value) { - return - } - } - } -} - -// GetPos gets a single keys/value nearby a position -// The pos param can be any valid uint64. Useful for grabbing a random item -// from the map. -// It's not safe to call or Set or Delete while ranging. -func (m *Map) GetPos(pos uint64) (key string, value interface{}, ok bool) { - for i := 0; i < len(m.buckets); i++ { - index := (pos + uint64(i)) & uint64(m.mask) - if m.buckets[index].dib() > 0 { - return m.buckets[index].key, m.buckets[index].value, true - } - } - return "", nil, false -} diff --git a/vendor/github.com/tidwall/rhh/u64.go b/vendor/github.com/tidwall/rhh/u64.go deleted file mode 100644 index 5bf83a3a..00000000 --- a/vendor/github.com/tidwall/rhh/u64.go +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright 2019 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an ISC-style -// license that can be found in the LICENSE file. - -package rhh - -import ( - "reflect" - "unsafe" - - "github.com/cespare/xxhash" -) - -type entryU64 struct { - hdib uint64 // bitfield { hash:48 dib:16 } - key uint64 // user key - value interface{} // user value -} - -func (e *entryU64) dib() int { - return int(e.hdib & maxDIB) -} -func (e *entryU64) hash() int { - return int(e.hdib >> dibBitSize) -} -func (e *entryU64) setDIB(dib int) { - e.hdib = e.hdib>>dibBitSize<> dibBitSize) -} - -// MapU64 is a map. Like map[uint64]interface{} -type MapU64 struct { - cap int - length int - mask int - growAt int - shrinkAt int - buckets []entryU64 -} - -// NewU64 returns a new map. Like map[uint64]interface{} -func NewU64(cap int) *MapU64 { - m := new(MapU64) - m.cap = cap - sz := 8 - for sz < m.cap { - sz *= 2 - } - m.buckets = make([]entryU64, sz) - m.mask = len(m.buckets) - 1 - m.growAt = int(float64(len(m.buckets)) * loadFactor) - m.shrinkAt = int(float64(len(m.buckets)) * (1 - loadFactor)) - return m -} - -func (m *MapU64) resize(newCap int) { - nmap := NewU64(newCap) - for i := 0; i < len(m.buckets); i++ { - if m.buckets[i].dib() > 0 { - nmap.set(m.buckets[i].hash(), m.buckets[i].key, m.buckets[i].value) - } - } - cap := m.cap - *m = *nmap - m.cap = cap -} - -// Set assigns a value to a key. -// Returns the previous value, or false when no value was assigned. -func (m *MapU64) Set(key uint64, value interface{}) (interface{}, bool) { - if len(m.buckets) == 0 { - *m = *NewU64(0) - } - if m.length >= m.growAt { - m.resize(len(m.buckets) * 2) - } - return m.set(m.hash(key), key, value) -} - -func (m *MapU64) set(hash int, key uint64, value interface{}) (interface{}, bool) { - e := entryU64{makeHDIB(hash, 1), key, value} - i := e.hash() & m.mask - for { - if m.buckets[i].dib() == 0 { - m.buckets[i] = e - m.length++ - return nil, false - } - if e.hash() == m.buckets[i].hash() && e.key == m.buckets[i].key { - old := m.buckets[i].value - m.buckets[i].value = e.value - return old, true - } - if m.buckets[i].dib() < e.dib() { - e, m.buckets[i] = m.buckets[i], e - } - i = (i + 1) & m.mask - e.setDIB(e.dib() + 1) - } -} - -// Get returns a value for a key. -// Returns false when no value has been assign for key. -func (m *MapU64) Get(key uint64) (interface{}, bool) { - if len(m.buckets) == 0 { - return nil, false - } - hash := m.hash(key) - i := hash & m.mask - for { - if m.buckets[i].dib() == 0 { - return nil, false - } - if m.buckets[i].hash() == hash && m.buckets[i].key == key { - return m.buckets[i].value, true - } - i = (i + 1) & m.mask - } -} - -// Len returns the number of values in map. -func (m *MapU64) Len() int { - return m.length -} - -// Delete deletes a value for a key. -// Returns the deleted value, or false when no value was assigned. -func (m *MapU64) Delete(key uint64) (interface{}, bool) { - if len(m.buckets) == 0 { - return nil, false - } - hash := m.hash(key) - i := hash & m.mask - for { - if m.buckets[i].dib() == 0 { - return nil, false - } - if m.buckets[i].hash() == hash && m.buckets[i].key == key { - old := m.buckets[i].value - m.remove(i) - return old, true - } - i = (i + 1) & m.mask - } -} - -func (m *MapU64) remove(i int) { - m.buckets[i].setDIB(0) - for { - pi := i - i = (i + 1) & m.mask - if m.buckets[i].dib() <= 1 { - m.buckets[pi] = entryU64{} - break - } - m.buckets[pi] = m.buckets[i] - m.buckets[pi].setDIB(m.buckets[pi].dib() - 1) - } - m.length-- - if len(m.buckets) > m.cap && m.length <= m.shrinkAt { - m.resize(m.length) - } -} - -// Range iterates overall all key/values. -// It's not safe to call or Set or Delete while ranging. -func (m *MapU64) Range(iter func(key uint64, value interface{}) bool) { - for i := 0; i < len(m.buckets); i++ { - if m.buckets[i].dib() > 0 { - if !iter(m.buckets[i].key, m.buckets[i].value) { - return - } - } - } -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 280d5b1d..bbbad1ca 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -59,8 +59,6 @@ github.com/aws/aws-sdk-go/service/sts github.com/aws/aws-sdk-go/service/sts/stsiface # github.com/beorn7/perks v1.0.1 github.com/beorn7/perks/quantile -# github.com/cespare/xxhash v1.1.0 -github.com/cespare/xxhash # github.com/cespare/xxhash/v2 v2.1.1 github.com/cespare/xxhash/v2 # github.com/davecgh/go-spew v1.1.1 @@ -206,9 +204,6 @@ github.com/tidwall/redcon # github.com/tidwall/resp v0.1.0 ## explicit github.com/tidwall/resp -# github.com/tidwall/rhh v1.1.1 -## explicit -github.com/tidwall/rhh # github.com/tidwall/rtred v0.1.2 github.com/tidwall/rtred github.com/tidwall/rtred/base