expire on the bin type is available

This commit is contained in:
silentsai 2014-07-03 11:38:52 +08:00
parent 70018092fe
commit 2dbf643b16
6 changed files with 299 additions and 225 deletions

View File

@ -1,11 +1,16 @@
package ledis package ledis
import (
"github.com/siddontang/ledisdb/leveldb"
)
func (db *DB) FlushAll() (drop int64, err error) { func (db *DB) FlushAll() (drop int64, err error) {
all := [...](func() (int64, error)){ all := [...](func() (int64, error)){
db.flush, db.flush,
db.lFlush, db.lFlush,
db.hFlush, db.hFlush,
db.zFlush} db.zFlush,
db.bFlush}
for _, flush := range all { for _, flush := range all {
if n, e := flush(); e != nil { if n, e := flush(); e != nil {
@ -25,6 +30,22 @@ func (db *DB) newEliminator() *elimination {
eliminator.regRetireContext(lExpType, db.listTx, db.lDelete) eliminator.regRetireContext(lExpType, db.listTx, db.lDelete)
eliminator.regRetireContext(hExpType, db.hashTx, db.hDelete) eliminator.regRetireContext(hExpType, db.hashTx, db.hDelete)
eliminator.regRetireContext(zExpType, db.zsetTx, db.zDelete) eliminator.regRetireContext(zExpType, db.zsetTx, db.zDelete)
eliminator.regRetireContext(bExpType, db.binTx, db.bDelete)
return eliminator return eliminator
} }
func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) {
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
drop++
if drop&1023 == 0 {
if err = t.Commit(); err != nil {
return
}
}
}
it.Close()
return
}

View File

@ -4,6 +4,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/leveldb"
"time"
) )
const ( const (
@ -13,6 +14,11 @@ const (
OPnot OPnot
) )
type BitPair struct {
Pos int32
Val uint8
}
const ( const (
// byte // byte
segByteWidth uint32 = 9 segByteWidth uint32 = 9
@ -227,57 +233,7 @@ func (db *DB) bDelete(t *tx, key []byte) (drop int64) {
return drop return drop
} }
func (db *DB) BGet(key []byte) (data []byte, err error) { func (db *DB) bGetSegment(key []byte, seq uint32) ([]byte, []byte, error) {
if err = checkKeySize(key); err != nil {
return
}
var ts, to int32
if ts, to, err = db.bGetMeta(key); err != nil || ts < 0 {
return
}
var tailSeq, tailOff = uint32(ts), uint32(to)
var capByteSize uint32 = db.bCapByteSize(tailSeq, tailOff)
data = make([]byte, capByteSize, capByteSize)
minKey := db.bEncodeBinKey(key, minSeq)
maxKey := db.bEncodeBinKey(key, tailSeq)
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose)
var seq, s, e uint32
for ; it.Valid(); it.Next() {
if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil {
data = nil
break
}
s = seq << segByteWidth
e = MinUInt32(s+segByteSize, capByteSize)
copy(data[s:e], it.Value())
}
it.Close()
return
}
func (db *DB) BDelete(key []byte) (drop int64, err error) {
if err = checkKeySize(key); err != nil {
return
}
t := db.binTx
t.Lock()
defer t.Unlock()
drop = db.bDelete(t, key)
db.rmExpire(t, bExpType, key)
err = t.Commit()
return
}
func (db *DB) getSegment(key []byte, seq uint32) ([]byte, []byte, error) {
bk := db.bEncodeBinKey(key, seq) bk := db.bEncodeBinKey(key, seq)
segment, err := db.db.Get(bk) segment, err := db.db.Get(bk)
if err != nil { if err != nil {
@ -286,113 +242,14 @@ func (db *DB) getSegment(key []byte, seq uint32) ([]byte, []byte, error) {
return bk, segment, nil return bk, segment, nil
} }
func (db *DB) allocateSegment(key []byte, seq uint32) ([]byte, []byte, error) { func (db *DB) bAllocateSegment(key []byte, seq uint32) ([]byte, []byte, error) {
bk, segment, err := db.getSegment(key, seq) bk, segment, err := db.bGetSegment(key, seq)
if err == nil && segment == nil { if err == nil && segment == nil {
segment = make([]byte, segByteSize, segByteSize) segment = make([]byte, segByteSize, segByteSize)
} }
return bk, segment, err return bk, segment, err
} }
func (db *DB) BSetBit(key []byte, offset int32, val uint8) (ori uint8, err error) {
if err = checkKeySize(key); err != nil {
return
}
// todo : check offset
var seq, off uint32
if seq, off, err = db.bParseOffset(key, offset); err != nil {
return 0, err
}
var bk, segment []byte
if bk, segment, err = db.allocateSegment(key, seq); err != nil {
return 0, err
}
if segment != nil {
ori = getBit(segment, off)
setBit(segment, off, val)
t := db.binTx
t.Lock()
t.Put(bk, segment)
if _, _, e := db.bUpdateMeta(t, key, seq, off); e != nil {
err = e
return
}
err = t.Commit()
t.Unlock()
}
return
}
func (db *DB) BGetBit(key []byte, offset int32) (uint8, error) {
if seq, off, err := db.bParseOffset(key, offset); err != nil {
return 0, err
} else {
_, segment, err := db.getSegment(key, seq)
if err != nil {
return 0, err
}
if segment == nil {
return 0, nil
} else {
return getBit(segment, off), nil
}
}
}
// func (db *DB) BGetRange(key []byte, start int32, end int32) ([]byte, error) {
// section := make([]byte)
// return
// }
func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) {
var sseq uint32
if sseq, _, err = db.bParseOffset(key, start); err != nil {
return
}
var eseq uint32
if eseq, _, err = db.bParseOffset(key, end); err != nil {
return
}
var segment []byte
skey := db.bEncodeBinKey(key, sseq)
ekey := db.bEncodeBinKey(key, eseq)
it := db.db.RangeIterator(skey, ekey, leveldb.RangeClose)
for ; it.Valid(); it.Next() {
segment = it.Value()
for _, bit := range segment {
cnt += bitsInByte[bit]
}
}
it.Close()
return
}
func (db *DB) BTail(key []byte) (int32, error) {
// effective length of data, the highest bit-pos set in history
tailSeq, tailOff, err := db.bGetMeta(key)
if err != nil {
return 0, err
}
tail := int32(-1)
if tailSeq >= 0 {
tail = int32(uint32(tailSeq)<<segBitWidth | uint32(tailOff))
}
return tail, nil
}
func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator { func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator {
sk := db.bEncodeBinKey(key, minSeq) sk := db.bEncodeBinKey(key, minSeq)
ek := db.bEncodeBinKey(key, maxSeq) ek := db.bEncodeBinKey(key, maxSeq)
@ -468,6 +325,198 @@ func (db *DB) bSegXor(a []byte, b []byte, res *[]byte) {
return return
} }
func (db *DB) bExpireAt(key []byte, when int64) (int64, error) {
t := db.binTx
t.Lock()
defer t.Unlock()
if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 {
return 0, err
} else {
db.expireAt(t, bExpType, key, when)
if err := t.Commit(); err != nil {
return 0, err
}
}
return 1, nil
}
func (db *DB) BGet(key []byte) (data []byte, err error) {
if err = checkKeySize(key); err != nil {
return
}
var ts, to int32
if ts, to, err = db.bGetMeta(key); err != nil || ts < 0 {
return
}
var tailSeq, tailOff = uint32(ts), uint32(to)
var capByteSize uint32 = db.bCapByteSize(tailSeq, tailOff)
data = make([]byte, capByteSize, capByteSize)
minKey := db.bEncodeBinKey(key, minSeq)
maxKey := db.bEncodeBinKey(key, tailSeq)
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose)
var seq, s, e uint32
for ; it.Valid(); it.Next() {
if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil {
data = nil
break
}
s = seq << segByteWidth
e = MinUInt32(s+segByteSize, capByteSize)
copy(data[s:e], it.Value())
}
it.Close()
return
}
func (db *DB) BDelete(key []byte) (drop int64, err error) {
if err = checkKeySize(key); err != nil {
return
}
t := db.binTx
t.Lock()
defer t.Unlock()
drop = db.bDelete(t, key)
db.rmExpire(t, bExpType, key)
err = t.Commit()
return
}
func (db *DB) BSetBit(key []byte, offset int32, val uint8) (ori uint8, err error) {
if err = checkKeySize(key); err != nil {
return
}
// todo : check offset
var seq, off uint32
if seq, off, err = db.bParseOffset(key, offset); err != nil {
return 0, err
}
var bk, segment []byte
if bk, segment, err = db.bAllocateSegment(key, seq); err != nil {
return 0, err
}
if segment != nil {
ori = getBit(segment, off)
setBit(segment, off, val)
t := db.binTx
t.Lock()
t.Put(bk, segment)
if _, _, e := db.bUpdateMeta(t, key, seq, off); e != nil {
err = e
return
}
err = t.Commit()
t.Unlock()
}
return
}
// func (db *DB) BMSetBit(key []byte, args ...BitPair) (err error) {
// if err = checkKeySize(key); err != nil {
// return
// }
// // todo ... optimize by sort the args by 'pos', and merge it ...
// t := db.binTx
// t.Lock()
// defer t.Unlock()
// var bk, segment []byte
// var seq, off uint32
// for _, bitInfo := range args {
// if seq, off, err = db.bParseOffset(key, bitInfo.Pos); err != nil {
// return
// }
// if bk, segment, err = db.bAllocateSegment(key, seq); err != nil {
// return
// }
// }
// return t.Commit()
// }
func (db *DB) BGetBit(key []byte, offset int32) (uint8, error) {
if seq, off, err := db.bParseOffset(key, offset); err != nil {
return 0, err
} else {
_, segment, err := db.bGetSegment(key, seq)
if err != nil {
return 0, err
}
if segment == nil {
return 0, nil
} else {
return getBit(segment, off), nil
}
}
}
// func (db *DB) BGetRange(key []byte, start int32, end int32) ([]byte, error) {
// section := make([]byte)
// return
// }
func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) {
var sseq uint32
if sseq, _, err = db.bParseOffset(key, start); err != nil {
return
}
var eseq uint32
if eseq, _, err = db.bParseOffset(key, end); err != nil {
return
}
var segment []byte
skey := db.bEncodeBinKey(key, sseq)
ekey := db.bEncodeBinKey(key, eseq)
it := db.db.RangeIterator(skey, ekey, leveldb.RangeClose)
for ; it.Valid(); it.Next() {
segment = it.Value()
for _, bit := range segment {
cnt += bitsInByte[bit]
}
}
it.Close()
return
}
func (db *DB) BTail(key []byte) (int32, error) {
// effective length of data, the highest bit-pos set in history
tailSeq, tailOff, err := db.bGetMeta(key)
if err != nil {
return 0, err
}
tail := int32(-1)
if tailSeq >= 0 {
tail = int32(uint32(tailSeq)<<segBitWidth | uint32(tailOff))
}
return tail, nil
}
func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32, err error) { func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32, err error) {
// return : // return :
// The size of the string stored in the destination key, // The size of the string stored in the destination key,
@ -634,26 +683,76 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32
return return
} }
// func (db *DB) BExpire(key []byte, duration int64) (int64, error) { func (db *DB) BExpire(key []byte, duration int64) (int64, error) {
if duration <= 0 {
return 0, errExpireValue
}
// } if err := checkKeySize(key); err != nil {
return -1, err
}
// func (db *DB) BExpireAt(key []byte, when int64) (int64, error) { return db.bExpireAt(key, time.Now().Unix()+duration)
}
// } func (db *DB) BExpireAt(key []byte, when int64) (int64, error) {
if when <= time.Now().Unix() {
return 0, errExpireValue
}
// func (db *DB) BTTL(key []byte) (int64, error) { if err := checkKeySize(key); err != nil {
return -1, err
}
// } return db.bExpireAt(key, when)
}
// func (db *DB) BPersist(key []byte) (int64, error) { func (db *DB) BTTL(key []byte) (int64, error) {
if err := checkKeySize(key); err != nil {
return -1, err
}
// } return db.ttl(bExpType, key)
}
func (db *DB) BPersist(key []byte) (int64, error) {
if err := checkKeySize(key); err != nil {
return 0, err
}
t := db.binTx
t.Lock()
defer t.Unlock()
n, err := db.rmExpire(t, bExpType, key)
if err != nil {
return 0, err
}
err = t.Commit()
return n, err
}
// func (db *DB) BScan(key []byte, count int, inclusive bool) ([]KVPair, error) { // func (db *DB) BScan(key []byte, count int, inclusive bool) ([]KVPair, error) {
// } // }
// func (db *DB) bFlush() (drop int64, err error) { func (db *DB) bFlush() (drop int64, err error) {
t := db.binTx
t.Lock()
defer t.Unlock()
// } minKey := make([]byte, 2)
minKey[0] = db.index
minKey[1] = binType
maxKey := make([]byte, 2)
maxKey[0] = db.index
maxKey[1] = binMetaType + 1
drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, bExpType)
err = t.Commit()
return
}

