diff --git a/replication/log.go b/ledis/binlog.go similarity index 64% rename from replication/log.go rename to ledis/binlog.go index 4cdc460..c5df748 100644 --- a/replication/log.go +++ b/ledis/binlog.go @@ -1,8 +1,9 @@ -package replication +package ledis import ( "bufio" - "errors" + "encoding/binary" + "encoding/json" "fmt" "github.com/siddontang/go-log/log" "io/ioutil" @@ -10,27 +11,56 @@ import ( "path" "strconv" "strings" + "time" ) -var ( - ErrOverSpaceLimit = errors.New("total log files exceed space limit") +const ( + MaxBinLogFileSize int = 1024 * 1024 * 1024 + MaxBinLogFileNum int = 10000 + + DefaultBinLogFileSize int = MaxBinLogFileSize + DefaultBinLogFileNum int = 10 ) -type logHandler interface { - Write(wb *bufio.Writer, data []byte) (int, error) -} +/* +index file format: +ledis-bin.00001 +ledis-bin.00002 +ledis-bin.00003 -type LogConfig struct { +log file format + +timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData + +*/ + +type BinLogConfig struct { Name string `json:"name"` - LogType string `json:"log_type"` Path string `json:"path"` MaxFileSize int `json:"max_file_size"` MaxFileNum int `json:"max_file_num"` - SpaceLimit int64 `json:"space_limit"` } -type Log struct { - cfg *LogConfig +func (cfg *BinLogConfig) adjust() { + if cfg.MaxFileSize <= 0 { + cfg.MaxFileSize = DefaultBinLogFileSize + } else if cfg.MaxFileSize > MaxBinLogFileSize { + cfg.MaxFileSize = MaxBinLogFileSize + } + + if cfg.MaxFileNum <= 0 { + cfg.MaxFileNum = DefaultBinLogFileNum + } else if cfg.MaxFileNum > MaxBinLogFileNum { + cfg.MaxFileNum = MaxBinLogFileNum + } + + if len(cfg.Name) == 0 { + cfg.Name = "ledis" + } +} + +type BinLog struct { + cfg *BinLogConfig logFile *os.File @@ -39,28 +69,30 @@ type Log struct { indexName string logNames []string lastLogIndex int - - space int64 - - handler logHandler } -func newLog(handler logHandler, cfg *LogConfig) (*Log, error) { - l := new(Log) +func NewBinLog(data json.RawMessage) (*BinLog, error) { + var cfg BinLogConfig + + if err := json.Unmarshal(data, &cfg); err != nil { + return nil, err + } + + return NewBinLogWithConfig(&cfg) +} + +func NewBinLogWithConfig(cfg *BinLogConfig) (*BinLog, error) { + cfg.adjust() + + l := new(BinLog) l.cfg = cfg - l.handler = handler - - if len(l.cfg.Name) == 0 { - l.cfg.Name = "ledis" - } if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil { return nil, err } l.logNames = make([]string, 0, 16) - l.space = 0 if err := l.loadIndex(); err != nil { return nil, err @@ -69,7 +101,7 @@ func newLog(handler logHandler, cfg *LogConfig) (*Log, error) { return l, nil } -func (l *Log) flushIndex() error { +func (l *BinLog) flushIndex() error { data := strings.Join(l.logNames, "\n") bakName := fmt.Sprintf("%s.bak", l.indexName) @@ -95,8 +127,8 @@ func (l *Log) flushIndex() error { return nil } -func (l *Log) loadIndex() error { - l.indexName = path.Join(l.cfg.Path, fmt.Sprintf("%s-%s.index", l.cfg.Name, l.cfg.LogType)) +func (l *BinLog) loadIndex() error { + l.indexName = path.Join(l.cfg.Path, fmt.Sprintf("%s-bin.index", l.cfg.Name)) if _, err := os.Stat(l.indexName); os.IsNotExist(err) { //no index file, nothing to do } else { @@ -112,12 +144,10 @@ func (l *Log) loadIndex() error { continue } - if st, err := os.Stat(path.Join(l.cfg.Path, line)); err != nil { + if _, err := os.Stat(path.Join(l.cfg.Path, line)); err != nil { log.Error("load index line %s error %s", line, err.Error()) return err } else { - l.space += st.Size() - l.logNames = append(l.logNames, line) } } @@ -147,11 +177,11 @@ func (l *Log) loadIndex() error { return nil } -func (l *Log) getLogFile() string { - return fmt.Sprintf("%s-%s.%07d", l.cfg.Name, l.cfg.LogType, l.lastLogIndex) +func (l *BinLog) getLogFile() string { + return fmt.Sprintf("%s-bin.%07d", l.cfg.Name, l.lastLogIndex) } -func (l *Log) openNewLogFile() error { +func (l *BinLog) openNewLogFile() error { var err error lastName := l.getLogFile() @@ -180,7 +210,7 @@ func (l *Log) openNewLogFile() error { return nil } -func (l *Log) checkLogFileSize() bool { +func (l *BinLog) checkLogFileSize() bool { if l.logFile == nil { return false } @@ -197,15 +227,9 @@ func (l *Log) checkLogFileSize() bool { return false } -func (l *Log) purge(n int) { +func (l *BinLog) purge(n int) { for i := 0; i < n; i++ { logPath := path.Join(l.cfg.Path, l.logNames[i]) - if st, err := os.Stat(logPath); err != nil { - log.Error("purge %s error %s", logPath, err.Error()) - } else { - l.space -= st.Size() - } - os.Remove(logPath) } @@ -213,22 +237,22 @@ func (l *Log) purge(n int) { l.logNames = l.logNames[0 : len(l.logNames)-n] } -func (l *Log) Close() { +func (l *BinLog) Close() { if l.logFile != nil { l.logFile.Close() l.logFile = nil } } -func (l *Log) LogNames() []string { +func (l *BinLog) LogNames() []string { return l.logNames } -func (l *Log) LogFileName() string { +func (l *BinLog) LogFileName() string { return l.getLogFile() } -func (l *Log) LogFilePos() int64 { +func (l *BinLog) LogFilePos() int64 { if l.logFile == nil { return 0 } else { @@ -237,7 +261,7 @@ func (l *Log) LogFilePos() int64 { } } -func (l *Log) Purge(n int) error { +func (l *BinLog) Purge(n int) error { if len(l.logNames) == 0 { return nil } @@ -255,11 +279,7 @@ func (l *Log) Purge(n int) error { return l.flushIndex() } -func (l *Log) Log(args ...[]byte) error { - if l.cfg.SpaceLimit > 0 && l.space >= l.cfg.SpaceLimit { - return ErrOverSpaceLimit - } - +func (l *BinLog) Log(args ...[]byte) error { var err error if l.logFile == nil { @@ -268,17 +288,23 @@ func (l *Log) Log(args ...[]byte) error { } } - totalSize := 0 + //we treat log many args as a batch, so use same createTime + createTime := uint32(time.Now().Unix()) - var n int = 0 for _, data := range args { - if n, err = l.handler.Write(l.logWb, data); err != nil { - log.Error("write log error %s", err.Error()) + payLoadLen := uint32(len(data)) + + if err := binary.Write(l.logWb, binary.BigEndian, createTime); err != nil { return err - } else { - totalSize += n } + if err := binary.Write(l.logWb, binary.BigEndian, payLoadLen); err != nil { + return err + } + + if _, err := l.logWb.Write(data); err != nil { + return err + } } if err = l.logWb.Flush(); err != nil { @@ -286,8 +312,6 @@ func (l *Log) Log(args ...[]byte) error { return err } - l.space += int64(totalSize) - l.checkLogFileSize() return nil diff --git a/replication/binlog_test.go b/ledis/binlog_test.go similarity index 96% rename from replication/binlog_test.go rename to ledis/binlog_test.go index 48263ae..995a8d1 100644 --- a/replication/binlog_test.go +++ b/ledis/binlog_test.go @@ -1,4 +1,4 @@ -package replication +package ledis import ( "io/ioutil" diff --git a/ledis/ledis.go b/ledis/ledis.go index d9425b5..0cf108a 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "github.com/siddontang/go-leveldb/leveldb" - "github.com/siddontang/ledisdb/replication" "path" "sync" ) @@ -18,7 +17,7 @@ type Config struct { UseBinLog bool `json:"use_bin_log"` //if you not set bin log path, use data_dir/bin_log - BinLog replication.BinLogConfig `json:"bin_log"` + BinLog BinLogConfig `json:"bin_log"` } type DB struct { @@ -42,7 +41,7 @@ type Ledis struct { ldb *leveldb.DB dbs [MaxDBNumber]*DB - binlog *replication.Log + binlog *BinLog quit chan struct{} } @@ -81,7 +80,7 @@ func OpenWithConfig(cfg *Config) (*Ledis, error) { if len(cfg.BinLog.Path) == 0 { cfg.BinLog.Path = path.Join(cfg.DataDir, "bin_log") } - l.binlog, err = replication.NewBinLogWithConfig(&cfg.BinLog) + l.binlog, err = NewBinLogWithConfig(&cfg.BinLog) if err != nil { return nil, err } diff --git a/ledis/replication.go b/ledis/replication.go index 8de0bc9..9a2cb1b 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -14,7 +14,7 @@ var ( errInvalidBinLogEvent = errors.New("invalid binglog event") ) -func (l *Ledis) replicateEvent(event []byte) error { +func (l *Ledis) ReplicateEvent(event []byte) error { if len(event) == 0 { return errInvalidBinLogEvent } @@ -70,8 +70,8 @@ func (l *Ledis) replicateCommandEvent(event []byte) error { return errors.New("command event not supported now") } -func (l *Ledis) RepliateRelayLog(relayLog string, offset int64) (int64, error) { - f, err := os.Open(relayLog) +func (l *Ledis) RepliateFromBinLog(filePath string, offset int64) (int64, error) { + f, err := os.Open(filePath) if err != nil { return 0, err } @@ -114,7 +114,7 @@ func (l *Ledis) RepliateRelayLog(relayLog string, offset int64) (int64, error) { } l.Lock() - err = l.replicateEvent(dataBuf.Bytes()) + err = l.ReplicateEvent(dataBuf.Bytes()) l.Unlock() if err != nil { log.Fatal("replication error %s, skip to next", err.Error()) diff --git a/ledis/replication_test.go b/ledis/replication_test.go index 2fc02a4..2e4b977 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -38,14 +38,14 @@ func TestReplication(t *testing.T) { db.Set([]byte("b"), []byte("2")) db.Set([]byte("c"), []byte("3")) - relayLog := "/tmp/test_repl/master/bin_log/ledis-bin.0000001" + binLogName := "/tmp/test_repl/master/bin_log/ledis-bin.0000001" var offset int64 - offset, err = slave.RepliateRelayLog(relayLog, 0) + offset, err = slave.RepliateFromBinLog(binLogName, 0) if err != nil { t.Fatal(err) } else { - if st, err := os.Stat(relayLog); err != nil { + if st, err := os.Stat(binLogName); err != nil { t.Fatal(err) } else if st.Size() != offset { t.Fatal(st.Size(), offset) diff --git a/ledis/tx.go b/ledis/tx.go index 6c4c023..fa7379b 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -2,7 +2,6 @@ package ledis import ( "github.com/siddontang/go-leveldb/leveldb" - "github.com/siddontang/ledisdb/replication" "sync" ) @@ -12,7 +11,7 @@ type tx struct { l *Ledis wb *leveldb.WriteBatch - binlog *replication.Log + binlog *BinLog batch [][]byte } diff --git a/replication/binlog.go b/replication/binlog.go deleted file mode 100644 index ad9163c..0000000 --- a/replication/binlog.go +++ /dev/null @@ -1,89 +0,0 @@ -package replication - -import ( - "bufio" - "encoding/binary" - "encoding/json" - "time" -) - -const ( - MaxBinLogFileSize int = 1024 * 1024 * 1024 - MaxBinLogFileNum int = 10000 - - DefaultBinLogFileSize int = MaxBinLogFileSize - DefaultBinLogFileNum int = 10 -) - -/* -index file format: -ledis-bin.00001 -ledis-bin.00002 -ledis-bin.00003 - -log file format - -timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData - -*/ - -type BinLogConfig struct { - LogConfig -} - -func (cfg *BinLogConfig) adjust() { - if cfg.MaxFileSize <= 0 { - cfg.MaxFileSize = DefaultBinLogFileSize - } else if cfg.MaxFileSize > MaxBinLogFileSize { - cfg.MaxFileSize = MaxBinLogFileSize - } - - if cfg.MaxFileNum <= 0 { - cfg.MaxFileNum = DefaultBinLogFileNum - } else if cfg.MaxFileNum > MaxBinLogFileNum { - cfg.MaxFileNum = MaxBinLogFileNum - } - - //binlog not care space limit - cfg.SpaceLimit = -1 - - cfg.LogType = "bin" -} - -type binlogHandler struct { -} - -func (h *binlogHandler) Write(wb *bufio.Writer, data []byte) (int, error) { - createTime := uint32(time.Now().Unix()) - payLoadLen := uint32(len(data)) - - if err := binary.Write(wb, binary.BigEndian, createTime); err != nil { - return 0, err - } - - if err := binary.Write(wb, binary.BigEndian, payLoadLen); err != nil { - return 0, err - } - - if _, err := wb.Write(data); err != nil { - return 0, err - } - - return 8 + len(data), nil -} - -func NewBinLog(data json.RawMessage) (*Log, error) { - var cfg BinLogConfig - - if err := json.Unmarshal(data, &cfg); err != nil { - return nil, err - } - - return NewBinLogWithConfig(&cfg) -} - -func NewBinLogWithConfig(cfg *BinLogConfig) (*Log, error) { - cfg.adjust() - - return newLog(new(binlogHandler), &cfg.LogConfig) -} diff --git a/replication/relaylog.go b/replication/relaylog.go deleted file mode 100644 index 4ee2a4a..0000000 --- a/replication/relaylog.go +++ /dev/null @@ -1,50 +0,0 @@ -package replication - -import ( - "bufio" - "encoding/json" -) - -const ( - MaxRelayLogFileSize int = 1024 * 1024 * 1024 - DefaultRelayLogFileSize int = MaxRelayLogFileSize -) - -type RelayLogConfig struct { - LogConfig -} - -func (cfg *RelayLogConfig) adjust() { - if cfg.MaxFileSize <= 0 { - cfg.MaxFileSize = DefaultRelayLogFileSize - } else if cfg.MaxFileSize > MaxRelayLogFileSize { - cfg.MaxFileSize = MaxRelayLogFileSize - } - - //relaylog not care file num - cfg.MaxFileNum = -1 - cfg.LogType = "relay" -} - -type relayLogHandler struct { -} - -func (h *relayLogHandler) Write(wb *bufio.Writer, data []byte) (int, error) { - return wb.Write(data) -} - -func NewRelayLog(data json.RawMessage) (*Log, error) { - var cfg RelayLogConfig - - if err := json.Unmarshal(data, &cfg); err != nil { - return nil, err - } - - return NewRelayLogWithConfig(&cfg) -} - -func NewRelayLogWithConfig(cfg *RelayLogConfig) (*Log, error) { - cfg.adjust() - - return newLog(new(relayLogHandler), &cfg.LogConfig) -} diff --git a/replication/relaylog_test.go b/replication/relaylog_test.go deleted file mode 100644 index 0cde67b..0000000 --- a/replication/relaylog_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package replication - -import ( - "os" - "testing" -) - -func TestRelayLog(t *testing.T) { - cfg := new(RelayLogConfig) - - cfg.MaxFileSize = 1024 - cfg.SpaceLimit = 1024 - cfg.Path = "/tmp/ledis_relaylog" - cfg.Name = "ledis" - - os.RemoveAll(cfg.Path) - - b, err := NewRelayLogWithConfig(cfg) - if err != nil { - t.Fatal(err) - } - - if err := b.Log(make([]byte, 1024)); err != nil { - t.Fatal(err) - } - - if err := b.Log(make([]byte, 1)); err == nil { - t.Fatal("must not nil") - } else if err != ErrOverSpaceLimit { - t.Fatal(err) - } - - if err := b.Purge(1); err != nil { - t.Fatal(err) - } - - if err := b.Log(make([]byte, 1)); err != nil { - t.Fatal(err) - } - -} diff --git a/server/app.go b/server/app.go index e293a14..cf76049 100644 --- a/server/app.go +++ b/server/app.go @@ -3,7 +3,6 @@ package server import ( "fmt" "github.com/siddontang/ledisdb/ledis" - "github.com/siddontang/ledisdb/replication" "net" "strings" ) @@ -19,8 +18,6 @@ type App struct { slaveMode bool - relayLog *replication.Log - quit chan struct{} } @@ -57,10 +54,6 @@ func NewApp(cfg *Config) (*App, error) { if len(app.cfg.SlaveOf) > 0 { app.slaveMode = true - - if app.relayLog, err = replication.NewRelayLogWithConfig(&cfg.RelayLog); err != nil { - return nil, err - } } if app.ldb, err = ledis.OpenWithConfig(&cfg.DB); err != nil { diff --git a/server/config.go b/server/config.go index 518a083..80214c3 100644 --- a/server/config.go +++ b/server/config.go @@ -3,7 +3,6 @@ package server import ( "encoding/json" "github.com/siddontang/ledisdb/ledis" - "github.com/siddontang/ledisdb/replication" "io/ioutil" ) @@ -18,9 +17,6 @@ type Config struct { //set slaveof to enable replication from master //empty, no replication SlaveOf string `json:"slaveof"` - - //if you not set relay log path, use data_dir/realy_log - RelayLog replication.RelayLogConfig `json:"relay_log"` } func NewConfig(data json.RawMessage) (*Config, error) {