tile38/internal/server/expire.go

124 lines
2.9 KiB
Go

package server
import (
"math/rand"
"time"
"github.com/tidwall/rhh"
"github.com/tidwall/tile38/internal/log"
)
// clearIDExpires clears a single item from the expires list.
func (s *Server) clearIDExpires(key, id string) (cleared bool) {
if s.expires.Len() > 0 {
if idm, ok := s.expires.Get(key); ok {
if _, ok := idm.(*rhh.Map).Delete(id); ok {
if idm.(*rhh.Map).Len() == 0 {
s.expires.Delete(key)
}
return true
}
}
}
return false
}
// clearKeyExpires clears all items that are marked as expires from a single key.
func (s *Server) clearKeyExpires(key string) {
s.expires.Delete(key)
}
// moveKeyExpires moves all items that are marked as expires from a key to a newKey.
func (s *Server) moveKeyExpires(key, newKey string) {
if idm, ok := s.expires.Delete(key); ok {
s.expires.Set(newKey, idm)
}
}
// expireAt marks an item as expires at a specific time.
func (s *Server) expireAt(key, id string, at time.Time) {
idm, ok := s.expires.Get(key)
if !ok {
idm = rhh.New(0)
s.expires.Set(key, idm)
}
idm.(*rhh.Map).Set(id, at.UnixNano())
}
// getExpires returns the when an item expires.
func (s *Server) getExpires(key, id string) (at time.Time, ok bool) {
if s.expires.Len() > 0 {
if idm, ok := s.expires.Get(key); ok {
if atv, ok := idm.(*rhh.Map).Get(id); ok {
return time.Unix(0, atv.(int64)), true
}
}
}
return time.Time{}, false
}
// hasExpired returns true if an item has expired.
func (s *Server) hasExpired(key, id string) bool {
if at, ok := s.getExpires(key, id); ok {
return time.Now().After(at)
}
return false
}
const bgExpireDelay = time.Second / 10
const bgExpireSegmentSize = 20
// expirePurgeSweep is ran from backgroundExpiring operation and performs
// segmented sweep of the expires list
func (s *Server) expirePurgeSweep(rng *rand.Rand) (purged int) {
now := time.Now().UnixNano()
s.mu.Lock()
defer s.mu.Unlock()
if s.expires.Len() == 0 {
return 0
}
for i := 0; i < bgExpireSegmentSize; i++ {
if key, idm, ok := s.expires.GetPos(rng.Uint64()); ok {
id, atv, ok := idm.(*rhh.Map).GetPos(rng.Uint64())
if ok {
if now > atv.(int64) {
// expired, purge from database
msg := &Message{}
msg.Args = []string{"del", key, id}
_, d, err := s.cmdDel(msg)
if err != nil {
log.Fatal(err)
}
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
}
purged++
}
}
}
// recycle the lock
s.mu.Unlock()
s.mu.Lock()
}
return purged
}
// backgroundExpiring watches for when items that have expired must be purged
// from the database. It's executes 10 times a seconds.
func (s *Server) backgroundExpiring() {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
if s.stopServer.on() {
return
}
purged := s.expirePurgeSweep(rng)
if purged > bgExpireSegmentSize/4 {
// do another purge immediately
continue
} else {
// back off
time.Sleep(bgExpireDelay)
}
}
}