forked from mirror/ledisdb
use iterator to read many consistently in api
for a read api, it not uses lock, so multi read in a api may cause some unpredictable when something writes at same time.
This commit is contained in:
parent
612367d472
commit
cdb6b1cdd2
|
@ -84,8 +84,12 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) {
|
|||
var size int32
|
||||
var err error
|
||||
|
||||
t := db.listTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
metaKey := db.lEncodeMetaKey(key)
|
||||
headSeq, tailSeq, size, err = db.lGetMeta(metaKey)
|
||||
headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -102,10 +106,6 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) {
|
|||
delta = 1
|
||||
}
|
||||
|
||||
t := db.listTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
// append elements
|
||||
if size > 0 {
|
||||
seq += delta
|
||||
|
@ -148,7 +148,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
|
|||
var err error
|
||||
|
||||
metaKey := db.lEncodeMetaKey(key)
|
||||
headSeq, tailSeq, _, err = db.lGetMeta(metaKey)
|
||||
headSeq, tailSeq, _, err = db.lGetMeta(nil, metaKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -191,7 +191,10 @@ func (db *DB) lDelete(t *tx, key []byte) int64 {
|
|||
var tailSeq int32
|
||||
var err error
|
||||
|
||||
headSeq, tailSeq, _, err = db.lGetMeta(mk)
|
||||
it := db.db.NewIterator()
|
||||
defer it.Close()
|
||||
|
||||
headSeq, tailSeq, _, err = db.lGetMeta(it, mk)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -200,27 +203,24 @@ func (db *DB) lDelete(t *tx, key []byte) int64 {
|
|||
startKey := db.lEncodeListKey(key, headSeq)
|
||||
stopKey := db.lEncodeListKey(key, tailSeq)
|
||||
|
||||
it := db.db.RangeLimitIterator(startKey, stopKey, leveldb.RangeClose, 0, -1)
|
||||
for ; it.Valid(); it.Next() {
|
||||
t.Delete(it.Key())
|
||||
rit := leveldb.NewRangeIterator(it, &leveldb.Range{startKey, stopKey, leveldb.RangeClose})
|
||||
for ; rit.Valid(); rit.Next() {
|
||||
t.Delete(rit.RawKey())
|
||||
num++
|
||||
}
|
||||
it.Close()
|
||||
|
||||
t.Delete(mk)
|
||||
|
||||
return num
|
||||
}
|
||||
|
||||
func (db *DB) lGetSeq(key []byte, whereSeq int32) (int64, error) {
|
||||
ek := db.lEncodeListKey(key, whereSeq)
|
||||
|
||||
return Int64(db.db.Get(ek))
|
||||
}
|
||||
|
||||
func (db *DB) lGetMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
|
||||
func (db *DB) lGetMeta(it *leveldb.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
|
||||
var v []byte
|
||||
v, err = db.db.Get(ek)
|
||||
if it != nil {
|
||||
v = it.Find(ek)
|
||||
} else {
|
||||
v, err = db.db.Get(ek)
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
} else if v == nil {
|
||||
|
@ -284,7 +284,10 @@ func (db *DB) LIndex(key []byte, index int32) ([]byte, error) {
|
|||
|
||||
metaKey := db.lEncodeMetaKey(key)
|
||||
|
||||
headSeq, tailSeq, _, err = db.lGetMeta(metaKey)
|
||||
it := db.db.NewIterator()
|
||||
defer it.Close()
|
||||
|
||||
headSeq, tailSeq, _, err = db.lGetMeta(it, metaKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -296,7 +299,9 @@ func (db *DB) LIndex(key []byte, index int32) ([]byte, error) {
|
|||
}
|
||||
|
||||
sk := db.lEncodeListKey(key, seq)
|
||||
return db.db.Get(sk)
|
||||
v := it.Find(sk)
|
||||
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func (db *DB) LLen(key []byte) (int64, error) {
|
||||
|
@ -305,7 +310,7 @@ func (db *DB) LLen(key []byte) (int64, error) {
|
|||
}
|
||||
|
||||
ek := db.lEncodeMetaKey(key)
|
||||
_, _, size, err := db.lGetMeta(ek)
|
||||
_, _, size, err := db.lGetMeta(nil, ek)
|
||||
return int64(size), err
|
||||
}
|
||||
|
||||
|
@ -328,7 +333,10 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) {
|
|||
|
||||
metaKey := db.lEncodeMetaKey(key)
|
||||
|
||||
if headSeq, _, llen, err = db.lGetMeta(metaKey); err != nil {
|
||||
it := db.db.NewIterator()
|
||||
defer it.Close()
|
||||
|
||||
if headSeq, _, llen, err = db.lGetMeta(it, metaKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -356,12 +364,18 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) {
|
|||
v := make([][]byte, 0, limit)
|
||||
|
||||
startKey := db.lEncodeListKey(key, headSeq)
|
||||
it := db.db.RangeLimitIterator(startKey, nil, leveldb.RangeClose, 0, int(limit))
|
||||
for ; it.Valid(); it.Next() {
|
||||
v = append(v, it.Value())
|
||||
}
|
||||
rit := leveldb.NewRangeLimitIterator(it,
|
||||
&leveldb.Range{
|
||||
Min: startKey,
|
||||
Max: nil,
|
||||
Type: leveldb.RangeClose},
|
||||
&leveldb.Limit{
|
||||
Offset: 0,
|
||||
Count: int(limit)})
|
||||
|
||||
it.Close()
|
||||
for ; rit.Valid(); rit.Next() {
|
||||
v = append(v, rit.Value())
|
||||
}
|
||||
|
||||
return v, nil
|
||||
}
|
||||
|
|
|
@ -451,37 +451,37 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
|
|||
|
||||
k := db.zEncodeSetKey(key, member)
|
||||
|
||||
if v, err := db.db.Get(k); err != nil {
|
||||
return 0, err
|
||||
} else if v == nil {
|
||||
it := db.db.NewIterator()
|
||||
defer it.Close()
|
||||
|
||||
if v := it.Find(k); v == nil {
|
||||
return -1, nil
|
||||
} else {
|
||||
if s, err := Int64(v, err); err != nil {
|
||||
if s, err := Int64(v, nil); err != nil {
|
||||
return 0, err
|
||||
} else {
|
||||
var it *leveldb.RangeLimitIterator
|
||||
var rit *leveldb.RangeLimitIterator
|
||||
|
||||
sk := db.zEncodeScoreKey(key, member, s)
|
||||
|
||||
if !reverse {
|
||||
minKey := db.zEncodeStartScoreKey(key, MinScore)
|
||||
it = db.db.RangeLimitIterator(minKey, sk, leveldb.RangeClose, 0, -1)
|
||||
|
||||
rit = leveldb.NewRangeIterator(it, &leveldb.Range{minKey, sk, leveldb.RangeClose})
|
||||
} else {
|
||||
maxKey := db.zEncodeStopScoreKey(key, MaxScore)
|
||||
it = db.db.RevRangeLimitIterator(sk, maxKey, leveldb.RangeClose, 0, -1)
|
||||
rit = leveldb.NewRevRangeIterator(it, &leveldb.Range{sk, maxKey, leveldb.RangeClose})
|
||||
}
|
||||
|
||||
var lastKey []byte = nil
|
||||
var n int64 = 0
|
||||
|
||||
for ; it.Valid(); it.Next() {
|
||||
for ; rit.Valid(); rit.Next() {
|
||||
n++
|
||||
|
||||
lastKey = it.BufKey(lastKey)
|
||||
lastKey = rit.BufKey(lastKey)
|
||||
}
|
||||
|
||||
it.Close()
|
||||
|
||||
if _, m, _, err := db.zDecodeScoreKey(lastKey); err == nil && bytes.Equal(m, member) {
|
||||
n--
|
||||
return n, nil
|
||||
|
@ -741,8 +741,10 @@ func (db *DB) zFlush() (drop int64, err error) {
|
|||
maxKey[1] = zScoreType + 1
|
||||
|
||||
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
|
||||
defer it.Close()
|
||||
|
||||
for ; it.Valid(); it.Next() {
|
||||
t.Delete(it.Key())
|
||||
t.Delete(it.RawKey())
|
||||
drop++
|
||||
if drop&1023 == 0 {
|
||||
if err = t.Commit(); err != nil {
|
||||
|
@ -750,7 +752,6 @@ func (db *DB) zFlush() (drop int64, err error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
it.Close()
|
||||
|
||||
db.expFlush(t, zsetType)
|
||||
|
||||
|
|
|
@ -125,8 +125,10 @@ func (it *Iterator) BufValue(b []byte) []byte {
|
|||
}
|
||||
|
||||
func (it *Iterator) Close() {
|
||||
C.leveldb_iter_destroy(it.it)
|
||||
it.it = nil
|
||||
if it.it != nil {
|
||||
C.leveldb_iter_destroy(it.it)
|
||||
it.it = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (it *Iterator) Valid() bool {
|
||||
|
@ -274,6 +276,14 @@ func NewRevRangeLimitIterator(i *Iterator, r *Range, l *Limit) *RangeLimitIterat
|
|||
return rangeLimitIterator(i, r, l, IteratorBackward)
|
||||
}
|
||||
|
||||
func NewRangeIterator(i *Iterator, r *Range) *RangeLimitIterator {
|
||||
return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorForward)
|
||||
}
|
||||
|
||||
func NewRevRangeIterator(i *Iterator, r *Range) *RangeLimitIterator {
|
||||
return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorBackward)
|
||||
}
|
||||
|
||||
func rangeLimitIterator(i *Iterator, r *Range, l *Limit, direction uint8) *RangeLimitIterator {
|
||||
it := new(RangeLimitIterator)
|
||||
|
||||
|
|
Loading…
Reference in New Issue