optimize fullsync

This commit is contained in:
siddontang 2014-10-11 17:44:31 +08:00
parent 4a1c74cb44
commit f907234638
9 changed files with 72 additions and 87 deletions

View File

@ -14,7 +14,7 @@ var port = flag.Int("port", 6380, "ledis server port")
var sock = flag.String("sock", "", "ledis unix socket domain") var sock = flag.String("sock", "", "ledis unix socket domain")
var dumpFile = flag.String("o", "./ledis.dump", "dump file to save") var dumpFile = flag.String("o", "./ledis.dump", "dump file to save")
var fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync var fullSyncCmd = []byte("*2\r\n$8\r\nfullsync\r\n$3\r\nnew\r\n") //fullsync
func main() { func main() {
flag.Parse() flag.Parse()

View File

@ -129,6 +129,7 @@ func NewConfigDefault() *Config {
cfg.Replication.Compression = true cfg.Replication.Compression = true
cfg.Replication.WaitMaxSlaveAcks = 2 cfg.Replication.WaitMaxSlaveAcks = 2
cfg.Replication.SyncLog = 0 cfg.Replication.SyncLog = 0
cfg.Snapshot.MaxNum = 1
cfg.adjust() cfg.adjust()

View File

@ -81,7 +81,7 @@ compression = true
[snapshot] [snapshot]
# Path to store snapshot dump file # Path to store snapshot dump file
# if not set, use data_dir/snapshot # if not set, use data_dir/snapshot
# snapshot file name format is snap-2006-01-02T15:04:05.999999999.dmp # snapshot file name format is dmp-2006-01-02T15:04:05.999999999
path = "" path = ""
# Reserve newest max_num snapshot dump files # Reserve newest max_num snapshot dump files

View File

@ -90,7 +90,7 @@
"readonly": false "readonly": false
}, },
"FULLSYNC": { "FULLSYNC": {
"arguments": "-", "arguments": "[NEW]",
"group": "Replication", "group": "Replication",
"readonly": false "readonly": false

View File

@ -123,7 +123,7 @@ Table of Contents
- [BXSCAN key [MATCH match] [COUNT count]](#bxscan-key-match-match-count-count) - [BXSCAN key [MATCH match] [COUNT count]](#bxscan-key-match-match-count-count)
- [Replication](#replication) - [Replication](#replication)
- [SLAVEOF host port [RESTART] [READONLY]](#slaveof-host-port-restart-readonly) - [SLAVEOF host port [RESTART] [READONLY]](#slaveof-host-port-restart-readonly)
- [FULLSYNC](#fullsync) - [FULLSYNC [NEW]](#fullsync-new)
- [SYNC logid](#sync-logid) - [SYNC logid](#sync-logid)
- [Server](#server) - [Server](#server)
- [PING](#ping) - [PING](#ping)
@ -2477,12 +2477,14 @@ If the server is already master, `SLAVEOF NO ONE READONLY` will force the server
If a server is already a slave of a master, `SLAVEOF host port` will stop the replication against the old and start the synchronization against the new one, if RESTART is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1. If a server is already a slave of a master, `SLAVEOF host port` will stop the replication against the old and start the synchronization against the new one, if RESTART is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1.
### FULLSYNC ### FULLSYNC [NEW]
Inner command, starts a fullsync from the master set by SLAVEOF. Inner command, starts a fullsync from the master set by SLAVEOF.
FULLSYNC will first try to sync all data from the master, save in local disk, then discard old dataset and load new one. FULLSYNC will first try to sync all data from the master, save in local disk, then discard old dataset and load new one.
`FULLSYNC NEW` will generate a new snapshot and sync, otherwise it will use the latest existing snapshot if possible.
**Return value** **Return value**
**Examples** **Examples**

View File

@ -34,6 +34,8 @@ type App struct {
// handle slaves // handle slaves
slock sync.Mutex slock sync.Mutex
slaves map[*client]struct{} slaves map[*client]struct{}
snap *snapshotStore
} }
func netType(s string) string { func netType(s string) string {
@ -88,6 +90,10 @@ func NewApp(cfg *config.Config) (*App, error) {
} }
} }
if app.snap, err = newSnapshotStore(cfg); err != nil {
return nil, err
}
if len(app.cfg.SlaveOf) > 0 { if len(app.cfg.SlaveOf) > 0 {
//slave must readonly //slave must readonly
app.cfg.Readonly = true app.cfg.Readonly = true
@ -125,6 +131,8 @@ func (app *App) Close() {
app.m.Close() app.m.Close()
app.snap.Close()
if app.access != nil { if app.access != nil {
app.access.Close() app.access.Close()
} }

View File

@ -4,10 +4,9 @@ import (
"fmt" "fmt"
"github.com/siddontang/go/hack" "github.com/siddontang/go/hack"
"github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/ledis"
"io/ioutil"
"os"
"strconv" "strconv"
"strings" "strings"
"time"
) )
func slaveofCommand(c *client) error { func slaveofCommand(c *client) error {
@ -49,27 +48,46 @@ func slaveofCommand(c *client) error {
} }
func fullsyncCommand(c *client) error { func fullsyncCommand(c *client) error {
//todo, multi fullsync may use same dump file args := c.args
dumpFile, err := ioutil.TempFile(c.app.cfg.DataDir, "dump_") needNew := false
if len(args) == 1 && strings.ToLower(hack.String(args[0])) == "new" {
needNew = true
}
var s *snapshot
var err error
var t time.Time
dumper := c.app.ldb
if needNew {
s, t, err = c.app.snap.Create(dumper)
} else {
if s, t, err = c.app.snap.OpenLatest(); err != nil {
return err
} else if s == nil {
s, t, err = c.app.snap.Create(dumper)
} else {
gap := time.Duration(c.app.cfg.Replication.ExpiredLogDays*24*3600) * time.Second / 2
minT := time.Now().Add(-gap)
//snapshot is too old
if t.Before(minT) {
s.Close()
s, t, err = c.app.snap.Create(dumper)
}
}
}
if err != nil { if err != nil {
return err return err
} }
if err = c.app.ldb.Dump(dumpFile); err != nil { n := s.Size()
return err
}
st, _ := dumpFile.Stat() c.resp.writeBulkFrom(n, s)
n := st.Size()
dumpFile.Seek(0, os.SEEK_SET) s.Close()
c.resp.writeBulkFrom(n, dumpFile)
name := dumpFile.Name()
dumpFile.Close()
os.Remove(name)
return nil return nil
} }

View File

@ -28,12 +28,13 @@ type snapshotStore struct {
} }
func snapshotName(t time.Time) string { func snapshotName(t time.Time) string {
return fmt.Sprintf("snap-%s.dmp", t.Format(snapshotTimeFormat)) return fmt.Sprintf("dmp-%s", t.Format(snapshotTimeFormat))
} }
func parseSnapshotName(name string) (time.Time, error) { func parseSnapshotName(name string) (time.Time, error) {
var timeString string var timeString string
if _, err := fmt.Sscanf(name, "snap-%s.dmp", &timeString); err != nil { if _, err := fmt.Sscanf(name, "dmp-%s", &timeString); err != nil {
println(err.Error())
return time.Time{}, err return time.Time{}, err
} }
when, err := time.Parse(snapshotTimeFormat, timeString) when, err := time.Parse(snapshotTimeFormat, timeString)
@ -88,7 +89,7 @@ func (s *snapshotStore) checkSnapshots() error {
} }
if _, err := parseSnapshotName(info.Name()); err != nil { if _, err := parseSnapshotName(info.Name()); err != nil {
log.Error("invalid snapshot file name %s, err: %s, try remove", info.Name(), err.Error()) log.Error("invalid snapshot file name %s, err: %s", info.Name(), err.Error())
continue continue
} }
@ -161,8 +162,6 @@ type snapshot struct {
io.ReadCloser io.ReadCloser
f *os.File f *os.File
temp bool
} }
func (st *snapshot) Read(b []byte) (int, error) { func (st *snapshot) Read(b []byte) (int, error) {
@ -170,15 +169,7 @@ func (st *snapshot) Read(b []byte) (int, error) {
} }
func (st *snapshot) Close() error { func (st *snapshot) Close() error {
if st.temp { return st.f.Close()
name := st.f.Name()
if err := st.f.Close(); err != nil {
return err
}
return os.Remove(name)
} else {
return st.f.Close()
}
} }
func (st *snapshot) Size() int64 { func (st *snapshot) Size() int64 {
@ -186,20 +177,18 @@ func (st *snapshot) Size() int64 {
return s.Size() return s.Size()
} }
func (s *snapshotStore) Create(d snapshotDumper, temp bool) (*snapshot, time.Time, error) { func (s *snapshotStore) Create(d snapshotDumper) (*snapshot, time.Time, error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if !temp { s.purge(true)
s.purge(true)
}
now := time.Now() now := time.Now()
name := snapshotName(now) name := snapshotName(now)
tmpName := name + ".tmp" tmpName := name + ".tmp"
if len(s.names) > 0 && !temp { if len(s.names) > 0 {
lastTime, _ := parseSnapshotName(s.names[len(s.names)-1]) lastTime, _ := parseSnapshotName(s.names[len(s.names)-1])
if !now.After(lastTime) { if !now.After(lastTime) {
return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ", return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ",
@ -218,26 +207,17 @@ func (s *snapshotStore) Create(d snapshotDumper, temp bool) (*snapshot, time.Tim
return nil, time.Time{}, err return nil, time.Time{}, err
} }
if temp { f.Close()
if err := f.Sync(); err != nil { if err := os.Rename(s.snapshotPath(tmpName), s.snapshotPath(name)); err != nil {
f.Close() return nil, time.Time{}, err
return nil, time.Time{}, err
}
f.Seek(0, os.SEEK_SET)
} else {
f.Close()
if err := os.Rename(s.snapshotPath(tmpName), s.snapshotPath(name)); err != nil {
return nil, time.Time{}, err
}
if f, err = os.Open(s.snapshotPath(name)); err != nil {
return nil, time.Time{}, err
}
s.names = append(s.names, name)
} }
return &snapshot{f: f, temp: temp}, now, nil if f, err = os.Open(s.snapshotPath(name)); err != nil {
return nil, time.Time{}, err
}
s.names = append(s.names, name)
return &snapshot{f: f}, now, nil
} }
func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) { func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) {
@ -256,5 +236,5 @@ func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) {
return nil, time.Time{}, err return nil, time.Time{}, err
} }
return &snapshot{f: f, temp: false}, t, err return &snapshot{f: f}, t, err
} }

View File

@ -30,7 +30,7 @@ func TestSnapshot(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if f, _, err := s.Create(d, false); err != nil { if f, _, err := s.Create(d); err != nil {
t.Fatal(err) t.Fatal(err)
} else { } else {
defer f.Close() defer f.Close()
@ -43,7 +43,7 @@ func TestSnapshot(t *testing.T) {
} }
} }
if f, _, err := s.Create(d, false); err != nil { if f, _, err := s.Create(d); err != nil {
t.Fatal(err) t.Fatal(err)
} else { } else {
defer f.Close() defer f.Close()
@ -55,7 +55,7 @@ func TestSnapshot(t *testing.T) {
} }
} }
if f, _, err := s.Create(d, false); err != nil { if f, _, err := s.Create(d); err != nil {
t.Fatal(err) t.Fatal(err)
} else { } else {
defer f.Close() defer f.Close()
@ -73,29 +73,5 @@ func TestSnapshot(t *testing.T) {
t.Fatal("must 2 snapshot") t.Fatal("must 2 snapshot")
} }
if f, _, err := s.Create(d, true); err != nil {
t.Fatal(err)
} else {
if b, _ := ioutil.ReadAll(f); string(b) != "hello world" {
t.Fatal("invalid read snapshot")
}
if len(s.names) != 2 {
t.Fatal("must 2 snapshot")
}
fs, _ = ioutil.ReadDir(cfg.Snapshot.Path)
if len(fs) != 3 {
t.Fatal("must 3 snapshot")
}
f.Close()
}
fs, _ = ioutil.ReadDir(cfg.Snapshot.Path)
if len(fs) != 2 {
t.Fatal("must 2 snapshot")
}
s.Close() s.Close()
} }