update table reader

This commit is contained in:
siddontang 2014-11-06 15:05:21 +08:00
parent c3303e1fdb
commit 4a0244180a
2 changed files with 21 additions and 19 deletions

View File

@ -41,27 +41,26 @@ type tableReader struct {
first uint64 first uint64
last uint64 last uint64
lastTime uint32
offsetStartPos int64 offsetStartPos int64
offsetLen uint32 offsetLen uint32
lastReadTime sync2.AtomicInt64 lastReadTime sync2.AtomicInt64
} }
func newTableReader(name string) (*tableReader, error) { func newTableReader(base string, index int64) (*tableReader, error) {
t := new(tableReader) t := new(tableReader)
t.name = name t.name = path.Join(base, fmtTableName(index))
t.index = index
var err error var err error
if _, err = fmt.Sscanf(path.Base(name), "%d.ldb", &t.index); err != nil {
return nil, err
}
if err = t.check(); err != nil { if err = t.check(); err != nil {
log.Error("check %s error: %s, try to repair", name, err.Error()) log.Error("check %s error: %s, try to repair", t.name, err.Error())
if err = t.repair(); err != nil { if err = t.repair(); err != nil {
log.Error("repair %s error: %s", name, err.Error()) log.Error("repair %s error: %s", t.name, err.Error())
return nil, err return nil, err
} }
} }
@ -164,6 +163,7 @@ func (t *tableReader) check() error {
} }
t.last = l.ID t.last = l.ID
t.lastTime = l.CreateTime
if t.first > t.last { if t.first > t.last {
return fmt.Errorf("invalid log table first %d > last %d", t.first, t.last) return fmt.Errorf("invalid log table first %d > last %d", t.first, t.last)
@ -206,6 +206,8 @@ func (t *tableReader) repair() error {
break break
} }
t.lastTime = l.CreateTime
if err := tw.StoreLog(&l); err != nil { if err := tw.StoreLog(&l); err != nil {
return err return err
} }

View File

@ -4,6 +4,7 @@ import (
"github.com/siddontang/go/log" "github.com/siddontang/go/log"
"io/ioutil" "io/ioutil"
"os" "os"
"path"
"testing" "testing"
"time" "time"
) )
@ -80,6 +81,7 @@ func TestFileTable(t *testing.T) {
} }
var r *tableReader var r *tableReader
name := w.name name := w.name
if r, err = w.Flush(); err != nil { if r, err = w.Flush(); err != nil {
@ -111,7 +113,7 @@ func TestFileTable(t *testing.T) {
r.Close() r.Close()
if r, err = newTableReader(name); err != nil { if r, err = newTableReader(base, 1); err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer r.Close() defer r.Close()
@ -135,34 +137,32 @@ func TestFileTable(t *testing.T) {
r.Close() r.Close()
testRepair(t, name, s, 11) testRepair(t, name, 1, s, 11)
testRepair(t, name, s, 32) testRepair(t, name, 1, s, 32)
testRepair(t, name, s, 42) testRepair(t, name, 1, s, 42)
testRepair(t, name, s, 72) testRepair(t, name, 1, s, 72)
if err := os.Truncate(name, s-73); err != nil { if err := os.Truncate(name, s-73); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if r, err = newTableReader(name); err == nil { if r, err = newTableReader(base, 1); err == nil {
t.Fatal("can not repair") t.Fatal("can not repair")
} }
name = w.name
if r, err := w.Flush(); err != nil { if r, err := w.Flush(); err != nil {
t.Fatal(err) t.Fatal(err)
} else { } else {
r.Close() r.Close()
} }
if r, err = newTableReader(name); err != nil { if r, err = newTableReader(base, 2); err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer r.Close() defer r.Close()
} }
func testRepair(t *testing.T, name string, s int64, cutSize int64) { func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64) {
var r *tableReader var r *tableReader
var err error var err error
@ -170,7 +170,7 @@ func testRepair(t *testing.T, name string, s int64, cutSize int64) {
t.Fatal(err) t.Fatal(err)
} }
if r, err = newTableReader(name); err != nil { if r, err = newTableReader(path.Dir(name), index); err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer r.Close() defer r.Close()