diff --git a/config/config.go b/config/config.go index 5a8070b..668b545 100644 --- a/config/config.go +++ b/config/config.go @@ -30,11 +30,12 @@ type LMDBConfig struct { } type ReplicationConfig struct { - Path string `toml:"path"` - ExpiredLogDays int `toml:"expired_log_days"` - Sync bool `toml:"sync"` - WaitSyncTime int `toml:"wait_sync_time"` - Compression bool `toml:"compression"` + Path string `toml:"path"` + ExpiredLogDays int `toml:"expired_log_days"` + Sync bool `toml:"sync"` + WaitSyncTime int `toml:"wait_sync_time"` + WaitMaxSlaveAcks int `toml:"wait_max_slave_acks"` + Compression bool `toml:"compression"` } type Config struct { @@ -100,6 +101,7 @@ func NewConfigDefault() *Config { cfg.Replication.WaitSyncTime = 1 cfg.Replication.Compression = true + cfg.Replication.WaitMaxSlaveAcks = 2 return cfg } diff --git a/config/config.toml b/config/config.toml index fbe86a6..b8d80ec 100644 --- a/config/config.toml +++ b/config/config.toml @@ -60,5 +60,10 @@ sync = true # If sync is true, wait at last wait_sync_time seconds for slave syncing this log wait_sync_time = 1 +# If sync is true, wait at most min(wait_max_slave_acks, (n + 1) / 2) to promise syncing ok. +# n is slave number +# If 0, wait (n + 1) / 2 acks. +wait_max_slave_acks = 2 + # Compress the log or not compression = true diff --git a/config/config_test.go b/config/config_test.go index c2b5a16..47779aa 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -5,7 +5,7 @@ import ( ) func TestConfig(t *testing.T) { - _, err := NewConfigWithFile("./config.toml") + _, err := NewConfigWithFile("./ledis.toml") if err != nil { t.Fatal(err) } diff --git a/etc/ledis.conf b/etc/ledis.conf index 0d46aee..b8d80ec 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -1,7 +1,5 @@ # LedisDB configuration -# Config format is toml, https://github.com/toml-lang/toml - # Server listen address addr = "127.0.0.1:6380" @@ -15,6 +13,7 @@ data_dir = "/tmp/ledis_server" access_log = "" # Set slaveof to enable replication from master, empty, no replication +# Any write operations except flushall and replication will be disabled in slave mode. slaveof = "" # Choose which backend storage to use, now support: @@ -29,9 +28,12 @@ slaveof = "" # db_name = "leveldb" -# if not set, use data_dir/"db_name"_data +# If not set, use data_dir/"db_name"_data db_path = "" +# enable replication or not +use_replication = true + [leveldb] compression = false block_size = 32768 @@ -43,8 +45,25 @@ max_open_files = 1024 map_size = 524288000 nosync = true -[wal] -# if not set, use data_dir/wal +[replication] +# Path to store replication information(write ahead log, commit log, etc.) +# if not set, use data_dir/rpl path = "" +# Expire write ahead logs after the given days +expired_log_days = 7 +# If sync is true, the new log must be sent to some slaves, and then commit. +# It will reduce performance but have better high availability. +sync = true + +# If sync is true, wait at last wait_sync_time seconds for slave syncing this log +wait_sync_time = 1 + +# If sync is true, wait at most min(wait_max_slave_acks, (n + 1) / 2) to promise syncing ok. +# n is slave number +# If 0, wait (n + 1) / 2 acks. +wait_max_slave_acks = 2 + +# Compress the log or not +compression = true diff --git a/server/replication.go b/server/replication.go index dcd1587..b8b1868 100644 --- a/server/replication.go +++ b/server/replication.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/siddontang/go/hack" "github.com/siddontang/go/log" + "github.com/siddontang/go/num" "github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/rpl" "net" @@ -334,6 +335,11 @@ func (app *App) publishNewLog(l *rpl.Log) { s.ack = ack } + total := (len(ss) + 1) / 2 + if app.cfg.Replication.WaitMaxSlaveAcks > 0 { + total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks) + } + done := make(chan struct{}, 1) go func(total int) { n := 0 @@ -347,10 +353,11 @@ func (app *App) publishNewLog(l *rpl.Log) { } } done <- struct{}{} - }((len(ss) + 1) / 2) + }(total) select { case <-done: case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Second): + log.Info("replication wait timeout") } }