refactor replication, not compatibility

use data and meta file for a table
use mmap for file read and write
This commit is contained in:
siddontang 2014-11-20 17:33:38 +08:00
parent 8ec2730235
commit 62440fef91
12 changed files with 708 additions and 545 deletions

View File

@ -203,7 +203,7 @@ func (cfg *Config) adjust() {
cfg.RocksDB.adjust()
cfg.Replication.ExpiredLogDays = getDefault(7, cfg.Replication.ExpiredLogDays)
cfg.Replication.MaxLogFileNum = getDefault(10, cfg.Replication.MaxLogFileNum)
cfg.Replication.MaxLogFileNum = getDefault(50, cfg.Replication.MaxLogFileNum)
cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize)
cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize)
cfg.TTLCheckInterval = getDefault(1, cfg.TTLCheckInterval)

View File

@ -128,11 +128,11 @@ store_name = "file"
# Expire write ahead logs after the given days
expired_log_days = 7
# for file store, if 0, use default 1G, max is 4G
# for file store, if 0, use default 256MB, max is 1G
max_log_file_size = 0
# for file store, if 0, use default 10
max_log_file_num = 10
# for file store, if 0, use default 50
max_log_file_num = 0
# Sync log to disk if possible
# 0: no sync

View File

@ -128,11 +128,11 @@ store_name = "file"
# Expire write ahead logs after the given days
expired_log_days = 7
# for file store, if 0, use default 1G, max is 4G
# for file store, if 0, use default 256MB, max is 1G
max_log_file_size = 0
# for file store, if 0, use default 10
max_log_file_num = 10
# for file store, if 0, use default 50
max_log_file_num = 0
# Sync log to disk if possible
# 0: no sync

View File

