diff --git a/ledis/ledis.go b/ledis/ledis.go index fb668fe..f0bdf09 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -33,7 +33,7 @@ type Ledis struct { ldb *leveldb.DB dbs [MaxDBNumber]*DB - binlog *replication.BinLog + binlog *replication.Log } func Open(configJson json.RawMessage) (*Ledis, error) { diff --git a/ledis/tx.go b/ledis/tx.go index fc4357d..6c4c023 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -12,7 +12,7 @@ type tx struct { l *Ledis wb *leveldb.WriteBatch - binlog *replication.BinLog + binlog *replication.Log batch [][]byte } diff --git a/replication/binlog.go b/replication/binlog.go index 435e297..677e087 100644 --- a/replication/binlog.go +++ b/replication/binlog.go @@ -4,13 +4,6 @@ import ( "bufio" "encoding/binary" "encoding/json" - "fmt" - "github.com/siddontang/go-log/log" - "io" - "os" - "path" - "strconv" - "strings" "time" ) @@ -35,11 +28,7 @@ timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData|Lo */ type BinLogConfig struct { - BaseName string `json:"base_name"` - IndexName string `json:"index_name"` - Path string `json:"path"` - MaxFileSize int `json:"max_file_size"` - MaxFileNum int `json:"max_file_num"` + LogConfig } func (cfg *BinLogConfig) adjust() { @@ -61,21 +50,35 @@ func (cfg *BinLogConfig) adjust() { if len(cfg.IndexName) == 0 { cfg.IndexName = "ledis" } + + cfg.SpaceLimit = -1 + + cfg.LogType = "bin" } -type BinLog struct { - cfg *BinLogConfig - - logFile *os.File - - logWb *bufio.Writer - - indexName string - logNames []string - lastLogIndex int +type binlogHandler struct { } -func NewBinLog(data json.RawMessage) (*BinLog, error) { +func (h *binlogHandler) Write(wb *bufio.Writer, data []byte) (int, error) { + createTime := uint32(time.Now().Unix()) + payLoadLen := uint32(len(data)) + + if err := binary.Write(wb, binary.BigEndian, createTime); err != nil { + return 0, err + } + + if err := binary.Write(wb, binary.BigEndian, payLoadLen); err != nil { + return 0, err + } + + if _, err := wb.Write(data); err != nil { + return 0, err + } + + return 8 + len(data), nil +} + +func NewBinLog(data json.RawMessage) (*Log, error) { var cfg BinLogConfig if err := json.Unmarshal(data, &cfg); err != nil { @@ -85,211 +88,8 @@ func NewBinLog(data json.RawMessage) (*BinLog, error) { return NewBinLogWithConfig(&cfg) } -func NewBinLogWithConfig(cfg *BinLogConfig) (*BinLog, error) { - b := new(BinLog) - +func NewBinLogWithConfig(cfg *BinLogConfig) (*Log, error) { cfg.adjust() - b.cfg = cfg - - if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil { - return nil, err - } - - b.logNames = make([]string, 0, b.cfg.MaxFileNum) - - if err := b.loadIndex(); err != nil { - return nil, err - } - - return b, nil -} - -func (b *BinLog) Close() { - if b.logFile != nil { - b.logFile.Close() - } -} - -func (b *BinLog) deleteOldest() { - logPath := path.Join(b.cfg.Path, b.logNames[0]) - os.Remove(logPath) - - copy(b.logNames[0:], b.logNames[1:]) - b.logNames = b.logNames[0 : len(b.logNames)-1] -} - -func (b *BinLog) flushIndex() error { - data := strings.Join(b.logNames, "\n") - - bakName := fmt.Sprintf("%s.bak", b.indexName) - f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - log.Error("create binlog bak index error %s", err.Error()) - return err - } - - if _, err := f.WriteString(data); err != nil { - log.Error("write binlog index error %s", err.Error()) - f.Close() - return err - } - - f.Close() - - if err := os.Rename(bakName, b.indexName); err != nil { - log.Error("rename binlog bak index error %s", err.Error()) - return err - } - - return nil -} - -func (b *BinLog) loadIndex() error { - b.indexName = path.Join(b.cfg.Path, fmt.Sprintf("%s-bin.index", b.cfg.IndexName)) - fd, err := os.OpenFile(b.indexName, os.O_CREATE|os.O_RDWR, 0666) - if err != nil { - return err - } - - //maybe we will check valid later? - rb := bufio.NewReader(fd) - for { - line, err := rb.ReadString('\n') - if err != nil && err != io.EOF { - fd.Close() - return err - } - - line = strings.Trim(line, "\r\n ") - - if len(line) > 0 { - b.logNames = append(b.logNames, line) - } - - if len(b.logNames) == b.cfg.MaxFileNum { - //remove oldest logfile - b.deleteOldest() - } - - if err == io.EOF { - break - } - } - - fd.Close() - - if err := b.flushIndex(); err != nil { - return err - } - - if len(b.logNames) == 0 { - b.lastLogIndex = 1 - } else { - lastName := b.logNames[len(b.logNames)-1] - - if b.lastLogIndex, err = strconv.Atoi(path.Ext(lastName)[1:]); err != nil { - log.Error("invalid logfile name %s", err.Error()) - return err - } - - //like mysql, if server restart, a new binlog will create - b.lastLogIndex++ - } - - return nil -} - -func (b *BinLog) getLogName() string { - return fmt.Sprintf("%s-bin.%05d", b.cfg.BaseName, b.lastLogIndex) -} - -func (b *BinLog) openNewLogFile() error { - var err error - lastName := b.getLogName() - - logPath := path.Join(b.cfg.Path, lastName) - if b.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0666); err != nil { - log.Error("open new logfile error %s", err.Error()) - return err - } - - if len(b.logNames) == b.cfg.MaxFileNum { - b.deleteOldest() - } - - b.logNames = append(b.logNames, lastName) - - if b.logWb == nil { - b.logWb = bufio.NewWriterSize(b.logFile, 1024) - } else { - b.logWb.Reset(b.logFile) - } - - if err = b.flushIndex(); err != nil { - return err - } - - return nil -} - -func (b *BinLog) openLogFile() error { - if b.logFile == nil { - return b.openNewLogFile() - } else { - //check file size - st, _ := b.logFile.Stat() - if st.Size() >= int64(b.cfg.MaxFileSize) { - //must use new file - b.lastLogIndex++ - - b.logFile.Close() - - return b.openNewLogFile() - } - } - - return nil -} - -func (b *BinLog) Log(args ...[]byte) error { - var err error - - if err = b.openLogFile(); err != nil { - return err - } - - for _, data := range args { - createTime := uint32(time.Now().Unix()) - payLoadLen := len(data) - - binary.Write(b.logWb, binary.BigEndian, createTime) - binary.Write(b.logWb, binary.BigEndian, payLoadLen) - - b.logWb.Write(data) - - if err = b.logWb.Flush(); err != nil { - log.Error("write log error %s", err.Error()) - return err - } - } - - return nil -} - -func (b *BinLog) LogFileName() string { - if len(b.logNames) == 0 { - return "" - } else { - return b.logNames[len(b.logNames)-1] - } -} - -func (b *BinLog) LogFilePos() int64 { - if b.logFile == nil { - return 0 - } else { - st, _ := b.logFile.Stat() - return st.Size() - } + return newLog(new(binlogHandler), &cfg.LogConfig) } diff --git a/replication/log.go b/replication/log.go new file mode 100644 index 0000000..e61e59d --- /dev/null +++ b/replication/log.go @@ -0,0 +1,287 @@ +package replication + +import ( + "bufio" + "errors" + "fmt" + "github.com/siddontang/go-log/log" + "io/ioutil" + "os" + "path" + "strconv" + "strings" +) + +var ( + ErrOverSpaceLimit = errors.New("total log files exceed space limit") +) + +type logHandler interface { + Write(wb *bufio.Writer, data []byte) (int, error) +} + +type LogConfig struct { + BaseName string `json:"base_name"` + IndexName string `json:"index_name"` + LogType string `json:"log_type"` + Path string `json:"path"` + MaxFileSize int `json:"max_file_size"` + MaxFileNum int `json:"max_file_num"` + SpaceLimit int64 `json:"space_limit"` +} + +type Log struct { + cfg *LogConfig + + logFile *os.File + + logWb *bufio.Writer + + indexName string + logNames []string + lastLogIndex int + + space int64 + + handler logHandler +} + +func newLog(handler logHandler, cfg *LogConfig) (*Log, error) { + l := new(Log) + + l.cfg = cfg + l.handler = handler + + if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil { + return nil, err + } + + l.logNames = make([]string, 0, 16) + l.space = 0 + + if err := l.loadIndex(); err != nil { + return nil, err + } + + return l, nil +} + +func (l *Log) flushIndex() error { + data := strings.Join(l.logNames, "\n") + + bakName := fmt.Sprintf("%s.bak", l.indexName) + f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + log.Error("create binlog bak index error %s", err.Error()) + return err + } + + if _, err := f.WriteString(data); err != nil { + log.Error("write binlog index error %s", err.Error()) + f.Close() + return err + } + + f.Close() + + if err := os.Rename(bakName, l.indexName); err != nil { + log.Error("rename binlog bak index error %s", err.Error()) + return err + } + + return nil +} + +func (l *Log) loadIndex() error { + l.indexName = path.Join(l.cfg.Path, fmt.Sprintf("%s-%s.index", l.cfg.IndexName, l.cfg.LogType)) + if _, err := os.Stat(l.indexName); os.IsNotExist(err) { + //no index file, nothing to do + } else { + indexData, err := ioutil.ReadFile(l.indexName) + if err != nil { + return err + } + + lines := strings.Split(string(indexData), "\n") + for _, line := range lines { + line = strings.Trim(line, "\r\n ") + if len(line) == 0 { + continue + } + + if st, err := os.Stat(path.Join(l.cfg.Path, line)); err != nil { + log.Error("load index line %s error %s", line, err.Error()) + return err + } else { + l.space += st.Size() + + l.logNames = append(l.logNames, line) + } + } + } + if l.cfg.MaxFileNum > 0 && len(l.logNames) > l.cfg.MaxFileNum { + //remove oldest logfile + if err := l.Purge(len(l.logNames) - l.cfg.MaxFileNum); err != nil { + return err + } + } + + var err error + if len(l.logNames) == 0 { + l.lastLogIndex = 1 + } else { + lastName := l.logNames[len(l.logNames)-1] + + if l.lastLogIndex, err = strconv.Atoi(path.Ext(lastName)[1:]); err != nil { + log.Error("invalid logfile name %s", err.Error()) + return err + } + + //like mysql, if server restart, a new binlog will create + l.lastLogIndex++ + } + + return nil +} + +func (l *Log) getLogFile() string { + return fmt.Sprintf("%s-%s.%05d", l.cfg.BaseName, l.cfg.LogType, l.lastLogIndex) +} + +func (l *Log) openNewLogFile() error { + var err error + lastName := l.getLogFile() + + logPath := path.Join(l.cfg.Path, lastName) + if l.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0666); err != nil { + log.Error("open new logfile error %s", err.Error()) + return err + } + + if l.cfg.MaxFileNum > 0 && len(l.logNames) == l.cfg.MaxFileNum { + l.purge(1) + } + + l.logNames = append(l.logNames, lastName) + + if l.logWb == nil { + l.logWb = bufio.NewWriterSize(l.logFile, 1024) + } else { + l.logWb.Reset(l.logFile) + } + + if err = l.flushIndex(); err != nil { + return err + } + + return nil +} + +func (l *Log) checkLogFileSize() bool { + if l.logFile == nil { + return false + } + + st, _ := l.logFile.Stat() + if st.Size() >= int64(l.cfg.MaxFileSize) { + l.lastLogIndex++ + + l.logFile.Close() + l.logFile = nil + return true + } + + return false +} + +func (l *Log) purge(n int) { + for i := 0; i < n; i++ { + logPath := path.Join(l.cfg.Path, l.logNames[i]) + if st, err := os.Stat(logPath); err != nil { + log.Error("purge %s error %s", logPath, err.Error()) + } else { + l.space -= st.Size() + } + + os.Remove(logPath) + } + + copy(l.logNames[0:], l.logNames[n:]) + l.logNames = l.logNames[0 : len(l.logNames)-n] +} + +func (l *Log) Close() { + if l.logFile != nil { + l.logFile.Close() + l.logFile = nil + } +} + +func (l *Log) LogFileName() string { + return l.getLogFile() +} + +func (l *Log) LogFilePos() int64 { + if l.logFile == nil { + return 0 + } else { + st, _ := l.logFile.Stat() + return st.Size() + } +} + +func (l *Log) Purge(n int) error { + if len(l.logNames) == 0 { + return nil + } + + if n >= len(l.logNames) { + n = len(l.logNames) + //can not purge current log file + if l.logNames[n-1] == l.getLogFile() { + n = n - 1 + } + } + + l.purge(n) + + return l.flushIndex() +} + +func (l *Log) Log(args ...[]byte) error { + if l.cfg.SpaceLimit > 0 && l.space >= l.cfg.SpaceLimit { + return ErrOverSpaceLimit + } + + var err error + + if l.logFile == nil { + if err = l.openNewLogFile(); err != nil { + return err + } + } + + totalSize := 0 + + var n int = 0 + for _, data := range args { + if n, err = l.handler.Write(l.logWb, data); err != nil { + log.Error("write log error %s", err.Error()) + return err + } else { + totalSize += n + } + + } + + if err = l.logWb.Flush(); err != nil { + log.Error("write log error %s", err.Error()) + return err + } + + l.space += int64(totalSize) + + l.checkLogFileSize() + + return nil +} diff --git a/replication/relaylog.go b/replication/relaylog.go index 1043568..9f2b7a4 100644 --- a/replication/relaylog.go +++ b/replication/relaylog.go @@ -1,4 +1,56 @@ package replication -type RelayLog struct { +import ( + "bufio" + "encoding/json" +) + +const ( + MaxRelayLogFileSize int = 1024 * 1024 * 1024 + DefaultRelayLogFileSize int = MaxRelayLogFileSize +) + +type RelayLogConfig struct { + LogConfig +} + +func (cfg *RelayLogConfig) adjust() { + if cfg.MaxFileSize <= 0 { + cfg.MaxFileSize = DefaultRelayLogFileSize + } else if cfg.MaxFileSize > MaxRelayLogFileSize { + cfg.MaxFileSize = MaxRelayLogFileSize + } + + if len(cfg.BaseName) == 0 { + cfg.BaseName = "ledis" + } + if len(cfg.IndexName) == 0 { + cfg.IndexName = "ledis" + } + + cfg.MaxFileNum = -1 + cfg.LogType = "relay" +} + +type relayLogHandler struct { +} + +func (h *relayLogHandler) Write(wb *bufio.Writer, data []byte) (int, error) { + return wb.Write(data) +} + +func NewRelayLog(data json.RawMessage) (*Log, error) { + var cfg RelayLogConfig + + if err := json.Unmarshal(data, &cfg); err != nil { + return nil, err + } + + return NewRelayLogWithConfig(&cfg) +} + +func NewRelayLogWithConfig(cfg *RelayLogConfig) (*Log, error) { + cfg.adjust() + + return newLog(new(relayLogHandler), &cfg.LogConfig) } diff --git a/replication/relaylog_test.go b/replication/relaylog_test.go index d29eac0..2e37af8 100644 --- a/replication/relaylog_test.go +++ b/replication/relaylog_test.go @@ -1,9 +1,40 @@ package replication import ( + "os" "testing" ) func TestRelayLog(t *testing.T) { + cfg := new(RelayLogConfig) + + cfg.MaxFileSize = 1024 + cfg.SpaceLimit = 1024 + cfg.Path = "/tmp/ledis_relaylog" + + os.RemoveAll(cfg.Path) + + b, err := NewRelayLogWithConfig(cfg) + if err != nil { + t.Fatal(err) + } + + if err := b.Log(make([]byte, 1024)); err != nil { + t.Fatal(err) + } + + if err := b.Log(make([]byte, 1)); err == nil { + t.Fatal("must not nil") + } else if err != ErrOverSpaceLimit { + t.Fatal(err) + } + + if err := b.Purge(1); err != nil { + t.Fatal(err) + } + + if err := b.Log(make([]byte, 1)); err != nil { + t.Fatal(err) + } }