View File

@ -453,10 +453,6 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) {
} }
func (db *DB) hFlush() (drop int64, err error) { func (db *DB) hFlush() (drop int64, err error) {
t := db.kvTx
t.Lock()
defer t.Unlock()
minKey := make([]byte, 2) minKey := make([]byte, 2)
minKey[0] = db.index minKey[0] = db.index
minKey[1] = hashType minKey[1] = hashType
@ -465,19 +461,12 @@ func (db *DB) hFlush() (drop int64, err error) {
maxKey[0] = db.index maxKey[0] = db.index
maxKey[1] = hSizeType + 1 maxKey[1] = hSizeType + 1
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) t := db.kvTx
for ; it.Valid(); it.Next() { t.Lock()
t.Delete(it.Key()) defer t.Unlock()
drop++
if drop&1023 == 0 {
if err = t.Commit(); err != nil {
return
}
}
}
it.Close()
db.expFlush(t, hExpType) drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, hExpType)
err = t.Commit() err = t.Commit()
return return

View File

@ -309,28 +309,17 @@ func (db *DB) SetNX(key []byte, value []byte) (int64, error) {
} }
func (db *DB) flush() (drop int64, err error) { func (db *DB) flush() (drop int64, err error) {
minKey := db.encodeKVMinKey()
maxKey := db.encodeKVMaxKey()
t := db.kvTx t := db.kvTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
minKey := db.encodeKVMinKey() drop, err = db.flushRegion(t, minKey, maxKey)
maxKey := db.encodeKVMaxKey() err = db.expFlush(t, kvExpType)
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
drop++
if drop&1023 == 0 {
if err = t.Commit(); err != nil {
return
}
}
}
it.Close()
err = t.Commit() err = t.Commit()
err = db.expFlush(t, kvExpType)
return return
} }

