forked from mirror/redis
Merge pull request #310 from go-redis/feature/scan-iterators
Scan iterators (v4)
This commit is contained in:
commit
2add1e06fb
19
command.go
19
command.go
|
@ -694,41 +694,42 @@ type ScanCmd struct {
|
||||||
baseCmd
|
baseCmd
|
||||||
|
|
||||||
cursor int64
|
cursor int64
|
||||||
keys []string
|
page []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewScanCmd(args ...interface{}) *ScanCmd {
|
func NewScanCmd(args ...interface{}) *ScanCmd {
|
||||||
return &ScanCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
|
return &ScanCmd{
|
||||||
|
baseCmd: baseCmd{_args: args, _clusterKeyPos: 1},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ScanCmd) reset() {
|
func (cmd *ScanCmd) reset() {
|
||||||
cmd.cursor = 0
|
cmd.cursor = 0
|
||||||
cmd.keys = nil
|
cmd.page = nil
|
||||||
cmd.err = nil
|
cmd.err = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: cursor should be string to match redis type
|
|
||||||
// TODO: swap return values
|
// TODO: swap return values
|
||||||
|
|
||||||
func (cmd *ScanCmd) Val() (int64, []string) {
|
func (cmd *ScanCmd) Val() (int64, []string) {
|
||||||
return cmd.cursor, cmd.keys
|
return cmd.cursor, cmd.page
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ScanCmd) Result() (int64, []string, error) {
|
func (cmd *ScanCmd) Result() (int64, []string, error) {
|
||||||
return cmd.cursor, cmd.keys, cmd.err
|
return cmd.cursor, cmd.page, cmd.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ScanCmd) String() string {
|
func (cmd *ScanCmd) String() string {
|
||||||
return cmdString(cmd, cmd.keys)
|
return cmdString(cmd, cmd.page)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ScanCmd) readReply(cn *pool.Conn) error {
|
func (cmd *ScanCmd) readReply(cn *pool.Conn) error {
|
||||||
keys, cursor, err := readScanReply(cn)
|
page, cursor, err := readScanReply(cn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.err = err
|
cmd.err = err
|
||||||
return cmd.err
|
return cmd.err
|
||||||
}
|
}
|
||||||
cmd.keys = keys
|
cmd.page = page
|
||||||
cmd.cursor = cursor
|
cmd.cursor = cursor
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
28
commands.go
28
commands.go
|
@ -318,7 +318,7 @@ func (c *commandable) Type(key string) *StatusCmd {
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *commandable) Scan(cursor int64, match string, count int64) *ScanCmd {
|
func (c *commandable) Scan(cursor int64, match string, count int64) Scanner {
|
||||||
args := []interface{}{"SCAN", cursor}
|
args := []interface{}{"SCAN", cursor}
|
||||||
if match != "" {
|
if match != "" {
|
||||||
args = append(args, "MATCH", match)
|
args = append(args, "MATCH", match)
|
||||||
|
@ -328,10 +328,13 @@ func (c *commandable) Scan(cursor int64, match string, count int64) *ScanCmd {
|
||||||
}
|
}
|
||||||
cmd := NewScanCmd(args...)
|
cmd := NewScanCmd(args...)
|
||||||
c.Process(cmd)
|
c.Process(cmd)
|
||||||
return cmd
|
return Scanner{
|
||||||
|
client: c,
|
||||||
|
ScanCmd: cmd,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *commandable) SScan(key string, cursor int64, match string, count int64) *ScanCmd {
|
func (c *commandable) SScan(key string, cursor int64, match string, count int64) Scanner {
|
||||||
args := []interface{}{"SSCAN", key, cursor}
|
args := []interface{}{"SSCAN", key, cursor}
|
||||||
if match != "" {
|
if match != "" {
|
||||||
args = append(args, "MATCH", match)
|
args = append(args, "MATCH", match)
|
||||||
|
@ -341,10 +344,13 @@ func (c *commandable) SScan(key string, cursor int64, match string, count int64)
|
||||||
}
|
}
|
||||||
cmd := NewScanCmd(args...)
|
cmd := NewScanCmd(args...)
|
||||||
c.Process(cmd)
|
c.Process(cmd)
|
||||||
return cmd
|
return Scanner{
|
||||||
|
client: c,
|
||||||
|
ScanCmd: cmd,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *commandable) HScan(key string, cursor int64, match string, count int64) *ScanCmd {
|
func (c *commandable) HScan(key string, cursor int64, match string, count int64) Scanner {
|
||||||
args := []interface{}{"HSCAN", key, cursor}
|
args := []interface{}{"HSCAN", key, cursor}
|
||||||
if match != "" {
|
if match != "" {
|
||||||
args = append(args, "MATCH", match)
|
args = append(args, "MATCH", match)
|
||||||
|
@ -354,10 +360,13 @@ func (c *commandable) HScan(key string, cursor int64, match string, count int64)
|
||||||
}
|
}
|
||||||
cmd := NewScanCmd(args...)
|
cmd := NewScanCmd(args...)
|
||||||
c.Process(cmd)
|
c.Process(cmd)
|
||||||
return cmd
|
return Scanner{
|
||||||
|
client: c,
|
||||||
|
ScanCmd: cmd,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *commandable) ZScan(key string, cursor int64, match string, count int64) *ScanCmd {
|
func (c *commandable) ZScan(key string, cursor int64, match string, count int64) Scanner {
|
||||||
args := []interface{}{"ZSCAN", key, cursor}
|
args := []interface{}{"ZSCAN", key, cursor}
|
||||||
if match != "" {
|
if match != "" {
|
||||||
args = append(args, "MATCH", match)
|
args = append(args, "MATCH", match)
|
||||||
|
@ -367,7 +376,10 @@ func (c *commandable) ZScan(key string, cursor int64, match string, count int64)
|
||||||
}
|
}
|
||||||
cmd := NewScanCmd(args...)
|
cmd := NewScanCmd(args...)
|
||||||
c.Process(cmd)
|
c.Process(cmd)
|
||||||
return cmd
|
return Scanner{
|
||||||
|
client: c,
|
||||||
|
ScanCmd: cmd,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
|
@ -314,3 +314,23 @@ func Example_customCommand() {
|
||||||
fmt.Printf("%q %s", v, err)
|
fmt.Printf("%q %s", v, err)
|
||||||
// Output: "" redis: nil
|
// Output: "" redis: nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ExampleScanIterator() {
|
||||||
|
iter := client.Scan(0, "", 0).Iterator()
|
||||||
|
for iter.Next() {
|
||||||
|
fmt.Println(iter.Val())
|
||||||
|
}
|
||||||
|
if err := iter.Err(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleScanCmd_Iterator() {
|
||||||
|
iter := client.Scan(0, "", 0).Iterator()
|
||||||
|
for iter.Next() {
|
||||||
|
fmt.Println(iter.Val())
|
||||||
|
}
|
||||||
|
if err := iter.Err(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,75 @@
|
||||||
|
package redis
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
type Scanner struct {
|
||||||
|
client *commandable
|
||||||
|
*ScanCmd
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterator creates a new ScanIterator.
|
||||||
|
func (s Scanner) Iterator() *ScanIterator {
|
||||||
|
return &ScanIterator{
|
||||||
|
Scanner: s,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ScanIterator is used to incrementally iterate over a collection of elements.
|
||||||
|
// It's safe for concurrent use by multiple goroutines.
|
||||||
|
type ScanIterator struct {
|
||||||
|
mu sync.Mutex // protects Scanner and pos
|
||||||
|
Scanner
|
||||||
|
pos int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Err returns the last iterator error, if any.
|
||||||
|
func (it *ScanIterator) Err() error {
|
||||||
|
it.mu.Lock()
|
||||||
|
err := it.ScanCmd.Err()
|
||||||
|
it.mu.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next advances the cursor and returns true if more values can be read.
|
||||||
|
func (it *ScanIterator) Next() bool {
|
||||||
|
it.mu.Lock()
|
||||||
|
defer it.mu.Unlock()
|
||||||
|
|
||||||
|
// Instantly return on errors.
|
||||||
|
if it.ScanCmd.Err() != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Advance cursor, check if we are still within range.
|
||||||
|
if it.pos < len(it.ScanCmd.page) {
|
||||||
|
it.pos++
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return if there is more data to fetch.
|
||||||
|
if it.ScanCmd.cursor == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch next page.
|
||||||
|
it.ScanCmd._args[1] = it.ScanCmd.cursor
|
||||||
|
it.ScanCmd.reset()
|
||||||
|
it.client.Process(it.ScanCmd)
|
||||||
|
if it.ScanCmd.Err() != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
it.pos = 1
|
||||||
|
return len(it.ScanCmd.page) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Val returns the key/field at the current cursor position.
|
||||||
|
func (it *ScanIterator) Val() string {
|
||||||
|
var v string
|
||||||
|
it.mu.Lock()
|
||||||
|
if it.ScanCmd.Err() == nil && it.pos > 0 && it.pos <= len(it.ScanCmd.page) {
|
||||||
|
v = it.ScanCmd.page[it.pos-1]
|
||||||
|
}
|
||||||
|
it.mu.Unlock()
|
||||||
|
return v
|
||||||
|
}
|
|
@ -0,0 +1,89 @@
|
||||||
|
package redis_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
|
"gopkg.in/redis.v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("ScanIterator", func() {
|
||||||
|
var client *redis.Client
|
||||||
|
|
||||||
|
var seed = func(n int) error {
|
||||||
|
pipe := client.Pipeline()
|
||||||
|
for i := 1; i <= n; i++ {
|
||||||
|
pipe.Set(fmt.Sprintf("K%02d", i), "x", 0).Err()
|
||||||
|
}
|
||||||
|
_, err := pipe.Exec()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
client = redis.NewClient(redisOptions())
|
||||||
|
Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should scan across empty DBs", func() {
|
||||||
|
iter := client.Scan(0, "", 10).Iterator()
|
||||||
|
Expect(iter.Next()).To(BeFalse())
|
||||||
|
Expect(iter.Err()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should scan across one page", func() {
|
||||||
|
Expect(seed(7)).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
var vals []string
|
||||||
|
iter := client.Scan(0, "", 0).Iterator()
|
||||||
|
for iter.Next() {
|
||||||
|
vals = append(vals, iter.Val())
|
||||||
|
}
|
||||||
|
Expect(iter.Err()).NotTo(HaveOccurred())
|
||||||
|
Expect(vals).To(ConsistOf([]string{"K01", "K02", "K03", "K04", "K05", "K06", "K07"}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should scan across multiple pages", func() {
|
||||||
|
Expect(seed(71)).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
var vals []string
|
||||||
|
iter := client.Scan(0, "", 10).Iterator()
|
||||||
|
for iter.Next() {
|
||||||
|
vals = append(vals, iter.Val())
|
||||||
|
}
|
||||||
|
Expect(iter.Err()).NotTo(HaveOccurred())
|
||||||
|
Expect(vals).To(HaveLen(71))
|
||||||
|
Expect(vals).To(ContainElement("K01"))
|
||||||
|
Expect(vals).To(ContainElement("K71"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should scan to page borders", func() {
|
||||||
|
Expect(seed(20)).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
var vals []string
|
||||||
|
iter := client.Scan(0, "", 10).Iterator()
|
||||||
|
for iter.Next() {
|
||||||
|
vals = append(vals, iter.Val())
|
||||||
|
}
|
||||||
|
Expect(iter.Err()).NotTo(HaveOccurred())
|
||||||
|
Expect(vals).To(HaveLen(20))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should scan with match", func() {
|
||||||
|
Expect(seed(33)).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
var vals []string
|
||||||
|
iter := client.Scan(0, "K*2*", 10).Iterator()
|
||||||
|
for iter.Next() {
|
||||||
|
vals = append(vals, iter.Val())
|
||||||
|
}
|
||||||
|
Expect(iter.Err()).NotTo(HaveOccurred())
|
||||||
|
Expect(vals).To(HaveLen(13))
|
||||||
|
})
|
||||||
|
|
||||||
|
})
|
3
redis.go
3
redis.go
|
@ -146,12 +146,13 @@ type Client struct {
|
||||||
|
|
||||||
func newClient(opt *Options, pool pool.Pooler) *Client {
|
func newClient(opt *Options, pool pool.Pooler) *Client {
|
||||||
base := baseClient{opt: opt, connPool: pool}
|
base := baseClient{opt: opt, connPool: pool}
|
||||||
return &Client{
|
client := &Client{
|
||||||
baseClient: base,
|
baseClient: base,
|
||||||
commandable: commandable{
|
commandable: commandable{
|
||||||
process: base.process,
|
process: base.process,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient returns a client to the Redis Server specified by Options.
|
// NewClient returns a client to the Redis Server specified by Options.
|
||||||
|
|
Loading…
Reference in New Issue