scan add match support

match uses regexp
This commit is contained in:
siddontang 2014-08-25 22:25:10 +08:00
parent 76896882a9
commit ba8d9ea69d
15 changed files with 113 additions and 75 deletions

View File

@ -146,7 +146,7 @@ func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) {
} }
var keys [][]byte var keys [][]byte
keys, err = db.scan(metaDataType, nil, 1024, false) keys, err = db.scan(metaDataType, nil, 1024, false, "")
for len(keys) != 0 || err != nil { for len(keys) != 0 || err != nil {
for _, key := range keys { for _, key := range keys {
deleteFunc(t, key) deleteFunc(t, key)
@ -159,7 +159,7 @@ func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) {
} else { } else {
drop += int64(len(keys)) drop += int64(len(keys))
} }
keys, err = db.scan(metaDataType, nil, 1024, false) keys, err = db.scan(metaDataType, nil, 1024, false, "")
} }
return return
} }

View File

@ -1,16 +1,25 @@
package ledis package ledis
import ( import (
"bytes"
"errors" "errors"
"github.com/siddontang/ledisdb/store" "regexp"
) )
var errDataType = errors.New("error data type") var errDataType = errors.New("error data type")
var errMetaKey = errors.New("error meta key") var errMetaKey = errors.New("error meta key")
func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool) ([][]byte, error) { func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match string) ([][]byte, error) {
var minKey, maxKey []byte var minKey, maxKey []byte
var err error var err error
var r *regexp.Regexp
if len(match) > 0 {
if r, err = regexp.Compile(match); err != nil {
return nil, err
}
}
if key != nil { if key != nil {
if err = checkKeySize(key); err != nil { if err = checkKeySize(key); err != nil {
return nil, err return nil, err
@ -35,18 +44,23 @@ func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool) ([][]by
v := make([][]byte, 0, count) v := make([][]byte, 0, count)
rangeType := store.RangeROpen it := db.bucket.NewIterator()
it.Seek(minKey)
if !inclusive { if !inclusive {
rangeType = store.RangeOpen if it.Valid() && bytes.Equal(it.RawKey(), minKey) {
it.Next()
}
} }
it := db.bucket.RangeLimitIterator(minKey, maxKey, rangeType, 0, count) for i := 0; it.Valid() && i < count && bytes.Compare(it.RawKey(), maxKey) < 0; it.Next() {
for ; it.Valid(); it.Next() {
if k, err := db.decodeMetaKey(dataType, it.Key()); err != nil { if k, err := db.decodeMetaKey(dataType, it.Key()); err != nil {
continue continue
} else if r != nil && !r.Match(k) {
continue
} else { } else {
v = append(v, k) v = append(v, k)
i++
} }
} }
it.Close() it.Close()

View File

@ -9,7 +9,7 @@ func TestDBScan(t *testing.T) {
db.FlushAll() db.FlushAll()
if v, err := db.Scan(nil, 10, true); err != nil { if v, err := db.Scan(nil, 10, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 0 { } else if len(v) != 0 {
t.Fatal(len(v)) t.Fatal(len(v))
@ -19,23 +19,42 @@ func TestDBScan(t *testing.T) {
db.Set([]byte("b"), []byte{}) db.Set([]byte("b"), []byte{})
db.Set([]byte("c"), []byte{}) db.Set([]byte("c"), []byte{})
if v, err := db.Scan(nil, 1, true); err != nil { if v, err := db.Scan(nil, 1, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 1 { } else if len(v) != 1 {
t.Fatal(len(v)) t.Fatal(len(v))
} }
if v, err := db.Scan([]byte("a"), 2, false); err != nil { if v, err := db.Scan([]byte("a"), 2, false, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal(len(v)) t.Fatal(len(v))
} }
if v, err := db.Scan(nil, 3, true); err != nil { if v, err := db.Scan(nil, 3, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 3 { } else if len(v) != 3 {
t.Fatal(len(v)) t.Fatal(len(v))
} }
if v, err := db.Scan(nil, 3, true, "b"); err != nil {
t.Fatal(err)
} else if len(v) != 1 {
t.Fatal(len(v))
}
if v, err := db.Scan(nil, 3, true, "."); err != nil {
t.Fatal(err)
} else if len(v) != 3 {
t.Fatal(len(v))
}
if v, err := db.Scan(nil, 3, true, "a+"); err != nil {
t.Fatal(err)
} else if len(v) != 1 {
t.Fatal(len(v))
}
} }
func TestDBHScan(t *testing.T) { func TestDBHScan(t *testing.T) {
@ -52,7 +71,7 @@ func TestDBHScan(t *testing.T) {
k3 := []byte("k3") k3 := []byte("k3")
db.HSet(k3, []byte("3"), []byte{}) db.HSet(k3, []byte("3"), []byte{})
if v, err := db.HScan(nil, 1, true); err != nil { if v, err := db.HScan(nil, 1, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 1 { } else if len(v) != 1 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -60,7 +79,7 @@ func TestDBHScan(t *testing.T) {
t.Fatal("invalid value ", string(v[0])) t.Fatal("invalid value ", string(v[0]))
} }
if v, err := db.HScan(k1, 2, true); err != nil { if v, err := db.HScan(k1, 2, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -70,7 +89,7 @@ func TestDBHScan(t *testing.T) {
t.Fatal("invalid value ", string(v[1])) t.Fatal("invalid value ", string(v[1]))
} }
if v, err := db.HScan(k1, 2, false); err != nil { if v, err := db.HScan(k1, 2, false, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -96,7 +115,7 @@ func TestDBZScan(t *testing.T) {
k3 := []byte("k3") k3 := []byte("k3")
db.ZAdd(k3, ScorePair{3, []byte("m")}) db.ZAdd(k3, ScorePair{3, []byte("m")})
if v, err := db.ZScan(nil, 1, true); err != nil { if v, err := db.ZScan(nil, 1, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 1 { } else if len(v) != 1 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -104,7 +123,7 @@ func TestDBZScan(t *testing.T) {
t.Fatal("invalid value ", string(v[0])) t.Fatal("invalid value ", string(v[0]))
} }
if v, err := db.ZScan(k1, 2, true); err != nil { if v, err := db.ZScan(k1, 2, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -114,7 +133,7 @@ func TestDBZScan(t *testing.T) {
t.Fatal("invalid value ", string(v[1])) t.Fatal("invalid value ", string(v[1]))
} }
if v, err := db.ZScan(k1, 2, false); err != nil { if v, err := db.ZScan(k1, 2, false, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -146,7 +165,7 @@ func TestDBLScan(t *testing.T) {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }
if v, err := db.LScan(nil, 1, true); err != nil { if v, err := db.LScan(nil, 1, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 1 { } else if len(v) != 1 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -154,7 +173,7 @@ func TestDBLScan(t *testing.T) {
t.Fatal("invalid value ", string(v[0])) t.Fatal("invalid value ", string(v[0]))
} }
if v, err := db.LScan(k1, 2, true); err != nil { if v, err := db.LScan(k1, 2, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -164,7 +183,7 @@ func TestDBLScan(t *testing.T) {
t.Fatal("invalid value ", string(v[1])) t.Fatal("invalid value ", string(v[1]))
} }
if v, err := db.LScan(k1, 2, false); err != nil { if v, err := db.LScan(k1, 2, false, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -196,7 +215,7 @@ func TestDBBScan(t *testing.T) {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }
if v, err := db.BScan(nil, 1, true); err != nil { if v, err := db.BScan(nil, 1, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 1 { } else if len(v) != 1 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -204,7 +223,7 @@ func TestDBBScan(t *testing.T) {
t.Fatal("invalid value ", string(v[0])) t.Fatal("invalid value ", string(v[0]))
} }
if v, err := db.BScan(k1, 2, true); err != nil { if v, err := db.BScan(k1, 2, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -214,7 +233,7 @@ func TestDBBScan(t *testing.T) {
t.Fatal("invalid value ", string(v[1])) t.Fatal("invalid value ", string(v[1]))
} }
if v, err := db.BScan(k1, 2, false); err != nil { if v, err := db.BScan(k1, 2, false, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -246,7 +265,7 @@ func TestDBSScan(t *testing.T) {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }
if v, err := db.SScan(nil, 1, true); err != nil { if v, err := db.SScan(nil, 1, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 1 { } else if len(v) != 1 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -254,7 +273,7 @@ func TestDBSScan(t *testing.T) {
t.Fatal("invalid value ", string(v[0])) t.Fatal("invalid value ", string(v[0]))
} }
if v, err := db.SScan(k1, 2, true); err != nil { if v, err := db.SScan(k1, 2, true, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))
@ -264,7 +283,7 @@ func TestDBSScan(t *testing.T) {
t.Fatal("invalid value ", string(v[1])) t.Fatal("invalid value ", string(v[1]))
} }
if v, err := db.SScan(k1, 2, false); err != nil { if v, err := db.SScan(k1, 2, false, ""); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(v) != 2 { } else if len(v) != 2 {
t.Fatal("invalid length ", len(v)) t.Fatal("invalid length ", len(v))

View File

@ -908,8 +908,8 @@ func (db *DB) BPersist(key []byte) (int64, error) {
return n, err return n, err
} }
func (db *DB) BScan(key []byte, count int, inclusive bool) ([][]byte, error) { func (db *DB) BScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
return db.scan(BitMetaType, key, count, inclusive) return db.scan(BitMetaType, key, count, inclusive, match)
} }
func (db *DB) bFlush() (drop int64, err error) { func (db *DB) bFlush() (drop int64, err error) {

View File

@ -551,7 +551,7 @@ func testBFlush(t *testing.T) {
} }
} }
if v, err := db.BScan(nil, 3000, true); err != nil { if v, err := db.BScan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} else if len(v) != 2000 { } else if len(v) != 2000 {
t.Fatal("invalid value ", len(v)) t.Fatal("invalid value ", len(v))
@ -573,7 +573,7 @@ func testBFlush(t *testing.T) {
t.Fatal("invalid value ", n) t.Fatal("invalid value ", n)
} }
if v, err := db.BScan(nil, 3000, true); err != nil { if v, err := db.BScan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} else if len(v) != 0 { } else if len(v) != 0 {
t.Fatal("invalid value length ", len(v)) t.Fatal("invalid value length ", len(v))

View File

@ -461,8 +461,8 @@ func (db *DB) hFlush() (drop int64, err error) {
return db.flushType(t, HashType) return db.flushType(t, HashType)
} }
func (db *DB) HScan(key []byte, count int, inclusive bool) ([][]byte, error) { func (db *DB) HScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
return db.scan(HSizeType, key, count, inclusive) return db.scan(HSizeType, key, count, inclusive, match)
} }
func (db *DB) HExpire(key []byte, duration int64) (int64, error) { func (db *DB) HExpire(key []byte, duration int64) (int64, error) {

View File

@ -91,7 +91,7 @@ func TestHFlush(t *testing.T) {
} }
} }
if v, err := db.HScan(nil, 3000, true); err != nil { if v, err := db.HScan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} else if len(v) != 2000 { } else if len(v) != 2000 {
t.Fatal("invalid value ", len(v)) t.Fatal("invalid value ", len(v))
@ -112,7 +112,7 @@ func TestHFlush(t *testing.T) {
t.Fatal("invalid value ", n) t.Fatal("invalid value ", n)
} }
if v, err := db.HScan(nil, 3000, true); err != nil { if v, err := db.HScan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} else if len(v) != 0 { } else if len(v) != 0 {
t.Fatal("invalid value length ", len(v)) t.Fatal("invalid value length ", len(v))

View File

@ -313,8 +313,8 @@ func (db *DB) flush() (drop int64, err error) {
} }
//if inclusive is true, scan range [key, inf) else (key, inf) //if inclusive is true, scan range [key, inf) else (key, inf)
func (db *DB) Scan(key []byte, count int, inclusive bool) ([][]byte, error) { func (db *DB) Scan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
return db.scan(KVType, key, count, inclusive) return db.scan(KVType, key, count, inclusive, match)
} }
func (db *DB) Expire(key []byte, duration int64) (int64, error) { func (db *DB) Expire(key []byte, duration int64) (int64, error) {

View File

@ -77,7 +77,7 @@ func TestKVFlush(t *testing.T) {
} }
} }
if v, err := db.Scan(nil, 3000, true); err != nil { if v, err := db.Scan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} else if len(v) != 2000 { } else if len(v) != 2000 {
t.Fatal("invalid value ", len(v)) t.Fatal("invalid value ", len(v))
@ -98,7 +98,7 @@ func TestKVFlush(t *testing.T) {
t.Fatal("invalid value ", n) t.Fatal("invalid value ", n)
} }
if v, err := db.Scan(nil, 3000, true); err != nil { if v, err := db.Scan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} else if len(v) != 0 { } else if len(v) != 0 {
t.Fatal("invalid value length ", len(v)) t.Fatal("invalid value length ", len(v))

View File

@ -472,8 +472,8 @@ func (db *DB) LPersist(key []byte) (int64, error) {
return n, err return n, err
} }
func (db *DB) LScan(key []byte, count int, inclusive bool) ([][]byte, error) { func (db *DB) LScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
return db.scan(LMetaType, key, count, inclusive) return db.scan(LMetaType, key, count, inclusive, match)
} }
func (db *DB) lEncodeMinKey() []byte { func (db *DB) lEncodeMinKey() []byte {

View File

@ -113,7 +113,7 @@ func TestLFlush(t *testing.T) {
} }
} }
if v, err := db.LScan(nil, 3000, true); err != nil { if v, err := db.LScan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} else if len(v) != 2000 { } else if len(v) != 2000 {
t.Fatal("invalid value ", len(v)) t.Fatal("invalid value ", len(v))
@ -125,32 +125,7 @@ func TestLFlush(t *testing.T) {
t.Fatal("invalid value ", n) t.Fatal("invalid value ", n)
} }
if v, err := db.LScan(nil, 3000, true); err != nil { if v, err := db.LScan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error())
} else if len(v) != 0 {
t.Fatal("invalid value length ", len(v))
}
for i := 0; i < 2000; i++ {
key := fmt.Sprintf("%d", i)
if _, err := db.ZAdd([]byte(key), ScorePair{1, []byte("v")}); err != nil {
t.Fatal(err.Error())
}
}
if v, err := db.ZScan(nil, 3000, true); err != nil {
t.Fatal(err.Error())
} else if len(v) != 2000 {
t.Fatal("invalid value ", len(v))
}
if n, err := db.zFlush(); err != nil {
t.Fatal(err.Error())
} else if n != 2000 {
t.Fatal("invalid value ", n)
}
if v, err := db.ZScan(nil, 3000, true); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} else if len(v) != 0 { } else if len(v) != 0 {
t.Fatal("invalid value length ", len(v)) t.Fatal("invalid value length ", len(v))

View File

@ -595,6 +595,6 @@ func (db *DB) SPersist(key []byte) (int64, error) {
return n, err return n, err
} }
func (db *DB) SScan(key []byte, count int, inclusive bool) ([][]byte, error) { func (db *DB) SScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
return db.scan(SSizeType, key, count, inclusive) return db.scan(SSizeType, key, count, inclusive, match)
} }

View File

@ -352,7 +352,7 @@ func TestSFlush(t *testing.T) {
} }
} }
if v, err := db.SScan(nil, 3000, true); err != nil { if v, err := db.SScan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} else if len(v) != 2000 { } else if len(v) != 2000 {
t.Fatal("invalid value ", len(v)) t.Fatal("invalid value ", len(v))
@ -364,7 +364,7 @@ func TestSFlush(t *testing.T) {
t.Fatal("invalid value ", n) t.Fatal("invalid value ", n)
} }
if v, err := db.SScan(nil, 3000, true); err != nil { if v, err := db.SScan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} else if len(v) != 0 { } else if len(v) != 0 {
t.Fatal("invalid value length ", len(v)) t.Fatal("invalid value length ", len(v))

View File

@ -937,6 +937,6 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg
return num, nil return num, nil
} }
func (db *DB) ZScan(key []byte, count int, inclusive bool) ([][]byte, error) { func (db *DB) ZScan(key []byte, count int, inclusive bool, match string) ([][]byte, error) {
return db.scan(ZSizeType, key, count, inclusive) return db.scan(ZSizeType, key, count, inclusive, match)
} }

View File

@ -377,3 +377,33 @@ func TestZInterStore(t *testing.T) {
t.Fatal("invalid value ", n) t.Fatal("invalid value ", n)
} }
} }
func TestZScan(t *testing.T) {
db := getTestDB()
db.FlushAll()
for i := 0; i < 2000; i++ {
key := fmt.Sprintf("%d", i)
if _, err := db.ZAdd([]byte(key), ScorePair{1, []byte("v")}); err != nil {
t.Fatal(err.Error())
}
}
if v, err := db.ZScan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error())
} else if len(v) != 2000 {
t.Fatal("invalid value ", len(v))
}
if n, err := db.zFlush(); err != nil {
t.Fatal(err.Error())
} else if n != 2000 {
t.Fatal("invalid value ", n)
}
if v, err := db.ZScan(nil, 3000, true, ""); err != nil {
t.Fatal(err.Error())
} else if len(v) != 0 {
t.Fatal("invalid value length ", len(v))
}
}