From 882e20a3e31a357e94ad02fbfac3c3ef858818aa Mon Sep 17 00:00:00 2001 From: siddontang Date: Sat, 30 Aug 2014 17:39:44 +0800 Subject: [PATCH] refactor ledis lock and binlog --- ledis/binlog.go | 29 ++++++++++-- ledis/binlog_util.go | 9 ---- ledis/dump.go | 22 +++++---- ledis/ledis.go | 7 ++- ledis/ledis_db.go | 4 -- ledis/ledis_test.go | 4 +- ledis/replication.go | 110 ++++++++++++++++++++++++++++--------------- ledis/tx.go | 95 ++++++++++++++++++------------------- 8 files changed, 161 insertions(+), 119 deletions(-) diff --git a/ledis/binlog.go b/ledis/binlog.go index 087c13f..3bdf50a 100644 --- a/ledis/binlog.go +++ b/ledis/binlog.go @@ -11,6 +11,7 @@ import ( "path" "strconv" "strings" + "sync" "time" ) @@ -27,6 +28,8 @@ timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData */ type BinLog struct { + sync.Mutex + path string cfg *config.BinLogConfig @@ -177,16 +180,20 @@ func (l *BinLog) checkLogFileSize() bool { st, _ := l.logFile.Stat() if st.Size() >= int64(l.cfg.MaxFileSize) { - l.lastLogIndex++ - - l.logFile.Close() - l.logFile = nil + l.closeLog() return true } return false } +func (l *BinLog) closeLog() { + l.lastLogIndex++ + + l.logFile.Close() + l.logFile = nil +} + func (l *BinLog) purge(n int) { for i := 0; i < n; i++ { logPath := path.Join(l.path, l.logNames[i]) @@ -238,6 +245,9 @@ func (l *BinLog) LogPath() string { } func (l *BinLog) Purge(n int) error { + l.Lock() + defer l.Unlock() + if len(l.logNames) == 0 { return nil } @@ -255,7 +265,18 @@ func (l *BinLog) Purge(n int) error { return l.flushIndex() } +func (l *BinLog) PurgeAll() error { + l.Lock() + defer l.Unlock() + + l.closeLog() + return l.openNewLogFile() +} + func (l *BinLog) Log(args ...[]byte) error { + l.Lock() + defer l.Unlock() + var err error if l.logFile == nil { diff --git a/ledis/binlog_util.go b/ledis/binlog_util.go index 5167b40..da058bd 100644 --- a/ledis/binlog_util.go +++ b/ledis/binlog_util.go @@ -54,15 +54,6 @@ func decodeBinLogPut(sz []byte) ([]byte, []byte, error) { return sz[3 : 3+keyLen], sz[3+keyLen:], nil } -func encodeBinLogCommand(commandType uint8, args ...[]byte) []byte { - //to do - return nil -} - -func decodeBinLogCommand(sz []byte) (uint8, [][]byte, error) { - return 0, nil, errBinLogCommandType -} - func FormatBinLogEvent(event []byte) (string, error) { logType := uint8(event[0]) diff --git a/ledis/dump.go b/ledis/dump.go index 14d7ff7..67908a8 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -57,16 +57,17 @@ func (l *Ledis) DumpFile(path string) error { func (l *Ledis) Dump(w io.Writer) error { var m *MasterInfo = new(MasterInfo) - l.Lock() - defer l.Unlock() + + var err error + + l.wLock.Lock() + defer l.wLock.Unlock() if l.binlog != nil { m.LogFileIndex = l.binlog.LogFileIndex() m.LogPos = l.binlog.LogFilePos() } - var err error - wb := bufio.NewWriterSize(w, 4096) if err = m.WriteTo(wb); err != nil { return err @@ -128,8 +129,8 @@ func (l *Ledis) LoadDumpFile(path string) (*MasterInfo, error) { } func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) { - l.Lock() - defer l.Unlock() + l.wLock.Lock() + defer l.wLock.Unlock() info := new(MasterInfo) @@ -182,10 +183,6 @@ func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) { return nil, err } - if l.binlog != nil { - err = l.binlog.Log(encodeBinLogPut(key, value)) - } - keyBuf.Reset() valueBuf.Reset() } @@ -193,5 +190,10 @@ func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) { deKeyBuf = nil deValueBuf = nil + //if binlog enable, we will delete all binlogs and open a new one for handling simply + if l.binlog != nil { + l.binlog.PurgeAll() + } + return info, nil } diff --git a/ledis/ledis.go b/ledis/ledis.go index 70d22d1..f3c1c8c 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -10,8 +10,6 @@ import ( ) type Ledis struct { - sync.Mutex - cfg *config.Config ldb *store.DB @@ -21,11 +19,13 @@ type Ledis struct { jobs *sync.WaitGroup binlog *BinLog + + wLock sync.RWMutex //allow one write at same time + commitLock sync.Mutex //allow one write commit at same time } func Open(cfg *config.Config) (*Ledis, error) { if len(cfg.DataDir) == 0 { - fmt.Printf("no datadir set, use default %s\n", config.DefaultDataDir) cfg.DataDir = config.DefaultDataDir } @@ -42,7 +42,6 @@ func Open(cfg *config.Config) (*Ledis, error) { l.ldb = ldb if cfg.BinLog.MaxFileNum > 0 && cfg.BinLog.MaxFileSize > 0 { - println("binlog will be refactored later, use your own risk!!!") l.binlog, err = NewBinLog(cfg) if err != nil { return nil, err diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index c774d03..9241b1d 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -3,7 +3,6 @@ package ledis import ( "fmt" "github.com/siddontang/ledisdb/store" - "sync" ) type ibucket interface { @@ -29,8 +28,6 @@ type DB struct { bucket ibucket - dbLock *sync.RWMutex - index uint8 kvBatch *batch @@ -54,7 +51,6 @@ func (l *Ledis) newDB(index uint8) *DB { d.isTx = false d.index = index - d.dbLock = &sync.RWMutex{} d.kvBatch = d.newBatch() d.listBatch = d.newBatch() diff --git a/ledis/ledis_test.go b/ledis/ledis_test.go index aff4ebe..d5a5476 100644 --- a/ledis/ledis_test.go +++ b/ledis/ledis_test.go @@ -14,8 +14,8 @@ func getTestDB() *DB { f := func() { cfg := new(config.Config) cfg.DataDir = "/tmp/test_ledis" - cfg.BinLog.MaxFileSize = 1073741824 - cfg.BinLog.MaxFileNum = 3 + // cfg.BinLog.MaxFileSize = 1073741824 + // cfg.BinLog.MaxFileNum = 3 os.RemoveAll(cfg.DataDir) diff --git a/ledis/replication.go b/ledis/replication.go index bd6c192..2b19cfe 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "errors" "github.com/siddontang/go-log/log" + "github.com/siddontang/ledisdb/store/driver" "io" "os" ) @@ -19,7 +20,41 @@ var ( errInvalidBinLogFile = errors.New("invalid binlog file") ) -func (l *Ledis) ReplicateEvent(event []byte) error { +type replBatch struct { + wb driver.IWriteBatch + events [][]byte + createTime uint32 + l *Ledis +} + +func (b *replBatch) Commit() error { + b.l.commitLock.Lock() + defer b.l.commitLock.Unlock() + + err := b.wb.Commit() + if err != nil { + b.Rollback() + return err + } + + if b.l.binlog != nil { + if err = b.l.binlog.Log(b.events...); err != nil { + b.Rollback() + return err + } + } + + return nil +} + +func (b *replBatch) Rollback() error { + b.wb.Rollback() + b.events = [][]byte{} + b.createTime = 0 + return nil +} + +func (l *Ledis) replicateEvent(b *replBatch, event []byte) error { if len(event) == 0 { return errInvalidBinLogEvent } @@ -27,52 +62,42 @@ func (l *Ledis) ReplicateEvent(event []byte) error { logType := uint8(event[0]) switch logType { case BinLogTypePut: - return l.replicatePutEvent(event) + return l.replicatePutEvent(b, event) case BinLogTypeDeletion: - return l.replicateDeleteEvent(event) - case BinLogTypeCommand: - return l.replicateCommandEvent(event) + return l.replicateDeleteEvent(b, event) default: return errInvalidBinLogEvent } } -func (l *Ledis) replicatePutEvent(event []byte) error { +func (l *Ledis) replicatePutEvent(b *replBatch, event []byte) error { key, value, err := decodeBinLogPut(event) if err != nil { return err } - if err = l.ldb.Put(key, value); err != nil { - return err + b.wb.Put(key, value) + + if b.l.binlog != nil { + b.events = append(b.events, event) } - if l.binlog != nil { - err = l.binlog.Log(event) - } - - return err + return nil } -func (l *Ledis) replicateDeleteEvent(event []byte) error { +func (l *Ledis) replicateDeleteEvent(b *replBatch, event []byte) error { key, err := decodeBinLogDelete(event) if err != nil { return err } - if err = l.ldb.Delete(key); err != nil { - return err + b.wb.Delete(key) + + if b.l.binlog != nil { + b.events = append(b.events, event) } - if l.binlog != nil { - err = l.binlog.Log(event) - } - - return err -} - -func (l *Ledis) replicateCommandEvent(event []byte) error { - return errors.New("command event not supported now") + return nil } func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) error) error { @@ -110,8 +135,23 @@ func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) e } func (l *Ledis) ReplicateFromReader(rb io.Reader) error { + b := new(replBatch) + + b.wb = l.ldb.NewWriteBatch() + b.l = l + f := func(createTime uint32, event []byte) error { - err := l.ReplicateEvent(event) + if b.createTime == 0 { + b.createTime = createTime + } else if b.createTime != createTime { + if err := b.Commit(); err != nil { + log.Fatal("replication error %s, skip to next", err.Error()) + return ErrSkipEvent + } + b.createTime = createTime + } + + err := l.replicateEvent(b, event) if err != nil { log.Fatal("replication error %s, skip to next", err.Error()) return ErrSkipEvent @@ -119,15 +159,18 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error { return nil } - return ReadEventFromReader(rb, f) + err := ReadEventFromReader(rb, f) + if err != nil { + b.Rollback() + return err + } + return b.Commit() } func (l *Ledis) ReplicateFromData(data []byte) error { rb := bytes.NewReader(data) - l.Lock() err := l.ReplicateFromReader(rb) - l.Unlock() return err } @@ -140,17 +183,13 @@ func (l *Ledis) ReplicateFromBinLog(filePath string) error { rb := bufio.NewReaderSize(f, 4096) - l.Lock() err = l.ReplicateFromReader(rb) - l.Unlock() f.Close() return err } -const maxSyncEvents = 64 - func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) { n = 0 if l.binlog == nil { @@ -205,8 +244,6 @@ func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) { var createTime uint32 var dataLen uint32 - var eventsNum int = 0 - for { if err = binary.Read(f, binary.BigEndian, &createTime); err != nil { if err == io.EOF { @@ -222,13 +259,10 @@ func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) { } } - eventsNum++ if lastCreateTime == 0 { lastCreateTime = createTime } else if lastCreateTime != createTime { return - } else if eventsNum > maxSyncEvents { - return } if err = binary.Read(f, binary.BigEndian, &dataLen); err != nil { diff --git a/ledis/tx.go b/ledis/tx.go index 38eb626..7488233 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -24,42 +24,49 @@ type batch struct { } type dbBatchLocker struct { - sync.Mutex - dbLock *sync.RWMutex + l *sync.Mutex + wrLock *sync.RWMutex +} + +func (l *dbBatchLocker) Lock() { + l.wrLock.RLock() + l.l.Lock() +} + +func (l *dbBatchLocker) Unlock() { + l.l.Unlock() + l.wrLock.RUnlock() } type txBatchLocker struct { } -func (l *txBatchLocker) Lock() { -} +func (l *txBatchLocker) Lock() {} +func (l *txBatchLocker) Unlock() {} -func (l *txBatchLocker) Unlock() { -} - -func (l *dbBatchLocker) Lock() { - l.dbLock.RLock() - l.Mutex.Lock() -} - -func (l *dbBatchLocker) Unlock() { - l.Mutex.Unlock() - l.dbLock.RUnlock() -} - -func (db *DB) newBatch() *batch { +func (l *Ledis) newBatch(wb store.WriteBatch, tx *Tx) *batch { b := new(batch) + b.l = l + b.WriteBatch = wb - b.WriteBatch = db.bucket.NewWriteBatch() - b.Locker = &dbBatchLocker{dbLock: db.dbLock} - b.l = db.l + b.tx = tx + if tx == nil { + b.Locker = &dbBatchLocker{l: &sync.Mutex{}, wrLock: &l.wLock} + } else { + b.Locker = &txBatchLocker{} + } + b.logs = [][]byte{} return b } +func (db *DB) newBatch() *batch { + return db.l.newBatch(db.bucket.NewWriteBatch(), nil) +} + func (b *batch) Commit() error { - b.l.Lock() - defer b.l.Unlock() + b.l.commitLock.Lock() + defer b.l.commitLock.Unlock() err := b.WriteBatch.Commit() @@ -85,7 +92,7 @@ func (b *batch) Unlock() { if b.l.binlog != nil { b.logs = [][]byte{} } - b.Rollback() + b.WriteBatch.Rollback() b.Locker.Unlock() } @@ -129,11 +136,10 @@ func (db *DB) Begin() (*Tx, error) { tx := new(Tx) tx.DB = new(DB) - tx.DB.dbLock = db.dbLock - - tx.DB.dbLock.Lock() - tx.DB.l = db.l + + tx.l.wLock.Lock() + tx.index = db.index tx.DB.sdb = db.sdb @@ -141,7 +147,7 @@ func (db *DB) Begin() (*Tx, error) { var err error tx.tx, err = db.sdb.Begin() if err != nil { - tx.DB.dbLock.Unlock() + tx.l.wLock.Unlock() return nil, err } @@ -151,12 +157,12 @@ func (db *DB) Begin() (*Tx, error) { tx.DB.index = db.index - tx.DB.kvBatch = tx.newBatch() - tx.DB.listBatch = tx.newBatch() - tx.DB.hashBatch = tx.newBatch() - tx.DB.zsetBatch = tx.newBatch() - tx.DB.binBatch = tx.newBatch() - tx.DB.setBatch = tx.newBatch() + tx.DB.kvBatch = tx.newTxBatch() + tx.DB.listBatch = tx.newTxBatch() + tx.DB.hashBatch = tx.newTxBatch() + tx.DB.zsetBatch = tx.newTxBatch() + tx.DB.binBatch = tx.newTxBatch() + tx.DB.setBatch = tx.newTxBatch() return tx, nil } @@ -166,7 +172,7 @@ func (tx *Tx) Commit() error { return ErrTxDone } - tx.l.Lock() + tx.l.commitLock.Lock() err := tx.tx.Commit() tx.tx = nil @@ -174,9 +180,9 @@ func (tx *Tx) Commit() error { tx.l.binlog.Log(tx.logs...) } - tx.l.Unlock() + tx.l.commitLock.Unlock() - tx.DB.dbLock.Unlock() + tx.l.wLock.Unlock() tx.DB = nil return err } @@ -189,20 +195,13 @@ func (tx *Tx) Rollback() error { err := tx.tx.Rollback() tx.tx = nil - tx.DB.dbLock.Unlock() + tx.l.wLock.Unlock() tx.DB = nil return err } -func (tx *Tx) newBatch() *batch { - b := new(batch) - - b.l = tx.l - b.WriteBatch = tx.tx.NewWriteBatch() - b.Locker = &txBatchLocker{} - b.tx = tx - - return b +func (tx *Tx) newTxBatch() *batch { + return tx.l.newBatch(tx.tx.NewWriteBatch(), tx) } func (tx *Tx) Index() int {