mirror of https://github.com/tidwall/tile38.git
Update hook expiration logic
This commit is contained in:
parent
7e10a80319
commit
decafae2d7
|
@ -1,114 +0,0 @@
|
|||
package expire
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Item is a something that can expire
|
||||
type Item interface {
|
||||
Expires() time.Time
|
||||
}
|
||||
|
||||
// List of expireable items
|
||||
type List struct {
|
||||
mu sync.Mutex
|
||||
queue queue
|
||||
bgrun bool
|
||||
Expired func(item Item)
|
||||
}
|
||||
|
||||
// Push an item onto the queue
|
||||
func (list *List) Push(item Item) {
|
||||
unix := item.Expires().UnixNano()
|
||||
list.mu.Lock()
|
||||
if !list.bgrun {
|
||||
list.bgrun = true
|
||||
go list.bg()
|
||||
}
|
||||
list.queue.push(unix, item)
|
||||
list.mu.Unlock()
|
||||
}
|
||||
|
||||
func (list *List) bg() {
|
||||
now := time.Now().UnixNano()
|
||||
for {
|
||||
list.mu.Lock()
|
||||
if list.queue.len == 0 {
|
||||
list.bgrun = false
|
||||
list.mu.Unlock()
|
||||
break
|
||||
}
|
||||
if now > list.queue.peek().unix { // now.After(list.queue.peek().unix)
|
||||
n := list.queue.pop()
|
||||
exfn := list.Expired
|
||||
list.mu.Unlock()
|
||||
if exfn != nil {
|
||||
exfn(n.item)
|
||||
}
|
||||
} else {
|
||||
list.mu.Unlock()
|
||||
time.Sleep(time.Second / 10)
|
||||
now = time.Now().UnixNano()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type qnode struct {
|
||||
unix int64
|
||||
item Item
|
||||
}
|
||||
|
||||
type queue struct {
|
||||
nodes []qnode
|
||||
len int
|
||||
}
|
||||
|
||||
func (q *queue) push(unix int64, item Item) {
|
||||
if q.nodes == nil {
|
||||
q.nodes = make([]qnode, 2)
|
||||
} else {
|
||||
q.nodes = append(q.nodes, qnode{})
|
||||
}
|
||||
i := q.len + 1
|
||||
j := i / 2
|
||||
for i > 1 && q.nodes[j].unix > unix {
|
||||
q.nodes[i] = q.nodes[j]
|
||||
i = j
|
||||
j = j / 2
|
||||
}
|
||||
q.nodes[i].unix = unix
|
||||
q.nodes[i].item = item
|
||||
q.len++
|
||||
}
|
||||
|
||||
func (q *queue) peek() qnode {
|
||||
if q.len == 0 {
|
||||
return qnode{}
|
||||
}
|
||||
return q.nodes[1]
|
||||
}
|
||||
|
||||
func (q *queue) pop() qnode {
|
||||
if q.len == 0 {
|
||||
return qnode{}
|
||||
}
|
||||
n := q.nodes[1]
|
||||
q.nodes[1] = q.nodes[q.len]
|
||||
q.len--
|
||||
var j, k int
|
||||
i := 1
|
||||
for i != q.len+1 {
|
||||
k = q.len + 1
|
||||
j = 2 * i
|
||||
if j <= q.len && q.nodes[j].unix < q.nodes[k].unix {
|
||||
k = j
|
||||
}
|
||||
if j+1 <= q.len && q.nodes[j+1].unix < q.nodes[k].unix {
|
||||
k = j + 1
|
||||
}
|
||||
q.nodes[i] = q.nodes[k]
|
||||
i = k
|
||||
}
|
||||
return n
|
||||
}
|
|
@ -1,88 +0,0 @@
|
|||
package expire
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type testItem struct {
|
||||
str string
|
||||
exp time.Time
|
||||
}
|
||||
|
||||
func (item *testItem) Expires() time.Time {
|
||||
return item.exp
|
||||
}
|
||||
|
||||
func TestBasic(t *testing.T) {
|
||||
var list List
|
||||
now := time.Now()
|
||||
list.Push(&testItem{"13", now.Add(13)})
|
||||
list.Push(&testItem{"11", now.Add(11)})
|
||||
list.Push(&testItem{"14", now.Add(14)})
|
||||
list.Push(&testItem{"10", now.Add(10)})
|
||||
list.Push(&testItem{"15", now.Add(15)})
|
||||
list.Push(&testItem{"12", now.Add(12)})
|
||||
|
||||
var lunix int64
|
||||
for list.queue.len > 0 {
|
||||
n2 := list.queue.pop()
|
||||
if n2.unix < lunix {
|
||||
t.Fatal("out of order")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRandomQueue(t *testing.T) {
|
||||
N := 1000
|
||||
now := time.Now()
|
||||
var list List
|
||||
for i := 0; i < N; i++ {
|
||||
list.Push(&testItem{fmt.Sprintf("%d", i),
|
||||
now.Add(time.Duration(rand.Float64() * float64(time.Second)))})
|
||||
}
|
||||
var items []Item
|
||||
for list.queue.len > 0 {
|
||||
n1 := list.queue.peek()
|
||||
n2 := list.queue.pop()
|
||||
if n1 != n2 {
|
||||
t.Fatal("mismatch")
|
||||
}
|
||||
if n1.unix > n2.unix {
|
||||
t.Fatal("out of order")
|
||||
}
|
||||
items = append(items, n2.item)
|
||||
}
|
||||
|
||||
if !sort.SliceIsSorted(items, func(i, j int) bool {
|
||||
return items[i].Expires().Before(items[j].Expires())
|
||||
}) {
|
||||
t.Fatal("out of order")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestExpires(t *testing.T) {
|
||||
N := 1000
|
||||
now := time.Now()
|
||||
var list List
|
||||
for i := 0; i < N; i++ {
|
||||
list.Push(&testItem{fmt.Sprintf("%d", i),
|
||||
now.Add(time.Duration(rand.Float64() * float64(time.Second)))})
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(N)
|
||||
var items []Item
|
||||
list.Expired = func(item Item) {
|
||||
items = append(items, item)
|
||||
wg.Done()
|
||||
}
|
||||
wg.Wait()
|
||||
if len(items) != N {
|
||||
t.Fatal("wrong result")
|
||||
}
|
||||
}
|
|
@ -508,6 +508,7 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails
|
|||
server.cols = btree.NewNonConcurrent(byCollectionKey)
|
||||
server.groupHooks = btree.NewNonConcurrent(byGroupHook)
|
||||
server.groupObjects = btree.NewNonConcurrent(byGroupObject)
|
||||
server.hookExpires = btree.NewNonConcurrent(byHookExpires)
|
||||
server.hooks = make(map[string]*Hook)
|
||||
server.hooksOut = make(map[string]*Hook)
|
||||
server.hookTree = &rtree.RTree{}
|
||||
|
|
|
@ -18,32 +18,70 @@ func (s *Server) backgroundExpiring() {
|
|||
func() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
now := time.Now().UnixNano()
|
||||
var ids []string
|
||||
var msgs []*Message
|
||||
s.cols.Ascend(nil, func(v interface{}) bool {
|
||||
col := v.(*collectionKeyContainer)
|
||||
ids = col.col.Expired(now, ids[:0])
|
||||
for _, id := range ids {
|
||||
msgs = append(msgs, &Message{
|
||||
Args: []string{"del", col.key, id},
|
||||
})
|
||||
}
|
||||
return true
|
||||
})
|
||||
for _, msg := range msgs {
|
||||
_, d, err := s.cmdDel(msg)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := s.writeAOF(msg.Args, &d); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
if len(msgs) > 0 {
|
||||
log.Debugf("Expired %d items\n", len(msgs))
|
||||
}
|
||||
now := time.Now()
|
||||
s.backgroundExpireObjects(now)
|
||||
s.backgroundExpireHooks(now)
|
||||
}()
|
||||
time.Sleep(bgExpireDelay)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) backgroundExpireObjects(now time.Time) {
|
||||
nano := now.UnixNano()
|
||||
var ids []string
|
||||
var msgs []*Message
|
||||
s.cols.Ascend(nil, func(v interface{}) bool {
|
||||
col := v.(*collectionKeyContainer)
|
||||
ids = col.col.Expired(nano, ids[:0])
|
||||
for _, id := range ids {
|
||||
msgs = append(msgs, &Message{
|
||||
Args: []string{"del", col.key, id},
|
||||
})
|
||||
}
|
||||
return true
|
||||
})
|
||||
for _, msg := range msgs {
|
||||
_, d, err := s.cmdDel(msg)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := s.writeAOF(msg.Args, &d); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
if len(msgs) > 0 {
|
||||
log.Debugf("Expired %d objects\n", len(msgs))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *Server) backgroundExpireHooks(now time.Time) {
|
||||
var msgs []*Message
|
||||
s.hookExpires.Ascend(nil, func(v interface{}) bool {
|
||||
h := v.(*Hook)
|
||||
if h.expires.After(now) {
|
||||
return false
|
||||
}
|
||||
msg := &Message{}
|
||||
if h.channel {
|
||||
msg.Args = []string{"delchan", h.Name}
|
||||
} else {
|
||||
msg.Args = []string{"delhook", h.Name}
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
return true
|
||||
})
|
||||
|
||||
for _, msg := range msgs {
|
||||
_, d, err := s.cmdDelHook(msg, msg.Args[0] == "delchan")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := s.writeAOF(msg.Args, &d); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
if len(msgs) > 0 {
|
||||
log.Debugf("Expired %d hooks\n", len(msgs))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
// for good measure.
|
||||
prevHook.Signal()
|
||||
if !hook.expires.IsZero() {
|
||||
s.hookex.Push(hook)
|
||||
s.hookExpires.Set(hook)
|
||||
}
|
||||
switch msg.OutputType {
|
||||
case JSON:
|
||||
|
@ -182,6 +182,9 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
prevHook.Close()
|
||||
delete(s.hooks, name)
|
||||
delete(s.hooksOut, name)
|
||||
if !prevHook.expires.IsZero() {
|
||||
s.hookExpires.Delete(prevHook)
|
||||
}
|
||||
s.groupDisconnectHook(name)
|
||||
}
|
||||
|
||||
|
@ -224,7 +227,7 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
|
||||
hook.Open() // Opens a goroutine to notify the hook
|
||||
if !hook.expires.IsZero() {
|
||||
s.hookex.Push(hook)
|
||||
s.hookExpires.Set(hook)
|
||||
}
|
||||
switch msg.OutputType {
|
||||
case JSON:
|
||||
|
@ -235,6 +238,18 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
return NOMessage, d, nil
|
||||
}
|
||||
|
||||
func byHookExpires(a, b interface{}) bool {
|
||||
ha := a.(*Hook)
|
||||
hb := b.(*Hook)
|
||||
if ha.expires.Before(hb.expires) {
|
||||
return true
|
||||
}
|
||||
if ha.expires.After(hb.expires) {
|
||||
return false
|
||||
}
|
||||
return ha.Name < hb.Name
|
||||
}
|
||||
|
||||
func (s *Server) cmdDelHook(msg *Message, chanCmd bool) (
|
||||
res resp.Value, d commandDetails, err error,
|
||||
) {
|
||||
|
@ -254,6 +269,9 @@ func (s *Server) cmdDelHook(msg *Message, chanCmd bool) (
|
|||
// remove hook from maps
|
||||
delete(s.hooks, hook.Name)
|
||||
delete(s.hooksOut, hook.Name)
|
||||
if !hook.expires.IsZero() {
|
||||
s.hookExpires.Delete(hook)
|
||||
}
|
||||
// remove any hook / object connections
|
||||
s.groupDisconnectHook(hook.Name)
|
||||
// remove hook from spatial index
|
||||
|
@ -314,6 +332,9 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) (
|
|||
// remove hook from maps
|
||||
delete(s.hooks, hook.Name)
|
||||
delete(s.hooksOut, hook.Name)
|
||||
if !hook.expires.IsZero() {
|
||||
s.hookExpires.Delete(hook)
|
||||
}
|
||||
// remove any hook / object connections
|
||||
s.groupDisconnectHook(hook.Name)
|
||||
// remove hook from spatial index
|
||||
|
@ -344,35 +365,6 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) (
|
|||
return
|
||||
}
|
||||
|
||||
// possiblyExpireHook will evaluate a hook by it's name for expiration and
|
||||
// purge it from the database if needed. This operation is called from an
|
||||
// independent goroutine
|
||||
func (s *Server) possiblyExpireHook(name string) {
|
||||
s.mu.Lock()
|
||||
if h, ok := s.hooks[name]; ok {
|
||||
if !h.expires.IsZero() && time.Now().After(h.expires) {
|
||||
// purge from database
|
||||
msg := &Message{}
|
||||
if h.channel {
|
||||
msg.Args = []string{"delchan", h.Name}
|
||||
} else {
|
||||
msg.Args = []string{"delhook", h.Name}
|
||||
}
|
||||
_, d, err := s.cmdDelHook(msg, h.channel)
|
||||
if err != nil {
|
||||
s.mu.Unlock()
|
||||
panic(err)
|
||||
}
|
||||
if err := s.writeAOF(msg.Args, &d); err != nil {
|
||||
s.mu.Unlock()
|
||||
panic(err)
|
||||
}
|
||||
log.Debugf("purged hook %v", h.Name)
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Server) cmdHooks(msg *Message, channel bool) (
|
||||
res resp.Value, err error,
|
||||
) {
|
||||
|
|
|
@ -36,7 +36,6 @@ import (
|
|||
"github.com/tidwall/tile38/internal/collection"
|
||||
"github.com/tidwall/tile38/internal/deadline"
|
||||
"github.com/tidwall/tile38/internal/endpoint"
|
||||
"github.com/tidwall/tile38/internal/expire"
|
||||
"github.com/tidwall/tile38/internal/log"
|
||||
)
|
||||
|
||||
|
@ -123,13 +122,13 @@ type Server struct {
|
|||
hooksOut map[string]*Hook // hooks with "outside" detection
|
||||
groupHooks *btree.BTree // hooks that are connected to objects
|
||||
groupObjects *btree.BTree // objects that are connected to hooks
|
||||
hookExpires *btree.BTree // queue of all hooks marked for expiration
|
||||
|
||||
aofconnM map[net.Conn]io.Closer
|
||||
luascripts *lScriptMap
|
||||
luapool *lStatePool
|
||||
|
||||
pubsub *pubsub
|
||||
hookex expire.List
|
||||
|
||||
monconnsMu sync.RWMutex
|
||||
monconns map[net.Conn]bool // monitor connections
|
||||
|
@ -179,14 +178,9 @@ func Serve(opts Options) error {
|
|||
|
||||
groupHooks: btree.NewNonConcurrent(byGroupHook),
|
||||
groupObjects: btree.NewNonConcurrent(byGroupObject),
|
||||
hookExpires: btree.NewNonConcurrent(byHookExpires),
|
||||
}
|
||||
|
||||
server.hookex.Expired = func(item expire.Item) {
|
||||
switch v := item.(type) {
|
||||
case *Hook:
|
||||
server.possiblyExpireHook(v.Name)
|
||||
}
|
||||
}
|
||||
server.epc = endpoint.NewManager(server)
|
||||
server.luascripts = server.newScriptMap()
|
||||
server.luapool = server.newPool()
|
||||
|
|
Loading…
Reference in New Issue