diff --git a/.gitignore b/.gitignore index 42e539f..8959171 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,5 @@ build .DS_Store nohup.out build_config.mk -var +var* _workspace \ No newline at end of file diff --git a/cmd/ledis-server/main.go b/cmd/ledis-server/main.go index e132506..32be6c2 100644 --- a/cmd/ledis-server/main.go +++ b/cmd/ledis-server/main.go @@ -15,12 +15,15 @@ import ( ) var configFile = flag.String("config", "", "ledisdb config file") +var addr = flag.String("addr", "", "ledisdb listen address") +var dataDir = flag.String("data_dir", "", "ledisdb base data dir") var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name") var usePprof = flag.Bool("pprof", false, "enable pprof") var pprofPort = flag.Int("pprof_port", 6060, "pprof http port") var slaveof = flag.String("slaveof", "", "make the server a slave of another instance") var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly") var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled") +var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not") func main() { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -42,6 +45,14 @@ func main() { return } + if len(*addr) > 0 { + cfg.Addr = *addr + } + + if len(*dataDir) > 0 { + cfg.DataDir = *dataDir + } + if len(*dbName) > 0 { cfg.DBName = *dbName } @@ -53,6 +64,7 @@ func main() { } else { cfg.Readonly = *readonly cfg.UseReplication = *rpl + cfg.Replication.Sync = *rplSync } var app *server.App diff --git a/server/cmd_replication.go b/server/cmd_replication.go index aefd1ac..b401f35 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -1,6 +1,7 @@ package server import ( + "encoding/binary" "fmt" "github.com/siddontang/go/hack" "github.com/siddontang/go/num" @@ -94,6 +95,8 @@ func fullsyncCommand(c *client) error { return nil } +var dummyBuf = make([]byte, 8) + func syncCommand(c *client) error { args := c.args if len(args) != 1 { @@ -122,11 +125,20 @@ func syncCommand(c *client) error { c.syncBuf.Reset() + c.syncBuf.Write(dummyBuf) + if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30); err != nil { return err } else { buf := c.syncBuf.Bytes() + stat, err = c.app.ldb.ReplicationStat() + if err != nil { + return err + } + + binary.BigEndian.PutUint64(buf, stat.LastID) + c.resp.writeBulk(buf) } diff --git a/server/info.go b/server/info.go index 7ccb587..ccdba98 100644 --- a/server/info.go +++ b/server/info.go @@ -3,6 +3,7 @@ package server import ( "bytes" "fmt" + "github.com/siddontang/go/sync2" "os" "runtime" "strings" @@ -25,8 +26,11 @@ type info struct { } Replication struct { - PubLogNum int64 - PubLogTotalTime int64 //milliseconds + PubLogNum sync2.AtomicInt64 + PubLogAckNum sync2.AtomicInt64 + PubLogTotalAckTime sync2.AtomicDuration + + MasterLastLogID sync2.AtomicUint64 } } @@ -156,13 +160,15 @@ func (i *info) dumpReplication(buf *bytes.Buffer) { } i.app.slock.Unlock() - num := i.Replication.PubLogNum + num := i.Replication.PubLogNum.Get() p = append(p, infoPair{"pub_log_num", num}) - if num != 0 { - p = append(p, infoPair{"pub_log_per_time", i.Replication.PubLogTotalTime / num}) + ackNum := i.Replication.PubLogAckNum.Get() + totalTime := i.Replication.PubLogTotalAckTime.Get().Nanoseconds() / 1e6 + if ackNum != 0 { + p = append(p, infoPair{"pub_log_ack_per_time", totalTime / ackNum}) } else { - p = append(p, infoPair{"pub_log_per_time", 0}) + p = append(p, infoPair{"pub_log_ack_per_time", 0}) } p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf}) @@ -181,6 +187,8 @@ func (i *info) dumpReplication(buf *bytes.Buffer) { p = append(p, infoPair{"commit_log_id", 0}) } + p = append(p, infoPair{"master_last_log_id", i.Replication.MasterLastLogID.Get()}) + i.dumpPairs(buf, p...) } diff --git a/server/replication.go b/server/replication.go index 25e4a8d..47230b1 100644 --- a/server/replication.go +++ b/server/replication.go @@ -16,7 +16,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" ) @@ -263,6 +262,17 @@ func (m *master) sync() error { buf := m.syncBuf.Bytes() + if len(buf) < 8 { + return fmt.Errorf("inavlid sync size %d", len(buf)) + } + + m.app.info.Replication.MasterLastLogID.Set(num.BytesToUint64(buf)) + + var t bytes.Buffer + m.app.info.dumpReplication(&t) + + buf = buf[8:] + if len(buf) == 0 { return nil } @@ -370,6 +380,8 @@ func (app *App) publishNewLog(l *rpl.Log) { return } + app.info.Replication.PubLogNum.Add(1) + app.slock.Lock() total := (len(app.slaves) + 1) / 2 @@ -414,6 +426,6 @@ func (app *App) publishNewLog(l *rpl.Log) { } stopTime := time.Now() - atomic.AddInt64(&app.info.Replication.PubLogNum, 1) - atomic.AddInt64(&app.info.Replication.PubLogTotalTime, stopTime.Sub(startTime).Nanoseconds()/1e6) + app.info.Replication.PubLogAckNum.Add(1) + app.info.Replication.PubLogTotalAckTime.Add(stopTime.Sub(startTime)) }