ledisdb/ledis/t_kv.go

770 lines
14 KiB
Go
Raw Normal View History

2014-05-09 10:49:22 +04:00
package ledis
2014-05-04 15:02:55 +04:00
import (
"encoding/binary"
2014-05-04 15:02:55 +04:00
"errors"
"fmt"
"strings"
"time"
2015-05-04 17:42:28 +03:00
"github.com/siddontang/go/num"
"github.com/siddontang/ledisdb/store"
2014-05-04 15:02:55 +04:00
)
2014-05-16 04:56:32 +04:00
// KVPair couples a key with the value to store under it.
// It is the argument type accepted by MSet.
type KVPair struct {
// Key is the user-visible key; MSet validates it with checkKeySize.
Key []byte
// Value is the payload; MSet validates it with checkValueSize.
Value []byte
}
2014-05-04 15:02:55 +04:00
// errKVKey is returned by decodeKVKey when an encoded key does not carry
// the KVType marker at the expected position.
var errKVKey = errors.New("invalid encode kv key")
2014-05-20 04:41:24 +04:00
func checkKeySize(key []byte) error {
2014-05-23 08:23:20 +04:00
if len(key) > MaxKeySize || len(key) == 0 {
2014-06-05 11:46:38 +04:00
return errKeySize
2014-05-20 04:41:24 +04:00
}
return nil
}
func checkValueSize(value []byte) error {
if len(value) > MaxValueSize {
2014-06-05 11:46:38 +04:00
return errValueSize
}
return nil
}
2014-05-20 04:41:24 +04:00
func (db *DB) encodeKVKey(key []byte) []byte {
2015-03-15 15:36:14 +03:00
ek := make([]byte, len(key)+1+len(db.indexVarBuf))
pos := copy(ek, db.indexVarBuf)
ek[pos] = KVType
pos++
copy(ek[pos:], key)
2014-05-04 15:02:55 +04:00
return ek
}
2014-05-20 04:41:24 +04:00
func (db *DB) decodeKVKey(ek []byte) ([]byte, error) {
2015-03-15 15:36:14 +03:00
pos, err := db.checkKeyIndex(ek)
if err != nil {
return nil, err
}
if pos+1 > len(ek) || ek[pos] != KVType {
2014-05-04 15:02:55 +04:00
return nil, errKVKey
}
2015-03-15 15:36:14 +03:00
pos++
return ek[pos:], nil
2014-05-04 15:02:55 +04:00
}
2014-05-23 07:44:50 +04:00
// encodeKVMinKey returns the encoded form of the empty key, i.e. the
// index prefix followed only by the KVType byte.
func (db *DB) encodeKVMinKey() []byte {
	return db.encodeKVKey(nil)
}
func (db *DB) encodeKVMaxKey() []byte {
ek := db.encodeKVKey(nil)
ek[len(ek)-1] = KVType + 1
2014-05-23 07:44:50 +04:00
return ek
}
func (db *DB) incr(key []byte, delta int64) (int64, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return 0, err
}
2014-05-04 15:02:55 +04:00
var err error
2014-05-20 04:41:24 +04:00
key = db.encodeKVKey(key)
2014-05-04 15:02:55 +04:00
2014-08-25 10:18:23 +04:00
t := db.kvBatch
2014-05-04 15:02:55 +04:00
2014-05-06 09:32:38 +04:00
t.Lock()
defer t.Unlock()
2014-05-04 15:02:55 +04:00
var n int64
2014-08-25 10:18:23 +04:00
n, err = StrInt64(db.bucket.Get(key))
2014-05-14 04:50:19 +04:00
if err != nil {
return 0, err
2014-05-14 04:50:19 +04:00
}
2014-05-04 15:02:55 +04:00
n += delta
2014-09-24 09:29:27 +04:00
t.Put(key, num.FormatInt64ToSlice(n))
2014-05-04 15:02:55 +04:00
err = t.Commit()
return n, err
}
2014-05-04 15:02:55 +04:00
// ps : here just focus on deleting the key-value data,
// any other likes expire is ignore.
2014-08-25 10:18:23 +04:00
func (db *DB) delete(t *batch, key []byte) int64 {
key = db.encodeKVKey(key)
t.Delete(key)
return 1
}
func (db *DB) setExpireAt(key []byte, when int64) (int64, error) {
2014-08-25 10:18:23 +04:00
t := db.kvBatch
t.Lock()
defer t.Unlock()
if exist, err := db.Exists(key); err != nil || exist == 0 {
return 0, err
} else {
db.expireAt(t, KVType, key, when)
if err := t.Commit(); err != nil {
return 0, err
}
}
return 1, nil
}
func (db *DB) Decr(key []byte) (int64, error) {
return db.incr(key, -1)
2014-05-04 15:02:55 +04:00
}
func (db *DB) DecrBy(key []byte, decrement int64) (int64, error) {
2014-05-15 16:48:38 +04:00
return db.incr(key, -decrement)
}
2014-05-04 15:02:55 +04:00
2014-05-16 04:56:32 +04:00
func (db *DB) Del(keys ...[]byte) (int64, error) {
if len(keys) == 0 {
return 0, nil
}
codedKeys := make([][]byte, len(keys))
for i, k := range keys {
codedKeys[i] = db.encodeKVKey(k)
}
2014-05-04 15:02:55 +04:00
2014-08-25 10:18:23 +04:00
t := db.kvBatch
2014-05-06 09:32:38 +04:00
t.Lock()
defer t.Unlock()
2014-05-04 15:02:55 +04:00
for i, k := range keys {
t.Delete(codedKeys[i])
db.rmExpire(t, KVType, k)
2014-05-04 15:02:55 +04:00
}
err := t.Commit()
return int64(len(keys)), err
2014-05-04 15:02:55 +04:00
}
func (db *DB) Exists(key []byte) (int64, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return 0, err
}
2014-05-04 15:02:55 +04:00
var err error
2014-05-20 04:41:24 +04:00
key = db.encodeKVKey(key)
2014-05-04 15:02:55 +04:00
var v []byte
2014-08-25 10:18:23 +04:00
v, err = db.bucket.Get(key)
2014-05-05 07:37:44 +04:00
if v != nil && err == nil {
2014-05-04 15:02:55 +04:00
return 1, nil
}
return 0, err
2014-05-04 15:02:55 +04:00
}
func (db *DB) Get(key []byte) ([]byte, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return nil, err
}
key = db.encodeKVKey(key)
2014-08-25 10:18:23 +04:00
return db.bucket.Get(key)
}
// GetSlice returns the value stored at key as a store.Slice, exactly as
// handed back by the underlying bucket. An invalid key size yields
// (nil, error).
func (db *DB) GetSlice(key []byte) (store.Slice, error) {
	err := checkKeySize(key)
	if err != nil {
		return nil, err
	}

	return db.bucket.GetSlice(db.encodeKVKey(key))
}
func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return nil, err
} else if err := checkValueSize(value); err != nil {
return nil, err
2014-05-20 04:41:24 +04:00
}
key = db.encodeKVKey(key)
2014-05-04 15:02:55 +04:00
2014-08-25 10:18:23 +04:00
t := db.kvBatch
2014-05-04 15:02:55 +04:00
2014-05-06 09:32:38 +04:00
t.Lock()
defer t.Unlock()
2014-05-04 15:02:55 +04:00
2014-08-25 10:18:23 +04:00
oldValue, err := db.bucket.Get(key)
2014-05-04 15:02:55 +04:00
if err != nil {
return nil, err
2014-05-04 15:02:55 +04:00
}
t.Put(key, value)
2014-05-04 15:02:55 +04:00
err = t.Commit()
return oldValue, err
2014-05-04 15:02:55 +04:00
}
// Incr increments the integer stored at key by one and returns the
// resulting value.
func (db *DB) Incr(key []byte) (int64, error) {
	const step int64 = 1
	return db.incr(key, step)
}
2014-05-04 15:02:55 +04:00
// IncrBy adds increment to the integer stored at key and returns the
// resulting value.
func (db *DB) IncrBy(key []byte, increment int64) (int64, error) {
	result, err := db.incr(key, increment)
	return result, err
}
2014-05-04 15:02:55 +04:00
func (db *DB) MGet(keys ...[]byte) ([][]byte, error) {
values := make([][]byte, len(keys))
2014-05-04 15:02:55 +04:00
2014-08-25 10:18:23 +04:00
it := db.bucket.NewIterator()
2014-06-19 13:19:40 +04:00
defer it.Close()
2014-05-04 15:02:55 +04:00
for i := range keys {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(keys[i]); err != nil {
return nil, err
}
2014-06-19 13:19:40 +04:00
values[i] = it.Find(db.encodeKVKey(keys[i]))
2014-05-04 15:02:55 +04:00
}
return values, nil
2014-05-04 15:02:55 +04:00
}
2014-05-16 04:56:32 +04:00
func (db *DB) MSet(args ...KVPair) error {
if len(args) == 0 {
return nil
}
2014-08-25 10:18:23 +04:00
t := db.kvBatch
2014-05-04 15:02:55 +04:00
2014-05-20 04:41:24 +04:00
var err error
var key []byte
var value []byte
2014-05-06 09:32:38 +04:00
t.Lock()
defer t.Unlock()
2014-05-04 15:02:55 +04:00
2014-05-16 04:56:32 +04:00
for i := 0; i < len(args); i++ {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(args[i].Key); err != nil {
return err
} else if err := checkValueSize(args[i].Value); err != nil {
return err
2014-05-20 04:41:24 +04:00
}
key = db.encodeKVKey(args[i].Key)
value = args[i].Value
2014-05-04 15:02:55 +04:00
t.Put(key, value)
}
2014-05-20 04:41:24 +04:00
err = t.Commit()
2014-05-04 15:02:55 +04:00
return err
}
func (db *DB) Set(key []byte, value []byte) error {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return err
} else if err := checkValueSize(value); err != nil {
return err
2014-05-20 04:41:24 +04:00
}
var err error
2014-05-20 04:41:24 +04:00
key = db.encodeKVKey(key)
2014-05-04 15:02:55 +04:00
2014-08-25 10:18:23 +04:00
t := db.kvBatch
2014-05-04 15:02:55 +04:00
t.Lock()
defer t.Unlock()
t.Put(key, value)
err = t.Commit()
return err
}
func (db *DB) SetNX(key []byte, value []byte) (int64, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return 0, err
} else if err := checkValueSize(value); err != nil {
return 0, err
2014-05-20 04:41:24 +04:00
}
var err error
2014-05-20 04:41:24 +04:00
key = db.encodeKVKey(key)
var n int64 = 1
2014-08-25 10:18:23 +04:00
t := db.kvBatch
t.Lock()
defer t.Unlock()
2014-08-25 10:18:23 +04:00
if v, err := db.bucket.Get(key); err != nil {
return 0, err
} else if v != nil {
n = 0
} else {
t.Put(key, value)
err = t.Commit()
2014-05-04 15:02:55 +04:00
}
return n, err
2014-05-04 15:02:55 +04:00
}
2014-10-26 10:15:43 +03:00
// SetEX stores value at key and, in the same batch, sets the key to
// expire duration seconds from now. duration must be positive, otherwise
// errExpireValue is returned.
func (db *DB) SetEX(key []byte, duration int64, value []byte) error {
	if err := checkKeySize(key); err != nil {
		return err
	}
	if err := checkValueSize(value); err != nil {
		return err
	}
	if duration <= 0 {
		return errExpireValue
	}

	ek := db.encodeKVKey(key)

	wb := db.kvBatch
	wb.Lock()
	defer wb.Unlock()

	wb.Put(ek, value)

	// The expiration is an absolute Unix timestamp in seconds.
	deadline := time.Now().Unix() + duration
	db.expireAt(wb, KVType, key, deadline)

	if commitErr := wb.Commit(); commitErr != nil {
		return commitErr
	}
	return nil
}
func (db *DB) flush() (drop int64, err error) {
2014-08-25 10:18:23 +04:00
t := db.kvBatch
2014-07-03 07:38:52 +04:00
t.Lock()
defer t.Unlock()
2014-08-16 11:35:05 +04:00
return db.flushType(t, KVType)
}
2014-05-23 07:44:50 +04:00
func (db *DB) Expire(key []byte, duration int64) (int64, error) {
if duration <= 0 {
2014-06-05 11:46:38 +04:00
return 0, errExpireValue
}
return db.setExpireAt(key, time.Now().Unix()+duration)
}
func (db *DB) ExpireAt(key []byte, when int64) (int64, error) {
if when <= time.Now().Unix() {
2014-06-05 11:46:38 +04:00
return 0, errExpireValue
}
return db.setExpireAt(key, when)
}
// TTL returns the result of db.ttl for key. An invalid key size yields
// -1 together with the validation error.
func (db *DB) TTL(key []byte) (int64, error) {
	err := checkKeySize(key)
	if err != nil {
		return -1, err
	}

	return db.ttl(KVType, key)
}
2014-06-23 07:12:20 +04:00
func (db *DB) Persist(key []byte) (int64, error) {
if err := checkKeySize(key); err != nil {
return 0, err
}
2014-08-25 10:18:23 +04:00
t := db.kvBatch
2014-06-23 07:12:20 +04:00
t.Lock()
defer t.Unlock()
n, err := db.rmExpire(t, KVType, key)
2014-06-23 07:12:20 +04:00
if err != nil {
return 0, err
}
err = t.Commit()
return n, err
}
// SetRange overwrites the value stored at key with value, starting at
// byte position offset. When the existing value is too short it is
// extended with zero bytes first.
//
// An empty value is a no-op that returns (0, nil). A negative offset is
// rejected with an error, and errValueSize is returned when the write
// would end beyond MaxValueSize. On success the length of the resulting
// value is returned.
func (db *DB) SetRange(key []byte, offset int, value []byte) (int64, error) {
	if len(value) == 0 {
		return 0, nil
	}

	if err := checkKeySize(key); err != nil {
		return 0, err
	} else if offset < 0 {
		// A negative offset would otherwise panic when slicing below.
		return 0, fmt.Errorf("offset must not be negative, got %d", offset)
	} else if offset > MaxValueSize-len(value) {
		// Written as a subtraction so a huge offset cannot overflow the
		// sum and slip past the limit.
		return 0, errValueSize
	}

	key = db.encodeKVKey(key)

	t := db.kvBatch
	t.Lock()
	defer t.Unlock()

	oldValue, err := db.bucket.Get(key)
	if err != nil {
		return 0, err
	}

	// Zero-pad the current value so that [offset, offset+len(value)) exists.
	if extra := offset + len(value) - len(oldValue); extra > 0 {
		oldValue = append(oldValue, make([]byte, extra)...)
	}

	copy(oldValue[offset:], value)

	t.Put(key, oldValue)
	if err := t.Commit(); err != nil {
		return 0, err
	}

	return int64(len(oldValue)), nil
}
// getRange converts a (start, end) pair, where negative values count
// back from the end of a value of length valLen, into non-negative
// offsets. end is additionally capped at valLen-1; start is not capped,
// so callers must handle start > end (which also covers valLen == 0,
// where end comes back as -1).
func getRange(start int, end int, valLen int) (int, int) {
	// normalize resolves a negative index relative to valLen and clamps
	// anything still negative to 0.
	normalize := func(idx int) int {
		if idx < 0 {
			idx += valLen
		}
		if idx < 0 {
			return 0
		}
		return idx
	}

	lo, hi := normalize(start), normalize(end)
	if last := valLen - 1; hi > last {
		hi = last
	}

	return lo, hi
}
// GetRange returns the bytes of the value at key between start and end,
// both inclusive. Negative positions count back from the end of the
// value, as resolved by getRange. An empty selection yields (nil, nil).
// The returned slice aliases the value read from the bucket.
func (db *DB) GetRange(key []byte, start int, end int) ([]byte, error) {
	if err := checkKeySize(key); err != nil {
		return nil, err
	}

	data, err := db.bucket.Get(db.encodeKVKey(key))
	if err != nil {
		return nil, err
	}

	from, to := getRange(start, end, len(data))
	if from > to {
		return nil, nil
	}

	return data[from : to+1], nil
}
// StrLen returns the length in bytes of the value stored at key.
// The slice obtained from GetSlice is released with Free before
// returning.
func (db *DB) StrLen(key []byte) (int64, error) {
	s, err := db.GetSlice(key)
	if err != nil {
		return 0, err
	}

	// NOTE(review): GetSlice presumably hands back a nil slice with a nil
	// error for a missing key, as Get does with a nil value. Calling
	// Size/Free on it would panic, so treat it as length 0. Confirm
	// against the store package.
	if s == nil {
		return 0, nil
	}
	defer s.Free()

	return int64(s.Size()), nil
}
// Append adds value to the end of the value stored at key and returns
// the length of the result.
//
// An empty value is a no-op that returns (0, nil). errValueSize is
// returned when the combined length would exceed MaxValueSize. Read and
// commit errors are returned with a length of 0.
func (db *DB) Append(key []byte, value []byte) (int64, error) {
	if len(value) == 0 {
		return 0, nil
	}

	if err := checkKeySize(key); err != nil {
		return 0, err
	}

	key = db.encodeKVKey(key)

	t := db.kvBatch
	t.Lock()
	defer t.Unlock()

	oldValue, err := db.bucket.Get(key)
	if err != nil {
		return 0, err
	}

	if len(oldValue)+len(value) > MaxValueSize {
		return 0, errValueSize
	}

	oldValue = append(oldValue, value...)

	t.Put(key, oldValue)
	if err := t.Commit(); err != nil {
		// Surface the failure; this used to return a nil error, so
		// callers could not tell that nothing was written.
		return 0, err
	}

	return int64(len(oldValue)), nil
}
// BitOP combines the values of srcKeys with the bitwise operation op
// (matched case-insensitively against BitAND, BitOR, BitXOR and BitNot)
// and stores the result at destKey. Shorter operands are treated as if
// padded with zero bytes.
//
// BitNot takes exactly one source key and stores its bitwise inverse.
// With no source keys, or with a single source key and an op other than
// BitNot, nothing is written and (0, nil) is returned. An unknown op
// that reaches the combining stage is rejected with an error.
//
// On success the length of the stored result is returned.
func (db *DB) BitOP(op string, destKey []byte, srcKeys ...[]byte) (int64, error) {
	if err := checkKeySize(destKey); err != nil {
		return 0, err
	}

	op = strings.ToLower(op)

	switch {
	case len(srcKeys) == 0:
		return 0, nil
	case op == BitNot:
		// NOT needs exactly one source. Handle it before the generic
		// "fewer than two sources" rule, which would otherwise make NOT
		// unreachable.
		if len(srcKeys) > 1 {
			return 0, fmt.Errorf("BITOP NOT has only one srckey")
		}
	case len(srcKeys) < 2:
		return 0, nil
	}

	// Validate the operation once instead of discovering a bad op midway
	// through the byte loop (or not at all when operands are empty).
	switch op {
	case BitNot, BitAND, BitOR, BitXOR:
	default:
		return 0, fmt.Errorf("invalid op type: %s", op)
	}

	// Check every source key, including the first one.
	for _, k := range srcKeys {
		if err := checkKeySize(k); err != nil {
			return 0, err
		}
	}

	// Hold the batch lock across the read-modify-write, matching the
	// other read-then-write operations on KV values.
	t := db.kvBatch
	t.Lock()
	defer t.Unlock()

	value, err := db.bucket.Get(db.encodeKVKey(srcKeys[0]))
	if err != nil {
		return 0, err
	}

	if op == BitNot {
		for i := range value {
			value[i] = ^value[i]
		}
	} else {
		for _, k := range srcKeys[1:] {
			ovalue, err := db.bucket.Get(db.encodeKVKey(k))
			if err != nil {
				return 0, err
			}

			// Keep the longer operand as the accumulator; all three
			// operations are commutative.
			if len(value) < len(ovalue) {
				value, ovalue = ovalue, value
			}

			for i := range ovalue {
				switch op {
				case BitAND:
					value[i] &= ovalue[i]
				case BitOR:
					value[i] |= ovalue[i]
				case BitXOR:
					value[i] ^= ovalue[i]
				}
			}

			// Beyond the shorter operand the other side is implicitly
			// zero: AND clears those bytes, OR and XOR leave them as is.
			if op == BitAND {
				for i := len(ovalue); i < len(value); i++ {
					value[i] = 0
				}
			}
		}
	}

	t.Put(db.encodeKVKey(destKey), value)
	if err := t.Commit(); err != nil {
		return 0, err
	}

	return int64(len(value)), nil
}
// bitsInByte is a lookup table indexed by byte value: entry b holds the
// number of 1 bits in b (entry 0 is 0, entry 255 is 8). BitCount uses it
// for the bytes left over after its 4-byte chunks.
var bitsInByte = [256]int32{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3,
4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 1, 2, 2, 3, 2, 3,
3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4,
5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4,
3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4,
5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2,
2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3,
4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4,
5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6,
6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5,
6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}
// numberBitCount returns the number of 1 bits in i. It sums bits in
// parallel: first within 2-bit groups, then 4-bit groups, then bytes,
// and finally adds the four byte sums with one multiplication.
func numberBitCount(i uint32) uint32 {
	const (
		pairMask   = 0x55555555
		nibbleMask = 0x33333333
		byteMask   = 0x0F0F0F0F
		sumBytes   = 0x01010101
	)

	pairs := i - ((i >> 1) & pairMask)
	nibbles := (pairs & nibbleMask) + ((pairs >> 2) & nibbleMask)
	perByte := (nibbles + (nibbles >> 4)) & byteMask

	// The multiplication accumulates all four byte counts into the top byte.
	return (perByte * sumBytes) >> 24
}
// BitCount returns the number of 1 bits in the value stored at key,
// restricted to the bytes between start and end (both inclusive).
// Negative positions count back from the end of the value, as resolved
// by getRange. An empty selection, including a missing or empty value,
// yields 0.
func (db *DB) BitCount(key []byte, start int, end int) (int64, error) {
	if err := checkKeySize(key); err != nil {
		return 0, err
	}

	key = db.encodeKVKey(key)
	value, err := db.bucket.Get(key)
	if err != nil {
		return 0, err
	}

	start, end = getRange(start, end, len(value))
	if start > end {
		// getRange does not cap start, so without this guard the slice
		// expression below would panic (e.g. start past the value's end).
		return 0, nil
	}
	value = value[start : end+1]

	var n int64
	pos := 0

	// Count four bytes at a time, then fall back to the per-byte table
	// for the remaining 0-3 bytes.
	for ; pos+4 <= len(value); pos += 4 {
		n += int64(numberBitCount(binary.BigEndian.Uint32(value[pos : pos+4])))
	}
	for ; pos < len(value); pos++ {
		n += int64(bitsInByte[value[pos]])
	}

	return n, nil
}
// BitPos returns the position of the first bit equal to on (0 or 1) in
// the value stored at key, searching only the bytes between start and
// end (both inclusive; negative positions count back from the end, as
// resolved by getRange). Bit positions are counted from the most
// significant bit of the first byte of the whole value.
//
// It returns -1 when no such bit is found or the selection is empty, and
// an error when on is neither 0 nor 1.
func (db *DB) BitPos(key []byte, on int, start int, end int) (int64, error) {
	if err := checkKeySize(key); err != nil {
		return 0, err
	}

	if (on & ^1) != 0 {
		return 0, fmt.Errorf("bit must be 0 or 1, not %d", on)
	}

	// Bytes made up entirely of the opposite bit cannot contain a match
	// and are skipped without inspecting individual bits.
	var skipValue uint8
	if on == 0 {
		skipValue = 0xFF
	}

	key = db.encodeKVKey(key)
	value, err := db.bucket.Get(key)
	if err != nil {
		return 0, err
	}

	start, end = getRange(start, end, len(value))
	if start > end {
		// getRange does not cap start, so without this guard the slice
		// expression below would panic (e.g. start past the value's end).
		return -1, nil
	}
	value = value[start : end+1]

	wantSet := on == 1
	for i, v := range value {
		if v == skipValue {
			continue
		}
		for j := 0; j < 8; j++ {
			isSet := v&(1<<uint(7-j)) != 0
			if isSet == wantSet {
				return int64((start+i)*8 + j), nil
			}
		}
	}

	return -1, nil
}
// SetBit sets the bit at position offset of the value stored at key to
// on (0 or 1) and returns the bit's previous state. Bit 0 is the most
// significant bit of the first byte. The value is extended with zero
// bytes when offset lies beyond its current end.
//
// An error is returned when on is neither 0 nor 1 or when offset is
// negative; errValueSize is returned when the addressed byte would lie
// at or beyond MaxValueSize.
func (db *DB) SetBit(key []byte, offset int, on int) (int64, error) {
	if err := checkKeySize(key); err != nil {
		return 0, err
	}

	if (on & ^1) != 0 {
		return 0, fmt.Errorf("bit must be 0 or 1, not %d", on)
	}

	// Work on the full-width offset. Truncating it through uint32 made
	// negative or very large offsets silently alias to an unrelated bit.
	if offset < 0 {
		return 0, fmt.Errorf("bit offset must not be negative, got %d", offset)
	}
	byteOffset := offset >> 3
	if byteOffset >= MaxValueSize {
		return 0, errValueSize
	}

	t := db.kvBatch
	t.Lock()
	defer t.Unlock()

	key = db.encodeKVKey(key)
	value, err := db.bucket.Get(key)
	if err != nil {
		return 0, err
	}

	// Zero-pad so that the addressed byte exists.
	if extra := byteOffset + 1 - len(value); extra > 0 {
		value = append(value, make([]byte, extra)...)
	}

	mask := byte(1) << (7 - uint(offset&0x7))
	previous := value[byteOffset] & mask

	if on == 1 {
		value[byteOffset] |= mask
	} else {
		value[byteOffset] &^= mask
	}

	t.Put(key, value)
	if err := t.Commit(); err != nil {
		return 0, err
	}

	if previous != 0 {
		return 1, nil
	}
	return 0, nil
}
// GetBit returns the bit (0 or 1) at position offset of the value stored
// at key. Bit 0 is the most significant bit of the first byte. Offsets
// outside the stored value, including negative ones, read as 0.
func (db *DB) GetBit(key []byte, offset int) (int64, error) {
	if err := checkKeySize(key); err != nil {
		return 0, err
	}

	key = db.encodeKVKey(key)
	value, err := db.bucket.Get(key)
	if err != nil {
		return 0, err
	}

	// Work on the full-width offset. Truncating it through uint32 could
	// make an out-of-range offset alias to a bit inside the value.
	if offset < 0 {
		return 0, nil
	}
	byteOffset := offset >> 3
	if byteOffset >= len(value) {
		return 0, nil
	}

	mask := byte(1) << (7 - uint(offset&0x7))
	if value[byteOffset]&mask != 0 {
		return 1, nil
	}
	return 0, nil
}