adjust key/value max size, change step commit in flush

This commit is contained in:
siddontang 2014-05-28 14:20:34 +08:00
parent f66ffb18dc
commit d9dcab1d6b
8 changed files with 68 additions and 27 deletions

View File

@ -263,11 +263,19 @@ func (b *BinLog) Log(args ...[]byte) error {
return nil return nil
} }
func (b *BinLog) SavePoint() (string, int64) { func (b *BinLog) LogFileName() string {
if len(b.logNames) == 0 {
return ""
} else {
return b.logNames[len(b.logNames)-1]
}
}
func (b *BinLog) LogFilePos() int64 {
if b.logFile == nil { if b.logFile == nil {
return "", 0 return 0
} else { } else {
st, _ := b.logFile.Stat() st, _ := b.logFile.Stat()
return b.logNames[len(b.logNames)-1], st.Size() return st.Size()
} }
} }

View File

@ -24,17 +24,21 @@ const (
MaxDBNumber uint8 = 16 MaxDBNumber uint8 = 16
//max key size //max key size
MaxKeySize int = 1<<16 - 1 MaxKeySize int = 1024
//max hash field size //max hash field size
MaxHashFieldSize int = 1<<16 - 1 MaxHashFieldSize int = 1024
//max zset member size //max zset member size
MaxZSetMemberSize int = 1<<16 - 1 MaxZSetMemberSize int = 1024
//max value size
MaxValueSize int = 10 * 1024 * 1024
) )
var ( var (
ErrKeySize = errors.New("invalid key size") ErrKeySize = errors.New("invalid key size")
ErrValueSize = errors.New("invalid value size")
ErrHashFieldSize = errors.New("invalid hash field size") ErrHashFieldSize = errors.New("invalid hash field size")
ErrZSetMemberSize = errors.New("invalid zset member size") ErrZSetMemberSize = errors.New("invalid zset member size")
) )

View File

@ -93,16 +93,3 @@ func (l *Ledis) Select(index int) (*DB, error) {
return l.dbs[index], nil return l.dbs[index], nil
} }
func (l *Ledis) Snapshot() (*leveldb.Snapshot, string, int64) {
if l.binlog == nil {
return l.ldb.NewSnapshot(), "", 0
} else {
l.binlog.Lock()
s := l.ldb.NewSnapshot()
fileName, offset := l.binlog.SavePoint()
l.binlog.Unlock()
return s, fileName, offset
}
}

View File

