diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go new file mode 100644 index 0000000..167acce --- /dev/null +++ b/ledis/t_ttl.go @@ -0,0 +1,208 @@ +package ledis + +import ( + "encoding/binary" + "errors" + "github.com/siddontang/go-leveldb/leveldb" + "time" +) + +var mapExpMetaType = map[byte]byte{ + kvExpType: kvExpMetaType, + lExpType: lExpMetaType, + hExpType: hExpMetaType, + zExpType: zExpMetaType} + +type retireCallback func(*tx, []byte) int64 + +type Elimination struct { + db *DB + exp2Tx map[byte]*tx + exp2Retire map[byte]retireCallback +} + +var errExpType = errors.New("invalid expire type") + +func (db *DB) expEncodeTimeKey(expType byte, key []byte, when int64) []byte { + // format : db[8] / expType[8] / when[64] / key[...] + buf := make([]byte, len(key)+10) + + buf[0] = db.index + buf[1] = expType + pos := 2 + + binary.BigEndian.PutUint64(buf[pos:], uint64(when)) + pos += 8 + + copy(buf[pos:], key) + + return buf +} + +func (db *DB) expEncodeMetaKey(expType byte, key []byte) []byte { + // format : db[8] / expType[8] / key[...] + buf := make([]byte, len(key)+2) + + buf[0] = db.index + buf[1] = expType + pos := 2 + + copy(buf[pos:], key) + + return buf +} + +// usage : separate out the original key +func (db *DB) expDecodeMetaKey(mk []byte) []byte { + if len(mk) <= 2 { + // check db ? check type ? + return nil + } + + return mk[2:] +} + +func (db *DB) expire(t *tx, expType byte, key []byte, duration int64) { + db.expireAt(t, expType, key, time.Now().Unix()+duration) +} + +func (db *DB) expireAt(t *tx, expType byte, key []byte, when int64) { + mk := db.expEncodeMetaKey(expType+1, key) + tk := db.expEncodeTimeKey(expType, key, when) + + t.Put(tk, mk) + t.Put(mk, PutInt64(when)) +} + +func (db *DB) ttl(expType byte, key []byte) (t int64, err error) { + mk := db.expEncodeMetaKey(expType+1, key) + + if t, err = Int64(db.db.Get(mk)); err != nil || t == 0 { + t = -1 + } else { + t -= time.Now().Unix() + if t <= 0 { + t = -1 + } + // if t == -1 : to remove ???? + } + + return t, err +} + +func (db *DB) rmExpire(t *tx, expType byte, key []byte) { + mk := db.expEncodeMetaKey(expType+1, key) + when, err := Int64(db.db.Get(mk)) + if err == nil && when > 0 { + tk := db.expEncodeTimeKey(expType, key, when) + + t.Delete(mk) + t.Delete(tk) + } +} + +func (db *DB) expFlush(t *tx, expType byte) (err error) { + expMetaType, ok := mapExpMetaType[expType] + if !ok { + return errExpType + } + + drop := 0 + + minKey := make([]byte, 2) + minKey[0] = db.index + minKey[1] = expType + + maxKey := make([]byte, 2) + maxKey[0] = db.index + maxKey[1] = expMetaType + 1 + + it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + t.Delete(it.Key()) + drop++ + if drop&1023 == 0 { + if err = t.Commit(); err != nil { + return + } + } + } + + err = t.Commit() + return +} + +////////////////////////////////////////////////////////// +// +////////////////////////////////////////////////////////// + +func newEliminator(db *DB) *Elimination { + eli := new(Elimination) + eli.db = db + eli.exp2Tx = make(map[byte]*tx) + eli.exp2Retire = make(map[byte]retireCallback) + return eli +} + +func (eli *Elimination) regRetireContext(expType byte, t *tx, onRetire retireCallback) { + eli.exp2Tx[expType] = t + eli.exp2Retire[expType] = onRetire +} + +// call by outside ... (from *db to another *db) +func (eli *Elimination) active() { + now := time.Now().Unix() + db := eli.db + dbGet := db.db.Get + expKeys := make([][]byte, 0, 1024) + expTypes := [...]byte{kvExpType, lExpType, hExpType, zExpType} + + for _, et := range expTypes { + // search those keys' which expire till the moment + minKey := db.expEncodeTimeKey(et, nil, 0) + maxKey := db.expEncodeTimeKey(et, nil, now+1) + expKeys = expKeys[0:0] + + t, _ := eli.exp2Tx[et] + onRetire, _ := eli.exp2Retire[et] + if t == nil || onRetire == nil { + // todo : log error + continue + } + + it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + for it.Valid() { + for i := 1; i < 512 && it.Valid(); i++ { + expKeys = append(expKeys, it.Key(), it.Value()) + it.Next() + } + + var cnt int = len(expKeys) + if cnt == 0 { + continue + } + + t.Lock() + var mk, ek, k []byte + for i := 0; i < cnt; i += 2 { + ek, mk = expKeys[i], expKeys[i+1] + if exp, err := Int64(dbGet(mk)); err == nil { + // check expire again + if exp > now { + continue + } + + // delete keys + k = db.expDecodeMetaKey(mk) + onRetire(t, k) + t.Delete(ek) + t.Delete(mk) + } + } + t.Commit() + t.Unlock() + } // end : it + } // end : expType + + return +} diff --git a/ledis/t_ttl_test.go b/ledis/t_ttl_test.go new file mode 100644 index 0000000..0483ae7 --- /dev/null +++ b/ledis/t_ttl_test.go @@ -0,0 +1,172 @@ +package ledis + +import ( + "fmt" + "sync" + "testing" + "time" +) + +var m sync.Mutex + +func TestKvExpire(t *testing.T) { + db := getTestDB() + m.Lock() + defer m.Unlock() + + k := []byte("ttl_a") + ek := []byte("ttl_b") + db.Set(k, []byte("1")) + + if ok, _ := db.Expire(k, 10); ok != 1 { + t.Fatal(ok) + } + + // err - expire on an inexisting key + if ok, _ := db.Expire(ek, 10); ok != 0 { + t.Fatal(ok) + } + + // err - duration is zero + if ok, err := db.Expire(k, 0); err == nil || ok != 0 { + t.Fatal(fmt.Sprintf("res = %d, err = %s", ok, err)) + } + + // err - duration is negative + if ok, err := db.Expire(k, -10); err == nil || ok != 0 { + t.Fatal(fmt.Sprintf("res = %d, err = %s", ok, err)) + } +} + +func TestKvExpireAt(t *testing.T) { + db := getTestDB() + m.Lock() + defer m.Unlock() + + k := []byte("ttl_a") + ek := []byte("ttl_b") + db.Set(k, []byte("1")) + + now := time.Now().Unix() + + if ok, _ := db.ExpireAt(k, now+5); ok != 1 { + t.Fatal(ok) + } + + // err - expire on an inexisting key + if ok, _ := db.ExpireAt(ek, now+5); ok != 0 { + t.Fatal(ok) + } + + // err - expire with the current time + if ok, err := db.ExpireAt(k, now); err == nil || ok != 0 { + t.Fatal(fmt.Sprintf("res = %d, err = %s", ok, err)) + } + + // err - expire with the time before + if ok, err := db.ExpireAt(k, now-5); err == nil || ok != 0 { + t.Fatal(fmt.Sprintf("res = %d, err = %s", ok, err)) + } +} + +func TestKvTtl(t *testing.T) { + db := getTestDB() + m.Lock() + defer m.Unlock() + + k := []byte("ttl_a") + ek := []byte("ttl_b") + + db.Set(k, []byte("1")) + db.Expire(k, 2) + + if tRemain, _ := db.Ttl(k); tRemain != 2 { + t.Fatal(tRemain) + } + + // err - check ttl on an inexisting key + if tRemain, _ := db.Ttl(ek); tRemain != -1 { + t.Fatal(tRemain) + } + + db.Del(k) + if tRemain, _ := db.Ttl(k); tRemain != -1 { + t.Fatal(tRemain) + } +} + +func TestKvExpCompose(t *testing.T) { + db := getTestDB() + m.Lock() + defer m.Unlock() + + k0 := []byte("ttl_a") + k1 := []byte("ttl_b") + k2 := []byte("ttl_c") + + db.Set(k0, k0) + db.Set(k1, k1) + db.Set(k2, k2) + + db.Expire(k0, 5) + db.Expire(k1, 2) + db.Expire(k2, 60) + + if tRemain, _ := db.Ttl(k0); tRemain != 5 { + t.Fatal(tRemain) + } + if tRemain, _ := db.Ttl(k1); tRemain != 2 { + t.Fatal(tRemain) + } + if tRemain, _ := db.Ttl(k2); tRemain != 60 { + t.Fatal(tRemain) + } + + // after 1 sec + time.Sleep(1 * time.Second) + if tRemain, _ := db.Ttl(k0); tRemain != 4 { + t.Fatal(tRemain) + } + if tRemain, _ := db.Ttl(k1); tRemain != 1 { + t.Fatal(tRemain) + } + + // after 2 sec + time.Sleep(2 * time.Second) + if tRemain, _ := db.Ttl(k1); tRemain != -1 { + t.Fatal(tRemain) + } + if v, _ := db.Get(k1); v != nil { + t.Fatal(v) + } + + if tRemain, _ := db.Ttl(k0); tRemain != 2 { + t.Fatal(tRemain) + } + if v, _ := db.Get(k0); v == nil { + t.Fatal(v) + } + + // refresh the expiration of key + if tRemain, _ := db.Ttl(k2); !(0 < tRemain && tRemain < 60) { + t.Fatal(tRemain) + } + + if ok, _ := db.Expire(k2, 100); ok != 1 { + t.Fatal(false) + } + + if tRemain, _ := db.Ttl(k2); tRemain != 100 { + t.Fatal(tRemain) + } + + // expire an inexisting key + if ok, _ := db.Expire(k1, 10); ok == 1 { + t.Fatal(false) + } + if tRemain, _ := db.Ttl(k1); tRemain != -1 { + t.Fatal(tRemain) + } + + return +}