add relay log, add quit channel

This commit is contained in:
siddontang 2014-06-05 15:59:10 +08:00
parent 51349ea12a
commit be0464a21c
3 changed files with 52 additions and 6 deletions

View File

@ -12,9 +12,12 @@ type Config struct {
DataDB leveldb.Config `json:"data_db"` DataDB leveldb.Config `json:"data_db"`
BinLog replication.BinLogConfig `json:"binlog"` BinLog replication.BinLogConfig `json:"binlog"`
RelayLog replication.RelayLogConfig `json:"relaylog"`
} }
type DB struct { type DB struct {
l *Ledis
db *leveldb.DB db *leveldb.DB
index uint8 index uint8
@ -34,6 +37,9 @@ type Ledis struct {
dbs [MaxDBNumber]*DB dbs [MaxDBNumber]*DB
binlog *replication.Log binlog *replication.Log
relaylog *replication.Log
quit chan struct{}
} }
func Open(configJson json.RawMessage) (*Ledis, error) { func Open(configJson json.RawMessage) (*Ledis, error) {
@ -53,6 +59,9 @@ func OpenWithConfig(cfg *Config) (*Ledis, error) {
} }
l := new(Ledis) l := new(Ledis)
l.quit = make(chan struct{})
l.ldb = ldb l.ldb = ldb
if len(cfg.BinLog.Path) > 0 { if len(cfg.BinLog.Path) > 0 {
@ -64,6 +73,15 @@ func OpenWithConfig(cfg *Config) (*Ledis, error) {
l.binlog = nil l.binlog = nil
} }
if len(cfg.RelayLog.Path) > 0 {
l.relaylog, err = replication.NewRelayLogWithConfig(&cfg.RelayLog)
if err != nil {
return nil, err
}
} else {
l.relaylog = nil
}
for i := uint8(0); i < MaxDBNumber; i++ { for i := uint8(0); i < MaxDBNumber; i++ {
l.dbs[i] = newDB(l, i) l.dbs[i] = newDB(l, i)
} }
@ -74,6 +92,8 @@ func OpenWithConfig(cfg *Config) (*Ledis, error) {
func newDB(l *Ledis, index uint8) *DB { func newDB(l *Ledis, index uint8) *DB {
d := new(DB) d := new(DB)
d.l = l
d.db = l.ldb d.db = l.ldb
d.index = index d.index = index
@ -89,7 +109,19 @@ func newDB(l *Ledis, index uint8) *DB {
} }
func (l *Ledis) Close() { func (l *Ledis) Close() {
close(l.quit)
l.ldb.Close() l.ldb.Close()
if l.binlog != nil {
l.binlog.Close()
l.binlog = nil
}
if l.relaylog != nil {
l.relaylog.Close()
l.relaylog = nil
}
} }
func (l *Ledis) Select(index int) (*DB, error) { func (l *Ledis) Select(index int) (*DB, error) {

View File

@ -28,9 +28,16 @@ func (db *DB) activeExpireCycle() {
eliminator.regRetireContext(kvExpType, db.kvTx, db.delete) eliminator.regRetireContext(kvExpType, db.kvTx, db.delete)
go func() { go func() {
tick := time.NewTicker(1 * time.Second)
for { for {
select {
case <-tick.C:
eliminator.active() eliminator.active()
time.Sleep(1 * time.Second) case <-db.l.quit:
break
} }
}
tick.Stop()
}() }()
} }

View File

@ -12,6 +12,7 @@ import (
var ( var (
errInvalidBinLogEvent = errors.New("invalid binglog event") errInvalidBinLogEvent = errors.New("invalid binglog event")
errRelayLogNotSupported = errors.New("write relay log not supported")
) )
func (l *Ledis) replicateEvent(event []byte) error { func (l *Ledis) replicateEvent(event []byte) error {
@ -132,5 +133,11 @@ func (l *Ledis) RepliateRelayLog(relayLog string, offset int64) (int64, error) {
} }
func (l *Ledis) WriteRelayLog(data []byte) error { func (l *Ledis) WriteRelayLog(data []byte) error {
return nil if l.relaylog == nil {
return errRelayLogNotSupported
}
err := l.relaylog.Log(data)
return err
} }