From b9100d77aacfdf6f592213aacf310a27ab975b04 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 18 Apr 2014 14:50:29 +0800 Subject: [PATCH] add leveldb wrapper --- leveldb/batch.go | 34 ++++ leveldb/build_deps.sh | 41 +++++ leveldb/db.go | 154 +++++++++++++++++ leveldb/iterator.go | 101 +++++++++++ leveldb/leveldb_test.go | 359 ++++++++++++++++++++++++++++++++++++++++ leveldb/readme.md | 8 + leveldb/snapshot.go | 47 ++++++ 7 files changed, 744 insertions(+) create mode 100644 leveldb/batch.go create mode 100644 leveldb/build_deps.sh create mode 100644 leveldb/db.go create mode 100644 leveldb/iterator.go create mode 100644 leveldb/leveldb_test.go create mode 100644 leveldb/readme.md create mode 100644 leveldb/snapshot.go diff --git a/leveldb/batch.go b/leveldb/batch.go new file mode 100644 index 0000000..945531c --- /dev/null +++ b/leveldb/batch.go @@ -0,0 +1,34 @@ +package leveldb + +import ( + "github.com/jmhodges/levigo" +) + +type WriteBatch struct { + db *DB + wb *levigo.WriteBatch +} + +func (wb *WriteBatch) Put(key, value []byte) { + wb.wb.Put(key, value) +} + +func (wb *WriteBatch) Delete(key []byte) { + wb.wb.Delete(key) +} + +func (wb *WriteBatch) Commit() error { + err := wb.db.db.Write(wb.db.writeOpts, wb.wb) + wb.close() + return err +} + +func (wb *WriteBatch) Rollback() { + wb.wb.Clear() + wb.close() +} + +func (wb *WriteBatch) close() { + wb.wb.Close() + wb.wb = nil +} diff --git a/leveldb/build_deps.sh b/leveldb/build_deps.sh new file mode 100644 index 0000000..3b1443b --- /dev/null +++ b/leveldb/build_deps.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +#refer https://github.com/norton/lets/blob/master/c_src/build_deps.sh + +#install leveldb and snappy + +#you must set your own snappy and leveldb source directory +SNAPPY_SRC=./snappy +LEVELDB_SRC=./leveldb + +SNAPPY_DIR=/usr/local/snappy +LEVELDB_DIR=/usr/local/leveldb + +if [ ! -f $SNAPPY_DIR/lib/libsnappy.a ]; then + (cd $SNAPPY_SRC && \ + ./configure --prefix=$SNAPPY_DIR && \ + make && \ + make install) +else + echo "skip install snappy" +fi + +if [ ! -f $LEVELDB_DIR/lib/libleveldb.a ]; then + (cd $LEVELDB_SRC && \ + echo "echo \"PLATFORM_CFLAGS+=-I$SNAPPY_DIR/include\" >> build_config.mk" >> build_detect_platform && + echo "echo \"PLATFORM_CXXFLAGS+=-I$SNAPPY_DIR/include\" >> build_config.mk" >> build_detect_platform && + echo "echo \"PLATFORM_LDFLAGS+=-L $SNAPPY_DIR/lib -lsnappy\" >> build_config.mk" >> build_detect_platform && + make SNAPPY=1 && \ + make && \ + mkdir -p $LEVELDB_DIR/include/leveldb && \ + install include/leveldb/*.h $LEVELDB_DIR/include/leveldb && \ + mkdir -p $LEVELDB_DIR/lib && \ + cp -af libleveldb.* $LEVELDB_DIR/lib) +else + echo "skip install leveldb" +fi + +export CGO_CFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include" +export CGO_LDFLAGS="-L$LEVELDB_DIR/lib -L$SNAPPY_DIR/lib -lsnappy" + +go get github.com/jmhodges/levigo \ No newline at end of file diff --git a/leveldb/db.go b/leveldb/db.go new file mode 100644 index 0000000..d55a410 --- /dev/null +++ b/leveldb/db.go @@ -0,0 +1,154 @@ +package leveldb + +import ( + "encoding/json" + "github.com/jmhodges/levigo" +) + +const defaultFilterBits int = 10 + +type Config struct { + Path string `json:"path"` + + Compression bool `json:"compression"` + + BlockSize int `json:"block_size"` + WriteBufferSize int `json:"write_buffer_size"` + CacheSize int `json:"cache_size"` +} + +type DB struct { + cfg *Config + db *levigo.DB + + opts *levigo.Options + + //for default read and write options + readOpts *levigo.ReadOptions + writeOpts *levigo.WriteOptions + iteratorOpts *levigo.ReadOptions + + cache *levigo.Cache + + filter *levigo.FilterPolicy +} + +func Open(configJson json.RawMessage) (*DB, error) { + cfg := new(Config) + err := json.Unmarshal(configJson, cfg) + if err != nil { + return nil, err + } + + db := new(DB) + db.cfg = cfg + + db.opts = db.initOptions(cfg) + + db.readOpts = levigo.NewReadOptions() + db.writeOpts = levigo.NewWriteOptions() + db.iteratorOpts = levigo.NewReadOptions() + db.iteratorOpts.SetFillCache(false) + + db.db, err = levigo.Open(cfg.Path, db.opts) + return db, err +} + +func (db *DB) initOptions(cfg *Config) *levigo.Options { + opts := levigo.NewOptions() + + opts.SetCreateIfMissing(true) + + if cfg.CacheSize > 0 { + db.cache = levigo.NewLRUCache(cfg.CacheSize) + opts.SetCache(db.cache) + } + + //we must use bloomfilter + db.filter = levigo.NewBloomFilter(defaultFilterBits) + opts.SetFilterPolicy(db.filter) + + if !cfg.Compression { + opts.SetCompression(levigo.NoCompression) + } + + blockSize := cfg.BlockSize * 1024 + if blockSize > 0 { + opts.SetBlockSize(blockSize) + } + + writeBufferSize := cfg.WriteBufferSize * 1024 * 1024 + if writeBufferSize > 0 { + opts.SetWriteBufferSize(writeBufferSize) + } + + return opts +} + +func (db *DB) Close() { + db.opts.Close() + + if db.cache != nil { + db.cache.Close() + } + + if db.filter != nil { + db.filter.Close() + } + + db.readOpts.Close() + db.writeOpts.Close() + db.iteratorOpts.Close() + + db.db.Close() + db.db = nil +} + +func (db *DB) Destroy() { + db.Close() + opts := levigo.NewOptions() + defer opts.Close() + + levigo.DestroyDatabase(db.cfg.Path, opts) +} + +func (db *DB) Put(key, value []byte) error { + return db.db.Put(db.writeOpts, key, value) +} + +func (db *DB) Get(key []byte) ([]byte, error) { + return db.db.Get(db.readOpts, key) +} + +func (db *DB) Delete(key []byte) error { + return db.db.Delete(db.writeOpts, key) +} + +func (db *DB) NewWriteBatch() *WriteBatch { + wb := new(WriteBatch) + wb.wb = levigo.NewWriteBatch() + wb.db = db + return wb +} + +//like c++ iterator, [begin, end) +//begin should less than end +//if begin is nil, we will seek to first +//if end is nil, we will next until read last +//limit <= 0, no limit +func (db *DB) Iterator(begin []byte, end []byte, limit int) *Iterator { + return newIterator(db, db.iteratorOpts, begin, end, limit, forward) +} + +//like c++ reverse_iterator, [rbegin, rend) +//rbegin should bigger than rend +//if rbegin is nil, we will seek to last +//if end is nil, we will next until read first +//limit <= 0, no limit +func (db *DB) ReverseIterator(rbegin []byte, rend []byte, limit int) *Iterator { + return newIterator(db, db.iteratorOpts, rbegin, rend, limit, backward) +} + +func (db *DB) NewSnapshot() *Snapshot { + return newSnapshot(db) +} diff --git a/leveldb/iterator.go b/leveldb/iterator.go new file mode 100644 index 0000000..cd8ce50 --- /dev/null +++ b/leveldb/iterator.go @@ -0,0 +1,101 @@ +package leveldb + +import ( + "bytes" + "github.com/jmhodges/levigo" +) + +const forward uint8 = 0 +const backward uint8 = 1 + +type Iterator struct { + it *levigo.Iterator + + start []byte + stop []byte + limit int + + step int + + //0 for forward, 1 for backward + direction uint8 +} + +func newIterator(db *DB, opts *levigo.ReadOptions, start []byte, stop []byte, limit int, direction uint8) *Iterator { + it := new(Iterator) + it.it = db.db.NewIterator(opts) + + it.start = start + it.stop = stop + it.limit = limit + it.direction = direction + it.step = 0 + + if start == nil { + if direction == forward { + it.it.SeekToFirst() + } else { + it.it.SeekToLast() + } + } else { + it.it.Seek(start) + + if it.Valid() && !bytes.Equal(it.Key(), start) { + //for forward, key is the next bigger than start + //for backward, key is the next bigger than start, so must go prev + if direction == backward { + it.it.Prev() + } + } + } + + return it +} + +func (it *Iterator) Valid() bool { + if !it.it.Valid() { + return false + } + + if it.limit > 0 && it.step >= it.limit { + return false + } + + if it.direction == forward { + if it.stop != nil && bytes.Compare(it.Key(), it.stop) >= 0 { + return false + } + } else { + if it.stop != nil && bytes.Compare(it.Key(), it.stop) <= 0 { + return false + } + } + + return true +} + +func (it *Iterator) GetError() error { + return it.it.GetError() +} + +func (it *Iterator) Next() { + it.step++ + + if it.direction == forward { + it.it.Next() + } else { + it.it.Prev() + } +} + +func (it *Iterator) Key() []byte { + return it.it.Key() +} + +func (it *Iterator) Value() []byte { + return it.it.Value() +} + +func (it *Iterator) Close() { + it.it.Close() +} diff --git a/leveldb/leveldb_test.go b/leveldb/leveldb_test.go new file mode 100644 index 0000000..a5207a2 --- /dev/null +++ b/leveldb/leveldb_test.go @@ -0,0 +1,359 @@ +package leveldb + +import ( + "bytes" + "fmt" + "os" + "sync" + "testing" +) + +var testConfigJson = []byte(` + { + "path" : "./testdb", + "compression":true, + "block_size" : 32, + "write_buffer_size" : 2, + "cache_size" : 20 + } + `) + +var testOnce sync.Once +var testDB *DB + +func getTestDB() *DB { + f := func() { + var err error + testDB, err = Open(testConfigJson) + if err != nil { + println(err.Error()) + panic(err) + } + } + + testOnce.Do(f) + return testDB +} + +func TestSimple(t *testing.T) { + db := getTestDB() + + key := []byte("key") + value := []byte("hello world") + if err := db.Put(key, value); err != nil { + t.Fatal(err) + } + + if v, err := db.Get(key); err != nil { + t.Fatal(err) + } else if !bytes.Equal(v, value) { + t.Fatal("not equal") + } + + if err := db.Delete(key); err != nil { + t.Fatal(err) + } + + if v, err := db.Get(key); err != nil { + t.Fatal(err) + } else if v != nil { + t.Fatal("must nil") + } +} + +func TestBatch(t *testing.T) { + db := getTestDB() + + key1 := []byte("key1") + key2 := []byte("key2") + + value := []byte("hello world") + + db.Put(key1, value) + db.Put(key2, value) + + wb := db.NewWriteBatch() + wb.Delete(key2) + wb.Put(key1, []byte("hello world2")) + + if err := wb.Commit(); err != nil { + t.Fatal(err) + } + + if v, err := db.Get(key2); err != nil { + t.Fatal(err) + } else if v != nil { + t.Fatal("must nil") + } + + if v, err := db.Get(key1); err != nil { + t.Fatal(err) + } else if string(v) != "hello world2" { + t.Fatal(string(v)) + } + + wb = db.NewWriteBatch() + wb.Delete(key1) + + wb.Rollback() + + if v, err := db.Get(key1); err != nil { + t.Fatal(err) + } else if string(v) != "hello world2" { + t.Fatal(string(v)) + } + + db.Delete(key1) +} + +func TestIterator(t *testing.T) { + db := getTestDB() + for it := db.Iterator(nil, nil, 0); it.Valid(); it.Next() { + db.Delete(it.Key()) + } + + for i := 0; i < 10; i++ { + key := []byte(fmt.Sprintf("key_%d", i)) + value := []byte(fmt.Sprintf("value_%d", i)) + db.Put(key, value) + } + + step := 0 + var it *Iterator + for it = db.Iterator(nil, nil, 0); it.Valid(); it.Next() { + key := it.Key() + value := it.Value() + + if string(key) != fmt.Sprintf("key_%d", step) { + t.Fatal(string(key), step) + } + + if string(value) != fmt.Sprintf("value_%d", step) { + t.Fatal(string(value), step) + } + + step++ + } + + it.Close() + + step = 2 + for it = db.Iterator([]byte("key_2"), nil, 3); it.Valid(); it.Next() { + key := it.Key() + value := it.Value() + + if string(key) != fmt.Sprintf("key_%d", step) { + t.Fatal(string(key), step) + } + + if string(value) != fmt.Sprintf("value_%d", step) { + t.Fatal(string(value), step) + } + + step++ + } + it.Close() + + if step != 5 { + t.Fatal("invalid step", step) + } + + step = 2 + for it = db.Iterator([]byte("key_2"), []byte("key_5"), 0); it.Valid(); it.Next() { + key := it.Key() + value := it.Value() + + if string(key) != fmt.Sprintf("key_%d", step) { + t.Fatal(string(key), step) + } + + if string(value) != fmt.Sprintf("value_%d", step) { + t.Fatal(string(value), step) + } + + step++ + } + it.Close() + + if step != 5 { + t.Fatal("invalid step", step) + } + + step = 2 + for it = db.Iterator([]byte("key_5"), []byte("key_2"), 0); it.Valid(); it.Next() { + step++ + } + it.Close() + + if step != 2 { + t.Fatal("must 0") + } + + step = 9 + for it = db.ReverseIterator(nil, nil, 0); it.Valid(); it.Next() { + key := it.Key() + value := it.Value() + + if string(key) != fmt.Sprintf("key_%d", step) { + t.Fatal(string(key), step) + } + + if string(value) != fmt.Sprintf("value_%d", step) { + t.Fatal(string(value), step) + } + + step-- + } + it.Close() + + step = 5 + for it = db.ReverseIterator([]byte("key_5"), nil, 3); it.Valid(); it.Next() { + key := it.Key() + value := it.Value() + + if string(key) != fmt.Sprintf("key_%d", step) { + t.Fatal(string(key), step) + } + + if string(value) != fmt.Sprintf("value_%d", step) { + t.Fatal(string(value), step) + } + + step-- + } + it.Close() + + if step != 2 { + t.Fatal("invalid step", step) + } + + step = 5 + for it = db.ReverseIterator([]byte("key_5"), []byte("key_2"), 0); it.Valid(); it.Next() { + key := it.Key() + value := it.Value() + + if string(key) != fmt.Sprintf("key_%d", step) { + t.Fatal(string(key), step) + } + + if string(value) != fmt.Sprintf("value_%d", step) { + t.Fatal(string(value), step) + } + + step-- + } + it.Close() + + if step != 2 { + t.Fatal("invalid step", step) + } + + step = 5 + for it = db.ReverseIterator([]byte("key_2"), []byte("key_5"), 0); it.Valid(); it.Next() { + step-- + } + it.Close() + + if step != 5 { + t.Fatal("must 5") + } +} + +func TestIterator_2(t *testing.T) { + db := getTestDB() + for it := db.Iterator(nil, nil, 0); it.Valid(); it.Next() { + db.Delete(it.Key()) + } + + db.Put([]byte("key_1"), []byte("value_1")) + db.Put([]byte("key_7"), []byte("value_9")) + db.Put([]byte("key_9"), []byte("value_9")) + + it := db.Iterator([]byte("key_0"), []byte("key_8"), 0) + if !it.Valid() { + t.Fatal("must valid") + } + + if string(it.Key()) != "key_1" { + t.Fatal(string(it.Key())) + } + + it = db.ReverseIterator([]byte("key_8"), []byte("key_0"), 0) + if !it.Valid() { + t.Fatal("must valid") + } + + if string(it.Key()) != "key_7" { + t.Fatal(string(it.Key())) + } + + for it := db.Iterator(nil, nil, 0); it.Valid(); it.Next() { + db.Delete(it.Key()) + } + + it = db.Iterator([]byte("key_0"), []byte("key_8"), 0) + if it.Valid() { + t.Fatal("must not valid") + } + + it = db.ReverseIterator([]byte("key_8"), []byte("key_0"), 0) + if it.Valid() { + t.Fatal("must not valid") + } + +} + +func TestSnapshot(t *testing.T) { + db := getTestDB() + + key := []byte("key") + value := []byte("hello world") + + db.Put(key, value) + + s := db.NewSnapshot() + defer s.Close() + + db.Put(key, []byte("hello world2")) + + if v, err := s.Get(key); err != nil { + t.Fatal(err) + } else if string(v) != string(value) { + t.Fatal(string(v)) + } + + found := false + for it := s.Iterator(nil, nil, 0); it.Valid(); it.Next() { + if string(it.Key()) == string(key) { + found = true + break + } + } + + if !found { + t.Fatal("must found") + } + + found = false + for it := s.ReverseIterator(nil, nil, 0); it.Valid(); it.Next() { + if string(it.Key()) == string(key) { + found = true + break + } + } + + if !found { + t.Fatal("must found") + } + +} + +func TestDestroy(t *testing.T) { + db := getTestDB() + + db.Destroy() + + if _, err := os.Stat(db.cfg.Path); !os.IsNotExist(err) { + t.Fatal("must not exist") + } +} diff --git a/leveldb/readme.md b/leveldb/readme.md new file mode 100644 index 0000000..99c3a3b --- /dev/null +++ b/leveldb/readme.md @@ -0,0 +1,8 @@ +a leveldb wrapper for levigo + +simplify use leveldb in go + +# Install + ++ download leveldb and snappy source, uncompress and set source directory in build_deps.sh ++ . ./build_deps.sh diff --git a/leveldb/snapshot.go b/leveldb/snapshot.go new file mode 100644 index 0000000..0663b5c --- /dev/null +++ b/leveldb/snapshot.go @@ -0,0 +1,47 @@ +package leveldb + +import ( + "github.com/jmhodges/levigo" +) + +type Snapshot struct { + db *DB + s *levigo.Snapshot + readOpts *levigo.ReadOptions + iteratorOpts *levigo.ReadOptions +} + +func newSnapshot(db *DB) *Snapshot { + s := new(Snapshot) + s.db = db + s.s = db.db.NewSnapshot() + + s.readOpts = levigo.NewReadOptions() + s.readOpts.SetSnapshot(s.s) + + s.iteratorOpts = levigo.NewReadOptions() + s.iteratorOpts.SetSnapshot(s.s) + s.iteratorOpts.SetFillCache(false) + + return s +} + +func (s *Snapshot) Close() { + s.db.db.ReleaseSnapshot(s.s) + + s.iteratorOpts.Close() + s.readOpts.Close() +} + +func (s *Snapshot) Get(key []byte) ([]byte, error) { + return s.db.db.Get(s.readOpts, key) +} + +//same as db iterator and reverse iterator +func (s *Snapshot) Iterator(begin []byte, end []byte, limit int) *Iterator { + return newIterator(s.db, s.iteratorOpts, begin, end, limit, forward) +} + +func (s *Snapshot) ReverseIterator(rbegin []byte, rend []byte, limit int) *Iterator { + return newIterator(s.db, s.iteratorOpts, rbegin, rend, limit, backward) +}