From 21fee60d8bc8aca4bf49d9beaedc34699f26fe03 Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 29 Oct 2014 16:02:46 +0800 Subject: [PATCH] optimize and bugfix TTL, must upgrade manually --- cmd/ledis-server/main.go | 3 + config/config.go | 3 + config/config.toml | 4 ++ ledis/const.go | 9 ++- ledis/ledis.go | 36 ++++++---- ledis/ledis_db.go | 13 ---- ledis/t_ttl.go | 97 +++++++++++++++++---------- upgrade/ledis-upgrade-ttl/main.go | 106 ++++++++++++++++++++++++++++++ 8 files changed, 209 insertions(+), 62 deletions(-) create mode 100644 upgrade/ledis-upgrade-ttl/main.go diff --git a/cmd/ledis-server/main.go b/cmd/ledis-server/main.go index 32be6c2..0ba897c 100644 --- a/cmd/ledis-server/main.go +++ b/cmd/ledis-server/main.go @@ -24,6 +24,7 @@ var slaveof = flag.String("slaveof", "", "make the server a slave of another ins var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly") var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled") var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not") +var ttlCheck = flag.Int("ttl_check", 1, "TTL check interval") func main() { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -67,6 +68,8 @@ func main() { cfg.Replication.Sync = *rplSync } + cfg.TTLCheckInterval = *ttlCheck + var app *server.App app, err = server.NewApp(cfg) if err != nil { diff --git a/config/config.go b/config/config.go index 5353ffc..3d197e3 100644 --- a/config/config.go +++ b/config/config.go @@ -113,6 +113,8 @@ type Config struct { ConnReadBufferSize int `toml:"conn_read_buffer_size"` ConnWriteBufferSize int `toml:"conn_write_buffer_size"` + + TTLCheckInterval int `toml:"ttl_check_interval"` } func NewConfigWithFile(fileName string) (*Config, error) { @@ -196,6 +198,7 @@ func (cfg *Config) adjust() { cfg.Replication.ExpiredLogDays = getDefault(7, cfg.Replication.ExpiredLogDays) cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize) cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize) + cfg.TTLCheckInterval = getDefault(1, cfg.TTLCheckInterval) } func (cfg *LevelDBConfig) adjust() { diff --git a/config/config.toml b/config/config.toml index 2b5d545..fc227f4 100644 --- a/config/config.toml +++ b/config/config.toml @@ -47,6 +47,10 @@ use_replication = false conn_read_buffer_size = 10240 conn_write_buffer_size = 10240 +# checking TTL (time to live) data every n seconds +# if you set big, the expired data may not be deleted immediately +ttl_check_interval = 1 + [leveldb] # for leveldb and goleveldb compression = false diff --git a/ledis/const.go b/ledis/const.go index 3e17a95..1c5eedd 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -21,8 +21,13 @@ const ( maxDataType byte = 100 - ExpTimeType byte = 101 - ExpMetaType byte = 102 + /* + I make a big mistake about TTL time key format and have to use a new one (change 101 to 103). + You must run the ledis-upgrade-ttl to upgrade db. + */ + ObsoleteExpTimeType byte = 101 + ExpMetaType byte = 102 + ExpTimeType byte = 103 MetaType byte = 201 ) diff --git a/ledis/ledis.go b/ledis/ledis.go index 10fa49c..a5e926e 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -34,6 +34,8 @@ type Ledis struct { commitLock sync.Mutex //allow one write commit at same time lock io.Closer + + tcs [MaxDBNumber]*ttlChecker } func Open(cfg *config.Config) (*Ledis, error) { @@ -81,7 +83,7 @@ func Open(cfg *config.Config) (*Ledis, error) { } l.wg.Add(1) - go l.onDataExpired() + go l.checkTTL() return l, nil } @@ -165,18 +167,28 @@ func (l *Ledis) IsReadOnly() bool { return false } -func (l *Ledis) onDataExpired() { +func (l *Ledis) checkTTL() { defer l.wg.Done() - var executors []*elimination = make([]*elimination, len(l.dbs)) for i, db := range l.dbs { - executors[i] = db.newEliminator() + c := newTTLChecker(db) + + c.register(KVType, db.kvBatch, db.delete) + c.register(ListType, db.listBatch, db.lDelete) + c.register(HashType, db.hashBatch, db.hDelete) + c.register(ZSetType, db.zsetBatch, db.zDelete) + c.register(BitType, db.binBatch, db.bDelete) + c.register(SetType, db.setBatch, db.sDelete) + + l.tcs[i] = c } - tick := time.NewTicker(1 * time.Second) - defer tick.Stop() + if l.cfg.TTLCheckInterval == 0 { + l.cfg.TTLCheckInterval = 1 + } - done := make(chan struct{}) + tick := time.NewTicker(time.Duration(l.cfg.TTLCheckInterval) * time.Second) + defer tick.Stop() for { select { @@ -185,13 +197,9 @@ func (l *Ledis) onDataExpired() { break } - go func() { - for _, eli := range executors { - eli.active() - } - done <- struct{}{} - }() - <-done + for _, c := range l.tcs { + c.check() + } case <-l.quit: return } diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index c9609fb..bd092a3 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -100,19 +100,6 @@ func (db *DB) FlushAll() (drop int64, err error) { return } -func (db *DB) newEliminator() *elimination { - eliminator := newEliminator(db) - - eliminator.regRetireContext(KVType, db.kvBatch, db.delete) - eliminator.regRetireContext(ListType, db.listBatch, db.lDelete) - eliminator.regRetireContext(HashType, db.hashBatch, db.hDelete) - eliminator.regRetireContext(ZSetType, db.zsetBatch, db.zDelete) - eliminator.regRetireContext(BitType, db.binBatch, db.bDelete) - eliminator.regRetireContext(SetType, db.setBatch, db.sDelete) - - return eliminator -} - func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) { var deleteFunc func(t *batch, key []byte) int64 var metaDataType byte diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index 2d5e2d5..a912d26 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "github.com/siddontang/ledisdb/store" + "sync" "time" ) @@ -12,12 +13,16 @@ var ( errExpTimeKey = errors.New("invalid expire time key") ) -type retireCallback func(*batch, []byte) int64 +type onExpired func(*batch, []byte) int64 -type elimination struct { - db *DB - exp2Tx []*batch - exp2Retire []retireCallback +type ttlChecker struct { + sync.Mutex + db *DB + txs []*batch + cbs []onExpired + + //next check time + nc int64 } var errExpType = errors.New("invalid expire type") @@ -27,12 +32,14 @@ func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte { buf[0] = db.index buf[1] = ExpTimeType - buf[2] = dataType - pos := 3 + pos := 2 binary.BigEndian.PutUint64(buf[pos:], uint64(when)) pos += 8 + buf[pos] = dataType + pos++ + copy(buf[pos:], key) return buf @@ -64,7 +71,7 @@ func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) { return 0, nil, 0, errExpTimeKey } - return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil + return tk[10], tk[11:], int64(binary.BigEndian.Uint64(tk[2:])), nil } func (db *DB) expire(t *batch, dataType byte, key []byte, duration int64) { @@ -77,6 +84,8 @@ func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) { t.Put(tk, mk) t.Put(mk, PutInt64(when)) + + db.l.tcs[db.index].setNextCheckTime(when, false) } func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) { @@ -111,48 +120,68 @@ func (db *DB) rmExpire(t *batch, dataType byte, key []byte) (int64, error) { } } -////////////////////////////////////////////////////////// -// -////////////////////////////////////////////////////////// - -func newEliminator(db *DB) *elimination { - eli := new(elimination) - eli.db = db - eli.exp2Tx = make([]*batch, maxDataType) - eli.exp2Retire = make([]retireCallback, maxDataType) - return eli +func newTTLChecker(db *DB) *ttlChecker { + c := new(ttlChecker) + c.db = db + c.txs = make([]*batch, maxDataType) + c.cbs = make([]onExpired, maxDataType) + c.nc = 0 + return c } -func (eli *elimination) regRetireContext(dataType byte, t *batch, onRetire retireCallback) { - - // todo .. need to ensure exist - mapExpMetaType[expType] - - eli.exp2Tx[dataType] = t - eli.exp2Retire[dataType] = onRetire +func (c *ttlChecker) register(dataType byte, t *batch, f onExpired) { + c.txs[dataType] = t + c.cbs[dataType] = f } -// call by outside ... (from *db to another *db) -func (eli *elimination) active() { +func (c *ttlChecker) setNextCheckTime(when int64, force bool) { + c.Lock() + if force { + c.nc = when + } else if !force && c.nc > when { + c.nc = when + } + c.Unlock() +} + +func (c *ttlChecker) check() { now := time.Now().Unix() - db := eli.db + + c.Lock() + nc := c.nc + c.Unlock() + + if now < nc { + return + } + + nc = now + 3600 + + db := c.db dbGet := db.bucket.Get minKey := db.expEncodeTimeKey(NoneType, nil, 0) - maxKey := db.expEncodeTimeKey(maxDataType, nil, now) + maxKey := db.expEncodeTimeKey(maxDataType, nil, nc) it := db.bucket.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { tk := it.RawKey() mk := it.RawValue() - dt, k, _, err := db.expDecodeTimeKey(tk) + dt, k, nt, err := db.expDecodeTimeKey(tk) if err != nil { continue } - t := eli.exp2Tx[dt] - onRetire := eli.exp2Retire[dt] - if tk == nil || onRetire == nil { + if nt > now { + //the next ttl check time is nt! + nc = nt + break + } + + t := c.txs[dt] + cb := c.cbs[dt] + if tk == nil || cb == nil { continue } @@ -161,7 +190,7 @@ func (eli *elimination) active() { if exp, err := Int64(dbGet(mk)); err == nil { // check expire again if exp <= now { - onRetire(t, k) + cb(t, k) t.Delete(tk) t.Delete(mk) @@ -174,5 +203,7 @@ func (eli *elimination) active() { } it.Close() + c.setNextCheckTime(nc, true) + return } diff --git a/upgrade/ledis-upgrade-ttl/main.go b/upgrade/ledis-upgrade-ttl/main.go new file mode 100644 index 0000000..ea8dab1 --- /dev/null +++ b/upgrade/ledis-upgrade-ttl/main.go @@ -0,0 +1,106 @@ +package main + +import ( + "encoding/binary" + "flag" + "fmt" + "github.com/siddontang/ledisdb/config" + "github.com/siddontang/ledisdb/ledis" + "github.com/siddontang/ledisdb/store" +) + +var configPath = flag.String("config", "", "ledisdb config file") + +func main() { + flag.Parse() + + if len(*configPath) == 0 { + println("need ledis config file") + return + } + + cfg, err := config.NewConfigWithFile(*configPath) + if err != nil { + println(err.Error()) + return + } + + db, err := store.Open(cfg) + if err != nil { + println(err.Error()) + return + } + + // upgrade: ttl time key 101 to ttl time key 103 + + wb := db.NewWriteBatch() + + for i := uint8(0); i < ledis.MaxDBNumber; i++ { + minK, maxK := oldKeyPair(i) + + it := db.RangeIterator(minK, maxK, store.RangeROpen) + num := 0 + for ; it.Valid(); it.Next() { + dt, k, t, err := decodeOldKey(i, it.RawKey()) + if err != nil { + continue + } + + newKey := encodeNewKey(i, dt, k, t) + + wb.Put(newKey, it.RawValue()) + wb.Delete(it.RawKey()) + num++ + if num%1024 == 0 { + if err := wb.Commit(); err != nil { + fmt.Printf("commit error :%s\n", err.Error()) + } + } + } + it.Close() + + if err := wb.Commit(); err != nil { + fmt.Printf("commit error :%s\n", err.Error()) + } + } +} + +func oldKeyPair(index uint8) ([]byte, []byte) { + minB := make([]byte, 11) + minB[0] = index + minB[1] = ledis.ObsoleteExpTimeType + minB[2] = 0 + + maxB := make([]byte, 11) + maxB[0] = index + maxB[1] = ledis.ObsoleteExpTimeType + maxB[2] = 255 + + return minB, maxB +} + +func decodeOldKey(index uint8, tk []byte) (byte, []byte, int64, error) { + if len(tk) < 11 || tk[0] != index || tk[1] != ledis.ObsoleteExpTimeType { + return 0, nil, 0, fmt.Errorf("invalid exp time key") + } + + return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil +} + +func encodeNewKey(index uint8, dataType byte, key []byte, when int64) []byte { + buf := make([]byte, len(key)+11) + + buf[0] = index + buf[1] = ledis.ExpTimeType + pos := 2 + + binary.BigEndian.PutUint64(buf[pos:], uint64(when)) + pos += 8 + + buf[pos] = dataType + pos++ + + copy(buf[pos:], key) + + return buf +}