use our own leveldb

This commit is contained in:
siddontang 2014-06-19 17:19:40 +08:00
parent 13a2e0e690
commit 48e09a2727
26 changed files with 1233 additions and 59 deletions

View File

@ -2,6 +2,5 @@
. ./dev.sh
go get -u github.com/siddontang/go-leveldb/leveldb
go get -u github.com/siddontang/go-log/log
go get -u github.com/garyburd/redigo/redis

View File

@ -6,7 +6,8 @@
"compression": false,
"block_size": 32768,
"write_buffer_size": 67108864,
"cache_size": 524288000
"cache_size": 524288000,
"max_open_files":1024
}
},

View File

@ -4,7 +4,7 @@ import (
"bufio"
"bytes"
"encoding/binary"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/ledisdb/leveldb"
"io"
"os"
)
@ -73,7 +73,9 @@ func (l *Ledis) Dump(w io.Writer) error {
return err
}
it := sp.Iterator(nil, nil, leveldb.RangeClose, 0, -1)
it := sp.NewIterator()
it.SeekToFirst()
var key []byte
var value []byte
for ; it.Valid(); it.Next() {

View File

@ -2,7 +2,7 @@ package ledis
import (
"bytes"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/ledisdb/leveldb"
"os"
"testing"
)
@ -59,7 +59,7 @@ func TestDump(t *testing.T) {
t.Fatal(err)
}
it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1)
it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
key := it.Key()
value := it.Value()

View File

