From c6576990efe6b583c3626579e30d11450067ad58 Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 11 Nov 2014 15:20:26 +0800 Subject: [PATCH] rpl support sync every 1 second --- rpl/file_store.go | 8 +++++++- rpl/file_table.go | 30 +++++++++++++++++++++++++-- rpl/goleveldb_store.go | 5 +++++ rpl/rpl.go | 46 ++++++++++++++++++++++++++++++------------ rpl/store.go | 2 ++ 5 files changed, 75 insertions(+), 16 deletions(-) diff --git a/rpl/file_store.go b/rpl/file_store.go index 3c5e719..63d287c 100644 --- a/rpl/file_store.go +++ b/rpl/file_store.go @@ -209,6 +209,10 @@ func (s *FileStore) PuregeExpired(n int64) error { return nil } +func (s *FileStore) Sync() error { + return s.w.Sync() +} + func (s *FileStore) Clear() error { s.wm.Lock() s.rm.Lock() @@ -244,7 +248,9 @@ func (s *FileStore) Close() error { s.rm.Lock() if r, err := s.w.Flush(); err != nil { - log.Error("close err: %s", err.Error()) + if err != errNilHandler { + log.Error("close err: %s", err.Error()) + } } else { r.Close() s.w.Close() diff --git a/rpl/file_table.go b/rpl/file_table.go index 9023aa3..bfff239 100644 --- a/rpl/file_table.go +++ b/rpl/file_table.go @@ -22,6 +22,7 @@ var ( log0 = Log{0, 1, 1, []byte("ledisdb")} log0Data = []byte{} errTableNeedFlush = errors.New("write table need flush") + errNilHandler = errors.New("nil write handler") pageSize = int64(4096) ) @@ -225,6 +226,13 @@ func (t *tableReader) repair() error { defer t.close() + st, _ := t.f.Stat() + size := st.Size() + + if size == 0 { + return fmt.Errorf("empty file, can not repaired") + } + tw := newTableWriter(path.Dir(t.name), t.index, maxLogFileSize) tmpName := tw.name + ".tmp" @@ -239,6 +247,14 @@ func (t *tableReader) repair() error { var l Log for { + lastPos, _ := t.f.Seek(0, os.SEEK_CUR) + if lastPos == size { + //no data anymore, we can not read log0 + //we may meet the log missing risk but have no way + log.Error("no more data, maybe missing some logs, use your own risk!!!") + break + } + if err := l.Decode(t.f); err != nil { return err } @@ -442,7 +458,7 @@ func (t *tableWriter) Flush() (*tableReader, error) { defer t.Unlock() if t.wf == nil { - return nil, fmt.Errorf("nil write handler") + return nil, errNilHandler } defer t.reset() @@ -564,7 +580,7 @@ func (t *tableWriter) StoreLog(l *Log) error { //todo add LRU cache - if t.syncType == 2 || (t.syncType == 1 && time.Now().Unix()-int64(t.lastTime) > 1) { + if t.syncType == 2 { if err := t.wf.Sync(); err != nil { log.Error("sync table error %s", err.Error()) } @@ -594,6 +610,16 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error { return nil } +func (t *tableWriter) Sync() error { + t.Lock() + defer t.Unlock() + + if t.wf != nil { + return t.wf.Sync() + } + return nil +} + func (t *tableWriter) getLog(l *Log, pos int64) error { t.rm.Lock() defer t.rm.Unlock() diff --git a/rpl/goleveldb_store.go b/rpl/goleveldb_store.go index 2d3f808..5ece8d5 100644 --- a/rpl/goleveldb_store.go +++ b/rpl/goleveldb_store.go @@ -156,6 +156,11 @@ func (s *GoLevelDBStore) PurgeExpired(n int64) error { return nil } +func (s *GoLevelDBStore) Sync() error { + //no other way for sync, so ignore here + return nil +} + func (s *GoLevelDBStore) reset() { s.first = InvalidLogID s.last = InvalidLogID diff --git a/rpl/rpl.go b/rpl/rpl.go index b0453ef..e3bd54c 100644 --- a/rpl/rpl.go +++ b/rpl/rpl.go @@ -24,9 +24,8 @@ type Replication struct { s LogStore - commitID uint64 - commitLog *os.File - commitLastTime time.Time + commitID uint64 + commitLog *os.File quit chan struct{} @@ -75,7 +74,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) { } r.wg.Add(1) - go r.onPurgeExpired() + go r.run() return r, nil } @@ -213,9 +212,7 @@ func (r *Replication) Stat() (*Stat, error) { } func (r *Replication) updateCommitID(id uint64, force bool) error { - n := time.Now() - - if force || n.Sub(r.commitLastTime) > time.Second { + if force { if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil { return err } @@ -227,8 +224,6 @@ func (r *Replication) updateCommitID(id uint64, force bool) error { r.commitID = id - r.commitLastTime = n - return nil } @@ -280,19 +275,44 @@ func (r *Replication) ClearWithCommitID(id uint64) error { return r.updateCommitID(id, true) } -func (r *Replication) onPurgeExpired() { +func (r *Replication) run() { defer r.wg.Done() + syncTc := time.NewTicker(1 * time.Second) + purgeTc := time.NewTicker(1 * time.Hour) + for { select { - case <-time.After(1 * time.Hour): + case <-purgeTc.C: n := (r.cfg.Replication.ExpiredLogDays * 24 * 3600) r.m.Lock() - if err := r.s.PurgeExpired(int64(n)); err != nil { + err := r.s.PurgeExpired(int64(n)) + r.m.Unlock() + if err != nil { log.Error("purge expired log error %s", err.Error()) } - r.m.Unlock() + case <-syncTc.C: + if r.cfg.Replication.SyncLog == 1 { + r.m.Lock() + err := r.s.Sync() + r.m.Unlock() + if err != nil { + log.Error("sync store error %s", err.Error()) + } + } + if r.cfg.Replication.SyncLog != 2 { + //we will sync commit id every 1 second + r.m.Lock() + err := r.updateCommitID(r.commitID, true) + r.m.Unlock() + + if err != nil { + log.Error("sync commitid error %s", err.Error()) + } + } case <-r.quit: + syncTc.Stop() + purgeTc.Stop() return } } diff --git a/rpl/store.go b/rpl/store.go index d56d9f0..7af9b0a 100644 --- a/rpl/store.go +++ b/rpl/store.go @@ -26,6 +26,8 @@ type LogStore interface { // Delete logs before n seconds PurgeExpired(n int64) error + Sync() error + // Clear all logs Clear() error