mirror of https://github.com/ledisdb/ledisdb.git
add set data type support(unfinished)
This commit is contained in:
parent
3ff3dc3323
commit
54a686277d
|
@ -21,6 +21,7 @@ type DB struct {
|
|||
hashTx *tx
|
||||
zsetTx *tx
|
||||
binTx *tx
|
||||
setTx *tx
|
||||
}
|
||||
|
||||
type Ledis struct {
|
||||
|
@ -88,6 +89,7 @@ func newDB(l *Ledis, index uint8) *DB {
|
|||
d.hashTx = newTx(l)
|
||||
d.zsetTx = newTx(l)
|
||||
d.binTx = newTx(l)
|
||||
d.setTx = newTx(l)
|
||||
|
||||
return d
|
||||
}
|
||||
|
|
321
ledis/t_set.go
321
ledis/t_set.go
|
@ -3,6 +3,8 @@ package ledis
|
|||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"github.com/siddontang/ledisdb/store"
|
||||
"time"
|
||||
)
|
||||
|
||||
var errSetKey = errors.New("invalid set key")
|
||||
|
@ -13,7 +15,7 @@ const (
|
|||
setStopSep byte = setStartSep + 1
|
||||
)
|
||||
|
||||
func checkSetKeyMember(key []byte, member []byte) error {
|
||||
func checkSetKMSize(key []byte, member []byte) error {
|
||||
if len(key) > MaxKeySize || len(key) == 0 {
|
||||
return errKeySize
|
||||
} else if len(member) > MaxSetMemberSize || len(member) == 0 {
|
||||
|
@ -98,3 +100,320 @@ func (db *DB) sEncodeStopKey(key []byte) []byte {
|
|||
|
||||
return k
|
||||
}
|
||||
|
||||
// func (db *DB) sFlush() {
|
||||
|
||||
// }
|
||||
|
||||
func (db *DB) sDelete(t *tx, key []byte) int64 {
|
||||
sk := db.sEncodeSizeKey(key)
|
||||
start := db.sEncodeStartKey(key)
|
||||
stop := db.sEncodeStopKey(key)
|
||||
|
||||
var num int64 = 0
|
||||
it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
|
||||
for ; it.Valid(); it.Next() {
|
||||
t.Delete(it.Key())
|
||||
num++
|
||||
}
|
||||
|
||||
it.Close()
|
||||
|
||||
t.Delete(sk)
|
||||
return num
|
||||
}
|
||||
|
||||
func (db *DB) sIncrSize(key []byte, delta int64) (int64, error) {
|
||||
t := db.setTx
|
||||
sk := db.sEncodeSizeKey(key)
|
||||
|
||||
var err error
|
||||
var size int64 = 0
|
||||
if size, err = Int64(db.db.Get(sk)); err != nil {
|
||||
return 0, err
|
||||
} else {
|
||||
size += delta
|
||||
if size <= 0 {
|
||||
size = 0
|
||||
t.Delete(sk)
|
||||
db.rmExpire(t, SetType, key)
|
||||
} else {
|
||||
t.Put(sk, PutInt64(size))
|
||||
//FIXME
|
||||
if err := t.Commit(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return size, nil
|
||||
}
|
||||
|
||||
func (db *DB) sExpireAt(key []byte, when int64) (int64, error) {
|
||||
t := db.setTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
if scnt, err := db.SCard(key); err != nil || scnt == 0 {
|
||||
return 0, err
|
||||
} else {
|
||||
db.expireAt(t, SetType, key, when)
|
||||
if err := t.Commit(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
func (db *DB) sSetItem(key []byte, member []byte) (int64, error) {
|
||||
t := db.setTx
|
||||
ek := db.sEncodeSetKey(key, member)
|
||||
|
||||
var n int64 = 1
|
||||
if v, _ := db.db.Get(ek); v != nil {
|
||||
n = 0
|
||||
} else {
|
||||
if _, err := db.sIncrSize(key, 1); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
t.Put(ek, nil)
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (db *DB) SAdd(key []byte, args ...[]byte) (int64, error) {
|
||||
t := db.setTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
var num int64 = 0
|
||||
for i := 0; i < len(args); i++ {
|
||||
member := args[i]
|
||||
if err := checkSetKMSize(key, member); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if n, err := db.sSetItem(key, member); err != nil {
|
||||
return 0, err
|
||||
} else if n == 1 {
|
||||
num++
|
||||
}
|
||||
}
|
||||
|
||||
err := t.Commit()
|
||||
return num, err
|
||||
}
|
||||
|
||||
func (db *DB) SCard(key []byte) (int64, error) {
|
||||
if err := checkKeySize(key); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
sk := db.sEncodeSizeKey(key)
|
||||
|
||||
return Int64(db.db.Get(sk))
|
||||
}
|
||||
|
||||
// TODO
|
||||
// func (db *DB) sDiffGeneric()
|
||||
|
||||
// func (db *DB) SDiff() {
|
||||
|
||||
// }
|
||||
|
||||
// func (db *DB) SDiffStore() {
|
||||
|
||||
// }
|
||||
|
||||
// func (db *DB) SInter() {
|
||||
|
||||
// }
|
||||
|
||||
// func (db *DB) SInterStore() {
|
||||
|
||||
// }
|
||||
|
||||
func (db *DB) SIsMember(key []byte, member []byte) (int64, error) {
|
||||
ek := db.sEncodeSetKey(key, member)
|
||||
|
||||
var n int64 = 1
|
||||
if v, err := db.db.Get(ek); err != nil {
|
||||
return 0, err
|
||||
} else if v == nil {
|
||||
n = 0
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (db *DB) SMembers(key []byte) ([][]byte, error) {
|
||||
if err := checkKeySize(key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
start := db.sEncodeStartKey(key)
|
||||
stop := db.sEncodeStopKey(key)
|
||||
|
||||
v := make([][]byte, 0, 16)
|
||||
|
||||
it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
|
||||
for ; it.Valid(); it.Next() {
|
||||
_, m, err := db.sDecodeSetKey(it.Key())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
v = append(v, m)
|
||||
}
|
||||
|
||||
it.Close()
|
||||
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func (db *DB) SRem(key []byte, args ...[]byte) (int64, error) {
|
||||
t := db.setTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
var ek []byte
|
||||
var v []byte
|
||||
var err error
|
||||
|
||||
it := db.db.NewIterator()
|
||||
defer it.Close()
|
||||
|
||||
var num int64 = 0
|
||||
for i := 0; i < len(args); i++ {
|
||||
if err := checkSetKMSize(key, args[i]); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
ek = db.sEncodeSetKey(key, args[i])
|
||||
|
||||
v = it.RawFind(ek)
|
||||
if v == nil {
|
||||
continue
|
||||
} else {
|
||||
num++
|
||||
t.Delete(ek)
|
||||
}
|
||||
}
|
||||
|
||||
if _, err = db.sIncrSize(key, -num); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
err = t.Commit()
|
||||
return num, err
|
||||
|
||||
}
|
||||
|
||||
// TODO
|
||||
// func (db *DB) sUnionGeneric(keys ...[]byte) ([][]byte, error) {
|
||||
|
||||
// for _, key := range keys {
|
||||
// if err := checkKeySize(key); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
// }
|
||||
// }
|
||||
|
||||
// func (db *DB) SUnion(keys ...[]byte) ([][]byte, error) {
|
||||
|
||||
// if v, err := db.sUnionGeneric(keys); err != nil {
|
||||
// return nil, err
|
||||
// } else if v == nil {
|
||||
// return nil, nil
|
||||
// } else {
|
||||
// return v, nil
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
// func (db *DB) SUnionStore(dstkey []byte, keys []byte) (int64, error) {
|
||||
// if err := checkKeySize(dstkey); err != nil {
|
||||
// return 0, err
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
func (db *DB) SClear(key []byte) (int64, error) {
|
||||
if err := checkKeySize(key); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
t := db.setTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
num := db.sDelete(t, key)
|
||||
db.rmExpire(t, SetType, key)
|
||||
|
||||
err := t.Commit()
|
||||
return num, err
|
||||
}
|
||||
|
||||
func (db *DB) SMclear(keys ...[]byte) (int64, error) {
|
||||
t := db.setTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
for _, key := range keys {
|
||||
if err := checkKeySize(key); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
db.sDelete(t, key)
|
||||
db.rmExpire(t, SetType, key)
|
||||
}
|
||||
|
||||
err := t.Commit()
|
||||
return int64(len(keys)), err
|
||||
}
|
||||
|
||||
func (db *DB) SExpire(key []byte, duration int64) (int64, error) {
|
||||
if duration <= 0 {
|
||||
return 0, errExpireValue
|
||||
}
|
||||
|
||||
return db.sExpireAt(key, time.Now().Unix()+duration)
|
||||
|
||||
}
|
||||
|
||||
func (db *DB) SExpireAt(key []byte, when int64) (int64, error) {
|
||||
if when <= time.Now().Unix() {
|
||||
return 0, errExpireValue
|
||||
}
|
||||
|
||||
return db.sExpireAt(key, when)
|
||||
|
||||
}
|
||||
|
||||
func (db *DB) STTL(key []byte) (int64, error) {
|
||||
if err := checkKeySize(key); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return db.ttl(SetType, key)
|
||||
}
|
||||
|
||||
func (db *DB) SPersist(key []byte) (int64, error) {
|
||||
if err := checkKeySize(key); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
t := db.setTx
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
n, err := db.rmExpire(t, SetType, key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
err = t.Commit()
|
||||
return n, err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,131 @@
|
|||
package ledis
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestSetCodec(t *testing.T) {
|
||||
db := getTestDB()
|
||||
|
||||
key := []byte("key")
|
||||
member := []byte("member")
|
||||
|
||||
ek := db.sEncodeSizeKey(key)
|
||||
if k, err := db.sDecodeSizeKey(ek); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if string(k) != "key" {
|
||||
t.Fatal(string(k))
|
||||
}
|
||||
|
||||
ek = db.sEncodeSetKey(key, member)
|
||||
if k, m, err := db.sDecodeSetKey(ek); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if string(k) != "key" {
|
||||
t.Fatal(string(k))
|
||||
} else if string(m) != "member" {
|
||||
t.Fatal(string(m))
|
||||
}
|
||||
}
|
||||
|
||||
func TestDBSet(t *testing.T) {
|
||||
db := getTestDB()
|
||||
|
||||
key := []byte("testdb_set_a")
|
||||
member := []byte("member")
|
||||
key1 := []byte("testdb_set_a1")
|
||||
key2 := []byte("testdb_set_a2")
|
||||
member1 := []byte("testdb_set_m1")
|
||||
member2 := []byte("testdb_set_m2")
|
||||
|
||||
// if n, err := db.sSetItem(key, []byte("m1")); err != nil {
|
||||
// t.Fatal(err)
|
||||
// } else if n != 1 {
|
||||
// t.Fatal(n)
|
||||
// }
|
||||
|
||||
// if size, err := db.sIncrSize(key, 1); err != nil {
|
||||
// t.Fatal(err)
|
||||
// } else if size != 1 {
|
||||
// t.Fatal(size)
|
||||
// }
|
||||
|
||||
if n, err := db.SAdd(key, member); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n != 1 {
|
||||
t.Fatal(n)
|
||||
}
|
||||
|
||||
if cnt, err := db.SCard(key); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if cnt != 1 {
|
||||
t.Fatal(cnt)
|
||||
}
|
||||
|
||||
if n, err := db.SIsMember(key, member); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n != 1 {
|
||||
t.Fatal(n)
|
||||
}
|
||||
|
||||
if v, err := db.SMembers(key); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if string(v[0]) != "member" {
|
||||
t.Fatal(string(v[0]))
|
||||
}
|
||||
|
||||
if n, err := db.SRem(key, member); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n != 1 {
|
||||
t.Fatal(n)
|
||||
}
|
||||
|
||||
db.SAdd(key1, member1, member2)
|
||||
|
||||
// tx := db.setTx
|
||||
|
||||
// if n := db.sDelete(tx, key1); n != 2 {
|
||||
// t.Fatal(n)
|
||||
// }
|
||||
|
||||
if n, err := db.SClear(key1); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n != 2 {
|
||||
t.Fatal(n)
|
||||
}
|
||||
|
||||
db.SAdd(key2, member1, member2)
|
||||
db.SAdd(key1, member1, member2)
|
||||
|
||||
if n, err := db.SMclear(key1, key2); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n != 2 {
|
||||
t.Fatal(n)
|
||||
}
|
||||
|
||||
db.SAdd(key2, member1, member2)
|
||||
if n, err := db.SExpire(key2, 3600); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n != 1 {
|
||||
t.Fatal(n)
|
||||
}
|
||||
|
||||
if n, err := db.SExpireAt(key2, time.Now().Unix()+3600); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n != 1 {
|
||||
t.Fatal(n)
|
||||
}
|
||||
|
||||
if n, err := db.STTL(key2); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n < 0 {
|
||||
t.Fatal(n)
|
||||
}
|
||||
|
||||
if n, err := db.SPersist(key2); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n != 1 {
|
||||
t.Fatal(n)
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue