diff --git a/ledis/const.go b/ledis/const.go index 024e38c..ce5a4e6 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -5,28 +5,22 @@ import ( ) const ( - kvType byte = iota + 1 - hashType - hSizeType - listType - lMetaType - zsetType - zSizeType - zScoreType + noneType byte = 0 + kvType byte = 1 + hashType byte = 2 + hSizeType byte = 3 + listType byte = 4 + lMetaType byte = 5 + zsetType byte = 6 + zSizeType byte = 7 + zScoreType byte = 8 + binType byte = 9 + binMetaType byte = 10 - kvExpType - kvExpMetaType - lExpType - lExpMetaType - hExpType - hExpMetaType - zExpType - zExpMetaType + maxDataType byte = 100 - binType - binMetaType - bExpType - bExpMetaType + expTimeType byte = 101 + expMetaType byte = 102 ) const ( diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index cdb4e94..58b93e8 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -26,11 +26,11 @@ func (db *DB) FlushAll() (drop int64, err error) { func (db *DB) newEliminator() *elimination { eliminator := newEliminator(db) - eliminator.regRetireContext(kvExpType, db.kvTx, db.delete) - eliminator.regRetireContext(lExpType, db.listTx, db.lDelete) - eliminator.regRetireContext(hExpType, db.hashTx, db.hDelete) - eliminator.regRetireContext(zExpType, db.zsetTx, db.zDelete) - eliminator.regRetireContext(bExpType, db.binTx, db.bDelete) + eliminator.regRetireContext(kvType, db.kvTx, db.delete) + eliminator.regRetireContext(listType, db.listTx, db.lDelete) + eliminator.regRetireContext(hashType, db.hashTx, db.hDelete) + eliminator.regRetireContext(zsetType, db.zsetTx, db.zDelete) + eliminator.regRetireContext(binType, db.binTx, db.bDelete) return eliminator } @@ -38,7 +38,7 @@ func (db *DB) newEliminator() *elimination { func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) { it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen) for ; it.Valid(); it.Next() { - t.Delete(it.Key()) + t.Delete(it.RawKey()) drop++ if drop&1023 == 0 { if err = t.Commit(); err != nil { diff --git a/ledis/t_bin.go b/ledis/t_bin.go index ddc7e94..2d62047 100644 --- a/ledis/t_bin.go +++ b/ledis/t_bin.go @@ -355,7 +355,7 @@ func (db *DB) bExpireAt(key []byte, when int64) (int64, error) { if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 { return 0, err } else { - db.expireAt(t, bExpType, key, when) + db.expireAt(t, binType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -407,7 +407,7 @@ func (db *DB) BDelete(key []byte) (drop int64, err error) { defer t.Unlock() drop = db.bDelete(t, key) - db.rmExpire(t, bExpType, key) + db.rmExpire(t, binType, key) err = t.Commit() return @@ -736,7 +736,7 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32 // clear the old data in case db.bDelete(t, dstkey) - db.rmExpire(t, bExpType, dstkey) + db.rmExpire(t, binType, dstkey) // set data db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff) @@ -786,7 +786,7 @@ func (db *DB) BTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(bExpType, key) + return db.ttl(binType, key) } func (db *DB) BPersist(key []byte) (int64, error) { @@ -798,7 +798,7 @@ func (db *DB) BPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, bExpType, key) + n, err := db.rmExpire(t, binType, key) if err != nil { return 0, err } @@ -825,7 +825,7 @@ func (db *DB) bFlush() (drop int64, err error) { maxKey[1] = binMetaType + 1 drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, bExpType) + err = db.expFlush(t, binType) err = t.Commit() return diff --git a/ledis/t_hash.go b/ledis/t_hash.go index 68e9bec..711d83b 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -151,7 +151,7 @@ func (db *DB) hExpireAt(key []byte, when int64) (int64, error) { if hlen, err := db.HLen(key); err != nil || hlen == 0 { return 0, err } else { - db.expireAt(t, hExpType, key, when) + db.expireAt(t, hashType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -304,7 +304,7 @@ func (db *DB) hIncrSize(key []byte, delta int64) (int64, error) { if size <= 0 { size = 0 t.Delete(sk) - db.rmExpire(t, hExpType, key) + db.rmExpire(t, hashType, key) } else { t.Put(sk, PutInt64(size)) } @@ -428,7 +428,7 @@ func (db *DB) HClear(key []byte) (int64, error) { defer t.Unlock() num := db.hDelete(t, key) - db.rmExpire(t, hExpType, key) + db.rmExpire(t, hashType, key) err := t.Commit() return num, err @@ -445,7 +445,7 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) { } db.hDelete(t, key) - db.rmExpire(t, hExpType, key) + db.rmExpire(t, hashType, key) } err := t.Commit() @@ -466,7 +466,7 @@ func (db *DB) hFlush() (drop int64, err error) { defer t.Unlock() drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, hExpType) + err = db.expFlush(t, hashType) err = t.Commit() return @@ -530,7 +530,7 @@ func (db *DB) HTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(hExpType, key) + return db.ttl(hashType, key) } func (db *DB) HPersist(key []byte) (int64, error) { @@ -542,7 +542,7 @@ func (db *DB) HPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, hExpType, key) + n, err := db.rmExpire(t, hashType, key) if err != nil { return 0, err } diff --git a/ledis/t_kv.go b/ledis/t_kv.go index 30850e8..b62700b 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -100,7 +100,7 @@ func (db *DB) setExpireAt(key []byte, when int64) (int64, error) { if exist, err := db.Exists(key); err != nil || exist == 0 { return 0, err } else { - db.expireAt(t, kvExpType, key, when) + db.expireAt(t, kvType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -132,7 +132,7 @@ func (db *DB) Del(keys ...[]byte) (int64, error) { for i, k := range keys { t.Delete(codedKeys[i]) - db.rmExpire(t, kvExpType, k) + db.rmExpire(t, kvType, k) } err := t.Commit() @@ -317,7 +317,7 @@ func (db *DB) flush() (drop int64, err error) { defer t.Unlock() drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, kvExpType) + err = db.expFlush(t, kvType) err = t.Commit() return @@ -382,7 +382,7 @@ func (db *DB) TTL(key []byte) (int64, error) { return -1, err } - return db.ttl(kvExpType, key) + return db.ttl(kvType, key) } func (db *DB) Persist(key []byte) (int64, error) { @@ -393,7 +393,7 @@ func (db *DB) Persist(key []byte) (int64, error) { t := db.kvTx t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, kvExpType, key) + n, err := db.rmExpire(t, kvType, key) if err != nil { return 0, err } diff --git a/ledis/t_list.go b/ledis/t_list.go index 34dea19..39ca6ed 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -175,7 +175,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) { t.Delete(itemKey) size := db.lSetMeta(metaKey, headSeq, tailSeq) if size == 0 { - db.rmExpire(t, hExpType, key) + db.rmExpire(t, hashType, key) } err = t.Commit() @@ -264,7 +264,7 @@ func (db *DB) lExpireAt(key []byte, when int64) (int64, error) { if llen, err := db.LLen(key); err != nil || llen == 0 { return 0, err } else { - db.expireAt(t, lExpType, key, when) + db.expireAt(t, listType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -384,7 +384,7 @@ func (db *DB) LClear(key []byte) (int64, error) { defer t.Unlock() num := db.lDelete(t, key) - db.rmExpire(t, lExpType, key) + db.rmExpire(t, listType, key) err := t.Commit() return num, err @@ -401,7 +401,7 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) { } db.lDelete(t, key) - db.rmExpire(t, lExpType, key) + db.rmExpire(t, listType, key) } @@ -423,7 +423,7 @@ func (db *DB) lFlush() (drop int64, err error) { defer t.Unlock() drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, lExpType) + err = db.expFlush(t, listType) err = t.Commit() return @@ -450,7 +450,7 @@ func (db *DB) LTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(lExpType, key) + return db.ttl(listType, key) } func (db *DB) LPersist(key []byte) (int64, error) { @@ -462,7 +462,7 @@ func (db *DB) LPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, lExpType, key) + n, err := db.rmExpire(t, listType, key) if err != nil { return 0, err } diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index 1750095..f1614bd 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -7,30 +7,28 @@ import ( "time" ) -var mapExpMetaType = map[byte]byte{ - kvExpType: kvExpMetaType, - lExpType: lExpMetaType, - hExpType: hExpMetaType, - zExpType: zExpMetaType, - bExpType: bExpMetaType} +var ( + errExpMetaKey = errors.New("invalid expire meta key") + errExpTimeKey = errors.New("invalid expire time key") +) type retireCallback func(*tx, []byte) int64 type elimination struct { db *DB - exp2Tx map[byte]*tx - exp2Retire map[byte]retireCallback + exp2Tx []*tx + exp2Retire []retireCallback } var errExpType = errors.New("invalid expire type") -func (db *DB) expEncodeTimeKey(expType byte, key []byte, when int64) []byte { - // format : db[8] / expType[8] / when[64] / key[...] - buf := make([]byte, len(key)+10) +func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte { + buf := make([]byte, len(key)+11) buf[0] = db.index - buf[1] = expType - pos := 2 + buf[1] = expTimeType + buf[2] = dataType + pos := 3 binary.BigEndian.PutUint64(buf[pos:], uint64(when)) pos += 8 @@ -40,43 +38,49 @@ func (db *DB) expEncodeTimeKey(expType byte, key []byte, when int64) []byte { return buf } -func (db *DB) expEncodeMetaKey(expType byte, key []byte) []byte { - // format : db[8] / expType[8] / key[...] - buf := make([]byte, len(key)+2) +func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte { + buf := make([]byte, len(key)+3) buf[0] = db.index - buf[1] = expType - pos := 2 + buf[1] = expMetaType + buf[2] = dataType + pos := 3 copy(buf[pos:], key) return buf } -// usage : separate out the original key -func (db *DB) expDecodeMetaKey(mk []byte) []byte { - if len(mk) <= 2 { - // check db ? check type ? - return nil +func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) { + if len(mk) <= 3 || mk[0] != db.index || mk[1] != expMetaType { + return 0, nil, errExpMetaKey } - return mk[2:] + return mk[2], mk[3:], nil } -func (db *DB) expire(t *tx, expType byte, key []byte, duration int64) { - db.expireAt(t, expType, key, time.Now().Unix()+duration) +func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) { + if len(tk) < 11 || tk[0] != db.index || tk[1] != expTimeType { + return 0, nil, 0, errExpTimeKey + } + + return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil } -func (db *DB) expireAt(t *tx, expType byte, key []byte, when int64) { - mk := db.expEncodeMetaKey(expType+1, key) - tk := db.expEncodeTimeKey(expType, key, when) +func (db *DB) expire(t *tx, dataType byte, key []byte, duration int64) { + db.expireAt(t, dataType, key, time.Now().Unix()+duration) +} + +func (db *DB) expireAt(t *tx, dataType byte, key []byte, when int64) { + mk := db.expEncodeMetaKey(dataType, key) + tk := db.expEncodeTimeKey(dataType, key, when) t.Put(tk, mk) t.Put(mk, PutInt64(when)) } -func (db *DB) ttl(expType byte, key []byte) (t int64, err error) { - mk := db.expEncodeMetaKey(expType+1, key) +func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) { + mk := db.expEncodeMetaKey(dataType, key) if t, err = Int64(db.db.Get(mk)); err != nil || t == 0 { t = -1 @@ -91,8 +95,8 @@ func (db *DB) ttl(expType byte, key []byte) (t int64, err error) { return t, err } -func (db *DB) rmExpire(t *tx, expType byte, key []byte) (int64, error) { - mk := db.expEncodeMetaKey(expType+1, key) +func (db *DB) rmExpire(t *tx, dataType byte, key []byte) (int64, error) { + mk := db.expEncodeMetaKey(dataType, key) if v, err := db.db.Get(mk); err != nil { return 0, err } else if v == nil { @@ -100,26 +104,23 @@ func (db *DB) rmExpire(t *tx, expType byte, key []byte) (int64, error) { } else if when, err2 := Int64(v, nil); err2 != nil { return 0, err2 } else { - tk := db.expEncodeTimeKey(expType, key, when) + tk := db.expEncodeTimeKey(dataType, key, when) t.Delete(mk) t.Delete(tk) return 1, nil } } -func (db *DB) expFlush(t *tx, expType byte) (err error) { - expMetaType, ok := mapExpMetaType[expType] - if !ok { - return errExpType - } - - minKey := make([]byte, 2) +func (db *DB) expFlush(t *tx, dataType byte) (err error) { + minKey := make([]byte, 3) minKey[0] = db.index - minKey[1] = expType + minKey[1] = expTimeType + minKey[2] = dataType - maxKey := make([]byte, 2) + maxKey := make([]byte, 3) maxKey[0] = db.index - maxKey[1] = expMetaType + 1 + maxKey[1] = expMetaType + maxKey[2] = dataType + 1 _, err = db.flushRegion(t, minKey, maxKey) err = t.Commit() @@ -133,17 +134,17 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) { func newEliminator(db *DB) *elimination { eli := new(elimination) eli.db = db - eli.exp2Tx = make(map[byte]*tx) - eli.exp2Retire = make(map[byte]retireCallback) + eli.exp2Tx = make([]*tx, maxDataType) + eli.exp2Retire = make([]retireCallback, maxDataType) return eli } -func (eli *elimination) regRetireContext(expType byte, t *tx, onRetire retireCallback) { +func (eli *elimination) regRetireContext(dataType byte, t *tx, onRetire retireCallback) { // todo .. need to ensure exist - mapExpMetaType[expType] - eli.exp2Tx[expType] = t - eli.exp2Retire[expType] = onRetire + eli.exp2Tx[dataType] = t + eli.exp2Retire[dataType] = onRetire } // call by outside ... (from *db to another *db) @@ -151,56 +152,43 @@ func (eli *elimination) active() { now := time.Now().Unix() db := eli.db dbGet := db.db.Get - expKeys := make([][]byte, 0, 1024) - expTypes := [...]byte{kvExpType, lExpType, hExpType, zExpType} - for _, et := range expTypes { - // search those keys' which expire till the moment - minKey := db.expEncodeTimeKey(et, nil, 0) - maxKey := db.expEncodeTimeKey(et, nil, now+1) - expKeys = expKeys[0:0] + minKey := db.expEncodeTimeKey(noneType, nil, 0) + maxKey := db.expEncodeTimeKey(maxDataType, nil, now) - t, _ := eli.exp2Tx[et] - onRetire, _ := eli.exp2Retire[et] - if t == nil || onRetire == nil { - // todo : log error + it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + tk := it.RawKey() + mk := it.RawValue() + + dt, k, _, err := db.expDecodeTimeKey(tk) + if err != nil { continue } - it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) - for it.Valid() { - for i := 1; i < 512 && it.Valid(); i++ { - expKeys = append(expKeys, it.Key(), it.Value()) - it.Next() + t := eli.exp2Tx[dt] + onRetire := eli.exp2Retire[dt] + if tk == nil || onRetire == nil { + continue + } + + t.Lock() + + if exp, err := Int64(dbGet(mk)); err == nil { + // check expire again + if exp <= now { + onRetire(t, k) + t.Delete(tk) + t.Delete(mk) + + t.Commit() } - var cnt int = len(expKeys) - if cnt == 0 { - continue - } + } - t.Lock() - var mk, ek, k []byte - for i := 0; i < cnt; i += 2 { - ek, mk = expKeys[i], expKeys[i+1] - if exp, err := Int64(dbGet(mk)); err == nil { - // check expire again - if exp > now { - continue - } - - // delete keys - k = db.expDecodeMetaKey(mk) - onRetire(t, k) - t.Delete(ek) - t.Delete(mk) - } - } - t.Commit() - t.Unlock() - } // end : it - it.Close() - } // end : expType + t.Unlock() + } + it.Close() return } diff --git a/ledis/t_zset.go b/ledis/t_zset.go index d3e14e6..e9b9c4b 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -260,7 +260,7 @@ func (db *DB) zExpireAt(key []byte, when int64) (int64, error) { if zcnt, err := db.ZCard(key); err != nil || zcnt == 0 { return 0, err } else { - db.expireAt(t, zExpType, key, when) + db.expireAt(t, zsetType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -314,7 +314,7 @@ func (db *DB) zIncrSize(t *tx, key []byte, delta int64) (int64, error) { if size <= 0 { size = 0 t.Delete(sk) - db.rmExpire(t, zExpType, key) + db.rmExpire(t, zsetType, key) } else { t.Put(sk, PutInt64(size)) } @@ -752,7 +752,7 @@ func (db *DB) zFlush() (drop int64, err error) { } it.Close() - db.expFlush(t, zExpType) + db.expFlush(t, zsetType) err = t.Commit() return @@ -818,7 +818,7 @@ func (db *DB) ZTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(zExpType, key) + return db.ttl(zsetType, key) } func (db *DB) ZPersist(key []byte) (int64, error) { @@ -830,7 +830,7 @@ func (db *DB) ZPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, zExpType, key) + n, err := db.rmExpire(t, zsetType, key) if err != nil { return 0, err }