refactor ledis lock and binlog

This commit is contained in:
siddontang 2014-08-30 17:39:44 +08:00
parent 577d545486
commit 882e20a3e3
8 changed files with 161 additions and 119 deletions

View File

@ -11,6 +11,7 @@ import (
"path" "path"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
) )
@ -27,6 +28,8 @@ timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData
*/ */
type BinLog struct { type BinLog struct {
sync.Mutex
path string path string
cfg *config.BinLogConfig cfg *config.BinLogConfig
@ -177,16 +180,20 @@ func (l *BinLog) checkLogFileSize() bool {
st, _ := l.logFile.Stat() st, _ := l.logFile.Stat()
if st.Size() >= int64(l.cfg.MaxFileSize) { if st.Size() >= int64(l.cfg.MaxFileSize) {
l.lastLogIndex++ l.closeLog()
l.logFile.Close()
l.logFile = nil
return true return true
} }
return false return false
} }
func (l *BinLog) closeLog() {
l.lastLogIndex++
l.logFile.Close()
l.logFile = nil
}
func (l *BinLog) purge(n int) { func (l *BinLog) purge(n int) {
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
logPath := path.Join(l.path, l.logNames[i]) logPath := path.Join(l.path, l.logNames[i])
@ -238,6 +245,9 @@ func (l *BinLog) LogPath() string {
} }
func (l *BinLog) Purge(n int) error { func (l *BinLog) Purge(n int) error {
l.Lock()
defer l.Unlock()
if len(l.logNames) == 0 { if len(l.logNames) == 0 {
return nil return nil
} }
@ -255,7 +265,18 @@ func (l *BinLog) Purge(n int) error {
return l.flushIndex() return l.flushIndex()
} }
func (l *BinLog) PurgeAll() error {
l.Lock()
defer l.Unlock()
l.closeLog()
return l.openNewLogFile()
}
func (l *BinLog) Log(args ...[]byte) error { func (l *BinLog) Log(args ...[]byte) error {
l.Lock()
defer l.Unlock()
var err error var err error
if l.logFile == nil { if l.logFile == nil {

View File

@ -54,15 +54,6 @@ func decodeBinLogPut(sz []byte) ([]byte, []byte, error) {
return sz[3 : 3+keyLen], sz[3+keyLen:], nil return sz[3 : 3+keyLen], sz[3+keyLen:], nil
} }
func encodeBinLogCommand(commandType uint8, args ...[]byte) []byte {
//to do
return nil
}
func decodeBinLogCommand(sz []byte) (uint8, [][]byte, error) {
return 0, nil, errBinLogCommandType
}
func FormatBinLogEvent(event []byte) (string, error) { func FormatBinLogEvent(event []byte) (string, error) {
logType := uint8(event[0]) logType := uint8(event[0])

View File

@ -57,16 +57,17 @@ func (l *Ledis) DumpFile(path string) error {
func (l *Ledis) Dump(w io.Writer) error { func (l *Ledis) Dump(w io.Writer) error {
var m *MasterInfo = new(MasterInfo) var m *MasterInfo = new(MasterInfo)
l.Lock()
defer l.Unlock() var err error
l.wLock.Lock()
defer l.wLock.Unlock()
if l.binlog != nil { if l.binlog != nil {
m.LogFileIndex = l.binlog.LogFileIndex() m.LogFileIndex = l.binlog.LogFileIndex()
m.LogPos = l.binlog.LogFilePos() m.LogPos = l.binlog.LogFilePos()
} }
var err error
wb := bufio.NewWriterSize(w, 4096) wb := bufio.NewWriterSize(w, 4096)
if err = m.WriteTo(wb); err != nil { if err = m.WriteTo(wb); err != nil {
return err return err
@ -128,8 +129,8 @@ func (l *Ledis) LoadDumpFile(path string) (*MasterInfo, error) {
} }
func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) { func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) {
l.Lock() l.wLock.Lock()
defer l.Unlock() defer l.wLock.Unlock()
info := new(MasterInfo) info := new(MasterInfo)
@ -182,10 +183,6 @@ func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) {
return nil, err return nil, err
} }
if l.binlog != nil {
err = l.binlog.Log(encodeBinLogPut(key, value))
}
keyBuf.Reset() keyBuf.Reset()
valueBuf.Reset() valueBuf.Reset()
} }
@ -193,5 +190,10 @@ func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) {
deKeyBuf = nil deKeyBuf = nil
deValueBuf = nil deValueBuf = nil
//if binlog enable, we will delete all binlogs and open a new one for handling simply
if l.binlog != nil {
l.binlog.PurgeAll()
}
return info, nil return info, nil
} }

View File

@ -10,8 +10,6 @@ import (
) )
type Ledis struct { type Ledis struct {
sync.Mutex
cfg *config.Config cfg *config.Config
ldb *store.DB ldb *store.DB
@ -21,11 +19,13 @@ type Ledis struct {
jobs *sync.WaitGroup jobs *sync.WaitGroup
binlog *BinLog binlog *BinLog
wLock sync.RWMutex //allow one write at same time
commitLock sync.Mutex //allow one write commit at same time
} }
func Open(cfg *config.Config) (*Ledis, error) { func Open(cfg *config.Config) (*Ledis, error) {
if len(cfg.DataDir) == 0 { if len(cfg.DataDir) == 0 {
fmt.Printf("no datadir set, use default %s\n", config.DefaultDataDir)
cfg.DataDir = config.DefaultDataDir cfg.DataDir = config.DefaultDataDir
} }
@ -42,7 +42,6 @@ func Open(cfg *config.Config) (*Ledis, error) {
l.ldb = ldb l.ldb = ldb
if cfg.BinLog.MaxFileNum > 0 && cfg.BinLog.MaxFileSize > 0 { if cfg.BinLog.MaxFileNum > 0 && cfg.BinLog.MaxFileSize > 0 {
println("binlog will be refactored later, use your own risk!!!")
l.binlog, err = NewBinLog(cfg) l.binlog, err = NewBinLog(cfg)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -3,7 +3,6 @@ package ledis
import ( import (
"fmt" "fmt"
"github.com/siddontang/ledisdb/store" "github.com/siddontang/ledisdb/store"
"sync"
) )
type ibucket interface { type ibucket interface {
@ -29,8 +28,6 @@ type DB struct {
bucket ibucket bucket ibucket
dbLock *sync.RWMutex
index uint8 index uint8
kvBatch *batch kvBatch *batch
@ -54,7 +51,6 @@ func (l *Ledis) newDB(index uint8) *DB {
d.isTx = false d.isTx = false
d.index = index d.index = index
d.dbLock = &sync.RWMutex{}
d.kvBatch = d.newBatch() d.kvBatch = d.newBatch()
d.listBatch = d.newBatch() d.listBatch = d.newBatch()

View File

@ -14,8 +14,8 @@ func getTestDB() *DB {
f := func() { f := func() {
cfg := new(config.Config) cfg := new(config.Config)
cfg.DataDir = "/tmp/test_ledis" cfg.DataDir = "/tmp/test_ledis"
cfg.BinLog.MaxFileSize = 1073741824 // cfg.BinLog.MaxFileSize = 1073741824
cfg.BinLog.MaxFileNum = 3 // cfg.BinLog.MaxFileNum = 3
os.RemoveAll(cfg.DataDir) os.RemoveAll(cfg.DataDir)

View File

@ -6,6 +6,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/siddontang/go-log/log" "github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/store/driver"
"io" "io"
"os" "os"
) )
@ -19,7 +20,41 @@ var (
errInvalidBinLogFile = errors.New("invalid binlog file") errInvalidBinLogFile = errors.New("invalid binlog file")
) )
func (l *Ledis) ReplicateEvent(event []byte) error { type replBatch struct {
wb driver.IWriteBatch
events [][]byte
createTime uint32
l *Ledis
}
func (b *replBatch) Commit() error {
b.l.commitLock.Lock()
defer b.l.commitLock.Unlock()
err := b.wb.Commit()
if err != nil {
b.Rollback()
return err
}
if b.l.binlog != nil {
if err = b.l.binlog.Log(b.events...); err != nil {
b.Rollback()
return err
}
}
return nil
}
func (b *replBatch) Rollback() error {
b.wb.Rollback()
b.events = [][]byte{}
b.createTime = 0
return nil
}
func (l *Ledis) replicateEvent(b *replBatch, event []byte) error {
if len(event) == 0 { if len(event) == 0 {
return errInvalidBinLogEvent return errInvalidBinLogEvent
} }
@ -27,52 +62,42 @@ func (l *Ledis) ReplicateEvent(event []byte) error {
logType := uint8(event[0]) logType := uint8(event[0])
switch logType { switch logType {
case BinLogTypePut: case BinLogTypePut:
return l.replicatePutEvent(event) return l.replicatePutEvent(b, event)
case BinLogTypeDeletion: case BinLogTypeDeletion:
return l.replicateDeleteEvent(event) return l.replicateDeleteEvent(b, event)
case BinLogTypeCommand:
return l.replicateCommandEvent(event)
default: default:
return errInvalidBinLogEvent return errInvalidBinLogEvent
} }
} }
func (l *Ledis) replicatePutEvent(event []byte) error { func (l *Ledis) replicatePutEvent(b *replBatch, event []byte) error {
key, value, err := decodeBinLogPut(event) key, value, err := decodeBinLogPut(event)
if err != nil { if err != nil {
return err return err
} }
if err = l.ldb.Put(key, value); err != nil { b.wb.Put(key, value)
return err
if b.l.binlog != nil {
b.events = append(b.events, event)
} }
if l.binlog != nil { return nil
err = l.binlog.Log(event)
}
return err
} }
func (l *Ledis) replicateDeleteEvent(event []byte) error { func (l *Ledis) replicateDeleteEvent(b *replBatch, event []byte) error {
key, err := decodeBinLogDelete(event) key, err := decodeBinLogDelete(event)
if err != nil { if err != nil {
return err return err
} }
if err = l.ldb.Delete(key); err != nil { b.wb.Delete(key)
return err
if b.l.binlog != nil {
b.events = append(b.events, event)
} }
if l.binlog != nil { return nil
err = l.binlog.Log(event)
}
return err
}
func (l *Ledis) replicateCommandEvent(event []byte) error {
return errors.New("command event not supported now")
} }
func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) error) error { func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) error) error {
@ -110,8 +135,23 @@ func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) e
} }
func (l *Ledis) ReplicateFromReader(rb io.Reader) error { func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
b := new(replBatch)
b.wb = l.ldb.NewWriteBatch()
b.l = l
f := func(createTime uint32, event []byte) error { f := func(createTime uint32, event []byte) error {
err := l.ReplicateEvent(event) if b.createTime == 0 {
b.createTime = createTime
} else if b.createTime != createTime {
if err := b.Commit(); err != nil {
log.Fatal("replication error %s, skip to next", err.Error())
return ErrSkipEvent
}
b.createTime = createTime
}
err := l.replicateEvent(b, event)
if err != nil { if err != nil {
log.Fatal("replication error %s, skip to next", err.Error()) log.Fatal("replication error %s, skip to next", err.Error())
return ErrSkipEvent return ErrSkipEvent
@ -119,15 +159,18 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
return nil return nil
} }
return ReadEventFromReader(rb, f) err := ReadEventFromReader(rb, f)
if err != nil {
b.Rollback()
return err
}
return b.Commit()
} }
func (l *Ledis) ReplicateFromData(data []byte) error { func (l *Ledis) ReplicateFromData(data []byte) error {
rb := bytes.NewReader(data) rb := bytes.NewReader(data)
l.Lock()
err := l.ReplicateFromReader(rb) err := l.ReplicateFromReader(rb)
l.Unlock()
return err return err
} }
@ -140,17 +183,13 @@ func (l *Ledis) ReplicateFromBinLog(filePath string) error {
rb := bufio.NewReaderSize(f, 4096) rb := bufio.NewReaderSize(f, 4096)
l.Lock()
err = l.ReplicateFromReader(rb) err = l.ReplicateFromReader(rb)
l.Unlock()
f.Close() f.Close()
return err return err
} }
const maxSyncEvents = 64
func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) { func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) {
n = 0 n = 0
if l.binlog == nil { if l.binlog == nil {
@ -205,8 +244,6 @@ func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) {
var createTime uint32 var createTime uint32
var dataLen uint32 var dataLen uint32
var eventsNum int = 0
for { for {
if err = binary.Read(f, binary.BigEndian, &createTime); err != nil { if err = binary.Read(f, binary.BigEndian, &createTime); err != nil {
if err == io.EOF { if err == io.EOF {
@ -222,13 +259,10 @@ func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) {
} }
} }
eventsNum++
if lastCreateTime == 0 { if lastCreateTime == 0 {
lastCreateTime = createTime lastCreateTime = createTime
} else if lastCreateTime != createTime { } else if lastCreateTime != createTime {
return return
} else if eventsNum > maxSyncEvents {
return
} }
if err = binary.Read(f, binary.BigEndian, &dataLen); err != nil { if err = binary.Read(f, binary.BigEndian, &dataLen); err != nil {

View File

@ -24,42 +24,49 @@ type batch struct {
} }
type dbBatchLocker struct { type dbBatchLocker struct {
sync.Mutex l *sync.Mutex
dbLock *sync.RWMutex wrLock *sync.RWMutex
}
func (l *dbBatchLocker) Lock() {
l.wrLock.RLock()
l.l.Lock()
}
func (l *dbBatchLocker) Unlock() {
l.l.Unlock()
l.wrLock.RUnlock()
} }
type txBatchLocker struct { type txBatchLocker struct {
} }
func (l *txBatchLocker) Lock() { func (l *txBatchLocker) Lock() {}
} func (l *txBatchLocker) Unlock() {}
func (l *txBatchLocker) Unlock() { func (l *Ledis) newBatch(wb store.WriteBatch, tx *Tx) *batch {
}
func (l *dbBatchLocker) Lock() {
l.dbLock.RLock()
l.Mutex.Lock()
}
func (l *dbBatchLocker) Unlock() {
l.Mutex.Unlock()
l.dbLock.RUnlock()
}
func (db *DB) newBatch() *batch {
b := new(batch) b := new(batch)
b.l = l
b.WriteBatch = wb
b.WriteBatch = db.bucket.NewWriteBatch() b.tx = tx
b.Locker = &dbBatchLocker{dbLock: db.dbLock} if tx == nil {
b.l = db.l b.Locker = &dbBatchLocker{l: &sync.Mutex{}, wrLock: &l.wLock}
} else {
b.Locker = &txBatchLocker{}
}
b.logs = [][]byte{}
return b return b
} }
func (db *DB) newBatch() *batch {
return db.l.newBatch(db.bucket.NewWriteBatch(), nil)
}
func (b *batch) Commit() error { func (b *batch) Commit() error {
b.l.Lock() b.l.commitLock.Lock()
defer b.l.Unlock() defer b.l.commitLock.Unlock()
err := b.WriteBatch.Commit() err := b.WriteBatch.Commit()
@ -85,7 +92,7 @@ func (b *batch) Unlock() {
if b.l.binlog != nil { if b.l.binlog != nil {
b.logs = [][]byte{} b.logs = [][]byte{}
} }
b.Rollback() b.WriteBatch.Rollback()
b.Locker.Unlock() b.Locker.Unlock()
} }
@ -129,11 +136,10 @@ func (db *DB) Begin() (*Tx, error) {
tx := new(Tx) tx := new(Tx)
tx.DB = new(DB) tx.DB = new(DB)
tx.DB.dbLock = db.dbLock
tx.DB.dbLock.Lock()
tx.DB.l = db.l tx.DB.l = db.l
tx.l.wLock.Lock()
tx.index = db.index tx.index = db.index
tx.DB.sdb = db.sdb tx.DB.sdb = db.sdb
@ -141,7 +147,7 @@ func (db *DB) Begin() (*Tx, error) {
var err error var err error
tx.tx, err = db.sdb.Begin() tx.tx, err = db.sdb.Begin()
if err != nil { if err != nil {
tx.DB.dbLock.Unlock() tx.l.wLock.Unlock()
return nil, err return nil, err
} }
@ -151,12 +157,12 @@ func (db *DB) Begin() (*Tx, error) {
tx.DB.index = db.index tx.DB.index = db.index
tx.DB.kvBatch = tx.newBatch() tx.DB.kvBatch = tx.newTxBatch()
tx.DB.listBatch = tx.newBatch() tx.DB.listBatch = tx.newTxBatch()
tx.DB.hashBatch = tx.newBatch() tx.DB.hashBatch = tx.newTxBatch()
tx.DB.zsetBatch = tx.newBatch() tx.DB.zsetBatch = tx.newTxBatch()
tx.DB.binBatch = tx.newBatch() tx.DB.binBatch = tx.newTxBatch()
tx.DB.setBatch = tx.newBatch() tx.DB.setBatch = tx.newTxBatch()
return tx, nil return tx, nil
} }
@ -166,7 +172,7 @@ func (tx *Tx) Commit() error {
return ErrTxDone return ErrTxDone
} }
tx.l.Lock() tx.l.commitLock.Lock()
err := tx.tx.Commit() err := tx.tx.Commit()
tx.tx = nil tx.tx = nil
@ -174,9 +180,9 @@ func (tx *Tx) Commit() error {
tx.l.binlog.Log(tx.logs...) tx.l.binlog.Log(tx.logs...)
} }
tx.l.Unlock() tx.l.commitLock.Unlock()
tx.DB.dbLock.Unlock() tx.l.wLock.Unlock()
tx.DB = nil tx.DB = nil
return err return err
} }
@ -189,20 +195,13 @@ func (tx *Tx) Rollback() error {
err := tx.tx.Rollback() err := tx.tx.Rollback()
tx.tx = nil tx.tx = nil
tx.DB.dbLock.Unlock() tx.l.wLock.Unlock()
tx.DB = nil tx.DB = nil
return err return err
} }
func (tx *Tx) newBatch() *batch { func (tx *Tx) newTxBatch() *batch {
b := new(batch) return tx.l.newBatch(tx.tx.NewWriteBatch(), tx)
b.l = tx.l
b.WriteBatch = tx.tx.NewWriteBatch()
b.Locker = &txBatchLocker{}
b.tx = tx
return b
} }
func (tx *Tx) Index() int { func (tx *Tx) Index() int {