diff --git a/cmd/ledis-binlog/main.go b/cmd/ledis-binlog/main.go deleted file mode 100644 index 3725920..0000000 --- a/cmd/ledis-binlog/main.go +++ /dev/null @@ -1,85 +0,0 @@ -package main - -import ( - "bufio" - "flag" - "fmt" - "github.com/siddontang/ledisdb/ledis" - "os" - "time" -) - -var TimeFormat = "2006-01-02 15:04:05" - -var startDateTime = flag.String("start-datetime", "", - "Start reading the binary log at the first event having a timestamp equal to or later than the datetime argument.") -var stopDateTime = flag.String("stop-datetime", "", - "Stop reading the binary log at the first event having a timestamp equal to or earlier than the datetime argument.") - -var startTime uint32 = 0 -var stopTime uint32 = 0xFFFFFFFF - -func main() { - flag.Usage = func() { - fmt.Fprintf(os.Stderr, "Usage of %s [options] log_file\n", os.Args[0]) - flag.PrintDefaults() - } - - flag.Parse() - - logFile := flag.Arg(0) - f, err := os.Open(logFile) - if err != nil { - println(err.Error()) - return - } - defer f.Close() - - var t time.Time - - if len(*startDateTime) > 0 { - if t, err = time.Parse(TimeFormat, *startDateTime); err != nil { - println("parse start-datetime error: ", err.Error()) - return - } - - startTime = uint32(t.Unix()) - } - - if len(*stopDateTime) > 0 { - if t, err = time.Parse(TimeFormat, *stopDateTime); err != nil { - println("parse stop-datetime error: ", err.Error()) - return - } - - stopTime = uint32(t.Unix()) - } - - rb := bufio.NewReaderSize(f, 4096) - err = ledis.ReadEventFromReader(rb, printEvent) - if err != nil { - println("read event error: ", err.Error()) - return - } -} - -func printEvent(head *ledis.BinLogHead, event []byte) error { - if head.CreateTime < startTime || head.CreateTime > stopTime { - return nil - } - - t := time.Unix(int64(head.CreateTime), 0) - - fmt.Printf("%s ", t.Format(TimeFormat)) - - s, err := ledis.FormatBinLogEvent(event) - if err != nil { - fmt.Printf("%s", err.Error()) - } else { - fmt.Printf(s) - } - - fmt.Printf("\n") - - return nil -} diff --git a/cmd/ledis-load/main.go b/cmd/ledis-load/main.go index 34165b8..1edb250 100644 --- a/cmd/ledis-load/main.go +++ b/cmd/ledis-load/main.go @@ -57,18 +57,5 @@ func loadDump(cfg *config.Config, ldb *ledis.Ledis) error { return err } - var head *ledis.BinLogAnchor - head, err = ldb.LoadDumpFile(*dumpPath) - - if err != nil { - return err - } - - //master enable binlog, here output this like mysql - if head.LogFileIndex != 0 && head.LogPos != 0 { - format := "MASTER_LOG_FILE='binlog.%07d', MASTER_LOG_POS=%d;\n" - fmt.Printf(format, head.LogFileIndex, head.LogPos) - } - - return nil + return ldb.LoadDumpFile(*dumpPath) } diff --git a/config/config.go b/config/config.go index 1464edf..b54da8b 100644 --- a/config/config.go +++ b/config/config.go @@ -16,14 +16,6 @@ const ( DefaultDataDir string = "./var" ) -const ( - MaxBinLogFileSize int = 1024 * 1024 * 1024 - MaxBinLogFileNum int = 10000 - - DefaultBinLogFileSize int = MaxBinLogFileSize - DefaultBinLogFileNum int = 10 -) - type LevelDBConfig struct { Compression bool `toml:"compression"` BlockSize int `toml:"block_size"` @@ -37,9 +29,8 @@ type LMDBConfig struct { NoSync bool `toml:"nosync"` } -type BinLogConfig struct { - MaxFileSize int `toml:"max_file_size"` - MaxFileNum int `toml:"max_file_num"` +type WALConfig struct { + Path string `toml:"path"` } type Config struct { @@ -52,11 +43,13 @@ type Config struct { DBName string `toml:"db_name"` DBPath string `toml:"db_path"` + UseWAL bool `toml:use_wal` + LevelDB LevelDBConfig `toml:"leveldb"` LMDB 
LMDBConfig `toml:"lmdb"` - BinLog BinLogConfig `toml:"binlog"` + WAL WALConfig `toml:wal` SlaveOf string `toml:"slaveof"` @@ -93,10 +86,6 @@ func NewConfigDefault() *Config { cfg.DBName = DefaultDBName - // disable binlog - cfg.BinLog.MaxFileNum = 0 - cfg.BinLog.MaxFileSize = 0 - // disable replication cfg.SlaveOf = "" @@ -126,17 +115,3 @@ func (cfg *LevelDBConfig) Adjust() { cfg.MaxOpenFiles = 1024 } } - -func (cfg *BinLogConfig) Adjust() { - if cfg.MaxFileSize <= 0 { - cfg.MaxFileSize = DefaultBinLogFileSize - } else if cfg.MaxFileSize > MaxBinLogFileSize { - cfg.MaxFileSize = MaxBinLogFileSize - } - - if cfg.MaxFileNum <= 0 { - cfg.MaxFileNum = DefaultBinLogFileNum - } else if cfg.MaxFileNum > MaxBinLogFileNum { - cfg.MaxFileNum = MaxBinLogFileNum - } -} diff --git a/config/config.toml b/config/config.toml index f271a70..ae08c47 100644 --- a/config/config.toml +++ b/config/config.toml @@ -30,6 +30,8 @@ db_name = "leveldb" # if not set, use data_dir/"db_name"_data db_path = "" +use_wal = true + [leveldb] compression = false block_size = 32768 @@ -41,8 +43,10 @@ max_open_files = 1024 map_size = 524288000 nosync = true -[binlog] -max_file_size = 0 -max_file_num = 0 +[wal] +# if not set, use data_dir/wal +path = "" + + diff --git a/etc/ledis.conf b/etc/ledis.conf index c0606eb..0d46aee 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -43,9 +43,8 @@ max_open_files = 1024 map_size = 524288000 nosync = true -[binlog] -# Set either size or num to 0 to disable binlog -max_file_size = 0 -max_file_num = 0 +[wal] +# if not set, use data_dir/wal +path = "" diff --git a/ledis/batch.go b/ledis/batch.go index b23cc47..6f97457 100644 --- a/ledis/batch.go +++ b/ledis/batch.go @@ -12,9 +12,11 @@ type batch struct { sync.Locker - logs [][]byte + eb *eventBatch tx *Tx + + noLogging bool } func (b *batch) Commit() error { @@ -23,17 +25,6 @@ func (b *batch) Commit() error { err := b.WriteBatch.Commit() - if b.l.binlog != nil { - if err == nil { - if b.tx == nil { - b.l.binlog.Log(b.logs...) - } else { - b.tx.logs = append(b.tx.logs, b.logs...) 
- } - } - b.logs = [][]byte{} - } - return err } @@ -42,29 +33,28 @@ func (b *batch) Lock() { } func (b *batch) Unlock() { - if b.l.binlog != nil { - b.logs = [][]byte{} - } + b.noLogging = false b.WriteBatch.Rollback() b.Locker.Unlock() } func (b *batch) Put(key []byte, value []byte) { - if b.l.binlog != nil { - buf := encodeBinLogPut(key, value) - b.logs = append(b.logs, buf) - } b.WriteBatch.Put(key, value) } func (b *batch) Delete(key []byte) { - if b.l.binlog != nil { - buf := encodeBinLogDelete(key) - b.logs = append(b.logs, buf) - } + b.WriteBatch.Delete(key) } +func (b *batch) LogEanbled() bool { + return !b.noLogging && b.l.log != nil +} + +func (b *batch) DisableLog(d bool) { + b.noLogging = d +} + type dbBatchLocker struct { l *sync.Mutex wrLock *sync.RWMutex @@ -100,6 +90,8 @@ func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch b.tx = tx b.Locker = locker - b.logs = [][]byte{} + b.eb = new(eventBatch) + b.noLogging = false + return b } diff --git a/ledis/binlog.go b/ledis/binlog.go deleted file mode 100644 index f26323e..0000000 --- a/ledis/binlog.go +++ /dev/null @@ -1,400 +0,0 @@ -package ledis - -import ( - "bufio" - "encoding/binary" - "fmt" - "github.com/siddontang/go-log/log" - "github.com/siddontang/ledisdb/config" - "io" - "io/ioutil" - "os" - "path" - "strconv" - "strings" - "sync" - "time" -) - -type BinLogHead struct { - CreateTime uint32 - BatchId uint32 - PayloadLen uint32 -} - -func (h *BinLogHead) Len() int { - return 12 -} - -func (h *BinLogHead) Write(w io.Writer) error { - if err := binary.Write(w, binary.BigEndian, h.CreateTime); err != nil { - return err - } - - if err := binary.Write(w, binary.BigEndian, h.BatchId); err != nil { - return err - } - - if err := binary.Write(w, binary.BigEndian, h.PayloadLen); err != nil { - return err - } - - return nil -} - -func (h *BinLogHead) handleReadError(err error) error { - if err == io.EOF { - return io.ErrUnexpectedEOF - } else { - return err - } -} - -func (h *BinLogHead) Read(r io.Reader) error { - var err error - if err = binary.Read(r, binary.BigEndian, &h.CreateTime); err != nil { - return err - } - - if err = binary.Read(r, binary.BigEndian, &h.BatchId); err != nil { - return h.handleReadError(err) - } - - if err = binary.Read(r, binary.BigEndian, &h.PayloadLen); err != nil { - return h.handleReadError(err) - } - - return nil -} - -func (h *BinLogHead) InSameBatch(ho *BinLogHead) bool { - if h.CreateTime == ho.CreateTime && h.BatchId == ho.BatchId { - return true - } else { - return false - } -} - -/* -index file format: -ledis-bin.00001 -ledis-bin.00002 -ledis-bin.00003 - -log file format - -Log: Head|PayloadData - -Head: createTime|batchId|payloadData - -*/ - -type BinLog struct { - sync.Mutex - - path string - - cfg *config.BinLogConfig - - logFile *os.File - - logWb *bufio.Writer - - indexName string - logNames []string - lastLogIndex int64 - - batchId uint32 - - ch chan struct{} -} - -func NewBinLog(cfg *config.Config) (*BinLog, error) { - l := new(BinLog) - - l.cfg = &cfg.BinLog - l.cfg.Adjust() - - l.path = path.Join(cfg.DataDir, "binlog") - - if err := os.MkdirAll(l.path, 0755); err != nil { - return nil, err - } - - l.logNames = make([]string, 0, 16) - - l.ch = make(chan struct{}) - - if err := l.loadIndex(); err != nil { - return nil, err - } - - return l, nil -} - -func (l *BinLog) flushIndex() error { - data := strings.Join(l.logNames, "\n") - - bakName := fmt.Sprintf("%s.bak", l.indexName) - f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil 
{ - log.Error("create binlog bak index error %s", err.Error()) - return err - } - - if _, err := f.WriteString(data); err != nil { - log.Error("write binlog index error %s", err.Error()) - f.Close() - return err - } - - f.Close() - - if err := os.Rename(bakName, l.indexName); err != nil { - log.Error("rename binlog bak index error %s", err.Error()) - return err - } - - return nil -} - -func (l *BinLog) loadIndex() error { - l.indexName = path.Join(l.path, fmt.Sprintf("ledis-bin.index")) - if _, err := os.Stat(l.indexName); os.IsNotExist(err) { - //no index file, nothing to do - } else { - indexData, err := ioutil.ReadFile(l.indexName) - if err != nil { - return err - } - - lines := strings.Split(string(indexData), "\n") - for _, line := range lines { - line = strings.Trim(line, "\r\n ") - if len(line) == 0 { - continue - } - - if _, err := os.Stat(path.Join(l.path, line)); err != nil { - log.Error("load index line %s error %s", line, err.Error()) - return err - } else { - l.logNames = append(l.logNames, line) - } - } - } - if l.cfg.MaxFileNum > 0 && len(l.logNames) > l.cfg.MaxFileNum { - //remove oldest logfile - if err := l.Purge(len(l.logNames) - l.cfg.MaxFileNum); err != nil { - return err - } - } - - var err error - if len(l.logNames) == 0 { - l.lastLogIndex = 1 - } else { - lastName := l.logNames[len(l.logNames)-1] - - if l.lastLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil { - log.Error("invalid logfile name %s", err.Error()) - return err - } - - //like mysql, if server restart, a new binlog will create - l.lastLogIndex++ - } - - return nil -} - -func (l *BinLog) getLogFile() string { - return l.FormatLogFileName(l.lastLogIndex) -} - -func (l *BinLog) openNewLogFile() error { - var err error - lastName := l.getLogFile() - - logPath := path.Join(l.path, lastName) - if l.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0666); err != nil { - log.Error("open new logfile error %s", err.Error()) - return err - } - - if l.cfg.MaxFileNum > 0 && len(l.logNames) == l.cfg.MaxFileNum { - l.purge(1) - } - - l.logNames = append(l.logNames, lastName) - - if l.logWb == nil { - l.logWb = bufio.NewWriterSize(l.logFile, 1024) - } else { - l.logWb.Reset(l.logFile) - } - - if err = l.flushIndex(); err != nil { - return err - } - - return nil -} - -func (l *BinLog) checkLogFileSize() bool { - if l.logFile == nil { - return false - } - - st, _ := l.logFile.Stat() - if st.Size() >= int64(l.cfg.MaxFileSize) { - l.closeLog() - return true - } - - return false -} - -func (l *BinLog) closeLog() { - if l.logFile == nil { - return - } - - l.lastLogIndex++ - - l.logFile.Close() - l.logFile = nil -} - -func (l *BinLog) purge(n int) { - if len(l.logNames) < n { - n = len(l.logNames) - } - for i := 0; i < n; i++ { - logPath := path.Join(l.path, l.logNames[i]) - os.Remove(logPath) - } - - copy(l.logNames[0:], l.logNames[n:]) - l.logNames = l.logNames[0 : len(l.logNames)-n] -} - -func (l *BinLog) Close() { - if l.logFile != nil { - l.logFile.Close() - l.logFile = nil - } -} - -func (l *BinLog) LogNames() []string { - return l.logNames -} - -func (l *BinLog) LogFileName() string { - return l.getLogFile() -} - -func (l *BinLog) LogFilePos() int64 { - if l.logFile == nil { - return 0 - } else { - st, _ := l.logFile.Stat() - return st.Size() - } -} - -func (l *BinLog) LogFileIndex() int64 { - return l.lastLogIndex -} - -func (l *BinLog) FormatLogFileName(index int64) string { - return fmt.Sprintf("ledis-bin.%07d", index) -} - -func (l *BinLog) FormatLogFilePath(index int64) string 
{ - return path.Join(l.path, l.FormatLogFileName(index)) -} - -func (l *BinLog) LogPath() string { - return l.path -} - -func (l *BinLog) Purge(n int) error { - l.Lock() - defer l.Unlock() - - if len(l.logNames) == 0 { - return nil - } - - if n >= len(l.logNames) { - n = len(l.logNames) - //can not purge current log file - if l.logNames[n-1] == l.getLogFile() { - n = n - 1 - } - } - - l.purge(n) - - return l.flushIndex() -} - -func (l *BinLog) PurgeAll() error { - l.Lock() - defer l.Unlock() - - l.closeLog() - - l.purge(len(l.logNames)) - - return l.openNewLogFile() -} - -func (l *BinLog) Log(args ...[]byte) error { - l.Lock() - defer l.Unlock() - - var err error - - if l.logFile == nil { - if err = l.openNewLogFile(); err != nil { - return err - } - } - - head := &BinLogHead{} - - head.CreateTime = uint32(time.Now().Unix()) - head.BatchId = l.batchId - - l.batchId++ - - for _, data := range args { - head.PayloadLen = uint32(len(data)) - - if err := head.Write(l.logWb); err != nil { - return err - } - - if _, err := l.logWb.Write(data); err != nil { - return err - } - } - - if err = l.logWb.Flush(); err != nil { - log.Error("write log error %s", err.Error()) - return err - } - - l.checkLogFileSize() - - close(l.ch) - l.ch = make(chan struct{}) - - return nil -} - -func (l *BinLog) Wait() <-chan struct{} { - return l.ch -} diff --git a/ledis/binlog_test.go b/ledis/binlog_test.go deleted file mode 100644 index ea62bd9..0000000 --- a/ledis/binlog_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package ledis - -import ( - "github.com/siddontang/ledisdb/config" - "io/ioutil" - "os" - "testing" -) - -func TestBinLog(t *testing.T) { - cfg := new(config.Config) - - cfg.BinLog.MaxFileNum = 1 - cfg.BinLog.MaxFileSize = 1024 - cfg.DataDir = "/tmp/ledis_binlog" - - os.RemoveAll(cfg.DataDir) - - b, err := NewBinLog(cfg) - if err != nil { - t.Fatal(err) - } - - if err := b.Log(make([]byte, 1024)); err != nil { - t.Fatal(err) - } - - if err := b.Log(make([]byte, 1024)); err != nil { - t.Fatal(err) - } - - if fs, err := ioutil.ReadDir(b.LogPath()); err != nil { - t.Fatal(err) - } else if len(fs) != 2 { - t.Fatal(len(fs)) - } - - if err := b.PurgeAll(); err != nil { - t.Fatal(err) - } - - if fs, err := ioutil.ReadDir(b.LogPath()); err != nil { - t.Fatal(err) - } else if len(fs) != 2 { - t.Fatal(len(fs)) - } else if b.LogFilePos() != 0 { - t.Fatal(b.LogFilePos()) - } -} diff --git a/ledis/const.go b/ledis/const.go index e889f4e..9108736 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -81,12 +81,6 @@ var ( ErrScoreMiss = errors.New("zset score miss") ) -const ( - BinLogTypeDeletion uint8 = 0x0 - BinLogTypePut uint8 = 0x1 - BinLogTypeCommand uint8 = 0x2 -) - const ( DBAutoCommit uint8 = 0x0 DBInTransaction uint8 = 0x1 diff --git a/ledis/dump.go b/ledis/dump.go index f162481..6f3d81c 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -9,42 +9,6 @@ import ( "os" ) -//dump format -// fileIndex(bigendian int64)|filePos(bigendian int64) -// |keylen(bigendian int32)|key|valuelen(bigendian int32)|value...... 
-// -//key and value are both compressed for fast transfer dump on network using snappy - -type BinLogAnchor struct { - LogFileIndex int64 - LogPos int64 -} - -func (m *BinLogAnchor) WriteTo(w io.Writer) error { - if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil { - return err - } - - if err := binary.Write(w, binary.BigEndian, m.LogPos); err != nil { - return err - } - return nil -} - -func (m *BinLogAnchor) ReadFrom(r io.Reader) error { - err := binary.Read(r, binary.BigEndian, &m.LogFileIndex) - if err != nil { - return err - } - - err = binary.Read(r, binary.BigEndian, &m.LogPos) - if err != nil { - return err - } - - return nil -} - func (l *Ledis) DumpFile(path string) error { f, err := os.Create(path) if err != nil { @@ -56,18 +20,11 @@ func (l *Ledis) DumpFile(path string) error { } func (l *Ledis) Dump(w io.Writer) error { - m := new(BinLogAnchor) - var err error l.wLock.Lock() defer l.wLock.Unlock() - if l.binlog != nil { - m.LogFileIndex = l.binlog.LogFileIndex() - m.LogPos = l.binlog.LogFilePos() - } - wb := bufio.NewWriterSize(w, 4096) if err = m.WriteTo(wb); err != nil { return err @@ -118,7 +75,7 @@ func (l *Ledis) Dump(w io.Writer) error { return nil } -func (l *Ledis) LoadDumpFile(path string) (*BinLogAnchor, error) { +func (l *Ledis) LoadDumpFile(path string) error { f, err := os.Open(path) if err != nil { return nil, err @@ -128,19 +85,12 @@ func (l *Ledis) LoadDumpFile(path string) (*BinLogAnchor, error) { return l.LoadDump(f) } -func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) { +func (l *Ledis) LoadDump(r io.Reader) error { l.wLock.Lock() defer l.wLock.Unlock() - info := new(BinLogAnchor) - rb := bufio.NewReaderSize(r, 4096) - err := info.ReadFrom(rb) - if err != nil { - return nil, err - } - var keyLen uint16 var valueLen uint32 @@ -154,33 +104,33 @@ func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) { for { if err = binary.Read(rb, binary.BigEndian, &keyLen); err != nil && err != io.EOF { - return nil, err + return err } else if err == io.EOF { break } if _, err = io.CopyN(&keyBuf, rb, int64(keyLen)); err != nil { - return nil, err + return err } if key, err = snappy.Decode(deKeyBuf, keyBuf.Bytes()); err != nil { - return nil, err + return err } if err = binary.Read(rb, binary.BigEndian, &valueLen); err != nil { - return nil, err + return err } if _, err = io.CopyN(&valueBuf, rb, int64(valueLen)); err != nil { - return nil, err + return err } if value, err = snappy.Decode(deValueBuf, valueBuf.Bytes()); err != nil { - return nil, err + return err } if err = l.ldb.Put(key, value); err != nil { - return nil, err + return err } keyBuf.Reset() @@ -190,10 +140,11 @@ func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) { deKeyBuf = nil deValueBuf = nil - //if binlog enable, we will delete all binlogs and open a new one for handling simply - if l.binlog != nil { - l.binlog.PurgeAll() + //to do remove all wal log + + if l.log != nil { + l.log.Clear() } - return info, nil + return nil } diff --git a/ledis/dump_test.go b/ledis/dump_test.go index e29d928..8a1b2fa 100644 --- a/ledis/dump_test.go +++ b/ledis/dump_test.go @@ -38,7 +38,7 @@ func TestDump(t *testing.T) { t.Fatal(err) } - if _, err := slave.LoadDumpFile("/tmp/testdb.dump"); err != nil { + if err := slave.LoadDumpFile("/tmp/testdb.dump"); err != nil { t.Fatal(err) } diff --git a/ledis/binlog_util.go b/ledis/event.go similarity index 63% rename from ledis/binlog_util.go rename to ledis/event.go index da058bd..674e9c7 100644 --- a/ledis/binlog_util.go +++ 
b/ledis/event.go @@ -1,97 +1,108 @@ package ledis import ( + "bytes" "encoding/binary" "errors" - "fmt" + "io" "strconv" ) -var ( - errBinLogDeleteType = errors.New("invalid bin log delete type") - errBinLogPutType = errors.New("invalid bin log put type") - errBinLogCommandType = errors.New("invalid bin log command type") +const ( + kTypeDeleteEvent uint8 = 0 + kTypePutEvent uint8 = 1 ) -func encodeBinLogDelete(key []byte) []byte { - buf := make([]byte, 1+len(key)) - buf[0] = BinLogTypeDeletion - copy(buf[1:], key) - return buf +var ( + errInvalidPutEvent = errors.New("invalid put event") + errInvalidDeleteEvent = errors.New("invalid delete event") + errInvalidEvent = errors.New("invalid event") +) + +type eventBatch struct { + bytes.Buffer } -func decodeBinLogDelete(sz []byte) ([]byte, error) { - if len(sz) < 1 || sz[0] != BinLogTypeDeletion { - return nil, errBinLogDeleteType +type event struct { + key []byte + value []byte //value = nil for delete event +} + +func (b *eventBatch) Put(key []byte, value []byte) { + l := uint32(len(key) + len(value) + 1 + 2) + binary.Write(b, binary.BigEndian, l) + b.WriteByte(kTypePutEvent) + keyLen := uint16(len(key)) + binary.Write(b, binary.BigEndian, keyLen) + b.Write(key) + b.Write(value) +} + +func (b *eventBatch) Delete(key []byte) { + l := uint32(len(key) + 1) + binary.Write(b, binary.BigEndian, l) + b.WriteByte(kTypeDeleteEvent) + b.Write(key) +} + +func decodeEventBatch(data []byte) (ev []event, err error) { + ev = make([]event, 0, 16) + for { + if len(data) == 0 { + return ev, nil + } + + if len(data) < 4 { + return nil, io.ErrUnexpectedEOF + } + + l := binary.BigEndian.Uint32(data) + data = data[4:] + if uint32(len(data)) < l { + return nil, io.ErrUnexpectedEOF + } + + var e event + if err := decodeEvent(&e, data[0:l]); err != nil { + return nil, err + } + ev = append(ev, e) + data = data[l:] + } +} + +func decodeEvent(e *event, b []byte) error { + if len(b) == 0 { + return errInvalidEvent } - return sz[1:], nil -} + switch b[0] { + case kTypePutEvent: + if len(b[1:]) < 2 { + return errInvalidPutEvent + } -func encodeBinLogPut(key []byte, value []byte) []byte { - buf := make([]byte, 3+len(key)+len(value)) - buf[0] = BinLogTypePut - pos := 1 - binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) - pos += 2 - copy(buf[pos:], key) - pos += len(key) - copy(buf[pos:], value) + keyLen := binary.BigEndian.Uint16(b[1:3]) + b = b[3:] + if len(b) < int(keyLen) { + return errInvalidPutEvent + } - return buf -} - -func decodeBinLogPut(sz []byte) ([]byte, []byte, error) { - if len(sz) < 3 || sz[0] != BinLogTypePut { - return nil, nil, errBinLogPutType - } - - keyLen := int(binary.BigEndian.Uint16(sz[1:])) - if 3+keyLen > len(sz) { - return nil, nil, errBinLogPutType - } - - return sz[3 : 3+keyLen], sz[3+keyLen:], nil -} - -func FormatBinLogEvent(event []byte) (string, error) { - logType := uint8(event[0]) - - var err error - var k []byte - var v []byte - - var buf []byte = make([]byte, 0, 1024) - - switch logType { - case BinLogTypePut: - k, v, err = decodeBinLogPut(event) - buf = append(buf, "PUT "...) - case BinLogTypeDeletion: - k, err = decodeBinLogDelete(event) - buf = append(buf, "DELETE "...) + e.key = b[0:keyLen] + e.value = b[keyLen:] + case kTypeDeleteEvent: + e.value = nil + e.key = b[1:] default: - err = errInvalidBinLogEvent + return errInvalidEvent } - if err != nil { - return "", err - } - - if buf, err = formatDataKey(buf, k); err != nil { - return "", err - } - - if v != nil && len(v) != 0 { - buf = append(buf, fmt.Sprintf(" %q", v)...) 
- } - - return String(buf), nil + return nil } -func formatDataKey(buf []byte, k []byte) ([]byte, error) { +func formatEventKey(buf []byte, k []byte) ([]byte, error) { if len(k) < 2 { - return nil, errInvalidBinLogEvent + return nil, errInvalidEvent } buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...) @@ -208,7 +219,7 @@ func formatDataKey(buf []byte, k []byte) ([]byte, error) { buf = strconv.AppendQuote(buf, String(key)) } default: - return nil, errInvalidBinLogEvent + return nil, errInvalidEvent } return buf, nil diff --git a/ledis/event_test.go b/ledis/event_test.go new file mode 100644 index 0000000..0349ea7 --- /dev/null +++ b/ledis/event_test.go @@ -0,0 +1,34 @@ +package ledis + +import ( + "reflect" + "testing" +) + +func TestEvent(t *testing.T) { + k1 := []byte("k1") + v1 := []byte("v1") + k2 := []byte("k2") + k3 := []byte("k3") + v3 := []byte("v3") + + b := new(eventBatch) + + b.Put(k1, v1) + b.Delete(k2) + b.Put(k3, v3) + + buf := b.Bytes() + + ev2 := []event{ + event{k1, v1}, + event{k2, nil}, + event{k3, v3}, + } + + if ev, err := decodeEventBatch(buf); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(ev, ev2) { + t.Fatal("not equal") + } +} diff --git a/ledis/ledis.go b/ledis/ledis.go index f3c1c8c..c4ac42f 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -5,6 +5,7 @@ import ( "github.com/siddontang/go-log/log" "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/store" + "github.com/siddontang/ledisdb/wal" "sync" "time" ) @@ -18,10 +19,12 @@ type Ledis struct { quit chan struct{} jobs *sync.WaitGroup - binlog *BinLog + log wal.Store wLock sync.RWMutex //allow one write at same time commitLock sync.Mutex //allow one write commit at same time + + readOnly bool } func Open(cfg *config.Config) (*Ledis, error) { @@ -41,13 +44,10 @@ func Open(cfg *config.Config) (*Ledis, error) { l.ldb = ldb - if cfg.BinLog.MaxFileNum > 0 && cfg.BinLog.MaxFileSize > 0 { - l.binlog, err = NewBinLog(cfg) - if err != nil { + if cfg.UseWAL { + if l.log, err = wal.NewStore(cfg); err != nil { return nil, err } - } else { - l.binlog = nil } for i := uint8(0); i < MaxDBNumber; i++ { @@ -65,9 +65,9 @@ func (l *Ledis) Close() { l.ldb.Close() - if l.binlog != nil { - l.binlog.Close() - l.binlog = nil + if l.log != nil { + l.log.Close() + l.log = nil } } diff --git a/ledis/ledis_test.go b/ledis/ledis_test.go index d5a5476..45f1c7f 100644 --- a/ledis/ledis_test.go +++ b/ledis/ledis_test.go @@ -14,8 +14,6 @@ func getTestDB() *DB { f := func() { cfg := new(config.Config) cfg.DataDir = "/tmp/test_ledis" - // cfg.BinLog.MaxFileSize = 1073741824 - // cfg.BinLog.MaxFileNum = 3 os.RemoveAll(cfg.DataDir) diff --git a/ledis/t_hash.go b/ledis/t_hash.go index 8ee199e..952ddae 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -183,8 +183,6 @@ func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) { return 0, err } - //todo add binlog - err = t.Commit() return n, err } diff --git a/ledis/t_kv.go b/ledis/t_kv.go index 1dd540a..fd13436 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -77,8 +77,6 @@ func (db *DB) incr(key []byte, delta int64) (int64, error) { t.Put(key, StrPutInt64(n)) - //todo binlog - err = t.Commit() return n, err } @@ -244,7 +242,6 @@ func (db *DB) MSet(args ...KVPair) error { t.Put(key, value) - //todo binlog } err = t.Commit() @@ -297,8 +294,6 @@ func (db *DB) SetNX(key []byte, value []byte) (int64, error) { } else { t.Put(key, value) - //todo binlog - err = t.Commit() } diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 47af6ec..50fc6aa 100644 --- 
a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -305,7 +305,6 @@ func (db *DB) ZAdd(key []byte, args ...ScorePair) (int64, error) { return 0, err } - //todo add binlog err := t.Commit() return num, err } @@ -862,7 +861,6 @@ func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, agg sk := db.zEncodeSizeKey(destKey) t.Put(sk, PutInt64(num)) - //todo add binlog if err := t.Commit(); err != nil { return 0, err } @@ -930,7 +928,7 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg var num int64 = int64(len(destMap)) sk := db.zEncodeSizeKey(destKey) t.Put(sk, PutInt64(num)) - //todo add binlog + if err := t.Commit(); err != nil { return 0, err } diff --git a/ledis/tx.go b/ledis/tx.go index 6339bae..2d96bd3 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -15,8 +15,6 @@ type Tx struct { *DB tx *store.Tx - - logs [][]byte } func (db *DB) IsTransaction() bool { @@ -71,10 +69,6 @@ func (tx *Tx) Commit() error { err := tx.tx.Commit() tx.tx = nil - if len(tx.logs) > 0 { - tx.l.binlog.Log(tx.logs...) - } - tx.l.commitLock.Unlock() tx.l.wLock.Unlock() diff --git a/wal/file_store.go b/wal/file_store.go index 0d39efb..5eb800f 100644 --- a/wal/file_store.go +++ b/wal/file_store.go @@ -1,25 +1,44 @@ package wal import ( + "fmt" + "github.com/siddontang/go-log/log" + "io/ioutil" "os" + "path" + "strconv" + "strings" "sync" ) const ( defaultMaxLogFileSize = 1024 * 1024 * 1024 - defaultMaxLogFileNum = 10 ) +/* +index file format: +ledis-bin.00001 +ledis-bin.00002 +ledis-bin.00003 +*/ + type FileStore struct { Store m sync.Mutex maxFileSize int - maxFileNum int first uint64 last uint64 + + logFile *os.File + logNames []string + nextLogIndex int64 + + indexName string + + path string } func NewFileStore(path string) (*FileStore, error) { @@ -29,12 +48,19 @@ func NewFileStore(path string) (*FileStore, error) { return nil, err } + s.path = path + s.maxFileSize = defaultMaxLogFileSize - s.maxFileNum = defaultMaxLogFileNum s.first = 0 s.last = 0 + s.logNames = make([]string, 0, 16) + + if err := s.loadIndex(); err != nil { + return nil, err + } + return s, nil } @@ -42,10 +68,6 @@ func (s *FileStore) SetMaxFileSize(size int) { s.maxFileSize = size } -func (s *FileStore) SetMaxFileNum(n int) { - s.maxFileNum = n -} - func (s *FileStore) GetLog(id uint64, log *Log) error { return nil } @@ -70,7 +92,11 @@ func (s *FileStore) StoreLogs(logs []*Log) error { return nil } -func (s *FileStore) DeleteRange(start, stop uint64) error { +func (s *FileStore) Purge(n uint64) error { + return nil +} + +func (s *FileStore) PuregeExpired(n int) error { return nil } @@ -81,3 +107,126 @@ func (s *FileStore) Clear() error { func (s *FileStore) Close() error { return nil } + +func (s *FileStore) flushIndex() error { + data := strings.Join(s.logNames, "\n") + + bakName := fmt.Sprintf("%s.bak", s.indexName) + f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + log.Error("create bak index error %s", err.Error()) + return err + } + + if _, err := f.WriteString(data); err != nil { + log.Error("write index error %s", err.Error()) + f.Close() + return err + } + + f.Close() + + if err := os.Rename(bakName, s.indexName); err != nil { + log.Error("rename bak index error %s", err.Error()) + return err + } + + return nil +} + +func (s *FileStore) fileExists(name string) bool { + p := path.Join(s.path, name) + _, err := os.Stat(p) + return !os.IsNotExist(err) +} + +func (s *FileStore) loadIndex() error { + s.indexName = path.Join(s.path, fmt.Sprintf("ledis-bin.index")) + if 
_, err := os.Stat(s.indexName); os.IsNotExist(err) { + //no index file, nothing to do + } else { + indexData, err := ioutil.ReadFile(s.indexName) + if err != nil { + return err + } + + lines := strings.Split(string(indexData), "\n") + for _, line := range lines { + line = strings.Trim(line, "\r\n ") + if len(line) == 0 { + continue + } + + if s.fileExists(line) { + s.logNames = append(s.logNames, line) + } else { + log.Info("log %s has not exists", line) + } + } + } + + var err error + if len(s.logNames) == 0 { + s.nextLogIndex = 1 + } else { + lastName := s.logNames[len(s.logNames)-1] + + if s.nextLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil { + log.Error("invalid logfile name %s", err.Error()) + return err + } + + //like mysql, if server restart, a new log will create + s.nextLogIndex++ + } + + return nil +} + +func (s *FileStore) openNewLogFile() error { + var err error + lastName := s.formatLogFileName(s.nextLogIndex) + + logPath := path.Join(s.path, lastName) + if s.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644); err != nil { + log.Error("open new logfile error %s", err.Error()) + return err + } + + s.logNames = append(s.logNames, lastName) + + if err = s.flushIndex(); err != nil { + return err + } + + return nil +} + +func (s *FileStore) checkLogFileSize() bool { + if s.logFile == nil { + return false + } + + st, _ := s.logFile.Stat() + if st.Size() >= int64(s.maxFileSize) { + s.closeLog() + return true + } + + return false +} + +func (s *FileStore) closeLog() { + if s.logFile == nil { + return + } + + s.nextLogIndex++ + + s.logFile.Close() + s.logFile = nil +} + +func (s *FileStore) formatLogFileName(index int64) string { + return fmt.Sprintf("ledis-bin.%07d", index) +} diff --git a/wal/goleveldb_store.go b/wal/goleveldb_store.go index 2c9b09d..4e78eb8 100644 --- a/wal/goleveldb_store.go +++ b/wal/goleveldb_store.go @@ -2,11 +2,13 @@ package wal import ( "bytes" + "fmt" "github.com/siddontang/go/num" "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/store" "os" "sync" + "time" ) type GoLevelDBStore struct { @@ -132,7 +134,7 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { return nil } -func (s *GoLevelDBStore) DeleteRange(min, max uint64) error { +func (s *GoLevelDBStore) Purge(n uint64) error { s.m.Lock() defer s.m.Unlock() @@ -149,25 +151,16 @@ func (s *GoLevelDBStore) DeleteRange(min, max uint64) error { return err } - min = num.MaxUint64(min, first) - max = num.MinUint64(max, last) + start := first + stop := num.MinUint64(last, first+n) w := s.db.NewWriteBatch() defer w.Rollback() - n := 0 - s.reset() - for i := min; i <= max; i++ { + for i := start; i < stop; i++ { w.Delete(num.Uint64ToBytes(i)) - n++ - if n > 1024 { - if err = w.Commit(); err != nil { - return err - } - n = 0 - } } if err = w.Commit(); err != nil { @@ -177,6 +170,44 @@ func (s *GoLevelDBStore) DeleteRange(min, max uint64) error { return nil } +func (s *GoLevelDBStore) PurgeExpired(n int) error { + if n <= 0 { + return fmt.Errorf("invalid expired time %d", n) + } + + t := uint32(time.Now().Unix() - int64(n)) + + s.m.Lock() + defer s.m.Unlock() + + s.reset() + + it := s.db.NewIterator() + it.SeekToFirst() + + w := s.db.NewWriteBatch() + defer w.Rollback() + + l := new(Log) + for ; it.Valid(); it.Next() { + v := it.RawValue() + + if err := l.Unmarshal(v); err != nil { + return err + } else if l.CreateTime > t { + break + } else { + w.Delete(it.RawKey()) + } + } + + if err := w.Commit(); err != nil { + return err + } + + return nil 
+} + func (s *GoLevelDBStore) Clear() error { s.m.Lock() defer s.m.Unlock() diff --git a/wal/log.go b/wal/log.go index d567c60..c150513 100644 --- a/wal/log.go +++ b/wal/log.go @@ -4,19 +4,31 @@ import ( "bytes" "encoding/binary" "io" + "time" ) type Log struct { ID uint64 CreateTime uint32 - // 0 for no compression - // 1 for snappy compression - Compression uint8 - Data []byte + + Data []byte +} + +func NewLog(id uint64, data []byte) *Log { + l := new(Log) + l.ID = id + l.CreateTime = uint32(time.Now().Unix()) + l.Data = data + + return l +} + +func (l *Log) HeadSize() int { + return 16 } func (l *Log) Marshal() ([]byte, error) { - buf := bytes.NewBuffer(make([]byte, 17+len(l.Data))) + buf := bytes.NewBuffer(make([]byte, l.HeadSize()+len(l.Data))) buf.Reset() if err := l.Encode(buf); err != nil { @@ -33,8 +45,7 @@ func (l *Log) Unmarshal(b []byte) error { } func (l *Log) Encode(w io.Writer) error { - length := uint32(17) - buf := make([]byte, length) + buf := make([]byte, l.HeadSize()) pos := 0 binary.BigEndian.PutUint64(buf[pos:], l.ID) @@ -43,9 +54,6 @@ func (l *Log) Encode(w io.Writer) error { binary.BigEndian.PutUint32(buf[pos:], l.CreateTime) pos += 4 - buf[pos] = l.Compression - pos++ - binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data))) if n, err := w.Write(buf); err != nil { @@ -63,8 +71,7 @@ func (l *Log) Encode(w io.Writer) error { } func (l *Log) Decode(r io.Reader) error { - length := uint32(17) - buf := make([]byte, length) + buf := make([]byte, l.HeadSize()) if _, err := io.ReadFull(r, buf); err != nil { return err @@ -77,10 +84,7 @@ func (l *Log) Decode(r io.Reader) error { l.CreateTime = binary.BigEndian.Uint32(buf[pos:]) pos += 4 - l.Compression = buf[pos] - pos++ - - length = binary.BigEndian.Uint32(buf[pos:]) + length := binary.BigEndian.Uint32(buf[pos:]) l.Data = make([]byte, length) if _, err := io.ReadFull(r, l.Data); err != nil { diff --git a/wal/log_test.go b/wal/log_test.go index cfd8c22..46109cd 100644 --- a/wal/log_test.go +++ b/wal/log_test.go @@ -36,5 +36,4 @@ func TestLog(t *testing.T) { if !reflect.DeepEqual(l1, l2) { t.Fatal("must equal") } - } diff --git a/wal/store_test.go b/wal/store_test.go index 5b32f3e..030bff0 100644 --- a/wal/store_test.go +++ b/wal/store_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" "testing" + "time" ) func TestGoLevelDBStore(t *testing.T) { @@ -103,12 +104,12 @@ func testLogs(t *testing.T, l Store) { } // Delete a suffix - if err := l.DeleteRange(5, 20); err != nil { + if err := l.Purge(5); err != nil { t.Fatalf("err: %v ", err) } // Verify they are all deleted - for i := 5; i <= 20; i++ { + for i := 1; i <= 5; i++ { if err := l.GetLog(uint64(i), &out); err != ErrLogNotFound { t.Fatalf("err: %v ", err) } @@ -119,14 +120,14 @@ func testLogs(t *testing.T, l Store) { if err != nil { t.Fatalf("err: %v ", err) } - if idx != 1 { + if idx != 6 { t.Fatalf("bad idx: %d", idx) } idx, err = l.LastID() if err != nil { t.Fatalf("err: %v ", err) } - if idx != 4 { + if idx != 20 { t.Fatalf("bad idx: %d", idx) } @@ -154,4 +155,35 @@ func testLogs(t *testing.T, l Store) { if idx != 0 { t.Fatalf("bad idx: %d", idx) } + + now := uint32(time.Now().Unix()) + logs = []*Log{} + for i := 1; i <= 20; i++ { + nl := &Log{ + ID: uint64(i), + CreateTime: now - 20, + Data: []byte("first"), + } + logs = append(logs, nl) + } + + if err := l.PurgeExpired(1); err != nil { + t.Fatal(err) + } + + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + idx, err = l.LastID() + if err 
!= nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } } diff --git a/wal/wal.go b/wal/wal.go index cc18870..b879619 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -2,6 +2,8 @@ package wal import ( "errors" + "github.com/siddontang/ledisdb/config" + "path" ) const ( @@ -26,11 +28,25 @@ type Store interface { StoreLog(log *Log) error StoreLogs(logs []*Log) error - // Delete logs [start, stop] - DeleteRange(start, stop uint64) error + // Delete first n logs + Purge(n uint64) error + + // Delete logs before n seconds + PurgeExpired(n int) error // Clear all logs Clear() error Close() error } + +func NewStore(cfg *config.Config) (Store, error) { + //now we only support goleveldb + + base := cfg.WAL.Path + if len(base) == 0 { + base = path.Join(cfg.DataDir, "wal") + } + + return NewGoLevelDBStore(base) +}
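
Note (not part of the diff): a minimal sketch of how the new wal package introduced above might be driven. The directory, log IDs, and payloads are illustrative only; in ledis itself the Data field carries the serialized event batch (the length-prefixed put/delete records built in ledis/event.go), and Ledis.Open only calls wal.NewStore when cfg.UseWAL is set. It also assumes the goleveldb-backed store creates its target directory if needed.

// Illustrative sketch: open the WAL store, append two entries, then purge.
package main

import (
	"github.com/siddontang/ledisdb/config"
	"github.com/siddontang/ledisdb/wal"
)

func main() {
	cfg := config.NewConfigDefault()
	cfg.DataDir = "/tmp/wal_example" // illustrative path
	cfg.UseWAL = true                // what Ledis.Open checks before opening the WAL

	// NewStore currently always returns the goleveldb-backed store and
	// falls back to cfg.DataDir/wal when cfg.WAL.Path is empty.
	s, err := wal.NewStore(cfg)
	if err != nil {
		panic(err)
	}
	defer s.Close()

	// In ledis the data payload would be a serialized event batch;
	// plain strings stand in for it here.
	if err := s.StoreLog(wal.NewLog(1, []byte("put k1 v1"))); err != nil {
		panic(err)
	}
	if err := s.StoreLog(wal.NewLog(2, []byte("del k2"))); err != nil {
		panic(err)
	}

	// Purge(n) drops the first n logs; PurgeExpired(sec) drops logs whose
	// CreateTime is older than sec seconds (a no-op here, since both
	// entries were just written).
	if err := s.Purge(1); err != nil {
		panic(err)
	}
	if err := s.PurgeExpired(3600); err != nil {
		panic(err)
	}
}

Replacing DeleteRange(start, stop) with Purge(n) and PurgeExpired(n) matches how the log is actually trimmed: always from the head, either by count or by age, rather than by arbitrary ID ranges.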