Merge branch 'develop'

This commit is contained in:
siddontang 2014-10-16 11:16:21 +08:00
commit 014cf80f89
28 changed files with 124 additions and 940 deletions

View File

@ -8,14 +8,13 @@ LedisDB now supports multiple databases as backend to store data, you can test a
+ Rich data structure: KV, List, Hash, ZSet, Bitmap, Set.
+ Stores lots of data, over the memory limit.
+ Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB, BoltDB, HyperLevelDB, Memory.
+ Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB, BoltDB, Memory.
+ Supports transaction using LMDB or BotlDB.
+ Supports lua scripting.
+ Supports expiration and ttl.
+ Supports using redis-cli directly.
+ Multiple client API support, including Go, Python, Lua(Openresty), C/C++, Node.js.
+ Easy to embed in your own Go application.
+ Restful API support, json/bson/msgpack output.
+ HTTP API support, json/bson/msgpack output.
+ Replication to guarantee data safe.
+ Supplies tools to load, dump, repair database.
@ -54,6 +53,8 @@ LedisDB supports building with [godep](https://github.com/tools/godep) which can
LedisDB use the modified LevelDB for better performance, see [here](https://github.com/siddontang/ledisdb/wiki/leveldb-source-modification).
You can use other LevelDB (like Hyper LevelDB, Basho LevelDB) instead easily, the only thing you may pay attention to is that the header files must be in `include/leveldb` folder not `include/hyperleveldb` or other.
+ Set `LEVELDB_DIR` and `SNAPPY_DIR` to the actual install path in dev.sh.
+ `make clean && make`
@ -70,19 +71,9 @@ LedisDB supports building with [godep](https://github.com/tools/godep) which can
Because RocksDB API may change sometimes, LedisDB may not build successfully. Now LedisDB supports RocksDB version 3.5 or newest master branch.
## HyperLevelDB support
+ [Install hyperleveldb](https://github.com/rescrv/HyperLevelDB/blob/master/README) and snappy first.
LedisDB has not supplied a simple script to install, maybe later.
+ Set `HYPERLEVELDB` and `SNAPPY_DIR` to the actual install path in `dev.sh`.
+ `make clean && make`
## Choose store database
LedisDB now supports goleveldb, lmdb, leveldb, rocksdb, boltdb, hyperleveldb, memory. it will use goleveldb as default to store data if you don't set.
LedisDB now supports goleveldb, lmdb, leveldb, rocksdb, boltdb, memory. it will use goleveldb as default to store data if you don't set.
Choosing a store database to use is very simple, you have two ways:

View File

@ -1,4 +1,4 @@
dbs=(leveldb rocksdb hyperleveldb goleveldb boltdb lmdb)
dbs=(leveldb rocksdb goleveldb boltdb lmdb)
for db in "${dbs[@]}"
do
killall ledis-server

View File

@ -10,7 +10,7 @@ from ledis._compat import b
from ledis import ResponseError
l = ledis.Ledis(port=6380)
dbs = ["leveldb", "rocksdb", "goleveldb", "hyperleveldb", "lmdb", "boltdb"]
dbs = ["leveldb", "rocksdb", "goleveldb", "lmdb", "boltdb"]
class TestOtherCommands(unittest.TestCase):
def setUp(self):

View File

@ -7,7 +7,7 @@ import ledis
global_l = ledis.Ledis()
#db that do not support transaction
dbs = ["leveldb", "rocksdb", "hyperleveldb", "goleveldb"]
dbs = ["leveldb", "rocksdb", "goleveldb"]
check = global_l.info().get("db_name") in dbs

View File

@ -63,8 +63,8 @@ type Config struct {
DataDir string `toml:"data_dir"`
DBName string `toml:"db_name"`
DBPath string `toml:"db_path"`
DBSyncCommit int `toml:"db_sync_commit"`
LevelDB LevelDBConfig `toml:"leveldb"`

View File

@ -27,7 +27,6 @@ readonly = false
# goleveldb
# lmdb
# boltdb
# hyperleveldb
# memory
#
db_name = "leveldb"
@ -35,6 +34,12 @@ db_name = "leveldb"
# If not set, use data_dir/"db_name"_data
db_path = ""
# Sync commit to disk if possible
# 0: no sync
# 1: sync every second
# 2: sync every commit
db_sync_commit = 0
# enable replication or not
use_replication = true

10
dev.sh
View File

@ -13,7 +13,6 @@ fi
SNAPPY_DIR=/usr/local/snappy
LEVELDB_DIR=/usr/local/leveldb
ROCKSDB_DIR=/usr/local/rocksdb
HYPERLEVELDB_DIR=/usr/local/hyperleveldb
LUA_DIR=/usr/local/lua
function add_path()
@ -65,15 +64,6 @@ if [ -f $ROCKSDB_DIR/include/rocksdb/c.h ]; then
GO_BUILD_TAGS="$GO_BUILD_TAGS rocksdb"
fi
#check hyperleveldb
if [ -f $HYPERLEVELDB_DIR/include/hyperleveldb/c.h ]; then
CGO_CFLAGS="$CGO_CFLAGS -I$HYPERLEVELDB_DIR/include"
CGO_CXXFLAGS="$CGO_CXXFLAGS -I$HYPERLEVELDB_DIR/include"
CGO_LDFLAGS="$CGO_LDFLAGS -L$HYPERLEVELDB_DIR/lib -lhyperleveldb"
LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $HYPERLEVELDB_DIR/lib)
DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $HYPERLEVELDB_DIR/lib)
GO_BUILD_TAGS="$GO_BUILD_TAGS hyperleveldb"
fi
#check lua
if [ -f $LUA_DIR/include/lua.h ]; then

View File

@ -27,7 +27,6 @@ readonly = false
# goleveldb
# lmdb
# boltdb
# hyperleveldb
# memory
#
db_name = "leveldb"
@ -35,6 +34,12 @@ db_name = "leveldb"
# If not set, use data_dir/"db_name"_data
db_path = ""
# Sync commit to disk if possible
# 0: no sync
# 1: sync every second
# 2: sync every commit
db_sync_commit = 0
# enable replication or not
use_replication = true
@ -81,7 +86,7 @@ compression = true
[snapshot]
# Path to store snapshot dump file
# if not set, use data_dir/snapshot
# snapshot file name format is snap-2006-01-02T15:04:05.999999999.dmp
# snapshot file name format is dmp-2006-01-02T15:04:05.999999999
path = ""
# Reserve newest max_num snapshot dump files

View File

@ -21,8 +21,6 @@ type GoLevelDBStore struct {
first uint64
last uint64
lastCommit time.Time
}
func (s *GoLevelDBStore) FirstID() (uint64, error) {
@ -134,17 +132,7 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error {
w.Put(key, buf.Bytes())
}
n := time.Now()
if s.cfg.Replication.SyncLog == 2 ||
(s.cfg.Replication.SyncLog == 1 && n.Sub(s.lastCommit) > time.Second) {
err = w.SyncCommit()
} else {
err = w.Commit()
}
s.lastCommit = n
if err != nil {
if err = w.Commit(); err != nil {
return err
}
@ -268,7 +256,7 @@ func (s *GoLevelDBStore) open() error {
return err
}
func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) {
func NewGoLevelDBStore(base string, syncLog int) (*GoLevelDBStore, error) {
cfg := config.NewConfigDefault()
cfg.DBName = "goleveldb"
cfg.DBPath = base
@ -276,6 +264,7 @@ func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) {
cfg.LevelDB.CacheSize = 64 * 1024 * 1024
cfg.LevelDB.WriteBufferSize = 64 * 1024 * 1024
cfg.LevelDB.Compression = false
cfg.DBSyncCommit = syncLog
s := new(GoLevelDBStore)
s.cfg = cfg

View File

@ -49,7 +49,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) {
r.cfg = cfg
var err error
if r.s, err = NewGoLevelDBStore(path.Join(base, "wal")); err != nil {
if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil {
return nil, err
}

View File

@ -16,7 +16,7 @@ func TestGoLevelDBStore(t *testing.T) {
defer os.RemoveAll(dir)
// New level
l, err := NewGoLevelDBStore(dir)
l, err := NewGoLevelDBStore(dir, 0)
if err != nil {
t.Fatalf("err: %v ", err)
}

View File

@ -3,7 +3,6 @@ package server
import (
"bytes"
"fmt"
"github.com/siddontang/ledisdb/config"
"os"
"runtime"
"strings"
@ -25,10 +24,6 @@ type info struct {
ConnectedClients int64
}
Persistence struct {
DBName string
}
Replication struct {
PubLogNum int64
PubLogTotalTime int64 //milliseconds
@ -43,12 +38,6 @@ func newInfo(app *App) (i *info, err error) {
i.Server.OS = runtime.GOOS
i.Server.ProceessId = os.Getpid()
if app.cfg.DBName != "" {
i.Persistence.DBName = app.cfg.DBName
} else {
i.Persistence.DBName = config.DefaultDBName
}
return i, nil
}
@ -144,7 +133,7 @@ func (i *info) dumpStore(buf *bytes.Buffer) {
s := i.app.ldb.StoreStat()
i.dumpPairs(buf, infoPair{"name", i.Persistence.DBName},
i.dumpPairs(buf, infoPair{"name", i.app.cfg.DBName},
infoPair{"get", s.GetNum},
infoPair{"get_missing", s.GetMissingNum},
infoPair{"put", s.PutNum},

View File

@ -1,15 +1,27 @@
package store
import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store/driver"
"sync"
"time"
)
type DB struct {
driver.IDB
db driver.IDB
name string
st *Stat
cfg *config.Config
lastCommit time.Time
m sync.Mutex
}
func (db *DB) Close() error {
return db.db.Close()
}
func (db *DB) String() string {
@ -20,43 +32,46 @@ func (db *DB) NewIterator() *Iterator {
db.st.IterNum.Add(1)
it := new(Iterator)
it.it = db.IDB.NewIterator()
it.it = db.db.NewIterator()
it.st = db.st
return it
}
func (db *DB) Get(key []byte) ([]byte, error) {
v, err := db.IDB.Get(key)
v, err := db.db.Get(key)
db.st.statGet(v, err)
return v, err
}
func (db *DB) Put(key []byte, value []byte) error {
db.st.PutNum.Add(1)
return db.IDB.Put(key, value)
if db.needSyncCommit() {
return db.db.SyncPut(key, value)
} else {
return db.db.Put(key, value)
}
}
func (db *DB) Delete(key []byte) error {
db.st.DeleteNum.Add(1)
return db.IDB.Delete(key)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
db.st.SyncPutNum.Add(1)
return db.IDB.SyncPut(key, value)
if db.needSyncCommit() {
return db.db.SyncDelete(key)
} else {
return db.db.Delete(key)
}
func (db *DB) SyncDelete(key []byte) error {
db.st.SyncDeleteNum.Add(1)
return db.IDB.SyncDelete(key)
}
func (db *DB) NewWriteBatch() *WriteBatch {
db.st.BatchNum.Add(1)
wb := new(WriteBatch)
wb.IWriteBatch = db.IDB.NewWriteBatch()
wb.wb = db.db.NewWriteBatch()
wb.st = db.st
wb.db = db
return wb
}
@ -65,7 +80,7 @@ func (db *DB) NewSnapshot() (*Snapshot, error) {
var err error
s := &Snapshot{}
if s.ISnapshot, err = db.IDB.NewSnapshot(); err != nil {
if s.ISnapshot, err = db.db.NewSnapshot(); err != nil {
return nil, err
}
s.st = db.st
@ -77,7 +92,7 @@ func (db *DB) Compact() error {
db.st.CompactNum.Add(1)
t := time.Now()
err := db.IDB.Compact()
err := db.db.Compact()
db.st.CompactTotalTime.Add(time.Now().Sub(t))
@ -107,7 +122,7 @@ func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, off
}
func (db *DB) Begin() (*Tx, error) {
tx, err := db.IDB.Begin()
tx, err := db.db.Begin()
if err != nil {
return nil, err
}
@ -120,3 +135,24 @@ func (db *DB) Begin() (*Tx, error) {
func (db *DB) Stat() *Stat {
return db.st
}
func (db *DB) needSyncCommit() bool {
if db.cfg.DBSyncCommit == 0 {
return false
} else if db.cfg.DBSyncCommit == 2 {
return true
} else {
n := time.Now()
need := false
db.m.Lock()
if n.Sub(db.lastCommit) > time.Second {
need = true
}
db.lastCommit = n
db.m.Unlock()
return need
}
}

View File

@ -1,65 +0,0 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include "hyperleveldb/c.h"
import "C"
import (
"unsafe"
)
type WriteBatch struct {
db *DB
wbatch *C.leveldb_writebatch_t
}
func (w *WriteBatch) Close() error {
C.leveldb_writebatch_destroy(w.wbatch)
w.wbatch = nil
return nil
}
func (w *WriteBatch) Put(key, value []byte) {
var k, v *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
if len(value) != 0 {
v = (*C.char)(unsafe.Pointer(&value[0]))
}
lenk := len(key)
lenv := len(value)
C.leveldb_writebatch_put(w.wbatch, k, C.size_t(lenk), v, C.size_t(lenv))
}
func (w *WriteBatch) Delete(key []byte) {
C.leveldb_writebatch_delete(w.wbatch,
(*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}
func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) SyncCommit() error {
return w.commit(w.db.syncOpts)
}
func (w *WriteBatch) Rollback() error {
C.leveldb_writebatch_clear(w.wbatch)
return nil
}
func (w *WriteBatch) commit(wb *WriteOptions) error {
var errStr *C.char
C.leveldb_write(w.db.db, wb.Opt, w.wbatch, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}

View File

@ -1,20 +0,0 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include <stdint.h>
// #include "hyperleveldb/c.h"
import "C"
type Cache struct {
Cache *C.leveldb_cache_t
}
func NewLRUCache(capacity int) *Cache {
return &Cache{C.leveldb_cache_create_lru(C.size_t(capacity))}
}
func (c *Cache) Close() {
C.leveldb_cache_destroy(c.Cache)
}

View File

@ -1,3 +0,0 @@
package hyperleveldb
const DBName = "hyperleveldb"

View File

@ -1,291 +0,0 @@
// +build hyperleveldb
// Package hyperleveldb is a wrapper for c++ hyperleveldb
package hyperleveldb
/*
#cgo LDFLAGS: -lhyperleveldb
#include <hyperleveldb/c.h>
#include "hyperleveldb_ext.h"
*/
import "C"
import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store/driver"
"os"
"runtime"
"unsafe"
)
const defaultFilterBits int = 10
type Store struct {
}
func (s Store) String() string {
return DBName
}
func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) {
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
db := new(DB)
db.path = path
db.cfg = &cfg.LevelDB
if err := db.open(); err != nil {
return nil, err
}
return db, nil
}
func (s Store) Repair(path string, cfg *config.Config) error {
db := new(DB)
db.cfg = &cfg.LevelDB
db.path = path
err := db.open()
defer db.Close()
//open ok, do not need repair
if err == nil {
return nil
}
var errStr *C.char
ldbname := C.CString(path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
C.leveldb_repair_db(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
type DB struct {
path string
cfg *config.LevelDBConfig
db *C.leveldb_t
opts *Options
//for default read and write options
readOpts *ReadOptions
writeOpts *WriteOptions
iteratorOpts *ReadOptions
syncOpts *WriteOptions
cache *Cache
filter *FilterPolicy
}
func (db *DB) open() error {
db.initOptions(db.cfg)
var errStr *C.char
ldbname := C.CString(db.path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
db.db = nil
return saveError(errStr)
}
return nil
}
func (db *DB) initOptions(cfg *config.LevelDBConfig) {
opts := NewOptions()
opts.SetCreateIfMissing(true)
db.cache = NewLRUCache(cfg.CacheSize)
opts.SetCache(db.cache)
//we must use bloomfilter
db.filter = NewBloomFilter(defaultFilterBits)
opts.SetFilterPolicy(db.filter)
if !cfg.Compression {
opts.SetCompression(NoCompression)
} else {
opts.SetCompression(SnappyCompression)
}
opts.SetBlockSize(cfg.BlockSize)
opts.SetWriteBufferSize(cfg.WriteBufferSize)
opts.SetMaxOpenFiles(cfg.MaxOpenFiles)
db.opts = opts
db.readOpts = NewReadOptions()
db.writeOpts = NewWriteOptions()
db.syncOpts = NewWriteOptions()
db.syncOpts.SetSync(true)
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
}
func (db *DB) Close() error {
if db.db != nil {
C.leveldb_close(db.db)
db.db = nil
}
db.opts.Close()
if db.cache != nil {
db.cache.Close()
}
if db.filter != nil {
db.filter.Close()
}
db.readOpts.Close()
db.writeOpts.Close()
db.iteratorOpts.Close()
return nil
}
func (db *DB) Put(key, value []byte) error {
return db.put(db.writeOpts, key, value)
}
func (db *DB) Get(key []byte) ([]byte, error) {
return db.get(db.readOpts, key)
}
func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.put(db.syncOpts, key, value)
}
func (db *DB) SyncDelete(key []byte) error {
return db.delete(db.syncOpts, key)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,
wbatch: C.leveldb_writebatch_create(),
}
runtime.SetFinalizer(wb, func(w *WriteBatch) {
w.Close()
})
return wb
}
func (db *DB) NewIterator() driver.IIterator {
it := new(Iterator)
it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt)
return it
}
func (db *DB) NewSnapshot() (driver.ISnapshot, error) {
snap := &Snapshot{
db: db,
snap: C.leveldb_create_snapshot(db.db),
readOpts: NewReadOptions(),
iteratorOpts: NewReadOptions(),
}
snap.readOpts.SetSnapshot(snap)
snap.iteratorOpts.SetSnapshot(snap)
snap.iteratorOpts.SetFillCache(false)
return snap, nil
}
func (db *DB) put(wo *WriteOptions, key, value []byte) error {
var errStr *C.char
var k, v *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
if len(value) != 0 {
v = (*C.char)(unsafe.Pointer(&value[0]))
}
lenk := len(key)
lenv := len(value)
C.leveldb_put(
db.db, wo.Opt, k, C.size_t(lenk), v, C.size_t(lenv), &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) {
var errStr *C.char
var vallen C.size_t
var k *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
value := C.leveldb_get(
db.db, ro.Opt, k, C.size_t(len(key)), &vallen, &errStr)
if errStr != nil {
return nil, saveError(errStr)
}
if value == nil {
return nil, nil
}
defer C.leveldb_free(unsafe.Pointer(value))
return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil
}
func (db *DB) delete(wo *WriteOptions, key []byte) error {
var errStr *C.char
var k *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
C.leveldb_delete(
db.db, wo.Opt, k, C.size_t(len(key)), &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
func (db *DB) Begin() (driver.Tx, error) {
return nil, driver.ErrTxSupport
}
func (db *DB) Compact() error {
C.leveldb_compact_range(db.db, nil, 0, nil, 0)
return nil
}
func init() {
driver.Register(Store{})
}

View File

@ -1,21 +0,0 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include <stdlib.h>
// #include "hyperleveldb/c.h"
import "C"
type FilterPolicy struct {
Policy *C.leveldb_filterpolicy_t
}
func NewBloomFilter(bitsPerKey int) *FilterPolicy {
policy := C.leveldb_filterpolicy_create_bloom(C.int(bitsPerKey))
return &FilterPolicy{policy}
}
func (fp *FilterPolicy) Close() {
C.leveldb_filterpolicy_destroy(fp.Policy)
}

View File

@ -1,88 +0,0 @@
// +build hyperleveldb
#include "hyperleveldb_ext.h"
#include <stdlib.h>
//#include <string>
//#include "hyperleveldb/db.h"
//using namespace leveldb;
extern "C" {
// static bool SaveError(char** errptr, const Status& s) {
// assert(errptr != NULL);
// if (s.ok()) {
// return false;
// } else if (*errptr == NULL) {
// *errptr = strdup(s.ToString().c_str());
// } else {
// free(*errptr);
// *errptr = strdup(s.ToString().c_str());
// }
// return true;
// }
// void* hyperleveldb_get_ext(
// leveldb_t* db,
// const leveldb_readoptions_t* options,
// const char* key, size_t keylen,
// char** valptr,
// size_t* vallen,
// char** errptr) {
// std::string *tmp = new(std::string);
// //very tricky, maybe changed with c++ leveldb upgrade
// Status s = (*(DB**)db)->Get(*(ReadOptions*)options, Slice(key, keylen), tmp);
// if (s.ok()) {
// *valptr = (char*)tmp->data();
// *vallen = tmp->size();
// } else {
// delete(tmp);
// tmp = NULL;
// *valptr = NULL;
// *vallen = 0;
// if (!s.IsNotFound()) {
// SaveError(errptr, s);
// }
// }
// return tmp;
// }
// void hyperleveldb_get_free_ext(void* context) {
// std::string* s = (std::string*)context;
// delete(s);
// }
unsigned char hyperleveldb_iter_seek_to_first_ext(leveldb_iterator_t* iter) {
leveldb_iter_seek_to_first(iter);
return leveldb_iter_valid(iter);
}
unsigned char hyperleveldb_iter_seek_to_last_ext(leveldb_iterator_t* iter) {
leveldb_iter_seek_to_last(iter);
return leveldb_iter_valid(iter);
}
unsigned char hyperleveldb_iter_seek_ext(leveldb_iterator_t* iter, const char* k, size_t klen) {
leveldb_iter_seek(iter, k, klen);
return leveldb_iter_valid(iter);
}
unsigned char hyperleveldb_iter_next_ext(leveldb_iterator_t* iter) {
leveldb_iter_next(iter);
return leveldb_iter_valid(iter);
}
unsigned char hyperleveldb_iter_prev_ext(leveldb_iterator_t* iter) {
leveldb_iter_prev(iter);
return leveldb_iter_valid(iter);
}
}

View File

@ -1,40 +0,0 @@
// +build hyperleveldb
#ifndef HYPERLEVELDB_EXT_H
#define HYPERLEVELDB_EXT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "hyperleveldb/c.h"
// /* Returns NULL if not found. Otherwise stores the value in **valptr.
// Stores the length of the value in *vallen.
// Returns a context must be later to free*/
// extern void* hyperleveldb_get_ext(
// leveldb_t* db,
// const leveldb_readoptions_t* options,
// const char* key, size_t keylen,
// char** valptr,
// size_t* vallen,
// char** errptr);
// // Free context returns by hyperleveldb_get_ext
// extern void hyperleveldb_get_free_ext(void* context);
// Below iterator functions like leveldb iterator but returns valid status for iterator
extern unsigned char hyperleveldb_iter_seek_to_first_ext(leveldb_iterator_t*);
extern unsigned char hyperleveldb_iter_seek_to_last_ext(leveldb_iterator_t*);
extern unsigned char hyperleveldb_iter_seek_ext(leveldb_iterator_t*, const char* k, size_t klen);
extern unsigned char hyperleveldb_iter_next_ext(leveldb_iterator_t*);
extern unsigned char hyperleveldb_iter_prev_ext(leveldb_iterator_t*);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -1,70 +0,0 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include <stdlib.h>
// #include "hyperleveldb/c.h"
// #include "hyperleveldb_ext.h"
import "C"
import (
"unsafe"
)
type Iterator struct {
it *C.leveldb_iterator_t
isValid C.uchar
}
func (it *Iterator) Key() []byte {
var klen C.size_t
kdata := C.leveldb_iter_key(it.it, &klen)
if kdata == nil {
return nil
}
return slice(unsafe.Pointer(kdata), int(C.int(klen)))
}
func (it *Iterator) Value() []byte {
var vlen C.size_t
vdata := C.leveldb_iter_value(it.it, &vlen)
if vdata == nil {
return nil
}
return slice(unsafe.Pointer(vdata), int(C.int(vlen)))
}
func (it *Iterator) Close() error {
if it.it != nil {
C.leveldb_iter_destroy(it.it)
it.it = nil
}
return nil
}
func (it *Iterator) Valid() bool {
return ucharToBool(it.isValid)
}
func (it *Iterator) Next() {
it.isValid = C.hyperleveldb_iter_next_ext(it.it)
}
func (it *Iterator) Prev() {
it.isValid = C.hyperleveldb_iter_prev_ext(it.it)
}
func (it *Iterator) First() {
it.isValid = C.hyperleveldb_iter_seek_to_first_ext(it.it)
}
func (it *Iterator) Last() {
it.isValid = C.hyperleveldb_iter_seek_to_last_ext(it.it)
}
func (it *Iterator) Seek(key []byte) {
it.isValid = C.hyperleveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}

View File

@ -1,122 +0,0 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include "hyperleveldb/c.h"
import "C"
type CompressionOpt int
const (
NoCompression = CompressionOpt(0)
SnappyCompression = CompressionOpt(1)
)
type Options struct {
Opt *C.leveldb_options_t
}
type ReadOptions struct {
Opt *C.leveldb_readoptions_t
}
type WriteOptions struct {
Opt *C.leveldb_writeoptions_t
}
func NewOptions() *Options {
opt := C.leveldb_options_create()
return &Options{opt}
}
func NewReadOptions() *ReadOptions {
opt := C.leveldb_readoptions_create()
return &ReadOptions{opt}
}
func NewWriteOptions() *WriteOptions {
opt := C.leveldb_writeoptions_create()
return &WriteOptions{opt}
}
func (o *Options) Close() {
C.leveldb_options_destroy(o.Opt)
}
func (o *Options) SetComparator(cmp *C.leveldb_comparator_t) {
C.leveldb_options_set_comparator(o.Opt, cmp)
}
func (o *Options) SetErrorIfExists(error_if_exists bool) {
eie := boolToUchar(error_if_exists)
C.leveldb_options_set_error_if_exists(o.Opt, eie)
}
func (o *Options) SetCache(cache *Cache) {
C.leveldb_options_set_cache(o.Opt, cache.Cache)
}
func (o *Options) SetWriteBufferSize(s int) {
C.leveldb_options_set_write_buffer_size(o.Opt, C.size_t(s))
}
func (o *Options) SetParanoidChecks(pc bool) {
C.leveldb_options_set_paranoid_checks(o.Opt, boolToUchar(pc))
}
func (o *Options) SetMaxOpenFiles(n int) {
C.leveldb_options_set_max_open_files(o.Opt, C.int(n))
}
func (o *Options) SetBlockSize(s int) {
C.leveldb_options_set_block_size(o.Opt, C.size_t(s))
}
func (o *Options) SetBlockRestartInterval(n int) {
C.leveldb_options_set_block_restart_interval(o.Opt, C.int(n))
}
func (o *Options) SetCompression(t CompressionOpt) {
C.leveldb_options_set_compression(o.Opt, C.int(t))
}
func (o *Options) SetCreateIfMissing(b bool) {
C.leveldb_options_set_create_if_missing(o.Opt, boolToUchar(b))
}
func (o *Options) SetFilterPolicy(fp *FilterPolicy) {
var policy *C.leveldb_filterpolicy_t
if fp != nil {
policy = fp.Policy
}
C.leveldb_options_set_filter_policy(o.Opt, policy)
}
func (ro *ReadOptions) Close() {
C.leveldb_readoptions_destroy(ro.Opt)
}
func (ro *ReadOptions) SetVerifyChecksums(b bool) {
C.leveldb_readoptions_set_verify_checksums(ro.Opt, boolToUchar(b))
}
func (ro *ReadOptions) SetFillCache(b bool) {
C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b))
}
func (ro *ReadOptions) SetSnapshot(snap *Snapshot) {
var s *C.leveldb_snapshot_t
if snap != nil {
s = snap.snap
}
C.leveldb_readoptions_set_snapshot(ro.Opt, s)
}
func (wo *WriteOptions) Close() {
C.leveldb_writeoptions_destroy(wo.Opt)
}
func (wo *WriteOptions) SetSync(b bool) {
C.leveldb_writeoptions_set_sync(wo.Opt, boolToUchar(b))
}

View File

@ -1,35 +0,0 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include "hyperleveldb/c.h"
import "C"
import (
"github.com/siddontang/ledisdb/store/driver"
)
type Snapshot struct {
db *DB
snap *C.leveldb_snapshot_t
readOpts *ReadOptions
iteratorOpts *ReadOptions
}
func (s *Snapshot) Get(key []byte) ([]byte, error) {
return s.db.get(s.readOpts, key)
}
func (s *Snapshot) NewIterator() driver.IIterator {
it := new(Iterator)
it.it = C.leveldb_create_iterator(s.db.db, s.db.iteratorOpts.Opt)
return it
}
func (s *Snapshot) Close() {
C.leveldb_release_snapshot(s.db.db, s.snap)
s.iteratorOpts.Close()
s.readOpts.Close()
}

View File

@ -1,44 +0,0 @@
// +build hyperleveldb
package hyperleveldb
// #include "hyperleveldb/c.h"
import "C"
import (
"fmt"
"reflect"
"unsafe"
)
func boolToUchar(b bool) C.uchar {
uc := C.uchar(0)
if b {
uc = C.uchar(1)
}
return uc
}
func ucharToBool(uc C.uchar) bool {
if uc == C.uchar(0) {
return false
}
return true
}
func saveError(errStr *C.char) error {
if errStr != nil {
gs := C.GoString(errStr)
C.leveldb_free(unsafe.Pointer(errStr))
return fmt.Errorf(gs)
}
return nil
}
func slice(p unsafe.Pointer, n int) []byte {
var b []byte
pbyte := (*reflect.SliceHeader)(unsafe.Pointer(&b))
pbyte.Data = uintptr(p)
pbyte.Len = n
pbyte.Cap = n
return b
}

View File

@ -9,8 +9,6 @@ type Stat struct {
GetMissingNum sync2.AtomicInt64
PutNum sync2.AtomicInt64
DeleteNum sync2.AtomicInt64
SyncPutNum sync2.AtomicInt64
SyncDeleteNum sync2.AtomicInt64
IterNum sync2.AtomicInt64
IterSeekNum sync2.AtomicInt64
IterCloseNum sync2.AtomicInt64
@ -18,7 +16,6 @@ type Stat struct {
SnapshotCloseNum sync2.AtomicInt64
BatchNum sync2.AtomicInt64
BatchCommitNum sync2.AtomicInt64
BatchSyncCommitNum sync2.AtomicInt64
TxNum sync2.AtomicInt64
TxCommitNum sync2.AtomicInt64
TxCloseNum sync2.AtomicInt64
@ -35,23 +32,4 @@ func (st *Stat) statGet(v []byte, err error) {
func (st *Stat) Reset() {
*st = Stat{}
// st.GetNum.Set(0)
// st.GetMissingNum.Set(0)
// st.PutNum.Set(0)
// st.DeleteNum.Set(0)
// st.SyncPutNum.Set(0)
// st.SyncDeleteNum.Set(0)
// st.IterNum.Set(0)
// st.IterSeekNum.Set(0)
// st.IterCloseNum.Set(0)
// st.SnapshotNum.Set(0)
// st.SnapshotCloseNum.Set(0)
// st.BatchNum.Set(0)
// st.BatchCommitNum.Set(0)
// st.BatchSyncCommitNum.Set(0)
// st.TxNum.Set(0)
// st.TxCommitNum.Set(0)
// st.TxCloseNum.Set(0)
// st.CompactNum.Set(0)
// st.CompactTotalTime.Set(0)
}

View File

@ -9,7 +9,6 @@ import (
_ "github.com/siddontang/ledisdb/store/boltdb"
_ "github.com/siddontang/ledisdb/store/goleveldb"
_ "github.com/siddontang/ledisdb/store/hyperleveldb"
_ "github.com/siddontang/ledisdb/store/leveldb"
_ "github.com/siddontang/ledisdb/store/mdb"
_ "github.com/siddontang/ledisdb/store/rocksdb"
@ -40,7 +39,11 @@ func Open(cfg *config.Config) (*DB, error) {
return nil, err
}
db := &DB{idb, s.String(), &Stat{}}
db := new(DB)
db.db = idb
db.name = s.String()
db.st = &Stat{}
db.cfg = cfg
return db, nil
}

View File

@ -5,13 +5,13 @@ import (
)
type Tx struct {
driver.Tx
tx driver.Tx
st *Stat
}
func (tx *Tx) NewIterator() *Iterator {
it := new(Iterator)
it.it = tx.Tx.NewIterator()
it.it = tx.tx.NewIterator()
it.st = tx.st
tx.st.IterNum.Add(1)
@ -23,7 +23,7 @@ func (tx *Tx) NewWriteBatch() *WriteBatch {
tx.st.BatchNum.Add(1)
wb := new(WriteBatch)
wb.IWriteBatch = tx.Tx.NewWriteBatch()
wb.wb = tx.tx.NewWriteBatch()
wb.st = tx.st
return wb
}
@ -51,26 +51,26 @@ func (tx *Tx) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, off
}
func (tx *Tx) Get(key []byte) ([]byte, error) {
v, err := tx.Tx.Get(key)
v, err := tx.tx.Get(key)
tx.st.statGet(v, err)
return v, err
}
func (tx *Tx) Put(key []byte, value []byte) error {
tx.st.PutNum.Add(1)
return tx.Tx.Put(key, value)
return tx.tx.Put(key, value)
}
func (tx *Tx) Delete(key []byte) error {
tx.st.DeleteNum.Add(1)
return tx.Tx.Delete(key)
return tx.tx.Delete(key)
}
func (tx *Tx) Commit() error {
tx.st.TxCommitNum.Add(1)
return tx.Tx.Commit()
return tx.tx.Commit()
}
func (tx *Tx) Rollback() error {
return tx.Tx.Rollback()
return tx.tx.Rollback()
}

View File

@ -5,20 +5,22 @@ import (
)
type WriteBatch struct {
driver.IWriteBatch
wb driver.IWriteBatch
st *Stat
putNum int64
deleteNum int64
db *DB
}
func (wb *WriteBatch) Put(key []byte, value []byte) {
wb.putNum++
wb.IWriteBatch.Put(key, value)
wb.wb.Put(key, value)
}
func (wb *WriteBatch) Delete(key []byte) {
wb.deleteNum++
wb.IWriteBatch.Delete(key)
wb.wb.Delete(key)
}
func (wb *WriteBatch) Commit() error {
@ -27,18 +29,13 @@ func (wb *WriteBatch) Commit() error {
wb.st.DeleteNum.Add(wb.deleteNum)
wb.putNum = 0
wb.deleteNum = 0
return wb.IWriteBatch.Commit()
if wb.db == nil || !wb.db.needSyncCommit() {
return wb.wb.Commit()
} else {
return wb.wb.SyncCommit()
}
func (wb *WriteBatch) SyncCommit() error {
wb.st.BatchSyncCommitNum.Add(1)
wb.st.SyncPutNum.Add(wb.putNum)
wb.st.SyncDeleteNum.Add(wb.deleteNum)
wb.putNum = 0
wb.deleteNum = 0
return wb.IWriteBatch.SyncCommit()
}
func (wb *WriteBatch) Rollback() error {
return wb.IWriteBatch.Rollback()
return wb.wb.Rollback()
}