diff --git a/config/config.go b/config/config.go index f9fb6aa..44887fa 100644 --- a/config/config.go +++ b/config/config.go @@ -203,7 +203,7 @@ func (cfg *Config) adjust() { cfg.RocksDB.adjust() cfg.Replication.ExpiredLogDays = getDefault(7, cfg.Replication.ExpiredLogDays) - cfg.Replication.MaxLogFileNum = getDefault(10, cfg.Replication.MaxLogFileNum) + cfg.Replication.MaxLogFileNum = getDefault(50, cfg.Replication.MaxLogFileNum) cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize) cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize) cfg.TTLCheckInterval = getDefault(1, cfg.TTLCheckInterval) diff --git a/config/config.toml b/config/config.toml index 98da90e..4177699 100644 --- a/config/config.toml +++ b/config/config.toml @@ -128,11 +128,11 @@ store_name = "file" # Expire write ahead logs after the given days expired_log_days = 7 -# for file store, if 0, use default 1G, max is 4G +# for file store, if 0, use default 256MB, max is 1G max_log_file_size = 0 -# for file store, if 0, use default 10 -max_log_file_num = 10 +# for file store, if 0, use default 50 +max_log_file_num = 0 # Sync log to disk if possible # 0: no sync diff --git a/etc/ledis.conf b/etc/ledis.conf index 98da90e..4177699 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -128,11 +128,11 @@ store_name = "file" # Expire write ahead logs after the given days expired_log_days = 7 -# for file store, if 0, use default 1G, max is 4G +# for file store, if 0, use default 256MB, max is 1G max_log_file_size = 0 -# for file store, if 0, use default 10 -max_log_file_num = 10 +# for file store, if 0, use default 50 +max_log_file_num = 0 # Sync log to disk if possible # 0: no sync diff --git a/ledis/replication_test.go b/ledis/replication_test.go index fc5e210..287480b 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -42,6 +42,7 @@ func TestReplication(t *testing.T) { if err != nil { t.Fatal(err) } + defer master.Close() cfgS := config.NewConfigDefault() cfgS.DataDir = "/tmp/test_repl/slave" @@ -54,6 +55,7 @@ func TestReplication(t *testing.T) { if err != nil { t.Fatal(err) } + defer slave.Close() db, _ := master.Select(0) db.Set([]byte("a"), []byte("value")) diff --git a/ledis/tx_test.go b/ledis/tx_test.go index e21c0a8..26888b5 100644 --- a/ledis/tx_test.go +++ b/ledis/tx_test.go @@ -195,7 +195,7 @@ func testTx(t *testing.T, name string) { cfg.DBName = name cfg.LMDB.MapSize = 10 * 1024 * 1024 - cfg.UseReplication = true + //cfg.UseReplication = true os.RemoveAll(cfg.DataDir) diff --git a/rpl/file_io.go b/rpl/file_io.go new file mode 100644 index 0000000..2e0023c --- /dev/null +++ b/rpl/file_io.go @@ -0,0 +1,383 @@ +package rpl + +import ( + "fmt" + "github.com/edsrzf/mmap-go" + "github.com/siddontang/go/log" + "io" + "os" +) + +//like leveldb or rocksdb file interface, haha! + +type writeFile interface { + Sync() error + Write(b []byte) (n int, err error) + Close(addMagic bool) error + ReadAt(buf []byte, offset int64) (int, error) + Truncate(size int64) error + SetOffset(o int64) + Name() string + Size() int + Offset() int64 +} + +type readFile interface { + ReadAt(buf []byte, offset int64) (int, error) + Close() error + Size() int + Name() string +} + +type rawWriteFile struct { + writeFile + f *os.File + offset int64 + name string +} + +func newRawWriteFile(name string, size int64) (writeFile, error) { + m := new(rawWriteFile) + var err error + + m.name = name + + m.f, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return nil, err + } + + return m, nil +} + +func (m *rawWriteFile) Close(addMagic bool) error { + if addMagic { + if err := m.f.Truncate(m.offset + int64(len(magic))); err != nil { + return fmt.Errorf("close truncate %s error %s", m.name, err.Error()) + } + + if _, err := m.f.WriteAt(magic, m.offset); err != nil { + return fmt.Errorf("close write %s magic error %s", m.name, err.Error()) + } + } else { + if err := m.f.Truncate(m.offset); err != nil { + return fmt.Errorf("close truncate %s error %s", m.name, err.Error()) + } + } + + if err := m.f.Close(); err != nil { + return fmt.Errorf("close %s error %s", m.name, err.Error()) + } + + return nil +} + +func (m *rawWriteFile) Sync() error { + return m.f.Sync() +} + +func (m *rawWriteFile) Write(b []byte) (n int, err error) { + n, err = m.f.Write(b) + if err != nil { + return + } else if n != len(b) { + err = io.ErrShortWrite + return + } + + m.offset += int64(n) + return +} + +func (m *rawWriteFile) ReadAt(buf []byte, offset int64) (int, error) { + return m.f.ReadAt(buf, offset) +} + +func (m *rawWriteFile) Truncate(size int64) error { + var err error + if err = m.f.Truncate(size); err != nil { + return err + } + + if m.offset > size { + m.offset = size + } + return nil +} + +func (m *rawWriteFile) SetOffset(o int64) { + m.offset = o +} + +func (m *rawWriteFile) Offset() int64 { + return m.offset +} + +func (m *rawWriteFile) Name() string { + return m.name +} + +func (m *rawWriteFile) Size() int { + st, _ := m.f.Stat() + return int(st.Size()) +} + +type rawReadFile struct { + readFile + + f *os.File + name string +} + +func newRawReadFile(name string) (readFile, error) { + m := new(rawReadFile) + + var err error + m.f, err = os.Open(name) + m.name = name + + if err != nil { + return nil, err + } + + return m, err +} + +func (m *rawReadFile) Close() error { + return m.f.Close() +} + +func (m *rawReadFile) Size() int { + st, _ := m.f.Stat() + return int(st.Size()) +} + +func (m *rawReadFile) ReadAt(b []byte, offset int64) (int, error) { + return m.f.ReadAt(b, offset) +} + +func (m *rawReadFile) Name() string { + return m.name +} + +///////////////////////////////////////////////// + +type mmapWriteFile struct { + writeFile + + f *os.File + m mmap.MMap + name string + size int64 + offset int64 +} + +func newMmapWriteFile(name string, size int64) (writeFile, error) { + m := new(mmapWriteFile) + + m.name = name + + var err error + + m.f, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return nil, err + } + + if size == 0 { + st, _ := m.f.Stat() + size = st.Size() + } + + if err = m.f.Truncate(size); err != nil { + return nil, err + } + + if m.m, err = mmap.Map(m.f, mmap.RDWR, 0); err != nil { + return nil, err + } + + m.size = size + m.offset = 0 + return m, nil +} + +func (m *mmapWriteFile) Size() int { + return int(m.size) +} + +func (m *mmapWriteFile) Sync() error { + return m.m.Flush() +} + +func (m *mmapWriteFile) Close(addMagic bool) error { + if err := m.m.Unmap(); err != nil { + return fmt.Errorf("unmap %s error %s", m.name, err.Error()) + } + + if addMagic { + if err := m.f.Truncate(m.offset + int64(len(magic))); err != nil { + return fmt.Errorf("close truncate %s error %s", m.name, err.Error()) + } + + if _, err := m.f.WriteAt(magic, m.offset); err != nil { + return fmt.Errorf("close write %s magic error %s", m.name, err.Error()) + } + } else { + if err := m.f.Truncate(m.offset); err != nil { + return fmt.Errorf("close truncate %s error %s", m.name, err.Error()) + } + } + + if err := m.f.Close(); err != nil { + return fmt.Errorf("close %s error %s", m.name, err.Error()) + } + + return nil +} + +func (m *mmapWriteFile) Write(b []byte) (n int, err error) { + extra := int64(len(b)) - (m.size - m.offset) + if extra > 0 { + newSize := m.size + extra + m.size/10 + println("need truncate ???", newSize, m.size, len(b)) + if err = m.Truncate(newSize); err != nil { + return + } + m.size = newSize + } + + n = copy(m.m[m.offset:], b) + if n != len(b) { + return 0, io.ErrShortWrite + } + + m.offset += int64(len(b)) + return len(b), nil +} + +func (m *mmapWriteFile) ReadAt(buf []byte, offset int64) (int, error) { + if offset > m.offset { + return 0, fmt.Errorf("invalid offset %d", offset) + } + + n := copy(buf, m.m[offset:m.offset]) + if n != len(buf) { + return n, io.ErrUnexpectedEOF + } + + return n, nil +} + +func (m *mmapWriteFile) Truncate(size int64) error { + var err error + if err = m.m.Unmap(); err != nil { + return err + } + + if err = m.f.Truncate(size); err != nil { + return err + } + + if m.m, err = mmap.Map(m.f, mmap.RDWR, 0); err != nil { + return err + } + + m.size = size + if m.offset > m.size { + m.offset = m.size + } + return nil +} + +func (m *mmapWriteFile) SetOffset(o int64) { + m.offset = o +} + +func (m *mmapWriteFile) Offset() int64 { + return m.offset +} + +func (m *mmapWriteFile) Name() string { + return m.name +} + +type mmapReadFile struct { + readFile + + f *os.File + m mmap.MMap + name string +} + +func newMmapReadFile(name string) (readFile, error) { + m := new(mmapReadFile) + + m.name = name + + var err error + m.f, err = os.Open(name) + if err != nil { + return nil, err + } + + m.m, err = mmap.Map(m.f, mmap.RDONLY, 0) + return m, err +} + +func (m *mmapReadFile) ReadAt(buf []byte, offset int64) (int, error) { + if int64(offset) > int64(len(m.m)) { + return 0, fmt.Errorf("invalid offset %d", offset) + } + + n := copy(buf, m.m[offset:]) + if n != len(buf) { + return n, io.ErrUnexpectedEOF + } + + return n, nil +} + +func (m *mmapReadFile) Close() error { + if m.m != nil { + if err := m.m.Unmap(); err != nil { + log.Error("unmap %s error %s", m.name, err.Error()) + } + m.m = nil + } + + if m.f != nil { + if err := m.f.Close(); err != nil { + log.Error("close %s error %s", m.name, err.Error()) + } + m.f = nil + } + + return nil +} + +func (m *mmapReadFile) Size() int { + return len(m.m) +} + +func (m *mmapReadFile) Name() string { + return m.name +} + +///////////////////////////////////// + +func newWriteFile(useMmap bool, name string, size int64) (writeFile, error) { + if useMmap { + return newMmapWriteFile(name, size) + } else { + return newRawWriteFile(name, size) + } +} + +func newReadFile(useMmap bool, name string) (readFile, error) { + if useMmap { + return newMmapReadFile(name) + } else { + return newRawReadFile(name) + } +} diff --git a/rpl/file_store.go b/rpl/file_store.go index 13d86c8..5cebdfa 100644 --- a/rpl/file_store.go +++ b/rpl/file_store.go @@ -13,33 +13,30 @@ import ( ) const ( - defaultMaxLogFileSize = int64(1024 * 1024 * 1024) + defaultMaxLogFileSize = int64(256 * 1024 * 1024) - //why 4G, we can use uint32 as offset, reduce memory useage - maxLogFileSize = int64(uint32(4*1024*1024*1024 - 1)) + maxLogFileSize = int64(1024 * 1024 * 1024) - maxLogNumInFile = uint64(10000000) + defaultLogNumInFile = int64(1024 * 1024) ) /* File Store: - 00000001.ldb - 00000002.ldb + 00000001.data + 00000001.meta + 00000002.data + 00000002.meta - log: log1 data | log2 data | split data | log1 offset | log 2 offset | offset start pos | offset length | magic data + data: log1 data | log2 data | magic data - log id can not be 0, we use here for split data if data has no magic data, it means that we don't close replication gracefully. so we must repair the log data log data: id (bigendian uint64), create time (bigendian uint32), compression (byte), data len(bigendian uint32), data split data = log0 data + [padding 0] -> file % pagesize() == 0 - log0: id 0, create time 0, compression 0, data len 7, data "ledisdb" + meta: log1 offset | log2 offset log offset: bigendian uint32 | bigendian uint32 - offset start pos: bigendian uint64 - offset length: bigendian uint32 - //sha1 of github.com/siddontang/ledisdb 20 bytes magic data = "\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17" @@ -270,6 +267,7 @@ func (s *FileStore) Close() error { for i := range s.rs { s.rs[i].Close() } + s.rs = tableReaders{} s.rm.Unlock() @@ -312,11 +310,16 @@ func (s *FileStore) checkTableReaders() { func (s *FileStore) purgeTableReaders(purges []*tableReader) { for _, r := range purges { - name := r.name + dataName := fmtTableDataName(r.base, r.index) + metaName := fmtTableMetaName(r.base, r.index) r.Close() - if err := os.Remove(name); err != nil { - log.Error("purge table %s err: %s", name, err.Error()) + if err := os.Remove(dataName); err != nil { + log.Error("purge table data %s err: %s", dataName, err.Error()) } + if err := os.Remove(metaName); err != nil { + log.Error("purge table meta %s err: %s", metaName, err.Error()) + } + } } @@ -331,7 +334,7 @@ func (s *FileStore) load() error { var r *tableReader var index int64 for _, f := range fs { - if _, err := fmt.Sscanf(f.Name(), "%08d.ldb", &index); err == nil { + if _, err := fmt.Sscanf(f.Name(), "%08d.data", &index); err == nil { if r, err = newTableReader(s.base, index); err != nil { log.Error("load table %s err: %s", f.Name(), err.Error()) } else { @@ -391,16 +394,16 @@ func (ts tableReaders) check() error { index := ts[0].index if first == 0 || first > last { - return fmt.Errorf("invalid log in table %s", ts[0].name) + return fmt.Errorf("invalid log in table %s", ts[0]) } for i := 1; i < len(ts); i++ { if ts[i].first <= last { - return fmt.Errorf("invalid first log id %d in table %s", ts[i].first, ts[i].name) + return fmt.Errorf("invalid first log id %d in table %s", ts[i].first, ts[i]) } if ts[i].index <= index { - return fmt.Errorf("invalid index %d in table %s", ts[i].index, ts[i].name) + return fmt.Errorf("invalid index %d in table %s", ts[i].index, ts[i]) } first = ts[i].first diff --git a/rpl/file_table.go b/rpl/file_table.go index 7b37cd8..de5f85d 100644 --- a/rpl/file_table.go +++ b/rpl/file_table.go @@ -1,64 +1,54 @@ package rpl import ( - "bufio" "bytes" "encoding/binary" "errors" "fmt" - "github.com/edsrzf/mmap-go" "github.com/siddontang/go/log" - "github.com/siddontang/go/num" "github.com/siddontang/go/sync2" "io" - "io/ioutil" - "os" "path" - "reflect" "sync" "time" ) var ( magic = []byte("\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17") - log0 = Log{0, 1, 1, []byte("ledisdb")} - log0Data = []byte{} errTableNeedFlush = errors.New("write table need flush") errNilHandler = errors.New("nil write handler") - pageSize = int64(4096) ) -func init() { - log0Data, _ = log0.Marshal() - pageSize = int64(os.Getpagesize()) -} - const tableReaderKeepaliveInterval int64 = 30 -func fmtTableName(index int64) string { - return fmt.Sprintf("%08d.ldb", index) +func fmtTableDataName(base string, index int64) string { + return path.Join(base, fmt.Sprintf("%08d.data", index)) } +func fmtTableMetaName(base string, index int64) string { + return path.Join(base, fmt.Sprintf("%08d.meta", index)) +} + +//todo config +var useMmap = true + type tableReader struct { sync.Mutex - name string + base string index int64 - f *os.File - m mmap.MMap - - pf *os.File + data readFile + meta readFile first uint64 last uint64 lastTime uint32 - offsetStartPos int64 - offsetLen uint32 - lastReadTime sync2.AtomicInt64 + + useMmap bool } func newTableReader(base string, index int64) (*tableReader, error) { @@ -66,16 +56,19 @@ func newTableReader(base string, index int64) (*tableReader, error) { return nil, fmt.Errorf("invalid index %d", index) } t := new(tableReader) - t.name = path.Join(base, fmtTableName(index)) + t.base = base t.index = index + //todo, use config + t.useMmap = useMmap + var err error if err = t.check(); err != nil { - log.Error("check %s error: %s, try to repair", t.name, err.Error()) + log.Error("check %d error: %s, try to repair", t.index, err.Error()) if err = t.repair(); err != nil { - log.Error("repair %s error: %s", t.name, err.Error()) + log.Error("repair %d error: %s", t.index, err.Error()) return nil, err } } @@ -85,22 +78,27 @@ func newTableReader(base string, index int64) (*tableReader, error) { return t, nil } +func (t *tableReader) String() string { + return fmt.Sprintf("%d", t.index) +} + func (t *tableReader) Close() { t.Lock() - defer t.Unlock() t.close() + + t.Unlock() } func (t *tableReader) close() { - if t.m != nil { - t.m.Unmap() - t.m = nil + if t.data != nil { + t.data.Close() + t.data = nil } - if t.f != nil { - t.f.Close() - t.f = nil + if t.meta != nil { + t.meta.Close() + t.meta = nil } } @@ -114,96 +112,78 @@ func (t *tableReader) Keepalived() bool { } func (t *tableReader) getLogPos(index int) (uint32, error) { - // if _, err := t.pf.Seek(t.offsetStartPos+int64(index*4), os.SEEK_SET); err != nil { - // return 0, err - // } + var buf [4]byte + if _, err := t.meta.ReadAt(buf[0:4], int64(index)*4); err != nil { + return 0, err + } - // var pos uint32 - // if err := binary.Read(t.pf, binary.BigEndian, &pos); err != nil { - // return 0, err - // } - // return pos, nil + return binary.BigEndian.Uint32(buf[0:4]), nil +} - return binary.BigEndian.Uint32(t.m[index*4:]), nil +func (t *tableReader) checkData() error { + var err error + if t.data, err = newReadFile(t.useMmap, fmtTableDataName(t.base, t.index)); err != nil { + return err + } + + if t.data.Size() < len(magic) { + return fmt.Errorf("data file %s size %d too short", t.data.Name(), t.data.Size()) + } + + buf := make([]byte, len(magic)) + if _, err := t.data.ReadAt(buf, int64(t.data.Size()-len(magic))); err != nil { + return err + } + + if !bytes.Equal(magic, buf) { + return fmt.Errorf("data file %s invalid magic data %q", t.data.Name(), buf) + } + + return nil +} + +func (t *tableReader) checkMeta() error { + var err error + if t.meta, err = newReadFile(t.useMmap, fmtTableMetaName(t.base, t.index)); err != nil { + return err + } + + if t.meta.Size()%4 != 0 || t.meta.Size() == 0 { + return fmt.Errorf("meta file %s invalid offset len %d, must 4 multiple and not 0", t.meta.Name(), t.meta.Size()) + } + + return nil } func (t *tableReader) check() error { var err error - if t.f, err = os.Open(t.name); err != nil { + if err := t.checkMeta(); err != nil { return err } - st, _ := t.f.Stat() - - if st.Size() < 32 { - return fmt.Errorf("file size %d too short", st.Size()) - } - - var pos int64 - if pos, err = t.f.Seek(-32, os.SEEK_END); err != nil { - return err - } - - if err = binary.Read(t.f, binary.BigEndian, &t.offsetStartPos); err != nil { - return err - } else if t.offsetStartPos >= st.Size() { - return fmt.Errorf("invalid offset start pos %d, file size %d", t.offsetStartPos, st.Size()) - } else if t.offsetStartPos%pageSize != 0 { - return fmt.Errorf("invalid offset start pos %d, must page size %d multi", t.offsetStartPos, pageSize) - } - - if err = binary.Read(t.f, binary.BigEndian, &t.offsetLen); err != nil { - return err - } else if int64(t.offsetLen) >= st.Size() || t.offsetLen == 0 { - return fmt.Errorf("invalid offset len %d, file size %d", t.offsetLen, st.Size()) - } else if t.offsetLen%4 != 0 { - return fmt.Errorf("invalid offset len %d, must 4 multiple", t.offsetLen) - } - - if t.offsetStartPos+int64(t.offsetLen) != pos { - return fmt.Errorf("invalid offset %d %d", t.offsetStartPos, t.offsetLen) - } - - b := make([]byte, 20) - if _, err = t.f.Read(b); err != nil { - return err - } else if !bytes.Equal(b, magic) { - return fmt.Errorf("invalid magic data %q", b) - } - - if t.m, err = mmap.MapRegion(t.f, int(t.offsetLen), mmap.RDONLY, 0, t.offsetStartPos); err != nil { + if err := t.checkData(); err != nil { return err } firstLogPos, _ := t.getLogPos(0) - lastLogPos, _ := t.getLogPos(int(t.offsetLen/4 - 1)) + lastLogPos, _ := t.getLogPos(t.meta.Size()/4 - 1) if firstLogPos != 0 { return fmt.Errorf("invalid first log pos %d, must 0", firstLogPos) - } else if int64(lastLogPos) > t.offsetStartPos { - return fmt.Errorf("invalid last log pos %d", lastLogPos) } var l Log - if _, err = t.decodeLogHead(&l, int64(firstLogPos)); err != nil { + if _, err = t.decodeLogHead(&l, t.data, int64(firstLogPos)); err != nil { return fmt.Errorf("decode first log err %s", err.Error()) } t.first = l.ID var n int64 - if n, err = t.decodeLogHead(&l, int64(lastLogPos)); err != nil { + if n, err = t.decodeLogHead(&l, t.data, int64(lastLogPos)); err != nil { return fmt.Errorf("decode last log err %s", err.Error()) - } else { - var l0 Log - if _, err := t.f.Seek(n, os.SEEK_SET); err != nil { - return fmt.Errorf("seek logo err %s", err.Error()) - } else if err = l0.Decode(t.f); err != nil { - println(lastLogPos, n, l0.ID, l0.CreateTime, l0.Compression) - return fmt.Errorf("decode log0 err %s", err.Error()) - } else if !reflect.DeepEqual(l0, log0) { - return fmt.Errorf("invalid log0 %#v != %#v", l0, log0) - } + } else if n+int64(len(magic)) != int64(t.data.Size()) { + return fmt.Errorf("extra log data at offset %d", n) } t.last = l.ID @@ -211,8 +191,8 @@ func (t *tableReader) check() error { if t.first > t.last { return fmt.Errorf("invalid log table first %d > last %d", t.first, t.last) - } else if (t.last - t.first + 1) != uint64(t.offsetLen/4) { - return fmt.Errorf("invalid log table, first %d, last %d, and log num %d", t.first, t.last, t.offsetLen/4) + } else if (t.last - t.first + 1) != uint64(t.meta.Size()/4) { + return fmt.Errorf("invalid log table, first %d, last %d, and log num %d", t.first, t.last, t.meta.Size()/4) } return nil @@ -222,86 +202,73 @@ func (t *tableReader) repair() error { t.close() var err error - if t.f, err = os.Open(t.name); err != nil { - return err - } + var data writeFile + var meta writeFile - defer t.close() + data, err = newWriteFile(t.useMmap, fmtTableDataName(t.base, t.index), 0) + data.SetOffset(int64(data.Size())) - st, _ := t.f.Stat() - size := st.Size() - - if size == 0 { - return fmt.Errorf("empty file, can not repaired") - } - - tw := newTableWriter(path.Dir(t.name), t.index, maxLogFileSize) - - tmpName := tw.name + ".tmp" - tw.name = tmpName - os.Remove(tmpName) - - defer func() { - tw.Close() - os.Remove(tmpName) - }() + meta, err = newWriteFile(t.useMmap, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4)) var l Log + var pos int64 = 0 + var nextPos int64 = 0 + b := make([]byte, 4) + + t.first = 0 + t.last = 0 for { - lastPos, _ := t.f.Seek(0, os.SEEK_CUR) - if lastPos == size { - //no data anymore, we can not read log0 - //we may meet the log missing risk but have no way - log.Error("no more data, maybe missing some logs, use your own risk!!!") + nextPos, err = t.decodeLogHead(&l, data, pos) + if err != nil { + //if error, we may lost all logs from pos + log.Error("%s may lost logs from %d", data.Name(), pos) break } - if err := l.Decode(t.f); err != nil { - return err - } - if l.ID == 0 { + log.Error("%s may lost logs from %d, invalid log 0", data.Name(), pos) break } + if t.first == 0 { + t.first = l.ID + } + + if t.last == 0 { + t.last = l.ID + } else if l.ID <= t.last { + log.Error("%s may lost logs from %d, invalid logid %d", t.data.Name(), pos, l.ID) + break + } + + t.last = l.ID t.lastTime = l.CreateTime - if err := tw.StoreLog(&l); err != nil { - return err - } + binary.BigEndian.PutUint32(b, uint32(pos)) + meta.Write(b) + + pos = nextPos + + t.lastTime = l.CreateTime } - t.close() + var e error + if err := meta.Close(false); err != nil { + e = err + } - var tr *tableReader - if tr, err = tw.Flush(); err != nil { + data.SetOffset(pos) + + if err = data.Close(true); err != nil { return err } - t.first = tr.first - t.last = tr.last - t.offsetStartPos = tr.offsetStartPos - t.offsetLen = tr.offsetLen - - defer tr.Close() - - os.Remove(t.name) - - if err := os.Rename(tmpName, t.name); err != nil { - return err - } - - return nil + return e } -func (t *tableReader) decodeLogHead(l *Log, pos int64) (int64, error) { - _, err := t.f.Seek(int64(pos), os.SEEK_SET) - if err != nil { - return 0, err - } - - dataLen, err := l.DecodeHead(t.f) +func (t *tableReader) decodeLogHead(l *Log, r io.ReaderAt, pos int64) (int64, error) { + dataLen, err := l.DecodeHeadAt(r, pos) if err != nil { return 0, err } @@ -317,23 +284,20 @@ func (t *tableReader) GetLog(id uint64, l *Log) error { t.lastReadTime.Set(time.Now().Unix()) t.Lock() - defer t.Unlock() if err := t.openTable(); err != nil { t.close() + t.Unlock() return err } + t.Unlock() pos, err := t.getLogPos(int(id - t.first)) if err != nil { return err } - if _, err := t.f.Seek(int64(pos), os.SEEK_SET); err != nil { - return err - } - - if err := l.Decode(t.f); err != nil { + if err := l.DecodeAt(t.data, int64(pos)); err != nil { return err } else if l.ID != id { return fmt.Errorf("invalid log id %d != %d", l.ID, id) @@ -344,16 +308,17 @@ func (t *tableReader) GetLog(id uint64, l *Log) error { func (t *tableReader) openTable() error { var err error - if t.f == nil { - if t.f, err = os.Open(t.name); err != nil { + if t.data == nil { + if t.data, err = newReadFile(t.useMmap, fmtTableDataName(t.base, t.index)); err != nil { return err } } - if t.m == nil { - if t.m, err = mmap.MapRegion(t.f, int(t.offsetLen), mmap.RDONLY, 0, t.offsetStartPos); err != nil { + if t.meta == nil { + if t.meta, err = newReadFile(t.useMmap, fmtTableMetaName(t.base, t.index)); err != nil { return err } + } return nil @@ -362,31 +327,25 @@ func (t *tableReader) openTable() error { type tableWriter struct { sync.RWMutex - wf *os.File - rf *os.File - - wb *bufio.Writer - - rm sync.Mutex + data writeFile + meta writeFile base string - name string index int64 - first uint64 - last uint64 - - offsetPos int64 - offsetBuf []byte + first uint64 + last uint64 + lastTime uint32 maxLogSize int64 closed bool syncType int - lastTime uint32 - // cache *logLRUCache + posBuf []byte + + useMmap bool } func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { @@ -397,23 +356,24 @@ func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { t := new(tableWriter) t.base = base - t.name = path.Join(base, fmtTableName(index)) t.index = index - t.offsetPos = 0 t.maxLogSize = maxLogSize - //maybe config later? - t.wb = bufio.NewWriterSize(ioutil.Discard, 4096) - t.closed = false - //maybe use config later - // t.cache = newLogLRUCache(1024*1024, 1000) + t.posBuf = make([]byte, 4) + + //todo, use config + t.useMmap = useMmap return t } +func (t *tableWriter) String() string { + return fmt.Sprintf("%d", t.index) +} + func (t *tableWriter) SetMaxLogSize(s int64) { t.maxLogSize = s } @@ -423,26 +383,27 @@ func (t *tableWriter) SetSyncType(tp int) { } func (t *tableWriter) close() { - if t.rf != nil { - t.rf.Close() - t.rf = nil + if t.meta != nil { + if err := t.meta.Close(false); err != nil { + log.Fatal("close log meta error %s", err.Error()) + } + t.meta = nil } - if t.wf != nil { - t.wf.Close() - t.wf = nil + if t.data != nil { + if err := t.data.Close(true); err != nil { + log.Fatal("close log data error %s", err.Error()) + } + t.data = nil } - - t.wb.Reset(ioutil.Discard) } func (t *tableWriter) Close() { t.Lock() - defer t.Unlock() - t.closed = true t.close() + t.Unlock() } func (t *tableWriter) First() uint64 { @@ -459,88 +420,31 @@ func (t *tableWriter) Last() uint64 { return id } -func (t *tableWriter) reset() { +func (t *tableWriter) Flush() (*tableReader, error) { + t.Lock() + + if t.data == nil || t.meta == nil { + t.Unlock() + return nil, errNilHandler + } + + tr := new(tableReader) + tr.base = t.base + tr.index = t.index + + tr.first = t.first + tr.last = t.last + tr.lastTime = t.lastTime + //todo config + tr.useMmap = useMmap + t.close() t.first = 0 t.last = 0 t.index = t.index + 1 - t.name = path.Join(t.base, fmtTableName(t.index)) - t.offsetBuf = t.offsetBuf[0:0] - t.offsetPos = 0 - // t.cache.Reset() -} -func (t *tableWriter) Flush() (*tableReader, error) { - t.Lock() - defer t.Unlock() - - if t.wf == nil { - return nil, errNilHandler - } - - defer t.reset() - - tr := new(tableReader) - tr.name = t.name - tr.index = t.index - - st, _ := t.wf.Stat() - - tr.first = t.first - tr.last = t.last - - if n, err := t.wf.Write(log0Data); err != nil { - return nil, fmt.Errorf("flush log0data error %s", err.Error()) - } else if n != len(log0Data) { - return nil, fmt.Errorf("flush log0data only %d != %d", n, len(log0Data)) - } - - st, _ = t.wf.Stat() - - if m := st.Size() % pageSize; m != 0 { - padding := pageSize - m - if n, err := t.wf.Write(make([]byte, padding)); err != nil { - return nil, fmt.Errorf("flush log padding error %s", err.Error()) - } else if n != int(padding) { - return nil, fmt.Errorf("flush log padding error %d != %d", n, padding) - } - } - - st, _ = t.wf.Stat() - - if st.Size()%pageSize != 0 { - return nil, fmt.Errorf("invalid offset start pos, %d", st.Size()) - } - - tr.offsetStartPos = st.Size() - tr.offsetLen = uint32(len(t.offsetBuf)) - - if n, err := t.wf.Write(t.offsetBuf); err != nil { - log.Error("flush offset buffer error %s", err.Error()) - return nil, err - } else if n != len(t.offsetBuf) { - log.Error("flush offset buffer only %d != %d", n, len(t.offsetBuf)) - return nil, io.ErrShortWrite - } - - if err := binary.Write(t.wf, binary.BigEndian, tr.offsetStartPos); err != nil { - log.Error("flush offset start pos error %s", err.Error()) - return nil, err - } - - if err := binary.Write(t.wf, binary.BigEndian, tr.offsetLen); err != nil { - log.Error("flush offset len error %s", err.Error()) - return nil, err - } - - if n, err := t.wf.Write(magic); err != nil { - log.Error("flush magic data error %s", err.Error()) - return nil, err - } else if n != len(magic) { - log.Error("flush magic data only %d != %d", n, len(magic)) - return nil, io.ErrShortWrite - } + t.Unlock() return tr, nil } @@ -553,6 +457,22 @@ func (t *tableWriter) StoreLog(l *Log) error { return err } +func (t *tableWriter) openFile() error { + var err error + if t.data == nil { + if t.data, err = newWriteFile(t.useMmap, fmtTableDataName(t.base, t.index), t.maxLogSize+t.maxLogSize/10+int64(len(magic))); err != nil { + return err + } + } + + if t.meta == nil { + if t.meta, err = newWriteFile(t.useMmap, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4)); err != nil { + return err + } + } + return err +} + func (t *tableWriter) storeLog(l *Log) error { if l.ID == 0 { return ErrStoreLogID @@ -566,63 +486,34 @@ func (t *tableWriter) storeLog(l *Log) error { return ErrStoreLogID } - if t.last-t.first+1 > maxLogNumInFile { + if t.data != nil && t.data.Offset() > t.maxLogSize { return errTableNeedFlush } var err error - if t.wf == nil { - if t.wf, err = os.OpenFile(t.name, os.O_CREATE|os.O_WRONLY, 0644); err != nil { - return err - } - t.wb.Reset(t.wf) - } - - if t.offsetBuf == nil { - t.offsetBuf = make([]byte, 0, maxLogNumInFile*4) - } - - // st, _ := t.wf.Stat() - // if st.Size() >= t.maxLogSize { - // return errTableNeedFlush - // } - - if t.offsetPos >= t.maxLogSize { - return errTableNeedFlush - } - - offsetPos := t.offsetPos - - if err = l.Encode(t.wb); err != nil { + if err = t.openFile(); err != nil { return err } - if err = t.wb.Flush(); err != nil { + offsetPos := t.data.Offset() + if err = l.Encode(t.data); err != nil { return err } - // buf, _ := l.Marshal() - // if n, err := t.wf.Write(buf); err != nil { - // return err - // } else if n != len(buf) { - // return io.ErrShortWrite - // } + binary.BigEndian.PutUint32(t.posBuf, uint32(offsetPos)) + if _, err = t.meta.Write(t.posBuf); err != nil { + return err + } - t.offsetPos += int64(l.Size()) - - t.offsetBuf = append(t.offsetBuf, num.Uint32ToBytes(uint32(offsetPos))...) if t.first == 0 { t.first = l.ID } t.last = l.ID - t.lastTime = l.CreateTime - // t.cache.Set(l.ID, buf) - if t.syncType == 2 { - if err := t.wf.Sync(); err != nil { + if err := t.data.Sync(); err != nil { log.Error("sync table error %s", err.Error()) } } @@ -638,17 +529,14 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error { return ErrLogNotFound } - // if cl := t.cache.Get(id); cl != nil { - // if err := l.Unmarshal(cl); err == nil && l.ID == id { - // return nil - // } else { - // t.cache.Delete(id) - // } - // } + var buf [4]byte + if _, err := t.meta.ReadAt(buf[0:4], int64((id-t.first)*4)); err != nil { + return err + } - offset := binary.BigEndian.Uint32(t.offsetBuf[(id-t.first)*4:]) + offset := binary.BigEndian.Uint32(buf[0:4]) - if err := t.getLog(l, int64(offset)); err != nil { + if err := l.DecodeAt(t.data, int64(offset)); err != nil { return err } else if l.ID != id { return fmt.Errorf("invalid log id %d != %d", id, l.ID) @@ -661,32 +549,17 @@ func (t *tableWriter) Sync() error { t.Lock() var err error - if t.wf != nil { - err = t.wf.Sync() + if t.data != nil { + err = t.data.Sync() + t.Unlock() + return err } + + if t.meta != nil { + err = t.meta.Sync() + } + t.Unlock() return err } - -func (t *tableWriter) getLog(l *Log, pos int64) error { - t.rm.Lock() - defer t.rm.Unlock() - - var err error - if t.rf == nil { - if t.rf, err = os.Open(t.name); err != nil { - return err - } - } - - if _, err = t.rf.Seek(pos, os.SEEK_SET); err != nil { - return err - } - - if err = l.Decode(t.rf); err != nil { - return err - } - - return nil -} diff --git a/rpl/file_table_test.go b/rpl/file_table_test.go index c3ac2b2..b5cfb4b 100644 --- a/rpl/file_table_test.go +++ b/rpl/file_table_test.go @@ -10,6 +10,17 @@ import ( ) func TestFileTable(t *testing.T) { + useMmap = true + testFileTable(t) + + useMmap = false + testFileTable(t) + useMmap = true +} + +func testFileTable(t *testing.T) { + log.SetLevel(log.LevelInfo) + base, err := ioutil.TempDir("", "test_table") if err != nil { t.Fatal(err) @@ -50,10 +61,6 @@ func TestFileTable(t *testing.T) { var ll Log - if err = ll.Unmarshal(log0Data); err != nil { - t.Fatal(err) - } - for i := 0; i < 10; i++ { if err := w.GetLog(uint64(i+1), &ll); err != nil { t.Fatal(err) @@ -70,7 +77,7 @@ func TestFileTable(t *testing.T) { var r *tableReader - name := w.name + name := fmtTableDataName(w.base, w.index) if r, err = w.Flush(); err != nil { t.Fatal(err) @@ -130,26 +137,19 @@ func TestFileTable(t *testing.T) { t.Fatal("must nil") } - st, _ := r.f.Stat() - s := st.Size() + s := int64(r.data.Size()) r.Close() log.SetLevel(log.LevelFatal) testRepair(t, name, 1, s, 11) - testRepair(t, name, 1, s, 32) - testRepair(t, name, 1, s, 42) - testRepair(t, name, 1, s, 72) + testRepair(t, name, 1, s, 20) - if err := os.Truncate(name, s-(73+4096)); err != nil { + if err := os.Truncate(name, s-21); err != nil { t.Fatal(err) } - if r, err = newTableReader(base, 1); err == nil { - t.Fatal("can not repair") - } - if r, err := w.Flush(); err != nil { t.Fatal(err) } else { @@ -159,7 +159,7 @@ func TestFileTable(t *testing.T) { if r, err = newTableReader(base, 2); err != nil { t.Fatal(err) } - defer r.Close() + r.Close() } func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64) { @@ -178,7 +178,7 @@ func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64) var ll Log for i := 0; i < 10; i++ { if err := r.GetLog(uint64(i+1), &ll); err != nil { - t.Fatal(err) + t.Fatal(err, i) } else if len(ll.Data) != 4096 { t.Fatal(len(ll.Data)) } else if ll.Data[0] != byte(i+1) { @@ -190,9 +190,8 @@ func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64) t.Fatal("must nil") } - st, _ := r.f.Stat() - if s != st.Size() { - t.Fatalf("repair error size %d != %d", s, st.Size()) + if s != int64(r.data.Size()) { + t.Fatalf("repair error size %d != %d", s, r.data.Size()) } } diff --git a/rpl/log.go b/rpl/log.go index 0b10fd4..ad0b48c 100644 --- a/rpl/log.go +++ b/rpl/log.go @@ -81,13 +81,8 @@ func (l *Log) Decode(r io.Reader) error { return err } - l.Data = l.Data[0:0] + l.growData(int(length)) - if cap(l.Data) >= int(length) { - l.Data = l.Data[0:length] - } else { - l.Data = make([]byte, length) - } if _, err := io.ReadFull(r, l.Data); err != nil { return err } @@ -103,6 +98,60 @@ func (l *Log) DecodeHead(r io.Reader) (uint32, error) { return 0, err } + length := l.decodeHeadBuf(buf) + + headPool.Put(buf) + + return length, nil +} + +func (l *Log) DecodeAt(r io.ReaderAt, pos int64) error { + length, err := l.DecodeHeadAt(r, pos) + if err != nil { + return err + } + + l.growData(int(length)) + var n int + n, err = r.ReadAt(l.Data, pos+int64(LogHeadSize)) + if err == io.EOF && n == len(l.Data) { + err = nil + } + + return err +} + +func (l *Log) growData(length int) { + l.Data = l.Data[0:0] + + if cap(l.Data) >= length { + l.Data = l.Data[0:length] + } else { + l.Data = make([]byte, length) + } +} + +func (l *Log) DecodeHeadAt(r io.ReaderAt, pos int64) (uint32, error) { + buf := headPool.Get().([]byte) + + n, err := r.ReadAt(buf, pos) + if err != nil && err != io.EOF { + headPool.Put(buf) + + return 0, err + } + + length := l.decodeHeadBuf(buf) + headPool.Put(buf) + + if err == io.EOF && (length != 0 || n != len(buf)) { + return 0, err + } + + return length, nil +} + +func (l *Log) decodeHeadBuf(buf []byte) uint32 { pos := 0 l.ID = binary.BigEndian.Uint64(buf[pos:]) pos += 8 @@ -114,8 +163,5 @@ func (l *Log) DecodeHead(r io.Reader) (uint32, error) { pos++ length := binary.BigEndian.Uint32(buf[pos:]) - - headPool.Put(buf) - - return length, nil + return length } diff --git a/rpl/loglrucache.go b/rpl/loglrucache.go deleted file mode 100644 index 3dcbaf3..0000000 --- a/rpl/loglrucache.go +++ /dev/null @@ -1,95 +0,0 @@ -package rpl - -import ( - "container/list" - "encoding/binary" -) - -type logLRUCache struct { - itemsList *list.List - itemsMap map[uint64]*list.Element - size int - capability int - maxNum int -} - -func newLogLRUCache(capability int, maxNum int) *logLRUCache { - if capability <= 0 { - capability = 1024 * 1024 - } - - if maxNum <= 0 { - maxNum = 16 - } - - return &logLRUCache{ - itemsList: list.New(), - itemsMap: make(map[uint64]*list.Element), - size: 0, - capability: capability, - maxNum: maxNum, - } -} - -func (cache *logLRUCache) Set(id uint64, data []byte) { - elem, ok := cache.itemsMap[id] - if ok { - //we may not enter here - // item already exists, so move it to the front of the list and update the data - cache.itemsList.MoveToFront(elem) - ol := elem.Value.([]byte) - elem.Value = data - cache.size += (len(data) - len(ol)) - } else { - cache.size += len(data) - - // item doesn't exist, so add it to front of list - elem = cache.itemsList.PushFront(data) - cache.itemsMap[id] = elem - } - - // evict LRU entry if the cache is full - for cache.size > cache.capability || cache.itemsList.Len() > cache.maxNum { - removedElem := cache.itemsList.Back() - l := removedElem.Value.([]byte) - cache.itemsList.Remove(removedElem) - delete(cache.itemsMap, binary.BigEndian.Uint64(l[0:8])) - - cache.size -= len(l) - if cache.size <= 0 { - cache.size = 0 - } - } -} - -func (cache *logLRUCache) Get(id uint64) []byte { - elem, ok := cache.itemsMap[id] - if !ok { - return nil - } - - // item exists, so move it to front of list and return it - cache.itemsList.MoveToFront(elem) - l := elem.Value.([]byte) - return l -} - -func (cache *logLRUCache) Delete(id uint64) { - elem, ok := cache.itemsMap[id] - if !ok { - return - } - - cache.itemsList.Remove(elem) - delete(cache.itemsMap, id) -} - -func (cache *logLRUCache) Len() int { - return cache.itemsList.Len() -} - -func (cache *logLRUCache) Reset() { - cache.itemsList = list.New() - cache.itemsMap = make(map[uint64]*list.Element) - cache.size = 0 -} diff --git a/rpl/loglrucache_test.go b/rpl/loglrucache_test.go deleted file mode 100644 index 88a2923..0000000 --- a/rpl/loglrucache_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package rpl - -import ( - "testing" -) - -func TestLogLRUCache(t *testing.T) { - c := newLogLRUCache(180, 10) - - var i uint64 - for i = 1; i <= 10; i++ { - l := &Log{i, 0, 0, []byte("0")} - b, _ := l.Marshal() - c.Set(l.ID, b) - } - - for i = 1; i <= 10; i++ { - if l := c.Get(i); l == nil { - t.Fatal("must exist", i) - } - } - - for i = 11; i <= 20; i++ { - l := &Log{i, 0, 0, []byte("0")} - b, _ := l.Marshal() - c.Set(l.ID, b) - } - - for i = 1; i <= 10; i++ { - if l := c.Get(i); l != nil { - t.Fatal("must not exist", i) - } - } - - c.Get(11) - - l := &Log{21, 0, 0, []byte("0")} - b, _ := l.Marshal() - c.Set(l.ID, b) - - if l := c.Get(12); l != nil { - t.Fatal("must nil", 12) - } - - if l := c.Get(11); l == nil { - t.Fatal("must not nil", 11) - } -}