add padding for pagesize, not use mmap, add later

This commit is contained in:
siddontang 2014-11-06 17:37:22 +08:00
parent 501505cea2
commit 109468e0f3
3 changed files with 79 additions and 27 deletions

View File

@ -35,8 +35,8 @@ const (
if data has no magic data, it means that we don't close replication gracefully. if data has no magic data, it means that we don't close replication gracefully.
so we must repair the log data so we must repair the log data
log data: id (bigendian uint64), create time (bigendian uint32), compression (byte), data len(bigendian uint32), data log data: id (bigendian uint64), create time (bigendian uint32), compression (byte), data len(bigendian uint32), data
split data = log0 data split data = log0 data + [padding 0] -> file % pagesize() == 0
log0: id 0, create time 0, compression 0, data len 0, data "" log0: id 0, create time 0, compression 0, data len 7, data "ledisdb"
log offset: bigendian uint32 | bigendian uint32 log offset: bigendian uint32 | bigendian uint32

View File

@ -11,18 +11,25 @@ import (
"io" "io"
"os" "os"
"path" "path"
"reflect"
"sync" "sync"
"syscall"
"time" "time"
) )
var ( var (
magic = []byte("\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17") magic = []byte("\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17")
log0Data = make([]byte, 17) log0 = Log{0, 1, 1, []byte("ledisdb")}
log0Data = []byte{}
errTableNeedFlush = errors.New("write table need flush") errTableNeedFlush = errors.New("write table need flush")
errTableFrozen = errors.New("write table is frozen") errTableFrozen = errors.New("write table is frozen")
pageSize = int64(4096)
) )
func init() {
log0Data, _ = log0.Marshal()
pageSize = int64(os.Getpagesize())
}
const tableReaderKeepaliveInterval int64 = 30 const tableReaderKeepaliveInterval int64 = 30
func fmtTableName(index int64) string { func fmtTableName(index int64) string {
@ -36,7 +43,7 @@ type tableReader struct {
index int64 index int64
f *os.File f *os.File
m []byte pf *os.File
first uint64 first uint64
last uint64 last uint64
@ -78,9 +85,9 @@ func (t *tableReader) Close() {
} }
func (t *tableReader) close() { func (t *tableReader) close() {
if t.m != nil { if t.pf != nil {
syscall.Munmap(t.m) t.pf.Close()
t.m = nil t.pf = nil
} }
if t.f != nil { if t.f != nil {
@ -98,6 +105,18 @@ func (t *tableReader) Keepalived() bool {
return true return true
} }
func (t *tableReader) getLogPos(index int) (uint32, error) {
if _, err := t.pf.Seek(t.offsetStartPos+int64(index*4), os.SEEK_SET); err != nil {
return 0, err
}
var pos uint32
if err := binary.Read(t.pf, binary.BigEndian, &pos); err != nil {
return 0, err
}
return pos, nil
}
func (t *tableReader) check() error { func (t *tableReader) check() error {
var err error var err error
@ -111,7 +130,8 @@ func (t *tableReader) check() error {
return fmt.Errorf("file size %d too short", st.Size()) return fmt.Errorf("file size %d too short", st.Size())
} }
if _, err = t.f.Seek(-32, os.SEEK_END); err != nil { var pos int64
if pos, err = t.f.Seek(-32, os.SEEK_END); err != nil {
return err return err
} }
@ -119,6 +139,8 @@ func (t *tableReader) check() error {
return err return err
} else if t.offsetStartPos >= st.Size() { } else if t.offsetStartPos >= st.Size() {
return fmt.Errorf("invalid offset start pos %d, file size %d", t.offsetStartPos, st.Size()) return fmt.Errorf("invalid offset start pos %d, file size %d", t.offsetStartPos, st.Size())
} else if t.offsetStartPos%pageSize != 0 {
return fmt.Errorf("invalid offset start pos %d, must page size %d multi", t.offsetStartPos, pageSize)
} }
if err = binary.Read(t.f, binary.BigEndian, &t.offsetLen); err != nil { if err = binary.Read(t.f, binary.BigEndian, &t.offsetLen); err != nil {
@ -129,6 +151,10 @@ func (t *tableReader) check() error {
return fmt.Errorf("invalid offset len %d, must 4 multiple", t.offsetLen) return fmt.Errorf("invalid offset len %d, must 4 multiple", t.offsetLen)
} }
if t.offsetStartPos+int64(t.offsetLen) != pos {
return fmt.Errorf("invalid offset %d %d", t.offsetStartPos, t.offsetLen)
}
b := make([]byte, 20) b := make([]byte, 20)
if _, err = t.f.Read(b); err != nil { if _, err = t.f.Read(b); err != nil {
return err return err
@ -136,12 +162,12 @@ func (t *tableReader) check() error {
return fmt.Errorf("invalid magic data %q", b) return fmt.Errorf("invalid magic data %q", b)
} }
if t.m, err = syscall.Mmap(int(t.f.Fd()), t.offsetStartPos, int(t.offsetLen), syscall.PROT_READ, syscall.MAP_PRIVATE); err != nil { if t.pf, err = os.Open(t.name); err != nil {
return err return err
} }
firstLogPos := binary.BigEndian.Uint32(t.m) firstLogPos, _ := t.getLogPos(0)
lastLogPos := binary.BigEndian.Uint32(t.m[len(t.m)-4:]) lastLogPos, _ := t.getLogPos(int(t.offsetLen/4 - 1))
if firstLogPos != 0 { if firstLogPos != 0 {
return fmt.Errorf("invalid first log pos %d, must 0", firstLogPos) return fmt.Errorf("invalid first log pos %d, must 0", firstLogPos)
@ -158,8 +184,16 @@ func (t *tableReader) check() error {
var n int64 var n int64
if n, err = t.decodeLogHead(&l, int64(lastLogPos)); err != nil { if n, err = t.decodeLogHead(&l, int64(lastLogPos)); err != nil {
return fmt.Errorf("decode last log err %s", err.Error()) return fmt.Errorf("decode last log err %s", err.Error())
} else if n+int64(len(log0Data)) != t.offsetStartPos { } else {
return fmt.Errorf("invalid last log, no proper log0") var l0 Log
if _, err := t.f.Seek(n, os.SEEK_SET); err != nil {
return fmt.Errorf("seek logo err %s", err.Error())
} else if err = l0.Decode(t.f); err != nil {
println(lastLogPos, n, l0.ID, l0.CreateTime, l0.Compression)
return fmt.Errorf("decode log0 err %s", err.Error())
} else if !reflect.DeepEqual(l0, log0) {
return fmt.Errorf("invalid log0 %#v != %#v", l0, log0)
}
} }
t.last = l.ID t.last = l.ID
@ -265,7 +299,10 @@ func (t *tableReader) GetLog(id uint64, l *Log) error {
return err return err
} }
pos := binary.BigEndian.Uint32(t.m[(id-t.first)*4:]) pos, err := t.getLogPos(int(id - t.first))
if err != nil {
return err
}
if _, err := t.f.Seek(int64(pos), os.SEEK_SET); err != nil { if _, err := t.f.Seek(int64(pos), os.SEEK_SET); err != nil {
return err return err
@ -284,12 +321,12 @@ func (t *tableReader) openTable() error {
var err error var err error
if t.f == nil { if t.f == nil {
if t.f, err = os.Open(t.name); err != nil { if t.f, err = os.Open(t.name); err != nil {
return fmt.Errorf("open %s error %s", t.name, err.Error()) return err
} }
} }
if t.m == nil { if t.pf == nil {
if t.m, err = syscall.Mmap(int(t.f.Fd()), t.offsetStartPos, int(t.offsetLen), syscall.PROT_READ, syscall.MAP_PRIVATE); err != nil { if t.pf, err = os.Open(t.name); err != nil {
return err return err
} }
} }
@ -378,20 +415,35 @@ func (t *tableWriter) Flush() (*tableReader, error) {
st, _ := t.wf.Stat() st, _ := t.wf.Stat()
tr.offsetStartPos = st.Size() + int64(len(log0Data))
tr.offsetLen = uint32(len(t.offsetBuf))
tr.first = t.first tr.first = t.first
tr.last = t.last tr.last = t.last
if n, err := t.wf.Write(log0Data); err != nil { if n, err := t.wf.Write(log0Data); err != nil {
log.Error("flush log0data error %s", err.Error()) return nil, fmt.Errorf("flush log0data error %s", err.Error())
return nil, err
} else if n != len(log0Data) { } else if n != len(log0Data) {
log.Error("flush log0data only %d != %d", n, len(log0Data)) return nil, fmt.Errorf("flush log0data only %d != %d", n, len(log0Data))
return nil, io.ErrShortWrite
} }
st, _ = t.wf.Stat()
if m := st.Size() % pageSize; m != 0 {
padding := pageSize - m
if n, err := t.wf.Write(make([]byte, padding)); err != nil {
return nil, fmt.Errorf("flush log padding error %s", err.Error())
} else if n != int(padding) {
return nil, fmt.Errorf("flush log padding error %d != %d", n, padding)
}
}
st, _ = t.wf.Stat()
if st.Size()%pageSize != 0 {
return nil, fmt.Errorf("invalid offset start pos, %d", st.Size())
}
tr.offsetStartPos = st.Size()
tr.offsetLen = uint32(len(t.offsetBuf))
if n, err := t.wf.Write(t.offsetBuf); err != nil { if n, err := t.wf.Write(t.offsetBuf); err != nil {
log.Error("flush offset buffer error %s", err.Error()) log.Error("flush offset buffer error %s", err.Error())
return nil, err return nil, err

View File

@ -142,7 +142,7 @@ func TestFileTable(t *testing.T) {
testRepair(t, name, 1, s, 42) testRepair(t, name, 1, s, 42)
testRepair(t, name, 1, s, 72) testRepair(t, name, 1, s, 72)
if err := os.Truncate(name, s-73); err != nil { if err := os.Truncate(name, s-(73+4096)); err != nil {
t.Fatal(err) t.Fatal(err)
} }