diff --git a/cmd/ledis-benchmark/main.go b/cmd/ledis-benchmark/main.go index 379f6f9..dad16f2 100644 --- a/cmd/ledis-benchmark/main.go +++ b/cmd/ledis-benchmark/main.go @@ -1,7 +1,6 @@ package main import ( - crand "crypto/rand" "flag" "fmt" "github.com/siddontang/ledisdb/client/go/ledis" @@ -39,10 +38,10 @@ func waitBench(cmd string, args ...interface{}) { func bench(cmd string, f func()) { wg.Add(*clients) - t1 := time.Now().UnixNano() + t1 := time.Now() for i := 0; i < *clients; i++ { go func() { - for i := 0; i < loop; i++ { + for j := 0; j < loop; j++ { f() } wg.Done() @@ -51,11 +50,9 @@ func bench(cmd string, f func()) { wg.Wait() - t2 := time.Now().UnixNano() + t2 := time.Now() - delta := float64(t2-t1) / float64(time.Second) - - fmt.Printf("%s: %0.2f requests per second\n", cmd, (float64(*number) / delta)) + fmt.Printf("%s: %0.2f op/s\n", cmd, (float64(*number) / t2.Sub(t1).Seconds())) } var kvSetBase int64 = 0 @@ -66,7 +63,6 @@ var kvDelBase int64 = 0 func benchSet() { f := func() { value := make([]byte, *valueSize) - crand.Read(value) n := atomic.AddInt64(&kvSetBase, 1) waitBench("set", n, value) } @@ -104,7 +100,6 @@ func benchDel() { func benchPushList() { f := func() { value := make([]byte, 100) - crand.Read(value) waitBench("rpush", "mytestlist", value) } @@ -151,7 +146,6 @@ var hashDelBase int64 = 0 func benchHset() { f := func() { value := make([]byte, 100) - crand.Read(value) n := atomic.AddInt64(&hashSetBase, 1) waitBench("hset", "myhashkey", n, value) @@ -194,7 +188,6 @@ var zsetIncrBase int64 = 0 func benchZAdd() { f := func() { member := make([]byte, 16) - crand.Read(member) n := atomic.AddInt64(&zsetAddBase, 1) waitBench("zadd", "myzsetkey", n, member) } diff --git a/cmd/ledis-storebench/main.go b/cmd/ledis-storebench/main.go new file mode 100644 index 0000000..5493ee0 --- /dev/null +++ b/cmd/ledis-storebench/main.go @@ -0,0 +1,143 @@ +package main + +import ( + "flag" + "fmt" + "github.com/siddontang/go/num" + "github.com/siddontang/ledisdb/config" + "github.com/siddontang/ledisdb/store" + "os" + "runtime" + "sync" + "sync/atomic" + "time" +) + +var KB = config.KB +var MB = config.MB +var GB = config.GB + +var name = flag.String("db_name", "goleveldb", "db name") +var number = flag.Int("n", 10000, "request number") +var clients = flag.Int("c", 50, "number of clients") +var round = flag.Int("r", 1, "benchmark round number") +var valueSize = flag.Int("vsize", 100, "kv value size") +var wg sync.WaitGroup + +var db *store.DB + +var loop int = 0 + +func bench(cmd string, f func()) { + wg.Add(*clients) + + t1 := time.Now() + for i := 0; i < *clients; i++ { + go func() { + for j := 0; j < loop; j++ { + f() + } + wg.Done() + }() + } + + wg.Wait() + + t2 := time.Now() + + d := t2.Sub(t1) + fmt.Printf("%s: %0.3f micros/op, %0.2fmb/s\n", cmd, float64(d.Nanoseconds()/1e3)/float64(*number), + float64((*valueSize+16)*(*number))/(1024.0*1024.0*(d.Seconds()))) +} + +var kvSetBase int64 = 0 +var kvGetBase int64 = 0 + +func benchSet() { + f := func() { + value := make([]byte, *valueSize) + n := atomic.AddInt64(&kvSetBase, 1) + + db.Put(num.Int64ToBytes(n), value) + } + + bench("set", f) +} + +func benchGet() { + f := func() { + n := atomic.AddInt64(&kvGetBase, 1) + v, err := db.Get(num.Int64ToBytes(n)) + if err != nil { + println(err.Error()) + } else if len(v) != *valueSize { + println(len(v), *valueSize) + } + } + + bench("get", f) +} + +func setRocksDB(cfg *config.RocksDBConfig) { + cfg.BlockSize = 64 * KB + cfg.WriteBufferSize = 64 * MB + cfg.MaxWriteBufferNum = 2 + cfg.MaxBytesForLevelBase = 512 * MB + cfg.TargetFileSizeBase = 64 * MB + cfg.BackgroundThreads = 4 + cfg.HighPriorityBackgroundThreads = 1 + cfg.MaxBackgroundCompactions = 3 + cfg.MaxBackgroundFlushes = 1 + cfg.CacheSize = 512 * MB + cfg.EnableStatistics = true + cfg.StatsDumpPeriodSec = 5 + cfg.Level0FileNumCompactionTrigger = 8 + cfg.MaxBytesForLevelMultiplier = 8 +} + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + flag.Parse() + + cfg := config.NewConfigDefault() + cfg.DBPath = "./var/store_test" + cfg.DBName = *name + os.RemoveAll(cfg.DBPath) + + cfg.LevelDB.BlockSize = 32 * KB + cfg.LevelDB.CacheSize = 512 * MB + cfg.LevelDB.WriteBufferSize = 64 * MB + cfg.LevelDB.MaxOpenFiles = 1000 + + setRocksDB(&cfg.RocksDB) + + var err error + db, err = store.Open(cfg) + if err != nil { + panic(err) + return + } + + if *number <= 0 { + panic("invalid number") + return + } + + if *clients <= 0 || *number < *clients { + panic("invalid client number") + return + } + + loop = *number / *clients + + if *round <= 0 { + *round = 1 + } + + for i := 0; i < *round; i++ { + benchSet() + benchGet() + + println("") + } +} diff --git a/config/config.go b/config/config.go index aa3c1cf..72c3105 100644 --- a/config/config.go +++ b/config/config.go @@ -19,6 +19,10 @@ const ( DefaultDBName string = "goleveldb" DefaultDataDir string = "./var" + + KB int = 1024 + MB int = KB * 1024 + GB int = MB * 1024 ) type LevelDBConfig struct { @@ -29,6 +33,34 @@ type LevelDBConfig struct { MaxOpenFiles int `toml:"max_open_files"` } +type RocksDBConfig struct { + Compression int `toml:"compression"` + BlockSize int `toml:"block_size"` + WriteBufferSize int `toml:"write_buffer_size"` + CacheSize int `toml:"cache_size"` + MaxOpenFiles int `toml:"max_open_files"` + MaxWriteBufferNum int `toml:"max_write_buffer_num"` + MinWriteBufferNumberToMerge int `toml:"min_write_buffer_number_to_merge"` + NumLevels int `toml:"num_levels"` + Level0FileNumCompactionTrigger int `toml:"level0_file_num_compaction_trigger"` + Level0SlowdownWritesTrigger int `toml:"level0_slowdown_writes_trigger"` + Level0StopWritesTrigger int `toml:"level0_stop_writes_trigger"` + TargetFileSizeBase int `toml:"target_file_size_base"` + TargetFileSizeMultiplier int `toml:"target_file_size_multiplier"` + MaxBytesForLevelBase int `toml:"max_bytes_for_level_base"` + MaxBytesForLevelMultiplier int `toml:"max_bytes_for_level_multiplier"` + DisableAutoCompactions bool `toml:"disable_auto_compactions"` + DisableDataSync bool `toml:"disable_data_sync"` + UseFsync bool `toml:"use_fsync"` + MaxBackgroundCompactions int `toml:"max_background_compactions"` + MaxBackgroundFlushes int `toml:"max_background_flushes"` + AllowOsBuffer bool `toml:"allow_os_buffer"` + EnableStatistics bool `toml:"enable_statistics"` + StatsDumpPeriodSec int `toml:"stats_dump_period_sec"` + BackgroundThreads int `toml:"background_theads"` + HighPriorityBackgroundThreads int `toml:"high_priority_background_threads"` +} + type LMDBConfig struct { MapSize int `toml:"map_size"` NoSync bool `toml:"nosync"` @@ -67,6 +99,7 @@ type Config struct { DBSyncCommit int `toml:"db_sync_commit"` LevelDB LevelDBConfig `toml:"leveldb"` + RocksDB RocksDBConfig `toml:"rocksdb"` LMDB LMDBConfig `toml:"lmdb"` @@ -121,7 +154,7 @@ func NewConfigDefault() *Config { // disable access log cfg.AccessLog = "" - cfg.LMDB.MapSize = 20 * 1024 * 1024 + cfg.LMDB.MapSize = 20 * MB cfg.LMDB.NoSync = true cfg.UseReplication = false @@ -131,31 +164,60 @@ func NewConfigDefault() *Config { cfg.Replication.SyncLog = 0 cfg.Snapshot.MaxNum = 1 + cfg.RocksDB.AllowOsBuffer = true + cfg.RocksDB.EnableStatistics = false + cfg.RocksDB.UseFsync = false + cfg.RocksDB.DisableAutoCompactions = false + cfg.RocksDB.AllowOsBuffer = true + cfg.adjust() return cfg } +func getDefault(d int, s int) int { + if s <= 0 { + return d + } else { + return s + } +} + func (cfg *Config) adjust() { - if cfg.LevelDB.CacheSize <= 0 { - cfg.LevelDB.CacheSize = 4 * 1024 * 1024 - } + cfg.LevelDB.adjust() - if cfg.LevelDB.BlockSize <= 0 { - cfg.LevelDB.BlockSize = 4 * 1024 - } + cfg.RocksDB.adjust() - if cfg.LevelDB.WriteBufferSize <= 0 { - cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024 - } + cfg.Replication.ExpiredLogDays = getDefault(7, cfg.Replication.ExpiredLogDays) +} - if cfg.LevelDB.MaxOpenFiles < 1024 { - cfg.LevelDB.MaxOpenFiles = 1024 - } +func (cfg *LevelDBConfig) adjust() { + cfg.CacheSize = getDefault(4*MB, cfg.CacheSize) + cfg.BlockSize = getDefault(4*KB, cfg.BlockSize) + cfg.WriteBufferSize = getDefault(4*MB, cfg.WriteBufferSize) + cfg.MaxOpenFiles = getDefault(1024, cfg.MaxOpenFiles) +} - if cfg.Replication.ExpiredLogDays <= 0 { - cfg.Replication.ExpiredLogDays = 7 - } +func (cfg *RocksDBConfig) adjust() { + cfg.CacheSize = getDefault(4*MB, cfg.CacheSize) + cfg.BlockSize = getDefault(4*KB, cfg.BlockSize) + cfg.WriteBufferSize = getDefault(4*MB, cfg.WriteBufferSize) + cfg.MaxOpenFiles = getDefault(1024, cfg.MaxOpenFiles) + cfg.MaxWriteBufferNum = getDefault(2, cfg.MaxWriteBufferNum) + cfg.MinWriteBufferNumberToMerge = getDefault(1, cfg.MinWriteBufferNumberToMerge) + cfg.NumLevels = getDefault(7, cfg.NumLevels) + cfg.Level0FileNumCompactionTrigger = getDefault(4, cfg.Level0FileNumCompactionTrigger) + cfg.Level0SlowdownWritesTrigger = getDefault(16, cfg.Level0SlowdownWritesTrigger) + cfg.Level0StopWritesTrigger = getDefault(64, cfg.Level0StopWritesTrigger) + cfg.TargetFileSizeBase = getDefault(32*MB, cfg.TargetFileSizeBase) + cfg.TargetFileSizeMultiplier = getDefault(1, cfg.TargetFileSizeMultiplier) + cfg.MaxBytesForLevelBase = getDefault(32*MB, cfg.MaxBytesForLevelBase) + cfg.MaxBytesForLevelMultiplier = getDefault(1, cfg.MaxBytesForLevelMultiplier) + cfg.MaxBackgroundCompactions = getDefault(1, cfg.MaxBackgroundCompactions) + cfg.MaxBackgroundFlushes = getDefault(1, cfg.MaxBackgroundFlushes) + cfg.StatsDumpPeriodSec = getDefault(3600, cfg.StatsDumpPeriodSec) + cfg.BackgroundThreads = getDefault(2, cfg.BackgroundThreads) + cfg.HighPriorityBackgroundThreads = getDefault(1, cfg.HighPriorityBackgroundThreads) } func (cfg *Config) Dump(w io.Writer) error { diff --git a/config/config.toml b/config/config.toml index ca33f59..88f566b 100644 --- a/config/config.toml +++ b/config/config.toml @@ -44,12 +44,46 @@ db_sync_commit = 0 use_replication = true [leveldb] +# for leveldb and goleveldb compression = false block_size = 32768 write_buffer_size = 67108864 cache_size = 524288000 max_open_files = 1024 +[rocksdb] +# rocksdb has many many configurations, +# we only list little now, but may add more later. +# good luck! + +# 0:no, 1:snappy, 2:zlib, 3:bz2, 4:lz4, 5:lz4hc +compression = 0 +block_size = 65536 +write_buffer_size = 67108864 +cache_size = 524288000 +max_open_files = 1024 +max_write_buffer_num = 2 +min_write_buffer_number_to_merge = 1 +num_levels = 7 +level0_file_num_compaction_trigger = 8 +level0_slowdown_writes_trigger = 16 +level0_stop_writes_trigger = 64 +target_file_size_base = 67108864 +target_file_size_multiplier = 1 +max_bytes_for_level_base = 536870912 +max_bytes_for_level_multiplier = 8 +disable_auto_compactions = false +disable_data_sync = false +use_fsync = false +background_theads = 4 +high_priority_background_threads = 1 +max_background_compactions = 3 +max_background_flushes = 1 +allow_os_buffer = true +enable_statistics = false +stats_dump_period_sec = 3600 + + [lmdb] map_size = 524288000 nosync = true diff --git a/etc/ledis.conf b/etc/ledis.conf index ca33f59..88f566b 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -44,12 +44,46 @@ db_sync_commit = 0 use_replication = true [leveldb] +# for leveldb and goleveldb compression = false block_size = 32768 write_buffer_size = 67108864 cache_size = 524288000 max_open_files = 1024 +[rocksdb] +# rocksdb has many many configurations, +# we only list little now, but may add more later. +# good luck! + +# 0:no, 1:snappy, 2:zlib, 3:bz2, 4:lz4, 5:lz4hc +compression = 0 +block_size = 65536 +write_buffer_size = 67108864 +cache_size = 524288000 +max_open_files = 1024 +max_write_buffer_num = 2 +min_write_buffer_number_to_merge = 1 +num_levels = 7 +level0_file_num_compaction_trigger = 8 +level0_slowdown_writes_trigger = 16 +level0_stop_writes_trigger = 64 +target_file_size_base = 67108864 +target_file_size_multiplier = 1 +max_bytes_for_level_base = 536870912 +max_bytes_for_level_multiplier = 8 +disable_auto_compactions = false +disable_data_sync = false +use_fsync = false +background_theads = 4 +high_priority_background_threads = 1 +max_background_compactions = 3 +max_background_flushes = 1 +allow_os_buffer = true +enable_statistics = false +stats_dump_period_sec = 3600 + + [lmdb] map_size = 524288000 nosync = true diff --git a/store/leveldb/db.go b/store/leveldb/db.go index cd1fb1e..449c32b 100644 --- a/store/leveldb/db.go +++ b/store/leveldb/db.go @@ -14,7 +14,6 @@ import ( "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/store/driver" "os" - "runtime" "unsafe" ) @@ -188,9 +187,6 @@ func (db *DB) NewWriteBatch() driver.IWriteBatch { wbatch: C.leveldb_writebatch_create(), } - runtime.SetFinalizer(wb, func(w *WriteBatch) { - w.Close() - }) return wb } diff --git a/store/rocksdb/db.go b/store/rocksdb/db.go index 35a1c7e..9bc7898 100644 --- a/store/rocksdb/db.go +++ b/store/rocksdb/db.go @@ -15,7 +15,6 @@ import ( "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/store/driver" "os" - "runtime" "unsafe" ) @@ -35,7 +34,7 @@ func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) { db := new(DB) db.path = path - db.cfg = &cfg.LevelDB + db.cfg = &cfg.RocksDB if err := db.open(); err != nil { return nil, err @@ -47,7 +46,7 @@ func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) { func (s Store) Repair(path string, cfg *config.Config) error { db := new(DB) db.path = path - db.cfg = &cfg.LevelDB + db.cfg = &cfg.RocksDB err := db.open() defer db.Close() @@ -71,7 +70,7 @@ func (s Store) Repair(path string, cfg *config.Config) error { type DB struct { path string - cfg *config.LevelDBConfig + cfg *config.RocksDBConfig db *C.rocksdb_t @@ -107,15 +106,15 @@ func (db *DB) open() error { return nil } -func (db *DB) initOptions(cfg *config.LevelDBConfig) { +func (db *DB) initOptions(cfg *config.RocksDBConfig) { opts := NewOptions() blockOpts := NewBlockBasedTableOptions() opts.SetCreateIfMissing(true) db.env = NewDefaultEnv() - db.env.SetBackgroundThreads(runtime.NumCPU() * 2) - db.env.SetHighPriorityBackgroundThreads(1) + db.env.SetBackgroundThreads(cfg.BackgroundThreads) + db.env.SetHighPriorityBackgroundThreads(cfg.HighPriorityBackgroundThreads) opts.SetEnv(db.env) db.cache = NewLRUCache(cfg.CacheSize) @@ -124,28 +123,28 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { //we must use bloomfilter db.filter = NewBloomFilter(defaultFilterBits) blockOpts.SetFilterPolicy(db.filter) - - if !cfg.Compression { - opts.SetCompression(NoCompression) - } else { - opts.SetCompression(SnappyCompression) - } - blockOpts.SetBlockSize(cfg.BlockSize) - - opts.SetWriteBufferSize(cfg.WriteBufferSize) - - opts.SetMaxOpenFiles(cfg.MaxOpenFiles) - - opts.SetMaxBackgroundCompactions(runtime.NumCPU()*2 - 1) - opts.SetMaxBackgroundFlushes(1) - - opts.SetLevel0SlowdownWritesTrigger(16) - opts.SetLevel0StopWritesTrigger(64) - opts.SetTargetFileSizeBase(32 * 1024 * 1024) - opts.SetBlockBasedTableFactory(blockOpts) + opts.SetCompression(CompressionOpt(cfg.Compression)) + opts.SetWriteBufferSize(cfg.WriteBufferSize) + opts.SetMaxOpenFiles(cfg.MaxOpenFiles) + opts.SetMaxBackgroundCompactions(cfg.MaxBackgroundCompactions) + opts.SetMaxBackgroundFlushes(cfg.MaxBackgroundFlushes) + opts.SetLevel0SlowdownWritesTrigger(cfg.Level0SlowdownWritesTrigger) + opts.SetLevel0StopWritesTrigger(cfg.Level0StopWritesTrigger) + opts.SetTargetFileSizeBase(cfg.TargetFileSizeBase) + opts.SetTargetFileSizeMultiplier(cfg.TargetFileSizeMultiplier) + opts.SetMaxBytesForLevelBase(cfg.MaxBytesForLevelBase) + opts.SetMaxBytesForLevelMultiplier(cfg.MaxBytesForLevelMultiplier) + opts.DisableDataSync(cfg.DisableDataSync) + opts.SetMinWriteBufferNumberToMerge(cfg.MinWriteBufferNumberToMerge) + opts.DisableAutoCompactions(cfg.DisableAutoCompactions) + opts.EnableStatistics(cfg.EnableStatistics) + opts.UseFsync(cfg.UseFsync) + opts.AllowOsBuffer(cfg.AllowOsBuffer) + opts.SetStatsDumpPeriodSec(cfg.StatsDumpPeriodSec) + db.opts = opts db.blockOpts = blockOpts @@ -214,10 +213,6 @@ func (db *DB) NewWriteBatch() driver.IWriteBatch { wbatch: C.rocksdb_writebatch_create(), } - runtime.SetFinalizer(wb, func(w *WriteBatch) { - w.Close() - }) - return wb } diff --git a/store/rocksdb/options.go b/store/rocksdb/options.go index 878ccf1..0404cdb 100644 --- a/store/rocksdb/options.go +++ b/store/rocksdb/options.go @@ -11,6 +11,10 @@ type CompressionOpt int const ( NoCompression = CompressionOpt(0) SnappyCompression = CompressionOpt(1) + ZlibCompression = CompressionOpt(2) + Bz2Compression = CompressionOpt(3) + Lz4Compression = CompressionOpt(4) + Lz4hcCompression = CompressionOpt(5) ) type Options struct { @@ -134,6 +138,36 @@ func (o *Options) SetBlockBasedTableFactory(opt *BlockBasedTableOptions) { C.rocksdb_options_set_block_based_table_factory(o.Opt, opt.Opt) } +func (o *Options) SetMinWriteBufferNumberToMerge(n int) { + C.rocksdb_options_set_min_write_buffer_number_to_merge(o.Opt, C.int(n)) +} + +func (o *Options) DisableDataSync(b bool) { + C.rocksdb_options_set_disable_data_sync(o.Opt, boolToInt(b)) +} + +func (o *Options) DisableAutoCompactions(b bool) { + C.rocksdb_options_set_disable_auto_compactions(o.Opt, boolToInt(b)) +} + +func (o *Options) UseFsync(b bool) { + C.rocksdb_options_set_use_fsync(o.Opt, boolToInt(b)) +} + +func (o *Options) AllowOsBuffer(b bool) { + C.rocksdb_options_set_allow_os_buffer(o.Opt, boolToUchar(b)) +} + +func (o *Options) EnableStatistics(b bool) { + if b { + C.rocksdb_options_enable_statistics(o.Opt) + } +} + +func (o *Options) SetStatsDumpPeriodSec(n int) { + C.rocksdb_options_set_stats_dump_period_sec(o.Opt, C.uint(n)) +} + func (o *BlockBasedTableOptions) Close() { C.rocksdb_block_based_options_destroy(o.Opt) } @@ -185,3 +219,7 @@ func (wo *WriteOptions) Close() { func (wo *WriteOptions) SetSync(b bool) { C.rocksdb_writeoptions_set_sync(wo.Opt, boolToUchar(b)) } + +func (wo *WriteOptions) DisableWAL(b bool) { + C.rocksdb_writeoptions_disable_WAL(wo.Opt, boolToInt(b)) +} diff --git a/store/rocksdb/util.go b/store/rocksdb/util.go index f924d76..22b73ba 100644 --- a/store/rocksdb/util.go +++ b/store/rocksdb/util.go @@ -27,6 +27,14 @@ func ucharToBool(uc C.uchar) bool { return true } +func boolToInt(b bool) C.int { + uc := C.int(0) + if b { + uc = C.int(1) + } + return uc +} + func saveError(errStr *C.char) error { if errStr != nil { gs := C.GoString(errStr)