add usemmap config

This commit is contained in:
siddontang 2014-11-20 22:28:08 +08:00
parent 62440fef91
commit 98ddc8b47a
7 changed files with 51 additions and 62 deletions

View File

@ -79,6 +79,7 @@ type ReplicationConfig struct {
MaxLogFileNum int `toml:"max_log_file_num"` MaxLogFileNum int `toml:"max_log_file_num"`
SyncLog int `toml:"sync_log"` SyncLog int `toml:"sync_log"`
Compression bool `toml:"compression"` Compression bool `toml:"compression"`
UseMmap bool `toml:"use_mmap"`
} }
type SnapshotConfig struct { type SnapshotConfig struct {
@ -175,6 +176,7 @@ func NewConfigDefault() *Config {
cfg.Replication.Compression = true cfg.Replication.Compression = true
cfg.Replication.WaitMaxSlaveAcks = 2 cfg.Replication.WaitMaxSlaveAcks = 2
cfg.Replication.SyncLog = 0 cfg.Replication.SyncLog = 0
cfg.Replication.UseMmap = true
cfg.Snapshot.MaxNum = 1 cfg.Snapshot.MaxNum = 1
cfg.RocksDB.AllowOsBuffer = true cfg.RocksDB.AllowOsBuffer = true

View File

@ -134,6 +134,9 @@ max_log_file_size = 0
# for file store, if 0, use default 50 # for file store, if 0, use default 50
max_log_file_num = 0 max_log_file_num = 0
# for file store, use mmap for file read and write
use_mmap = true
# Sync log to disk if possible # Sync log to disk if possible
# 0: no sync # 0: no sync
# 1: sync every second # 1: sync every second

View File

@ -134,6 +134,9 @@ max_log_file_size = 0
# for file store, if 0, use default 50 # for file store, if 0, use default 50
max_log_file_num = 0 max_log_file_num = 0
# for file store, use mmap for file read and write
use_mmap = true
# Sync log to disk if possible # Sync log to disk if possible
# 0: no sync # 0: no sync
# 1: sync every second # 1: sync every second

View File

@ -13,7 +13,7 @@ import (
type writeFile interface { type writeFile interface {
Sync() error Sync() error
Write(b []byte) (n int, err error) Write(b []byte) (n int, err error)
Close(addMagic bool) error Close() error
ReadAt(buf []byte, offset int64) (int, error) ReadAt(buf []byte, offset int64) (int, error)
Truncate(size int64) error Truncate(size int64) error
SetOffset(o int64) SetOffset(o int64)
@ -50,19 +50,9 @@ func newRawWriteFile(name string, size int64) (writeFile, error) {
return m, nil return m, nil
} }
func (m *rawWriteFile) Close(addMagic bool) error { func (m *rawWriteFile) Close() error {
if addMagic { if err := m.f.Truncate(m.offset); err != nil {
if err := m.f.Truncate(m.offset + int64(len(magic))); err != nil { return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
}
if _, err := m.f.WriteAt(magic, m.offset); err != nil {
return fmt.Errorf("close write %s magic error %s", m.name, err.Error())
}
} else {
if err := m.f.Truncate(m.offset); err != nil {
return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
}
} }
if err := m.f.Close(); err != nil { if err := m.f.Close(); err != nil {
@ -77,7 +67,7 @@ func (m *rawWriteFile) Sync() error {
} }
func (m *rawWriteFile) Write(b []byte) (n int, err error) { func (m *rawWriteFile) Write(b []byte) (n int, err error) {
n, err = m.f.Write(b) n, err = m.f.WriteAt(b, m.offset)
if err != nil { if err != nil {
return return
} else if n != len(b) { } else if n != len(b) {
@ -210,23 +200,13 @@ func (m *mmapWriteFile) Sync() error {
return m.m.Flush() return m.m.Flush()
} }
func (m *mmapWriteFile) Close(addMagic bool) error { func (m *mmapWriteFile) Close() error {
if err := m.m.Unmap(); err != nil { if err := m.m.Unmap(); err != nil {
return fmt.Errorf("unmap %s error %s", m.name, err.Error()) return fmt.Errorf("unmap %s error %s", m.name, err.Error())
} }
if addMagic { if err := m.f.Truncate(m.offset); err != nil {
if err := m.f.Truncate(m.offset + int64(len(magic))); err != nil { return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
}
if _, err := m.f.WriteAt(magic, m.offset); err != nil {
return fmt.Errorf("close write %s magic error %s", m.name, err.Error())
}
} else {
if err := m.f.Truncate(m.offset); err != nil {
return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
}
} }
if err := m.f.Close(); err != nil { if err := m.f.Close(); err != nil {

View File

@ -90,7 +90,7 @@ func NewFileStore(base string, cfg *config.Config) (*FileStore, error) {
index = s.rs[len(s.rs)-1].index + 1 index = s.rs[len(s.rs)-1].index + 1
} }
s.w = newTableWriter(s.base, index, cfg.Replication.MaxLogFileSize) s.w = newTableWriter(s.base, index, cfg.Replication.MaxLogFileSize, cfg.Replication.UseMmap)
s.w.SetSyncType(cfg.Replication.SyncLog) s.w.SetSyncType(cfg.Replication.SyncLog)
go s.checkTableReaders() go s.checkTableReaders()
@ -244,7 +244,7 @@ func (s *FileStore) Clear() error {
return err return err
} }
s.w = newTableWriter(s.base, 1, s.cfg.Replication.MaxLogFileSize) s.w = newTableWriter(s.base, 1, s.cfg.Replication.MaxLogFileSize, s.cfg.Replication.UseMmap)
return nil return nil
} }
@ -335,7 +335,7 @@ func (s *FileStore) load() error {
var index int64 var index int64
for _, f := range fs { for _, f := range fs {
if _, err := fmt.Sscanf(f.Name(), "%08d.data", &index); err == nil { if _, err := fmt.Sscanf(f.Name(), "%08d.data", &index); err == nil {
if r, err = newTableReader(s.base, index); err != nil { if r, err = newTableReader(s.base, index, s.cfg.Replication.UseMmap); err != nil {
log.Error("load table %s err: %s", f.Name(), err.Error()) log.Error("load table %s err: %s", f.Name(), err.Error())
} else { } else {
s.rs = append(s.rs, r) s.rs = append(s.rs, r)

View File

@ -29,9 +29,6 @@ func fmtTableMetaName(base string, index int64) string {
return path.Join(base, fmt.Sprintf("%08d.meta", index)) return path.Join(base, fmt.Sprintf("%08d.meta", index))
} }
//todo config
var useMmap = true
type tableReader struct { type tableReader struct {
sync.Mutex sync.Mutex
@ -51,7 +48,7 @@ type tableReader struct {
useMmap bool useMmap bool
} }
func newTableReader(base string, index int64) (*tableReader, error) { func newTableReader(base string, index int64, useMmap bool) (*tableReader, error) {
if index <= 0 { if index <= 0 {
return nil, fmt.Errorf("invalid index %d", index) return nil, fmt.Errorf("invalid index %d", index)
} }
@ -59,7 +56,6 @@ func newTableReader(base string, index int64) (*tableReader, error) {
t.base = base t.base = base
t.index = index t.index = index
//todo, use config
t.useMmap = useMmap t.useMmap = useMmap
var err error var err error
@ -122,7 +118,8 @@ func (t *tableReader) getLogPos(index int) (uint32, error) {
func (t *tableReader) checkData() error { func (t *tableReader) checkData() error {
var err error var err error
if t.data, err = newReadFile(t.useMmap, fmtTableDataName(t.base, t.index)); err != nil { //check will use raw file mode
if t.data, err = newReadFile(false, fmtTableDataName(t.base, t.index)); err != nil {
return err return err
} }
@ -144,7 +141,8 @@ func (t *tableReader) checkData() error {
func (t *tableReader) checkMeta() error { func (t *tableReader) checkMeta() error {
var err error var err error
if t.meta, err = newReadFile(t.useMmap, fmtTableMetaName(t.base, t.index)); err != nil { //check will use raw file mode
if t.meta, err = newReadFile(false, fmtTableMetaName(t.base, t.index)); err != nil {
return err return err
} }
@ -205,10 +203,11 @@ func (t *tableReader) repair() error {
var data writeFile var data writeFile
var meta writeFile var meta writeFile
data, err = newWriteFile(t.useMmap, fmtTableDataName(t.base, t.index), 0) //repair will use raw file mode
data, err = newWriteFile(false, fmtTableDataName(t.base, t.index), 0)
data.SetOffset(int64(data.Size())) data.SetOffset(int64(data.Size()))
meta, err = newWriteFile(t.useMmap, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4)) meta, err = newWriteFile(false, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4))
var l Log var l Log
var pos int64 = 0 var pos int64 = 0
@ -254,13 +253,17 @@ func (t *tableReader) repair() error {
} }
var e error var e error
if err := meta.Close(false); err != nil { if err := meta.Close(); err != nil {
e = err e = err
} }
data.SetOffset(pos) data.SetOffset(pos)
if err = data.Close(true); err != nil { if _, err = data.Write(magic); err != nil {
log.Error("write magic error %s", err.Error())
}
if err = data.Close(); err != nil {
return err return err
} }
@ -348,7 +351,7 @@ type tableWriter struct {
useMmap bool useMmap bool
} }
func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { func newTableWriter(base string, index int64, maxLogSize int64, useMmap bool) *tableWriter {
if index <= 0 { if index <= 0 {
panic(fmt.Errorf("invalid index %d", index)) panic(fmt.Errorf("invalid index %d", index))
} }
@ -364,7 +367,6 @@ func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
t.posBuf = make([]byte, 4) t.posBuf = make([]byte, 4)
//todo, use config
t.useMmap = useMmap t.useMmap = useMmap
return t return t
@ -384,14 +386,18 @@ func (t *tableWriter) SetSyncType(tp int) {
func (t *tableWriter) close() { func (t *tableWriter) close() {
if t.meta != nil { if t.meta != nil {
if err := t.meta.Close(false); err != nil { if err := t.meta.Close(); err != nil {
log.Fatal("close log meta error %s", err.Error()) log.Fatal("close log meta error %s", err.Error())
} }
t.meta = nil t.meta = nil
} }
if t.data != nil { if t.data != nil {
if err := t.data.Close(true); err != nil { if _, err := t.data.Write(magic); err != nil {
log.Fatal("write magic error %s", err.Error())
}
if err := t.data.Close(); err != nil {
log.Fatal("close log data error %s", err.Error()) log.Fatal("close log data error %s", err.Error())
} }
t.data = nil t.data = nil
@ -435,8 +441,7 @@ func (t *tableWriter) Flush() (*tableReader, error) {
tr.first = t.first tr.first = t.first
tr.last = t.last tr.last = t.last
tr.lastTime = t.lastTime tr.lastTime = t.lastTime
//todo config tr.useMmap = t.useMmap
tr.useMmap = useMmap
t.close() t.close()

View File

@ -10,15 +10,11 @@ import (
) )
func TestFileTable(t *testing.T) { func TestFileTable(t *testing.T) {
useMmap = true testFileTable(t, true)
testFileTable(t) testFileTable(t, false)
useMmap = false
testFileTable(t)
useMmap = true
} }
func testFileTable(t *testing.T) { func testFileTable(t *testing.T, useMmap bool) {
log.SetLevel(log.LevelInfo) log.SetLevel(log.LevelInfo)
base, err := ioutil.TempDir("", "test_table") base, err := ioutil.TempDir("", "test_table")
@ -34,7 +30,7 @@ func testFileTable(t *testing.T) {
l.Compression = 0 l.Compression = 0
l.Data = make([]byte, 4096) l.Data = make([]byte, 4096)
w := newTableWriter(base, 1, 1024*1024) w := newTableWriter(base, 1, 1024*1024, useMmap)
defer w.Close() defer w.Close()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
@ -118,7 +114,7 @@ func testFileTable(t *testing.T) {
r.Close() r.Close()
if r, err = newTableReader(base, 1); err != nil { if r, err = newTableReader(base, 1, useMmap); err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer r.Close() defer r.Close()
@ -143,8 +139,8 @@ func testFileTable(t *testing.T) {
log.SetLevel(log.LevelFatal) log.SetLevel(log.LevelFatal)
testRepair(t, name, 1, s, 11) testRepair(t, name, 1, s, 11, useMmap)
testRepair(t, name, 1, s, 20) testRepair(t, name, 1, s, 20, useMmap)
if err := os.Truncate(name, s-21); err != nil { if err := os.Truncate(name, s-21); err != nil {
t.Fatal(err) t.Fatal(err)
@ -156,13 +152,13 @@ func testFileTable(t *testing.T) {
r.Close() r.Close()
} }
if r, err = newTableReader(base, 2); err != nil { if r, err = newTableReader(base, 2, useMmap); err != nil {
t.Fatal(err) t.Fatal(err)
} }
r.Close() r.Close()
} }
func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64) { func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64, useMmap bool) {
var r *tableReader var r *tableReader
var err error var err error
@ -170,7 +166,7 @@ func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64)
t.Fatal(err) t.Fatal(err)
} }
if r, err = newTableReader(path.Dir(name), index); err != nil { if r, err = newTableReader(path.Dir(name), index, useMmap); err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer r.Close() defer r.Close()