Merge branch 'storage-feature' into develop

This commit is contained in:
siddontang 2014-07-26 18:43:23 +08:00
commit efa8e850f5
63 changed files with 1991 additions and 443 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@ build
*.pyc
.DS_Store
nohup.out
build_config.mk

View File

@ -1,11 +1,16 @@
$(shell ./bootstrap.sh)
$(shell ./build_config.sh build_config.mk ./)
include build_config.mk
all: build
build:
go install ./...
go install -tags '$(GO_BUILD_TAGS)' ./...
clean:
go clean -i ./...
test:
go test ./...
go test -race ./...
go test -tags $(GO_BUILD_TAGS) ./...

View File

@ -1,11 +1,14 @@
# LedisDB
Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. It supports some advanced data structure like kv, list, hash and zset, and may be alternative for Redis.
Ledisdb is a high performance NoSQL like Redis written by go. It supports some advanced data structure like kv, list, hash and zset, and may be alternative for Redis.
LedisDB now supports multi database as backend to store data, you can test and choose the proper one for you.
## Features
+ Rich advanced data structure: KV, List, Hash, ZSet, Bit.
+ Uses leveldb to store lots of data, over the memory limit.
+ Stores lots of data, over the memory limit.
+ Various backend database to use: LevelDB, goleveldb, LMDB.
+ Supports expiration and ttl.
+ Redis clients, like redis-cli, are supported directly.
+ Multi client API supports, including Golang, Python, Lua(Openresty).
@ -15,7 +18,7 @@ Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. I
## Build and Install
+ Create a workspace and checkout ledisdb source
Create a workspace and checkout ledisdb source
mkdir $WORKSPACE
cd $WORKSPACE
@ -23,7 +26,11 @@ Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. I
cd src/github.com/siddontang/ledisdb
+ Install leveldb and snappy, if you have installed, skip.
make
## LevelDB support
+ Install leveldb and snappy.
LedisDB supplies a simple shell to install leveldb and snappy:
@ -35,16 +42,43 @@ Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. I
+ Set LEVELDB_DIR and SNAPPY_DIR to the actual install path in dev.sh.
+ Then:
+ ```make```
. ./bootstap.sh
. ./dev.sh
## RocksDB support
go install ./...
+ Install rocksdb and snappy first.
LedisDB has not supplied a simple shell to install, maybe it will later.
+ Set ROCKSDB_DIR and SNAPPY_DIR to the actual install path in dev.sh.
+ ```make```
## Choose store database
LedisDB now supports goleveldb, lmdb, leveldb, rocksdb, it will choose goleveldb as default to store data if you not set.
Choosing a store database to use is very simple, you have two ways:
+ Set in server config file
"db" : {
"name" : "leveldb"
}
+ Set in command flag
ledis-server -config=/etc/ledis.json -db_name=leveldb
Flag command set will overwrite config set.
**Caveat**
You must known that changing store database runtime is very dangerous, LedisDB will not guarantee the data validation if you do it.
## Server Example
./ledis-server -config=/etc/ledis.json
ledis-server -config=/etc/ledis.json
//another shell
ledis-cli -p 6380

3
bootstrap.sh Normal file → Executable file
View File

@ -5,3 +5,6 @@
go get github.com/siddontang/go-log/log
go get github.com/siddontang/go-snappy/snappy
go get github.com/siddontang/copier
go get github.com/syndtr/goleveldb/leveldb
go get github.com/influxdb/gomdb

16
build_config.sh Executable file
View File

@ -0,0 +1,16 @@
#!/bin/sh
OUTPUT=$1
PREFIX=$2
if test -z "$OUTPUT" || test -z "$PREFIX"; then
echo "usage: $0 <output-filename> <directory_prefix>" >&2
exit 1
fi
# Delete existing output, if it exists
rm -f $OUTPUT
touch $OUTPUT
source ./dev.sh
echo "GO_BUILD_TAGS=$GO_BUILD_TAGS" >> $OUTPUT

View File

@ -4,7 +4,7 @@ import (
"encoding/json"
"flag"
"github.com/siddontang/ledisdb/ledis"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"io/ioutil"
)
@ -35,7 +35,7 @@ func main() {
return
}
if err = leveldb.Repair(cfg.NewDBConfig()); err != nil {
if err = store.Repair(cfg.NewDBConfig()); err != nil {
println("repair error: ", err.Error())
}
}

View File

@ -10,6 +10,7 @@ import (
)
var configFile = flag.String("config", "/etc/ledis.json", "ledisdb config file")
var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name")
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
@ -27,6 +28,10 @@ func main() {
return
}
if len(*dbName) > 0 {
cfg.DB.Name = *dbName
}
var app *server.App
app, err = server.NewApp(cfg)
if err != nil {

62
dev.sh
View File

@ -1,19 +1,18 @@
#!/bin/bash
export VTTOP=$(pwd)
export VTROOT="${VTROOT:-${VTTOP/\/src\/github.com\/siddontang\/ledisdb/}}"
# VTTOP sanity check
if [[ "$VTTOP" == "${VTTOP/\/src\/github.com\/siddontang\/ledisdb/}" ]]; then
echo "WARNING: VTTOP($VTTOP) does not contain src/github.com/siddontang/ledisdb"
export LEDISTOP=$(pwd)
export LEDISROOT="${LEDISROOT:-${LEDISTOP/\/src\/github.com\/siddontang\/ledisdb/}}"
# LEDISTOP sanity check
if [[ "$LEDISTOP" == "${LEDISTOP/\/src\/github.com\/siddontang\/ledisdb/}" ]]; then
echo "WARNING: LEDISTOP($LEDISTOP) does not contain src/github.com/siddontang/ledisdb"
exit 1
fi
#default snappy and leveldb install path
#you may change yourself
SNAPPY_DIR=/usr/local/snappy
LEVELDB_DIR=/usr/local/leveldb
ROCKSDB_DIR=/usr/local/rocksdb
function add_path()
{
@ -26,16 +25,45 @@ function add_path()
fi
}
export GOPATH=$(add_path $GOPATH $VTROOT)
export GOPATH=$(add_path $GOPATH $LEDISROOT)
export CGO_CFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include"
export CGO_CXXFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include"
export CGO_LDFLAGS="-L$LEVELDB_DIR/lib -L$SNAPPY_DIR/lib -lsnappy"
GO_BUILD_TAGS=
CGO_CFLAGS=
CGO_CXXFLAGS=
CGO_LDFLAGS=
#for linux, use LD_LIBRARY_PATH
export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $SNAPPY_DIR/lib)
export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $LEVELDB_DIR/lib)
# check snappy
if [ -f $SNAPPY_DIR/lib/libsnappy.a ]; then
CGO_CFLAGS="$CGO_CFLAGS -I$SNAPPY_DIR/include"
CGO_CXXFLAGS="$CGO_CXXFLAGS -I$SNAPPY_DIR/include"
CGO_LDFLAGS="$CGO_LDFLAGS -L$SNAPPY_DIR/lib -lsnappy"
LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $SNAPPY_DIR/lib)
DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $SNAPPY_DIR/lib)
fi
#for macos, use DYLD_LIBRARY_PATH
export DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $SNAPPY_DIR/lib)
export DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $LEVELDB_DIR/lib)
# check leveldb
if [ -f $LEVELDB_DIR/lib/libleveldb.a ]; then
CGO_CFLAGS="$CGO_CFLAGS -I$LEVELDB_DIR/include"
CGO_CXXFLAGS="$CGO_CXXFLAGS -I$LEVELDB_DIR/include"
CGO_LDFLAGS="$CGO_LDFLAGS -L$LEVELDB_DIR/lib -lleveldb"
LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $LEVELDB_DIR/lib)
DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $LEVELDB_DIR/lib)
GO_BUILD_TAGS="$GO_BUILD_TAGS leveldb"
fi
# check rocksdb
if [ -f $ROCKSDB_DIR/lib/librocksdb.a ]; then
CGO_CFLAGS="$CGO_CFLAGS -I$ROCKSDB_DIR/include"
CGO_CXXFLAGS="$CGO_CXXFLAGS -I$ROCKSDB_DIR/include"
CGO_LDFLAGS="$CGO_LDFLAGS -L$ROCKSDB_DIR/lib -lrocksdb"
LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $ROCKSDB_DIR/lib)
DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $ROCKSDB_DIR/lib)
GO_BUILD_TAGS="$GO_BUILD_TAGS rocksdb"
fi
export CGO_CFLAGS
export CGO_CXXFLAGS
export CGO_LDFLAGS
export LD_LIBRARY_PATH
export DYLD_LIBRARY_PATH
export GO_BUILD_TAGS

View File

