From be0464a21c16cde542b0092f74158ebf4b6d9c0a Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 5 Jun 2014 15:59:10 +0800 Subject: [PATCH] add relay log, add quit channel --- ledis/ledis.go | 36 ++++++++++++++++++++++++++++++++++-- ledis/ledis_db.go | 11 +++++++++-- ledis/replication.go | 11 +++++++++-- 3 files changed, 52 insertions(+), 6 deletions(-) diff --git a/ledis/ledis.go b/ledis/ledis.go index e132ab4..040f362 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -11,10 +11,13 @@ import ( type Config struct { DataDB leveldb.Config `json:"data_db"` - BinLog replication.BinLogConfig `json:"binlog"` + BinLog replication.BinLogConfig `json:"binlog"` + RelayLog replication.RelayLogConfig `json:"relaylog"` } type DB struct { + l *Ledis + db *leveldb.DB index uint8 @@ -33,7 +36,10 @@ type Ledis struct { ldb *leveldb.DB dbs [MaxDBNumber]*DB - binlog *replication.Log + binlog *replication.Log + relaylog *replication.Log + + quit chan struct{} } func Open(configJson json.RawMessage) (*Ledis, error) { @@ -53,6 +59,9 @@ func OpenWithConfig(cfg *Config) (*Ledis, error) { } l := new(Ledis) + + l.quit = make(chan struct{}) + l.ldb = ldb if len(cfg.BinLog.Path) > 0 { @@ -64,6 +73,15 @@ func OpenWithConfig(cfg *Config) (*Ledis, error) { l.binlog = nil } + if len(cfg.RelayLog.Path) > 0 { + l.relaylog, err = replication.NewRelayLogWithConfig(&cfg.RelayLog) + if err != nil { + return nil, err + } + } else { + l.relaylog = nil + } + for i := uint8(0); i < MaxDBNumber; i++ { l.dbs[i] = newDB(l, i) } @@ -74,6 +92,8 @@ func OpenWithConfig(cfg *Config) (*Ledis, error) { func newDB(l *Ledis, index uint8) *DB { d := new(DB) + d.l = l + d.db = l.ldb d.index = index @@ -89,7 +109,19 @@ func newDB(l *Ledis, index uint8) *DB { } func (l *Ledis) Close() { + close(l.quit) + l.ldb.Close() + + if l.binlog != nil { + l.binlog.Close() + l.binlog = nil + } + + if l.relaylog != nil { + l.relaylog.Close() + l.relaylog = nil + } } func (l *Ledis) Select(index int) (*DB, error) { diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index a45f59b..cae79f9 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -28,9 +28,16 @@ func (db *DB) activeExpireCycle() { eliminator.regRetireContext(kvExpType, db.kvTx, db.delete) go func() { + tick := time.NewTicker(1 * time.Second) for { - eliminator.active() - time.Sleep(1 * time.Second) + select { + case <-tick.C: + eliminator.active() + case <-db.l.quit: + break + } } + + tick.Stop() }() } diff --git a/ledis/replication.go b/ledis/replication.go index 2559dba..111f96b 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -11,7 +11,8 @@ import ( ) var ( - errInvalidBinLogEvent = errors.New("invalid binglog event") + errInvalidBinLogEvent = errors.New("invalid binglog event") + errRelayLogNotSupported = errors.New("write relay log not supported") ) func (l *Ledis) replicateEvent(event []byte) error { @@ -132,5 +133,11 @@ func (l *Ledis) RepliateRelayLog(relayLog string, offset int64) (int64, error) { } func (l *Ledis) WriteRelayLog(data []byte) error { - return nil + if l.relaylog == nil { + return errRelayLogNotSupported + } + + err := l.relaylog.Log(data) + + return err }