diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index 0e76d17..cdb4e94 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -1,11 +1,16 @@ package ledis +import ( + "github.com/siddontang/ledisdb/leveldb" +) + func (db *DB) FlushAll() (drop int64, err error) { all := [...](func() (int64, error)){ db.flush, db.lFlush, db.hFlush, - db.zFlush} + db.zFlush, + db.bFlush} for _, flush := range all { if n, e := flush(); e != nil { @@ -25,6 +30,22 @@ func (db *DB) newEliminator() *elimination { eliminator.regRetireContext(lExpType, db.listTx, db.lDelete) eliminator.regRetireContext(hExpType, db.hashTx, db.hDelete) eliminator.regRetireContext(zExpType, db.zsetTx, db.zDelete) + eliminator.regRetireContext(bExpType, db.binTx, db.bDelete) return eliminator } + +func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) { + it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen) + for ; it.Valid(); it.Next() { + t.Delete(it.Key()) + drop++ + if drop&1023 == 0 { + if err = t.Commit(); err != nil { + return + } + } + } + it.Close() + return +} diff --git a/ledis/t_bin.go b/ledis/t_bin.go new file mode 100644 index 0000000..ddc7e94 --- /dev/null +++ b/ledis/t_bin.go @@ -0,0 +1,832 @@ +package ledis + +import ( + "encoding/binary" + "errors" + "github.com/siddontang/ledisdb/leveldb" + "sort" + "time" +) + +const ( + OPand uint8 = iota + 1 + OPor + OPxor + OPnot +) + +type BitPair struct { + Pos int32 + Val uint8 +} + +type segBitInfo struct { + Seq uint32 + Off uint32 + Val uint8 +} + +type segBitInfoArray []segBitInfo + +const ( + // byte + segByteWidth uint32 = 9 + segByteSize uint32 = 1 << segByteWidth + + // bit + segBitWidth uint32 = segByteWidth + 3 + segBitSize uint32 = segByteSize << 3 + + maxByteSize uint32 = 8 << 20 + maxSegCount uint32 = maxByteSize / segByteSize + + minSeq uint32 = 0 + maxSeq uint32 = uint32((maxByteSize << 3) - 1) +) + +var bitsInByte = [256]int32{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, + 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 1, 2, 2, 3, 2, 3, + 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, + 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, + 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, + 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, + 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, + 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, + 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, + 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, + 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8} + +var fillBits = [...]uint8{1, 3, 7, 15, 31, 63, 127, 255} + +var emptySegment []byte = make([]byte, segByteSize, segByteSize) + +var fillSegment []byte = func() []byte { + data := make([]byte, segByteSize, segByteSize) + for i := uint32(0); i < segByteSize; i++ { + data[i] = 0xff + } + return data +}() + +var errBinKey = errors.New("invalid bin key") +var errOffset = errors.New("invalid offset") +var errDuplicatePos = errors.New("duplicate bit pos") + +func getBit(sz []byte, offset uint32) uint8 { + index := offset >> 3 + if index >= uint32(len(sz)) { + return 0 // error("overflow") + } + + offset -= index << 3 + return sz[index] >> offset & 1 +} + +func setBit(sz []byte, offset uint32, val uint8) bool { + if val != 1 && val != 0 { + return false // error("invalid val") + } + + index := offset >> 3 + if index >= uint32(len(sz)) { + return false // error("overflow") + } + + offset -= index << 3 + if sz[index]>>offset&1 != val { + sz[index] ^= (1 << offset) + } + return true +} + +func (datas segBitInfoArray) Len() int { + return len(datas) +} + +func (datas segBitInfoArray) Less(i, j int) bool { + res := (datas)[i].Seq < (datas)[j].Seq + if !res && (datas)[i].Seq == (datas)[j].Seq { + res = (datas)[i].Off < (datas)[j].Off + } + return res +} + +func (datas segBitInfoArray) Swap(i, j int) { + datas[i], datas[j] = datas[j], datas[i] +} + +func (db *DB) bEncodeMetaKey(key []byte) []byte { + mk := make([]byte, len(key)+2) + mk[0] = db.index + mk[1] = binMetaType + + copy(mk, key) + return mk +} + +func (db *DB) bEncodeBinKey(key []byte, seq uint32) []byte { + bk := make([]byte, len(key)+8) + + pos := 0 + bk[pos] = db.index + pos++ + bk[pos] = binType + pos++ + + binary.BigEndian.PutUint16(bk[pos:], uint16(len(key))) + pos += 2 + + copy(bk[pos:], key) + pos += len(key) + + binary.BigEndian.PutUint32(bk[pos:], seq) + + return bk +} + +func (db *DB) bDecodeBinKey(bkey []byte) (key []byte, seq uint32, err error) { + if len(bkey) < 8 || bkey[0] != db.index { + err = errBinKey + return + } + + keyLen := binary.BigEndian.Uint16(bkey[2:4]) + if int(keyLen+8) != len(bkey) { + err = errBinKey + return + } + + key = bkey[4 : 4+keyLen] + seq = uint32(binary.BigEndian.Uint32(bkey[4+keyLen:])) + return +} + +func (db *DB) bCapByteSize(seq uint32, off uint32) uint32 { + var offByteSize uint32 = (off >> 3) + 1 + if offByteSize > segByteSize { + offByteSize = segByteSize + } + + return seq<= 0 { + offset += int32((uint32(tailSeq)<> segBitWidth + off &= (segBitSize - 1) + return +} + +func (db *DB) bGetMeta(key []byte) (tailSeq int32, tailOff int32, err error) { + var v []byte + + mk := db.bEncodeMetaKey(key) + v, err = db.db.Get(mk) + if err != nil { + return + } + + if v != nil { + tailSeq = int32(binary.LittleEndian.Uint32(v[0:4])) + tailOff = int32(binary.LittleEndian.Uint32(v[4:8])) + } else { + tailSeq = -1 + tailOff = -1 + } + return +} + +func (db *DB) bSetMeta(t *tx, key []byte, tailSeq uint32, tailOff uint32) { + ek := db.bEncodeMetaKey(key) + + buf := make([]byte, 8) + binary.LittleEndian.PutUint32(buf[0:4], tailSeq) + binary.LittleEndian.PutUint32(buf[4:8], tailOff) + + t.Put(ek, buf) + return +} + +func (db *DB) bUpdateMeta(t *tx, key []byte, seq uint32, off uint32) (tailSeq uint32, tailOff uint32, err error) { + var ts, to int32 + if ts, to, err = db.bGetMeta(key); err != nil { + return + } else { + tailSeq = uint32(MaxInt32(ts, 0)) + tailOff = uint32(MaxInt32(to, 0)) + } + + if seq > tailSeq || (seq == tailSeq && off > tailOff) { + db.bSetMeta(t, key, seq, off) + tailSeq = seq + tailOff = off + } + return +} + +func (db *DB) bDelete(t *tx, key []byte) (drop int64) { + mk := db.bEncodeMetaKey(key) + t.Delete(mk) + + minKey := db.bEncodeBinKey(key, minSeq) + maxKey := db.bEncodeBinKey(key, maxSeq) + it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) + for ; it.Valid(); it.Next() { + t.Delete(it.Key()) + drop++ + } + it.Close() + + return drop +} + +func (db *DB) bGetSegment(key []byte, seq uint32) ([]byte, []byte, error) { + bk := db.bEncodeBinKey(key, seq) + segment, err := db.db.Get(bk) + if err != nil { + return bk, nil, err + } + return bk, segment, nil +} + +func (db *DB) bAllocateSegment(key []byte, seq uint32) ([]byte, []byte, error) { + bk, segment, err := db.bGetSegment(key, seq) + if err == nil && segment == nil { + segment = make([]byte, segByteSize, segByteSize) + } + return bk, segment, err +} + +func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator { + sk := db.bEncodeBinKey(key, minSeq) + ek := db.bEncodeBinKey(key, maxSeq) + return db.db.RangeIterator(sk, ek, leveldb.RangeClose) +} + +func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) { + if a == nil || b == nil { + *res = nil + return + } + + data := *res + if data == nil { + data = make([]byte, segByteSize, segByteSize) + *res = data + } + + for i := uint32(0); i < segByteSize; i++ { + data[i] = a[i] & b[i] + } + return +} + +func (db *DB) bSegOr(a []byte, b []byte, res *[]byte) { + if a == nil || b == nil { + if a == nil && b == nil { + *res = nil + } else if a == nil { + *res = b + } else { + *res = a + } + return + } + + data := *res + if data == nil { + data = make([]byte, segByteSize, segByteSize) + *res = data + } + + for i := uint32(0); i < segByteSize; i++ { + data[i] = a[i] | b[i] + } + return +} + +func (db *DB) bSegXor(a []byte, b []byte, res *[]byte) { + if a == nil && b == nil { + *res = fillSegment + return + } + + if a == nil { + a = emptySegment + } + + if b == nil { + b = emptySegment + } + + data := *res + if data == nil { + data = make([]byte, segByteSize, segByteSize) + *res = data + } + + for i := uint32(0); i < segByteSize; i++ { + data[i] = a[i] ^ b[i] + } + + return +} + +func (db *DB) bExpireAt(key []byte, when int64) (int64, error) { + t := db.binTx + t.Lock() + defer t.Unlock() + + if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 { + return 0, err + } else { + db.expireAt(t, bExpType, key, when) + if err := t.Commit(); err != nil { + return 0, err + } + } + return 1, nil +} + +func (db *DB) BGet(key []byte) (data []byte, err error) { + if err = checkKeySize(key); err != nil { + return + } + + var ts, to int32 + if ts, to, err = db.bGetMeta(key); err != nil || ts < 0 { + return + } + + var tailSeq, tailOff = uint32(ts), uint32(to) + var capByteSize uint32 = db.bCapByteSize(tailSeq, tailOff) + data = make([]byte, capByteSize, capByteSize) + + minKey := db.bEncodeBinKey(key, minSeq) + maxKey := db.bEncodeBinKey(key, tailSeq) + it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) + + var seq, s, e uint32 + for ; it.Valid(); it.Next() { + if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil { + data = nil + break + } + + s = seq << segByteWidth + e = MinUInt32(s+segByteSize, capByteSize) + copy(data[s:e], it.Value()) + } + it.Close() + + return +} + +func (db *DB) BDelete(key []byte) (drop int64, err error) { + if err = checkKeySize(key); err != nil { + return + } + + t := db.binTx + t.Lock() + defer t.Unlock() + + drop = db.bDelete(t, key) + db.rmExpire(t, bExpType, key) + + err = t.Commit() + return +} + +func (db *DB) BSetBit(key []byte, offset int32, val uint8) (ori uint8, err error) { + if err = checkKeySize(key); err != nil { + return + } + + // todo : check offset + var seq, off uint32 + if seq, off, err = db.bParseOffset(key, offset); err != nil { + return 0, err + } + + var bk, segment []byte + if bk, segment, err = db.bAllocateSegment(key, seq); err != nil { + return 0, err + } + + if segment != nil { + ori = getBit(segment, off) + if setBit(segment, off, val) { + t := db.binTx + t.Lock() + + t.Put(bk, segment) + if _, _, e := db.bUpdateMeta(t, key, seq, off); e != nil { + err = e + return + } + + err = t.Commit() + t.Unlock() + } + } + + return +} + +func (db *DB) BMSetBit(key []byte, args ...BitPair) (place int64, err error) { + if err = checkKeySize(key); err != nil { + return + } + + // (ps : so as to aviod wasting memory copy while calling db.Get() and batch.Put(), + // here we sequence the params by pos, so that we can merge the execution of + // diff pos setting which targets on the same segment respectively. ) + + // #1 : sequence request data + var argCnt = len(args) + var bitInfos segBitInfoArray = make(segBitInfoArray, argCnt) + var seq, off uint32 + + for i, info := range args { + if seq, off, err = db.bParseOffset(key, info.Pos); err != nil { + return + } + + bitInfos[i].Seq = seq + bitInfos[i].Off = off + bitInfos[i].Val = info.Val + } + + sort.Sort(bitInfos) + + for i := 1; i < argCnt; i++ { + if bitInfos[i].Seq == bitInfos[i-1].Seq && bitInfos[i].Off == bitInfos[i-1].Off { + return 0, errDuplicatePos + } + } + + // #2 : execute bit set in order + t := db.binTx + t.Lock() + defer t.Unlock() + + var curBinKey, curSeg []byte + var curSeq, maxSeq, maxOff uint32 + + for _, info := range bitInfos { + if curSeg != nil && info.Seq != curSeq { + t.Put(curBinKey, curSeg) + curSeg = nil + } + + if curSeg == nil { + curSeq = info.Seq + if curBinKey, curSeg, err = db.bAllocateSegment(key, info.Seq); err != nil { + return + } + + if curSeg == nil { + continue + } + } + + if setBit(curSeg, info.Off, info.Val) { + maxSeq = info.Seq + maxOff = info.Off + place++ + } + } + + if curSeg != nil { + t.Put(curBinKey, curSeg) + } + + // finally, update meta + if place > 0 { + if _, _, err = db.bUpdateMeta(t, key, maxSeq, maxOff); err != nil { + return + } + + err = t.Commit() + } + + return +} + +func (db *DB) BGetBit(key []byte, offset int32) (uint8, error) { + if seq, off, err := db.bParseOffset(key, offset); err != nil { + return 0, err + } else { + _, segment, err := db.bGetSegment(key, seq) + if err != nil { + return 0, err + } + + if segment == nil { + return 0, nil + } else { + return getBit(segment, off), nil + } + } +} + +// func (db *DB) BGetRange(key []byte, start int32, end int32) ([]byte, error) { +// section := make([]byte) + +// return +// } + +func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) { + var sseq uint32 + if sseq, _, err = db.bParseOffset(key, start); err != nil { + return + } + + var eseq uint32 + if eseq, _, err = db.bParseOffset(key, end); err != nil { + return + } + + var segment []byte + skey := db.bEncodeBinKey(key, sseq) + ekey := db.bEncodeBinKey(key, eseq) + + it := db.db.RangeIterator(skey, ekey, leveldb.RangeClose) + for ; it.Valid(); it.Next() { + segment = it.Value() + for _, bit := range segment { + cnt += bitsInByte[bit] + } + } + it.Close() + + return +} + +func (db *DB) BTail(key []byte) (int32, error) { + // effective length of data, the highest bit-pos set in history + tailSeq, tailOff, err := db.bGetMeta(key) + if err != nil { + return 0, err + } + + tail := int32(-1) + if tailSeq >= 0 { + tail = int32(uint32(tailSeq)< maxDstSeq || (seq == maxDstSeq && off > maxDstOff) { + maxDstSeq = seq + maxDstOff = off + } + } + + if (op == OPnot && validKeyNum != 1) || + (op != OPnot && validKeyNum < 2) { + return // with not enough existing source key + } + + var srcIdx int + for srcIdx = 0; srcIdx < keyNum; srcIdx++ { + if srckeys[srcIdx] != nil { + break + } + } + + // init - data + var segments = make([][]byte, maxDstSeq+1) + + if op == OPnot { + // ps : + // ( ~num == num ^ 0x11111111 ) + // we init the result segments with all bit set, + // then we can calculate through the way of 'xor'. + + // ahead segments bin format : 1111 ... 1111 + for i := uint32(0); i < maxDstSeq; i++ { + segments[i] = fillSegment + } + + // last segment bin format : 1111..1100..0000 + var tailSeg = make([]byte, segByteSize, segByteSize) + var fillByte = fillBits[7] + var tailSegLen = db.bCapByteSize(uint32(0), maxDstOff) + for i := uint32(0); i < tailSegLen-1; i++ { + tailSeg[i] = fillByte + } + tailSeg[tailSegLen-1] = fillBits[maxDstOff-(tailSegLen-1)<<3] + segments[maxDstSeq] = tailSeg + + } else { + // ps : init segments by data corresponding to the 1st valid source key + it := db.bIterator(srckeys[srcIdx]) + for ; it.Valid(); it.Next() { + if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil { + // to do ... + it.Close() + return + } + segments[seq] = it.Value() + } + it.Close() + srcIdx++ + } + + // operation with following keys + var res []byte + for i := srcIdx; i < keyNum; i++ { + if srckeys[i] == nil { + continue + } + + it := db.bIterator(srckeys[i]) + for idx, end := uint32(0), false; !end; it.Next() { + end = !it.Valid() + if !end { + if _, seq, err = db.bDecodeBinKey(it.Key()); err != nil { + // to do ... + it.Close() + return + } + } else { + seq = maxDstSeq + 1 + } + + // todo : + // operation 'and' can be optimize here : + // if seq > max_segments_idx, this loop can be break, + // which can avoid cost from Key() and bDecodeBinKey() + + for ; idx < seq; idx++ { + res = nil + exeOp(segments[idx], nil, &res) + segments[idx] = res + } + + if !end { + res = it.Value() + exeOp(segments[seq], res, &res) + segments[seq] = res + idx++ + } + } + it.Close() + } + + // clear the old data in case + db.bDelete(t, dstkey) + db.rmExpire(t, bExpType, dstkey) + + // set data + db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff) + + var bk []byte + for seq, segt := range segments { + if segt != nil { + bk = db.bEncodeBinKey(dstkey, uint32(seq)) + t.Put(bk, segt) + } + } + + err = t.Commit() + if err == nil { + blen = int32(maxDstSeq< 0 { + bytes++ + } + + return make([]byte, bytes, bytes) +} + +func TestBinary(t *testing.T) { + testSimple(t) + testSimpleII(t) + testOpAndOr(t) + testOpXor(t) + testOpNot(t) + testMSetBit(t) +} + +func testSimple(t *testing.T) { + db := getTestDB() + + key := []byte("test_bin") + + if v, _ := db.BGetBit(key, 100); v != 0 { + t.Error(v) + } + + if ori, _ := db.BSetBit(key, 50, 1); ori != 0 { + t.Error(ori) + } + + if v, _ := db.BGetBit(key, 50); v != 1 { + t.Error(v) + } + + if ori, _ := db.BSetBit(key, 50, 0); ori != 1 { + t.Error(ori) + } + + if v, _ := db.BGetBit(key, 50); v != 0 { + t.Error(v) + } + + db.BSetBit(key, 7, 1) + db.BSetBit(key, 8, 1) + db.BSetBit(key, 9, 1) + db.BSetBit(key, 10, 1) + + if sum, _ := db.BCount(key, 0, -1); sum != 4 { + t.Error(sum) + } + + data, _ := db.BGet(key) + if cmpBytes(data, []byte{0x80, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00}) { + t.Error(data) + } + + if tail, _ := db.BTail(key); tail != int32(50) { + t.Error(tail) + } +} + +func testSimpleII(t *testing.T) { + db := getTestDB() + db.FlushAll() + + key := []byte("test_bin_2") + + pos := int32(1234567) + if ori, _ := db.BSetBit(key, pos, 1); ori != 0 { + t.Error(ori) + } + + if v, _ := db.BGetBit(key, pos); v != 1 { + t.Error(v) + } + + if v, _ := db.BGetBit(key, pos-1); v != 0 { + t.Error(v) + } + + if v, _ := db.BGetBit(key, pos+1); v != 0 { + t.Error(v) + } + + if tail, _ := db.BTail(key); tail != pos { + t.Error(tail) + } + + data, _ := db.BGet(key) + stdData := newBytes(pos + 1) + stdData[pos/8] = uint8(1 << (uint(pos) % 8)) + + if cmpBytes(data, stdData) { + t.Error(len(data)) + } + + if drop, _ := db.BDelete(key); drop != 1 { + t.Error(false) + } + + if data, _ := db.BGet(key); data != nil { + t.Error(data) + } +} + +func testOpAndOr(t *testing.T) { + db := getTestDB() + db.FlushAll() + + dstKey := []byte("test_bin_op_and_or") + + k0 := []byte("op_0") + k1 := []byte("op_01") + k2 := []byte("op_10") + k3 := []byte("op_11") + srcKeys := [][]byte{k2, k0, k1, k3} + + /* + + - ... + 0 - [10000000] ... [00000001] + 1 - nil + 2 - [00000000] ... [11111111] ... [00000000] + 3 - [01010101] ... [10000001] [10101010] + 4 - [10000000] ... [00000000] + 5 - [00000000] ... [00000011] [00000001] + ... + */ + // (k0 - seg:0) + db.BSetBit(k0, int32(0), 1) + db.BSetBit(k0, int32(segBitSize-1), 1) + // (k0 - seg:2) + pos := segBitSize*2 + segBitSize/2 + for i := uint32(0); i < 8; i++ { + db.BSetBit(k0, int32(pos+i), 1) + } + // (k0 - seg:3) + pos = segBitSize * 3 + db.BSetBit(k0, int32(pos+8), 1) + db.BSetBit(k0, int32(pos+15), 1) + for i := uint32(1); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + pos = segBitSize*4 - 8 + for i := uint32(0); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + // (k0 - seg:4) + db.BSetBit(k0, int32(segBitSize*5-1), 1) + // (k0 - seg:5) + db.BSetBit(k0, int32(segBitSize*5), 1) + db.BSetBit(k0, int32(segBitSize*5+8), 1) + db.BSetBit(k0, int32(segBitSize*5+9), 1) + + /* + + 0 - nil + 1 - [00000001] ... [10000000] + 2 - nil + 3 - [10101010] ... [10000001] [01010101] + ... + */ + // (k1 - seg:1) + db.BSetBit(k1, int32(segBitSize+7), 1) + db.BSetBit(k1, int32(segBitSize*2-8), 1) + // (k1 - seg:3) + pos = segBitSize * 3 + db.BSetBit(k1, int32(pos+8), 1) + db.BSetBit(k1, int32(pos+15), 1) + for i := uint32(0); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + pos = segBitSize*4 - 8 + for i := uint32(1); i < 8; i += 2 { + db.BSetBit(k0, int32(pos+i), 1) + } + + var stdData []byte + var data []byte + var tmpKeys [][]byte + + // op - or + db.BOperation(OPor, dstKey, srcKeys...) + + stdData = make([]byte, 5*segByteSize+2) + stdData[0] = uint8(0x01) + stdData[segByteSize-1] = uint8(0x80) + stdData[segByteSize] = uint8(0x80) + stdData[segByteSize*2-1] = uint8(0x01) + stdData[segByteSize*2+segByteSize/2] = uint8(0xff) + stdData[segByteSize*3] = uint8(0xff) + stdData[segByteSize*3+1] = uint8(0x81) + stdData[segByteSize*4-1] = uint8(0xff) + stdData[segByteSize*5-1] = uint8(0x80) + stdData[segByteSize*5] = uint8(0x01) + stdData[segByteSize*5+1] = uint8(0x03) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + + tmpKeys = [][]byte{k0, dstKey, k1} + db.BOperation(OPor, dstKey, tmpKeys...) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + + // op - and + db.BOperation(OPand, dstKey, srcKeys...) + + stdData = make([]byte, 5*segByteSize+2) + stdData[segByteSize*3+1] = uint8(0x81) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + + tmpKeys = [][]byte{k0, dstKey, k1} + db.BOperation(OPand, dstKey, tmpKeys...) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } + +} + +func testOpXor(t *testing.T) { + db := getTestDB() + db.FlushAll() + + dstKey := []byte("test_bin_op_xor") + + k0 := []byte("op_xor_00") + k1 := []byte("op_xor_01") + srcKeys := [][]byte{k0, k1} + + reqs := make([]BitPair, 4) + reqs[0] = BitPair{0, 1} + reqs[1] = BitPair{7, 1} + reqs[2] = BitPair{int32(segBitSize - 1), 1} + reqs[3] = BitPair{int32(segBitSize - 8), 1} + db.BMSetBit(k0, reqs...) + + reqs = make([]BitPair, 2) + reqs[0] = BitPair{7, 1} + reqs[1] = BitPair{int32(segBitSize - 8), 1} + db.BMSetBit(k1, reqs...) + + var stdData []byte + var data []byte + + // op - xor + db.BOperation(OPxor, dstKey, srcKeys...) + + stdData = make([]byte, segByteSize) + stdData[0] = uint8(0x01) + stdData[segByteSize-1] = uint8(0x80) + + data, _ = db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } +} + +func testOpNot(t *testing.T) { + db := getTestDB() + db.FlushAll() + + // intputs + dstKey := []byte("test_bin_op_not") + + k0 := []byte("op_not_0") + srcKeys := [][]byte{k0} + + db.BSetBit(k0, int32(0), 1) + db.BSetBit(k0, int32(7), 1) + + pos := segBitSize + for i := uint32(8); i >= 1; i -= 2 { + db.BSetBit(k0, int32(pos-i), 1) + } + + db.BSetBit(k0, int32(3*segBitSize-10), 1) + + // std + stdData := make([]byte, segByteSize*3-1) + for i, _ := range stdData { + stdData[i] = 255 + } + stdData[0] = uint8(0x7e) + stdData[segByteSize-1] = uint8(0xaa) + stdData[segByteSize*3-2] = uint8(0x3f) + + // op - not + db.BOperation(OPnot, dstKey, srcKeys...) + + data, _ := db.BGet(dstKey) + if cmpBytes(data, stdData) { + t.Fatal(false) + } +} + +func testMSetBit(t *testing.T) { + db := getTestDB() + db.FlushAll() + + key := []byte("test_mset") + + var datas = make([]BitPair, 8) + + // 1st + datas[0] = BitPair{1000, 1} + datas[1] = BitPair{11, 1} + datas[2] = BitPair{10, 1} + datas[3] = BitPair{2, 1} + datas[4] = BitPair{int32(segBitSize - 1), 1} + datas[5] = BitPair{int32(segBitSize), 1} + datas[6] = BitPair{int32(segBitSize + 1), 1} + datas[7] = BitPair{int32(segBitSize) + 10, 0} + + db.BMSetBit(key, datas...) + + if sum, _ := db.BCount(key, 0, -1); sum != 7 { + t.Error(sum) + } + + if tail, _ := db.BTail(key); tail != int32(segBitSize+10) { + t.Error(tail) + } + + // 2nd + datas = make([]BitPair, 5) + + datas[0] = BitPair{1000, 0} + datas[1] = BitPair{int32(segBitSize + 1), 0} + datas[2] = BitPair{int32(segBitSize * 10), 1} + datas[3] = BitPair{10, 0} + datas[4] = BitPair{99, 0} + + db.BMSetBit(key, datas...) + + if sum, _ := db.BCount(key, 0, -1); sum != 7-3+1 { + t.Error(sum) + } + + if tail, _ := db.BTail(key); tail != int32(segBitSize*10) { + t.Error(tail) + } + + return +} diff --git a/ledis/t_hash.go b/ledis/t_hash.go index c3f6f42..68e9bec 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -453,10 +453,6 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) { } func (db *DB) hFlush() (drop int64, err error) { - t := db.kvTx - t.Lock() - defer t.Unlock() - minKey := make([]byte, 2) minKey[0] = db.index minKey[1] = hashType @@ -465,19 +461,12 @@ func (db *DB) hFlush() (drop int64, err error) { maxKey[0] = db.index maxKey[1] = hSizeType + 1 - it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) - for ; it.Valid(); it.Next() { - t.Delete(it.Key()) - drop++ - if drop&1023 == 0 { - if err = t.Commit(); err != nil { - return - } - } - } - it.Close() + t := db.kvTx + t.Lock() + defer t.Unlock() - db.expFlush(t, hExpType) + drop, err = db.flushRegion(t, minKey, maxKey) + err = db.expFlush(t, hExpType) err = t.Commit() return diff --git a/ledis/t_kv.go b/ledis/t_kv.go index 7497cc2..30850e8 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -309,28 +309,17 @@ func (db *DB) SetNX(key []byte, value []byte) (int64, error) { } func (db *DB) flush() (drop int64, err error) { + minKey := db.encodeKVMinKey() + maxKey := db.encodeKVMaxKey() + t := db.kvTx t.Lock() defer t.Unlock() - minKey := db.encodeKVMinKey() - maxKey := db.encodeKVMaxKey() - - it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) - for ; it.Valid(); it.Next() { - t.Delete(it.Key()) - drop++ - - if drop&1023 == 0 { - if err = t.Commit(); err != nil { - return - } - } - } - it.Close() + drop, err = db.flushRegion(t, minKey, maxKey) + err = db.expFlush(t, kvExpType) err = t.Commit() - err = db.expFlush(t, kvExpType) return } diff --git a/ledis/t_list.go b/ledis/t_list.go index 60c4bc5..34dea19 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -410,10 +410,6 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) { } func (db *DB) lFlush() (drop int64, err error) { - t := db.listTx - t.Lock() - defer t.Unlock() - minKey := make([]byte, 2) minKey[0] = db.index minKey[1] = listType @@ -422,19 +418,12 @@ func (db *DB) lFlush() (drop int64, err error) { maxKey[0] = db.index maxKey[1] = lMetaType + 1 - it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) - for ; it.Valid(); it.Next() { - t.Delete(it.Key()) - drop++ - if drop&1023 == 0 { - if err = t.Commit(); err != nil { - return - } - } - } - it.Close() + t := db.listTx + t.Lock() + defer t.Unlock() - db.expFlush(t, lExpType) + drop, err = db.flushRegion(t, minKey, maxKey) + err = db.expFlush(t, lExpType) err = t.Commit() return diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index d1cd4f6..1750095 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -11,7 +11,8 @@ var mapExpMetaType = map[byte]byte{ kvExpType: kvExpMetaType, lExpType: lExpMetaType, hExpType: hExpMetaType, - zExpType: zExpMetaType} + zExpType: zExpMetaType, + bExpType: bExpMetaType} type retireCallback func(*tx, []byte) int64 @@ -112,8 +113,6 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) { return errExpType } - drop := 0 - minKey := make([]byte, 2) minKey[0] = db.index minKey[1] = expType @@ -122,18 +121,7 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) { maxKey[0] = db.index maxKey[1] = expMetaType + 1 - it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) - for ; it.Valid(); it.Next() { - t.Delete(it.Key()) - drop++ - if drop&1023 == 0 { - if err = t.Commit(); err != nil { - return - } - } - } - it.Close() - + _, err = db.flushRegion(t, minKey, maxKey) err = t.Commit() return } @@ -151,6 +139,9 @@ func newEliminator(db *DB) *elimination { } func (eli *elimination) regRetireContext(expType byte, t *tx, onRetire retireCallback) { + + // todo .. need to ensure exist - mapExpMetaType[expType] + eli.exp2Tx[expType] = t eli.exp2Retire[expType] = onRetire } diff --git a/ledis/util.go b/ledis/util.go index 5948df7..d5f24d1 100644 --- a/ledis/util.go +++ b/ledis/util.go @@ -62,6 +62,28 @@ func StrInt64(v []byte, err error) (int64, error) { } } +func StrInt32(v []byte, err error) (int32, error) { + if err != nil { + return 0, err + } else if v == nil { + return 0, nil + } else { + res, err := strconv.ParseInt(String(v), 10, 32) + return int32(res), err + } +} + +func StrInt8(v []byte, err error) (int8, error) { + if err != nil { + return 0, err + } else if v == nil { + return 0, nil + } else { + res, err := strconv.ParseInt(String(v), 10, 32) + return int8(res), err + } +} + func StrPutInt64(v int64) []byte { return strconv.AppendInt(nil, v, 10) } diff --git a/server/cmd_bin.go b/server/cmd_bin.go new file mode 100644 index 0000000..cebb8dc --- /dev/null +++ b/server/cmd_bin.go @@ -0,0 +1,273 @@ +package server + +import ( + "github.com/siddontang/ledisdb/ledis" + "strings" +) + +func bgetCommand(c *client) error { + args := c.args + if len(args) != 1 { + return ErrCmdParams + } + + if v, err := c.db.BGet(args[0]); err != nil { + return err + } else { + c.writeBulk(v) + } + return nil +} + +func bdeleteCommand(c *client) error { + args := c.args + if len(args) != 1 { + return ErrCmdParams + } + + if n, err := c.db.BDelete(args[0]); err != nil { + return err + } else { + c.writeInteger(n) + } + return nil +} + +func bsetbitCommand(c *client) error { + args := c.args + if len(args) != 3 { + return ErrCmdParams + } + + var err error + var offset int32 + var val int8 + + offset, err = ledis.StrInt32(args[1], nil) + if err != nil { + return err + } + + val, err = ledis.StrInt8(args[2], nil) + if err != nil { + return err + } + + if ori, err := c.db.BSetBit(args[0], offset, uint8(val)); err != nil { + return err + } else { + c.writeInteger(int64(ori)) + } + return nil +} + +func bgetbitCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + offset, err := ledis.StrInt32(args[1], nil) + if err != nil { + return err + } + + if v, err := c.db.BGetBit(args[0], offset); err != nil { + return err + } else { + c.writeInteger(int64(v)) + } + return nil +} + +func bmsetbitCommand(c *client) error { + args := c.args + if len(args) < 3 { + return ErrCmdParams + } + + key := args[0] + if len(args[1:])&1 != 0 { + return ErrCmdParams + } else { + args = args[1:] + } + + var err error + var offset int32 + var val int8 + + pairs := make([]ledis.BitPair, len(args)>>1) + for i := 0; i < len(pairs); i++ { + offset, err = ledis.StrInt32(args[i<<1], nil) + if err != nil { + return err + } + + val, err = ledis.StrInt8(args[i<<1+1], nil) + if err != nil { + return err + } + + pairs[i].Pos = offset + pairs[i].Val = uint8(val) + } + + if place, err := c.db.BMSetBit(key, pairs...); err != nil { + return err + } else { + c.writeInteger(place) + } + return nil +} + +func bcountCommand(c *client) error { + args := c.args + argCnt := len(args) + + if !(argCnt > 0 && argCnt <= 3) { + return ErrCmdParams + } + + // BCount(key []byte, start int32, end int32) (cnt int32, err error) { + + var err error + var start, end int32 = 0, -1 + + if argCnt > 1 { + start, err = ledis.StrInt32(args[1], nil) + if err != nil { + return err + } + } + + if argCnt > 2 { + end, err = ledis.StrInt32(args[2], nil) + if err != nil { + return err + } + } + + if cnt, err := c.db.BCount(args[0], start, end); err != nil { + return err + } else { + c.writeInteger(int64(cnt)) + } + return nil +} + +func boptCommand(c *client) error { + args := c.args + if len(args) < 2 { + return ErrCmdParams + } + + opDesc := strings.ToLower(ledis.String(args[0])) + dstKey := args[1] + srcKeys := args[2:] + + var op uint8 + switch opDesc { + case "and": + op = ledis.OPand + case "or": + op = ledis.OPor + case "xor": + op = ledis.OPxor + case "not": + op = ledis.OPnot + default: + return ErrCmdParams + } + + if blen, err := c.db.BOperation(op, dstKey, srcKeys...); err != nil { + return err + } else { + c.writeInteger(int64(blen)) + } + return nil +} + +func bexpireCommand(c *client) error { + args := c.args + if len(args) == 0 { + return ErrCmdParams + } + + duration, err := ledis.StrInt64(args[1], nil) + if err != nil { + return err + } + + if v, err := c.db.BExpire(args[0], duration); err != nil { + return err + } else { + c.writeInteger(v) + } + + return nil +} + +func bexpireatCommand(c *client) error { + args := c.args + if len(args) == 0 { + return ErrCmdParams + } + + when, err := ledis.StrInt64(args[1], nil) + if err != nil { + return err + } + + if v, err := c.db.BExpireAt(args[0], when); err != nil { + return err + } else { + c.writeInteger(v) + } + + return nil +} + +func bttlCommand(c *client) error { + args := c.args + if len(args) == 0 { + return ErrCmdParams + } + + if v, err := c.db.BTTL(args[0]); err != nil { + return err + } else { + c.writeInteger(v) + } + + return nil +} + +func bpersistCommand(c *client) error { + args := c.args + if len(args) != 1 { + return ErrCmdParams + } + + if n, err := c.db.BPersist(args[0]); err != nil { + return err + } else { + c.writeInteger(n) + } + + return nil +} + +func init() { + register("bget", bgetCommand) + register("bdelete", bdeleteCommand) + register("bsetbit", bsetbitCommand) + register("bgetbit", bgetbitCommand) + register("bmsetbit", bmsetbitCommand) + register("bcount", bcountCommand) + register("bopt", boptCommand) + register("bexpire", bexpireCommand) + register("bexpireat", bexpireatCommand) + register("bttl", bttlCommand) + register("bpersist", bpersistCommand) +} diff --git a/server/cmd_bin_test.go b/server/cmd_bin_test.go new file mode 100644 index 0000000..e49b84c --- /dev/null +++ b/server/cmd_bin_test.go @@ -0,0 +1,223 @@ +package server + +import ( + "github.com/siddontang/ledisdb/client/go/ledis" + "testing" +) + +func TestBin(t *testing.T) { + testBinGetSet(t) + testBinMset(t) + testBinCount(t) + testBinOpt(t) +} + +func testBinGetSet(t *testing.T) { + c := getTestConn() + defer c.Close() + + key := []byte("test_cmd_bin_basic") + + // get / set + if v, err := ledis.Int(c.Do("bgetbit", key, 1024)); err != nil { + t.Fatal(err) + } else if v != 0 { + t.Fatal(v) + } + + if ori, err := ledis.Int(c.Do("bsetbit", key, 1024, 1)); err != nil { + t.Fatal(err) + } else if ori != 0 { + t.Fatal(ori) + } + + if v, err := ledis.Int(c.Do("bgetbit", key, 1024)); err != nil { + t.Fatal(err) + } else if v != 1 { + t.Fatal(v) + } + + // fetch from revert pos + c.Do("bsetbit", key, 1000, 1) + + if v, err := ledis.Int(c.Do("bgetbit", key, -1)); err != nil { + t.Fatal(err) + } else if v != 1 { + t.Fatal(v) + } + + if v, err := ledis.Int(c.Do("bgetbit", key, -25)); err != nil { + t.Fatal(err) + } else if v != 1 { + t.Fatal(v) + } + + // delete + if drop, err := ledis.Int(c.Do("bdelete", key)); err != nil { + t.Fatal(err) + } else if drop != 1 { + t.Fatal(drop) + } + + if drop, err := ledis.Int(c.Do("bdelete", key)); err != nil { + t.Fatal(err) + } else if drop != 0 { + t.Fatal(drop) + } +} + +func testBinMset(t *testing.T) { + c := getTestConn() + defer c.Close() + + key := []byte("test_cmd_bin_mset") + + if n, err := ledis.Int( + c.Do("bmsetbit", key, + 500, 0, + 100, 1, + 200, 1, + 1000, 1, + 900, 0, + 500000, 1, + 600, 0, + 300, 1, + 100000, 1)); err != nil { + t.Fatal(err) + } else if n != 9 { + t.Fatal(n) + } + + fillPos := []int{100, 200, 300, 1000, 100000, 500000} + for _, pos := range fillPos { + v, err := ledis.Int(c.Do("bgetbit", key, pos)) + if err != nil || v != 1 { + t.Fatal(pos) + } + } + + // err + if n, err := ledis.Int( + c.Do("bmsetbit", key, 3, 0, 2, 1, 3, 0, 1, 1)); err == nil || n != 0 { + t.Fatal(n) // duplication on pos + } +} + +func testBinCount(t *testing.T) { + c := getTestConn() + defer c.Close() + + key := []byte("test_cmd_bin_count") + sum := 0 + for pos := 1; pos < 1000000; pos += 10001 { + c.Do("bsetbit", key, pos, 1) + sum++ + } + + if n, err := ledis.Int(c.Do("bcount", key)); err != nil { + t.Fatal(err) + } else if n != sum { + t.Fatal(n) + } +} + +func testBinOpt(t *testing.T) { + c := getTestConn() + defer c.Close() + + dstk := []byte("bin_op_res") + kmiss := []byte("bin_op_miss") + + k0 := []byte("bin_op_0") + k1 := []byte("bin_op_1") + c.Do("bmsetbit", k0, 10, 1, 30, 1, 50, 1, 70, 1, 100, 1) + c.Do("bmsetbit", k1, 20, 1, 40, 1, 60, 1, 80, 1, 100, 1) + + // case - lack of args + // todo ... + + // case - 'not' on inexisting key + if blen, err := ledis.Int( + c.Do("bopt", "not", dstk, kmiss)); err != nil { + t.Fatal(err) + } else if blen != 0 { + t.Fatal(blen) + } + + if v, _ := ledis.String(c.Do("bget", dstk)); v != "" { + t.Fatal(v) + } + + // case - 'and', 'or', 'xor' with inexisting key + opts := []string{"and", "or", "xor"} + for _, op := range opts { + if blen, err := ledis.Int( + c.Do("bopt", op, dstk, kmiss, k0)); err != nil { + t.Fatal(err) + } else if blen != 0 { + t.Fatal(blen) + } + } + + // case - 'and' + if blen, err := ledis.Int( + c.Do("bopt", "and", dstk, k0, k1)); err != nil { + t.Fatal(err) + } else if blen != 100 { + t.Fatal(blen) + } + + if v, _ := ledis.Int(c.Do("bgetbit", dstk, 100)); v != 1 { + t.Fatal(v) + } + + if v, _ := ledis.Int(c.Do("bgetbit", dstk, 20)); v != 0 { + t.Fatal(v) + } + + if v, _ := ledis.Int(c.Do("bgetbit", dstk, 40)); v != 0 { + t.Fatal(v) + } + + // case - 'or' + if blen, err := ledis.Int( + c.Do("bopt", "or", dstk, k0, k1)); err != nil { + t.Fatal(err) + } else if blen != 100 { + t.Fatal(blen) + } + + if v, _ := ledis.Int(c.Do("bgetbit", dstk, 100)); v != 1 { + t.Fatal(v) + } + + if v, _ := ledis.Int(c.Do("bgetbit", dstk, 20)); v != 1 { + t.Fatal(v) + } + + if v, _ := ledis.Int(c.Do("bgetbit", dstk, 40)); v != 1 { + t.Fatal(v) + } + + // case - 'xor' + if blen, err := ledis.Int( + c.Do("bopt", "xor", dstk, k0, k1)); err != nil { + t.Fatal(err) + } else if blen != 100 { + t.Fatal(blen) + } + + if v, _ := ledis.Int(c.Do("bgetbit", dstk, 100)); v != 0 { + t.Fatal(v) + } + + if v, _ := ledis.Int(c.Do("bgetbit", dstk, 20)); v != 1 { + t.Fatal(v) + } + + if v, _ := ledis.Int(c.Do("bgetbit", dstk, 40)); v != 1 { + t.Fatal(v) + } + + return +}