diff --git a/.gitignore b/.gitignore index d1c3459..49a172d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ build *.pyc .DS_Store -nohup.out \ No newline at end of file +nohup.out +build_config.mk \ No newline at end of file diff --git a/Makefile b/Makefile index 1dfe2f3..634de20 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,16 @@ +$(shell ./bootstrap.sh) + +$(shell ./build_config.sh build_config.mk ./) + +include build_config.mk + all: build build: - go install ./... + go install -tags '$(GO_BUILD_TAGS)' ./... clean: go clean -i ./... test: - go test ./... - go test -race ./... \ No newline at end of file + go test -tags $(GO_BUILD_TAGS) ./... diff --git a/README.md b/README.md index 96aacb5..5bd261b 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,14 @@ # LedisDB -Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. It supports some advanced data structure like kv, list, hash and zset, and may be alternative for Redis. +Ledisdb is a high performance NoSQL like Redis written by go. It supports some advanced data structure like kv, list, hash and zset, and may be alternative for Redis. + +LedisDB now supports multi database as backend to store data, you can test and choose the proper one for you. ## Features + Rich advanced data structure: KV, List, Hash, ZSet, Bit. -+ Uses leveldb to store lots of data, over the memory limit. ++ Stores lots of data, over the memory limit. ++ Various backend database to use: LevelDB, goleveldb, LMDB. + Supports expiration and ttl. + Redis clients, like redis-cli, are supported directly. + Multi client API supports, including Golang, Python, Lua(Openresty). @@ -15,15 +18,19 @@ Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. I ## Build and Install -+ Create a workspace and checkout ledisdb source +Create a workspace and checkout ledisdb source - mkdir $WORKSPACE - cd $WORKSPACE - git clone git@github.com:siddontang/ledisdb.git src/github.com/siddontang/ledisdb + mkdir $WORKSPACE + cd $WORKSPACE + git clone git@github.com:siddontang/ledisdb.git src/github.com/siddontang/ledisdb - cd src/github.com/siddontang/ledisdb + cd src/github.com/siddontang/ledisdb -+ Install leveldb and snappy, if you have installed, skip. + make + +## LevelDB support + ++ Install leveldb and snappy. LedisDB supplies a simple shell to install leveldb and snappy: @@ -35,16 +42,43 @@ Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. I + Set LEVELDB_DIR and SNAPPY_DIR to the actual install path in dev.sh. -+ Then: ++ ```make``` - . ./bootstap.sh - . ./dev.sh +## RocksDB support - go install ./... ++ Install rocksdb and snappy first. + + LedisDB has not supplied a simple shell to install, maybe it will later. + ++ Set ROCKSDB_DIR and SNAPPY_DIR to the actual install path in dev.sh. + ++ ```make``` + +## Choose store database + +LedisDB now supports goleveldb, lmdb, leveldb, rocksdb, it will choose goleveldb as default to store data if you not set. + +Choosing a store database to use is very simple, you have two ways: + ++ Set in server config file + + "db" : { + "name" : "leveldb" + } + ++ Set in command flag + + ledis-server -config=/etc/ledis.json -db_name=leveldb + + Flag command set will overwrite config set. + +**Caveat** + +You must known that changing store database runtime is very dangerous, LedisDB will not guarantee the data validation if you do it. ## Server Example - ./ledis-server -config=/etc/ledis.json + ledis-server -config=/etc/ledis.json //another shell ledis-cli -p 6380 diff --git a/bootstrap.sh b/bootstrap.sh old mode 100644 new mode 100755 index 0ba6b65..1cf49b6 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -4,4 +4,7 @@ go get github.com/siddontang/go-log/log go get github.com/siddontang/go-snappy/snappy -go get github.com/siddontang/copier \ No newline at end of file +go get github.com/siddontang/copier + +go get github.com/syndtr/goleveldb/leveldb +go get github.com/influxdb/gomdb \ No newline at end of file diff --git a/build_config.sh b/build_config.sh new file mode 100755 index 0000000..ce52efd --- /dev/null +++ b/build_config.sh @@ -0,0 +1,16 @@ +#!/bin/sh + +OUTPUT=$1 +PREFIX=$2 +if test -z "$OUTPUT" || test -z "$PREFIX"; then + echo "usage: $0 " >&2 + exit 1 +fi + +# Delete existing output, if it exists +rm -f $OUTPUT +touch $OUTPUT + +source ./dev.sh + +echo "GO_BUILD_TAGS=$GO_BUILD_TAGS" >> $OUTPUT \ No newline at end of file diff --git a/cmd/ledis-repair/main.go b/cmd/ledis-repair/main.go index a5b9d73..5a0aff4 100644 --- a/cmd/ledis-repair/main.go +++ b/cmd/ledis-repair/main.go @@ -4,7 +4,7 @@ import ( "encoding/json" "flag" "github.com/siddontang/ledisdb/ledis" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "io/ioutil" ) @@ -35,7 +35,7 @@ func main() { return } - if err = leveldb.Repair(cfg.NewDBConfig()); err != nil { + if err = store.Repair(cfg.NewDBConfig()); err != nil { println("repair error: ", err.Error()) } } diff --git a/cmd/ledis-server/main.go b/cmd/ledis-server/main.go index 84c4442..576ab7f 100644 --- a/cmd/ledis-server/main.go +++ b/cmd/ledis-server/main.go @@ -10,6 +10,7 @@ import ( ) var configFile = flag.String("config", "/etc/ledis.json", "ledisdb config file") +var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name") func main() { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -27,6 +28,10 @@ func main() { return } + if len(*dbName) > 0 { + cfg.DB.Name = *dbName + } + var app *server.App app, err = server.NewApp(cfg) if err != nil { diff --git a/dev.sh b/dev.sh index c1c59f7..8bd378c 100644 --- a/dev.sh +++ b/dev.sh @@ -1,19 +1,18 @@ #!/bin/bash -export VTTOP=$(pwd) -export VTROOT="${VTROOT:-${VTTOP/\/src\/github.com\/siddontang\/ledisdb/}}" -# VTTOP sanity check -if [[ "$VTTOP" == "${VTTOP/\/src\/github.com\/siddontang\/ledisdb/}" ]]; then - echo "WARNING: VTTOP($VTTOP) does not contain src/github.com/siddontang/ledisdb" +export LEDISTOP=$(pwd) +export LEDISROOT="${LEDISROOT:-${LEDISTOP/\/src\/github.com\/siddontang\/ledisdb/}}" +# LEDISTOP sanity check +if [[ "$LEDISTOP" == "${LEDISTOP/\/src\/github.com\/siddontang\/ledisdb/}" ]]; then + echo "WARNING: LEDISTOP($LEDISTOP) does not contain src/github.com/siddontang/ledisdb" exit 1 fi - #default snappy and leveldb install path #you may change yourself - SNAPPY_DIR=/usr/local/snappy LEVELDB_DIR=/usr/local/leveldb +ROCKSDB_DIR=/usr/local/rocksdb function add_path() { @@ -26,16 +25,45 @@ function add_path() fi } -export GOPATH=$(add_path $GOPATH $VTROOT) +export GOPATH=$(add_path $GOPATH $LEDISROOT) -export CGO_CFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include" -export CGO_CXXFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include" -export CGO_LDFLAGS="-L$LEVELDB_DIR/lib -L$SNAPPY_DIR/lib -lsnappy" +GO_BUILD_TAGS= +CGO_CFLAGS= +CGO_CXXFLAGS= +CGO_LDFLAGS= -#for linux, use LD_LIBRARY_PATH -export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $SNAPPY_DIR/lib) -export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $LEVELDB_DIR/lib) +# check snappy +if [ -f $SNAPPY_DIR/lib/libsnappy.a ]; then + CGO_CFLAGS="$CGO_CFLAGS -I$SNAPPY_DIR/include" + CGO_CXXFLAGS="$CGO_CXXFLAGS -I$SNAPPY_DIR/include" + CGO_LDFLAGS="$CGO_LDFLAGS -L$SNAPPY_DIR/lib -lsnappy" + LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $SNAPPY_DIR/lib) + DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $SNAPPY_DIR/lib) +fi -#for macos, use DYLD_LIBRARY_PATH -export DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $SNAPPY_DIR/lib) -export DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $LEVELDB_DIR/lib) +# check leveldb +if [ -f $LEVELDB_DIR/lib/libleveldb.a ]; then + CGO_CFLAGS="$CGO_CFLAGS -I$LEVELDB_DIR/include" + CGO_CXXFLAGS="$CGO_CXXFLAGS -I$LEVELDB_DIR/include" + CGO_LDFLAGS="$CGO_LDFLAGS -L$LEVELDB_DIR/lib -lleveldb" + LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $LEVELDB_DIR/lib) + DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $LEVELDB_DIR/lib) + GO_BUILD_TAGS="$GO_BUILD_TAGS leveldb" +fi + +# check rocksdb +if [ -f $ROCKSDB_DIR/lib/librocksdb.a ]; then + CGO_CFLAGS="$CGO_CFLAGS -I$ROCKSDB_DIR/include" + CGO_CXXFLAGS="$CGO_CXXFLAGS -I$ROCKSDB_DIR/include" + CGO_LDFLAGS="$CGO_LDFLAGS -L$ROCKSDB_DIR/lib -lrocksdb" + LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $ROCKSDB_DIR/lib) + DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $ROCKSDB_DIR/lib) + GO_BUILD_TAGS="$GO_BUILD_TAGS rocksdb" +fi + +export CGO_CFLAGS +export CGO_CXXFLAGS +export CGO_LDFLAGS +export LD_LIBRARY_PATH +export DYLD_LIBRARY_PATH +export GO_BUILD_TAGS diff --git a/etc/ledis.json b/etc/ledis.json index 1631303..8303f5e 100644 --- a/etc/ledis.json +++ b/etc/ledis.json @@ -1,12 +1,16 @@ { "addr": "127.0.0.1:6380", "data_dir": "/tmp/ledis_server", + "db": { + "name" : "leveldb", + "compression": false, "block_size": 32768, "write_buffer_size": 67108864, "cache_size": 524288000, - "max_open_files":1024 + "max_open_files":1024, + "map_size" : 524288000 }, "access_log" : "access.log" diff --git a/ledis/config.go b/ledis/config.go index c3a5b84..a81fda7 100644 --- a/ledis/config.go +++ b/ledis/config.go @@ -1,20 +1,24 @@ package ledis import ( + "fmt" "github.com/siddontang/copier" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "path" + "strings" ) type Config struct { DataDir string `json:"data_dir"` DB struct { - Compression bool `json:"compression"` - BlockSize int `json:"block_size"` - WriteBufferSize int `json:"write_buffer_size"` - CacheSize int `json:"cache_size"` - MaxOpenFiles int `json:"max_open_files"` + Name string `json:"name"` + Compression bool `json:"compression"` + BlockSize int `json:"block_size"` + WriteBufferSize int `json:"write_buffer_size"` + CacheSize int `json:"cache_size"` + MaxOpenFiles int `json:"max_open_files"` + MapSize int `json:"map_size"` } `json:"db"` BinLog struct { @@ -24,11 +28,19 @@ type Config struct { } `json:"binlog"` } -func (cfg *Config) NewDBConfig() *leveldb.Config { - dbPath := path.Join(cfg.DataDir, "data") +func (cfg *Config) NewDBConfig() *store.Config { + if len(cfg.DB.Name) == 0 { + fmt.Printf("no store set, use default %s\n", store.DefaultStoreName) + cfg.DB.Name = store.DefaultStoreName + } - dbCfg := new(leveldb.Config) + cfg.DB.Name = strings.ToLower(cfg.DB.Name) + + dbCfg := new(store.Config) copier.Copy(dbCfg, &cfg.DB) + + dbPath := path.Join(cfg.DataDir, fmt.Sprintf("%s_data", cfg.DB.Name)) + dbCfg.Path = dbPath return dbCfg } diff --git a/ledis/doc.go b/ledis/doc.go index f5c02f2..a31f76e 100644 --- a/ledis/doc.go +++ b/ledis/doc.go @@ -1,4 +1,5 @@ -// Package ledis is a high performance embedded NoSQL based on leveldb. +// Package ledis is a high performance embedded NoSQL. +// // Ledis supports various advanced data structure like kv, list, hash and zset like redis. // // Other features include binlog replication, data with a limited time-to-live. diff --git a/ledis/dump.go b/ledis/dump.go index 95232de..14d7ff7 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -5,7 +5,6 @@ import ( "bytes" "encoding/binary" "github.com/siddontang/go-snappy/snappy" - "github.com/siddontang/ledisdb/leveldb" "io" "os" ) @@ -57,16 +56,13 @@ func (l *Ledis) DumpFile(path string) error { } func (l *Ledis) Dump(w io.Writer) error { - var sp *leveldb.Snapshot var m *MasterInfo = new(MasterInfo) - if l.binlog == nil { - sp = l.ldb.NewSnapshot() - } else { - l.Lock() - sp = l.ldb.NewSnapshot() + l.Lock() + defer l.Unlock() + + if l.binlog != nil { m.LogFileIndex = l.binlog.LogFileIndex() m.LogPos = l.binlog.LogFilePos() - l.Unlock() } var err error @@ -76,7 +72,7 @@ func (l *Ledis) Dump(w io.Writer) error { return err } - it := sp.NewIterator() + it := l.ldb.NewIterator() it.SeekToFirst() compressBuf := make([]byte, 4096) diff --git a/ledis/dump_test.go b/ledis/dump_test.go index 8ca80b8..a22d72a 100644 --- a/ledis/dump_test.go +++ b/ledis/dump_test.go @@ -2,7 +2,7 @@ package ledis import ( "bytes" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "os" "testing" ) @@ -59,7 +59,7 @@ func TestDump(t *testing.T) { t.Fatal(err) } - it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) + it := master.ldb.RangeLimitIterator(nil, nil, store.RangeClose, 0, -1) for ; it.Valid(); it.Next() { key := it.Key() value := it.Value() diff --git a/ledis/ledis.go b/ledis/ledis.go index 9d5b9ac..8c3f175 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -4,7 +4,7 @@ import ( "encoding/json" "fmt" "github.com/siddontang/go-log/log" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "sync" "time" ) @@ -12,7 +12,7 @@ import ( type DB struct { l *Ledis - db *leveldb.DB + db *store.DB index uint8 @@ -28,7 +28,7 @@ type Ledis struct { cfg *Config - ldb *leveldb.DB + ldb *store.DB dbs [MaxDBNumber]*DB binlog *BinLog @@ -52,7 +52,7 @@ func Open(cfg *Config) (*Ledis, error) { return nil, fmt.Errorf("must set correct data_dir") } - ldb, err := leveldb.Open(cfg.NewDBConfig()) + ldb, err := store.Open(cfg.NewDBConfig()) if err != nil { return nil, err } @@ -132,7 +132,7 @@ func (l *Ledis) FlushAll() error { } // very dangerous to use -func (l *Ledis) DataDB() *leveldb.DB { +func (l *Ledis) DataDB() *store.DB { return l.ldb } diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index 83819ae..7d58054 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -1,7 +1,7 @@ package ledis import ( - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" ) func (db *DB) FlushAll() (drop int64, err error) { @@ -36,7 +36,7 @@ func (db *DB) newEliminator() *elimination { } func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) { - it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen) + it := db.db.RangeIterator(minKey, maxKey, store.RangeROpen) for ; it.Valid(); it.Next() { t.Delete(it.RawKey()) drop++ diff --git a/ledis/replication_test.go b/ledis/replication_test.go index b5a939b..ebc5c41 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -3,14 +3,14 @@ package ledis import ( "bytes" "fmt" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "os" "path" "testing" ) func checkLedisEqual(master *Ledis, slave *Ledis) error { - it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) + it := master.ldb.RangeLimitIterator(nil, nil, store.RangeClose, 0, -1) for ; it.Valid(); it.Next() { key := it.Key() value := it.Value() diff --git a/ledis/t_bit.go b/ledis/t_bit.go index b93355d..37b788d 100644 --- a/ledis/t_bit.go +++ b/ledis/t_bit.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "sort" "time" ) @@ -253,7 +253,7 @@ func (db *DB) bDelete(t *tx, key []byte) (drop int64) { minKey := db.bEncodeBinKey(key, minSeq) maxKey := db.bEncodeBinKey(key, maxSeq) - it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) + it := db.db.RangeIterator(minKey, maxKey, store.RangeClose) for ; it.Valid(); it.Next() { t.Delete(it.RawKey()) drop++ @@ -280,10 +280,10 @@ func (db *DB) bAllocateSegment(key []byte, seq uint32) ([]byte, []byte, error) { return bk, segment, err } -func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator { +func (db *DB) bIterator(key []byte) *store.RangeLimitIterator { sk := db.bEncodeBinKey(key, minSeq) ek := db.bEncodeBinKey(key, maxSeq) - return db.db.RangeIterator(sk, ek, leveldb.RangeClose) + return db.db.RangeIterator(sk, ek, store.RangeClose) } func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) { @@ -446,7 +446,7 @@ func (db *DB) BGet(key []byte) (data []byte, err error) { minKey := db.bEncodeBinKey(key, minSeq) maxKey := db.bEncodeBinKey(key, tailSeq) - it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) + it := db.db.RangeIterator(minKey, maxKey, store.RangeClose) var seq, s, e uint32 for ; it.Valid(); it.Next() { @@ -662,7 +662,7 @@ func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) skey := db.bEncodeBinKey(key, sseq) ekey := db.bEncodeBinKey(key, eseq) - it := db.db.RangeIterator(skey, ekey, leveldb.RangeOpen) + it := db.db.RangeIterator(skey, ekey, store.RangeOpen) for ; it.Valid(); it.Next() { segment = it.RawValue() for _, bt := range segment { diff --git a/ledis/t_hash.go b/ledis/t_hash.go index 5f6be64..63c38da 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "time" ) @@ -132,7 +132,7 @@ func (db *DB) hDelete(t *tx, key []byte) int64 { stop := db.hEncodeStopKey(key) var num int64 = 0 - it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) num++ @@ -354,7 +354,7 @@ func (db *DB) HGetAll(key []byte) ([]FVPair, error) { v := make([]FVPair, 0, 16) - it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, f, err := db.hDecodeHashKey(it.Key()) if err != nil { @@ -379,7 +379,7 @@ func (db *DB) HKeys(key []byte) ([][]byte, error) { v := make([][]byte, 0, 16) - it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, f, err := db.hDecodeHashKey(it.Key()) if err != nil { @@ -403,7 +403,7 @@ func (db *DB) HValues(key []byte) ([][]byte, error) { v := make([][]byte, 0, 16) - it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, _, err := db.hDecodeHashKey(it.Key()) if err != nil { @@ -491,9 +491,9 @@ func (db *DB) HScan(key []byte, field []byte, count int, inclusive bool) ([]FVPa v := make([]FVPair, 0, count) - rangeType := leveldb.RangeROpen + rangeType := store.RangeROpen if !inclusive { - rangeType = leveldb.RangeOpen + rangeType = store.RangeOpen } it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) diff --git a/ledis/t_kv.go b/ledis/t_kv.go index 136772b..f11f525 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -2,7 +2,7 @@ package ledis import ( "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "time" ) @@ -343,9 +343,9 @@ func (db *DB) Scan(key []byte, count int, inclusive bool) ([]KVPair, error) { v := make([]KVPair, 0, 2*count) - rangeType := leveldb.RangeROpen + rangeType := store.RangeROpen if !inclusive { - rangeType = leveldb.RangeOpen + rangeType = store.RangeOpen } it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) diff --git a/ledis/t_list.go b/ledis/t_list.go index a7ab1ac..079498e 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "time" ) @@ -203,7 +203,7 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { startKey := db.lEncodeListKey(key, headSeq) stopKey := db.lEncodeListKey(key, tailSeq) - rit := leveldb.NewRangeIterator(it, &leveldb.Range{startKey, stopKey, leveldb.RangeClose}) + rit := store.NewRangeIterator(it, &store.Range{startKey, stopKey, store.RangeClose}) for ; rit.Valid(); rit.Next() { t.Delete(rit.RawKey()) num++ @@ -214,7 +214,7 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { return num } -func (db *DB) lGetMeta(it *leveldb.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { +func (db *DB) lGetMeta(it *store.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { var v []byte if it != nil { v = it.Find(ek) @@ -364,12 +364,12 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) { v := make([][]byte, 0, limit) startKey := db.lEncodeListKey(key, headSeq) - rit := leveldb.NewRangeLimitIterator(it, - &leveldb.Range{ + rit := store.NewRangeLimitIterator(it, + &store.Range{ Min: startKey, Max: nil, - Type: leveldb.RangeClose}, - &leveldb.Limit{ + Type: store.RangeClose}, + &store.Limit{ Offset: 0, Count: int(limit)}) diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index aa34b73..e41ed10 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "time" ) @@ -156,7 +156,7 @@ func (eli *elimination) active() { minKey := db.expEncodeTimeKey(NoneType, nil, 0) maxKey := db.expEncodeTimeKey(maxDataType, nil, now) - it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { tk := it.RawKey() mk := it.RawValue() diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 4931ded..151f8eb 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/binary" "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "time" ) @@ -432,7 +432,7 @@ func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) { minKey := db.zEncodeStartScoreKey(key, min) maxKey := db.zEncodeStopScoreKey(key, max) - rangeType := leveldb.RangeROpen + rangeType := store.RangeROpen it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, -1) var n int64 = 0 @@ -460,17 +460,17 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { if s, err := Int64(v, nil); err != nil { return 0, err } else { - var rit *leveldb.RangeLimitIterator + var rit *store.RangeLimitIterator sk := db.zEncodeScoreKey(key, member, s) if !reverse { minKey := db.zEncodeStartScoreKey(key, MinScore) - rit = leveldb.NewRangeIterator(it, &leveldb.Range{minKey, sk, leveldb.RangeClose}) + rit = store.NewRangeIterator(it, &store.Range{minKey, sk, store.RangeClose}) } else { maxKey := db.zEncodeStopScoreKey(key, MaxScore) - rit = leveldb.NewRevRangeIterator(it, &leveldb.Range{sk, maxKey, leveldb.RangeClose}) + rit = store.NewRevRangeIterator(it, &store.Range{sk, maxKey, store.RangeClose}) } var lastKey []byte = nil @@ -492,14 +492,14 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { return -1, nil } -func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *leveldb.RangeLimitIterator { +func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *store.RangeLimitIterator { minKey := db.zEncodeStartScoreKey(key, min) maxKey := db.zEncodeStopScoreKey(key, max) if !reverse { - return db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, count) + return db.db.RangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count) } else { - return db.db.RevRangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, count) + return db.db.RevRangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count) } } @@ -550,10 +550,10 @@ func (db *DB) zRange(key []byte, min int64, max int64, offset int, count int, re v := make([]ScorePair, 0, nv) - var it *leveldb.RangeLimitIterator + var it *store.RangeLimitIterator //if reverse and offset is 0, count < 0, we may use forward iterator then reverse - //because leveldb iterator prev is slower than next + //because store iterator prev is slower than next if !reverse || (offset == 0 && count < 0) { it = db.zIterator(key, min, max, offset, count, false) } else { @@ -740,7 +740,7 @@ func (db *DB) zFlush() (drop int64, err error) { maxKey[0] = db.index maxKey[1] = ZScoreType + 1 - it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1) defer it.Close() for ; it.Valid(); it.Next() { @@ -779,9 +779,9 @@ func (db *DB) ZScan(key []byte, member []byte, count int, inclusive bool) ([]Sco v := make([]ScorePair, 0, 2*count) - rangeType := leveldb.RangeROpen + rangeType := store.RangeROpen if !inclusive { - rangeType = leveldb.RangeOpen + rangeType = store.RangeOpen } it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) diff --git a/ledis/tx.go b/ledis/tx.go index 0fe716a..f49cd34 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -1,7 +1,7 @@ package ledis import ( - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "sync" ) @@ -9,7 +9,7 @@ type tx struct { m sync.Mutex l *Ledis - wb *leveldb.WriteBatch + wb *store.WriteBatch binlog *BinLog batch [][]byte diff --git a/leveldb/snapshot.go b/leveldb/snapshot.go deleted file mode 100644 index 7d49317..0000000 --- a/leveldb/snapshot.go +++ /dev/null @@ -1,60 +0,0 @@ -package leveldb - -// #cgo LDFLAGS: -lleveldb -// #include -// #include "leveldb/c.h" -import "C" - -type Snapshot struct { - db *DB - - snap *C.leveldb_snapshot_t - - readOpts *ReadOptions - iteratorOpts *ReadOptions -} - -func (s *Snapshot) Close() { - C.leveldb_release_snapshot(s.db.db, s.snap) - - s.iteratorOpts.Close() - s.readOpts.Close() -} - -func (s *Snapshot) Get(key []byte) ([]byte, error) { - return s.db.get(nil, s.readOpts, key) -} - -func (s *Snapshot) BufGet(r []byte, key []byte) ([]byte, error) { - return s.db.get(r, s.readOpts, key) -} - -func (s *Snapshot) NewIterator() *Iterator { - it := new(Iterator) - - it.it = C.leveldb_create_iterator(s.db.db, s.iteratorOpts.Opt) - - return it -} - -func (s *Snapshot) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { - return NewRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) -} - -func (s *Snapshot) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { - return NewRevRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) -} - -//count < 0, unlimit. -// -//offset must >= 0, if < 0, will get nothing. -func (s *Snapshot) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { - return NewRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) -} - -//count < 0, unlimit. -// -//offset must >= 0, if < 0, will get nothing. -func (s *Snapshot) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { - return NewRevRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) -} diff --git a/server/cmd_replication_test.go b/server/cmd_replication_test.go index dce4b6f..4b0111a 100644 --- a/server/cmd_replication_test.go +++ b/server/cmd_replication_test.go @@ -3,14 +3,14 @@ package server import ( "bytes" "fmt" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "os" "testing" "time" ) func checkDataEqual(master *App, slave *App) error { - it := master.ldb.DataDB().RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) + it := master.ldb.DataDB().RangeLimitIterator(nil, nil, store.RangeClose, 0, -1) for ; it.Valid(); it.Next() { key := it.Key() value := it.Value() diff --git a/server/config.go b/server/config.go index 13ab286..024eeb9 100644 --- a/server/config.go +++ b/server/config.go @@ -13,11 +13,13 @@ type Config struct { DataDir string `json:"data_dir"` DB struct { - Compression bool `json:"compression"` - BlockSize int `json:"block_size"` - WriteBufferSize int `json:"write_buffer_size"` - CacheSize int `json:"cache_size"` - MaxOpenFiles int `json:"max_open_files"` + Name string `json:"name"` + Compression bool `json:"compression"` + BlockSize int `json:"block_size"` + WriteBufferSize int `json:"write_buffer_size"` + CacheSize int `json:"cache_size"` + MaxOpenFiles int `json:"max_open_files"` + MapSize int `json:"map_size"` } `json:"db"` BinLog struct { diff --git a/store/config.go b/store/config.go new file mode 100644 index 0000000..16666a2 --- /dev/null +++ b/store/config.go @@ -0,0 +1,17 @@ +package store + +type Config struct { + Name string `json:"name"` + + Path string `json:"path"` + + //for leveldb, goleveldb + Compression bool `json:"compression"` + BlockSize int `json:"block_size"` + WriteBufferSize int `json:"write_buffer_size"` + CacheSize int `json:"cache_size"` + MaxOpenFiles int `json:"max_open_files"` + + //for lmdb + MapSize int `json:"map_size"` +} diff --git a/store/db.go b/store/db.go new file mode 100644 index 0000000..adad004 --- /dev/null +++ b/store/db.go @@ -0,0 +1,75 @@ +package store + +import ( + "github.com/siddontang/ledisdb/store/driver" +) + +type DB struct { + db driver.IDB +} + +// Close database +// +// Caveat +// Any other DB operations like Get, Put, etc... may cause a panic after Close +// +func (db *DB) Close() error { + if db.db == nil { + return nil + } + + err := db.db.Close() + db.db = nil + + return err +} + +// Get Value with Key +func (db *DB) Get(key []byte) ([]byte, error) { + return db.db.Get(key) +} + +// Put value with key +func (db *DB) Put(key []byte, value []byte) error { + err := db.db.Put(key, value) + return err +} + +// Delete by key +func (db *DB) Delete(key []byte) error { + err := db.db.Delete(key) + return err +} + +func (db *DB) NewIterator() *Iterator { + it := new(Iterator) + it.it = db.db.NewIterator() + + return it +} + +func (db *DB) NewWriteBatch() *WriteBatch { + return &WriteBatch{db.db.NewWriteBatch()} +} + +func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) +} + +func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) +} + +//count < 0, unlimit. +// +//offset must >= 0, if < 0, will get nothing. +func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { + return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) +} + +//count < 0, unlimit. +// +//offset must >= 0, if < 0, will get nothing. +func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { + return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) +} diff --git a/store/driver/driver.go b/store/driver/driver.go new file mode 100644 index 0000000..3be1c7e --- /dev/null +++ b/store/driver/driver.go @@ -0,0 +1,39 @@ +package driver + +type IDB interface { + Close() error + + Get(key []byte) ([]byte, error) + + Put(key []byte, value []byte) error + Delete(key []byte) error + + NewIterator() IIterator + + NewWriteBatch() IWriteBatch +} + +type IIterator interface { + Close() error + + First() + Last() + Seek(key []byte) + + Next() + Prev() + + Valid() bool + + Key() []byte + Value() []byte +} + +type IWriteBatch interface { + Close() error + + Put(key []byte, value []byte) + Delete(key []byte) + Commit() error + Rollback() error +} diff --git a/store/goleveldb.go b/store/goleveldb.go new file mode 100644 index 0000000..d9d7703 --- /dev/null +++ b/store/goleveldb.go @@ -0,0 +1,30 @@ +package store + +import ( + "github.com/siddontang/copier" + "github.com/siddontang/ledisdb/store/driver" + "github.com/siddontang/ledisdb/store/goleveldb" +) + +const GoLevelDBName = "goleveldb" + +type GoLevelDBStore struct { +} + +func (s GoLevelDBStore) Open(cfg *Config) (driver.IDB, error) { + c := &goleveldb.Config{} + copier.Copy(c, cfg) + + return goleveldb.Open(c) +} + +func (s GoLevelDBStore) Repair(cfg *Config) error { + c := &goleveldb.Config{} + copier.Copy(c, cfg) + + return goleveldb.Repair(c) +} + +func init() { + Register(GoLevelDBName, GoLevelDBStore{}) +} diff --git a/store/goleveldb/batch.go b/store/goleveldb/batch.go new file mode 100644 index 0000000..f3ff67c --- /dev/null +++ b/store/goleveldb/batch.go @@ -0,0 +1,31 @@ +package goleveldb + +import ( + "github.com/syndtr/goleveldb/leveldb" +) + +type WriteBatch struct { + db *DB + wbatch *leveldb.Batch +} + +func (w *WriteBatch) Close() error { + return nil +} + +func (w *WriteBatch) Put(key, value []byte) { + w.wbatch.Put(key, value) +} + +func (w *WriteBatch) Delete(key []byte) { + w.wbatch.Delete(key) +} + +func (w *WriteBatch) Commit() error { + return w.db.db.Write(w.wbatch, nil) +} + +func (w *WriteBatch) Rollback() error { + w.wbatch.Reset() + return nil +} diff --git a/store/goleveldb/db.go b/store/goleveldb/db.go new file mode 100644 index 0000000..8ae1f73 --- /dev/null +++ b/store/goleveldb/db.go @@ -0,0 +1,137 @@ +package goleveldb + +import ( + "github.com/siddontang/ledisdb/store/driver" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/cache" + "github.com/syndtr/goleveldb/leveldb/filter" + "github.com/syndtr/goleveldb/leveldb/opt" + "os" +) + +const defaultFilterBits int = 10 + +type Config struct { + Path string `json:"path"` + + Compression bool `json:"compression"` + BlockSize int `json:"block_size"` + WriteBufferSize int `json:"write_buffer_size"` + CacheSize int `json:"cache_size"` + MaxOpenFiles int `json:"max_open_files"` +} + +type DB struct { + cfg *Config + + db *leveldb.DB + + opts *opt.Options + + iteratorOpts *opt.ReadOptions + + cache cache.Cache + + filter filter.Filter +} + +func Open(cfg *Config) (*DB, error) { + if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil { + return nil, err + } + + db := new(DB) + db.cfg = cfg + + if err := db.open(); err != nil { + return nil, err + } + + return db, nil +} + +func Repair(cfg *Config) error { + db, err := leveldb.RecoverFile(cfg.Path, newOptions(cfg)) + if err != nil { + return err + } + + db.Close() + return nil +} + +func (db *DB) open() error { + db.opts = newOptions(db.cfg) + + db.iteratorOpts = &opt.ReadOptions{} + db.iteratorOpts.DontFillCache = true + + var err error + db.db, err = leveldb.OpenFile(db.cfg.Path, db.opts) + + return err +} + +func newOptions(cfg *Config) *opt.Options { + opts := &opt.Options{} + opts.ErrorIfMissing = false + + if cfg.CacheSize > 0 { + opts.BlockCache = cache.NewLRUCache(cfg.CacheSize) + } + + //we must use bloomfilter + opts.Filter = filter.NewBloomFilter(defaultFilterBits) + + if !cfg.Compression { + opts.Compression = opt.NoCompression + } else { + opts.Compression = opt.SnappyCompression + } + + if cfg.BlockSize > 0 { + opts.BlockSize = cfg.BlockSize + } + + if cfg.WriteBufferSize > 0 { + opts.WriteBuffer = cfg.WriteBufferSize + } + + return opts +} + +func (db *DB) Close() error { + return db.db.Close() +} + +func (db *DB) Put(key, value []byte) error { + return db.db.Put(key, value, nil) +} + +func (db *DB) Get(key []byte) ([]byte, error) { + v, err := db.db.Get(key, nil) + if err == leveldb.ErrNotFound { + return nil, nil + } + return v, nil +} + +func (db *DB) Delete(key []byte) error { + return db.db.Delete(key, nil) +} + +func (db *DB) NewWriteBatch() driver.IWriteBatch { + wb := &WriteBatch{ + db: db, + wbatch: new(leveldb.Batch), + } + return wb +} + +func (db *DB) NewIterator() driver.IIterator { + it := &Iterator{ + db.db.NewIterator(nil, db.iteratorOpts), + } + + return it +} diff --git a/store/goleveldb/iterator.go b/store/goleveldb/iterator.go new file mode 100644 index 0000000..c1fd8b5 --- /dev/null +++ b/store/goleveldb/iterator.go @@ -0,0 +1,49 @@ +package goleveldb + +import ( + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +type Iterator struct { + it iterator.Iterator +} + +func (it *Iterator) Key() []byte { + return it.it.Key() +} + +func (it *Iterator) Value() []byte { + return it.it.Value() +} + +func (it *Iterator) Close() error { + if it.it != nil { + it.it.Release() + it.it = nil + } + return nil +} + +func (it *Iterator) Valid() bool { + return it.it.Valid() +} + +func (it *Iterator) Next() { + it.it.Next() +} + +func (it *Iterator) Prev() { + it.it.Prev() +} + +func (it *Iterator) First() { + it.it.First() +} + +func (it *Iterator) Last() { + it.it.Last() +} + +func (it *Iterator) Seek(key []byte) { + it.it.Seek(key) +} diff --git a/leveldb/iterator.go b/store/iterator.go similarity index 80% rename from leveldb/iterator.go rename to store/iterator.go index a75e4d7..ff2536d 100644 --- a/leveldb/iterator.go +++ b/store/iterator.go @@ -1,14 +1,8 @@ -package leveldb - -// #cgo LDFLAGS: -lleveldb -// #include -// #include "leveldb/c.h" -// #include "leveldb_ext.h" -import "C" +package store import ( "bytes" - "unsafe" + "github.com/siddontang/ledisdb/store/driver" ) const ( @@ -27,10 +21,10 @@ const ( // // range type: // -// close: [min, max] -// open: (min, max) -// lopen: (min, max] -// ropen: [min, max) +// close: [min, max] +// open: (min, max) +// lopen: (min, max] +// ropen: [min, max) // type Range struct { Min []byte @@ -45,54 +39,39 @@ type Limit struct { } type Iterator struct { - it *C.leveldb_iterator_t - isValid C.uchar + it driver.IIterator } // Returns a copy of key. func (it *Iterator) Key() []byte { - var klen C.size_t - kdata := C.leveldb_iter_key(it.it, &klen) - if kdata == nil { + k := it.it.Key() + if k == nil { return nil } - return C.GoBytes(unsafe.Pointer(kdata), C.int(klen)) + return append([]byte{}, k...) } // Returns a copy of value. func (it *Iterator) Value() []byte { - var vlen C.size_t - vdata := C.leveldb_iter_value(it.it, &vlen) - if vdata == nil { + v := it.it.Value() + if v == nil { return nil } - return C.GoBytes(unsafe.Pointer(vdata), C.int(vlen)) + return append([]byte{}, v...) } // Returns a reference of key. // you must be careful that it will be changed after next iterate. func (it *Iterator) RawKey() []byte { - var klen C.size_t - kdata := C.leveldb_iter_key(it.it, &klen) - if kdata == nil { - return nil - } - - return slice(unsafe.Pointer(kdata), int(C.int(klen))) + return it.it.Key() } // Returns a reference of value. // you must be careful that it will be changed after next iterate. func (it *Iterator) RawValue() []byte { - var vlen C.size_t - vdata := C.leveldb_iter_value(it.it, &vlen) - if vdata == nil { - return nil - } - - return slice(unsafe.Pointer(vdata), int(C.int(vlen))) + return it.it.Value() } // Copy key to b, if b len is small or nil, returns a new one. @@ -126,33 +105,33 @@ func (it *Iterator) BufValue(b []byte) []byte { func (it *Iterator) Close() { if it.it != nil { - C.leveldb_iter_destroy(it.it) + it.it.Close() it.it = nil } } func (it *Iterator) Valid() bool { - return ucharToBool(it.isValid) + return it.it.Valid() } func (it *Iterator) Next() { - it.isValid = C.leveldb_iter_next_ext(it.it) + it.it.Next() } func (it *Iterator) Prev() { - it.isValid = C.leveldb_iter_prev_ext(it.it) + it.it.Prev() } func (it *Iterator) SeekToFirst() { - it.isValid = C.leveldb_iter_seek_to_first_ext(it.it) + it.it.First() } func (it *Iterator) SeekToLast() { - it.isValid = C.leveldb_iter_seek_to_last_ext(it.it) + it.it.Last() } func (it *Iterator) Seek(key []byte) { - it.isValid = C.leveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) + it.it.Seek(key) } // Finds by key, if not found, nil returns. diff --git a/store/leveldb.go b/store/leveldb.go new file mode 100644 index 0000000..3453bc8 --- /dev/null +++ b/store/leveldb.go @@ -0,0 +1,32 @@ +// +build leveldb + +package store + +import ( + "github.com/siddontang/copier" + "github.com/siddontang/ledisdb/store/driver" + "github.com/siddontang/ledisdb/store/leveldb" +) + +const LevelDBName = "leveldb" + +type LevelDBStore struct { +} + +func (s LevelDBStore) Open(cfg *Config) (driver.IDB, error) { + c := &leveldb.Config{} + copier.Copy(c, cfg) + + return leveldb.Open(c) +} + +func (s LevelDBStore) Repair(cfg *Config) error { + c := &leveldb.Config{} + copier.Copy(c, cfg) + + return leveldb.Repair(c) +} + +func init() { + Register(LevelDBName, LevelDBStore{}) +} diff --git a/leveldb/batch.go b/store/leveldb/batch.go similarity index 87% rename from leveldb/batch.go rename to store/leveldb/batch.go index f24ec65..87600cb 100644 --- a/leveldb/batch.go +++ b/store/leveldb/batch.go @@ -1,3 +1,5 @@ +// +build leveldb + package leveldb // #cgo LDFLAGS: -lleveldb @@ -13,8 +15,9 @@ type WriteBatch struct { wbatch *C.leveldb_writebatch_t } -func (w *WriteBatch) Close() { +func (w *WriteBatch) Close() error { C.leveldb_writebatch_destroy(w.wbatch) + return nil } func (w *WriteBatch) Put(key, value []byte) { @@ -41,12 +44,9 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } -func (w *WriteBatch) SyncCommit() error { - return w.commit(w.db.syncWriteOpts) -} - -func (w *WriteBatch) Rollback() { +func (w *WriteBatch) Rollback() error { C.leveldb_writebatch_clear(w.wbatch) + return nil } func (w *WriteBatch) commit(wb *WriteOptions) error { diff --git a/leveldb/cache.go b/store/leveldb/cache.go similarity index 94% rename from leveldb/cache.go rename to store/leveldb/cache.go index 3fbcf0d..e5587cb 100644 --- a/leveldb/cache.go +++ b/store/leveldb/cache.go @@ -1,3 +1,5 @@ +// +build leveldb + package leveldb // #cgo LDFLAGS: -lleveldb diff --git a/leveldb/db.go b/store/leveldb/db.go similarity index 59% rename from leveldb/db.go rename to store/leveldb/db.go index c6bfa94..6e4022d 100644 --- a/leveldb/db.go +++ b/store/leveldb/db.go @@ -1,3 +1,5 @@ +// +build leveldb + // Package leveldb is a wrapper for c++ leveldb package leveldb @@ -9,7 +11,7 @@ package leveldb import "C" import ( - "encoding/json" + "github.com/siddontang/ledisdb/store/driver" "os" "unsafe" ) @@ -26,35 +28,6 @@ type Config struct { MaxOpenFiles int `json:"max_open_files"` } -type DB struct { - cfg *Config - - db *C.leveldb_t - - opts *Options - - //for default read and write options - readOpts *ReadOptions - writeOpts *WriteOptions - iteratorOpts *ReadOptions - - syncWriteOpts *WriteOptions - - cache *Cache - - filter *FilterPolicy -} - -func OpenWithJsonConfig(configJson json.RawMessage) (*DB, error) { - cfg := new(Config) - err := json.Unmarshal(configJson, cfg) - if err != nil { - return nil, err - } - - return Open(cfg) -} - func Open(cfg *Config) (*DB, error) { if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil { return nil, err @@ -70,21 +43,6 @@ func Open(cfg *Config) (*DB, error) { return db, nil } -func (db *DB) open() error { - db.initOptions(db.cfg) - - var errStr *C.char - ldbname := C.CString(db.cfg.Path) - defer C.leveldb_free(unsafe.Pointer(ldbname)) - - db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr) - if errStr != nil { - db.db = nil - return saveError(errStr) - } - return nil -} - func Repair(cfg *Config) error { db := new(DB) db.cfg = cfg @@ -108,6 +66,38 @@ func Repair(cfg *Config) error { return nil } +type DB struct { + cfg *Config + + db *C.leveldb_t + + opts *Options + + //for default read and write options + readOpts *ReadOptions + writeOpts *WriteOptions + iteratorOpts *ReadOptions + + cache *Cache + + filter *FilterPolicy +} + +func (db *DB) open() error { + db.initOptions(db.cfg) + + var errStr *C.char + ldbname := C.CString(db.cfg.Path) + defer C.leveldb_free(unsafe.Pointer(ldbname)) + + db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + db.db = nil + return saveError(errStr) + } + return nil +} + func (db *DB) initOptions(cfg *Config) { opts := NewOptions() @@ -126,6 +116,8 @@ func (db *DB) initOptions(cfg *Config) { if !cfg.Compression { opts.SetCompression(NoCompression) + } else { + opts.SetCompression(SnappyCompression) } if cfg.BlockSize <= 0 { @@ -153,9 +145,6 @@ func (db *DB) initOptions(cfg *Config) { db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) - - db.syncWriteOpts = NewWriteOptions() - db.syncWriteOpts.SetSync(true) } func (db *DB) Close() error { @@ -177,80 +166,23 @@ func (db *DB) Close() error { db.readOpts.Close() db.writeOpts.Close() db.iteratorOpts.Close() - db.syncWriteOpts.Close() return nil } -func (db *DB) Destroy() error { - path := db.cfg.Path - - db.Close() - - opts := NewOptions() - defer opts.Close() - - var errStr *C.char - ldbname := C.CString(path) - defer C.leveldb_free(unsafe.Pointer(ldbname)) - - C.leveldb_destroy_db(opts.Opt, ldbname, &errStr) - if errStr != nil { - return saveError(errStr) - } - return nil -} - -func (db *DB) Clear() error { - bc := db.NewWriteBatch() - defer bc.Close() - - var err error - it := db.NewIterator() - it.SeekToFirst() - - num := 0 - for ; it.Valid(); it.Next() { - bc.Delete(it.RawKey()) - num++ - if num == 1000 { - num = 0 - if err = bc.Commit(); err != nil { - return err - } - } - } - - err = bc.Commit() - - return err -} - func (db *DB) Put(key, value []byte) error { return db.put(db.writeOpts, key, value) } -func (db *DB) SyncPut(key, value []byte) error { - return db.put(db.syncWriteOpts, key, value) -} - func (db *DB) Get(key []byte) ([]byte, error) { - return db.get(nil, db.readOpts, key) -} - -func (db *DB) BufGet(r []byte, key []byte) ([]byte, error) { - return db.get(r, db.readOpts, key) + return db.get(db.readOpts, key) } func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } -func (db *DB) SyncDelete(key []byte) error { - return db.delete(db.syncWriteOpts, key) -} - -func (db *DB) NewWriteBatch() *WriteBatch { +func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, wbatch: C.leveldb_writebatch_create(), @@ -258,22 +190,7 @@ func (db *DB) NewWriteBatch() *WriteBatch { return wb } -func (db *DB) NewSnapshot() *Snapshot { - s := &Snapshot{ - db: db, - snap: C.leveldb_create_snapshot(db.db), - readOpts: NewReadOptions(), - iteratorOpts: NewReadOptions(), - } - - s.readOpts.SetSnapshot(s) - s.iteratorOpts.SetSnapshot(s) - s.iteratorOpts.SetFillCache(false) - - return s -} - -func (db *DB) NewIterator() *Iterator { +func (db *DB) NewIterator() driver.IIterator { it := new(Iterator) it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt) @@ -281,28 +198,6 @@ func (db *DB) NewIterator() *Iterator { return it } -func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { - return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) -} - -func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { - return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) -} - -//count < 0, unlimit. -// -//offset must >= 0, if < 0, will get nothing. -func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { - return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) -} - -//count < 0, unlimit. -// -//offset must >= 0, if < 0, will get nothing. -func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { - return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) -} - func (db *DB) put(wo *WriteOptions, key, value []byte) error { var errStr *C.char var k, v *C.char @@ -324,7 +219,7 @@ func (db *DB) put(wo *WriteOptions, key, value []byte) error { return nil } -func (db *DB) get(r []byte, ro *ReadOptions, key []byte) ([]byte, error) { +func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) { var errStr *C.char var vallen C.size_t var k *C.char @@ -347,13 +242,7 @@ func (db *DB) get(r []byte, ro *ReadOptions, key []byte) ([]byte, error) { defer C.leveldb_get_free_ext(unsafe.Pointer(c)) - if r == nil { - r = []byte{} - } - - r = r[0:0] - b := slice(unsafe.Pointer(value), int(C.int(vallen))) - return append(r, b...), nil + return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil } func (db *DB) delete(wo *WriteOptions, key []byte) error { diff --git a/leveldb/filterpolicy.go b/store/leveldb/filterpolicy.go similarity index 95% rename from leveldb/filterpolicy.go rename to store/leveldb/filterpolicy.go index b007d58..640139f 100644 --- a/leveldb/filterpolicy.go +++ b/store/leveldb/filterpolicy.go @@ -1,3 +1,5 @@ +// +build leveldb + package leveldb // #cgo LDFLAGS: -lleveldb diff --git a/store/leveldb/iterator.go b/store/leveldb/iterator.go new file mode 100644 index 0000000..49cfd7d --- /dev/null +++ b/store/leveldb/iterator.go @@ -0,0 +1,70 @@ +// +build leveldb + +package leveldb + +// #cgo LDFLAGS: -lleveldb +// #include +// #include "leveldb/c.h" +// #include "leveldb_ext.h" +import "C" + +import ( + "unsafe" +) + +type Iterator struct { + it *C.leveldb_iterator_t + isValid C.uchar +} + +func (it *Iterator) Key() []byte { + var klen C.size_t + kdata := C.leveldb_iter_key(it.it, &klen) + if kdata == nil { + return nil + } + + return slice(unsafe.Pointer(kdata), int(C.int(klen))) +} + +func (it *Iterator) Value() []byte { + var vlen C.size_t + vdata := C.leveldb_iter_value(it.it, &vlen) + if vdata == nil { + return nil + } + + return slice(unsafe.Pointer(vdata), int(C.int(vlen))) +} + +func (it *Iterator) Close() error { + if it.it != nil { + C.leveldb_iter_destroy(it.it) + it.it = nil + } + return nil +} + +func (it *Iterator) Valid() bool { + return ucharToBool(it.isValid) +} + +func (it *Iterator) Next() { + it.isValid = C.leveldb_iter_next_ext(it.it) +} + +func (it *Iterator) Prev() { + it.isValid = C.leveldb_iter_prev_ext(it.it) +} + +func (it *Iterator) First() { + it.isValid = C.leveldb_iter_seek_to_first_ext(it.it) +} + +func (it *Iterator) Last() { + it.isValid = C.leveldb_iter_seek_to_last_ext(it.it) +} + +func (it *Iterator) Seek(key []byte) { + it.isValid = C.leveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} diff --git a/leveldb/leveldb_ext.cc b/store/leveldb/leveldb_ext.cc similarity index 98% rename from leveldb/leveldb_ext.cc rename to store/leveldb/leveldb_ext.cc index 9ed2579..96d6541 100644 --- a/leveldb/leveldb_ext.cc +++ b/store/leveldb/leveldb_ext.cc @@ -1,3 +1,5 @@ +// +build leveldb + #include "leveldb_ext.h" #include diff --git a/leveldb/leveldb_ext.h b/store/leveldb/leveldb_ext.h similarity index 97% rename from leveldb/leveldb_ext.h rename to store/leveldb/leveldb_ext.h index 8b26010..8222ae3 100644 --- a/leveldb/leveldb_ext.h +++ b/store/leveldb/leveldb_ext.h @@ -1,3 +1,5 @@ +// +build leveldb + #ifndef LEVELDB_EXT_H #define LEVELDB_EXT_H diff --git a/leveldb/levigo-license b/store/leveldb/levigo-license similarity index 100% rename from leveldb/levigo-license rename to store/leveldb/levigo-license diff --git a/leveldb/options.go b/store/leveldb/options.go similarity index 93% rename from leveldb/options.go rename to store/leveldb/options.go index 62836c0..83009be 100644 --- a/leveldb/options.go +++ b/store/leveldb/options.go @@ -1,3 +1,5 @@ +// +build leveldb + package leveldb // #cgo LDFLAGS: -lleveldb @@ -103,14 +105,6 @@ func (ro *ReadOptions) SetFillCache(b bool) { C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b)) } -func (ro *ReadOptions) SetSnapshot(snap *Snapshot) { - var s *C.leveldb_snapshot_t - if snap != nil { - s = snap.snap - } - C.leveldb_readoptions_set_snapshot(ro.Opt, s) -} - func (wo *WriteOptions) Close() { C.leveldb_writeoptions_destroy(wo.Opt) } diff --git a/leveldb/util.go b/store/leveldb/util.go similarity index 97% rename from leveldb/util.go rename to store/leveldb/util.go index e1fd57d..6efe33b 100644 --- a/leveldb/util.go +++ b/store/leveldb/util.go @@ -1,3 +1,5 @@ +// +build leveldb + package leveldb // #include "leveldb/c.h" diff --git a/store/mdb.go b/store/mdb.go new file mode 100644 index 0000000..364065a --- /dev/null +++ b/store/mdb.go @@ -0,0 +1,30 @@ +package store + +import ( + "github.com/siddontang/copier" + "github.com/siddontang/ledisdb/store/driver" + "github.com/siddontang/ledisdb/store/mdb" +) + +const LMDBName = "lmdb" + +type LMDBStore struct { +} + +func (s LMDBStore) Open(cfg *Config) (driver.IDB, error) { + c := &mdb.Config{} + copier.Copy(c, cfg) + + return mdb.Open(c) +} + +func (s LMDBStore) Repair(cfg *Config) error { + c := &mdb.Config{} + copier.Copy(c, cfg) + + return mdb.Repair(c) +} + +func init() { + Register(LMDBName, LMDBStore{}) +} diff --git a/store/mdb/batch.go b/store/mdb/batch.go new file mode 100644 index 0000000..d8debf6 --- /dev/null +++ b/store/mdb/batch.go @@ -0,0 +1,32 @@ +package mdb + +type Write struct { + Key []byte + Value []byte +} + +type WriteBatch struct { + db *MDB + wb []Write +} + +func (w *WriteBatch) Close() error { + return nil +} + +func (w *WriteBatch) Put(key, value []byte) { + w.wb = append(w.wb, Write{key, value}) +} + +func (w *WriteBatch) Delete(key []byte) { + w.wb = append(w.wb, Write{key, nil}) +} + +func (w *WriteBatch) Commit() error { + return w.db.BatchPut(w.wb) +} + +func (w *WriteBatch) Rollback() error { + w.wb = w.wb[0:0] + return nil +} diff --git a/store/mdb/influxdb_license b/store/mdb/influxdb_license new file mode 100644 index 0000000..03f21e8 --- /dev/null +++ b/store/mdb/influxdb_license @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013-2014 Errplane Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/store/mdb/mdb.go b/store/mdb/mdb.go new file mode 100644 index 0000000..a10d0d4 --- /dev/null +++ b/store/mdb/mdb.go @@ -0,0 +1,255 @@ +package mdb + +import ( + mdb "github.com/influxdb/gomdb" + "github.com/siddontang/ledisdb/store/driver" + "os" +) + +type Config struct { + Path string `json:"path"` + MapSize int `json:"map_size"` +} + +type MDB struct { + env *mdb.Env + db mdb.DBI + path string +} + +func Open(c *Config) (MDB, error) { + path := c.Path + if c.MapSize == 0 { + c.MapSize = 1024 * 1024 * 1024 + } + + env, err := mdb.NewEnv() + if err != nil { + return MDB{}, err + } + + // TODO: max dbs should be configurable + if err := env.SetMaxDBs(1); err != nil { + return MDB{}, err + } + if err := env.SetMapSize(uint64(c.MapSize)); err != nil { + return MDB{}, err + } + + if _, err := os.Stat(path); err != nil { + err = os.MkdirAll(path, 0755) + if err != nil { + return MDB{}, err + } + } + + err = env.Open(path, mdb.NOSYNC|mdb.NOMETASYNC|mdb.WRITEMAP|mdb.MAPASYNC|mdb.CREATE, 0755) + if err != nil { + return MDB{}, err + } + + tx, err := env.BeginTxn(nil, 0) + if err != nil { + return MDB{}, err + } + + dbi, err := tx.DBIOpen(nil, mdb.CREATE) + if err != nil { + return MDB{}, err + } + + if err := tx.Commit(); err != nil { + return MDB{}, err + } + + db := MDB{ + env: env, + db: dbi, + path: path, + } + + return db, nil +} + +func Repair(c *Config) error { + println("llmd not supports repair") + return nil +} + +func (db MDB) Put(key, value []byte) error { + itr := db.iterator(false) + defer itr.Close() + + itr.err = itr.c.Put(key, value, 0) + itr.setState() + return itr.Error() +} + +func (db MDB) BatchPut(writes []Write) error { + itr := db.iterator(false) + + for _, w := range writes { + if w.Value == nil { + itr.key, itr.value, itr.err = itr.c.Get(w.Key, mdb.SET) + if itr.err == nil { + itr.err = itr.c.Del(0) + } + } else { + itr.err = itr.c.Put(w.Key, w.Value, 0) + } + + if itr.err != nil && itr.err != mdb.NotFound { + break + } + } + itr.setState() + + return itr.Close() +} + +func (db MDB) Get(key []byte) ([]byte, error) { + tx, err := db.env.BeginTxn(nil, mdb.RDONLY) + if err != nil { + return nil, err + } + defer tx.Commit() + + v, err := tx.Get(db.db, key) + if err == mdb.NotFound { + return nil, nil + } + return v, err +} + +func (db MDB) Delete(key []byte) error { + itr := db.iterator(false) + defer itr.Close() + + itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET) + if itr.err == nil { + itr.err = itr.c.Del(0) + } + itr.setState() + return itr.Error() +} + +type MDBIterator struct { + key []byte + value []byte + c *mdb.Cursor + tx *mdb.Txn + valid bool + err error +} + +func (itr *MDBIterator) Key() []byte { + return itr.key +} + +func (itr *MDBIterator) Value() []byte { + return itr.value +} + +func (itr *MDBIterator) Valid() bool { + return itr.valid +} + +func (itr *MDBIterator) Error() error { + return itr.err +} + +func (itr *MDBIterator) getCurrent() { + itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.GET_CURRENT) + itr.setState() +} + +func (itr *MDBIterator) Seek(key []byte) { + itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET_RANGE) + itr.setState() +} +func (itr *MDBIterator) Next() { + itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.NEXT) + itr.setState() +} + +func (itr *MDBIterator) Prev() { + itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.PREV) + itr.setState() +} + +func (itr *MDBIterator) First() { + itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.FIRST) + itr.setState() +} + +func (itr *MDBIterator) Last() { + itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.LAST) + itr.setState() +} + +func (itr *MDBIterator) setState() { + if itr.err != nil { + if itr.err == mdb.NotFound { + itr.err = nil + } + itr.valid = false + } +} + +func (itr *MDBIterator) Close() error { + if err := itr.c.Close(); err != nil { + itr.tx.Abort() + return err + } + if itr.err != nil { + itr.tx.Abort() + return itr.err + } + return itr.tx.Commit() +} + +func (_ MDB) Name() string { + return "lmdb" +} + +func (db MDB) Path() string { + return db.path +} + +func (db MDB) Compact() { +} + +func (db MDB) iterator(rdonly bool) *MDBIterator { + flags := uint(0) + if rdonly { + flags = mdb.RDONLY + } + tx, err := db.env.BeginTxn(nil, flags) + if err != nil { + return &MDBIterator{nil, nil, nil, nil, false, err} + } + + c, err := tx.CursorOpen(db.db) + if err != nil { + tx.Abort() + return &MDBIterator{nil, nil, nil, nil, false, err} + } + + return &MDBIterator{nil, nil, c, tx, true, nil} +} + +func (db MDB) Close() error { + db.env.DBIClose(db.db) + if err := db.env.Close(); err != nil { + panic(err) + } + return nil +} + +func (db MDB) NewIterator() driver.IIterator { + return db.iterator(true) +} + +func (db MDB) NewWriteBatch() driver.IWriteBatch { + return &WriteBatch{&db, []Write{}} +} diff --git a/store/rocksdb.go b/store/rocksdb.go new file mode 100644 index 0000000..53e135e --- /dev/null +++ b/store/rocksdb.go @@ -0,0 +1,32 @@ +// +build rocksdb + +package store + +import ( + "github.com/siddontang/copier" + "github.com/siddontang/ledisdb/store/driver" + "github.com/siddontang/ledisdb/store/rocksdb" +) + +const RocksDBName = "rocksdb" + +type RocksDBStore struct { +} + +func (s RocksDBStore) Open(cfg *Config) (driver.IDB, error) { + c := &rocksdb.Config{} + copier.Copy(c, cfg) + + return rocksdb.Open(c) +} + +func (s RocksDBStore) Repair(cfg *Config) error { + c := &rocksdb.Config{} + copier.Copy(c, cfg) + + return rocksdb.Repair(c) +} + +func init() { + Register(RocksDBName, RocksDBStore{}) +} diff --git a/store/rocksdb/batch.go b/store/rocksdb/batch.go new file mode 100644 index 0000000..46e700b --- /dev/null +++ b/store/rocksdb/batch.go @@ -0,0 +1,59 @@ +// +build rocksdb + +package rocksdb + +// #cgo LDFLAGS: -lrocksdb +// #include "rocksdb/c.h" +import "C" + +import ( + "unsafe" +) + +type WriteBatch struct { + db *DB + wbatch *C.rocksdb_writebatch_t +} + +func (w *WriteBatch) Close() error { + C.rocksdb_writebatch_destroy(w.wbatch) + return nil +} + +func (w *WriteBatch) Put(key, value []byte) { + var k, v *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + if len(value) != 0 { + v = (*C.char)(unsafe.Pointer(&value[0])) + } + + lenk := len(key) + lenv := len(value) + + C.rocksdb_writebatch_put(w.wbatch, k, C.size_t(lenk), v, C.size_t(lenv)) +} + +func (w *WriteBatch) Delete(key []byte) { + C.rocksdb_writebatch_delete(w.wbatch, + (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} + +func (w *WriteBatch) Commit() error { + return w.commit(w.db.writeOpts) +} + +func (w *WriteBatch) Rollback() error { + C.rocksdb_writebatch_clear(w.wbatch) + return nil +} + +func (w *WriteBatch) commit(wb *WriteOptions) error { + var errStr *C.char + C.rocksdb_write(w.db.db, wb.Opt, w.wbatch, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} diff --git a/store/rocksdb/cache.go b/store/rocksdb/cache.go new file mode 100644 index 0000000..931998b --- /dev/null +++ b/store/rocksdb/cache.go @@ -0,0 +1,20 @@ +// +build rocksdb + +package rocksdb + +// #cgo LDFLAGS: -lrocksdb +// #include +// #include "rocksdb/c.h" +import "C" + +type Cache struct { + Cache *C.rocksdb_cache_t +} + +func NewLRUCache(capacity int) *Cache { + return &Cache{C.rocksdb_cache_create_lru(C.size_t(capacity))} +} + +func (c *Cache) Close() { + C.rocksdb_cache_destroy(c.Cache) +} diff --git a/store/rocksdb/db.go b/store/rocksdb/db.go new file mode 100644 index 0000000..0faeaec --- /dev/null +++ b/store/rocksdb/db.go @@ -0,0 +1,279 @@ +// +build rocksdb + +// Package rocksdb is a wrapper for c++ rocksdb +package rocksdb + +/* +#cgo LDFLAGS: -lrocksdb +#include +#include +#include "rocksdb_ext.h" +*/ +import "C" + +import ( + "github.com/siddontang/ledisdb/store/driver" + "os" + "runtime" + "unsafe" +) + +const defaultFilterBits int = 10 + +type Config struct { + Path string `json:"path"` + + Compression bool `json:"compression"` + BlockSize int `json:"block_size"` + WriteBufferSize int `json:"write_buffer_size"` + CacheSize int `json:"cache_size"` + MaxOpenFiles int `json:"max_open_files"` +} + +func Open(cfg *Config) (*DB, error) { + if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil { + return nil, err + } + + db := new(DB) + db.cfg = cfg + + if err := db.open(); err != nil { + return nil, err + } + + return db, nil +} + +func Repair(cfg *Config) error { + db := new(DB) + db.cfg = cfg + + err := db.open() + defer db.Close() + + //open ok, do not need repair + if err == nil { + return nil + } + + var errStr *C.char + ldbname := C.CString(db.cfg.Path) + defer C.free(unsafe.Pointer(ldbname)) + + C.rocksdb_repair_db(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} + +type DB struct { + cfg *Config + + db *C.rocksdb_t + + env *Env + + opts *Options + + //for default read and write options + readOpts *ReadOptions + writeOpts *WriteOptions + iteratorOpts *ReadOptions + + cache *Cache + + filter *FilterPolicy +} + +func (db *DB) open() error { + db.initOptions(db.cfg) + + var errStr *C.char + ldbname := C.CString(db.cfg.Path) + defer C.free(unsafe.Pointer(ldbname)) + + db.db = C.rocksdb_open(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + db.db = nil + return saveError(errStr) + } + return nil +} + +func (db *DB) initOptions(cfg *Config) { + opts := NewOptions() + + opts.SetCreateIfMissing(true) + + if cfg.CacheSize <= 0 { + cfg.CacheSize = 4 * 1024 * 1024 + } + + db.env = NewDefaultEnv() + db.env.SetBackgroundThreads(runtime.NumCPU() * 2) + db.env.SetHighPriorityBackgroundThreads(1) + opts.SetEnv(db.env) + + db.cache = NewLRUCache(cfg.CacheSize) + opts.SetCache(db.cache) + + //we must use bloomfilter + db.filter = NewBloomFilter(defaultFilterBits) + opts.SetFilterPolicy(db.filter) + + if !cfg.Compression { + opts.SetCompression(NoCompression) + } else { + opts.SetCompression(SnappyCompression) + } + + if cfg.BlockSize <= 0 { + cfg.BlockSize = 4 * 1024 + } + + opts.SetBlockSize(cfg.BlockSize) + + if cfg.WriteBufferSize <= 0 { + cfg.WriteBufferSize = 4 * 1024 * 1024 + } + + opts.SetWriteBufferSize(cfg.WriteBufferSize) + + if cfg.MaxOpenFiles < 1024 { + cfg.MaxOpenFiles = 1024 + } + + opts.SetMaxOpenFiles(cfg.MaxOpenFiles) + + opts.SetMaxBackgroundCompactions(runtime.NumCPU()*2 - 1) + opts.SetMaxBackgroundFlushes(1) + + opts.SetLevel0SlowdownWritesTrigger(16) + opts.SetLevel0StopWritesTrigger(64) + opts.SetTargetFileSizeBase(32 * 1024 * 1024) + + db.opts = opts + + db.readOpts = NewReadOptions() + db.writeOpts = NewWriteOptions() + + db.iteratorOpts = NewReadOptions() + db.iteratorOpts.SetFillCache(false) +} + +func (db *DB) Close() error { + if db.db != nil { + C.rocksdb_close(db.db) + db.db = nil + } + + db.opts.Close() + + if db.cache != nil { + db.cache.Close() + } + + if db.filter != nil { + db.filter.Close() + } + + if db.env != nil { + db.env.Close() + } + + db.readOpts.Close() + db.writeOpts.Close() + db.iteratorOpts.Close() + + return nil +} + +func (db *DB) Put(key, value []byte) error { + return db.put(db.writeOpts, key, value) +} + +func (db *DB) Get(key []byte) ([]byte, error) { + return db.get(db.readOpts, key) +} + +func (db *DB) Delete(key []byte) error { + return db.delete(db.writeOpts, key) +} + +func (db *DB) NewWriteBatch() driver.IWriteBatch { + wb := &WriteBatch{ + db: db, + wbatch: C.rocksdb_writebatch_create(), + } + return wb +} + +func (db *DB) NewIterator() driver.IIterator { + it := new(Iterator) + + it.it = C.rocksdb_create_iterator(db.db, db.iteratorOpts.Opt) + + return it +} + +func (db *DB) put(wo *WriteOptions, key, value []byte) error { + var errStr *C.char + var k, v *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + if len(value) != 0 { + v = (*C.char)(unsafe.Pointer(&value[0])) + } + + lenk := len(key) + lenv := len(value) + C.rocksdb_put( + db.db, wo.Opt, k, C.size_t(lenk), v, C.size_t(lenv), &errStr) + + if errStr != nil { + return saveError(errStr) + } + return nil +} + +func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) { + var errStr *C.char + var vallen C.size_t + var k *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + + value := C.rocksdb_get( + db.db, ro.Opt, k, C.size_t(len(key)), &vallen, &errStr) + + if errStr != nil { + return nil, saveError(errStr) + } + + if value == nil { + return nil, nil + } + + defer C.free(unsafe.Pointer(value)) + return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil +} + +func (db *DB) delete(wo *WriteOptions, key []byte) error { + var errStr *C.char + var k *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + + C.rocksdb_delete( + db.db, wo.Opt, k, C.size_t(len(key)), &errStr) + + if errStr != nil { + return saveError(errStr) + } + return nil +} diff --git a/store/rocksdb/env.go b/store/rocksdb/env.go new file mode 100644 index 0000000..e239c1b --- /dev/null +++ b/store/rocksdb/env.go @@ -0,0 +1,27 @@ +// +build rocksdb + +package rocksdb + +// #cgo LDFLAGS: -lrocksdb +// #include "rocksdb/c.h" +import "C" + +type Env struct { + Env *C.rocksdb_env_t +} + +func NewDefaultEnv() *Env { + return &Env{C.rocksdb_create_default_env()} +} + +func (env *Env) SetHighPriorityBackgroundThreads(n int) { + C.rocksdb_env_set_high_priority_background_threads(env.Env, C.int(n)) +} + +func (env *Env) SetBackgroundThreads(n int) { + C.rocksdb_env_set_background_threads(env.Env, C.int(n)) +} + +func (env *Env) Close() { + C.rocksdb_env_destroy(env.Env) +} diff --git a/store/rocksdb/filterpolicy.go b/store/rocksdb/filterpolicy.go new file mode 100644 index 0000000..3be4ef6 --- /dev/null +++ b/store/rocksdb/filterpolicy.go @@ -0,0 +1,21 @@ +// +build rocksdb + +package rocksdb + +// #cgo LDFLAGS: -lrocksdb +// #include +// #include "rocksdb/c.h" +import "C" + +type FilterPolicy struct { + Policy *C.rocksdb_filterpolicy_t +} + +func NewBloomFilter(bitsPerKey int) *FilterPolicy { + policy := C.rocksdb_filterpolicy_create_bloom(C.int(bitsPerKey)) + return &FilterPolicy{policy} +} + +func (fp *FilterPolicy) Close() { + C.rocksdb_filterpolicy_destroy(fp.Policy) +} diff --git a/store/rocksdb/iterator.go b/store/rocksdb/iterator.go new file mode 100644 index 0000000..046c5e9 --- /dev/null +++ b/store/rocksdb/iterator.go @@ -0,0 +1,70 @@ +// +build rocksdb + +package rocksdb + +// #cgo LDFLAGS: -lrocksdb +// #include +// #include "rocksdb/c.h" +// #include "rocksdb_ext.h" +import "C" + +import ( + "unsafe" +) + +type Iterator struct { + it *C.rocksdb_iterator_t + isValid C.uchar +} + +func (it *Iterator) Key() []byte { + var klen C.size_t + kdata := C.rocksdb_iter_key(it.it, &klen) + if kdata == nil { + return nil + } + + return slice(unsafe.Pointer(kdata), int(C.int(klen))) +} + +func (it *Iterator) Value() []byte { + var vlen C.size_t + vdata := C.rocksdb_iter_value(it.it, &vlen) + if vdata == nil { + return nil + } + + return slice(unsafe.Pointer(vdata), int(C.int(vlen))) +} + +func (it *Iterator) Close() error { + if it.it != nil { + C.rocksdb_iter_destroy(it.it) + it.it = nil + } + return nil +} + +func (it *Iterator) Valid() bool { + return ucharToBool(it.isValid) +} + +func (it *Iterator) Next() { + it.isValid = C.rocksdb_iter_next_ext(it.it) +} + +func (it *Iterator) Prev() { + it.isValid = C.rocksdb_iter_prev_ext(it.it) +} + +func (it *Iterator) First() { + it.isValid = C.rocksdb_iter_seek_to_first_ext(it.it) +} + +func (it *Iterator) Last() { + it.isValid = C.rocksdb_iter_seek_to_last_ext(it.it) +} + +func (it *Iterator) Seek(key []byte) { + it.isValid = C.rocksdb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} diff --git a/store/rocksdb/options.go b/store/rocksdb/options.go new file mode 100644 index 0000000..f5e6149 --- /dev/null +++ b/store/rocksdb/options.go @@ -0,0 +1,138 @@ +// +build rocksdb + +package rocksdb + +// #cgo LDFLAGS: -lrocksdb +// #include "rocksdb/c.h" +import "C" + +type CompressionOpt int + +const ( + NoCompression = CompressionOpt(0) + SnappyCompression = CompressionOpt(1) +) + +type Options struct { + Opt *C.rocksdb_options_t +} + +type ReadOptions struct { + Opt *C.rocksdb_readoptions_t +} + +type WriteOptions struct { + Opt *C.rocksdb_writeoptions_t +} + +func NewOptions() *Options { + opt := C.rocksdb_options_create() + return &Options{opt} +} + +func NewReadOptions() *ReadOptions { + opt := C.rocksdb_readoptions_create() + return &ReadOptions{opt} +} + +func NewWriteOptions() *WriteOptions { + opt := C.rocksdb_writeoptions_create() + return &WriteOptions{opt} +} + +func (o *Options) Close() { + C.rocksdb_options_destroy(o.Opt) +} + +func (o *Options) SetComparator(cmp *C.rocksdb_comparator_t) { + C.rocksdb_options_set_comparator(o.Opt, cmp) +} + +func (o *Options) SetErrorIfExists(error_if_exists bool) { + eie := boolToUchar(error_if_exists) + C.rocksdb_options_set_error_if_exists(o.Opt, eie) +} + +func (o *Options) SetCache(cache *Cache) { + C.rocksdb_options_set_cache(o.Opt, cache.Cache) +} + +func (o *Options) SetEnv(env *Env) { + C.rocksdb_options_set_env(o.Opt, env.Env) +} + +func (o *Options) SetWriteBufferSize(s int) { + C.rocksdb_options_set_write_buffer_size(o.Opt, C.size_t(s)) +} + +func (o *Options) SetParanoidChecks(pc bool) { + C.rocksdb_options_set_paranoid_checks(o.Opt, boolToUchar(pc)) +} + +func (o *Options) SetMaxOpenFiles(n int) { + C.rocksdb_options_set_max_open_files(o.Opt, C.int(n)) +} + +func (o *Options) SetBlockSize(s int) { + C.rocksdb_options_set_block_size(o.Opt, C.size_t(s)) +} + +func (o *Options) SetBlockRestartInterval(n int) { + C.rocksdb_options_set_block_restart_interval(o.Opt, C.int(n)) +} + +func (o *Options) SetCompression(t CompressionOpt) { + C.rocksdb_options_set_compression(o.Opt, C.int(t)) +} + +func (o *Options) SetCreateIfMissing(b bool) { + C.rocksdb_options_set_create_if_missing(o.Opt, boolToUchar(b)) +} + +func (o *Options) SetFilterPolicy(fp *FilterPolicy) { + var policy *C.rocksdb_filterpolicy_t + if fp != nil { + policy = fp.Policy + } + C.rocksdb_options_set_filter_policy(o.Opt, policy) +} + +func (o *Options) SetMaxBackgroundCompactions(n int) { + C.rocksdb_options_set_max_background_compactions(o.Opt, C.int(n)) +} + +func (o *Options) SetMaxBackgroundFlushes(n int) { + C.rocksdb_options_set_max_background_flushes(o.Opt, C.int(n)) +} + +func (o *Options) SetLevel0SlowdownWritesTrigger(n int) { + C.rocksdb_options_set_level0_slowdown_writes_trigger(o.Opt, C.int(n)) +} + +func (o *Options) SetLevel0StopWritesTrigger(n int) { + C.rocksdb_options_set_level0_stop_writes_trigger(o.Opt, C.int(n)) +} + +func (o *Options) SetTargetFileSizeBase(n int) { + C.rocksdb_options_set_target_file_size_base(o.Opt, C.uint64_t(uint64(n))) +} + +func (ro *ReadOptions) Close() { + C.rocksdb_readoptions_destroy(ro.Opt) +} + +func (ro *ReadOptions) SetVerifyChecksums(b bool) { + C.rocksdb_readoptions_set_verify_checksums(ro.Opt, boolToUchar(b)) +} + +func (ro *ReadOptions) SetFillCache(b bool) { + C.rocksdb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b)) +} + +func (wo *WriteOptions) Close() { + C.rocksdb_writeoptions_destroy(wo.Opt) +} + +func (wo *WriteOptions) SetSync(b bool) { + C.rocksdb_writeoptions_set_sync(wo.Opt, boolToUchar(b)) +} diff --git a/store/rocksdb/rocksdb_ext.cc b/store/rocksdb/rocksdb_ext.cc new file mode 100644 index 0000000..4a7720f --- /dev/null +++ b/store/rocksdb/rocksdb_ext.cc @@ -0,0 +1,36 @@ +// +build rocksdb + +#include "rocksdb_ext.h" + +#include +#include + +extern "C" { + +unsigned char rocksdb_iter_seek_to_first_ext(rocksdb_iterator_t* iter) { + rocksdb_iter_seek_to_first(iter); + return rocksdb_iter_valid(iter); +} + +unsigned char rocksdb_iter_seek_to_last_ext(rocksdb_iterator_t* iter) { + rocksdb_iter_seek_to_last(iter); + return rocksdb_iter_valid(iter); +} + +unsigned char rocksdb_iter_seek_ext(rocksdb_iterator_t* iter, const char* k, size_t klen) { + rocksdb_iter_seek(iter, k, klen); + return rocksdb_iter_valid(iter); +} + +unsigned char rocksdb_iter_next_ext(rocksdb_iterator_t* iter) { + rocksdb_iter_next(iter); + return rocksdb_iter_valid(iter); +} + +unsigned char rocksdb_iter_prev_ext(rocksdb_iterator_t* iter) { + rocksdb_iter_prev(iter); + return rocksdb_iter_valid(iter); +} + + +} \ No newline at end of file diff --git a/store/rocksdb/rocksdb_ext.h b/store/rocksdb/rocksdb_ext.h new file mode 100644 index 0000000..4938294 --- /dev/null +++ b/store/rocksdb/rocksdb_ext.h @@ -0,0 +1,24 @@ +// +build rocksdb + +#ifndef ROCKSDB_EXT_H +#define ROCKSDB_EXT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "rocksdb/c.h" + +// Below iterator functions like rocksdb iterator but returns valid status for iterator +extern unsigned char rocksdb_iter_seek_to_first_ext(rocksdb_iterator_t*); +extern unsigned char rocksdb_iter_seek_to_last_ext(rocksdb_iterator_t*); +extern unsigned char rocksdb_iter_seek_ext(rocksdb_iterator_t*, const char* k, size_t klen); +extern unsigned char rocksdb_iter_next_ext(rocksdb_iterator_t*); +extern unsigned char rocksdb_iter_prev_ext(rocksdb_iterator_t*); + + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/store/rocksdb/util.go b/store/rocksdb/util.go new file mode 100644 index 0000000..f924d76 --- /dev/null +++ b/store/rocksdb/util.go @@ -0,0 +1,46 @@ +// +build rocksdb + +package rocksdb + +// #include +// #include "rocksdb/c.h" +import "C" + +import ( + "fmt" + "reflect" + "unsafe" +) + +func boolToUchar(b bool) C.uchar { + uc := C.uchar(0) + if b { + uc = C.uchar(1) + } + return uc +} + +func ucharToBool(uc C.uchar) bool { + if uc == C.uchar(0) { + return false + } + return true +} + +func saveError(errStr *C.char) error { + if errStr != nil { + gs := C.GoString(errStr) + C.free(unsafe.Pointer(errStr)) + return fmt.Errorf(gs) + } + return nil +} + +func slice(p unsafe.Pointer, n int) []byte { + var b []byte + pbyte := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + pbyte.Data = uintptr(p) + pbyte.Len = n + pbyte.Cap = n + return b +} diff --git a/store/store.go b/store/store.go new file mode 100644 index 0000000..f415b7f --- /dev/null +++ b/store/store.go @@ -0,0 +1,70 @@ +package store + +import ( + "fmt" + "github.com/siddontang/ledisdb/store/driver" + "os" +) + +const DefaultStoreName = "goleveldb" + +type Store interface { + Open(cfg *Config) (driver.IDB, error) + Repair(cfg *Config) error +} + +var dbs = map[string]Store{} + +func Register(name string, store Store) { + if _, ok := dbs[name]; ok { + panic(fmt.Errorf("store %s is registered", name)) + } + + dbs[name] = store +} + +func ListStores() []string { + s := []string{} + for k, _ := range dbs { + s = append(s, k) + } + + return s +} + +func Open(cfg *Config) (*DB, error) { + if len(cfg.Name) == 0 { + cfg.Name = DefaultStoreName + } + + s, ok := dbs[cfg.Name] + if !ok { + return nil, fmt.Errorf("store %s is not registered", cfg.Name) + } + + if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil { + return nil, err + } + + idb, err := s.Open(cfg) + if err != nil { + return nil, err + } + + db := &DB{idb} + + return db, nil +} + +func Repair(cfg *Config) error { + if len(cfg.Name) == 0 { + cfg.Name = DefaultStoreName + } + + s, ok := dbs[cfg.Name] + if !ok { + return fmt.Errorf("db %s is not registered", cfg.Name) + } + + return s.Repair(cfg) +} diff --git a/leveldb/leveldb_test.go b/store/store_test.go similarity index 78% rename from leveldb/leveldb_test.go rename to store/store_test.go index 5da84d4..b911cb5 100644 --- a/leveldb/leveldb_test.go +++ b/store/store_test.go @@ -1,4 +1,4 @@ -package leveldb +package store import ( "bytes" @@ -8,23 +8,17 @@ import ( "testing" ) -var testConfigJson = []byte(` - { - "path" : "./testdb", - "compression":true, - "block_size" : 32768, - "write_buffer_size" : 2097152, - "cache_size" : 20971520 - } - `) - var testOnce sync.Once var testDB *DB func getTestDB() *DB { f := func() { var err error - testDB, err = OpenWithJsonConfig(testConfigJson) + + cfg := new(Config) + cfg.Path = "/tmp/testdb" + + testDB, err = Open(cfg) if err != nil { println(err.Error()) panic(err) @@ -131,7 +125,11 @@ func checkIterator(it *RangeLimitIterator, cv ...int) error { func TestIterator(t *testing.T) { db := getTestDB() - db.Clear() + i := db.NewIterator() + for i.SeekToFirst(); i.Valid(); i.Next() { + db.Delete(i.Key()) + } + i.Close() for i := 0; i < 10; i++ { key := []byte(fmt.Sprintf("key_%d", i)) @@ -139,6 +137,16 @@ func TestIterator(t *testing.T) { db.Put(key, value) } + i = db.NewIterator() + i.SeekToFirst() + + if !i.Valid() { + t.Fatal("must valid") + } else if string(i.Key()) != "key_0" { + t.Fatal(string(i.Key())) + } + i.Close() + var it *RangeLimitIterator k := func(i int) []byte { @@ -149,96 +157,67 @@ func TestIterator(t *testing.T) { if err := checkIterator(it, 1, 2, 3, 4, 5); err != nil { t.Fatal(err) } + it.Close() + + it = db.RangeLimitIterator(k(1), k(5), RangeClose, 0, -1) + if err := checkIterator(it, 1, 2, 3, 4, 5); err != nil { + t.Fatal(err) + } + it.Close() it = db.RangeLimitIterator(k(1), k(5), RangeClose, 1, 3) if err := checkIterator(it, 2, 3, 4); err != nil { t.Fatal(err) } + it.Close() it = db.RangeLimitIterator(k(1), k(5), RangeLOpen, 0, -1) if err := checkIterator(it, 2, 3, 4, 5); err != nil { t.Fatal(err) } + it.Close() it = db.RangeLimitIterator(k(1), k(5), RangeROpen, 0, -1) if err := checkIterator(it, 1, 2, 3, 4); err != nil { t.Fatal(err) } + it.Close() it = db.RangeLimitIterator(k(1), k(5), RangeOpen, 0, -1) if err := checkIterator(it, 2, 3, 4); err != nil { t.Fatal(err) } + it.Close() it = db.RevRangeLimitIterator(k(1), k(5), RangeClose, 0, -1) if err := checkIterator(it, 5, 4, 3, 2, 1); err != nil { t.Fatal(err) } + it.Close() it = db.RevRangeLimitIterator(k(1), k(5), RangeClose, 1, 3) if err := checkIterator(it, 4, 3, 2); err != nil { t.Fatal(err) } + it.Close() it = db.RevRangeLimitIterator(k(1), k(5), RangeLOpen, 0, -1) if err := checkIterator(it, 5, 4, 3, 2); err != nil { t.Fatal(err) } + it.Close() it = db.RevRangeLimitIterator(k(1), k(5), RangeROpen, 0, -1) if err := checkIterator(it, 4, 3, 2, 1); err != nil { t.Fatal(err) } + it.Close() it = db.RevRangeLimitIterator(k(1), k(5), RangeOpen, 0, -1) if err := checkIterator(it, 4, 3, 2); err != nil { t.Fatal(err) } -} - -func TestSnapshot(t *testing.T) { - db := getTestDB() - - key := []byte("key") - value := []byte("hello world") - - db.Put(key, value) - - s := db.NewSnapshot() - defer s.Close() - - db.Put(key, []byte("hello world2")) - - if v, err := s.Get(key); err != nil { - t.Fatal(err) - } else if string(v) != string(value) { - t.Fatal(string(v)) - } -} - -func TestDestroy(t *testing.T) { - db := getTestDB() - - db.Put([]byte("a"), []byte("1")) - if err := db.Clear(); err != nil { - t.Fatal(err) - } - - if _, err := os.Stat(db.cfg.Path); err != nil { - t.Fatal("must exist ", err.Error()) - } - - if v, err := db.Get([]byte("a")); err != nil { - t.Fatal(err) - } else if string(v) == "1" { - t.Fatal(string(v)) - } - - db.Destroy() - - if _, err := os.Stat(db.cfg.Path); !os.IsNotExist(err) { - t.Fatal("must not exist") - } + it.Close() } func TestCloseMore(t *testing.T) { @@ -256,4 +235,6 @@ func TestCloseMore(t *testing.T) { db.Close() } + + os.RemoveAll(cfg.Path) } diff --git a/store/writebatch.go b/store/writebatch.go new file mode 100644 index 0000000..d898a03 --- /dev/null +++ b/store/writebatch.go @@ -0,0 +1,9 @@ +package store + +import ( + "github.com/siddontang/ledisdb/store/driver" +) + +type WriteBatch struct { + driver.IWriteBatch +}