add support for multi storage

This commit is contained in:
siddontang 2014-07-25 17:58:00 +08:00
parent 251b7eae5c
commit 7a237661da
46 changed files with 1054 additions and 410 deletions

View File

@ -1,11 +1,12 @@
GO_BUILD_FLAG += leveldb
all: build all: build
build: build:
go install ./... go install -tags $(GO_BUILD_FLAG) ./...
clean: clean:
go clean -i ./... go clean -i ./...
test: test:
go test ./... go test -tags $(GO_BUILD_FLAG) ./...
go test -race ./...

View File

@ -4,4 +4,7 @@
go get github.com/siddontang/go-log/log go get github.com/siddontang/go-log/log
go get github.com/siddontang/go-snappy/snappy go get github.com/siddontang/go-snappy/snappy
go get github.com/siddontang/copier go get github.com/siddontang/copier
go get github.com/syndtr/goleveldb/leveldb
go get github.com/szferi/gomdb

View File

@ -4,7 +4,7 @@ import (
"encoding/json" "encoding/json"
"flag" "flag"
"github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/ledis"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"io/ioutil" "io/ioutil"
) )
@ -35,7 +35,7 @@ func main() {
return return
} }
if err = leveldb.Repair(cfg.NewDBConfig()); err != nil { if err = store.Repair(cfg.NewDBConfig()); err != nil {
println("repair error: ", err.Error()) println("repair error: ", err.Error())
} }
} }

6
dev.sh
View File

