diff --git a/Makefile b/Makefile index 6764157..cfdddda 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,5 @@ INSTALL_PATH ?= $(CURDIR) -$(shell ./bootstrap.sh >> /dev/null 2>&1) - $(shell ./tools/build_config.sh build_config.mk $INSTALL_PATH) include build_config.mk diff --git a/README.md b/README.md index 2f12a55..0ebe2a6 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,9 @@ Create a workspace and checkout ledisdb source cd src/github.com/siddontang/ledisdb + #install go dependences + ./bootstrap.sh + #set build and run environment source dev.sh diff --git a/cmd/ledis-cli/const.go b/cmd/ledis-cli/const.go index f03b249..19c5ee9 100644 --- a/cmd/ledis-cli/const.go +++ b/cmd/ledis-cli/const.go @@ -1,4 +1,4 @@ -//This file was generated by .tools/generate_commands.py on Wed Oct 08 2014 16:36:20 +0800 +//This file was generated by .tools/generate_commands.py on Fri Oct 10 2014 09:08:54 +0800 package main var helpCommands = [][]string{ @@ -87,7 +87,7 @@ var helpCommands = [][]string{ {"SINTER", "key [key ...]", "Set"}, {"SINTERSTORE", "destination key [key ...]", "Set"}, {"SISMEMBER", "key member", "Set"}, - {"SLAVEOF", "host port [restart]", "Replication"}, + {"SLAVEOF", "host port [RESTART] [READONLY]", "Replication"}, {"SMCLEAR", "key [key ...]", "Set"}, {"SMEMBERS", "key", "Set"}, {"SPERSIST", "key", "Set"}, diff --git a/cmd/ledis-dump/main.go b/cmd/ledis-dump/main.go index b6a798d..6f9bac4 100644 --- a/cmd/ledis-dump/main.go +++ b/cmd/ledis-dump/main.go @@ -14,7 +14,7 @@ var port = flag.Int("port", 6380, "ledis server port") var sock = flag.String("sock", "", "ledis unix socket domain") var dumpFile = flag.String("o", "./ledis.dump", "dump file to save") -var fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync +var fullSyncCmd = []byte("*2\r\n$8\r\nfullsync\r\n$3\r\nnew\r\n") //fullsync func main() { flag.Parse() diff --git a/cmd/ledis-server/main.go b/cmd/ledis-server/main.go index 448120c..e132506 100644 --- a/cmd/ledis-server/main.go +++ b/cmd/ledis-server/main.go @@ -18,6 +18,9 @@ var configFile = flag.String("config", "", "ledisdb config file") var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name") var usePprof = flag.Bool("pprof", false, "enable pprof") var pprofPort = flag.Int("pprof_port", 6060, "pprof http port") +var slaveof = flag.String("slaveof", "", "make the server a slave of another instance") +var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly") +var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled") func main() { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -43,6 +46,15 @@ func main() { cfg.DBName = *dbName } + if len(*slaveof) > 0 { + cfg.SlaveOf = *slaveof + cfg.Readonly = true + cfg.UseReplication = true + } else { + cfg.Readonly = *readonly + cfg.UseReplication = *rpl + } + var app *server.App app, err = server.NewApp(cfg) if err != nil { diff --git a/config/config.go b/config/config.go index a90e40c..aa74c9d 100644 --- a/config/config.go +++ b/config/config.go @@ -14,8 +14,7 @@ var ( ) const ( - DefaultAddr string = "127.0.0.1:6380" - DefaultHttpAddr string = "127.0.0.1:11181" + DefaultAddr string = "127.0.0.1:6380" DefaultDBName string = "goleveldb" @@ -37,13 +36,19 @@ type LMDBConfig struct { type ReplicationConfig struct { Path string `toml:"path"` - ExpiredLogDays int `toml:"expired_log_days"` Sync bool `toml:"sync"` WaitSyncTime int `toml:"wait_sync_time"` WaitMaxSlaveAcks int `toml:"wait_max_slave_acks"` + ExpiredLogDays int `toml:"expired_log_days"` + SyncLog int `toml:"sync_log"` Compression bool `toml:"compression"` } +type SnapshotConfig struct { + Path string `toml:"path"` + MaxNum int `toml:"max_num"` +} + type Config struct { FileName string `toml:"-"` @@ -53,6 +58,8 @@ type Config struct { SlaveOf string `toml:"slaveof"` + Readonly bool `toml:readonly` + DataDir string `toml:"data_dir"` DBName string `toml:"db_name"` @@ -67,6 +74,8 @@ type Config struct { UseReplication bool `toml:"use_replication"` Replication ReplicationConfig `toml:"replication"` + + Snapshot SnapshotConfig `toml:"snapshot"` } func NewConfigWithFile(fileName string) (*Config, error) { @@ -91,6 +100,8 @@ func NewConfigWithData(data []byte) (*Config, error) { return nil, err } + cfg.adjust() + return cfg, nil } @@ -98,13 +109,14 @@ func NewConfigDefault() *Config { cfg := new(Config) cfg.Addr = DefaultAddr - cfg.HttpAddr = DefaultHttpAddr + cfg.HttpAddr = "" cfg.DataDir = DefaultDataDir cfg.DBName = DefaultDBName cfg.SlaveOf = "" + cfg.Readonly = false // disable access log cfg.AccessLog = "" @@ -112,28 +124,37 @@ func NewConfigDefault() *Config { cfg.LMDB.MapSize = 20 * 1024 * 1024 cfg.LMDB.NoSync = true - cfg.Replication.WaitSyncTime = 1 + cfg.UseReplication = false + cfg.Replication.WaitSyncTime = 500 cfg.Replication.Compression = true cfg.Replication.WaitMaxSlaveAcks = 2 + cfg.Replication.SyncLog = 0 + cfg.Snapshot.MaxNum = 1 + + cfg.adjust() return cfg } -func (cfg *LevelDBConfig) Adjust() { - if cfg.CacheSize <= 0 { - cfg.CacheSize = 4 * 1024 * 1024 +func (cfg *Config) adjust() { + if cfg.LevelDB.CacheSize <= 0 { + cfg.LevelDB.CacheSize = 4 * 1024 * 1024 } - if cfg.BlockSize <= 0 { - cfg.BlockSize = 4 * 1024 + if cfg.LevelDB.BlockSize <= 0 { + cfg.LevelDB.BlockSize = 4 * 1024 } - if cfg.WriteBufferSize <= 0 { - cfg.WriteBufferSize = 4 * 1024 * 1024 + if cfg.LevelDB.WriteBufferSize <= 0 { + cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024 } - if cfg.MaxOpenFiles < 1024 { - cfg.MaxOpenFiles = 1024 + if cfg.LevelDB.MaxOpenFiles < 1024 { + cfg.LevelDB.MaxOpenFiles = 1024 + } + + if cfg.Replication.ExpiredLogDays <= 0 { + cfg.Replication.ExpiredLogDays = 7 } } diff --git a/config/config.toml b/config/config.toml index b8d80ec..8a8cfba 100644 --- a/config/config.toml +++ b/config/config.toml @@ -16,6 +16,10 @@ access_log = "" # Any write operations except flushall and replication will be disabled in slave mode. slaveof = "" +# Readonly mode, slave server is always readonly even readonly = false +# for readonly mode, only replication and flushall can write +readonly = false + # Choose which backend storage to use, now support: # # leveldb @@ -50,20 +54,35 @@ nosync = true # if not set, use data_dir/rpl path = "" -# Expire write ahead logs after the given days -expired_log_days = 7 - # If sync is true, the new log must be sent to some slaves, and then commit. # It will reduce performance but have better high availability. sync = true -# If sync is true, wait at last wait_sync_time seconds for slave syncing this log -wait_sync_time = 1 +# If sync is true, wait at last wait_sync_time milliseconds for slave syncing this log +wait_sync_time = 500 # If sync is true, wait at most min(wait_max_slave_acks, (n + 1) / 2) to promise syncing ok. # n is slave number # If 0, wait (n + 1) / 2 acks. wait_max_slave_acks = 2 +# Expire write ahead logs after the given days +expired_log_days = 7 + +# Sync log to disk if possible +# 0: no sync +# 1: sync every second +# 2: sync every commit +sync_log = 0 + # Compress the log or not compression = true + +[snapshot] +# Path to store snapshot dump file +# if not set, use data_dir/snapshot +# snapshot file name format is dmp-2006-01-02T15:04:05.999999999 +path = "" + +# Reserve newest max_num snapshot dump files +max_num = 1 diff --git a/doc/commands.json b/doc/commands.json index 8268ee8..8f17223 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -90,7 +90,7 @@ "readonly": false }, "FULLSYNC": { - "arguments": "-", + "arguments": "[NEW]", "group": "Replication", "readonly": false @@ -301,7 +301,7 @@ "readonly": false }, "SLAVEOF": { - "arguments": "host port [restart]", + "arguments": "host port [RESTART] [READONLY]", "group": "Replication", "readonly": false }, diff --git a/doc/commands.md b/doc/commands.md index c01b67c..4c9f3bc 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -122,8 +122,8 @@ Table of Contents - [BPERSIST key](#bpersist-key) - [BXSCAN key [MATCH match] [COUNT count]](#bxscan-key-match-match-count-count) - [Replication](#replication) - - [SLAVEOF host port [restart]](#slaveof-host-port-restart) - - [FULLSYNC](#fullsync) + - [SLAVEOF host port [RESTART] [READONLY]](#slaveof-host-port-restart-readonly) + - [FULLSYNC [NEW]](#fullsync-new) - [SYNC logid](#sync-logid) - [Server](#server) - [PING](#ping) @@ -2466,21 +2466,25 @@ See [XSCAN](#xscan-key-match-match-count-count) for more information. ## Replication -### SLAVEOF host port [restart] +### SLAVEOF host port [RESTART] [READONLY] -Changes the replication settings of a slave on the fly. If the server is already acting as slave, SLAVEOF NO ONE will turn off the replication. +Changes the replication settings of a slave on the fly. If the server is already acting as slave, `SLAVEOF NO ONE` will turn off the replication and turn the server into master. `SLAVEOF NO ONE READONLY` will turn the server into master with readonly mode. -SLAVEOF host port will make the server a slave of another server listening at the specified host and port. +If the server is already master, `SLAVEOF NO ONE READONLY` will force the server to readonly mode, and `SLAVEOF NO ONE` will disable readonly. -If a server is already a slave of a master, SLAVEOF host port will stop the replication against the old and start the synchronization against the new one, if restart is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1. +`SLAVEOF host port` will make the server a slave of another server listening at the specified host and port. + +If a server is already a slave of a master, `SLAVEOF host port` will stop the replication against the old and start the synchronization against the new one, if RESTART is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1. -### FULLSYNC +### FULLSYNC [NEW] Inner command, starts a fullsync from the master set by SLAVEOF. FULLSYNC will first try to sync all data from the master, save in local disk, then discard old dataset and load new one. +`FULLSYNC NEW` will generate a new snapshot and sync, otherwise it will use the latest existing snapshot if possible. + **Return value** **Examples** diff --git a/etc/ledis.conf b/etc/ledis.conf index b8d80ec..27de6c5 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -16,6 +16,10 @@ access_log = "" # Any write operations except flushall and replication will be disabled in slave mode. slaveof = "" +# Readonly mode, slave server is always readonly even readonly = false +# for readonly mode, only replication and flushall can write +readonly = false + # Choose which backend storage to use, now support: # # leveldb @@ -50,20 +54,35 @@ nosync = true # if not set, use data_dir/rpl path = "" -# Expire write ahead logs after the given days -expired_log_days = 7 - # If sync is true, the new log must be sent to some slaves, and then commit. # It will reduce performance but have better high availability. sync = true -# If sync is true, wait at last wait_sync_time seconds for slave syncing this log -wait_sync_time = 1 +# If sync is true, wait at last wait_sync_time milliseconds for slave syncing this log +wait_sync_time = 500 # If sync is true, wait at most min(wait_max_slave_acks, (n + 1) / 2) to promise syncing ok. # n is slave number # If 0, wait (n + 1) / 2 acks. wait_max_slave_acks = 2 +# Expire write ahead logs after the given days +expired_log_days = 7 + +# Sync log to disk if possible +# 0: no sync +# 1: sync every second +# 2: sync every commit +sync_log = 0 + # Compress the log or not compression = true + +[snapshot] +# Path to store snapshot dump file +# if not set, use data_dir/snapshot +# snapshot file name format is snap-2006-01-02T15:04:05.999999999.dmp +path = "" + +# Reserve newest max_num snapshot dump files +max_num = 1 diff --git a/ledis/const.go b/ledis/const.go index 3b30123..3e17a95 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -46,11 +46,6 @@ var ( } ) -const ( - RDWRMode = 0 - ROnlyMode = 1 -) - const ( defaultScanCount int = 10 ) diff --git a/ledis/dump.go b/ledis/dump.go index 436e707..1f6d5e2 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -46,21 +46,22 @@ func (l *Ledis) Dump(w io.Writer) error { var commitID uint64 var snap *store.Snapshot - { - l.wLock.Lock() - defer l.wLock.Unlock() + l.wLock.Lock() - if l.r != nil { - if commitID, err = l.r.LastCommitID(); err != nil { - return err - } - } - - if snap, err = l.ldb.NewSnapshot(); err != nil { + if l.r != nil { + if commitID, err = l.r.LastCommitID(); err != nil { + l.wLock.Unlock() return err } } + if snap, err = l.ldb.NewSnapshot(); err != nil { + l.wLock.Unlock() + return err + } + + l.wLock.Unlock() + wb := bufio.NewWriterSize(w, 4096) h := &DumpHead{commitID} diff --git a/ledis/dump_test.go b/ledis/dump_test.go index e29d928..98e4c9e 100644 --- a/ledis/dump_test.go +++ b/ledis/dump_test.go @@ -9,7 +9,7 @@ import ( ) func TestDump(t *testing.T) { - cfgM := new(config.Config) + cfgM := config.NewConfigDefault() cfgM.DataDir = "/tmp/test_ledis_master" os.RemoveAll(cfgM.DataDir) @@ -19,7 +19,7 @@ func TestDump(t *testing.T) { t.Fatal(err) } - cfgS := new(config.Config) + cfgS := config.NewConfigDefault() cfgS.DataDir = "/tmp/test_ledis_slave" os.RemoveAll(cfgM.DataDir) diff --git a/ledis/ledis.go b/ledis/ledis.go index 8893eee..75caedd 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -33,17 +33,10 @@ type Ledis struct { wLock sync.RWMutex //allow one write at same time commitLock sync.Mutex //allow one write commit at same time - // for readonly mode, only replication and flushall can write - readOnly bool - lock io.Closer } func Open(cfg *config.Config) (*Ledis, error) { - return Open2(cfg, RDWRMode) -} - -func Open2(cfg *config.Config, flags int) (*Ledis, error) { if len(cfg.DataDir) == 0 { cfg.DataDir = config.DefaultDataDir } @@ -53,13 +46,12 @@ func Open2(cfg *config.Config, flags int) (*Ledis, error) { var err error l := new(Ledis) + l.cfg = cfg if l.lock, err = filelock.Lock(path.Join(cfg.DataDir, "LOCK")); err != nil { return nil, err } - l.readOnly = (flags&ROnlyMode > 0) - l.quit = make(chan struct{}) if l.ldb, err = store.Open(cfg); err != nil { @@ -163,7 +155,7 @@ func (l *Ledis) flushAll() error { } func (l *Ledis) IsReadOnly() bool { - if l.readOnly { + if l.cfg.Readonly { return true } else if l.r != nil { if b, _ := l.r.CommitIDBehind(); b { @@ -173,10 +165,6 @@ func (l *Ledis) IsReadOnly() bool { return false } -func (l *Ledis) SetReadOnly(b bool) { - l.readOnly = b -} - func (l *Ledis) onDataExpired() { defer l.wg.Done() diff --git a/ledis/ledis_test.go b/ledis/ledis_test.go index 45f1c7f..51e4d0d 100644 --- a/ledis/ledis_test.go +++ b/ledis/ledis_test.go @@ -12,7 +12,7 @@ var testLedisOnce sync.Once func getTestDB() *DB { f := func() { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/test_ledis" os.RemoveAll(cfg.DataDir) diff --git a/ledis/replication.go b/ledis/replication.go index b68a990..33a5e12 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -110,7 +110,7 @@ func (l *Ledis) WaitReplication() error { func (l *Ledis) StoreLogsFromReader(rb io.Reader) error { if !l.ReplicationUsed() { return ErrRplNotSupport - } else if !l.readOnly { + } else if !l.cfg.Readonly { return ErrRplInRDWR } diff --git a/ledis/replication_test.go b/ledis/replication_test.go index c300ef8..fc5e210 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -30,7 +30,7 @@ func TestReplication(t *testing.T) { var slave *Ledis var err error - cfgM := new(config.Config) + cfgM := config.NewConfigDefault() cfgM.DataDir = "/tmp/test_repl/master" cfgM.UseReplication = true @@ -43,13 +43,14 @@ func TestReplication(t *testing.T) { t.Fatal(err) } - cfgS := new(config.Config) + cfgS := config.NewConfigDefault() cfgS.DataDir = "/tmp/test_repl/slave" cfgS.UseReplication = true + cfgS.Readonly = true os.RemoveAll(cfgS.DataDir) - slave, err = Open2(cfgS, ROnlyMode) + slave, err = Open(cfgS) if err != nil { t.Fatal(err) } diff --git a/ledis/tx_test.go b/ledis/tx_test.go index cb3a7f0..e21c0a8 100644 --- a/ledis/tx_test.go +++ b/ledis/tx_test.go @@ -190,7 +190,7 @@ func testTxSelect(t *testing.T, db *DB) { } func testTx(t *testing.T, name string) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/ledis_test_tx" cfg.DBName = name diff --git a/rpl/goleveldb_store.go b/rpl/goleveldb_store.go index f9d2a7e..c00f36c 100644 --- a/rpl/goleveldb_store.go +++ b/rpl/goleveldb_store.go @@ -21,6 +21,8 @@ type GoLevelDBStore struct { first uint64 last uint64 + + lastCommit time.Time } func (s *GoLevelDBStore) FirstID() (uint64, error) { @@ -132,7 +134,17 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { w.Put(key, buf.Bytes()) } - if err := w.Commit(); err != nil { + n := time.Now() + if s.cfg.Replication.SyncLog == 2 || + (s.cfg.Replication.SyncLog == 1 && n.Sub(s.lastCommit) > time.Second) { + err = w.SyncCommit() + } else { + err = w.Commit() + } + + s.lastCommit = n + + if err != nil { return err } @@ -257,12 +269,12 @@ func (s *GoLevelDBStore) open() error { } func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DBName = "goleveldb" cfg.DBPath = base - cfg.LevelDB.BlockSize = 4 * 1024 * 1024 - cfg.LevelDB.CacheSize = 16 * 1024 * 1024 - cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024 + cfg.LevelDB.BlockSize = 16 * 1024 * 1024 + cfg.LevelDB.CacheSize = 64 * 1024 * 1024 + cfg.LevelDB.WriteBufferSize = 64 * 1024 * 1024 cfg.LevelDB.Compression = false s := new(GoLevelDBStore) diff --git a/rpl/rpl_test.go b/rpl/rpl_test.go index 06fcf7d..63ff427 100644 --- a/rpl/rpl_test.go +++ b/rpl/rpl_test.go @@ -14,7 +14,7 @@ func TestReplication(t *testing.T) { } defer os.RemoveAll(dir) - c := new(config.Config) + c := config.NewConfigDefault() c.Replication.Path = dir r, err := NewReplication(c) diff --git a/server/app.go b/server/app.go index dbf12e5..aeb71a4 100644 --- a/server/app.go +++ b/server/app.go @@ -34,6 +34,8 @@ type App struct { // handle slaves slock sync.Mutex slaves map[*client]struct{} + + snap *snapshotStore } func netType(s string) string { @@ -88,13 +90,16 @@ func NewApp(cfg *config.Config) (*App, error) { } } - flag := ledis.RDWRMode - if len(app.cfg.SlaveOf) > 0 { - //slave must readonly - flag = ledis.ROnlyMode + if app.snap, err = newSnapshotStore(cfg); err != nil { + return nil, err } - if app.ldb, err = ledis.Open2(cfg, flag); err != nil { + if len(app.cfg.SlaveOf) > 0 { + //slave must readonly + app.cfg.Readonly = true + } + + if app.ldb, err = ledis.Open(cfg); err != nil { return nil, err } @@ -126,6 +131,8 @@ func (app *App) Close() { app.m.Close() + app.snap.Close() + if app.access != nil { app.access.Close() } @@ -135,7 +142,7 @@ func (app *App) Close() { func (app *App) Run() { if len(app.cfg.SlaveOf) > 0 { - app.slaveof(app.cfg.SlaveOf, false) + app.slaveof(app.cfg.SlaveOf, false, app.cfg.Readonly) } go app.httpServe() diff --git a/server/app_test.go b/server/app_test.go index aa4daba..17e2989 100644 --- a/server/app_test.go +++ b/server/app_test.go @@ -29,7 +29,7 @@ func startTestApp() { f := func() { newTestLedisClient() - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/testdb" os.RemoveAll(cfg.DataDir) diff --git a/server/cmd_replication.go b/server/cmd_replication.go index aa6ede4..5526d5c 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -4,27 +4,27 @@ import ( "fmt" "github.com/siddontang/go/hack" "github.com/siddontang/ledisdb/ledis" - "io/ioutil" - "os" "strconv" "strings" + "time" ) func slaveofCommand(c *client) error { args := c.args - if len(args) != 2 || len(args) != 3 { + if len(args) != 2 && len(args) != 3 { return ErrCmdParams } masterAddr := "" restart := false + readonly := false if strings.ToLower(hack.String(args[0])) == "no" && strings.ToLower(hack.String(args[1])) == "one" { //stop replication, use master = "" - if len(args) != 2 { - return ErrCmdParams + if len(args) == 3 && strings.ToLower(hack.String(args[2])) == "readonly" { + readonly = true } } else { if _, err := strconv.ParseInt(hack.String(args[1]), 10, 16); err != nil { @@ -38,7 +38,7 @@ func slaveofCommand(c *client) error { } } - if err := c.app.slaveof(masterAddr, restart); err != nil { + if err := c.app.slaveof(masterAddr, restart, readonly); err != nil { return err } @@ -48,27 +48,46 @@ func slaveofCommand(c *client) error { } func fullsyncCommand(c *client) error { - //todo, multi fullsync may use same dump file - dumpFile, err := ioutil.TempFile(c.app.cfg.DataDir, "dump_") + args := c.args + needNew := false + if len(args) == 1 && strings.ToLower(hack.String(args[0])) == "new" { + needNew = true + } + + var s *snapshot + var err error + var t time.Time + + dumper := c.app.ldb + + if needNew { + s, t, err = c.app.snap.Create(dumper) + } else { + if s, t, err = c.app.snap.OpenLatest(); err != nil { + return err + } else if s == nil { + s, t, err = c.app.snap.Create(dumper) + } else { + gap := time.Duration(c.app.cfg.Replication.ExpiredLogDays*24*3600) * time.Second / 2 + minT := time.Now().Add(-gap) + + //snapshot is too old + if t.Before(minT) { + s.Close() + s, t, err = c.app.snap.Create(dumper) + } + } + } + if err != nil { return err } - if err = c.app.ldb.Dump(dumpFile); err != nil { - return err - } + n := s.Size() - st, _ := dumpFile.Stat() - n := st.Size() + c.resp.writeBulkFrom(n, s) - dumpFile.Seek(0, os.SEEK_SET) - - c.resp.writeBulkFrom(n, dumpFile) - - name := dumpFile.Name() - dumpFile.Close() - - os.Remove(name) + s.Close() return nil } diff --git a/server/cmd_replication_test.go b/server/cmd_replication_test.go index 76bf2c2..3c79836 100644 --- a/server/cmd_replication_test.go +++ b/server/cmd_replication_test.go @@ -37,12 +37,12 @@ func TestReplication(t *testing.T) { data_dir := "/tmp/test_replication" os.RemoveAll(data_dir) - masterCfg := new(config.Config) + masterCfg := config.NewConfigDefault() masterCfg.DataDir = fmt.Sprintf("%s/master", data_dir) masterCfg.Addr = "127.0.0.1:11182" masterCfg.UseReplication = true masterCfg.Replication.Sync = true - masterCfg.Replication.WaitSyncTime = 5 + masterCfg.Replication.WaitSyncTime = 5000 var master *App var slave *App @@ -53,7 +53,7 @@ func TestReplication(t *testing.T) { } defer master.Close() - slaveCfg := new(config.Config) + slaveCfg := config.NewConfigDefault() slaveCfg.DataDir = fmt.Sprintf("%s/slave", data_dir) slaveCfg.Addr = "127.0.0.1:11183" slaveCfg.SlaveOf = masterCfg.Addr @@ -96,7 +96,7 @@ func TestReplication(t *testing.T) { t.Fatal(err) } - slave.slaveof("", false) + slave.slaveof("", false, false) db.Set([]byte("a2"), value) db.Set([]byte("b2"), value) @@ -112,7 +112,7 @@ func TestReplication(t *testing.T) { t.Fatal("must error") } - slave.slaveof(masterCfg.Addr, false) + slave.slaveof(masterCfg.Addr, false, false) time.Sleep(1 * time.Second) diff --git a/server/doc.go b/server/doc.go index 7dc47ff..e5a5dd4 100644 --- a/server/doc.go +++ b/server/doc.go @@ -9,7 +9,7 @@ // // Start a ledis server is very simple: // -// cfg := new(config.Config) +// cfg := config.NewConfigDefault() // cfg.Addr = "127.0.0.1:6380" // cfg.DataDir = "/tmp/ledis" // app := server.NewApp(cfg) diff --git a/server/info.go b/server/info.go index 21c989b..2c729ed 100644 --- a/server/info.go +++ b/server/info.go @@ -28,6 +28,11 @@ type info struct { Persistence struct { DBName string } + + Replication struct { + PubLogNum int64 + PubLogTotalTime int64 //milliseconds + } } func newInfo(app *App) (i *info, err error) { @@ -115,7 +120,8 @@ func (i *info) dumpServer(buf *bytes.Buffer) { i.dumpPairs(buf, infoPair{"os", i.Server.OS}, infoPair{"process_id", i.Server.ProceessId}, infoPair{"addr", i.app.cfg.Addr}, - infoPair{"http_addr", i.app.cfg.HttpAddr}) + infoPair{"http_addr", i.app.cfg.HttpAddr}, + infoPair{"readonly", i.app.cfg.Readonly}) } func (i *info) dumpClients(buf *bytes.Buffer) { @@ -155,16 +161,29 @@ func (i *info) dumpReplication(buf *bytes.Buffer) { slaves = append(slaves, s.remoteAddr) } - p = append(p, infoPair{"readonly", i.app.ldb.IsReadOnly()}) + num := i.Replication.PubLogNum + p = append(p, infoPair{"pub_log_num", num}) + + if num != 0 { + p = append(p, infoPair{"pub_log_per_time", i.Replication.PubLogTotalTime / num}) + } else { + p = append(p, infoPair{"pub_log_per_time", 0}) + } + + p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf}) if len(slaves) > 0 { - p = append(p, infoPair{"slave", strings.Join(slaves, ",")}) + p = append(p, infoPair{"slaves", strings.Join(slaves, ",")}) } if s, _ := i.app.ldb.ReplicationStat(); s != nil { p = append(p, infoPair{"last_log_id", s.LastID}) p = append(p, infoPair{"first_log_id", s.FirstID}) p = append(p, infoPair{"commit_log_id", s.CommitID}) + } else { + p = append(p, infoPair{"last_log_id", 0}) + p = append(p, infoPair{"first_log_id", 0}) + p = append(p, infoPair{"commit_log_id", 0}) } i.dumpPairs(buf, p...) diff --git a/server/replication.go b/server/replication.go index b8b1868..7b37de4 100644 --- a/server/replication.go +++ b/server/replication.go @@ -15,6 +15,7 @@ import ( "path" "strconv" "sync" + "sync/atomic" "time" ) @@ -93,7 +94,7 @@ func (m *master) startReplication(masterAddr string, restart bool) error { m.quit = make(chan struct{}, 1) - m.app.ldb.SetReadOnly(true) + m.app.cfg.Readonly = true m.wg.Add(1) go m.runReplication(restart) @@ -238,10 +239,16 @@ func (m *master) sync() error { } -func (app *App) slaveof(masterAddr string, restart bool) error { +func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error { app.m.Lock() defer app.m.Unlock() + //in master mode and no slaveof, only set readonly + if len(app.cfg.SlaveOf) == 0 && len(masterAddr) == 0 { + app.cfg.Readonly = readonly + return nil + } + if !app.ldb.ReplicationUsed() { return fmt.Errorf("slaveof must enable replication") } @@ -253,7 +260,7 @@ func (app *App) slaveof(masterAddr string, restart bool) error { return err } - app.ldb.SetReadOnly(false) + app.cfg.Readonly = readonly } else { return app.m.startReplication(masterAddr, restart) } @@ -287,7 +294,10 @@ func (app *App) removeSlave(c *client) { app.slock.Lock() defer app.slock.Unlock() - delete(app.slaves, c) + if _, ok := app.slaves[c]; ok { + delete(app.slaves, c) + log.Info("remove slave %s", c.remoteAddr) + } if c.ack != nil { asyncNotifyUint64(c.ack.ch, c.lastLogID) @@ -313,7 +323,7 @@ func (app *App) publishNewLog(l *rpl.Log) { logId := l.ID for s, _ := range app.slaves { if s.lastLogID >= logId { - //slave has already this log + //slave has already owned this log ss = []*client{} break } else { @@ -327,6 +337,8 @@ func (app *App) publishNewLog(l *rpl.Log) { return } + startTime := time.Now() + ack := &syncAck{ logId, make(chan uint64, len(ss)), } @@ -357,7 +369,11 @@ func (app *App) publishNewLog(l *rpl.Log) { select { case <-done: - case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Second): + case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Millisecond): log.Info("replication wait timeout") } + + stopTime := time.Now() + atomic.AddInt64(&app.info.Replication.PubLogNum, 1) + atomic.AddInt64(&app.info.Replication.PubLogTotalTime, stopTime.Sub(startTime).Nanoseconds()/1e6) } diff --git a/server/scan_test.go b/server/scan_test.go index bc47395..ed9b71c 100644 --- a/server/scan_test.go +++ b/server/scan_test.go @@ -9,7 +9,7 @@ import ( ) func TestScan(t *testing.T) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/test_scan" cfg.Addr = "127.0.0.1:11185" diff --git a/server/script_test.go b/server/script_test.go index a422a82..280d86e 100644 --- a/server/script_test.go +++ b/server/script_test.go @@ -101,7 +101,7 @@ var testScript4 = ` ` func TestLuaCall(t *testing.T) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.Addr = ":11188" cfg.DataDir = "/tmp/testscript" cfg.DBName = "memory" diff --git a/server/snapshot.go b/server/snapshot.go new file mode 100644 index 0000000..7240c7a --- /dev/null +++ b/server/snapshot.go @@ -0,0 +1,240 @@ +package server + +import ( + "fmt" + "github.com/siddontang/go/log" + "github.com/siddontang/ledisdb/config" + "io" + "io/ioutil" + "os" + "path" + "sort" + "sync" + "time" +) + +const ( + snapshotTimeFormat = "2006-01-02T15:04:05.999999999" +) + +type snapshotStore struct { + sync.Mutex + + cfg *config.Config + + names []string + + quit chan struct{} +} + +func snapshotName(t time.Time) string { + return fmt.Sprintf("dmp-%s", t.Format(snapshotTimeFormat)) +} + +func parseSnapshotName(name string) (time.Time, error) { + var timeString string + if _, err := fmt.Sscanf(name, "dmp-%s", &timeString); err != nil { + println(err.Error()) + return time.Time{}, err + } + when, err := time.Parse(snapshotTimeFormat, timeString) + if err != nil { + return time.Time{}, err + } + return when, nil +} + +func newSnapshotStore(cfg *config.Config) (*snapshotStore, error) { + if len(cfg.Snapshot.Path) == 0 { + cfg.Snapshot.Path = path.Join(cfg.DataDir, "snapshot") + } + + if err := os.MkdirAll(cfg.Snapshot.Path, 0755); err != nil { + return nil, err + } + + s := new(snapshotStore) + s.cfg = cfg + s.names = make([]string, 0, s.cfg.Snapshot.MaxNum) + + s.quit = make(chan struct{}) + + if err := s.checkSnapshots(); err != nil { + return nil, err + } + + go s.run() + + return s, nil +} + +func (s *snapshotStore) Close() { + close(s.quit) +} + +func (s *snapshotStore) checkSnapshots() error { + cfg := s.cfg + snapshots, err := ioutil.ReadDir(cfg.Snapshot.Path) + if err != nil { + log.Error("read %s error: %s", cfg.Snapshot.Path, err.Error()) + return err + } + + names := []string{} + for _, info := range snapshots { + if path.Ext(info.Name()) == ".tmp" { + log.Error("temp snapshot file name %s, try remove", info.Name()) + os.Remove(path.Join(cfg.Snapshot.Path, info.Name())) + continue + } + + if _, err := parseSnapshotName(info.Name()); err != nil { + log.Error("invalid snapshot file name %s, err: %s", info.Name(), err.Error()) + continue + } + + names = append(names, info.Name()) + } + + //from old to new + sort.Strings(names) + + s.names = names + + s.purge(false) + + return nil +} + +func (s *snapshotStore) run() { + t := time.NewTicker(60 * time.Minute) + defer t.Stop() + + for { + select { + case <-t.C: + s.Lock() + if err := s.checkSnapshots(); err != nil { + log.Error("check snapshots error %s", err.Error()) + } + s.Unlock() + case <-s.quit: + return + } + } +} + +func (s *snapshotStore) purge(create bool) { + var names []string + maxNum := s.cfg.Snapshot.MaxNum + num := len(s.names) - maxNum + + if create { + num++ + if num > len(s.names) { + num = len(s.names) + } + } + + if num > 0 { + names = s.names[0:num] + + n := copy(s.names, s.names[num:]) + s.names = s.names[0:n] + } + + for _, name := range names { + if err := os.Remove(s.snapshotPath(name)); err != nil { + log.Error("purge snapshot %s error %s", name, err.Error()) + } + } +} + +func (s *snapshotStore) snapshotPath(name string) string { + return path.Join(s.cfg.Snapshot.Path, name) +} + +type snapshotDumper interface { + Dump(w io.Writer) error +} + +type snapshot struct { + io.ReadCloser + + f *os.File +} + +func (st *snapshot) Read(b []byte) (int, error) { + return st.f.Read(b) +} + +func (st *snapshot) Close() error { + return st.f.Close() +} + +func (st *snapshot) Size() int64 { + s, _ := st.f.Stat() + return s.Size() +} + +func (s *snapshotStore) Create(d snapshotDumper) (*snapshot, time.Time, error) { + s.Lock() + defer s.Unlock() + + s.purge(true) + + now := time.Now() + name := snapshotName(now) + + tmpName := name + ".tmp" + + if len(s.names) > 0 { + lastTime, _ := parseSnapshotName(s.names[len(s.names)-1]) + if !now.After(lastTime) { + return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ", + now.Format(snapshotTimeFormat), lastTime.Format(snapshotTimeFormat)) + } + } + + f, err := os.OpenFile(s.snapshotPath(tmpName), os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, time.Time{}, err + } + + if err := d.Dump(f); err != nil { + f.Close() + os.Remove(s.snapshotPath(tmpName)) + return nil, time.Time{}, err + } + + f.Close() + if err := os.Rename(s.snapshotPath(tmpName), s.snapshotPath(name)); err != nil { + return nil, time.Time{}, err + } + + if f, err = os.Open(s.snapshotPath(name)); err != nil { + return nil, time.Time{}, err + } + s.names = append(s.names, name) + + return &snapshot{f: f}, now, nil +} + +func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) { + s.Lock() + defer s.Unlock() + + if len(s.names) == 0 { + return nil, time.Time{}, nil + } + + name := s.names[len(s.names)-1] + t, _ := parseSnapshotName(name) + + f, err := os.Open(s.snapshotPath(name)) + if err != nil { + return nil, time.Time{}, err + } + + return &snapshot{f: f}, t, err +} diff --git a/server/snapshot_test.go b/server/snapshot_test.go new file mode 100644 index 0000000..d1e9230 --- /dev/null +++ b/server/snapshot_test.go @@ -0,0 +1,77 @@ +package server + +import ( + "github.com/siddontang/ledisdb/config" + "io" + "io/ioutil" + "os" + "path" + "testing" +) + +type testSnapshotDumper struct { +} + +func (d *testSnapshotDumper) Dump(w io.Writer) error { + w.Write([]byte("hello world")) + return nil +} + +func TestSnapshot(t *testing.T) { + cfg := config.NewConfigDefault() + cfg.Snapshot.MaxNum = 2 + cfg.Snapshot.Path = path.Join(os.TempDir(), "snapshot") + defer os.RemoveAll(cfg.Snapshot.Path) + + d := new(testSnapshotDumper) + + s, err := newSnapshotStore(cfg) + if err != nil { + t.Fatal(err) + } + + if f, _, err := s.Create(d); err != nil { + t.Fatal(err) + } else { + defer f.Close() + if b, _ := ioutil.ReadAll(f); string(b) != "hello world" { + t.Fatal("invalid read snapshot") + } + + if len(s.names) != 1 { + t.Fatal("must 1 snapshot") + } + } + + if f, _, err := s.Create(d); err != nil { + t.Fatal(err) + } else { + defer f.Close() + if b, _ := ioutil.ReadAll(f); string(b) != "hello world" { + t.Fatal("invalid read snapshot") + } + if len(s.names) != 2 { + t.Fatal("must 2 snapshot") + } + } + + if f, _, err := s.Create(d); err != nil { + t.Fatal(err) + } else { + defer f.Close() + if b, _ := ioutil.ReadAll(f); string(b) != "hello world" { + t.Fatal("invalid read snapshot") + } + + if len(s.names) != 2 { + t.Fatal("must 2 snapshot") + } + } + + fs, _ := ioutil.ReadDir(cfg.Snapshot.Path) + if len(fs) != 2 { + t.Fatal("must 2 snapshot") + } + + s.Close() +} diff --git a/store/boltdb/db.go b/store/boltdb/db.go index 15a0570..ac8cc03 100644 --- a/store/boltdb/db.go +++ b/store/boltdb/db.go @@ -98,7 +98,14 @@ func (db *DB) Delete(key []byte) error { return b.Delete(key) }) return err +} +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.Put(key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.Delete(key) } func (db *DB) NewIterator() driver.IIterator { @@ -152,6 +159,10 @@ func (db *DB) BatchPut(writes []driver.Write) error { return err } +func (db *DB) SyncBatchPut(writes []driver.Write) error { + return db.BatchPut(writes) +} + func (db *DB) Compact() error { return nil } diff --git a/store/boltdb/tx.go b/store/boltdb/tx.go index 058a5f7..8dba000 100644 --- a/store/boltdb/tx.go +++ b/store/boltdb/tx.go @@ -48,6 +48,10 @@ func (t *Tx) BatchPut(writes []driver.Write) error { return nil } +func (t *Tx) SyncBatchPut(writes []driver.Write) error { + return t.BatchPut(writes) +} + func (t *Tx) Rollback() error { return t.tx.Rollback() } diff --git a/store/driver/batch.go b/store/driver/batch.go index 6b79c21..5fc461f 100644 --- a/store/driver/batch.go +++ b/store/driver/batch.go @@ -2,6 +2,7 @@ package driver type BatchPuter interface { BatchPut([]Write) error + SyncBatchPut([]Write) error } type Write struct { @@ -29,6 +30,10 @@ func (w *WriteBatch) Commit() error { return w.batch.BatchPut(w.wb) } +func (w *WriteBatch) SyncCommit() error { + return w.batch.SyncBatchPut(w.wb) +} + func (w *WriteBatch) Rollback() error { w.wb = w.wb[0:0] return nil diff --git a/store/driver/driver.go b/store/driver/driver.go index 859b840..e4312ce 100644 --- a/store/driver/driver.go +++ b/store/driver/driver.go @@ -16,6 +16,9 @@ type IDB interface { Put(key []byte, value []byte) error Delete(key []byte) error + SyncPut(key []byte, value []byte) error + SyncDelete(key []byte) error + NewIterator() IIterator NewWriteBatch() IWriteBatch @@ -53,6 +56,7 @@ type IWriteBatch interface { Put(key []byte, value []byte) Delete(key []byte) Commit() error + SyncCommit() error Rollback() error } diff --git a/store/goleveldb/batch.go b/store/goleveldb/batch.go index 74902b2..85b78c6 100644 --- a/store/goleveldb/batch.go +++ b/store/goleveldb/batch.go @@ -21,6 +21,10 @@ func (w *WriteBatch) Commit() error { return w.db.db.Write(w.wbatch, nil) } +func (w *WriteBatch) SyncCommit() error { + return w.db.db.Write(w.wbatch, w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { w.wbatch.Reset() return nil diff --git a/store/goleveldb/db.go b/store/goleveldb/db.go index dad6a90..9924067 100644 --- a/store/goleveldb/db.go +++ b/store/goleveldb/db.go @@ -41,6 +41,8 @@ type DB struct { iteratorOpts *opt.ReadOptions + syncOpts *opt.WriteOptions + cache cache.Cache filter filter.Filter @@ -102,14 +104,15 @@ func (db *DB) initOpts() { db.iteratorOpts = &opt.ReadOptions{} db.iteratorOpts.DontFillCache = true + + db.syncOpts = &opt.WriteOptions{} + db.syncOpts.Sync = true } func newOptions(cfg *config.LevelDBConfig) *opt.Options { opts := &opt.Options{} opts.ErrorIfMissing = false - cfg.Adjust() - opts.BlockCache = cache.NewLRUCache(cfg.CacheSize) //we must use bloomfilter @@ -147,6 +150,14 @@ func (db *DB) Delete(key []byte) error { return db.db.Delete(key, nil) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.db.Put(key, value, db.syncOpts) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.db.Delete(key, db.syncOpts) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, diff --git a/store/hyperleveldb/batch.go b/store/hyperleveldb/batch.go index 149f0b4..8084335 100644 --- a/store/hyperleveldb/batch.go +++ b/store/hyperleveldb/batch.go @@ -46,6 +46,10 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } +func (w *WriteBatch) SyncCommit() error { + return w.commit(w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { C.leveldb_writebatch_clear(w.wbatch) return nil diff --git a/store/hyperleveldb/db.go b/store/hyperleveldb/db.go index 071dd38..90af5be 100644 --- a/store/hyperleveldb/db.go +++ b/store/hyperleveldb/db.go @@ -81,6 +81,8 @@ type DB struct { writeOpts *WriteOptions iteratorOpts *ReadOptions + syncOpts *WriteOptions + cache *Cache filter *FilterPolicy @@ -106,8 +108,6 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { opts.SetCreateIfMissing(true) - cfg.Adjust() - db.cache = NewLRUCache(cfg.CacheSize) opts.SetCache(db.cache) @@ -132,6 +132,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { db.readOpts = NewReadOptions() db.writeOpts = NewWriteOptions() + db.syncOpts = NewWriteOptions() + db.syncOpts.SetSync(true) + db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) } @@ -171,6 +174,14 @@ func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.put(db.syncOpts, key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.delete(db.syncOpts, key) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, diff --git a/store/leveldb/batch.go b/store/leveldb/batch.go index 08a7d46..caadc03 100644 --- a/store/leveldb/batch.go +++ b/store/leveldb/batch.go @@ -46,6 +46,10 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } +func (w *WriteBatch) SyncCommit() error { + return w.commit(w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { C.leveldb_writebatch_clear(w.wbatch) return nil diff --git a/store/leveldb/db.go b/store/leveldb/db.go index ab29928..cd1fb1e 100644 --- a/store/leveldb/db.go +++ b/store/leveldb/db.go @@ -81,6 +81,8 @@ type DB struct { writeOpts *WriteOptions iteratorOpts *ReadOptions + syncOpts *WriteOptions + cache *Cache filter *FilterPolicy @@ -106,8 +108,6 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { opts.SetCreateIfMissing(true) - cfg.Adjust() - db.cache = NewLRUCache(cfg.CacheSize) opts.SetCache(db.cache) @@ -132,6 +132,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { db.readOpts = NewReadOptions() db.writeOpts = NewWriteOptions() + db.syncOpts = NewWriteOptions() + db.syncOpts.SetSync(true) + db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) } @@ -171,6 +174,14 @@ func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.put(db.syncOpts, key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.delete(db.syncOpts, key) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, diff --git a/store/mdb/mdb.go b/store/mdb/mdb.go index ca79706..6bf26f6 100644 --- a/store/mdb/mdb.go +++ b/store/mdb/mdb.go @@ -122,6 +122,14 @@ func (db MDB) BatchPut(writes []driver.Write) error { return itr.err } +func (db MDB) SyncBatchPut(writes []driver.Write) error { + if err := db.BatchPut(writes); err != nil { + return err + } + + return db.env.Sync(1) +} + func (db MDB) Get(key []byte) ([]byte, error) { tx, err := db.env.BeginTxn(nil, mdb.RDONLY) if err != nil { @@ -148,6 +156,22 @@ func (db MDB) Delete(key []byte) error { return itr.Error() } +func (db MDB) SyncPut(key []byte, value []byte) error { + if err := db.Put(key, value); err != nil { + return err + } + + return db.env.Sync(1) +} + +func (db MDB) SyncDelete(key []byte) error { + if err := db.Delete(key); err != nil { + return err + } + + return db.env.Sync(1) +} + type MDBIterator struct { key []byte value []byte diff --git a/store/mdb/tx.go b/store/mdb/tx.go index e98a5f6..e2e3be0 100644 --- a/store/mdb/tx.go +++ b/store/mdb/tx.go @@ -74,7 +74,10 @@ func (t *Tx) BatchPut(writes []driver.Write) error { itr.setState() return itr.Close() +} +func (t *Tx) SyncBatchPut(writes []driver.Write) error { + return t.BatchPut(writes) } func (t *Tx) Rollback() error { diff --git a/store/rocksdb/batch.go b/store/rocksdb/batch.go index b69c383..017fc88 100644 --- a/store/rocksdb/batch.go +++ b/store/rocksdb/batch.go @@ -45,6 +45,10 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } +func (w *WriteBatch) SyncCommit() error { + return w.commit(w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { C.rocksdb_writebatch_clear(w.wbatch) return nil diff --git a/store/rocksdb/db.go b/store/rocksdb/db.go index f3fb406..35a1c7e 100644 --- a/store/rocksdb/db.go +++ b/store/rocksdb/db.go @@ -85,6 +85,8 @@ type DB struct { writeOpts *WriteOptions iteratorOpts *ReadOptions + syncOpts *WriteOptions + cache *Cache filter *FilterPolicy @@ -111,8 +113,6 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { opts.SetCreateIfMissing(true) - cfg.Adjust() - db.env = NewDefaultEnv() db.env.SetBackgroundThreads(runtime.NumCPU() * 2) db.env.SetHighPriorityBackgroundThreads(1) @@ -152,6 +152,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { db.readOpts = NewReadOptions() db.writeOpts = NewWriteOptions() + db.syncOpts = NewWriteOptions() + db.syncOpts.SetSync(true) + db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) } @@ -197,6 +200,14 @@ func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.put(db.syncOpts, key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.delete(db.syncOpts, key) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, diff --git a/store/store_test.go b/store/store_test.go index d2f2ca6..b488158 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -10,7 +10,7 @@ import ( ) func TestStore(t *testing.T) { - cfg := new(config.Config) + cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/testdb" cfg.LMDB.MapSize = 10 * 1024 * 1024