From 54a686277da2f122b40d861f8fbf394091bf089c Mon Sep 17 00:00:00 2001 From: holys Date: Wed, 13 Aug 2014 21:54:27 +0800 Subject: [PATCH] add set data type support(unfinished) --- ledis/ledis.go | 2 + ledis/t_set.go | 321 +++++++++++++++++++++++++++++++++++++++++++- ledis/t_set_test.go | 131 ++++++++++++++++++ 3 files changed, 453 insertions(+), 1 deletion(-) create mode 100644 ledis/t_set_test.go diff --git a/ledis/ledis.go b/ledis/ledis.go index 4c20736..65ab1ec 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -21,6 +21,7 @@ type DB struct { hashTx *tx zsetTx *tx binTx *tx + setTx *tx } type Ledis struct { @@ -88,6 +89,7 @@ func newDB(l *Ledis, index uint8) *DB { d.hashTx = newTx(l) d.zsetTx = newTx(l) d.binTx = newTx(l) + d.setTx = newTx(l) return d } diff --git a/ledis/t_set.go b/ledis/t_set.go index 363af80..7bfc170 100644 --- a/ledis/t_set.go +++ b/ledis/t_set.go @@ -3,6 +3,8 @@ package ledis import ( "encoding/binary" "errors" + "github.com/siddontang/ledisdb/store" + "time" ) var errSetKey = errors.New("invalid set key") @@ -13,7 +15,7 @@ const ( setStopSep byte = setStartSep + 1 ) -func checkSetKeyMember(key []byte, member []byte) error { +func checkSetKMSize(key []byte, member []byte) error { if len(key) > MaxKeySize || len(key) == 0 { return errKeySize } else if len(member) > MaxSetMemberSize || len(member) == 0 { @@ -98,3 +100,320 @@ func (db *DB) sEncodeStopKey(key []byte) []byte { return k } + +// func (db *DB) sFlush() { + +// } + +func (db *DB) sDelete(t *tx, key []byte) int64 { + sk := db.sEncodeSizeKey(key) + start := db.sEncodeStartKey(key) + stop := db.sEncodeStopKey(key) + + var num int64 = 0 + it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + t.Delete(it.Key()) + num++ + } + + it.Close() + + t.Delete(sk) + return num +} + +func (db *DB) sIncrSize(key []byte, delta int64) (int64, error) { + t := db.setTx + sk := db.sEncodeSizeKey(key) + + var err error + var size int64 = 0 + if size, err = Int64(db.db.Get(sk)); err != nil { + return 0, err + } else { + size += delta + if size <= 0 { + size = 0 + t.Delete(sk) + db.rmExpire(t, SetType, key) + } else { + t.Put(sk, PutInt64(size)) + //FIXME + if err := t.Commit(); err != nil { + return 0, err + } + } + } + + return size, nil +} + +func (db *DB) sExpireAt(key []byte, when int64) (int64, error) { + t := db.setTx + t.Lock() + defer t.Unlock() + + if scnt, err := db.SCard(key); err != nil || scnt == 0 { + return 0, err + } else { + db.expireAt(t, SetType, key, when) + if err := t.Commit(); err != nil { + return 0, err + } + + } + + return 1, nil +} + +func (db *DB) sSetItem(key []byte, member []byte) (int64, error) { + t := db.setTx + ek := db.sEncodeSetKey(key, member) + + var n int64 = 1 + if v, _ := db.db.Get(ek); v != nil { + n = 0 + } else { + if _, err := db.sIncrSize(key, 1); err != nil { + return 0, err + } + } + + t.Put(ek, nil) + return n, nil +} + +func (db *DB) SAdd(key []byte, args ...[]byte) (int64, error) { + t := db.setTx + t.Lock() + defer t.Unlock() + + var num int64 = 0 + for i := 0; i < len(args); i++ { + member := args[i] + if err := checkSetKMSize(key, member); err != nil { + return 0, err + } + + if n, err := db.sSetItem(key, member); err != nil { + return 0, err + } else if n == 1 { + num++ + } + } + + err := t.Commit() + return num, err +} + +func (db *DB) SCard(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + sk := db.sEncodeSizeKey(key) + + return Int64(db.db.Get(sk)) +} + +// TODO +// func (db *DB) sDiffGeneric() + +// func (db *DB) SDiff() { + +// } + +// func (db *DB) SDiffStore() { + +// } + +// func (db *DB) SInter() { + +// } + +// func (db *DB) SInterStore() { + +// } + +func (db *DB) SIsMember(key []byte, member []byte) (int64, error) { + ek := db.sEncodeSetKey(key, member) + + var n int64 = 1 + if v, err := db.db.Get(ek); err != nil { + return 0, err + } else if v == nil { + n = 0 + } + return n, nil +} + +func (db *DB) SMembers(key []byte) ([][]byte, error) { + if err := checkKeySize(key); err != nil { + return nil, err + } + + start := db.sEncodeStartKey(key) + stop := db.sEncodeStopKey(key) + + v := make([][]byte, 0, 16) + + it := db.db.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) + for ; it.Valid(); it.Next() { + _, m, err := db.sDecodeSetKey(it.Key()) + if err != nil { + return nil, err + } + + v = append(v, m) + } + + it.Close() + + return v, nil +} + +func (db *DB) SRem(key []byte, args ...[]byte) (int64, error) { + t := db.setTx + t.Lock() + defer t.Unlock() + + var ek []byte + var v []byte + var err error + + it := db.db.NewIterator() + defer it.Close() + + var num int64 = 0 + for i := 0; i < len(args); i++ { + if err := checkSetKMSize(key, args[i]); err != nil { + return 0, err + } + + ek = db.sEncodeSetKey(key, args[i]) + + v = it.RawFind(ek) + if v == nil { + continue + } else { + num++ + t.Delete(ek) + } + } + + if _, err = db.sIncrSize(key, -num); err != nil { + return 0, err + } + + err = t.Commit() + return num, err + +} + +// TODO +// func (db *DB) sUnionGeneric(keys ...[]byte) ([][]byte, error) { + +// for _, key := range keys { +// if err := checkKeySize(key); err != nil { +// return nil, err +// } + +// } +// } + +// func (db *DB) SUnion(keys ...[]byte) ([][]byte, error) { + +// if v, err := db.sUnionGeneric(keys); err != nil { +// return nil, err +// } else if v == nil { +// return nil, nil +// } else { +// return v, nil +// } + +// } + +// func (db *DB) SUnionStore(dstkey []byte, keys []byte) (int64, error) { +// if err := checkKeySize(dstkey); err != nil { +// return 0, err +// } + +// } + +func (db *DB) SClear(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.setTx + t.Lock() + defer t.Unlock() + + num := db.sDelete(t, key) + db.rmExpire(t, SetType, key) + + err := t.Commit() + return num, err +} + +func (db *DB) SMclear(keys ...[]byte) (int64, error) { + t := db.setTx + t.Lock() + defer t.Unlock() + + for _, key := range keys { + if err := checkKeySize(key); err != nil { + return 0, err + } + + db.sDelete(t, key) + db.rmExpire(t, SetType, key) + } + + err := t.Commit() + return int64(len(keys)), err +} + +func (db *DB) SExpire(key []byte, duration int64) (int64, error) { + if duration <= 0 { + return 0, errExpireValue + } + + return db.sExpireAt(key, time.Now().Unix()+duration) + +} + +func (db *DB) SExpireAt(key []byte, when int64) (int64, error) { + if when <= time.Now().Unix() { + return 0, errExpireValue + } + + return db.sExpireAt(key, when) + +} + +func (db *DB) STTL(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return -1, err + } + + return db.ttl(SetType, key) +} + +func (db *DB) SPersist(key []byte) (int64, error) { + if err := checkKeySize(key); err != nil { + return 0, err + } + + t := db.setTx + t.Lock() + defer t.Unlock() + + n, err := db.rmExpire(t, SetType, key) + if err != nil { + return 0, err + } + err = t.Commit() + return n, err +} diff --git a/ledis/t_set_test.go b/ledis/t_set_test.go new file mode 100644 index 0000000..6518556 --- /dev/null +++ b/ledis/t_set_test.go @@ -0,0 +1,131 @@ +package ledis + +import ( + "testing" + "time" +) + +func TestSetCodec(t *testing.T) { + db := getTestDB() + + key := []byte("key") + member := []byte("member") + + ek := db.sEncodeSizeKey(key) + if k, err := db.sDecodeSizeKey(ek); err != nil { + t.Fatal(err) + } else if string(k) != "key" { + t.Fatal(string(k)) + } + + ek = db.sEncodeSetKey(key, member) + if k, m, err := db.sDecodeSetKey(ek); err != nil { + t.Fatal(err) + } else if string(k) != "key" { + t.Fatal(string(k)) + } else if string(m) != "member" { + t.Fatal(string(m)) + } +} + +func TestDBSet(t *testing.T) { + db := getTestDB() + + key := []byte("testdb_set_a") + member := []byte("member") + key1 := []byte("testdb_set_a1") + key2 := []byte("testdb_set_a2") + member1 := []byte("testdb_set_m1") + member2 := []byte("testdb_set_m2") + + // if n, err := db.sSetItem(key, []byte("m1")); err != nil { + // t.Fatal(err) + // } else if n != 1 { + // t.Fatal(n) + // } + + // if size, err := db.sIncrSize(key, 1); err != nil { + // t.Fatal(err) + // } else if size != 1 { + // t.Fatal(size) + // } + + if n, err := db.SAdd(key, member); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + + if cnt, err := db.SCard(key); err != nil { + t.Fatal(err) + } else if cnt != 1 { + t.Fatal(cnt) + } + + if n, err := db.SIsMember(key, member); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + + if v, err := db.SMembers(key); err != nil { + t.Fatal(err) + } else if string(v[0]) != "member" { + t.Fatal(string(v[0])) + } + + if n, err := db.SRem(key, member); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + + db.SAdd(key1, member1, member2) + + // tx := db.setTx + + // if n := db.sDelete(tx, key1); n != 2 { + // t.Fatal(n) + // } + + if n, err := db.SClear(key1); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + db.SAdd(key2, member1, member2) + db.SAdd(key1, member1, member2) + + if n, err := db.SMclear(key1, key2); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + db.SAdd(key2, member1, member2) + if n, err := db.SExpire(key2, 3600); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + + if n, err := db.SExpireAt(key2, time.Now().Unix()+3600); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + + if n, err := db.STTL(key2); err != nil { + t.Fatal(err) + } else if n < 0 { + t.Fatal(n) + } + + if n, err := db.SPersist(key2); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + +}