diff --git a/rpl/file_store.go b/rpl/file_store.go index 27b9e30..eaf2321 100644 --- a/rpl/file_store.go +++ b/rpl/file_store.go @@ -5,6 +5,8 @@ import ( "github.com/siddontang/go/hack" "github.com/siddontang/go/ioutil2" "github.com/siddontang/go/log" + "github.com/siddontang/go/num" + "io/ioutil" "os" "path" @@ -14,10 +16,10 @@ import ( ) const ( - defaultMaxLogFileSize = uint32(1024 * 1024 * 1024) + defaultMaxLogFileSize = int64(1024 * 1024 * 1024) //why 4G, we can use uint32 as offset, reduce memory useage - maxLogFileSize = uint32(4*1024*1024*1024 - 1) + maxLogFileSize = int64(uint32(4*1024*1024*1024 - 1)) maxLogNumInFile = uint64(10000000) ) @@ -53,7 +55,7 @@ type FileStore struct { m sync.Mutex - maxFileSize uint32 + maxFileSize int64 first uint64 last uint64 @@ -90,8 +92,8 @@ func NewFileStore(path string) (*FileStore, error) { return s, nil } -func (s *FileStore) SetMaxFileSize(size uint32) { - s.maxFileSize = size +func (s *FileStore) SetMaxFileSize(size int64) { + s.maxFileSize = num.MinInt64(maxLogFileSize, size) } func (s *FileStore) GetLog(id uint64, log *Log) error { diff --git a/rpl/file_table.go b/rpl/file_table.go index 584b27f..9745644 100644 --- a/rpl/file_table.go +++ b/rpl/file_table.go @@ -177,8 +177,57 @@ func (t *tableReader) check() error { func (t *tableReader) repair() error { t.close() - //todo later - return fmt.Errorf("repair not supported now") + var err error + if t.f, err = os.Open(t.name); err != nil { + return err + } + + defer t.close() + + tw := newTableWriter(path.Base(t.name), t.index, maxLogFileSize) + tw.name = tw.name + ".tmp" + os.Remove(tw.name) + + defer func() { + tw.Close() + os.Remove(tw.name) + }() + + var l Log + + for { + if err := l.Decode(t.f); err != nil { + return err + } + + if l.ID == 0 { + break + } + + if err := tw.StoreLog(&l); err != nil { + return err + } + } + + t.close() + + var tr *tableReader + if tr, err = tw.Flush(); err != nil { + return err + } + + t.first = tr.first + t.last = tr.last + t.offsetStartPos = tr.offsetStartPos + t.offsetLen = tr.offsetLen + + os.Remove(t.name) + + if err := os.Rename(tw.name, t.name); err != nil { + return err + } + + return nil } func (t *tableReader) decodeLogHead(l *Log, pos int64) (int64, error) { @@ -264,7 +313,7 @@ type tableWriter struct { maxLogSize int64 } -func newTableWriter(base string, index int64, maxLogSize int64) (*tableWriter, error) { +func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { t := new(tableWriter) t.base = base @@ -273,7 +322,7 @@ func newTableWriter(base string, index int64, maxLogSize int64) (*tableWriter, e t.maxLogSize = maxLogSize - return t, nil + return t } func (t *tableWriter) close() { @@ -370,6 +419,10 @@ func (t *tableWriter) StoreLog(l *Log) error { t.Lock() defer t.Unlock() + if t.frozen { + return errTableFrozen + } + if t.last > 0 && l.ID != t.last+1 { return ErrStoreLogID } @@ -416,6 +469,10 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error { t.RLock() defer t.RUnlock() + if t.frozen { + return errTableFrozen + } + if id < t.first && id > t.last { return fmt.Errorf("log %d not in [%d=%d]", id, t.first, t.last) } diff --git a/rpl/file_table_test.go b/rpl/file_table_test.go index 82efade..edd1019 100644 --- a/rpl/file_table_test.go +++ b/rpl/file_table_test.go @@ -1,9 +1,17 @@ package rpl import ( + "io/ioutil" + "os" "testing" ) func TestFileTable(t *testing.T) { + base, err := ioutil.TempDir("./", "test_table") + if err != nil { + t.Fatal(err) + } + + defer os.RemoveAll(base) }