@ -8,12 +8,10 @@ if [[ "$VTTOP" == "${VTTOP/\/src\/github.com\/siddontang\/ledisdb/}" ]]; then
exit 1 exit 1
fi fi
#default snappy and leveldb install path #default snappy and leveldb install path
#you may change yourself #you may change yourself
export SNAPPY_DIR=/usr/local/snappy
SNAPPY_DIR=/usr/local/snappy export LEVELDB_DIR=/usr/local/leveldb
LEVELDB_DIR=/usr/local/leveldb
function add_path() function add_path()
{ {

View File

@ -2,7 +2,7 @@ package ledis
import ( import (
"github.com/siddontang/copier" "github.com/siddontang/copier"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"path" "path"
) )
@ -24,10 +24,10 @@ type Config struct {
} `json:"binlog"` } `json:"binlog"`
} }
func (cfg *Config) NewDBConfig() *leveldb.Config { func (cfg *Config) NewDBConfig() *store.Config {
dbPath := path.Join(cfg.DataDir, "data") dbPath := path.Join(cfg.DataDir, "data")
dbCfg := new(leveldb.Config) dbCfg := new(store.Config)
copier.Copy(dbCfg, &cfg.DB) copier.Copy(dbCfg, &cfg.DB)
dbCfg.Path = dbPath dbCfg.Path = dbPath
return dbCfg return dbCfg

View File

@ -1,4 +1,5 @@
// Package ledis is a high performance embedded NoSQL based on leveldb. // Package ledis is a high performance embedded NoSQL.
//
// Ledis supports various advanced data structure like kv, list, hash and zset like redis. // Ledis supports various advanced data structure like kv, list, hash and zset like redis.
// //
// Other features include binlog replication, data with a limited time-to-live. // Other features include binlog replication, data with a limited time-to-live.

View File

@ -5,7 +5,6 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"github.com/siddontang/go-snappy/snappy" "github.com/siddontang/go-snappy/snappy"
"github.com/siddontang/ledisdb/leveldb"
"io" "io"
"os" "os"
) )
@ -57,16 +56,13 @@ func (l *Ledis) DumpFile(path string) error {
} }
func (l *Ledis) Dump(w io.Writer) error { func (l *Ledis) Dump(w io.Writer) error {
var sp *leveldb.Snapshot
var m *MasterInfo = new(MasterInfo) var m *MasterInfo = new(MasterInfo)
if l.binlog == nil { l.Lock()
sp = l.ldb.NewSnapshot() defer l.Unlock()
} else {
l.Lock() if l.binlog != nil {
sp = l.ldb.NewSnapshot()
m.LogFileIndex = l.binlog.LogFileIndex() m.LogFileIndex = l.binlog.LogFileIndex()
m.LogPos = l.binlog.LogFilePos() m.LogPos = l.binlog.LogFilePos()
l.Unlock()
} }
var err error var err error
@ -76,7 +72,7 @@ func (l *Ledis) Dump(w io.Writer) error {
return err return err
} }
it := sp.NewIterator() it := l.ldb.NewIterator()
it.SeekToFirst() it.SeekToFirst()
compressBuf := make([]byte, 4096) compressBuf := make([]byte, 4096)

View File

@ -2,7 +2,7 @@ package ledis
import ( import (
"bytes" "bytes"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"os" "os"
"testing" "testing"
) )
@ -59,7 +59,7 @@ func TestDump(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) it := master.ldb.RangeLimitIterator(nil, nil, store.RangeClose, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
key := it.Key() key := it.Key()
value := it.Value() value := it.Value()

View File

@ -4,7 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/siddontang/go-log/log" "github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"sync" "sync"
"time" "time"
) )
@ -12,7 +12,7 @@ import (
type DB struct { type DB struct {
l *Ledis l *Ledis
db *leveldb.DB db *store.DB
index uint8 index uint8
@ -28,7 +28,7 @@ type Ledis struct {
cfg *Config cfg *Config
ldb *leveldb.DB ldb *store.DB
dbs [MaxDBNumber]*DB dbs [MaxDBNumber]*DB
binlog *BinLog binlog *BinLog
@ -52,7 +52,7 @@ func Open(cfg *Config) (*Ledis, error) {
return nil, fmt.Errorf("must set correct data_dir") return nil, fmt.Errorf("must set correct data_dir")
} }
ldb, err := leveldb.Open(cfg.NewDBConfig()) ldb, err := store.Open(cfg.NewDBConfig())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -132,7 +132,7 @@ func (l *Ledis) FlushAll() error {
} }
// very dangerous to use // very dangerous to use
func (l *Ledis) DataDB() *leveldb.DB { func (l *Ledis) DataDB() *store.DB {
return l.ldb return l.ldb
} }

View File

@ -1,7 +1,7 @@
package ledis package ledis
import ( import (
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
) )
func (db *DB) FlushAll() (drop int64, err error) { func (db *DB) FlushAll() (drop int64, err error) {
@ -36,7 +36,7 @@ func (db *DB) newEliminator() *elimination {
} }
func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) { func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) {
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen) it := db.db.RangeIterator(minKey, maxKey, store.RangeROpen)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
t.Delete(it.RawKey()) t.Delete(it.RawKey())
drop++ drop++

View File

@ -3,14 +3,14 @@ package ledis
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"os" "os"
"path" "path"
"testing" "testing"
) )
func checkLedisEqual(master *Ledis, slave *Ledis) error { func checkLedisEqual(master *Ledis, slave *Ledis) error {
it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) it := master.ldb.RangeLimitIterator(nil, nil, store.RangeClose, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
key := it.Key() key := it.Key()
value := it.Value() value := it.Value()

View File

@ -3,7 +3,7 @@ package ledis
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"sort" "sort"
"time" "time"
) )
@ -253,7 +253,7 @@ func (db *DB) bDelete(t *tx, key []byte) (drop int64) {
minKey := db.bEncodeBinKey(key, minSeq) minKey := db.bEncodeBinKey(key, minSeq)
maxKey := db.bEncodeBinKey(key, maxSeq) maxKey := db.bEncodeBinKey(key, maxSeq)
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) it := db.db.RangeIterator(minKey, maxKey, store.RangeClose)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
t.Delete(it.RawKey()) t.Delete(it.RawKey())
drop++ drop++
@ -280,10 +280,10 @@ func (db *DB) bAllocateSegment(key []byte, seq uint32) ([]byte, []byte, error) {
return bk, segment, err return bk, segment, err
} }
func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator { func (db *DB) bIterator(key []byte) *store.RangeLimitIterator {
sk := db.bEncodeBinKey(key, minSeq) sk := db.bEncodeBinKey(key, minSeq)
ek := db.bEncodeBinKey(key, maxSeq) ek := db.bEncodeBinKey(key, maxSeq)
return db.db.RangeIterator(sk, ek, leveldb.RangeClose) return db.db.RangeIterator(sk, ek, store.RangeClose)
} }
func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) { func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) {
@ -446,7 +446,7 @@ func (db *DB) BGet(key []byte) (data []byte, err error) {
minKey := db.bEncodeBinKey(key, minSeq) minKey := db.bEncodeBinKey(key, minSeq)
maxKey := db.bEncodeBinKey(key, tailSeq) maxKey := db.bEncodeBinKey(key, tailSeq)
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) it := db.db.RangeIterator(minKey, maxKey, store.RangeClose)
var seq, s, e uint32 var seq, s, e uint32
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
@ -662,7 +662,7 @@ func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error)
skey := db.bEncodeBinKey(key, sseq) skey := db.bEncodeBinKey(key, sseq)
ekey := db.bEncodeBinKey(key, eseq) ekey := db.bEncodeBinKey(key, eseq)
it := db.db.RangeIterator(skey, ekey, leveldb.RangeOpen) it := db.db.RangeIterator(skey, ekey, store.RangeOpen)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
segment = it.RawValue() segment = it.RawValue()
for _, bt := range segment { for _, bt := range segment {

View File

@ -3,7 +3,7 @@ package ledis
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"time" "time"
) )
@ -132,7 +132,7 @@ func (db *DB) hDelete(t *tx, key []byte) int64 {
stop := db.hEncodeStopKey(key) stop := db.hEncodeStopKey(key)
var num int64 = 0 var num int64 = 0
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
t.Delete(it.Key()) t.Delete(it.Key())
num++ num++
@ -354,7 +354,7 @@ func (db *DB) HGetAll(key []byte) ([]FVPair, error) {
v := make([]FVPair, 0, 16) v := make([]FVPair, 0, 16)
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
_, f, err := db.hDecodeHashKey(it.Key()) _, f, err := db.hDecodeHashKey(it.Key())
if err != nil { if err != nil {
@ -379,7 +379,7 @@ func (db *DB) HKeys(key []byte) ([][]byte, error) {
v := make([][]byte, 0, 16) v := make([][]byte, 0, 16)
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
_, f, err := db.hDecodeHashKey(it.Key()) _, f, err := db.hDecodeHashKey(it.Key())
if err != nil { if err != nil {
@ -403,7 +403,7 @@ func (db *DB) HValues(key []byte) ([][]byte, error) {
v := make([][]byte, 0, 16) v := make([][]byte, 0, 16)
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
_, _, err := db.hDecodeHashKey(it.Key()) _, _, err := db.hDecodeHashKey(it.Key())
if err != nil { if err != nil {
@ -491,9 +491,9 @@ func (db *DB) HScan(key []byte, field []byte, count int, inclusive bool) ([]FVPa
v := make([]FVPair, 0, count) v := make([]FVPair, 0, count)
rangeType := leveldb.RangeROpen rangeType := store.RangeROpen
if !inclusive { if !inclusive {
rangeType = leveldb.RangeOpen rangeType = store.RangeOpen
} }
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count)

View File

@ -2,7 +2,7 @@ package ledis
import ( import (
"errors" "errors"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"time" "time"
) )
@ -343,9 +343,9 @@ func (db *DB) Scan(key []byte, count int, inclusive bool) ([]KVPair, error) {
v := make([]KVPair, 0, 2*count) v := make([]KVPair, 0, 2*count)
rangeType := leveldb.RangeROpen rangeType := store.RangeROpen
if !inclusive { if !inclusive {
rangeType = leveldb.RangeOpen rangeType = store.RangeOpen
} }
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count)

View File

@ -3,7 +3,7 @@ package ledis
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"time" "time"
) )
@ -203,7 +203,7 @@ func (db *DB) lDelete(t *tx, key []byte) int64 {
startKey := db.lEncodeListKey(key, headSeq) startKey := db.lEncodeListKey(key, headSeq)
stopKey := db.lEncodeListKey(key, tailSeq) stopKey := db.lEncodeListKey(key, tailSeq)
rit := leveldb.NewRangeIterator(it, &leveldb.Range{startKey, stopKey, leveldb.RangeClose}) rit := store.NewRangeIterator(it, &store.Range{startKey, stopKey, store.RangeClose})
for ; rit.Valid(); rit.Next() { for ; rit.Valid(); rit.Next() {
t.Delete(rit.RawKey()) t.Delete(rit.RawKey())
num++ num++
@ -214,7 +214,7 @@ func (db *DB) lDelete(t *tx, key []byte) int64 {
return num return num
} }
func (db *DB) lGetMeta(it *leveldb.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { func (db *DB) lGetMeta(it *store.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
var v []byte var v []byte
if it != nil { if it != nil {
v = it.Find(ek) v = it.Find(ek)
@ -364,12 +364,12 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) {
v := make([][]byte, 0, limit) v := make([][]byte, 0, limit)
startKey := db.lEncodeListKey(key, headSeq) startKey := db.lEncodeListKey(key, headSeq)
rit := leveldb.NewRangeLimitIterator(it, rit := store.NewRangeLimitIterator(it,
&leveldb.Range{ &store.Range{
Min: startKey, Min: startKey,
Max: nil, Max: nil,
Type: leveldb.RangeClose}, Type: store.RangeClose},
&leveldb.Limit{ &store.Limit{
Offset: 0, Offset: 0,
Count: int(limit)}) Count: int(limit)})

View File

@ -3,7 +3,7 @@ package ledis
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"time" "time"
) )
@ -156,7 +156,7 @@ func (eli *elimination) active() {
minKey := db.expEncodeTimeKey(NoneType, nil, 0) minKey := db.expEncodeTimeKey(NoneType, nil, 0)
maxKey := db.expEncodeTimeKey(maxDataType, nil, now) maxKey := db.expEncodeTimeKey(maxDataType, nil, now)
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) it := db.db.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
tk := it.RawKey() tk := it.RawKey()
mk := it.RawValue() mk := it.RawValue()

View File

@ -4,7 +4,7 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"time" "time"
) )
@ -432,7 +432,7 @@ func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) {
minKey := db.zEncodeStartScoreKey(key, min) minKey := db.zEncodeStartScoreKey(key, min)
maxKey := db.zEncodeStopScoreKey(key, max) maxKey := db.zEncodeStopScoreKey(key, max)
rangeType := leveldb.RangeROpen rangeType := store.RangeROpen
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, -1) it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, -1)
var n int64 = 0 var n int64 = 0
@ -460,17 +460,17 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
if s, err := Int64(v, nil); err != nil { if s, err := Int64(v, nil); err != nil {
return 0, err return 0, err
} else { } else {
var rit *leveldb.RangeLimitIterator var rit *store.RangeLimitIterator
sk := db.zEncodeScoreKey(key, member, s) sk := db.zEncodeScoreKey(key, member, s)
if !reverse { if !reverse {
minKey := db.zEncodeStartScoreKey(key, MinScore) minKey := db.zEncodeStartScoreKey(key, MinScore)
rit = leveldb.NewRangeIterator(it, &leveldb.Range{minKey, sk, leveldb.RangeClose}) rit = store.NewRangeIterator(it, &store.Range{minKey, sk, store.RangeClose})
} else { } else {
maxKey := db.zEncodeStopScoreKey(key, MaxScore) maxKey := db.zEncodeStopScoreKey(key, MaxScore)
rit = leveldb.NewRevRangeIterator(it, &leveldb.Range{sk, maxKey, leveldb.RangeClose}) rit = store.NewRevRangeIterator(it, &store.Range{sk, maxKey, store.RangeClose})
} }
var lastKey []byte = nil var lastKey []byte = nil
@ -492,14 +492,14 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
return -1, nil return -1, nil
} }
func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *leveldb.RangeLimitIterator { func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *store.RangeLimitIterator {
minKey := db.zEncodeStartScoreKey(key, min) minKey := db.zEncodeStartScoreKey(key, min)
maxKey := db.zEncodeStopScoreKey(key, max) maxKey := db.zEncodeStopScoreKey(key, max)
if !reverse { if !reverse {
return db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, count) return db.db.RangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count)
} else { } else {
return db.db.RevRangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, count) return db.db.RevRangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count)
} }
} }
@ -550,10 +550,10 @@ func (db *DB) zRange(key []byte, min int64, max int64, offset int, count int, re
v := make([]ScorePair, 0, nv) v := make([]ScorePair, 0, nv)
var it *leveldb.RangeLimitIterator var it *store.RangeLimitIterator
//if reverse and offset is 0, count < 0, we may use forward iterator then reverse //if reverse and offset is 0, count < 0, we may use forward iterator then reverse
//because leveldb iterator prev is slower than next //because store iterator prev is slower than next
if !reverse || (offset == 0 && count < 0) { if !reverse || (offset == 0 && count < 0) {
it = db.zIterator(key, min, max, offset, count, false) it = db.zIterator(key, min, max, offset, count, false)
} else { } else {
@ -740,7 +740,7 @@ func (db *DB) zFlush() (drop int64, err error) {
maxKey[0] = db.index maxKey[0] = db.index
maxKey[1] = ZScoreType + 1 maxKey[1] = ZScoreType + 1
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) it := db.db.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1)
defer it.Close() defer it.Close()
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
@ -779,9 +779,9 @@ func (db *DB) ZScan(key []byte, member []byte, count int, inclusive bool) ([]Sco
v := make([]ScorePair, 0, 2*count) v := make([]ScorePair, 0, 2*count)
rangeType := leveldb.RangeROpen rangeType := store.RangeROpen
if !inclusive { if !inclusive {
rangeType = leveldb.RangeOpen rangeType = store.RangeOpen
} }
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count)

View File

@ -1,7 +1,7 @@
package ledis package ledis
import ( import (
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"sync" "sync"
) )
@ -9,7 +9,7 @@ type tx struct {
m sync.Mutex m sync.Mutex
l *Ledis l *Ledis
wb *leveldb.WriteBatch wb *store.WriteBatch
binlog *BinLog binlog *BinLog
batch [][]byte batch [][]byte

View File

@ -1,60 +0,0 @@
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include <stdint.h>
// #include "leveldb/c.h"
import "C"
type Snapshot struct {
db *DB
snap *C.leveldb_snapshot_t
readOpts *ReadOptions
iteratorOpts *ReadOptions
}
func (s *Snapshot) Close() {
C.leveldb_release_snapshot(s.db.db, s.snap)
s.iteratorOpts.Close()
s.readOpts.Close()
}
func (s *Snapshot) Get(key []byte) ([]byte, error) {
return s.db.get(nil, s.readOpts, key)
}
func (s *Snapshot) BufGet(r []byte, key []byte) ([]byte, error) {
return s.db.get(r, s.readOpts, key)
}
func (s *Snapshot) NewIterator() *Iterator {
it := new(Iterator)
it.it = C.leveldb_create_iterator(s.db.db, s.iteratorOpts.Opt)
return it
}
func (s *Snapshot) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
func (s *Snapshot) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRevRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (s *Snapshot) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (s *Snapshot) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRevRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}

View File

@ -3,14 +3,14 @@ package server
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/siddontang/ledisdb/leveldb" "github.com/siddontang/ledisdb/store"
"os" "os"
"testing" "testing"
"time" "time"
) )
func checkDataEqual(master *App, slave *App) error { func checkDataEqual(master *App, slave *App) error {
it := master.ldb.DataDB().RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) it := master.ldb.DataDB().RangeLimitIterator(nil, nil, store.RangeClose, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
key := it.Key() key := it.Key()
value := it.Value() value := it.Value()

14
store/config.go Normal file
View File

@ -0,0 +1,14 @@
package store
type Config struct {
Name string
Path string
//for leveldb, goleveldb, rocksdb
Compression bool
BlockSize int
WriteBufferSize int
CacheSize int
MaxOpenFiles int
}

75
store/db.go Normal file
View File

@ -0,0 +1,75 @@
package store
import (
"github.com/siddontang/ledisdb/store/driver"
)
type DB struct {
db driver.IDB
}
// Close database
//
// Caveat
// Any other DB operations like Get, Put, etc... may cause a panic after Close
//
func (db *DB) Close() error {
if db.db == nil {
return nil
}
err := db.db.Close()
db.db = nil
return err
}
// Get Value with Key
func (db *DB) Get(key []byte) ([]byte, error) {
return db.db.Get(key)
}
// Put value with key
func (db *DB) Put(key []byte, value []byte) error {
err := db.db.Put(key, value)
return err
}
// Delete by key
func (db *DB) Delete(key []byte) error {
err := db.db.Delete(key)
return err
}
func (db *DB) NewIterator() *Iterator {
it := new(Iterator)
it.it = db.db.NewIterator()
return it
}
func (db *DB) NewWriteBatch() *WriteBatch {
return &WriteBatch{db.db.NewWriteBatch()}
}
func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}

39
store/driver/driver.go Normal file
View File

@ -0,0 +1,39 @@
package driver
type IDB interface {
Close() error
Get(key []byte) ([]byte, error)
Put(key []byte, value []byte) error
Delete(key []byte) error
NewIterator() IIterator
NewWriteBatch() IWriteBatch
}
type IIterator interface {
Close() error
First()
Last()
Seek(key []byte)
Next()
Prev()
Valid() bool
Key() []byte
Value() []byte
}
type IWriteBatch interface {
Close() error
Put(key []byte, value []byte)
Delete(key []byte)
Commit() error
Rollback() error
}

30
store/goleveldb.go Normal file
View File

@ -0,0 +1,30 @@
package store
import (
"github.com/siddontang/copier"
"github.com/siddontang/ledisdb/store/driver"
"github.com/siddontang/ledisdb/store/goleveldb"
)
const GoLevelDBName = "goleveldb"
type GoLevelDBStore struct {
}
func (s GoLevelDBStore) Open(cfg *Config) (driver.IDB, error) {
c := &goleveldb.Config{}
copier.Copy(c, cfg)
return goleveldb.Open(c)
}
func (s GoLevelDBStore) Repair(cfg *Config) error {
c := &goleveldb.Config{}
copier.Copy(c, cfg)
return goleveldb.Repair(c)
}
func init() {
Register(GoLevelDBName, GoLevelDBStore{})
}

31
store/goleveldb/batch.go Normal file
View File

@ -0,0 +1,31 @@
package goleveldb
import (
"github.com/syndtr/goleveldb/leveldb"
)
type WriteBatch struct {
db *DB
wbatch *leveldb.Batch
}
func (w *WriteBatch) Close() error {
return nil
}
func (w *WriteBatch) Put(key, value []byte) {
w.wbatch.Put(key, value)
}
func (w *WriteBatch) Delete(key []byte) {
w.wbatch.Delete(key)
}
func (w *WriteBatch) Commit() error {
return w.db.db.Write(w.wbatch, nil)
}
func (w *WriteBatch) Rollback() error {
w.wbatch.Reset()
return nil
}

137
store/goleveldb/db.go Normal file
View File

@ -0,0 +1,137 @@
package goleveldb
import (
"github.com/siddontang/ledisdb/store/driver"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/cache"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"os"
)
const defaultFilterBits int = 10
type Config struct {
Path string
Compression bool
BlockSize int
WriteBufferSize int
CacheSize int
}
type DB struct {
cfg *Config
db *leveldb.DB
opts *opt.Options
iteratorOpts *opt.ReadOptions
cache cache.Cache
filter filter.Filter
}
func Open(cfg *Config) (*DB, error) {
if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil {
return nil, err
}
db := new(DB)
db.cfg = cfg
if err := db.open(); err != nil {
return nil, err
}
return db, nil
}
func Repair(cfg *Config) error {
db, err := leveldb.RecoverFile(cfg.Path, newOptions(cfg))
if err != nil {
return err
}
db.Close()
return nil
}
func (db *DB) open() error {
db.opts = newOptions(db.cfg)
db.iteratorOpts = &opt.ReadOptions{}
db.iteratorOpts.DontFillCache = true
var err error
db.db, err = leveldb.OpenFile(db.cfg.Path, db.opts)
return err
}
func newOptions(cfg *Config) *opt.Options {
opts := &opt.Options{}
opts.ErrorIfMissing = false
if cfg.CacheSize > 0 {
opts.BlockCache = cache.NewLRUCache(cfg.CacheSize)
}
//we must use bloomfilter
opts.Filter = filter.NewBloomFilter(defaultFilterBits)
if !cfg.Compression {
opts.Compression = opt.NoCompression
} else {
opts.Compression = opt.SnappyCompression
}
if cfg.BlockSize > 0 {
opts.BlockSize = cfg.BlockSize
}
if cfg.WriteBufferSize > 0 {
opts.WriteBuffer = cfg.WriteBufferSize
}
return opts
}
func (db *DB) Close() error {
return db.db.Close()
}
func (db *DB) Put(key, value []byte) error {
return db.db.Put(key, value, nil)
}
func (db *DB) Get(key []byte) ([]byte, error) {
v, err := db.db.Get(key, nil)
if err == leveldb.ErrNotFound {
return nil, nil
}
return v, nil
}
func (db *DB) Delete(key []byte) error {
return db.db.Delete(key, nil)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,
wbatch: new(leveldb.Batch),
}
return wb
}
func (db *DB) NewIterator() driver.IIterator {
it := &Iterator{
db.db.NewIterator(nil, db.iteratorOpts),
}
return it
}

View File

@ -0,0 +1,49 @@
package goleveldb
import (
"github.com/syndtr/goleveldb/leveldb/iterator"
)
type Iterator struct {
it iterator.Iterator
}
func (it *Iterator) Key() []byte {
return it.it.Key()
}
func (it *Iterator) Value() []byte {
return it.it.Value()
}
func (it *Iterator) Close() error {
if it.it != nil {
it.it.Release()
it.it = nil
}
return nil
}
func (it *Iterator) Valid() bool {
return it.it.Valid()
}
func (it *Iterator) Next() {
it.it.Next()
}
func (it *Iterator) Prev() {
it.it.Prev()
}
func (it *Iterator) First() {
it.it.First()
}
func (it *Iterator) Last() {
it.it.Last()
}
func (it *Iterator) Seek(key []byte) {
it.it.Seek(key)
}

View File

@ -1,14 +1,8 @@
package leveldb package store
// #cgo LDFLAGS: -lleveldb
// #include <stdlib.h>
// #include "leveldb/c.h"
// #include "leveldb_ext.h"
import "C"
import ( import (
"bytes" "bytes"
"unsafe" "github.com/siddontang/ledisdb/store/driver"
) )
const ( const (
@ -27,10 +21,10 @@ const (
// //
// range type: // range type:
// //
// close: [min, max] // close: [min, max]
// open: (min, max) // open: (min, max)
// lopen: (min, max] // lopen: (min, max]
// ropen: [min, max) // ropen: [min, max)
// //
type Range struct { type Range struct {
Min []byte Min []byte
@ -45,54 +39,39 @@ type Limit struct {
} }
type Iterator struct { type Iterator struct {
it *C.leveldb_iterator_t it driver.IIterator
isValid C.uchar
} }
// Returns a copy of key. // Returns a copy of key.
func (it *Iterator) Key() []byte { func (it *Iterator) Key() []byte {
var klen C.size_t k := it.it.Key()
kdata := C.leveldb_iter_key(it.it, &klen) if k == nil {
if kdata == nil {
return nil return nil
} }
return C.GoBytes(unsafe.Pointer(kdata), C.int(klen)) return append([]byte{}, k...)
} }
// Returns a copy of value. // Returns a copy of value.
func (it *Iterator) Value() []byte { func (it *Iterator) Value() []byte {
var vlen C.size_t v := it.it.Value()
vdata := C.leveldb_iter_value(it.it, &vlen) if v == nil {
if vdata == nil {
return nil return nil
} }
return C.GoBytes(unsafe.Pointer(vdata), C.int(vlen)) return append([]byte{}, v...)
} }
// Returns a reference of key. // Returns a reference of key.
// you must be careful that it will be changed after next iterate. // you must be careful that it will be changed after next iterate.
func (it *Iterator) RawKey() []byte { func (it *Iterator) RawKey() []byte {
var klen C.size_t return it.it.Key()
kdata := C.leveldb_iter_key(it.it, &klen)
if kdata == nil {
return nil
}
return slice(unsafe.Pointer(kdata), int(C.int(klen)))
} }
// Returns a reference of value. // Returns a reference of value.
// you must be careful that it will be changed after next iterate. // you must be careful that it will be changed after next iterate.
func (it *Iterator) RawValue() []byte { func (it *Iterator) RawValue() []byte {
var vlen C.size_t return it.it.Value()
vdata := C.leveldb_iter_value(it.it, &vlen)
if vdata == nil {
return nil
}
return slice(unsafe.Pointer(vdata), int(C.int(vlen)))
} }
// Copy key to b, if b len is small or nil, returns a new one. // Copy key to b, if b len is small or nil, returns a new one.
@ -126,33 +105,33 @@ func (it *Iterator) BufValue(b []byte) []byte {
func (it *Iterator) Close() { func (it *Iterator) Close() {
if it.it != nil { if it.it != nil {
C.leveldb_iter_destroy(it.it) it.it.Close()
it.it = nil it.it = nil
} }
} }
func (it *Iterator) Valid() bool { func (it *Iterator) Valid() bool {
return ucharToBool(it.isValid) return it.it.Valid()
} }
func (it *Iterator) Next() { func (it *Iterator) Next() {
it.isValid = C.leveldb_iter_next_ext(it.it) it.it.Next()
} }
func (it *Iterator) Prev() { func (it *Iterator) Prev() {
it.isValid = C.leveldb_iter_prev_ext(it.it) it.it.Prev()
} }
func (it *Iterator) SeekToFirst() { func (it *Iterator) SeekToFirst() {
it.isValid = C.leveldb_iter_seek_to_first_ext(it.it) it.it.First()
} }
func (it *Iterator) SeekToLast() { func (it *Iterator) SeekToLast() {
it.isValid = C.leveldb_iter_seek_to_last_ext(it.it) it.it.Last()
} }
func (it *Iterator) Seek(key []byte) { func (it *Iterator) Seek(key []byte) {
it.isValid = C.leveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) it.it.Seek(key)
} }
// Finds by key, if not found, nil returns. // Finds by key, if not found, nil returns.

32
store/leveldb.go Normal file
View File

@ -0,0 +1,32 @@
// +build leveldb
package store
import (
"github.com/siddontang/copier"
"github.com/siddontang/ledisdb/store/driver"
"github.com/siddontang/ledisdb/store/leveldb"
)
const LevelDBName = "leveldb"
type LevelDBStore struct {
}
func (s LevelDBStore) Open(cfg *Config) (driver.IDB, error) {
c := &leveldb.Config{}
copier.Copy(c, cfg)
return leveldb.Open(c)
}
func (s LevelDBStore) Repair(cfg *Config) error {
c := &leveldb.Config{}
copier.Copy(c, cfg)
return leveldb.Repair(c)
}
func init() {
Register(LevelDBName, LevelDBStore{})
}

View File

@ -1,3 +1,5 @@
// +build leveldb
package leveldb package leveldb
// #cgo LDFLAGS: -lleveldb // #cgo LDFLAGS: -lleveldb
@ -13,8 +15,9 @@ type WriteBatch struct {
wbatch *C.leveldb_writebatch_t wbatch *C.leveldb_writebatch_t
} }
func (w *WriteBatch) Close() { func (w *WriteBatch) Close() error {
C.leveldb_writebatch_destroy(w.wbatch) C.leveldb_writebatch_destroy(w.wbatch)
return nil
} }
func (w *WriteBatch) Put(key, value []byte) { func (w *WriteBatch) Put(key, value []byte) {
@ -41,12 +44,9 @@ func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts) return w.commit(w.db.writeOpts)
} }
func (w *WriteBatch) SyncCommit() error { func (w *WriteBatch) Rollback() error {
return w.commit(w.db.syncWriteOpts)
}
func (w *WriteBatch) Rollback() {
C.leveldb_writebatch_clear(w.wbatch) C.leveldb_writebatch_clear(w.wbatch)
return nil
} }
func (w *WriteBatch) commit(wb *WriteOptions) error { func (w *WriteBatch) commit(wb *WriteOptions) error {

View File

@ -1,3 +1,5 @@
// +build leveldb
package leveldb package leveldb
// #cgo LDFLAGS: -lleveldb // #cgo LDFLAGS: -lleveldb

View File

@ -1,3 +1,5 @@
// +build leveldb
// Package leveldb is a wrapper for c++ leveldb // Package leveldb is a wrapper for c++ leveldb
package leveldb package leveldb
@ -9,7 +11,7 @@ package leveldb
import "C" import "C"
import ( import (
"encoding/json" "github.com/siddontang/ledisdb/store/driver"
"os" "os"
"unsafe" "unsafe"
) )
@ -17,42 +19,13 @@ import (
const defaultFilterBits int = 10 const defaultFilterBits int = 10
type Config struct { type Config struct {
Path string `json:"path"` Path string
Compression bool `json:"compression"` Compression bool
BlockSize int `json:"block_size"` BlockSize int
WriteBufferSize int `json:"write_buffer_size"` WriteBufferSize int
CacheSize int `json:"cache_size"` CacheSize int
MaxOpenFiles int `json:"max_open_files"` MaxOpenFiles int
}
type DB struct {
cfg *Config
db *C.leveldb_t
opts *Options
//for default read and write options
readOpts *ReadOptions
writeOpts *WriteOptions
iteratorOpts *ReadOptions
syncWriteOpts *WriteOptions
cache *Cache
filter *FilterPolicy
}
func OpenWithJsonConfig(configJson json.RawMessage) (*DB, error) {
cfg := new(Config)
err := json.Unmarshal(configJson, cfg)
if err != nil {
return nil, err
}
return Open(cfg)
} }
func Open(cfg *Config) (*DB, error) { func Open(cfg *Config) (*DB, error) {
@ -70,21 +43,6 @@ func Open(cfg *Config) (*DB, error) {
return db, nil return db, nil
} }
func (db *DB) open() error {
db.initOptions(db.cfg)
var errStr *C.char
ldbname := C.CString(db.cfg.Path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
db.db = nil
return saveError(errStr)
}
return nil
}
func Repair(cfg *Config) error { func Repair(cfg *Config) error {
db := new(DB) db := new(DB)
db.cfg = cfg db.cfg = cfg
@ -108,6 +66,38 @@ func Repair(cfg *Config) error {
return nil return nil
} }
type DB struct {
cfg *Config
db *C.leveldb_t
opts *Options
//for default read and write options
readOpts *ReadOptions
writeOpts *WriteOptions
iteratorOpts *ReadOptions
cache *Cache
filter *FilterPolicy
}
func (db *DB) open() error {
db.initOptions(db.cfg)
var errStr *C.char
ldbname := C.CString(db.cfg.Path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
db.db = nil
return saveError(errStr)
}
return nil
}
func (db *DB) initOptions(cfg *Config) { func (db *DB) initOptions(cfg *Config) {
opts := NewOptions() opts := NewOptions()
@ -126,6 +116,8 @@ func (db *DB) initOptions(cfg *Config) {
if !cfg.Compression { if !cfg.Compression {
opts.SetCompression(NoCompression) opts.SetCompression(NoCompression)
} else {
opts.SetCompression(SnappyCompression)
} }
if cfg.BlockSize <= 0 { if cfg.BlockSize <= 0 {
@ -153,9 +145,6 @@ func (db *DB) initOptions(cfg *Config) {
db.iteratorOpts = NewReadOptions() db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false) db.iteratorOpts.SetFillCache(false)
db.syncWriteOpts = NewWriteOptions()
db.syncWriteOpts.SetSync(true)
} }
func (db *DB) Close() error { func (db *DB) Close() error {
@ -177,80 +166,23 @@ func (db *DB) Close() error {
db.readOpts.Close() db.readOpts.Close()
db.writeOpts.Close() db.writeOpts.Close()
db.iteratorOpts.Close() db.iteratorOpts.Close()
db.syncWriteOpts.Close()
return nil return nil
} }
func (db *DB) Destroy() error {
path := db.cfg.Path
db.Close()
opts := NewOptions()
defer opts.Close()
var errStr *C.char
ldbname := C.CString(path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
C.leveldb_destroy_db(opts.Opt, ldbname, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
func (db *DB) Clear() error {
bc := db.NewWriteBatch()
defer bc.Close()
var err error
it := db.NewIterator()
it.SeekToFirst()
num := 0
for ; it.Valid(); it.Next() {
bc.Delete(it.RawKey())
num++
if num == 1000 {
num = 0
if err = bc.Commit(); err != nil {
return err
}
}
}
err = bc.Commit()
return err
}
func (db *DB) Put(key, value []byte) error { func (db *DB) Put(key, value []byte) error {
return db.put(db.writeOpts, key, value) return db.put(db.writeOpts, key, value)
} }
func (db *DB) SyncPut(key, value []byte) error {
return db.put(db.syncWriteOpts, key, value)
}
func (db *DB) Get(key []byte) ([]byte, error) { func (db *DB) Get(key []byte) ([]byte, error) {
return db.get(nil, db.readOpts, key) return db.get(db.readOpts, key)
}
func (db *DB) BufGet(r []byte, key []byte) ([]byte, error) {
return db.get(r, db.readOpts, key)
} }
func (db *DB) Delete(key []byte) error { func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key) return db.delete(db.writeOpts, key)
} }
func (db *DB) SyncDelete(key []byte) error { func (db *DB) NewWriteBatch() driver.IWriteBatch {
return db.delete(db.syncWriteOpts, key)
}
func (db *DB) NewWriteBatch() *WriteBatch {
wb := &WriteBatch{ wb := &WriteBatch{
db: db, db: db,
wbatch: C.leveldb_writebatch_create(), wbatch: C.leveldb_writebatch_create(),
@ -258,22 +190,7 @@ func (db *DB) NewWriteBatch() *WriteBatch {
return wb return wb
} }
func (db *DB) NewSnapshot() *Snapshot { func (db *DB) NewIterator() driver.IIterator {
s := &Snapshot{
db: db,
snap: C.leveldb_create_snapshot(db.db),
readOpts: NewReadOptions(),
iteratorOpts: NewReadOptions(),
}
s.readOpts.SetSnapshot(s)
s.iteratorOpts.SetSnapshot(s)
s.iteratorOpts.SetFillCache(false)
return s
}
func (db *DB) NewIterator() *Iterator {
it := new(Iterator) it := new(Iterator)
it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt) it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt)
@ -281,28 +198,6 @@ func (db *DB) NewIterator() *Iterator {
return it return it
} }
func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}
func (db *DB) put(wo *WriteOptions, key, value []byte) error { func (db *DB) put(wo *WriteOptions, key, value []byte) error {
var errStr *C.char var errStr *C.char
var k, v *C.char var k, v *C.char
@ -324,7 +219,7 @@ func (db *DB) put(wo *WriteOptions, key, value []byte) error {
return nil return nil
} }
func (db *DB) get(r []byte, ro *ReadOptions, key []byte) ([]byte, error) { func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) {
var errStr *C.char var errStr *C.char
var vallen C.size_t var vallen C.size_t
var k *C.char var k *C.char
@ -347,13 +242,7 @@ func (db *DB) get(r []byte, ro *ReadOptions, key []byte) ([]byte, error) {
defer C.leveldb_get_free_ext(unsafe.Pointer(c)) defer C.leveldb_get_free_ext(unsafe.Pointer(c))
if r == nil { return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil
r = []byte{}
}
r = r[0:0]
b := slice(unsafe.Pointer(value), int(C.int(vallen)))
return append(r, b...), nil
} }
func (db *DB) delete(wo *WriteOptions, key []byte) error { func (db *DB) delete(wo *WriteOptions, key []byte) error {

View File

@ -1,3 +1,5 @@
// +build leveldb
package leveldb package leveldb
// #cgo LDFLAGS: -lleveldb // #cgo LDFLAGS: -lleveldb

70
store/leveldb/iterator.go Normal file
View File

@ -0,0 +1,70 @@
// +build leveldb
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include <stdlib.h>
// #include "leveldb/c.h"
// #include "leveldb_ext.h"
import "C"
import (
"unsafe"
)
type Iterator struct {
it *C.leveldb_iterator_t
isValid C.uchar
}
func (it *Iterator) Key() []byte {
var klen C.size_t
kdata := C.leveldb_iter_key(it.it, &klen)
if kdata == nil {
return nil
}
return slice(unsafe.Pointer(kdata), int(C.int(klen)))
}
func (it *Iterator) Value() []byte {
var vlen C.size_t
vdata := C.leveldb_iter_value(it.it, &vlen)
if vdata == nil {
return nil
}
return slice(unsafe.Pointer(vdata), int(C.int(vlen)))
}
func (it *Iterator) Close() error {
if it.it != nil {
C.leveldb_iter_destroy(it.it)
it.it = nil
}
return nil
}
func (it *Iterator) Valid() bool {
return ucharToBool(it.isValid)
}
func (it *Iterator) Next() {
it.isValid = C.leveldb_iter_next_ext(it.it)
}
func (it *Iterator) Prev() {
it.isValid = C.leveldb_iter_prev_ext(it.it)
}
func (it *Iterator) First() {
it.isValid = C.leveldb_iter_seek_to_first_ext(it.it)
}
func (it *Iterator) Last() {
it.isValid = C.leveldb_iter_seek_to_last_ext(it.it)
}
func (it *Iterator) Seek(key []byte) {
it.isValid = C.leveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}

View File

@ -1,3 +1,5 @@
// +build leveldb
#include "leveldb_ext.h" #include "leveldb_ext.h"
#include <stdlib.h> #include <stdlib.h>

View File

@ -1,3 +1,5 @@
// +build leveldb
#ifndef LEVELDB_EXT_H #ifndef LEVELDB_EXT_H
#define LEVELDB_EXT_H #define LEVELDB_EXT_H

View File

@ -1,3 +1,5 @@
// +build leveldb
package leveldb package leveldb
// #cgo LDFLAGS: -lleveldb // #cgo LDFLAGS: -lleveldb
@ -103,14 +105,6 @@ func (ro *ReadOptions) SetFillCache(b bool) {
C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b)) C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b))
} }
func (ro *ReadOptions) SetSnapshot(snap *Snapshot) {
var s *C.leveldb_snapshot_t
if snap != nil {
s = snap.snap
}
C.leveldb_readoptions_set_snapshot(ro.Opt, s)
}
func (wo *WriteOptions) Close() { func (wo *WriteOptions) Close() {
C.leveldb_writeoptions_destroy(wo.Opt) C.leveldb_writeoptions_destroy(wo.Opt)
} }

View File

@ -1,3 +1,5 @@
// +build leveldb
package leveldb package leveldb
// #include "leveldb/c.h" // #include "leveldb/c.h"

30
store/mdb.go Normal file
View File

@ -0,0 +1,30 @@
package store
import (
"github.com/siddontang/copier"
"github.com/siddontang/ledisdb/store/driver"
"github.com/siddontang/ledisdb/store/mdb"
)
const LMDBName = "lmdb"
type LMDBStore struct {
}
func (s LMDBStore) Open(cfg *Config) (driver.IDB, error) {
c := &mdb.Config{}
copier.Copy(c, cfg)
return mdb.Open(c)
}
func (s LMDBStore) Repair(cfg *Config) error {
c := &mdb.Config{}
copier.Copy(c, cfg)
return mdb.Repair(c)
}
func init() {
Register(LMDBName, LMDBStore{})
}

32
store/mdb/batch.go Normal file
View File

@ -0,0 +1,32 @@
package mdb
type Write struct {
Key []byte
Value []byte
}
type WriteBatch struct {
db MDB
wb []Write
}
func (w WriteBatch) Close() error {
return nil
}
func (w WriteBatch) Put(key, value []byte) {
w.wb = append(w.wb, Write{key, value})
}
func (w WriteBatch) Delete(key []byte) {
w.wb = append(w.wb, Write{key, nil})
}
func (w WriteBatch) Commit() error {
return w.db.BatchPut(w.wb)
}
func (w WriteBatch) Rollback() error {
w.wb = []Write{}
return nil
}

View File

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2013-2014 Errplane Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

249
store/mdb/mdb.go Normal file
View File

@ -0,0 +1,249 @@
package mdb
import (
mdb "github.com/influxdb/gomdb"
"github.com/siddontang/ledisdb/store/driver"
"os"
)
type Config struct {
Path string
MapSize int
}
type MDB struct {
env *mdb.Env
db mdb.DBI
path string
}
func Open(c *Config) (MDB, error) {
path := c.Path
if c.MapSize == 0 {
c.MapSize = 1 * 1024 * 1024 * 1024
}
env, err := mdb.NewEnv()
if err != nil {
return MDB{}, err
}
// TODO: max dbs should be configurable
if err := env.SetMaxDBs(1); err != nil {
return MDB{}, err
}
if err := env.SetMapSize(uint64(c.MapSize)); err != nil {
return MDB{}, err
}
if _, err := os.Stat(path); err != nil {
err = os.MkdirAll(path, 0755)
if err != nil {
return MDB{}, err
}
}
err = env.Open(path, mdb.WRITEMAP|mdb.MAPASYNC|mdb.CREATE, 0755)
if err != nil {
return MDB{}, err
}
tx, err := env.BeginTxn(nil, 0)
if err != nil {
return MDB{}, err
}
dbi, err := tx.DBIOpen(nil, mdb.CREATE)
if err != nil {
return MDB{}, err
}
if err := tx.Commit(); err != nil {
return MDB{}, err
}
db := MDB{
env: env,
db: dbi,
path: path,
}
return db, nil
}
func Repair(c *Config) error {
println("llmd not supports repair")
return nil
}
func (db MDB) Put(key, value []byte) error {
return db.BatchPut([]Write{{key, value}})
}
func (db MDB) BatchPut(writes []Write) error {
itr := db.iterator(false)
for _, w := range writes {
if w.Value == nil {
itr.key, itr.value, itr.err = itr.c.Get(w.Key, mdb.SET)
if itr.err == nil {
itr.err = itr.c.Del(0)
}
} else {
itr.err = itr.c.Put(w.Key, w.Value, 0)
}
if itr.err != nil && itr.err != mdb.NotFound {
break
}
}
itr.setState()
return itr.Close()
}
func (db MDB) Get(key []byte) ([]byte, error) {
tx, err := db.env.BeginTxn(nil, mdb.RDONLY)
if err != nil {
return nil, err
}
defer tx.Commit()
v, err := tx.Get(db.db, key)
if err == mdb.NotFound {
return nil, nil
}
return v, err
}
func (db MDB) Delete(key []byte) error {
itr := db.iterator(false)
itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET)
if itr.err == nil {
itr.err = itr.c.Del(0)
}
itr.setState()
return itr.Close()
}
type MDBIterator struct {
key []byte
value []byte
c *mdb.Cursor
tx *mdb.Txn
valid bool
err error
}
func (itr *MDBIterator) Key() []byte {
return itr.key
}
func (itr *MDBIterator) Value() []byte {
return itr.value
}
func (itr *MDBIterator) Valid() bool {
return itr.valid
}
func (itr *MDBIterator) Error() error {
return itr.err
}
func (itr *MDBIterator) getCurrent() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.GET_CURRENT)
itr.setState()
}
func (itr *MDBIterator) Seek(key []byte) {
itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET_RANGE)
itr.setState()
}
func (itr *MDBIterator) Next() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.NEXT)
itr.setState()
}
func (itr *MDBIterator) Prev() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.PREV)
itr.setState()
}
func (itr *MDBIterator) First() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.FIRST)
itr.setState()
}
func (itr *MDBIterator) Last() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.LAST)
itr.setState()
}
func (itr *MDBIterator) setState() {
if itr.err != nil {
if itr.err == mdb.NotFound {
itr.err = nil
}
itr.valid = false
}
}
func (itr *MDBIterator) Close() error {
if err := itr.c.Close(); err != nil {
itr.tx.Abort()
return err
}
if itr.err != nil {
itr.tx.Abort()
return itr.err
}
return itr.tx.Commit()
}
func (_ MDB) Name() string {
return "lmdb"
}
func (db MDB) Path() string {
return db.path
}
func (db MDB) Compact() {
}
func (db MDB) iterator(rdonly bool) *MDBIterator {
flags := uint(0)
if rdonly {
flags = mdb.RDONLY
}
tx, err := db.env.BeginTxn(nil, flags)
if err != nil {
return &MDBIterator{nil, nil, nil, nil, false, err}
}
c, err := tx.CursorOpen(db.db)
if err != nil {
tx.Abort()
return &MDBIterator{nil, nil, nil, nil, false, err}
}
return &MDBIterator{nil, nil, c, tx, true, nil}
}
func (db MDB) Close() error {
db.env.DBIClose(db.db)
if err := db.env.Close(); err != nil {
panic(err)
}
return nil
}
func (db MDB) NewIterator() driver.IIterator {
return db.iterator(true)
}
func (db MDB) NewWriteBatch() driver.IWriteBatch {
return WriteBatch{db, []Write{}}
}

61
store/store.go Normal file
View File

@ -0,0 +1,61 @@
package store
import (
"fmt"
"github.com/siddontang/ledisdb/store/driver"
"os"
)
const DefaultStoreName = "lmdb"
type Store interface {
Open(cfg *Config) (driver.IDB, error)
Repair(cfg *Config) error
}
var dbs = map[string]Store{}
func Register(name string, store Store) {
if _, ok := dbs[name]; ok {
panic(fmt.Errorf("db %s is registered", name))
}
dbs[name] = store
}
func Open(cfg *Config) (*DB, error) {
if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil {
return nil, err
}
if len(cfg.Name) == 0 {
cfg.Name = DefaultStoreName
}
s, ok := dbs[cfg.Name]
if !ok {
return nil, fmt.Errorf("db %s is not registered", cfg.Name)
}
idb, err := s.Open(cfg)
if err != nil {
return nil, err
}
db := &DB{idb}
return db, nil
}
func Repair(cfg *Config) error {
if len(cfg.Name) == 0 {
cfg.Name = DefaultStoreName
}
s, ok := dbs[cfg.Name]
if !ok {
return fmt.Errorf("db %s is not registered", cfg.Name)
}
return s.Repair(cfg)
}

View File

@ -1,4 +1,4 @@
package leveldb package store
import ( import (
"bytes" "bytes"
@ -8,23 +8,17 @@ import (
"testing" "testing"
) )
var testConfigJson = []byte(`
{
"path" : "./testdb",
"compression":true,
"block_size" : 32768,
"write_buffer_size" : 2097152,
"cache_size" : 20971520
}
`)
var testOnce sync.Once var testOnce sync.Once
var testDB *DB var testDB *DB
func getTestDB() *DB { func getTestDB() *DB {
f := func() { f := func() {
var err error var err error
testDB, err = OpenWithJsonConfig(testConfigJson)
cfg := new(Config)
cfg.Path = "/tmp/testdb"
testDB, err = Open(cfg)
if err != nil { if err != nil {
println(err.Error()) println(err.Error())
panic(err) panic(err)
@ -131,7 +125,11 @@ func checkIterator(it *RangeLimitIterator, cv ...int) error {
func TestIterator(t *testing.T) { func TestIterator(t *testing.T) {
db := getTestDB() db := getTestDB()
db.Clear() i := db.NewIterator()
for i.SeekToFirst(); i.Valid(); i.Next() {
db.Delete(i.Key())
}
i.Close()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
key := []byte(fmt.Sprintf("key_%d", i)) key := []byte(fmt.Sprintf("key_%d", i))
@ -196,51 +194,6 @@ func TestIterator(t *testing.T) {
} }
} }
func TestSnapshot(t *testing.T) {
db := getTestDB()
key := []byte("key")
value := []byte("hello world")
db.Put(key, value)
s := db.NewSnapshot()
defer s.Close()
db.Put(key, []byte("hello world2"))
if v, err := s.Get(key); err != nil {
t.Fatal(err)
} else if string(v) != string(value) {
t.Fatal(string(v))
}
}
func TestDestroy(t *testing.T) {
db := getTestDB()
db.Put([]byte("a"), []byte("1"))
if err := db.Clear(); err != nil {
t.Fatal(err)
}
if _, err := os.Stat(db.cfg.Path); err != nil {
t.Fatal("must exist ", err.Error())
}
if v, err := db.Get([]byte("a")); err != nil {
t.Fatal(err)
} else if string(v) == "1" {
t.Fatal(string(v))
}
db.Destroy()
if _, err := os.Stat(db.cfg.Path); !os.IsNotExist(err) {
t.Fatal("must not exist")
}
}
func TestCloseMore(t *testing.T) { func TestCloseMore(t *testing.T) {
cfg := new(Config) cfg := new(Config)
cfg.Path = "/tmp/testdb1234" cfg.Path = "/tmp/testdb1234"
@ -256,4 +209,6 @@ func TestCloseMore(t *testing.T) {
db.Close() db.Close()
} }
os.RemoveAll(cfg.Path)
} }

9
store/writebatch.go Normal file
View File

@ -0,0 +1,9 @@
package store
import (
"github.com/siddontang/ledisdb/store/driver"
)
type WriteBatch struct {
driver.IWriteBatch
}