diff --git a/ledis/t_bin.go b/ledis/t_bin.go new file mode 100644 index 0000000..0071e68 --- /dev/null +++ b/ledis/t_bin.go @@ -0,0 +1,608 @@ +package ledis + +import ( + "encoding/binary" + "errors" + "github.com/siddontang/ledisdb/leveldb" +) + +const ( + OPand uint8 = iota + 1 + OPor + OPxor + OPnot +) + +const ( + // byte + segByteWidth uint32 = 9 + segByteSize uint32 = 1 << segByteWidth + + // bit + segBitWidth uint32 = segByteWidth + 3 + segBitSize uint32 = segByteSize << 3 + + maxByteSize uint32 = 8 << 20 + maxSegCount uint32 = maxByteSize / segByteSize + + minSeq uint32 = 0 + maxSeq uint32 = uint32((maxByteSize << 3) - 1) +) + +var bitsInByte = [256]int32{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, + 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 1, 2, 2, 3, 2, 3, + 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, + 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, + 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, + 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, + 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, + 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, + 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, + 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, + 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8} + +var fillSegment []byte = func() []byte { + data := make([]byte, segByteSize, segByteSize) + for i := uint32(0); i < segByteSize; i++ { + data[i] = 0xff + } + return data +}() + +var errBinKey = errors.New("invalid bin key") +var errOffset = errors.New("invalid offset") + +func getBit(sz []byte, offset uint32) uint8 { + index := offset >> 3 + if index >= uint32(len(sz)) { + return 0 // error("overflow") + } + + offset -= index << 3 + return sz[index] >> offset & 1 +} + +func setBit(sz []byte, offset uint32, val uint8) bool { + if val != 1 && val != 0 { + return false // error("invalid val") + } + + index := offset >> 3 + if index >= uint32(len(sz)) { + return false // error("overflow") + } + + offset -= index << 3 + if sz[index]>>offset&1 != val { + sz[index] ^= (1 << offset) + } + return true +} + +func (db *DB) bEncodeMetaKey(key []byte) []byte { + mk := make([]byte, len(key)+2) + mk[0] = db.index + mk[1] = binMetaType + + copy(mk, key) + return mk +} + +func (db *DB) bEncodeBinKey(key []byte, seq uint32) []byte { + bk := make([]byte, len(key)+8) + + pos := 0 + bk[pos] = db.index + pos++ + bk[pos] = binType + pos++ + + binary.BigEndian.PutUint16(bk[pos:], uint16(len(key))) + pos += 2 + + copy(bk[pos:], key) + pos += len(key) + + binary.BigEndian.PutUint32(bk[pos:], seq) + + return bk +} + +func (db *DB) bDecodeBinKey(bkey []byte) (key []byte, seq uint32, err error) { + if len(bkey) < 8 || bkey[0] != db.index { + err = errBinKey + return + } + + keyLen := binary.BigEndian.Uint16(bkey[2:4]) + if int(keyLen+8) != len(bkey) { + err = errBinKey + return + } + + key = bkey[4 : 4+keyLen] + seq = uint32(binary.BigEndian.Uint32(bkey[4+keyLen:])) + return +} + +func (db *DB) bCapByteSize(seq uint32, off uint32) uint32 { + var offByteSize uint32 = (off >> 3) + 1 + if offByteSize > segByteSize { + offByteSize = segByteSize + } + + return seq<= 0 { + offset += int32(uint32(tailSeq)<> segBitWidth + off &= (segBitSize - 1) + return +} + +func (db *DB) bGetMeta(key []byte) (tailSeq int32, tailOff int32, err error) { + var v []byte + + mk := db.bEncodeMetaKey(key) + v, err = db.db.Get(mk) + if err != nil { + return + } + + if v != nil { + tailSeq = int32(binary.LittleEndian.Uint32(v[0:4])) + tailOff = int32(binary.LittleEndian.Uint32(v[4:8])) + } else { + tailSeq = -1 + tailOff = -1 + } + return +} + +func (db *DB) bSetMeta(t *tx, key []byte, tailSeq uint32, tailOff uint32) { + ek := db.bEncodeMetaKey(key) + + // todo .. + // if size == 0 // headSeq == tailSeq + // t.Delete(ek) + + buf := make([]byte, 8) + binary.LittleEndian.PutUint32(buf[0:4], tailSeq) + binary.LittleEndian.PutUint32(buf[4:8], tailOff) + + t.Put(ek, buf) + return +} + +func (db *DB) bUpdateMeta(t *tx, key []byte, seq uint32, off uint32) (tailSeq uint32, tailOff uint32, err error) { + var ts, to int32 + if ts, to, err = db.bGetMeta(key); err != nil { + return + } else { + tailSeq = uint32(MaxInt32(ts, 0)) + tailOff = uint32(MaxInt32(to, 0)) + } + + if seq > tailSeq || (seq == tailSeq && off > tailOff) { + db.bSetMeta(t, key, seq, off) + tailSeq = seq + tailOff = off + } + return +} + +func (db *DB) bDelete(t *tx, key []byte) (drop int64) { + mk := db.bEncodeMetaKey(key) + t.Delete(mk) + + minKey := db.bEncodeBinKey(key, minSeq) + maxKey := db.bEncodeBinKey(key, maxSeq) + it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) + for ; it.Valid(); it.Next() { + t.Delete(it.Key()) + drop = 1 + } + it.Close() + + return drop +} + +func (db *DB) BGet(key []byte) (data []byte, err error) { + if err = checkKeySize(key); err != nil { + return + } + + var ts, to int32 + if ts, to, err = db.bGetMeta(key); err != nil || ts < 0 { + return + } + + var tailSeq, tailOff = uint32(ts), uint32(to) + var capByteSize uint32 = db.bCapByteSize(tailSeq, tailOff) + data = make([]byte, capByteSize, capByteSize) + + minKey := db.bEncodeBinKey(key, minSeq) + maxKey := db.bEncodeBinKey(key, tailSeq) + it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) + + var seq, s, e uint32 + for ; it.Valid(); it.Next() { + if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil { + data = nil + break + } + + s = seq << segByteWidth + e = MinUInt32(s+segByteSize, capByteSize) + copy(data[s:e], it.Value()) + } + it.Close() + + return +} + +func (db *DB) BDelete(key []byte) (drop int64, err error) { + if err = checkKeySize(key); err != nil { + return + } + + t := db.binTx + t.Lock() + defer t.Unlock() + + drop = db.bDelete(t, key) + db.rmExpire(t, bExpType, key) + + err = t.Commit() + return +} + +func (db *DB) getSegment(key []byte, seq uint32) ([]byte, []byte, error) { + bk := db.bEncodeBinKey(key, seq) + segment, err := db.db.Get(bk) + if err != nil { + return bk, nil, err + } + return bk, segment, nil +} + +func (db *DB) allocateSegment(key []byte, seq uint32) ([]byte, []byte, error) { + bk, segment, err := db.getSegment(key, seq) + if err == nil && segment == nil { + segment = make([]byte, segByteSize, segByteSize) // can be optimize ? + } + return bk, segment, err +} + +func (db *DB) BSetBit(key []byte, offset int32, val uint8) (ori uint8, err error) { + if err = checkKeySize(key); err != nil { + return + } + + // todo : check offset + var seq, off uint32 + if seq, off, err = db.bParseOffset(key, offset); err != nil { + return 0, err + } + + var bk, segment []byte + if bk, segment, err = db.allocateSegment(key, seq); err != nil { + return 0, err + } + + if segment != nil { + ori = getBit(segment, off) + setBit(segment, off, val) + + t := db.binTx + t.Lock() + t.Put(bk, segment) + if _, _, e := db.bUpdateMeta(t, key, seq, off); e != nil { + err = e + return + } + err = t.Commit() + t.Unlock() + } + + return +} + +func (db *DB) BGetBit(key []byte, offset int32) (uint8, error) { + if seq, off, err := db.bParseOffset(key, offset); err != nil { + return 0, err + } else { + _, segment, err := db.getSegment(key, seq) + if err != nil { + return 0, err + } + + if segment == nil { + return 0, nil + } else { + return getBit(segment, off), nil + } + } +} + +// func (db *DB) BGetRange(key []byte, start int32, end int32) ([]byte, error) { +// section := make([]byte) + +// return +// } + +func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) { + var sseq uint32 + if sseq, _, err = db.bParseOffset(key, start); err != nil { + return + } + + var eseq uint32 + if eseq, _, err = db.bParseOffset(key, end); err != nil { + return + } + + var segment []byte + skey := db.bEncodeBinKey(key, sseq) + ekey := db.bEncodeBinKey(key, eseq) + + it := db.db.RangeIterator(skey, ekey, leveldb.RangeClose) + for ; it.Valid(); it.Next() { + segment = it.Value() + for _, bit := range segment { + cnt += bitsInByte[bit] + } + } + it.Close() + + return +} + +func (db *DB) BTail(key []byte) (int32, error) { + // effective length of data, the highest bit-pos set in history + tailSeq, tailOff, err := db.bGetMeta(key) + if err != nil { + return 0, err + } + + tail := int32(-1) + if tailSeq >= 0 { + tail = int32(uint32(tailSeq)<= 0 { + maxDstSeq = uint32(nowSeq) + maxDstOff = uint32(nowOff) + break + } + } + + if iniIdx+1 >= keyNum { // todo ... while operation is 'not' + // msg("lack of arguments") + return + } + + // init - data + var seq, off uint32 + var segments = make([][]byte, maxSegCount) // todo : init count also can be optimize while 'and' / 'or' + + it := db.bIterator(srckeys[iniIdx]) + for ; it.Valid(); it.Next() { + if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil { + // to do ... + it.Close() + return + } + segments[seq] = it.Value() + } + it.Close() + + // operation with following keys + for i := iniIdx + 1; i < keyNum; i++ { + if nowSeq, nowOff, err = db.bGetMeta(srckeys[i]); err != nil { + return + } + + if nowSeq < 0 { + continue + } else { + seq = uint32(nowSeq) + off = uint32(nowOff) + if seq > maxDstSeq || (seq == maxDstSeq && off > maxDstOff) { + maxDstSeq = seq + maxDstOff = off + } + } + + it = db.bIterator(srckeys[i]) + segIdx := uint32(0) + + for end := false; !end; it.Next() { + end = !it.Valid() + if !end { + if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil { + // to do ... + it.Close() + return + } + } else { + seq = maxSegCount + } + + // todo : + // operation 'and' can be optimize here : + // if seq > max_segments_idx, this loop can be break, + // which can avoid cost from Key() and decode key + + if op == OPand || op == OPor { + for ; segIdx < seq; segIdx++ { + if segments[segIdx] != nil { + exeOp(segments[segIdx], nil, &segments[segIdx]) + } + } + } // else {...} + + if !end { + exeOp(segments[seq], it.Value(), &segments[segIdx]) + segIdx++ + } + } + it.Close() + } + + // clear the old data in case + db.bDelete(t, dstkey) + db.rmExpire(t, bExpType, dstkey) + + // set data and meta + // if op == OPand || op == OPor { + // for i := maxDstSeq; i >= 0; i-- { + // + // } + // } + db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff) + + var bk []byte + for seq, seg := range segments { + if seg != nil { + // todo: + // here can be optimize, like 'updateBinKeySeq', + // avoid too many make mem + bk = db.bEncodeBinKey(dstkey, uint32(seq)) + t.Put(bk, seg) + } + } + + err = t.Commit() + if err != nil { + blen = int32(maxDstSeq< 0 { + bytes++ + } + + return make([]byte, bytes, bytes) +} + +func TestBinary(t *testing.T) { + testSimple(t) + testSimpleII(t) + testOp(t) +} + +func testSimple(t *testing.T) { + db := getTestDB() + + key := []byte("test_bin") + + if v, _ := db.BGetBit(key, 100); v != 0 { + t.Error(v) + } + + if ori, _ := db.BSetBit(key, 50, 1); ori != 0 { + t.Error(ori) + } + + if v, _ := db.BGetBit(key, 50); v != 1 { + t.Error(v) + } + + if ori, _ := db.BSetBit(key, 50, 0); ori != 1 { + t.Error(ori) + } + + if v, _ := db.BGetBit(key, 50); v != 0 { + t.Error(v) + } + + db.BSetBit(key, 7, 1) + db.BSetBit(key, 8, 1) + db.BSetBit(key, 9, 1) + db.BSetBit(key, 10, 1) + + if sum, _ := db.BCount(key, 0, -1); sum != 4 { + t.Error(sum) + } + + data, _ := db.BGet(key) + if cmpBytes(data, []byte{0x80, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00}) { + t.Error(data) + } + + if tail, _ := db.BTail(key); tail != int32(50) { + t.Error(tail) + } +} + +func testSimpleII(t *testing.T) { + db := getTestDB() + db.FlushAll() + + key := []byte("test_bin") + + pos := int32(1234567) + if ori, _ := db.BSetBit(key, pos, 1); ori != 0 { + t.Error(ori) + } + + if v, _ := db.BGetBit(key, pos); v != 1 { + t.Error(v) + } + + if v, _ := db.BGetBit(key, pos-1); v != 0 { + t.Error(v) + } + + if v, _ := db.BGetBit(key, pos+1); v != 0 { + t.Error(v) + } + + if tail, _ := db.BTail(key); tail != pos { + t.Error(tail) + } + + data, _ := db.BGet(key) + stdData := newBytes(pos + 1) + stdData[pos/8] = uint8(1 << (uint(pos) % 8)) + + if cmpBytes(data, stdData) { + t.Error(len(data)) + } + + if drop, _ := db.BDelete(key); drop != 1 { + t.Error(false) + } + + if data, _ := db.BGet(key); data != nil { + t.Error(data) + } +} + +func testOp(t *testing.T) { + db := getTestDB() + db.FlushAll() + + dstKey := []byte("test_bin_op") + + k0 := []byte("op_0") + k1 := []byte("op_01") + k2 := []byte("op_10") + k3 := []byte("op_11") + srcKeys := [][]byte{k2, k0, k1, k3} + + /* + + - ... + 0 - [10000000] ... [00000001] + 1 - nil + 2 - [00000000] ... [11111111] ... [00000000] + 3 - [01010101] ... [10000001] [10101010] + 4 - [10000000] ... [00000000] + 5 - [00000000] ... [00000011] [00000001] + ... + */ + // (k0 - seg:0) + db.BSetBit(k0, int32(0), 1) + db.BSetBit(k0, int32(segBitSize-1), 1) + // (k0 - seg:2) + pos := segBitSize*2 + segBitSize/2 + for i := uint32(0); i < 8; i++ { + db.BSetBit(k0, int32(pos+i), 1) + } + // (k0 - seg:3) + pos = segBitSize * 3 + db.BSetBit(k0, int32(pos+8), 1) + db.BSetBit(k0, int32(pos+15), 1) + for i := uint32(1); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + pos = segBitSize*4 - 8 + for i := uint32(0); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + // (k0 - seg:4) + db.BSetBit(k0, int32(segBitSize*5-1), 1) + // (k0 - seg:5) + db.BSetBit(k0, int32(segBitSize*5), 1) + db.BSetBit(k0, int32(segBitSize*5+8), 1) + db.BSetBit(k0, int32(segBitSize*5+9), 1) + + /* + + 0 - nil + 1 - [00000001] ... [10000000] + 2 - nil + 3 - [10101010] ... [10000001] [01010101] + ... + */ + // (k1 - seg:1) + db.BSetBit(k1, int32(segBitSize+7), 1) + db.BSetBit(k1, int32(segBitSize*2-8), 1) + // (k1 - seg:3) + pos = segBitSize * 3 + db.BSetBit(k1, int32(pos+8), 1) + db.BSetBit(k1, int32(pos+15), 1) + for i := uint32(0); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + pos = segBitSize*4 - 8 + for i := uint32(1); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + + var stdData []byte + var data []byte + var tmpKeys [][]byte + + // op - or + db.BOperation(OPor, dstKey, srcKeys...) + + stdData = make([]byte, 5*segByteSize+2) + stdData[0] = uint8(0x01) + stdData[segByteSize-1] = uint8(0x80) + stdData[segByteSize] = uint8(0x80) + stdData[segByteSize*2-1] = uint8(0x01) + stdData[segByteSize*2+segByteSize/2] = uint8(0xff) + stdData[segByteSize*3] = uint8(0xff) + stdData[segByteSize*3+1] = uint8(0x81) + stdData[segByteSize*4-1] = uint8(0xff) + stdData[segByteSize*5-1] = uint8(0x80) + stdData[segByteSize*5] = uint8(0x01) + stdData[segByteSize*5+1] = uint8(0x03) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + + tmpKeys = [][]byte{k0, dstKey, k1} + db.BOperation(OPor, dstKey, tmpKeys...) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + + // op - and + db.BOperation(OPand, dstKey, srcKeys...) + + stdData = make([]byte, 5*segByteSize+2) + stdData[segByteSize*3+1] = uint8(0x81) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + + tmpKeys = [][]byte{k0, dstKey, k1} + db.BOperation(OPand, dstKey, tmpKeys...) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + +}