add expire for the kv type; simplify the listMetaKey format

This commit is contained in:
silentsai 2014-06-03 15:40:10 +08:00
parent 2b106981ba
commit 5c7d885168
10 changed files with 218 additions and 105 deletions

View File

@ -13,6 +13,15 @@ const (
zsetType
zSizeType
zScoreType
kvExpType
kvExpMetaType
lExpType
lExpMetaType
hExpType
hExpMetaType
zExpType
zExpMetaType
)
const (
@ -41,6 +50,7 @@ var (
ErrValueSize = errors.New("invalid value size")
ErrHashFieldSize = errors.New("invalid hash field size")
ErrZSetMemberSize = errors.New("invalid zset member size")
ErrExpireValue = errors.New("invalid expire value")
)
const (

View File

@ -82,6 +82,8 @@ func newDB(l *Ledis, index uint8) *DB {
d.hashTx = newTx(l)
d.zsetTx = newTx(l)
d.activeExpireCycle()
return d
}

View File

@ -1,8 +1,12 @@
package ledis
func (db *DB) Flush() (drop int64, err error) {
import (
"time"
)
func (db *DB) FlushAll() (drop int64, err error) {
all := [...](func() (int64, error)){
db.KvFlush,
db.Flush,
db.LFlush,
db.HFlush,
db.ZFlush}
@ -15,5 +19,18 @@ func (db *DB) Flush() (drop int64, err error) {
drop += n
}
}
return
}
func (db *DB) activeExpireCycle() {
eliminator := newEliminator(db)
eliminator.regRetireContext(kvExpType, db.kvTx, db.delete)
go func() {
for {
eliminator.active()
time.Sleep(1 * time.Second)
}
}()
}

View File

@ -81,7 +81,7 @@ func TestFlush(t *testing.T) {
db1.LPush([]byte("lst"), []byte("a1"), []byte("b2"))
db1.ZAdd([]byte("zset_0"), ScorePair{int64(3), []byte("mc")})
db1.Flush()
db1.FlushAll()
// 0 - existing
if exists, _ := db0.Exists([]byte("a")); exists <= 0 {

View File

@ -424,6 +424,7 @@ func (db *DB) HFlush() (drop int64, err error) {
}
}
}
it.Close()
err = t.Commit()
return
@ -461,6 +462,7 @@ func (db *DB) HScan(key []byte, field []byte, count int, inclusive bool) ([]FVPa
v = append(v, FVPair{Field: f, Value: it.Value()})
}
}
it.Close()
return v, nil
}

View File

