diff --git a/ledis/binlog.go b/ledis/binlog.go index f7f2961..716b11b 100644 --- a/ledis/binlog.go +++ b/ledis/binlog.go @@ -263,11 +263,19 @@ func (b *BinLog) Log(args ...[]byte) error { return nil } -func (b *BinLog) SavePoint() (string, int64) { - if b.logFile == nil { - return "", 0 +func (b *BinLog) LogFileName() string { + if len(b.logNames) == 0 { + return "" } else { - st, _ := b.logFile.Stat() - return b.logNames[len(b.logNames)-1], st.Size() + return b.logNames[len(b.logNames)-1] + } +} + +func (b *BinLog) LogFilePos() int64 { + if b.logFile == nil { + return 0 + } else { + st, _ := b.logFile.Stat() + return st.Size() } } diff --git a/ledis/const.go b/ledis/const.go index 9fecc29..cfe3cd9 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -24,17 +24,21 @@ const ( MaxDBNumber uint8 = 16 //max key size - MaxKeySize int = 1<<16 - 1 + MaxKeySize int = 1024 //max hash field size - MaxHashFieldSize int = 1<<16 - 1 + MaxHashFieldSize int = 1024 //max zset member size - MaxZSetMemberSize int = 1<<16 - 1 + MaxZSetMemberSize int = 1024 + + //max value size + MaxValueSize int = 10 * 1024 * 1024 ) var ( ErrKeySize = errors.New("invalid key size") + ErrValueSize = errors.New("invalid value size") ErrHashFieldSize = errors.New("invalid hash field size") ErrZSetMemberSize = errors.New("invalid zset member size") ) diff --git a/ledis/ledis.go b/ledis/ledis.go index 886e69a..8d4d950 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -93,16 +93,3 @@ func (l *Ledis) Select(index int) (*DB, error) { return l.dbs[index], nil } - -func (l *Ledis) Snapshot() (*leveldb.Snapshot, string, int64) { - if l.binlog == nil { - return l.ldb.NewSnapshot(), "", 0 - } else { - l.binlog.Lock() - s := l.ldb.NewSnapshot() - fileName, offset := l.binlog.SavePoint() - l.binlog.Unlock() - - return s, fileName, offset - } -} diff --git a/ledis/t_hash.go b/ledis/t_hash.go index 2d6550f..bb700c2 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -130,6 +130,8 @@ func (db *DB) hSetItem(key []byte, field []byte, value []byte) (int64, error) { func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) { if err := checkHashKFSize(key, field); err != nil { return 0, err + } else if err := checkValueSize(value); err != nil { + return 0, err } t := db.hashTx @@ -166,6 +168,8 @@ func (db *DB) HMset(key []byte, args ...FVPair) error { for i := 0; i < len(args); i++ { if err := checkHashKFSize(key, args[i].Field); err != nil { return err + } else if err := checkValueSize(args[i].Value); err != nil { + return err } ek = db.hEncodeHashKey(key, args[i].Field) @@ -414,6 +418,11 @@ func (db *DB) HFlush() (drop int64, err error) { for ; it.Valid(); it.Next() { t.Delete(it.Key()) drop++ + if drop%1000 == 0 { + if err = t.Commit(); err != nil { + return + } + } } err = t.Commit() diff --git a/ledis/t_kv.go b/ledis/t_kv.go index c7e228e..9469604 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -19,6 +19,14 @@ func checkKeySize(key []byte) error { return nil } +func checkValueSize(value []byte) error { + if len(value) > MaxValueSize { + return ErrValueSize + } + + return nil +} + func (db *DB) encodeKVKey(key []byte) []byte { ek := make([]byte, len(key)+2) ek[0] = db.index @@ -137,6 +145,8 @@ func (db *DB) Get(key []byte) ([]byte, error) { func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) { if err := checkKeySize(key); err != nil { return nil, err + } else if err := checkValueSize(value); err != nil { + return nil, err } key = db.encodeKVKey(key) @@ -204,6 +214,8 @@ func (db *DB) MSet(args ...KVPair) error { for i := 0; i < len(args); i++ { if err := checkKeySize(args[i].Key); err != nil { return err + } else if err := checkValueSize(args[i].Value); err != nil { + return err } key = db.encodeKVKey(args[i].Key) @@ -222,6 +234,8 @@ func (db *DB) MSet(args ...KVPair) error { func (db *DB) Set(key []byte, value []byte) error { if err := checkKeySize(key); err != nil { return err + } else if err := checkValueSize(value); err != nil { + return err } var err error @@ -244,6 +258,8 @@ func (db *DB) Set(key []byte, value []byte) error { func (db *DB) SetNX(key []byte, value []byte) (int64, error) { if err := checkKeySize(key); err != nil { return 0, err + } else if err := checkValueSize(value); err != nil { + return 0, err } var err error @@ -283,6 +299,12 @@ func (db *DB) KvFlush() (drop int64, err error) { for ; it.Valid(); it.Next() { t.Delete(it.Key()) drop++ + + if drop%1000 == 0 { + if err = t.Commit(); err != nil { + return + } + } } err = t.Commit() diff --git a/ledis/t_list.go b/ledis/t_list.go index 34cbb66..eac2769 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -398,6 +398,12 @@ func (db *DB) LFlush() (drop int64, err error) { for ; it.Valid(); it.Next() { t.Delete(it.Key()) drop++ + if drop%1000 == 0 { + if err = t.Commit(); err != nil { + return + } + } + } err = t.Commit() diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 222b4ae..1c86a9e 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -697,6 +697,11 @@ func (db *DB) ZFlush() (drop int64, err error) { for ; it.Valid(); it.Next() { t.Delete(it.Key()) drop++ + if drop%1000 == 0 { + if err = t.Commit(); err != nil { + return + } + } } err = t.Commit() diff --git a/ledis/tx.go b/ledis/tx.go index f5fa7f3..c5aca2d 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -33,11 +33,11 @@ func (t *tx) Put(key []byte, value []byte) { t.wb.Put(key, value) if t.binlog != nil { - buf := make([]byte, 9+len(key)+len(value)) + buf := make([]byte, 7+len(key)+len(value)) buf[0] = BinLogTypeValue pos := 1 - binary.BigEndian.PutUint32(buf[pos:], uint32(len(key))) - pos += 4 + binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) + pos += 2 copy(buf[pos:], key) pos += len(key) binary.BigEndian.PutUint32(buf[pos:], uint32(len(value))) @@ -52,11 +52,11 @@ func (t *tx) Delete(key []byte) { t.wb.Delete(key) if t.binlog != nil { - buf := make([]byte, 5+len(key)) + buf := make([]byte, 3+len(key)) buf[0] = BinLogTypeDeletion pos := 1 - binary.BigEndian.PutUint32(buf[pos:], uint32(len(key))) - pos += 4 + binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) + pos += 2 copy(buf[pos:], key) t.batch = append(t.batch, buf)