ledisdb/ledis/t_ttl.go

195 lines
4.0 KiB
Go

package ledis
import (
"encoding/binary"
"errors"
"github.com/siddontang/ledisdb/leveldb"
"time"
)
var (
errExpMetaKey = errors.New("invalid expire meta key")
errExpTimeKey = errors.New("invalid expire time key")
)
type retireCallback func(*tx, []byte) int64
type elimination struct {
db *DB
exp2Tx []*tx
exp2Retire []retireCallback
}
var errExpType = errors.New("invalid expire type")
func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte {
buf := make([]byte, len(key)+11)
buf[0] = db.index
buf[1] = expTimeType
buf[2] = dataType
pos := 3
binary.BigEndian.PutUint64(buf[pos:], uint64(when))
pos += 8
copy(buf[pos:], key)
return buf
}
func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte {
buf := make([]byte, len(key)+3)
buf[0] = db.index
buf[1] = expMetaType
buf[2] = dataType
pos := 3
copy(buf[pos:], key)
return buf
}
func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) {
if len(mk) <= 3 || mk[0] != db.index || mk[1] != expMetaType {
return 0, nil, errExpMetaKey
}
return mk[2], mk[3:], nil
}
func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) {
if len(tk) < 11 || tk[0] != db.index || tk[1] != expTimeType {
return 0, nil, 0, errExpTimeKey
}
return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil
}
func (db *DB) expire(t *tx, dataType byte, key []byte, duration int64) {
db.expireAt(t, dataType, key, time.Now().Unix()+duration)
}
func (db *DB) expireAt(t *tx, dataType byte, key []byte, when int64) {
mk := db.expEncodeMetaKey(dataType, key)
tk := db.expEncodeTimeKey(dataType, key, when)
t.Put(tk, mk)
t.Put(mk, PutInt64(when))
}
func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) {
mk := db.expEncodeMetaKey(dataType, key)
if t, err = Int64(db.db.Get(mk)); err != nil || t == 0 {
t = -1
} else {
t -= time.Now().Unix()
if t <= 0 {
t = -1
}
// if t == -1 : to remove ????
}
return t, err
}
func (db *DB) rmExpire(t *tx, dataType byte, key []byte) (int64, error) {
mk := db.expEncodeMetaKey(dataType, key)
if v, err := db.db.Get(mk); err != nil {
return 0, err
} else if v == nil {
return 0, nil
} else if when, err2 := Int64(v, nil); err2 != nil {
return 0, err2
} else {
tk := db.expEncodeTimeKey(dataType, key, when)
t.Delete(mk)
t.Delete(tk)
return 1, nil
}
}
func (db *DB) expFlush(t *tx, dataType byte) (err error) {
minKey := make([]byte, 3)
minKey[0] = db.index
minKey[1] = expTimeType
minKey[2] = dataType
maxKey := make([]byte, 3)
maxKey[0] = db.index
maxKey[1] = expMetaType
maxKey[2] = dataType + 1
_, err = db.flushRegion(t, minKey, maxKey)
err = t.Commit()
return
}
//////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////
func newEliminator(db *DB) *elimination {
eli := new(elimination)
eli.db = db
eli.exp2Tx = make([]*tx, maxDataType)
eli.exp2Retire = make([]retireCallback, maxDataType)
return eli
}
func (eli *elimination) regRetireContext(dataType byte, t *tx, onRetire retireCallback) {
// todo .. need to ensure exist - mapExpMetaType[expType]
eli.exp2Tx[dataType] = t
eli.exp2Retire[dataType] = onRetire
}
// call by outside ... (from *db to another *db)
func (eli *elimination) active() {
now := time.Now().Unix()
db := eli.db
dbGet := db.db.Get
minKey := db.expEncodeTimeKey(noneType, nil, 0)
maxKey := db.expEncodeTimeKey(maxDataType, nil, now)
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
tk := it.RawKey()
mk := it.RawValue()
dt, k, _, err := db.expDecodeTimeKey(tk)
if err != nil {
continue
}
t := eli.exp2Tx[dt]
onRetire := eli.exp2Retire[dt]
if tk == nil || onRetire == nil {
continue
}
t.Lock()
if exp, err := Int64(dbGet(mk)); err == nil {
// check expire again
if exp <= now {
onRetire(t, k)
t.Delete(tk)
t.Delete(mk)
t.Commit()
}
}
t.Unlock()
}
it.Close()
return
}