diff --git a/ledis/binlog.go b/ledis/binlog.go index 716b11b..258af09 100644 --- a/ledis/binlog.go +++ b/ledis/binlog.go @@ -11,7 +11,6 @@ import ( "path" "strconv" "strings" - "sync" "time" ) @@ -28,6 +27,8 @@ timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData|Lo */ type BinLogConfig struct { + BaseName string `json:"base_name"` + IndexName string `json:"index_name"` Path string `json:"path"` MaxFileSize int `json:"max_file_size"` MaxFileNum int `json:"max_file_num"` @@ -45,11 +46,16 @@ func (cfg *BinLogConfig) adjust() { } else if cfg.MaxFileNum > MaxBinLogFileNum { cfg.MaxFileNum = MaxBinLogFileNum } + + if len(cfg.BaseName) == 0 { + cfg.BaseName = "ledis" + } + if len(cfg.IndexName) == 0 { + cfg.IndexName = "ledis" + } } type BinLog struct { - sync.Mutex - cfg *BinLogConfig logFile *os.File @@ -132,7 +138,7 @@ func (b *BinLog) flushIndex() error { } func (b *BinLog) loadIndex() error { - b.indexName = path.Join(b.cfg.Path, BinLogIndexFile) + b.indexName = path.Join(b.cfg.Path, fmt.Sprintf("%s-bin.index", b.cfg.IndexName)) fd, err := os.OpenFile(b.indexName, os.O_CREATE|os.O_RDWR, 0666) if err != nil { return err @@ -187,7 +193,7 @@ func (b *BinLog) loadIndex() error { } func (b *BinLog) getLogName() string { - return fmt.Sprintf("%s.%05d", BinLogBaseName, b.lastLogIndex) + return fmt.Sprintf("%s-bin.%05d", b.cfg.BaseName, b.lastLogIndex) } func (b *BinLog) openNewLogFile() error { @@ -241,14 +247,14 @@ func (b *BinLog) openLogFile() error { func (b *BinLog) Log(args ...[]byte) error { var err error + if err = b.openLogFile(); err != nil { + return err + } + for _, data := range args { createTime := uint32(time.Now().Unix()) payLoadLen := len(data) - if err = b.openLogFile(); err != nil { - return err - } - binary.Write(b.logWb, binary.BigEndian, createTime) binary.Write(b.logWb, binary.BigEndian, payLoadLen) diff --git a/ledis/binlog_util.go b/ledis/binlog_util.go new file mode 100644 index 0000000..b0abd4e --- /dev/null +++ b/ledis/binlog_util.go @@ -0,0 +1,34 @@ +package ledis + +import ( + "encoding/binary" +) + +func encodeBinLogDelete(key []byte) []byte { + buf := make([]byte, 3+len(key)) + buf[0] = BinLogTypeDeletion + pos := 1 + binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) + pos += 2 + copy(buf[pos:], key) + return buf +} + +func encodeBinLogPut(key []byte, value []byte) []byte { + buf := make([]byte, 7+len(key)+len(value)) + buf[0] = BinLogTypePut + pos := 1 + binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) + pos += 2 + copy(buf[pos:], key) + pos += len(key) + binary.BigEndian.PutUint32(buf[pos:], uint32(len(value))) + pos += 4 + copy(buf[pos:], value) + return buf +} + +func encodeBinLogCommand(commandType uint8, args []byte) []byte { + //to do + return nil +} diff --git a/ledis/const.go b/ledis/const.go index cfe3cd9..042cf45 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -43,9 +43,6 @@ var ( ErrZSetMemberSize = errors.New("invalid zset member size") ) -const BinLogBaseName = "ledis-bin" -const BinLogIndexFile = "ledis-bin.index" - const ( MaxBinLogFileSize int = 1024 * 1024 * 1024 MaxBinLogFileNum int = 10000 @@ -54,8 +51,8 @@ const ( DefaultBinLogFileNum int = 10 ) -//like leveldb const ( BinLogTypeDeletion uint8 = 0x0 - BinLogTypeValue uint8 = 0x1 + BinLogTypePut uint8 = 0x1 + BinLogTypeCommand uint8 = 0x2 ) diff --git a/ledis/dump.go b/ledis/dump.go index 546ef31..e7ff8a0 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -36,11 +36,11 @@ func (l *Ledis) Dump(w io.Writer) error { if l.binlog == nil { sp = l.ldb.NewSnapshot() } else { - l.binlog.Lock() + l.Lock() sp = l.ldb.NewSnapshot() logFileName = l.binlog.LogFileName() logPos = l.binlog.LogFilePos() - l.binlog.Unlock() + l.Unlock() } var head = DumpHead{ @@ -104,6 +104,9 @@ func (l *Ledis) LoadDumpFile(path string) (*DumpHead, error) { } func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) { + l.Lock() + defer l.Unlock() + rb := bufio.NewReaderSize(r, 4096) var headLen uint32 @@ -150,6 +153,10 @@ func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) { return nil, err } + if l.binlog != nil { + err = l.binlog.Log(encodeBinLogPut(keyBuf.Bytes(), valueBuf.Bytes())) + } + keyBuf.Reset() valueBuf.Reset() } diff --git a/ledis/ledis.go b/ledis/ledis.go index 8d4d950..7a3a0d5 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "github.com/siddontang/go-leveldb/leveldb" + "sync" ) type Config struct { @@ -24,6 +25,8 @@ type DB struct { } type Ledis struct { + sync.Mutex + cfg *Config ldb *leveldb.DB diff --git a/ledis/tx.go b/ledis/tx.go index c5aca2d..fa7379b 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -1,7 +1,6 @@ package ledis import ( - "encoding/binary" "github.com/siddontang/go-leveldb/leveldb" "sync" ) @@ -9,6 +8,7 @@ import ( type tx struct { m sync.Mutex + l *Ledis wb *leveldb.WriteBatch binlog *BinLog @@ -18,6 +18,7 @@ type tx struct { func newTx(l *Ledis) *tx { t := new(tx) + t.l = l t.wb = l.ldb.NewWriteBatch() t.batch = make([][]byte, 0, 4) @@ -33,17 +34,7 @@ func (t *tx) Put(key []byte, value []byte) { t.wb.Put(key, value) if t.binlog != nil { - buf := make([]byte, 7+len(key)+len(value)) - buf[0] = BinLogTypeValue - pos := 1 - binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) - pos += 2 - copy(buf[pos:], key) - pos += len(key) - binary.BigEndian.PutUint32(buf[pos:], uint32(len(value))) - pos += 4 - copy(buf[pos:], value) - + buf := encodeBinLogPut(key, value) t.batch = append(t.batch, buf) } } @@ -52,13 +43,7 @@ func (t *tx) Delete(key []byte) { t.wb.Delete(key) if t.binlog != nil { - buf := make([]byte, 3+len(key)) - buf[0] = BinLogTypeDeletion - pos := 1 - binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) - pos += 2 - copy(buf[pos:], key) - + buf := encodeBinLogDelete(key) t.batch = append(t.batch, buf) } } @@ -76,18 +61,20 @@ func (t *tx) Unlock() { func (t *tx) Commit() error { var err error if t.binlog != nil { - t.binlog.Lock() + t.l.Lock() err = t.wb.Commit() if err != nil { - t.binlog.Unlock() + t.l.Unlock() return err } err = t.binlog.Log(t.batch...) - t.binlog.Unlock() + t.l.Unlock() } else { + t.l.Lock() err = t.wb.Commit() + t.l.Unlock() } return err }