generate db if not exists when select

put TTL checker in associated db too
This commit is contained in:
siddontang 2015-03-15 11:39:31 +08:00
parent b36d4aba37
commit 1b8d5af588
6 changed files with 69 additions and 61 deletions

View File

@ -214,10 +214,7 @@ func (cfg *Config) adjust() {
cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize) cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize)
cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize) cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize)
cfg.TTLCheckInterval = getDefault(1, cfg.TTLCheckInterval) cfg.TTLCheckInterval = getDefault(1, cfg.TTLCheckInterval)
cfg.Databases = getDefault(0, cfg.Databases) cfg.Databases = getDefault(16, cfg.Databases)
if cfg.Databases > 16 {
cfg.Databases = 16
}
} }
func (cfg *LevelDBConfig) adjust() { func (cfg *LevelDBConfig) adjust() {

View File

@ -104,6 +104,8 @@ var (
) )
const ( const (
MaxDatabases int = 16
//max key size //max key size
MaxKeySize int = 1024 MaxKeySize int = 1024
@ -127,11 +129,11 @@ var (
ErrRplNotSupport = errors.New("replication not support") ErrRplNotSupport = errors.New("replication not support")
) )
const ( // const (
DBAutoCommit uint8 = 0x0 // DBAutoCommit uint8 = 0x0
DBInTransaction uint8 = 0x1 // DBInTransaction uint8 = 0x1
DBInMulti uint8 = 0x2 // DBInMulti uint8 = 0x2
) // )
const ( const (
BitAND = "and" BitAND = "and"

View File

@ -18,7 +18,9 @@ type Ledis struct {
cfg *config.Config cfg *config.Config
ldb *store.DB ldb *store.DB
dbs []*DB
dbLock sync.Mutex
dbs map[int]*DB
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -35,7 +37,8 @@ type Ledis struct {
lock io.Closer lock io.Closer
tcs []*ttlChecker ttlCheckers []*ttlChecker
ttlCheckerCh chan *ttlChecker
} }
func Open(cfg *config.Config) (*Ledis, error) { func Open(cfg *config.Config) (*Ledis, error) {
@ -84,10 +87,7 @@ func Open(cfg *config.Config) (*Ledis, error) {
l.r = nil l.r = nil
} }
l.dbs = make([]*DB, cfg.Databases) l.dbs = make(map[int]*DB, 16)
for i := 0; i < cfg.Databases; i++ {
l.dbs[i] = l.newDB(uint8(i))
}
l.checkTTL() l.checkTTL()
@ -112,11 +112,26 @@ func (l *Ledis) Close() {
} }
func (l *Ledis) Select(index int) (*DB, error) { func (l *Ledis) Select(index int) (*DB, error) {
if index < 0 || index >= len(l.dbs) { if index < 0 || index >= MaxDatabases {
return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, len(l.dbs)-1) return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, MaxDatabases-1)
} }
return l.dbs[index], nil l.dbLock.Lock()
defer l.dbLock.Unlock()
db, ok := l.dbs[index]
if ok {
return db, nil
}
db = l.newDB(index)
l.dbs[index] = db
go func(db *DB) {
l.ttlCheckerCh <- db.ttlChecker
}(db)
return db, nil
} }
// Flush All will clear all data and replication logs // Flush All will clear all data and replication logs
@ -176,19 +191,8 @@ func (l *Ledis) IsReadOnly() bool {
} }
func (l *Ledis) checkTTL() { func (l *Ledis) checkTTL() {
l.tcs = make([]*ttlChecker, len(l.dbs)) l.ttlCheckers = make([]*ttlChecker, 0, 16)
for i, db := range l.dbs { l.ttlCheckerCh = make(chan *ttlChecker, 16)
c := newTTLChecker(db)
c.register(KVType, db.kvBatch, db.delete)
c.register(ListType, db.listBatch, db.lDelete)
c.register(HashType, db.hashBatch, db.hDelete)
c.register(ZSetType, db.zsetBatch, db.zDelete)
// c.register(BitType, db.binBatch, db.bDelete)
c.register(SetType, db.setBatch, db.sDelete)
l.tcs[i] = c
}
if l.cfg.TTLCheckInterval == 0 { if l.cfg.TTLCheckInterval == 0 {
l.cfg.TTLCheckInterval = 1 l.cfg.TTLCheckInterval = 1
@ -208,9 +212,12 @@ func (l *Ledis) checkTTL() {
break break
} }
for _, c := range l.tcs { for _, c := range l.ttlCheckers {
c.check() c.check()
} }
case c := <-l.ttlCheckerCh:
l.ttlCheckers = append(l.ttlCheckers, c)
c.check()
case <-l.quit: case <-l.quit:
return return
} }

View File

@ -39,12 +39,14 @@ type DB struct {
// binBatch *batch // binBatch *batch
setBatch *batch setBatch *batch
status uint8 // status uint8
ttlChecker *ttlChecker
lbkeys *lBlockKeys lbkeys *lBlockKeys
} }
func (l *Ledis) newDB(index uint8) *DB { func (l *Ledis) newDB(index int) *DB {
d := new(DB) d := new(DB)
d.l = l d.l = l
@ -53,8 +55,8 @@ func (l *Ledis) newDB(index uint8) *DB {
d.bucket = d.sdb d.bucket = d.sdb
d.status = DBAutoCommit // d.status = DBAutoCommit
d.index = index d.index = uint8(index)
d.kvBatch = d.newBatch() d.kvBatch = d.newBatch()
d.listBatch = d.newBatch() d.listBatch = d.newBatch()
@ -65,9 +67,28 @@ func (l *Ledis) newDB(index uint8) *DB {
d.lbkeys = newLBlockKeys() d.lbkeys = newLBlockKeys()
d.ttlChecker = d.newTTLChecker()
return d return d
} }
func (db *DB) newTTLChecker() *ttlChecker {
c := new(ttlChecker)
c.db = db
c.txs = make([]*batch, maxDataType)
c.cbs = make([]onExpired, maxDataType)
c.nc = 0
c.register(KVType, db.kvBatch, db.delete)
c.register(ListType, db.listBatch, db.lDelete)
c.register(HashType, db.hashBatch, db.hDelete)
c.register(ZSetType, db.zsetBatch, db.zDelete)
// c.register(BitType, db.binBatch, db.bDelete)
c.register(SetType, db.setBatch, db.sDelete)
return c
}
func (db *DB) newBatch() *batch { func (db *DB) newBatch() *batch {
return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}) return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock})
} }
@ -76,9 +97,9 @@ func (db *DB) Index() int {
return int(db.index) return int(db.index)
} }
func (db *DB) IsAutoCommit() bool { // func (db *DB) IsAutoCommit() bool {
return db.status == DBAutoCommit // return db.status == DBAutoCommit
} // }
func (db *DB) FlushAll() (drop int64, err error) { func (db *DB) FlushAll() (drop int64, err error) {
all := [...](func() (int64, error)){ all := [...](func() (int64, error)){

View File

@ -521,11 +521,8 @@ func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout time.Duration) ([
} else if v != nil { } else if v != nil {
return []interface{}{key, v}, nil return []interface{}{key, v}, nil
} else { } else {
if db.IsAutoCommit() { db.lbkeys.wait(key, ch)
//block wait can not be supported in transaction and multi bkeys = append(bkeys, key)
db.lbkeys.wait(key, ch)
bkeys = append(bkeys, key)
}
} }
} }
if len(bkeys) == 0 { if len(bkeys) == 0 {
@ -575,12 +572,6 @@ func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout time.Duration) ([
} }
func (db *DB) lSignalAsReady(key []byte, num int) { func (db *DB) lSignalAsReady(key []byte, num int) {
if db.status == DBInTransaction {
//for transaction, only data can be pushed after tx commit and it is hard to signal
//so we don't handle it now
return
}
db.lbkeys.signal(key, num) db.lbkeys.signal(key, num)
} }

View File

@ -85,8 +85,7 @@ func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) {
t.Put(tk, mk) t.Put(tk, mk)
t.Put(mk, PutInt64(when)) t.Put(mk, PutInt64(when))
tc := db.l.tcs[db.index] db.ttlChecker.setNextCheckTime(when, false)
tc.setNextCheckTime(when, false)
} }
func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) { func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) {
@ -121,15 +120,6 @@ func (db *DB) rmExpire(t *batch, dataType byte, key []byte) (int64, error) {
} }
} }
func newTTLChecker(db *DB) *ttlChecker {
c := new(ttlChecker)
c.db = db
c.txs = make([]*batch, maxDataType)
c.cbs = make([]onExpired, maxDataType)
c.nc = 0
return c
}
func (c *ttlChecker) register(dataType byte, t *batch, f onExpired) { func (c *ttlChecker) register(dataType byte, t *batch, f onExpired) {
c.txs[dataType] = t c.txs[dataType] = t
c.cbs[dataType] = f c.cbs[dataType] = f