@ -3,8 +3,8 @@ package ledis
import (
"encoding/json"
"fmt"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/leveldb"
"path"
"sync"
"time"

View File

@ -3,14 +3,14 @@ package ledis
import (
"bytes"
"fmt"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/ledisdb/leveldb"
"os"
"path"
"testing"
)
func checkLedisEqual(master *Ledis, slave *Ledis) error {
it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1)
it := master.ldb.RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
key := it.Key()
value := it.Value()

View File

@ -3,7 +3,7 @@ package ledis
import (
"encoding/binary"
"errors"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/ledisdb/leveldb"
"time"
)
@ -132,7 +132,7 @@ func (db *DB) hDelete(t *tx, key []byte) int64 {
stop := db.hEncodeStopKey(key)
var num int64 = 0
it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
num++
@ -232,10 +232,11 @@ func (db *DB) HMset(key []byte, args ...FVPair) error {
return err
}
func (db *DB) HMget(key []byte, args [][]byte) ([]interface{}, error) {
func (db *DB) HMget(key []byte, args ...[]byte) ([]interface{}, error) {
var ek []byte
var v []byte
var err error
it := db.db.NewIterator()
defer it.Close()
r := make([]interface{}, len(args))
for i := 0; i < len(args); i++ {
@ -245,11 +246,7 @@ func (db *DB) HMget(key []byte, args [][]byte) ([]interface{}, error) {
ek = db.hEncodeHashKey(key, args[i])
if v, err = db.db.Get(ek); err != nil {
return nil, err
}
r[i] = v
r[i] = it.Find(ek)
}
return r, nil
@ -355,7 +352,7 @@ func (db *DB) HGetAll(key []byte) ([]interface{}, error) {
v := make([]interface{}, 0, 16)
it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
_, k, err := db.hDecodeHashKey(it.Key())
if err != nil {
@ -380,7 +377,7 @@ func (db *DB) HKeys(key []byte) ([]interface{}, error) {
v := make([]interface{}, 0, 16)
it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
_, k, err := db.hDecodeHashKey(it.Key())
if err != nil {
@ -404,7 +401,7 @@ func (db *DB) HValues(key []byte) ([]interface{}, error) {
v := make([]interface{}, 0, 16)
it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(start, stop, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
v = append(v, it.Value())
}
@ -443,7 +440,7 @@ func (db *DB) hFlush() (drop int64, err error) {
maxKey[0] = db.index
maxKey[1] = hSizeType + 1
it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
drop++
@ -485,7 +482,7 @@ func (db *DB) HScan(key []byte, field []byte, count int, inclusive bool) ([]FVPa
rangeType = leveldb.RangeOpen
}
it := db.db.Iterator(minKey, maxKey, rangeType, 0, count)
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count)
for ; it.Valid(); it.Next() {
if _, f, err := db.hDecodeHashKey(it.Key()); err != nil {
continue

View File

@ -32,11 +32,28 @@ func TestDBHash(t *testing.T) {
key := []byte("testdb_hash_a")
if n, err := db.HSet(key, []byte("a"), []byte("hello world")); err != nil {
if n, err := db.HSet(key, []byte("a"), []byte("hello world 1")); err != nil {
t.Fatal(err)
} else if n != 1 {
t.Fatal(n)
}
if n, err := db.HSet(key, []byte("b"), []byte("hello world 2")); err != nil {
t.Fatal(err)
} else if n != 1 {
t.Fatal(n)
}
ay, _ := db.HMget(key, []byte("a"), []byte("b"))
if v1, _ := ay[0].([]byte); string(v1) != "hello world 1" {
t.Fatal(string(v1))
}
if v2, _ := ay[1].([]byte); string(v2) != "hello world 2" {
t.Fatal(string(v2))
}
}
func TestDBHScan(t *testing.T) {

View File

@ -2,7 +2,7 @@ package ledis
import (
"errors"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/ledisdb/leveldb"
"time"
)
@ -204,18 +204,15 @@ func (db *DB) IncryBy(key []byte, increment int64) (int64, error) {
func (db *DB) MGet(keys ...[]byte) ([]interface{}, error) {
values := make([]interface{}, len(keys))
var err error
var value []byte
it := db.db.NewIterator()
defer it.Close()
for i := range keys {
if err := checkKeySize(keys[i]); err != nil {
return nil, err
}
if value, err = db.db.Get(db.encodeKVKey(keys[i])); err != nil {
return nil, err
}
values[i] = value
values[i] = it.Find(db.encodeKVKey(keys[i]))
}
return values, nil
@ -319,7 +316,7 @@ func (db *DB) flush() (drop int64, err error) {
minKey := db.encodeKVMinKey()
maxKey := db.encodeKVMaxKey()
it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
drop++
@ -362,7 +359,7 @@ func (db *DB) Scan(key []byte, count int, inclusive bool) ([]KVPair, error) {
rangeType = leveldb.RangeOpen
}
it := db.db.Iterator(minKey, maxKey, rangeType, 0, count)
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count)
for ; it.Valid(); it.Next() {
if key, err := db.decodeKVKey(it.Key()); err != nil {
continue

View File

@ -19,11 +19,28 @@ func TestKVCodec(t *testing.T) {
func TestDBKV(t *testing.T) {
db := getTestDB()
key := []byte("testdb_kv_a")
key1 := []byte("testdb_kv_a")
if err := db.Set(key, []byte("hello world")); err != nil {
if err := db.Set(key1, []byte("hello world 1")); err != nil {
t.Fatal(err)
}
key2 := []byte("testdb_kv_b")
if err := db.Set(key2, []byte("hello world 2")); err != nil {
t.Fatal(err)
}
ay, _ := db.MGet(key1, key2)
if v1, _ := ay[0].([]byte); string(v1) != "hello world 1" {
t.Fatal(string(v1))
}
if v2, _ := ay[1].([]byte); string(v2) != "hello world 2" {
t.Fatal(string(v2))
}
}
func TestDBScan(t *testing.T) {

View File

@ -3,7 +3,7 @@ package ledis
import (
"encoding/binary"
"errors"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/ledisdb/leveldb"
"time"
)
@ -200,7 +200,7 @@ func (db *DB) lDelete(t *tx, key []byte) int64 {
startKey := db.lEncodeListKey(key, headSeq)
stopKey := db.lEncodeListKey(key, tailSeq)
it := db.db.Iterator(startKey, stopKey, leveldb.RangeClose, 0, -1)
it := db.db.RangeLimitIterator(startKey, stopKey, leveldb.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
num++
@ -361,7 +361,7 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([]interface{}, error)
startKey := db.lEncodeListKey(key, startSeq)
stopKey := db.lEncodeListKey(key, stopSeq)
it := db.db.Iterator(startKey, stopKey, leveldb.RangeClose, 0, -1)
it := db.db.RangeLimitIterator(startKey, stopKey, leveldb.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
v = append(v, it.Value())
}
@ -408,7 +408,7 @@ func (db *DB) lFlush() (drop int64, err error) {
maxKey[0] = db.index
maxKey[1] = lMetaType + 1
it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
drop++

View File

@ -3,7 +3,7 @@ package ledis
import (
"encoding/binary"
"errors"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/ledisdb/leveldb"
"time"
)
@ -119,7 +119,7 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) {
maxKey[0] = db.index
maxKey[1] = expMetaType + 1
it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
drop++
@ -173,7 +173,7 @@ func (eli *elimination) active() {
continue
}
it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for it.Valid() {
for i := 1; i < 512 && it.Valid(); i++ {
expKeys = append(expKeys, it.Key(), it.Value())

View File

@ -4,7 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/ledisdb/leveldb"
"time"
)
@ -434,7 +434,7 @@ func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) {
rangeType := leveldb.RangeROpen
it := db.db.Iterator(minKey, maxKey, rangeType, 0, -1)
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, -1)
var n int64 = 0
for ; it.Valid(); it.Next() {
n++
@ -459,16 +459,16 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
if s, err := Int64(v, err); err != nil {
return 0, err
} else {
var it *leveldb.Iterator
var it *leveldb.RangeLimitIterator
sk := db.zEncodeScoreKey(key, member, s)
if !reverse {
minKey := db.zEncodeStartScoreKey(key, MinScore)
it = db.db.Iterator(minKey, sk, leveldb.RangeClose, 0, -1)
it = db.db.RangeLimitIterator(minKey, sk, leveldb.RangeClose, 0, -1)
} else {
maxKey := db.zEncodeStopScoreKey(key, MaxScore)
it = db.db.RevIterator(sk, maxKey, leveldb.RangeClose, 0, -1)
it = db.db.RevRangeLimitIterator(sk, maxKey, leveldb.RangeClose, 0, -1)
}
var lastKey []byte = nil
@ -492,14 +492,14 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
return -1, nil
}
func (db *DB) zIterator(key []byte, min int64, max int64, offset int, limit int, reverse bool) *leveldb.Iterator {
func (db *DB) zIterator(key []byte, min int64, max int64, offset int, limit int, reverse bool) *leveldb.RangeLimitIterator {
minKey := db.zEncodeStartScoreKey(key, min)
maxKey := db.zEncodeStopScoreKey(key, max)
if !reverse {
return db.db.Iterator(minKey, maxKey, leveldb.RangeClose, offset, limit)
return db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, limit)
} else {
return db.db.RevIterator(minKey, maxKey, leveldb.RangeClose, offset, limit)
return db.db.RevRangeLimitIterator(minKey, maxKey, leveldb.RangeClose, offset, limit)
}
}
@ -567,7 +567,7 @@ func (db *DB) zRange(key []byte, min int64, max int64, withScores bool, offset i
}
v := make([]interface{}, 0, nv)
var it *leveldb.Iterator
var it *leveldb.RangeLimitIterator
//if reverse and offset is 0, limit < 0, we may use forward iterator then reverse
//because leveldb iterator prev is slower than next
@ -745,7 +745,7 @@ func (db *DB) zFlush() (drop int64, err error) {
maxKey[0] = db.index
maxKey[1] = zScoreType + 1
it := db.db.Iterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
drop++
@ -788,7 +788,7 @@ func (db *DB) ZScan(key []byte, member []byte, count int, inclusive bool) ([]Sco
rangeType = leveldb.RangeOpen
}
it := db.db.Iterator(minKey, maxKey, rangeType, 0, count)
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count)
for ; it.Valid(); it.Next() {
if _, m, err := db.zDecodeSetKey(it.Key()); err != nil {
continue

View File

@ -1,7 +1,7 @@
package ledis
import (
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/ledisdb/leveldb"
"sync"
)

59
leveldb/batch.go Normal file
View File

@ -0,0 +1,59 @@
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include "leveldb/c.h"
import "C"
import (
"unsafe"
)
type WriteBatch struct {
db *DB
wbatch *C.leveldb_writebatch_t
}
func (w *WriteBatch) Close() {
C.leveldb_writebatch_destroy(w.wbatch)
}
func (w *WriteBatch) Put(key, value []byte) {
var k, v *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
if len(value) != 0 {
v = (*C.char)(unsafe.Pointer(&value[0]))
}
lenk := len(key)
lenv := len(value)
C.leveldb_writebatch_put(w.wbatch, k, C.size_t(lenk), v, C.size_t(lenv))
}
func (w *WriteBatch) Delete(key []byte) {
C.leveldb_writebatch_delete(w.wbatch,
(*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}
func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) SyncCommit() error {
return w.commit(w.db.syncWriteOpts)
}
func (w *WriteBatch) Rollback() {
C.leveldb_writebatch_clear(w.wbatch)
}
func (w *WriteBatch) commit(wb *WriteOptions) error {
var errStr *C.char
C.leveldb_write(w.db.db, wb.Opt, w.wbatch, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}

18
leveldb/cache.go Normal file
View File

@ -0,0 +1,18 @@
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include <stdint.h>
// #include "leveldb/c.h"
import "C"
type Cache struct {
Cache *C.leveldb_cache_t
}
func NewLRUCache(capacity int) *Cache {
return &Cache{C.leveldb_cache_create_lru(C.size_t(capacity))}
}
func (c *Cache) Close() {
C.leveldb_cache_destroy(c.Cache)
}

328
leveldb/db.go Normal file
View File

@ -0,0 +1,328 @@
package leveldb
/*
#cgo LDFLAGS: -lleveldb
#include <leveldb/c.h>
*/
import "C"
import (
"encoding/json"
"os"
"unsafe"
)
const defaultFilterBits int = 10
type Config struct {
Path string `json:"path"`
Compression bool `json:"compression"`
BlockSize int `json:"block_size"`
WriteBufferSize int `json:"write_buffer_size"`
CacheSize int `json:"cache_size"`
MaxOpenFiles int `json:"max_open_files"`
}
type DB struct {
cfg *Config
db *C.leveldb_t
opts *Options
//for default read and write options
readOpts *ReadOptions
writeOpts *WriteOptions
iteratorOpts *ReadOptions
syncWriteOpts *WriteOptions
cache *Cache
filter *FilterPolicy
}
func Open(configJson json.RawMessage) (*DB, error) {
cfg := new(Config)
err := json.Unmarshal(configJson, cfg)
if err != nil {
return nil, err
}
return OpenWithConfig(cfg)
}
func OpenWithConfig(cfg *Config) (*DB, error) {
if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil {
return nil, err
}
db := new(DB)
db.cfg = cfg
if err := db.open(); err != nil {
return nil, err
}
return db, nil
}
func (db *DB) open() error {
db.opts = db.initOptions(db.cfg)
db.readOpts = NewReadOptions()
db.writeOpts = NewWriteOptions()
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
db.syncWriteOpts = NewWriteOptions()
db.syncWriteOpts.SetSync(true)
var errStr *C.char
ldbname := C.CString(db.cfg.Path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
func (db *DB) initOptions(cfg *Config) *Options {
opts := NewOptions()
opts.SetCreateIfMissing(true)
if cfg.CacheSize <= 0 {
cfg.CacheSize = 4 * 1024 * 1024
}
db.cache = NewLRUCache(cfg.CacheSize)
opts.SetCache(db.cache)
//we must use bloomfilter
db.filter = NewBloomFilter(defaultFilterBits)
opts.SetFilterPolicy(db.filter)
if !cfg.Compression {
opts.SetCompression(NoCompression)
}
if cfg.BlockSize <= 0 {
cfg.BlockSize = 4 * 1024
}
opts.SetBlockSize(cfg.BlockSize)
if cfg.WriteBufferSize <= 0 {
cfg.WriteBufferSize = 4 * 1024 * 1024
}
opts.SetWriteBufferSize(cfg.WriteBufferSize)
if cfg.MaxOpenFiles < 1024 {
cfg.MaxOpenFiles = 1024
}
opts.SetMaxOpenFiles(cfg.MaxOpenFiles)
return opts
}
func (db *DB) Close() {
C.leveldb_close(db.db)
db.db = nil
db.opts.Close()
if db.cache != nil {
db.cache.Close()
}
if db.filter != nil {
db.filter.Close()
}
db.readOpts.Close()
db.writeOpts.Close()
db.iteratorOpts.Close()
db.syncWriteOpts.Close()
}
func (db *DB) Destroy() error {
path := db.cfg.Path
db.Close()
opts := NewOptions()
defer opts.Close()
var errStr *C.char
ldbname := C.CString(path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
C.leveldb_destroy_db(opts.Opt, ldbname, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
func (db *DB) Clear() error {
bc := db.NewWriteBatch()
defer bc.Close()
var err error
it := db.NewIterator()
it.SeekToFirst()
num := 0
for ; it.Valid(); it.Next() {
bc.Delete(it.Key())
num++
if num == 1000 {
num = 0
if err = bc.Commit(); err != nil {
return err
}
}
}
err = bc.Commit()
return err
}
func (db *DB) Put(key, value []byte) error {
return db.put(db.writeOpts, key, value)
}
func (db *DB) SyncPut(key, value []byte) error {
return db.put(db.syncWriteOpts, key, value)
}
func (db *DB) Get(key []byte) ([]byte, error) {
return db.get(db.readOpts, key)
}
func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) SyncDelete(key []byte) error {
return db.delete(db.syncWriteOpts, key)
}
func (db *DB) NewWriteBatch() *WriteBatch {
wb := &WriteBatch{
db: db,
wbatch: C.leveldb_writebatch_create(),
}
return wb
}
func (db *DB) NewSnapshot() *Snapshot {
s := &Snapshot{
db: db,
snap: C.leveldb_create_snapshot(db.db),
readOpts: NewReadOptions(),
iteratorOpts: NewReadOptions(),
}
s.readOpts.SetSnapshot(s)
s.iteratorOpts.SetSnapshot(s)
s.iteratorOpts.SetFillCache(false)
return s
}
func (db *DB) NewIterator() *Iterator {
it := new(Iterator)
it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt)
return it
}
func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return newRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, 0, -1, IteratorForward)
}
func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return newRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, 0, -1, IteratorBackward)
}
//limit < 0, unlimit
//offset must >= 0, if < 0, will get nothing
func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, limit int) *RangeLimitIterator {
return newRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, offset, limit, IteratorForward)
}
//limit < 0, unlimit
//offset must >= 0, if < 0, will get nothing
func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, limit int) *RangeLimitIterator {
return newRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, offset, limit, IteratorBackward)
}
func (db *DB) put(wo *WriteOptions, key, value []byte) error {
var errStr *C.char
var k, v *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
if len(value) != 0 {
v = (*C.char)(unsafe.Pointer(&value[0]))
}
lenk := len(key)
lenv := len(value)
C.leveldb_put(
db.db, wo.Opt, k, C.size_t(lenk), v, C.size_t(lenv), &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) {
var errStr *C.char
var vallen C.size_t
var k *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
value := C.leveldb_get(
db.db, ro.Opt, k, C.size_t(len(key)), &vallen, &errStr)
if errStr != nil {
return nil, saveError(errStr)
}
if value == nil {
return nil, nil
}
defer C.leveldb_free(unsafe.Pointer(value))
return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil
}
func (db *DB) delete(wo *WriteOptions, key []byte) error {
var errStr *C.char
var k *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
C.leveldb_delete(
db.db, wo.Opt, k, C.size_t(len(key)), &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}

19
leveldb/filterpolicy.go Normal file
View File

@ -0,0 +1,19 @@
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include <stdlib.h>
// #include "leveldb/c.h"
import "C"
type FilterPolicy struct {
Policy *C.leveldb_filterpolicy_t
}
func NewBloomFilter(bitsPerKey int) *FilterPolicy {
policy := C.leveldb_filterpolicy_create_bloom(C.int(bitsPerKey))
return &FilterPolicy{policy}
}
func (fp *FilterPolicy) Close() {
C.leveldb_filterpolicy_destroy(fp.Policy)
}

229
leveldb/iterator.go Normal file
View File

@ -0,0 +1,229 @@
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include <stdlib.h>
// #include "leveldb/c.h"
import "C"
import (
"bytes"
"unsafe"
)
const (
IteratorForward uint8 = 0
IteratorBackward uint8 = 1
)
const (
RangeClose uint8 = 0x00
RangeLOpen uint8 = 0x01
RangeROpen uint8 = 0x10
RangeOpen uint8 = 0x11
)
//min must less or equal than max
//range type:
//close: [min, max]
//open: (min, max)
//lopen: (min, max]
//ropen: [min, max)
type Range struct {
Min []byte
Max []byte
Type uint8
}
type Iterator struct {
it *C.leveldb_iterator_t
}
func (it *Iterator) Key() []byte {
var klen C.size_t
kdata := C.leveldb_iter_key(it.it, &klen)
if kdata == nil {
return nil
}
return C.GoBytes(unsafe.Pointer(kdata), C.int(klen))
}
func (it *Iterator) Value() []byte {
var vlen C.size_t
vdata := C.leveldb_iter_value(it.it, &vlen)
if vdata == nil {
return nil
}
return C.GoBytes(unsafe.Pointer(vdata), C.int(vlen))
}
func (it *Iterator) Close() {
C.leveldb_iter_destroy(it.it)
it.it = nil
}
func (it *Iterator) Valid() bool {
return ucharToBool(C.leveldb_iter_valid(it.it))
}
func (it *Iterator) Next() {
C.leveldb_iter_next(it.it)
}
func (it *Iterator) Prev() {
C.leveldb_iter_prev(it.it)
}
func (it *Iterator) SeekToFirst() {
C.leveldb_iter_seek_to_first(it.it)
}
func (it *Iterator) SeekToLast() {
C.leveldb_iter_seek_to_last(it.it)
}
func (it *Iterator) Seek(key []byte) {
C.leveldb_iter_seek(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}
func (it *Iterator) Find(key []byte) []byte {
it.Seek(key)
if it.Valid() && bytes.Equal(it.Key(), key) {
return it.Value()
} else {
return nil
}
}
type RangeLimitIterator struct {
it *Iterator
r *Range
offset int
limit int
step int
//0 for IteratorForward, 1 for IteratorBackward
direction uint8
}
func (it *RangeLimitIterator) Key() []byte {
return it.it.Key()
}
func (it *RangeLimitIterator) Value() []byte {
return it.it.Value()
}
func (it *RangeLimitIterator) Valid() bool {
if it.offset < 0 {
return false
} else if !it.it.Valid() {
return false
} else if it.limit >= 0 && it.step >= it.limit {
return false
}
if it.direction == IteratorForward {
if it.r.Max != nil {
r := bytes.Compare(it.it.Key(), it.r.Max)
if it.r.Type&RangeROpen > 0 {
return !(r >= 0)
} else {
return !(r > 0)
}
}
} else {
if it.r.Min != nil {
r := bytes.Compare(it.it.Key(), it.r.Min)
if it.r.Type&RangeLOpen > 0 {
return !(r <= 0)
} else {
return !(r < 0)
}
}
}
return true
}
func (it *RangeLimitIterator) Next() {
it.step++
if it.direction == IteratorForward {
it.it.Next()
} else {
it.it.Prev()
}
}
func (it *RangeLimitIterator) Close() {
it.it.Close()
}
func newRangeLimitIterator(i *Iterator, r *Range, offset int, limit int, direction uint8) *RangeLimitIterator {
it := new(RangeLimitIterator)
it.it = i
it.r = r
it.offset = offset
it.limit = limit
it.direction = direction
it.step = 0
if offset < 0 {
return it
}
if direction == IteratorForward {
if r.Min == nil {
it.it.SeekToFirst()
} else {
it.it.Seek(r.Min)
if r.Type&RangeLOpen > 0 {
if it.it.Valid() && bytes.Equal(it.it.Key(), r.Min) {
it.it.Next()
}
}
}
} else {
if r.Max == nil {
it.it.SeekToLast()
} else {
it.it.Seek(r.Max)
if !it.it.Valid() {
it.it.SeekToLast()
} else {
if !bytes.Equal(it.it.Key(), r.Max) {
it.it.Prev()
}
}
if r.Type&RangeROpen > 0 {
if it.it.Valid() && bytes.Equal(it.it.Key(), r.Max) {
it.it.Prev()
}
}
}
}
for i := 0; i < offset; i++ {
if it.it.Valid() {
if it.direction == IteratorForward {
it.it.Next()
} else {
it.it.Prev()
}
}
}
return it
}

259
leveldb/leveldb_test.go Normal file
View File

@ -0,0 +1,259 @@
package leveldb
import (
"bytes"
"fmt"
"os"
"sync"
"testing"
)
var testConfigJson = []byte(`
{
"path" : "./testdb",
"compression":true,
"block_size" : 32768,
"write_buffer_size" : 2097152,
"cache_size" : 20971520
}
`)
var testOnce sync.Once
var testDB *DB
func getTestDB() *DB {
f := func() {
var err error
testDB, err = Open(testConfigJson)
if err != nil {
println(err.Error())
panic(err)
}
}
testOnce.Do(f)
return testDB
}
func TestSimple(t *testing.T) {
db := getTestDB()
key := []byte("key")
value := []byte("hello world")
if err := db.Put(key, value); err != nil {
t.Fatal(err)
}
if v, err := db.Get(key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(v, value) {
t.Fatal("not equal")
}
if err := db.Delete(key); err != nil {
t.Fatal(err)
}
if v, err := db.Get(key); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatal("must nil")
}
}
func TestBatch(t *testing.T) {
db := getTestDB()
key1 := []byte("key1")
key2 := []byte("key2")
value := []byte("hello world")
db.Put(key1, value)
db.Put(key2, value)
wb := db.NewWriteBatch()
defer wb.Close()
wb.Delete(key2)
wb.Put(key1, []byte("hello world2"))
if err := wb.Commit(); err != nil {
t.Fatal(err)
}
if v, err := db.Get(key2); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatal("must nil")
}
if v, err := db.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "hello world2" {
t.Fatal(string(v))
}
wb.Delete(key1)
wb.Rollback()
if v, err := db.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "hello world2" {
t.Fatal(string(v))
}
db.Delete(key1)
}
func checkIterator(it *RangeLimitIterator, cv ...int) error {
v := make([]string, 0, len(cv))
for ; it.Valid(); it.Next() {
k := it.Key()
v = append(v, string(k))
}
it.Close()
if len(v) != len(cv) {
return fmt.Errorf("len error %d != %d", len(v), len(cv))
}
for k, i := range cv {
if fmt.Sprintf("key_%d", i) != v[k] {
return fmt.Errorf("%s, %d", v[k], i)
}
}
return nil
}
func TestIterator(t *testing.T) {
db := getTestDB()
db.Clear()
for i := 0; i < 10; i++ {
key := []byte(fmt.Sprintf("key_%d", i))
value := []byte("")
db.Put(key, value)
}
var it *RangeLimitIterator
k := func(i int) []byte {
return []byte(fmt.Sprintf("key_%d", i))
}
it = db.RangeLimitIterator(k(1), k(5), RangeClose, 0, -1)
if err := checkIterator(it, 1, 2, 3, 4, 5); err != nil {
t.Fatal(err)
}
it = db.RangeLimitIterator(k(1), k(5), RangeClose, 1, 3)
if err := checkIterator(it, 2, 3, 4); err != nil {
t.Fatal(err)
}
it = db.RangeLimitIterator(k(1), k(5), RangeLOpen, 0, -1)
if err := checkIterator(it, 2, 3, 4, 5); err != nil {
t.Fatal(err)
}
it = db.RangeLimitIterator(k(1), k(5), RangeROpen, 0, -1)
if err := checkIterator(it, 1, 2, 3, 4); err != nil {
t.Fatal(err)
}
it = db.RangeLimitIterator(k(1), k(5), RangeOpen, 0, -1)
if err := checkIterator(it, 2, 3, 4); err != nil {
t.Fatal(err)
}
it = db.RevRangeLimitIterator(k(1), k(5), RangeClose, 0, -1)
if err := checkIterator(it, 5, 4, 3, 2, 1); err != nil {
t.Fatal(err)
}
it = db.RevRangeLimitIterator(k(1), k(5), RangeClose, 1, 3)
if err := checkIterator(it, 4, 3, 2); err != nil {
t.Fatal(err)
}
it = db.RevRangeLimitIterator(k(1), k(5), RangeLOpen, 0, -1)
if err := checkIterator(it, 5, 4, 3, 2); err != nil {
t.Fatal(err)
}
it = db.RevRangeLimitIterator(k(1), k(5), RangeROpen, 0, -1)
if err := checkIterator(it, 4, 3, 2, 1); err != nil {
t.Fatal(err)
}
it = db.RevRangeLimitIterator(k(1), k(5), RangeOpen, 0, -1)
if err := checkIterator(it, 4, 3, 2); err != nil {
t.Fatal(err)
}
}
func TestSnapshot(t *testing.T) {
db := getTestDB()
key := []byte("key")
value := []byte("hello world")
db.Put(key, value)
s := db.NewSnapshot()
defer s.Close()
db.Put(key, []byte("hello world2"))
if v, err := s.Get(key); err != nil {
t.Fatal(err)
} else if string(v) != string(value) {
t.Fatal(string(v))
}
}
func TestDestroy(t *testing.T) {
db := getTestDB()
db.Put([]byte("a"), []byte("1"))
if err := db.Clear(); err != nil {
t.Fatal(err)
}
if _, err := os.Stat(db.cfg.Path); err != nil {
t.Fatal("must exist ", err.Error())
}
if v, err := db.Get([]byte("a")); err != nil {
t.Fatal(err)
} else if string(v) == "1" {
t.Fatal(string(v))
}
db.Destroy()
if _, err := os.Stat(db.cfg.Path); !os.IsNotExist(err) {
t.Fatal("must not exist")
}
}
func TestCloseMore(t *testing.T) {
cfg := new(Config)
cfg.Path = "/tmp/testdb1234"
cfg.CacheSize = 4 * 1024 * 1024
os.RemoveAll(cfg.Path)
for i := 0; i < 100; i++ {
db, err := OpenWithConfig(cfg)
if err != nil {
t.Fatal(err)
}
db.Put([]byte("key"), []byte("value"))
db.Close()
}
}

7
leveldb/levigo-license Normal file
View File

@ -0,0 +1,7 @@
Copyright (c) 2012 Jeffrey M Hodges
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

128
leveldb/options.go Normal file
View File

@ -0,0 +1,128 @@
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include "leveldb/c.h"
import "C"
type CompressionOpt int
const (
NoCompression = CompressionOpt(0)
SnappyCompression = CompressionOpt(1)
)
type Options struct {
Opt *C.leveldb_options_t
}
type ReadOptions struct {
Opt *C.leveldb_readoptions_t
}
type WriteOptions struct {
Opt *C.leveldb_writeoptions_t
}
func NewOptions() *Options {
opt := C.leveldb_options_create()
return &Options{opt}
}
func NewReadOptions() *ReadOptions {
opt := C.leveldb_readoptions_create()
return &ReadOptions{opt}
}
func NewWriteOptions() *WriteOptions {
opt := C.leveldb_writeoptions_create()
return &WriteOptions{opt}
}
func (o *Options) Close() {
C.leveldb_options_destroy(o.Opt)
}
func (o *Options) SetComparator(cmp *C.leveldb_comparator_t) {
C.leveldb_options_set_comparator(o.Opt, cmp)
}
func (o *Options) SetErrorIfExists(error_if_exists bool) {
eie := boolToUchar(error_if_exists)
C.leveldb_options_set_error_if_exists(o.Opt, eie)
}
func (o *Options) SetCache(cache *Cache) {
C.leveldb_options_set_cache(o.Opt, cache.Cache)
}
// func (o *Options) SetEnv(env *Env) {
// C.leveldb_options_set_env(o.Opt, env.Env)
// }
func (o *Options) SetInfoLog(log *C.leveldb_logger_t) {
C.leveldb_options_set_info_log(o.Opt, log)
}
func (o *Options) SetWriteBufferSize(s int) {
C.leveldb_options_set_write_buffer_size(o.Opt, C.size_t(s))
}
func (o *Options) SetParanoidChecks(pc bool) {
C.leveldb_options_set_paranoid_checks(o.Opt, boolToUchar(pc))
}
func (o *Options) SetMaxOpenFiles(n int) {
C.leveldb_options_set_max_open_files(o.Opt, C.int(n))
}
func (o *Options) SetBlockSize(s int) {
C.leveldb_options_set_block_size(o.Opt, C.size_t(s))
}
func (o *Options) SetBlockRestartInterval(n int) {
C.leveldb_options_set_block_restart_interval(o.Opt, C.int(n))
}
func (o *Options) SetCompression(t CompressionOpt) {
C.leveldb_options_set_compression(o.Opt, C.int(t))
}
func (o *Options) SetCreateIfMissing(b bool) {
C.leveldb_options_set_create_if_missing(o.Opt, boolToUchar(b))
}
func (o *Options) SetFilterPolicy(fp *FilterPolicy) {
var policy *C.leveldb_filterpolicy_t
if fp != nil {
policy = fp.Policy
}
C.leveldb_options_set_filter_policy(o.Opt, policy)
}
func (ro *ReadOptions) Close() {
C.leveldb_readoptions_destroy(ro.Opt)
}
func (ro *ReadOptions) SetVerifyChecksums(b bool) {
C.leveldb_readoptions_set_verify_checksums(ro.Opt, boolToUchar(b))
}
func (ro *ReadOptions) SetFillCache(b bool) {
C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b))
}
func (ro *ReadOptions) SetSnapshot(snap *Snapshot) {
var s *C.leveldb_snapshot_t
if snap != nil {
s = snap.snap
}
C.leveldb_readoptions_set_snapshot(ro.Opt, s)
}
func (wo *WriteOptions) Close() {
C.leveldb_writeoptions_destroy(wo.Opt)
}
func (wo *WriteOptions) SetSync(b bool) {
C.leveldb_writeoptions_set_sync(wo.Opt, boolToUchar(b))
}

54
leveldb/snapshot.go Normal file
View File

@ -0,0 +1,54 @@
package leveldb
// #cgo LDFLAGS: -lleveldb
// #include <stdint.h>
// #include "leveldb/c.h"
import "C"
type Snapshot struct {
db *DB
snap *C.leveldb_snapshot_t
readOpts *ReadOptions
iteratorOpts *ReadOptions
}
func (s *Snapshot) Close() {
C.leveldb_release_snapshot(s.db.db, s.snap)
s.iteratorOpts.Close()
s.readOpts.Close()
}
func (s *Snapshot) Get(key []byte) ([]byte, error) {
return s.db.get(s.readOpts, key)
}
func (s *Snapshot) NewIterator() *Iterator {
it := new(Iterator)
it.it = C.leveldb_create_iterator(s.db.db, s.iteratorOpts.Opt)
return it
}
func (s *Snapshot) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return newRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, 0, -1, IteratorForward)
}
func (s *Snapshot) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
return newRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, 0, -1, IteratorBackward)
}
//limit < 0, unlimit
//offset must >= 0, if < 0, will get nothing
func (s *Snapshot) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, limit int) *RangeLimitIterator {
return newRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, offset, limit, IteratorForward)
}
//limit < 0, unlimit
//offset must >= 0, if < 0, will get nothing
func (s *Snapshot) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, limit int) *RangeLimitIterator {
return newRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, offset, limit, IteratorBackward)
}

43
leveldb/util.go Normal file
View File

@ -0,0 +1,43 @@
package leveldb
// #include "leveldb/c.h"
import "C"
import (
"fmt"
"reflect"
"unsafe"
)
func boolToUchar(b bool) C.uchar {
uc := C.uchar(0)
if b {
uc = C.uchar(1)
}
return uc
}
func ucharToBool(uc C.uchar) bool {
if uc == C.uchar(0) {
return false
}
return true
}
func saveError(errStr *C.char) error {
if errStr != nil {
gs := C.GoString(errStr)
C.leveldb_free(unsafe.Pointer(errStr))
return fmt.Errorf(gs)
}
return nil
}
func slice(p unsafe.Pointer, n int) []byte {
var b []byte
pbyte := (*reflect.SliceHeader)(unsafe.Pointer(&b))
pbyte.Data = uintptr(p)
pbyte.Len = n
pbyte.Cap = n
return b
}

View File

@ -138,7 +138,7 @@ func hmgetCommand(c *client) error {
return ErrCmdParams
}
if v, err := c.db.HMget(args[0], args[1:]); err != nil {
if v, err := c.db.HMget(args[0], args[1:]...); err != nil {
return err
} else {
c.writeArray(v)

View File

@ -3,14 +3,14 @@ package server
import (
"bytes"
"fmt"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/ledisdb/leveldb"
"os"
"testing"
"time"
)
func checkDataEqual(master *App, slave *App) error {
it := master.ldb.DataDB().Iterator(nil, nil, leveldb.RangeClose, 0, -1)
it := master.ldb.DataDB().RangeLimitIterator(nil, nil, leveldb.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
key := it.Key()
value := it.Value()