From c3824552b66dfccccfc048dc6cbcc6eac8132e8c Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 21 Oct 2014 17:35:03 +0800 Subject: [PATCH 1/4] add replconf for replication --- server/app.go | 8 ++-- server/client.go | 4 +- server/cmd_replication.go | 41 ++++++++++++++-- server/info.go | 6 ++- server/replication.go | 99 ++++++++++++++++++++++++++------------- server/util.go | 24 ++++++++-- 6 files changed, 134 insertions(+), 48 deletions(-) diff --git a/server/app.go b/server/app.go index aeb71a4..0e03b3f 100644 --- a/server/app.go +++ b/server/app.go @@ -32,8 +32,9 @@ type App struct { s *script // handle slaves - slock sync.Mutex - slaves map[*client]struct{} + slock sync.Mutex + slaves map[string]*client + slaveSyncAck chan uint64 snap *snapshotStore } @@ -60,7 +61,8 @@ func NewApp(cfg *config.Config) (*App, error) { app.cfg = cfg - app.slaves = make(map[*client]struct{}) + app.slaves = make(map[string]*client) + app.slaveSyncAck = make(chan uint64) var err error diff --git a/server/client.go b/server/client.go index ef9de76..ba24821 100644 --- a/server/client.go +++ b/server/client.go @@ -64,8 +64,6 @@ type client struct { lastLogID uint64 - ack *syncAck - reqErr chan error buf bytes.Buffer @@ -73,6 +71,8 @@ type client struct { tx *ledis.Tx script *ledis.Multi + + slaveListeningAddr string } func newClient(app *App) *client { diff --git a/server/cmd_replication.go b/server/cmd_replication.go index 5526d5c..20c6982 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -3,7 +3,9 @@ package server import ( "fmt" "github.com/siddontang/go/hack" + "github.com/siddontang/go/num" "github.com/siddontang/ledisdb/ledis" + "net" "strconv" "strings" "time" @@ -107,10 +109,7 @@ func syncCommand(c *client) error { c.lastLogID = logId - 1 - if c.ack != nil && logId > c.ack.id { - asyncNotifyUint64(c.ack.ch, logId) - c.ack = nil - } + c.app.slaveAck(c) c.syncBuf.Reset() @@ -122,8 +121,39 @@ func syncCommand(c *client) error { c.resp.writeBulk(buf) } - c.app.addSlave(c) + return nil +} +//inner command, only for replication +//REPLCONF