diff --git a/dev.sh b/dev.sh index 6d753f2..798ffab 100644 --- a/dev.sh +++ b/dev.sh @@ -13,6 +13,7 @@ fi SNAPPY_DIR=/usr/local/snappy LEVELDB_DIR=/usr/local/leveldb ROCKSDB_DIR=/usr/local/rocksdb +HYPERLEVELDB_DIR=/usr/local/hyperleveldb function add_path() { @@ -63,6 +64,16 @@ if [ -f $ROCKSDB_DIR/include/rocksdb/c.h ]; then GO_BUILD_TAGS="$GO_BUILD_TAGS rocksdb" fi +#check hyperleveldb +if [ -f $HYPERLEVELDB_DIR/include/hyperleveldb/c.h ]; then + CGO_CFLAGS="$CGO_CFLAGS -I$HYPERLEVELDB_DIR/include" + CGO_CXXFLAGS="$CGO_CXXFLAGS -I$HYPERLEVELDB_DIR/include" + CGO_LDFLAGS="$CGO_LDFLAGS -L$HYPERLEVELDB_DIR/lib -lhyperleveldb" + LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $HYPERLEVELDB_DIR/lib) + DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $HYPERLEVELDB_DIR/lib) + GO_BUILD_TAGS="$GO_BUILD_TAGS hyperleveldb" +fi + export CGO_CFLAGS export CGO_CXXFLAGS export CGO_LDFLAGS diff --git a/store/hyperleveldb.go b/store/hyperleveldb.go new file mode 100644 index 0000000..f26631b --- /dev/null +++ b/store/hyperleveldb.go @@ -0,0 +1,9 @@ +package store + +import ( + "github.com/siddontang/ledisdb/store/hyperleveldb" +) + +func init() { + Register(hyperleveldb.Store{}) +} diff --git a/store/hyperleveldb/batch.go b/store/hyperleveldb/batch.go new file mode 100644 index 0000000..149f0b4 --- /dev/null +++ b/store/hyperleveldb/batch.go @@ -0,0 +1,61 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include "hyperleveldb/c.h" +import "C" + +import ( + "unsafe" +) + +type WriteBatch struct { + db *DB + wbatch *C.leveldb_writebatch_t +} + +func (w *WriteBatch) Close() error { + C.leveldb_writebatch_destroy(w.wbatch) + w.wbatch = nil + + return nil +} + +func (w *WriteBatch) Put(key, value []byte) { + var k, v *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + if len(value) != 0 { + v = (*C.char)(unsafe.Pointer(&value[0])) + } + + lenk := len(key) + lenv := len(value) + + C.leveldb_writebatch_put(w.wbatch, k, 
C.size_t(lenk), v, C.size_t(lenv)) +} + +func (w *WriteBatch) Delete(key []byte) { + C.leveldb_writebatch_delete(w.wbatch, + (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} + +func (w *WriteBatch) Commit() error { + return w.commit(w.db.writeOpts) +} + +func (w *WriteBatch) Rollback() error { + C.leveldb_writebatch_clear(w.wbatch) + return nil +} + +func (w *WriteBatch) commit(wb *WriteOptions) error { + var errStr *C.char + C.leveldb_write(w.db.db, wb.Opt, w.wbatch, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} diff --git a/store/hyperleveldb/cache.go b/store/hyperleveldb/cache.go new file mode 100644 index 0000000..9b73d21 --- /dev/null +++ b/store/hyperleveldb/cache.go @@ -0,0 +1,20 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include +// #include "hyperleveldb/c.h" +import "C" + +type Cache struct { + Cache *C.leveldb_cache_t +} + +func NewLRUCache(capacity int) *Cache { + return &Cache{C.leveldb_cache_create_lru(C.size_t(capacity))} +} + +func (c *Cache) Close() { + C.leveldb_cache_destroy(c.Cache) +} diff --git a/store/hyperleveldb/db.go b/store/hyperleveldb/db.go new file mode 100644 index 0000000..946a2f6 --- /dev/null +++ b/store/hyperleveldb/db.go @@ -0,0 +1,259 @@ +// +build hyperleveldb + +// Package hyperleveldb is a wrapper for c++ hyperleveldb +package hyperleveldb + +/* +#cgo LDFLAGS: -lhyperleveldb +#include +#include "hyperleveldb_ext.h" +*/ +import "C" + +import ( + "github.com/siddontang/ledisdb/config" + "github.com/siddontang/ledisdb/store/driver" + "os" + "runtime" + "unsafe" +) + +const defaultFilterBits int = 10 + +type Store struct { +} + +func (s Store) String() string { + return "hyperleveldb" +} + +func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) { + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return nil, err + } + + db := new(DB) + db.path = path + db.cfg = &cfg.LevelDB + + if err := db.open(); err != nil { + 
return nil, err + } + + return db, nil +} + +func (s Store) Repair(path string, cfg *config.Config) error { + db := new(DB) + db.cfg = &cfg.LevelDB + db.path = path + + err := db.open() + defer db.Close() + + //open ok, do not need repair + if err == nil { + return nil + } + + var errStr *C.char + ldbname := C.CString(path) + defer C.leveldb_free(unsafe.Pointer(ldbname)) + + C.leveldb_repair_db(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} + +type DB struct { + path string + + cfg *config.LevelDBConfig + + db *C.leveldb_t + + opts *Options + + //for default read and write options + readOpts *ReadOptions + writeOpts *WriteOptions + iteratorOpts *ReadOptions + + cache *Cache + + filter *FilterPolicy +} + +func (db *DB) open() error { + db.initOptions(db.cfg) + + var errStr *C.char + ldbname := C.CString(db.path) + defer C.leveldb_free(unsafe.Pointer(ldbname)) + + db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + db.db = nil + return saveError(errStr) + } + return nil +} + +func (db *DB) initOptions(cfg *config.LevelDBConfig) { + opts := NewOptions() + + opts.SetCreateIfMissing(true) + + cfg.Adjust() + + db.cache = NewLRUCache(cfg.CacheSize) + opts.SetCache(db.cache) + + //we must use bloomfilter + db.filter = NewBloomFilter(defaultFilterBits) + opts.SetFilterPolicy(db.filter) + + if !cfg.Compression { + opts.SetCompression(NoCompression) + } else { + opts.SetCompression(SnappyCompression) + } + + opts.SetBlockSize(cfg.BlockSize) + + opts.SetWriteBufferSize(cfg.WriteBufferSize) + + opts.SetMaxOpenFiles(cfg.MaxOpenFiles) + + db.opts = opts + + db.readOpts = NewReadOptions() + db.writeOpts = NewWriteOptions() + + db.iteratorOpts = NewReadOptions() + db.iteratorOpts.SetFillCache(false) +} + +func (db *DB) Close() error { + if db.db != nil { + C.leveldb_close(db.db) + db.db = nil + } + + db.opts.Close() + + if db.cache != nil { + db.cache.Close() + } + + if db.filter != nil { + 
db.filter.Close() + } + + db.readOpts.Close() + db.writeOpts.Close() + db.iteratorOpts.Close() + + return nil +} + +func (db *DB) Put(key, value []byte) error { + return db.put(db.writeOpts, key, value) +} + +func (db *DB) Get(key []byte) ([]byte, error) { + return db.get(db.readOpts, key) +} + +func (db *DB) Delete(key []byte) error { + return db.delete(db.writeOpts, key) +} + +func (db *DB) NewWriteBatch() driver.IWriteBatch { + wb := &WriteBatch{ + db: db, + wbatch: C.leveldb_writebatch_create(), + } + + runtime.SetFinalizer(wb, func(w *WriteBatch) { + w.Close() + }) + return wb +} + +func (db *DB) NewIterator() driver.IIterator { + it := new(Iterator) + + it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt) + + return it +} + +func (db *DB) put(wo *WriteOptions, key, value []byte) error { + var errStr *C.char + var k, v *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + if len(value) != 0 { + v = (*C.char)(unsafe.Pointer(&value[0])) + } + + lenk := len(key) + lenv := len(value) + C.leveldb_put( + db.db, wo.Opt, k, C.size_t(lenk), v, C.size_t(lenv), &errStr) + + if errStr != nil { + return saveError(errStr) + } + return nil +} + +func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) { + var errStr *C.char + var vallen C.size_t + var k *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + + var value *C.char + + c := C.hyperleveldb_get_ext( + db.db, ro.Opt, k, C.size_t(len(key)), &value, &vallen, &errStr) + + if errStr != nil { + return nil, saveError(errStr) + } + + if value == nil { + return nil, nil + } + + defer C.hyperleveldb_get_free_ext(unsafe.Pointer(c)) + + return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil +} + +func (db *DB) delete(wo *WriteOptions, key []byte) error { + var errStr *C.char + var k *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + + C.leveldb_delete( + db.db, wo.Opt, k, C.size_t(len(key)), &errStr) + + if errStr != nil { + return 
saveError(errStr) + } + return nil +} + +func (db *DB) Begin() (driver.Tx, error) { + return nil, driver.ErrTxSupport +} diff --git a/store/hyperleveldb/filterpolicy.go b/store/hyperleveldb/filterpolicy.go new file mode 100644 index 0000000..1c8f126 --- /dev/null +++ b/store/hyperleveldb/filterpolicy.go @@ -0,0 +1,21 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include +// #include "hyperleveldb/c.h" +import "C" + +type FilterPolicy struct { + Policy *C.leveldb_filterpolicy_t +} + +func NewBloomFilter(bitsPerKey int) *FilterPolicy { + policy := C.leveldb_filterpolicy_create_bloom(C.int(bitsPerKey)) + return &FilterPolicy{policy} +} + +func (fp *FilterPolicy) Close() { + C.leveldb_filterpolicy_destroy(fp.Policy) +} diff --git a/store/hyperleveldb/hyperleveldb_ext.cc b/store/hyperleveldb/hyperleveldb_ext.cc new file mode 100644 index 0000000..dab687c --- /dev/null +++ b/store/hyperleveldb/hyperleveldb_ext.cc @@ -0,0 +1,88 @@ +// +build hyperleveldb + +#include "hyperleveldb_ext.h" + +#include +#include + +#include "hyperleveldb/db.h" + +using namespace leveldb; + +extern "C" { + +static bool SaveError(char** errptr, const Status& s) { + assert(errptr != NULL); + if (s.ok()) { + return false; + } else if (*errptr == NULL) { + *errptr = strdup(s.ToString().c_str()); + } else { + free(*errptr); + *errptr = strdup(s.ToString().c_str()); + } + return true; +} + +void* hyperleveldb_get_ext( + leveldb_t* db, + const leveldb_readoptions_t* options, + const char* key, size_t keylen, + char** valptr, + size_t* vallen, + char** errptr) { + + std::string *tmp = new(std::string); + + //very tricky, maybe changed with c++ leveldb upgrade + Status s = (*(DB**)db)->Get(*(ReadOptions*)options, Slice(key, keylen), tmp); + + if (s.ok()) { + *valptr = (char*)tmp->data(); + *vallen = tmp->size(); + } else { + delete(tmp); + tmp = NULL; + *valptr = NULL; + *vallen = 0; + if (!s.IsNotFound()) { + SaveError(errptr, s); + } + } + return tmp; +} + 
+void hyperleveldb_get_free_ext(void* context) { + std::string* s = (std::string*)context; + + delete(s); +} + + +unsigned char hyperleveldb_iter_seek_to_first_ext(leveldb_iterator_t* iter) { + leveldb_iter_seek_to_first(iter); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_seek_to_last_ext(leveldb_iterator_t* iter) { + leveldb_iter_seek_to_last(iter); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_seek_ext(leveldb_iterator_t* iter, const char* k, size_t klen) { + leveldb_iter_seek(iter, k, klen); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_next_ext(leveldb_iterator_t* iter) { + leveldb_iter_next(iter); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_prev_ext(leveldb_iterator_t* iter) { + leveldb_iter_prev(iter); + return leveldb_iter_valid(iter); +} + + +} \ No newline at end of file diff --git a/store/hyperleveldb/hyperleveldb_ext.h b/store/hyperleveldb/hyperleveldb_ext.h new file mode 100644 index 0000000..940a090 --- /dev/null +++ b/store/hyperleveldb/hyperleveldb_ext.h @@ -0,0 +1,40 @@ +// +build hyperleveldb + +#ifndef HYPERLEVELDB_EXT_H +#define HYPERLEVELDB_EXT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "hyperleveldb/c.h" + + +/* Returns NULL if not found. Otherwise stores the value in **valptr. + Stores the length of the value in *vallen. 
+ Returns a context that must be freed later. */ +extern void* hyperleveldb_get_ext( + leveldb_t* db, + const leveldb_readoptions_t* options, + const char* key, size_t keylen, + char** valptr, + size_t* vallen, + char** errptr); + +// Frees the context returned by hyperleveldb_get_ext. +extern void hyperleveldb_get_free_ext(void* context); + + +// The iterator functions below behave like the leveldb iterator functions but also return the iterator's validity status. +extern unsigned char hyperleveldb_iter_seek_to_first_ext(leveldb_iterator_t*); +extern unsigned char hyperleveldb_iter_seek_to_last_ext(leveldb_iterator_t*); +extern unsigned char hyperleveldb_iter_seek_ext(leveldb_iterator_t*, const char* k, size_t klen); +extern unsigned char hyperleveldb_iter_next_ext(leveldb_iterator_t*); +extern unsigned char hyperleveldb_iter_prev_ext(leveldb_iterator_t*); + + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/store/hyperleveldb/iterator.go b/store/hyperleveldb/iterator.go new file mode 100644 index 0000000..fc72ccb --- /dev/null +++ b/store/hyperleveldb/iterator.go @@ -0,0 +1,70 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include +// #include "hyperleveldb/c.h" +// #include "hyperleveldb_ext.h" +import "C" + +import ( + "unsafe" +) + +type Iterator struct { + it *C.leveldb_iterator_t + isValid C.uchar +} + +func (it *Iterator) Key() []byte { + var klen C.size_t + kdata := C.leveldb_iter_key(it.it, &klen) + if kdata == nil { + return nil + } + + return slice(unsafe.Pointer(kdata), int(C.int(klen))) +} + +func (it *Iterator) Value() []byte { + var vlen C.size_t + vdata := C.leveldb_iter_value(it.it, &vlen) + if vdata == nil { + return nil + } + + return slice(unsafe.Pointer(vdata), int(C.int(vlen))) +} + +func (it *Iterator) Close() error { + if it.it != nil { + C.leveldb_iter_destroy(it.it) + it.it = nil + } + return nil +} + +func (it *Iterator) Valid() bool { + return ucharToBool(it.isValid) +} + +func (it *Iterator) Next() { + 
it.isValid = C.hyperleveldb_iter_next_ext(it.it) +} + +func (it *Iterator) Prev() { + it.isValid = C.hyperleveldb_iter_prev_ext(it.it) +} + +func (it *Iterator) First() { + it.isValid = C.hyperleveldb_iter_seek_to_first_ext(it.it) +} + +func (it *Iterator) Last() { + it.isValid = C.hyperleveldb_iter_seek_to_last_ext(it.it) +} + +func (it *Iterator) Seek(key []byte) { + it.isValid = C.hyperleveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} diff --git a/store/hyperleveldb/options.go b/store/hyperleveldb/options.go new file mode 100644 index 0000000..09c9a02 --- /dev/null +++ b/store/hyperleveldb/options.go @@ -0,0 +1,114 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include "hyperleveldb/c.h" +import "C" + +type CompressionOpt int + +const ( + NoCompression = CompressionOpt(0) + SnappyCompression = CompressionOpt(1) +) + +type Options struct { + Opt *C.leveldb_options_t +} + +type ReadOptions struct { + Opt *C.leveldb_readoptions_t +} + +type WriteOptions struct { + Opt *C.leveldb_writeoptions_t +} + +func NewOptions() *Options { + opt := C.leveldb_options_create() + return &Options{opt} +} + +func NewReadOptions() *ReadOptions { + opt := C.leveldb_readoptions_create() + return &ReadOptions{opt} +} + +func NewWriteOptions() *WriteOptions { + opt := C.leveldb_writeoptions_create() + return &WriteOptions{opt} +} + +func (o *Options) Close() { + C.leveldb_options_destroy(o.Opt) +} + +func (o *Options) SetComparator(cmp *C.leveldb_comparator_t) { + C.leveldb_options_set_comparator(o.Opt, cmp) +} + +func (o *Options) SetErrorIfExists(error_if_exists bool) { + eie := boolToUchar(error_if_exists) + C.leveldb_options_set_error_if_exists(o.Opt, eie) +} + +func (o *Options) SetCache(cache *Cache) { + C.leveldb_options_set_cache(o.Opt, cache.Cache) +} + +func (o *Options) SetWriteBufferSize(s int) { + C.leveldb_options_set_write_buffer_size(o.Opt, C.size_t(s)) +} + +func (o *Options) 
SetParanoidChecks(pc bool) { + C.leveldb_options_set_paranoid_checks(o.Opt, boolToUchar(pc)) +} + +func (o *Options) SetMaxOpenFiles(n int) { + C.leveldb_options_set_max_open_files(o.Opt, C.int(n)) +} + +func (o *Options) SetBlockSize(s int) { + C.leveldb_options_set_block_size(o.Opt, C.size_t(s)) +} + +func (o *Options) SetBlockRestartInterval(n int) { + C.leveldb_options_set_block_restart_interval(o.Opt, C.int(n)) +} + +func (o *Options) SetCompression(t CompressionOpt) { + C.leveldb_options_set_compression(o.Opt, C.int(t)) +} + +func (o *Options) SetCreateIfMissing(b bool) { + C.leveldb_options_set_create_if_missing(o.Opt, boolToUchar(b)) +} + +func (o *Options) SetFilterPolicy(fp *FilterPolicy) { + var policy *C.leveldb_filterpolicy_t + if fp != nil { + policy = fp.Policy + } + C.leveldb_options_set_filter_policy(o.Opt, policy) +} + +func (ro *ReadOptions) Close() { + C.leveldb_readoptions_destroy(ro.Opt) +} + +func (ro *ReadOptions) SetVerifyChecksums(b bool) { + C.leveldb_readoptions_set_verify_checksums(ro.Opt, boolToUchar(b)) +} + +func (ro *ReadOptions) SetFillCache(b bool) { + C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b)) +} + +func (wo *WriteOptions) Close() { + C.leveldb_writeoptions_destroy(wo.Opt) +} + +func (wo *WriteOptions) SetSync(b bool) { + C.leveldb_writeoptions_set_sync(wo.Opt, boolToUchar(b)) +} diff --git a/store/hyperleveldb/util.go b/store/hyperleveldb/util.go new file mode 100644 index 0000000..5008e80 --- /dev/null +++ b/store/hyperleveldb/util.go @@ -0,0 +1,44 @@ +// +build hyperleveldb + +package hyperleveldb + +// #include "hyperleveldb/c.h" +import "C" +import ( + "fmt" + "reflect" + "unsafe" +) + +func boolToUchar(b bool) C.uchar { + uc := C.uchar(0) + if b { + uc = C.uchar(1) + } + return uc +} + +func ucharToBool(uc C.uchar) bool { + if uc == C.uchar(0) { + return false + } + return true +} + +func saveError(errStr *C.char) error { + if errStr != nil { + gs := C.GoString(errStr) + 
C.leveldb_free(unsafe.Pointer(errStr)) + return fmt.Errorf(gs) + } + return nil +} + +func slice(p unsafe.Pointer, n int) []byte { + var b []byte + pbyte := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + pbyte.Data = uintptr(p) + pbyte.Len = n + pbyte.Cap = n + return b +} diff --git a/store/hyperleveldb_test.go b/store/hyperleveldb_test.go new file mode 100644 index 0000000..51bef8f --- /dev/null +++ b/store/hyperleveldb_test.go @@ -0,0 +1,31 @@ +package store + +import ( + "github.com/siddontang/ledisdb/config" + "os" + "testing" +) + +func newTestHyperLevelDB() *DB { + cfg := new(config.Config) + cfg.DBName = "hyperleveldb" + cfg.DataDir = "/tmp/testdb" + + os.RemoveAll(getStorePath(cfg)) + + db, err := Open(cfg) + if err != nil { + println(err.Error()) + panic(err) + } + + return db +} + +func TestHyperLevelDB(t *testing.T) { + db := newTestHyperLevelDB() + + testStore(db, t) + + db.Close() +}