update replication

siddontang 2014-11-07 16:35:54 +08:00
parent 137f85dff3
commit 34d485056f
7 changed files with 270 additions and 50 deletions

View File

@@ -74,6 +74,8 @@ type ReplicationConfig struct {
    WaitSyncTime     int    `toml:"wait_sync_time"`
    WaitMaxSlaveAcks int    `toml:"wait_max_slave_acks"`
    ExpiredLogDays   int    `toml:"expired_log_days"`
+   StoreName        string `toml:"store_name"`
+   MaxLogFileSize   int64  `toml:"max_log_file_size"`
    SyncLog          int    `toml:"sync_log"`
    Compression      bool   `toml:"compression"`
}
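
For context, a minimal, self-contained sketch of how the two new fields would be decoded from the TOML configuration. The real parsing lives in this repository's config package; using github.com/BurntSushi/toml here is an assumption based on the struct tags.

package main

import (
    "fmt"

    "github.com/BurntSushi/toml"
)

// replicationConfig is a hypothetical mirror of the two new fields only.
type replicationConfig struct {
    StoreName      string `toml:"store_name"`
    MaxLogFileSize int64  `toml:"max_log_file_size"`
}

func main() {
    data := `
store_name = "file"
max_log_file_size = 0
`
    var cfg replicationConfig
    if _, err := toml.Decode(data, &cfg); err != nil {
        panic(err)
    }
    fmt.Println(cfg.StoreName, cfg.MaxLogFileSize) // file 0
}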

View File

@@ -121,9 +121,16 @@ wait_sync_time = 500
# If 0, wait (n + 1) / 2 acks.
wait_max_slave_acks = 2

+# store name: file, goleveldb
+# change in runtime is very dangerous
+store_name = "file"
+
# Expire write ahead logs after the given days
expired_log_days = 7

+# for file store, if 0, use default 1G, max is 4G
+max_log_file_size = 0
+
# Sync log to disk if possible
# 0: no sync
# 1: sync every second
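
As a side note, a small sketch of the sizing rule described by the max_log_file_size comment ("if 0, use default 1G, max is 4G"); the constant names mirror those used in NewFileStore below, but the exact values are assumptions.

package main

import "fmt"

// Assumed counterparts of the rpl package constants; the real values may differ.
const (
    defaultMaxLogFileSize = int64(1024 * 1024 * 1024)     // 1G used when the setting is 0
    maxLogFileSize        = int64(4 * 1024 * 1024 * 1024) // 4G upper bound
)

// effectiveLogFileSize is a hypothetical helper mirroring what NewFileStore
// does: clamp to the upper bound first, then fall back to the default for 0.
func effectiveLogFileSize(configured int64) int64 {
    size := configured
    if size > maxLogFileSize {
        size = maxLogFileSize
    }
    if size == 0 {
        size = defaultMaxLogFileSize
    }
    return size
}

func main() {
    fmt.Println(effectiveLogFileSize(0))                      // 1073741824
    fmt.Println(effectiveLogFileSize(8 * 1024 * 1024 * 1024)) // 4294967296
}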

View File

@@ -60,7 +60,7 @@ type FileStore struct {
    w *tableWriter
}

-func NewFileStore(base string, maxSize int64) (*FileStore, error) {
+func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error) {
    s := new(FileStore)

    var err error

@@ -72,6 +72,9 @@ func NewFileStore(base string, maxSize int64) (*FileStore, error) {
    s.base = base
    s.maxFileSize = num.MinInt64(maxLogFileSize, maxSize)
+   if s.maxFileSize == 0 {
+       s.maxFileSize = defaultMaxLogFileSize
+   }

    if err = s.load(); err != nil {
        return nil, err
@@ -83,30 +86,75 @@ func NewFileStore(base string, maxSize int64) (*FileStore, error) {
    }

    s.w = newTableWriter(s.base, index, s.maxFileSize)
+   s.w.SetSyncType(syncType)

    return s, nil
}

-func (s *FileStore) GetLog(id uint64, log *Log) error {
-   panic("not implementation")
+func (s *FileStore) GetLog(id uint64, l *Log) error {
+   //first search in table writer
+   if err := s.w.GetLog(id, l); err == nil {
        return nil
+   } else if err != ErrLogNotFound {
+       return err
+   }
+
+   s.rm.RLock()
+   t := s.rs.Search(id)
+   if t == nil {
+       s.rm.RUnlock()
+       return ErrLogNotFound
+   }
+
+   err := t.GetLog(id, l)
+   s.rm.RUnlock()
+
+   return err
}

func (s *FileStore) FirstID() (uint64, error) {
-   return 0, nil
+   id := uint64(0)
+
+   s.rm.RLock()
+   if len(s.rs) > 0 {
+       id = s.rs[0].first
+   } else {
+       id = 0
+   }
+   s.rm.RUnlock()
+
+   if id > 0 {
+       return id, nil
+   }
+
+   //if id = 0,
+   return s.w.First(), nil
}

func (s *FileStore) LastID() (uint64, error) {
-   return 0, nil
+   id := s.w.Last()
+   if id > 0 {
+       return id, nil
+   }
+
+   //if table writer has no last id, we may find in the last table reader
+   s.rm.RLock()
+   if len(s.rs) > 0 {
+       id = s.rs[len(s.rs)-1].last
+   }
+   s.rm.RUnlock()
+
+   return id, nil
}

func (s *FileStore) StoreLog(l *Log) error {
    s.wm.Lock()
    defer s.wm.Unlock()

+   if s.w == nil {
+       return fmt.Errorf("nil table writer, cannot store")
+   }
+
    err := s.w.StoreLog(l)
    if err == nil {
        return nil
@@ -114,40 +162,40 @@ func (s *FileStore) StoreLog(l *Log) error {
        return err
    }

+   s.rm.Lock()
    var r *tableReader
    if r, err = s.w.Flush(); err != nil {
        log.Error("write table flush error %s, can not store now", err.Error())
        s.w.Close()
+       s.w = nil
+       s.rm.Unlock()
        return err
    }

-   s.rm.Lock()
    s.rs = append(s.rs, r)
    s.rm.Unlock()

-   return nil
+   return s.w.StoreLog(l)
}

func (s *FileStore) PuregeExpired(n int64) error {
    s.rm.Lock()

    purges := []*tableReader{}
-   lefts := []*tableReader{}
    t := uint32(time.Now().Unix() - int64(n))

-   for _, r := range s.rs {
-       if r.lastTime < t {
-           purges = append(purges, r)
-       } else {
-           lefts = append(lefts, r)
+   for i, r := range s.rs {
+       if r.lastTime > t {
+           purges = s.rs[0:i]
+           s.rs = s.rs[i:]
+           break
        }
    }

-   s.rs = lefts
    s.rm.Unlock()

    for _, r := range purges {
@@ -162,31 +210,53 @@ func (s *FileStore) PuregeExpired(n int64) error {
}

func (s *FileStore) Clear() error {
+   s.wm.Lock()
+   s.rm.Lock()
+
+   defer func() {
+       s.rm.Unlock()
+       s.wm.Unlock()
+   }()
+
+   s.w.Close()
+
+   for i := range s.rs {
+       s.rs[i].Close()
+   }
+
+   s.rs = tableReaders{}
+
+   if err := os.RemoveAll(s.base); err != nil {
+       return err
+   }
+
+   if err := os.MkdirAll(s.base, 0755); err != nil {
+       return err
+   }
+
+   s.w = newTableWriter(s.base, 1, s.maxFileSize)
+
    return nil
}

func (s *FileStore) Close() error {
    s.wm.Lock()
-   if s.w != nil {
+   s.rm.Lock()
+
    if r, err := s.w.Flush(); err != nil {
        log.Error("close err: %s", err.Error())
    } else {
        r.Close()
        s.w.Close()
+       s.w = nil
    }
-   }
-   s.wm.Unlock()
-   s.rm.Lock()

    for i := range s.rs {
        s.rs[i].Close()
    }
-   s.rs = nil
+   s.rs = tableReaders{}
    s.rm.Unlock()
+   s.wm.Unlock()

    return nil
}
@@ -227,19 +297,25 @@ func (ts tableReaders) Swap(i, j int) {
}

func (ts tableReaders) Less(i, j int) bool {
-   return ts[i].index < ts[j].index
+   return ts[i].first < ts[j].first
}

func (ts tableReaders) Search(id uint64) *tableReader {
-   n := sort.Search(len(ts), func(i int) bool {
-       return id >= ts[i].first && id <= ts[i].last
-   })
-
-   if n < len(ts) {
-       return ts[n]
-   } else {
-       return nil
-   }
+   i, j := 0, len(ts)-1
+
+   for i <= j {
+       h := i + (j-i)/2
+
+       if ts[h].first <= id && id <= ts[h].last {
+           return ts[h]
+       } else if ts[h].last < id {
+           i = h + 1
+       } else {
+           j = h - 1
+       }
+   }
+
+   return nil
}

func (ts tableReaders) check() error {

@@ -262,7 +338,7 @@ func (ts tableReaders) check() error {
            return fmt.Errorf("invalid first log id %d in table %s", ts[i].first, ts[i].name)
        }

-       if ts[i].index == index {
+       if ts[i].index <= index {
            return fmt.Errorf("invalid index %d in table %s", ts[i].index, ts[i].name)
        }
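
The new Search above replaces a misuse of sort.Search: the old predicate (id >= first && id <= last) is not monotonic over the sorted readers, so the standard binary search could miss entries. A self-contained sketch of an equivalent formulation that keeps sort.Search by using the monotonic predicate "last >= id" follows; the reader type here is a stand-in, not the repository's tableReader.

package main

import (
    "fmt"
    "sort"
)

// reader is a stand-in for tableReader, holding only the id range Search needs.
type reader struct {
    first, last uint64
}

// search finds the reader whose [first, last] range contains id, assuming the
// slice is sorted by first and the ranges do not overlap.
func search(rs []reader, id uint64) *reader {
    // last is increasing, so "last >= id" flips from false to true exactly once.
    n := sort.Search(len(rs), func(i int) bool { return rs[i].last >= id })
    if n < len(rs) && rs[n].first <= id {
        return &rs[n]
    }
    return nil
}

func main() {
    rs := []reader{{1, 10}, {11, 20}, {21, 30}}
    fmt.Println(search(rs, 15))  // &{11 20}
    fmt.Println(search(rs, 100)) // <nil>
}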

View File

@@ -293,7 +293,7 @@ func (t *tableReader) decodeLogHead(l *Log, pos int64) (int64, error) {
func (t *tableReader) GetLog(id uint64, l *Log) error {
    if id < t.first || id > t.last {
-       return fmt.Errorf("log %d not in [%d=%d]", id, t.first, t.last)
+       return ErrLogNotFound
    }

    t.lastReadTime.Set(time.Now().Unix())

@@ -359,6 +359,11 @@ type tableWriter struct {
    offsetBuf []byte
    maxLogSize int64

+   closed bool
+
+   syncType int
+   lastTime uint32
}

func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {

@@ -374,9 +379,19 @@ func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
    t.maxLogSize = maxLogSize
+   t.closed = false

    return t
}

+func (t *tableWriter) SetMaxLogSize(s int64) {
+   t.maxLogSize = s
+}
+
+func (t *tableWriter) SetSyncType(tp int) {
+   t.syncType = tp
+}
+
func (t *tableWriter) close() {
    if t.rf != nil {
        t.rf.Close()
@@ -393,6 +408,8 @@ func (t *tableWriter) Close() {
    t.Lock()
    defer t.Unlock()

+   t.closed = true
+
    t.close()
}

@@ -502,6 +519,10 @@ func (t *tableWriter) StoreLog(l *Log) error {
    t.Lock()
    defer t.Unlock()

+   if t.closed {
+       return fmt.Errorf("table writer is closed")
+   }
+
    if t.last > 0 && l.ID != t.last+1 {
        return ErrStoreLogID
    }

@@ -539,8 +560,16 @@ func (t *tableWriter) StoreLog(l *Log) error {
    t.last = l.ID
+   t.lastTime = l.CreateTime

    //todo add LRU cache

+   if t.syncType == 2 || (t.syncType == 1 && time.Now().Unix()-int64(t.lastTime) > 1) {
+       if err := t.wf.Sync(); err != nil {
+           log.Error("sync table error %s", err.Error())
+       }
+   }
+
    return nil
}

@@ -549,7 +578,7 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error {
    defer t.RUnlock()

    if id < t.first || id > t.last {
-       return fmt.Errorf("log %d not in [%d=%d]", id, t.first, t.last)
+       return ErrLogNotFound
    }

    //todo memory cache

View File

@@ -51,9 +51,17 @@ func NewReplication(cfg *config.Config) (*Replication, error) {
    r.cfg = cfg

    var err error

+   switch cfg.Replication.StoreName {
+   case "goleveldb":
        if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil {
            return nil, err
        }
+   default:
+       if r.s, err = NewFileStore(path.Join(base, "ldb"), cfg.Replication.MaxLogFileSize, cfg.Replication.SyncLog); err != nil {
+           return nil, err
+       }
+   }

    if r.commitLog, err = os.OpenFile(path.Join(base, "commit.log"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
        return nil, err

View File

@@ -24,6 +24,24 @@ func TestGoLevelDBStore(t *testing.T) {
    testLogs(t, l)
}

+func TestFileStore(t *testing.T) {
+   // Create a test dir
+   dir, err := ioutil.TempDir("", "ldb")
+   if err != nil {
+       t.Fatalf("err: %v ", err)
+   }
+   defer os.RemoveAll(dir)
+
+   // New level
+   l, err := NewFileStore(dir, 4096, 0)
+   if err != nil {
+       t.Fatalf("err: %v ", err)
+   }
+   defer l.Close()
+
+   testLogs(t, l)
+}
+
func testLogs(t *testing.T, l LogStore) {
    // Should be no first index
    idx, err := l.FirstID()
@@ -45,14 +63,16 @@ func testLogs(t *testing.T, l LogStore) {
    // Try a filed fetch
    var out Log
-   if err := l.GetLog(10, &out); err.Error() != "log not found" {
+   if err := l.GetLog(10, &out); err != ErrLogNotFound {
        t.Fatalf("err: %v ", err)
    }

+   data := make([]byte, 1024)
+
    // Write out a log
    log := Log{
        ID:   1,
-       Data: []byte("first"),
+       Data: data,
    }
    for i := 1; i <= 10; i++ {
        log.ID = uint64(i)
@@ -65,7 +85,7 @@ func testLogs(t *testing.T, l LogStore) {
    for i := 11; i <= 20; i++ {
        nl := &Log{
            ID:   uint64(i),
-           Data: []byte("first"),
+           Data: data,
        }

        if err := l.StoreLog(nl); err != nil {

@@ -73,6 +93,11 @@ func testLogs(t *testing.T, l LogStore) {
        }
    }

+   // Try to fetch
+   if err := l.GetLog(1, &out); err != nil {
+       t.Fatalf("err: %v ", err)
+   }
+
    // Try to fetch
    if err := l.GetLog(10, &out); err != nil {
        t.Fatalf("err: %v ", err)
@@ -100,4 +125,39 @@ func testLogs(t *testing.T, l LogStore) {
    if idx != 20 {
        t.Fatalf("bad idx: %d", idx)
    }

+   if err = l.Clear(); err != nil {
+       t.Fatalf("err :%v", err)
+   }
+
+   // Check the lowest index
+   idx, err = l.FirstID()
+   if err != nil {
+       t.Fatalf("err: %v ", err)
+   }
+   if idx != 0 {
+       t.Fatalf("bad idx: %d", idx)
+   }
+
+   // Check the highest index
+   idx, err = l.LastID()
+   if err != nil {
+       t.Fatalf("err: %v ", err)
+   }
+   if idx != 0 {
+       t.Fatalf("bad idx: %d", idx)
+   }
+
+   // Write out a log
+   log = Log{
+       ID:   1,
+       Data: data,
+   }
+   for i := 1; i <= 10; i++ {
+       log.ID = uint64(i)
+       if err := l.StoreLog(&log); err != nil {
+           t.Fatalf("err: %v", err)
+       }
+   }
}

rpl/table_readers_test.go (new file, 38 lines)
View File

@@ -0,0 +1,38 @@
package rpl

import (
    "testing"
)

func TestTableReaders(t *testing.T) {
    ts := make(tableReaders, 0, 10)

    for i := uint64(0); i < 10; i++ {
        t := new(tableReader)
        t.index = int64(i) + 1
        t.first = i*10 + 1
        t.last = i*10 + 10

        ts = append(ts, t)
    }

    if err := ts.check(); err != nil {
        t.Fatal(err)
    }

    for i := 1; i <= 100; i++ {
        if r := ts.Search(uint64(i)); r == nil {
            t.Fatal("must hit", i)
        } else if r.index != int64((i-1)/10)+1 {
            t.Fatal("invalid index", r.index, i)
        }
    }

    if r := ts.Search(1000); r != nil {
        t.Fatal("must not hit")
    }

    if r := ts.Search(0); r != nil {
        t.Fatal("must not hit")
    }
}