add wait max slave acks config

This commit is contained in:
siddontang 2014-10-05 17:24:44 +08:00
parent 498a8c6f81
commit 4d974a0db7
5 changed files with 45 additions and 12 deletions

View File

@ -34,6 +34,7 @@ type ReplicationConfig struct {
ExpiredLogDays int `toml:"expired_log_days"` ExpiredLogDays int `toml:"expired_log_days"`
Sync bool `toml:"sync"` Sync bool `toml:"sync"`
WaitSyncTime int `toml:"wait_sync_time"` WaitSyncTime int `toml:"wait_sync_time"`
WaitMaxSlaveAcks int `toml:"wait_max_slave_acks"`
Compression bool `toml:"compression"` Compression bool `toml:"compression"`
} }
@ -100,6 +101,7 @@ func NewConfigDefault() *Config {
cfg.Replication.WaitSyncTime = 1 cfg.Replication.WaitSyncTime = 1
cfg.Replication.Compression = true cfg.Replication.Compression = true
cfg.Replication.WaitMaxSlaveAcks = 2
return cfg return cfg
} }

View File

@ -60,5 +60,10 @@ sync = true
# If sync is true, wait at last wait_sync_time seconds for slave syncing this log # If sync is true, wait at last wait_sync_time seconds for slave syncing this log
wait_sync_time = 1 wait_sync_time = 1
# If sync is true, wait at most min(wait_max_slave_acks, (n + 1) / 2) to promise syncing ok.
# n is slave number
# If 0, wait (n + 1) / 2 acks.
wait_max_slave_acks = 2
# Compress the log or not # Compress the log or not
compression = true compression = true

View File

@ -5,7 +5,7 @@ import (
) )
func TestConfig(t *testing.T) { func TestConfig(t *testing.T) {
_, err := NewConfigWithFile("./config.toml") _, err := NewConfigWithFile("./ledis.toml")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -1,7 +1,5 @@
# LedisDB configuration # LedisDB configuration
# Config format is toml, https://github.com/toml-lang/toml
# Server listen address # Server listen address
addr = "127.0.0.1:6380" addr = "127.0.0.1:6380"
@ -15,6 +13,7 @@ data_dir = "/tmp/ledis_server"
access_log = "" access_log = ""
# Set slaveof to enable replication from master, empty, no replication # Set slaveof to enable replication from master, empty, no replication
# Any write operations except flushall and replication will be disabled in slave mode.
slaveof = "" slaveof = ""
# Choose which backend storage to use, now support: # Choose which backend storage to use, now support:
@ -29,9 +28,12 @@ slaveof = ""
# #
db_name = "leveldb" db_name = "leveldb"
# if not set, use data_dir/"db_name"_data # If not set, use data_dir/"db_name"_data
db_path = "" db_path = ""
# enable replication or not
use_replication = true
[leveldb] [leveldb]
compression = false compression = false
block_size = 32768 block_size = 32768
@ -43,8 +45,25 @@ max_open_files = 1024
map_size = 524288000 map_size = 524288000
nosync = true nosync = true
[wal] [replication]
# if not set, use data_dir/wal # Path to store replication information(write ahead log, commit log, etc.)
# if not set, use data_dir/rpl
path = "" path = ""
# Expire write ahead logs after the given days
expired_log_days = 7
# If sync is true, the new log must be sent to some slaves, and then commit.
# It will reduce performance but have better high availability.
sync = true
# If sync is true, wait at last wait_sync_time seconds for slave syncing this log
wait_sync_time = 1
# If sync is true, wait at most min(wait_max_slave_acks, (n + 1) / 2) to promise syncing ok.
# n is slave number
# If 0, wait (n + 1) / 2 acks.
wait_max_slave_acks = 2
# Compress the log or not
compression = true

View File

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"github.com/siddontang/go/hack" "github.com/siddontang/go/hack"
"github.com/siddontang/go/log" "github.com/siddontang/go/log"
"github.com/siddontang/go/num"
"github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/ledis"
"github.com/siddontang/ledisdb/rpl" "github.com/siddontang/ledisdb/rpl"
"net" "net"
@ -334,6 +335,11 @@ func (app *App) publishNewLog(l *rpl.Log) {
s.ack = ack s.ack = ack
} }
total := (len(ss) + 1) / 2
if app.cfg.Replication.WaitMaxSlaveAcks > 0 {
total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks)
}
done := make(chan struct{}, 1) done := make(chan struct{}, 1)
go func(total int) { go func(total int) {
n := 0 n := 0
@ -347,10 +353,11 @@ func (app *App) publishNewLog(l *rpl.Log) {
} }
} }
done <- struct{}{} done <- struct{}{}
}((len(ss) + 1) / 2) }(total)
select { select {
case <-done: case <-done:
case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Second): case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Second):
log.Info("replication wait timeout")
} }
} }