rpl support sync every 1 second

This commit is contained in:
siddontang 2014-11-11 15:20:26 +08:00
parent 529c158293
commit c6576990ef
5 changed files with 75 additions and 16 deletions

View File

@ -209,6 +209,10 @@ func (s *FileStore) PuregeExpired(n int64) error {
return nil return nil
} }
func (s *FileStore) Sync() error {
return s.w.Sync()
}
func (s *FileStore) Clear() error { func (s *FileStore) Clear() error {
s.wm.Lock() s.wm.Lock()
s.rm.Lock() s.rm.Lock()
@ -244,7 +248,9 @@ func (s *FileStore) Close() error {
s.rm.Lock() s.rm.Lock()
if r, err := s.w.Flush(); err != nil { if r, err := s.w.Flush(); err != nil {
if err != errNilHandler {
log.Error("close err: %s", err.Error()) log.Error("close err: %s", err.Error())
}
} else { } else {
r.Close() r.Close()
s.w.Close() s.w.Close()

View File

@ -22,6 +22,7 @@ var (
log0 = Log{0, 1, 1, []byte("ledisdb")} log0 = Log{0, 1, 1, []byte("ledisdb")}
log0Data = []byte{} log0Data = []byte{}
errTableNeedFlush = errors.New("write table need flush") errTableNeedFlush = errors.New("write table need flush")
errNilHandler = errors.New("nil write handler")
pageSize = int64(4096) pageSize = int64(4096)
) )
@ -225,6 +226,13 @@ func (t *tableReader) repair() error {
defer t.close() defer t.close()
st, _ := t.f.Stat()
size := st.Size()
if size == 0 {
return fmt.Errorf("empty file, can not repaired")
}
tw := newTableWriter(path.Dir(t.name), t.index, maxLogFileSize) tw := newTableWriter(path.Dir(t.name), t.index, maxLogFileSize)
tmpName := tw.name + ".tmp" tmpName := tw.name + ".tmp"
@ -239,6 +247,14 @@ func (t *tableReader) repair() error {
var l Log var l Log
for { for {
lastPos, _ := t.f.Seek(0, os.SEEK_CUR)
if lastPos == size {
//no data anymore, we can not read log0
//we may meet the log missing risk but have no way
log.Error("no more data, maybe missing some logs, use your own risk!!!")
break
}
if err := l.Decode(t.f); err != nil { if err := l.Decode(t.f); err != nil {
return err return err
} }
@ -442,7 +458,7 @@ func (t *tableWriter) Flush() (*tableReader, error) {
defer t.Unlock() defer t.Unlock()
if t.wf == nil { if t.wf == nil {
return nil, fmt.Errorf("nil write handler") return nil, errNilHandler
} }
defer t.reset() defer t.reset()
@ -564,7 +580,7 @@ func (t *tableWriter) StoreLog(l *Log) error {
//todo add LRU cache //todo add LRU cache
if t.syncType == 2 || (t.syncType == 1 && time.Now().Unix()-int64(t.lastTime) > 1) { if t.syncType == 2 {
if err := t.wf.Sync(); err != nil { if err := t.wf.Sync(); err != nil {
log.Error("sync table error %s", err.Error()) log.Error("sync table error %s", err.Error())
} }
@ -594,6 +610,16 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error {
return nil return nil
} }
func (t *tableWriter) Sync() error {
t.Lock()
defer t.Unlock()
if t.wf != nil {
return t.wf.Sync()
}
return nil
}
func (t *tableWriter) getLog(l *Log, pos int64) error { func (t *tableWriter) getLog(l *Log, pos int64) error {
t.rm.Lock() t.rm.Lock()
defer t.rm.Unlock() defer t.rm.Unlock()

View File

@ -156,6 +156,11 @@ func (s *GoLevelDBStore) PurgeExpired(n int64) error {
return nil return nil
} }
func (s *GoLevelDBStore) Sync() error {
//no other way for sync, so ignore here
return nil
}
func (s *GoLevelDBStore) reset() { func (s *GoLevelDBStore) reset() {
s.first = InvalidLogID s.first = InvalidLogID
s.last = InvalidLogID s.last = InvalidLogID

View File

@ -26,7 +26,6 @@ type Replication struct {
commitID uint64 commitID uint64
commitLog *os.File commitLog *os.File
commitLastTime time.Time
quit chan struct{} quit chan struct{}
@ -75,7 +74,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) {
} }
r.wg.Add(1) r.wg.Add(1)
go r.onPurgeExpired() go r.run()
return r, nil return r, nil
} }
@ -213,9 +212,7 @@ func (r *Replication) Stat() (*Stat, error) {
} }
func (r *Replication) updateCommitID(id uint64, force bool) error { func (r *Replication) updateCommitID(id uint64, force bool) error {
n := time.Now() if force {
if force || n.Sub(r.commitLastTime) > time.Second {
if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil { if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil {
return err return err
} }
@ -227,8 +224,6 @@ func (r *Replication) updateCommitID(id uint64, force bool) error {
r.commitID = id r.commitID = id
r.commitLastTime = n
return nil return nil
} }
@ -280,19 +275,44 @@ func (r *Replication) ClearWithCommitID(id uint64) error {
return r.updateCommitID(id, true) return r.updateCommitID(id, true)
} }
func (r *Replication) onPurgeExpired() { func (r *Replication) run() {
defer r.wg.Done() defer r.wg.Done()
syncTc := time.NewTicker(1 * time.Second)
purgeTc := time.NewTicker(1 * time.Hour)
for { for {
select { select {
case <-time.After(1 * time.Hour): case <-purgeTc.C:
n := (r.cfg.Replication.ExpiredLogDays * 24 * 3600) n := (r.cfg.Replication.ExpiredLogDays * 24 * 3600)
r.m.Lock() r.m.Lock()
if err := r.s.PurgeExpired(int64(n)); err != nil { err := r.s.PurgeExpired(int64(n))
r.m.Unlock()
if err != nil {
log.Error("purge expired log error %s", err.Error()) log.Error("purge expired log error %s", err.Error())
} }
case <-syncTc.C:
if r.cfg.Replication.SyncLog == 1 {
r.m.Lock()
err := r.s.Sync()
r.m.Unlock() r.m.Unlock()
if err != nil {
log.Error("sync store error %s", err.Error())
}
}
if r.cfg.Replication.SyncLog != 2 {
//we will sync commit id every 1 second
r.m.Lock()
err := r.updateCommitID(r.commitID, true)
r.m.Unlock()
if err != nil {
log.Error("sync commitid error %s", err.Error())
}
}
case <-r.quit: case <-r.quit:
syncTc.Stop()
purgeTc.Stop()
return return
} }
} }

View File

@ -26,6 +26,8 @@ type LogStore interface {
// Delete logs before n seconds // Delete logs before n seconds
PurgeExpired(n int64) error PurgeExpired(n int64) error
Sync() error
// Clear all logs // Clear all logs
Clear() error Clear() error