@ -3,6 +3,7 @@ package ledis
import (
"errors"
"github.com/siddontang/go-leveldb/leveldb"
"time"
)
type KVPair struct {
@ -83,6 +84,12 @@ func (db *DB) incr(key []byte, delta int64) (int64, error) {
return n, err
}
func (db *DB) delete(t *tx, key []byte) int64 {
key = db.encodeKVKey(key)
t.Delete(key)
return 1
}
func (db *DB) Decr(key []byte) (int64, error) {
return db.incr(key, -1)
}
@ -96,22 +103,22 @@ func (db *DB) Del(keys ...[]byte) (int64, error) {
return 0, nil
}
var err error
for i := range keys {
keys[i] = db.encodeKVKey(keys[i])
codedKeys := make([][]byte, len(keys))
for i, k := range keys {
codedKeys[i] = db.encodeKVKey(k)
}
t := db.kvTx
t.Lock()
defer t.Unlock()
for i := range keys {
t.Delete(keys[i])
for i, k := range keys {
t.Delete(codedKeys[i])
db.rmExpire(t, kvExpType, k)
//todo binlog
}
err = t.Commit()
err := t.Commit()
return int64(len(keys)), err
}
@ -287,7 +294,7 @@ func (db *DB) SetNX(key []byte, value []byte) (int64, error) {
return n, err
}
func (db *DB) KvFlush() (drop int64, err error) {
func (db *DB) Flush() (drop int64, err error) {
t := db.kvTx
t.Lock()
defer t.Unlock()
@ -300,14 +307,16 @@ func (db *DB) KvFlush() (drop int64, err error) {
t.Delete(it.Key())
drop++
if drop%1000 == 0 {
if drop&1023 == 0 {
if err = t.Commit(); err != nil {
return
}
}
}
it.Close()
err = t.Commit()
err = db.expFlush(t, kvExpType)
return
}
@ -344,6 +353,57 @@ func (db *DB) Scan(key []byte, count int, inclusive bool) ([]KVPair, error) {
v = append(v, KVPair{Key: key, Value: it.Value()})
}
}
it.Close()
return v, nil
}
func (db *DB) Expire(key []byte, duration int64) (int64, error) {
if duration <= 0 {
return 0, ErrExpireValue
}
t := db.kvTx
t.Lock()
defer t.Unlock()
if exist, err := db.Exists(key); err != nil || exist == 0 {
return 0, err
} else {
db.expire(t, kvExpType, key, duration)
if err := t.Commit(); err != nil {
return 0, err
} else {
return 1, nil
}
}
}
func (db *DB) ExpireAt(key []byte, when int64) (int64, error) {
if when <= time.Now().Unix() {
return 0, ErrExpireValue
}
t := db.kvTx
t.Lock()
defer t.Unlock()
if exist, err := db.Exists(key); err != nil || exist == 0 {
return 0, err
} else {
db.expireAt(t, kvExpType, key, when)
if err := t.Commit(); err != nil {
return 0, err
} else {
return 1, nil
}
}
}
func (db *DB) Ttl(key []byte) (int64, error) {
if err := checkKeySize(key); err != nil {
return -1, err
}
return db.ttl(kvExpType, key)
}

View File

@ -29,7 +29,7 @@ func TestDBKV(t *testing.T) {
func TestDBScan(t *testing.T) {
db := getTestDB()
db.Flush()
db.FlushAll()
if v, err := db.Scan(nil, 10, true); err != nil {
t.Fatal(err)

View File

@ -84,62 +84,53 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) {
var err error
metaKey := db.lEncodeMetaKey(key)
headSeq, tailSeq, size, err = db.lGetMeta(metaKey)
if err != nil {
return 0, err
}
if len(args) == 0 {
_, _, size, err := db.lGetMeta(metaKey)
return int64(size), err
var pushCnt int = len(args)
if pushCnt == 0 {
return int64(size), nil
}
var seq int32 = headSeq
var delta int32 = -1
if whereSeq == listTailSeq {
seq = tailSeq
delta = 1
}
t := db.listTx
t.Lock()
defer t.Unlock()
if headSeq, tailSeq, size, err = db.lGetMeta(metaKey); err != nil {
return 0, err
}
var delta int32 = 1
var seq int32 = 0
if whereSeq == listHeadSeq {
delta = -1
seq = headSeq
} else {
seq = tailSeq
}
if size == 0 {
headSeq = listInitialSeq
tailSeq = listInitialSeq
seq = headSeq
} else {
// append elements
if size > 0 {
seq += delta
}
for i := 0; i < len(args); i++ {
for i := 0; i < pushCnt; i++ {
ek := db.lEncodeListKey(key, seq+int32(i)*delta)
t.Put(ek, args[i])
//to do add binlog
}
seq += int32(len(args)-1) * delta
seq += int32(pushCnt-1) * delta
if seq <= listMinSeq || seq >= listMaxSeq {
return 0, errListSeq
}
size += int32(len(args))
// set meta info
if whereSeq == listHeadSeq {
headSeq = seq
} else {
tailSeq = seq
}
db.lSetMeta(metaKey, headSeq, tailSeq, size)
db.lSetMeta(metaKey, headSeq, tailSeq)
err = t.Commit()
return int64(size), err
return int64(size) + int64(pushCnt), err
}
func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
@ -153,54 +144,68 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
var headSeq int32
var tailSeq int32
var size int32
var err error
metaKey := db.lEncodeMetaKey(key)
headSeq, tailSeq, size, err = db.lGetMeta(metaKey)
headSeq, tailSeq, _, err = db.lGetMeta(metaKey)
if err != nil {
return nil, err
}
var seq int32 = 0
var delta int32 = 1
if whereSeq == listHeadSeq {
seq = headSeq
} else {
delta = -1
var value []byte
var seq int32 = headSeq
if whereSeq == listTailSeq {
seq = tailSeq
}
itemKey := db.lEncodeListKey(key, seq)
var value []byte
value, err = db.db.Get(itemKey)
if err != nil {
return nil, err
}
t.Delete(itemKey)
seq += delta
size--
if size <= 0 {
t.Delete(metaKey)
} else {
if whereSeq == listHeadSeq {
headSeq = seq
headSeq += 1
} else {
tailSeq = seq
tailSeq -= 1
}
db.lSetMeta(metaKey, headSeq, tailSeq, size)
}
t.Delete(itemKey)
db.lSetMeta(metaKey, headSeq, tailSeq)
//todo add binlog
err = t.Commit()
return value, err
}
func (db *DB) lDelete(t *tx, key []byte) int64 {
mk := db.lEncodeMetaKey(key)
var headSeq int32
var tailSeq int32
var err error
headSeq, tailSeq, _, err = db.lGetMeta(mk)
if err != nil {
return 0
}
var num int64 = 0
startKey := db.lEncodeListKey(key, headSeq)
stopKey := db.lEncodeListKey(key, tailSeq)
it := db.db.Iterator(startKey, stopKey, leveldb.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
num++
}
it.Close()
t.Delete(mk)
return num
}
func (db *DB) lGetSeq(key []byte, whereSeq int32) (int64, error) {
ek := db.lEncodeListKey(key, whereSeq)
@ -213,26 +218,35 @@ func (db *DB) lGetMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err
if err != nil {
return
} else if v == nil {
headSeq = listInitialSeq
tailSeq = listInitialSeq
size = 0
return
} else {
headSeq = int32(binary.LittleEndian.Uint32(v[0:4]))
tailSeq = int32(binary.LittleEndian.Uint32(v[4:8]))
size = int32(binary.LittleEndian.Uint32(v[8:]))
size = tailSeq - headSeq + 1
}
return
}
func (db *DB) lSetMeta(ek []byte, headSeq int32, tailSeq int32, size int32) {
func (db *DB) lSetMeta(ek []byte, headSeq int32, tailSeq int32) {
t := db.listTx
buf := make([]byte, 12)
var size int32 = tailSeq - headSeq + 1
if size < 0 {
// todo : log error + panic
} else if size == 0 {
t.Delete(ek)
} else {
buf := make([]byte, 8)
binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq))
binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq))
binary.LittleEndian.PutUint32(buf[8:], uint32(size))
//binary.LittleEndian.PutUint32(buf[8:], uint32(size))
t.Put(ek, buf)
}
}
func (db *DB) LIndex(key []byte, index int32) ([]byte, error) {
@ -347,37 +361,13 @@ func (db *DB) LClear(key []byte) (int64, error) {
return 0, err
}
mk := db.lEncodeMetaKey(key)
t := db.listTx
t.Lock()
defer t.Unlock()
var headSeq int32
var tailSeq int32
var err error
num := db.lDelete(t, key)
headSeq, tailSeq, _, err = db.lGetMeta(mk)
if err != nil {
return 0, err
}
var num int64 = 0
startKey := db.lEncodeListKey(key, headSeq)
stopKey := db.lEncodeListKey(key, tailSeq)
it := db.db.Iterator(startKey, stopKey, leveldb.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
num++
}
it.Close()
t.Delete(mk)
err = t.Commit()
err := t.Commit()
return num, err
}
@ -398,13 +388,13 @@ func (db *DB) LFlush() (drop int64, err error) {
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
drop++
if drop%1000 == 0 {
if drop&1023 == 0 {
if err = t.Commit(); err != nil {
return
}
}
}
it.Close()
err = t.Commit()
return

View File

@ -31,9 +31,37 @@ func TestDBList(t *testing.T) {
key := []byte("testdb_list_a")
if n, err := db.RPush(key, []byte("1"), []byte("2")); err != nil {
if n, err := db.RPush(key, []byte("1"), []byte("2"), []byte("3")); err != nil {
t.Fatal(err)
} else if n != 2 {
} else if n != 3 {
t.Fatal(n)
}
if k, err := db.RPop(key); err != nil {
t.Fatal(err)
} else if string(k) != "3" {
t.Fatal(string(k))
}
if k, err := db.LPop(key); err != nil {
t.Fatal(err)
} else if string(k) != "1" {
t.Fatal(string(k))
}
if llen, err := db.LLen(key); err != nil {
t.Fatal(err)
} else if llen != 1 {
t.Fatal(llen)
}
if num, err := db.LClear(key); err != nil {
t.Fatal(err)
} else if num != 1 {
t.Fatal(num)
}
if llen, _ := db.LLen(key); llen != 0 {
t.Fatal(llen)
}
}

View File

@ -502,6 +502,7 @@ func (db *DB) zRemRange(key []byte, min int64, max int64, offset int, limit int)
t.Delete(k)
}
it.Close()
if _, err := db.zIncrSize(key, -num); err != nil {
return 0, err
@ -569,6 +570,7 @@ func (db *DB) zRange(key []byte, min int64, max int64, withScores bool, offset i
v = append(v, StrPutInt64(s))
}
}
it.Close()
if reverse && (offset == 0 && limit < 0) {
v = db.zReverse(v, withScores)
@ -703,6 +705,7 @@ func (db *DB) ZFlush() (drop int64, err error) {
}
}
}
it.Close()
err = t.Commit()
// to do : binlog
@ -743,6 +746,7 @@ func (db *DB) ZScan(key []byte, member []byte, count int, inclusive bool) ([]Sco
v = append(v, ScorePair{Member: m, Score: score})
}
}
it.Close()
return v, nil
}