diff --git a/config/config.go b/config/config.go
index 498c748..1634fa0 100644
--- a/config/config.go
+++ b/config/config.go
@@ -74,6 +74,8 @@ type ReplicationConfig struct {
 	WaitSyncTime     int    `toml:"wait_sync_time"`
 	WaitMaxSlaveAcks int    `toml:"wait_max_slave_acks"`
 	ExpiredLogDays   int    `toml:"expired_log_days"`
+	StoreName        string `toml:"store_name"`
+	MaxLogFileSize   int64  `toml:"max_log_file_size"`
 	SyncLog          int    `toml:"sync_log"`
 	Compression      bool   `toml:"compression"`
 }
diff --git a/config/config.toml b/config/config.toml
index f75c784..f121543 100644
--- a/config/config.toml
+++ b/config/config.toml
@@ -121,9 +121,16 @@ wait_sync_time = 500
 # If 0, wait (n + 1) / 2 acks.
 wait_max_slave_acks = 2
 
+# Store name: file or goleveldb
+# Changing it at runtime is very dangerous
+store_name = "file"
+
 # Expire write ahead logs after the given days
 expired_log_days = 7
 
+# For the file store: if 0, use the default 1GB; the maximum is 4GB
+max_log_file_size = 0
+
 # Sync log to disk if possible
 # 0: no sync
 # 1: sync every second
diff --git a/rpl/file_store.go b/rpl/file_store.go
index f88b603..3c5e719 100644
--- a/rpl/file_store.go
+++ b/rpl/file_store.go
@@ -60,7 +60,7 @@ type FileStore struct {
 	w *tableWriter
 }
 
-func NewFileStore(base string, maxSize int64) (*FileStore, error) {
+func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error) {
 	s := new(FileStore)
 
 	var err error
@@ -72,6 +72,9 @@ func NewFileStore(base string, maxSize int64) (*FileStore, error) {
 	s.base = base
 
 	s.maxFileSize = num.MinInt64(maxLogFileSize, maxSize)
+	if s.maxFileSize == 0 {
+		s.maxFileSize = defaultMaxLogFileSize
+	}
 
 	if err = s.load(); err != nil {
 		return nil, err
@@ -83,30 +86,75 @@ func NewFileStore(base string, maxSize int64) (*FileStore, error) {
 	}
 
 	s.w = newTableWriter(s.base, index, s.maxFileSize)
+	s.w.SetSyncType(syncType)
+
 	return s, nil
 }
 
-func (s *FileStore) GetLog(id uint64, log *Log) error {
-	panic("not implementation")
-	return nil
+func (s *FileStore) GetLog(id uint64, l *Log) error {
+	// first, search in the table writer
+	if err := s.w.GetLog(id, l); err == nil {
+		return nil
+	} else if err != ErrLogNotFound {
+		return err
+	}
+
+	s.rm.RLock()
+	t := s.rs.Search(id)
+
+	if t == nil {
+		s.rm.RUnlock()
+
+		return ErrLogNotFound
+	}
+
+	err := t.GetLog(id, l)
+	s.rm.RUnlock()
+
+	return err
 }
 
 func (s *FileStore) FirstID() (uint64, error) {
-	return 0, nil
+	id := uint64(0)
+
+	s.rm.RLock()
+	if len(s.rs) > 0 {
+		id = s.rs[0].first
+	} else {
+		id = 0
+	}
+	s.rm.RUnlock()
+
+	if id > 0 {
+		return id, nil
+	}
+
+	// if id == 0, no table reader exists, so check the table writer
+
+	return s.w.First(), nil
 }
 
 func (s *FileStore) LastID() (uint64, error) {
-	return 0, nil
+	id := s.w.Last()
+	if id > 0 {
+		return id, nil
+	}
+
+	// if the table writer has no last id, we may find it in the last table reader
+
+	s.rm.RLock()
+	if len(s.rs) > 0 {
+		id = s.rs[len(s.rs)-1].last
+	}
+	s.rm.RUnlock()
+
+	return id, nil
 }
 
 func (s *FileStore) StoreLog(l *Log) error {
 	s.wm.Lock()
 	defer s.wm.Unlock()
 
-	if s.w == nil {
-		return fmt.Errorf("nil table writer, cannot store")
-	}
-
 	err := s.w.StoreLog(l)
 	if err == nil {
 		return nil
@@ -114,40 +162,40 @@ func (s *FileStore) StoreLog(l *Log) error {
 		return err
 	}
 
+	s.rm.Lock()
+
 	var r *tableReader
 	if r, err = s.w.Flush(); err != nil {
 		log.Error("write table flush error %s, can not store now", err.Error())
 
 		s.w.Close()
-		s.w = nil
+
+		s.rm.Unlock()
+
 		return err
 	}
 
-	s.rm.Lock()
 	s.rs = append(s.rs, r)
 	s.rm.Unlock()
 
-	return nil
+	return s.w.StoreLog(l)
 }
 
 func (s *FileStore) PuregeExpired(n int64) error {
 	s.rm.Lock()
 
 	purges := []*tableReader{}
-	lefts := []*tableReader{}
 
 	t := uint32(time.Now().Unix() - int64(n))
 
-	for _, r := range s.rs {
-		if r.lastTime < t {
-			purges = append(purges, r)
-		} else {
-			lefts = append(lefts, r)
+	for i, r := range s.rs {
+		if r.lastTime > t {
+			purges = s.rs[0:i]
+			s.rs = s.rs[i:]
+			break
 		}
 	}
 
-	s.rs = lefts
-
 	s.rm.Unlock()
 
 	for _, r := range purges {
@@ -162,31 +210,53 @@ func (s *FileStore) PuregeExpired(n int64) error {
 }
 
 func (s *FileStore) Clear() error {
+	s.wm.Lock()
+	s.rm.Lock()
+
+	defer func() {
+		s.rm.Unlock()
+		s.wm.Unlock()
+	}()
+
+	s.w.Close()
+
+	for i := range s.rs {
+		s.rs[i].Close()
+	}
+
+	s.rs = tableReaders{}
+
+	if err := os.RemoveAll(s.base); err != nil {
+		return err
+	}
+
+	if err := os.MkdirAll(s.base, 0755); err != nil {
+		return err
+	}
+
+	s.w = newTableWriter(s.base, 1, s.maxFileSize)
+
 	return nil
 }
 
 func (s *FileStore) Close() error {
 	s.wm.Lock()
 
-	if s.w != nil {
-		if r, err := s.w.Flush(); err != nil {
-			log.Error("close err: %s", err.Error())
-		} else {
-			r.Close()
-			s.w.Close()
-			s.w = nil
-		}
-	}
-
-	s.wm.Unlock()
-
 	s.rm.Lock()
+	if r, err := s.w.Flush(); err != nil {
+		log.Error("close err: %s", err.Error())
+	} else {
+		r.Close()
+		s.w.Close()
+	}
+
 	for i := range s.rs {
 		s.rs[i].Close()
 	}
 
-	s.rs = nil
+	s.rs = tableReaders{}
 	s.rm.Unlock()
+	s.wm.Unlock()
 
 	return nil
 }
@@ -227,19 +297,25 @@ func (ts tableReaders) Swap(i, j int) {
 }
 
 func (ts tableReaders) Less(i, j int) bool {
-	return ts[i].index < ts[j].index
+	return ts[i].first < ts[j].first
 }
 
 func (ts tableReaders) Search(id uint64) *tableReader {
-	n := sort.Search(len(ts), func(i int) bool {
-		return id >= ts[i].first && id <= ts[i].last
-	})
+	i, j := 0, len(ts)-1
 
-	if n < len(ts) {
-		return ts[n]
-	} else {
-		return nil
+	for i <= j {
+		h := i + (j-i)/2
+
+		if ts[h].first <= id && id <= ts[h].last {
+			return ts[h]
+		} else if ts[h].last < id {
+			i = h + 1
+		} else {
+			j = h - 1
+		}
 	}
+
+	return nil
 }
 
 func (ts tableReaders) check() error {
@@ -262,7 +338,7 @@ func (ts tableReaders) check() error {
 			return fmt.Errorf("invalid first log id %d in table %s", ts[i].first, ts[i].name)
 		}
 
-		if ts[i].index == index {
+		if ts[i].index <= index {
 			return fmt.Errorf("invalid index %d in table %s", ts[i].index, ts[i].name)
 		}
 
diff --git a/rpl/file_table.go b/rpl/file_table.go
index 410f150..9023aa3 100644
--- a/rpl/file_table.go
+++ b/rpl/file_table.go
@@ -293,7 +293,7 @@ func (t *tableReader) decodeLogHead(l *Log, pos int64) (int64, error) {
 
 func (t *tableReader) GetLog(id uint64, l *Log) error {
 	if id < t.first || id > t.last {
-		return fmt.Errorf("log %d not in [%d=%d]", id, t.first, t.last)
+		return ErrLogNotFound
 	}
 
 	t.lastReadTime.Set(time.Now().Unix())
@@ -359,6 +359,11 @@ type tableWriter struct {
 	offsetBuf []byte
 
 	maxLogSize int64
+
+	closed bool
+
+	syncType int
+	lastTime uint32
 }
 
 func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
@@ -374,9 +379,19 @@ func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
 
 	t.maxLogSize = maxLogSize
 
+	t.closed = false
+
 	return t
 }
 
+func (t *tableWriter) SetMaxLogSize(s int64) {
+	t.maxLogSize = s
+}
+
+func (t *tableWriter) SetSyncType(tp int) {
+	t.syncType = tp
+}
+
 func (t *tableWriter) close() {
 	if t.rf != nil {
 		t.rf.Close()
@@ -393,6 +408,8 @@ func (t *tableWriter) close() {
 	t.Lock()
 	defer t.Unlock()
 
+	t.closed = true
+
 	t.close()
 }
 
@@ -502,6 +519,10 @@ func (t *tableWriter) StoreLog(l *Log) error {
 	t.Lock()
 	defer t.Unlock()
 
+	if t.closed {
+		return fmt.Errorf("table writer is closed")
+	}
+
 	if t.last > 0 && l.ID != t.last+1 {
 		return ErrStoreLogID
 	}
@@ -539,8 +560,16 @@ func (t *tableWriter) StoreLog(l *Log) error {
 
 	t.last = l.ID
 
+	t.lastTime = l.CreateTime
+
 	//todo add LRU cache
 
+	if t.syncType == 2 || (t.syncType == 1 && time.Now().Unix()-int64(t.lastTime) > 1) {
+		if err := t.wf.Sync(); err != nil {
+			log.Error("sync table error %s", err.Error())
+		}
+	}
+
 	return nil
 }
@@ -549,7 +578,7 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error {
 	defer t.RUnlock()
 
 	if id < t.first || id > t.last {
-		return fmt.Errorf("log %d not in [%d=%d]", id, t.first, t.last)
+		return ErrLogNotFound
 	}
 
 	//todo memory cache
diff --git a/rpl/rpl.go b/rpl/rpl.go
index 067c0c2..5e9eca8 100644
--- a/rpl/rpl.go
+++ b/rpl/rpl.go
@@ -51,8 +51,16 @@ func NewReplication(cfg *config.Config) (*Replication, error) {
 	r.cfg = cfg
 
 	var err error
-	if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil {
-		return nil, err
+
+	switch cfg.Replication.StoreName {
+	case "goleveldb":
+		if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil {
+			return nil, err
+		}
+	default:
+		if r.s, err = NewFileStore(path.Join(base, "ldb"), cfg.Replication.MaxLogFileSize, cfg.Replication.SyncLog); err != nil {
+			return nil, err
+		}
 	}
 
 	if r.commitLog, err = os.OpenFile(path.Join(base, "commit.log"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
diff --git a/rpl/store_test.go b/rpl/store_test.go
index e84e286..cdec659 100644
--- a/rpl/store_test.go
+++ b/rpl/store_test.go
@@ -24,6 +24,24 @@ func TestGoLevelDBStore(t *testing.T) {
 	testLogs(t, l)
 }
 
+func TestFileStore(t *testing.T) {
+	// Create a test dir
+	dir, err := ioutil.TempDir("", "ldb")
+	if err != nil {
+		t.Fatalf("err: %v ", err)
+	}
+	defer os.RemoveAll(dir)
+
+	// New file store
+	l, err := NewFileStore(dir, 4096, 0)
+	if err != nil {
+		t.Fatalf("err: %v ", err)
+	}
+	defer l.Close()
+
+	testLogs(t, l)
+}
+
 func testLogs(t *testing.T, l LogStore) {
 	// Should be no first index
 	idx, err := l.FirstID()
@@ -45,14 +63,16 @@ func testLogs(t *testing.T, l LogStore) {
 
 	// Try a filed fetch
 	var out Log
-	if err := l.GetLog(10, &out); err.Error() != "log not found" {
+	if err := l.GetLog(10, &out); err != ErrLogNotFound {
 		t.Fatalf("err: %v ", err)
 	}
 
+	data := make([]byte, 1024)
+
 	// Write out a log
 	log := Log{
 		ID:   1,
-		Data: []byte("first"),
+		Data: data,
 	}
 	for i := 1; i <= 10; i++ {
 		log.ID = uint64(i)
@@ -65,7 +85,7 @@
 	for i := 11; i <= 20; i++ {
 		nl := &Log{
 			ID:   uint64(i),
-			Data: []byte("first"),
+			Data: data,
 		}
 
 		if err := l.StoreLog(nl); err != nil {
@@ -73,6 +93,11 @@
 		}
 	}
 
+	// Try to fetch
+	if err := l.GetLog(1, &out); err != nil {
+		t.Fatalf("err: %v ", err)
+	}
+
 	// Try to fetch
 	if err := l.GetLog(10, &out); err != nil {
 		t.Fatalf("err: %v ", err)
@@ -100,4 +125,39 @@
 	if idx != 20 {
 		t.Fatalf("bad idx: %d", idx)
 	}
+
+	if err = l.Clear(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Check the lowest index
+	idx, err = l.FirstID()
+	if err != nil {
+		t.Fatalf("err: %v ", err)
+	}
+	if idx != 0 {
+		t.Fatalf("bad idx: %d", idx)
+	}
+
+	// Check the highest index
+	idx, err = l.LastID()
+	if err != nil {
+		t.Fatalf("err: %v ", err)
+	}
+	if idx != 0 {
+		t.Fatalf("bad idx: %d", idx)
+	}
+
+	// Write out a log
+	log = Log{
+		ID:   1,
+		Data: data,
+	}
+	for i := 1; i <= 10; i++ {
+		log.ID = uint64(i)
+		if err := l.StoreLog(&log); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
 }
diff --git a/rpl/table_readers_test.go b/rpl/table_readers_test.go
new file mode 100644
index 0000000..d0045e6
--- /dev/null
+++ b/rpl/table_readers_test.go
@@ -0,0 +1,38 @@
+package rpl
+
+import (
+	"testing"
+)
+
+func TestTableReaders(t *testing.T) {
+	ts := make(tableReaders, 0, 10)
+
+	for i := uint64(0); i < 10; i++ {
+		t := new(tableReader)
+		t.index = int64(i) + 1
+		t.first = i*10 + 1
+		t.last = i*10 + 10
+
+		ts = append(ts, t)
+	}
+
+	if err := ts.check(); err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 1; i <= 100; i++ {
+		if r := ts.Search(uint64(i)); r == nil {
+			t.Fatal("must hit", i)
+		} else if r.index != int64((i-1)/10)+1 {
+			t.Fatal("invalid index", r.index, i)
+		}
+	}
+
+	if r := ts.Search(1000); r != nil {
+		t.Fatal("must not hit")
+	}
+	if r := ts.Search(0); r != nil {
+		t.Fatal("must not hit")
+	}
+
+}
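
Not part of the patch, but a reading aid: a minimal sketch of how the new file store API introduced above might be exercised. It is written as if it lived inside the rpl package (so no import path has to be assumed), and the directory name, table size, and sync type are illustrative values borrowed from TestFileStore; exampleFileStore itself is a hypothetical helper.

package rpl

import (
	"io/ioutil"
	"os"
)

// exampleFileStore is a hypothetical helper (not in the patch) showing the
// call sequence the new code enables: build a store with an explicit max
// table size and sync type, append logs with contiguous IDs, read one back,
// and finally clear everything.
func exampleFileStore() error {
	dir, err := ioutil.TempDir("", "rpl_example")
	if err != nil {
		return err
	}
	defer os.RemoveAll(dir)

	// 4KB tables and sync type 0 (no explicit fsync), as in TestFileStore.
	s, err := NewFileStore(dir, 4096, 0)
	if err != nil {
		return err
	}
	defer s.Close()

	// IDs must be contiguous; a gap makes the table writer return ErrStoreLogID.
	for i := uint64(1); i <= 10; i++ {
		if err := s.StoreLog(&Log{ID: i, Data: make([]byte, 1024)}); err != nil {
			return err
		}
	}

	// GetLog checks the active table writer first, then binary-searches the readers.
	var out Log
	if err := s.GetLog(5, &out); err != nil {
		return err
	}

	// Clear drops all tables and recreates the base directory with a fresh writer.
	return s.Clear()
}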
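Also not part of the patch: the fsync policy added to tableWriter.StoreLog can be read as a small predicate. The helper below is hypothetical and only restates the condition from the hunk above, where syncType mirrors the sync_log values documented in config.toml: 0 never forces a sync, 1 syncs when the log being stored carries a CreateTime more than one second in the past, and 2 syncs on every store.

package rpl

import "time"

// shouldSync is a hypothetical restatement (not in the patch) of the
// condition used by tableWriter.StoreLog after this change. lastTime is the
// CreateTime of the log that was just stored, since tableWriter.lastTime is
// updated right before the check.
func shouldSync(syncType int, lastTime uint32) bool {
	return syncType == 2 ||
		(syncType == 1 && time.Now().Unix()-int64(lastTime) > 1)
}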