fix the problem causes from using a global empty segment

This commit is contained in:
silentsai 2014-06-27 11:00:01 +08:00
parent f27bc8f371
commit a404ef26c0
2 changed files with 55 additions and 48 deletions

View File

@ -42,8 +42,6 @@ var bitsInByte = [256]int32{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3,
6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5,
6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8} 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}
var emptySegment []byte = make([]byte, segByteSize, segByteSize)
var fillSegment []byte = func() []byte { var fillSegment []byte = func() []byte {
data := make([]byte, segByteSize, segByteSize) data := make([]byte, segByteSize, segByteSize)
for i := uint32(0); i < segByteSize; i++ { for i := uint32(0); i < segByteSize; i++ {
@ -391,44 +389,46 @@ func (db *DB) BTail(key []byte) (int32, error) {
return tail, nil return tail, nil
} }
func (db *DB) bSegAnd(a []byte, b []byte, res **[]byte) { func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) {
if a == nil || b == nil { if a == nil || b == nil {
*res = &emptySegment *res = nil
return return
} }
data := **res data := *res
if data == nil { if data == nil {
data = make([]byte, segByteSize, segByteSize) data = make([]byte, segByteSize, segByteSize)
*res = &data *res = data
} }
for i := uint32(0); i < segByteSize; i++ { for i := uint32(0); i < segByteSize; i++ {
data[i] = a[i] & b[i] data[i] = a[i] & b[i]
} }
return
} }
func (db *DB) bSegOr(a []byte, b []byte, res **[]byte) { func (db *DB) bSegOr(a []byte, b []byte, res *[]byte) {
if a == nil || b == nil { if a == nil || b == nil {
if a == nil && b == nil { if a == nil && b == nil {
*res = &emptySegment // should not be here *res = nil
} else if a == nil { } else if a == nil {
*res = &b *res = b
} else { } else {
*res = &a *res = a
} }
return return
} }
data := **res data := *res
if data == nil { if data == nil {
data = make([]byte, segByteSize, segByteSize) data = make([]byte, segByteSize, segByteSize)
*res = &data *res = data
} }
for i := uint32(0); i < segByteSize; i++ { for i := uint32(0); i < segByteSize; i++ {
data[i] = a[i] | b[i] data[i] = a[i] | b[i]
} }
return
} }
func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator { func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator {
@ -441,7 +441,7 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32
// return : // return :
// The size of the string stored in the destination key, // The size of the string stored in the destination key,
// that is equal to the size of the longest input string. // that is equal to the size of the longest input string.
var exeOp func([]byte, []byte, **[]byte) var exeOp func([]byte, []byte, *[]byte)
switch op { switch op {
case OPand: case OPand:
exeOp = db.bSegAnd exeOp = db.bSegAnd
@ -460,19 +460,19 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32
defer t.Unlock() defer t.Unlock()
var seq, off uint32 var seq, off uint32
var segments = make([][]byte, maxSegCount) // todo : limit 8mb, to config ... var segments = make([][]byte, maxSegCount) // todo : init count also can be optimize while 'and' / 'or'
// init - meta info // init - meta info
var dstSeq, dstOff uint32 var maxDstSeq, maxDstOff uint32
var nowSeq, nowOff int32 var nowSeq, nowOff int32
if nowSeq, nowOff, err = db.bGetMeta(srckeys[0]); err != nil { // todo : if key not exists .... if nowSeq, nowOff, err = db.bGetMeta(srckeys[0]); err != nil { // todo : if key not exists ....
return return
} else if nowSeq < 0 { } else if nowSeq < 0 {
return return // incorrect ...
} else { } else {
dstSeq = uint32(nowSeq) maxDstSeq = uint32(nowSeq)
dstOff = uint32(nowOff) maxDstOff = uint32(nowOff)
} }
// init - data // init - data
@ -489,8 +489,6 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32
// operation with following keys // operation with following keys
var keyNum int = len(srckeys) var keyNum int = len(srckeys)
var pSeg *[]byte
for i := 1; i < keyNum; i++ { for i := 1; i < keyNum; i++ {
if nowSeq, nowOff, err = db.bGetMeta(srckeys[i]); err != nil { if nowSeq, nowOff, err = db.bGetMeta(srckeys[i]); err != nil {
return return
@ -501,9 +499,9 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32
} else { } else {
seq = uint32(nowSeq) seq = uint32(nowSeq)
off = uint32(nowOff) off = uint32(nowOff)
if seq > dstSeq || (seq == dstSeq && off > dstOff) { if seq > maxDstSeq || (seq == maxDstSeq && off > maxDstOff) {
dstSeq = seq maxDstSeq = seq
dstOff = off maxDstOff = off
} }
} }
@ -530,18 +528,13 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32
if op == OPand || op == OPor { if op == OPand || op == OPor {
for ; segIdx < seq; segIdx++ { for ; segIdx < seq; segIdx++ {
if segments[segIdx] != nil { if segments[segIdx] != nil {
pSeg = &segments[segIdx] exeOp(segments[segIdx], nil, &segments[segIdx])
exeOp(segments[segIdx], nil, &pSeg)
segments[segIdx] = *pSeg
} }
} }
} } // else {...}
// else {...}
if !end { if !end {
pSeg = &segments[seq] exeOp(segments[seq], it.Value(), &segments[segIdx])
exeOp(segments[seq], it.Value(), &pSeg)
segments[seq] = *pSeg
segIdx++ segIdx++
} }
} }
@ -553,7 +546,12 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32
db.rmExpire(t, bExpType, dstkey) db.rmExpire(t, bExpType, dstkey)
// set data and meta // set data and meta
db.bSetMeta(t, dstkey, dstSeq, dstOff) if op == OPand || op == OPor {
// for i := maxDstSeq; i >= 0; i-- {
//
// }
db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff)
}
var bk []byte var bk []byte
for seq, seg := range segments { for seq, seg := range segments {

View File

@ -133,13 +133,15 @@ func testOp(t *testing.T) {
srcKeys := [][]byte{k0, k1} srcKeys := [][]byte{k0, k1}
/* /*
<seg> - <high> ... <low> <k0>
0 - [10000000] ... [00000001] <seg> - <high> ... <low>
1 - nil 0 - [10000000] ... [00000001]
2 - [00000000] ... [11111111] ... [00000000] 1 - nil
3 - [01010101] ... [10000001] [10101010] 2 - [00000000] ... [11111111] ... [00000000]
4 - [10000000] ... [00000000] 3 - [01010101] ... [10000001] [10101010]
... 4 - [10000000] ... [00000000]
5 - [00000000] ... [00000011] [00000001]
...
*/ */
// (k0 - seg:0) // (k0 - seg:0)
db.BSetBit(k0, int32(0), 1) db.BSetBit(k0, int32(0), 1)
@ -160,20 +162,25 @@ func testOp(t *testing.T) {
for i := uint32(0); i < 8; i += 2 { for i := uint32(0); i < 8; i += 2 {
db.BSetBit(k0, int32(pos+i), 1) db.BSetBit(k0, int32(pos+i), 1)
} }
// (k0 - seg:3) // (k0 - seg:4)
db.BSetBit(k0, int32(segBitSize*5-1), 1) db.BSetBit(k0, int32(segBitSize*5-1), 1)
// (k0 - seg:5)
db.BSetBit(k0, int32(segBitSize*5), 1)
db.BSetBit(k0, int32(segBitSize*5+8), 1)
db.BSetBit(k0, int32(segBitSize*5+9), 1)
/* /*
0 - nil <k1>
1 - [00000001] ... [10000000] 0 - nil
2 - nil 1 - [00000001] ... [10000000]
3 - [10101010] ... [10000001] [01010101] 2 - nil
... 3 - [10101010] ... [10000001] [01010101]
...
*/ */
// (k1 - seg:1) // (k1 - seg:1)
db.BSetBit(k1, int32(segBitSize+7), 1) db.BSetBit(k1, int32(segBitSize+7), 1)
db.BSetBit(k1, int32(segBitSize*2-8), 1) db.BSetBit(k1, int32(segBitSize*2-8), 1)
// (k0 - seg:3) // (k1 - seg:3)
pos = segBitSize * 3 pos = segBitSize * 3
db.BSetBit(k1, int32(pos+8), 1) db.BSetBit(k1, int32(pos+8), 1)
db.BSetBit(k1, int32(pos+15), 1) db.BSetBit(k1, int32(pos+15), 1)
@ -192,7 +199,7 @@ func testOp(t *testing.T) {
// op - or // op - or
db.BOperation(OPor, dstKey, srcKeys...) db.BOperation(OPor, dstKey, srcKeys...)
stdData = make([]byte, 5*segByteSize) stdData = make([]byte, 5*segByteSize+2)
stdData[0] = uint8(0x01) stdData[0] = uint8(0x01)
stdData[segByteSize-1] = uint8(0x80) stdData[segByteSize-1] = uint8(0x80)
stdData[segByteSize] = uint8(0x80) stdData[segByteSize] = uint8(0x80)
@ -202,6 +209,8 @@ func testOp(t *testing.T) {
stdData[segByteSize*3+1] = uint8(0x81) stdData[segByteSize*3+1] = uint8(0x81)
stdData[segByteSize*4-1] = uint8(0xff) stdData[segByteSize*4-1] = uint8(0xff)
stdData[segByteSize*5-1] = uint8(0x80) stdData[segByteSize*5-1] = uint8(0x80)
stdData[segByteSize*5] = uint8(0x01)
stdData[segByteSize*5+1] = uint8(0x03)
data, _ = db.BGet(dstKey) data, _ = db.BGet(dstKey)
if cmpBytes(data, stdData) { if cmpBytes(data, stdData) {
@ -219,7 +228,7 @@ func testOp(t *testing.T) {
// op - and // op - and
db.BOperation(OPand, dstKey, srcKeys...) db.BOperation(OPand, dstKey, srcKeys...)
stdData = make([]byte, 5*segByteSize) stdData = make([]byte, 5*segByteSize+2)
stdData[segByteSize*3+1] = uint8(0x81) stdData[segByteSize*3+1] = uint8(0x81)
data, _ = db.BGet(dstKey) data, _ = db.BGet(dstKey)