diff --git a/rpl/file_store.go b/rpl/file_store.go index eaf2321..873a0e2 100644 --- a/rpl/file_store.go +++ b/rpl/file_store.go @@ -35,8 +35,8 @@ const ( if data has no magic data, it means that we don't close replication gracefully. so we must repair the log data log data: id (bigendian uint64), create time (bigendian uint32), compression (byte), data len(bigendian uint32), data - split data = log0 data - log0: id 0, create time 0, compression 0, data len 0, data "" + split data = log0 data + [padding 0] -> file % pagesize() == 0 + log0: id 0, create time 0, compression 0, data len 7, data "ledisdb" log offset: bigendian uint32 | bigendian uint32 diff --git a/rpl/file_table.go b/rpl/file_table.go index 16a6b2c..b8de065 100644 --- a/rpl/file_table.go +++ b/rpl/file_table.go @@ -11,18 +11,25 @@ import ( "io" "os" "path" + "reflect" "sync" - "syscall" "time" ) var ( magic = []byte("\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17") - log0Data = make([]byte, 17) + log0 = Log{0, 1, 1, []byte("ledisdb")} + log0Data = []byte{} errTableNeedFlush = errors.New("write table need flush") errTableFrozen = errors.New("write table is frozen") + pageSize = int64(4096) ) +func init() { + log0Data, _ = log0.Marshal() + pageSize = int64(os.Getpagesize()) +} + const tableReaderKeepaliveInterval int64 = 30 func fmtTableName(index int64) string { @@ -35,8 +42,8 @@ type tableReader struct { name string index int64 - f *os.File - m []byte + f *os.File + pf *os.File first uint64 last uint64 @@ -78,9 +85,9 @@ func (t *tableReader) Close() { } func (t *tableReader) close() { - if t.m != nil { - syscall.Munmap(t.m) - t.m = nil + if t.pf != nil { + t.pf.Close() + t.pf = nil } if t.f != nil { @@ -98,6 +105,18 @@ func (t *tableReader) Keepalived() bool { return true } +func (t *tableReader) getLogPos(index int) (uint32, error) { + if _, err := t.pf.Seek(t.offsetStartPos+int64(index*4), os.SEEK_SET); err != nil { + return 0, err + } + + var pos uint32 + if err := binary.Read(t.pf, binary.BigEndian, &pos); err != nil { + return 0, err + } + return pos, nil +} + func (t *tableReader) check() error { var err error @@ -111,7 +130,8 @@ func (t *tableReader) check() error { return fmt.Errorf("file size %d too short", st.Size()) } - if _, err = t.f.Seek(-32, os.SEEK_END); err != nil { + var pos int64 + if pos, err = t.f.Seek(-32, os.SEEK_END); err != nil { return err } @@ -119,6 +139,8 @@ func (t *tableReader) check() error { return err } else if t.offsetStartPos >= st.Size() { return fmt.Errorf("invalid offset start pos %d, file size %d", t.offsetStartPos, st.Size()) + } else if t.offsetStartPos%pageSize != 0 { + return fmt.Errorf("invalid offset start pos %d, must page size %d multi", t.offsetStartPos, pageSize) } if err = binary.Read(t.f, binary.BigEndian, &t.offsetLen); err != nil { @@ -129,6 +151,10 @@ func (t *tableReader) check() error { return fmt.Errorf("invalid offset len %d, must 4 multiple", t.offsetLen) } + if t.offsetStartPos+int64(t.offsetLen) != pos { + return fmt.Errorf("invalid offset %d %d", t.offsetStartPos, t.offsetLen) + } + b := make([]byte, 20) if _, err = t.f.Read(b); err != nil { return err @@ -136,12 +162,12 @@ func (t *tableReader) check() error { return fmt.Errorf("invalid magic data %q", b) } - if t.m, err = syscall.Mmap(int(t.f.Fd()), t.offsetStartPos, int(t.offsetLen), syscall.PROT_READ, syscall.MAP_PRIVATE); err != nil { + if t.pf, err = os.Open(t.name); err != nil { return err } - firstLogPos := binary.BigEndian.Uint32(t.m) - lastLogPos := binary.BigEndian.Uint32(t.m[len(t.m)-4:]) + firstLogPos, _ := t.getLogPos(0) + lastLogPos, _ := t.getLogPos(int(t.offsetLen/4 - 1)) if firstLogPos != 0 { return fmt.Errorf("invalid first log pos %d, must 0", firstLogPos) @@ -158,8 +184,16 @@ func (t *tableReader) check() error { var n int64 if n, err = t.decodeLogHead(&l, int64(lastLogPos)); err != nil { return fmt.Errorf("decode last log err %s", err.Error()) - } else if n+int64(len(log0Data)) != t.offsetStartPos { - return fmt.Errorf("invalid last log, no proper log0") + } else { + var l0 Log + if _, err := t.f.Seek(n, os.SEEK_SET); err != nil { + return fmt.Errorf("seek logo err %s", err.Error()) + } else if err = l0.Decode(t.f); err != nil { + println(lastLogPos, n, l0.ID, l0.CreateTime, l0.Compression) + return fmt.Errorf("decode log0 err %s", err.Error()) + } else if !reflect.DeepEqual(l0, log0) { + return fmt.Errorf("invalid log0 %#v != %#v", l0, log0) + } } t.last = l.ID @@ -265,7 +299,10 @@ func (t *tableReader) GetLog(id uint64, l *Log) error { return err } - pos := binary.BigEndian.Uint32(t.m[(id-t.first)*4:]) + pos, err := t.getLogPos(int(id - t.first)) + if err != nil { + return err + } if _, err := t.f.Seek(int64(pos), os.SEEK_SET); err != nil { return err @@ -284,12 +321,12 @@ func (t *tableReader) openTable() error { var err error if t.f == nil { if t.f, err = os.Open(t.name); err != nil { - return fmt.Errorf("open %s error %s", t.name, err.Error()) + return err } } - if t.m == nil { - if t.m, err = syscall.Mmap(int(t.f.Fd()), t.offsetStartPos, int(t.offsetLen), syscall.PROT_READ, syscall.MAP_PRIVATE); err != nil { + if t.pf == nil { + if t.pf, err = os.Open(t.name); err != nil { return err } } @@ -378,20 +415,35 @@ func (t *tableWriter) Flush() (*tableReader, error) { st, _ := t.wf.Stat() - tr.offsetStartPos = st.Size() + int64(len(log0Data)) - tr.offsetLen = uint32(len(t.offsetBuf)) - tr.first = t.first tr.last = t.last if n, err := t.wf.Write(log0Data); err != nil { - log.Error("flush log0data error %s", err.Error()) - return nil, err + return nil, fmt.Errorf("flush log0data error %s", err.Error()) } else if n != len(log0Data) { - log.Error("flush log0data only %d != %d", n, len(log0Data)) - return nil, io.ErrShortWrite + return nil, fmt.Errorf("flush log0data only %d != %d", n, len(log0Data)) } + st, _ = t.wf.Stat() + + if m := st.Size() % pageSize; m != 0 { + padding := pageSize - m + if n, err := t.wf.Write(make([]byte, padding)); err != nil { + return nil, fmt.Errorf("flush log padding error %s", err.Error()) + } else if n != int(padding) { + return nil, fmt.Errorf("flush log padding error %d != %d", n, padding) + } + } + + st, _ = t.wf.Stat() + + if st.Size()%pageSize != 0 { + return nil, fmt.Errorf("invalid offset start pos, %d", st.Size()) + } + + tr.offsetStartPos = st.Size() + tr.offsetLen = uint32(len(t.offsetBuf)) + if n, err := t.wf.Write(t.offsetBuf); err != nil { log.Error("flush offset buffer error %s", err.Error()) return nil, err diff --git a/rpl/file_table_test.go b/rpl/file_table_test.go index 2b9e906..c3ac2b2 100644 --- a/rpl/file_table_test.go +++ b/rpl/file_table_test.go @@ -142,7 +142,7 @@ func TestFileTable(t *testing.T) { testRepair(t, name, 1, s, 42) testRepair(t, name, 1, s, 72) - if err := os.Truncate(name, s-73); err != nil { + if err := os.Truncate(name, s-(73+4096)); err != nil { t.Fatal(err) }