@ -42,6 +42,7 @@ func TestReplication(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer master.Close()
cfgS := config.NewConfigDefault()
cfgS.DataDir = "/tmp/test_repl/slave"
@ -54,6 +55,7 @@ func TestReplication(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer slave.Close()
db, _ := master.Select(0)
db.Set([]byte("a"), []byte("value"))

View File

@ -195,7 +195,7 @@ func testTx(t *testing.T, name string) {
cfg.DBName = name
cfg.LMDB.MapSize = 10 * 1024 * 1024
cfg.UseReplication = true
//cfg.UseReplication = true
os.RemoveAll(cfg.DataDir)

383
rpl/file_io.go Normal file
View File

@ -0,0 +1,383 @@
package rpl
import (
"fmt"
"github.com/edsrzf/mmap-go"
"github.com/siddontang/go/log"
"io"
"os"
)
//like leveldb or rocksdb file interface, haha!
type writeFile interface {
Sync() error
Write(b []byte) (n int, err error)
Close(addMagic bool) error
ReadAt(buf []byte, offset int64) (int, error)
Truncate(size int64) error
SetOffset(o int64)
Name() string
Size() int
Offset() int64
}
type readFile interface {
ReadAt(buf []byte, offset int64) (int, error)
Close() error
Size() int
Name() string
}
type rawWriteFile struct {
writeFile
f *os.File
offset int64
name string
}
func newRawWriteFile(name string, size int64) (writeFile, error) {
m := new(rawWriteFile)
var err error
m.name = name
m.f, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
return m, nil
}
func (m *rawWriteFile) Close(addMagic bool) error {
if addMagic {
if err := m.f.Truncate(m.offset + int64(len(magic))); err != nil {
return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
}
if _, err := m.f.WriteAt(magic, m.offset); err != nil {
return fmt.Errorf("close write %s magic error %s", m.name, err.Error())
}
} else {
if err := m.f.Truncate(m.offset); err != nil {
return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
}
}
if err := m.f.Close(); err != nil {
return fmt.Errorf("close %s error %s", m.name, err.Error())
}
return nil
}
func (m *rawWriteFile) Sync() error {
return m.f.Sync()
}
func (m *rawWriteFile) Write(b []byte) (n int, err error) {
n, err = m.f.Write(b)
if err != nil {
return
} else if n != len(b) {
err = io.ErrShortWrite
return
}
m.offset += int64(n)
return
}
func (m *rawWriteFile) ReadAt(buf []byte, offset int64) (int, error) {
return m.f.ReadAt(buf, offset)
}
func (m *rawWriteFile) Truncate(size int64) error {
var err error
if err = m.f.Truncate(size); err != nil {
return err
}
if m.offset > size {
m.offset = size
}
return nil
}
func (m *rawWriteFile) SetOffset(o int64) {
m.offset = o
}
func (m *rawWriteFile) Offset() int64 {
return m.offset
}
func (m *rawWriteFile) Name() string {
return m.name
}
func (m *rawWriteFile) Size() int {
st, _ := m.f.Stat()
return int(st.Size())
}
type rawReadFile struct {
readFile
f *os.File
name string
}
func newRawReadFile(name string) (readFile, error) {
m := new(rawReadFile)
var err error
m.f, err = os.Open(name)
m.name = name
if err != nil {
return nil, err
}
return m, err
}
func (m *rawReadFile) Close() error {
return m.f.Close()
}
func (m *rawReadFile) Size() int {
st, _ := m.f.Stat()
return int(st.Size())
}
func (m *rawReadFile) ReadAt(b []byte, offset int64) (int, error) {
return m.f.ReadAt(b, offset)
}
func (m *rawReadFile) Name() string {
return m.name
}
/////////////////////////////////////////////////
type mmapWriteFile struct {
writeFile
f *os.File
m mmap.MMap
name string
size int64
offset int64
}
func newMmapWriteFile(name string, size int64) (writeFile, error) {
m := new(mmapWriteFile)
m.name = name
var err error
m.f, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
if size == 0 {
st, _ := m.f.Stat()
size = st.Size()
}
if err = m.f.Truncate(size); err != nil {
return nil, err
}
if m.m, err = mmap.Map(m.f, mmap.RDWR, 0); err != nil {
return nil, err
}
m.size = size
m.offset = 0
return m, nil
}
func (m *mmapWriteFile) Size() int {
return int(m.size)
}
func (m *mmapWriteFile) Sync() error {
return m.m.Flush()
}
func (m *mmapWriteFile) Close(addMagic bool) error {
if err := m.m.Unmap(); err != nil {
return fmt.Errorf("unmap %s error %s", m.name, err.Error())
}
if addMagic {
if err := m.f.Truncate(m.offset + int64(len(magic))); err != nil {
return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
}
if _, err := m.f.WriteAt(magic, m.offset); err != nil {
return fmt.Errorf("close write %s magic error %s", m.name, err.Error())
}
} else {
if err := m.f.Truncate(m.offset); err != nil {
return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
}
}
if err := m.f.Close(); err != nil {
return fmt.Errorf("close %s error %s", m.name, err.Error())
}
return nil
}
func (m *mmapWriteFile) Write(b []byte) (n int, err error) {
extra := int64(len(b)) - (m.size - m.offset)
if extra > 0 {
newSize := m.size + extra + m.size/10
println("need truncate ???", newSize, m.size, len(b))
if err = m.Truncate(newSize); err != nil {
return
}
m.size = newSize
}
n = copy(m.m[m.offset:], b)
if n != len(b) {
return 0, io.ErrShortWrite
}
m.offset += int64(len(b))
return len(b), nil
}
func (m *mmapWriteFile) ReadAt(buf []byte, offset int64) (int, error) {
if offset > m.offset {
return 0, fmt.Errorf("invalid offset %d", offset)
}
n := copy(buf, m.m[offset:m.offset])
if n != len(buf) {
return n, io.ErrUnexpectedEOF
}
return n, nil
}
func (m *mmapWriteFile) Truncate(size int64) error {
var err error
if err = m.m.Unmap(); err != nil {
return err
}
if err = m.f.Truncate(size); err != nil {
return err
}
if m.m, err = mmap.Map(m.f, mmap.RDWR, 0); err != nil {
return err
}
m.size = size
if m.offset > m.size {
m.offset = m.size
}
return nil
}
func (m *mmapWriteFile) SetOffset(o int64) {
m.offset = o
}
func (m *mmapWriteFile) Offset() int64 {
return m.offset
}
func (m *mmapWriteFile) Name() string {
return m.name
}
type mmapReadFile struct {
readFile
f *os.File
m mmap.MMap
name string
}
func newMmapReadFile(name string) (readFile, error) {
m := new(mmapReadFile)
m.name = name
var err error
m.f, err = os.Open(name)
if err != nil {
return nil, err
}
m.m, err = mmap.Map(m.f, mmap.RDONLY, 0)
return m, err
}
func (m *mmapReadFile) ReadAt(buf []byte, offset int64) (int, error) {
if int64(offset) > int64(len(m.m)) {
return 0, fmt.Errorf("invalid offset %d", offset)
}
n := copy(buf, m.m[offset:])
if n != len(buf) {
return n, io.ErrUnexpectedEOF
}
return n, nil
}
func (m *mmapReadFile) Close() error {
if m.m != nil {
if err := m.m.Unmap(); err != nil {
log.Error("unmap %s error %s", m.name, err.Error())
}
m.m = nil
}
if m.f != nil {
if err := m.f.Close(); err != nil {
log.Error("close %s error %s", m.name, err.Error())
}
m.f = nil
}
return nil
}
func (m *mmapReadFile) Size() int {
return len(m.m)
}
func (m *mmapReadFile) Name() string {
return m.name
}
/////////////////////////////////////
func newWriteFile(useMmap bool, name string, size int64) (writeFile, error) {
if useMmap {
return newMmapWriteFile(name, size)
} else {
return newRawWriteFile(name, size)
}
}
func newReadFile(useMmap bool, name string) (readFile, error) {
if useMmap {
return newMmapReadFile(name)
} else {
return newRawReadFile(name)
}
}

View File

@ -13,33 +13,30 @@ import (
)
const (
defaultMaxLogFileSize = int64(1024 * 1024 * 1024)
defaultMaxLogFileSize = int64(256 * 1024 * 1024)
//why 4G, we can use uint32 as offset, reduce memory useage
maxLogFileSize = int64(uint32(4*1024*1024*1024 - 1))
maxLogFileSize = int64(1024 * 1024 * 1024)
maxLogNumInFile = uint64(10000000)
defaultLogNumInFile = int64(1024 * 1024)
)
/*
File Store:
00000001.ldb
00000002.ldb
00000001.data
00000001.meta
00000002.data
00000002.meta
log: log1 data | log2 data | split data | log1 offset | log 2 offset | offset start pos | offset length | magic data
data: log1 data | log2 data | magic data
log id can not be 0, we use here for split data
if data has no magic data, it means that we don't close replication gracefully.
so we must repair the log data
log data: id (bigendian uint64), create time (bigendian uint32), compression (byte), data len(bigendian uint32), data
split data = log0 data + [padding 0] -> file % pagesize() == 0
log0: id 0, create time 0, compression 0, data len 7, data "ledisdb"
meta: log1 offset | log2 offset
log offset: bigendian uint32 | bigendian uint32
offset start pos: bigendian uint64
offset length: bigendian uint32
//sha1 of github.com/siddontang/ledisdb 20 bytes
magic data = "\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17"
@ -270,6 +267,7 @@ func (s *FileStore) Close() error {
for i := range s.rs {
s.rs[i].Close()
}
s.rs = tableReaders{}
s.rm.Unlock()
@ -312,11 +310,16 @@ func (s *FileStore) checkTableReaders() {
func (s *FileStore) purgeTableReaders(purges []*tableReader) {
for _, r := range purges {
name := r.name
dataName := fmtTableDataName(r.base, r.index)
metaName := fmtTableMetaName(r.base, r.index)
r.Close()
if err := os.Remove(name); err != nil {
log.Error("purge table %s err: %s", name, err.Error())
if err := os.Remove(dataName); err != nil {
log.Error("purge table data %s err: %s", dataName, err.Error())
}
if err := os.Remove(metaName); err != nil {
log.Error("purge table meta %s err: %s", metaName, err.Error())
}
}
}
@ -331,7 +334,7 @@ func (s *FileStore) load() error {
var r *tableReader
var index int64
for _, f := range fs {
if _, err := fmt.Sscanf(f.Name(), "%08d.ldb", &index); err == nil {
if _, err := fmt.Sscanf(f.Name(), "%08d.data", &index); err == nil {
if r, err = newTableReader(s.base, index); err != nil {
log.Error("load table %s err: %s", f.Name(), err.Error())
} else {
@ -391,16 +394,16 @@ func (ts tableReaders) check() error {
index := ts[0].index
if first == 0 || first > last {
return fmt.Errorf("invalid log in table %s", ts[0].name)
return fmt.Errorf("invalid log in table %s", ts[0])
}
for i := 1; i < len(ts); i++ {
if ts[i].first <= last {
return fmt.Errorf("invalid first log id %d in table %s", ts[i].first, ts[i].name)
return fmt.Errorf("invalid first log id %d in table %s", ts[i].first, ts[i])
}
if ts[i].index <= index {
return fmt.Errorf("invalid index %d in table %s", ts[i].index, ts[i].name)
return fmt.Errorf("invalid index %d in table %s", ts[i].index, ts[i])
}
first = ts[i].first

View File

@ -1,64 +1,54 @@
package rpl
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/edsrzf/mmap-go"
"github.com/siddontang/go/log"
"github.com/siddontang/go/num"
"github.com/siddontang/go/sync2"
"io"
"io/ioutil"
"os"
"path"
"reflect"
"sync"
"time"
)
var (
magic = []byte("\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17")
log0 = Log{0, 1, 1, []byte("ledisdb")}
log0Data = []byte{}
errTableNeedFlush = errors.New("write table need flush")
errNilHandler = errors.New("nil write handler")
pageSize = int64(4096)
)
func init() {
log0Data, _ = log0.Marshal()
pageSize = int64(os.Getpagesize())
}
const tableReaderKeepaliveInterval int64 = 30
func fmtTableName(index int64) string {
return fmt.Sprintf("%08d.ldb", index)
func fmtTableDataName(base string, index int64) string {
return path.Join(base, fmt.Sprintf("%08d.data", index))
}
func fmtTableMetaName(base string, index int64) string {
return path.Join(base, fmt.Sprintf("%08d.meta", index))
}
//todo config
var useMmap = true
type tableReader struct {
sync.Mutex
name string
base string
index int64
f *os.File
m mmap.MMap
pf *os.File
data readFile
meta readFile
first uint64
last uint64
lastTime uint32
offsetStartPos int64
offsetLen uint32
lastReadTime sync2.AtomicInt64
useMmap bool
}
func newTableReader(base string, index int64) (*tableReader, error) {
@ -66,16 +56,19 @@ func newTableReader(base string, index int64) (*tableReader, error) {
return nil, fmt.Errorf("invalid index %d", index)
}
t := new(tableReader)
t.name = path.Join(base, fmtTableName(index))
t.base = base
t.index = index
//todo, use config
t.useMmap = useMmap
var err error
if err = t.check(); err != nil {
log.Error("check %s error: %s, try to repair", t.name, err.Error())
log.Error("check %d error: %s, try to repair", t.index, err.Error())
if err = t.repair(); err != nil {
log.Error("repair %s error: %s", t.name, err.Error())
log.Error("repair %d error: %s", t.index, err.Error())
return nil, err
}
}
@ -85,22 +78,27 @@ func newTableReader(base string, index int64) (*tableReader, error) {
return t, nil
}
func (t *tableReader) String() string {
return fmt.Sprintf("%d", t.index)
}
func (t *tableReader) Close() {
t.Lock()
defer t.Unlock()
t.close()
t.Unlock()
}
func (t *tableReader) close() {
if t.m != nil {
t.m.Unmap()
t.m = nil
if t.data != nil {
t.data.Close()
t.data = nil
}
if t.f != nil {
t.f.Close()
t.f = nil
if t.meta != nil {
t.meta.Close()
t.meta = nil
}
}
@ -114,96 +112,78 @@ func (t *tableReader) Keepalived() bool {
}
func (t *tableReader) getLogPos(index int) (uint32, error) {
// if _, err := t.pf.Seek(t.offsetStartPos+int64(index*4), os.SEEK_SET); err != nil {
// return 0, err
// }
var buf [4]byte
if _, err := t.meta.ReadAt(buf[0:4], int64(index)*4); err != nil {
return 0, err
}
// var pos uint32
// if err := binary.Read(t.pf, binary.BigEndian, &pos); err != nil {
// return 0, err
// }
// return pos, nil
return binary.BigEndian.Uint32(buf[0:4]), nil
}
return binary.BigEndian.Uint32(t.m[index*4:]), nil
func (t *tableReader) checkData() error {
var err error
if t.data, err = newReadFile(t.useMmap, fmtTableDataName(t.base, t.index)); err != nil {
return err
}
if t.data.Size() < len(magic) {
return fmt.Errorf("data file %s size %d too short", t.data.Name(), t.data.Size())
}
buf := make([]byte, len(magic))
if _, err := t.data.ReadAt(buf, int64(t.data.Size()-len(magic))); err != nil {
return err
}
if !bytes.Equal(magic, buf) {
return fmt.Errorf("data file %s invalid magic data %q", t.data.Name(), buf)
}
return nil
}
func (t *tableReader) checkMeta() error {
var err error
if t.meta, err = newReadFile(t.useMmap, fmtTableMetaName(t.base, t.index)); err != nil {
return err
}
if t.meta.Size()%4 != 0 || t.meta.Size() == 0 {
return fmt.Errorf("meta file %s invalid offset len %d, must 4 multiple and not 0", t.meta.Name(), t.meta.Size())
}
return nil
}
func (t *tableReader) check() error {
var err error
if t.f, err = os.Open(t.name); err != nil {
if err := t.checkMeta(); err != nil {
return err
}
st, _ := t.f.Stat()
if st.Size() < 32 {
return fmt.Errorf("file size %d too short", st.Size())
}
var pos int64
if pos, err = t.f.Seek(-32, os.SEEK_END); err != nil {
return err
}
if err = binary.Read(t.f, binary.BigEndian, &t.offsetStartPos); err != nil {
return err
} else if t.offsetStartPos >= st.Size() {
return fmt.Errorf("invalid offset start pos %d, file size %d", t.offsetStartPos, st.Size())
} else if t.offsetStartPos%pageSize != 0 {
return fmt.Errorf("invalid offset start pos %d, must page size %d multi", t.offsetStartPos, pageSize)
}
if err = binary.Read(t.f, binary.BigEndian, &t.offsetLen); err != nil {
return err
} else if int64(t.offsetLen) >= st.Size() || t.offsetLen == 0 {
return fmt.Errorf("invalid offset len %d, file size %d", t.offsetLen, st.Size())
} else if t.offsetLen%4 != 0 {
return fmt.Errorf("invalid offset len %d, must 4 multiple", t.offsetLen)
}
if t.offsetStartPos+int64(t.offsetLen) != pos {
return fmt.Errorf("invalid offset %d %d", t.offsetStartPos, t.offsetLen)
}
b := make([]byte, 20)
if _, err = t.f.Read(b); err != nil {
return err
} else if !bytes.Equal(b, magic) {
return fmt.Errorf("invalid magic data %q", b)
}
if t.m, err = mmap.MapRegion(t.f, int(t.offsetLen), mmap.RDONLY, 0, t.offsetStartPos); err != nil {
if err := t.checkData(); err != nil {
return err
}
firstLogPos, _ := t.getLogPos(0)
lastLogPos, _ := t.getLogPos(int(t.offsetLen/4 - 1))
lastLogPos, _ := t.getLogPos(t.meta.Size()/4 - 1)
if firstLogPos != 0 {
return fmt.Errorf("invalid first log pos %d, must 0", firstLogPos)
} else if int64(lastLogPos) > t.offsetStartPos {
return fmt.Errorf("invalid last log pos %d", lastLogPos)
}
var l Log
if _, err = t.decodeLogHead(&l, int64(firstLogPos)); err != nil {
if _, err = t.decodeLogHead(&l, t.data, int64(firstLogPos)); err != nil {
return fmt.Errorf("decode first log err %s", err.Error())
}
t.first = l.ID
var n int64
if n, err = t.decodeLogHead(&l, int64(lastLogPos)); err != nil {
if n, err = t.decodeLogHead(&l, t.data, int64(lastLogPos)); err != nil {
return fmt.Errorf("decode last log err %s", err.Error())
} else {
var l0 Log
if _, err := t.f.Seek(n, os.SEEK_SET); err != nil {
return fmt.Errorf("seek logo err %s", err.Error())
} else if err = l0.Decode(t.f); err != nil {
println(lastLogPos, n, l0.ID, l0.CreateTime, l0.Compression)
return fmt.Errorf("decode log0 err %s", err.Error())
} else if !reflect.DeepEqual(l0, log0) {
return fmt.Errorf("invalid log0 %#v != %#v", l0, log0)
}
} else if n+int64(len(magic)) != int64(t.data.Size()) {
return fmt.Errorf("extra log data at offset %d", n)
}
t.last = l.ID
@ -211,8 +191,8 @@ func (t *tableReader) check() error {
if t.first > t.last {
return fmt.Errorf("invalid log table first %d > last %d", t.first, t.last)
} else if (t.last - t.first + 1) != uint64(t.offsetLen/4) {
return fmt.Errorf("invalid log table, first %d, last %d, and log num %d", t.first, t.last, t.offsetLen/4)
} else if (t.last - t.first + 1) != uint64(t.meta.Size()/4) {
return fmt.Errorf("invalid log table, first %d, last %d, and log num %d", t.first, t.last, t.meta.Size()/4)
}
return nil
@ -222,86 +202,73 @@ func (t *tableReader) repair() error {
t.close()
var err error
if t.f, err = os.Open(t.name); err != nil {
return err
}
var data writeFile
var meta writeFile
defer t.close()
data, err = newWriteFile(t.useMmap, fmtTableDataName(t.base, t.index), 0)
data.SetOffset(int64(data.Size()))
st, _ := t.f.Stat()
size := st.Size()
if size == 0 {
return fmt.Errorf("empty file, can not repaired")
}
tw := newTableWriter(path.Dir(t.name), t.index, maxLogFileSize)
tmpName := tw.name + ".tmp"
tw.name = tmpName
os.Remove(tmpName)
defer func() {
tw.Close()
os.Remove(tmpName)
}()
meta, err = newWriteFile(t.useMmap, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4))
var l Log
var pos int64 = 0
var nextPos int64 = 0
b := make([]byte, 4)
t.first = 0
t.last = 0
for {
lastPos, _ := t.f.Seek(0, os.SEEK_CUR)
if lastPos == size {
//no data anymore, we can not read log0
//we may meet the log missing risk but have no way
log.Error("no more data, maybe missing some logs, use your own risk!!!")
nextPos, err = t.decodeLogHead(&l, data, pos)
if err != nil {
//if error, we may lost all logs from pos
log.Error("%s may lost logs from %d", data.Name(), pos)
break
}
if err := l.Decode(t.f); err != nil {
return err
}
if l.ID == 0 {
log.Error("%s may lost logs from %d, invalid log 0", data.Name(), pos)
break
}
if t.first == 0 {
t.first = l.ID
}
if t.last == 0 {
t.last = l.ID
} else if l.ID <= t.last {
log.Error("%s may lost logs from %d, invalid logid %d", t.data.Name(), pos, l.ID)
break
}
t.last = l.ID
t.lastTime = l.CreateTime
if err := tw.StoreLog(&l); err != nil {
return err
}
binary.BigEndian.PutUint32(b, uint32(pos))
meta.Write(b)
pos = nextPos
t.lastTime = l.CreateTime
}
t.close()
var e error
if err := meta.Close(false); err != nil {
e = err
}
var tr *tableReader
if tr, err = tw.Flush(); err != nil {
data.SetOffset(pos)
if err = data.Close(true); err != nil {
return err
}
t.first = tr.first
t.last = tr.last
t.offsetStartPos = tr.offsetStartPos
t.offsetLen = tr.offsetLen
defer tr.Close()
os.Remove(t.name)
if err := os.Rename(tmpName, t.name); err != nil {
return err
}
return nil
return e
}
func (t *tableReader) decodeLogHead(l *Log, pos int64) (int64, error) {
_, err := t.f.Seek(int64(pos), os.SEEK_SET)
if err != nil {
return 0, err
}
dataLen, err := l.DecodeHead(t.f)
func (t *tableReader) decodeLogHead(l *Log, r io.ReaderAt, pos int64) (int64, error) {
dataLen, err := l.DecodeHeadAt(r, pos)
if err != nil {
return 0, err
}
@ -317,23 +284,20 @@ func (t *tableReader) GetLog(id uint64, l *Log) error {
t.lastReadTime.Set(time.Now().Unix())
t.Lock()
defer t.Unlock()
if err := t.openTable(); err != nil {
t.close()
t.Unlock()
return err
}
t.Unlock()
pos, err := t.getLogPos(int(id - t.first))
if err != nil {
return err
}
if _, err := t.f.Seek(int64(pos), os.SEEK_SET); err != nil {
return err
}
if err := l.Decode(t.f); err != nil {
if err := l.DecodeAt(t.data, int64(pos)); err != nil {
return err
} else if l.ID != id {
return fmt.Errorf("invalid log id %d != %d", l.ID, id)
@ -344,16 +308,17 @@ func (t *tableReader) GetLog(id uint64, l *Log) error {
func (t *tableReader) openTable() error {
var err error
if t.f == nil {
if t.f, err = os.Open(t.name); err != nil {
if t.data == nil {
if t.data, err = newReadFile(t.useMmap, fmtTableDataName(t.base, t.index)); err != nil {
return err
}
}
if t.m == nil {
if t.m, err = mmap.MapRegion(t.f, int(t.offsetLen), mmap.RDONLY, 0, t.offsetStartPos); err != nil {
if t.meta == nil {
if t.meta, err = newReadFile(t.useMmap, fmtTableMetaName(t.base, t.index)); err != nil {
return err
}
}
return nil
@ -362,31 +327,25 @@ func (t *tableReader) openTable() error {
type tableWriter struct {
sync.RWMutex
wf *os.File
rf *os.File
wb *bufio.Writer
rm sync.Mutex
data writeFile
meta writeFile
base string
name string
index int64
first uint64
last uint64
offsetPos int64
offsetBuf []byte
first uint64
last uint64
lastTime uint32
maxLogSize int64
closed bool
syncType int
lastTime uint32
// cache *logLRUCache
posBuf []byte
useMmap bool
}
func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
@ -397,23 +356,24 @@ func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
t := new(tableWriter)
t.base = base
t.name = path.Join(base, fmtTableName(index))
t.index = index
t.offsetPos = 0
t.maxLogSize = maxLogSize
//maybe config later?
t.wb = bufio.NewWriterSize(ioutil.Discard, 4096)
t.closed = false
//maybe use config later
// t.cache = newLogLRUCache(1024*1024, 1000)
t.posBuf = make([]byte, 4)
//todo, use config
t.useMmap = useMmap
return t
}
func (t *tableWriter) String() string {
return fmt.Sprintf("%d", t.index)
}
func (t *tableWriter) SetMaxLogSize(s int64) {
t.maxLogSize = s
}
@ -423,26 +383,27 @@ func (t *tableWriter) SetSyncType(tp int) {
}
func (t *tableWriter) close() {
if t.rf != nil {
t.rf.Close()
t.rf = nil
if t.meta != nil {
if err := t.meta.Close(false); err != nil {
log.Fatal("close log meta error %s", err.Error())
}
t.meta = nil
}
if t.wf != nil {
t.wf.Close()
t.wf = nil
if t.data != nil {
if err := t.data.Close(true); err != nil {
log.Fatal("close log data error %s", err.Error())
}
t.data = nil
}
t.wb.Reset(ioutil.Discard)
}
func (t *tableWriter) Close() {
t.Lock()
defer t.Unlock()
t.closed = true
t.close()
t.Unlock()
}
func (t *tableWriter) First() uint64 {
@ -459,88 +420,31 @@ func (t *tableWriter) Last() uint64 {
return id
}
func (t *tableWriter) reset() {
func (t *tableWriter) Flush() (*tableReader, error) {
t.Lock()
if t.data == nil || t.meta == nil {
t.Unlock()
return nil, errNilHandler
}
tr := new(tableReader)
tr.base = t.base
tr.index = t.index
tr.first = t.first
tr.last = t.last
tr.lastTime = t.lastTime
//todo config
tr.useMmap = useMmap
t.close()
t.first = 0
t.last = 0
t.index = t.index + 1
t.name = path.Join(t.base, fmtTableName(t.index))
t.offsetBuf = t.offsetBuf[0:0]
t.offsetPos = 0
// t.cache.Reset()
}
func (t *tableWriter) Flush() (*tableReader, error) {
t.Lock()
defer t.Unlock()
if t.wf == nil {
return nil, errNilHandler
}
defer t.reset()
tr := new(tableReader)
tr.name = t.name
tr.index = t.index
st, _ := t.wf.Stat()
tr.first = t.first
tr.last = t.last
if n, err := t.wf.Write(log0Data); err != nil {
return nil, fmt.Errorf("flush log0data error %s", err.Error())
} else if n != len(log0Data) {
return nil, fmt.Errorf("flush log0data only %d != %d", n, len(log0Data))
}
st, _ = t.wf.Stat()
if m := st.Size() % pageSize; m != 0 {
padding := pageSize - m
if n, err := t.wf.Write(make([]byte, padding)); err != nil {
return nil, fmt.Errorf("flush log padding error %s", err.Error())
} else if n != int(padding) {
return nil, fmt.Errorf("flush log padding error %d != %d", n, padding)
}
}
st, _ = t.wf.Stat()
if st.Size()%pageSize != 0 {
return nil, fmt.Errorf("invalid offset start pos, %d", st.Size())
}
tr.offsetStartPos = st.Size()
tr.offsetLen = uint32(len(t.offsetBuf))
if n, err := t.wf.Write(t.offsetBuf); err != nil {
log.Error("flush offset buffer error %s", err.Error())
return nil, err
} else if n != len(t.offsetBuf) {
log.Error("flush offset buffer only %d != %d", n, len(t.offsetBuf))
return nil, io.ErrShortWrite
}
if err := binary.Write(t.wf, binary.BigEndian, tr.offsetStartPos); err != nil {
log.Error("flush offset start pos error %s", err.Error())
return nil, err
}
if err := binary.Write(t.wf, binary.BigEndian, tr.offsetLen); err != nil {
log.Error("flush offset len error %s", err.Error())
return nil, err
}
if n, err := t.wf.Write(magic); err != nil {
log.Error("flush magic data error %s", err.Error())
return nil, err
} else if n != len(magic) {
log.Error("flush magic data only %d != %d", n, len(magic))
return nil, io.ErrShortWrite
}
t.Unlock()
return tr, nil
}
@ -553,6 +457,22 @@ func (t *tableWriter) StoreLog(l *Log) error {
return err
}
func (t *tableWriter) openFile() error {
var err error
if t.data == nil {
if t.data, err = newWriteFile(t.useMmap, fmtTableDataName(t.base, t.index), t.maxLogSize+t.maxLogSize/10+int64(len(magic))); err != nil {
return err
}
}
if t.meta == nil {
if t.meta, err = newWriteFile(t.useMmap, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4)); err != nil {
return err
}
}
return err
}
func (t *tableWriter) storeLog(l *Log) error {
if l.ID == 0 {
return ErrStoreLogID
@ -566,63 +486,34 @@ func (t *tableWriter) storeLog(l *Log) error {
return ErrStoreLogID
}
if t.last-t.first+1 > maxLogNumInFile {
if t.data != nil && t.data.Offset() > t.maxLogSize {
return errTableNeedFlush
}
var err error
if t.wf == nil {
if t.wf, err = os.OpenFile(t.name, os.O_CREATE|os.O_WRONLY, 0644); err != nil {
return err
}
t.wb.Reset(t.wf)
}
if t.offsetBuf == nil {
t.offsetBuf = make([]byte, 0, maxLogNumInFile*4)
}
// st, _ := t.wf.Stat()
// if st.Size() >= t.maxLogSize {
// return errTableNeedFlush
// }
if t.offsetPos >= t.maxLogSize {
return errTableNeedFlush
}
offsetPos := t.offsetPos
if err = l.Encode(t.wb); err != nil {
if err = t.openFile(); err != nil {
return err
}
if err = t.wb.Flush(); err != nil {
offsetPos := t.data.Offset()
if err = l.Encode(t.data); err != nil {
return err
}
// buf, _ := l.Marshal()
// if n, err := t.wf.Write(buf); err != nil {
// return err
// } else if n != len(buf) {
// return io.ErrShortWrite
// }
binary.BigEndian.PutUint32(t.posBuf, uint32(offsetPos))
if _, err = t.meta.Write(t.posBuf); err != nil {
return err
}
t.offsetPos += int64(l.Size())
t.offsetBuf = append(t.offsetBuf, num.Uint32ToBytes(uint32(offsetPos))...)
if t.first == 0 {
t.first = l.ID
}
t.last = l.ID
t.lastTime = l.CreateTime
// t.cache.Set(l.ID, buf)
if t.syncType == 2 {
if err := t.wf.Sync(); err != nil {
if err := t.data.Sync(); err != nil {
log.Error("sync table error %s", err.Error())
}
}
@ -638,17 +529,14 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error {
return ErrLogNotFound
}
// if cl := t.cache.Get(id); cl != nil {
// if err := l.Unmarshal(cl); err == nil && l.ID == id {
// return nil
// } else {
// t.cache.Delete(id)
// }
// }
var buf [4]byte
if _, err := t.meta.ReadAt(buf[0:4], int64((id-t.first)*4)); err != nil {
return err
}
offset := binary.BigEndian.Uint32(t.offsetBuf[(id-t.first)*4:])
offset := binary.BigEndian.Uint32(buf[0:4])
if err := t.getLog(l, int64(offset)); err != nil {
if err := l.DecodeAt(t.data, int64(offset)); err != nil {
return err
} else if l.ID != id {
return fmt.Errorf("invalid log id %d != %d", id, l.ID)
@ -661,32 +549,17 @@ func (t *tableWriter) Sync() error {
t.Lock()
var err error
if t.wf != nil {
err = t.wf.Sync()
if t.data != nil {
err = t.data.Sync()
t.Unlock()
return err
}
if t.meta != nil {
err = t.meta.Sync()
}
t.Unlock()
return err
}
func (t *tableWriter) getLog(l *Log, pos int64) error {
t.rm.Lock()
defer t.rm.Unlock()
var err error
if t.rf == nil {
if t.rf, err = os.Open(t.name); err != nil {
return err
}
}
if _, err = t.rf.Seek(pos, os.SEEK_SET); err != nil {
return err
}
if err = l.Decode(t.rf); err != nil {
return err
}
return nil
}

View File

@ -10,6 +10,17 @@ import (
)
func TestFileTable(t *testing.T) {
useMmap = true
testFileTable(t)
useMmap = false
testFileTable(t)
useMmap = true
}
func testFileTable(t *testing.T) {
log.SetLevel(log.LevelInfo)
base, err := ioutil.TempDir("", "test_table")
if err != nil {
t.Fatal(err)
@ -50,10 +61,6 @@ func TestFileTable(t *testing.T) {
var ll Log
if err = ll.Unmarshal(log0Data); err != nil {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
if err := w.GetLog(uint64(i+1), &ll); err != nil {
t.Fatal(err)
@ -70,7 +77,7 @@ func TestFileTable(t *testing.T) {
var r *tableReader
name := w.name
name := fmtTableDataName(w.base, w.index)
if r, err = w.Flush(); err != nil {
t.Fatal(err)
@ -130,26 +137,19 @@ func TestFileTable(t *testing.T) {
t.Fatal("must nil")
}
st, _ := r.f.Stat()
s := st.Size()
s := int64(r.data.Size())
r.Close()
log.SetLevel(log.LevelFatal)
testRepair(t, name, 1, s, 11)
testRepair(t, name, 1, s, 32)
testRepair(t, name, 1, s, 42)
testRepair(t, name, 1, s, 72)
testRepair(t, name, 1, s, 20)
if err := os.Truncate(name, s-(73+4096)); err != nil {
if err := os.Truncate(name, s-21); err != nil {
t.Fatal(err)
}
if r, err = newTableReader(base, 1); err == nil {
t.Fatal("can not repair")
}
if r, err := w.Flush(); err != nil {
t.Fatal(err)
} else {
@ -159,7 +159,7 @@ func TestFileTable(t *testing.T) {
if r, err = newTableReader(base, 2); err != nil {
t.Fatal(err)
}
defer r.Close()
r.Close()
}
func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64) {
@ -178,7 +178,7 @@ func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64)
var ll Log
for i := 0; i < 10; i++ {
if err := r.GetLog(uint64(i+1), &ll); err != nil {
t.Fatal(err)
t.Fatal(err, i)
} else if len(ll.Data) != 4096 {
t.Fatal(len(ll.Data))
} else if ll.Data[0] != byte(i+1) {
@ -190,9 +190,8 @@ func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64)
t.Fatal("must nil")
}
st, _ := r.f.Stat()
if s != st.Size() {
t.Fatalf("repair error size %d != %d", s, st.Size())
if s != int64(r.data.Size()) {
t.Fatalf("repair error size %d != %d", s, r.data.Size())
}
}

View File

@ -81,13 +81,8 @@ func (l *Log) Decode(r io.Reader) error {
return err
}
l.Data = l.Data[0:0]
l.growData(int(length))
if cap(l.Data) >= int(length) {
l.Data = l.Data[0:length]
} else {
l.Data = make([]byte, length)
}
if _, err := io.ReadFull(r, l.Data); err != nil {
return err
}
@ -103,6 +98,60 @@ func (l *Log) DecodeHead(r io.Reader) (uint32, error) {
return 0, err
}
length := l.decodeHeadBuf(buf)
headPool.Put(buf)
return length, nil
}
func (l *Log) DecodeAt(r io.ReaderAt, pos int64) error {
length, err := l.DecodeHeadAt(r, pos)
if err != nil {
return err
}
l.growData(int(length))
var n int
n, err = r.ReadAt(l.Data, pos+int64(LogHeadSize))
if err == io.EOF && n == len(l.Data) {
err = nil
}
return err
}
func (l *Log) growData(length int) {
l.Data = l.Data[0:0]
if cap(l.Data) >= length {
l.Data = l.Data[0:length]
} else {
l.Data = make([]byte, length)
}
}
func (l *Log) DecodeHeadAt(r io.ReaderAt, pos int64) (uint32, error) {
buf := headPool.Get().([]byte)
n, err := r.ReadAt(buf, pos)
if err != nil && err != io.EOF {
headPool.Put(buf)
return 0, err
}
length := l.decodeHeadBuf(buf)
headPool.Put(buf)
if err == io.EOF && (length != 0 || n != len(buf)) {
return 0, err
}
return length, nil
}
func (l *Log) decodeHeadBuf(buf []byte) uint32 {
pos := 0
l.ID = binary.BigEndian.Uint64(buf[pos:])
pos += 8
@ -114,8 +163,5 @@ func (l *Log) DecodeHead(r io.Reader) (uint32, error) {
pos++
length := binary.BigEndian.Uint32(buf[pos:])
headPool.Put(buf)
return length, nil
return length
}

View File

@ -1,95 +0,0 @@
package rpl
import (
"container/list"
"encoding/binary"
)
type logLRUCache struct {
itemsList *list.List
itemsMap map[uint64]*list.Element
size int
capability int
maxNum int
}
func newLogLRUCache(capability int, maxNum int) *logLRUCache {
if capability <= 0 {
capability = 1024 * 1024
}
if maxNum <= 0 {
maxNum = 16
}
return &logLRUCache{
itemsList: list.New(),
itemsMap: make(map[uint64]*list.Element),
size: 0,
capability: capability,
maxNum: maxNum,
}
}
func (cache *logLRUCache) Set(id uint64, data []byte) {
elem, ok := cache.itemsMap[id]
if ok {
//we may not enter here
// item already exists, so move it to the front of the list and update the data
cache.itemsList.MoveToFront(elem)
ol := elem.Value.([]byte)
elem.Value = data
cache.size += (len(data) - len(ol))
} else {
cache.size += len(data)
// item doesn't exist, so add it to front of list
elem = cache.itemsList.PushFront(data)
cache.itemsMap[id] = elem
}
// evict LRU entry if the cache is full
for cache.size > cache.capability || cache.itemsList.Len() > cache.maxNum {
removedElem := cache.itemsList.Back()
l := removedElem.Value.([]byte)
cache.itemsList.Remove(removedElem)
delete(cache.itemsMap, binary.BigEndian.Uint64(l[0:8]))
cache.size -= len(l)
if cache.size <= 0 {
cache.size = 0
}
}
}
func (cache *logLRUCache) Get(id uint64) []byte {
elem, ok := cache.itemsMap[id]
if !ok {
return nil
}
// item exists, so move it to front of list and return it
cache.itemsList.MoveToFront(elem)
l := elem.Value.([]byte)
return l
}
func (cache *logLRUCache) Delete(id uint64) {
elem, ok := cache.itemsMap[id]
if !ok {
return
}
cache.itemsList.Remove(elem)
delete(cache.itemsMap, id)
}
func (cache *logLRUCache) Len() int {
return cache.itemsList.Len()
}
func (cache *logLRUCache) Reset() {
cache.itemsList = list.New()
cache.itemsMap = make(map[uint64]*list.Element)
cache.size = 0
}

View File

@ -1,48 +0,0 @@
package rpl
import (
"testing"
)
func TestLogLRUCache(t *testing.T) {
c := newLogLRUCache(180, 10)
var i uint64
for i = 1; i <= 10; i++ {
l := &Log{i, 0, 0, []byte("0")}
b, _ := l.Marshal()
c.Set(l.ID, b)
}
for i = 1; i <= 10; i++ {
if l := c.Get(i); l == nil {
t.Fatal("must exist", i)
}
}
for i = 11; i <= 20; i++ {
l := &Log{i, 0, 0, []byte("0")}
b, _ := l.Marshal()
c.Set(l.ID, b)
}
for i = 1; i <= 10; i++ {
if l := c.Get(i); l != nil {
t.Fatal("must not exist", i)
}
}
c.Get(11)
l := &Log{21, 0, 0, []byte("0")}
b, _ := l.Marshal()
c.Set(l.ID, b)
if l := c.Get(12); l != nil {
t.Fatal("must nil", 12)
}
if l := c.Get(11); l == nil {
t.Fatal("must not nil", 11)
}
}