forked from mirror/ledisdb
unify all expire cycle activity into one routine; edit func name about flush
This commit is contained in:
parent
5a902fc9d1
commit
52c55988de
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/siddontang/go-log/log"
|
"github.com/siddontang/go-log/log"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
|
@ -93,6 +94,8 @@ func OpenWithConfig(cfg *Config) (*Ledis, error) {
|
||||||
l.dbs[i] = newDB(l, i)
|
l.dbs[i] = newDB(l, i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
l.activeExpireCycle()
|
||||||
|
|
||||||
return l, nil
|
return l, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,8 +113,6 @@ func newDB(l *Ledis, index uint8) *DB {
|
||||||
d.hashTx = newTx(l)
|
d.hashTx = newTx(l)
|
||||||
d.zsetTx = newTx(l)
|
d.zsetTx = newTx(l)
|
||||||
|
|
||||||
d.activeExpireCycle()
|
|
||||||
|
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,3 +149,26 @@ func (l *Ledis) FlushAll() error {
|
||||||
func (l *Ledis) DataDB() *leveldb.DB {
|
func (l *Ledis) DataDB() *leveldb.DB {
|
||||||
return l.ldb
|
return l.ldb
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *Ledis) activeExpireCycle() {
|
||||||
|
var executors []*elimination = make([]*elimination, len(l.dbs))
|
||||||
|
for i, db := range l.dbs {
|
||||||
|
executors[i] = db.newEliminator()
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
tick := time.NewTicker(1 * time.Second)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-tick.C:
|
||||||
|
for _, eli := range executors {
|
||||||
|
eli.active()
|
||||||
|
}
|
||||||
|
case <-l.quit:
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tick.Stop()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
|
@ -1,15 +1,11 @@
|
||||||
package ledis
|
package ledis
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (db *DB) FlushAll() (drop int64, err error) {
|
func (db *DB) FlushAll() (drop int64, err error) {
|
||||||
all := [...](func() (int64, error)){
|
all := [...](func() (int64, error)){
|
||||||
db.Flush,
|
db.flush,
|
||||||
db.LFlush,
|
db.lFlush,
|
||||||
db.HFlush,
|
db.hFlush,
|
||||||
db.ZFlush}
|
db.zFlush}
|
||||||
|
|
||||||
for _, flush := range all {
|
for _, flush := range all {
|
||||||
if n, e := flush(); e != nil {
|
if n, e := flush(); e != nil {
|
||||||
|
@ -23,24 +19,12 @@ func (db *DB) FlushAll() (drop int64, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) activeExpireCycle() {
|
func (db *DB) newEliminator() *elimination {
|
||||||
eliminator := newEliminator(db)
|
eliminator := newEliminator(db)
|
||||||
eliminator.regRetireContext(kvExpType, db.kvTx, db.delete)
|
eliminator.regRetireContext(kvExpType, db.kvTx, db.delete)
|
||||||
eliminator.regRetireContext(lExpType, db.listTx, db.lDelete)
|
eliminator.regRetireContext(lExpType, db.listTx, db.lDelete)
|
||||||
eliminator.regRetireContext(hExpType, db.hashTx, db.hDelete)
|
eliminator.regRetireContext(hExpType, db.hashTx, db.hDelete)
|
||||||
eliminator.regRetireContext(zExpType, db.zsetTx, db.zDelete)
|
eliminator.regRetireContext(zExpType, db.zsetTx, db.zDelete)
|
||||||
|
|
||||||
go func() {
|
return eliminator
|
||||||
tick := time.NewTicker(1 * time.Second)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-tick.C:
|
|
||||||
eliminator.active()
|
|
||||||
case <-db.l.quit:
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tick.Stop()
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -430,7 +430,7 @@ func (db *DB) HClear(key []byte) (int64, error) {
|
||||||
return num, err
|
return num, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) HFlush() (drop int64, err error) {
|
func (db *DB) hFlush() (drop int64, err error) {
|
||||||
t := db.kvTx
|
t := db.kvTx
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
|
@ -42,7 +42,7 @@ func TestDBHash(t *testing.T) {
|
||||||
func TestDBHScan(t *testing.T) {
|
func TestDBHScan(t *testing.T) {
|
||||||
db := getTestDB()
|
db := getTestDB()
|
||||||
|
|
||||||
db.HFlush()
|
db.hFlush()
|
||||||
|
|
||||||
key := []byte("a")
|
key := []byte("a")
|
||||||
db.HSet(key, []byte("1"), []byte{})
|
db.HSet(key, []byte("1"), []byte{})
|
||||||
|
|
|
@ -311,7 +311,7 @@ func (db *DB) SetNX(key []byte, value []byte) (int64, error) {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Flush() (drop int64, err error) {
|
func (db *DB) flush() (drop int64, err error) {
|
||||||
t := db.kvTx
|
t := db.kvTx
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
|
@ -395,7 +395,7 @@ func (db *DB) LClear(key []byte) (int64, error) {
|
||||||
return num, err
|
return num, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) LFlush() (drop int64, err error) {
|
func (db *DB) lFlush() (drop int64, err error) {
|
||||||
t := db.listTx
|
t := db.listTx
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
|
@ -732,7 +732,7 @@ func (db *DB) ZRangeByScoreGeneric(key []byte, min int64, max int64,
|
||||||
return db.zRange(key, min, max, withScores, offset, count, reverse)
|
return db.zRange(key, min, max, withScores, offset, count, reverse)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) ZFlush() (drop int64, err error) {
|
func (db *DB) zFlush() (drop int64, err error) {
|
||||||
t := db.zsetTx
|
t := db.zsetTx
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
|
@ -220,7 +220,7 @@ func TestZSetOrder(t *testing.T) {
|
||||||
func TestDBZScan(t *testing.T) {
|
func TestDBZScan(t *testing.T) {
|
||||||
db := getTestDB()
|
db := getTestDB()
|
||||||
|
|
||||||
db.ZFlush()
|
db.zFlush()
|
||||||
|
|
||||||
key := []byte("key")
|
key := []byte("key")
|
||||||
db.ZAdd(key, pair("a", 0), pair("b", 1), pair("c", 2))
|
db.ZAdd(key, pair("a", 0), pair("b", 1), pair("c", 2))
|
||||||
|
|
Loading…
Reference in New Issue