@ -130,6 +130,8 @@ func (db *DB) hSetItem(key []byte, field []byte, value []byte) (int64, error) {
func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) { func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) {
if err := checkHashKFSize(key, field); err != nil { if err := checkHashKFSize(key, field); err != nil {
return 0, err return 0, err
} else if err := checkValueSize(value); err != nil {
return 0, err
} }
t := db.hashTx t := db.hashTx
@ -166,6 +168,8 @@ func (db *DB) HMset(key []byte, args ...FVPair) error {
for i := 0; i < len(args); i++ { for i := 0; i < len(args); i++ {
if err := checkHashKFSize(key, args[i].Field); err != nil { if err := checkHashKFSize(key, args[i].Field); err != nil {
return err return err
} else if err := checkValueSize(args[i].Value); err != nil {
return err
} }
ek = db.hEncodeHashKey(key, args[i].Field) ek = db.hEncodeHashKey(key, args[i].Field)
@ -414,6 +418,11 @@ func (db *DB) HFlush() (drop int64, err error) {
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
t.Delete(it.Key()) t.Delete(it.Key())
drop++ drop++
if drop%1000 == 0 {
if err = t.Commit(); err != nil {
return
}
}
} }
err = t.Commit() err = t.Commit()

View File

@ -19,6 +19,14 @@ func checkKeySize(key []byte) error {
return nil return nil
} }
func checkValueSize(value []byte) error {
if len(value) > MaxValueSize {
return ErrValueSize
}
return nil
}
func (db *DB) encodeKVKey(key []byte) []byte { func (db *DB) encodeKVKey(key []byte) []byte {
ek := make([]byte, len(key)+2) ek := make([]byte, len(key)+2)
ek[0] = db.index ek[0] = db.index
@ -137,6 +145,8 @@ func (db *DB) Get(key []byte) ([]byte, error) {
func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) { func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) {
if err := checkKeySize(key); err != nil { if err := checkKeySize(key); err != nil {
return nil, err return nil, err
} else if err := checkValueSize(value); err != nil {
return nil, err
} }
key = db.encodeKVKey(key) key = db.encodeKVKey(key)
@ -204,6 +214,8 @@ func (db *DB) MSet(args ...KVPair) error {
for i := 0; i < len(args); i++ { for i := 0; i < len(args); i++ {
if err := checkKeySize(args[i].Key); err != nil { if err := checkKeySize(args[i].Key); err != nil {
return err return err
} else if err := checkValueSize(args[i].Value); err != nil {
return err
} }
key = db.encodeKVKey(args[i].Key) key = db.encodeKVKey(args[i].Key)
@ -222,6 +234,8 @@ func (db *DB) MSet(args ...KVPair) error {
func (db *DB) Set(key []byte, value []byte) error { func (db *DB) Set(key []byte, value []byte) error {
if err := checkKeySize(key); err != nil { if err := checkKeySize(key); err != nil {
return err return err
} else if err := checkValueSize(value); err != nil {
return err
} }
var err error var err error
@ -244,6 +258,8 @@ func (db *DB) Set(key []byte, value []byte) error {
func (db *DB) SetNX(key []byte, value []byte) (int64, error) { func (db *DB) SetNX(key []byte, value []byte) (int64, error) {
if err := checkKeySize(key); err != nil { if err := checkKeySize(key); err != nil {
return 0, err return 0, err
} else if err := checkValueSize(value); err != nil {
return 0, err
} }
var err error var err error
@ -283,6 +299,12 @@ func (db *DB) KvFlush() (drop int64, err error) {
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
t.Delete(it.Key()) t.Delete(it.Key())
drop++ drop++
if drop%1000 == 0 {
if err = t.Commit(); err != nil {
return
}
}
} }
err = t.Commit() err = t.Commit()

View File

@ -398,6 +398,12 @@ func (db *DB) LFlush() (drop int64, err error) {
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
t.Delete(it.Key()) t.Delete(it.Key())
drop++ drop++
if drop%1000 == 0 {
if err = t.Commit(); err != nil {
return
}
}
} }
err = t.Commit() err = t.Commit()

View File

@ -697,6 +697,11 @@ func (db *DB) ZFlush() (drop int64, err error) {
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
t.Delete(it.Key()) t.Delete(it.Key())
drop++ drop++
if drop%1000 == 0 {
if err = t.Commit(); err != nil {
return
}
}
} }
err = t.Commit() err = t.Commit()

View File

@ -33,11 +33,11 @@ func (t *tx) Put(key []byte, value []byte) {
t.wb.Put(key, value) t.wb.Put(key, value)
if t.binlog != nil { if t.binlog != nil {
buf := make([]byte, 9+len(key)+len(value)) buf := make([]byte, 7+len(key)+len(value))
buf[0] = BinLogTypeValue buf[0] = BinLogTypeValue
pos := 1 pos := 1
binary.BigEndian.PutUint32(buf[pos:], uint32(len(key))) binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
pos += 4 pos += 2
copy(buf[pos:], key) copy(buf[pos:], key)
pos += len(key) pos += len(key)
binary.BigEndian.PutUint32(buf[pos:], uint32(len(value))) binary.BigEndian.PutUint32(buf[pos:], uint32(len(value)))
@ -52,11 +52,11 @@ func (t *tx) Delete(key []byte) {
t.wb.Delete(key) t.wb.Delete(key)
if t.binlog != nil { if t.binlog != nil {
buf := make([]byte, 5+len(key)) buf := make([]byte, 3+len(key))
buf[0] = BinLogTypeDeletion buf[0] = BinLogTypeDeletion
pos := 1 pos := 1
binary.BigEndian.PutUint32(buf[pos:], uint32(len(key))) binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
pos += 4 pos += 2
copy(buf[pos:], key) copy(buf[pos:], key)
t.batch = append(t.batch, buf) t.batch = append(t.batch, buf)