diff --git a/ledis/ledis.go b/ledis/ledis.go index 0cf108a..3afa0f5 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "github.com/siddontang/go-leveldb/leveldb" + "github.com/siddontang/go-log/log" "path" "sync" ) @@ -132,3 +133,13 @@ func (l *Ledis) Select(index int) (*DB, error) { return l.dbs[index], nil } + +func (l *Ledis) FlushAll() error { + for index, db := range l.dbs { + if _, err := db.FlushAll(); err != nil { + log.Error("flush db %d error %s", index, err.Error()) + } + } + + return nil +} diff --git a/server/app.go b/server/app.go index cf76049..1b745b6 100644 --- a/server/app.go +++ b/server/app.go @@ -16,9 +16,10 @@ type App struct { closed bool - slaveMode bool - quit chan struct{} + + //for slave replication + master masterInfo } func NewApp(cfg *Config) (*App, error) { @@ -50,12 +51,6 @@ func NewApp(cfg *Config) (*App, error) { return nil, err } - app.slaveMode = false - - if len(app.cfg.SlaveOf) > 0 { - app.slaveMode = true - } - if app.ldb, err = ledis.OpenWithConfig(&cfg.DB); err != nil { return nil, err } @@ -78,7 +73,7 @@ func (app *App) Close() { } func (app *App) Run() { - if app.slaveMode { + if len(app.cfg.SlaveOf) > 0 { app.runReplication() } diff --git a/server/cmd_replication.go b/server/cmd_replication.go index 634d57f..8588877 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -1,16 +1,33 @@ package server +import ( + "fmt" + "github.com/siddontang/ledisdb/ledis" + "strconv" + "strings" +) + func slaveofCommand(c *client) error { - if len(c.args) > 1 { + args := c.args + + if len(args) != 2 { return ErrCmdParams } - master := "" - if len(c.args) == 1 { - master = string(c.args[0]) + masterAddr := "" + + if strings.ToLower(ledis.String(args[0])) == "no" && + strings.ToLower(ledis.String(args[1])) == "one" { + //stop replication, use master = "" + } else { + if _, err := strconv.ParseInt(ledis.String(args[1]), 10, 16); err != nil { + return err + } + + masterAddr = fmt.Sprintf("%s:%s", args[0], args[1]) } - if err := c.app.slaveof(master); err != nil { + if err := c.app.slaveof(masterAddr); err != nil { return err } diff --git a/server/replication.go b/server/replication.go index d1a3749..81ba85e 100644 --- a/server/replication.go +++ b/server/replication.go @@ -1,9 +1,87 @@ package server -func (app *App) slaveof(master string) error { +import ( + "encoding/json" + "github.com/siddontang/go-log/log" + "io/ioutil" + "os" + "path" +) + +type masterInfo struct { + Addr string `json:"addr"` + LogFile string `json:"log_name"` + LogPos int64 `json:"log_pos"` +} + +func (app *App) getMasterInfoName() string { + return path.Join(app.cfg.DataDir, "master.info") +} + +func (app *App) loadMasterInfo() error { + data, err := ioutil.ReadFile(app.getMasterInfoName()) + if err != nil { + if os.IsNotExist(err) { + return nil + } else { + return err + } + } + + if err = json.Unmarshal(data, &app.master); err != nil { + return err + } + + return nil +} + +func (app *App) saveMasterInfo() error { + bakName := path.Join(app.cfg.DataDir, "master.info.bak") + + data, err := json.Marshal(&app.master) + if err != nil { + return err + } + + var fd *os.File + fd, err = os.OpenFile(bakName, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + return err + } + + if _, err = fd.Write(data); err != nil { + fd.Close() + return err + } + + fd.Close() + return os.Rename(bakName, app.getMasterInfoName()) +} + +func (app *App) slaveof(masterAddr string) error { + if len(masterAddr) == 0 { + //stop replication + } else { + } + return nil } func (app *App) runReplication() { - +} + +func (app *App) startReplication(masterAddr string) error { + if err := app.loadMasterInfo(); err != nil { + log.Error("load master.info error %s, use fullsync", err.Error()) + app.master = masterInfo{masterAddr, "", 0} + } else if app.master.Addr != masterAddr { + if err := app.ldb.FlushAll(); err != nil { + log.Error("replication flush old data error %s", err.Error()) + return err + } + + app.master = masterInfo{masterAddr, "", 0} + } + + return nil }