diff --git a/.gitignore b/.gitignore index 42e539f..8959171 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,5 @@ build .DS_Store nohup.out build_config.mk -var +var* _workspace \ No newline at end of file diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index ab1bbab..594eeae 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -16,35 +16,35 @@ }, { "ImportPath": "github.com/siddontang/go/bson", - "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" + "Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1" }, { "ImportPath": "github.com/siddontang/go/filelock", - "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" + "Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1" }, { "ImportPath": "github.com/siddontang/go/hack", - "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" + "Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1" }, { "ImportPath": "github.com/siddontang/go/ioutil2", - "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" + "Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1" }, { "ImportPath": "github.com/siddontang/go/log", - "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" + "Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1" }, { "ImportPath": "github.com/siddontang/go/num", - "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" + "Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1" }, { "ImportPath": "github.com/siddontang/go/snappy", - "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" + "Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1" }, { "ImportPath": "github.com/siddontang/go/sync2", - "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" + "Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1" }, { "ImportPath": "github.com/siddontang/goleveldb/leveldb", diff --git a/cmd/ledis-server/main.go b/cmd/ledis-server/main.go index e132506..32be6c2 100644 --- a/cmd/ledis-server/main.go +++ b/cmd/ledis-server/main.go @@ -15,12 +15,15 @@ import ( ) var configFile = flag.String("config", "", "ledisdb config file") +var addr = flag.String("addr", "", "ledisdb listen address") +var dataDir = flag.String("data_dir", "", "ledisdb base data dir") var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name") var usePprof = flag.Bool("pprof", false, "enable pprof") var pprofPort = flag.Int("pprof_port", 6060, "pprof http port") var slaveof = flag.String("slaveof", "", "make the server a slave of another instance") var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly") var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled") +var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not") func main() { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -42,6 +45,14 @@ func main() { return } + if len(*addr) > 0 { + cfg.Addr = *addr + } + + if len(*dataDir) > 0 { + cfg.DataDir = *dataDir + } + if len(*dbName) > 0 { cfg.DBName = *dbName } @@ -53,6 +64,7 @@ func main() { } else { cfg.Readonly = *readonly cfg.UseReplication = *rpl + cfg.Replication.Sync = *rplSync } var app *server.App diff --git a/server/app.go b/server/app.go index aeb71a4..0e03b3f 100644 --- a/server/app.go +++ b/server/app.go @@ -32,8 +32,9 @@ type App struct { s *script // handle slaves - slock sync.Mutex - slaves map[*client]struct{} + slock sync.Mutex + slaves map[string]*client + slaveSyncAck chan uint64 snap *snapshotStore } @@ -60,7 +61,8 @@ func NewApp(cfg *config.Config) (*App, error) { app.cfg = cfg - app.slaves = make(map[*client]struct{}) + app.slaves = make(map[string]*client) + app.slaveSyncAck = make(chan uint64) var err error diff --git a/server/client.go b/server/client.go index ef9de76..ba24821 100644 --- a/server/client.go +++ b/server/client.go @@ -64,8 +64,6 @@ type client struct { lastLogID uint64 - ack *syncAck - reqErr chan error buf bytes.Buffer @@ -73,6 +71,8 @@ type client struct { tx *ledis.Tx script *ledis.Multi + + slaveListeningAddr string } func newClient(app *App) *client { diff --git a/server/client_resp.go b/server/client_resp.go index 078a02c..93edef9 100644 --- a/server/client_resp.go +++ b/server/client_resp.go @@ -51,7 +51,10 @@ func (c *respClient) run() { log.Fatal("client run panic %s:%v", buf, e) } + handleQuit := true if c.conn != nil { + //if handle quit command before, conn is nil + handleQuit = false c.conn.Close() } @@ -60,7 +63,7 @@ func (c *respClient) run() { c.tx = nil } - c.app.removeSlave(c.client) + c.app.removeSlave(c.client, handleQuit) }() for { @@ -144,6 +147,7 @@ func (c *respClient) handleRequest(reqData [][]byte) { c.resp.writeStatus(OK) c.resp.flush() c.conn.Close() + c.conn = nil return } diff --git a/server/cmd_replication.go b/server/cmd_replication.go index 5526d5c..b401f35 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -1,9 +1,12 @@ package server import ( + "encoding/binary" "fmt" "github.com/siddontang/go/hack" + "github.com/siddontang/go/num" "github.com/siddontang/ledisdb/ledis" + "net" "strconv" "strings" "time" @@ -92,6 +95,8 @@ func fullsyncCommand(c *client) error { return nil } +var dummyBuf = make([]byte, 8) + func syncCommand(c *client) error { args := c.args if len(args) != 1 { @@ -107,23 +112,69 @@ func syncCommand(c *client) error { c.lastLogID = logId - 1 - if c.ack != nil && logId > c.ack.id { - asyncNotifyUint64(c.ack.ch, logId) - c.ack = nil + stat, err := c.app.ldb.ReplicationStat() + if err != nil { + return err + } + + if c.lastLogID > stat.LastID { + return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID) + } else if c.lastLogID == stat.LastID { + c.app.slaveAck(c) } c.syncBuf.Reset() + c.syncBuf.Write(dummyBuf) + if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30); err != nil { return err } else { buf := c.syncBuf.Bytes() + stat, err = c.app.ldb.ReplicationStat() + if err != nil { + return err + } + + binary.BigEndian.PutUint64(buf, stat.LastID) + c.resp.writeBulk(buf) } - c.app.addSlave(c) + return nil +} +//inner command, only for replication +//REPLCONF