View File

@ -415,10 +415,6 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) {
} }
func (db *DB) lFlush() (drop int64, err error) { func (db *DB) lFlush() (drop int64, err error) {
t := db.listTx
t.Lock()
defer t.Unlock()
minKey := make([]byte, 2) minKey := make([]byte, 2)
minKey[0] = db.index minKey[0] = db.index
minKey[1] = listType minKey[1] = listType
@ -427,19 +423,12 @@ func (db *DB) lFlush() (drop int64, err error) {
maxKey[0] = db.index maxKey[0] = db.index
maxKey[1] = lMetaType + 1 maxKey[1] = lMetaType + 1
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) t := db.listTx
for ; it.Valid(); it.Next() { t.Lock()
t.Delete(it.Key()) defer t.Unlock()
drop++
if drop&1023 == 0 {
if err = t.Commit(); err != nil {
return
}
}
}
it.Close()
db.expFlush(t, lExpType) drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, lExpType)
err = t.Commit() err = t.Commit()
return return

View File

@ -112,8 +112,6 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) {
return errExpType return errExpType
} }
drop := 0
minKey := make([]byte, 2) minKey := make([]byte, 2)
minKey[0] = db.index minKey[0] = db.index
minKey[1] = expType minKey[1] = expType
@ -122,18 +120,7 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) {
maxKey[0] = db.index maxKey[0] = db.index
maxKey[1] = expMetaType + 1 maxKey[1] = expMetaType + 1
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) _, err = db.flushRegion(t, minKey, maxKey)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
drop++
if drop&1023 == 0 {
if err = t.Commit(); err != nil {
return
}
}
}
it.Close()
err = t.Commit() err = t.Commit()
return return
} }