refactor flush

This commit is contained in:
wenyekui 2014-08-16 12:38:49 +08:00
parent 8eb9109e91
commit 928b4a03ae
4 changed files with 65 additions and 35 deletions

View File

@ -1,6 +1,7 @@
package ledis package ledis
import ( import (
"fmt"
"github.com/siddontang/ledisdb/store" "github.com/siddontang/ledisdb/store"
) )
@ -50,3 +51,40 @@ func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err
it.Close() it.Close()
return return
} }
func (db *DB) flushType(t *tx, dataType byte) (drop int64, err error) {
var deleteFunc func(t *tx, key []byte) int64
var metaDataType byte
switch dataType {
case KVType:
deleteFunc = db.delete
metaDataType = KVType
case ListType:
deleteFunc = db.lDelete
metaDataType = LMetaType
case HashType:
deleteFunc = db.hDelete
metaDataType = HSizeType
case ZSetType:
deleteFunc = db.zDelete
metaDataType = ZSizeType
default:
return 0, fmt.Errorf("invalid data type: %s", TypeName[dataType])
}
var keys [][]byte
keys, err = db.scan(metaDataType, nil, 1024, false)
for len(keys) != 0 || err != nil {
for _, key := range keys {
drop += deleteFunc(t, key)
db.rmExpire(t, dataType, key)
}
if err = t.Commit(); err != nil {
return
}
keys, err = db.scan(metaDataType, nil, 1024, false)
}
return
}

View File

@ -917,17 +917,19 @@ func (db *DB) bFlush() (drop int64, err error) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
minKey := make([]byte, 2) var startKey []byte = nil
minKey[0] = db.index var keys [][]byte
minKey[1] = BitType for keys, err = db.BScan(startKey, 1024, false); len(keys) != 0 || err != nil; {
for _, key := range keys {
drop += db.bDelete(t, key)
db.rmExpire(t, BitType, key)
}
maxKey := make([]byte, 2) if err = t.Commit(); err != nil {
maxKey[0] = db.index return
maxKey[1] = BitMetaType + 1 }
startKey = keys[len(keys)-1]
drop, err = db.flushRegion(t, minKey, maxKey) keys, err = db.BScan(startKey, 1024, false)
err = db.expFlush(t, BitType) }
err = t.Commit()
return return
} }

View File

@ -453,22 +453,12 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) {
} }
func (db *DB) hFlush() (drop int64, err error) { func (db *DB) hFlush() (drop int64, err error) {
minKey := make([]byte, 2)
minKey[0] = db.index
minKey[1] = HashType
maxKey := make([]byte, 2)
maxKey[0] = db.index
maxKey[1] = HSizeType + 1
t := db.kvTx t := db.kvTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
drop, err = db.flushRegion(t, minKey, maxKey) // return db.flushAll(t *tx, dataType byte, expDataType byte)
err = db.expFlush(t, HashType)
err = t.Commit()
return return
} }

View File

@ -424,22 +424,22 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) {
} }
func (db *DB) lFlush() (drop int64, err error) { func (db *DB) lFlush() (drop int64, err error) {
minKey := make([]byte, 2) t := db.binTx
minKey[0] = db.index
minKey[1] = ListType
maxKey := make([]byte, 2)
maxKey[0] = db.index
maxKey[1] = LMetaType + 1
t := db.listTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
drop, err = db.flushRegion(t, minKey, maxKey) var startKey []byte = nil
err = db.expFlush(t, ListType) var keys [][]byte
for keys, err = db.LScan(startKey, 1024, false); len(keys) != 0 || err != nil; {
err = t.Commit() var num int64
if num, err = db.LMclear(keys...); err != nil {
return
} else {
drop += num
}
startKey = keys[len(keys)-1]
keys, err = db.LScan(startKey, 1024, false)
}
return return
} }