From 137f85dff3bba12c6a94de88da85c6e3e5f5e678 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 6 Nov 2014 21:52:18 +0800 Subject: [PATCH] replication update --- rpl/file_store.go | 297 ++++++++++++++++++++++------------------- rpl/file_table.go | 30 +++-- rpl/goleveldb_store.go | 46 +------ rpl/store.go | 3 - rpl/store_test.go | 85 ------------ 5 files changed, 189 insertions(+), 272 deletions(-) diff --git a/rpl/file_store.go b/rpl/file_store.go index 873a0e2..f88b603 100644 --- a/rpl/file_store.go +++ b/rpl/file_store.go @@ -2,17 +2,13 @@ package rpl import ( "fmt" - "github.com/siddontang/go/hack" - "github.com/siddontang/go/ioutil2" "github.com/siddontang/go/log" "github.com/siddontang/go/num" - "io/ioutil" "os" - "path" - "strconv" - "strings" + "sort" "sync" + "time" ) const ( @@ -53,193 +49,226 @@ const ( type FileStore struct { LogStore - m sync.Mutex - maxFileSize int64 - first uint64 - last uint64 + base string - logFile *os.File - logNames []string - nextLogIndex int64 + rm sync.RWMutex + wm sync.Mutex - indexName string - - path string + rs tableReaders + w *tableWriter } -func NewFileStore(path string) (*FileStore, error) { +func NewFileStore(base string, maxSize int64) (*FileStore, error) { s := new(FileStore) - if err := os.MkdirAll(path, 0755); err != nil { + var err error + + if err = os.MkdirAll(base, 0755); err != nil { return nil, err } - s.path = path + s.base = base - s.maxFileSize = defaultMaxLogFileSize + s.maxFileSize = num.MinInt64(maxLogFileSize, maxSize) - s.first = 0 - s.last = 0 - - s.logNames = make([]string, 0, 16) - - if err := s.loadIndex(); err != nil { + if err = s.load(); err != nil { return nil, err } + index := int64(1) + if len(s.rs) != 0 { + index = s.rs[len(s.rs)-1].index + 1 + } + + s.w = newTableWriter(s.base, index, s.maxFileSize) return s, nil } -func (s *FileStore) SetMaxFileSize(size int64) { - s.maxFileSize = num.MinInt64(maxLogFileSize, size) -} - func (s *FileStore) GetLog(id uint64, log *Log) error { panic("not implementation") return nil } func (s *FileStore) FirstID() (uint64, error) { - panic("not implementation") return 0, nil } func (s *FileStore) LastID() (uint64, error) { - panic("not implementation") return 0, nil } -func (s *FileStore) StoreLog(log *Log) error { - panic("not implementation") - return nil -} +func (s *FileStore) StoreLog(l *Log) error { + s.wm.Lock() + defer s.wm.Unlock() + + if s.w == nil { + return fmt.Errorf("nil table writer, cannot store") + } + + err := s.w.StoreLog(l) + if err == nil { + return nil + } else if err != errTableNeedFlush { + return err + } + + var r *tableReader + if r, err = s.w.Flush(); err != nil { + log.Error("write table flush error %s, can not store now", err.Error()) + + s.w.Close() + s.w = nil + return err + } + + s.rm.Lock() + s.rs = append(s.rs, r) + s.rm.Unlock() -func (s *FileStore) Purge(n uint64) error { - panic("not implementation") return nil } func (s *FileStore) PuregeExpired(n int64) error { - panic("not implementation") + s.rm.Lock() + + purges := []*tableReader{} + lefts := []*tableReader{} + + t := uint32(time.Now().Unix() - int64(n)) + + for _, r := range s.rs { + if r.lastTime < t { + purges = append(purges, r) + } else { + lefts = append(lefts, r) + } + } + + s.rs = lefts + + s.rm.Unlock() + + for _, r := range purges { + name := r.name + r.Close() + if err := os.Remove(name); err != nil { + log.Error("purge table %s err: %s", name, err.Error()) + } + } + return nil } func (s *FileStore) Clear() error { - panic("not implementation") return nil } func (s *FileStore) Close() error { - panic("not implementation") + s.wm.Lock() + if s.w != nil { + if r, err := s.w.Flush(); err != nil { + log.Error("close err: %s", err.Error()) + } else { + r.Close() + s.w.Close() + s.w = nil + } + } + + s.wm.Unlock() + + s.rm.Lock() + + for i := range s.rs { + s.rs[i].Close() + } + s.rs = nil + + s.rm.Unlock() + return nil } -func (s *FileStore) flushIndex() error { - data := strings.Join(s.logNames, "\n") - - if err := ioutil2.WriteFileAtomic(s.indexName, hack.Slice(data), 0644); err != nil { - log.Error("flush index error %s", err.Error()) +func (s *FileStore) load() error { + fs, err := ioutil.ReadDir(s.base) + if err != nil { return err } - return nil -} - -func (s *FileStore) fileExists(name string) bool { - p := path.Join(s.path, name) - _, err := os.Stat(p) - return !os.IsNotExist(err) -} - -func (s *FileStore) loadIndex() error { - s.indexName = path.Join(s.path, fmt.Sprintf("ledis-bin.index")) - if _, err := os.Stat(s.indexName); os.IsNotExist(err) { - //no index file, nothing to do - } else { - indexData, err := ioutil.ReadFile(s.indexName) - if err != nil { - return err - } - - lines := strings.Split(string(indexData), "\n") - for _, line := range lines { - line = strings.Trim(line, "\r\n ") - if len(line) == 0 { - continue - } - - if s.fileExists(line) { - s.logNames = append(s.logNames, line) + var r *tableReader + var index int64 + for _, f := range fs { + if _, err := fmt.Sscanf(f.Name(), "%08d.ldb", &index); err == nil { + if r, err = newTableReader(s.base, index); err != nil { + log.Error("load table %s err: %s", f.Name(), err.Error()) } else { - log.Info("log %s has not exists", line) + s.rs = append(s.rs, r) } } } - var err error - if len(s.logNames) == 0 { - s.nextLogIndex = 1 - } else { - lastName := s.logNames[len(s.logNames)-1] + if err := s.rs.check(); err != nil { + return err + } - if s.nextLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil { - log.Error("invalid logfile name %s", err.Error()) - return err + return nil +} + +type tableReaders []*tableReader + +func (ts tableReaders) Len() int { + return len(ts) +} + +func (ts tableReaders) Swap(i, j int) { + ts[i], ts[j] = ts[j], ts[i] +} + +func (ts tableReaders) Less(i, j int) bool { + return ts[i].index < ts[j].index +} + +func (ts tableReaders) Search(id uint64) *tableReader { + n := sort.Search(len(ts), func(i int) bool { + return id >= ts[i].first && id <= ts[i].last + }) + + if n < len(ts) { + return ts[n] + } else { + return nil + } +} + +func (ts tableReaders) check() error { + if len(ts) == 0 { + return nil + } + + sort.Sort(ts) + + first := ts[0].first + last := ts[0].last + index := ts[0].index + + if first == 0 || first > last { + return fmt.Errorf("invalid log in table %s", ts[0].name) + } + + for i := 1; i < len(ts); i++ { + if ts[i].first <= last { + return fmt.Errorf("invalid first log id %d in table %s", ts[i].first, ts[i].name) } - //like mysql, if server restart, a new log will create - s.nextLogIndex++ - } + if ts[i].index == index { + return fmt.Errorf("invalid index %d in table %s", ts[i].index, ts[i].name) + } + first = ts[i].first + last = ts[i].last + index = ts[i].index + } return nil } - -func (s *FileStore) openNewLogFile() error { - var err error - lastName := s.formatLogFileName(s.nextLogIndex) - - logPath := path.Join(s.path, lastName) - if s.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644); err != nil { - log.Error("open new logfile error %s", err.Error()) - return err - } - - s.logNames = append(s.logNames, lastName) - - if err = s.flushIndex(); err != nil { - return err - } - - return nil -} - -func (s *FileStore) checkLogFileSize() bool { - if s.logFile == nil { - return false - } - - st, _ := s.logFile.Stat() - if st.Size() >= int64(s.maxFileSize) { - s.closeLog() - return true - } - - return false -} - -func (s *FileStore) closeLog() { - if s.logFile == nil { - return - } - - s.nextLogIndex++ - - s.logFile.Close() - s.logFile = nil -} - -func (s *FileStore) formatLogFileName(index int64) string { - return fmt.Sprintf("ledis-bin.%07d", index) -} diff --git a/rpl/file_table.go b/rpl/file_table.go index ab9df1f..410f150 100644 --- a/rpl/file_table.go +++ b/rpl/file_table.go @@ -22,7 +22,6 @@ var ( log0 = Log{0, 1, 1, []byte("ledisdb")} log0Data = []byte{} errTableNeedFlush = errors.New("write table need flush") - errTableFrozen = errors.New("write table is frozen") pageSize = int64(4096) ) @@ -60,6 +59,9 @@ type tableReader struct { } func newTableReader(base string, index int64) (*tableReader, error) { + if index <= 0 { + return nil, fmt.Errorf("invalid index %d", index) + } t := new(tableReader) t.name = path.Join(base, fmtTableName(index)) t.index = index @@ -356,12 +358,14 @@ type tableWriter struct { offsetBuf []byte - frozen bool - maxLogSize int64 } func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { + if index <= 0 { + panic(fmt.Errorf("invalid index %d", index)) + } + t := new(tableWriter) t.base = base @@ -392,6 +396,20 @@ func (t *tableWriter) Close() { t.close() } +func (t *tableWriter) First() uint64 { + t.Lock() + id := t.first + t.Unlock() + return id +} + +func (t *tableWriter) Last() uint64 { + t.Lock() + id := t.last + t.Unlock() + return id +} + func (t *tableWriter) reset() { t.close() @@ -406,8 +424,6 @@ func (t *tableWriter) Flush() (*tableReader, error) { t.Lock() defer t.Unlock() - t.frozen = true - if t.wf == nil { return nil, fmt.Errorf("nil write handler") } @@ -532,10 +548,6 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error { t.RLock() defer t.RUnlock() - if t.frozen { - return errTableFrozen - } - if id < t.first || id > t.last { return fmt.Errorf("log %d not in [%d=%d]", id, t.first, t.last) } diff --git a/rpl/goleveldb_store.go b/rpl/goleveldb_store.go index 349cb4a..2d3f808 100644 --- a/rpl/goleveldb_store.go +++ b/rpl/goleveldb_store.go @@ -118,42 +118,6 @@ func (s *GoLevelDBStore) StoreLog(log *Log) error { return nil } -func (s *GoLevelDBStore) Purge(n uint64) error { - s.m.Lock() - defer s.m.Unlock() - - var first, last uint64 - var err error - - first, err = s.firstID() - if err != nil { - return err - } - - last, err = s.lastID() - if err != nil { - return err - } - - start := first - stop := num.MinUint64(last, first+n) - - w := s.db.NewWriteBatch() - defer w.Rollback() - - s.reset() - - for i := start; i < stop; i++ { - w.Delete(num.Uint64ToBytes(i)) - } - - if err = w.Commit(); err != nil { - return err - } - - return nil -} - func (s *GoLevelDBStore) PurgeExpired(n int64) error { if n <= 0 { return fmt.Errorf("invalid expired time %d", n) @@ -192,6 +156,11 @@ func (s *GoLevelDBStore) PurgeExpired(n int64) error { return nil } +func (s *GoLevelDBStore) reset() { + s.first = InvalidLogID + s.last = InvalidLogID +} + func (s *GoLevelDBStore) Clear() error { s.m.Lock() defer s.m.Unlock() @@ -206,11 +175,6 @@ func (s *GoLevelDBStore) Clear() error { return s.open() } -func (s *GoLevelDBStore) reset() { - s.first = InvalidLogID - s.last = InvalidLogID -} - func (s *GoLevelDBStore) Close() error { s.m.Lock() defer s.m.Unlock() diff --git a/rpl/store.go b/rpl/store.go index 00d9594..d56d9f0 100644 --- a/rpl/store.go +++ b/rpl/store.go @@ -23,9 +23,6 @@ type LogStore interface { // if log id is less than current last id, return error StoreLog(log *Log) error - // Delete first n logs - Purge(n uint64) error - // Delete logs before n seconds PurgeExpired(n int64) error diff --git a/rpl/store_test.go b/rpl/store_test.go index f572317..e84e286 100644 --- a/rpl/store_test.go +++ b/rpl/store_test.go @@ -4,7 +4,6 @@ import ( "io/ioutil" "os" "testing" - "time" ) func TestGoLevelDBStore(t *testing.T) { @@ -101,88 +100,4 @@ func testLogs(t *testing.T, l LogStore) { if idx != 20 { t.Fatalf("bad idx: %d", idx) } - - // Delete a suffix - if err := l.Purge(5); err != nil { - t.Fatalf("err: %v ", err) - } - - // Verify they are all deleted - for i := 1; i <= 5; i++ { - if err := l.GetLog(uint64(i), &out); err != ErrLogNotFound { - t.Fatalf("err: %v ", err) - } - } - - // Index should be one - idx, err = l.FirstID() - if err != nil { - t.Fatalf("err: %v ", err) - } - if idx != 6 { - t.Fatalf("bad idx: %d", idx) - } - idx, err = l.LastID() - if err != nil { - t.Fatalf("err: %v ", err) - } - if idx != 20 { - t.Fatalf("bad idx: %d", idx) - } - - // Should not be able to fetch - if err := l.GetLog(5, &out); err != ErrLogNotFound { - t.Fatalf("err: %v ", err) - } - - if err := l.Clear(); err != nil { - t.Fatal(err) - } - - idx, err = l.FirstID() - if err != nil { - t.Fatalf("err: %v ", err) - } - if idx != 0 { - t.Fatalf("bad idx: %d", idx) - } - - idx, err = l.LastID() - if err != nil { - t.Fatalf("err: %v ", err) - } - if idx != 0 { - t.Fatalf("bad idx: %d", idx) - } - - now := uint32(time.Now().Unix()) - logs := []*Log{} - for i := 1; i <= 20; i++ { - nl := &Log{ - ID: uint64(i), - CreateTime: now - 20, - Data: []byte("first"), - } - logs = append(logs, nl) - } - - if err := l.PurgeExpired(1); err != nil { - t.Fatal(err) - } - - idx, err = l.FirstID() - if err != nil { - t.Fatalf("err: %v ", err) - } - if idx != 0 { - t.Fatalf("bad idx: %d", idx) - } - - idx, err = l.LastID() - if err != nil { - t.Fatalf("err: %v ", err) - } - if idx != 0 { - t.Fatalf("bad idx: %d", idx) - } }