add leveldb wrapper

This commit is contained in:
siddontang 2014-04-18 14:50:29 +08:00
parent 0a9ad31602
commit b9100d77aa
7 changed files with 744 additions and 0 deletions

34
leveldb/batch.go Normal file
View File

@ -0,0 +1,34 @@
package leveldb
import (
"github.com/jmhodges/levigo"
)
type WriteBatch struct {
db *DB
wb *levigo.WriteBatch
}
func (wb *WriteBatch) Put(key, value []byte) {
wb.wb.Put(key, value)
}
func (wb *WriteBatch) Delete(key []byte) {
wb.wb.Delete(key)
}
func (wb *WriteBatch) Commit() error {
err := wb.db.db.Write(wb.db.writeOpts, wb.wb)
wb.close()
return err
}
func (wb *WriteBatch) Rollback() {
wb.wb.Clear()
wb.close()
}
func (wb *WriteBatch) close() {
wb.wb.Close()
wb.wb = nil
}

41
leveldb/build_deps.sh Normal file
View File

@ -0,0 +1,41 @@
#!/bin/bash
#refer https://github.com/norton/lets/blob/master/c_src/build_deps.sh
#install leveldb and snappy
#you must set your own snappy and leveldb source directory
SNAPPY_SRC=./snappy
LEVELDB_SRC=./leveldb
SNAPPY_DIR=/usr/local/snappy
LEVELDB_DIR=/usr/local/leveldb
if [ ! -f $SNAPPY_DIR/lib/libsnappy.a ]; then
(cd $SNAPPY_SRC && \
./configure --prefix=$SNAPPY_DIR && \
make && \
make install)
else
echo "skip install snappy"
fi
if [ ! -f $LEVELDB_DIR/lib/libleveldb.a ]; then
(cd $LEVELDB_SRC && \
echo "echo \"PLATFORM_CFLAGS+=-I$SNAPPY_DIR/include\" >> build_config.mk" >> build_detect_platform &&
echo "echo \"PLATFORM_CXXFLAGS+=-I$SNAPPY_DIR/include\" >> build_config.mk" >> build_detect_platform &&
echo "echo \"PLATFORM_LDFLAGS+=-L $SNAPPY_DIR/lib -lsnappy\" >> build_config.mk" >> build_detect_platform &&
make SNAPPY=1 && \
make && \
mkdir -p $LEVELDB_DIR/include/leveldb && \
install include/leveldb/*.h $LEVELDB_DIR/include/leveldb && \
mkdir -p $LEVELDB_DIR/lib && \
cp -af libleveldb.* $LEVELDB_DIR/lib)
else
echo "skip install leveldb"
fi
export CGO_CFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include"
export CGO_LDFLAGS="-L$LEVELDB_DIR/lib -L$SNAPPY_DIR/lib -lsnappy"
go get github.com/jmhodges/levigo

154
leveldb/db.go Normal file
View File

@ -0,0 +1,154 @@
package leveldb
import (
"encoding/json"
"github.com/jmhodges/levigo"
)
const defaultFilterBits int = 10
type Config struct {
Path string `json:"path"`
Compression bool `json:"compression"`
BlockSize int `json:"block_size"`
WriteBufferSize int `json:"write_buffer_size"`
CacheSize int `json:"cache_size"`
}
type DB struct {
cfg *Config
db *levigo.DB
opts *levigo.Options
//for default read and write options
readOpts *levigo.ReadOptions
writeOpts *levigo.WriteOptions
iteratorOpts *levigo.ReadOptions
cache *levigo.Cache
filter *levigo.FilterPolicy
}
func Open(configJson json.RawMessage) (*DB, error) {
cfg := new(Config)
err := json.Unmarshal(configJson, cfg)
if err != nil {
return nil, err
}
db := new(DB)
db.cfg = cfg
db.opts = db.initOptions(cfg)
db.readOpts = levigo.NewReadOptions()
db.writeOpts = levigo.NewWriteOptions()
db.iteratorOpts = levigo.NewReadOptions()
db.iteratorOpts.SetFillCache(false)
db.db, err = levigo.Open(cfg.Path, db.opts)
return db, err
}
func (db *DB) initOptions(cfg *Config) *levigo.Options {
opts := levigo.NewOptions()
opts.SetCreateIfMissing(true)
if cfg.CacheSize > 0 {
db.cache = levigo.NewLRUCache(cfg.CacheSize)
opts.SetCache(db.cache)
}
//we must use bloomfilter
db.filter = levigo.NewBloomFilter(defaultFilterBits)
opts.SetFilterPolicy(db.filter)
if !cfg.Compression {
opts.SetCompression(levigo.NoCompression)
}
blockSize := cfg.BlockSize * 1024
if blockSize > 0 {
opts.SetBlockSize(blockSize)
}
writeBufferSize := cfg.WriteBufferSize * 1024 * 1024
if writeBufferSize > 0 {
opts.SetWriteBufferSize(writeBufferSize)
}
return opts
}
func (db *DB) Close() {
db.opts.Close()
if db.cache != nil {
db.cache.Close()
}
if db.filter != nil {
db.filter.Close()
}
db.readOpts.Close()
db.writeOpts.Close()
db.iteratorOpts.Close()
db.db.Close()
db.db = nil
}
func (db *DB) Destroy() {
db.Close()
opts := levigo.NewOptions()
defer opts.Close()
levigo.DestroyDatabase(db.cfg.Path, opts)
}
func (db *DB) Put(key, value []byte) error {
return db.db.Put(db.writeOpts, key, value)
}
func (db *DB) Get(key []byte) ([]byte, error) {
return db.db.Get(db.readOpts, key)
}
func (db *DB) Delete(key []byte) error {
return db.db.Delete(db.writeOpts, key)
}
func (db *DB) NewWriteBatch() *WriteBatch {
wb := new(WriteBatch)
wb.wb = levigo.NewWriteBatch()
wb.db = db
return wb
}
//like c++ iterator, [begin, end)
//begin should less than end
//if begin is nil, we will seek to first
//if end is nil, we will next until read last
//limit <= 0, no limit
func (db *DB) Iterator(begin []byte, end []byte, limit int) *Iterator {
return newIterator(db, db.iteratorOpts, begin, end, limit, forward)
}
//like c++ reverse_iterator, [rbegin, rend)
//rbegin should bigger than rend
//if rbegin is nil, we will seek to last
//if end is nil, we will next until read first
//limit <= 0, no limit
func (db *DB) ReverseIterator(rbegin []byte, rend []byte, limit int) *Iterator {
return newIterator(db, db.iteratorOpts, rbegin, rend, limit, backward)
}
func (db *DB) NewSnapshot() *Snapshot {
return newSnapshot(db)
}

101
leveldb/iterator.go Normal file
View File

@ -0,0 +1,101 @@
package leveldb
import (
"bytes"
"github.com/jmhodges/levigo"
)
const forward uint8 = 0
const backward uint8 = 1
type Iterator struct {
it *levigo.Iterator
start []byte
stop []byte
limit int
step int
//0 for forward, 1 for backward
direction uint8
}
func newIterator(db *DB, opts *levigo.ReadOptions, start []byte, stop []byte, limit int, direction uint8) *Iterator {
it := new(Iterator)
it.it = db.db.NewIterator(opts)
it.start = start
it.stop = stop
it.limit = limit
it.direction = direction
it.step = 0
if start == nil {
if direction == forward {
it.it.SeekToFirst()
} else {
it.it.SeekToLast()
}
} else {
it.it.Seek(start)
if it.Valid() && !bytes.Equal(it.Key(), start) {
//for forward, key is the next bigger than start
//for backward, key is the next bigger than start, so must go prev
if direction == backward {
it.it.Prev()
}
}
}
return it
}
func (it *Iterator) Valid() bool {
if !it.it.Valid() {
return false
}
if it.limit > 0 && it.step >= it.limit {
return false
}
if it.direction == forward {
if it.stop != nil && bytes.Compare(it.Key(), it.stop) >= 0 {
return false
}
} else {
if it.stop != nil && bytes.Compare(it.Key(), it.stop) <= 0 {
return false
}
}
return true
}
func (it *Iterator) GetError() error {
return it.it.GetError()
}
func (it *Iterator) Next() {
it.step++
if it.direction == forward {
it.it.Next()
} else {
it.it.Prev()
}
}
func (it *Iterator) Key() []byte {
return it.it.Key()
}
func (it *Iterator) Value() []byte {
return it.it.Value()
}
func (it *Iterator) Close() {
it.it.Close()
}

359
leveldb/leveldb_test.go Normal file
View File

@ -0,0 +1,359 @@
package leveldb
import (
"bytes"
"fmt"
"os"
"sync"
"testing"
)
var testConfigJson = []byte(`
{
"path" : "./testdb",
"compression":true,
"block_size" : 32,
"write_buffer_size" : 2,
"cache_size" : 20
}
`)
var testOnce sync.Once
var testDB *DB
func getTestDB() *DB {
f := func() {
var err error
testDB, err = Open(testConfigJson)
if err != nil {
println(err.Error())
panic(err)
}
}
testOnce.Do(f)
return testDB
}
func TestSimple(t *testing.T) {
db := getTestDB()
key := []byte("key")
value := []byte("hello world")
if err := db.Put(key, value); err != nil {
t.Fatal(err)
}
if v, err := db.Get(key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(v, value) {
t.Fatal("not equal")
}
if err := db.Delete(key); err != nil {
t.Fatal(err)
}
if v, err := db.Get(key); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatal("must nil")
}
}
func TestBatch(t *testing.T) {
db := getTestDB()
key1 := []byte("key1")
key2 := []byte("key2")
value := []byte("hello world")
db.Put(key1, value)
db.Put(key2, value)
wb := db.NewWriteBatch()
wb.Delete(key2)
wb.Put(key1, []byte("hello world2"))
if err := wb.Commit(); err != nil {
t.Fatal(err)
}
if v, err := db.Get(key2); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatal("must nil")
}
if v, err := db.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "hello world2" {
t.Fatal(string(v))
}
wb = db.NewWriteBatch()
wb.Delete(key1)
wb.Rollback()
if v, err := db.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "hello world2" {
t.Fatal(string(v))
}
db.Delete(key1)
}
func TestIterator(t *testing.T) {
db := getTestDB()
for it := db.Iterator(nil, nil, 0); it.Valid(); it.Next() {
db.Delete(it.Key())
}
for i := 0; i < 10; i++ {
key := []byte(fmt.Sprintf("key_%d", i))
value := []byte(fmt.Sprintf("value_%d", i))
db.Put(key, value)
}
step := 0
var it *Iterator
for it = db.Iterator(nil, nil, 0); it.Valid(); it.Next() {
key := it.Key()
value := it.Value()
if string(key) != fmt.Sprintf("key_%d", step) {
t.Fatal(string(key), step)
}
if string(value) != fmt.Sprintf("value_%d", step) {
t.Fatal(string(value), step)
}
step++
}
it.Close()
step = 2
for it = db.Iterator([]byte("key_2"), nil, 3); it.Valid(); it.Next() {
key := it.Key()
value := it.Value()
if string(key) != fmt.Sprintf("key_%d", step) {
t.Fatal(string(key), step)
}
if string(value) != fmt.Sprintf("value_%d", step) {
t.Fatal(string(value), step)
}
step++
}
it.Close()
if step != 5 {
t.Fatal("invalid step", step)
}
step = 2
for it = db.Iterator([]byte("key_2"), []byte("key_5"), 0); it.Valid(); it.Next() {
key := it.Key()
value := it.Value()
if string(key) != fmt.Sprintf("key_%d", step) {
t.Fatal(string(key), step)
}
if string(value) != fmt.Sprintf("value_%d", step) {
t.Fatal(string(value), step)
}
step++
}
it.Close()
if step != 5 {
t.Fatal("invalid step", step)
}
step = 2
for it = db.Iterator([]byte("key_5"), []byte("key_2"), 0); it.Valid(); it.Next() {
step++
}
it.Close()
if step != 2 {
t.Fatal("must 0")
}
step = 9
for it = db.ReverseIterator(nil, nil, 0); it.Valid(); it.Next() {
key := it.Key()
value := it.Value()
if string(key) != fmt.Sprintf("key_%d", step) {
t.Fatal(string(key), step)
}
if string(value) != fmt.Sprintf("value_%d", step) {
t.Fatal(string(value), step)
}
step--
}
it.Close()
step = 5
for it = db.ReverseIterator([]byte("key_5"), nil, 3); it.Valid(); it.Next() {
key := it.Key()
value := it.Value()
if string(key) != fmt.Sprintf("key_%d", step) {
t.Fatal(string(key), step)
}
if string(value) != fmt.Sprintf("value_%d", step) {
t.Fatal(string(value), step)
}
step--
}
it.Close()
if step != 2 {
t.Fatal("invalid step", step)
}
step = 5
for it = db.ReverseIterator([]byte("key_5"), []byte("key_2"), 0); it.Valid(); it.Next() {
key := it.Key()
value := it.Value()
if string(key) != fmt.Sprintf("key_%d", step) {
t.Fatal(string(key), step)
}
if string(value) != fmt.Sprintf("value_%d", step) {
t.Fatal(string(value), step)
}
step--
}
it.Close()
if step != 2 {
t.Fatal("invalid step", step)
}
step = 5
for it = db.ReverseIterator([]byte("key_2"), []byte("key_5"), 0); it.Valid(); it.Next() {
step--
}
it.Close()
if step != 5 {
t.Fatal("must 5")
}
}
func TestIterator_2(t *testing.T) {
db := getTestDB()
for it := db.Iterator(nil, nil, 0); it.Valid(); it.Next() {
db.Delete(it.Key())
}
db.Put([]byte("key_1"), []byte("value_1"))
db.Put([]byte("key_7"), []byte("value_9"))
db.Put([]byte("key_9"), []byte("value_9"))
it := db.Iterator([]byte("key_0"), []byte("key_8"), 0)
if !it.Valid() {
t.Fatal("must valid")
}
if string(it.Key()) != "key_1" {
t.Fatal(string(it.Key()))
}
it = db.ReverseIterator([]byte("key_8"), []byte("key_0"), 0)
if !it.Valid() {
t.Fatal("must valid")
}
if string(it.Key()) != "key_7" {
t.Fatal(string(it.Key()))
}
for it := db.Iterator(nil, nil, 0); it.Valid(); it.Next() {
db.Delete(it.Key())
}
it = db.Iterator([]byte("key_0"), []byte("key_8"), 0)
if it.Valid() {
t.Fatal("must not valid")
}
it = db.ReverseIterator([]byte("key_8"), []byte("key_0"), 0)
if it.Valid() {
t.Fatal("must not valid")
}
}
func TestSnapshot(t *testing.T) {
db := getTestDB()
key := []byte("key")
value := []byte("hello world")
db.Put(key, value)
s := db.NewSnapshot()
defer s.Close()
db.Put(key, []byte("hello world2"))
if v, err := s.Get(key); err != nil {
t.Fatal(err)
} else if string(v) != string(value) {
t.Fatal(string(v))
}
found := false
for it := s.Iterator(nil, nil, 0); it.Valid(); it.Next() {
if string(it.Key()) == string(key) {
found = true
break
}
}
if !found {
t.Fatal("must found")
}
found = false
for it := s.ReverseIterator(nil, nil, 0); it.Valid(); it.Next() {
if string(it.Key()) == string(key) {
found = true
break
}
}
if !found {
t.Fatal("must found")
}
}
func TestDestroy(t *testing.T) {
db := getTestDB()
db.Destroy()
if _, err := os.Stat(db.cfg.Path); !os.IsNotExist(err) {
t.Fatal("must not exist")
}
}

8
leveldb/readme.md Normal file
View File

@ -0,0 +1,8 @@
a leveldb wrapper for levigo
simplify use leveldb in go
# Install
+ download leveldb and snappy source, uncompress and set source directory in build_deps.sh
+ . ./build_deps.sh

47
leveldb/snapshot.go Normal file
View File

@ -0,0 +1,47 @@
package leveldb
import (
"github.com/jmhodges/levigo"
)
type Snapshot struct {
db *DB
s *levigo.Snapshot
readOpts *levigo.ReadOptions
iteratorOpts *levigo.ReadOptions
}
func newSnapshot(db *DB) *Snapshot {
s := new(Snapshot)
s.db = db
s.s = db.db.NewSnapshot()
s.readOpts = levigo.NewReadOptions()
s.readOpts.SetSnapshot(s.s)
s.iteratorOpts = levigo.NewReadOptions()
s.iteratorOpts.SetSnapshot(s.s)
s.iteratorOpts.SetFillCache(false)
return s
}
func (s *Snapshot) Close() {
s.db.db.ReleaseSnapshot(s.s)
s.iteratorOpts.Close()
s.readOpts.Close()
}
func (s *Snapshot) Get(key []byte) ([]byte, error) {
return s.db.db.Get(s.readOpts, key)
}
//same as db iterator and reverse iterator
func (s *Snapshot) Iterator(begin []byte, end []byte, limit int) *Iterator {
return newIterator(s.db, s.iteratorOpts, begin, end, limit, forward)
}
func (s *Snapshot) ReverseIterator(rbegin []byte, rend []byte, limit int) *Iterator {
return newIterator(s.db, s.iteratorOpts, rbegin, rend, limit, backward)
}