From 7a237661daa3cb6ac514b9c91c8ab02bf3a63d8c Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 25 Jul 2014 17:58:00 +0800 Subject: [PATCH] add support for multi storage --- Makefile | 7 +- bootstrap.sh | 5 +- cmd/ledis-repair/main.go | 4 +- dev.sh | 6 +- ledis/config.go | 6 +- ledis/doc.go | 3 +- ledis/dump.go | 14 +- ledis/dump_test.go | 4 +- ledis/ledis.go | 10 +- ledis/ledis_db.go | 4 +- ledis/replication_test.go | 4 +- ledis/t_bit.go | 12 +- ledis/t_hash.go | 14 +- ledis/t_kv.go | 6 +- ledis/t_list.go | 14 +- ledis/t_ttl.go | 4 +- ledis/t_zset.go | 26 +- ledis/tx.go | 4 +- leveldb/snapshot.go | 60 ----- server/cmd_replication_test.go | 4 +- store/config.go | 14 + store/db.go | 75 ++++++ store/driver/driver.go | 39 +++ store/goleveldb.go | 30 +++ store/goleveldb/batch.go | 31 +++ store/goleveldb/db.go | 137 ++++++++++ store/goleveldb/iterator.go | 49 ++++ {leveldb => store}/iterator.go | 65 ++--- store/leveldb.go | 32 +++ {leveldb => store/leveldb}/batch.go | 12 +- {leveldb => store/leveldb}/cache.go | 2 + {leveldb => store/leveldb}/db.go | 207 ++++----------- {leveldb => store/leveldb}/filterpolicy.go | 2 + store/leveldb/iterator.go | 70 +++++ {leveldb => store/leveldb}/leveldb_ext.cc | 2 + {leveldb => store/leveldb}/leveldb_ext.h | 2 + {leveldb => store/leveldb}/levigo-license | 0 {leveldb => store/leveldb}/options.go | 10 +- {leveldb => store/leveldb}/util.go | 2 + store/mdb.go | 30 +++ store/mdb/batch.go | 32 +++ store/mdb/influxdb_license | 20 ++ store/mdb/mdb.go | 249 ++++++++++++++++++ store/store.go | 61 +++++ .../leveldb_test.go => store/store_test.go | 71 +---- store/writebatch.go | 9 + 46 files changed, 1054 insertions(+), 410 deletions(-) delete mode 100644 leveldb/snapshot.go create mode 100644 store/config.go create mode 100644 store/db.go create mode 100644 store/driver/driver.go create mode 100644 store/goleveldb.go create mode 100644 store/goleveldb/batch.go create mode 100644 store/goleveldb/db.go create mode 100644 store/goleveldb/iterator.go rename {leveldb => store}/iterator.go (80%) create mode 100644 store/leveldb.go rename {leveldb => store/leveldb}/batch.go (87%) rename {leveldb => store/leveldb}/cache.go (94%) rename {leveldb => store/leveldb}/db.go (56%) rename {leveldb => store/leveldb}/filterpolicy.go (95%) create mode 100644 store/leveldb/iterator.go rename {leveldb => store/leveldb}/leveldb_ext.cc (98%) rename {leveldb => store/leveldb}/leveldb_ext.h (97%) rename {leveldb => store/leveldb}/levigo-license (100%) rename {leveldb => store/leveldb}/options.go (93%) rename {leveldb => store/leveldb}/util.go (97%) create mode 100644 store/mdb.go create mode 100644 store/mdb/batch.go create mode 100644 store/mdb/influxdb_license create mode 100644 store/mdb/mdb.go create mode 100644 store/store.go rename leveldb/leveldb_test.go => store/store_test.go (77%) create mode 100644 store/writebatch.go diff --git a/Makefile b/Makefile index 1dfe2f3..e3f7fc5 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,12 @@ +GO_BUILD_FLAG += leveldb + all: build build: - go install ./... + go install -tags $(GO_BUILD_FLAG) ./... clean: go clean -i ./... test: - go test ./... - go test -race ./... \ No newline at end of file + go test -tags $(GO_BUILD_FLAG) ./... 
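Note on the build tags: the Makefile now threads GO_BUILD_FLAG (defaulting to leveldb) into go install and go test, so the cgo-backed leveldb driver is compiled only when that tag is present, while the pure-Go backends stay in the default build. A minimal sketch of the tag mechanism this relies on; the file below is a made-up demo, not part of the patch:

// +build leveldb

// Hypothetical demo file: it is compiled only when `go build -tags leveldb`
// (or the Makefile's GO_BUILD_FLAG) is used, which is the same mechanism
// store/leveldb.go and store/leveldb/*.go use to stay out of default builds.
package main

import "fmt"

func main() {
	fmt.Println("built with the leveldb tag")
}

Running `go run -tags leveldb demo.go` prints the message; without the tag the file is simply excluded from the build, which is how the driver that needs the C leveldb library is kept optional.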
diff --git a/bootstrap.sh b/bootstrap.sh index 0ba6b65..40b737b 100644 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -4,4 +4,7 @@ go get github.com/siddontang/go-log/log go get github.com/siddontang/go-snappy/snappy -go get github.com/siddontang/copier \ No newline at end of file +go get github.com/siddontang/copier + +go get github.com/syndtr/goleveldb/leveldb +go get github.com/szferi/gomdb \ No newline at end of file diff --git a/cmd/ledis-repair/main.go b/cmd/ledis-repair/main.go index a5b9d73..5a0aff4 100644 --- a/cmd/ledis-repair/main.go +++ b/cmd/ledis-repair/main.go @@ -4,7 +4,7 @@ import ( "encoding/json" "flag" "github.com/siddontang/ledisdb/ledis" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "io/ioutil" ) @@ -35,7 +35,7 @@ func main() { return } - if err = leveldb.Repair(cfg.NewDBConfig()); err != nil { + if err = store.Repair(cfg.NewDBConfig()); err != nil { println("repair error: ", err.Error()) } } diff --git a/dev.sh b/dev.sh index c1c59f7..baec6e1 100644 --- a/dev.sh +++ b/dev.sh @@ -8,12 +8,10 @@ if [[ "$VTTOP" == "${VTTOP/\/src\/github.com\/siddontang\/ledisdb/}" ]]; then exit 1 fi - #default snappy and leveldb install path #you may change yourself - -SNAPPY_DIR=/usr/local/snappy -LEVELDB_DIR=/usr/local/leveldb +export SNAPPY_DIR=/usr/local/snappy +export LEVELDB_DIR=/usr/local/leveldb function add_path() { diff --git a/ledis/config.go b/ledis/config.go index c3a5b84..5c2c1c1 100644 --- a/ledis/config.go +++ b/ledis/config.go @@ -2,7 +2,7 @@ package ledis import ( "github.com/siddontang/copier" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "path" ) @@ -24,10 +24,10 @@ type Config struct { } `json:"binlog"` } -func (cfg *Config) NewDBConfig() *leveldb.Config { +func (cfg *Config) NewDBConfig() *store.Config { dbPath := path.Join(cfg.DataDir, "data") - dbCfg := new(leveldb.Config) + dbCfg := new(store.Config) copier.Copy(dbCfg, &cfg.DB) dbCfg.Path = dbPath return dbCfg diff --git a/ledis/doc.go b/ledis/doc.go index f5c02f2..a31f76e 100644 --- a/ledis/doc.go +++ b/ledis/doc.go @@ -1,4 +1,5 @@ -// Package ledis is a high performance embedded NoSQL based on leveldb. +// Package ledis is a high performance embedded NoSQL. +// // Ledis supports various advanced data structure like kv, list, hash and zset like redis. // // Other features include binlog replication, data with a limited time-to-live. 
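The config plumbing above (cmd/ledis-repair/main.go and ledis/config.go) now hands the storage layer a store.Config instead of a leveldb.Config, and NewDBConfig fills it by copying same-named exported fields with github.com/siddontang/copier before setting the path. A small self-contained sketch of that copy-by-name pattern; the two struct types and the values here are invented for illustration:

package main

import (
	"fmt"

	"github.com/siddontang/copier"
)

// appDBConfig stands in for the DB section of the ledis Config.
type appDBConfig struct {
	Compression     bool
	BlockSize       int
	WriteBufferSize int
	CacheSize       int
}

// storeConfig stands in for store.Config: matching exported fields are
// filled by copier.Copy, and the field it cannot know about (Path here)
// is set by hand afterwards, just as NewDBConfig sets the data path.
type storeConfig struct {
	Name            string
	Path            string
	Compression     bool
	BlockSize       int
	WriteBufferSize int
	CacheSize       int
}

func main() {
	src := appDBConfig{Compression: true, BlockSize: 32768, WriteBufferSize: 2 << 20, CacheSize: 20 << 20}

	dst := new(storeConfig)
	copier.Copy(dst, &src) // copies fields whose names match
	dst.Path = "/tmp/ledis/data"

	fmt.Printf("%+v\n", dst)
}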
diff --git a/ledis/dump.go b/ledis/dump.go index 95232de..14d7ff7 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -5,7 +5,6 @@ import ( "bytes" "encoding/binary" "github.com/siddontang/go-snappy/snappy" - "github.com/siddontang/ledisdb/leveldb" "io" "os" ) @@ -57,16 +56,13 @@ func (l *Ledis) DumpFile(path string) error { } func (l *Ledis) Dump(w io.Writer) error { - var sp *leveldb.Snapshot var m *MasterInfo = new(MasterInfo) - if l.binlog == nil { - sp = l.ldb.NewSnapshot() - } else { - l.Lock() - sp = l.ldb.NewSnapshot() + l.Lock() + defer l.Unlock() + + if l.binlog != nil { m.LogFileIndex = l.binlog.LogFileIndex() m.LogPos = l.binlog.LogFilePos() - l.Unlock() } var err error @@ -76,7 +72,7 @@ func (l *Ledis) Dump(w io.Writer) error { return err } - it := sp.NewIterator() + it := l.ldb.NewIterator() it.SeekToFirst() compressBuf := make([]byte, 4096) diff --git a/ledis/dump_test.go b/ledis/dump_test.go index 8ca80b8..a22d72a 100644 --- a/ledis/dump_test.go +++ b/ledis/dump_test.go @@ -2,7 +2,7 @@ package ledis import ( "bytes" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "os" "testing" ) @@ -59,7 +59,7 @@ func TestDump(t *testing.T) { t.Fatal(err) } - it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) + it := master.ldb.RangeLimitIterator(nil, nil, store.RangeClose, 0, -1) for ; it.Valid(); it.Next() { key := it.Key() value := it.Value() diff --git a/ledis/ledis.go b/ledis/ledis.go index 9d5b9ac..8c3f175 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -4,7 +4,7 @@ import ( "encoding/json" "fmt" "github.com/siddontang/go-log/log" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "sync" "time" ) @@ -12,7 +12,7 @@ import ( type DB struct { l *Ledis - db *leveldb.DB + db *store.DB index uint8 @@ -28,7 +28,7 @@ type Ledis struct { cfg *Config - ldb *leveldb.DB + ldb *store.DB dbs [MaxDBNumber]*DB binlog *BinLog @@ -52,7 +52,7 @@ func Open(cfg *Config) (*Ledis, error) { return nil, fmt.Errorf("must set correct data_dir") } - ldb, err := leveldb.Open(cfg.NewDBConfig()) + ldb, err := store.Open(cfg.NewDBConfig()) if err != nil { return nil, err } @@ -132,7 +132,7 @@ func (l *Ledis) FlushAll() error { } // very dangerous to use -func (l *Ledis) DataDB() *leveldb.DB { +func (l *Ledis) DataDB() *store.DB { return l.ldb } diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index 83819ae..7d58054 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -1,7 +1,7 @@ package ledis import ( - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" ) func (db *DB) FlushAll() (drop int64, err error) { @@ -36,7 +36,7 @@ func (db *DB) newEliminator() *elimination { } func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) { - it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen) + it := db.db.RangeIterator(minKey, maxKey, store.RangeROpen) for ; it.Valid(); it.Next() { t.Delete(it.RawKey()) drop++ diff --git a/ledis/replication_test.go b/ledis/replication_test.go index b5a939b..ebc5c41 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -3,14 +3,14 @@ package ledis import ( "bytes" "fmt" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "os" "path" "testing" ) func checkLedisEqual(master *Ledis, slave *Ledis) error { - it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) + it := master.ldb.RangeLimitIterator(nil, nil, store.RangeClose, 0, -1) for ; it.Valid(); it.Next() { key := 
it.Key() value := it.Value() diff --git a/ledis/t_bit.go b/ledis/t_bit.go index b93355d..37b788d 100644 --- a/ledis/t_bit.go +++ b/ledis/t_bit.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "sort" "time" ) @@ -253,7 +253,7 @@ func (db *DB) bDelete(t *tx, key []byte) (drop int64) { minKey := db.bEncodeBinKey(key, minSeq) maxKey := db.bEncodeBinKey(key, maxSeq) - it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) + it := db.db.RangeIterator(minKey, maxKey, store.RangeClose) for ; it.Valid(); it.Next() { t.Delete(it.RawKey()) drop++ @@ -280,10 +280,10 @@ func (db *DB) bAllocateSegment(key []byte, seq uint32) ([]byte, []byte, error) { return bk, segment, err } -func (db *DB) bIterator(key []byte) *leveldb.RangeLimitIterator { +func (db *DB) bIterator(key []byte) *store.RangeLimitIterator { sk := db.bEncodeBinKey(key, minSeq) ek := db.bEncodeBinKey(key, maxSeq) - return db.db.RangeIterator(sk, ek, leveldb.RangeClose) + return db.db.RangeIterator(sk, ek, store.RangeClose) } func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) { @@ -446,7 +446,7 @@ func (db *DB) BGet(key []byte) (data []byte, err error) { minKey := db.bEncodeBinKey(key, minSeq) maxKey := db.bEncodeBinKey(key, tailSeq) - it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeClose) + it := db.db.RangeIterator(minKey, maxKey, store.RangeClose) var seq, s, e uint32 for ; it.Valid(); it.Next() { @@ -662,7 +662,7 @@ func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) skey := db.bEncodeBinKey(key, sseq) ekey := db.bEncodeBinKey(key, eseq) - it := db.db.RangeIterator(skey, ekey, leveldb.RangeOpen) + it := db.db.RangeIterator(skey, ekey, store.RangeOpen) for ; it.Valid(); it.Next() { segment = it.RawValue() for _, bt := range segment { diff --git a/ledis/t_hash.go b/ledis/t_hash.go index 5f6be64..63c38da 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "time" ) @@ -132,7 +132,7 @@ func (db *DB) hDelete(t *tx, key []byte) int64 { stop := db.hEncodeStopKey(key) var num int64 = 0 - it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) num++ @@ -354,7 +354,7 @@ func (db *DB) HGetAll(key []byte) ([]FVPair, error) { v := make([]FVPair, 0, 16) - it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, f, err := db.hDecodeHashKey(it.Key()) if err != nil { @@ -379,7 +379,7 @@ func (db *DB) HKeys(key []byte) ([][]byte, error) { v := make([][]byte, 0, 16) - it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, f, err := db.hDecodeHashKey(it.Key()) if err != nil { @@ -403,7 +403,7 @@ func (db *DB) HValues(key []byte) ([][]byte, error) { v := make([][]byte, 0, 16) - it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, _, err := db.hDecodeHashKey(it.Key()) if err != nil { @@ -491,9 +491,9 @@ func (db *DB) HScan(key []byte, field []byte, count int, inclusive bool) ([]FVPa v 
:= make([]FVPair, 0, count) - rangeType := leveldb.RangeROpen + rangeType := store.RangeROpen if !inclusive { - rangeType = leveldb.RangeOpen + rangeType = store.RangeOpen } it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) diff --git a/ledis/t_kv.go b/ledis/t_kv.go index 116aa87..dd8a2bb 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -2,7 +2,7 @@ package ledis import ( "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "time" ) @@ -343,9 +343,9 @@ func (db *DB) Scan(key []byte, count int, inclusive bool) ([]KVPair, error) { v := make([]KVPair, 0, 2*count) - rangeType := leveldb.RangeROpen + rangeType := store.RangeROpen if !inclusive { - rangeType = leveldb.RangeOpen + rangeType = store.RangeOpen } it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) diff --git a/ledis/t_list.go b/ledis/t_list.go index a7ab1ac..079498e 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "time" ) @@ -203,7 +203,7 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { startKey := db.lEncodeListKey(key, headSeq) stopKey := db.lEncodeListKey(key, tailSeq) - rit := leveldb.NewRangeIterator(it, &leveldb.Range{startKey, stopKey, leveldb.RangeClose}) + rit := store.NewRangeIterator(it, &store.Range{startKey, stopKey, store.RangeClose}) for ; rit.Valid(); rit.Next() { t.Delete(rit.RawKey()) num++ @@ -214,7 +214,7 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { return num } -func (db *DB) lGetMeta(it *leveldb.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { +func (db *DB) lGetMeta(it *store.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { var v []byte if it != nil { v = it.Find(ek) @@ -364,12 +364,12 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) { v := make([][]byte, 0, limit) startKey := db.lEncodeListKey(key, headSeq) - rit := leveldb.NewRangeLimitIterator(it, - &leveldb.Range{ + rit := store.NewRangeLimitIterator(it, + &store.Range{ Min: startKey, Max: nil, - Type: leveldb.RangeClose}, - &leveldb.Limit{ + Type: store.RangeClose}, + &store.Limit{ Offset: 0, Count: int(limit)}) diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index aa34b73..e41ed10 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -3,7 +3,7 @@ package ledis import ( "encoding/binary" "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "time" ) @@ -156,7 +156,7 @@ func (eli *elimination) active() { minKey := db.expEncodeTimeKey(NoneType, nil, 0) maxKey := db.expEncodeTimeKey(maxDataType, nil, now) - it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { tk := it.RawKey() mk := it.RawValue() diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 4931ded..151f8eb 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/binary" "errors" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "time" ) @@ -432,7 +432,7 @@ func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) { minKey := db.zEncodeStartScoreKey(key, min) maxKey := db.zEncodeStopScoreKey(key, max) - rangeType := leveldb.RangeROpen + rangeType := store.RangeROpen it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, -1) var n int64 = 0 @@ 
-460,17 +460,17 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { if s, err := Int64(v, nil); err != nil { return 0, err } else { - var rit *leveldb.RangeLimitIterator + var rit *store.RangeLimitIterator sk := db.zEncodeScoreKey(key, member, s) if !reverse { minKey := db.zEncodeStartScoreKey(key, MinScore) - rit = leveldb.NewRangeIterator(it, &leveldb.Range{minKey, sk, leveldb.RangeClose}) + rit = store.NewRangeIterator(it, &store.Range{minKey, sk, store.RangeClose}) } else { maxKey := db.zEncodeStopScoreKey(key, MaxScore) - rit = leveldb.NewRevRangeIterator(it, &leveldb.Range{sk, maxKey, leveldb.RangeClose}) + rit = store.NewRevRangeIterator(it, &store.Range{sk, maxKey, store.RangeClose}) } var lastKey []byte = nil @@ -492,14 +492,14 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { return -1, nil } -func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *leveldb.RangeLimitIterator { +func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *store.RangeLimitIterator { minKey := db.zEncodeStartScoreKey(key, min) maxKey := db.zEncodeStopScoreKey(key, max) if !reverse { - return db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, count) + return db.db.RangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count) } else { - return db.db.RevRangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, count) + return db.db.RevRangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count) } } @@ -550,10 +550,10 @@ func (db *DB) zRange(key []byte, min int64, max int64, offset int, count int, re v := make([]ScorePair, 0, nv) - var it *leveldb.RangeLimitIterator + var it *store.RangeLimitIterator //if reverse and offset is 0, count < 0, we may use forward iterator then reverse - //because leveldb iterator prev is slower than next + //because store iterator prev is slower than next if !reverse || (offset == 0 && count < 0) { it = db.zIterator(key, min, max, offset, count, false) } else { @@ -740,7 +740,7 @@ func (db *DB) zFlush() (drop int64, err error) { maxKey[0] = db.index maxKey[1] = ZScoreType + 1 - it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + it := db.db.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1) defer it.Close() for ; it.Valid(); it.Next() { @@ -779,9 +779,9 @@ func (db *DB) ZScan(key []byte, member []byte, count int, inclusive bool) ([]Sco v := make([]ScorePair, 0, 2*count) - rangeType := leveldb.RangeROpen + rangeType := store.RangeROpen if !inclusive { - rangeType = leveldb.RangeOpen + rangeType = store.RangeOpen } it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) diff --git a/ledis/tx.go b/ledis/tx.go index 0fe716a..f49cd34 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -1,7 +1,7 @@ package ledis import ( - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "sync" ) @@ -9,7 +9,7 @@ type tx struct { m sync.Mutex l *Ledis - wb *leveldb.WriteBatch + wb *store.WriteBatch binlog *BinLog batch [][]byte diff --git a/leveldb/snapshot.go b/leveldb/snapshot.go deleted file mode 100644 index 7d49317..0000000 --- a/leveldb/snapshot.go +++ /dev/null @@ -1,60 +0,0 @@ -package leveldb - -// #cgo LDFLAGS: -lleveldb -// #include -// #include "leveldb/c.h" -import "C" - -type Snapshot struct { - db *DB - - snap *C.leveldb_snapshot_t - - readOpts *ReadOptions - iteratorOpts *ReadOptions -} - -func (s *Snapshot) Close() { - 
C.leveldb_release_snapshot(s.db.db, s.snap) - - s.iteratorOpts.Close() - s.readOpts.Close() -} - -func (s *Snapshot) Get(key []byte) ([]byte, error) { - return s.db.get(nil, s.readOpts, key) -} - -func (s *Snapshot) BufGet(r []byte, key []byte) ([]byte, error) { - return s.db.get(r, s.readOpts, key) -} - -func (s *Snapshot) NewIterator() *Iterator { - it := new(Iterator) - - it.it = C.leveldb_create_iterator(s.db.db, s.iteratorOpts.Opt) - - return it -} - -func (s *Snapshot) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { - return NewRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) -} - -func (s *Snapshot) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { - return NewRevRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) -} - -//count < 0, unlimit. -// -//offset must >= 0, if < 0, will get nothing. -func (s *Snapshot) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { - return NewRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) -} - -//count < 0, unlimit. -// -//offset must >= 0, if < 0, will get nothing. -func (s *Snapshot) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { - return NewRevRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) -} diff --git a/server/cmd_replication_test.go b/server/cmd_replication_test.go index dce4b6f..4b0111a 100644 --- a/server/cmd_replication_test.go +++ b/server/cmd_replication_test.go @@ -3,14 +3,14 @@ package server import ( "bytes" "fmt" - "github.com/siddontang/ledisdb/leveldb" + "github.com/siddontang/ledisdb/store" "os" "testing" "time" ) func checkDataEqual(master *App, slave *App) error { - it := master.ldb.DataDB().RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1) + it := master.ldb.DataDB().RangeLimitIterator(nil, nil, store.RangeClose, 0, -1) for ; it.Valid(); it.Next() { key := it.Key() value := it.Value() diff --git a/store/config.go b/store/config.go new file mode 100644 index 0000000..f2c8eee --- /dev/null +++ b/store/config.go @@ -0,0 +1,14 @@ +package store + +type Config struct { + Name string + + Path string + + //for leveldb, goleveldb, rocksdb + Compression bool + BlockSize int + WriteBufferSize int + CacheSize int + MaxOpenFiles int +} diff --git a/store/db.go b/store/db.go new file mode 100644 index 0000000..adad004 --- /dev/null +++ b/store/db.go @@ -0,0 +1,75 @@ +package store + +import ( + "github.com/siddontang/ledisdb/store/driver" +) + +type DB struct { + db driver.IDB +} + +// Close database +// +// Caveat +// Any other DB operations like Get, Put, etc... 
may cause a panic after Close +// +func (db *DB) Close() error { + if db.db == nil { + return nil + } + + err := db.db.Close() + db.db = nil + + return err +} + +// Get Value with Key +func (db *DB) Get(key []byte) ([]byte, error) { + return db.db.Get(key) +} + +// Put value with key +func (db *DB) Put(key []byte, value []byte) error { + err := db.db.Put(key, value) + return err +} + +// Delete by key +func (db *DB) Delete(key []byte) error { + err := db.db.Delete(key) + return err +} + +func (db *DB) NewIterator() *Iterator { + it := new(Iterator) + it.it = db.db.NewIterator() + + return it +} + +func (db *DB) NewWriteBatch() *WriteBatch { + return &WriteBatch{db.db.NewWriteBatch()} +} + +func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) +} + +func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { + return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) +} + +//count < 0, unlimit. +// +//offset must >= 0, if < 0, will get nothing. +func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { + return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) +} + +//count < 0, unlimit. +// +//offset must >= 0, if < 0, will get nothing. +func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { + return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) +} diff --git a/store/driver/driver.go b/store/driver/driver.go new file mode 100644 index 0000000..3be1c7e --- /dev/null +++ b/store/driver/driver.go @@ -0,0 +1,39 @@ +package driver + +type IDB interface { + Close() error + + Get(key []byte) ([]byte, error) + + Put(key []byte, value []byte) error + Delete(key []byte) error + + NewIterator() IIterator + + NewWriteBatch() IWriteBatch +} + +type IIterator interface { + Close() error + + First() + Last() + Seek(key []byte) + + Next() + Prev() + + Valid() bool + + Key() []byte + Value() []byte +} + +type IWriteBatch interface { + Close() error + + Put(key []byte, value []byte) + Delete(key []byte) + Commit() error + Rollback() error +} diff --git a/store/goleveldb.go b/store/goleveldb.go new file mode 100644 index 0000000..d9d7703 --- /dev/null +++ b/store/goleveldb.go @@ -0,0 +1,30 @@ +package store + +import ( + "github.com/siddontang/copier" + "github.com/siddontang/ledisdb/store/driver" + "github.com/siddontang/ledisdb/store/goleveldb" +) + +const GoLevelDBName = "goleveldb" + +type GoLevelDBStore struct { +} + +func (s GoLevelDBStore) Open(cfg *Config) (driver.IDB, error) { + c := &goleveldb.Config{} + copier.Copy(c, cfg) + + return goleveldb.Open(c) +} + +func (s GoLevelDBStore) Repair(cfg *Config) error { + c := &goleveldb.Config{} + copier.Copy(c, cfg) + + return goleveldb.Repair(c) +} + +func init() { + Register(GoLevelDBName, GoLevelDBStore{}) +} diff --git a/store/goleveldb/batch.go b/store/goleveldb/batch.go new file mode 100644 index 0000000..f3ff67c --- /dev/null +++ b/store/goleveldb/batch.go @@ -0,0 +1,31 @@ +package goleveldb + +import ( + "github.com/syndtr/goleveldb/leveldb" +) + +type WriteBatch struct { + db *DB + wbatch *leveldb.Batch +} + +func (w *WriteBatch) Close() error { + return nil +} + +func (w *WriteBatch) Put(key, value []byte) { + w.wbatch.Put(key, value) 
+} + +func (w *WriteBatch) Delete(key []byte) { + w.wbatch.Delete(key) +} + +func (w *WriteBatch) Commit() error { + return w.db.db.Write(w.wbatch, nil) +} + +func (w *WriteBatch) Rollback() error { + w.wbatch.Reset() + return nil +} diff --git a/store/goleveldb/db.go b/store/goleveldb/db.go new file mode 100644 index 0000000..c208489 --- /dev/null +++ b/store/goleveldb/db.go @@ -0,0 +1,137 @@ +package goleveldb + +import ( + "github.com/siddontang/ledisdb/store/driver" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/cache" + "github.com/syndtr/goleveldb/leveldb/filter" + "github.com/syndtr/goleveldb/leveldb/opt" + "os" +) + +const defaultFilterBits int = 10 + +type Config struct { + Path string + + Compression bool + + BlockSize int + WriteBufferSize int + CacheSize int +} + +type DB struct { + cfg *Config + + db *leveldb.DB + + opts *opt.Options + + iteratorOpts *opt.ReadOptions + + cache cache.Cache + + filter filter.Filter +} + +func Open(cfg *Config) (*DB, error) { + if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil { + return nil, err + } + + db := new(DB) + db.cfg = cfg + + if err := db.open(); err != nil { + return nil, err + } + + return db, nil +} + +func Repair(cfg *Config) error { + db, err := leveldb.RecoverFile(cfg.Path, newOptions(cfg)) + if err != nil { + return err + } + + db.Close() + return nil +} + +func (db *DB) open() error { + db.opts = newOptions(db.cfg) + + db.iteratorOpts = &opt.ReadOptions{} + db.iteratorOpts.DontFillCache = true + + var err error + db.db, err = leveldb.OpenFile(db.cfg.Path, db.opts) + + return err +} + +func newOptions(cfg *Config) *opt.Options { + opts := &opt.Options{} + opts.ErrorIfMissing = false + + if cfg.CacheSize > 0 { + opts.BlockCache = cache.NewLRUCache(cfg.CacheSize) + } + + //we must use bloomfilter + opts.Filter = filter.NewBloomFilter(defaultFilterBits) + + if !cfg.Compression { + opts.Compression = opt.NoCompression + } else { + opts.Compression = opt.SnappyCompression + } + + if cfg.BlockSize > 0 { + opts.BlockSize = cfg.BlockSize + } + + if cfg.WriteBufferSize > 0 { + opts.WriteBuffer = cfg.WriteBufferSize + } + + return opts +} + +func (db *DB) Close() error { + return db.db.Close() +} + +func (db *DB) Put(key, value []byte) error { + return db.db.Put(key, value, nil) +} + +func (db *DB) Get(key []byte) ([]byte, error) { + v, err := db.db.Get(key, nil) + if err == leveldb.ErrNotFound { + return nil, nil + } + return v, nil +} + +func (db *DB) Delete(key []byte) error { + return db.db.Delete(key, nil) +} + +func (db *DB) NewWriteBatch() driver.IWriteBatch { + wb := &WriteBatch{ + db: db, + wbatch: new(leveldb.Batch), + } + return wb +} + +func (db *DB) NewIterator() driver.IIterator { + it := &Iterator{ + db.db.NewIterator(nil, db.iteratorOpts), + } + + return it +} diff --git a/store/goleveldb/iterator.go b/store/goleveldb/iterator.go new file mode 100644 index 0000000..c1fd8b5 --- /dev/null +++ b/store/goleveldb/iterator.go @@ -0,0 +1,49 @@ +package goleveldb + +import ( + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +type Iterator struct { + it iterator.Iterator +} + +func (it *Iterator) Key() []byte { + return it.it.Key() +} + +func (it *Iterator) Value() []byte { + return it.it.Value() +} + +func (it *Iterator) Close() error { + if it.it != nil { + it.it.Release() + it.it = nil + } + return nil +} + +func (it *Iterator) Valid() bool { + return it.it.Valid() +} + +func (it *Iterator) Next() { + it.it.Next() +} + +func (it *Iterator) Prev() { + it.it.Prev() +} + +func (it 
*Iterator) First() { + it.it.First() +} + +func (it *Iterator) Last() { + it.it.Last() +} + +func (it *Iterator) Seek(key []byte) { + it.it.Seek(key) +} diff --git a/leveldb/iterator.go b/store/iterator.go similarity index 80% rename from leveldb/iterator.go rename to store/iterator.go index a75e4d7..ff2536d 100644 --- a/leveldb/iterator.go +++ b/store/iterator.go @@ -1,14 +1,8 @@ -package leveldb - -// #cgo LDFLAGS: -lleveldb -// #include -// #include "leveldb/c.h" -// #include "leveldb_ext.h" -import "C" +package store import ( "bytes" - "unsafe" + "github.com/siddontang/ledisdb/store/driver" ) const ( @@ -27,10 +21,10 @@ const ( // // range type: // -// close: [min, max] -// open: (min, max) -// lopen: (min, max] -// ropen: [min, max) +// close: [min, max] +// open: (min, max) +// lopen: (min, max] +// ropen: [min, max) // type Range struct { Min []byte @@ -45,54 +39,39 @@ type Limit struct { } type Iterator struct { - it *C.leveldb_iterator_t - isValid C.uchar + it driver.IIterator } // Returns a copy of key. func (it *Iterator) Key() []byte { - var klen C.size_t - kdata := C.leveldb_iter_key(it.it, &klen) - if kdata == nil { + k := it.it.Key() + if k == nil { return nil } - return C.GoBytes(unsafe.Pointer(kdata), C.int(klen)) + return append([]byte{}, k...) } // Returns a copy of value. func (it *Iterator) Value() []byte { - var vlen C.size_t - vdata := C.leveldb_iter_value(it.it, &vlen) - if vdata == nil { + v := it.it.Value() + if v == nil { return nil } - return C.GoBytes(unsafe.Pointer(vdata), C.int(vlen)) + return append([]byte{}, v...) } // Returns a reference of key. // you must be careful that it will be changed after next iterate. func (it *Iterator) RawKey() []byte { - var klen C.size_t - kdata := C.leveldb_iter_key(it.it, &klen) - if kdata == nil { - return nil - } - - return slice(unsafe.Pointer(kdata), int(C.int(klen))) + return it.it.Key() } // Returns a reference of value. // you must be careful that it will be changed after next iterate. func (it *Iterator) RawValue() []byte { - var vlen C.size_t - vdata := C.leveldb_iter_value(it.it, &vlen) - if vdata == nil { - return nil - } - - return slice(unsafe.Pointer(vdata), int(C.int(vlen))) + return it.it.Value() } // Copy key to b, if b len is small or nil, returns a new one. @@ -126,33 +105,33 @@ func (it *Iterator) BufValue(b []byte) []byte { func (it *Iterator) Close() { if it.it != nil { - C.leveldb_iter_destroy(it.it) + it.it.Close() it.it = nil } } func (it *Iterator) Valid() bool { - return ucharToBool(it.isValid) + return it.it.Valid() } func (it *Iterator) Next() { - it.isValid = C.leveldb_iter_next_ext(it.it) + it.it.Next() } func (it *Iterator) Prev() { - it.isValid = C.leveldb_iter_prev_ext(it.it) + it.it.Prev() } func (it *Iterator) SeekToFirst() { - it.isValid = C.leveldb_iter_seek_to_first_ext(it.it) + it.it.First() } func (it *Iterator) SeekToLast() { - it.isValid = C.leveldb_iter_seek_to_last_ext(it.it) + it.it.Last() } func (it *Iterator) Seek(key []byte) { - it.isValid = C.leveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) + it.it.Seek(key) } // Finds by key, if not found, nil returns. 
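The rewritten store/iterator.go above drops the cgo calls and drives everything through driver.IIterator, so any backend that can walk keys in sorted order can plug in. As an aid to reading that interface (declared in store/driver/driver.go earlier in this patch), here is a minimal in-memory implementation; memIterator and its package are invented for illustration:

// Package iterdemo is a hypothetical example package, not part of the patch.
package iterdemo

import (
	"sort"

	"github.com/siddontang/ledisdb/store/driver"
)

type kv struct {
	key, value []byte
}

// memIterator walks a sorted in-memory snapshot of key/value pairs and
// satisfies the driver.IIterator contract.
type memIterator struct {
	data []kv // sorted by key, ascending
	pos  int  // current index; anything out of range means "invalid"
}

var _ driver.IIterator = (*memIterator)(nil)

func (it *memIterator) Close() error { it.data = nil; return nil }

func (it *memIterator) First() { it.pos = 0 }
func (it *memIterator) Last()  { it.pos = len(it.data) - 1 }

// Seek positions the iterator at the first key >= the given key.
func (it *memIterator) Seek(key []byte) {
	it.pos = sort.Search(len(it.data), func(i int) bool {
		return string(it.data[i].key) >= string(key)
	})
}

func (it *memIterator) Next() { it.pos++ }
func (it *memIterator) Prev() { it.pos-- }

func (it *memIterator) Valid() bool { return it.pos >= 0 && it.pos < len(it.data) }

func (it *memIterator) Key() []byte {
	if !it.Valid() {
		return nil
	}
	return it.data[it.pos].key
}

func (it *memIterator) Value() []byte {
	if !it.Valid() {
		return nil
	}
	return it.data[it.pos].value
}

store.Iterator then wraps a value like this: Key and Value return copies, while RawKey and RawValue hand back the driver's buffers directly, which is why the comments above warn that the raw results may change after the next iterate.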
diff --git a/store/leveldb.go b/store/leveldb.go new file mode 100644 index 0000000..3453bc8 --- /dev/null +++ b/store/leveldb.go @@ -0,0 +1,32 @@ +// +build leveldb + +package store + +import ( + "github.com/siddontang/copier" + "github.com/siddontang/ledisdb/store/driver" + "github.com/siddontang/ledisdb/store/leveldb" +) + +const LevelDBName = "leveldb" + +type LevelDBStore struct { +} + +func (s LevelDBStore) Open(cfg *Config) (driver.IDB, error) { + c := &leveldb.Config{} + copier.Copy(c, cfg) + + return leveldb.Open(c) +} + +func (s LevelDBStore) Repair(cfg *Config) error { + c := &leveldb.Config{} + copier.Copy(c, cfg) + + return leveldb.Repair(c) +} + +func init() { + Register(LevelDBName, LevelDBStore{}) +} diff --git a/leveldb/batch.go b/store/leveldb/batch.go similarity index 87% rename from leveldb/batch.go rename to store/leveldb/batch.go index f24ec65..87600cb 100644 --- a/leveldb/batch.go +++ b/store/leveldb/batch.go @@ -1,3 +1,5 @@ +// +build leveldb + package leveldb // #cgo LDFLAGS: -lleveldb @@ -13,8 +15,9 @@ type WriteBatch struct { wbatch *C.leveldb_writebatch_t } -func (w *WriteBatch) Close() { +func (w *WriteBatch) Close() error { C.leveldb_writebatch_destroy(w.wbatch) + return nil } func (w *WriteBatch) Put(key, value []byte) { @@ -41,12 +44,9 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } -func (w *WriteBatch) SyncCommit() error { - return w.commit(w.db.syncWriteOpts) -} - -func (w *WriteBatch) Rollback() { +func (w *WriteBatch) Rollback() error { C.leveldb_writebatch_clear(w.wbatch) + return nil } func (w *WriteBatch) commit(wb *WriteOptions) error { diff --git a/leveldb/cache.go b/store/leveldb/cache.go similarity index 94% rename from leveldb/cache.go rename to store/leveldb/cache.go index 3fbcf0d..e5587cb 100644 --- a/leveldb/cache.go +++ b/store/leveldb/cache.go @@ -1,3 +1,5 @@ +// +build leveldb + package leveldb // #cgo LDFLAGS: -lleveldb diff --git a/leveldb/db.go b/store/leveldb/db.go similarity index 56% rename from leveldb/db.go rename to store/leveldb/db.go index c6bfa94..453a139 100644 --- a/leveldb/db.go +++ b/store/leveldb/db.go @@ -1,3 +1,5 @@ +// +build leveldb + // Package leveldb is a wrapper for c++ leveldb package leveldb @@ -9,7 +11,7 @@ package leveldb import "C" import ( - "encoding/json" + "github.com/siddontang/ledisdb/store/driver" "os" "unsafe" ) @@ -17,42 +19,13 @@ import ( const defaultFilterBits int = 10 type Config struct { - Path string `json:"path"` + Path string - Compression bool `json:"compression"` - BlockSize int `json:"block_size"` - WriteBufferSize int `json:"write_buffer_size"` - CacheSize int `json:"cache_size"` - MaxOpenFiles int `json:"max_open_files"` -} - -type DB struct { - cfg *Config - - db *C.leveldb_t - - opts *Options - - //for default read and write options - readOpts *ReadOptions - writeOpts *WriteOptions - iteratorOpts *ReadOptions - - syncWriteOpts *WriteOptions - - cache *Cache - - filter *FilterPolicy -} - -func OpenWithJsonConfig(configJson json.RawMessage) (*DB, error) { - cfg := new(Config) - err := json.Unmarshal(configJson, cfg) - if err != nil { - return nil, err - } - - return Open(cfg) + Compression bool + BlockSize int + WriteBufferSize int + CacheSize int + MaxOpenFiles int } func Open(cfg *Config) (*DB, error) { @@ -70,21 +43,6 @@ func Open(cfg *Config) (*DB, error) { return db, nil } -func (db *DB) open() error { - db.initOptions(db.cfg) - - var errStr *C.char - ldbname := C.CString(db.cfg.Path) - defer C.leveldb_free(unsafe.Pointer(ldbname)) - - db.db = 
C.leveldb_open(db.opts.Opt, ldbname, &errStr) - if errStr != nil { - db.db = nil - return saveError(errStr) - } - return nil -} - func Repair(cfg *Config) error { db := new(DB) db.cfg = cfg @@ -108,6 +66,38 @@ func Repair(cfg *Config) error { return nil } +type DB struct { + cfg *Config + + db *C.leveldb_t + + opts *Options + + //for default read and write options + readOpts *ReadOptions + writeOpts *WriteOptions + iteratorOpts *ReadOptions + + cache *Cache + + filter *FilterPolicy +} + +func (db *DB) open() error { + db.initOptions(db.cfg) + + var errStr *C.char + ldbname := C.CString(db.cfg.Path) + defer C.leveldb_free(unsafe.Pointer(ldbname)) + + db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + db.db = nil + return saveError(errStr) + } + return nil +} + func (db *DB) initOptions(cfg *Config) { opts := NewOptions() @@ -126,6 +116,8 @@ func (db *DB) initOptions(cfg *Config) { if !cfg.Compression { opts.SetCompression(NoCompression) + } else { + opts.SetCompression(SnappyCompression) } if cfg.BlockSize <= 0 { @@ -153,9 +145,6 @@ func (db *DB) initOptions(cfg *Config) { db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) - - db.syncWriteOpts = NewWriteOptions() - db.syncWriteOpts.SetSync(true) } func (db *DB) Close() error { @@ -177,80 +166,23 @@ func (db *DB) Close() error { db.readOpts.Close() db.writeOpts.Close() db.iteratorOpts.Close() - db.syncWriteOpts.Close() return nil } -func (db *DB) Destroy() error { - path := db.cfg.Path - - db.Close() - - opts := NewOptions() - defer opts.Close() - - var errStr *C.char - ldbname := C.CString(path) - defer C.leveldb_free(unsafe.Pointer(ldbname)) - - C.leveldb_destroy_db(opts.Opt, ldbname, &errStr) - if errStr != nil { - return saveError(errStr) - } - return nil -} - -func (db *DB) Clear() error { - bc := db.NewWriteBatch() - defer bc.Close() - - var err error - it := db.NewIterator() - it.SeekToFirst() - - num := 0 - for ; it.Valid(); it.Next() { - bc.Delete(it.RawKey()) - num++ - if num == 1000 { - num = 0 - if err = bc.Commit(); err != nil { - return err - } - } - } - - err = bc.Commit() - - return err -} - func (db *DB) Put(key, value []byte) error { return db.put(db.writeOpts, key, value) } -func (db *DB) SyncPut(key, value []byte) error { - return db.put(db.syncWriteOpts, key, value) -} - func (db *DB) Get(key []byte) ([]byte, error) { - return db.get(nil, db.readOpts, key) -} - -func (db *DB) BufGet(r []byte, key []byte) ([]byte, error) { - return db.get(r, db.readOpts, key) + return db.get(db.readOpts, key) } func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } -func (db *DB) SyncDelete(key []byte) error { - return db.delete(db.syncWriteOpts, key) -} - -func (db *DB) NewWriteBatch() *WriteBatch { +func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, wbatch: C.leveldb_writebatch_create(), @@ -258,22 +190,7 @@ func (db *DB) NewWriteBatch() *WriteBatch { return wb } -func (db *DB) NewSnapshot() *Snapshot { - s := &Snapshot{ - db: db, - snap: C.leveldb_create_snapshot(db.db), - readOpts: NewReadOptions(), - iteratorOpts: NewReadOptions(), - } - - s.readOpts.SetSnapshot(s) - s.iteratorOpts.SetSnapshot(s) - s.iteratorOpts.SetFillCache(false) - - return s -} - -func (db *DB) NewIterator() *Iterator { +func (db *DB) NewIterator() driver.IIterator { it := new(Iterator) it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt) @@ -281,28 +198,6 @@ func (db *DB) NewIterator() *Iterator { return it } -func (db *DB) RangeIterator(min []byte, max 
[]byte, rangeType uint8) *RangeLimitIterator { - return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) -} - -func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { - return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1}) -} - -//count < 0, unlimit. -// -//offset must >= 0, if < 0, will get nothing. -func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { - return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) -} - -//count < 0, unlimit. -// -//offset must >= 0, if < 0, will get nothing. -func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { - return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) -} - func (db *DB) put(wo *WriteOptions, key, value []byte) error { var errStr *C.char var k, v *C.char @@ -324,7 +219,7 @@ func (db *DB) put(wo *WriteOptions, key, value []byte) error { return nil } -func (db *DB) get(r []byte, ro *ReadOptions, key []byte) ([]byte, error) { +func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) { var errStr *C.char var vallen C.size_t var k *C.char @@ -347,13 +242,7 @@ func (db *DB) get(r []byte, ro *ReadOptions, key []byte) ([]byte, error) { defer C.leveldb_get_free_ext(unsafe.Pointer(c)) - if r == nil { - r = []byte{} - } - - r = r[0:0] - b := slice(unsafe.Pointer(value), int(C.int(vallen))) - return append(r, b...), nil + return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil } func (db *DB) delete(wo *WriteOptions, key []byte) error { diff --git a/leveldb/filterpolicy.go b/store/leveldb/filterpolicy.go similarity index 95% rename from leveldb/filterpolicy.go rename to store/leveldb/filterpolicy.go index b007d58..640139f 100644 --- a/leveldb/filterpolicy.go +++ b/store/leveldb/filterpolicy.go @@ -1,3 +1,5 @@ +// +build leveldb + package leveldb // #cgo LDFLAGS: -lleveldb diff --git a/store/leveldb/iterator.go b/store/leveldb/iterator.go new file mode 100644 index 0000000..49cfd7d --- /dev/null +++ b/store/leveldb/iterator.go @@ -0,0 +1,70 @@ +// +build leveldb + +package leveldb + +// #cgo LDFLAGS: -lleveldb +// #include +// #include "leveldb/c.h" +// #include "leveldb_ext.h" +import "C" + +import ( + "unsafe" +) + +type Iterator struct { + it *C.leveldb_iterator_t + isValid C.uchar +} + +func (it *Iterator) Key() []byte { + var klen C.size_t + kdata := C.leveldb_iter_key(it.it, &klen) + if kdata == nil { + return nil + } + + return slice(unsafe.Pointer(kdata), int(C.int(klen))) +} + +func (it *Iterator) Value() []byte { + var vlen C.size_t + vdata := C.leveldb_iter_value(it.it, &vlen) + if vdata == nil { + return nil + } + + return slice(unsafe.Pointer(vdata), int(C.int(vlen))) +} + +func (it *Iterator) Close() error { + if it.it != nil { + C.leveldb_iter_destroy(it.it) + it.it = nil + } + return nil +} + +func (it *Iterator) Valid() bool { + return ucharToBool(it.isValid) +} + +func (it *Iterator) Next() { + it.isValid = C.leveldb_iter_next_ext(it.it) +} + +func (it *Iterator) Prev() { + it.isValid = C.leveldb_iter_prev_ext(it.it) +} + +func (it *Iterator) First() { + it.isValid = C.leveldb_iter_seek_to_first_ext(it.it) +} + +func (it *Iterator) Last() { + it.isValid = C.leveldb_iter_seek_to_last_ext(it.it) +} + +func (it *Iterator) Seek(key []byte) { + it.isValid = C.leveldb_iter_seek_ext(it.it, 
(*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} diff --git a/leveldb/leveldb_ext.cc b/store/leveldb/leveldb_ext.cc similarity index 98% rename from leveldb/leveldb_ext.cc rename to store/leveldb/leveldb_ext.cc index 9ed2579..96d6541 100644 --- a/leveldb/leveldb_ext.cc +++ b/store/leveldb/leveldb_ext.cc @@ -1,3 +1,5 @@ +// +build leveldb + #include "leveldb_ext.h" #include diff --git a/leveldb/leveldb_ext.h b/store/leveldb/leveldb_ext.h similarity index 97% rename from leveldb/leveldb_ext.h rename to store/leveldb/leveldb_ext.h index 8b26010..8222ae3 100644 --- a/leveldb/leveldb_ext.h +++ b/store/leveldb/leveldb_ext.h @@ -1,3 +1,5 @@ +// +build leveldb + #ifndef LEVELDB_EXT_H #define LEVELDB_EXT_H diff --git a/leveldb/levigo-license b/store/leveldb/levigo-license similarity index 100% rename from leveldb/levigo-license rename to store/leveldb/levigo-license diff --git a/leveldb/options.go b/store/leveldb/options.go similarity index 93% rename from leveldb/options.go rename to store/leveldb/options.go index 62836c0..83009be 100644 --- a/leveldb/options.go +++ b/store/leveldb/options.go @@ -1,3 +1,5 @@ +// +build leveldb + package leveldb // #cgo LDFLAGS: -lleveldb @@ -103,14 +105,6 @@ func (ro *ReadOptions) SetFillCache(b bool) { C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b)) } -func (ro *ReadOptions) SetSnapshot(snap *Snapshot) { - var s *C.leveldb_snapshot_t - if snap != nil { - s = snap.snap - } - C.leveldb_readoptions_set_snapshot(ro.Opt, s) -} - func (wo *WriteOptions) Close() { C.leveldb_writeoptions_destroy(wo.Opt) } diff --git a/leveldb/util.go b/store/leveldb/util.go similarity index 97% rename from leveldb/util.go rename to store/leveldb/util.go index e1fd57d..6efe33b 100644 --- a/leveldb/util.go +++ b/store/leveldb/util.go @@ -1,3 +1,5 @@ +// +build leveldb + package leveldb // #include "leveldb/c.h" diff --git a/store/mdb.go b/store/mdb.go new file mode 100644 index 0000000..364065a --- /dev/null +++ b/store/mdb.go @@ -0,0 +1,30 @@ +package store + +import ( + "github.com/siddontang/copier" + "github.com/siddontang/ledisdb/store/driver" + "github.com/siddontang/ledisdb/store/mdb" +) + +const LMDBName = "lmdb" + +type LMDBStore struct { +} + +func (s LMDBStore) Open(cfg *Config) (driver.IDB, error) { + c := &mdb.Config{} + copier.Copy(c, cfg) + + return mdb.Open(c) +} + +func (s LMDBStore) Repair(cfg *Config) error { + c := &mdb.Config{} + copier.Copy(c, cfg) + + return mdb.Repair(c) +} + +func init() { + Register(LMDBName, LMDBStore{}) +} diff --git a/store/mdb/batch.go b/store/mdb/batch.go new file mode 100644 index 0000000..2bf2c04 --- /dev/null +++ b/store/mdb/batch.go @@ -0,0 +1,32 @@ +package mdb + +type Write struct { + Key []byte + Value []byte +} + +type WriteBatch struct { + db MDB + wb []Write +} + +func (w WriteBatch) Close() error { + return nil +} + +func (w WriteBatch) Put(key, value []byte) { + w.wb = append(w.wb, Write{key, value}) +} + +func (w WriteBatch) Delete(key []byte) { + w.wb = append(w.wb, Write{key, nil}) +} + +func (w WriteBatch) Commit() error { + return w.db.BatchPut(w.wb) +} + +func (w WriteBatch) Rollback() error { + w.wb = []Write{} + return nil +} diff --git a/store/mdb/influxdb_license b/store/mdb/influxdb_license new file mode 100644 index 0000000..03f21e8 --- /dev/null +++ b/store/mdb/influxdb_license @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013-2014 Errplane Inc. 
+ +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/store/mdb/mdb.go b/store/mdb/mdb.go new file mode 100644 index 0000000..ef9d2bf --- /dev/null +++ b/store/mdb/mdb.go @@ -0,0 +1,249 @@ +package mdb + +import ( + mdb "github.com/influxdb/gomdb" + "github.com/siddontang/ledisdb/store/driver" + "os" +) + +type Config struct { + Path string + MapSize int +} + +type MDB struct { + env *mdb.Env + db mdb.DBI + path string +} + +func Open(c *Config) (MDB, error) { + path := c.Path + if c.MapSize == 0 { + c.MapSize = 1 * 1024 * 1024 * 1024 + } + + env, err := mdb.NewEnv() + if err != nil { + return MDB{}, err + } + + // TODO: max dbs should be configurable + if err := env.SetMaxDBs(1); err != nil { + return MDB{}, err + } + if err := env.SetMapSize(uint64(c.MapSize)); err != nil { + return MDB{}, err + } + + if _, err := os.Stat(path); err != nil { + err = os.MkdirAll(path, 0755) + if err != nil { + return MDB{}, err + } + } + + err = env.Open(path, mdb.WRITEMAP|mdb.MAPASYNC|mdb.CREATE, 0755) + if err != nil { + return MDB{}, err + } + + tx, err := env.BeginTxn(nil, 0) + if err != nil { + return MDB{}, err + } + + dbi, err := tx.DBIOpen(nil, mdb.CREATE) + if err != nil { + return MDB{}, err + } + + if err := tx.Commit(); err != nil { + return MDB{}, err + } + + db := MDB{ + env: env, + db: dbi, + path: path, + } + + return db, nil +} + +func Repair(c *Config) error { + println("llmd not supports repair") + return nil +} + +func (db MDB) Put(key, value []byte) error { + return db.BatchPut([]Write{{key, value}}) +} + +func (db MDB) BatchPut(writes []Write) error { + itr := db.iterator(false) + + for _, w := range writes { + if w.Value == nil { + itr.key, itr.value, itr.err = itr.c.Get(w.Key, mdb.SET) + if itr.err == nil { + itr.err = itr.c.Del(0) + } + } else { + itr.err = itr.c.Put(w.Key, w.Value, 0) + } + + if itr.err != nil && itr.err != mdb.NotFound { + break + } + } + itr.setState() + + return itr.Close() +} + +func (db MDB) Get(key []byte) ([]byte, error) { + tx, err := db.env.BeginTxn(nil, mdb.RDONLY) + if err != nil { + return nil, err + } + defer tx.Commit() + + v, err := tx.Get(db.db, key) + if err == mdb.NotFound { + return nil, nil + } + return v, err +} + +func (db MDB) Delete(key []byte) error { + itr := db.iterator(false) + + itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET) + if itr.err == nil { + itr.err = itr.c.Del(0) + } + itr.setState() + return itr.Close() +} + +type MDBIterator struct { + key []byte + value []byte + c *mdb.Cursor + tx *mdb.Txn + valid bool + err error +} + +func (itr 
*MDBIterator) Key() []byte { + return itr.key +} + +func (itr *MDBIterator) Value() []byte { + return itr.value +} + +func (itr *MDBIterator) Valid() bool { + return itr.valid +} + +func (itr *MDBIterator) Error() error { + return itr.err +} + +func (itr *MDBIterator) getCurrent() { + itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.GET_CURRENT) + itr.setState() +} + +func (itr *MDBIterator) Seek(key []byte) { + itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET_RANGE) + itr.setState() +} +func (itr *MDBIterator) Next() { + itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.NEXT) + itr.setState() +} + +func (itr *MDBIterator) Prev() { + itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.PREV) + itr.setState() +} + +func (itr *MDBIterator) First() { + itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.FIRST) + itr.setState() +} + +func (itr *MDBIterator) Last() { + itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.LAST) + itr.setState() +} + +func (itr *MDBIterator) setState() { + if itr.err != nil { + if itr.err == mdb.NotFound { + itr.err = nil + } + itr.valid = false + } +} + +func (itr *MDBIterator) Close() error { + if err := itr.c.Close(); err != nil { + itr.tx.Abort() + return err + } + if itr.err != nil { + itr.tx.Abort() + return itr.err + } + return itr.tx.Commit() +} + +func (_ MDB) Name() string { + return "lmdb" +} + +func (db MDB) Path() string { + return db.path +} + +func (db MDB) Compact() { +} + +func (db MDB) iterator(rdonly bool) *MDBIterator { + flags := uint(0) + if rdonly { + flags = mdb.RDONLY + } + tx, err := db.env.BeginTxn(nil, flags) + if err != nil { + return &MDBIterator{nil, nil, nil, nil, false, err} + } + + c, err := tx.CursorOpen(db.db) + if err != nil { + tx.Abort() + return &MDBIterator{nil, nil, nil, nil, false, err} + } + + return &MDBIterator{nil, nil, c, tx, true, nil} +} + +func (db MDB) Close() error { + db.env.DBIClose(db.db) + if err := db.env.Close(); err != nil { + panic(err) + } + return nil +} + +func (db MDB) NewIterator() driver.IIterator { + return db.iterator(true) +} + +func (db MDB) NewWriteBatch() driver.IWriteBatch { + return WriteBatch{db, []Write{}} +} diff --git a/store/store.go b/store/store.go new file mode 100644 index 0000000..fb64bd2 --- /dev/null +++ b/store/store.go @@ -0,0 +1,61 @@ +package store + +import ( + "fmt" + "github.com/siddontang/ledisdb/store/driver" + "os" +) + +const DefaultStoreName = "lmdb" + +type Store interface { + Open(cfg *Config) (driver.IDB, error) + Repair(cfg *Config) error +} + +var dbs = map[string]Store{} + +func Register(name string, store Store) { + if _, ok := dbs[name]; ok { + panic(fmt.Errorf("db %s is registered", name)) + } + + dbs[name] = store +} + +func Open(cfg *Config) (*DB, error) { + if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil { + return nil, err + } + + if len(cfg.Name) == 0 { + cfg.Name = DefaultStoreName + } + + s, ok := dbs[cfg.Name] + if !ok { + return nil, fmt.Errorf("db %s is not registered", cfg.Name) + } + + idb, err := s.Open(cfg) + if err != nil { + return nil, err + } + + db := &DB{idb} + + return db, nil +} + +func Repair(cfg *Config) error { + if len(cfg.Name) == 0 { + cfg.Name = DefaultStoreName + } + + s, ok := dbs[cfg.Name] + if !ok { + return fmt.Errorf("db %s is not registered", cfg.Name) + } + + return s.Repair(cfg) +} diff --git a/leveldb/leveldb_test.go b/store/store_test.go similarity index 77% rename from leveldb/leveldb_test.go rename to store/store_test.go index 5da84d4..b6fc70c 100644 --- a/leveldb/leveldb_test.go +++ b/store/store_test.go @@ -1,4 
+1,4 @@ -package leveldb +package store import ( "bytes" @@ -8,23 +8,17 @@ import ( "testing" ) -var testConfigJson = []byte(` - { - "path" : "./testdb", - "compression":true, - "block_size" : 32768, - "write_buffer_size" : 2097152, - "cache_size" : 20971520 - } - `) - var testOnce sync.Once var testDB *DB func getTestDB() *DB { f := func() { var err error - testDB, err = OpenWithJsonConfig(testConfigJson) + + cfg := new(Config) + cfg.Path = "/tmp/testdb" + + testDB, err = Open(cfg) if err != nil { println(err.Error()) panic(err) @@ -131,7 +125,11 @@ func checkIterator(it *RangeLimitIterator, cv ...int) error { func TestIterator(t *testing.T) { db := getTestDB() - db.Clear() + i := db.NewIterator() + for i.SeekToFirst(); i.Valid(); i.Next() { + db.Delete(i.Key()) + } + i.Close() for i := 0; i < 10; i++ { key := []byte(fmt.Sprintf("key_%d", i)) @@ -196,51 +194,6 @@ func TestIterator(t *testing.T) { } } -func TestSnapshot(t *testing.T) { - db := getTestDB() - - key := []byte("key") - value := []byte("hello world") - - db.Put(key, value) - - s := db.NewSnapshot() - defer s.Close() - - db.Put(key, []byte("hello world2")) - - if v, err := s.Get(key); err != nil { - t.Fatal(err) - } else if string(v) != string(value) { - t.Fatal(string(v)) - } -} - -func TestDestroy(t *testing.T) { - db := getTestDB() - - db.Put([]byte("a"), []byte("1")) - if err := db.Clear(); err != nil { - t.Fatal(err) - } - - if _, err := os.Stat(db.cfg.Path); err != nil { - t.Fatal("must exist ", err.Error()) - } - - if v, err := db.Get([]byte("a")); err != nil { - t.Fatal(err) - } else if string(v) == "1" { - t.Fatal(string(v)) - } - - db.Destroy() - - if _, err := os.Stat(db.cfg.Path); !os.IsNotExist(err) { - t.Fatal("must not exist") - } -} - func TestCloseMore(t *testing.T) { cfg := new(Config) cfg.Path = "/tmp/testdb1234" @@ -256,4 +209,6 @@ func TestCloseMore(t *testing.T) { db.Close() } + + os.RemoveAll(cfg.Path) } diff --git a/store/writebatch.go b/store/writebatch.go new file mode 100644 index 0000000..d898a03 --- /dev/null +++ b/store/writebatch.go @@ -0,0 +1,9 @@ +package store + +import ( + "github.com/siddontang/ledisdb/store/driver" +) + +type WriteBatch struct { + driver.IWriteBatch +}
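Taken together, the new store package resolves a backend by name at Open time: goleveldb and lmdb register themselves unconditionally in their init functions, leveldb only under the build tag, and an empty Name falls back to DefaultStoreName ("lmdb"). A hedged end-to-end sketch of the intended usage; the path and keys below are illustrative only:

package main

import (
	"fmt"

	"github.com/siddontang/ledisdb/store"
)

func main() {
	cfg := new(store.Config)
	cfg.Name = "goleveldb" // "" would pick the lmdb default; "leveldb" needs -tags leveldb
	cfg.Path = "/tmp/store_demo"

	db, err := store.Open(cfg)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Batched writes go through the backend's IWriteBatch and are applied together on Commit.
	wb := db.NewWriteBatch()
	wb.Put([]byte("a"), []byte("1"))
	wb.Put([]byte("b"), []byte("2"))
	if err := wb.Commit(); err != nil {
		panic(err)
	}

	// Range scan over [a, b] with the backend-independent iterator.
	it := db.RangeLimitIterator([]byte("a"), []byte("b"), store.RangeClose, 0, -1)
	for ; it.Valid(); it.Next() {
		fmt.Printf("%s = %s\n", it.Key(), it.Value())
	}
	it.Close()
}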