From cdb6b1cdd2f030156f8293eb882ea146ef4dcc2f Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 10 Jul 2014 11:32:37 +0800 Subject: [PATCH] use iterator to read many consistently in api for a read api, it not uses lock, so multi read in a api may cause some unpredictable when something writes at same time. --- ledis/t_list.go | 70 +++++++++++++++++++++++++++------------------ ledis/t_zset.go | 27 ++++++++--------- leveldb/iterator.go | 14 +++++++-- 3 files changed, 68 insertions(+), 43 deletions(-) diff --git a/ledis/t_list.go b/ledis/t_list.go index 39ca6ed..2f17609 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -84,8 +84,12 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) { var size int32 var err error + t := db.listTx + t.Lock() + defer t.Unlock() + metaKey := db.lEncodeMetaKey(key) - headSeq, tailSeq, size, err = db.lGetMeta(metaKey) + headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey) if err != nil { return 0, err } @@ -102,10 +106,6 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) { delta = 1 } - t := db.listTx - t.Lock() - defer t.Unlock() - // append elements if size > 0 { seq += delta @@ -148,7 +148,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) { var err error metaKey := db.lEncodeMetaKey(key) - headSeq, tailSeq, _, err = db.lGetMeta(metaKey) + headSeq, tailSeq, _, err = db.lGetMeta(nil, metaKey) if err != nil { return nil, err } @@ -191,7 +191,10 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { var tailSeq int32 var err error - headSeq, tailSeq, _, err = db.lGetMeta(mk) + it := db.db.NewIterator() + defer it.Close() + + headSeq, tailSeq, _, err = db.lGetMeta(it, mk) if err != nil { return 0 } @@ -200,27 +203,24 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { startKey := db.lEncodeListKey(key, headSeq) stopKey := db.lEncodeListKey(key, tailSeq) - it := db.db.RangeLimitIterator(startKey, stopKey, leveldb.RangeClose, 0, -1) - for ; it.Valid(); it.Next() { - t.Delete(it.Key()) + rit := leveldb.NewRangeIterator(it, &leveldb.Range{startKey, stopKey, leveldb.RangeClose}) + for ; rit.Valid(); rit.Next() { + t.Delete(rit.RawKey()) num++ } - it.Close() t.Delete(mk) return num } -func (db *DB) lGetSeq(key []byte, whereSeq int32) (int64, error) { - ek := db.lEncodeListKey(key, whereSeq) - - return Int64(db.db.Get(ek)) -} - -func (db *DB) lGetMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { +func (db *DB) lGetMeta(it *leveldb.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { var v []byte - v, err = db.db.Get(ek) + if it != nil { + v = it.Find(ek) + } else { + v, err = db.db.Get(ek) + } if err != nil { return } else if v == nil { @@ -284,7 +284,10 @@ func (db *DB) LIndex(key []byte, index int32) ([]byte, error) { metaKey := db.lEncodeMetaKey(key) - headSeq, tailSeq, _, err = db.lGetMeta(metaKey) + it := db.db.NewIterator() + defer it.Close() + + headSeq, tailSeq, _, err = db.lGetMeta(it, metaKey) if err != nil { return nil, err } @@ -296,7 +299,9 @@ func (db *DB) LIndex(key []byte, index int32) ([]byte, error) { } sk := db.lEncodeListKey(key, seq) - return db.db.Get(sk) + v := it.Find(sk) + + return v, nil } func (db *DB) LLen(key []byte) (int64, error) { @@ -305,7 +310,7 @@ func (db *DB) LLen(key []byte) (int64, error) { } ek := db.lEncodeMetaKey(key) - _, _, size, err := db.lGetMeta(ek) + _, _, size, err := db.lGetMeta(nil, ek) return int64(size), err } @@ -328,7 +333,10 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) { metaKey := db.lEncodeMetaKey(key) - if headSeq, _, llen, err = db.lGetMeta(metaKey); err != nil { + it := db.db.NewIterator() + defer it.Close() + + if headSeq, _, llen, err = db.lGetMeta(it, metaKey); err != nil { return nil, err } @@ -356,12 +364,18 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) { v := make([][]byte, 0, limit) startKey := db.lEncodeListKey(key, headSeq) - it := db.db.RangeLimitIterator(startKey, nil, leveldb.RangeClose, 0, int(limit)) - for ; it.Valid(); it.Next() { - v = append(v, it.Value()) - } + rit := leveldb.NewRangeLimitIterator(it, + &leveldb.Range{ + Min: startKey, + Max: nil, + Type: leveldb.RangeClose}, + &leveldb.Limit{ + Offset: 0, + Count: int(limit)}) - it.Close() + for ; rit.Valid(); rit.Next() { + v = append(v, rit.Value()) + } return v, nil } diff --git a/ledis/t_zset.go b/ledis/t_zset.go index e9b9c4b..9a2f105 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -451,37 +451,37 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { k := db.zEncodeSetKey(key, member) - if v, err := db.db.Get(k); err != nil { - return 0, err - } else if v == nil { + it := db.db.NewIterator() + defer it.Close() + + if v := it.Find(k); v == nil { return -1, nil } else { - if s, err := Int64(v, err); err != nil { + if s, err := Int64(v, nil); err != nil { return 0, err } else { - var it *leveldb.RangeLimitIterator + var rit *leveldb.RangeLimitIterator sk := db.zEncodeScoreKey(key, member, s) if !reverse { minKey := db.zEncodeStartScoreKey(key, MinScore) - it = db.db.RangeLimitIterator(minKey, sk, leveldb.RangeClose, 0, -1) + + rit = leveldb.NewRangeIterator(it, &leveldb.Range{minKey, sk, leveldb.RangeClose}) } else { maxKey := db.zEncodeStopScoreKey(key, MaxScore) - it = db.db.RevRangeLimitIterator(sk, maxKey, leveldb.RangeClose, 0, -1) + rit = leveldb.NewRevRangeIterator(it, &leveldb.Range{sk, maxKey, leveldb.RangeClose}) } var lastKey []byte = nil var n int64 = 0 - for ; it.Valid(); it.Next() { + for ; rit.Valid(); rit.Next() { n++ - lastKey = it.BufKey(lastKey) + lastKey = rit.BufKey(lastKey) } - it.Close() - if _, m, _, err := db.zDecodeScoreKey(lastKey); err == nil && bytes.Equal(m, member) { n-- return n, nil @@ -741,8 +741,10 @@ func (db *DB) zFlush() (drop int64, err error) { maxKey[1] = zScoreType + 1 it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + defer it.Close() + for ; it.Valid(); it.Next() { - t.Delete(it.Key()) + t.Delete(it.RawKey()) drop++ if drop&1023 == 0 { if err = t.Commit(); err != nil { @@ -750,7 +752,6 @@ func (db *DB) zFlush() (drop int64, err error) { } } } - it.Close() db.expFlush(t, zsetType) diff --git a/leveldb/iterator.go b/leveldb/iterator.go index 3c3e811..a75e4d7 100644 --- a/leveldb/iterator.go +++ b/leveldb/iterator.go @@ -125,8 +125,10 @@ func (it *Iterator) BufValue(b []byte) []byte { } func (it *Iterator) Close() { - C.leveldb_iter_destroy(it.it) - it.it = nil + if it.it != nil { + C.leveldb_iter_destroy(it.it) + it.it = nil + } } func (it *Iterator) Valid() bool { @@ -274,6 +276,14 @@ func NewRevRangeLimitIterator(i *Iterator, r *Range, l *Limit) *RangeLimitIterat return rangeLimitIterator(i, r, l, IteratorBackward) } +func NewRangeIterator(i *Iterator, r *Range) *RangeLimitIterator { + return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorForward) +} + +func NewRevRangeIterator(i *Iterator, r *Range) *RangeLimitIterator { + return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorBackward) +} + func rangeLimitIterator(i *Iterator, r *Range, l *Limit, direction uint8) *RangeLimitIterator { it := new(RangeLimitIterator)