update table repair

This commit is contained in:
siddontang 2014-11-05 22:35:43 +08:00
parent 5f34758cb3
commit 128827999f
3 changed files with 76 additions and 9 deletions

View File

@ -5,6 +5,8 @@ import (
"github.com/siddontang/go/hack" "github.com/siddontang/go/hack"
"github.com/siddontang/go/ioutil2" "github.com/siddontang/go/ioutil2"
"github.com/siddontang/go/log" "github.com/siddontang/go/log"
"github.com/siddontang/go/num"
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
@ -14,10 +16,10 @@ import (
) )
const ( const (
defaultMaxLogFileSize = uint32(1024 * 1024 * 1024) defaultMaxLogFileSize = int64(1024 * 1024 * 1024)
//why 4G, we can use uint32 as offset, reduce memory useage //why 4G, we can use uint32 as offset, reduce memory useage
maxLogFileSize = uint32(4*1024*1024*1024 - 1) maxLogFileSize = int64(uint32(4*1024*1024*1024 - 1))
maxLogNumInFile = uint64(10000000) maxLogNumInFile = uint64(10000000)
) )
@ -53,7 +55,7 @@ type FileStore struct {
m sync.Mutex m sync.Mutex
maxFileSize uint32 maxFileSize int64
first uint64 first uint64
last uint64 last uint64
@ -90,8 +92,8 @@ func NewFileStore(path string) (*FileStore, error) {
return s, nil return s, nil
} }
func (s *FileStore) SetMaxFileSize(size uint32) { func (s *FileStore) SetMaxFileSize(size int64) {
s.maxFileSize = size s.maxFileSize = num.MinInt64(maxLogFileSize, size)
} }
func (s *FileStore) GetLog(id uint64, log *Log) error { func (s *FileStore) GetLog(id uint64, log *Log) error {

View File

@ -177,8 +177,57 @@ func (t *tableReader) check() error {
func (t *tableReader) repair() error { func (t *tableReader) repair() error {
t.close() t.close()
//todo later var err error
return fmt.Errorf("repair not supported now") if t.f, err = os.Open(t.name); err != nil {
return err
}
defer t.close()
tw := newTableWriter(path.Base(t.name), t.index, maxLogFileSize)
tw.name = tw.name + ".tmp"
os.Remove(tw.name)
defer func() {
tw.Close()
os.Remove(tw.name)
}()
var l Log
for {
if err := l.Decode(t.f); err != nil {
return err
}
if l.ID == 0 {
break
}
if err := tw.StoreLog(&l); err != nil {
return err
}
}
t.close()
var tr *tableReader
if tr, err = tw.Flush(); err != nil {
return err
}
t.first = tr.first
t.last = tr.last
t.offsetStartPos = tr.offsetStartPos
t.offsetLen = tr.offsetLen
os.Remove(t.name)
if err := os.Rename(tw.name, t.name); err != nil {
return err
}
return nil
} }
func (t *tableReader) decodeLogHead(l *Log, pos int64) (int64, error) { func (t *tableReader) decodeLogHead(l *Log, pos int64) (int64, error) {
@ -264,7 +313,7 @@ type tableWriter struct {
maxLogSize int64 maxLogSize int64
} }
func newTableWriter(base string, index int64, maxLogSize int64) (*tableWriter, error) { func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
t := new(tableWriter) t := new(tableWriter)
t.base = base t.base = base
@ -273,7 +322,7 @@ func newTableWriter(base string, index int64, maxLogSize int64) (*tableWriter, e
t.maxLogSize = maxLogSize t.maxLogSize = maxLogSize
return t, nil return t
} }
func (t *tableWriter) close() { func (t *tableWriter) close() {
@ -370,6 +419,10 @@ func (t *tableWriter) StoreLog(l *Log) error {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
if t.frozen {
return errTableFrozen
}
if t.last > 0 && l.ID != t.last+1 { if t.last > 0 && l.ID != t.last+1 {
return ErrStoreLogID return ErrStoreLogID
} }
@ -416,6 +469,10 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
if t.frozen {
return errTableFrozen
}
if id < t.first && id > t.last { if id < t.first && id > t.last {
return fmt.Errorf("log %d not in [%d=%d]", id, t.first, t.last) return fmt.Errorf("log %d not in [%d=%d]", id, t.first, t.last)
} }

View File

@ -1,9 +1,17 @@
package rpl package rpl
import ( import (
"io/ioutil"
"os"
"testing" "testing"
) )
func TestFileTable(t *testing.T) { func TestFileTable(t *testing.T) {
base, err := ioutil.TempDir("./", "test_table")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(base)
} }