add hscan and zscan

This commit is contained in:
siddontang 2014-05-25 14:01:35 +08:00
parent 8038a54cdb
commit 39c88b956c
2 changed files with 97 additions and 4 deletions

View File

@ -419,3 +419,39 @@ func (db *DB) HFlush() (drop int64, err error) {
err = t.Commit()
return
}
func (db *DB) HScan(key []byte, field []byte, count int, inclusive bool) ([]FVPair, error) {
var minKey []byte
if field != nil {
if err := checkHashKFSize(key, field); err != nil {
return nil, err
}
minKey = db.hEncodeHashKey(key, field)
} else {
minKey = db.hEncodeStartKey(key)
}
maxKey := db.hEncodeStopKey(key)
if count <= 0 {
count = defaultScanCount
}
v := make([]FVPair, 0, 2*count)
rangeType := leveldb.RangeROpen
if !inclusive {
rangeType = leveldb.RangeOpen
}
it := db.db.Iterator(minKey, maxKey, rangeType, 0, count)
for ; it.Valid(); it.Next() {
if _, f, err := db.hDecodeHashKey(it.Key()); err != nil {
continue
} else {
v = append(v, FVPair{Field: f, Value: it.Value()})
}
}
return v, nil
}

View File

@ -58,7 +58,7 @@ func (db *DB) zDecodeSizeKey(ek []byte) ([]byte, error) {
}
func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte {
buf := make([]byte, len(key)+len(member)+4)
buf := make([]byte, len(key)+len(member)+5)
pos := 0
buf[pos] = db.index
@ -73,26 +73,45 @@ func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte {
copy(buf[pos:], key)
pos += len(key)
buf[pos] = zsetStartMemSep
pos++
copy(buf[pos:], member)
return buf
}
func (db *DB) zDecodeSetKey(ek []byte) ([]byte, []byte, error) {
if len(ek) < 4 || ek[0] != db.index || ek[1] != zsetType {
if len(ek) < 5 || ek[0] != db.index || ek[1] != zsetType {
return nil, nil, errZSetKey
}
keyLen := int(binary.BigEndian.Uint16(ek[2:]))
if keyLen+4 > len(ek) {
if keyLen+5 > len(ek) {
return nil, nil, errZSetKey
}
key := ek[4 : 4+keyLen]
member := ek[4+keyLen:]
if ek[4+keyLen] != zsetStartMemSep {
return nil, nil, errZSetKey
}
member := ek[5+keyLen:]
return key, member, nil
}
func (db *DB) zEncodeStartSetKey(key []byte) []byte {
k := db.zEncodeSetKey(key, nil)
return k
}
func (db *DB) zEncodeStopSetKey(key []byte) []byte {
k := db.zEncodeSetKey(key, nil)
k[len(k)-1] = zsetStartMemSep + 1
return k
}
func (db *DB) zEncodeScoreKey(key []byte, member []byte, score int64) []byte {
buf := make([]byte, len(key)+len(member)+14)
@ -684,3 +703,41 @@ func (db *DB) ZFlush() (drop int64, err error) {
// to do : binlog
return
}
func (db *DB) ZScan(key []byte, member []byte, count int, inclusive bool) ([]ScorePair, error) {
var minKey []byte
if member != nil {
if err := checkZSetKMSize(key, member); err != nil {
return nil, err
}
minKey = db.zEncodeSetKey(key, member)
} else {
minKey = db.zEncodeStartSetKey(key)
}
maxKey := db.zEncodeStopSetKey(key)
if count <= 0 {
count = defaultScanCount
}
v := make([]ScorePair, 0, 2*count)
rangeType := leveldb.RangeROpen
if !inclusive {
rangeType = leveldb.RangeOpen
}
it := db.db.Iterator(minKey, maxKey, rangeType, 0, count)
for ; it.Valid(); it.Next() {
if _, m, err := db.zDecodeSetKey(it.Key()); err != nil {
continue
} else {
score, _ := Int64(it.Value(), nil)
v = append(v, ScorePair{Member: m, Score: score})
}
}
return v, nil
}