tile38/controller/expire.go

125 lines
2.7 KiB
Go
Raw Normal View History

2016-07-15 22:22:48 +03:00
package controller
import (
"time"
"github.com/tidwall/resp"
"github.com/tidwall/tile38/controller/log"
"github.com/tidwall/tile38/controller/server"
)
// clearAllExpires forgets every pending expiration by replacing the expires
// table with a fresh, empty one. The previous table is simply dropped.
func (c *Controller) clearAllExpires() {
	c.expires = map[string]map[string]time.Time{}
}
// clearIDExpires will clear a single item from the expires list.
2016-12-02 19:14:34 +03:00
func (c *Controller) clearIDExpires(key, id string) int {
2016-07-15 22:22:48 +03:00
m := c.expires[key]
if m == nil {
2016-12-02 19:14:34 +03:00
return 0
2016-07-15 22:22:48 +03:00
}
delete(m, id)
if len(m) == 0 {
delete(c.expires, key)
}
2016-12-02 19:14:34 +03:00
return 1
2016-07-15 22:22:48 +03:00
}
// clearKeyExpires drops the expiration entries of every item stored under
// key. Keys that have no expirations are left untouched.
func (c *Controller) clearKeyExpires(key string) {
	// delete is a no-op when key is absent, so no lookup is needed first.
	delete(c.expires, key)
}
// expireAt records that the item id stored under key expires at the given
// time. The per-key table is created on first use, and an existing expiration
// for the same item is overwritten.
func (c *Controller) expireAt(key, id string, at time.Time) {
	if c.expires[key] == nil {
		c.expires[key] = make(map[string]time.Time)
	}
	c.expires[key][id] = at
}
// getExpires reports when the item id stored under key expires. ok is false,
// and at is the zero time, when no expiration is recorded for that item.
func (c *Controller) getExpires(key, id string) (at time.Time, ok bool) {
	// Reading from a nil map yields the zero value and false, so an unknown
	// key needs no separate branch.
	at, ok = c.expires[key][id]
	return at, ok
}
// backgroundExpiring watches for when items must expire from the database.
// It's runs through every item that has been marked as expires five times
// per second.
func (c *Controller) backgroundExpiring() {
2016-12-28 21:16:28 +03:00
const stop = 0
const delay = 1
const nodelay = 2
2016-07-15 22:22:48 +03:00
for {
2016-12-28 21:16:28 +03:00
op := func() int {
c.mu.RLock()
defer c.mu.RUnlock()
2016-10-16 18:50:02 +03:00
if c.stopBackgroundExpiring {
2016-12-28 21:16:28 +03:00
return stop
2016-10-16 18:50:02 +03:00
}
// Only excute for leaders. Followers should ignore.
if c.config.FollowHost == "" {
now := time.Now()
for key, m := range c.expires {
for id, at := range m {
if now.After(at) {
// issue a DEL command
2016-12-28 21:16:28 +03:00
c.mu.RUnlock()
c.mu.Lock()
// double check because locks were swapped
var del bool
if m2, ok := c.expires[key]; ok {
if at2, ok := m2[id]; ok {
if now.After(at2) {
del = true
}
}
}
if !del {
return nodelay
}
2016-10-16 18:50:02 +03:00
c.statsExpired++
msg := &server.Message{}
msg.Values = resp.MultiBulkValue("del", key, id).Array()
msg.Command = "del"
_, d, err := c.cmdDel(msg)
if err != nil {
2016-12-28 21:16:28 +03:00
c.mu.Unlock()
2016-10-16 18:50:02 +03:00
log.Fatal(err)
continue
}
if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil {
2016-12-28 21:16:28 +03:00
c.mu.Unlock()
2016-10-16 18:50:02 +03:00
log.Fatal(err)
continue
}
2016-12-28 21:16:28 +03:00
c.mu.Unlock()
c.mu.RLock()
return nodelay
2016-07-15 22:22:48 +03:00
}
}
}
}
2016-12-28 21:16:28 +03:00
return delay
2016-10-16 18:50:02 +03:00
}()
2016-12-28 21:16:28 +03:00
switch op {
case stop:
2016-10-16 18:50:02 +03:00
return
2016-12-28 21:16:28 +03:00
case delay:
time.Sleep(time.Millisecond * 100)
case nodelay:
time.Sleep(time.Microsecond)
2016-07-15 22:22:48 +03:00
}
}
}