mirror of https://github.com/ledisdb/ledisdb.git
Merge branch 'bin-feature' into develop
This commit is contained in:
commit
fd248aaa35
|
@ -1,11 +1,16 @@
|
|||
package ledis
|
||||
|
||||
import (
|
||||
"github.com/siddontang/ledisdb/leveldb"
|
||||
)
|
||||
|
||||
func (db *DB) FlushAll() (drop int64, err error) {
|
||||
all := [...](func() (int64, error)){
|
||||
db.flush,
|
||||
db.lFlush,
|
||||
db.hFlush,
|
||||
db.zFlush}
|
||||
db.zFlush,
|
||||
db.bFlush}
|
||||
|
||||
for _, flush := range all {
|
||||
if n, e := flush(); e != nil {
|
||||
|
@ -25,6 +30,22 @@ func (db *DB) newEliminator() *elimination {
|
|||
eliminator.regRetireContext(lExpType, db.listTx, db.lDelete)
|
||||
eliminator.regRetireContext(hExpType, db.hashTx, db.hDelete)
|
||||
eliminator.regRetireContext(zExpType, db.zsetTx, db.zDelete)
|
||||
eliminator.regRetireContext(bExpType, db.binTx, db.bDelete)
|
||||
|
||||
return eliminator
|
||||
}
|
||||
|
||||
func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) {
|
||||
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen)
|
||||
for ; it.Valid(); it.Next() {
|
||||
t.Delete(it.Key())
|
||||
drop++
|
||||
if drop&1023 == 0 {
|
||||
if err = t.Commit(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
it.Close()
|
||||
return
|
||||
}
|
||||
|
|
|
@ -0,0 +1,841 @@
|
|||
package ledis
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"github.com/siddontang/ledisdb/leveldb"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
OPand uint8 = iota + 1
|
||||
OPor
|
||||
OPxor
|
||||
OPnot
|
||||
)
|
||||
|
||||
type BitPair struct {
|
||||
Pos int32
|
||||
Val uint8
|
||||
}
|
||||
|
||||
type segBitInfo struct {
|
||||
Seq uint32
|
||||
Off uint32
|
||||
Val uint8
|
||||
}
|
||||
|
||||
type segBitInfoArray []segBitInfo
|
||||
|
||||
const (
|
||||
// byte
|
||||
segByteWidth uint32 = 9
|
||||
segByteSize uint32 = 1 << segByteWidth
|
||||
|
||||
// bit
|
||||
segBitWidth uint32 = segByteWidth + 3
|
||||
segBitSize uint32 = segByteSize << 3
|
||||
|
||||
maxByteSize uint32 = 8 << 20
|
||||
maxSegCount uint32 = maxByteSize / segByteSize
|
||||
|
||||
minSeq uint32 = 0
|
||||
maxSeq uint32 = uint32((maxByteSize << 3) - 1)
|
||||
)
|
||||
|
||||
var bitsInByte = [256]int32{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3,
|
||||
4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 1, 2, 2, 3, 2, 3,
|
||||
3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4,
|
||||
5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4,
|
||||
3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4,
|
||||
5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2,
|
||||
2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3,
|
||||
4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
|
||||
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4,
|
||||
5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6,
|
||||
6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5,
|
||||
6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}
|
||||
|
||||
var fillBits = [...]uint8{1, 3, 7, 15, 31, 63, 127, 255}
|
||||
|
||||
var emptySegment []byte = make([]byte, segByteSize, segByteSize)
|
||||
|
||||
var fillSegment []byte = func() []byte {
|
||||
data := make([]byte, segByteSize, segByteSize)
|
||||
for i := uint32(0); i < segByteSize; i++ {
|
||||
data[i] = 0xff
|
||||
}
|
||||
return data
|
||||
}()
|
||||
|
||||
var errBinKey = errors.New("invalid bin key")
|
||||
var errOffset = errors.New("invalid offset")
|
||||
var errDuplicatePos = errors.New("duplicate bit pos")
|
||||
|
||||
func getBit(sz []byte, offset uint32) uint8 {
|
||||
index := offset >> 3
|
||||
if index >= uint32(len(sz)) {
|
||||
return 0 // error("overflow")
|
||||
}
|
||||
|
||||
offset -= index << 3
|
||||
return sz[index] >> offset & 1
|
||||
}
|
||||
|
||||
func setBit(sz []byte, offset uint32, val uint8) bool {
|
||||
if val != 1 && val != 0 {
|
||||
return false // error("invalid val")
|
||||
}
|
||||
|
||||
index := offset >> 3
|
||||
if index >= uint32(len(sz)) {
|
||||
return false // error("overflow")
|
||||
}
|
||||
|
||||
offset -= index << 3
|
||||
if sz[index]>>offset&1 != val {
|
||||
sz[index] ^= (1 << offset)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (datas *segBitInfoArray) Len() int {
|
||||
return len(*datas)
|
||||
}
|
||||
|
||||
func (datas *segBitInfoArray) Less(i, j int) bool {
|
||||
res := (*datas)[i].Seq < (*datas)[j].Seq
|
||||
if !res && (*datas)[i].Seq == (*datas)[j].Seq {
|
||||
res = (*datas)[i].Off < (*datas)[j].Off
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (datas *segBitInfoArray) Swap(i, j int) {
|
||||
(*datas)[i], (*datas)[j] = (*datas)[j], (*datas)[i]
|
||||
}
|
||||
|
||||
func (db *DB) bEncodeMetaKey(key []byte) []byte {
|
||||
mk := make([]byte, len(key)+2)
|
||||
mk[0] = db.index
|
||||
mk[1] = binMetaType
|
||||
|
||||
copy(mk, key)
|
||||
return mk
|
||||
}
|
||||
|
||||
func (db *DB) bEncodeBinKey(key []byte, seq uint32) []byte {
|
||||
bk := make([]byte, len(key)+8)
|
||||
|
||||
pos := 0
|
||||
bk[pos] = db.index
|
||||
pos++
|
||||
bk[pos] = binType
|
||||
pos++
|
||||
|
||||
binary.BigEndian.PutUint16(bk[pos:], uint16(len(key)))
|
||||
pos += 2
|
||||
|
||||
copy(bk[pos:], key)
|
||||
pos += len(key)
|
||||
|
||||
binary.BigEndian.PutUint32(bk[pos:], seq)
|
||||
|
||||
return bk
|
||||
}
|
||||
|
||||
func (db *DB) bDecodeBinKey(bkey []byte) (key []byte, seq uint32, err error) {
|
||||
if len(bkey) < 8 || bkey[0] != db.index {
|
||||
err = errBinKey
|
||||
return
|
||||
}
|
||||
|
||||
keyLen := binary.BigEndian.Uint16(bkey[2:4])
|
||||
if int(keyLen+8) != len(bkey) {
|
||||
err = errBinKey
|
||||
return
|
||||
}
|
||||
|
||||
key = bkey[4 : 4+keyLen]
|
||||
seq = uint32(binary.BigEndian.Uint32(bkey[4+keyLen:]))
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) bCapByteSize(seq uint32, off uint32) uint32 {
|
||||
var offByteSize uint32 = (off >> 3) + 1
|
||||
if offByteSize > segByteSize {
|
||||
offByteSize = segByteSize
|
||||
}
|
||||
|
||||
return seq<<segByteWidth + offByteSize
|
||||
}
|
||||
|
||||
func (db *DB) bParseOffset(key []byte, offset int32) (seq uint32, off uint32, err error) {
|
||||
if offset < 0 {
|
||||
if tailSeq, tailOff, e := db.bGetMeta(key); e != nil {
|
||||
err = e
|
||||
return
|
||||
} else if tailSeq >= 0 {
|
||||
offset += int32((uint32(tailSeq)<<segBitWidth | uint32(tailOff)) + 1)
|
||||
if offset < 0 {
|
||||
err = errOffset
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
off = uint32(offset)
|
||||
|
||||
seq = off >> segBitWidth
|
||||
off &= (segBitSize - 1)
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) bGetMeta(key []byte) (tailSeq int32, tailOff int32, err error) {
|
||||
var v []byte
|
||||
|
||||
mk := db.bEncodeMetaKey(key)
|
||||
v, err = db.db.Get(mk)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if v != nil {
|
||||
tailSeq = int32(binary.LittleEndian.Uint32(v[0:4]))
|
||||
tailOff = int32(binary.LittleEndian.Uint32(v[4:8]))
|
||||
} else {
|
||||
tailSeq = -1
|
||||
tailOff = -1
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) bSetMeta(t *tx, key []byte, tailSeq uint32, tailOff uint32) {
|
||||
ek := db.bEncodeMetaKey(key)
|
||||
|
||||
// todo ..
|
||||
// if size == 0 // headSeq == tailSeq
|
||||
// t.Delete(ek)
|
||||
|
||||
buf := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint32(buf[0:4], tailSeq)
|
||||
binary.LittleEndian.PutUint32(buf[4:8], tailOff)
|
||||
|
||||
t.Put(ek, buf)
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) bUpdateMeta(t *tx, key []byte, seq uint32, off uint32) (tailSeq uint32, tailOff uint32, err error) {
|
||||
var ts, to int32
|
||||
if ts, to, err = db.bGetMeta(key); err != nil {
|
||||
return
|
||||
} else {
|
||||
tailSeq = uint32(MaxInt32(ts, 0))
|
||||
tailOff = uint32(MaxInt32(to, 0))
|
||||
}
|
||||
|
||||
if seq > tailSeq || (seq == tailSeq && off > tailOff) {
|
||||
db.bSetMeta(t, key, seq, off)
|
||||
tailSeq = seq
|
||||
tailOff = off
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) bDelete(t *tx, key []byte) (drop int64) {
|
||||
mk := db.bEncodeMetaKey(key)
|
||||
t.Delete(mk)
|
||||
|
||||
minKey := db.bEncodeBinKey(key, minSeq)
|
||||
maxKey := db.bEncodeBinKey(key, maxSeq)
|
||||
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose)
|
||||
for ; it.Valid(); it.Next() {
|
||||
t.Delete(it.Key())
|
||||
drop++
|
||||
}
|
||||
it.Close()
|
||||
|
||||
return drop
|
||||
}
|
||||
|
||||
func (db *DB) bGetSegment(key []byte, seq uint32) ([]byte, []byte, error) {
|
||||
bk := db.bEncodeBinKey(key, seq)
|
||||
segment, err := db.db.Get(bk)
|
||||
if err != nil {
|
||||
return bk, nil, err
|
||||
}
|
||||
return bk, segment, nil
|
||||
}
|
||||
|
||||
func (db *DB) bAllocateSegment(key []byte, seq uint32) ([]byte, []byte, error) {
|
||||
bk, segment, err := db.bGetSegment(key, seq)
|
||||
if err == nil && segment == nil {
|
||||
segment = make([]byte, segByteSize, segByteSize)
|
||||
}
|
||||
return bk, segment, err
|
||||
}
|
||||
|
||||
func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator {
|
||||
sk := db.bEncodeBinKey(key, minSeq)
|
||||
ek := db.bEncodeBinKey(key, maxSeq)
|
||||
return db.db.RangeIterator(sk, ek, leveldb.RangeClose)
|
||||
}
|
||||
|
||||
func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) {
|
||||
if a == nil || b == nil {
|
||||
*res = nil
|
||||
return
|
||||
}
|
||||
|
||||
data := *res
|
||||
if data == nil {
|
||||
data = make([]byte, segByteSize, segByteSize)
|
||||
*res = data
|
||||
}
|
||||
|
||||
for i := uint32(0); i < segByteSize; i++ {
|
||||
data[i] = a[i] & b[i]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) bSegOr(a []byte, b []byte, res *[]byte) {
|
||||
if a == nil || b == nil {
|
||||
if a == nil && b == nil {
|
||||
*res = nil
|
||||
} else if a == nil {
|
||||
*res = b
|
||||
} else {
|
||||
*res = a
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
data := *res
|
||||
if data == nil {
|
||||
data = make([]byte, segByteSize, segByteSize)
|
||||
*res = data
|
||||
}
|
||||
|
||||
for i := uint32(0); i < segByteSize; i++ {
|
||||
data[i] = a[i] | b[i]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) bSegXor(a []byte, b []byte, res *[]byte) {
|
||||
if a == nil && b == nil {
|
||||
*res = fillSegment
|
||||
return
|
||||
}
|
||||
|
||||
if a == nil {
|
||||
a = emptySegment
|
||||
}
|
||||
|
||||
if b == nil {
|
||||
b = emptySegment
|
||||
}
|
||||
|
||||
data := *res
|
||||
if data == nil {
|
||||
data = make([]byte, segByteSize, segByteSize)
|
||||
*res = data
|
||||
}
|
||||
|
||||
for i := uint32(0); i < segByteSize; i++ {
|
||||
data[i] = a[i] ^ b[i]
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) bExpireAt(key []byte, when int64) (int64, error) {
|
||||
t := db.binTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 {
|
||||
return 0, err
|
||||
} else {
|
||||
db.expireAt(t, bExpType, key, when)
|
||||
if err := t.Commit(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
func (db *DB) BGet(key []byte) (data []byte, err error) {
|
||||
if err = checkKeySize(key); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var ts, to int32
|
||||
if ts, to, err = db.bGetMeta(key); err != nil || ts < 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var tailSeq, tailOff = uint32(ts), uint32(to)
|
||||
var capByteSize uint32 = db.bCapByteSize(tailSeq, tailOff)
|
||||
data = make([]byte, capByteSize, capByteSize)
|
||||
|
||||
minKey := db.bEncodeBinKey(key, minSeq)
|
||||
maxKey := db.bEncodeBinKey(key, tailSeq)
|
||||
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose)
|
||||
|
||||
var seq, s, e uint32
|
||||
for ; it.Valid(); it.Next() {
|
||||
if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil {
|
||||
data = nil
|
||||
break
|
||||
}
|
||||
|
||||
s = seq << segByteWidth
|
||||
e = MinUInt32(s+segByteSize, capByteSize)
|
||||
copy(data[s:e], it.Value())
|
||||
}
|
||||
it.Close()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) BDelete(key []byte) (drop int64, err error) {
|
||||
if err = checkKeySize(key); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
t := db.binTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
drop = db.bDelete(t, key)
|
||||
db.rmExpire(t, bExpType, key)
|
||||
|
||||
err = t.Commit()
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) BSetBit(key []byte, offset int32, val uint8) (ori uint8, err error) {
|
||||
if err = checkKeySize(key); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// todo : check offset
|
||||
var seq, off uint32
|
||||
if seq, off, err = db.bParseOffset(key, offset); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var bk, segment []byte
|
||||
if bk, segment, err = db.bAllocateSegment(key, seq); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if segment != nil {
|
||||
ori = getBit(segment, off)
|
||||
if setBit(segment, off, val) {
|
||||
t := db.binTx
|
||||
t.Lock()
|
||||
|
||||
t.Put(bk, segment)
|
||||
if _, _, e := db.bUpdateMeta(t, key, seq, off); e != nil {
|
||||
err = e
|
||||
return
|
||||
}
|
||||
|
||||
err = t.Commit()
|
||||
t.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) BMSetBit(key []byte, args ...BitPair) (place int32, err error) {
|
||||
if err = checkKeySize(key); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// (ps : so as to aviod wasting the mem copy while calling db.Get() and batch.Put(),
|
||||
// here we sequence the request pos in order, so that we can execute diff pos
|
||||
// set on the same segment by justing calling db.Get() and batch.Put() one time
|
||||
// either.)
|
||||
|
||||
// sort the requesting data by set pos
|
||||
var argCnt = len(args)
|
||||
var bInfos segBitInfoArray = make(segBitInfoArray, argCnt)
|
||||
var seq, off uint32
|
||||
|
||||
for i, info := range args {
|
||||
if seq, off, err = db.bParseOffset(key, info.Pos); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
bInfos[i].Seq = seq
|
||||
bInfos[i].Off = off
|
||||
bInfos[i].Val = info.Val
|
||||
}
|
||||
|
||||
sort.Sort(&bInfos)
|
||||
|
||||
for i := 1; i < argCnt; i++ {
|
||||
if bInfos[i].Seq == bInfos[i-1].Seq && bInfos[i].Off == bInfos[i-1].Off {
|
||||
return 0, errDuplicatePos
|
||||
}
|
||||
}
|
||||
|
||||
// set data
|
||||
t := db.binTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
var bk, segment []byte
|
||||
var lastSeq, maxSeq, maxOff uint32
|
||||
|
||||
for _, info := range bInfos {
|
||||
if segment != nil && info.Seq != lastSeq {
|
||||
t.Put(bk, segment)
|
||||
segment = nil
|
||||
}
|
||||
|
||||
if segment == nil {
|
||||
if bk, segment, err = db.bAllocateSegment(key, info.Seq); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if segment == nil {
|
||||
continue
|
||||
} else {
|
||||
lastSeq = info.Seq
|
||||
}
|
||||
}
|
||||
|
||||
if setBit(segment, info.Off, info.Val) {
|
||||
maxSeq = info.Seq
|
||||
maxOff = info.Off
|
||||
place++
|
||||
}
|
||||
}
|
||||
|
||||
if segment != nil {
|
||||
t.Put(bk, segment)
|
||||
}
|
||||
|
||||
// finally, update meta
|
||||
if place > 0 {
|
||||
if _, _, err = db.bUpdateMeta(t, key, maxSeq, maxOff); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = t.Commit()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) BGetBit(key []byte, offset int32) (uint8, error) {
|
||||
if seq, off, err := db.bParseOffset(key, offset); err != nil {
|
||||
return 0, err
|
||||
} else {
|
||||
_, segment, err := db.bGetSegment(key, seq)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if segment == nil {
|
||||
return 0, nil
|
||||
} else {
|
||||
return getBit(segment, off), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// func (db *DB) BGetRange(key []byte, start int32, end int32) ([]byte, error) {
|
||||
// section := make([]byte)
|
||||
|
||||
// return
|
||||
// }
|
||||
|
||||
func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) {
|
||||
var sseq uint32
|
||||
if sseq, _, err = db.bParseOffset(key, start); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var eseq uint32
|
||||
if eseq, _, err = db.bParseOffset(key, end); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var segment []byte
|
||||
skey := db.bEncodeBinKey(key, sseq)
|
||||
ekey := db.bEncodeBinKey(key, eseq)
|
||||
|
||||
it := db.db.RangeIterator(skey, ekey, leveldb.RangeClose)
|
||||
for ; it.Valid(); it.Next() {
|
||||
segment = it.Value()
|
||||
for _, bit := range segment {
|
||||
cnt += bitsInByte[bit]
|
||||
}
|
||||
}
|
||||
it.Close()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) BTail(key []byte) (int32, error) {
|
||||
// effective length of data, the highest bit-pos set in history
|
||||
tailSeq, tailOff, err := db.bGetMeta(key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
tail := int32(-1)
|
||||
if tailSeq >= 0 {
|
||||
tail = int32(uint32(tailSeq)<<segBitWidth | uint32(tailOff))
|
||||
}
|
||||
|
||||
return tail, nil
|
||||
}
|
||||
|
||||
func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32, err error) {
|
||||
// return :
|
||||
// The size of the string stored in the destination key,
|
||||
// that is equal to the size of the longest input string.
|
||||
blen = -1
|
||||
|
||||
var exeOp func([]byte, []byte, *[]byte)
|
||||
switch op {
|
||||
case OPand:
|
||||
exeOp = db.bSegAnd
|
||||
case OPor:
|
||||
exeOp = db.bSegOr
|
||||
case OPxor, OPnot:
|
||||
exeOp = db.bSegXor
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
if dstkey == nil || srckeys == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if (op == OPnot && len(srckeys) != 1) ||
|
||||
(op != OPnot && len(srckeys) < 2) {
|
||||
// msg("lack of arguments")
|
||||
return
|
||||
}
|
||||
|
||||
t := db.binTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
// init - meta info
|
||||
var maxDstSeq, maxDstOff uint32
|
||||
var nowSeq, nowOff int32
|
||||
|
||||
var keyNum int = len(srckeys)
|
||||
var srcIdx int = 0
|
||||
for ; srcIdx < keyNum; srcIdx++ {
|
||||
if nowSeq, nowOff, err = db.bGetMeta(srckeys[srcIdx]); err != nil { // todo : if key not exists ....
|
||||
return
|
||||
}
|
||||
|
||||
if nowSeq >= 0 {
|
||||
maxDstSeq = uint32(nowSeq)
|
||||
maxDstOff = uint32(nowOff)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if srcIdx == keyNum {
|
||||
// none existing src key
|
||||
return
|
||||
}
|
||||
|
||||
// init - data
|
||||
var seq, off uint32
|
||||
var segments = make([][]byte, maxSegCount) // todo : init count also can be optimize while 'and' / 'or'
|
||||
|
||||
if op == OPnot {
|
||||
for i := uint32(0); i < maxDstSeq; i++ {
|
||||
segments[i] = fillSegment
|
||||
}
|
||||
|
||||
var tailSeg = make([]byte, segByteSize, segByteSize)
|
||||
var fillByte = fillBits[7]
|
||||
var cnt = db.bCapByteSize(uint32(0), maxDstOff)
|
||||
for i := uint32(0); i < cnt-1; i++ {
|
||||
tailSeg[i] = fillByte
|
||||
}
|
||||
tailSeg[cnt-1] = fillBits[maxDstOff-(cnt-1)<<3]
|
||||
segments[maxDstSeq] = tailSeg
|
||||
|
||||
} else {
|
||||
it := db.bIterator(srckeys[srcIdx])
|
||||
for ; it.Valid(); it.Next() {
|
||||
if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil {
|
||||
// to do ...
|
||||
it.Close()
|
||||
return
|
||||
}
|
||||
segments[seq] = it.Value()
|
||||
}
|
||||
it.Close()
|
||||
srcIdx++
|
||||
}
|
||||
|
||||
// operation with following keys
|
||||
var res []byte
|
||||
|
||||
for i := srcIdx; i < keyNum; i++ {
|
||||
if nowSeq, nowOff, err = db.bGetMeta(srckeys[i]); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if nowSeq < 0 {
|
||||
continue
|
||||
} else {
|
||||
seq = uint32(nowSeq)
|
||||
off = uint32(nowOff)
|
||||
if seq > maxDstSeq || (seq == maxDstSeq && off > maxDstOff) {
|
||||
maxDstSeq = seq
|
||||
maxDstOff = off
|
||||
}
|
||||
}
|
||||
|
||||
it := db.bIterator(srckeys[i])
|
||||
for idx, end := uint32(0), false; !end; it.Next() {
|
||||
end = !it.Valid()
|
||||
if !end {
|
||||
if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil {
|
||||
// to do ...
|
||||
it.Close()
|
||||
return
|
||||
}
|
||||
} else {
|
||||
seq = maxSegCount
|
||||
}
|
||||
|
||||
// todo :
|
||||
// operation 'and' can be optimize here :
|
||||
// if seq > max_segments_idx, this loop can be break,
|
||||
// which can avoid cost from Key() and decode key
|
||||
for ; idx < seq; idx++ {
|
||||
res = nil
|
||||
exeOp(segments[idx], nil, &res)
|
||||
segments[idx] = res
|
||||
}
|
||||
|
||||
if !end {
|
||||
res = it.Value()
|
||||
exeOp(segments[seq], res, &res)
|
||||
segments[seq] = res
|
||||
idx++
|
||||
}
|
||||
}
|
||||
it.Close()
|
||||
}
|
||||
|
||||
// clear the old data in case
|
||||
db.bDelete(t, dstkey)
|
||||
db.rmExpire(t, bExpType, dstkey)
|
||||
|
||||
// set data and meta
|
||||
db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff)
|
||||
|
||||
// set data
|
||||
var bk []byte
|
||||
for seq, seg := range segments {
|
||||
if seg != nil {
|
||||
// todo:
|
||||
// here can be optimize, like 'updateBinKeySeq',
|
||||
// avoid allocate too many mem
|
||||
bk = db.bEncodeBinKey(dstkey, uint32(seq))
|
||||
t.Put(bk, seg)
|
||||
}
|
||||
}
|
||||
|
||||
err = t.Commit()
|
||||
if err != nil {
|
||||
blen = int32(maxDstSeq<<segBitWidth | maxDstOff)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (db *DB) BExpire(key []byte, duration int64) (int64, error) {
|
||||
if duration <= 0 {
|
||||
return 0, errExpireValue
|
||||
}
|
||||
|
||||
if err := checkKeySize(key); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return db.bExpireAt(key, time.Now().Unix()+duration)
|
||||
}
|
||||
|
||||
func (db *DB) BExpireAt(key []byte, when int64) (int64, error) {
|
||||
if when <= time.Now().Unix() {
|
||||
return 0, errExpireValue
|
||||
}
|
||||
|
||||
if err := checkKeySize(key); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return db.bExpireAt(key, when)
|
||||
}
|
||||
|
||||
func (db *DB) BTTL(key []byte) (int64, error) {
|
||||
if err := checkKeySize(key); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return db.ttl(bExpType, key)
|
||||
}
|
||||
|
||||
func (db *DB) BPersist(key []byte) (int64, error) {
|
||||
if err := checkKeySize(key); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
t := db.binTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
n, err := db.rmExpire(t, bExpType, key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
err = t.Commit()
|
||||
return n, err
|
||||
}
|
||||
|
||||
// func (db *DB) BScan(key []byte, count int, inclusive bool) ([]KVPair, error) {
|
||||
|
||||
// }
|
||||
|
||||
func (db *DB) bFlush() (drop int64, err error) {
|
||||
t := db.binTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
minKey := make([]byte, 2)
|
||||
minKey[0] = db.index
|
||||
minKey[1] = binType
|
||||
|
||||
maxKey := make([]byte, 2)
|
||||
maxKey[0] = db.index
|
||||
maxKey[1] = binMetaType + 1
|
||||
|
||||
drop, err = db.flushRegion(t, minKey, maxKey)
|
||||
err = db.expFlush(t, bExpType)
|
||||
|
||||
err = t.Commit()
|
||||
return
|
||||
}
|
|
@ -0,0 +1,379 @@
|
|||
package ledis
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func cmpBytes(a []byte, b []byte) bool {
|
||||
if len(a) != len(b) {
|
||||
println("len diff")
|
||||
println(len(a))
|
||||
println(len(b))
|
||||
return true
|
||||
}
|
||||
|
||||
for i, n := range a {
|
||||
if n != b[i] {
|
||||
println("diff !")
|
||||
println(i)
|
||||
println(n)
|
||||
println(b[i])
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func newBytes(bitLen int32) []byte {
|
||||
bytes := bitLen / 8
|
||||
if bitLen%8 > 0 {
|
||||
bytes++
|
||||
}
|
||||
|
||||
return make([]byte, bytes, bytes)
|
||||
}
|
||||
|
||||
func TestBinary(t *testing.T) {
|
||||
testSimple(t)
|
||||
testSimpleII(t)
|
||||
testOpAndOr(t)
|
||||
testOpXor(t)
|
||||
testOpNot(t)
|
||||
testMSetBit(t)
|
||||
}
|
||||
|
||||
func testSimple(t *testing.T) {
|
||||
db := getTestDB()
|
||||
|
||||
key := []byte("test_bin")
|
||||
|
||||
if v, _ := db.BGetBit(key, 100); v != 0 {
|
||||
t.Error(v)
|
||||
}
|
||||
|
||||
if ori, _ := db.BSetBit(key, 50, 1); ori != 0 {
|
||||
t.Error(ori)
|
||||
}
|
||||
|
||||
if v, _ := db.BGetBit(key, 50); v != 1 {
|
||||
t.Error(v)
|
||||
}
|
||||
|
||||
if ori, _ := db.BSetBit(key, 50, 0); ori != 1 {
|
||||
t.Error(ori)
|
||||
}
|
||||
|
||||
if v, _ := db.BGetBit(key, 50); v != 0 {
|
||||
t.Error(v)
|
||||
}
|
||||
|
||||
db.BSetBit(key, 7, 1)
|
||||
db.BSetBit(key, 8, 1)
|
||||
db.BSetBit(key, 9, 1)
|
||||
db.BSetBit(key, 10, 1)
|
||||
|
||||
if sum, _ := db.BCount(key, 0, -1); sum != 4 {
|
||||
t.Error(sum)
|
||||
}
|
||||
|
||||
data, _ := db.BGet(key)
|
||||
if cmpBytes(data, []byte{0x80, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00}) {
|
||||
t.Error(data)
|
||||
}
|
||||
|
||||
if tail, _ := db.BTail(key); tail != int32(50) {
|
||||
t.Error(tail)
|
||||
}
|
||||
}
|
||||
|
||||
func testSimpleII(t *testing.T) {
|
||||
db := getTestDB()
|
||||
db.FlushAll()
|
||||
|
||||
key := []byte("test_bin_2")
|
||||
|
||||
pos := int32(1234567)
|
||||
if ori, _ := db.BSetBit(key, pos, 1); ori != 0 {
|
||||
t.Error(ori)
|
||||
}
|
||||
|
||||
if v, _ := db.BGetBit(key, pos); v != 1 {
|
||||
t.Error(v)
|
||||
}
|
||||
|
||||
if v, _ := db.BGetBit(key, pos-1); v != 0 {
|
||||
t.Error(v)
|
||||
}
|
||||
|
||||
if v, _ := db.BGetBit(key, pos+1); v != 0 {
|
||||
t.Error(v)
|
||||
}
|
||||
|
||||
if tail, _ := db.BTail(key); tail != pos {
|
||||
t.Error(tail)
|
||||
}
|
||||
|
||||
data, _ := db.BGet(key)
|
||||
stdData := newBytes(pos + 1)
|
||||
stdData[pos/8] = uint8(1 << (uint(pos) % 8))
|
||||
|
||||
if cmpBytes(data, stdData) {
|
||||
t.Error(len(data))
|
||||
}
|
||||
|
||||
if drop, _ := db.BDelete(key); drop != 1 {
|
||||
t.Error(false)
|
||||
}
|
||||
|
||||
if data, _ := db.BGet(key); data != nil {
|
||||
t.Error(data)
|
||||
}
|
||||
}
|
||||
|
||||
func testOpAndOr(t *testing.T) {
|
||||
db := getTestDB()
|
||||
db.FlushAll()
|
||||
|
||||
dstKey := []byte("test_bin_op_and_or")
|
||||
|
||||
k0 := []byte("op_0")
|
||||
k1 := []byte("op_01")
|
||||
k2 := []byte("op_10")
|
||||
k3 := []byte("op_11")
|
||||
srcKeys := [][]byte{k2, k0, k1, k3}
|
||||
|
||||
/*
|
||||
<k0>
|
||||
<seg> - <high> ... <low>
|
||||
0 - [10000000] ... [00000001]
|
||||
1 - nil
|
||||
2 - [00000000] ... [11111111] ... [00000000]
|
||||
3 - [01010101] ... [10000001] [10101010]
|
||||
4 - [10000000] ... [00000000]
|
||||
5 - [00000000] ... [00000011] [00000001]
|
||||
...
|
||||
*/
|
||||
// (k0 - seg:0)
|
||||
db.BSetBit(k0, int32(0), 1)
|
||||
db.BSetBit(k0, int32(segBitSize-1), 1)
|
||||
// (k0 - seg:2)
|
||||
pos := segBitSize*2 + segBitSize/2
|
||||
for i := uint32(0); i < 8; i++ {
|
||||
db.BSetBit(k0, int32(pos+i), 1)
|
||||
}
|
||||
// (k0 - seg:3)
|
||||
pos = segBitSize * 3
|
||||
db.BSetBit(k0, int32(pos+8), 1)
|
||||
db.BSetBit(k0, int32(pos+15), 1)
|
||||
for i := uint32(1); i < 8; i += 2 {
|
||||
db.BSetBit(k0, int32(pos+i), 1)
|
||||
}
|
||||
pos = segBitSize*4 - 8
|
||||
for i := uint32(0); i < 8; i += 2 {
|
||||
db.BSetBit(k0, int32(pos+i), 1)
|
||||
}
|
||||
// (k0 - seg:4)
|
||||
db.BSetBit(k0, int32(segBitSize*5-1), 1)
|
||||
// (k0 - seg:5)
|
||||
db.BSetBit(k0, int32(segBitSize*5), 1)
|
||||
db.BSetBit(k0, int32(segBitSize*5+8), 1)
|
||||
db.BSetBit(k0, int32(segBitSize*5+9), 1)
|
||||
|
||||
/*
|
||||
<k1>
|
||||
0 - nil
|
||||
1 - [00000001] ... [10000000]
|
||||
2 - nil
|
||||
3 - [10101010] ... [10000001] [01010101]
|
||||
...
|
||||
*/
|
||||
// (k1 - seg:1)
|
||||
db.BSetBit(k1, int32(segBitSize+7), 1)
|
||||
db.BSetBit(k1, int32(segBitSize*2-8), 1)
|
||||
// (k1 - seg:3)
|
||||
pos = segBitSize * 3
|
||||
db.BSetBit(k1, int32(pos+8), 1)
|
||||
db.BSetBit(k1, int32(pos+15), 1)
|
||||
for i := uint32(0); i < 8; i += 2 {
|
||||
db.BSetBit(k0, int32(pos+i), 1)
|
||||
}
|
||||
pos = segBitSize*4 - 8
|
||||
for i := uint32(1); i < 8; i += 2 {
|
||||
db.BSetBit(k0, int32(pos+i), 1)
|
||||
}
|
||||
|
||||
var stdData []byte
|
||||
var data []byte
|
||||
var tmpKeys [][]byte
|
||||
|
||||
// op - or
|
||||
db.BOperation(OPor, dstKey, srcKeys...)
|
||||
|
||||
stdData = make([]byte, 5*segByteSize+2)
|
||||
stdData[0] = uint8(0x01)
|
||||
stdData[segByteSize-1] = uint8(0x80)
|
||||
stdData[segByteSize] = uint8(0x80)
|
||||
stdData[segByteSize*2-1] = uint8(0x01)
|
||||
stdData[segByteSize*2+segByteSize/2] = uint8(0xff)
|
||||
stdData[segByteSize*3] = uint8(0xff)
|
||||
stdData[segByteSize*3+1] = uint8(0x81)
|
||||
stdData[segByteSize*4-1] = uint8(0xff)
|
||||
stdData[segByteSize*5-1] = uint8(0x80)
|
||||
stdData[segByteSize*5] = uint8(0x01)
|
||||
stdData[segByteSize*5+1] = uint8(0x03)
|
||||
|
||||
data, _ = db.BGet(dstKey)
|
||||
if cmpBytes(data, stdData) {
|
||||
t.Fatal(false)
|
||||
}
|
||||
|
||||
tmpKeys = [][]byte{k0, dstKey, k1}
|
||||
db.BOperation(OPor, dstKey, tmpKeys...)
|
||||
|
||||
data, _ = db.BGet(dstKey)
|
||||
if cmpBytes(data, stdData) {
|
||||
t.Fatal(false)
|
||||
}
|
||||
|
||||
// op - and
|
||||
db.BOperation(OPand, dstKey, srcKeys...)
|
||||
|
||||
stdData = make([]byte, 5*segByteSize+2)
|
||||
stdData[segByteSize*3+1] = uint8(0x81)
|
||||
|
||||
data, _ = db.BGet(dstKey)
|
||||
if cmpBytes(data, stdData) {
|
||||
t.Fatal(false)
|
||||
}
|
||||
|
||||
tmpKeys = [][]byte{k0, dstKey, k1}
|
||||
db.BOperation(OPand, dstKey, tmpKeys...)
|
||||
|
||||
data, _ = db.BGet(dstKey)
|
||||
if cmpBytes(data, stdData) {
|
||||
t.Fatal(false)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testOpXor(t *testing.T) {
|
||||
db := getTestDB()
|
||||
db.FlushAll()
|
||||
|
||||
dstKey := []byte("test_bin_op_xor")
|
||||
|
||||
k0 := []byte("op_2_00")
|
||||
k1 := []byte("op_2_01")
|
||||
srcKeys := [][]byte{k0, k1}
|
||||
|
||||
db.BSetBit(k0, int32(0), 1)
|
||||
db.BSetBit(k0, int32(7), 1)
|
||||
db.BSetBit(k0, int32(segBitSize-1), 1)
|
||||
db.BSetBit(k0, int32(segBitSize-8), 1)
|
||||
|
||||
db.BSetBit(k1, int32(7), 1)
|
||||
db.BSetBit(k1, int32(segBitSize-8), 1)
|
||||
|
||||
var stdData []byte
|
||||
var data []byte
|
||||
|
||||
// op - xor
|
||||
db.BOperation(OPxor, dstKey, srcKeys...)
|
||||
|
||||
stdData = make([]byte, segByteSize)
|
||||
stdData[0] = uint8(0x01)
|
||||
stdData[segByteSize-1] = uint8(0x80)
|
||||
|
||||
data, _ = db.BGet(dstKey)
|
||||
if cmpBytes(data, stdData) {
|
||||
t.Fatal(false)
|
||||
}
|
||||
}
|
||||
|
||||
func testOpNot(t *testing.T) {
|
||||
db := getTestDB()
|
||||
db.FlushAll()
|
||||
|
||||
// intputs
|
||||
dstKey := []byte("test_bin_op_xor")
|
||||
|
||||
k0 := []byte("op_not_0")
|
||||
srcKeys := [][]byte{k0}
|
||||
|
||||
db.BSetBit(k0, int32(0), 1)
|
||||
db.BSetBit(k0, int32(7), 1)
|
||||
|
||||
pos := segBitSize
|
||||
for i := uint32(8); i >= 1; i -= 2 {
|
||||
db.BSetBit(k0, int32(pos-i), 1)
|
||||
}
|
||||
|
||||
db.BSetBit(k0, int32(3*segBitSize-10), 1)
|
||||
|
||||
// std
|
||||
stdData := make([]byte, segByteSize*3-1)
|
||||
for i, _ := range stdData {
|
||||
stdData[i] = 255
|
||||
}
|
||||
stdData[0] = uint8(0x7e)
|
||||
stdData[segByteSize-1] = uint8(0xaa)
|
||||
stdData[segByteSize*3-2] = uint8(0x3f)
|
||||
|
||||
// op - not
|
||||
db.BOperation(OPnot, dstKey, srcKeys...)
|
||||
|
||||
data, _ := db.BGet(dstKey)
|
||||
if cmpBytes(data, stdData) {
|
||||
t.Fatal(false)
|
||||
}
|
||||
}
|
||||
|
||||
func testMSetBit(t *testing.T) {
|
||||
db := getTestDB()
|
||||
db.FlushAll()
|
||||
|
||||
key := []byte("test_mset")
|
||||
|
||||
var datas = make([]BitPair, 8)
|
||||
|
||||
// 1st
|
||||
datas[0] = BitPair{1000, 1}
|
||||
datas[1] = BitPair{11, 1}
|
||||
datas[2] = BitPair{10, 1}
|
||||
datas[3] = BitPair{2, 1}
|
||||
datas[4] = BitPair{int32(segBitSize - 1), 1}
|
||||
datas[5] = BitPair{int32(segBitSize), 1}
|
||||
datas[6] = BitPair{int32(segBitSize + 1), 1}
|
||||
datas[7] = BitPair{int32(segBitSize) + 10, 0}
|
||||
|
||||
db.BMSetBit(key, datas...)
|
||||
|
||||
if sum, _ := db.BCount(key, 0, -1); sum != 7 {
|
||||
t.Error(sum)
|
||||
}
|
||||
|
||||
if tail, _ := db.BTail(key); tail != int32(segBitSize+10) {
|
||||
t.Error(tail)
|
||||
}
|
||||
|
||||
// 2nd
|
||||
datas = make([]BitPair, 5)
|
||||
|
||||
datas[0] = BitPair{1000, 0}
|
||||
datas[1] = BitPair{int32(segBitSize + 1), 0}
|
||||
datas[2] = BitPair{int32(segBitSize * 10), 1}
|
||||
datas[3] = BitPair{10, 0}
|
||||
datas[4] = BitPair{99, 0}
|
||||
|
||||
db.BMSetBit(key, datas...)
|
||||
|
||||
if sum, _ := db.BCount(key, 0, -1); sum != 7-3+1 {
|
||||
t.Error(sum)
|
||||
}
|
||||
|
||||
if tail, _ := db.BTail(key); tail != int32(segBitSize*10) {
|
||||
t.Error(tail)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
|
@ -453,10 +453,6 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) {
|
|||
}
|
||||
|
||||
func (db *DB) hFlush() (drop int64, err error) {
|
||||
t := db.kvTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
minKey := make([]byte, 2)
|
||||
minKey[0] = db.index
|
||||
minKey[1] = hashType
|
||||
|
@ -465,19 +461,12 @@ func (db *DB) hFlush() (drop int64, err error) {
|
|||
maxKey[0] = db.index
|
||||
maxKey[1] = hSizeType + 1
|
||||
|
||||
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
|
||||
for ; it.Valid(); it.Next() {
|
||||
t.Delete(it.Key())
|
||||
drop++
|
||||
if drop&1023 == 0 {
|
||||
if err = t.Commit(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
it.Close()
|
||||
t := db.kvTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
db.expFlush(t, hExpType)
|
||||
drop, err = db.flushRegion(t, minKey, maxKey)
|
||||
err = db.expFlush(t, hExpType)
|
||||
|
||||
err = t.Commit()
|
||||
return
|
||||
|
|
|
@ -309,28 +309,17 @@ func (db *DB) SetNX(key []byte, value []byte) (int64, error) {
|
|||
}
|
||||
|
||||
func (db *DB) flush() (drop int64, err error) {
|
||||
minKey := db.encodeKVMinKey()
|
||||
maxKey := db.encodeKVMaxKey()
|
||||
|
||||
t := db.kvTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
minKey := db.encodeKVMinKey()
|
||||
maxKey := db.encodeKVMaxKey()
|
||||
|
||||
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
|
||||
for ; it.Valid(); it.Next() {
|
||||
t.Delete(it.Key())
|
||||
drop++
|
||||
|
||||
if drop&1023 == 0 {
|
||||
if err = t.Commit(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
it.Close()
|
||||
drop, err = db.flushRegion(t, minKey, maxKey)
|
||||
err = db.expFlush(t, kvExpType)
|
||||
|
||||
err = t.Commit()
|
||||
err = db.expFlush(t, kvExpType)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -410,10 +410,6 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) {
|
|||
}
|
||||
|
||||
func (db *DB) lFlush() (drop int64, err error) {
|
||||
t := db.listTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
minKey := make([]byte, 2)
|
||||
minKey[0] = db.index
|
||||
minKey[1] = listType
|
||||
|
@ -422,19 +418,12 @@ func (db *DB) lFlush() (drop int64, err error) {
|
|||
maxKey[0] = db.index
|
||||
maxKey[1] = lMetaType + 1
|
||||
|
||||
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
|
||||
for ; it.Valid(); it.Next() {
|
||||
t.Delete(it.Key())
|
||||
drop++
|
||||
if drop&1023 == 0 {
|
||||
if err = t.Commit(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
it.Close()
|
||||
t := db.listTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
db.expFlush(t, lExpType)
|
||||
drop, err = db.flushRegion(t, minKey, maxKey)
|
||||
err = db.expFlush(t, lExpType)
|
||||
|
||||
err = t.Commit()
|
||||
return
|
||||
|
|
|
@ -112,8 +112,6 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) {
|
|||
return errExpType
|
||||
}
|
||||
|
||||
drop := 0
|
||||
|
||||
minKey := make([]byte, 2)
|
||||
minKey[0] = db.index
|
||||
minKey[1] = expType
|
||||
|
@ -122,18 +120,7 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) {
|
|||
maxKey[0] = db.index
|
||||
maxKey[1] = expMetaType + 1
|
||||
|
||||
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
|
||||
for ; it.Valid(); it.Next() {
|
||||
t.Delete(it.Key())
|
||||
drop++
|
||||
if drop&1023 == 0 {
|
||||
if err = t.Commit(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
it.Close()
|
||||
|
||||
_, err = db.flushRegion(t, minKey, maxKey)
|
||||
err = t.Commit()
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue