optimize and bugfix TTL, must upgrade manually

This commit is contained in:
siddontang 2014-10-29 16:02:46 +08:00
parent 9282c7223e
commit 21fee60d8b
8 changed files with 209 additions and 62 deletions

View File

@ -24,6 +24,7 @@ var slaveof = flag.String("slaveof", "", "make the server a slave of another ins
var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly") var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly")
var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled") var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled")
var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not") var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not")
var ttlCheck = flag.Int("ttl_check", 1, "TTL check interval")
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
@ -67,6 +68,8 @@ func main() {
cfg.Replication.Sync = *rplSync cfg.Replication.Sync = *rplSync
} }
cfg.TTLCheckInterval = *ttlCheck
var app *server.App var app *server.App
app, err = server.NewApp(cfg) app, err = server.NewApp(cfg)
if err != nil { if err != nil {

View File

@ -113,6 +113,8 @@ type Config struct {
ConnReadBufferSize int `toml:"conn_read_buffer_size"` ConnReadBufferSize int `toml:"conn_read_buffer_size"`
ConnWriteBufferSize int `toml:"conn_write_buffer_size"` ConnWriteBufferSize int `toml:"conn_write_buffer_size"`
TTLCheckInterval int `toml:"ttl_check_interval"`
} }
func NewConfigWithFile(fileName string) (*Config, error) { func NewConfigWithFile(fileName string) (*Config, error) {
@ -196,6 +198,7 @@ func (cfg *Config) adjust() {
cfg.Replication.ExpiredLogDays = getDefault(7, cfg.Replication.ExpiredLogDays) cfg.Replication.ExpiredLogDays = getDefault(7, cfg.Replication.ExpiredLogDays)
cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize) cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize)
cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize) cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize)
cfg.TTLCheckInterval = getDefault(1, cfg.TTLCheckInterval)
} }
func (cfg *LevelDBConfig) adjust() { func (cfg *LevelDBConfig) adjust() {

View File

@ -47,6 +47,10 @@ use_replication = false
conn_read_buffer_size = 10240 conn_read_buffer_size = 10240
conn_write_buffer_size = 10240 conn_write_buffer_size = 10240
# checking TTL (time to live) data every n seconds
# if you set big, the expired data may not be deleted immediately
ttl_check_interval = 1
[leveldb] [leveldb]
# for leveldb and goleveldb # for leveldb and goleveldb
compression = false compression = false

View File

@ -21,8 +21,13 @@ const (
maxDataType byte = 100 maxDataType byte = 100
ExpTimeType byte = 101 /*
I make a big mistake about TTL time key format and have to use a new one (change 101 to 103).
You must run the ledis-upgrade-ttl to upgrade db.
*/
ObsoleteExpTimeType byte = 101
ExpMetaType byte = 102 ExpMetaType byte = 102
ExpTimeType byte = 103
MetaType byte = 201 MetaType byte = 201
) )

View File

@ -34,6 +34,8 @@ type Ledis struct {
commitLock sync.Mutex //allow one write commit at same time commitLock sync.Mutex //allow one write commit at same time
lock io.Closer lock io.Closer
tcs [MaxDBNumber]*ttlChecker
} }
func Open(cfg *config.Config) (*Ledis, error) { func Open(cfg *config.Config) (*Ledis, error) {
@ -81,7 +83,7 @@ func Open(cfg *config.Config) (*Ledis, error) {
} }
l.wg.Add(1) l.wg.Add(1)
go l.onDataExpired() go l.checkTTL()
return l, nil return l, nil
} }
@ -165,18 +167,28 @@ func (l *Ledis) IsReadOnly() bool {
return false return false
} }
func (l *Ledis) onDataExpired() { func (l *Ledis) checkTTL() {
defer l.wg.Done() defer l.wg.Done()
var executors []*elimination = make([]*elimination, len(l.dbs))
for i, db := range l.dbs { for i, db := range l.dbs {
executors[i] = db.newEliminator() c := newTTLChecker(db)
c.register(KVType, db.kvBatch, db.delete)
c.register(ListType, db.listBatch, db.lDelete)
c.register(HashType, db.hashBatch, db.hDelete)
c.register(ZSetType, db.zsetBatch, db.zDelete)
c.register(BitType, db.binBatch, db.bDelete)
c.register(SetType, db.setBatch, db.sDelete)
l.tcs[i] = c
} }
tick := time.NewTicker(1 * time.Second) if l.cfg.TTLCheckInterval == 0 {
defer tick.Stop() l.cfg.TTLCheckInterval = 1
}
done := make(chan struct{}) tick := time.NewTicker(time.Duration(l.cfg.TTLCheckInterval) * time.Second)
defer tick.Stop()
for { for {
select { select {
@ -185,13 +197,9 @@ func (l *Ledis) onDataExpired() {
break break
} }
go func() { for _, c := range l.tcs {
for _, eli := range executors { c.check()
eli.active()
} }
done <- struct{}{}
}()
<-done
case <-l.quit: case <-l.quit:
return return
} }

View File

@ -100,19 +100,6 @@ func (db *DB) FlushAll() (drop int64, err error) {
return return
} }
func (db *DB) newEliminator() *elimination {
eliminator := newEliminator(db)
eliminator.regRetireContext(KVType, db.kvBatch, db.delete)
eliminator.regRetireContext(ListType, db.listBatch, db.lDelete)
eliminator.regRetireContext(HashType, db.hashBatch, db.hDelete)
eliminator.regRetireContext(ZSetType, db.zsetBatch, db.zDelete)
eliminator.regRetireContext(BitType, db.binBatch, db.bDelete)
eliminator.regRetireContext(SetType, db.setBatch, db.sDelete)
return eliminator
}
func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) { func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) {
var deleteFunc func(t *batch, key []byte) int64 var deleteFunc func(t *batch, key []byte) int64
var metaDataType byte var metaDataType byte

View File

@ -4,6 +4,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/siddontang/ledisdb/store" "github.com/siddontang/ledisdb/store"
"sync"
"time" "time"
) )
@ -12,12 +13,16 @@ var (
errExpTimeKey = errors.New("invalid expire time key") errExpTimeKey = errors.New("invalid expire time key")
) )
type retireCallback func(*batch, []byte) int64 type onExpired func(*batch, []byte) int64
type elimination struct { type ttlChecker struct {
sync.Mutex
db *DB db *DB
exp2Tx []*batch txs []*batch
exp2Retire []retireCallback cbs []onExpired
//next check time
nc int64
} }
var errExpType = errors.New("invalid expire type") var errExpType = errors.New("invalid expire type")
@ -27,12 +32,14 @@ func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte {
buf[0] = db.index buf[0] = db.index
buf[1] = ExpTimeType buf[1] = ExpTimeType
buf[2] = dataType pos := 2
pos := 3
binary.BigEndian.PutUint64(buf[pos:], uint64(when)) binary.BigEndian.PutUint64(buf[pos:], uint64(when))
pos += 8 pos += 8
buf[pos] = dataType
pos++
copy(buf[pos:], key) copy(buf[pos:], key)
return buf return buf
@ -64,7 +71,7 @@ func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) {
return 0, nil, 0, errExpTimeKey return 0, nil, 0, errExpTimeKey
} }
return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil return tk[10], tk[11:], int64(binary.BigEndian.Uint64(tk[2:])), nil
} }
func (db *DB) expire(t *batch, dataType byte, key []byte, duration int64) { func (db *DB) expire(t *batch, dataType byte, key []byte, duration int64) {
@ -77,6 +84,8 @@ func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) {
t.Put(tk, mk) t.Put(tk, mk)
t.Put(mk, PutInt64(when)) t.Put(mk, PutInt64(when))
db.l.tcs[db.index].setNextCheckTime(when, false)
} }
func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) { func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) {
@ -111,48 +120,68 @@ func (db *DB) rmExpire(t *batch, dataType byte, key []byte) (int64, error) {
} }
} }
////////////////////////////////////////////////////////// func newTTLChecker(db *DB) *ttlChecker {
// c := new(ttlChecker)
////////////////////////////////////////////////////////// c.db = db
c.txs = make([]*batch, maxDataType)
func newEliminator(db *DB) *elimination { c.cbs = make([]onExpired, maxDataType)
eli := new(elimination) c.nc = 0
eli.db = db return c
eli.exp2Tx = make([]*batch, maxDataType)
eli.exp2Retire = make([]retireCallback, maxDataType)
return eli
} }
func (eli *elimination) regRetireContext(dataType byte, t *batch, onRetire retireCallback) { func (c *ttlChecker) register(dataType byte, t *batch, f onExpired) {
c.txs[dataType] = t
// todo .. need to ensure exist - mapExpMetaType[expType] c.cbs[dataType] = f
eli.exp2Tx[dataType] = t
eli.exp2Retire[dataType] = onRetire
} }
// call by outside ... (from *db to another *db) func (c *ttlChecker) setNextCheckTime(when int64, force bool) {
func (eli *elimination) active() { c.Lock()
if force {
c.nc = when
} else if !force && c.nc > when {
c.nc = when
}
c.Unlock()
}
func (c *ttlChecker) check() {
now := time.Now().Unix() now := time.Now().Unix()
db := eli.db
c.Lock()
nc := c.nc
c.Unlock()
if now < nc {
return
}
nc = now + 3600
db := c.db
dbGet := db.bucket.Get dbGet := db.bucket.Get
minKey := db.expEncodeTimeKey(NoneType, nil, 0) minKey := db.expEncodeTimeKey(NoneType, nil, 0)
maxKey := db.expEncodeTimeKey(maxDataType, nil, now) maxKey := db.expEncodeTimeKey(maxDataType, nil, nc)
it := db.bucket.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1) it := db.bucket.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
tk := it.RawKey() tk := it.RawKey()
mk := it.RawValue() mk := it.RawValue()
dt, k, _, err := db.expDecodeTimeKey(tk) dt, k, nt, err := db.expDecodeTimeKey(tk)
if err != nil { if err != nil {
continue continue
} }
t := eli.exp2Tx[dt] if nt > now {
onRetire := eli.exp2Retire[dt] //the next ttl check time is nt!
if tk == nil || onRetire == nil { nc = nt
break
}
t := c.txs[dt]
cb := c.cbs[dt]
if tk == nil || cb == nil {
continue continue
} }
@ -161,7 +190,7 @@ func (eli *elimination) active() {
if exp, err := Int64(dbGet(mk)); err == nil { if exp, err := Int64(dbGet(mk)); err == nil {
// check expire again // check expire again
if exp <= now { if exp <= now {
onRetire(t, k) cb(t, k)
t.Delete(tk) t.Delete(tk)
t.Delete(mk) t.Delete(mk)
@ -174,5 +203,7 @@ func (eli *elimination) active() {
} }
it.Close() it.Close()
c.setNextCheckTime(nc, true)
return return
} }

View File

@ -0,0 +1,106 @@
package main
import (
"encoding/binary"
"flag"
"fmt"
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/ledis"
"github.com/siddontang/ledisdb/store"
)
var configPath = flag.String("config", "", "ledisdb config file")
func main() {
flag.Parse()
if len(*configPath) == 0 {
println("need ledis config file")
return
}
cfg, err := config.NewConfigWithFile(*configPath)
if err != nil {
println(err.Error())
return
}
db, err := store.Open(cfg)
if err != nil {
println(err.Error())
return
}
// upgrade: ttl time key 101 to ttl time key 103
wb := db.NewWriteBatch()
for i := uint8(0); i < ledis.MaxDBNumber; i++ {
minK, maxK := oldKeyPair(i)
it := db.RangeIterator(minK, maxK, store.RangeROpen)
num := 0
for ; it.Valid(); it.Next() {
dt, k, t, err := decodeOldKey(i, it.RawKey())
if err != nil {
continue
}
newKey := encodeNewKey(i, dt, k, t)
wb.Put(newKey, it.RawValue())
wb.Delete(it.RawKey())
num++
if num%1024 == 0 {
if err := wb.Commit(); err != nil {
fmt.Printf("commit error :%s\n", err.Error())
}
}
}
it.Close()
if err := wb.Commit(); err != nil {
fmt.Printf("commit error :%s\n", err.Error())
}
}
}
func oldKeyPair(index uint8) ([]byte, []byte) {
minB := make([]byte, 11)
minB[0] = index
minB[1] = ledis.ObsoleteExpTimeType
minB[2] = 0
maxB := make([]byte, 11)
maxB[0] = index
maxB[1] = ledis.ObsoleteExpTimeType
maxB[2] = 255
return minB, maxB
}
func decodeOldKey(index uint8, tk []byte) (byte, []byte, int64, error) {
if len(tk) < 11 || tk[0] != index || tk[1] != ledis.ObsoleteExpTimeType {
return 0, nil, 0, fmt.Errorf("invalid exp time key")
}
return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil
}
func encodeNewKey(index uint8, dataType byte, key []byte, when int64) []byte {
buf := make([]byte, len(key)+11)
buf[0] = index
buf[1] = ledis.ExpTimeType
pos := 2
binary.BigEndian.PutUint64(buf[pos:], uint64(when))
pos += 8
buf[pos] = dataType
pos++
copy(buf[pos:], key)
return buf
}