From 1b8d5af5883cbf6c22a24abe8b109ac670a3fbbf Mon Sep 17 00:00:00 2001 From: siddontang Date: Sun, 15 Mar 2015 11:39:31 +0800 Subject: [PATCH] generate db if not exists when select put TTL checker in associated db too --- config/config.go | 5 +---- ledis/const.go | 12 ++++++----- ledis/ledis.go | 53 +++++++++++++++++++++++++++-------------------- ledis/ledis_db.go | 35 ++++++++++++++++++++++++------- ledis/t_list.go | 13 ++---------- ledis/t_ttl.go | 12 +---------- 6 files changed, 69 insertions(+), 61 deletions(-) diff --git a/config/config.go b/config/config.go index 8567b76..6ff5242 100644 --- a/config/config.go +++ b/config/config.go @@ -214,10 +214,7 @@ func (cfg *Config) adjust() { cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize) cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize) cfg.TTLCheckInterval = getDefault(1, cfg.TTLCheckInterval) - cfg.Databases = getDefault(0, cfg.Databases) - if cfg.Databases > 16 { - cfg.Databases = 16 - } + cfg.Databases = getDefault(16, cfg.Databases) } func (cfg *LevelDBConfig) adjust() { diff --git a/ledis/const.go b/ledis/const.go index c6461db..393b97d 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -104,6 +104,8 @@ var ( ) const ( + MaxDatabases int = 16 + //max key size MaxKeySize int = 1024 @@ -127,11 +129,11 @@ var ( ErrRplNotSupport = errors.New("replication not support") ) -const ( - DBAutoCommit uint8 = 0x0 - DBInTransaction uint8 = 0x1 - DBInMulti uint8 = 0x2 -) +// const ( +// DBAutoCommit uint8 = 0x0 +// DBInTransaction uint8 = 0x1 +// DBInMulti uint8 = 0x2 +// ) const ( BitAND = "and" diff --git a/ledis/ledis.go b/ledis/ledis.go index 60c06a2..01e0977 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -18,7 +18,9 @@ type Ledis struct { cfg *config.Config ldb *store.DB - dbs []*DB + + dbLock sync.Mutex + dbs map[int]*DB quit chan struct{} wg sync.WaitGroup @@ -35,7 +37,8 @@ type Ledis struct { lock io.Closer - tcs []*ttlChecker + ttlCheckers []*ttlChecker + ttlCheckerCh chan *ttlChecker } func Open(cfg *config.Config) (*Ledis, error) { @@ -84,10 +87,7 @@ func Open(cfg *config.Config) (*Ledis, error) { l.r = nil } - l.dbs = make([]*DB, cfg.Databases) - for i := 0; i < cfg.Databases; i++ { - l.dbs[i] = l.newDB(uint8(i)) - } + l.dbs = make(map[int]*DB, 16) l.checkTTL() @@ -112,11 +112,26 @@ func (l *Ledis) Close() { } func (l *Ledis) Select(index int) (*DB, error) { - if index < 0 || index >= len(l.dbs) { - return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, len(l.dbs)-1) + if index < 0 || index >= MaxDatabases { + return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, MaxDatabases-1) } - return l.dbs[index], nil + l.dbLock.Lock() + defer l.dbLock.Unlock() + + db, ok := l.dbs[index] + if ok { + return db, nil + } + + db = l.newDB(index) + l.dbs[index] = db + + go func(db *DB) { + l.ttlCheckerCh <- db.ttlChecker + }(db) + + return db, nil } // Flush All will clear all data and replication logs @@ -176,19 +191,8 @@ func (l *Ledis) IsReadOnly() bool { } func (l *Ledis) checkTTL() { - l.tcs = make([]*ttlChecker, len(l.dbs)) - for i, db := range l.dbs { - c := newTTLChecker(db) - - c.register(KVType, db.kvBatch, db.delete) - c.register(ListType, db.listBatch, db.lDelete) - c.register(HashType, db.hashBatch, db.hDelete) - c.register(ZSetType, db.zsetBatch, db.zDelete) - // c.register(BitType, db.binBatch, db.bDelete) - c.register(SetType, db.setBatch, db.sDelete) - - l.tcs[i] = c - } + l.ttlCheckers = make([]*ttlChecker, 0, 16) + l.ttlCheckerCh = make(chan *ttlChecker, 16) if l.cfg.TTLCheckInterval == 0 { l.cfg.TTLCheckInterval = 1 @@ -208,9 +212,12 @@ func (l *Ledis) checkTTL() { break } - for _, c := range l.tcs { + for _, c := range l.ttlCheckers { c.check() } + case c := <-l.ttlCheckerCh: + l.ttlCheckers = append(l.ttlCheckers, c) + c.check() case <-l.quit: return } diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index b4a8016..21811b5 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -39,12 +39,14 @@ type DB struct { // binBatch *batch setBatch *batch - status uint8 + // status uint8 + + ttlChecker *ttlChecker lbkeys *lBlockKeys } -func (l *Ledis) newDB(index uint8) *DB { +func (l *Ledis) newDB(index int) *DB { d := new(DB) d.l = l @@ -53,8 +55,8 @@ func (l *Ledis) newDB(index uint8) *DB { d.bucket = d.sdb - d.status = DBAutoCommit - d.index = index + // d.status = DBAutoCommit + d.index = uint8(index) d.kvBatch = d.newBatch() d.listBatch = d.newBatch() @@ -65,9 +67,28 @@ func (l *Ledis) newDB(index uint8) *DB { d.lbkeys = newLBlockKeys() + d.ttlChecker = d.newTTLChecker() + return d } +func (db *DB) newTTLChecker() *ttlChecker { + c := new(ttlChecker) + c.db = db + c.txs = make([]*batch, maxDataType) + c.cbs = make([]onExpired, maxDataType) + c.nc = 0 + + c.register(KVType, db.kvBatch, db.delete) + c.register(ListType, db.listBatch, db.lDelete) + c.register(HashType, db.hashBatch, db.hDelete) + c.register(ZSetType, db.zsetBatch, db.zDelete) + // c.register(BitType, db.binBatch, db.bDelete) + c.register(SetType, db.setBatch, db.sDelete) + + return c +} + func (db *DB) newBatch() *batch { return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}) } @@ -76,9 +97,9 @@ func (db *DB) Index() int { return int(db.index) } -func (db *DB) IsAutoCommit() bool { - return db.status == DBAutoCommit -} +// func (db *DB) IsAutoCommit() bool { +// return db.status == DBAutoCommit +// } func (db *DB) FlushAll() (drop int64, err error) { all := [...](func() (int64, error)){ diff --git a/ledis/t_list.go b/ledis/t_list.go index f5756b2..ce6eeb0 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -521,11 +521,8 @@ func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout time.Duration) ([ } else if v != nil { return []interface{}{key, v}, nil } else { - if db.IsAutoCommit() { - //block wait can not be supported in transaction and multi - db.lbkeys.wait(key, ch) - bkeys = append(bkeys, key) - } + db.lbkeys.wait(key, ch) + bkeys = append(bkeys, key) } } if len(bkeys) == 0 { @@ -575,12 +572,6 @@ func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout time.Duration) ([ } func (db *DB) lSignalAsReady(key []byte, num int) { - if db.status == DBInTransaction { - //for transaction, only data can be pushed after tx commit and it is hard to signal - //so we don't handle it now - return - } - db.lbkeys.signal(key, num) } diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index 70c8088..7758fc8 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -85,8 +85,7 @@ func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) { t.Put(tk, mk) t.Put(mk, PutInt64(when)) - tc := db.l.tcs[db.index] - tc.setNextCheckTime(when, false) + db.ttlChecker.setNextCheckTime(when, false) } func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) { @@ -121,15 +120,6 @@ func (db *DB) rmExpire(t *batch, dataType byte, key []byte) (int64, error) { } } -func newTTLChecker(db *DB) *ttlChecker { - c := new(ttlChecker) - c.db = db - c.txs = make([]*batch, maxDataType) - c.cbs = make([]onExpired, maxDataType) - c.nc = 0 - return c -} - func (c *ttlChecker) register(dataType byte, t *batch, f onExpired) { c.txs[dataType] = t c.cbs[dataType] = f