From 48e09a27273051b6407bec819dfc851336961586 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 19 Jun 2014 17:19:40 +0800 Subject: [PATCH] use our own leveldb --- bootstrap.sh | 1 - etc/ledis.json | 3 +- ledis/dump.go | 6 +- ledis/dump_test.go | 4 +- ledis/ledis.go | 2 +- ledis/replication_test.go | 4 +- ledis/t_hash.go | 27 ++- ledis/t_hash_test.go | 19 +- ledis/t_kv.go | 17 +- ledis/t_kv_test.go | 21 ++- ledis/t_list.go | 8 +- ledis/t_ttl.go | 6 +- ledis/t_zset.go | 22 +-- ledis/tx.go | 2 +- leveldb/batch.go | 59 ++++++ leveldb/cache.go | 18 ++ leveldb/db.go | 328 +++++++++++++++++++++++++++++++++ leveldb/filterpolicy.go | 19 ++ leveldb/iterator.go | 229 +++++++++++++++++++++++ leveldb/leveldb_test.go | 259 ++++++++++++++++++++++++++ leveldb/levigo-license | 7 + leveldb/options.go | 128 +++++++++++++ leveldb/snapshot.go | 54 ++++++ leveldb/util.go | 43 +++++ server/cmd_hash.go | 2 +- server/cmd_replication_test.go | 4 +- 26 files changed, 1233 insertions(+), 59 deletions(-) create mode 100644 leveldb/batch.go create mode 100644 leveldb/cache.go create mode 100644 leveldb/db.go create mode 100644 leveldb/filterpolicy.go create mode 100644 leveldb/iterator.go create mode 100644 leveldb/leveldb_test.go create mode 100644 leveldb/levigo-license create mode 100644 leveldb/options.go create mode 100644 leveldb/snapshot.go create mode 100644 leveldb/util.go diff --git a/bootstrap.sh b/bootstrap.sh index b8d0c41..c5aaefe 100644 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -2,6 +2,5 @@ . ./dev.sh -go get -u github.com/siddontang/go-leveldb/leveldb go get -u github.com/siddontang/go-log/log go get -u github.com/garyburd/redigo/redis diff --git a/etc/ledis.json b/etc/ledis.json index 8f230c3..0d93e88 100644 --- a/etc/ledis.json +++ b/etc/ledis.json @@ -6,7 +6,8 @@ "compression": false, "block_size": 32768, "write_buffer_size": 67108864, - "cache_size": 524288000 + "cache_size": 524288000, + "max_open_files":1024 } }, diff --git a/ledis/dump.go b/ledis/dump.go index 47bca19..16354c8 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -4,7 +4,7 @@ import ( "bufio" "bytes" "encoding/binary" - "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/ledisdb/leveldb" "io" "os" ) @@ -73,7 +73,9 @@ func (l *Ledis) Dump(w io.Writer) error { return err } - it := sp.Iterator(nil, nil, leveldb.RangeClose, 0, -1) + it := sp.NewIterator() + it.SeekToFirst() + var key []byte var value []byte for ; it.Valid(); it.Next() { diff --git a/ledis/dump_test.go b/ledis/dump_test.go index f15f8f4..c8bfe36 100644 --- a/ledis/dump_test.go +++ b/ledis/dump_test.go @@ -2,7 +2,7 @@ package ledis import ( "bytes" - "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/ledisdb/leveldb" "os" "testing" ) @@ -59,7 +59,7 @@ func TestDump(t *testing.T) { t.Fatal(err) } - it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1) + it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) for ; it.Valid(); it.Next() { key := it.Key() value := it.Value() diff --git a/ledis/ledis.go b/ledis/ledis.go index f011d65..668098c 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -3,8 +3,8 @@ package ledis import ( "encoding/json" "fmt" - "github.com/siddontang/go-leveldb/leveldb" "github.com/siddontang/go-log/log" + "github.com/siddontang/ledisdb/leveldb" "path" "sync" "time" diff --git a/ledis/replication_test.go b/ledis/replication_test.go index 21d4dbc..7e9aa20 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -3,14 +3,14 @@ package ledis import ( "bytes" "fmt" - "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/ledisdb/leveldb" "os" "path" "testing" ) func checkLedisEqual(master *Ledis, slave *Ledis) error { - it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1) + it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) for ; it.Valid(); it.Next() { key := it.Key() value := it.Value() diff --git a/ledis/t_hash.go b/ledis/t_hash.go index d64a1f6..4fa4cdc 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/ledisdb/leveldb" "time" ) @@ -132,7 +132,7 @@ func (db *DB) hDelete(t *tx, key []byte) int64 { stop := db.hEncodeStopKey(key) var num int64 = 0 - it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) num++ @@ -232,10 +232,11 @@ func (db *DB) HMset(key []byte, args ...FVPair) error { return err } -func (db *DB) HMget(key []byte, args [][]byte) ([]interface{}, error) { +func (db *DB) HMget(key []byte, args ...[]byte) ([]interface{}, error) { var ek []byte - var v []byte - var err error + + it := db.db.NewIterator() + defer it.Close() r := make([]interface{}, len(args)) for i := 0; i < len(args); i++ { @@ -245,11 +246,7 @@ func (db *DB) HMget(key []byte, args [][]byte) ([]interface{}, error) { ek = db.hEncodeHashKey(key, args[i]) - if v, err = db.db.Get(ek); err != nil { - return nil, err - } - - r[i] = v + r[i] = it.Find(ek) } return r, nil @@ -355,7 +352,7 @@ func (db *DB) HGetAll(key []byte) ([]interface{}, error) { v := make([]interface{}, 0, 16) - it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, k, err := db.hDecodeHashKey(it.Key()) if err != nil { @@ -380,7 +377,7 @@ func (db *DB) HKeys(key []byte) ([]interface{}, error) { v := make([]interface{}, 0, 16) - it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, k, err := db.hDecodeHashKey(it.Key()) if err != nil { @@ -404,7 +401,7 @@ func (db *DB) HValues(key []byte) ([]interface{}, error) { v := make([]interface{}, 0, 16) - it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { v = append(v, it.Value()) } @@ -443,7 +440,7 @@ func (db *DB) hFlush() (drop int64, err error) { maxKey[0] = db.index maxKey[1] = hSizeType + 1 - it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) drop++ @@ -485,7 +482,7 @@ func (db *DB) HScan(key []byte, field []byte, count int, inclusive bool) ([]FVPa rangeType = leveldb.RangeOpen } - it := db.db.Iterator(minKey, maxKey, rangeType, 0, count) + it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) for ; it.Valid(); it.Next() { if _, f, err := db.hDecodeHashKey(it.Key()); err != nil { continue diff --git a/ledis/t_hash_test.go b/ledis/t_hash_test.go index 4648c09..a663c2e 100644 --- a/ledis/t_hash_test.go +++ b/ledis/t_hash_test.go @@ -32,11 +32,28 @@ func TestDBHash(t *testing.T) { key := []byte("testdb_hash_a") - if n, err := db.HSet(key, []byte("a"), []byte("hello world")); err != nil { + if n, err := db.HSet(key, []byte("a"), []byte("hello world 1")); err != nil { t.Fatal(err) } else if n != 1 { t.Fatal(n) } + + if n, err := db.HSet(key, []byte("b"), []byte("hello world 2")); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + + ay, _ := db.HMget(key, []byte("a"), []byte("b")) + + if v1, _ := ay[0].([]byte); string(v1) != "hello world 1" { + t.Fatal(string(v1)) + } + + if v2, _ := ay[1].([]byte); string(v2) != "hello world 2" { + t.Fatal(string(v2)) + } + } func TestDBHScan(t *testing.T) { diff --git a/ledis/t_kv.go b/ledis/t_kv.go index 2ae2a63..3008fb8 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -2,7 +2,7 @@ package ledis import ( "errors" - "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/ledisdb/leveldb" "time" ) @@ -204,18 +204,15 @@ func (db *DB) IncryBy(key []byte, increment int64) (int64, error) { func (db *DB) MGet(keys ...[]byte) ([]interface{}, error) { values := make([]interface{}, len(keys)) - var err error - var value []byte + it := db.db.NewIterator() + defer it.Close() + for i := range keys { if err := checkKeySize(keys[i]); err != nil { return nil, err } - if value, err = db.db.Get(db.encodeKVKey(keys[i])); err != nil { - return nil, err - } - - values[i] = value + values[i] = it.Find(db.encodeKVKey(keys[i])) } return values, nil @@ -319,7 +316,7 @@ func (db *DB) flush() (drop int64, err error) { minKey := db.encodeKVMinKey() maxKey := db.encodeKVMaxKey() - it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) drop++ @@ -362,7 +359,7 @@ func (db *DB) Scan(key []byte, count int, inclusive bool) ([]KVPair, error) { rangeType = leveldb.RangeOpen } - it := db.db.Iterator(minKey, maxKey, rangeType, 0, count) + it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) for ; it.Valid(); it.Next() { if key, err := db.decodeKVKey(it.Key()); err != nil { continue diff --git a/ledis/t_kv_test.go b/ledis/t_kv_test.go index 0252421..9088bd1 100644 --- a/ledis/t_kv_test.go +++ b/ledis/t_kv_test.go @@ -19,11 +19,28 @@ func TestKVCodec(t *testing.T) { func TestDBKV(t *testing.T) { db := getTestDB() - key := []byte("testdb_kv_a") + key1 := []byte("testdb_kv_a") - if err := db.Set(key, []byte("hello world")); err != nil { + if err := db.Set(key1, []byte("hello world 1")); err != nil { t.Fatal(err) } + + key2 := []byte("testdb_kv_b") + + if err := db.Set(key2, []byte("hello world 2")); err != nil { + t.Fatal(err) + } + + ay, _ := db.MGet(key1, key2) + + if v1, _ := ay[0].([]byte); string(v1) != "hello world 1" { + t.Fatal(string(v1)) + } + + if v2, _ := ay[1].([]byte); string(v2) != "hello world 2" { + t.Fatal(string(v2)) + } + } func TestDBScan(t *testing.T) { diff --git a/ledis/t_list.go b/ledis/t_list.go index df5f576..e5d2f42 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/ledisdb/leveldb" "time" ) @@ -200,7 +200,7 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { startKey := db.lEncodeListKey(key, headSeq) stopKey := db.lEncodeListKey(key, tailSeq) - it := db.db.Iterator(startKey, stopKey, leveldb.RangeClose, 0, -1) + it := db.db.RangeLimitIterator(startKey, stopKey, leveldb.RangeClose, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) num++ @@ -361,7 +361,7 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([]interface{}, error) startKey := db.lEncodeListKey(key, startSeq) stopKey := db.lEncodeListKey(key, stopSeq) - it := db.db.Iterator(startKey, stopKey, leveldb.RangeClose, 0, -1) + it := db.db.RangeLimitIterator(startKey, stopKey, leveldb.RangeClose, 0, -1) for ; it.Valid(); it.Next() { v = append(v, it.Value()) } @@ -408,7 +408,7 @@ func (db *DB) lFlush() (drop int64, err error) { maxKey[0] = db.index maxKey[1] = lMetaType + 1 - it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) drop++ diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index b353ce7..df64118 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/ledisdb/leveldb" "time" ) @@ -119,7 +119,7 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) { maxKey[0] = db.index maxKey[1] = expMetaType + 1 - it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) drop++ @@ -173,7 +173,7 @@ func (eli *elimination) active() { continue } - it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) for it.Valid() { for i := 1; i < 512 && it.Valid(); i++ { expKeys = append(expKeys, it.Key(), it.Value()) diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 45f8cfa..a4ea355 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/binary" "errors" - "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/ledisdb/leveldb" "time" ) @@ -434,7 +434,7 @@ func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) { rangeType := leveldb.RangeROpen - it := db.db.Iterator(minKey, maxKey, rangeType, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, -1) var n int64 = 0 for ; it.Valid(); it.Next() { n++ @@ -459,16 +459,16 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { if s, err := Int64(v, err); err != nil { return 0, err } else { - var it *leveldb.Iterator + var it *leveldb.RangeLimitIterator sk := db.zEncodeScoreKey(key, member, s) if !reverse { minKey := db.zEncodeStartScoreKey(key, MinScore) - it = db.db.Iterator(minKey, sk, leveldb.RangeClose, 0, -1) + it = db.db.RangeLimitIterator(minKey, sk, leveldb.RangeClose, 0, -1) } else { maxKey := db.zEncodeStopScoreKey(key, MaxScore) - it = db.db.RevIterator(sk, maxKey, leveldb.RangeClose, 0, -1) + it = db.db.RevRangeLimitIterator(sk, maxKey, leveldb.RangeClose, 0, -1) } var lastKey []byte = nil @@ -492,14 +492,14 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { return -1, nil } -func (db *DB) zIterator(key []byte, min int64, max int64, offset int, limit int, reverse bool) *leveldb.Iterator { +func (db *DB) zIterator(key []byte, min int64, max int64, offset int, limit int, reverse bool) *leveldb.RangeLimitIterator { minKey := db.zEncodeStartScoreKey(key, min) maxKey := db.zEncodeStopScoreKey(key, max) if !reverse { - return db.db.Iterator(minKey, maxKey, leveldb.RangeClose, offset, limit) + return db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, limit) } else { - return db.db.RevIterator(minKey, maxKey, leveldb.RangeClose, offset, limit) + return db.db.RevRangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, limit) } } @@ -567,7 +567,7 @@ func (db *DB) zRange(key []byte, min int64, max int64, withScores bool, offset i } v := make([]interface{}, 0, nv) - var it *leveldb.Iterator + var it *leveldb.RangeLimitIterator //if reverse and offset is 0, limit < 0, we may use forward iterator then reverse //because leveldb iterator prev is slower than next @@ -745,7 +745,7 @@ func (db *DB) zFlush() (drop int64, err error) { maxKey[0] = db.index maxKey[1] = zScoreType + 1 - it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) drop++ @@ -788,7 +788,7 @@ func (db *DB) ZScan(key []byte, member []byte, count int, inclusive bool) ([]Sco rangeType = leveldb.RangeOpen } - it := db.db.Iterator(minKey, maxKey, rangeType, 0, count) + it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) for ; it.Valid(); it.Next() { if _, m, err := db.zDecodeSetKey(it.Key()); err != nil { continue diff --git a/ledis/tx.go b/ledis/tx.go index fa7379b..0fe716a 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -1,7 +1,7 @@ package ledis import ( - "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/ledisdb/leveldb" "sync" ) diff --git a/leveldb/batch.go b/leveldb/batch.go new file mode 100644 index 0000000..f24ec65 --- /dev/null +++ b/leveldb/batch.go @@ -0,0 +1,59 @@ +package leveldb + +// #cgo LDFLAGS: -lleveldb +// #include "leveldb/c.h" +import "C" + +import ( + "unsafe" +) + +type WriteBatch struct { + db *DB + wbatch *C.leveldb_writebatch_t +} + +func (w *WriteBatch) Close() { + C.leveldb_writebatch_destroy(w.wbatch) +} + +func (w *WriteBatch) Put(key, value []byte) { + var k, v *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + if len(value) != 0 { + v = (*C.char)(unsafe.Pointer(&value[0])) + } + + lenk := len(key) + lenv := len(value) + + C.leveldb_writebatch_put(w.wbatch, k, C.size_t(lenk), v, C.size_t(lenv)) +} + +func (w *WriteBatch) Delete(key []byte) { + C.leveldb_writebatch_delete(w.wbatch, + (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} + +func (w *WriteBatch) Commit() error { + return w.commit(w.db.writeOpts) +} + +func (w *WriteBatch) SyncCommit() error { + return w.commit(w.db.syncWriteOpts) +} + +func (w *WriteBatch) Rollback() { + C.leveldb_writebatch_clear(w.wbatch) +} + +func (w *WriteBatch) commit(wb *WriteOptions) error { + var errStr *C.char + C.leveldb_write(w.db.db, wb.Opt, w.wbatch, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} diff --git a/leveldb/cache.go b/leveldb/cache.go new file mode 100644 index 0000000..3fbcf0d --- /dev/null +++ b/leveldb/cache.go @@ -0,0 +1,18 @@ +package leveldb + +// #cgo LDFLAGS: -lleveldb +// #include +// #include "leveldb/c.h" +import "C" + +type Cache struct { + Cache *C.leveldb_cache_t +} + +func NewLRUCache(capacity int) *Cache { + return &Cache{C.leveldb_cache_create_lru(C.size_t(capacity))} +} + +func (c *Cache) Close() { + C.leveldb_cache_destroy(c.Cache) +} diff --git a/leveldb/db.go b/leveldb/db.go new file mode 100644 index 0000000..eb68d5a --- /dev/null +++ b/leveldb/db.go @@ -0,0 +1,328 @@ +package leveldb + +/* +#cgo LDFLAGS: -lleveldb +#include +*/ +import "C" + +import ( + "encoding/json" + "os" + "unsafe" +) + +const defaultFilterBits int = 10 + +type Config struct { + Path string `json:"path"` + + Compression bool `json:"compression"` + BlockSize int `json:"block_size"` + WriteBufferSize int `json:"write_buffer_size"` + CacheSize int `json:"cache_size"` + MaxOpenFiles int `json:"max_open_files"` +} + +type DB struct { + cfg *Config + + db *C.leveldb_t + + opts *Options + + //for default read and write options + readOpts *ReadOptions + writeOpts *WriteOptions + iteratorOpts *ReadOptions + + syncWriteOpts *WriteOptions + + cache *Cache + + filter *FilterPolicy +} + +func Open(configJson json.RawMessage) (*DB, error) { + cfg := new(Config) + err := json.Unmarshal(configJson, cfg) + if err != nil { + return nil, err + } + + return OpenWithConfig(cfg) +} + +func OpenWithConfig(cfg *Config) (*DB, error) { + if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil { + return nil, err + } + + db := new(DB) + db.cfg = cfg + + if err := db.open(); err != nil { + return nil, err + } + + return db, nil +} + +func (db *DB) open() error { + db.opts = db.initOptions(db.cfg) + + db.readOpts = NewReadOptions() + db.writeOpts = NewWriteOptions() + + db.iteratorOpts = NewReadOptions() + db.iteratorOpts.SetFillCache(false) + + db.syncWriteOpts = NewWriteOptions() + db.syncWriteOpts.SetSync(true) + + var errStr *C.char + ldbname := C.CString(db.cfg.Path) + defer C.leveldb_free(unsafe.Pointer(ldbname)) + + db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} + +func (db *DB) initOptions(cfg *Config) *Options { + opts := NewOptions() + + opts.SetCreateIfMissing(true) + + if cfg.CacheSize <= 0 { + cfg.CacheSize = 4 * 1024 * 1024 + } + + db.cache = NewLRUCache(cfg.CacheSize) + opts.SetCache(db.cache) + + //we must use bloomfilter + db.filter = NewBloomFilter(defaultFilterBits) + opts.SetFilterPolicy(db.filter) + + if !cfg.Compression { + opts.SetCompression(NoCompression) + } + + if cfg.BlockSize <= 0 { + cfg.BlockSize = 4 * 1024 + } + + opts.SetBlockSize(cfg.BlockSize) + + if cfg.WriteBufferSize <= 0 { + cfg.WriteBufferSize = 4 * 1024 * 1024 + } + + opts.SetWriteBufferSize(cfg.WriteBufferSize) + + if cfg.MaxOpenFiles < 1024 { + cfg.MaxOpenFiles = 1024 + } + + opts.SetMaxOpenFiles(cfg.MaxOpenFiles) + + return opts +} + +func (db *DB) Close() { + C.leveldb_close(db.db) + db.db = nil + + db.opts.Close() + + if db.cache != nil { + db.cache.Close() + } + + if db.filter != nil { + db.filter.Close() + } + + db.readOpts.Close() + db.writeOpts.Close() + db.iteratorOpts.Close() + db.syncWriteOpts.Close() +} + +func (db *DB) Destroy() error { + path := db.cfg.Path + + db.Close() + + opts := NewOptions() + defer opts.Close() + + var errStr *C.char + ldbname := C.CString(path) + defer C.leveldb_free(unsafe.Pointer(ldbname)) + + C.leveldb_destroy_db(opts.Opt, ldbname, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} + +func (db *DB) Clear() error { + bc := db.NewWriteBatch() + defer bc.Close() + + var err error + it := db.NewIterator() + it.SeekToFirst() + + num := 0 + for ; it.Valid(); it.Next() { + bc.Delete(it.Key()) + num++ + if num == 1000 { + num = 0 + if err = bc.Commit(); err != nil { + return err + } + } + } + + err = bc.Commit() + + return err +} + +func (db *DB) Put(key, value []byte) error { + return db.put(db.writeOpts, key, value) +} + +func (db *DB) SyncPut(key, value []byte) error { + return db.put(db.syncWriteOpts, key, value) +} + +func (db *DB) Get(key []byte) ([]byte, error) { + return db.get(db.readOpts, key) +} + +func (db *DB) Delete(key []byte) error { + return db.delete(db.writeOpts, key) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.delete(db.syncWriteOpts, key) +} + +func (db *DB) NewWriteBatch() *WriteBatch { + wb := &WriteBatch{ + db: db, + wbatch: C.leveldb_writebatch_create(), + } + return wb +} + +func (db *DB) NewSnapshot() *Snapshot { + s := &Snapshot{ + db: db, + snap: C.leveldb_create_snapshot(db.db), + readOpts: NewReadOptions(), + iteratorOpts: NewReadOptions(), + } + + s.readOpts.SetSnapshot(s) + s.iteratorOpts.SetSnapshot(s) + s.iteratorOpts.SetFillCache(false) + + return s +} + +func (db *DB) NewIterator() *Iterator { + it := new(Iterator) + + it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt) + + return it +} + +func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return newRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, 0, -1, IteratorForward) +} + +func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return newRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, 0, -1, IteratorBackward) +} + +//limit < 0, unlimit +//offset must >= 0, if < 0, will get nothing +func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, limit int) *RangeLimitIterator { + return newRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, offset, limit, IteratorForward) +} + +//limit < 0, unlimit +//offset must >= 0, if < 0, will get nothing +func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, limit int) *RangeLimitIterator { + return newRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, offset, limit, IteratorBackward) +} + +func (db *DB) put(wo *WriteOptions, key, value []byte) error { + var errStr *C.char + var k, v *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + if len(value) != 0 { + v = (*C.char)(unsafe.Pointer(&value[0])) + } + + lenk := len(key) + lenv := len(value) + C.leveldb_put( + db.db, wo.Opt, k, C.size_t(lenk), v, C.size_t(lenv), &errStr) + + if errStr != nil { + return saveError(errStr) + } + return nil +} + +func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) { + var errStr *C.char + var vallen C.size_t + var k *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + + value := C.leveldb_get( + db.db, ro.Opt, k, C.size_t(len(key)), &vallen, &errStr) + + if errStr != nil { + return nil, saveError(errStr) + } + + if value == nil { + return nil, nil + } + + defer C.leveldb_free(unsafe.Pointer(value)) + return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil +} + +func (db *DB) delete(wo *WriteOptions, key []byte) error { + var errStr *C.char + var k *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + + C.leveldb_delete( + db.db, wo.Opt, k, C.size_t(len(key)), &errStr) + + if errStr != nil { + return saveError(errStr) + } + return nil +} diff --git a/leveldb/filterpolicy.go b/leveldb/filterpolicy.go new file mode 100644 index 0000000..b007d58 --- /dev/null +++ b/leveldb/filterpolicy.go @@ -0,0 +1,19 @@ +package leveldb + +// #cgo LDFLAGS: -lleveldb +// #include +// #include "leveldb/c.h" +import "C" + +type FilterPolicy struct { + Policy *C.leveldb_filterpolicy_t +} + +func NewBloomFilter(bitsPerKey int) *FilterPolicy { + policy := C.leveldb_filterpolicy_create_bloom(C.int(bitsPerKey)) + return &FilterPolicy{policy} +} + +func (fp *FilterPolicy) Close() { + C.leveldb_filterpolicy_destroy(fp.Policy) +} diff --git a/leveldb/iterator.go b/leveldb/iterator.go new file mode 100644 index 0000000..6f22b92 --- /dev/null +++ b/leveldb/iterator.go @@ -0,0 +1,229 @@ +package leveldb + +// #cgo LDFLAGS: -lleveldb +// #include +// #include "leveldb/c.h" +import "C" + +import ( + "bytes" + "unsafe" +) + +const ( + IteratorForward uint8 = 0 + IteratorBackward uint8 = 1 +) + +const ( + RangeClose uint8 = 0x00 + RangeLOpen uint8 = 0x01 + RangeROpen uint8 = 0x10 + RangeOpen uint8 = 0x11 +) + +//min must less or equal than max +//range type: +//close: [min, max] +//open: (min, max) +//lopen: (min, max] +//ropen: [min, max) +type Range struct { + Min []byte + Max []byte + + Type uint8 +} + +type Iterator struct { + it *C.leveldb_iterator_t +} + +func (it *Iterator) Key() []byte { + var klen C.size_t + kdata := C.leveldb_iter_key(it.it, &klen) + if kdata == nil { + return nil + } + + return C.GoBytes(unsafe.Pointer(kdata), C.int(klen)) +} + +func (it *Iterator) Value() []byte { + var vlen C.size_t + vdata := C.leveldb_iter_value(it.it, &vlen) + if vdata == nil { + return nil + } + + return C.GoBytes(unsafe.Pointer(vdata), C.int(vlen)) +} + +func (it *Iterator) Close() { + C.leveldb_iter_destroy(it.it) + it.it = nil +} + +func (it *Iterator) Valid() bool { + return ucharToBool(C.leveldb_iter_valid(it.it)) +} + +func (it *Iterator) Next() { + C.leveldb_iter_next(it.it) +} + +func (it *Iterator) Prev() { + C.leveldb_iter_prev(it.it) +} + +func (it *Iterator) SeekToFirst() { + C.leveldb_iter_seek_to_first(it.it) +} + +func (it *Iterator) SeekToLast() { + C.leveldb_iter_seek_to_last(it.it) +} + +func (it *Iterator) Seek(key []byte) { + C.leveldb_iter_seek(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} + +func (it *Iterator) Find(key []byte) []byte { + it.Seek(key) + if it.Valid() && bytes.Equal(it.Key(), key) { + return it.Value() + } else { + return nil + } +} + +type RangeLimitIterator struct { + it *Iterator + + r *Range + + offset int + limit int + + step int + + //0 for IteratorForward, 1 for IteratorBackward + direction uint8 +} + +func (it *RangeLimitIterator) Key() []byte { + return it.it.Key() +} + +func (it *RangeLimitIterator) Value() []byte { + return it.it.Value() +} + +func (it *RangeLimitIterator) Valid() bool { + if it.offset < 0 { + return false + } else if !it.it.Valid() { + return false + } else if it.limit >= 0 && it.step >= it.limit { + return false + } + + if it.direction == IteratorForward { + if it.r.Max != nil { + r := bytes.Compare(it.it.Key(), it.r.Max) + if it.r.Type&RangeROpen > 0 { + return !(r >= 0) + } else { + return !(r > 0) + } + } + } else { + if it.r.Min != nil { + r := bytes.Compare(it.it.Key(), it.r.Min) + if it.r.Type&RangeLOpen > 0 { + return !(r <= 0) + } else { + return !(r < 0) + } + } + } + + return true +} + +func (it *RangeLimitIterator) Next() { + it.step++ + + if it.direction == IteratorForward { + it.it.Next() + } else { + it.it.Prev() + } +} + +func (it *RangeLimitIterator) Close() { + it.it.Close() +} + +func newRangeLimitIterator(i *Iterator, r *Range, offset int, limit int, direction uint8) *RangeLimitIterator { + it := new(RangeLimitIterator) + + it.it = i + + it.r = r + it.offset = offset + it.limit = limit + it.direction = direction + + it.step = 0 + + if offset < 0 { + return it + } + + if direction == IteratorForward { + if r.Min == nil { + it.it.SeekToFirst() + } else { + it.it.Seek(r.Min) + + if r.Type&RangeLOpen > 0 { + if it.it.Valid() && bytes.Equal(it.it.Key(), r.Min) { + it.it.Next() + } + } + } + } else { + if r.Max == nil { + it.it.SeekToLast() + } else { + it.it.Seek(r.Max) + + if !it.it.Valid() { + it.it.SeekToLast() + } else { + if !bytes.Equal(it.it.Key(), r.Max) { + it.it.Prev() + } + } + + if r.Type&RangeROpen > 0 { + if it.it.Valid() && bytes.Equal(it.it.Key(), r.Max) { + it.it.Prev() + } + } + } + } + + for i := 0; i < offset; i++ { + if it.it.Valid() { + if it.direction == IteratorForward { + it.it.Next() + } else { + it.it.Prev() + } + } + } + + return it +} diff --git a/leveldb/leveldb_test.go b/leveldb/leveldb_test.go new file mode 100644 index 0000000..7289ed9 --- /dev/null +++ b/leveldb/leveldb_test.go @@ -0,0 +1,259 @@ +package leveldb + +import ( + "bytes" + "fmt" + "os" + "sync" + "testing" +) + +var testConfigJson = []byte(` + { + "path" : "./testdb", + "compression":true, + "block_size" : 32768, + "write_buffer_size" : 2097152, + "cache_size" : 20971520 + } + `) + +var testOnce sync.Once +var testDB *DB + +func getTestDB() *DB { + f := func() { + var err error + testDB, err = Open(testConfigJson) + if err != nil { + println(err.Error()) + panic(err) + } + } + + testOnce.Do(f) + return testDB +} + +func TestSimple(t *testing.T) { + db := getTestDB() + + key := []byte("key") + value := []byte("hello world") + if err := db.Put(key, value); err != nil { + t.Fatal(err) + } + + if v, err := db.Get(key); err != nil { + t.Fatal(err) + } else if !bytes.Equal(v, value) { + t.Fatal("not equal") + } + + if err := db.Delete(key); err != nil { + t.Fatal(err) + } + if v, err := db.Get(key); err != nil { + t.Fatal(err) + } else if v != nil { + t.Fatal("must nil") + } +} + +func TestBatch(t *testing.T) { + db := getTestDB() + + key1 := []byte("key1") + key2 := []byte("key2") + + value := []byte("hello world") + + db.Put(key1, value) + db.Put(key2, value) + + wb := db.NewWriteBatch() + defer wb.Close() + + wb.Delete(key2) + wb.Put(key1, []byte("hello world2")) + + if err := wb.Commit(); err != nil { + t.Fatal(err) + } + + if v, err := db.Get(key2); err != nil { + t.Fatal(err) + } else if v != nil { + t.Fatal("must nil") + } + + if v, err := db.Get(key1); err != nil { + t.Fatal(err) + } else if string(v) != "hello world2" { + t.Fatal(string(v)) + } + + wb.Delete(key1) + + wb.Rollback() + + if v, err := db.Get(key1); err != nil { + t.Fatal(err) + } else if string(v) != "hello world2" { + t.Fatal(string(v)) + } + + db.Delete(key1) +} + +func checkIterator(it *RangeLimitIterator, cv ...int) error { + v := make([]string, 0, len(cv)) + for ; it.Valid(); it.Next() { + k := it.Key() + v = append(v, string(k)) + } + + it.Close() + + if len(v) != len(cv) { + return fmt.Errorf("len error %d != %d", len(v), len(cv)) + } + + for k, i := range cv { + if fmt.Sprintf("key_%d", i) != v[k] { + return fmt.Errorf("%s, %d", v[k], i) + } + } + + return nil +} + +func TestIterator(t *testing.T) { + db := getTestDB() + + db.Clear() + + for i := 0; i < 10; i++ { + key := []byte(fmt.Sprintf("key_%d", i)) + value := []byte("") + db.Put(key, value) + } + + var it *RangeLimitIterator + + k := func(i int) []byte { + return []byte(fmt.Sprintf("key_%d", i)) + } + + it = db.RangeLimitIterator(k(1), k(5), RangeClose, 0, -1) + if err := checkIterator(it, 1, 2, 3, 4, 5); err != nil { + t.Fatal(err) + } + + it = db.RangeLimitIterator(k(1), k(5), RangeClose, 1, 3) + if err := checkIterator(it, 2, 3, 4); err != nil { + t.Fatal(err) + } + + it = db.RangeLimitIterator(k(1), k(5), RangeLOpen, 0, -1) + if err := checkIterator(it, 2, 3, 4, 5); err != nil { + t.Fatal(err) + } + + it = db.RangeLimitIterator(k(1), k(5), RangeROpen, 0, -1) + if err := checkIterator(it, 1, 2, 3, 4); err != nil { + t.Fatal(err) + } + + it = db.RangeLimitIterator(k(1), k(5), RangeOpen, 0, -1) + if err := checkIterator(it, 2, 3, 4); err != nil { + t.Fatal(err) + } + + it = db.RevRangeLimitIterator(k(1), k(5), RangeClose, 0, -1) + if err := checkIterator(it, 5, 4, 3, 2, 1); err != nil { + t.Fatal(err) + } + + it = db.RevRangeLimitIterator(k(1), k(5), RangeClose, 1, 3) + if err := checkIterator(it, 4, 3, 2); err != nil { + t.Fatal(err) + } + + it = db.RevRangeLimitIterator(k(1), k(5), RangeLOpen, 0, -1) + if err := checkIterator(it, 5, 4, 3, 2); err != nil { + t.Fatal(err) + } + + it = db.RevRangeLimitIterator(k(1), k(5), RangeROpen, 0, -1) + if err := checkIterator(it, 4, 3, 2, 1); err != nil { + t.Fatal(err) + } + + it = db.RevRangeLimitIterator(k(1), k(5), RangeOpen, 0, -1) + if err := checkIterator(it, 4, 3, 2); err != nil { + t.Fatal(err) + } +} + +func TestSnapshot(t *testing.T) { + db := getTestDB() + + key := []byte("key") + value := []byte("hello world") + + db.Put(key, value) + + s := db.NewSnapshot() + defer s.Close() + + db.Put(key, []byte("hello world2")) + + if v, err := s.Get(key); err != nil { + t.Fatal(err) + } else if string(v) != string(value) { + t.Fatal(string(v)) + } +} + +func TestDestroy(t *testing.T) { + db := getTestDB() + + db.Put([]byte("a"), []byte("1")) + if err := db.Clear(); err != nil { + t.Fatal(err) + } + + if _, err := os.Stat(db.cfg.Path); err != nil { + t.Fatal("must exist ", err.Error()) + } + + if v, err := db.Get([]byte("a")); err != nil { + t.Fatal(err) + } else if string(v) == "1" { + t.Fatal(string(v)) + } + + db.Destroy() + + if _, err := os.Stat(db.cfg.Path); !os.IsNotExist(err) { + t.Fatal("must not exist") + } +} + +func TestCloseMore(t *testing.T) { + cfg := new(Config) + cfg.Path = "/tmp/testdb1234" + cfg.CacheSize = 4 * 1024 * 1024 + os.RemoveAll(cfg.Path) + for i := 0; i < 100; i++ { + db, err := OpenWithConfig(cfg) + if err != nil { + t.Fatal(err) + } + + db.Put([]byte("key"), []byte("value")) + + db.Close() + } +} diff --git a/leveldb/levigo-license b/leveldb/levigo-license new file mode 100644 index 0000000..c7c73be --- /dev/null +++ b/leveldb/levigo-license @@ -0,0 +1,7 @@ +Copyright (c) 2012 Jeffrey M Hodges + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/leveldb/options.go b/leveldb/options.go new file mode 100644 index 0000000..f080c24 --- /dev/null +++ b/leveldb/options.go @@ -0,0 +1,128 @@ +package leveldb + +// #cgo LDFLAGS: -lleveldb +// #include "leveldb/c.h" +import "C" + +type CompressionOpt int + +const ( + NoCompression = CompressionOpt(0) + SnappyCompression = CompressionOpt(1) +) + +type Options struct { + Opt *C.leveldb_options_t +} + +type ReadOptions struct { + Opt *C.leveldb_readoptions_t +} + +type WriteOptions struct { + Opt *C.leveldb_writeoptions_t +} + +func NewOptions() *Options { + opt := C.leveldb_options_create() + return &Options{opt} +} + +func NewReadOptions() *ReadOptions { + opt := C.leveldb_readoptions_create() + return &ReadOptions{opt} +} + +func NewWriteOptions() *WriteOptions { + opt := C.leveldb_writeoptions_create() + return &WriteOptions{opt} +} + +func (o *Options) Close() { + C.leveldb_options_destroy(o.Opt) +} + +func (o *Options) SetComparator(cmp *C.leveldb_comparator_t) { + C.leveldb_options_set_comparator(o.Opt, cmp) +} + +func (o *Options) SetErrorIfExists(error_if_exists bool) { + eie := boolToUchar(error_if_exists) + C.leveldb_options_set_error_if_exists(o.Opt, eie) +} + +func (o *Options) SetCache(cache *Cache) { + C.leveldb_options_set_cache(o.Opt, cache.Cache) +} + +// func (o *Options) SetEnv(env *Env) { +// C.leveldb_options_set_env(o.Opt, env.Env) +// } + +func (o *Options) SetInfoLog(log *C.leveldb_logger_t) { + C.leveldb_options_set_info_log(o.Opt, log) +} + +func (o *Options) SetWriteBufferSize(s int) { + C.leveldb_options_set_write_buffer_size(o.Opt, C.size_t(s)) +} + +func (o *Options) SetParanoidChecks(pc bool) { + C.leveldb_options_set_paranoid_checks(o.Opt, boolToUchar(pc)) +} + +func (o *Options) SetMaxOpenFiles(n int) { + C.leveldb_options_set_max_open_files(o.Opt, C.int(n)) +} + +func (o *Options) SetBlockSize(s int) { + C.leveldb_options_set_block_size(o.Opt, C.size_t(s)) +} + +func (o *Options) SetBlockRestartInterval(n int) { + C.leveldb_options_set_block_restart_interval(o.Opt, C.int(n)) +} + +func (o *Options) SetCompression(t CompressionOpt) { + C.leveldb_options_set_compression(o.Opt, C.int(t)) +} + +func (o *Options) SetCreateIfMissing(b bool) { + C.leveldb_options_set_create_if_missing(o.Opt, boolToUchar(b)) +} + +func (o *Options) SetFilterPolicy(fp *FilterPolicy) { + var policy *C.leveldb_filterpolicy_t + if fp != nil { + policy = fp.Policy + } + C.leveldb_options_set_filter_policy(o.Opt, policy) +} + +func (ro *ReadOptions) Close() { + C.leveldb_readoptions_destroy(ro.Opt) +} + +func (ro *ReadOptions) SetVerifyChecksums(b bool) { + C.leveldb_readoptions_set_verify_checksums(ro.Opt, boolToUchar(b)) +} + +func (ro *ReadOptions) SetFillCache(b bool) { + C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b)) +} + +func (ro *ReadOptions) SetSnapshot(snap *Snapshot) { + var s *C.leveldb_snapshot_t + if snap != nil { + s = snap.snap + } + C.leveldb_readoptions_set_snapshot(ro.Opt, s) +} + +func (wo *WriteOptions) Close() { + C.leveldb_writeoptions_destroy(wo.Opt) +} + +func (wo *WriteOptions) SetSync(b bool) { + C.leveldb_writeoptions_set_sync(wo.Opt, boolToUchar(b)) +} diff --git a/leveldb/snapshot.go b/leveldb/snapshot.go new file mode 100644 index 0000000..3c847e9 --- /dev/null +++ b/leveldb/snapshot.go @@ -0,0 +1,54 @@ +package leveldb + +// #cgo LDFLAGS: -lleveldb +// #include +// #include "leveldb/c.h" +import "C" + +type Snapshot struct { + db *DB + + snap *C.leveldb_snapshot_t + + readOpts *ReadOptions + iteratorOpts *ReadOptions +} + +func (s *Snapshot) Close() { + C.leveldb_release_snapshot(s.db.db, s.snap) + + s.iteratorOpts.Close() + s.readOpts.Close() +} + +func (s *Snapshot) Get(key []byte) ([]byte, error) { + return s.db.get(s.readOpts, key) +} + +func (s *Snapshot) NewIterator() *Iterator { + it := new(Iterator) + + it.it = C.leveldb_create_iterator(s.db.db, s.iteratorOpts.Opt) + + return it +} + +func (s *Snapshot) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return newRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, 0, -1, IteratorForward) +} + +func (s *Snapshot) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return newRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, 0, -1, IteratorBackward) +} + +//limit < 0, unlimit +//offset must >= 0, if < 0, will get nothing +func (s *Snapshot) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, limit int) *RangeLimitIterator { + return newRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, offset, limit, IteratorForward) +} + +//limit < 0, unlimit +//offset must >= 0, if < 0, will get nothing +func (s *Snapshot) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, limit int) *RangeLimitIterator { + return newRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, offset, limit, IteratorBackward) +} diff --git a/leveldb/util.go b/leveldb/util.go new file mode 100644 index 0000000..e1fd57d --- /dev/null +++ b/leveldb/util.go @@ -0,0 +1,43 @@ +package leveldb + +// #include "leveldb/c.h" +import "C" + +import ( + "fmt" + "reflect" + "unsafe" +) + +func boolToUchar(b bool) C.uchar { + uc := C.uchar(0) + if b { + uc = C.uchar(1) + } + return uc +} + +func ucharToBool(uc C.uchar) bool { + if uc == C.uchar(0) { + return false + } + return true +} + +func saveError(errStr *C.char) error { + if errStr != nil { + gs := C.GoString(errStr) + C.leveldb_free(unsafe.Pointer(errStr)) + return fmt.Errorf(gs) + } + return nil +} + +func slice(p unsafe.Pointer, n int) []byte { + var b []byte + pbyte := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + pbyte.Data = uintptr(p) + pbyte.Len = n + pbyte.Cap = n + return b +} diff --git a/server/cmd_hash.go b/server/cmd_hash.go index bce1be2..a20fef6 100644 --- a/server/cmd_hash.go +++ b/server/cmd_hash.go @@ -138,7 +138,7 @@ func hmgetCommand(c *client) error { return ErrCmdParams } - if v, err := c.db.HMget(args[0], args[1:]); err != nil { + if v, err := c.db.HMget(args[0], args[1:]...); err != nil { return err } else { c.writeArray(v) diff --git a/server/cmd_replication_test.go b/server/cmd_replication_test.go index 645f4f7..290d94b 100644 --- a/server/cmd_replication_test.go +++ b/server/cmd_replication_test.go @@ -3,14 +3,14 @@ package server import ( "bytes" "fmt" - "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/ledisdb/leveldb" "os" "testing" "time" ) func checkDataEqual(master *App, slave *App) error { - it := master.ldb.DataDB().Iterator(nil, nil, leveldb.RangeClose, 0, -1) + it := master.ldb.DataDB().RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) for ; it.Valid(); it.Next() { key := it.Key() value := it.Value()