add flushall, some replication

This commit is contained in:
siddontang 2014-06-08 16:43:59 +08:00
parent 664a082c06
commit 993ccdd052
4 changed files with 117 additions and 16 deletions

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/siddontang/go-leveldb/leveldb"
"github.com/siddontang/go-log/log"
"path"
"sync"
)
@ -132,3 +133,13 @@ func (l *Ledis) Select(index int) (*DB, error) {
return l.dbs[index], nil
}
func (l *Ledis) FlushAll() error {
for index, db := range l.dbs {
if _, err := db.FlushAll(); err != nil {
log.Error("flush db %d error %s", index, err.Error())
}
}
return nil
}

View File

@ -16,9 +16,10 @@ type App struct {
closed bool
slaveMode bool
quit chan struct{}
//for slave replication
master masterInfo
}
func NewApp(cfg *Config) (*App, error) {
@ -50,12 +51,6 @@ func NewApp(cfg *Config) (*App, error) {
return nil, err
}
app.slaveMode = false
if len(app.cfg.SlaveOf) > 0 {
app.slaveMode = true
}
if app.ldb, err = ledis.OpenWithConfig(&cfg.DB); err != nil {
return nil, err
}
@ -78,7 +73,7 @@ func (app *App) Close() {
}
func (app *App) Run() {
if app.slaveMode {
if len(app.cfg.SlaveOf) > 0 {
app.runReplication()
}

View File

@ -1,16 +1,33 @@
package server
import (
"fmt"
"github.com/siddontang/ledisdb/ledis"
"strconv"
"strings"
)
func slaveofCommand(c *client) error {
if len(c.args) > 1 {
args := c.args
if len(args) != 2 {
return ErrCmdParams
}
master := ""
if len(c.args) == 1 {
master = string(c.args[0])
masterAddr := ""
if strings.ToLower(ledis.String(args[0])) == "no" &&
strings.ToLower(ledis.String(args[1])) == "one" {
//stop replication, use master = ""
} else {
if _, err := strconv.ParseInt(ledis.String(args[1]), 10, 16); err != nil {
return err
}
if err := c.app.slaveof(master); err != nil {
masterAddr = fmt.Sprintf("%s:%s", args[0], args[1])
}
if err := c.app.slaveof(masterAddr); err != nil {
return err
}

View File

@ -1,9 +1,87 @@
package server
func (app *App) slaveof(master string) error {
import (
"encoding/json"
"github.com/siddontang/go-log/log"
"io/ioutil"
"os"
"path"
)
type masterInfo struct {
Addr string `json:"addr"`
LogFile string `json:"log_name"`
LogPos int64 `json:"log_pos"`
}
func (app *App) getMasterInfoName() string {
return path.Join(app.cfg.DataDir, "master.info")
}
func (app *App) loadMasterInfo() error {
data, err := ioutil.ReadFile(app.getMasterInfoName())
if err != nil {
if os.IsNotExist(err) {
return nil
} else {
return err
}
}
if err = json.Unmarshal(data, &app.master); err != nil {
return err
}
return nil
}
func (app *App) saveMasterInfo() error {
bakName := path.Join(app.cfg.DataDir, "master.info.bak")
data, err := json.Marshal(&app.master)
if err != nil {
return err
}
var fd *os.File
fd, err = os.OpenFile(bakName, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return err
}
if _, err = fd.Write(data); err != nil {
fd.Close()
return err
}
fd.Close()
return os.Rename(bakName, app.getMasterInfoName())
}
func (app *App) slaveof(masterAddr string) error {
if len(masterAddr) == 0 {
//stop replication
} else {
}
return nil
}
func (app *App) runReplication() {
}
func (app *App) startReplication(masterAddr string) error {
if err := app.loadMasterInfo(); err != nil {
log.Error("load master.info error %s, use fullsync", err.Error())
app.master = masterInfo{masterAddr, "", 0}
} else if app.master.Addr != masterAddr {
if err := app.ldb.FlushAll(); err != nil {
log.Error("replication flush old data error %s", err.Error())
return err
}
app.master = masterInfo{masterAddr, "", 0}
}
return nil
}