ledisdb/ledis/ledis_db.go

204 lines
4.3 KiB
Go
Raw Normal View History

package ledis
2014-07-03 07:38:52 +04:00
import (
2015-03-15 15:36:14 +03:00
"bytes"
"encoding/binary"
2014-08-16 08:38:49 +04:00
"fmt"
2014-07-25 13:58:00 +04:00
"github.com/siddontang/ledisdb/store"
2014-09-02 13:55:12 +04:00
"sync"
2014-07-03 07:38:52 +04:00
)
2014-08-25 10:18:23 +04:00
type ibucket interface {
Get(key []byte) ([]byte, error)
GetSlice(key []byte) (store.Slice, error)
2014-08-25 10:18:23 +04:00
Put(key []byte, value []byte) error
Delete(key []byte) error
NewIterator() *store.Iterator
2014-10-15 06:18:20 +04:00
NewWriteBatch() *store.WriteBatch
2014-08-25 10:18:23 +04:00
RangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator
RevRangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator
RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *store.RangeLimitIterator
RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *store.RangeLimitIterator
}
type DB struct {
l *Ledis
sdb *store.DB
bucket ibucket
2015-03-15 15:36:14 +03:00
index int
// buffer to store index varint
indexVarBuf []byte
2014-08-25 10:18:23 +04:00
kvBatch *batch
listBatch *batch
hashBatch *batch
zsetBatch *batch
2015-03-03 04:21:00 +03:00
// binBatch *batch
setBatch *batch
2014-08-25 10:18:23 +04:00
// status uint8
ttlChecker *ttlChecker
2014-10-16 13:51:52 +04:00
lbkeys *lBlockKeys
2014-08-25 10:18:23 +04:00
}
func (l *Ledis) newDB(index int) *DB {
2014-08-25 10:18:23 +04:00
d := new(DB)
d.l = l
d.sdb = l.ldb
d.bucket = d.sdb
// d.status = DBAutoCommit
2015-03-15 15:36:14 +03:00
d.setIndex(index)
2014-08-25 10:18:23 +04:00
d.kvBatch = d.newBatch()
d.listBatch = d.newBatch()
d.hashBatch = d.newBatch()
d.zsetBatch = d.newBatch()
2015-03-03 04:21:00 +03:00
// d.binBatch = d.newBatch()
2014-08-25 10:18:23 +04:00
d.setBatch = d.newBatch()
2014-10-16 13:51:52 +04:00
d.lbkeys = newLBlockKeys()
d.ttlChecker = d.newTTLChecker()
2014-08-25 10:18:23 +04:00
return d
}
2015-03-15 15:36:14 +03:00
// decodeDBIndex reads an unsigned varint database index from the front of buf.
//
// On success it returns the index and the number of bytes the varint occupied.
// It fails when buf holds no complete varint, when the varint overflows 64
// bits, or when the decoded value is greater than MaxDatabases.
//
// NOTE(review): a value equal to MaxDatabases is accepted here; confirm that
// this matches the range check used when selecting a database.
func decodeDBIndex(buf []byte) (int, int, error) {
	value, size := binary.Uvarint(buf)

	switch {
	case size == 0:
		return 0, 0, fmt.Errorf("buf is too small to save index")
	case size < 0:
		return 0, 0, fmt.Errorf("value larger than 64 bits")
	case value > uint64(MaxDatabases):
		return 0, 0, fmt.Errorf("value %d is larger than max databases %d", value, MaxDatabases)
	}

	return int(value), size, nil
}
// setIndex records index on db and caches its unsigned-varint encoding in
// db.indexVarBuf, which checkKeyIndex later compares against key prefixes.
func (db *DB) setIndex(index int) {
	db.index = index

	// binary.MaxVarintLen64 (10) is the largest size a 64-bit varint can take.
	encoded := make([]byte, binary.MaxVarintLen64)
	used := binary.PutUvarint(encoded, uint64(index))
	db.indexVarBuf = encoded[:used]
}
// checkKeyIndex verifies that buf begins with this DB's encoded index
// (db.indexVarBuf). It returns the length of that prefix on success, and an
// error when buf is shorter than the prefix or starts with different bytes.
func (db *DB) checkKeyIndex(buf []byte) (int, error) {
	prefix := db.indexVarBuf
	n := len(prefix)

	if len(buf) < n {
		return 0, fmt.Errorf("key is too small")
	}

	if !bytes.Equal(prefix, buf[:n]) {
		return 0, fmt.Errorf("invalid db index")
	}

	return n, nil
}
// newTTLChecker creates the TTL checker for db and registers, for each data
// type, the batch and the delete callback to use with it. The txs and cbs
// slices are sized by maxDataType.
func (db *DB) newTTLChecker() *ttlChecker {
	checker := &ttlChecker{
		db:  db,
		txs: make([]*batch, maxDataType),
		cbs: make([]onExpired, maxDataType),
		nc:  0,
	}

	checker.register(KVType, db.kvBatch, db.delete)
	checker.register(ListType, db.listBatch, db.lDelete)
	checker.register(HashType, db.hashBatch, db.hDelete)
	checker.register(ZSetType, db.zsetBatch, db.zDelete)
	// checker.register(BitType, db.binBatch, db.bDelete)
	checker.register(SetType, db.setBatch, db.sDelete)

	return checker
}
2014-09-02 13:55:12 +04:00
func (db *DB) newBatch() *batch {
return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock})
2014-09-02 13:55:12 +04:00
}
2014-08-25 10:18:23 +04:00
// Index returns the database index stored on db by setIndex.
func (db *DB) Index() int {
	return db.index
}
// func (db *DB) IsAutoCommit() bool {
// return db.status == DBAutoCommit
// }
2014-09-02 13:55:12 +04:00
func (db *DB) FlushAll() (drop int64, err error) {
all := [...](func() (int64, error)){
db.flush,
db.lFlush,
db.hFlush,
2014-07-03 07:38:52 +04:00
db.zFlush,
db.sFlush}
for _, flush := range all {
if n, e := flush(); e != nil {
err = e
return
} else {
drop += n
}
}
return
}
2014-08-25 10:18:23 +04:00
func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) {
var deleteFunc func(t *batch, key []byte) int64
2014-08-16 08:38:49 +04:00
var metaDataType byte
switch dataType {
case KVType:
deleteFunc = db.delete
metaDataType = KVType
case ListType:
deleteFunc = db.lDelete
metaDataType = LMetaType
case HashType:
deleteFunc = db.hDelete
metaDataType = HSizeType
case ZSetType:
deleteFunc = db.zDelete
metaDataType = ZSizeType
2015-03-03 04:21:00 +03:00
// case BitType:
// deleteFunc = db.bDelete
// metaDataType = BitMetaType
2014-08-16 12:55:36 +04:00
case SetType:
deleteFunc = db.sDelete
metaDataType = SSizeType
2014-08-16 08:38:49 +04:00
default:
return 0, fmt.Errorf("invalid data type: %s", TypeName[dataType])
}
var keys [][]byte
2015-03-02 06:10:54 +03:00
keys, err = db.scanGeneric(metaDataType, nil, 1024, false, "", false)
2014-08-16 08:38:49 +04:00
for len(keys) != 0 || err != nil {
for _, key := range keys {
2014-08-16 11:35:05 +04:00
deleteFunc(t, key)
2014-08-16 08:38:49 +04:00
db.rmExpire(t, dataType, key)
2014-08-16 11:35:05 +04:00
2014-08-16 08:38:49 +04:00
}
if err = t.Commit(); err != nil {
return
2014-08-16 11:35:05 +04:00
} else {
drop += int64(len(keys))
2014-08-16 08:38:49 +04:00
}
2015-03-02 06:10:54 +03:00
keys, err = db.scanGeneric(metaDataType, nil, 1024, false, "", false)
2014-08-16 08:38:49 +04:00
}
return
}