Merge branch 'develop'

This commit is contained in:
siddontang 2014-10-24 16:39:58 +08:00
commit 3c9ef638dc
9 changed files with 364 additions and 61 deletions

View File

@ -1,7 +1,6 @@
package main
import (
crand "crypto/rand"
"flag"
"fmt"
"github.com/siddontang/ledisdb/client/go/ledis"
@ -39,10 +38,10 @@ func waitBench(cmd string, args ...interface{}) {
func bench(cmd string, f func()) {
wg.Add(*clients)
t1 := time.Now().UnixNano()
t1 := time.Now()
for i := 0; i < *clients; i++ {
go func() {
for i := 0; i < loop; i++ {
for j := 0; j < loop; j++ {
f()
}
wg.Done()
@ -51,11 +50,9 @@ func bench(cmd string, f func()) {
wg.Wait()
t2 := time.Now().UnixNano()
t2 := time.Now()
delta := float64(t2-t1) / float64(time.Second)
fmt.Printf("%s: %0.2f requests per second\n", cmd, (float64(*number) / delta))
fmt.Printf("%s: %0.2f op/s\n", cmd, (float64(*number) / t2.Sub(t1).Seconds()))
}
var kvSetBase int64 = 0
@ -66,7 +63,6 @@ var kvDelBase int64 = 0
func benchSet() {
f := func() {
value := make([]byte, *valueSize)
crand.Read(value)
n := atomic.AddInt64(&kvSetBase, 1)
waitBench("set", n, value)
}
@ -104,7 +100,6 @@ func benchDel() {
func benchPushList() {
f := func() {
value := make([]byte, 100)
crand.Read(value)
waitBench("rpush", "mytestlist", value)
}
@ -151,7 +146,6 @@ var hashDelBase int64 = 0
func benchHset() {
f := func() {
value := make([]byte, 100)
crand.Read(value)
n := atomic.AddInt64(&hashSetBase, 1)
waitBench("hset", "myhashkey", n, value)
@ -194,7 +188,6 @@ var zsetIncrBase int64 = 0
func benchZAdd() {
f := func() {
member := make([]byte, 16)
crand.Read(member)
n := atomic.AddInt64(&zsetAddBase, 1)
waitBench("zadd", "myzsetkey", n, member)
}

View File

@ -0,0 +1,143 @@
package main
import (
"flag"
"fmt"
"github.com/siddontang/go/num"
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
)
var KB = config.KB
var MB = config.MB
var GB = config.GB
var name = flag.String("db_name", "goleveldb", "db name")
var number = flag.Int("n", 10000, "request number")
var clients = flag.Int("c", 50, "number of clients")
var round = flag.Int("r", 1, "benchmark round number")
var valueSize = flag.Int("vsize", 100, "kv value size")
var wg sync.WaitGroup
var db *store.DB
var loop int = 0
func bench(cmd string, f func()) {
wg.Add(*clients)
t1 := time.Now()
for i := 0; i < *clients; i++ {
go func() {
for j := 0; j < loop; j++ {
f()
}
wg.Done()
}()
}
wg.Wait()
t2 := time.Now()
d := t2.Sub(t1)
fmt.Printf("%s: %0.3f micros/op, %0.2fmb/s\n", cmd, float64(d.Nanoseconds()/1e3)/float64(*number),
float64((*valueSize+16)*(*number))/(1024.0*1024.0*(d.Seconds())))
}
var kvSetBase int64 = 0
var kvGetBase int64 = 0
func benchSet() {
f := func() {
value := make([]byte, *valueSize)
n := atomic.AddInt64(&kvSetBase, 1)
db.Put(num.Int64ToBytes(n), value)
}
bench("set", f)
}
func benchGet() {
f := func() {
n := atomic.AddInt64(&kvGetBase, 1)
v, err := db.Get(num.Int64ToBytes(n))
if err != nil {
println(err.Error())
} else if len(v) != *valueSize {
println(len(v), *valueSize)
}
}
bench("get", f)
}
func setRocksDB(cfg *config.RocksDBConfig) {
cfg.BlockSize = 64 * KB
cfg.WriteBufferSize = 64 * MB
cfg.MaxWriteBufferNum = 2
cfg.MaxBytesForLevelBase = 512 * MB
cfg.TargetFileSizeBase = 64 * MB
cfg.BackgroundThreads = 4
cfg.HighPriorityBackgroundThreads = 1
cfg.MaxBackgroundCompactions = 3
cfg.MaxBackgroundFlushes = 1
cfg.CacheSize = 512 * MB
cfg.EnableStatistics = true
cfg.StatsDumpPeriodSec = 5
cfg.Level0FileNumCompactionTrigger = 8
cfg.MaxBytesForLevelMultiplier = 8
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
flag.Parse()
cfg := config.NewConfigDefault()
cfg.DBPath = "./var/store_test"
cfg.DBName = *name
os.RemoveAll(cfg.DBPath)
cfg.LevelDB.BlockSize = 32 * KB
cfg.LevelDB.CacheSize = 512 * MB
cfg.LevelDB.WriteBufferSize = 64 * MB
cfg.LevelDB.MaxOpenFiles = 1000
setRocksDB(&cfg.RocksDB)
var err error
db, err = store.Open(cfg)
if err != nil {
panic(err)
return
}
if *number <= 0 {
panic("invalid number")
return
}
if *clients <= 0 || *number < *clients {
panic("invalid client number")
return
}
loop = *number / *clients
if *round <= 0 {
*round = 1
}
for i := 0; i < *round; i++ {
benchSet()
benchGet()
println("")
}
}

View File

@ -19,6 +19,10 @@ const (
DefaultDBName string = "goleveldb"
DefaultDataDir string = "./var"
KB int = 1024
MB int = KB * 1024
GB int = MB * 1024
)
type LevelDBConfig struct {
@ -29,6 +33,34 @@ type LevelDBConfig struct {
MaxOpenFiles int `toml:"max_open_files"`
}
type RocksDBConfig struct {
Compression int `toml:"compression"`
BlockSize int `toml:"block_size"`
WriteBufferSize int `toml:"write_buffer_size"`
CacheSize int `toml:"cache_size"`
MaxOpenFiles int `toml:"max_open_files"`
MaxWriteBufferNum int `toml:"max_write_buffer_num"`
MinWriteBufferNumberToMerge int `toml:"min_write_buffer_number_to_merge"`
NumLevels int `toml:"num_levels"`
Level0FileNumCompactionTrigger int `toml:"level0_file_num_compaction_trigger"`
Level0SlowdownWritesTrigger int `toml:"level0_slowdown_writes_trigger"`
Level0StopWritesTrigger int `toml:"level0_stop_writes_trigger"`
TargetFileSizeBase int `toml:"target_file_size_base"`
TargetFileSizeMultiplier int `toml:"target_file_size_multiplier"`
MaxBytesForLevelBase int `toml:"max_bytes_for_level_base"`
MaxBytesForLevelMultiplier int `toml:"max_bytes_for_level_multiplier"`
DisableAutoCompactions bool `toml:"disable_auto_compactions"`
DisableDataSync bool `toml:"disable_data_sync"`
UseFsync bool `toml:"use_fsync"`
MaxBackgroundCompactions int `toml:"max_background_compactions"`
MaxBackgroundFlushes int `toml:"max_background_flushes"`
AllowOsBuffer bool `toml:"allow_os_buffer"`
EnableStatistics bool `toml:"enable_statistics"`
StatsDumpPeriodSec int `toml:"stats_dump_period_sec"`
BackgroundThreads int `toml:"background_theads"`
HighPriorityBackgroundThreads int `toml:"high_priority_background_threads"`
}
type LMDBConfig struct {
MapSize int `toml:"map_size"`
NoSync bool `toml:"nosync"`
@ -67,6 +99,7 @@ type Config struct {
DBSyncCommit int `toml:"db_sync_commit"`
LevelDB LevelDBConfig `toml:"leveldb"`
RocksDB RocksDBConfig `toml:"rocksdb"`
LMDB LMDBConfig `toml:"lmdb"`
@ -121,7 +154,7 @@ func NewConfigDefault() *Config {
// disable access log
cfg.AccessLog = ""
cfg.LMDB.MapSize = 20 * 1024 * 1024
cfg.LMDB.MapSize = 20 * MB
cfg.LMDB.NoSync = true
cfg.UseReplication = false
@ -131,31 +164,60 @@ func NewConfigDefault() *Config {
cfg.Replication.SyncLog = 0
cfg.Snapshot.MaxNum = 1
cfg.RocksDB.AllowOsBuffer = true
cfg.RocksDB.EnableStatistics = false
cfg.RocksDB.UseFsync = false
cfg.RocksDB.DisableAutoCompactions = false
cfg.RocksDB.AllowOsBuffer = true
cfg.adjust()
return cfg
}
func getDefault(d int, s int) int {
if s <= 0 {
return d
} else {
return s
}
}
func (cfg *Config) adjust() {
if cfg.LevelDB.CacheSize <= 0 {
cfg.LevelDB.CacheSize = 4 * 1024 * 1024
cfg.LevelDB.adjust()
cfg.RocksDB.adjust()
cfg.Replication.ExpiredLogDays = getDefault(7, cfg.Replication.ExpiredLogDays)
}
if cfg.LevelDB.BlockSize <= 0 {
cfg.LevelDB.BlockSize = 4 * 1024
func (cfg *LevelDBConfig) adjust() {
cfg.CacheSize = getDefault(4*MB, cfg.CacheSize)
cfg.BlockSize = getDefault(4*KB, cfg.BlockSize)
cfg.WriteBufferSize = getDefault(4*MB, cfg.WriteBufferSize)
cfg.MaxOpenFiles = getDefault(1024, cfg.MaxOpenFiles)
}
if cfg.LevelDB.WriteBufferSize <= 0 {
cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024
}
if cfg.LevelDB.MaxOpenFiles < 1024 {
cfg.LevelDB.MaxOpenFiles = 1024
}
if cfg.Replication.ExpiredLogDays <= 0 {
cfg.Replication.ExpiredLogDays = 7
}
func (cfg *RocksDBConfig) adjust() {
cfg.CacheSize = getDefault(4*MB, cfg.CacheSize)
cfg.BlockSize = getDefault(4*KB, cfg.BlockSize)
cfg.WriteBufferSize = getDefault(4*MB, cfg.WriteBufferSize)
cfg.MaxOpenFiles = getDefault(1024, cfg.MaxOpenFiles)
cfg.MaxWriteBufferNum = getDefault(2, cfg.MaxWriteBufferNum)
cfg.MinWriteBufferNumberToMerge = getDefault(1, cfg.MinWriteBufferNumberToMerge)
cfg.NumLevels = getDefault(7, cfg.NumLevels)
cfg.Level0FileNumCompactionTrigger = getDefault(4, cfg.Level0FileNumCompactionTrigger)
cfg.Level0SlowdownWritesTrigger = getDefault(16, cfg.Level0SlowdownWritesTrigger)
cfg.Level0StopWritesTrigger = getDefault(64, cfg.Level0StopWritesTrigger)
cfg.TargetFileSizeBase = getDefault(32*MB, cfg.TargetFileSizeBase)
cfg.TargetFileSizeMultiplier = getDefault(1, cfg.TargetFileSizeMultiplier)
cfg.MaxBytesForLevelBase = getDefault(32*MB, cfg.MaxBytesForLevelBase)
cfg.MaxBytesForLevelMultiplier = getDefault(1, cfg.MaxBytesForLevelMultiplier)
cfg.MaxBackgroundCompactions = getDefault(1, cfg.MaxBackgroundCompactions)
cfg.MaxBackgroundFlushes = getDefault(1, cfg.MaxBackgroundFlushes)
cfg.StatsDumpPeriodSec = getDefault(3600, cfg.StatsDumpPeriodSec)
cfg.BackgroundThreads = getDefault(2, cfg.BackgroundThreads)
cfg.HighPriorityBackgroundThreads = getDefault(1, cfg.HighPriorityBackgroundThreads)
}
func (cfg *Config) Dump(w io.Writer) error {

View File

@ -44,12 +44,46 @@ db_sync_commit = 0
use_replication = true
[leveldb]
# for leveldb and goleveldb
compression = false
block_size = 32768
write_buffer_size = 67108864
cache_size = 524288000
max_open_files = 1024
[rocksdb]
# rocksdb has many many configurations,
# we only list little now, but may add more later.
# good luck!
# 0:no, 1:snappy, 2:zlib, 3:bz2, 4:lz4, 5:lz4hc
compression = 0
block_size = 65536
write_buffer_size = 67108864
cache_size = 524288000
max_open_files = 1024
max_write_buffer_num = 2
min_write_buffer_number_to_merge = 1
num_levels = 7
level0_file_num_compaction_trigger = 8
level0_slowdown_writes_trigger = 16
level0_stop_writes_trigger = 64
target_file_size_base = 67108864
target_file_size_multiplier = 1
max_bytes_for_level_base = 536870912
max_bytes_for_level_multiplier = 8
disable_auto_compactions = false
disable_data_sync = false
use_fsync = false
background_theads = 4
high_priority_background_threads = 1
max_background_compactions = 3
max_background_flushes = 1
allow_os_buffer = true
enable_statistics = false
stats_dump_period_sec = 3600
[lmdb]
map_size = 524288000
nosync = true

View File

@ -44,12 +44,46 @@ db_sync_commit = 0
use_replication = true
[leveldb]
# for leveldb and goleveldb
compression = false
block_size = 32768
write_buffer_size = 67108864
cache_size = 524288000
max_open_files = 1024
[rocksdb]
# rocksdb has many many configurations,
# we only list little now, but may add more later.
# good luck!
# 0:no, 1:snappy, 2:zlib, 3:bz2, 4:lz4, 5:lz4hc
compression = 0
block_size = 65536
write_buffer_size = 67108864
cache_size = 524288000
max_open_files = 1024
max_write_buffer_num = 2
min_write_buffer_number_to_merge = 1
num_levels = 7
level0_file_num_compaction_trigger = 8
level0_slowdown_writes_trigger = 16
level0_stop_writes_trigger = 64
target_file_size_base = 67108864
target_file_size_multiplier = 1
max_bytes_for_level_base = 536870912
max_bytes_for_level_multiplier = 8
disable_auto_compactions = false
disable_data_sync = false
use_fsync = false
background_theads = 4
high_priority_background_threads = 1
max_background_compactions = 3
max_background_flushes = 1
allow_os_buffer = true
enable_statistics = false
stats_dump_period_sec = 3600
[lmdb]
map_size = 524288000
nosync = true

View File

@ -14,7 +14,6 @@ import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store/driver"
"os"
"runtime"
"unsafe"
)
@ -188,9 +187,6 @@ func (db *DB) NewWriteBatch() driver.IWriteBatch {
wbatch: C.leveldb_writebatch_create(),
}
runtime.SetFinalizer(wb, func(w *WriteBatch) {
w.Close()
})
return wb
}

View File

@ -15,7 +15,6 @@ import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store/driver"
"os"
"runtime"
"unsafe"
)
@ -35,7 +34,7 @@ func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) {
db := new(DB)
db.path = path
db.cfg = &cfg.LevelDB
db.cfg = &cfg.RocksDB
if err := db.open(); err != nil {
return nil, err
@ -47,7 +46,7 @@ func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) {
func (s Store) Repair(path string, cfg *config.Config) error {
db := new(DB)
db.path = path
db.cfg = &cfg.LevelDB
db.cfg = &cfg.RocksDB
err := db.open()
defer db.Close()
@ -71,7 +70,7 @@ func (s Store) Repair(path string, cfg *config.Config) error {
type DB struct {
path string
cfg *config.LevelDBConfig
cfg *config.RocksDBConfig
db *C.rocksdb_t
@ -107,15 +106,15 @@ func (db *DB) open() error {
return nil
}
func (db *DB) initOptions(cfg *config.LevelDBConfig) {
func (db *DB) initOptions(cfg *config.RocksDBConfig) {
opts := NewOptions()
blockOpts := NewBlockBasedTableOptions()
opts.SetCreateIfMissing(true)
db.env = NewDefaultEnv()
db.env.SetBackgroundThreads(runtime.NumCPU() * 2)
db.env.SetHighPriorityBackgroundThreads(1)
db.env.SetBackgroundThreads(cfg.BackgroundThreads)
db.env.SetHighPriorityBackgroundThreads(cfg.HighPriorityBackgroundThreads)
opts.SetEnv(db.env)
db.cache = NewLRUCache(cfg.CacheSize)
@ -124,28 +123,28 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
//we must use bloomfilter
db.filter = NewBloomFilter(defaultFilterBits)
blockOpts.SetFilterPolicy(db.filter)
if !cfg.Compression {
opts.SetCompression(NoCompression)
} else {
opts.SetCompression(SnappyCompression)
}
blockOpts.SetBlockSize(cfg.BlockSize)
opts.SetWriteBufferSize(cfg.WriteBufferSize)
opts.SetMaxOpenFiles(cfg.MaxOpenFiles)
opts.SetMaxBackgroundCompactions(runtime.NumCPU()*2 - 1)
opts.SetMaxBackgroundFlushes(1)
opts.SetLevel0SlowdownWritesTrigger(16)
opts.SetLevel0StopWritesTrigger(64)
opts.SetTargetFileSizeBase(32 * 1024 * 1024)
opts.SetBlockBasedTableFactory(blockOpts)
opts.SetCompression(CompressionOpt(cfg.Compression))
opts.SetWriteBufferSize(cfg.WriteBufferSize)
opts.SetMaxOpenFiles(cfg.MaxOpenFiles)
opts.SetMaxBackgroundCompactions(cfg.MaxBackgroundCompactions)
opts.SetMaxBackgroundFlushes(cfg.MaxBackgroundFlushes)
opts.SetLevel0SlowdownWritesTrigger(cfg.Level0SlowdownWritesTrigger)
opts.SetLevel0StopWritesTrigger(cfg.Level0StopWritesTrigger)
opts.SetTargetFileSizeBase(cfg.TargetFileSizeBase)
opts.SetTargetFileSizeMultiplier(cfg.TargetFileSizeMultiplier)
opts.SetMaxBytesForLevelBase(cfg.MaxBytesForLevelBase)
opts.SetMaxBytesForLevelMultiplier(cfg.MaxBytesForLevelMultiplier)
opts.DisableDataSync(cfg.DisableDataSync)
opts.SetMinWriteBufferNumberToMerge(cfg.MinWriteBufferNumberToMerge)
opts.DisableAutoCompactions(cfg.DisableAutoCompactions)
opts.EnableStatistics(cfg.EnableStatistics)
opts.UseFsync(cfg.UseFsync)
opts.AllowOsBuffer(cfg.AllowOsBuffer)
opts.SetStatsDumpPeriodSec(cfg.StatsDumpPeriodSec)
db.opts = opts
db.blockOpts = blockOpts
@ -214,10 +213,6 @@ func (db *DB) NewWriteBatch() driver.IWriteBatch {
wbatch: C.rocksdb_writebatch_create(),
}
runtime.SetFinalizer(wb, func(w *WriteBatch) {
w.Close()
})
return wb
}

View File

@ -11,6 +11,10 @@ type CompressionOpt int
const (
NoCompression = CompressionOpt(0)
SnappyCompression = CompressionOpt(1)
ZlibCompression = CompressionOpt(2)
Bz2Compression = CompressionOpt(3)
Lz4Compression = CompressionOpt(4)
Lz4hcCompression = CompressionOpt(5)
)
type Options struct {
@ -134,6 +138,36 @@ func (o *Options) SetBlockBasedTableFactory(opt *BlockBasedTableOptions) {
C.rocksdb_options_set_block_based_table_factory(o.Opt, opt.Opt)
}
func (o *Options) SetMinWriteBufferNumberToMerge(n int) {
C.rocksdb_options_set_min_write_buffer_number_to_merge(o.Opt, C.int(n))
}
func (o *Options) DisableDataSync(b bool) {
C.rocksdb_options_set_disable_data_sync(o.Opt, boolToInt(b))
}
func (o *Options) DisableAutoCompactions(b bool) {
C.rocksdb_options_set_disable_auto_compactions(o.Opt, boolToInt(b))
}
func (o *Options) UseFsync(b bool) {
C.rocksdb_options_set_use_fsync(o.Opt, boolToInt(b))
}
func (o *Options) AllowOsBuffer(b bool) {
C.rocksdb_options_set_allow_os_buffer(o.Opt, boolToUchar(b))
}
func (o *Options) EnableStatistics(b bool) {
if b {
C.rocksdb_options_enable_statistics(o.Opt)
}
}
func (o *Options) SetStatsDumpPeriodSec(n int) {
C.rocksdb_options_set_stats_dump_period_sec(o.Opt, C.uint(n))
}
func (o *BlockBasedTableOptions) Close() {
C.rocksdb_block_based_options_destroy(o.Opt)
}
@ -185,3 +219,7 @@ func (wo *WriteOptions) Close() {
func (wo *WriteOptions) SetSync(b bool) {
C.rocksdb_writeoptions_set_sync(wo.Opt, boolToUchar(b))
}
func (wo *WriteOptions) DisableWAL(b bool) {
C.rocksdb_writeoptions_disable_WAL(wo.Opt, boolToInt(b))
}

View File

@ -27,6 +27,14 @@ func ucharToBool(uc C.uchar) bool {
return true
}
func boolToInt(b bool) C.int {
uc := C.int(0)
if b {
uc = C.int(1)
}
return uc
}
func saveError(errStr *C.char) error {
if errStr != nil {
gs := C.GoString(errStr)