@ -1,12 +1,16 @@
{
"addr": "127.0.0.1:6380",
"data_dir": "/tmp/ledis_server",
"db": {
"name" : "leveldb",
"compression": false,
"block_size": 32768,
"write_buffer_size": 67108864,
"cache_size": 524288000,
"max_open_files":1024
"max_open_files":1024,
"map_size" : 524288000
},
"access_log" : "access.log"

View File

@ -1,20 +1,24 @@
package ledis
import (
"fmt"
"github.com/siddontang/copier"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"path"
"strings"
)
type Config struct {
DataDir string `json:"data_dir"`
DB struct {
Name string `json:"name"`
Compression bool `json:"compression"`
BlockSize int `json:"block_size"`
WriteBufferSize int `json:"write_buffer_size"`
CacheSize int `json:"cache_size"`
MaxOpenFiles int `json:"max_open_files"`
MapSize int `json:"map_size"`
} `json:"db"`
BinLog struct {
@ -24,11 +28,19 @@ type Config struct {
} `json:"binlog"`
}
func (cfg *Config) NewDBConfig() *leveldb.Config {
dbPath := path.Join(cfg.DataDir, "data")
func (cfg *Config) NewDBConfig() *store.Config {
if len(cfg.DB.Name) == 0 {
fmt.Printf("no store set, use default %s\n", store.DefaultStoreName)
cfg.DB.Name = store.DefaultStoreName
}
dbCfg := new(leveldb.Config)
cfg.DB.Name = strings.ToLower(cfg.DB.Name)
dbCfg := new(store.Config)
copier.Copy(dbCfg, &cfg.DB)
dbPath := path.Join(cfg.DataDir, fmt.Sprintf("%s_data", cfg.DB.Name))
dbCfg.Path = dbPath
return dbCfg
}

View File

@ -1,4 +1,5 @@
// Package ledis is a high performance embedded NoSQL based on leveldb.
// Package ledis is a high performance embedded NoSQL.
//
// Ledis supports various advanced data structure like kv, list, hash and zset like redis.
//
// Other features include binlog replication, data with a limited time-to-live.

View File

@ -5,7 +5,6 @@ import (
"bytes"
"encoding/binary"
"github.com/siddontang/go-snappy/snappy"
"github.com/siddontang/ledisdb/leveldb"
"io"
"os"
)
@ -57,16 +56,13 @@ func (l *Ledis) DumpFile(path string) error {
}
func (l *Ledis) Dump(w io.Writer) error {
var sp *leveldb.Snapshot
var m *MasterInfo = new(MasterInfo)
if l.binlog == nil {
sp = l.ldb.NewSnapshot()
} else {
l.Lock()
sp = l.ldb.NewSnapshot()
defer l.Unlock()
if l.binlog != nil {
m.LogFileIndex = l.binlog.LogFileIndex()
m.LogPos = l.binlog.LogFilePos()
l.Unlock()
}
var err error
@ -76,7 +72,7 @@ func (l *Ledis) Dump(w io.Writer) error {
return err
}
it := sp.NewIterator()
it := l.ldb.NewIterator()
it.SeekToFirst()
compressBuf := make([]byte, 4096)

View File

@ -2,7 +2,7 @@ package ledis
import (
"bytes"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"os"
"testing"
)
@ -59,7 +59,7 @@ func TestDump(t *testing.T) {
t.Fatal(err)
}
it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1)
it := master.ldb.RangeLimitIterator(nil, nil, store.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
key := it.Key()
value := it.Value()

View File

@ -4,7 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"sync"
"time"
)
@ -12,7 +12,7 @@ import (
type DB struct {
l *Ledis
db *leveldb.DB
db *store.DB
index uint8
@ -28,7 +28,7 @@ type Ledis struct {
cfg *Config
ldb *leveldb.DB
ldb *store.DB
dbs [MaxDBNumber]*DB
binlog *BinLog
@ -52,7 +52,7 @@ func Open(cfg *Config) (*Ledis, error) {
return nil, fmt.Errorf("must set correct data_dir")
}
ldb, err := leveldb.Open(cfg.NewDBConfig())
ldb, err := store.Open(cfg.NewDBConfig())
if err != nil {
return nil, err
}
@ -132,7 +132,7 @@ func (l *Ledis) FlushAll() error {
}
// very dangerous to use
func (l *Ledis) DataDB() *leveldb.DB {
func (l *Ledis) DataDB() *store.DB {
return l.ldb
}

View File

@ -1,7 +1,7 @@
package ledis
import (
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
)
func (db *DB) FlushAll() (drop int64, err error) {
@ -36,7 +36,7 @@ func (db *DB) newEliminator() *elimination {
}
func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) {
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen)
it := db.db.RangeIterator(minKey, maxKey, store.RangeROpen)
for ; it.Valid(); it.Next() {
t.Delete(it.RawKey())
drop++

View File

@ -3,14 +3,14 @@ package ledis
import (
"bytes"
"fmt"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"os"
"path"
"testing"
)
func checkLedisEqual(master *Ledis, slave *Ledis) error {
it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1)
it := master.ldb.RangeLimitIterator(nil, nil, store.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
key := it.Key()
value := it.Value()

View File

@ -3,7 +3,7 @@ package ledis
import (
"encoding/binary"
"errors"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"sort"
"time"
)
@ -253,7 +253,7 @@ func (db *DB) bDelete(t *tx, key []byte) (drop int64) {
minKey := db.bEncodeBinKey(key, minSeq)
maxKey := db.bEncodeBinKey(key, maxSeq)
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose)
it := db.db.RangeIterator(minKey, maxKey, store.RangeClose)
for ; it.Valid(); it.Next() {
t.Delete(it.RawKey())
drop++
@ -280,10 +280,10 @@ func (db *DB) bAllocateSegment(key []byte, seq uint32) ([]byte, []byte, error) {
return bk, segment, err
}
func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator {
func (db *DB) bIterator(key []byte) *store.RangeLimitIterator {
sk := db.bEncodeBinKey(key, minSeq)
ek := db.bEncodeBinKey(key, maxSeq)
return db.db.RangeIterator(sk, ek, leveldb.RangeClose)
return db.db.RangeIterator(sk, ek, store.RangeClose)
}
func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) {
@ -446,7 +446,7 @@ func (db *DB) BGet(key []byte) (data []byte, err error) {
minKey := db.bEncodeBinKey(key, minSeq)
maxKey := db.bEncodeBinKey(key, tailSeq)
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose)
it := db.db.RangeIterator(minKey, maxKey, store.RangeClose)
var seq, s, e uint32
for ; it.Valid(); it.Next() {
@ -662,7 +662,7 @@ func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error)
skey := db.bEncodeBinKey(key, sseq)
ekey := db.bEncodeBinKey(key, eseq)
it := db.db.RangeIterator(skey, ekey, leveldb.RangeOpen)
it := db.db.RangeIterator(skey, ekey, store.RangeOpen)
for ; it.Valid(); it.Next() {
segment = it.RawValue()
for _, bt := range segment {

View File

@ -3,7 +3,7 @@ package ledis
import (
"encoding/binary"
"errors"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"time"
)
@ -132,7 +132,7 @@ func (db *DB) hDelete(t *tx, key []byte) int64 {
stop := db.hEncodeStopKey(key)
var num int64 = 0
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
num++
@ -354,7 +354,7 @@ func (db *DB) HGetAll(key []byte) ([]FVPair, error) {
v := make([]FVPair, 0, 16)
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
_, f, err := db.hDecodeHashKey(it.Key())
if err != nil {
@ -379,7 +379,7 @@ func (db *DB) HKeys(key []byte) ([][]byte, error) {
v := make([][]byte, 0, 16)
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
_, f, err := db.hDecodeHashKey(it.Key())
if err != nil {
@ -403,7 +403,7 @@ func (db *DB) HValues(key []byte) ([][]byte, error) {
v := make([][]byte, 0, 16)
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
_, _, err := db.hDecodeHashKey(it.Key())
if err != nil {
@ -491,9 +491,9 @@ func (db *DB) HScan(key []byte, field []byte, count int, inclusive bool) ([]FVPa
v := make([]FVPair, 0, count)
rangeType := leveldb.RangeROpen
rangeType := store.RangeROpen
if !inclusive {
rangeType = leveldb.RangeOpen
rangeType = store.RangeOpen
}
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count)

View File

@ -2,7 +2,7 @@ package ledis
import (
"errors"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"time"
)
@ -343,9 +343,9 @@ func (db *DB) Scan(key []byte, count int, inclusive bool) ([]KVPair, error) {
v := make([]KVPair, 0, 2*count)
rangeType := leveldb.RangeROpen
rangeType := store.RangeROpen
if !inclusive {
rangeType = leveldb.RangeOpen
rangeType = store.RangeOpen
}
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count)

View File

@ -3,7 +3,7 @@ package ledis
import (
"encoding/binary"
"errors"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"time"
)
@ -203,7 +203,7 @@ func (db *DB) lDelete(t *tx, key []byte) int64 {
startKey := db.lEncodeListKey(key, headSeq)
stopKey := db.lEncodeListKey(key, tailSeq)
rit := leveldb.NewRangeIterator(it, &leveldb.Range{startKey, stopKey, leveldb.RangeClose})
rit := store.NewRangeIterator(it, &store.Range{startKey, stopKey, store.RangeClose})
for ; rit.Valid(); rit.Next() {
t.Delete(rit.RawKey())
num++
@ -214,7 +214,7 @@ func (db *DB) lDelete(t *tx, key []byte) int64 {
return num
}
func (db *DB) lGetMeta(it *leveldb.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
func (db *DB) lGetMeta(it *store.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
var v []byte
if it != nil {
v = it.Find(ek)
@ -364,12 +364,12 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) {
v := make([][]byte, 0, limit)
startKey := db.lEncodeListKey(key, headSeq)
rit := leveldb.NewRangeLimitIterator(it,
&leveldb.Range{
rit := store.NewRangeLimitIterator(it,
&store.Range{
Min: startKey,
Max: nil,
Type: leveldb.RangeClose},
&leveldb.Limit{
Type: store.RangeClose},
&store.Limit{
Offset: 0,
Count: int(limit)})

View File

@ -3,7 +3,7 @@ package ledis
import (
"encoding/binary"
"errors"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"time"
)
@ -156,7 +156,7 @@ func (eli *elimination) active() {
minKey := db.expEncodeTimeKey(NoneType, nil, 0)
maxKey := db.expEncodeTimeKey(maxDataType, nil, now)
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
tk := it.RawKey()
mk := it.RawValue()

View File

@ -4,7 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"time"
)
@ -432,7 +432,7 @@ func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) {
minKey := db.zEncodeStartScoreKey(key, min)
maxKey := db.zEncodeStopScoreKey(key, max)
rangeType := leveldb.RangeROpen
rangeType := store.RangeROpen
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, -1)
var n int64 = 0
@ -460,17 +460,17 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
if s, err := Int64(v, nil); err != nil {
return 0, err
} else {
var rit *leveldb.RangeLimitIterator
var rit *store.RangeLimitIterator
sk := db.zEncodeScoreKey(key, member, s)
if !reverse {
minKey := db.zEncodeStartScoreKey(key, MinScore)
rit = leveldb.NewRangeIterator(it, &leveldb.Range{minKey, sk, leveldb.RangeClose})
rit = store.NewRangeIterator(it, &store.Range{minKey, sk, store.RangeClose})
} else {
maxKey := db.zEncodeStopScoreKey(key, MaxScore)
rit = leveldb.NewRevRangeIterator(it, &leveldb.Range{sk, maxKey, leveldb.RangeClose})
rit = store.NewRevRangeIterator(it, &store.Range{sk, maxKey, store.RangeClose})
}
var lastKey []byte = nil
@ -492,14 +492,14 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
return -1, nil
}
func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *leveldb.RangeLimitIterator {
func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *store.RangeLimitIterator {
minKey := db.zEncodeStartScoreKey(key, min)
maxKey := db.zEncodeStopScoreKey(key, max)
if !reverse {
return db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, count)
return db.db.RangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count)
} else {
return db.db.RevRangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, count)
return db.db.RevRangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count)
}
}
@ -550,10 +550,10 @@ func (db *DB) zRange(key []byte, min int64, max int64, offset int, count int, re
v := make([]ScorePair, 0, nv)
var it *leveldb.RangeLimitIterator
var it *store.RangeLimitIterator
//if reverse and offset is 0, count < 0, we may use forward iterator then reverse
//because leveldb iterator prev is slower than next
//because store iterator prev is slower than next
if !reverse || (offset == 0 && count < 0) {
it = db.zIterator(key, min, max, offset, count, false)
} else {
@ -740,7 +740,7 @@ func (db *DB) zFlush() (drop int64, err error) {
maxKey[0] = db.index
maxKey[1] = ZScoreType + 1
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1)
defer it.Close()
for ; it.Valid(); it.Next() {
@ -779,9 +779,9 @@ func (db *DB) ZScan(key []byte, member []byte, count int, inclusive bool) ([]Sco
v := make([]ScorePair, 0, 2*count)
rangeType := leveldb.RangeROpen
rangeType := store.RangeROpen
if !inclusive {
rangeType = leveldb.RangeOpen
rangeType = store.RangeOpen
}
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count)

View File

@ -1,7 +1,7 @@
package ledis
import (
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"sync"
)
@ -9,7 +9,7 @@ type tx struct {
m sync.Mutex
l *Ledis
wb *leveldb.WriteBatch
wb *store.WriteBatch
binlog *BinLog
batch [][]byte

View File

@ -1,60 +0,0 @@
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include <stdint.h>
// #include "leveldb/c.h"
import "C"
type Snapshot struct {
db *DB
snap *C.leveldb_snapshot_t
readOpts *ReadOptions
iteratorOpts *ReadOptions
}
func (s *Snapshot) Close() {
C.leveldb_release_snapshot(s.db.db, s.snap)
s.iteratorOpts.Close()
s.readOpts.Close()
}
func (s *Snapshot) Get(key []byte) ([]byte, error) {
return s.db.get(nil, s.readOpts, key)
}
func (s *Snapshot) BufGet(r []byte, key []byte) ([]byte, error) {
return s.db.get(r, s.readOpts, key)
}
func (s *Snapshot) NewIterator() *Iterator {
it := new(Iterator)
it.it = C.leveldb_create_iterator(s.db.db, s.iteratorOpts.Opt)
return it
}
func (s *Snapshot) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
func (s *Snapshot) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRevRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (s *Snapshot) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (s *Snapshot) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRevRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}

View File

@ -3,14 +3,14 @@ package server
import (
"bytes"
"fmt"
"github.com/siddontang/ledisdb/leveldb"
"github.com/siddontang/ledisdb/store"
"os"
"testing"
"time"
)
func checkDataEqual(master *App, slave *App) error {
it := master.ldb.DataDB().RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1)
it := master.ldb.DataDB().RangeLimitIterator(nil, nil, store.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
key := it.Key()
value := it.Value()

View File

@ -13,11 +13,13 @@ type Config struct {
DataDir string `json:"data_dir"`
DB struct {
Name string `json:"name"`
Compression bool `json:"compression"`
BlockSize int `json:"block_size"`
WriteBufferSize int `json:"write_buffer_size"`
CacheSize int `json:"cache_size"`
MaxOpenFiles int `json:"max_open_files"`
MapSize int `json:"map_size"`
} `json:"db"`
BinLog struct {

17
store/config.go Normal file
View File

@ -0,0 +1,17 @@
package store
type Config struct {
Name string `json:"name"`
Path string `json:"path"`
//for leveldb, goleveldb
Compression bool `json:"compression"`
BlockSize int `json:"block_size"`
WriteBufferSize int `json:"write_buffer_size"`
CacheSize int `json:"cache_size"`
MaxOpenFiles int `json:"max_open_files"`
//for lmdb
MapSize int `json:"map_size"`
}

75
store/db.go Normal file
View File

@ -0,0 +1,75 @@
package store
import (
"github.com/siddontang/ledisdb/store/driver"
)
type DB struct {
db driver.IDB
}
// Close database
//
// Caveat
// Any other DB operations like Get, Put, etc... may cause a panic after Close
//
func (db *DB) Close() error {
if db.db == nil {
return nil
}
err := db.db.Close()
db.db = nil
return err
}
// Get Value with Key
func (db *DB) Get(key []byte) ([]byte, error) {
return db.db.Get(key)
}
// Put value with key
func (db *DB) Put(key []byte, value []byte) error {
err := db.db.Put(key, value)
return err
}
// Delete by key
func (db *DB) Delete(key []byte) error {
err := db.db.Delete(key)
return err
}
func (db *DB) NewIterator() *Iterator {
it := new(Iterator)
it.it = db.db.NewIterator()
return it
}
func (db *DB) NewWriteBatch() *WriteBatch {
return &WriteBatch{db.db.NewWriteBatch()}
}
func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}

39
store/driver/driver.go Normal file
View File

@ -0,0 +1,39 @@
package driver
type IDB interface {
Close() error
Get(key []byte) ([]byte, error)
Put(key []byte, value []byte) error
Delete(key []byte) error
NewIterator() IIterator
NewWriteBatch() IWriteBatch
}
type IIterator interface {
Close() error
First()
Last()
Seek(key []byte)
Next()
Prev()
Valid() bool
Key() []byte
Value() []byte
}
type IWriteBatch interface {
Close() error
Put(key []byte, value []byte)
Delete(key []byte)
Commit() error
Rollback() error
}

30
store/goleveldb.go Normal file
View File

@ -0,0 +1,30 @@
package store
import (
"github.com/siddontang/copier"
"github.com/siddontang/ledisdb/store/driver"
"github.com/siddontang/ledisdb/store/goleveldb"
)
const GoLevelDBName = "goleveldb"
type GoLevelDBStore struct {
}
func (s GoLevelDBStore) Open(cfg *Config) (driver.IDB, error) {
c := &goleveldb.Config{}
copier.Copy(c, cfg)
return goleveldb.Open(c)
}
func (s GoLevelDBStore) Repair(cfg *Config) error {
c := &goleveldb.Config{}
copier.Copy(c, cfg)
return goleveldb.Repair(c)
}
func init() {
Register(GoLevelDBName, GoLevelDBStore{})
}

31
store/goleveldb/batch.go Normal file
View File

@ -0,0 +1,31 @@
package goleveldb
import (
"github.com/syndtr/goleveldb/leveldb"
)
type WriteBatch struct {
db *DB
wbatch *leveldb.Batch
}
func (w *WriteBatch) Close() error {
return nil
}
func (w *WriteBatch) Put(key, value []byte) {
w.wbatch.Put(key, value)
}
func (w *WriteBatch) Delete(key []byte) {
w.wbatch.Delete(key)
}
func (w *WriteBatch) Commit() error {
return w.db.db.Write(w.wbatch, nil)
}
func (w *WriteBatch) Rollback() error {
w.wbatch.Reset()
return nil
}

137
store/goleveldb/db.go Normal file
View File

@ -0,0 +1,137 @@
package goleveldb
import (
"github.com/siddontang/ledisdb/store/driver"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/cache"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"os"
)
const defaultFilterBits int = 10
type Config struct {
Path string `json:"path"`
Compression bool `json:"compression"`
BlockSize int `json:"block_size"`
WriteBufferSize int `json:"write_buffer_size"`
CacheSize int `json:"cache_size"`
MaxOpenFiles int `json:"max_open_files"`
}
type DB struct {
cfg *Config
db *leveldb.DB
opts *opt.Options
iteratorOpts *opt.ReadOptions
cache cache.Cache
filter filter.Filter
}
func Open(cfg *Config) (*DB, error) {
if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil {
return nil, err
}
db := new(DB)
db.cfg = cfg
if err := db.open(); err != nil {
return nil, err
}
return db, nil
}
func Repair(cfg *Config) error {
db, err := leveldb.RecoverFile(cfg.Path, newOptions(cfg))
if err != nil {
return err
}
db.Close()
return nil
}
func (db *DB) open() error {
db.opts = newOptions(db.cfg)
db.iteratorOpts = &opt.ReadOptions{}
db.iteratorOpts.DontFillCache = true
var err error
db.db, err = leveldb.OpenFile(db.cfg.Path, db.opts)
return err
}
func newOptions(cfg *Config) *opt.Options {
opts := &opt.Options{}
opts.ErrorIfMissing = false
if cfg.CacheSize > 0 {
opts.BlockCache = cache.NewLRUCache(cfg.CacheSize)
}
//we must use bloomfilter
opts.Filter = filter.NewBloomFilter(defaultFilterBits)
if !cfg.Compression {
opts.Compression = opt.NoCompression
} else {
opts.Compression = opt.SnappyCompression
}
if cfg.BlockSize > 0 {
opts.BlockSize = cfg.BlockSize
}
if cfg.WriteBufferSize > 0 {
opts.WriteBuffer = cfg.WriteBufferSize
}
return opts
}
func (db *DB) Close() error {
return db.db.Close()
}
func (db *DB) Put(key, value []byte) error {
return db.db.Put(key, value, nil)
}
func (db *DB) Get(key []byte) ([]byte, error) {
v, err := db.db.Get(key, nil)
if err == leveldb.ErrNotFound {
return nil, nil
}
return v, nil
}
func (db *DB) Delete(key []byte) error {
return db.db.Delete(key, nil)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,
wbatch: new(leveldb.Batch),
}
return wb
}
func (db *DB) NewIterator() driver.IIterator {
it := &Iterator{
db.db.NewIterator(nil, db.iteratorOpts),
}
return it
}

View File

@ -0,0 +1,49 @@
package goleveldb
import (
"github.com/syndtr/goleveldb/leveldb/iterator"
)
type Iterator struct {
it iterator.Iterator
}
func (it *Iterator) Key() []byte {
return it.it.Key()
}
func (it *Iterator) Value() []byte {
return it.it.Value()
}
func (it *Iterator) Close() error {
if it.it != nil {
it.it.Release()
it.it = nil
}
return nil
}
func (it *Iterator) Valid() bool {
return it.it.Valid()
}
func (it *Iterator) Next() {
it.it.Next()
}
func (it *Iterator) Prev() {
it.it.Prev()
}
func (it *Iterator) First() {
it.it.First()
}
func (it *Iterator) Last() {
it.it.Last()
}
func (it *Iterator) Seek(key []byte) {
it.it.Seek(key)
}

View File

@ -1,14 +1,8 @@
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include <stdlib.h>
// #include "leveldb/c.h"
// #include "leveldb_ext.h"
import "C"
package store
import (
"bytes"
"unsafe"
"github.com/siddontang/ledisdb/store/driver"
)
const (
@ -45,54 +39,39 @@ type Limit struct {
}
type Iterator struct {
it *C.leveldb_iterator_t
isValid C.uchar
it driver.IIterator
}
// Returns a copy of key.
func (it *Iterator) Key() []byte {
var klen C.size_t
kdata := C.leveldb_iter_key(it.it, &klen)
if kdata == nil {
k := it.it.Key()
if k == nil {
return nil
}
return C.GoBytes(unsafe.Pointer(kdata), C.int(klen))
return append([]byte{}, k...)
}
// Returns a copy of value.
func (it *Iterator) Value() []byte {
var vlen C.size_t
vdata := C.leveldb_iter_value(it.it, &vlen)
if vdata == nil {
v := it.it.Value()
if v == nil {
return nil
}
return C.GoBytes(unsafe.Pointer(vdata), C.int(vlen))
return append([]byte{}, v...)
}
// Returns a reference of key.
// you must be careful that it will be changed after next iterate.
func (it *Iterator) RawKey() []byte {
var klen C.size_t
kdata := C.leveldb_iter_key(it.it, &klen)
if kdata == nil {
return nil
}
return slice(unsafe.Pointer(kdata), int(C.int(klen)))
return it.it.Key()
}
// Returns a reference of value.
// you must be careful that it will be changed after next iterate.
func (it *Iterator) RawValue() []byte {
var vlen C.size_t
vdata := C.leveldb_iter_value(it.it, &vlen)
if vdata == nil {
return nil
}
return slice(unsafe.Pointer(vdata), int(C.int(vlen)))
return it.it.Value()
}
// Copy key to b, if b len is small or nil, returns a new one.
@ -126,33 +105,33 @@ func (it *Iterator) BufValue(b []byte) []byte {
func (it *Iterator) Close() {
if it.it != nil {
C.leveldb_iter_destroy(it.it)
it.it.Close()
it.it = nil
}
}
func (it *Iterator) Valid() bool {
return ucharToBool(it.isValid)
return it.it.Valid()
}
func (it *Iterator) Next() {
it.isValid = C.leveldb_iter_next_ext(it.it)
it.it.Next()
}
func (it *Iterator) Prev() {
it.isValid = C.leveldb_iter_prev_ext(it.it)
it.it.Prev()
}
func (it *Iterator) SeekToFirst() {
it.isValid = C.leveldb_iter_seek_to_first_ext(it.it)
it.it.First()
}
func (it *Iterator) SeekToLast() {
it.isValid = C.leveldb_iter_seek_to_last_ext(it.it)
it.it.Last()
}
func (it *Iterator) Seek(key []byte) {
it.isValid = C.leveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
it.it.Seek(key)
}
// Finds by key, if not found, nil returns.

32
store/leveldb.go Normal file
View File

@ -0,0 +1,32 @@
// +build leveldb
package store
import (
"github.com/siddontang/copier"
"github.com/siddontang/ledisdb/store/driver"
"github.com/siddontang/ledisdb/store/leveldb"
)
const LevelDBName = "leveldb"
type LevelDBStore struct {
}
func (s LevelDBStore) Open(cfg *Config) (driver.IDB, error) {
c := &leveldb.Config{}
copier.Copy(c, cfg)
return leveldb.Open(c)
}
func (s LevelDBStore) Repair(cfg *Config) error {
c := &leveldb.Config{}
copier.Copy(c, cfg)
return leveldb.Repair(c)
}
func init() {
Register(LevelDBName, LevelDBStore{})
}

View File

@ -1,3 +1,5 @@
// +build leveldb
package leveldb
// #cgo LDFLAGS: -lleveldb
@ -13,8 +15,9 @@ type WriteBatch struct {
wbatch *C.leveldb_writebatch_t
}
func (w *WriteBatch) Close() {
func (w *WriteBatch) Close() error {
C.leveldb_writebatch_destroy(w.wbatch)
return nil
}
func (w *WriteBatch) Put(key, value []byte) {
@ -41,12 +44,9 @@ func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) SyncCommit() error {
return w.commit(w.db.syncWriteOpts)
}
func (w *WriteBatch) Rollback() {
func (w *WriteBatch) Rollback() error {
C.leveldb_writebatch_clear(w.wbatch)
return nil
}
func (w *WriteBatch) commit(wb *WriteOptions) error {

View File

@ -1,3 +1,5 @@
// +build leveldb
package leveldb
// #cgo LDFLAGS: -lleveldb

View File

@ -1,3 +1,5 @@
// +build leveldb
// Package leveldb is a wrapper for c++ leveldb
package leveldb
@ -9,7 +11,7 @@ package leveldb
import "C"
import (
"encoding/json"
"github.com/siddontang/ledisdb/store/driver"
"os"
"unsafe"
)
@ -26,35 +28,6 @@ type Config struct {
MaxOpenFiles int `json:"max_open_files"`
}
type DB struct {
cfg *Config
db *C.leveldb_t
opts *Options
//for default read and write options
readOpts *ReadOptions
writeOpts *WriteOptions
iteratorOpts *ReadOptions
syncWriteOpts *WriteOptions
cache *Cache
filter *FilterPolicy
}
func OpenWithJsonConfig(configJson json.RawMessage) (*DB, error) {
cfg := new(Config)
err := json.Unmarshal(configJson, cfg)
if err != nil {
return nil, err
}
return Open(cfg)
}
func Open(cfg *Config) (*DB, error) {
if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil {
return nil, err
@ -70,21 +43,6 @@ func Open(cfg *Config) (*DB, error) {
return db, nil
}
func (db *DB) open() error {
db.initOptions(db.cfg)
var errStr *C.char
ldbname := C.CString(db.cfg.Path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
db.db = nil
return saveError(errStr)
}
return nil
}
func Repair(cfg *Config) error {
db := new(DB)
db.cfg = cfg
@ -108,6 +66,38 @@ func Repair(cfg *Config) error {
return nil
}
type DB struct {
cfg *Config
db *C.leveldb_t
opts *Options
//for default read and write options
readOpts *ReadOptions
writeOpts *WriteOptions
iteratorOpts *ReadOptions
cache *Cache
filter *FilterPolicy
}
func (db *DB) open() error {
db.initOptions(db.cfg)
var errStr *C.char
ldbname := C.CString(db.cfg.Path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
db.db = nil
return saveError(errStr)
}
return nil
}
func (db *DB) initOptions(cfg *Config) {
opts := NewOptions()
@ -126,6 +116,8 @@ func (db *DB) initOptions(cfg *Config) {
if !cfg.Compression {
opts.SetCompression(NoCompression)
} else {
opts.SetCompression(SnappyCompression)
}
if cfg.BlockSize <= 0 {
@ -153,9 +145,6 @@ func (db *DB) initOptions(cfg *Config) {
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
db.syncWriteOpts = NewWriteOptions()
db.syncWriteOpts.SetSync(true)
}
func (db *DB) Close() error {
@ -177,80 +166,23 @@ func (db *DB) Close() error {
db.readOpts.Close()
db.writeOpts.Close()
db.iteratorOpts.Close()
db.syncWriteOpts.Close()
return nil
}
func (db *DB) Destroy() error {
path := db.cfg.Path
db.Close()
opts := NewOptions()
defer opts.Close()
var errStr *C.char
ldbname := C.CString(path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
C.leveldb_destroy_db(opts.Opt, ldbname, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
func (db *DB) Clear() error {
bc := db.NewWriteBatch()
defer bc.Close()
var err error
it := db.NewIterator()
it.SeekToFirst()
num := 0
for ; it.Valid(); it.Next() {
bc.Delete(it.RawKey())
num++
if num == 1000 {
num = 0
if err = bc.Commit(); err != nil {
return err
}
}
}
err = bc.Commit()
return err
}
func (db *DB) Put(key, value []byte) error {
return db.put(db.writeOpts, key, value)
}
func (db *DB) SyncPut(key, value []byte) error {
return db.put(db.syncWriteOpts, key, value)
}
func (db *DB) Get(key []byte) ([]byte, error) {
return db.get(nil, db.readOpts, key)
}
func (db *DB) BufGet(r []byte, key []byte) ([]byte, error) {
return db.get(r, db.readOpts, key)
return db.get(db.readOpts, key)
}
func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) SyncDelete(key []byte) error {
return db.delete(db.syncWriteOpts, key)
}
func (db *DB) NewWriteBatch() *WriteBatch {
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,
wbatch: C.leveldb_writebatch_create(),
@ -258,22 +190,7 @@ func (db *DB) NewWriteBatch() *WriteBatch {
return wb
}
func (db *DB) NewSnapshot() *Snapshot {
s := &Snapshot{
db: db,
snap: C.leveldb_create_snapshot(db.db),
readOpts: NewReadOptions(),
iteratorOpts: NewReadOptions(),
}
s.readOpts.SetSnapshot(s)
s.iteratorOpts.SetSnapshot(s)
s.iteratorOpts.SetFillCache(false)
return s
}
func (db *DB) NewIterator() *Iterator {
func (db *DB) NewIterator() driver.IIterator {
it := new(Iterator)
it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt)
@ -281,28 +198,6 @@ func (db *DB) NewIterator() *Iterator {
return it
}
func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}
func (db *DB) put(wo *WriteOptions, key, value []byte) error {
var errStr *C.char
var k, v *C.char
@ -324,7 +219,7 @@ func (db *DB) put(wo *WriteOptions, key, value []byte) error {
return nil
}
func (db *DB) get(r []byte, ro *ReadOptions, key []byte) ([]byte, error) {
func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) {
var errStr *C.char
var vallen C.size_t
var k *C.char
@ -347,13 +242,7 @@ func (db *DB) get(r []byte, ro *ReadOptions, key []byte) ([]byte, error) {
defer C.leveldb_get_free_ext(unsafe.Pointer(c))
if r == nil {
r = []byte{}
}
r = r[0:0]
b := slice(unsafe.Pointer(value), int(C.int(vallen)))
return append(r, b...), nil
return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil
}
func (db *DB) delete(wo *WriteOptions, key []byte) error {

View File

@ -1,3 +1,5 @@
// +build leveldb
package leveldb
// #cgo LDFLAGS: -lleveldb

70
store/leveldb/iterator.go Normal file
View File

@ -0,0 +1,70 @@
// +build leveldb
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include <stdlib.h>
// #include "leveldb/c.h"
// #include "leveldb_ext.h"
import "C"
import (
"unsafe"
)
type Iterator struct {
it *C.leveldb_iterator_t
isValid C.uchar
}
func (it *Iterator) Key() []byte {
var klen C.size_t
kdata := C.leveldb_iter_key(it.it, &klen)
if kdata == nil {
return nil
}
return slice(unsafe.Pointer(kdata), int(C.int(klen)))
}
func (it *Iterator) Value() []byte {
var vlen C.size_t
vdata := C.leveldb_iter_value(it.it, &vlen)
if vdata == nil {
return nil
}
return slice(unsafe.Pointer(vdata), int(C.int(vlen)))
}
func (it *Iterator) Close() error {
if it.it != nil {
C.leveldb_iter_destroy(it.it)
it.it = nil
}
return nil
}
func (it *Iterator) Valid() bool {
return ucharToBool(it.isValid)
}
func (it *Iterator) Next() {
it.isValid = C.leveldb_iter_next_ext(it.it)
}
func (it *Iterator) Prev() {
it.isValid = C.leveldb_iter_prev_ext(it.it)
}
func (it *Iterator) First() {
it.isValid = C.leveldb_iter_seek_to_first_ext(it.it)
}
func (it *Iterator) Last() {
it.isValid = C.leveldb_iter_seek_to_last_ext(it.it)
}
func (it *Iterator) Seek(key []byte) {
it.isValid = C.leveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}

View File

@ -1,3 +1,5 @@
// +build leveldb
#include "leveldb_ext.h"
#include <stdlib.h>

View File

@ -1,3 +1,5 @@
// +build leveldb
#ifndef LEVELDB_EXT_H
#define LEVELDB_EXT_H

View File

@ -1,3 +1,5 @@
// +build leveldb
package leveldb
// #cgo LDFLAGS: -lleveldb
@ -103,14 +105,6 @@ func (ro *ReadOptions) SetFillCache(b bool) {
C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b))
}
func (ro *ReadOptions) SetSnapshot(snap *Snapshot) {
var s *C.leveldb_snapshot_t
if snap != nil {
s = snap.snap
}
C.leveldb_readoptions_set_snapshot(ro.Opt, s)
}
func (wo *WriteOptions) Close() {
C.leveldb_writeoptions_destroy(wo.Opt)
}

View File

@ -1,3 +1,5 @@
// +build leveldb
package leveldb
// #include "leveldb/c.h"

30
store/mdb.go Normal file
View File

@ -0,0 +1,30 @@
package store
import (
"github.com/siddontang/copier"
"github.com/siddontang/ledisdb/store/driver"
"github.com/siddontang/ledisdb/store/mdb"
)
const LMDBName = "lmdb"
type LMDBStore struct {
}
func (s LMDBStore) Open(cfg *Config) (driver.IDB, error) {
c := &mdb.Config{}
copier.Copy(c, cfg)
return mdb.Open(c)
}
func (s LMDBStore) Repair(cfg *Config) error {
c := &mdb.Config{}
copier.Copy(c, cfg)
return mdb.Repair(c)
}
func init() {
Register(LMDBName, LMDBStore{})
}

32
store/mdb/batch.go Normal file
View File

@ -0,0 +1,32 @@
package mdb
type Write struct {
Key []byte
Value []byte
}
type WriteBatch struct {
db *MDB
wb []Write
}
func (w *WriteBatch) Close() error {
return nil
}
func (w *WriteBatch) Put(key, value []byte) {
w.wb = append(w.wb, Write{key, value})
}
func (w *WriteBatch) Delete(key []byte) {
w.wb = append(w.wb, Write{key, nil})
}
func (w *WriteBatch) Commit() error {
return w.db.BatchPut(w.wb)
}
func (w *WriteBatch) Rollback() error {
w.wb = w.wb[0:0]
return nil
}

View File

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2013-2014 Errplane Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

255
store/mdb/mdb.go Normal file
View File

@ -0,0 +1,255 @@
package mdb
import (
mdb "github.com/influxdb/gomdb"
"github.com/siddontang/ledisdb/store/driver"
"os"
)
type Config struct {
Path string `json:"path"`
MapSize int `json:"map_size"`
}
type MDB struct {
env *mdb.Env
db mdb.DBI
path string
}
func Open(c *Config) (MDB, error) {
path := c.Path
if c.MapSize == 0 {
c.MapSize = 1024 * 1024 * 1024
}
env, err := mdb.NewEnv()
if err != nil {
return MDB{}, err
}
// TODO: max dbs should be configurable
if err := env.SetMaxDBs(1); err != nil {
return MDB{}, err
}
if err := env.SetMapSize(uint64(c.MapSize)); err != nil {
return MDB{}, err
}
if _, err := os.Stat(path); err != nil {
err = os.MkdirAll(path, 0755)
if err != nil {
return MDB{}, err
}
}
err = env.Open(path, mdb.NOSYNC|mdb.NOMETASYNC|mdb.WRITEMAP|mdb.MAPASYNC|mdb.CREATE, 0755)
if err != nil {
return MDB{}, err
}
tx, err := env.BeginTxn(nil, 0)
if err != nil {
return MDB{}, err
}
dbi, err := tx.DBIOpen(nil, mdb.CREATE)
if err != nil {
return MDB{}, err
}
if err := tx.Commit(); err != nil {
return MDB{}, err
}
db := MDB{
env: env,
db: dbi,
path: path,
}
return db, nil
}
func Repair(c *Config) error {
println("llmd not supports repair")
return nil
}
func (db MDB) Put(key, value []byte) error {
itr := db.iterator(false)
defer itr.Close()
itr.err = itr.c.Put(key, value, 0)
itr.setState()
return itr.Error()
}
func (db MDB) BatchPut(writes []Write) error {
itr := db.iterator(false)
for _, w := range writes {
if w.Value == nil {
itr.key, itr.value, itr.err = itr.c.Get(w.Key, mdb.SET)
if itr.err == nil {
itr.err = itr.c.Del(0)
}
} else {
itr.err = itr.c.Put(w.Key, w.Value, 0)
}
if itr.err != nil && itr.err != mdb.NotFound {
break
}
}
itr.setState()
return itr.Close()
}
func (db MDB) Get(key []byte) ([]byte, error) {
tx, err := db.env.BeginTxn(nil, mdb.RDONLY)
if err != nil {
return nil, err
}
defer tx.Commit()
v, err := tx.Get(db.db, key)
if err == mdb.NotFound {
return nil, nil
}
return v, err
}
func (db MDB) Delete(key []byte) error {
itr := db.iterator(false)
defer itr.Close()
itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET)
if itr.err == nil {
itr.err = itr.c.Del(0)
}
itr.setState()
return itr.Error()
}
type MDBIterator struct {
key []byte
value []byte
c *mdb.Cursor
tx *mdb.Txn
valid bool
err error
}
func (itr *MDBIterator) Key() []byte {
return itr.key
}
func (itr *MDBIterator) Value() []byte {
return itr.value
}
func (itr *MDBIterator) Valid() bool {
return itr.valid
}
func (itr *MDBIterator) Error() error {
return itr.err
}
func (itr *MDBIterator) getCurrent() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.GET_CURRENT)
itr.setState()
}
func (itr *MDBIterator) Seek(key []byte) {
itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET_RANGE)
itr.setState()
}
func (itr *MDBIterator) Next() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.NEXT)
itr.setState()
}
func (itr *MDBIterator) Prev() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.PREV)
itr.setState()
}
func (itr *MDBIterator) First() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.FIRST)
itr.setState()
}
func (itr *MDBIterator) Last() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.LAST)
itr.setState()
}
func (itr *MDBIterator) setState() {
if itr.err != nil {
if itr.err == mdb.NotFound {
itr.err = nil
}
itr.valid = false
}
}
func (itr *MDBIterator) Close() error {
if err := itr.c.Close(); err != nil {
itr.tx.Abort()
return err
}
if itr.err != nil {
itr.tx.Abort()
return itr.err
}
return itr.tx.Commit()
}
func (_ MDB) Name() string {
return "lmdb"
}
func (db MDB) Path() string {
return db.path
}
func (db MDB) Compact() {
}
func (db MDB) iterator(rdonly bool) *MDBIterator {
flags := uint(0)
if rdonly {
flags = mdb.RDONLY
}
tx, err := db.env.BeginTxn(nil, flags)
if err != nil {
return &MDBIterator{nil, nil, nil, nil, false, err}
}
c, err := tx.CursorOpen(db.db)
if err != nil {
tx.Abort()
return &MDBIterator{nil, nil, nil, nil, false, err}
}
return &MDBIterator{nil, nil, c, tx, true, nil}
}
func (db MDB) Close() error {
db.env.DBIClose(db.db)
if err := db.env.Close(); err != nil {
panic(err)
}
return nil
}
func (db MDB) NewIterator() driver.IIterator {
return db.iterator(true)
}
func (db MDB) NewWriteBatch() driver.IWriteBatch {
return &WriteBatch{&db, []Write{}}
}

32
store/rocksdb.go Normal file
View File

@ -0,0 +1,32 @@
// +build rocksdb
package store
import (
"github.com/siddontang/copier"
"github.com/siddontang/ledisdb/store/driver"
"github.com/siddontang/ledisdb/store/rocksdb"
)
const RocksDBName = "rocksdb"
type RocksDBStore struct {
}
func (s RocksDBStore) Open(cfg *Config) (driver.IDB, error) {
c := &rocksdb.Config{}
copier.Copy(c, cfg)
return rocksdb.Open(c)
}
func (s RocksDBStore) Repair(cfg *Config) error {
c := &rocksdb.Config{}
copier.Copy(c, cfg)
return rocksdb.Repair(c)
}
func init() {
Register(RocksDBName, RocksDBStore{})
}

59
store/rocksdb/batch.go Normal file
View File

@ -0,0 +1,59 @@
// +build rocksdb
package rocksdb
// #cgo LDFLAGS: -lrocksdb
// #include "rocksdb/c.h"
import "C"
import (
"unsafe"
)
type WriteBatch struct {
db *DB
wbatch *C.rocksdb_writebatch_t
}
func (w *WriteBatch) Close() error {
C.rocksdb_writebatch_destroy(w.wbatch)
return nil
}
func (w *WriteBatch) Put(key, value []byte) {
var k, v *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
if len(value) != 0 {
v = (*C.char)(unsafe.Pointer(&value[0]))
}
lenk := len(key)
lenv := len(value)
C.rocksdb_writebatch_put(w.wbatch, k, C.size_t(lenk), v, C.size_t(lenv))
}
func (w *WriteBatch) Delete(key []byte) {
C.rocksdb_writebatch_delete(w.wbatch,
(*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}
func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) Rollback() error {
C.rocksdb_writebatch_clear(w.wbatch)
return nil
}
func (w *WriteBatch) commit(wb *WriteOptions) error {
var errStr *C.char
C.rocksdb_write(w.db.db, wb.Opt, w.wbatch, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}

20
store/rocksdb/cache.go Normal file
View File

@ -0,0 +1,20 @@
// +build rocksdb
package rocksdb
// #cgo LDFLAGS: -lrocksdb
// #include <stdint.h>
// #include "rocksdb/c.h"
import "C"
type Cache struct {
Cache *C.rocksdb_cache_t
}
func NewLRUCache(capacity int) *Cache {
return &Cache{C.rocksdb_cache_create_lru(C.size_t(capacity))}
}
func (c *Cache) Close() {
C.rocksdb_cache_destroy(c.Cache)
}

279
store/rocksdb/db.go Normal file
View File

@ -0,0 +1,279 @@
// +build rocksdb
// Package rocksdb is a wrapper for c++ rocksdb
package rocksdb
/*
#cgo LDFLAGS: -lrocksdb
#include <rocksdb/c.h>
#include <stdlib.h>
#include "rocksdb_ext.h"
*/
import "C"
import (
"github.com/siddontang/ledisdb/store/driver"
"os"
"runtime"
"unsafe"
)
const defaultFilterBits int = 10
type Config struct {
Path string `json:"path"`
Compression bool `json:"compression"`
BlockSize int `json:"block_size"`
WriteBufferSize int `json:"write_buffer_size"`
CacheSize int `json:"cache_size"`
MaxOpenFiles int `json:"max_open_files"`
}
func Open(cfg *Config) (*DB, error) {
if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil {
return nil, err
}
db := new(DB)
db.cfg = cfg
if err := db.open(); err != nil {
return nil, err
}
return db, nil
}
func Repair(cfg *Config) error {
db := new(DB)
db.cfg = cfg
err := db.open()
defer db.Close()
//open ok, do not need repair
if err == nil {
return nil
}
var errStr *C.char
ldbname := C.CString(db.cfg.Path)
defer C.free(unsafe.Pointer(ldbname))
C.rocksdb_repair_db(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
type DB struct {
cfg *Config
db *C.rocksdb_t
env *Env
opts *Options
//for default read and write options
readOpts *ReadOptions
writeOpts *WriteOptions
iteratorOpts *ReadOptions
cache *Cache
filter *FilterPolicy
}
func (db *DB) open() error {
db.initOptions(db.cfg)
var errStr *C.char
ldbname := C.CString(db.cfg.Path)
defer C.free(unsafe.Pointer(ldbname))
db.db = C.rocksdb_open(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
db.db = nil
return saveError(errStr)
}
return nil
}
func (db *DB) initOptions(cfg *Config) {
opts := NewOptions()
opts.SetCreateIfMissing(true)
if cfg.CacheSize <= 0 {
cfg.CacheSize = 4 * 1024 * 1024
}
db.env = NewDefaultEnv()
db.env.SetBackgroundThreads(runtime.NumCPU() * 2)
db.env.SetHighPriorityBackgroundThreads(1)
opts.SetEnv(db.env)
db.cache = NewLRUCache(cfg.CacheSize)
opts.SetCache(db.cache)
//we must use bloomfilter
db.filter = NewBloomFilter(defaultFilterBits)
opts.SetFilterPolicy(db.filter)
if !cfg.Compression {
opts.SetCompression(NoCompression)
} else {
opts.SetCompression(SnappyCompression)
}
if cfg.BlockSize <= 0 {
cfg.BlockSize = 4 * 1024
}
opts.SetBlockSize(cfg.BlockSize)
if cfg.WriteBufferSize <= 0 {
cfg.WriteBufferSize = 4 * 1024 * 1024
}
opts.SetWriteBufferSize(cfg.WriteBufferSize)
if cfg.MaxOpenFiles < 1024 {
cfg.MaxOpenFiles = 1024
}
opts.SetMaxOpenFiles(cfg.MaxOpenFiles)
opts.SetMaxBackgroundCompactions(runtime.NumCPU()*2 - 1)
opts.SetMaxBackgroundFlushes(1)
opts.SetLevel0SlowdownWritesTrigger(16)
opts.SetLevel0StopWritesTrigger(64)
opts.SetTargetFileSizeBase(32 * 1024 * 1024)
db.opts = opts
db.readOpts = NewReadOptions()
db.writeOpts = NewWriteOptions()
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
}
func (db *DB) Close() error {
if db.db != nil {
C.rocksdb_close(db.db)
db.db = nil
}
db.opts.Close()
if db.cache != nil {
db.cache.Close()
}
if db.filter != nil {
db.filter.Close()
}
if db.env != nil {
db.env.Close()
}
db.readOpts.Close()
db.writeOpts.Close()
db.iteratorOpts.Close()
return nil
}
func (db *DB) Put(key, value []byte) error {
return db.put(db.writeOpts, key, value)
}
func (db *DB) Get(key []byte) ([]byte, error) {
return db.get(db.readOpts, key)
}
func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,
wbatch: C.rocksdb_writebatch_create(),
}
return wb
}
func (db *DB) NewIterator() driver.IIterator {
it := new(Iterator)
it.it = C.rocksdb_create_iterator(db.db, db.iteratorOpts.Opt)
return it
}
func (db *DB) put(wo *WriteOptions, key, value []byte) error {
var errStr *C.char
var k, v *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
if len(value) != 0 {
v = (*C.char)(unsafe.Pointer(&value[0]))
}
lenk := len(key)
lenv := len(value)
C.rocksdb_put(
db.db, wo.Opt, k, C.size_t(lenk), v, C.size_t(lenv), &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) {
var errStr *C.char
var vallen C.size_t
var k *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
value := C.rocksdb_get(
db.db, ro.Opt, k, C.size_t(len(key)), &vallen, &errStr)
if errStr != nil {
return nil, saveError(errStr)
}
if value == nil {
return nil, nil
}
defer C.free(unsafe.Pointer(value))
return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil
}
func (db *DB) delete(wo *WriteOptions, key []byte) error {
var errStr *C.char
var k *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
C.rocksdb_delete(
db.db, wo.Opt, k, C.size_t(len(key)), &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}

27
store/rocksdb/env.go Normal file
View File

@ -0,0 +1,27 @@
// +build rocksdb
package rocksdb
// #cgo LDFLAGS: -lrocksdb
// #include "rocksdb/c.h"
import "C"
type Env struct {
Env *C.rocksdb_env_t
}
func NewDefaultEnv() *Env {
return &Env{C.rocksdb_create_default_env()}
}
func (env *Env) SetHighPriorityBackgroundThreads(n int) {
C.rocksdb_env_set_high_priority_background_threads(env.Env, C.int(n))
}
func (env *Env) SetBackgroundThreads(n int) {
C.rocksdb_env_set_background_threads(env.Env, C.int(n))
}
func (env *Env) Close() {
C.rocksdb_env_destroy(env.Env)
}

View File

@ -0,0 +1,21 @@
// +build rocksdb
package rocksdb
// #cgo LDFLAGS: -lrocksdb
// #include <stdlib.h>
// #include "rocksdb/c.h"
import "C"
type FilterPolicy struct {
Policy *C.rocksdb_filterpolicy_t
}
func NewBloomFilter(bitsPerKey int) *FilterPolicy {
policy := C.rocksdb_filterpolicy_create_bloom(C.int(bitsPerKey))
return &FilterPolicy{policy}
}
func (fp *FilterPolicy) Close() {
C.rocksdb_filterpolicy_destroy(fp.Policy)
}

70
store/rocksdb/iterator.go Normal file
View File

@ -0,0 +1,70 @@
// +build rocksdb
package rocksdb
// #cgo LDFLAGS: -lrocksdb
// #include <stdlib.h>
// #include "rocksdb/c.h"
// #include "rocksdb_ext.h"
import "C"
import (
"unsafe"
)
type Iterator struct {
it *C.rocksdb_iterator_t
isValid C.uchar
}
func (it *Iterator) Key() []byte {
var klen C.size_t
kdata := C.rocksdb_iter_key(it.it, &klen)
if kdata == nil {
return nil
}
return slice(unsafe.Pointer(kdata), int(C.int(klen)))
}
func (it *Iterator) Value() []byte {
var vlen C.size_t
vdata := C.rocksdb_iter_value(it.it, &vlen)
if vdata == nil {
return nil
}
return slice(unsafe.Pointer(vdata), int(C.int(vlen)))
}
func (it *Iterator) Close() error {
if it.it != nil {
C.rocksdb_iter_destroy(it.it)
it.it = nil
}
return nil
}
func (it *Iterator) Valid() bool {
return ucharToBool(it.isValid)
}
func (it *Iterator) Next() {
it.isValid = C.rocksdb_iter_next_ext(it.it)
}
func (it *Iterator) Prev() {
it.isValid = C.rocksdb_iter_prev_ext(it.it)
}
func (it *Iterator) First() {
it.isValid = C.rocksdb_iter_seek_to_first_ext(it.it)
}
func (it *Iterator) Last() {
it.isValid = C.rocksdb_iter_seek_to_last_ext(it.it)
}
func (it *Iterator) Seek(key []byte) {
it.isValid = C.rocksdb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}

138
store/rocksdb/options.go Normal file
View File

@ -0,0 +1,138 @@
// +build rocksdb
package rocksdb
// #cgo LDFLAGS: -lrocksdb
// #include "rocksdb/c.h"
import "C"
type CompressionOpt int
const (
NoCompression = CompressionOpt(0)
SnappyCompression = CompressionOpt(1)
)
type Options struct {
Opt *C.rocksdb_options_t
}
type ReadOptions struct {
Opt *C.rocksdb_readoptions_t
}
type WriteOptions struct {
Opt *C.rocksdb_writeoptions_t
}
func NewOptions() *Options {
opt := C.rocksdb_options_create()
return &Options{opt}
}
func NewReadOptions() *ReadOptions {
opt := C.rocksdb_readoptions_create()
return &ReadOptions{opt}
}
func NewWriteOptions() *WriteOptions {
opt := C.rocksdb_writeoptions_create()
return &WriteOptions{opt}
}
func (o *Options) Close() {
C.rocksdb_options_destroy(o.Opt)
}
func (o *Options) SetComparator(cmp *C.rocksdb_comparator_t) {
C.rocksdb_options_set_comparator(o.Opt, cmp)
}
func (o *Options) SetErrorIfExists(error_if_exists bool) {
eie := boolToUchar(error_if_exists)
C.rocksdb_options_set_error_if_exists(o.Opt, eie)
}
func (o *Options) SetCache(cache *Cache) {
C.rocksdb_options_set_cache(o.Opt, cache.Cache)
}
func (o *Options) SetEnv(env *Env) {
C.rocksdb_options_set_env(o.Opt, env.Env)
}
func (o *Options) SetWriteBufferSize(s int) {
C.rocksdb_options_set_write_buffer_size(o.Opt, C.size_t(s))
}
func (o *Options) SetParanoidChecks(pc bool) {
C.rocksdb_options_set_paranoid_checks(o.Opt, boolToUchar(pc))
}
func (o *Options) SetMaxOpenFiles(n int) {
C.rocksdb_options_set_max_open_files(o.Opt, C.int(n))
}
func (o *Options) SetBlockSize(s int) {
C.rocksdb_options_set_block_size(o.Opt, C.size_t(s))
}
func (o *Options) SetBlockRestartInterval(n int) {
C.rocksdb_options_set_block_restart_interval(o.Opt, C.int(n))
}
func (o *Options) SetCompression(t CompressionOpt) {
C.rocksdb_options_set_compression(o.Opt, C.int(t))
}
func (o *Options) SetCreateIfMissing(b bool) {
C.rocksdb_options_set_create_if_missing(o.Opt, boolToUchar(b))
}
func (o *Options) SetFilterPolicy(fp *FilterPolicy) {
var policy *C.rocksdb_filterpolicy_t
if fp != nil {
policy = fp.Policy
}
C.rocksdb_options_set_filter_policy(o.Opt, policy)
}
func (o *Options) SetMaxBackgroundCompactions(n int) {
C.rocksdb_options_set_max_background_compactions(o.Opt, C.int(n))
}
func (o *Options) SetMaxBackgroundFlushes(n int) {
C.rocksdb_options_set_max_background_flushes(o.Opt, C.int(n))
}
func (o *Options) SetLevel0SlowdownWritesTrigger(n int) {
C.rocksdb_options_set_level0_slowdown_writes_trigger(o.Opt, C.int(n))
}
func (o *Options) SetLevel0StopWritesTrigger(n int) {
C.rocksdb_options_set_level0_stop_writes_trigger(o.Opt, C.int(n))
}
func (o *Options) SetTargetFileSizeBase(n int) {
C.rocksdb_options_set_target_file_size_base(o.Opt, C.uint64_t(uint64(n)))
}
func (ro *ReadOptions) Close() {
C.rocksdb_readoptions_destroy(ro.Opt)
}
func (ro *ReadOptions) SetVerifyChecksums(b bool) {
C.rocksdb_readoptions_set_verify_checksums(ro.Opt, boolToUchar(b))
}
func (ro *ReadOptions) SetFillCache(b bool) {
C.rocksdb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b))
}
func (wo *WriteOptions) Close() {
C.rocksdb_writeoptions_destroy(wo.Opt)
}
func (wo *WriteOptions) SetSync(b bool) {
C.rocksdb_writeoptions_set_sync(wo.Opt, boolToUchar(b))
}

View File

@ -0,0 +1,36 @@
// +build rocksdb
#include "rocksdb_ext.h"
#include <stdlib.h>
#include <string>
extern "C" {
unsigned char rocksdb_iter_seek_to_first_ext(rocksdb_iterator_t* iter) {
rocksdb_iter_seek_to_first(iter);
return rocksdb_iter_valid(iter);
}
unsigned char rocksdb_iter_seek_to_last_ext(rocksdb_iterator_t* iter) {
rocksdb_iter_seek_to_last(iter);
return rocksdb_iter_valid(iter);
}
unsigned char rocksdb_iter_seek_ext(rocksdb_iterator_t* iter, const char* k, size_t klen) {
rocksdb_iter_seek(iter, k, klen);
return rocksdb_iter_valid(iter);
}
unsigned char rocksdb_iter_next_ext(rocksdb_iterator_t* iter) {
rocksdb_iter_next(iter);
return rocksdb_iter_valid(iter);
}
unsigned char rocksdb_iter_prev_ext(rocksdb_iterator_t* iter) {
rocksdb_iter_prev(iter);
return rocksdb_iter_valid(iter);
}
}

View File

@ -0,0 +1,24 @@
// +build rocksdb
#ifndef ROCKSDB_EXT_H
#define ROCKSDB_EXT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "rocksdb/c.h"
// Below iterator functions like rocksdb iterator but returns valid status for iterator
extern unsigned char rocksdb_iter_seek_to_first_ext(rocksdb_iterator_t*);
extern unsigned char rocksdb_iter_seek_to_last_ext(rocksdb_iterator_t*);
extern unsigned char rocksdb_iter_seek_ext(rocksdb_iterator_t*, const char* k, size_t klen);
extern unsigned char rocksdb_iter_next_ext(rocksdb_iterator_t*);
extern unsigned char rocksdb_iter_prev_ext(rocksdb_iterator_t*);
#ifdef __cplusplus
}
#endif
#endif

46
store/rocksdb/util.go Normal file
View File

@ -0,0 +1,46 @@
// +build rocksdb
package rocksdb
// #include <stdlib.h>
// #include "rocksdb/c.h"
import "C"
import (
"fmt"
"reflect"
"unsafe"
)
func boolToUchar(b bool) C.uchar {
uc := C.uchar(0)
if b {
uc = C.uchar(1)
}
return uc
}
func ucharToBool(uc C.uchar) bool {
if uc == C.uchar(0) {
return false
}
return true
}
func saveError(errStr *C.char) error {
if errStr != nil {
gs := C.GoString(errStr)
C.free(unsafe.Pointer(errStr))
return fmt.Errorf(gs)
}
return nil
}
func slice(p unsafe.Pointer, n int) []byte {
var b []byte
pbyte := (*reflect.SliceHeader)(unsafe.Pointer(&b))
pbyte.Data = uintptr(p)
pbyte.Len = n
pbyte.Cap = n
return b
}

70
store/store.go Normal file
View File

@ -0,0 +1,70 @@
package store
import (
"fmt"
"github.com/siddontang/ledisdb/store/driver"
"os"
)
const DefaultStoreName = "goleveldb"
type Store interface {
Open(cfg *Config) (driver.IDB, error)
Repair(cfg *Config) error
}
var dbs = map[string]Store{}
func Register(name string, store Store) {
if _, ok := dbs[name]; ok {
panic(fmt.Errorf("store %s is registered", name))
}
dbs[name] = store
}
func ListStores() []string {
s := []string{}
for k, _ := range dbs {
s = append(s, k)
}
return s
}
func Open(cfg *Config) (*DB, error) {
if len(cfg.Name) == 0 {
cfg.Name = DefaultStoreName
}
s, ok := dbs[cfg.Name]
if !ok {
return nil, fmt.Errorf("store %s is not registered", cfg.Name)
}
if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil {
return nil, err
}
idb, err := s.Open(cfg)
if err != nil {
return nil, err
}
db := &DB{idb}
return db, nil
}
func Repair(cfg *Config) error {
if len(cfg.Name) == 0 {
cfg.Name = DefaultStoreName
}
s, ok := dbs[cfg.Name]
if !ok {
return fmt.Errorf("db %s is not registered", cfg.Name)
}
return s.Repair(cfg)
}

View File

@ -1,4 +1,4 @@
package leveldb
package store
import (
"bytes"
@ -8,23 +8,17 @@ import (
"testing"
)
var testConfigJson = []byte(`
{
"path" : "./testdb",
"compression":true,
"block_size" : 32768,
"write_buffer_size" : 2097152,
"cache_size" : 20971520
}
`)
var testOnce sync.Once
var testDB *DB
func getTestDB() *DB {
f := func() {
var err error
testDB, err = OpenWithJsonConfig(testConfigJson)
cfg := new(Config)
cfg.Path = "/tmp/testdb"
testDB, err = Open(cfg)
if err != nil {
println(err.Error())
panic(err)
@ -131,7 +125,11 @@ func checkIterator(it *RangeLimitIterator, cv ...int) error {
func TestIterator(t *testing.T) {
db := getTestDB()
db.Clear()
i := db.NewIterator()
for i.SeekToFirst(); i.Valid(); i.Next() {
db.Delete(i.Key())
}
i.Close()
for i := 0; i < 10; i++ {
key := []byte(fmt.Sprintf("key_%d", i))
@ -139,6 +137,16 @@ func TestIterator(t *testing.T) {
db.Put(key, value)
}
i = db.NewIterator()
i.SeekToFirst()
if !i.Valid() {
t.Fatal("must valid")
} else if string(i.Key()) != "key_0" {
t.Fatal(string(i.Key()))
}
i.Close()
var it *RangeLimitIterator
k := func(i int) []byte {
@ -149,96 +157,67 @@ func TestIterator(t *testing.T) {
if err := checkIterator(it, 1, 2, 3, 4, 5); err != nil {
t.Fatal(err)
}
it.Close()
it = db.RangeLimitIterator(k(1), k(5), RangeClose, 0, -1)
if err := checkIterator(it, 1, 2, 3, 4, 5); err != nil {
t.Fatal(err)
}
it.Close()
it = db.RangeLimitIterator(k(1), k(5), RangeClose, 1, 3)
if err := checkIterator(it, 2, 3, 4); err != nil {
t.Fatal(err)
}
it.Close()
it = db.RangeLimitIterator(k(1), k(5), RangeLOpen, 0, -1)
if err := checkIterator(it, 2, 3, 4, 5); err != nil {
t.Fatal(err)
}
it.Close()
it = db.RangeLimitIterator(k(1), k(5), RangeROpen, 0, -1)
if err := checkIterator(it, 1, 2, 3, 4); err != nil {
t.Fatal(err)
}
it.Close()
it = db.RangeLimitIterator(k(1), k(5), RangeOpen, 0, -1)
if err := checkIterator(it, 2, 3, 4); err != nil {
t.Fatal(err)
}
it.Close()
it = db.RevRangeLimitIterator(k(1), k(5), RangeClose, 0, -1)
if err := checkIterator(it, 5, 4, 3, 2, 1); err != nil {
t.Fatal(err)
}
it.Close()
it = db.RevRangeLimitIterator(k(1), k(5), RangeClose, 1, 3)
if err := checkIterator(it, 4, 3, 2); err != nil {
t.Fatal(err)
}
it.Close()
it = db.RevRangeLimitIterator(k(1), k(5), RangeLOpen, 0, -1)
if err := checkIterator(it, 5, 4, 3, 2); err != nil {
t.Fatal(err)
}
it.Close()
it = db.RevRangeLimitIterator(k(1), k(5), RangeROpen, 0, -1)
if err := checkIterator(it, 4, 3, 2, 1); err != nil {
t.Fatal(err)
}
it.Close()
it = db.RevRangeLimitIterator(k(1), k(5), RangeOpen, 0, -1)
if err := checkIterator(it, 4, 3, 2); err != nil {
t.Fatal(err)
}
}
func TestSnapshot(t *testing.T) {
db := getTestDB()
key := []byte("key")
value := []byte("hello world")
db.Put(key, value)
s := db.NewSnapshot()
defer s.Close()
db.Put(key, []byte("hello world2"))
if v, err := s.Get(key); err != nil {
t.Fatal(err)
} else if string(v) != string(value) {
t.Fatal(string(v))
}
}
func TestDestroy(t *testing.T) {
db := getTestDB()
db.Put([]byte("a"), []byte("1"))
if err := db.Clear(); err != nil {
t.Fatal(err)
}
if _, err := os.Stat(db.cfg.Path); err != nil {
t.Fatal("must exist ", err.Error())
}
if v, err := db.Get([]byte("a")); err != nil {
t.Fatal(err)
} else if string(v) == "1" {
t.Fatal(string(v))
}
db.Destroy()
if _, err := os.Stat(db.cfg.Path); !os.IsNotExist(err) {
t.Fatal("must not exist")
}
it.Close()
}
func TestCloseMore(t *testing.T) {
@ -256,4 +235,6 @@ func TestCloseMore(t *testing.T) {
db.Close()
}
os.RemoveAll(cfg.Path)
}

9
store/writebatch.go Normal file
View File

@ -0,0 +1,9 @@
package store
import (
"github.com/siddontang/ledisdb/store/driver"
)
type WriteBatch struct {
driver.IWriteBatch
}