diff --git a/ledis/t_bin.go b/ledis/t_bin.go index bd69505..e31cbee 100644 --- a/ledis/t_bin.go +++ b/ledis/t_bin.go @@ -7,7 +7,7 @@ import ( ) const ( - OPand byte = iota + 1 + OPand uint8 = iota + 1 OPor OPxor OPnot @@ -22,11 +22,35 @@ const ( segBitWidth uint32 = segByteWidth + 3 segBitSize uint32 = segByteSize << 3 + maxByteSize uint32 = 8 << 20 + maxSegCount uint32 = maxByteSize / segByteSize + minSeq uint32 = 0 - maxSeq uint32 = uint32((1 << 31) - 1) + maxSeq uint32 = uint32((maxByteSize << 3) - 1) ) -var bitsInByte = [256]int32{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8} +var bitsInByte = [256]int32{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, + 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 1, 2, 2, 3, 2, 3, + 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, + 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, + 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, + 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, + 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, + 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, + 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, + 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, + 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8} + +var emptySegment []byte = make([]byte, segByteSize, segByteSize) + +var fillSegment []byte = func() []byte { + data := make([]byte, segByteSize, segByteSize) + for i := uint32(0); i < segByteSize; i++ { + data[i] = 0xff + } + return data +}() var errBinKey = errors.New("invalid bin key") var errOffset = errors.New("invalid offset") @@ -118,8 +142,8 @@ func (db *DB) bParseOffset(key []byte, offset int32) (seq uint32, off uint32, er if tailSeq, tailOff, e := db.bGetMeta(key); e != nil { err = e return - } else { - offset += int32(tailSeq<= 0 { + offset += int32(uint32(tailSeq)< tailSeq || (seq == tailSeq && off > tailOff) { @@ -178,26 +209,39 @@ func (db *DB) bUpdateMeta(t *tx, key []byte, seq uint32, off uint32) (tailSeq ui return } -// func (db *DB) bDelete(key []byte) int64 { -// return 0 -// } +func (db *DB) bDelete(t *tx, key []byte) (drop int64) { + mk := db.bEncodeMetaKey(key) + t.Delete(mk) + + minKey := db.bEncodeBinKey(key, minSeq) + maxKey := db.bEncodeBinKey(key, maxSeq) + it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) + for ; it.Valid(); it.Next() { + t.Delete(it.Key()) + drop = 1 + } + it.Close() + + return drop +} func (db *DB) BGet(key []byte) (data []byte, err error) { if err = checkKeySize(key); err != nil { return } - var tailSeq, tailOff uint32 - if tailSeq, tailOff, err = db.bGetMeta(key); err != nil { + var ts, to int32 + if ts, to, err = db.bGetMeta(key); err != nil || ts < 0 { return } + var tailSeq, tailOff = uint32(ts), uint32(to) var capByteSize uint32 = db.bCapByteSize(tailSeq, tailOff) data = make([]byte, capByteSize, capByteSize) minKey := db.bEncodeBinKey(key, minSeq) maxKey := db.bEncodeBinKey(key, tailSeq) - it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeClose, 0, -1) + it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) var seq, s, e uint32 for ; it.Valid(); it.Next() { @@ -207,11 +251,7 @@ func (db *DB) BGet(key []byte) (data []byte, err error) { } s = seq << segByteWidth - e = s + segByteSize - if e > capByteSize { - e = capByteSize - } - + e = MinUInt32(s+segByteSize, capByteSize) copy(data[s:e], it.Value()) } it.Close() @@ -219,9 +259,21 @@ func (db *DB) BGet(key []byte) (data []byte, err error) { return } -// func (db *DB) BDelete(key []byte) (int8, error) { +func (db *DB) BDelete(key []byte) (drop int64, err error) { + if err = checkKeySize(key); err != nil { + return + } -// } + t := db.binTx + t.Lock() + defer t.Unlock() + + drop = db.bDelete(t, key) + db.rmExpire(t, bExpType, key) + + err = t.Commit() + return +} func (db *DB) getSegment(key []byte, seq uint32) ([]byte, []byte, error) { bk := db.bEncodeBinKey(key, seq) @@ -312,7 +364,7 @@ func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) skey := db.bEncodeBinKey(key, sseq) ekey := db.bEncodeBinKey(key, eseq) - it := db.db.RangeLimitIterator(skey, ekey, leveldb.RangeClose, 0, -1) + it := db.db.RangeIterator(skey, ekey, leveldb.RangeClose) for ; it.Valid(); it.Next() { segment = it.Value() for _, bit := range segment { @@ -324,25 +376,199 @@ func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) return } -func (db *DB) BTail(key []byte) (uint32, error) { +func (db *DB) BTail(key []byte) (int32, error) { // effective length of data, the highest bit-pos set in history tailSeq, tailOff, err := db.bGetMeta(key) if err != nil { return 0, err } - return tailSeq<= 0 { + tail = int32(uint32(tailSeq)< OPnot { -// return -// } +func (db *DB) bSegAnd(a []byte, b []byte, res **[]byte) { + if a == nil || b == nil { + *res = &emptySegment + return + } -// } + data := **res + if data == nil { + data = make([]byte, segByteSize, segByteSize) + *res = &data + } + + for i := uint32(0); i < segByteSize; i++ { + data[i] = a[i] & b[i] + } +} + +func (db *DB) bSegOr(a []byte, b []byte, res **[]byte) { + if a == nil || b == nil { + if a == nil && b == nil { + *res = &emptySegment // should not be here + } else if a == nil { + *res = &b + } else { + *res = &a + } + return + } + + data := **res + if data == nil { + data = make([]byte, segByteSize, segByteSize) + *res = &data + } + + for i := uint32(0); i < segByteSize; i++ { + data[i] = a[i] | b[i] + } +} + +func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator { + sk := db.bEncodeBinKey(key, minSeq) + ek := db.bEncodeBinKey(key, maxSeq) + return db.db.RangeIterator(sk, ek, leveldb.RangeClose) +} + +func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32, err error) { + // return : + // The size of the string stored in the destination key, + // that is equal to the size of the longest input string. + var exeOp func([]byte, []byte, **[]byte) + switch op { + case OPand: + exeOp = db.bSegAnd + case OPor: + exeOp = db.bSegOr + default: + return + } + + if dstkey == nil || srckeys == nil || len(srckeys) == 0 { + return + } + + t := db.binTx + t.Lock() + defer t.Unlock() + + var seq, off uint32 + var segments = make([][]byte, maxSegCount) // todo : limit 8mb, to config ... + + // init - meta info + var dstSeq, dstOff uint32 + var nowSeq, nowOff int32 + + if nowSeq, nowOff, err = db.bGetMeta(srckeys[0]); err != nil { // todo : if key not exists .... + return + } else if nowSeq < 0 { + return + } else { + dstSeq = uint32(nowSeq) + dstOff = uint32(nowOff) + } + + // init - data + it := db.bIterator(srckeys[0]) + for ; it.Valid(); it.Next() { + if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil { + // to do ... + it.Close() + return + } + segments[seq] = it.Value() + } + it.Close() + + // operation with following keys + var keyNum int = len(srckeys) + var pSeg *[]byte + + for i := 1; i < keyNum; i++ { + if nowSeq, nowOff, err = db.bGetMeta(srckeys[i]); err != nil { + return + } + + if nowSeq < 0 { + continue + } else { + seq = uint32(nowSeq) + off = uint32(nowOff) + if seq > dstSeq || (seq == dstSeq && off > dstOff) { + dstSeq = seq + dstOff = off + } + } + + it = db.bIterator(srckeys[i]) + segIdx := uint32(0) + + for end := false; !end; it.Next() { + end = !it.Valid() + if !end { + if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil { + // to do ... + it.Close() + return + } + } else { + seq = maxSegCount + } + + // todo : + // operation 'and' can be optimize here : + // if seq > max_segments_idx, this loop can be break, + // which can avoid cost from Key() and decode key + + if op == OPand || op == OPor { + for ; segIdx < seq; segIdx++ { + if segments[segIdx] != nil { + pSeg = &segments[segIdx] + exeOp(segments[segIdx], nil, &pSeg) + segments[segIdx] = *pSeg + } + } + } + // else {...} + + if !end { + pSeg = &segments[seq] + exeOp(segments[seq], it.Value(), &pSeg) + segments[seq] = *pSeg + segIdx++ + } + } + it.Close() + } + + // clear the old data in case + db.bDelete(t, dstkey) + db.rmExpire(t, bExpType, dstkey) + + // set data and meta + db.bSetMeta(t, dstkey, dstSeq, dstOff) + + var bk []byte + for seq, seg := range segments { + if seg != nil { + // todo: + // here can be optimize, like 'updateBinKeySeq', + // avoid too many make mem + bk = db.bEncodeBinKey(dstkey, uint32(seq)) + t.Put(bk, seg) + } + } + + err = t.Commit() + return +} // func (db *DB) BExpire(key []byte, duration int64) (int64, error) { @@ -356,6 +582,10 @@ func (db *DB) BTail(key []byte) (uint32, error) { // } +// func (db *DB) BPersist(key []byte) (int64, error) { + +// } + // func (db *DB) BScan(key []byte, count int, inclusive bool) ([]KVPair, error) { // } diff --git a/ledis/t_bin_test.go b/ledis/t_bin_test.go index f853ef3..4aedbbc 100644 --- a/ledis/t_bin_test.go +++ b/ledis/t_bin_test.go @@ -11,6 +11,10 @@ func cmpBytes(a []byte, b []byte) bool { for i, n := range a { if n != b[i] { + println("diff !") + println(i) + println(n) + println(b[i]) return true } } @@ -27,8 +31,9 @@ func newBytes(bitLen int32) []byte { } func TestBinary(t *testing.T) { - //testSimple(t) + testSimple(t) testSimpleII(t) + testOp(t) } func testSimple(t *testing.T) { @@ -70,14 +75,14 @@ func testSimple(t *testing.T) { t.Error(data) } - if tail, _ := db.BTail(key); tail != uint32(50) { + if tail, _ := db.BTail(key); tail != int32(50) { t.Error(tail) } } func testSimpleII(t *testing.T) { db := getTestDB() - key := []byte("test_bin") + key := []byte("test_bin_2") pos := int32(1234567) if ori, _ := db.BSetBit(key, pos, 1); ori != 0 { @@ -96,7 +101,7 @@ func testSimpleII(t *testing.T) { t.Error(v) } - if tail, _ := db.BTail(key); tail != uint32(pos) { + if tail, _ := db.BTail(key); tail != pos { t.Error(tail) } @@ -107,4 +112,127 @@ func testSimpleII(t *testing.T) { if cmpBytes(data, stdData) { t.Error(len(data)) } + + if drop, _ := db.BDelete(key); drop != 1 { + t.Error(false) + } + + if data, _ := db.BGet(key); data != nil { + t.Error(data) + } +} + +func testOp(t *testing.T) { + db := getTestDB() + + dstKey := []byte("test_bin_op") + + k0 := []byte("op_0") + k1 := []byte("op_10") + //k2 := []byte("op_11") + srcKeys := [][]byte{k0, k1} + + /* + - ... + 0 - [10000000] ... [00000001] + 1 - nil + 2 - [00000000] ... [11111111] ... [00000000] + 3 - [01010101] ... [10000001] [10101010] + 4 - [10000000] ... [00000000] + ... + */ + // (k0 - seg:0) + db.BSetBit(k0, int32(0), 1) + db.BSetBit(k0, int32(segBitSize-1), 1) + // (k0 - seg:2) + pos := segBitSize*2 + segBitSize/2 + for i := uint32(0); i < 8; i++ { + db.BSetBit(k0, int32(pos+i), 1) + } + // (k0 - seg:3) + pos = segBitSize * 3 + db.BSetBit(k0, int32(pos+8), 1) + db.BSetBit(k0, int32(pos+15), 1) + for i := uint32(1); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + pos = segBitSize*4 - 8 + for i := uint32(0); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + // (k0 - seg:3) + db.BSetBit(k0, int32(segBitSize*5-1), 1) + + /* + 0 - nil + 1 - [00000001] ... [10000000] + 2 - nil + 3 - [10101010] ... [10000001] [01010101] + ... + */ + // (k1 - seg:1) + db.BSetBit(k1, int32(segBitSize+7), 1) + db.BSetBit(k1, int32(segBitSize*2-8), 1) + // (k0 - seg:3) + pos = segBitSize * 3 + db.BSetBit(k1, int32(pos+8), 1) + db.BSetBit(k1, int32(pos+15), 1) + for i := uint32(0); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + pos = segBitSize*4 - 8 + for i := uint32(1); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + + var stdData []byte + var data []byte + var tmpKeys [][]byte + + // op - or + db.BOperation(OPor, dstKey, srcKeys...) + + stdData = make([]byte, 5*segByteSize) + stdData[0] = uint8(0x01) + stdData[segByteSize-1] = uint8(0x80) + stdData[segByteSize] = uint8(0x80) + stdData[segByteSize*2-1] = uint8(0x01) + stdData[segByteSize*2+segByteSize/2] = uint8(0xff) + stdData[segByteSize*3] = uint8(0xff) + stdData[segByteSize*3+1] = uint8(0x81) + stdData[segByteSize*4-1] = uint8(0xff) + stdData[segByteSize*5-1] = uint8(0x80) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + + tmpKeys = [][]byte{k0, dstKey, k1} + db.BOperation(OPor, dstKey, tmpKeys...) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + + // op - and + db.BOperation(OPand, dstKey, srcKeys...) + + stdData = make([]byte, 5*segByteSize) + stdData[segByteSize*3+1] = uint8(0x81) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + + tmpKeys = [][]byte{k0, dstKey, k1} + db.BOperation(OPand, dstKey, tmpKeys...) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + } diff --git a/ledis/util.go b/ledis/util.go index 9140a52..5948df7 100644 --- a/ledis/util.go +++ b/ledis/util.go @@ -65,3 +65,27 @@ func StrInt64(v []byte, err error) (int64, error) { func StrPutInt64(v int64) []byte { return strconv.AppendInt(nil, v, 10) } + +func MinUInt32(a uint32, b uint32) uint32 { + if a > b { + return b + } else { + return a + } +} + +func MaxUInt32(a uint32, b uint32) uint32 { + if a > b { + return a + } else { + return b + } +} + +func MaxInt32(a int32, b int32) int32 { + if a > b { + return a + } else { + return b + } +}