From 053cee05a38874f526db310aefbde3f3b1e5f12a Mon Sep 17 00:00:00 2001 From: siddontang Date: Sat, 15 Nov 2014 21:20:12 +0800 Subject: [PATCH] rpl add max log file num --- config/config.go | 3 +++ config/config.toml | 3 +++ etc/ledis.conf | 3 +++ rpl/file_store.go | 49 ++++++++++++++++++++++++++++++++-------------- rpl/rpl.go | 2 +- rpl/store_test.go | 7 +++++-- 6 files changed, 49 insertions(+), 18 deletions(-) diff --git a/config/config.go b/config/config.go index 1634fa0..f9fb6aa 100644 --- a/config/config.go +++ b/config/config.go @@ -76,6 +76,7 @@ type ReplicationConfig struct { ExpiredLogDays int `toml:"expired_log_days"` StoreName string `toml:"store_name"` MaxLogFileSize int64 `toml:"max_log_file_size"` + MaxLogFileNum int `toml:"max_log_file_num"` SyncLog int `toml:"sync_log"` Compression bool `toml:"compression"` } @@ -202,9 +203,11 @@ func (cfg *Config) adjust() { cfg.RocksDB.adjust() cfg.Replication.ExpiredLogDays = getDefault(7, cfg.Replication.ExpiredLogDays) + cfg.Replication.MaxLogFileNum = getDefault(10, cfg.Replication.MaxLogFileNum) cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize) cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize) cfg.TTLCheckInterval = getDefault(1, cfg.TTLCheckInterval) + } func (cfg *LevelDBConfig) adjust() { diff --git a/config/config.toml b/config/config.toml index 9f1f5f7..17b5d51 100644 --- a/config/config.toml +++ b/config/config.toml @@ -131,6 +131,9 @@ expired_log_days = 7 # for file store, if 0, use default 1G, max is 4G max_log_file_size = 0 +# for file store, if 0, use default 10 +max_log_file_num = 10 + # Sync log to disk if possible # 0: no sync # 1: sync every second diff --git a/etc/ledis.conf b/etc/ledis.conf index 9f1f5f7..17b5d51 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -131,6 +131,9 @@ expired_log_days = 7 # for file store, if 0, use default 1G, max is 4G max_log_file_size = 0 +# for file store, if 0, use default 10 +max_log_file_num = 10 + # Sync log to disk if possible # 0: no sync # 1: sync every second diff --git a/rpl/file_store.go b/rpl/file_store.go index 26a2c5c..823b1cd 100644 --- a/rpl/file_store.go +++ b/rpl/file_store.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/siddontang/go/log" "github.com/siddontang/go/num" + "github.com/siddontang/ledisdb/config" "io/ioutil" "os" "sort" @@ -49,7 +50,7 @@ const ( type FileStore struct { LogStore - maxFileSize int64 + cfg *config.Config base string @@ -62,7 +63,7 @@ type FileStore struct { quit chan struct{} } -func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error) { +func NewFileStore(base string, cfg *config.Config) (*FileStore, error) { s := new(FileStore) s.quit = make(chan struct{}) @@ -75,11 +76,14 @@ func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error) s.base = base - s.maxFileSize = num.MinInt64(maxLogFileSize, maxSize) - if s.maxFileSize == 0 { - s.maxFileSize = defaultMaxLogFileSize + if cfg.Replication.MaxLogFileSize == 0 { + cfg.Replication.MaxLogFileSize = defaultMaxLogFileSize } + cfg.Replication.MaxLogFileSize = num.MinInt64(cfg.Replication.MaxLogFileSize, maxLogFileSize) + + s.cfg = cfg + if err = s.load(); err != nil { return nil, err } @@ -89,8 +93,8 @@ func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error) index = s.rs[len(s.rs)-1].index + 1 } - s.w = newTableWriter(s.base, index, s.maxFileSize) - s.w.SetSyncType(syncType) + s.w = newTableWriter(s.base, index, cfg.Replication.MaxLogFileSize) + s.w.SetSyncType(cfg.Replication.SyncLog) go s.checkTableReaders() @@ -204,13 +208,7 @@ func (s *FileStore) PurgeExpired(n int64) error { s.rm.Unlock() - for _, r := range purges { - name := r.name - r.Close() - if err := os.Remove(name); err != nil { - log.Error("purge table %s err: %s", name, err.Error()) - } - } + s.purgeTableReaders(purges) return nil } @@ -244,7 +242,7 @@ func (s *FileStore) Clear() error { return err } - s.w = newTableWriter(s.base, 1, s.maxFileSize) + s.w = newTableWriter(s.base, 1, s.cfg.Replication.MaxLogFileSize) return nil } @@ -289,13 +287,34 @@ func (s *FileStore) checkTableReaders() { } } + purges := []*tableReader{} + maxNum := s.cfg.Replication.MaxLogFileNum + num := len(s.rs) + if num > maxNum { + purges = s.rs[:num-maxNum] + s.rs = s.rs[num-maxNum:] + } + s.rm.Unlock() + + s.purgeTableReaders(purges) + case <-s.quit: return } } } +func (s *FileStore) purgeTableReaders(purges []*tableReader) { + for _, r := range purges { + name := r.name + r.Close() + if err := os.Remove(name); err != nil { + log.Error("purge table %s err: %s", name, err.Error()) + } + } +} + func (s *FileStore) load() error { fs, err := ioutil.ReadDir(s.base) if err != nil { diff --git a/rpl/rpl.go b/rpl/rpl.go index e3bd54c..d942b49 100644 --- a/rpl/rpl.go +++ b/rpl/rpl.go @@ -58,7 +58,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) { return nil, err } default: - if r.s, err = NewFileStore(path.Join(base, "ldb"), cfg.Replication.MaxLogFileSize, cfg.Replication.SyncLog); err != nil { + if r.s, err = NewFileStore(path.Join(base, "ldb"), cfg); err != nil { return nil, err } } diff --git a/rpl/store_test.go b/rpl/store_test.go index cdec659..9b8febe 100644 --- a/rpl/store_test.go +++ b/rpl/store_test.go @@ -1,6 +1,7 @@ package rpl import ( + "github.com/siddontang/ledisdb/config" "io/ioutil" "os" "testing" @@ -33,7 +34,10 @@ func TestFileStore(t *testing.T) { defer os.RemoveAll(dir) // New level - l, err := NewFileStore(dir, 4096, 0) + cfg := config.NewConfigDefault() + cfg.Replication.MaxLogFileSize = 4096 + + l, err := NewFileStore(dir, cfg) if err != nil { t.Fatalf("err: %v ", err) } @@ -51,7 +55,6 @@ func testLogs(t *testing.T, l LogStore) { if idx != 0 { t.Fatalf("bad idx: %d", idx) } - // Should be no last index idx, err = l.LastID() if err != nil {