replication update

This commit is contained in:
siddontang 2014-11-06 21:52:18 +08:00
parent 297c7c0592
commit 137f85dff3
5 changed files with 189 additions and 272 deletions

View File

@ -2,17 +2,13 @@ package rpl
import (
"fmt"
"github.com/siddontang/go/hack"
"github.com/siddontang/go/ioutil2"
"github.com/siddontang/go/log"
"github.com/siddontang/go/num"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"sort"
"sync"
"time"
)
const (
@ -53,193 +49,226 @@ const (
type FileStore struct {
LogStore
m sync.Mutex
maxFileSize int64
first uint64
last uint64
base string
logFile *os.File
logNames []string
nextLogIndex int64
rm sync.RWMutex
wm sync.Mutex
indexName string
path string
rs tableReaders
w *tableWriter
}
func NewFileStore(path string) (*FileStore, error) {
func NewFileStore(base string, maxSize int64) (*FileStore, error) {
s := new(FileStore)
if err := os.MkdirAll(path, 0755); err != nil {
var err error
if err = os.MkdirAll(base, 0755); err != nil {
return nil, err
}
s.path = path
s.base = base
s.maxFileSize = defaultMaxLogFileSize
s.maxFileSize = num.MinInt64(maxLogFileSize, maxSize)
s.first = 0
s.last = 0
s.logNames = make([]string, 0, 16)
if err := s.loadIndex(); err != nil {
if err = s.load(); err != nil {
return nil, err
}
index := int64(1)
if len(s.rs) != 0 {
index = s.rs[len(s.rs)-1].index + 1
}
s.w = newTableWriter(s.base, index, s.maxFileSize)
return s, nil
}
func (s *FileStore) SetMaxFileSize(size int64) {
s.maxFileSize = num.MinInt64(maxLogFileSize, size)
}
func (s *FileStore) GetLog(id uint64, log *Log) error {
panic("not implementation")
return nil
}
func (s *FileStore) FirstID() (uint64, error) {
panic("not implementation")
return 0, nil
}
func (s *FileStore) LastID() (uint64, error) {
panic("not implementation")
return 0, nil
}
func (s *FileStore) StoreLog(log *Log) error {
panic("not implementation")
return nil
}
func (s *FileStore) StoreLog(l *Log) error {
s.wm.Lock()
defer s.wm.Unlock()
if s.w == nil {
return fmt.Errorf("nil table writer, cannot store")
}
err := s.w.StoreLog(l)
if err == nil {
return nil
} else if err != errTableNeedFlush {
return err
}
var r *tableReader
if r, err = s.w.Flush(); err != nil {
log.Error("write table flush error %s, can not store now", err.Error())
s.w.Close()
s.w = nil
return err
}
s.rm.Lock()
s.rs = append(s.rs, r)
s.rm.Unlock()
func (s *FileStore) Purge(n uint64) error {
panic("not implementation")
return nil
}
func (s *FileStore) PuregeExpired(n int64) error {
panic("not implementation")
s.rm.Lock()
purges := []*tableReader{}
lefts := []*tableReader{}
t := uint32(time.Now().Unix() - int64(n))
for _, r := range s.rs {
if r.lastTime < t {
purges = append(purges, r)
} else {
lefts = append(lefts, r)
}
}
s.rs = lefts
s.rm.Unlock()
for _, r := range purges {
name := r.name
r.Close()
if err := os.Remove(name); err != nil {
log.Error("purge table %s err: %s", name, err.Error())
}
}
return nil
}
func (s *FileStore) Clear() error {
panic("not implementation")
return nil
}
func (s *FileStore) Close() error {
panic("not implementation")
s.wm.Lock()
if s.w != nil {
if r, err := s.w.Flush(); err != nil {
log.Error("close err: %s", err.Error())
} else {
r.Close()
s.w.Close()
s.w = nil
}
}
s.wm.Unlock()
s.rm.Lock()
for i := range s.rs {
s.rs[i].Close()
}
s.rs = nil
s.rm.Unlock()
return nil
}
func (s *FileStore) flushIndex() error {
data := strings.Join(s.logNames, "\n")
if err := ioutil2.WriteFileAtomic(s.indexName, hack.Slice(data), 0644); err != nil {
log.Error("flush index error %s", err.Error())
func (s *FileStore) load() error {
fs, err := ioutil.ReadDir(s.base)
if err != nil {
return err
}
return nil
}
func (s *FileStore) fileExists(name string) bool {
p := path.Join(s.path, name)
_, err := os.Stat(p)
return !os.IsNotExist(err)
}
func (s *FileStore) loadIndex() error {
s.indexName = path.Join(s.path, fmt.Sprintf("ledis-bin.index"))
if _, err := os.Stat(s.indexName); os.IsNotExist(err) {
//no index file, nothing to do
} else {
indexData, err := ioutil.ReadFile(s.indexName)
if err != nil {
return err
}
lines := strings.Split(string(indexData), "\n")
for _, line := range lines {
line = strings.Trim(line, "\r\n ")
if len(line) == 0 {
continue
}
if s.fileExists(line) {
s.logNames = append(s.logNames, line)
var r *tableReader
var index int64
for _, f := range fs {
if _, err := fmt.Sscanf(f.Name(), "%08d.ldb", &index); err == nil {
if r, err = newTableReader(s.base, index); err != nil {
log.Error("load table %s err: %s", f.Name(), err.Error())
} else {
log.Info("log %s has not exists", line)
s.rs = append(s.rs, r)
}
}
}
var err error
if len(s.logNames) == 0 {
s.nextLogIndex = 1
} else {
lastName := s.logNames[len(s.logNames)-1]
if err := s.rs.check(); err != nil {
return err
}
if s.nextLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil {
log.Error("invalid logfile name %s", err.Error())
return err
return nil
}
type tableReaders []*tableReader
func (ts tableReaders) Len() int {
return len(ts)
}
func (ts tableReaders) Swap(i, j int) {
ts[i], ts[j] = ts[j], ts[i]
}
func (ts tableReaders) Less(i, j int) bool {
return ts[i].index < ts[j].index
}
func (ts tableReaders) Search(id uint64) *tableReader {
n := sort.Search(len(ts), func(i int) bool {
return id >= ts[i].first && id <= ts[i].last
})
if n < len(ts) {
return ts[n]
} else {
return nil
}
}
func (ts tableReaders) check() error {
if len(ts) == 0 {
return nil
}
sort.Sort(ts)
first := ts[0].first
last := ts[0].last
index := ts[0].index
if first == 0 || first > last {
return fmt.Errorf("invalid log in table %s", ts[0].name)
}
for i := 1; i < len(ts); i++ {
if ts[i].first <= last {
return fmt.Errorf("invalid first log id %d in table %s", ts[i].first, ts[i].name)
}
//like mysql, if server restart, a new log will create
s.nextLogIndex++
}
if ts[i].index == index {
return fmt.Errorf("invalid index %d in table %s", ts[i].index, ts[i].name)
}
first = ts[i].first
last = ts[i].last
index = ts[i].index
}
return nil
}
func (s *FileStore) openNewLogFile() error {
var err error
lastName := s.formatLogFileName(s.nextLogIndex)
logPath := path.Join(s.path, lastName)
if s.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644); err != nil {
log.Error("open new logfile error %s", err.Error())
return err
}
s.logNames = append(s.logNames, lastName)
if err = s.flushIndex(); err != nil {
return err
}
return nil
}
func (s *FileStore) checkLogFileSize() bool {
if s.logFile == nil {
return false
}
st, _ := s.logFile.Stat()
if st.Size() >= int64(s.maxFileSize) {
s.closeLog()
return true
}
return false
}
func (s *FileStore) closeLog() {
if s.logFile == nil {
return
}
s.nextLogIndex++
s.logFile.Close()
s.logFile = nil
}
func (s *FileStore) formatLogFileName(index int64) string {
return fmt.Sprintf("ledis-bin.%07d", index)
}

View File

@ -22,7 +22,6 @@ var (
log0 = Log{0, 1, 1, []byte("ledisdb")}
log0Data = []byte{}
errTableNeedFlush = errors.New("write table need flush")
errTableFrozen = errors.New("write table is frozen")
pageSize = int64(4096)
)
@ -60,6 +59,9 @@ type tableReader struct {
}
func newTableReader(base string, index int64) (*tableReader, error) {
if index <= 0 {
return nil, fmt.Errorf("invalid index %d", index)
}
t := new(tableReader)
t.name = path.Join(base, fmtTableName(index))
t.index = index
@ -356,12 +358,14 @@ type tableWriter struct {
offsetBuf []byte
frozen bool
maxLogSize int64
}
func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
if index <= 0 {
panic(fmt.Errorf("invalid index %d", index))
}
t := new(tableWriter)
t.base = base
@ -392,6 +396,20 @@ func (t *tableWriter) Close() {
t.close()
}
func (t *tableWriter) First() uint64 {
t.Lock()
id := t.first
t.Unlock()
return id
}
func (t *tableWriter) Last() uint64 {
t.Lock()
id := t.last
t.Unlock()
return id
}
func (t *tableWriter) reset() {
t.close()
@ -406,8 +424,6 @@ func (t *tableWriter) Flush() (*tableReader, error) {
t.Lock()
defer t.Unlock()
t.frozen = true
if t.wf == nil {
return nil, fmt.Errorf("nil write handler")
}
@ -532,10 +548,6 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error {
t.RLock()
defer t.RUnlock()
if t.frozen {
return errTableFrozen
}
if id < t.first || id > t.last {
return fmt.Errorf("log %d not in [%d=%d]", id, t.first, t.last)
}

View File

@ -118,42 +118,6 @@ func (s *GoLevelDBStore) StoreLog(log *Log) error {
return nil
}
func (s *GoLevelDBStore) Purge(n uint64) error {
s.m.Lock()
defer s.m.Unlock()
var first, last uint64
var err error
first, err = s.firstID()
if err != nil {
return err
}
last, err = s.lastID()
if err != nil {
return err
}
start := first
stop := num.MinUint64(last, first+n)
w := s.db.NewWriteBatch()
defer w.Rollback()
s.reset()
for i := start; i < stop; i++ {
w.Delete(num.Uint64ToBytes(i))
}
if err = w.Commit(); err != nil {
return err
}
return nil
}
func (s *GoLevelDBStore) PurgeExpired(n int64) error {
if n <= 0 {
return fmt.Errorf("invalid expired time %d", n)
@ -192,6 +156,11 @@ func (s *GoLevelDBStore) PurgeExpired(n int64) error {
return nil
}
func (s *GoLevelDBStore) reset() {
s.first = InvalidLogID
s.last = InvalidLogID
}
func (s *GoLevelDBStore) Clear() error {
s.m.Lock()
defer s.m.Unlock()
@ -206,11 +175,6 @@ func (s *GoLevelDBStore) Clear() error {
return s.open()
}
func (s *GoLevelDBStore) reset() {
s.first = InvalidLogID
s.last = InvalidLogID
}
func (s *GoLevelDBStore) Close() error {
s.m.Lock()
defer s.m.Unlock()

View File

@ -23,9 +23,6 @@ type LogStore interface {
// if log id is less than current last id, return error
StoreLog(log *Log) error
// Delete first n logs
Purge(n uint64) error
// Delete logs before n seconds
PurgeExpired(n int64) error

View File

@ -4,7 +4,6 @@ import (
"io/ioutil"
"os"
"testing"
"time"
)
func TestGoLevelDBStore(t *testing.T) {
@ -101,88 +100,4 @@ func testLogs(t *testing.T, l LogStore) {
if idx != 20 {
t.Fatalf("bad idx: %d", idx)
}
// Delete a suffix
if err := l.Purge(5); err != nil {
t.Fatalf("err: %v ", err)
}
// Verify they are all deleted
for i := 1; i <= 5; i++ {
if err := l.GetLog(uint64(i), &out); err != ErrLogNotFound {
t.Fatalf("err: %v ", err)
}
}
// Index should be one
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 6 {
t.Fatalf("bad idx: %d", idx)
}
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 20 {
t.Fatalf("bad idx: %d", idx)
}
// Should not be able to fetch
if err := l.GetLog(5, &out); err != ErrLogNotFound {
t.Fatalf("err: %v ", err)
}
if err := l.Clear(); err != nil {
t.Fatal(err)
}
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
now := uint32(time.Now().Unix())
logs := []*Log{}
for i := 1; i <= 20; i++ {
nl := &Log{
ID: uint64(i),
CreateTime: now - 20,
Data: []byte("first"),
}
logs = append(logs, nl)
}
if err := l.PurgeExpired(1); err != nil {
t.Fatal(err)
}
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
}