diff --git a/config/config.go b/config/config.go index 44887fa..92ea190 100644 --- a/config/config.go +++ b/config/config.go @@ -79,6 +79,7 @@ type ReplicationConfig struct { MaxLogFileNum int `toml:"max_log_file_num"` SyncLog int `toml:"sync_log"` Compression bool `toml:"compression"` + UseMmap bool `toml:"use_mmap"` } type SnapshotConfig struct { @@ -175,6 +176,7 @@ func NewConfigDefault() *Config { cfg.Replication.Compression = true cfg.Replication.WaitMaxSlaveAcks = 2 cfg.Replication.SyncLog = 0 + cfg.Replication.UseMmap = true cfg.Snapshot.MaxNum = 1 cfg.RocksDB.AllowOsBuffer = true diff --git a/config/config.toml b/config/config.toml index 4177699..44b38d6 100644 --- a/config/config.toml +++ b/config/config.toml @@ -134,6 +134,9 @@ max_log_file_size = 0 # for file store, if 0, use default 50 max_log_file_num = 0 +# for file store, use mmap for file read and write +use_mmap = true + # Sync log to disk if possible # 0: no sync # 1: sync every second diff --git a/etc/ledis.conf b/etc/ledis.conf index 4177699..44b38d6 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -134,6 +134,9 @@ max_log_file_size = 0 # for file store, if 0, use default 50 max_log_file_num = 0 +# for file store, use mmap for file read and write +use_mmap = true + # Sync log to disk if possible # 0: no sync # 1: sync every second diff --git a/rpl/file_io.go b/rpl/file_io.go index 2e0023c..5518b38 100644 --- a/rpl/file_io.go +++ b/rpl/file_io.go @@ -13,7 +13,7 @@ import ( type writeFile interface { Sync() error Write(b []byte) (n int, err error) - Close(addMagic bool) error + Close() error ReadAt(buf []byte, offset int64) (int, error) Truncate(size int64) error SetOffset(o int64) @@ -50,19 +50,9 @@ func newRawWriteFile(name string, size int64) (writeFile, error) { return m, nil } -func (m *rawWriteFile) Close(addMagic bool) error { - if addMagic { - if err := m.f.Truncate(m.offset + int64(len(magic))); err != nil { - return fmt.Errorf("close truncate %s error %s", m.name, err.Error()) - } - - if _, err := m.f.WriteAt(magic, m.offset); err != nil { - return fmt.Errorf("close write %s magic error %s", m.name, err.Error()) - } - } else { - if err := m.f.Truncate(m.offset); err != nil { - return fmt.Errorf("close truncate %s error %s", m.name, err.Error()) - } +func (m *rawWriteFile) Close() error { + if err := m.f.Truncate(m.offset); err != nil { + return fmt.Errorf("close truncate %s error %s", m.name, err.Error()) } if err := m.f.Close(); err != nil { @@ -77,7 +67,7 @@ func (m *rawWriteFile) Sync() error { } func (m *rawWriteFile) Write(b []byte) (n int, err error) { - n, err = m.f.Write(b) + n, err = m.f.WriteAt(b, m.offset) if err != nil { return } else if n != len(b) { @@ -210,23 +200,13 @@ func (m *mmapWriteFile) Sync() error { return m.m.Flush() } -func (m *mmapWriteFile) Close(addMagic bool) error { +func (m *mmapWriteFile) Close() error { if err := m.m.Unmap(); err != nil { return fmt.Errorf("unmap %s error %s", m.name, err.Error()) } - if addMagic { - if err := m.f.Truncate(m.offset + int64(len(magic))); err != nil { - return fmt.Errorf("close truncate %s error %s", m.name, err.Error()) - } - - if _, err := m.f.WriteAt(magic, m.offset); err != nil { - return fmt.Errorf("close write %s magic error %s", m.name, err.Error()) - } - } else { - if err := m.f.Truncate(m.offset); err != nil { - return fmt.Errorf("close truncate %s error %s", m.name, err.Error()) - } + if err := m.f.Truncate(m.offset); err != nil { + return fmt.Errorf("close truncate %s error %s", m.name, err.Error()) } if err := m.f.Close(); err != nil { diff --git a/rpl/file_store.go b/rpl/file_store.go index 5cebdfa..9c00b5e 100644 --- a/rpl/file_store.go +++ b/rpl/file_store.go @@ -90,7 +90,7 @@ func NewFileStore(base string, cfg *config.Config) (*FileStore, error) { index = s.rs[len(s.rs)-1].index + 1 } - s.w = newTableWriter(s.base, index, cfg.Replication.MaxLogFileSize) + s.w = newTableWriter(s.base, index, cfg.Replication.MaxLogFileSize, cfg.Replication.UseMmap) s.w.SetSyncType(cfg.Replication.SyncLog) go s.checkTableReaders() @@ -244,7 +244,7 @@ func (s *FileStore) Clear() error { return err } - s.w = newTableWriter(s.base, 1, s.cfg.Replication.MaxLogFileSize) + s.w = newTableWriter(s.base, 1, s.cfg.Replication.MaxLogFileSize, s.cfg.Replication.UseMmap) return nil } @@ -335,7 +335,7 @@ func (s *FileStore) load() error { var index int64 for _, f := range fs { if _, err := fmt.Sscanf(f.Name(), "%08d.data", &index); err == nil { - if r, err = newTableReader(s.base, index); err != nil { + if r, err = newTableReader(s.base, index, s.cfg.Replication.UseMmap); err != nil { log.Error("load table %s err: %s", f.Name(), err.Error()) } else { s.rs = append(s.rs, r) diff --git a/rpl/file_table.go b/rpl/file_table.go index de5f85d..b4dcbfd 100644 --- a/rpl/file_table.go +++ b/rpl/file_table.go @@ -29,9 +29,6 @@ func fmtTableMetaName(base string, index int64) string { return path.Join(base, fmt.Sprintf("%08d.meta", index)) } -//todo config -var useMmap = true - type tableReader struct { sync.Mutex @@ -51,7 +48,7 @@ type tableReader struct { useMmap bool } -func newTableReader(base string, index int64) (*tableReader, error) { +func newTableReader(base string, index int64, useMmap bool) (*tableReader, error) { if index <= 0 { return nil, fmt.Errorf("invalid index %d", index) } @@ -59,7 +56,6 @@ func newTableReader(base string, index int64) (*tableReader, error) { t.base = base t.index = index - //todo, use config t.useMmap = useMmap var err error @@ -122,7 +118,8 @@ func (t *tableReader) getLogPos(index int) (uint32, error) { func (t *tableReader) checkData() error { var err error - if t.data, err = newReadFile(t.useMmap, fmtTableDataName(t.base, t.index)); err != nil { + //check will use raw file mode + if t.data, err = newReadFile(false, fmtTableDataName(t.base, t.index)); err != nil { return err } @@ -144,7 +141,8 @@ func (t *tableReader) checkData() error { func (t *tableReader) checkMeta() error { var err error - if t.meta, err = newReadFile(t.useMmap, fmtTableMetaName(t.base, t.index)); err != nil { + //check will use raw file mode + if t.meta, err = newReadFile(false, fmtTableMetaName(t.base, t.index)); err != nil { return err } @@ -205,10 +203,11 @@ func (t *tableReader) repair() error { var data writeFile var meta writeFile - data, err = newWriteFile(t.useMmap, fmtTableDataName(t.base, t.index), 0) + //repair will use raw file mode + data, err = newWriteFile(false, fmtTableDataName(t.base, t.index), 0) data.SetOffset(int64(data.Size())) - meta, err = newWriteFile(t.useMmap, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4)) + meta, err = newWriteFile(false, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4)) var l Log var pos int64 = 0 @@ -254,13 +253,17 @@ func (t *tableReader) repair() error { } var e error - if err := meta.Close(false); err != nil { + if err := meta.Close(); err != nil { e = err } data.SetOffset(pos) - if err = data.Close(true); err != nil { + if _, err = data.Write(magic); err != nil { + log.Error("write magic error %s", err.Error()) + } + + if err = data.Close(); err != nil { return err } @@ -348,7 +351,7 @@ type tableWriter struct { useMmap bool } -func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { +func newTableWriter(base string, index int64, maxLogSize int64, useMmap bool) *tableWriter { if index <= 0 { panic(fmt.Errorf("invalid index %d", index)) } @@ -364,7 +367,6 @@ func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { t.posBuf = make([]byte, 4) - //todo, use config t.useMmap = useMmap return t @@ -384,14 +386,18 @@ func (t *tableWriter) SetSyncType(tp int) { func (t *tableWriter) close() { if t.meta != nil { - if err := t.meta.Close(false); err != nil { + if err := t.meta.Close(); err != nil { log.Fatal("close log meta error %s", err.Error()) } t.meta = nil } if t.data != nil { - if err := t.data.Close(true); err != nil { + if _, err := t.data.Write(magic); err != nil { + log.Fatal("write magic error %s", err.Error()) + } + + if err := t.data.Close(); err != nil { log.Fatal("close log data error %s", err.Error()) } t.data = nil @@ -435,8 +441,7 @@ func (t *tableWriter) Flush() (*tableReader, error) { tr.first = t.first tr.last = t.last tr.lastTime = t.lastTime - //todo config - tr.useMmap = useMmap + tr.useMmap = t.useMmap t.close() diff --git a/rpl/file_table_test.go b/rpl/file_table_test.go index b5cfb4b..e020c8a 100644 --- a/rpl/file_table_test.go +++ b/rpl/file_table_test.go @@ -10,15 +10,11 @@ import ( ) func TestFileTable(t *testing.T) { - useMmap = true - testFileTable(t) - - useMmap = false - testFileTable(t) - useMmap = true + testFileTable(t, true) + testFileTable(t, false) } -func testFileTable(t *testing.T) { +func testFileTable(t *testing.T, useMmap bool) { log.SetLevel(log.LevelInfo) base, err := ioutil.TempDir("", "test_table") @@ -34,7 +30,7 @@ func testFileTable(t *testing.T) { l.Compression = 0 l.Data = make([]byte, 4096) - w := newTableWriter(base, 1, 1024*1024) + w := newTableWriter(base, 1, 1024*1024, useMmap) defer w.Close() for i := 0; i < 10; i++ { @@ -118,7 +114,7 @@ func testFileTable(t *testing.T) { r.Close() - if r, err = newTableReader(base, 1); err != nil { + if r, err = newTableReader(base, 1, useMmap); err != nil { t.Fatal(err) } defer r.Close() @@ -143,8 +139,8 @@ func testFileTable(t *testing.T) { log.SetLevel(log.LevelFatal) - testRepair(t, name, 1, s, 11) - testRepair(t, name, 1, s, 20) + testRepair(t, name, 1, s, 11, useMmap) + testRepair(t, name, 1, s, 20, useMmap) if err := os.Truncate(name, s-21); err != nil { t.Fatal(err) @@ -156,13 +152,13 @@ func testFileTable(t *testing.T) { r.Close() } - if r, err = newTableReader(base, 2); err != nil { + if r, err = newTableReader(base, 2, useMmap); err != nil { t.Fatal(err) } r.Close() } -func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64) { +func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64, useMmap bool) { var r *tableReader var err error @@ -170,7 +166,7 @@ func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64) t.Fatal(err) } - if r, err = newTableReader(path.Dir(name), index); err != nil { + if r, err = newTableReader(path.Dir(name), index, useMmap); err != nil { t.Fatal(err) } defer r.Close()