ledisdb/ledis/t_kv.go

415 lines
6.9 KiB
Go
Raw Normal View History

2014-05-09 10:49:22 +04:00
package ledis
2014-05-04 15:02:55 +04:00
import (
"errors"
2014-06-19 13:19:40 +04:00
"github.com/siddontang/ledisdb/leveldb"
"time"
2014-05-04 15:02:55 +04:00
)
2014-05-16 04:56:32 +04:00
// KVPair couples a key with the value stored under it. It is the element
// type accepted by MSet and returned by Scan.
type KVPair struct {
	Key, Value []byte
}
2014-05-04 15:02:55 +04:00
// errKVKey is returned by decodeKVKey when an encoded key is too short or
// does not carry this database's index and the kvType tag.
var errKVKey = errors.New("invalid encode kv key")
2014-05-20 04:41:24 +04:00
func checkKeySize(key []byte) error {
2014-05-23 08:23:20 +04:00
if len(key) > MaxKeySize || len(key) == 0 {
2014-06-05 11:46:38 +04:00
return errKeySize
2014-05-20 04:41:24 +04:00
}
return nil
}
func checkValueSize(value []byte) error {
if len(value) > MaxValueSize {
2014-06-05 11:46:38 +04:00
return errValueSize
}
return nil
}
2014-05-20 04:41:24 +04:00
func (db *DB) encodeKVKey(key []byte) []byte {
ek := make([]byte, len(key)+2)
ek[0] = db.index
ek[1] = kvType
copy(ek[2:], key)
2014-05-04 15:02:55 +04:00
return ek
}
2014-05-20 04:41:24 +04:00
func (db *DB) decodeKVKey(ek []byte) ([]byte, error) {
if len(ek) < 2 || ek[0] != db.index || ek[1] != kvType {
2014-05-04 15:02:55 +04:00
return nil, errKVKey
}
2014-05-20 04:41:24 +04:00
return ek[2:], nil
2014-05-04 15:02:55 +04:00
}
2014-05-23 07:44:50 +04:00
// encodeKVMinKey returns the lower bound of this database's KV key range:
// the bare two-byte prefix produced by encoding a nil user key.
func (db *DB) encodeKVMinKey() []byte {
	return db.encodeKVKey(nil)
}
// encodeKVMaxKey returns the upper bound of this database's KV key range:
// the two-byte prefix with its type byte bumped to kvType+1.
func (db *DB) encodeKVMaxKey() []byte {
	maxKey := db.encodeKVKey(nil)
	last := len(maxKey) - 1
	maxKey[last] = kvType + 1
	return maxKey
}
func (db *DB) incr(key []byte, delta int64) (int64, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return 0, err
}
2014-05-04 15:02:55 +04:00
var err error
2014-05-20 04:41:24 +04:00
key = db.encodeKVKey(key)
2014-05-04 15:02:55 +04:00
t := db.kvTx
2014-05-04 15:02:55 +04:00
2014-05-06 09:32:38 +04:00
t.Lock()
defer t.Unlock()
2014-05-04 15:02:55 +04:00
var n int64
n, err = StrInt64(db.db.Get(key))
2014-05-14 04:50:19 +04:00
if err != nil {
return 0, err
2014-05-14 04:50:19 +04:00
}
2014-05-04 15:02:55 +04:00
n += delta
t.Put(key, StrPutInt64(n))
//todo binlog
2014-05-04 15:02:55 +04:00
err = t.Commit()
return n, err
}
2014-05-04 15:02:55 +04:00
// delete issues a Delete for the KV entry of key on transaction t and
// always returns 1. It only touches the key-value record itself; related
// data such as expiration is ignored, and nothing is committed here.
func (db *DB) delete(t *tx, key []byte) int64 {
	t.Delete(db.encodeKVKey(key))
	return 1
}
// setExpireAt records an expiration time for key under the KV transaction
// lock. It returns 1 once the expiration has been committed, and 0 when the
// key does not exist or an error occurred (the error is returned as well).
func (db *DB) setExpireAt(key []byte, when int64) (int64, error) {
	t := db.kvTx
	t.Lock()
	defer t.Unlock()

	exist, err := db.Exists(key)
	if err != nil || exist == 0 {
		// err is nil here when the key is simply absent.
		return 0, err
	}

	db.expireAt(t, kvExpType, key, when)
	if err = t.Commit(); err != nil {
		return 0, err
	}
	return 1, nil
}
func (db *DB) Decr(key []byte) (int64, error) {
return db.incr(key, -1)
2014-05-04 15:02:55 +04:00
}
func (db *DB) DecrBy(key []byte, decrement int64) (int64, error) {
2014-05-15 16:48:38 +04:00
return db.incr(key, -decrement)
}
2014-05-04 15:02:55 +04:00
2014-05-16 04:56:32 +04:00
func (db *DB) Del(keys ...[]byte) (int64, error) {
if len(keys) == 0 {
return 0, nil
}
codedKeys := make([][]byte, len(keys))
for i, k := range keys {
codedKeys[i] = db.encodeKVKey(k)
}
2014-05-04 15:02:55 +04:00
t := db.kvTx
2014-05-06 09:32:38 +04:00
t.Lock()
defer t.Unlock()
2014-05-04 15:02:55 +04:00
for i, k := range keys {
t.Delete(codedKeys[i])
db.rmExpire(t, kvExpType, k)
2014-05-04 15:02:55 +04:00
}
err := t.Commit()
return int64(len(keys)), err
2014-05-04 15:02:55 +04:00
}
func (db *DB) Exists(key []byte) (int64, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return 0, err
}
2014-05-04 15:02:55 +04:00
var err error
2014-05-20 04:41:24 +04:00
key = db.encodeKVKey(key)
2014-05-04 15:02:55 +04:00
var v []byte
v, err = db.db.Get(key)
2014-05-05 07:37:44 +04:00
if v != nil && err == nil {
2014-05-04 15:02:55 +04:00
return 1, nil
}
return 0, err
2014-05-04 15:02:55 +04:00
}
func (db *DB) Get(key []byte) ([]byte, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return nil, err
}
key = db.encodeKVKey(key)
return db.db.Get(key)
}
func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return nil, err
} else if err := checkValueSize(value); err != nil {
return nil, err
2014-05-20 04:41:24 +04:00
}
key = db.encodeKVKey(key)
2014-05-04 15:02:55 +04:00
t := db.kvTx
2014-05-04 15:02:55 +04:00
2014-05-06 09:32:38 +04:00
t.Lock()
defer t.Unlock()
2014-05-04 15:02:55 +04:00
oldValue, err := db.db.Get(key)
2014-05-04 15:02:55 +04:00
if err != nil {
return nil, err
2014-05-04 15:02:55 +04:00
}
t.Put(key, value)
//todo, binlog
2014-05-04 15:02:55 +04:00
err = t.Commit()
return oldValue, err
2014-05-04 15:02:55 +04:00
}
// Incr adds one to the integer stored under key and returns the resulting
// value.
func (db *DB) Incr(key []byte) (int64, error) {
	const step int64 = 1
	return db.incr(key, step)
}
2014-05-04 15:02:55 +04:00
// IncryBy adds increment to the integer stored under key and returns the
// resulting value. (The method name is kept as spelled for compatibility
// with existing callers.)
func (db *DB) IncryBy(key []byte, increment int64) (int64, error) {
	delta := increment
	return db.incr(key, delta)
}
2014-05-04 15:02:55 +04:00
func (db *DB) MGet(keys ...[]byte) ([][]byte, error) {
values := make([][]byte, len(keys))
2014-05-04 15:02:55 +04:00
2014-06-19 13:19:40 +04:00
it := db.db.NewIterator()
defer it.Close()
2014-05-04 15:02:55 +04:00
for i := range keys {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(keys[i]); err != nil {
return nil, err
}
2014-06-19 13:19:40 +04:00
values[i] = it.Find(db.encodeKVKey(keys[i]))
2014-05-04 15:02:55 +04:00
}
return values, nil
2014-05-04 15:02:55 +04:00
}
2014-05-16 04:56:32 +04:00
func (db *DB) MSet(args ...KVPair) error {
if len(args) == 0 {
return nil
}
t := db.kvTx
2014-05-04 15:02:55 +04:00
2014-05-20 04:41:24 +04:00
var err error
var key []byte
var value []byte
2014-05-06 09:32:38 +04:00
t.Lock()
defer t.Unlock()
2014-05-04 15:02:55 +04:00
2014-05-16 04:56:32 +04:00
for i := 0; i < len(args); i++ {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(args[i].Key); err != nil {
return err
} else if err := checkValueSize(args[i].Value); err != nil {
return err
2014-05-20 04:41:24 +04:00
}
key = db.encodeKVKey(args[i].Key)
value = args[i].Value
2014-05-04 15:02:55 +04:00
t.Put(key, value)
//todo binlog
}
2014-05-20 04:41:24 +04:00
err = t.Commit()
2014-05-04 15:02:55 +04:00
return err
}
func (db *DB) Set(key []byte, value []byte) error {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return err
} else if err := checkValueSize(value); err != nil {
return err
2014-05-20 04:41:24 +04:00
}
var err error
2014-05-20 04:41:24 +04:00
key = db.encodeKVKey(key)
2014-05-04 15:02:55 +04:00
t := db.kvTx
2014-05-04 15:02:55 +04:00
t.Lock()
defer t.Unlock()
t.Put(key, value)
//todo, binlog
err = t.Commit()
return err
}
func (db *DB) SetNX(key []byte, value []byte) (int64, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return 0, err
} else if err := checkValueSize(value); err != nil {
return 0, err
2014-05-20 04:41:24 +04:00
}
var err error
2014-05-20 04:41:24 +04:00
key = db.encodeKVKey(key)
var n int64 = 1
t := db.kvTx
t.Lock()
defer t.Unlock()
if v, err := db.db.Get(key); err != nil {
return 0, err
} else if v != nil {
n = 0
} else {
t.Put(key, value)
//todo binlog
err = t.Commit()
2014-05-04 15:02:55 +04:00
}
return n, err
2014-05-04 15:02:55 +04:00
}
func (db *DB) flush() (drop int64, err error) {
t := db.kvTx
t.Lock()
defer t.Unlock()
2014-05-23 07:44:50 +04:00
minKey := db.encodeKVMinKey()
maxKey := db.encodeKVMaxKey()
2014-06-19 13:19:40 +04:00
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
drop++
if drop&1023 == 0 {
if err = t.Commit(); err != nil {
return
}
}
}
it.Close()
err = t.Commit()
err = db.expFlush(t, kvExpType)
return
}
2014-05-23 07:44:50 +04:00
2014-05-23 09:06:44 +04:00
//if inclusive is true, scan range [key, inf) else (key, inf)
func (db *DB) Scan(key []byte, count int, inclusive bool) ([]KVPair, error) {
var minKey []byte
if key != nil {
if err := checkKeySize(key); err != nil {
return nil, err
}
2014-05-26 05:21:17 +04:00
minKey = db.encodeKVKey(key)
} else {
minKey = db.encodeKVMinKey()
}
2014-05-23 07:44:50 +04:00
maxKey := db.encodeKVMaxKey()
if count <= 0 {
count = defaultScanCount
}
v := make([]KVPair, 0, 2*count)
2014-05-23 07:44:50 +04:00
2014-05-23 09:06:44 +04:00
rangeType := leveldb.RangeROpen
if !inclusive {
rangeType = leveldb.RangeOpen
}
2014-06-19 13:19:40 +04:00
it := db.db.RangeLimitIterator(minKey, maxKey, rangeType, 0, count)
2014-05-23 07:44:50 +04:00
for ; it.Valid(); it.Next() {
if key, err := db.decodeKVKey(it.Key()); err != nil {
continue
} else {
v = append(v, KVPair{Key: key, Value: it.Value()})
2014-05-23 07:44:50 +04:00
}
}
it.Close()
2014-05-23 07:44:50 +04:00
return v, nil
}
func (db *DB) Expire(key []byte, duration int64) (int64, error) {
if duration <= 0 {
2014-06-05 11:46:38 +04:00
return 0, errExpireValue
}
return db.setExpireAt(key, time.Now().Unix()+duration)
}
func (db *DB) ExpireAt(key []byte, when int64) (int64, error) {
if when <= time.Now().Unix() {
2014-06-05 11:46:38 +04:00
return 0, errExpireValue
}
return db.setExpireAt(key, when)
}
// TTL returns the time-to-live of key as reported by db.ttl for the KV
// expiration type. An invalid key size yields -1 and the size error.
func (db *DB) TTL(key []byte) (int64, error) {
	err := checkKeySize(key)
	if err != nil {
		return -1, err
	}
	return db.ttl(kvExpType, key)
}
2014-06-23 07:12:20 +04:00
// Persist removes the expiration recorded for key under the KV transaction
// lock. It returns the count reported by rmExpire along with the commit
// error, if any; an invalid key size or a failing rmExpire yields 0 and
// the error.
func (db *DB) Persist(key []byte) (int64, error) {
	if err := checkKeySize(key); err != nil {
		return 0, err
	}

	t := db.kvTx
	t.Lock()
	defer t.Unlock()

	removed, err := db.rmExpire(t, kvExpType, key)
	if err != nil {
		return 0, err
	}
	return removed, t.Commit()
}