forked from mirror/ledisdb
rpl add max log file num

This commit is contained in:
parent  c737ec82ff
commit  053cee05a3
@@ -76,6 +76,7 @@ type ReplicationConfig struct {
     ExpiredLogDays int    `toml:"expired_log_days"`
     StoreName      string `toml:"store_name"`
     MaxLogFileSize int64  `toml:"max_log_file_size"`
+    MaxLogFileNum  int    `toml:"max_log_file_num"`
     SyncLog        int    `toml:"sync_log"`
     Compression    bool   `toml:"compression"`
 }
@@ -202,9 +203,11 @@ func (cfg *Config) adjust() {
     cfg.RocksDB.adjust()
 
     cfg.Replication.ExpiredLogDays = getDefault(7, cfg.Replication.ExpiredLogDays)
+    cfg.Replication.MaxLogFileNum = getDefault(10, cfg.Replication.MaxLogFileNum)
     cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize)
     cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize)
     cfg.TTLCheckInterval = getDefault(1, cfg.TTLCheckInterval)
+
 }
 
 func (cfg *LevelDBConfig) adjust() {
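getDefault is not shown in this diff; judging from the call sites above, it keeps an explicitly configured value and falls back to the supplied default when the value is still zero. A minimal sketch of that behavior, assuming an int-only signature (the real helper in config.go may differ):

    // Sketch only: treat zero (or negative) as "not set" and use the default.
    func getDefault(d int, s int) int {
        if s <= 0 {
            return d
        }
        return s
    }

So an explicitly set max_log_file_num survives adjust(), while an unset one becomes 10.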
@@ -131,6 +131,9 @@ expired_log_days = 7
 # for file store, if 0, use default 1G, max is 4G
 max_log_file_size = 0
 
+# for file store, if 0, use default 10
+max_log_file_num = 10
+
 # Sync log to disk if possible
 # 0: no sync
 # 1: sync every second
@@ -131,6 +131,9 @@ expired_log_days = 7
 # for file store, if 0, use default 1G, max is 4G
 max_log_file_size = 0
 
+# for file store, if 0, use default 10
+max_log_file_num = 10
+
 # Sync log to disk if possible
 # 0: no sync
 # 1: sync every second
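Leaving max_log_file_num at 0 (or omitting it) lets adjust() fall back to 10; any explicit value wins. The same applies when the configuration is built in code rather than loaded from a file, for example (illustrative values, not defaults from this commit):

    cfg := config.NewConfigDefault()
    cfg.Replication.MaxLogFileSize = 0  // 0 -> 1G default, capped at 4G
    cfg.Replication.MaxLogFileNum = 20  // keep at most 20 log files in the file store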
@@ -4,6 +4,7 @@ import (
     "fmt"
     "github.com/siddontang/go/log"
     "github.com/siddontang/go/num"
+    "github.com/siddontang/ledisdb/config"
     "io/ioutil"
     "os"
     "sort"
@@ -49,7 +50,7 @@ const (
 type FileStore struct {
     LogStore
 
-    maxFileSize int64
+    cfg *config.Config
 
     base string
 
@@ -62,7 +63,7 @@ type FileStore struct {
     quit chan struct{}
 }
 
-func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error) {
+func NewFileStore(base string, cfg *config.Config) (*FileStore, error) {
     s := new(FileStore)
 
     s.quit = make(chan struct{})
@@ -75,11 +76,14 @@ func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error)
 
     s.base = base
 
-    s.maxFileSize = num.MinInt64(maxLogFileSize, maxSize)
-    if s.maxFileSize == 0 {
-        s.maxFileSize = defaultMaxLogFileSize
+    if cfg.Replication.MaxLogFileSize == 0 {
+        cfg.Replication.MaxLogFileSize = defaultMaxLogFileSize
     }
 
+    cfg.Replication.MaxLogFileSize = num.MinInt64(cfg.Replication.MaxLogFileSize, maxLogFileSize)
+
+    s.cfg = cfg
+
     if err = s.load(); err != nil {
         return nil, err
     }
@@ -89,8 +93,8 @@ func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error)
         index = s.rs[len(s.rs)-1].index + 1
     }
 
-    s.w = newTableWriter(s.base, index, s.maxFileSize)
-    s.w.SetSyncType(syncType)
+    s.w = newTableWriter(s.base, index, cfg.Replication.MaxLogFileSize)
+    s.w.SetSyncType(cfg.Replication.SyncLog)
 
     go s.checkTableReaders()
 
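With these changes the file store takes its limits from the shared *config.Config instead of explicit arguments. A usage sketch of the new constructor, following the pattern of the updated test further down; the path and values are illustrative, and Close is assumed from the embedded LogStore:

    package main

    import (
        "log"

        "github.com/siddontang/ledisdb/config"
        "github.com/siddontang/ledisdb/rpl"
    )

    func main() {
        cfg := config.NewConfigDefault()
        cfg.Replication.MaxLogFileSize = 256 * 1024 * 1024 // per-file cap; 0 means 1G default, hard max 4G
        cfg.Replication.MaxLogFileNum = 20                 // newly added: keep at most 20 table files
        cfg.Replication.SyncLog = 1                        // 0: no sync, 1: sync every second

        s, err := rpl.NewFileStore("/tmp/ledisdb-rpl/ldb", cfg)
        if err != nil {
            log.Fatal(err)
        }
        defer s.Close()
    }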
@@ -204,13 +208,7 @@ func (s *FileStore) PurgeExpired(n int64) error {
 
     s.rm.Unlock()
 
-    for _, r := range purges {
-        name := r.name
-        r.Close()
-        if err := os.Remove(name); err != nil {
-            log.Error("purge table %s err: %s", name, err.Error())
-        }
-    }
+    s.purgeTableReaders(purges)
 
     return nil
 }
@@ -244,7 +242,7 @@ func (s *FileStore) Clear() error {
         return err
     }
 
-    s.w = newTableWriter(s.base, 1, s.maxFileSize)
+    s.w = newTableWriter(s.base, 1, s.cfg.Replication.MaxLogFileSize)
 
     return nil
 }
@@ -289,13 +287,34 @@ func (s *FileStore) checkTableReaders() {
                 }
             }
 
+            purges := []*tableReader{}
+            maxNum := s.cfg.Replication.MaxLogFileNum
+            num := len(s.rs)
+            if num > maxNum {
+                purges = s.rs[:num-maxNum]
+                s.rs = s.rs[num-maxNum:]
+            }
+
             s.rm.Unlock()
+
+            s.purgeTableReaders(purges)
+
         case <-s.quit:
             return
         }
     }
 }
 
+func (s *FileStore) purgeTableReaders(purges []*tableReader) {
+    for _, r := range purges {
+        name := r.name
+        r.Close()
+        if err := os.Remove(name); err != nil {
+            log.Error("purge table %s err: %s", name, err.Error())
+        }
+    }
+}
+
 func (s *FileStore) load() error {
     fs, err := ioutil.ReadDir(s.base)
     if err != nil {
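The retention rule added to checkTableReaders is a plain slice split: once more than MaxLogFileNum table readers are loaded, the oldest num-maxNum of them are cut off and handed to purgeTableReaders, which closes each reader and removes its file. s.rs is kept sorted oldest-first (the writer index comes from its last element), so the front of the slice is what gets dropped. A standalone illustration of the split, with ints standing in for *tableReader:

    package main

    import "fmt"

    func main() {
        // Same trimming rule as the new checkTableReaders code.
        rs := []int{1, 2, 3, 4, 5, 6, 7} // table file indexes, oldest first
        maxNum := 5                      // cfg.Replication.MaxLogFileNum

        purges := []int{}
        if num := len(rs); num > maxNum {
            purges = rs[:num-maxNum] // the two oldest tables get purged
            rs = rs[num-maxNum:]     // the five newest stay loaded
        }

        fmt.Println(purges) // [1 2]
        fmt.Println(rs)     // [3 4 5 6 7]
    }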
@@ -58,7 +58,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) {
             return nil, err
         }
     default:
-        if r.s, err = NewFileStore(path.Join(base, "ldb"), cfg.Replication.MaxLogFileSize, cfg.Replication.SyncLog); err != nil {
+        if r.s, err = NewFileStore(path.Join(base, "ldb"), cfg); err != nil {
             return nil, err
         }
     }
@@ -1,6 +1,7 @@
 package rpl
 
 import (
+    "github.com/siddontang/ledisdb/config"
     "io/ioutil"
     "os"
     "testing"
@@ -33,7 +34,10 @@ func TestFileStore(t *testing.T) {
     defer os.RemoveAll(dir)
 
     // New level
-    l, err := NewFileStore(dir, 4096, 0)
+    cfg := config.NewConfigDefault()
+    cfg.Replication.MaxLogFileSize = 4096
+
+    l, err := NewFileStore(dir, cfg)
     if err != nil {
         t.Fatalf("err: %v ", err)
     }
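The updated test only pins MaxLogFileSize; everything else comes from config.NewConfigDefault(). To exercise the new count-based purging in a similar test, one could also lower the file-count cap; a hypothetical tweak, not part of this commit:

    cfg := config.NewConfigDefault()
    cfg.Replication.MaxLogFileSize = 4096 // small tables so several files appear quickly
    cfg.Replication.MaxLogFileNum = 2     // let checkTableReaders drop all but the newest two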
@@ -51,7 +55,6 @@ func testLogs(t *testing.T, l LogStore) {
     if idx != 0 {
         t.Fatalf("bad idx: %d", idx)
     }
-
     // Should be no last index
     idx, err = l.LastID()
     if err != nil {