add rocksdb config

This commit is contained in:
siddontang 2014-10-24 13:01:13 +08:00
parent dbb725c678
commit 1cb40204fa
9 changed files with 252 additions and 66 deletions

View File

@@ -39,7 +39,7 @@ func waitBench(cmd string, args ...interface{}) {
 func bench(cmd string, f func()) {
 	wg.Add(*clients)
-	t1 := time.Now().UnixNano()
+	t1 := time.Now()
 	for i := 0; i < *clients; i++ {
 		go func() {
 			for i := 0; i < loop; i++ {
@@ -51,11 +51,9 @@ func bench(cmd string, f func()) {
 	wg.Wait()

-	t2 := time.Now().UnixNano()
-
-	delta := float64(t2-t1) / float64(time.Second)
-
-	fmt.Printf("%s: %0.2f requests per second\n", cmd, (float64(*number) / delta))
+	t2 := time.Now()
+
+	fmt.Printf("%s: %0.2f op/s\n", cmd, (float64(*number) / t2.Sub(t1).Seconds()))
 }

 var kvSetBase int64 = 0
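For reference, the pattern the benchmark moves to here — time.Time values and Sub(...).Seconds() instead of raw UnixNano arithmetic — in a self-contained sketch (illustrative names and values, not part of the commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	const number = 10000

	t1 := time.Now()
	// Stand-in for running `number` benchmark requests across the client goroutines.
	time.Sleep(250 * time.Millisecond)
	t2 := time.Now()

	// Same formula the benchmark now uses: request count divided by elapsed seconds.
	fmt.Printf("bench: %0.2f op/s\n", float64(number)/t2.Sub(t1).Seconds())
}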

View File

@@ -14,8 +14,12 @@ import (
 	"time"
 )

+var KB = config.KB
+var MB = config.MB
+var GB = config.GB
+
 var name = flag.String("db_name", "goleveldb", "db name")
-var number = flag.Int("n", 1000, "request number")
+var number = flag.Int("n", 10000, "request number")
 var clients = flag.Int("c", 50, "number of clients")
 var round = flag.Int("r", 1, "benchmark round number")
 var valueSize = flag.Int("vsize", 100, "kv value size")
@@ -28,7 +32,7 @@ var loop int = 0

 func bench(cmd string, f func()) {
 	wg.Add(*clients)
-	t1 := time.Now().UnixNano()
+	t1 := time.Now()
 	for i := 0; i < *clients; i++ {
 		go func() {
 			for i := 0; i < loop; i++ {
@@ -40,11 +44,10 @@ func bench(cmd string, f func()) {
 	wg.Wait()

-	t2 := time.Now().UnixNano()
-
-	delta := float64(t2-t1) / float64(time.Second)
-
-	fmt.Printf("%s: %0.2f requests per second\n", cmd, (float64(*number) / delta))
+	t2 := time.Now()
+
+	fmt.Printf("%s: %0.2f op/s, %0.2fmb/s\n", cmd, (float64(*number)/t2.Sub(t1).Seconds()),
+		float64(*valueSize*(*number))/(1024.0*1024.0*(t2.Sub(t1).Seconds())))
 }

 var kvSetBase int64 = 0
@@ -76,18 +79,36 @@ func benchGet() {
 	bench("get", f)
 }

+func setRocksDB(cfg *config.RocksDBConfig) {
+	cfg.BlockSize = 64 * KB
+	cfg.WriteBufferSize = 64 * MB
+	cfg.MaxWriteBufferNum = 2
+	cfg.MaxBytesForLevelBase = 512 * MB
+	cfg.TargetFileSizeBase = 64 * MB
+	cfg.BackgroundThreads = 4
+	cfg.HighPriorityBackgroundThreads = 1
+	cfg.MaxBackgroundCompactions = 3
+	cfg.MaxBackgroundFlushes = 1
+	cfg.CacheSize = 512 * MB
+	cfg.EnableStatistics = true
+	cfg.StatsDumpPeriodSec = 5
+}
+
 func main() {
 	runtime.GOMAXPROCS(runtime.NumCPU())

 	flag.Parse()

 	cfg := config.NewConfigDefault()
-	cfg.DBPath = "./store_test"
+	cfg.DBPath = "./var/store_test"
+	cfg.DBName = *name

 	os.RemoveAll(cfg.DBPath)
+	defer os.RemoveAll(cfg.DBPath)

-	cfg.LevelDB.BlockSize = 32 * 1024
-	cfg.LevelDB.CacheSize = 512 * 1024 * 1024
-	cfg.LevelDB.WriteBufferSize = 64 * 1024 * 1024
+	cfg.LevelDB.BlockSize = 32 * KB
+	cfg.LevelDB.CacheSize = 512 * MB
+	cfg.LevelDB.WriteBufferSize = 64 * MB
+	cfg.LevelDB.MaxOpenFiles = 1000
+
+	setRocksDB(&cfg.RocksDB)

 	var err error
 	db, err = store.Open(cfg)

View File

@@ -19,6 +19,10 @@ const (
 	DefaultDBName string = "goleveldb"
 	DefaultDataDir string = "./var"
+
+	KB int = 1024
+	MB int = KB * 1024
+	GB int = MB * 1024
 )

 type LevelDBConfig struct {
@@ -29,6 +33,34 @@ type LevelDBConfig struct {
 	MaxOpenFiles int `toml:"max_open_files"`
 }

+type RocksDBConfig struct {
+	Compression                    int  `toml:"compression"`
+	BlockSize                      int  `toml:"block_size"`
+	WriteBufferSize                int  `toml:"write_buffer_size"`
+	CacheSize                      int  `toml:"cache_size"`
+	MaxOpenFiles                   int  `toml:"max_open_files"`
+	MaxWriteBufferNum              int  `toml:"max_write_buffer_num"`
+	MinWriteBufferNumberToMerge    int  `toml:"min_write_buffer_number_to_merge"`
+	NumLevels                      int  `toml:"num_levels"`
+	Level0FileNumCompactionTrigger int  `toml:"level0_file_num_compaction_trigger"`
+	Level0SlowdownWritesTrigger    int  `toml:"level0_slowdown_writes_trigger"`
+	Level0StopWritesTrigger        int  `toml:"level0_stop_writes_trigger"`
+	TargetFileSizeBase             int  `toml:"target_file_size_base"`
+	TargetFileSizeMultiplier       int  `toml:"target_file_size_multiplier"`
+	MaxBytesForLevelBase           int  `toml:"max_bytes_for_level_base"`
+	MaxBytesForLevelMultiplier     int  `toml:"max_bytes_for_level_multiplier"`
+	DisableAutoCompactions         bool `toml:"disable_auto_compactions"`
+	DisableDataSync                bool `toml:"disable_data_sync"`
+	UseFsync                       bool `toml:"use_fsync"`
+	MaxBackgroundCompactions       int  `toml:"max_background_compactions"`
+	MaxBackgroundFlushes           int  `toml:"max_background_flushes"`
+	AllowOsBuffer                  bool `toml:"allow_os_buffer"`
+	EnableStatistics               bool `toml:"enable_statistics"`
+	StatsDumpPeriodSec             int  `toml:"stats_dump_period_sec"`
+	BackgroundThreads              int  `toml:"background_theads"`
+	HighPriorityBackgroundThreads  int  `toml:"high_priority_background_threads"`
+}
+
 type LMDBConfig struct {
 	MapSize int  `toml:"map_size"`
 	NoSync  bool `toml:"nosync"`
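The toml tags on RocksDBConfig line up with the new [rocksdb] section in the sample config files further down. A minimal sketch of decoding such a section, assuming the BurntSushi/toml decoder that the config loader relies on (illustrative values, not part of the commit):

package main

import (
	"fmt"

	"github.com/BurntSushi/toml" // assumption: the decoder used by the config package
	"github.com/siddontang/ledisdb/config"
)

const data = `
[rocksdb]
compression = 1
block_size = 65536
max_background_compactions = 3
`

func main() {
	var cfg config.Config
	if _, err := toml.Decode(data, &cfg); err != nil {
		panic(err)
	}
	// Keys that are not present stay zero here; ledisdb's own loaders fill them in via adjust().
	fmt.Println(cfg.RocksDB.Compression, cfg.RocksDB.BlockSize, cfg.RocksDB.MaxBackgroundCompactions)
	// 1 65536 3
}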
@@ -67,6 +99,7 @@ type Config struct {
 	DBSyncCommit int `toml:"db_sync_commit"`

 	LevelDB LevelDBConfig `toml:"leveldb"`
+	RocksDB RocksDBConfig `toml:"rocksdb"`
 	LMDB    LMDBConfig    `toml:"lmdb"`
@@ -121,7 +154,7 @@ func NewConfigDefault() *Config {
 	// disable access log
 	cfg.AccessLog = ""

-	cfg.LMDB.MapSize = 20 * 1024 * 1024
+	cfg.LMDB.MapSize = 20 * MB
 	cfg.LMDB.NoSync = true

 	cfg.UseReplication = false
@@ -131,31 +164,60 @@ func NewConfigDefault() *Config {
 	cfg.Replication.SyncLog = 0
 	cfg.Snapshot.MaxNum = 1

+	cfg.RocksDB.AllowOsBuffer = true
+	cfg.RocksDB.EnableStatistics = false
+	cfg.RocksDB.UseFsync = false
+	cfg.RocksDB.DisableAutoCompactions = false
+	cfg.RocksDB.AllowOsBuffer = true
+
 	cfg.adjust()

 	return cfg
 }

+func getDefault(d int, s int) int {
+	if s <= 0 {
+		return d
+	} else {
+		return s
+	}
+}
+
 func (cfg *Config) adjust() {
-	if cfg.LevelDB.CacheSize <= 0 {
-		cfg.LevelDB.CacheSize = 4 * 1024 * 1024
-	}
-
-	if cfg.LevelDB.BlockSize <= 0 {
-		cfg.LevelDB.BlockSize = 4 * 1024
-	}
-
-	if cfg.LevelDB.WriteBufferSize <= 0 {
-		cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024
-	}
-
-	if cfg.LevelDB.MaxOpenFiles < 1024 {
-		cfg.LevelDB.MaxOpenFiles = 1024
-	}
-
-	if cfg.Replication.ExpiredLogDays <= 0 {
-		cfg.Replication.ExpiredLogDays = 7
-	}
+	cfg.LevelDB.adjust()
+
+	cfg.RocksDB.adjust()
+
+	cfg.Replication.ExpiredLogDays = getDefault(7, cfg.Replication.ExpiredLogDays)
 }

+func (cfg *LevelDBConfig) adjust() {
+	cfg.CacheSize = getDefault(4*MB, cfg.CacheSize)
+	cfg.BlockSize = getDefault(4*KB, cfg.BlockSize)
+	cfg.WriteBufferSize = getDefault(4*MB, cfg.WriteBufferSize)
+	cfg.MaxOpenFiles = getDefault(1024, cfg.MaxOpenFiles)
+}
+
+func (cfg *RocksDBConfig) adjust() {
+	cfg.CacheSize = getDefault(4*MB, cfg.CacheSize)
+	cfg.BlockSize = getDefault(4*KB, cfg.BlockSize)
+	cfg.WriteBufferSize = getDefault(4*MB, cfg.WriteBufferSize)
+	cfg.MaxOpenFiles = getDefault(1024, cfg.MaxOpenFiles)
+	cfg.MaxWriteBufferNum = getDefault(2, cfg.MaxWriteBufferNum)
+	cfg.MinWriteBufferNumberToMerge = getDefault(1, cfg.MinWriteBufferNumberToMerge)
+	cfg.NumLevels = getDefault(7, cfg.NumLevels)
+	cfg.Level0FileNumCompactionTrigger = getDefault(4, cfg.Level0FileNumCompactionTrigger)
+	cfg.Level0SlowdownWritesTrigger = getDefault(16, cfg.Level0SlowdownWritesTrigger)
+	cfg.Level0StopWritesTrigger = getDefault(64, cfg.Level0StopWritesTrigger)
+	cfg.TargetFileSizeBase = getDefault(32*MB, cfg.TargetFileSizeBase)
+	cfg.TargetFileSizeMultiplier = getDefault(1, cfg.TargetFileSizeMultiplier)
+	cfg.MaxBytesForLevelBase = getDefault(32*MB, cfg.MaxBytesForLevelBase)
+	cfg.MaxBytesForLevelMultiplier = getDefault(1, cfg.MaxBytesForLevelMultiplier)
+	cfg.MaxBackgroundCompactions = getDefault(1, cfg.MaxBackgroundCompactions)
+	cfg.MaxBackgroundFlushes = getDefault(1, cfg.MaxBackgroundFlushes)
+	cfg.StatsDumpPeriodSec = getDefault(3600, cfg.StatsDumpPeriodSec)
+	cfg.BackgroundThreads = getDefault(2, cfg.BackgroundThreads)
+	cfg.HighPriorityBackgroundThreads = getDefault(1, cfg.HighPriorityBackgroundThreads)
+}

 func (cfg *Config) Dump(w io.Writer) error {
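A small illustrative test of the getDefault-based adjustment (sketch only, not part of the commit; assumed to live in the config package since adjust is unexported):

package config

import "testing"

// Sketch: zero-valued RocksDB fields pick up defaults from adjust(), while
// values set before adjust() (e.g. AllowOsBuffer in NewConfigDefault) are left alone.
func TestRocksDBDefaults(t *testing.T) {
	cfg := NewConfigDefault()

	if cfg.RocksDB.BlockSize != 4*KB {
		t.Fatalf("block_size: got %d, want %d", cfg.RocksDB.BlockSize, 4*KB)
	}
	if cfg.RocksDB.NumLevels != 7 {
		t.Fatalf("num_levels: got %d, want 7", cfg.RocksDB.NumLevels)
	}
	if !cfg.RocksDB.AllowOsBuffer {
		t.Fatal("allow_os_buffer should default to true")
	}
}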

View File

@@ -44,12 +44,46 @@ db_sync_commit = 0
 use_replication = true

 [leveldb]
+# for leveldb and goleveldb
 compression = false
 block_size = 32768
 write_buffer_size = 67108864
 cache_size = 524288000
 max_open_files = 1024

+[rocksdb]
+# RocksDB has many, many configuration options;
+# we only list a few here and may add more later.
+# good luck!
+
+# 0:no, 1:snappy, 2:zlib, 3:bz2, 4:lz4, 5:lz4hc
+compression = 0
+block_size = 32768
+write_buffer_size = 67108864
+cache_size = 524288000
+max_open_files = 1024
+max_write_buffer_num = 2
+min_write_buffer_number_to_merge = 1
+num_levels = 7
+level0_file_num_compaction_trigger = 4
+level0_slowdown_writes_trigger = 16
+level0_stop_writes_trigger = 64
+target_file_size_base = 33554432
+target_file_size_multiplier = 1
+max_bytes_for_level_base = 33554432
+max_bytes_for_level_multiplier = 10
+disable_auto_compactions = false
+disable_data_sync = false
+use_fsync = false
+background_theads = 4
+high_priority_background_threads = 1
+max_background_compactions = 1
+max_background_flushes = 1
+allow_os_buffer = true
+enable_statistics = false
+stats_dump_period_sec = 3600
+
 [lmdb]
 map_size = 524288000
 nosync = true
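The size values in the sample are raw byte counts; a quick check of how they map onto the KB/MB constants introduced in the config package:

package main

import "fmt"

func main() {
	const (
		KB = 1024
		MB = KB * 1024
	)
	fmt.Println(32 * KB)  // 32768     -> block_size
	fmt.Println(64 * MB)  // 67108864  -> write_buffer_size
	fmt.Println(500 * MB) // 524288000 -> cache_size, lmdb map_size
	fmt.Println(32 * MB)  // 33554432  -> target_file_size_base, max_bytes_for_level_base
}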

View File

@@ -44,12 +44,46 @@ db_sync_commit = 0
 use_replication = true

 [leveldb]
+# for leveldb and goleveldb
 compression = false
 block_size = 32768
 write_buffer_size = 67108864
 cache_size = 524288000
 max_open_files = 1024

+[rocksdb]
+# RocksDB has many, many configuration options;
+# we only list a few here and may add more later.
+# good luck!
+
+# 0:no, 1:snappy, 2:zlib, 3:bz2, 4:lz4, 5:lz4hc
+compression = 0
+block_size = 32768
+write_buffer_size = 67108864
+cache_size = 524288000
+max_open_files = 1024
+max_write_buffer_num = 2
+min_write_buffer_number_to_merge = 1
+num_levels = 7
+level0_file_num_compaction_trigger = 4
+level0_slowdown_writes_trigger = 16
+level0_stop_writes_trigger = 64
+target_file_size_base = 33554432
+target_file_size_multiplier = 1
+max_bytes_for_level_base = 33554432
+max_bytes_for_level_multiplier = 10
+disable_auto_compactions = false
+disable_data_sync = false
+use_fsync = false
+background_theads = 4
+high_priority_background_threads = 1
+max_background_compactions = 1
+max_background_flushes = 1
+allow_os_buffer = true
+enable_statistics = false
+stats_dump_period_sec = 3600
+
 [lmdb]
 map_size = 524288000
 nosync = true

View File

@@ -14,7 +14,6 @@ import (
 	"github.com/siddontang/ledisdb/config"
 	"github.com/siddontang/ledisdb/store/driver"
 	"os"
-	"runtime"
 	"unsafe"
 )

@@ -188,9 +187,6 @@ func (db *DB) NewWriteBatch() driver.IWriteBatch {
 		wbatch: C.leveldb_writebatch_create(),
 	}

-	runtime.SetFinalizer(wb, func(w *WriteBatch) {
-		w.Close()
-	})
-
 	return wb
 }

View File

@@ -15,7 +15,6 @@ import (
 	"github.com/siddontang/ledisdb/config"
 	"github.com/siddontang/ledisdb/store/driver"
 	"os"
-	"runtime"
 	"unsafe"
 )

@@ -35,7 +34,7 @@ func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) {
 	db := new(DB)
 	db.path = path

-	db.cfg = &cfg.LevelDB
+	db.cfg = &cfg.RocksDB

 	if err := db.open(); err != nil {
 		return nil, err
@@ -47,7 +46,7 @@ func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) {
 func (s Store) Repair(path string, cfg *config.Config) error {
 	db := new(DB)
 	db.path = path
-	db.cfg = &cfg.LevelDB
+	db.cfg = &cfg.RocksDB

 	err := db.open()
 	defer db.Close()
@@ -71,7 +70,7 @@ func (s Store) Repair(path string, cfg *config.Config) error {
 type DB struct {
 	path string

-	cfg *config.LevelDBConfig
+	cfg *config.RocksDBConfig

 	db *C.rocksdb_t
@@ -107,15 +106,15 @@ func (db *DB) open() error {
 	return nil
 }

-func (db *DB) initOptions(cfg *config.LevelDBConfig) {
+func (db *DB) initOptions(cfg *config.RocksDBConfig) {
 	opts := NewOptions()
 	blockOpts := NewBlockBasedTableOptions()

 	opts.SetCreateIfMissing(true)

 	db.env = NewDefaultEnv()
-	db.env.SetBackgroundThreads(runtime.NumCPU() * 2)
-	db.env.SetHighPriorityBackgroundThreads(1)
+	db.env.SetBackgroundThreads(cfg.BackgroundThreads)
+	db.env.SetHighPriorityBackgroundThreads(cfg.HighPriorityBackgroundThreads)
 	opts.SetEnv(db.env)

 	db.cache = NewLRUCache(cfg.CacheSize)
@@ -124,28 +123,28 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
 	//we must use bloomfilter
 	db.filter = NewBloomFilter(defaultFilterBits)
 	blockOpts.SetFilterPolicy(db.filter)

-	if !cfg.Compression {
-		opts.SetCompression(NoCompression)
-	} else {
-		opts.SetCompression(SnappyCompression)
-	}
-
 	blockOpts.SetBlockSize(cfg.BlockSize)
-
-	opts.SetWriteBufferSize(cfg.WriteBufferSize)
-
-	opts.SetMaxOpenFiles(cfg.MaxOpenFiles)
-
-	opts.SetMaxBackgroundCompactions(runtime.NumCPU()*2 - 1)
-	opts.SetMaxBackgroundFlushes(1)
-
-	opts.SetLevel0SlowdownWritesTrigger(16)
-	opts.SetLevel0StopWritesTrigger(64)
-
-	opts.SetTargetFileSizeBase(32 * 1024 * 1024)
-
 	opts.SetBlockBasedTableFactory(blockOpts)

+	opts.SetCompression(CompressionOpt(cfg.Compression))
+	opts.SetWriteBufferSize(cfg.WriteBufferSize)
+	opts.SetMaxOpenFiles(cfg.MaxOpenFiles)
+	opts.SetMaxBackgroundCompactions(cfg.MaxBackgroundCompactions)
+	opts.SetMaxBackgroundFlushes(cfg.MaxBackgroundFlushes)
+	opts.SetLevel0SlowdownWritesTrigger(cfg.Level0SlowdownWritesTrigger)
+	opts.SetLevel0StopWritesTrigger(cfg.Level0StopWritesTrigger)
+	opts.SetTargetFileSizeBase(cfg.TargetFileSizeBase)
+	opts.SetTargetFileSizeMultiplier(cfg.TargetFileSizeMultiplier)
+	opts.SetMaxBytesForLevelBase(cfg.MaxBytesForLevelBase)
+	opts.SetMaxBytesForLevelMultiplier(cfg.MaxBytesForLevelMultiplier)
+	opts.DisableDataSync(cfg.DisableDataSync)
+	opts.SetMinWriteBufferNumberToMerge(cfg.MinWriteBufferNumberToMerge)
+	opts.DisableAutoCompactions(cfg.DisableAutoCompactions)
+	opts.EnableStatistics(cfg.EnableStatistics)
+	opts.UseFsync(cfg.UseFsync)
+	opts.AllowOsBuffer(cfg.AllowOsBuffer)
+	opts.SetStatsDumpPeriodSec(cfg.StatsDumpPeriodSec)

 	db.opts = opts
 	db.blockOpts = blockOpts
@@ -214,10 +213,6 @@ func (db *DB) NewWriteBatch() driver.IWriteBatch {
 		wbatch: C.rocksdb_writebatch_create(),
 	}

-	runtime.SetFinalizer(wb, func(w *WriteBatch) {
-		w.Close()
-	})
-
 	return wb
 }

View File

@@ -11,6 +11,10 @@ type CompressionOpt int
 const (
 	NoCompression     = CompressionOpt(0)
 	SnappyCompression = CompressionOpt(1)
+	ZlibCompression   = CompressionOpt(2)
+	Bz2Compression    = CompressionOpt(3)
+	Lz4Compression    = CompressionOpt(4)
+	Lz4hcCompression  = CompressionOpt(5)
 )

 type Options struct {
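These constants mirror the integer values accepted by the compression key in the [rocksdb] config section (0:no ... 5:lz4hc). A standalone sketch with a hypothetical helper, not part of the commit, that makes the mapping explicit:

package main

import "fmt"

// CompressionOpt and the constants are re-declared here only so the sketch
// runs on its own; in ledisdb they live in the rocksdb store package.
type CompressionOpt int

const (
	NoCompression     = CompressionOpt(0)
	SnappyCompression = CompressionOpt(1)
	ZlibCompression   = CompressionOpt(2)
	Bz2Compression    = CompressionOpt(3)
	Lz4Compression    = CompressionOpt(4)
	Lz4hcCompression  = CompressionOpt(5)
)

// compressionName is a hypothetical helper mapping the config integer to a readable name.
func compressionName(c CompressionOpt) string {
	switch c {
	case NoCompression:
		return "none"
	case SnappyCompression:
		return "snappy"
	case ZlibCompression:
		return "zlib"
	case Bz2Compression:
		return "bz2"
	case Lz4Compression:
		return "lz4"
	case Lz4hcCompression:
		return "lz4hc"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(compressionName(CompressionOpt(1))) // compression = 1 -> snappy
}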
@@ -134,6 +138,36 @@ func (o *Options) SetBlockBasedTableFactory(opt *BlockBasedTableOptions) {
 	C.rocksdb_options_set_block_based_table_factory(o.Opt, opt.Opt)
 }

+func (o *Options) SetMinWriteBufferNumberToMerge(n int) {
+	C.rocksdb_options_set_min_write_buffer_number_to_merge(o.Opt, C.int(n))
+}
+
+func (o *Options) DisableDataSync(b bool) {
+	C.rocksdb_options_set_disable_data_sync(o.Opt, boolToInt(b))
+}
+
+func (o *Options) DisableAutoCompactions(b bool) {
+	C.rocksdb_options_set_disable_auto_compactions(o.Opt, boolToInt(b))
+}
+
+func (o *Options) UseFsync(b bool) {
+	C.rocksdb_options_set_use_fsync(o.Opt, boolToInt(b))
+}
+
+func (o *Options) AllowOsBuffer(b bool) {
+	C.rocksdb_options_set_allow_os_buffer(o.Opt, boolToUchar(b))
+}
+
+func (o *Options) EnableStatistics(b bool) {
+	if b {
+		C.rocksdb_options_enable_statistics(o.Opt)
+	}
+}
+
+func (o *Options) SetStatsDumpPeriodSec(n int) {
+	C.rocksdb_options_set_stats_dump_period_sec(o.Opt, C.uint(n))
+}
+
 func (o *BlockBasedTableOptions) Close() {
 	C.rocksdb_block_based_options_destroy(o.Opt)
 }
@@ -185,3 +219,7 @@ func (wo *WriteOptions) Close() {
 func (wo *WriteOptions) SetSync(b bool) {
 	C.rocksdb_writeoptions_set_sync(wo.Opt, boolToUchar(b))
 }
+
+func (wo *WriteOptions) DisableWAL(b bool) {
+	C.rocksdb_writeoptions_disable_WAL(wo.Opt, boolToInt(b))
+}
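DisableWAL is exposed here but not yet wired into the config. A hedged usage sketch meant for this same package, assuming a NewWriteOptions constructor alongside the other option types (the constructor itself is not shown in this diff):

// Sketch only: write options tuned for a bulk load, trading crash safety
// for write throughput by skipping the write-ahead log.
func bulkLoadWriteOptions() *WriteOptions {
	wo := NewWriteOptions() // assumption: constructor exists like NewOptions above
	wo.SetSync(false)
	wo.DisableWAL(true)
	return wo
}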

View File

@@ -27,6 +27,14 @@ func ucharToBool(uc C.uchar) bool {
 	return true
 }

+func boolToInt(b bool) C.int {
+	uc := C.int(0)
+	if b {
+		uc = C.int(1)
+	}
+	return uc
+}
+
 func saveError(errStr *C.char) error {
 	if errStr != nil {
 		gs := C.GoString(errStr)
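boolToInt complements the uchar helpers in this cgo file. For completeness, the boolToUchar counterpart already used by AllowOsBuffer and SetSync presumably looks like this (sketch only, mirroring boolToInt; intended for the same file):

// Sketch: bool -> C.uchar conversion, assumed to be defined near ucharToBool.
func boolToUchar(b bool) C.uchar {
	uc := C.uchar(0)
	if b {
		uc = C.uchar(1)
	}
	return uc
}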