refactor t/l, reduce code

not backwards compatibility,
This commit is contained in:
siddontang 2014-07-09 17:01:29 +08:00
parent 99e90cf837
commit 5e33e67d89
8 changed files with 129 additions and 147 deletions

View File

@ -5,28 +5,22 @@ import (
) )
const ( const (
kvType byte = iota + 1 noneType byte = 0
hashType kvType byte = 1
hSizeType hashType byte = 2
listType hSizeType byte = 3
lMetaType listType byte = 4
zsetType lMetaType byte = 5
zSizeType zsetType byte = 6
zScoreType zSizeType byte = 7
zScoreType byte = 8
binType byte = 9
binMetaType byte = 10
kvExpType maxDataType byte = 100
kvExpMetaType
lExpType
lExpMetaType
hExpType
hExpMetaType
zExpType
zExpMetaType
binType expTimeType byte = 101
binMetaType expMetaType byte = 102
bExpType
bExpMetaType
) )
const ( const (

View File

@ -26,11 +26,11 @@ func (db *DB) FlushAll() (drop int64, err error) {
func (db *DB) newEliminator() *elimination { func (db *DB) newEliminator() *elimination {
eliminator := newEliminator(db) eliminator := newEliminator(db)
eliminator.regRetireContext(kvExpType, db.kvTx, db.delete) eliminator.regRetireContext(kvType, db.kvTx, db.delete)
eliminator.regRetireContext(lExpType, db.listTx, db.lDelete) eliminator.regRetireContext(listType, db.listTx, db.lDelete)
eliminator.regRetireContext(hExpType, db.hashTx, db.hDelete) eliminator.regRetireContext(hashType, db.hashTx, db.hDelete)
eliminator.regRetireContext(zExpType, db.zsetTx, db.zDelete) eliminator.regRetireContext(zsetType, db.zsetTx, db.zDelete)
eliminator.regRetireContext(bExpType, db.binTx, db.bDelete) eliminator.regRetireContext(binType, db.binTx, db.bDelete)
return eliminator return eliminator
} }
@ -38,7 +38,7 @@ func (db *DB) newEliminator() *elimination {
func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) { func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) {
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen) it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
t.Delete(it.Key()) t.Delete(it.RawKey())
drop++ drop++
if drop&1023 == 0 { if drop&1023 == 0 {
if err = t.Commit(); err != nil { if err = t.Commit(); err != nil {

View File

@ -355,7 +355,7 @@ func (db *DB) bExpireAt(key []byte, when int64) (int64, error) {
if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 { if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 {
return 0, err return 0, err
} else { } else {
db.expireAt(t, bExpType, key, when) db.expireAt(t, binType, key, when)
if err := t.Commit(); err != nil { if err := t.Commit(); err != nil {
return 0, err return 0, err
} }
@ -407,7 +407,7 @@ func (db *DB) BDelete(key []byte) (drop int64, err error) {
defer t.Unlock() defer t.Unlock()
drop = db.bDelete(t, key) drop = db.bDelete(t, key)
db.rmExpire(t, bExpType, key) db.rmExpire(t, binType, key)
err = t.Commit() err = t.Commit()
return return
@ -736,7 +736,7 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32
// clear the old data in case // clear the old data in case
db.bDelete(t, dstkey) db.bDelete(t, dstkey)
db.rmExpire(t, bExpType, dstkey) db.rmExpire(t, binType, dstkey)
// set data // set data
db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff) db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff)
@ -786,7 +786,7 @@ func (db *DB) BTTL(key []byte) (int64, error) {
return -1, err return -1, err
} }
return db.ttl(bExpType, key) return db.ttl(binType, key)
} }
func (db *DB) BPersist(key []byte) (int64, error) { func (db *DB) BPersist(key []byte) (int64, error) {
@ -798,7 +798,7 @@ func (db *DB) BPersist(key []byte) (int64, error) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
n, err := db.rmExpire(t, bExpType, key) n, err := db.rmExpire(t, binType, key)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -825,7 +825,7 @@ func (db *DB) bFlush() (drop int64, err error) {
maxKey[1] = binMetaType + 1 maxKey[1] = binMetaType + 1
drop, err = db.flushRegion(t, minKey, maxKey) drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, bExpType) err = db.expFlush(t, binType)
err = t.Commit() err = t.Commit()
return return

View File

@ -151,7 +151,7 @@ func (db *DB) hExpireAt(key []byte, when int64) (int64, error) {
if hlen, err := db.HLen(key); err != nil || hlen == 0 { if hlen, err := db.HLen(key); err != nil || hlen == 0 {
return 0, err return 0, err
} else { } else {
db.expireAt(t, hExpType, key, when) db.expireAt(t, hashType, key, when)
if err := t.Commit(); err != nil { if err := t.Commit(); err != nil {
return 0, err return 0, err
} }
@ -304,7 +304,7 @@ func (db *DB) hIncrSize(key []byte, delta int64) (int64, error) {
if size <= 0 { if size <= 0 {
size = 0 size = 0
t.Delete(sk) t.Delete(sk)
db.rmExpire(t, hExpType, key) db.rmExpire(t, hashType, key)
} else { } else {
t.Put(sk, PutInt64(size)) t.Put(sk, PutInt64(size))
} }
@ -428,7 +428,7 @@ func (db *DB) HClear(key []byte) (int64, error) {
defer t.Unlock() defer t.Unlock()
num := db.hDelete(t, key) num := db.hDelete(t, key)
db.rmExpire(t, hExpType, key) db.rmExpire(t, hashType, key)
err := t.Commit() err := t.Commit()
return num, err return num, err
@ -445,7 +445,7 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) {
} }
db.hDelete(t, key) db.hDelete(t, key)
db.rmExpire(t, hExpType, key) db.rmExpire(t, hashType, key)
} }
err := t.Commit() err := t.Commit()
@ -466,7 +466,7 @@ func (db *DB) hFlush() (drop int64, err error) {
defer t.Unlock() defer t.Unlock()
drop, err = db.flushRegion(t, minKey, maxKey) drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, hExpType) err = db.expFlush(t, hashType)
err = t.Commit() err = t.Commit()
return return
@ -530,7 +530,7 @@ func (db *DB) HTTL(key []byte) (int64, error) {
return -1, err return -1, err
} }
return db.ttl(hExpType, key) return db.ttl(hashType, key)
} }
func (db *DB) HPersist(key []byte) (int64, error) { func (db *DB) HPersist(key []byte) (int64, error) {
@ -542,7 +542,7 @@ func (db *DB) HPersist(key []byte) (int64, error) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
n, err := db.rmExpire(t, hExpType, key) n, err := db.rmExpire(t, hashType, key)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -100,7 +100,7 @@ func (db *DB) setExpireAt(key []byte, when int64) (int64, error) {
if exist, err := db.Exists(key); err != nil || exist == 0 { if exist, err := db.Exists(key); err != nil || exist == 0 {
return 0, err return 0, err
} else { } else {
db.expireAt(t, kvExpType, key, when) db.expireAt(t, kvType, key, when)
if err := t.Commit(); err != nil { if err := t.Commit(); err != nil {
return 0, err return 0, err
} }
@ -132,7 +132,7 @@ func (db *DB) Del(keys ...[]byte) (int64, error) {
for i, k := range keys { for i, k := range keys {
t.Delete(codedKeys[i]) t.Delete(codedKeys[i])
db.rmExpire(t, kvExpType, k) db.rmExpire(t, kvType, k)
} }
err := t.Commit() err := t.Commit()
@ -317,7 +317,7 @@ func (db *DB) flush() (drop int64, err error) {
defer t.Unlock() defer t.Unlock()
drop, err = db.flushRegion(t, minKey, maxKey) drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, kvExpType) err = db.expFlush(t, kvType)
err = t.Commit() err = t.Commit()
return return
@ -382,7 +382,7 @@ func (db *DB) TTL(key []byte) (int64, error) {
return -1, err return -1, err
} }
return db.ttl(kvExpType, key) return db.ttl(kvType, key)
} }
func (db *DB) Persist(key []byte) (int64, error) { func (db *DB) Persist(key []byte) (int64, error) {
@ -393,7 +393,7 @@ func (db *DB) Persist(key []byte) (int64, error) {
t := db.kvTx t := db.kvTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
n, err := db.rmExpire(t, kvExpType, key) n, err := db.rmExpire(t, kvType, key)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -175,7 +175,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
t.Delete(itemKey) t.Delete(itemKey)
size := db.lSetMeta(metaKey, headSeq, tailSeq) size := db.lSetMeta(metaKey, headSeq, tailSeq)
if size == 0 { if size == 0 {
db.rmExpire(t, hExpType, key) db.rmExpire(t, hashType, key)
} }
err = t.Commit() err = t.Commit()
@ -264,7 +264,7 @@ func (db *DB) lExpireAt(key []byte, when int64) (int64, error) {
if llen, err := db.LLen(key); err != nil || llen == 0 { if llen, err := db.LLen(key); err != nil || llen == 0 {
return 0, err return 0, err
} else { } else {
db.expireAt(t, lExpType, key, when) db.expireAt(t, listType, key, when)
if err := t.Commit(); err != nil { if err := t.Commit(); err != nil {
return 0, err return 0, err
} }
@ -384,7 +384,7 @@ func (db *DB) LClear(key []byte) (int64, error) {
defer t.Unlock() defer t.Unlock()
num := db.lDelete(t, key) num := db.lDelete(t, key)
db.rmExpire(t, lExpType, key) db.rmExpire(t, listType, key)
err := t.Commit() err := t.Commit()
return num, err return num, err
@ -401,7 +401,7 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) {
} }
db.lDelete(t, key) db.lDelete(t, key)
db.rmExpire(t, lExpType, key) db.rmExpire(t, listType, key)
} }
@ -423,7 +423,7 @@ func (db *DB) lFlush() (drop int64, err error) {
defer t.Unlock() defer t.Unlock()
drop, err = db.flushRegion(t, minKey, maxKey) drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, lExpType) err = db.expFlush(t, listType)
err = t.Commit() err = t.Commit()
return return
@ -450,7 +450,7 @@ func (db *DB) LTTL(key []byte) (int64, error) {
return -1, err return -1, err
} }
return db.ttl(lExpType, key) return db.ttl(listType, key)
} }
func (db *DB) LPersist(key []byte) (int64, error) { func (db *DB) LPersist(key []byte) (int64, error) {
@ -462,7 +462,7 @@ func (db *DB) LPersist(key []byte) (int64, error) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
n, err := db.rmExpire(t, lExpType, key) n, err := db.rmExpire(t, listType, key)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -7,30 +7,28 @@ import (
"time" "time"
) )
var mapExpMetaType = map[byte]byte{ var (
kvExpType: kvExpMetaType, errExpMetaKey = errors.New("invalid expire meta key")
lExpType: lExpMetaType, errExpTimeKey = errors.New("invalid expire time key")
hExpType: hExpMetaType, )
zExpType: zExpMetaType,
bExpType: bExpMetaType}
type retireCallback func(*tx, []byte) int64 type retireCallback func(*tx, []byte) int64
type elimination struct { type elimination struct {
db *DB db *DB
exp2Tx map[byte]*tx exp2Tx []*tx
exp2Retire map[byte]retireCallback exp2Retire []retireCallback
} }
var errExpType = errors.New("invalid expire type") var errExpType = errors.New("invalid expire type")
func (db *DB) expEncodeTimeKey(expType byte, key []byte, when int64) []byte { func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte {
// format : db[8] / expType[8] / when[64] / key[...] buf := make([]byte, len(key)+11)
buf := make([]byte, len(key)+10)
buf[0] = db.index buf[0] = db.index
buf[1] = expType buf[1] = expTimeType
pos := 2 buf[2] = dataType
pos := 3
binary.BigEndian.PutUint64(buf[pos:], uint64(when)) binary.BigEndian.PutUint64(buf[pos:], uint64(when))
pos += 8 pos += 8
@ -40,43 +38,49 @@ func (db *DB) expEncodeTimeKey(expType byte, key []byte, when int64) []byte {
return buf return buf
} }
func (db *DB) expEncodeMetaKey(expType byte, key []byte) []byte { func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte {
// format : db[8] / expType[8] / key[...] buf := make([]byte, len(key)+3)
buf := make([]byte, len(key)+2)
buf[0] = db.index buf[0] = db.index
buf[1] = expType buf[1] = expMetaType
pos := 2 buf[2] = dataType
pos := 3
copy(buf[pos:], key) copy(buf[pos:], key)
return buf return buf
} }
// usage : separate out the original key func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) {
func (db *DB) expDecodeMetaKey(mk []byte) []byte { if len(mk) <= 3 || mk[0] != db.index || mk[1] != expMetaType {
if len(mk) <= 2 { return 0, nil, errExpMetaKey
// check db ? check type ?
return nil
} }
return mk[2:] return mk[2], mk[3:], nil
} }
func (db *DB) expire(t *tx, expType byte, key []byte, duration int64) { func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) {
db.expireAt(t, expType, key, time.Now().Unix()+duration) if len(tk) < 11 || tk[0] != db.index || tk[1] != expTimeType {
return 0, nil, 0, errExpTimeKey
}
return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil
} }
func (db *DB) expireAt(t *tx, expType byte, key []byte, when int64) { func (db *DB) expire(t *tx, dataType byte, key []byte, duration int64) {
mk := db.expEncodeMetaKey(expType+1, key) db.expireAt(t, dataType, key, time.Now().Unix()+duration)
tk := db.expEncodeTimeKey(expType, key, when) }
func (db *DB) expireAt(t *tx, dataType byte, key []byte, when int64) {
mk := db.expEncodeMetaKey(dataType, key)
tk := db.expEncodeTimeKey(dataType, key, when)
t.Put(tk, mk) t.Put(tk, mk)
t.Put(mk, PutInt64(when)) t.Put(mk, PutInt64(when))
} }
func (db *DB) ttl(expType byte, key []byte) (t int64, err error) { func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) {
mk := db.expEncodeMetaKey(expType+1, key) mk := db.expEncodeMetaKey(dataType, key)
if t, err = Int64(db.db.Get(mk)); err != nil || t == 0 { if t, err = Int64(db.db.Get(mk)); err != nil || t == 0 {
t = -1 t = -1
@ -91,8 +95,8 @@ func (db *DB) ttl(expType byte, key []byte) (t int64, err error) {
return t, err return t, err
} }
func (db *DB) rmExpire(t *tx, expType byte, key []byte) (int64, error) { func (db *DB) rmExpire(t *tx, dataType byte, key []byte) (int64, error) {
mk := db.expEncodeMetaKey(expType+1, key) mk := db.expEncodeMetaKey(dataType, key)
if v, err := db.db.Get(mk); err != nil { if v, err := db.db.Get(mk); err != nil {
return 0, err return 0, err
} else if v == nil { } else if v == nil {
@ -100,26 +104,23 @@ func (db *DB) rmExpire(t *tx, expType byte, key []byte) (int64, error) {
} else if when, err2 := Int64(v, nil); err2 != nil { } else if when, err2 := Int64(v, nil); err2 != nil {
return 0, err2 return 0, err2
} else { } else {
tk := db.expEncodeTimeKey(expType, key, when) tk := db.expEncodeTimeKey(dataType, key, when)
t.Delete(mk) t.Delete(mk)
t.Delete(tk) t.Delete(tk)
return 1, nil return 1, nil
} }
} }
func (db *DB) expFlush(t *tx, expType byte) (err error) { func (db *DB) expFlush(t *tx, dataType byte) (err error) {
expMetaType, ok := mapExpMetaType[expType] minKey := make([]byte, 3)
if !ok {
return errExpType
}
minKey := make([]byte, 2)
minKey[0] = db.index minKey[0] = db.index
minKey[1] = expType minKey[1] = expTimeType
minKey[2] = dataType
maxKey := make([]byte, 2) maxKey := make([]byte, 3)
maxKey[0] = db.index maxKey[0] = db.index
maxKey[1] = expMetaType + 1 maxKey[1] = expMetaType
maxKey[2] = dataType + 1
_, err = db.flushRegion(t, minKey, maxKey) _, err = db.flushRegion(t, minKey, maxKey)
err = t.Commit() err = t.Commit()
@ -133,17 +134,17 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) {
func newEliminator(db *DB) *elimination { func newEliminator(db *DB) *elimination {
eli := new(elimination) eli := new(elimination)
eli.db = db eli.db = db
eli.exp2Tx = make(map[byte]*tx) eli.exp2Tx = make([]*tx, maxDataType)
eli.exp2Retire = make(map[byte]retireCallback) eli.exp2Retire = make([]retireCallback, maxDataType)
return eli return eli
} }
func (eli *elimination) regRetireContext(expType byte, t *tx, onRetire retireCallback) { func (eli *elimination) regRetireContext(dataType byte, t *tx, onRetire retireCallback) {
// todo .. need to ensure exist - mapExpMetaType[expType] // todo .. need to ensure exist - mapExpMetaType[expType]
eli.exp2Tx[expType] = t eli.exp2Tx[dataType] = t
eli.exp2Retire[expType] = onRetire eli.exp2Retire[dataType] = onRetire
} }
// call by outside ... (from *db to another *db) // call by outside ... (from *db to another *db)
@ -151,56 +152,43 @@ func (eli *elimination) active() {
now := time.Now().Unix() now := time.Now().Unix()
db := eli.db db := eli.db
dbGet := db.db.Get dbGet := db.db.Get
expKeys := make([][]byte, 0, 1024)
expTypes := [...]byte{kvExpType, lExpType, hExpType, zExpType}
for _, et := range expTypes { minKey := db.expEncodeTimeKey(noneType, nil, 0)
// search those keys' which expire till the moment maxKey := db.expEncodeTimeKey(maxDataType, nil, now)
minKey := db.expEncodeTimeKey(et, nil, 0)
maxKey := db.expEncodeTimeKey(et, nil, now+1)
expKeys = expKeys[0:0]
t, _ := eli.exp2Tx[et] it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
onRetire, _ := eli.exp2Retire[et] for ; it.Valid(); it.Next() {
if t == nil || onRetire == nil { tk := it.RawKey()
// todo : log error mk := it.RawValue()
dt, k, _, err := db.expDecodeTimeKey(tk)
if err != nil {
continue continue
} }
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) t := eli.exp2Tx[dt]
for it.Valid() { onRetire := eli.exp2Retire[dt]
for i := 1; i < 512 && it.Valid(); i++ { if tk == nil || onRetire == nil {
expKeys = append(expKeys, it.Key(), it.Value()) continue
it.Next() }
t.Lock()
if exp, err := Int64(dbGet(mk)); err == nil {
// check expire again
if exp <= now {
onRetire(t, k)
t.Delete(tk)
t.Delete(mk)
t.Commit()
} }
var cnt int = len(expKeys) }
if cnt == 0 {
continue
}
t.Lock() t.Unlock()
var mk, ek, k []byte }
for i := 0; i < cnt; i += 2 { it.Close()
ek, mk = expKeys[i], expKeys[i+1]
if exp, err := Int64(dbGet(mk)); err == nil {
// check expire again
if exp > now {
continue
}
// delete keys
k = db.expDecodeMetaKey(mk)
onRetire(t, k)
t.Delete(ek)
t.Delete(mk)
}
}
t.Commit()
t.Unlock()
} // end : it
it.Close()
} // end : expType
return return
} }

View File

@ -260,7 +260,7 @@ func (db *DB) zExpireAt(key []byte, when int64) (int64, error) {
if zcnt, err := db.ZCard(key); err != nil || zcnt == 0 { if zcnt, err := db.ZCard(key); err != nil || zcnt == 0 {
return 0, err return 0, err
} else { } else {
db.expireAt(t, zExpType, key, when) db.expireAt(t, zsetType, key, when)
if err := t.Commit(); err != nil { if err := t.Commit(); err != nil {
return 0, err return 0, err
} }
@ -314,7 +314,7 @@ func (db *DB) zIncrSize(t *tx, key []byte, delta int64) (int64, error) {
if size <= 0 { if size <= 0 {
size = 0 size = 0
t.Delete(sk) t.Delete(sk)
db.rmExpire(t, zExpType, key) db.rmExpire(t, zsetType, key)
} else { } else {
t.Put(sk, PutInt64(size)) t.Put(sk, PutInt64(size))
} }
@ -752,7 +752,7 @@ func (db *DB) zFlush() (drop int64, err error) {
} }
it.Close() it.Close()
db.expFlush(t, zExpType) db.expFlush(t, zsetType)
err = t.Commit() err = t.Commit()
return return
@ -818,7 +818,7 @@ func (db *DB) ZTTL(key []byte) (int64, error) {
return -1, err return -1, err
} }
return db.ttl(zExpType, key) return db.ttl(zsetType, key)
} }
func (db *DB) ZPersist(key []byte) (int64, error) { func (db *DB) ZPersist(key []byte) (int64, error) {
@ -830,7 +830,7 @@ func (db *DB) ZPersist(key []byte) (int64, error) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
n, err := db.rmExpire(t, zExpType, key) n, err := db.rmExpire(t, zsetType, key)
if err != nil { if err != nil {
return 0, err return 0, err
} }