From 45a4dd1abc5bf9185b1224373e4d95a6c4c7dc41 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 9 Oct 2014 11:03:58 +0800 Subject: [PATCH 01/13] info log id returns 0 if no replication --- server/info.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/info.go b/server/info.go index 21c989b..b60db65 100644 --- a/server/info.go +++ b/server/info.go @@ -165,6 +165,10 @@ func (i *info) dumpReplication(buf *bytes.Buffer) { p = append(p, infoPair{"last_log_id", s.LastID}) p = append(p, infoPair{"first_log_id", s.FirstID}) p = append(p, infoPair{"commit_log_id", s.CommitID}) + } else { + p = append(p, infoPair{"last_log_id", 0}) + p = append(p, infoPair{"first_log_id", 0}) + p = append(p, infoPair{"commit_log_id", 0}) } i.dumpPairs(buf, p...) From 137bd19951fa898c83c706e170d971e9389feb64 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 9 Oct 2014 11:04:58 +0800 Subject: [PATCH 02/13] separate bootstrap from makefile --- Makefile | 2 -- README.md | 3 +++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 6764157..cfdddda 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,5 @@ INSTALL_PATH ?= $(CURDIR) -$(shell ./bootstrap.sh >> /dev/null 2>&1) - $(shell ./tools/build_config.sh build_config.mk $INSTALL_PATH) include build_config.mk diff --git a/README.md b/README.md index 2f12a55..0ebe2a6 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,9 @@ Create a workspace and checkout ledisdb source cd src/github.com/siddontang/ledisdb + #install go dependences + ./bootstrap.sh + #set build and run environment source dev.sh From 884de89cb8b0534f892b7df76bf80ba82e04133f Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 9 Oct 2014 11:47:14 +0800 Subject: [PATCH 03/13] replication add log --- server/replication.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/replication.go b/server/replication.go index b8b1868..1f12b93 100644 --- a/server/replication.go +++ b/server/replication.go @@ -287,7 +287,10 @@ func (app *App) removeSlave(c *client) { app.slock.Lock() defer app.slock.Unlock() - delete(app.slaves, c) + if _, ok := app.slaves[c]; ok { + delete(app.slaves, c) + log.Info("remove slave %s", c.remoteAddr) + } if c.ack != nil { asyncNotifyUint64(c.ack.ch, c.lastLogID) @@ -313,7 +316,7 @@ func (app *App) publishNewLog(l *rpl.Log) { logId := l.ID for s, _ := range app.slaves { if s.lastLogID >= logId { - //slave has already this log + //slave has already owned this log ss = []*client{} break } else { From 761431cf5bc9ea6cd511538ef2d49518488e79cb Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 9 Oct 2014 13:05:55 +0800 Subject: [PATCH 04/13] add sync interface --- store/boltdb/db.go | 11 +++++++++++ store/boltdb/tx.go | 4 ++++ store/driver/batch.go | 5 +++++ store/driver/driver.go | 4 ++++ store/goleveldb/batch.go | 4 ++++ store/goleveldb/db.go | 13 +++++++++++++ store/hyperleveldb/batch.go | 4 ++++ store/hyperleveldb/db.go | 13 +++++++++++++ store/leveldb/batch.go | 4 ++++ store/leveldb/db.go | 13 +++++++++++++ store/mdb/mdb.go | 24 ++++++++++++++++++++++++ store/mdb/tx.go | 3 +++ store/rocksdb/batch.go | 4 ++++ store/rocksdb/db.go | 13 +++++++++++++ 14 files changed, 119 insertions(+) diff --git a/store/boltdb/db.go b/store/boltdb/db.go index 15a0570..ac8cc03 100644 --- a/store/boltdb/db.go +++ b/store/boltdb/db.go @@ -98,7 +98,14 @@ func (db *DB) Delete(key []byte) error { return b.Delete(key) }) return err +} +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.Put(key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.Delete(key) } func (db *DB) NewIterator() driver.IIterator { @@ -152,6 +159,10 @@ func (db *DB) BatchPut(writes []driver.Write) error { return err } +func (db *DB) SyncBatchPut(writes []driver.Write) error { + return db.BatchPut(writes) +} + func (db *DB) Compact() error { return nil } diff --git a/store/boltdb/tx.go b/store/boltdb/tx.go index 058a5f7..8dba000 100644 --- a/store/boltdb/tx.go +++ b/store/boltdb/tx.go @@ -48,6 +48,10 @@ func (t *Tx) BatchPut(writes []driver.Write) error { return nil } +func (t *Tx) SyncBatchPut(writes []driver.Write) error { + return t.BatchPut(writes) +} + func (t *Tx) Rollback() error { return t.tx.Rollback() } diff --git a/store/driver/batch.go b/store/driver/batch.go index 6b79c21..5fc461f 100644 --- a/store/driver/batch.go +++ b/store/driver/batch.go @@ -2,6 +2,7 @@ package driver type BatchPuter interface { BatchPut([]Write) error + SyncBatchPut([]Write) error } type Write struct { @@ -29,6 +30,10 @@ func (w *WriteBatch) Commit() error { return w.batch.BatchPut(w.wb) } +func (w *WriteBatch) SyncCommit() error { + return w.batch.SyncBatchPut(w.wb) +} + func (w *WriteBatch) Rollback() error { w.wb = w.wb[0:0] return nil diff --git a/store/driver/driver.go b/store/driver/driver.go index 859b840..e4312ce 100644 --- a/store/driver/driver.go +++ b/store/driver/driver.go @@ -16,6 +16,9 @@ type IDB interface { Put(key []byte, value []byte) error Delete(key []byte) error + SyncPut(key []byte, value []byte) error + SyncDelete(key []byte) error + NewIterator() IIterator NewWriteBatch() IWriteBatch @@ -53,6 +56,7 @@ type IWriteBatch interface { Put(key []byte, value []byte) Delete(key []byte) Commit() error + SyncCommit() error Rollback() error } diff --git a/store/goleveldb/batch.go b/store/goleveldb/batch.go index 74902b2..85b78c6 100644 --- a/store/goleveldb/batch.go +++ b/store/goleveldb/batch.go @@ -21,6 +21,10 @@ func (w *WriteBatch) Commit() error { return w.db.db.Write(w.wbatch, nil) } +func (w *WriteBatch) SyncCommit() error { + return w.db.db.Write(w.wbatch, w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { w.wbatch.Reset() return nil diff --git a/store/goleveldb/db.go b/store/goleveldb/db.go index dad6a90..5e76f74 100644 --- a/store/goleveldb/db.go +++ b/store/goleveldb/db.go @@ -41,6 +41,8 @@ type DB struct { iteratorOpts *opt.ReadOptions + syncOpts *opt.WriteOptions + cache cache.Cache filter filter.Filter @@ -102,6 +104,9 @@ func (db *DB) initOpts() { db.iteratorOpts = &opt.ReadOptions{} db.iteratorOpts.DontFillCache = true + + db.syncOpts = &opt.WriteOptions{} + db.syncOpts.Sync = true } func newOptions(cfg *config.LevelDBConfig) *opt.Options { @@ -147,6 +152,14 @@ func (db *DB) Delete(key []byte) error { return db.db.Delete(key, nil) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.db.Put(key, value, db.syncOpts) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.db.Delete(key, db.syncOpts) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, diff --git a/store/hyperleveldb/batch.go b/store/hyperleveldb/batch.go index 149f0b4..8084335 100644 --- a/store/hyperleveldb/batch.go +++ b/store/hyperleveldb/batch.go @@ -46,6 +46,10 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } +func (w *WriteBatch) SyncCommit() error { + return w.commit(w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { C.leveldb_writebatch_clear(w.wbatch) return nil diff --git a/store/hyperleveldb/db.go b/store/hyperleveldb/db.go index 071dd38..0f13a0b 100644 --- a/store/hyperleveldb/db.go +++ b/store/hyperleveldb/db.go @@ -81,6 +81,8 @@ type DB struct { writeOpts *WriteOptions iteratorOpts *ReadOptions + syncOpts *WriteOptions + cache *Cache filter *FilterPolicy @@ -132,6 +134,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { db.readOpts = NewReadOptions() db.writeOpts = NewWriteOptions() + db.syncOpts = NewWriteOptions() + db.syncOpts.SetSync(true) + db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) } @@ -171,6 +176,14 @@ func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.put(db.syncOpts, key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.delete(db.syncOpts, key) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, diff --git a/store/leveldb/batch.go b/store/leveldb/batch.go index 08a7d46..caadc03 100644 --- a/store/leveldb/batch.go +++ b/store/leveldb/batch.go @@ -46,6 +46,10 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } +func (w *WriteBatch) SyncCommit() error { + return w.commit(w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { C.leveldb_writebatch_clear(w.wbatch) return nil diff --git a/store/leveldb/db.go b/store/leveldb/db.go index ab29928..3705f78 100644 --- a/store/leveldb/db.go +++ b/store/leveldb/db.go @@ -81,6 +81,8 @@ type DB struct { writeOpts *WriteOptions iteratorOpts *ReadOptions + syncOpts *WriteOptions + cache *Cache filter *FilterPolicy @@ -132,6 +134,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { db.readOpts = NewReadOptions() db.writeOpts = NewWriteOptions() + db.syncOpts = NewWriteOptions() + db.syncOpts.SetSync(true) + db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) } @@ -171,6 +176,14 @@ func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.put(db.syncOpts, key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.delete(db.syncOpts, key) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, diff --git a/store/mdb/mdb.go b/store/mdb/mdb.go index ca79706..6bf26f6 100644 --- a/store/mdb/mdb.go +++ b/store/mdb/mdb.go @@ -122,6 +122,14 @@ func (db MDB) BatchPut(writes []driver.Write) error { return itr.err } +func (db MDB) SyncBatchPut(writes []driver.Write) error { + if err := db.BatchPut(writes); err != nil { + return err + } + + return db.env.Sync(1) +} + func (db MDB) Get(key []byte) ([]byte, error) { tx, err := db.env.BeginTxn(nil, mdb.RDONLY) if err != nil { @@ -148,6 +156,22 @@ func (db MDB) Delete(key []byte) error { return itr.Error() } +func (db MDB) SyncPut(key []byte, value []byte) error { + if err := db.Put(key, value); err != nil { + return err + } + + return db.env.Sync(1) +} + +func (db MDB) SyncDelete(key []byte) error { + if err := db.Delete(key); err != nil { + return err + } + + return db.env.Sync(1) +} + type MDBIterator struct { key []byte value []byte diff --git a/store/mdb/tx.go b/store/mdb/tx.go index e98a5f6..e2e3be0 100644 --- a/store/mdb/tx.go +++ b/store/mdb/tx.go @@ -74,7 +74,10 @@ func (t *Tx) BatchPut(writes []driver.Write) error { itr.setState() return itr.Close() +} +func (t *Tx) SyncBatchPut(writes []driver.Write) error { + return t.BatchPut(writes) } func (t *Tx) Rollback() error { diff --git a/store/rocksdb/batch.go b/store/rocksdb/batch.go index b69c383..017fc88 100644 --- a/store/rocksdb/batch.go +++ b/store/rocksdb/batch.go @@ -45,6 +45,10 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } +func (w *WriteBatch) SyncCommit() error { + return w.commit(w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { C.rocksdb_writebatch_clear(w.wbatch) return nil diff --git a/store/rocksdb/db.go b/store/rocksdb/db.go index f3fb406..f5cecf4 100644 --- a/store/rocksdb/db.go +++ b/store/rocksdb/db.go @@ -85,6 +85,8 @@ type DB struct { writeOpts *WriteOptions iteratorOpts *ReadOptions + syncOpts *WriteOptions + cache *Cache filter *FilterPolicy @@ -152,6 +154,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { db.readOpts = NewReadOptions() db.writeOpts = NewWriteOptions() + db.syncOpts = NewWriteOptions() + db.syncOpts.SetSync(true) + db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) } @@ -197,6 +202,14 @@ func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.put(db.syncOpts, key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.delete(db.syncOpts, key) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, From 848fbf34ad2278504442aa57beda6875ee2267af Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 9 Oct 2014 15:05:15 +0800 Subject: [PATCH 05/13] add sync log config --- config/config.go | 3 ++- config/config.toml | 12 +++++++++--- etc/ledis.conf | 12 +++++++++--- rpl/goleveldb_store.go | 14 +++++++++++++- 4 files changed, 33 insertions(+), 8 deletions(-) diff --git a/config/config.go b/config/config.go index a90e40c..86e54b4 100644 --- a/config/config.go +++ b/config/config.go @@ -37,10 +37,11 @@ type LMDBConfig struct { type ReplicationConfig struct { Path string `toml:"path"` - ExpiredLogDays int `toml:"expired_log_days"` Sync bool `toml:"sync"` WaitSyncTime int `toml:"wait_sync_time"` WaitMaxSlaveAcks int `toml:"wait_max_slave_acks"` + ExpiredLogDays int `toml:"expired_log_days"` + SyncLog int `toml:"sync_log"` Compression bool `toml:"compression"` } diff --git a/config/config.toml b/config/config.toml index b8d80ec..0889933 100644 --- a/config/config.toml +++ b/config/config.toml @@ -50,9 +50,6 @@ nosync = true # if not set, use data_dir/rpl path = "" -# Expire write ahead logs after the given days -expired_log_days = 7 - # If sync is true, the new log must be sent to some slaves, and then commit. # It will reduce performance but have better high availability. sync = true @@ -65,5 +62,14 @@ wait_sync_time = 1 # If 0, wait (n + 1) / 2 acks. wait_max_slave_acks = 2 +# Expire write ahead logs after the given days +expired_log_days = 7 + +# Sync log to disk if possible +# 0: no sync +# 1: sync every second +# 2: sync every commit +sync_log = 0 + # Compress the log or not compression = true diff --git a/etc/ledis.conf b/etc/ledis.conf index b8d80ec..0889933 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -50,9 +50,6 @@ nosync = true # if not set, use data_dir/rpl path = "" -# Expire write ahead logs after the given days -expired_log_days = 7 - # If sync is true, the new log must be sent to some slaves, and then commit. # It will reduce performance but have better high availability. sync = true @@ -65,5 +62,14 @@ wait_sync_time = 1 # If 0, wait (n + 1) / 2 acks. wait_max_slave_acks = 2 +# Expire write ahead logs after the given days +expired_log_days = 7 + +# Sync log to disk if possible +# 0: no sync +# 1: sync every second +# 2: sync every commit +sync_log = 0 + # Compress the log or not compression = true diff --git a/rpl/goleveldb_store.go b/rpl/goleveldb_store.go index f9d2a7e..1c70603 100644 --- a/rpl/goleveldb_store.go +++ b/rpl/goleveldb_store.go @@ -21,6 +21,8 @@ type GoLevelDBStore struct { first uint64 last uint64 + + lastCommit time.Time } func (s *GoLevelDBStore) FirstID() (uint64, error) { @@ -132,7 +134,17 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { w.Put(key, buf.Bytes()) } - if err := w.Commit(); err != nil { + n := time.Now() + if s.cfg.Replication.SyncLog == 2 || + (s.cfg.Replication.SyncLog == 1 && n.Sub(s.lastCommit) > time.Second) { + err = w.SyncCommit() + } else { + err = w.Commit() + } + + s.lastCommit = n + + if err != nil { return err } From 855f0a3477a7a92caecabb770d40e72474029114 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 10 Oct 2014 09:49:16 +0800 Subject: [PATCH 06/13] add readonly support --- cmd/ledis-cli/const.go | 4 ++-- cmd/ledis-server/main.go | 9 +++++++++ config/config.go | 3 +++ config/config.toml | 4 ++++ doc/commands.json | 2 +- doc/commands.md | 12 +++++++----- etc/ledis.conf | 4 ++++ ledis/const.go | 5 ----- ledis/ledis.go | 16 ++-------------- ledis/replication.go | 2 +- ledis/replication_test.go | 3 ++- server/app.go | 7 +++---- server/cmd_replication.go | 9 +++++---- server/cmd_replication_test.go | 4 ++-- server/info.go | 7 ++++--- server/replication.go | 12 +++++++++--- 16 files changed, 58 insertions(+), 45 deletions(-) diff --git a/cmd/ledis-cli/const.go b/cmd/ledis-cli/const.go index f03b249..19c5ee9 100644 --- a/cmd/ledis-cli/const.go +++ b/cmd/ledis-cli/const.go @@ -1,4 +1,4 @@ -//This file was generated by .tools/generate_commands.py on Wed Oct 08 2014 16:36:20 +0800 +//This file was generated by .tools/generate_commands.py on Fri Oct 10 2014 09:08:54 +0800 package main var helpCommands = [][]string{ @@ -87,7 +87,7 @@ var helpCommands = [][]string{ {"SINTER", "key [key ...]", "Set"}, {"SINTERSTORE", "destination key [key ...]", "Set"}, {"SISMEMBER", "key member", "Set"}, - {"SLAVEOF", "host port [restart]", "Replication"}, + {"SLAVEOF", "host port [RESTART] [READONLY]", "Replication"}, {"SMCLEAR", "key [key ...]", "Set"}, {"SMEMBERS", "key", "Set"}, {"SPERSIST", "key", "Set"}, diff --git a/cmd/ledis-server/main.go b/cmd/ledis-server/main.go index 448120c..c03bb84 100644 --- a/cmd/ledis-server/main.go +++ b/cmd/ledis-server/main.go @@ -18,6 +18,8 @@ var configFile = flag.String("config", "", "ledisdb config file") var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name") var usePprof = flag.Bool("pprof", false, "enable pprof") var pprofPort = flag.Int("pprof_port", 6060, "pprof http port") +var slaveof = flag.String("slaveof", "", "make the server a slave of another instance") +var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly") func main() { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -43,6 +45,13 @@ func main() { cfg.DBName = *dbName } + if len(*slaveof) > 0 { + cfg.SlaveOf = *slaveof + cfg.Readonly = true + } else { + cfg.Readonly = *readonly + } + var app *server.App app, err = server.NewApp(cfg) if err != nil { diff --git a/config/config.go b/config/config.go index 86e54b4..589a149 100644 --- a/config/config.go +++ b/config/config.go @@ -54,6 +54,8 @@ type Config struct { SlaveOf string `toml:"slaveof"` + Readonly bool `toml:readonly` + DataDir string `toml:"data_dir"` DBName string `toml:"db_name"` @@ -106,6 +108,7 @@ func NewConfigDefault() *Config { cfg.DBName = DefaultDBName cfg.SlaveOf = "" + cfg.Readonly = false // disable access log cfg.AccessLog = "" diff --git a/config/config.toml b/config/config.toml index 0889933..1996303 100644 --- a/config/config.toml +++ b/config/config.toml @@ -16,6 +16,10 @@ access_log = "" # Any write operations except flushall and replication will be disabled in slave mode. slaveof = "" +# Readonly mode, slave server is always readonly even readonly = false +# for readonly mode, only replication and flushall can write +readonly = false + # Choose which backend storage to use, now support: # # leveldb diff --git a/doc/commands.json b/doc/commands.json index 8268ee8..7f354c9 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -301,7 +301,7 @@ "readonly": false }, "SLAVEOF": { - "arguments": "host port [restart]", + "arguments": "host port [RESTART] [READONLY]", "group": "Replication", "readonly": false }, diff --git a/doc/commands.md b/doc/commands.md index c01b67c..8337992 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -122,7 +122,7 @@ Table of Contents - [BPERSIST key](#bpersist-key) - [BXSCAN key [MATCH match] [COUNT count]](#bxscan-key-match-match-count-count) - [Replication](#replication) - - [SLAVEOF host port [restart]](#slaveof-host-port-restart) + - [SLAVEOF host port [RESTART] [READONLY]](#slaveof-host-port-restart-readonly) - [FULLSYNC](#fullsync) - [SYNC logid](#sync-logid) - [Server](#server) @@ -2466,13 +2466,15 @@ See [XSCAN](#xscan-key-match-match-count-count) for more information. ## Replication -### SLAVEOF host port [restart] +### SLAVEOF host port [RESTART] [READONLY] -Changes the replication settings of a slave on the fly. If the server is already acting as slave, SLAVEOF NO ONE will turn off the replication. +Changes the replication settings of a slave on the fly. If the server is already acting as slave, `SLAVEOF NO ONE` will turn off the replication and turn the server into master. `SLAVEOF NO ONE READONLY` will turn the server into master with readonly mode. -SLAVEOF host port will make the server a slave of another server listening at the specified host and port. +If the server is already master, `SLAVEOF NO ONE READONLY` will force the server to readonly mode, and `SLAVEOF NO ONE` will disable readonly. -If a server is already a slave of a master, SLAVEOF host port will stop the replication against the old and start the synchronization against the new one, if restart is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1. +`SLAVEOF host port` will make the server a slave of another server listening at the specified host and port. + +If a server is already a slave of a master, `SLAVEOF host port` will stop the replication against the old and start the synchronization against the new one, if RESTART is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1. ### FULLSYNC diff --git a/etc/ledis.conf b/etc/ledis.conf index 0889933..1996303 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -16,6 +16,10 @@ access_log = "" # Any write operations except flushall and replication will be disabled in slave mode. slaveof = "" +# Readonly mode, slave server is always readonly even readonly = false +# for readonly mode, only replication and flushall can write +readonly = false + # Choose which backend storage to use, now support: # # leveldb diff --git a/ledis/const.go b/ledis/const.go index 3b30123..3e17a95 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -46,11 +46,6 @@ var ( } ) -const ( - RDWRMode = 0 - ROnlyMode = 1 -) - const ( defaultScanCount int = 10 ) diff --git a/ledis/ledis.go b/ledis/ledis.go index 8893eee..75caedd 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -33,17 +33,10 @@ type Ledis struct { wLock sync.RWMutex //allow one write at same time commitLock sync.Mutex //allow one write commit at same time - // for readonly mode, only replication and flushall can write - readOnly bool - lock io.Closer } func Open(cfg *config.Config) (*Ledis, error) { - return Open2(cfg, RDWRMode) -} - -func Open2(cfg *config.Config, flags int) (*Ledis, error) { if len(cfg.DataDir) == 0 { cfg.DataDir = config.DefaultDataDir } @@ -53,13 +46,12 @@ func Open2(cfg *config.Config, flags int) (*Ledis, error) { var err error l := new(Ledis) + l.cfg = cfg if l.lock, err = filelock.Lock(path.Join(cfg.DataDir, "LOCK")); err != nil { return nil, err } - l.readOnly = (flags&ROnlyMode > 0) - l.quit = make(chan struct{}) if l.ldb, err = store.Open(cfg); err != nil { @@ -163,7 +155,7 @@ func (l *Ledis) flushAll() error { } func (l *Ledis) IsReadOnly() bool { - if l.readOnly { + if l.cfg.Readonly { return true } else if l.r != nil { if b, _ := l.r.CommitIDBehind(); b { @@ -173,10 +165,6 @@ func (l *Ledis) IsReadOnly() bool { return false } -func (l *Ledis) SetReadOnly(b bool) { - l.readOnly = b -} - func (l *Ledis) onDataExpired() { defer l.wg.Done() diff --git a/ledis/replication.go b/ledis/replication.go index b68a990..33a5e12 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -110,7 +110,7 @@ func (l *Ledis) WaitReplication() error { func (l *Ledis) StoreLogsFromReader(rb io.Reader) error { if !l.ReplicationUsed() { return ErrRplNotSupport - } else if !l.readOnly { + } else if !l.cfg.Readonly { return ErrRplInRDWR } diff --git a/ledis/replication_test.go b/ledis/replication_test.go index c300ef8..920ed63 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -46,10 +46,11 @@ func TestReplication(t *testing.T) { cfgS := new(config.Config) cfgS.DataDir = "/tmp/test_repl/slave" cfgS.UseReplication = true + cfgS.Readonly = true os.RemoveAll(cfgS.DataDir) - slave, err = Open2(cfgS, ROnlyMode) + slave, err = Open(cfgS) if err != nil { t.Fatal(err) } diff --git a/server/app.go b/server/app.go index dbf12e5..0a253ac 100644 --- a/server/app.go +++ b/server/app.go @@ -88,13 +88,12 @@ func NewApp(cfg *config.Config) (*App, error) { } } - flag := ledis.RDWRMode if len(app.cfg.SlaveOf) > 0 { //slave must readonly - flag = ledis.ROnlyMode + app.cfg.Readonly = true } - if app.ldb, err = ledis.Open2(cfg, flag); err != nil { + if app.ldb, err = ledis.Open(cfg); err != nil { return nil, err } @@ -135,7 +134,7 @@ func (app *App) Close() { func (app *App) Run() { if len(app.cfg.SlaveOf) > 0 { - app.slaveof(app.cfg.SlaveOf, false) + app.slaveof(app.cfg.SlaveOf, false, app.cfg.Readonly) } go app.httpServe() diff --git a/server/cmd_replication.go b/server/cmd_replication.go index aa6ede4..128e4af 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -13,18 +13,19 @@ import ( func slaveofCommand(c *client) error { args := c.args - if len(args) != 2 || len(args) != 3 { + if len(args) != 2 && len(args) != 3 { return ErrCmdParams } masterAddr := "" restart := false + readonly := false if strings.ToLower(hack.String(args[0])) == "no" && strings.ToLower(hack.String(args[1])) == "one" { //stop replication, use master = "" - if len(args) != 2 { - return ErrCmdParams + if len(args) == 3 && strings.ToLower(hack.String(args[2])) == "readonly" { + readonly = true } } else { if _, err := strconv.ParseInt(hack.String(args[1]), 10, 16); err != nil { @@ -38,7 +39,7 @@ func slaveofCommand(c *client) error { } } - if err := c.app.slaveof(masterAddr, restart); err != nil { + if err := c.app.slaveof(masterAddr, restart, readonly); err != nil { return err } diff --git a/server/cmd_replication_test.go b/server/cmd_replication_test.go index 76bf2c2..1dfde6b 100644 --- a/server/cmd_replication_test.go +++ b/server/cmd_replication_test.go @@ -96,7 +96,7 @@ func TestReplication(t *testing.T) { t.Fatal(err) } - slave.slaveof("", false) + slave.slaveof("", false, false) db.Set([]byte("a2"), value) db.Set([]byte("b2"), value) @@ -112,7 +112,7 @@ func TestReplication(t *testing.T) { t.Fatal("must error") } - slave.slaveof(masterCfg.Addr, false) + slave.slaveof(masterCfg.Addr, false, false) time.Sleep(1 * time.Second) diff --git a/server/info.go b/server/info.go index b60db65..2babae8 100644 --- a/server/info.go +++ b/server/info.go @@ -115,7 +115,8 @@ func (i *info) dumpServer(buf *bytes.Buffer) { i.dumpPairs(buf, infoPair{"os", i.Server.OS}, infoPair{"process_id", i.Server.ProceessId}, infoPair{"addr", i.app.cfg.Addr}, - infoPair{"http_addr", i.app.cfg.HttpAddr}) + infoPair{"http_addr", i.app.cfg.HttpAddr}, + infoPair{"readonly", i.app.cfg.Readonly}) } func (i *info) dumpClients(buf *bytes.Buffer) { @@ -155,10 +156,10 @@ func (i *info) dumpReplication(buf *bytes.Buffer) { slaves = append(slaves, s.remoteAddr) } - p = append(p, infoPair{"readonly", i.app.ldb.IsReadOnly()}) + p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf}) if len(slaves) > 0 { - p = append(p, infoPair{"slave", strings.Join(slaves, ",")}) + p = append(p, infoPair{"slaves", strings.Join(slaves, ",")}) } if s, _ := i.app.ldb.ReplicationStat(); s != nil { diff --git a/server/replication.go b/server/replication.go index 1f12b93..68f2bc9 100644 --- a/server/replication.go +++ b/server/replication.go @@ -93,7 +93,7 @@ func (m *master) startReplication(masterAddr string, restart bool) error { m.quit = make(chan struct{}, 1) - m.app.ldb.SetReadOnly(true) + m.app.cfg.Readonly = true m.wg.Add(1) go m.runReplication(restart) @@ -238,10 +238,16 @@ func (m *master) sync() error { } -func (app *App) slaveof(masterAddr string, restart bool) error { +func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error { app.m.Lock() defer app.m.Unlock() + //in master mode and no slaveof, only set readonly + if len(app.cfg.SlaveOf) == 0 && len(masterAddr) == 0 { + app.cfg.Readonly = readonly + return nil + } + if !app.ldb.ReplicationUsed() { return fmt.Errorf("slaveof must enable replication") } @@ -253,7 +259,7 @@ func (app *App) slaveof(masterAddr string, restart bool) error { return err } - app.ldb.SetReadOnly(false) + app.cfg.Readonly = readonly } else { return app.m.startReplication(masterAddr, restart) } From 180091e10f92ae33f0f61da61d2531fb740d4fba Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 10 Oct 2014 17:57:18 +0800 Subject: [PATCH 07/13] add snapshot store --- config/config.go | 7 ++ config/config.toml | 9 +++ etc/ledis.conf | 8 ++ server/snapshot.go | 174 ++++++++++++++++++++++++++++++++++++++++ server/snapshot_test.go | 47 +++++++++++ 5 files changed, 245 insertions(+) create mode 100644 server/snapshot.go create mode 100644 server/snapshot_test.go diff --git a/config/config.go b/config/config.go index 589a149..ed0c2f2 100644 --- a/config/config.go +++ b/config/config.go @@ -45,6 +45,11 @@ type ReplicationConfig struct { Compression bool `toml:"compression"` } +type SnapshotConfig struct { + Path string `toml:"path"` + MaxNum int `toml:"max_num"` +} + type Config struct { FileName string `toml:"-"` @@ -70,6 +75,8 @@ type Config struct { UseReplication bool `toml:"use_replication"` Replication ReplicationConfig `toml:"replication"` + + Snapshot SnapshotConfig `toml:"snapshot"` } func NewConfigWithFile(fileName string) (*Config, error) { diff --git a/config/config.toml b/config/config.toml index 1996303..65947a0 100644 --- a/config/config.toml +++ b/config/config.toml @@ -77,3 +77,12 @@ sync_log = 0 # Compress the log or not compression = true + +[snapshot] +# Path to store snapshot dump file +# if not set, use data_dir/snapshot +# snapshot file name format is snap-2006-01-02T15:04:05.dump +path = "" + +# Reserve newest max_num snapshot dump files +max_num = 1 diff --git a/etc/ledis.conf b/etc/ledis.conf index 1996303..9b69fc2 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -77,3 +77,11 @@ sync_log = 0 # Compress the log or not compression = true + +[snapshot] +# Path to store snapshot dump file +# if not set, use data_dir/snapshot +path = "" + +# Reserve newest max_num snapshot dump files +max_num = 1 diff --git a/server/snapshot.go b/server/snapshot.go new file mode 100644 index 0000000..a814c8f --- /dev/null +++ b/server/snapshot.go @@ -0,0 +1,174 @@ +package server + +import ( + "fmt" + "github.com/siddontang/go/log" + "github.com/siddontang/ledisdb/config" + "io" + "io/ioutil" + "os" + "path" + "sort" + "sync" + "time" +) + +const ( + snapshotTimeFormat = "2006-01-02T15:04:05" +) + +type snapshotStore struct { + sync.Mutex + + cfg *config.Config + + names []string + + quit chan struct{} +} + +func snapshotName(t time.Time) string { + return fmt.Sprintf("snap-%s.dump", t.Format(snapshotTimeFormat)) +} + +func parseSnapshotName(name string) (time.Time, error) { + var timeString string + if _, err := fmt.Sscanf(name, "snap-%s.dump", &timeString); err != nil { + return time.Time{}, err + } + when, err := time.Parse(snapshotTimeFormat, timeString) + if err != nil { + return time.Time{}, err + } + return when, nil +} + +func newSnapshotStore(cfg *config.Config) (*snapshotStore, error) { + if len(cfg.Snapshot.Path) == 0 { + cfg.Snapshot.Path = path.Join(cfg.DataDir, "snapshot") + } + + if err := os.MkdirAll(cfg.Snapshot.Path, 0755); err != nil { + return nil, err + } + + s := new(snapshotStore) + s.cfg = cfg + s.names = make([]string, 0, s.cfg.Snapshot.MaxNum) + + s.quit = make(chan struct{}) + + snapshots, err := ioutil.ReadDir(cfg.Snapshot.Path) + if err != nil { + return nil, err + } + + for _, info := range snapshots { + if _, err := parseSnapshotName(info.Name()); err != nil { + log.Error("invalid snapshot file name %s, err: %s", info.Name(), err.Error()) + continue + } + + s.names = append(s.names, info.Name()) + } + + //from old to new + sort.Strings(s.names) + + go s.run() + + return s, nil +} + +func (s *snapshotStore) Close() { + close(s.quit) +} + +func (s *snapshotStore) run() { + t := time.NewTicker(1 * time.Minute) + defer t.Stop() + + for { + select { + case <-t.C: + s.purge() + case <-s.quit: + return + } + } +} + +func (s *snapshotStore) purge() { + s.Lock() + var names []string + num := s.cfg.Snapshot.MaxNum + if len(s.names) > num { + names = s.names[0:num] + + s.names = s.names[num:] + } + + s.Unlock() + + for _, n := range names { + if err := os.Remove(s.snapshotPath(n)); err != nil { + log.Error("purge snapshot %s error %s", n, err.Error()) + } + } +} + +func (s *snapshotStore) snapshotPath(name string) string { + return path.Join(s.cfg.Snapshot.Path, name) +} + +type snapshotDumper interface { + Dump(w io.Writer) error +} + +func (s *snapshotStore) Create(d snapshotDumper) (*os.File, time.Time, error) { + s.Lock() + defer s.Unlock() + + now := time.Now() + name := snapshotName(now) + + if len(s.names) > 0 && s.names[len(s.names)-1] >= name { + return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ", name, s.names[len(s.names)-1]) + } + + f, err := os.OpenFile(s.snapshotPath(name), os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, time.Time{}, err + } + + if err := d.Dump(f); err != nil { + f.Close() + os.Remove(s.snapshotPath(name)) + return nil, time.Time{}, err + } + + s.names = append(s.names, name) + + f.Seek(0, os.SEEK_SET) + + return f, now, nil +} + +func (s *snapshotStore) OpenLatest() (*os.File, time.Time, error) { + s.Lock() + defer s.Unlock() + + if len(s.names) == 0 { + return nil, time.Time{}, nil + } + + name := s.names[len(s.names)-1] + t, _ := parseSnapshotName(name) + + f, err := os.Open(s.snapshotPath(name)) + if err != nil { + return nil, time.Time{}, err + } + + return f, t, err +} diff --git a/server/snapshot_test.go b/server/snapshot_test.go new file mode 100644 index 0000000..6220c5c --- /dev/null +++ b/server/snapshot_test.go @@ -0,0 +1,47 @@ +package server + +import ( + "github.com/siddontang/ledisdb/config" + "io" + "io/ioutil" + "os" + "path" + "testing" +) + +type testSnapshotDumper struct { +} + +func (d *testSnapshotDumper) Dump(w io.Writer) error { + w.Write([]byte("hello world")) + return nil +} + +func TestSnapshot(t *testing.T) { + cfg := new(config.Config) + cfg.Snapshot.MaxNum = 2 + cfg.Snapshot.Path = path.Join(os.TempDir(), "snapshot") + defer os.RemoveAll(cfg.Snapshot.Path) + + d := new(testSnapshotDumper) + + s, err := newSnapshotStore(cfg) + if err != nil { + t.Fatal(err) + } + + if f, _, err := s.Create(d); err != nil { + t.Fatal(err) + } else { + defer f.Close() + if b, _ := ioutil.ReadAll(f); string(b) != "hello world" { + t.Fatal("invalid read snapshot") + } + } + + if len(s.names) != 1 { + t.Fatal("mut one snapshot") + } + + s.Close() +} From 36f015268f3659ad96b19fcb2af9427e1614bbb1 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 10 Oct 2014 22:45:22 +0800 Subject: [PATCH 08/13] update snapshot --- config/config.toml | 2 +- server/snapshot.go | 64 ++++++++++++++++++++++++++++++----------- server/snapshot_test.go | 34 ++++++++++++++++++++-- 3 files changed, 81 insertions(+), 19 deletions(-) diff --git a/config/config.toml b/config/config.toml index 65947a0..87036db 100644 --- a/config/config.toml +++ b/config/config.toml @@ -81,7 +81,7 @@ compression = true [snapshot] # Path to store snapshot dump file # if not set, use data_dir/snapshot -# snapshot file name format is snap-2006-01-02T15:04:05.dump +# snapshot file name format is snap-2006-01-02T15:04:05.999999999.dmp path = "" # Reserve newest max_num snapshot dump files diff --git a/server/snapshot.go b/server/snapshot.go index a814c8f..4068f48 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -14,7 +14,7 @@ import ( ) const ( - snapshotTimeFormat = "2006-01-02T15:04:05" + snapshotTimeFormat = "2006-01-02T15:04:05.999999999" ) type snapshotStore struct { @@ -91,28 +91,37 @@ func (s *snapshotStore) run() { for { select { case <-t.C: - s.purge() + s.Lock() + s.purge(false) + s.Unlock() case <-s.quit: return } } } -func (s *snapshotStore) purge() { - s.Lock() +func (s *snapshotStore) purge(create bool) { var names []string - num := s.cfg.Snapshot.MaxNum - if len(s.names) > num { - names = s.names[0:num] + maxNum := s.cfg.Snapshot.MaxNum + num := len(s.names) - maxNum - s.names = s.names[num:] + if create { + num++ + if num > len(s.names) { + num = len(s.names) + } } - s.Unlock() + if num > 0 { + names = s.names[0:num] - for _, n := range names { - if err := os.Remove(s.snapshotPath(n)); err != nil { - log.Error("purge snapshot %s error %s", n, err.Error()) + n := copy(s.names, s.names[num:]) + s.names = s.names[0:n] + } + + for _, name := range names { + if err := os.Remove(s.snapshotPath(name)); err != nil { + log.Error("purge snapshot %s error %s", name, err.Error()) } } } @@ -125,10 +134,31 @@ type snapshotDumper interface { Dump(w io.Writer) error } -func (s *snapshotStore) Create(d snapshotDumper) (*os.File, time.Time, error) { +type snapshot struct { + io.ReadCloser + + f *os.File +} + +func (st *snapshot) Read(b []byte) (int, error) { + return st.f.Read(b) +} + +func (st *snapshot) Close() error { + return st.f.Close() +} + +func (st *snapshot) Size() int64 { + s, _ := st.f.Stat() + return s.Size() +} + +func (s *snapshotStore) Create(d snapshotDumper) (*snapshot, time.Time, error) { s.Lock() defer s.Unlock() + s.purge(true) + now := time.Now() name := snapshotName(now) @@ -147,14 +177,16 @@ func (s *snapshotStore) Create(d snapshotDumper) (*os.File, time.Time, error) { return nil, time.Time{}, err } + f.Sync() + s.names = append(s.names, name) f.Seek(0, os.SEEK_SET) - return f, now, nil + return &snapshot{f: f}, now, nil } -func (s *snapshotStore) OpenLatest() (*os.File, time.Time, error) { +func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) { s.Lock() defer s.Unlock() @@ -170,5 +202,5 @@ func (s *snapshotStore) OpenLatest() (*os.File, time.Time, error) { return nil, time.Time{}, err } - return f, t, err + return &snapshot{f: f}, t, err } diff --git a/server/snapshot_test.go b/server/snapshot_test.go index 6220c5c..c55e435 100644 --- a/server/snapshot_test.go +++ b/server/snapshot_test.go @@ -37,10 +37,40 @@ func TestSnapshot(t *testing.T) { if b, _ := ioutil.ReadAll(f); string(b) != "hello world" { t.Fatal("invalid read snapshot") } + + if len(s.names) != 1 { + t.Fatal("must 1 snapshot") + } } - if len(s.names) != 1 { - t.Fatal("mut one snapshot") + if f, _, err := s.Create(d); err != nil { + t.Fatal(err) + } else { + defer f.Close() + if b, _ := ioutil.ReadAll(f); string(b) != "hello world" { + t.Fatal("invalid read snapshot") + } + if len(s.names) != 2 { + t.Fatal("must 2 snapshot") + } + } + + if f, _, err := s.Create(d); err != nil { + t.Fatal(err) + } else { + defer f.Close() + if b, _ := ioutil.ReadAll(f); string(b) != "hello world" { + t.Fatal("invalid read snapshot") + } + + if len(s.names) != 2 { + t.Fatal("must 2 snapshot") + } + } + + fs, _ := ioutil.ReadDir(cfg.Snapshot.Path) + if len(fs) != 2 { + t.Fatal("must 2 snapshot") } s.Close() From 1583ae90fc956a3af37355a9a81a9e61e7e3997e Mon Sep 17 00:00:00 2001 From: siddontang Date: Sat, 11 Oct 2014 10:14:23 +0800 Subject: [PATCH 09/13] update snapshot --- server/snapshot.go | 114 +++++++++++++++++++++++++++++----------- server/snapshot_test.go | 30 +++++++++-- 2 files changed, 111 insertions(+), 33 deletions(-) diff --git a/server/snapshot.go b/server/snapshot.go index 4068f48..1643160 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -28,12 +28,12 @@ type snapshotStore struct { } func snapshotName(t time.Time) string { - return fmt.Sprintf("snap-%s.dump", t.Format(snapshotTimeFormat)) + return fmt.Sprintf("snap-%s.dmp", t.Format(snapshotTimeFormat)) } func parseSnapshotName(name string) (time.Time, error) { var timeString string - if _, err := fmt.Sscanf(name, "snap-%s.dump", &timeString); err != nil { + if _, err := fmt.Sscanf(name, "snap-%s.dmp", &timeString); err != nil { return time.Time{}, err } when, err := time.Parse(snapshotTimeFormat, timeString) @@ -58,23 +58,10 @@ func newSnapshotStore(cfg *config.Config) (*snapshotStore, error) { s.quit = make(chan struct{}) - snapshots, err := ioutil.ReadDir(cfg.Snapshot.Path) - if err != nil { + if err := s.checkSnapshots(); err != nil { return nil, err } - for _, info := range snapshots { - if _, err := parseSnapshotName(info.Name()); err != nil { - log.Error("invalid snapshot file name %s, err: %s", info.Name(), err.Error()) - continue - } - - s.names = append(s.names, info.Name()) - } - - //from old to new - sort.Strings(s.names) - go s.run() return s, nil @@ -84,15 +71,51 @@ func (s *snapshotStore) Close() { close(s.quit) } +func (s *snapshotStore) checkSnapshots() error { + cfg := s.cfg + snapshots, err := ioutil.ReadDir(cfg.Snapshot.Path) + if err != nil { + log.Error("read %s error: %s", cfg.Snapshot.Path, err.Error()) + return err + } + + names := []string{} + for _, info := range snapshots { + if path.Ext(info.Name()) == ".tmp" { + log.Error("temp snapshot file name %s, try remove", info.Name()) + os.Remove(path.Join(cfg.Snapshot.Path, info.Name())) + continue + } + + if _, err := parseSnapshotName(info.Name()); err != nil { + log.Error("invalid snapshot file name %s, err: %s, try remove", info.Name(), err.Error()) + continue + } + + names = append(names, info.Name()) + } + + //from old to new + sort.Strings(names) + + s.names = names + + s.purge(false) + + return nil +} + func (s *snapshotStore) run() { - t := time.NewTicker(1 * time.Minute) + t := time.NewTicker(60 * time.Minute) defer t.Stop() for { select { case <-t.C: s.Lock() - s.purge(false) + if err := s.checkSnapshots(); err != nil { + log.Error("check snapshots error %s", err.Error()) + } s.Unlock() case <-s.quit: return @@ -138,6 +161,8 @@ type snapshot struct { io.ReadCloser f *os.File + + temp bool } func (st *snapshot) Read(b []byte) (int, error) { @@ -145,7 +170,15 @@ func (st *snapshot) Read(b []byte) (int, error) { } func (st *snapshot) Close() error { - return st.f.Close() + if st.temp { + name := st.f.Name() + if err := st.f.Close(); err != nil { + return err + } + return os.Remove(name) + } else { + return st.f.Close() + } } func (st *snapshot) Size() int64 { @@ -153,37 +186,58 @@ func (st *snapshot) Size() int64 { return s.Size() } -func (s *snapshotStore) Create(d snapshotDumper) (*snapshot, time.Time, error) { +func (s *snapshotStore) Create(d snapshotDumper, temp bool) (*snapshot, time.Time, error) { s.Lock() defer s.Unlock() - s.purge(true) + if !temp { + s.purge(true) + } now := time.Now() name := snapshotName(now) - if len(s.names) > 0 && s.names[len(s.names)-1] >= name { - return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ", name, s.names[len(s.names)-1]) + tmpName := name + ".tmp" + + if len(s.names) > 0 && !temp { + lastTime, _ := parseSnapshotName(s.names[len(s.names)-1]) + if !now.After(lastTime) { + return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ", + now.Format(snapshotTimeFormat), lastTime.Format(snapshotTimeFormat)) + } } - f, err := os.OpenFile(s.snapshotPath(name), os.O_RDWR|os.O_CREATE, 0644) + f, err := os.OpenFile(s.snapshotPath(tmpName), os.O_RDWR|os.O_CREATE, 0644) if err != nil { return nil, time.Time{}, err } if err := d.Dump(f); err != nil { f.Close() - os.Remove(s.snapshotPath(name)) + os.Remove(s.snapshotPath(tmpName)) return nil, time.Time{}, err } - f.Sync() + if temp { + if err := f.Sync(); err != nil { + f.Close() + return nil, time.Time{}, err + } - s.names = append(s.names, name) + f.Seek(0, os.SEEK_SET) + } else { + f.Close() + if err := os.Rename(s.snapshotPath(tmpName), s.snapshotPath(name)); err != nil { + return nil, time.Time{}, err + } - f.Seek(0, os.SEEK_SET) + if f, err = os.Open(s.snapshotPath(name)); err != nil { + return nil, time.Time{}, err + } + s.names = append(s.names, name) + } - return &snapshot{f: f}, now, nil + return &snapshot{f: f, temp: temp}, now, nil } func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) { @@ -202,5 +256,5 @@ func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) { return nil, time.Time{}, err } - return &snapshot{f: f}, t, err + return &snapshot{f: f, temp: false}, t, err } diff --git a/server/snapshot_test.go b/server/snapshot_test.go index c55e435..11051a8 100644 --- a/server/snapshot_test.go +++ b/server/snapshot_test.go @@ -30,7 +30,7 @@ func TestSnapshot(t *testing.T) { t.Fatal(err) } - if f, _, err := s.Create(d); err != nil { + if f, _, err := s.Create(d, false); err != nil { t.Fatal(err) } else { defer f.Close() @@ -43,7 +43,7 @@ func TestSnapshot(t *testing.T) { } } - if f, _, err := s.Create(d); err != nil { + if f, _, err := s.Create(d, false); err != nil { t.Fatal(err) } else { defer f.Close() @@ -55,7 +55,7 @@ func TestSnapshot(t *testing.T) { } } - if f, _, err := s.Create(d); err != nil { + if f, _, err := s.Create(d, false); err != nil { t.Fatal(err) } else { defer f.Close() @@ -73,5 +73,29 @@ func TestSnapshot(t *testing.T) { t.Fatal("must 2 snapshot") } + if f, _, err := s.Create(d, true); err != nil { + t.Fatal(err) + } else { + if b, _ := ioutil.ReadAll(f); string(b) != "hello world" { + t.Fatal("invalid read snapshot") + } + + if len(s.names) != 2 { + t.Fatal("must 2 snapshot") + } + + fs, _ = ioutil.ReadDir(cfg.Snapshot.Path) + if len(fs) != 3 { + t.Fatal("must 3 snapshot") + } + + f.Close() + } + + fs, _ = ioutil.ReadDir(cfg.Snapshot.Path) + if len(fs) != 2 { + t.Fatal("must 2 snapshot") + } + s.Close() } From e3c21020805099d3673362b69882a53dbd258ce8 Mon Sep 17 00:00:00 2001 From: siddontang Date: Sat, 11 Oct 2014 10:31:34 +0800 Subject: [PATCH 10/13] closure defer not works as I think --- ledis/dump.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/ledis/dump.go b/ledis/dump.go index 436e707..1f6d5e2 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -46,21 +46,22 @@ func (l *Ledis) Dump(w io.Writer) error { var commitID uint64 var snap *store.Snapshot - { - l.wLock.Lock() - defer l.wLock.Unlock() + l.wLock.Lock() - if l.r != nil { - if commitID, err = l.r.LastCommitID(); err != nil { - return err - } - } - - if snap, err = l.ldb.NewSnapshot(); err != nil { + if l.r != nil { + if commitID, err = l.r.LastCommitID(); err != nil { + l.wLock.Unlock() return err } } + if snap, err = l.ldb.NewSnapshot(); err != nil { + l.wLock.Unlock() + return err + } + + l.wLock.Unlock() + wb := bufio.NewWriterSize(w, 4096) h := &DumpHead{commitID} From 4a1c74cb44a62609f7e55b04d7fc9ca777c2322f Mon Sep 17 00:00:00 2001 From: siddontang Date: Sat, 11 Oct 2014 16:00:59 +0800 Subject: [PATCH 11/13] adjust config --- cmd/ledis-server/main.go | 3 +++ config/config.go | 35 +++++++++++++++++++++------------- config/config.toml | 4 ++-- etc/ledis.conf | 5 +++-- ledis/dump_test.go | 4 ++-- ledis/ledis_test.go | 2 +- ledis/replication_test.go | 4 ++-- ledis/tx_test.go | 2 +- rpl/goleveldb_store.go | 8 ++++---- rpl/rpl_test.go | 2 +- server/app_test.go | 2 +- server/cmd_replication_test.go | 6 +++--- server/doc.go | 2 +- server/replication.go | 2 +- server/scan_test.go | 2 +- server/script_test.go | 2 +- server/snapshot_test.go | 2 +- store/goleveldb/db.go | 2 -- store/hyperleveldb/db.go | 2 -- store/leveldb/db.go | 2 -- store/rocksdb/db.go | 2 -- store/store_test.go | 2 +- 22 files changed, 51 insertions(+), 46 deletions(-) diff --git a/cmd/ledis-server/main.go b/cmd/ledis-server/main.go index c03bb84..e132506 100644 --- a/cmd/ledis-server/main.go +++ b/cmd/ledis-server/main.go @@ -20,6 +20,7 @@ var usePprof = flag.Bool("pprof", false, "enable pprof") var pprofPort = flag.Int("pprof_port", 6060, "pprof http port") var slaveof = flag.String("slaveof", "", "make the server a slave of another instance") var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly") +var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled") func main() { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -48,8 +49,10 @@ func main() { if len(*slaveof) > 0 { cfg.SlaveOf = *slaveof cfg.Readonly = true + cfg.UseReplication = true } else { cfg.Readonly = *readonly + cfg.UseReplication = *rpl } var app *server.App diff --git a/config/config.go b/config/config.go index ed0c2f2..23a33de 100644 --- a/config/config.go +++ b/config/config.go @@ -14,8 +14,7 @@ var ( ) const ( - DefaultAddr string = "127.0.0.1:6380" - DefaultHttpAddr string = "127.0.0.1:11181" + DefaultAddr string = "127.0.0.1:6380" DefaultDBName string = "goleveldb" @@ -101,6 +100,8 @@ func NewConfigWithData(data []byte) (*Config, error) { return nil, err } + cfg.adjust() + return cfg, nil } @@ -108,7 +109,7 @@ func NewConfigDefault() *Config { cfg := new(Config) cfg.Addr = DefaultAddr - cfg.HttpAddr = DefaultHttpAddr + cfg.HttpAddr = "" cfg.DataDir = DefaultDataDir @@ -123,28 +124,36 @@ func NewConfigDefault() *Config { cfg.LMDB.MapSize = 20 * 1024 * 1024 cfg.LMDB.NoSync = true - cfg.Replication.WaitSyncTime = 1 + cfg.UseReplication = false + cfg.Replication.WaitSyncTime = 500 cfg.Replication.Compression = true cfg.Replication.WaitMaxSlaveAcks = 2 + cfg.Replication.SyncLog = 0 + + cfg.adjust() return cfg } -func (cfg *LevelDBConfig) Adjust() { - if cfg.CacheSize <= 0 { - cfg.CacheSize = 4 * 1024 * 1024 +func (cfg *Config) adjust() { + if cfg.LevelDB.CacheSize <= 0 { + cfg.LevelDB.CacheSize = 4 * 1024 * 1024 } - if cfg.BlockSize <= 0 { - cfg.BlockSize = 4 * 1024 + if cfg.LevelDB.BlockSize <= 0 { + cfg.LevelDB.BlockSize = 4 * 1024 } - if cfg.WriteBufferSize <= 0 { - cfg.WriteBufferSize = 4 * 1024 * 1024 + if cfg.LevelDB.WriteBufferSize <= 0 { + cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024 } - if cfg.MaxOpenFiles < 1024 { - cfg.MaxOpenFiles = 1024 + if cfg.LevelDB.MaxOpenFiles < 1024 { + cfg.LevelDB.MaxOpenFiles = 1024 + } + + if cfg.Replication.ExpiredLogDays <= 0 { + cfg.Replication.ExpiredLogDays = 7 } } diff --git a/config/config.toml b/config/config.toml index 87036db..27de6c5 100644 --- a/config/config.toml +++ b/config/config.toml @@ -58,8 +58,8 @@ path = "" # It will reduce performance but have better high availability. sync = true -# If sync is true, wait at last wait_sync_time seconds for slave syncing this log -wait_sync_time = 1 +# If sync is true, wait at last wait_sync_time milliseconds for slave syncing this log +wait_sync_time = 500 # If sync is true, wait at most min(wait_max_slave_acks, (n + 1) / 2) to promise syncing ok. # n is slave number diff --git a/etc/ledis.conf b/etc/ledis.conf index 9b69fc2..27de6c5 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -58,8 +58,8 @@ path = "" # It will reduce performance but have better high availability. sync = true -# If sync is true, wait at last wait_sync_time seconds for slave syncing this log -wait_sync_time = 1 +# If sync is true, wait at last wait_sync_time milliseconds for slave syncing this log +wait_sync_time = 500 # If sync is true, wait at most min(wait_max_slave_acks, (n + 1) / 2) to promise syncing ok. # n is slave number @@ -81,6 +81,7 @@ compression = true [snapshot] # Path to store snapshot dump file # if not set, use data_dir/snapshot +# snapshot file name format is snap-2006-01-02T15:04:05.999999999.dmp path = "" # Reserve newest max_num snapshot dump files diff --git a/ledis/dump_test.go b/ledis/dump_test.go index e29d928..98e4c9e 100644 --- a/ledis/dump_test.go +++ b/ledis/dump_test.go @@ -9,7 +9,7 @@ import ( ) func TestDump(t *testing.T) { - cfgM := new(config.Config) + cfgM := config.NewConfigDefault() cfgM.DataDir = "/tmp/test_ledis_master" os.RemoveAll(cfgM.DataDir) @@ -19,7 +19,7 @@ func TestDump(t *testing.T) { t.Fatal(err) } - cfgS := new(config.Config) + cfgS := config.NewConfigDefault() cfgS.DataDir = "/tmp/test_ledis_slave" os.RemoveAll(cfgM.DataDir) diff --git a/ledis/ledis_test.go b/ledis/ledis_test.go index 45f1c7f..51e4d0d 100644 --- a/ledis/ledis_test.go +++ b/ledis/ledis_test.go @@ -12,7 +12,7 @@ var testLedisOnce sync.Once func getTestDB() *DB { f := func() { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/test_ledis" os.RemoveAll(cfg.DataDir) diff --git a/ledis/replication_test.go b/ledis/replication_test.go index 920ed63..fc5e210 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -30,7 +30,7 @@ func TestReplication(t *testing.T) { var slave *Ledis var err error - cfgM := new(config.Config) + cfgM := config.NewConfigDefault() cfgM.DataDir = "/tmp/test_repl/master" cfgM.UseReplication = true @@ -43,7 +43,7 @@ func TestReplication(t *testing.T) { t.Fatal(err) } - cfgS := new(config.Config) + cfgS := config.NewConfigDefault() cfgS.DataDir = "/tmp/test_repl/slave" cfgS.UseReplication = true cfgS.Readonly = true diff --git a/ledis/tx_test.go b/ledis/tx_test.go index cb3a7f0..e21c0a8 100644 --- a/ledis/tx_test.go +++ b/ledis/tx_test.go @@ -190,7 +190,7 @@ func testTxSelect(t *testing.T, db *DB) { } func testTx(t *testing.T, name string) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/ledis_test_tx" cfg.DBName = name diff --git a/rpl/goleveldb_store.go b/rpl/goleveldb_store.go index 1c70603..c00f36c 100644 --- a/rpl/goleveldb_store.go +++ b/rpl/goleveldb_store.go @@ -269,12 +269,12 @@ func (s *GoLevelDBStore) open() error { } func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DBName = "goleveldb" cfg.DBPath = base - cfg.LevelDB.BlockSize = 4 * 1024 * 1024 - cfg.LevelDB.CacheSize = 16 * 1024 * 1024 - cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024 + cfg.LevelDB.BlockSize = 16 * 1024 * 1024 + cfg.LevelDB.CacheSize = 64 * 1024 * 1024 + cfg.LevelDB.WriteBufferSize = 64 * 1024 * 1024 cfg.LevelDB.Compression = false s := new(GoLevelDBStore) diff --git a/rpl/rpl_test.go b/rpl/rpl_test.go index 06fcf7d..63ff427 100644 --- a/rpl/rpl_test.go +++ b/rpl/rpl_test.go @@ -14,7 +14,7 @@ func TestReplication(t *testing.T) { } defer os.RemoveAll(dir) - c := new(config.Config) + c := config.NewConfigDefault() c.Replication.Path = dir r, err := NewReplication(c) diff --git a/server/app_test.go b/server/app_test.go index aa4daba..17e2989 100644 --- a/server/app_test.go +++ b/server/app_test.go @@ -29,7 +29,7 @@ func startTestApp() { f := func() { newTestLedisClient() - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/testdb" os.RemoveAll(cfg.DataDir) diff --git a/server/cmd_replication_test.go b/server/cmd_replication_test.go index 1dfde6b..3c79836 100644 --- a/server/cmd_replication_test.go +++ b/server/cmd_replication_test.go @@ -37,12 +37,12 @@ func TestReplication(t *testing.T) { data_dir := "/tmp/test_replication" os.RemoveAll(data_dir) - masterCfg := new(config.Config) + masterCfg := config.NewConfigDefault() masterCfg.DataDir = fmt.Sprintf("%s/master", data_dir) masterCfg.Addr = "127.0.0.1:11182" masterCfg.UseReplication = true masterCfg.Replication.Sync = true - masterCfg.Replication.WaitSyncTime = 5 + masterCfg.Replication.WaitSyncTime = 5000 var master *App var slave *App @@ -53,7 +53,7 @@ func TestReplication(t *testing.T) { } defer master.Close() - slaveCfg := new(config.Config) + slaveCfg := config.NewConfigDefault() slaveCfg.DataDir = fmt.Sprintf("%s/slave", data_dir) slaveCfg.Addr = "127.0.0.1:11183" slaveCfg.SlaveOf = masterCfg.Addr diff --git a/server/doc.go b/server/doc.go index 7dc47ff..e5a5dd4 100644 --- a/server/doc.go +++ b/server/doc.go @@ -9,7 +9,7 @@ // // Start a ledis server is very simple: // -// cfg := new(config.Config) +// cfg := config.NewConfigDefault() // cfg.Addr = "127.0.0.1:6380" // cfg.DataDir = "/tmp/ledis" // app := server.NewApp(cfg) diff --git a/server/replication.go b/server/replication.go index 68f2bc9..a21c46f 100644 --- a/server/replication.go +++ b/server/replication.go @@ -366,7 +366,7 @@ func (app *App) publishNewLog(l *rpl.Log) { select { case <-done: - case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Second): + case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Millisecond): log.Info("replication wait timeout") } } diff --git a/server/scan_test.go b/server/scan_test.go index bc47395..ed9b71c 100644 --- a/server/scan_test.go +++ b/server/scan_test.go @@ -9,7 +9,7 @@ import ( ) func TestScan(t *testing.T) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/test_scan" cfg.Addr = "127.0.0.1:11185" diff --git a/server/script_test.go b/server/script_test.go index a422a82..280d86e 100644 --- a/server/script_test.go +++ b/server/script_test.go @@ -101,7 +101,7 @@ var testScript4 = ` ` func TestLuaCall(t *testing.T) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.Addr = ":11188" cfg.DataDir = "/tmp/testscript" cfg.DBName = "memory" diff --git a/server/snapshot_test.go b/server/snapshot_test.go index 11051a8..5e9ba46 100644 --- a/server/snapshot_test.go +++ b/server/snapshot_test.go @@ -18,7 +18,7 @@ func (d *testSnapshotDumper) Dump(w io.Writer) error { } func TestSnapshot(t *testing.T) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.Snapshot.MaxNum = 2 cfg.Snapshot.Path = path.Join(os.TempDir(), "snapshot") defer os.RemoveAll(cfg.Snapshot.Path) diff --git a/store/goleveldb/db.go b/store/goleveldb/db.go index 5e76f74..9924067 100644 --- a/store/goleveldb/db.go +++ b/store/goleveldb/db.go @@ -113,8 +113,6 @@ func newOptions(cfg *config.LevelDBConfig) *opt.Options { opts := &opt.Options{} opts.ErrorIfMissing = false - cfg.Adjust() - opts.BlockCache = cache.NewLRUCache(cfg.CacheSize) //we must use bloomfilter diff --git a/store/hyperleveldb/db.go b/store/hyperleveldb/db.go index 0f13a0b..90af5be 100644 --- a/store/hyperleveldb/db.go +++ b/store/hyperleveldb/db.go @@ -108,8 +108,6 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { opts.SetCreateIfMissing(true) - cfg.Adjust() - db.cache = NewLRUCache(cfg.CacheSize) opts.SetCache(db.cache) diff --git a/store/leveldb/db.go b/store/leveldb/db.go index 3705f78..cd1fb1e 100644 --- a/store/leveldb/db.go +++ b/store/leveldb/db.go @@ -108,8 +108,6 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { opts.SetCreateIfMissing(true) - cfg.Adjust() - db.cache = NewLRUCache(cfg.CacheSize) opts.SetCache(db.cache) diff --git a/store/rocksdb/db.go b/store/rocksdb/db.go index f5cecf4..35a1c7e 100644 --- a/store/rocksdb/db.go +++ b/store/rocksdb/db.go @@ -113,8 +113,6 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { opts.SetCreateIfMissing(true) - cfg.Adjust() - db.env = NewDefaultEnv() db.env.SetBackgroundThreads(runtime.NumCPU() * 2) db.env.SetHighPriorityBackgroundThreads(1) diff --git a/store/store_test.go b/store/store_test.go index d2f2ca6..b488158 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -10,7 +10,7 @@ import ( ) func TestStore(t *testing.T) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/testdb" cfg.LMDB.MapSize = 10 * 1024 * 1024 From f907234638d6a34ab54b46609cb8233ceed506e6 Mon Sep 17 00:00:00 2001 From: siddontang Date: Sat, 11 Oct 2014 17:44:31 +0800 Subject: [PATCH 12/13] optimize fullsync --- cmd/ledis-dump/main.go | 2 +- config/config.go | 1 + config/config.toml | 2 +- doc/commands.json | 2 +- doc/commands.md | 6 +++-- server/app.go | 8 ++++++ server/cmd_replication.go | 52 ++++++++++++++++++++++++------------ server/snapshot.go | 56 +++++++++++++-------------------------- server/snapshot_test.go | 30 +++------------------ 9 files changed, 72 insertions(+), 87 deletions(-) diff --git a/cmd/ledis-dump/main.go b/cmd/ledis-dump/main.go index b6a798d..6f9bac4 100644 --- a/cmd/ledis-dump/main.go +++ b/cmd/ledis-dump/main.go @@ -14,7 +14,7 @@ var port = flag.Int("port", 6380, "ledis server port") var sock = flag.String("sock", "", "ledis unix socket domain") var dumpFile = flag.String("o", "./ledis.dump", "dump file to save") -var fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync +var fullSyncCmd = []byte("*2\r\n$8\r\nfullsync\r\n$3\r\nnew\r\n") //fullsync func main() { flag.Parse() diff --git a/config/config.go b/config/config.go index 23a33de..aa74c9d 100644 --- a/config/config.go +++ b/config/config.go @@ -129,6 +129,7 @@ func NewConfigDefault() *Config { cfg.Replication.Compression = true cfg.Replication.WaitMaxSlaveAcks = 2 cfg.Replication.SyncLog = 0 + cfg.Snapshot.MaxNum = 1 cfg.adjust() diff --git a/config/config.toml b/config/config.toml index 27de6c5..8a8cfba 100644 --- a/config/config.toml +++ b/config/config.toml @@ -81,7 +81,7 @@ compression = true [snapshot] # Path to store snapshot dump file # if not set, use data_dir/snapshot -# snapshot file name format is snap-2006-01-02T15:04:05.999999999.dmp +# snapshot file name format is dmp-2006-01-02T15:04:05.999999999 path = "" # Reserve newest max_num snapshot dump files diff --git a/doc/commands.json b/doc/commands.json index 7f354c9..8f17223 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -90,7 +90,7 @@ "readonly": false }, "FULLSYNC": { - "arguments": "-", + "arguments": "[NEW]", "group": "Replication", "readonly": false diff --git a/doc/commands.md b/doc/commands.md index 8337992..4c9f3bc 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -123,7 +123,7 @@ Table of Contents - [BXSCAN key [MATCH match] [COUNT count]](#bxscan-key-match-match-count-count) - [Replication](#replication) - [SLAVEOF host port [RESTART] [READONLY]](#slaveof-host-port-restart-readonly) - - [FULLSYNC](#fullsync) + - [FULLSYNC [NEW]](#fullsync-new) - [SYNC logid](#sync-logid) - [Server](#server) - [PING](#ping) @@ -2477,12 +2477,14 @@ If the server is already master, `SLAVEOF NO ONE READONLY` will force the server If a server is already a slave of a master, `SLAVEOF host port` will stop the replication against the old and start the synchronization against the new one, if RESTART is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1. -### FULLSYNC +### FULLSYNC [NEW] Inner command, starts a fullsync from the master set by SLAVEOF. FULLSYNC will first try to sync all data from the master, save in local disk, then discard old dataset and load new one. +`FULLSYNC NEW` will generate a new snapshot and sync, otherwise it will use the latest existing snapshot if possible. + **Return value** **Examples** diff --git a/server/app.go b/server/app.go index 0a253ac..aeb71a4 100644 --- a/server/app.go +++ b/server/app.go @@ -34,6 +34,8 @@ type App struct { // handle slaves slock sync.Mutex slaves map[*client]struct{} + + snap *snapshotStore } func netType(s string) string { @@ -88,6 +90,10 @@ func NewApp(cfg *config.Config) (*App, error) { } } + if app.snap, err = newSnapshotStore(cfg); err != nil { + return nil, err + } + if len(app.cfg.SlaveOf) > 0 { //slave must readonly app.cfg.Readonly = true @@ -125,6 +131,8 @@ func (app *App) Close() { app.m.Close() + app.snap.Close() + if app.access != nil { app.access.Close() } diff --git a/server/cmd_replication.go b/server/cmd_replication.go index 128e4af..5526d5c 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -4,10 +4,9 @@ import ( "fmt" "github.com/siddontang/go/hack" "github.com/siddontang/ledisdb/ledis" - "io/ioutil" - "os" "strconv" "strings" + "time" ) func slaveofCommand(c *client) error { @@ -49,27 +48,46 @@ func slaveofCommand(c *client) error { } func fullsyncCommand(c *client) error { - //todo, multi fullsync may use same dump file - dumpFile, err := ioutil.TempFile(c.app.cfg.DataDir, "dump_") + args := c.args + needNew := false + if len(args) == 1 && strings.ToLower(hack.String(args[0])) == "new" { + needNew = true + } + + var s *snapshot + var err error + var t time.Time + + dumper := c.app.ldb + + if needNew { + s, t, err = c.app.snap.Create(dumper) + } else { + if s, t, err = c.app.snap.OpenLatest(); err != nil { + return err + } else if s == nil { + s, t, err = c.app.snap.Create(dumper) + } else { + gap := time.Duration(c.app.cfg.Replication.ExpiredLogDays*24*3600) * time.Second / 2 + minT := time.Now().Add(-gap) + + //snapshot is too old + if t.Before(minT) { + s.Close() + s, t, err = c.app.snap.Create(dumper) + } + } + } + if err != nil { return err } - if err = c.app.ldb.Dump(dumpFile); err != nil { - return err - } + n := s.Size() - st, _ := dumpFile.Stat() - n := st.Size() + c.resp.writeBulkFrom(n, s) - dumpFile.Seek(0, os.SEEK_SET) - - c.resp.writeBulkFrom(n, dumpFile) - - name := dumpFile.Name() - dumpFile.Close() - - os.Remove(name) + s.Close() return nil } diff --git a/server/snapshot.go b/server/snapshot.go index 1643160..7240c7a 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -28,12 +28,13 @@ type snapshotStore struct { } func snapshotName(t time.Time) string { - return fmt.Sprintf("snap-%s.dmp", t.Format(snapshotTimeFormat)) + return fmt.Sprintf("dmp-%s", t.Format(snapshotTimeFormat)) } func parseSnapshotName(name string) (time.Time, error) { var timeString string - if _, err := fmt.Sscanf(name, "snap-%s.dmp", &timeString); err != nil { + if _, err := fmt.Sscanf(name, "dmp-%s", &timeString); err != nil { + println(err.Error()) return time.Time{}, err } when, err := time.Parse(snapshotTimeFormat, timeString) @@ -88,7 +89,7 @@ func (s *snapshotStore) checkSnapshots() error { } if _, err := parseSnapshotName(info.Name()); err != nil { - log.Error("invalid snapshot file name %s, err: %s, try remove", info.Name(), err.Error()) + log.Error("invalid snapshot file name %s, err: %s", info.Name(), err.Error()) continue } @@ -161,8 +162,6 @@ type snapshot struct { io.ReadCloser f *os.File - - temp bool } func (st *snapshot) Read(b []byte) (int, error) { @@ -170,15 +169,7 @@ func (st *snapshot) Read(b []byte) (int, error) { } func (st *snapshot) Close() error { - if st.temp { - name := st.f.Name() - if err := st.f.Close(); err != nil { - return err - } - return os.Remove(name) - } else { - return st.f.Close() - } + return st.f.Close() } func (st *snapshot) Size() int64 { @@ -186,20 +177,18 @@ func (st *snapshot) Size() int64 { return s.Size() } -func (s *snapshotStore) Create(d snapshotDumper, temp bool) (*snapshot, time.Time, error) { +func (s *snapshotStore) Create(d snapshotDumper) (*snapshot, time.Time, error) { s.Lock() defer s.Unlock() - if !temp { - s.purge(true) - } + s.purge(true) now := time.Now() name := snapshotName(now) tmpName := name + ".tmp" - if len(s.names) > 0 && !temp { + if len(s.names) > 0 { lastTime, _ := parseSnapshotName(s.names[len(s.names)-1]) if !now.After(lastTime) { return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ", @@ -218,26 +207,17 @@ func (s *snapshotStore) Create(d snapshotDumper, temp bool) (*snapshot, time.Tim return nil, time.Time{}, err } - if temp { - if err := f.Sync(); err != nil { - f.Close() - return nil, time.Time{}, err - } - - f.Seek(0, os.SEEK_SET) - } else { - f.Close() - if err := os.Rename(s.snapshotPath(tmpName), s.snapshotPath(name)); err != nil { - return nil, time.Time{}, err - } - - if f, err = os.Open(s.snapshotPath(name)); err != nil { - return nil, time.Time{}, err - } - s.names = append(s.names, name) + f.Close() + if err := os.Rename(s.snapshotPath(tmpName), s.snapshotPath(name)); err != nil { + return nil, time.Time{}, err } - return &snapshot{f: f, temp: temp}, now, nil + if f, err = os.Open(s.snapshotPath(name)); err != nil { + return nil, time.Time{}, err + } + s.names = append(s.names, name) + + return &snapshot{f: f}, now, nil } func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) { @@ -256,5 +236,5 @@ func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) { return nil, time.Time{}, err } - return &snapshot{f: f, temp: false}, t, err + return &snapshot{f: f}, t, err } diff --git a/server/snapshot_test.go b/server/snapshot_test.go index 5e9ba46..d1e9230 100644 --- a/server/snapshot_test.go +++ b/server/snapshot_test.go @@ -30,7 +30,7 @@ func TestSnapshot(t *testing.T) { t.Fatal(err) } - if f, _, err := s.Create(d, false); err != nil { + if f, _, err := s.Create(d); err != nil { t.Fatal(err) } else { defer f.Close() @@ -43,7 +43,7 @@ func TestSnapshot(t *testing.T) { } } - if f, _, err := s.Create(d, false); err != nil { + if f, _, err := s.Create(d); err != nil { t.Fatal(err) } else { defer f.Close() @@ -55,7 +55,7 @@ func TestSnapshot(t *testing.T) { } } - if f, _, err := s.Create(d, false); err != nil { + if f, _, err := s.Create(d); err != nil { t.Fatal(err) } else { defer f.Close() @@ -73,29 +73,5 @@ func TestSnapshot(t *testing.T) { t.Fatal("must 2 snapshot") } - if f, _, err := s.Create(d, true); err != nil { - t.Fatal(err) - } else { - if b, _ := ioutil.ReadAll(f); string(b) != "hello world" { - t.Fatal("invalid read snapshot") - } - - if len(s.names) != 2 { - t.Fatal("must 2 snapshot") - } - - fs, _ = ioutil.ReadDir(cfg.Snapshot.Path) - if len(fs) != 3 { - t.Fatal("must 3 snapshot") - } - - f.Close() - } - - fs, _ = ioutil.ReadDir(cfg.Snapshot.Path) - if len(fs) != 2 { - t.Fatal("must 2 snapshot") - } - s.Close() } From 4adf3e899696e9235cc174217e35558f094fba20 Mon Sep 17 00:00:00 2001 From: siddontang Date: Mon, 13 Oct 2014 14:37:31 +0800 Subject: [PATCH 13/13] add some replication info --- server/info.go | 14 ++++++++++++++ server/replication.go | 7 +++++++ 2 files changed, 21 insertions(+) diff --git a/server/info.go b/server/info.go index 2babae8..2c729ed 100644 --- a/server/info.go +++ b/server/info.go @@ -28,6 +28,11 @@ type info struct { Persistence struct { DBName string } + + Replication struct { + PubLogNum int64 + PubLogTotalTime int64 //milliseconds + } } func newInfo(app *App) (i *info, err error) { @@ -156,6 +161,15 @@ func (i *info) dumpReplication(buf *bytes.Buffer) { slaves = append(slaves, s.remoteAddr) } + num := i.Replication.PubLogNum + p = append(p, infoPair{"pub_log_num", num}) + + if num != 0 { + p = append(p, infoPair{"pub_log_per_time", i.Replication.PubLogTotalTime / num}) + } else { + p = append(p, infoPair{"pub_log_per_time", 0}) + } + p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf}) if len(slaves) > 0 { diff --git a/server/replication.go b/server/replication.go index a21c46f..7b37de4 100644 --- a/server/replication.go +++ b/server/replication.go @@ -15,6 +15,7 @@ import ( "path" "strconv" "sync" + "sync/atomic" "time" ) @@ -336,6 +337,8 @@ func (app *App) publishNewLog(l *rpl.Log) { return } + startTime := time.Now() + ack := &syncAck{ logId, make(chan uint64, len(ss)), } @@ -369,4 +372,8 @@ func (app *App) publishNewLog(l *rpl.Log) { case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Millisecond): log.Info("replication wait timeout") } + + stopTime := time.Now() + atomic.AddInt64(&app.info.Replication.PubLogNum, 1) + atomic.AddInt64(&app.info.Replication.PubLogTotalTime, stopTime.Sub(startTime).Nanoseconds()/1e6) }