diff --git a/cmd/ledis-dump/main.go b/cmd/ledis-dump/main.go index b6a798d..6f9bac4 100644 --- a/cmd/ledis-dump/main.go +++ b/cmd/ledis-dump/main.go @@ -14,7 +14,7 @@ var port = flag.Int("port", 6380, "ledis server port") var sock = flag.String("sock", "", "ledis unix socket domain") var dumpFile = flag.String("o", "./ledis.dump", "dump file to save") -var fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync +var fullSyncCmd = []byte("*2\r\n$8\r\nfullsync\r\n$3\r\nnew\r\n") //fullsync func main() { flag.Parse() diff --git a/config/config.go b/config/config.go index 23a33de..aa74c9d 100644 --- a/config/config.go +++ b/config/config.go @@ -129,6 +129,7 @@ func NewConfigDefault() *Config { cfg.Replication.Compression = true cfg.Replication.WaitMaxSlaveAcks = 2 cfg.Replication.SyncLog = 0 + cfg.Snapshot.MaxNum = 1 cfg.adjust() diff --git a/config/config.toml b/config/config.toml index 27de6c5..8a8cfba 100644 --- a/config/config.toml +++ b/config/config.toml @@ -81,7 +81,7 @@ compression = true [snapshot] # Path to store snapshot dump file # if not set, use data_dir/snapshot -# snapshot file name format is snap-2006-01-02T15:04:05.999999999.dmp +# snapshot file name format is dmp-2006-01-02T15:04:05.999999999 path = "" # Reserve newest max_num snapshot dump files diff --git a/doc/commands.json b/doc/commands.json index 7f354c9..8f17223 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -90,7 +90,7 @@ "readonly": false }, "FULLSYNC": { - "arguments": "-", + "arguments": "[NEW]", "group": "Replication", "readonly": false diff --git a/doc/commands.md b/doc/commands.md index 8337992..4c9f3bc 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -123,7 +123,7 @@ Table of Contents - [BXSCAN key [MATCH match] [COUNT count]](#bxscan-key-match-match-count-count) - [Replication](#replication) - [SLAVEOF host port [RESTART] [READONLY]](#slaveof-host-port-restart-readonly) - - [FULLSYNC](#fullsync) + - [FULLSYNC [NEW]](#fullsync-new) - [SYNC logid](#sync-logid) - [Server](#server) - [PING](#ping) @@ -2477,12 +2477,14 @@ If the server is already master, `SLAVEOF NO ONE READONLY` will force the server If a server is already a slave of a master, `SLAVEOF host port` will stop the replication against the old and start the synchronization against the new one, if RESTART is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1. -### FULLSYNC +### FULLSYNC [NEW] Inner command, starts a fullsync from the master set by SLAVEOF. FULLSYNC will first try to sync all data from the master, save in local disk, then discard old dataset and load new one. +`FULLSYNC NEW` will generate a new snapshot and sync, otherwise it will use the latest existing snapshot if possible. + **Return value** **Examples** diff --git a/server/app.go b/server/app.go index 0a253ac..aeb71a4 100644 --- a/server/app.go +++ b/server/app.go @@ -34,6 +34,8 @@ type App struct { // handle slaves slock sync.Mutex slaves map[*client]struct{} + + snap *snapshotStore } func netType(s string) string { @@ -88,6 +90,10 @@ func NewApp(cfg *config.Config) (*App, error) { } } + if app.snap, err = newSnapshotStore(cfg); err != nil { + return nil, err + } + if len(app.cfg.SlaveOf) > 0 { //slave must readonly app.cfg.Readonly = true @@ -125,6 +131,8 @@ func (app *App) Close() { app.m.Close() + app.snap.Close() + if app.access != nil { app.access.Close() } diff --git a/server/cmd_replication.go b/server/cmd_replication.go index 128e4af..5526d5c 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -4,10 +4,9 @@ import ( "fmt" "github.com/siddontang/go/hack" "github.com/siddontang/ledisdb/ledis" - "io/ioutil" - "os" "strconv" "strings" + "time" ) func slaveofCommand(c *client) error { @@ -49,27 +48,46 @@ func slaveofCommand(c *client) error { } func fullsyncCommand(c *client) error { - //todo, multi fullsync may use same dump file - dumpFile, err := ioutil.TempFile(c.app.cfg.DataDir, "dump_") + args := c.args + needNew := false + if len(args) == 1 && strings.ToLower(hack.String(args[0])) == "new" { + needNew = true + } + + var s *snapshot + var err error + var t time.Time + + dumper := c.app.ldb + + if needNew { + s, t, err = c.app.snap.Create(dumper) + } else { + if s, t, err = c.app.snap.OpenLatest(); err != nil { + return err + } else if s == nil { + s, t, err = c.app.snap.Create(dumper) + } else { + gap := time.Duration(c.app.cfg.Replication.ExpiredLogDays*24*3600) * time.Second / 2 + minT := time.Now().Add(-gap) + + //snapshot is too old + if t.Before(minT) { + s.Close() + s, t, err = c.app.snap.Create(dumper) + } + } + } + if err != nil { return err } - if err = c.app.ldb.Dump(dumpFile); err != nil { - return err - } + n := s.Size() - st, _ := dumpFile.Stat() - n := st.Size() + c.resp.writeBulkFrom(n, s) - dumpFile.Seek(0, os.SEEK_SET) - - c.resp.writeBulkFrom(n, dumpFile) - - name := dumpFile.Name() - dumpFile.Close() - - os.Remove(name) + s.Close() return nil } diff --git a/server/snapshot.go b/server/snapshot.go index 1643160..7240c7a 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -28,12 +28,13 @@ type snapshotStore struct { } func snapshotName(t time.Time) string { - return fmt.Sprintf("snap-%s.dmp", t.Format(snapshotTimeFormat)) + return fmt.Sprintf("dmp-%s", t.Format(snapshotTimeFormat)) } func parseSnapshotName(name string) (time.Time, error) { var timeString string - if _, err := fmt.Sscanf(name, "snap-%s.dmp", &timeString); err != nil { + if _, err := fmt.Sscanf(name, "dmp-%s", &timeString); err != nil { + println(err.Error()) return time.Time{}, err } when, err := time.Parse(snapshotTimeFormat, timeString) @@ -88,7 +89,7 @@ func (s *snapshotStore) checkSnapshots() error { } if _, err := parseSnapshotName(info.Name()); err != nil { - log.Error("invalid snapshot file name %s, err: %s, try remove", info.Name(), err.Error()) + log.Error("invalid snapshot file name %s, err: %s", info.Name(), err.Error()) continue } @@ -161,8 +162,6 @@ type snapshot struct { io.ReadCloser f *os.File - - temp bool } func (st *snapshot) Read(b []byte) (int, error) { @@ -170,15 +169,7 @@ func (st *snapshot) Read(b []byte) (int, error) { } func (st *snapshot) Close() error { - if st.temp { - name := st.f.Name() - if err := st.f.Close(); err != nil { - return err - } - return os.Remove(name) - } else { - return st.f.Close() - } + return st.f.Close() } func (st *snapshot) Size() int64 { @@ -186,20 +177,18 @@ func (st *snapshot) Size() int64 { return s.Size() } -func (s *snapshotStore) Create(d snapshotDumper, temp bool) (*snapshot, time.Time, error) { +func (s *snapshotStore) Create(d snapshotDumper) (*snapshot, time.Time, error) { s.Lock() defer s.Unlock() - if !temp { - s.purge(true) - } + s.purge(true) now := time.Now() name := snapshotName(now) tmpName := name + ".tmp" - if len(s.names) > 0 && !temp { + if len(s.names) > 0 { lastTime, _ := parseSnapshotName(s.names[len(s.names)-1]) if !now.After(lastTime) { return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ", @@ -218,26 +207,17 @@ func (s *snapshotStore) Create(d snapshotDumper, temp bool) (*snapshot, time.Tim return nil, time.Time{}, err } - if temp { - if err := f.Sync(); err != nil { - f.Close() - return nil, time.Time{}, err - } - - f.Seek(0, os.SEEK_SET) - } else { - f.Close() - if err := os.Rename(s.snapshotPath(tmpName), s.snapshotPath(name)); err != nil { - return nil, time.Time{}, err - } - - if f, err = os.Open(s.snapshotPath(name)); err != nil { - return nil, time.Time{}, err - } - s.names = append(s.names, name) + f.Close() + if err := os.Rename(s.snapshotPath(tmpName), s.snapshotPath(name)); err != nil { + return nil, time.Time{}, err } - return &snapshot{f: f, temp: temp}, now, nil + if f, err = os.Open(s.snapshotPath(name)); err != nil { + return nil, time.Time{}, err + } + s.names = append(s.names, name) + + return &snapshot{f: f}, now, nil } func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) { @@ -256,5 +236,5 @@ func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) { return nil, time.Time{}, err } - return &snapshot{f: f, temp: false}, t, err + return &snapshot{f: f}, t, err } diff --git a/server/snapshot_test.go b/server/snapshot_test.go index 5e9ba46..d1e9230 100644 --- a/server/snapshot_test.go +++ b/server/snapshot_test.go @@ -30,7 +30,7 @@ func TestSnapshot(t *testing.T) { t.Fatal(err) } - if f, _, err := s.Create(d, false); err != nil { + if f, _, err := s.Create(d); err != nil { t.Fatal(err) } else { defer f.Close() @@ -43,7 +43,7 @@ func TestSnapshot(t *testing.T) { } } - if f, _, err := s.Create(d, false); err != nil { + if f, _, err := s.Create(d); err != nil { t.Fatal(err) } else { defer f.Close() @@ -55,7 +55,7 @@ func TestSnapshot(t *testing.T) { } } - if f, _, err := s.Create(d, false); err != nil { + if f, _, err := s.Create(d); err != nil { t.Fatal(err) } else { defer f.Close() @@ -73,29 +73,5 @@ func TestSnapshot(t *testing.T) { t.Fatal("must 2 snapshot") } - if f, _, err := s.Create(d, true); err != nil { - t.Fatal(err) - } else { - if b, _ := ioutil.ReadAll(f); string(b) != "hello world" { - t.Fatal("invalid read snapshot") - } - - if len(s.names) != 2 { - t.Fatal("must 2 snapshot") - } - - fs, _ = ioutil.ReadDir(cfg.Snapshot.Path) - if len(fs) != 3 { - t.Fatal("must 3 snapshot") - } - - f.Close() - } - - fs, _ = ioutil.ReadDir(cfg.Snapshot.Path) - if len(fs) != 2 { - t.Fatal("must 2 snapshot") - } - s.Close() }