Merge pull request #326 from go-redis/fix/stateful-commands

Move Select to stateful commands and make it available only via Pipel…
This commit is contained in:
Vladimir Mihailenco 2016-06-05 13:01:00 +03:00
commit 08d3790ec5
13 changed files with 481 additions and 485 deletions

View File

@ -21,7 +21,7 @@ type clusterNode struct {
// or more underlying connections. It's safe for concurrent use by // or more underlying connections. It's safe for concurrent use by
// multiple goroutines. // multiple goroutines.
type ClusterClient struct { type ClusterClient struct {
commandable cmdable
opt *ClusterOptions opt *ClusterOptions
@ -51,7 +51,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
cmdsInfoOnce: new(sync.Once), cmdsInfoOnce: new(sync.Once),
} }
client.commandable.process = client.process client.cmdable.process = client.Process
for _, addr := range opt.Addrs { for _, addr := range opt.Addrs {
_ = client.nodeByAddr(addr) _ = client.nodeByAddr(addr)
@ -242,7 +242,7 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode) {
return slot, c.slotMasterNode(slot) return slot, c.slotMasterNode(slot)
} }
func (c *ClusterClient) process(cmd Cmder) { func (c *ClusterClient) Process(cmd Cmder) {
var ask bool var ask bool
slot, node := c.cmdSlotAndNode(cmd) slot, node := c.cmdSlotAndNode(cmd)
@ -398,11 +398,12 @@ func (c *ClusterClient) reaper(frequency time.Duration) {
} }
func (c *ClusterClient) Pipeline() *Pipeline { func (c *ClusterClient) Pipeline() *Pipeline {
pipe := &Pipeline{ pipe := Pipeline{
exec: c.pipelineExec, exec: c.pipelineExec,
} }
pipe.commandable.process = pipe.process pipe.cmdable.process = pipe.Process
return pipe pipe.statefulCmdable.process = pipe.Process
return &pipe
} }
func (c *ClusterClient) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (c *ClusterClient) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {

View File

@ -300,17 +300,17 @@ var _ = Describe("Cluster", func() {
Expect(nodesList).Should(HaveLen(1)) Expect(nodesList).Should(HaveLen(1))
}) })
It("should CLUSTER READONLY", func() { // It("should CLUSTER READONLY", func() {
res, err := cluster.primary().ReadOnly().Result() // res, err := cluster.primary().ReadOnly().Result()
Expect(err).NotTo(HaveOccurred()) // Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK")) // Expect(res).To(Equal("OK"))
}) // })
It("should CLUSTER READWRITE", func() { // It("should CLUSTER READWRITE", func() {
res, err := cluster.primary().ReadWrite().Result() // res, err := cluster.primary().ReadWrite().Result()
Expect(err).NotTo(HaveOccurred()) // Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK")) // Expect(res).To(Equal("OK"))
}) // })
}) })
Describe("Client", func() { Describe("Client", func() {

File diff suppressed because it is too large Load Diff

View File

@ -26,11 +26,11 @@ var _ = Describe("Commands", func() {
Describe("server", func() { Describe("server", func() {
It("should Auth", func() { // It("should Auth", func() {
auth := client.Auth("password") // auth := client.Auth("password")
Expect(auth.Err()).To(MatchError("ERR Client sent AUTH, but no password is set")) // Expect(auth.Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
Expect(auth.Val()).To(Equal("")) // Expect(auth.Val()).To(Equal(""))
}) // })
It("should Echo", func() { It("should Echo", func() {
echo := client.Echo("hello") echo := client.Echo("hello")
@ -44,11 +44,11 @@ var _ = Describe("Commands", func() {
Expect(ping.Val()).To(Equal("PONG")) Expect(ping.Val()).To(Equal("PONG"))
}) })
It("should Select", func() { // It("should Select", func() {
sel := client.Select(1) // sel := client.Select(1)
Expect(sel.Err()).NotTo(HaveOccurred()) // Expect(sel.Err()).NotTo(HaveOccurred())
Expect(sel.Val()).To(Equal("OK")) // Expect(sel.Val()).To(Equal("OK"))
}) // })
It("should BgRewriteAOF", func() { It("should BgRewriteAOF", func() {
Skip("flaky test") Skip("flaky test")
@ -309,15 +309,14 @@ var _ = Describe("Commands", func() {
Expect(get.Err()).To(Equal(redis.Nil)) Expect(get.Err()).To(Equal(redis.Nil))
Expect(get.Val()).To(Equal("")) Expect(get.Val()).To(Equal(""))
sel := client.Select(2) pipe := client.Pipeline()
Expect(sel.Err()).NotTo(HaveOccurred()) pipe.Select(2)
Expect(sel.Val()).To(Equal("OK")) get = pipe.Get("key")
pipe.FlushDb()
get = client.Get("key") _, err := pipe.Exec()
Expect(get.Err()).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(get.Val()).To(Equal("hello")) Expect(get.Val()).To(Equal("hello"))
Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
Expect(client.Select(1).Err()).NotTo(HaveOccurred())
}) })
It("should Object", func() { It("should Object", func() {

View File

@ -3,7 +3,7 @@ package redis
import "sync" import "sync"
type Scanner struct { type Scanner struct {
client *commandable client cmdable
*ScanCmd *ScanCmd
} }
@ -54,7 +54,7 @@ func (it *ScanIterator) Next() bool {
// Fetch next page. // Fetch next page.
it.ScanCmd._args[1] = it.ScanCmd.cursor it.ScanCmd._args[1] = it.ScanCmd.cursor
it.ScanCmd.reset() it.ScanCmd.reset()
it.client.Process(it.ScanCmd) it.client.process(it.ScanCmd)
if it.ScanCmd.Err() != nil { if it.ScanCmd.Err() != nil {
return false return false
} }

View File

@ -22,7 +22,7 @@ type Options struct {
// requirepass server configuration option. // requirepass server configuration option.
Password string Password string
// A database to be selected after connecting to server. // A database to be selected after connecting to server.
DB int64 DB int
// The maximum number of retries before giving up. // The maximum number of retries before giving up.
// Default is to not retry failed commands. // Default is to not retry failed commands.

View File

@ -11,7 +11,8 @@ import (
// http://redis.io/topics/pipelining. It's safe for concurrent use // http://redis.io/topics/pipelining. It's safe for concurrent use
// by multiple goroutines. // by multiple goroutines.
type Pipeline struct { type Pipeline struct {
commandable cmdable
statefulCmdable
exec func([]Cmder) error exec func([]Cmder) error
@ -21,7 +22,7 @@ type Pipeline struct {
closed int32 closed int32
} }
func (pipe *Pipeline) process(cmd Cmder) { func (pipe *Pipeline) Process(cmd Cmder) {
pipe.mu.Lock() pipe.mu.Lock()
pipe.cmds = append(pipe.cmds, cmd) pipe.cmds = append(pipe.cmds, cmd)
pipe.mu.Unlock() pipe.mu.Unlock()

View File

@ -20,7 +20,7 @@ func (c *Client) Publish(channel, message string) *IntCmd {
// http://redis.io/topics/pubsub. It's NOT safe for concurrent use by // http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
// multiple goroutines. // multiple goroutines.
type PubSub struct { type PubSub struct {
base *baseClient base baseClient
channels []string channels []string
patterns []string patterns []string
@ -31,7 +31,7 @@ type PubSub struct {
// Deprecated. Use Subscribe/PSubscribe instead. // Deprecated. Use Subscribe/PSubscribe instead.
func (c *Client) PubSub() *PubSub { func (c *Client) PubSub() *PubSub {
return &PubSub{ return &PubSub{
base: &baseClient{ base: baseClient{
opt: c.opt, opt: c.opt,
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false), connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
}, },

View File

@ -172,7 +172,7 @@ var _ = Describe("races", func() {
perform(C, func(id int) { perform(C, func(id int) {
opt := redisOptions() opt := redisOptions()
opt.DB = int64(id) opt.DB = id
client := redis.NewClient(opt) client := redis.NewClient(opt)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
err := client.Set("db", id, 0).Err() err := client.Set("db", id, 0).Err()
@ -194,7 +194,7 @@ var _ = Describe("races", func() {
It("should select DB with read timeout", func() { It("should select DB with read timeout", func() {
perform(C, func(id int) { perform(C, func(id int) {
opt := redisOptions() opt := redisOptions()
opt.DB = int64(id) opt.DB = id
opt.ReadTimeout = time.Nanosecond opt.ReadTimeout = time.Nanosecond
client := redis.NewClient(opt) client := redis.NewClient(opt)

View File

@ -56,29 +56,25 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
// Temp client for Auth and Select. // Temp client for Auth and Select.
client := newClient(c.opt, pool.NewSingleConnPool(cn)) client := newClient(c.opt, pool.NewSingleConnPool(cn))
_, err := client.Pipelined(func(pipe *Pipeline) error {
if c.opt.Password != "" { if c.opt.Password != "" {
if err := client.Auth(c.opt.Password).Err(); err != nil { pipe.Auth(c.opt.Password)
return err
}
} }
if c.opt.DB > 0 { if c.opt.DB > 0 {
if err := client.Select(c.opt.DB).Err(); err != nil { pipe.Select(c.opt.DB)
return err
}
} }
if c.opt.ReadOnly { if c.opt.ReadOnly {
if err := client.ReadOnly().Err(); err != nil { pipe.ReadOnly()
return err
}
} }
return nil return nil
})
return err
} }
func (c *baseClient) process(cmd Cmder) { func (c *baseClient) Process(cmd Cmder) {
for i := 0; i <= c.opt.MaxRetries; i++ { for i := 0; i <= c.opt.MaxRetries; i++ {
if i > 0 { if i > 0 {
cmd.reset() cmd.reset()
@ -145,16 +141,14 @@ func (c *baseClient) Close() error {
// goroutines. // goroutines.
type Client struct { type Client struct {
baseClient baseClient
commandable cmdable
} }
func newClient(opt *Options, pool pool.Pooler) *Client { func newClient(opt *Options, pool pool.Pooler) *Client {
base := baseClient{opt: opt, connPool: pool} base := baseClient{opt: opt, connPool: pool}
client := &Client{ client := &Client{
baseClient: base, baseClient: base,
commandable: commandable{ cmdable: cmdable{base.Process},
process: base.process,
},
} }
return client return client
} }
@ -178,11 +172,12 @@ func (c *Client) PoolStats() *PoolStats {
} }
func (c *Client) Pipeline() *Pipeline { func (c *Client) Pipeline() *Pipeline {
pipe := &Pipeline{ pipe := Pipeline{
exec: c.pipelineExec, exec: c.pipelineExec,
} }
pipe.commandable.process = pipe.process pipe.cmdable.process = pipe.Process
return pipe pipe.statefulCmdable.process = pipe.Process
return &pipe
} }
func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {

17
ring.go
View File

@ -24,7 +24,7 @@ type RingOptions struct {
// Following options are copied from Options struct. // Following options are copied from Options struct.
DB int64 DB int
Password string Password string
MaxRetries int MaxRetries int
@ -110,7 +110,7 @@ func (shard *ringShard) Vote(up bool) bool {
// and can tolerate losing data when one of the servers dies. // and can tolerate losing data when one of the servers dies.
// Otherwise you should use Redis Cluster. // Otherwise you should use Redis Cluster.
type Ring struct { type Ring struct {
commandable cmdable
opt *RingOptions opt *RingOptions
nreplicas int nreplicas int
@ -136,7 +136,7 @@ func NewRing(opt *RingOptions) *Ring {
cmdsInfoOnce: new(sync.Once), cmdsInfoOnce: new(sync.Once),
} }
ring.commandable.process = ring.process ring.cmdable.process = ring.Process
for name, addr := range opt.Addrs { for name, addr := range opt.Addrs {
clopt := opt.clientOptions() clopt := opt.clientOptions()
clopt.Addr = addr clopt.Addr = addr
@ -196,13 +196,13 @@ func (ring *Ring) getClient(key string) (*Client, error) {
return cl, nil return cl, nil
} }
func (ring *Ring) process(cmd Cmder) { func (ring *Ring) Process(cmd Cmder) {
cl, err := ring.getClient(ring.cmdFirstKey(cmd)) cl, err := ring.getClient(ring.cmdFirstKey(cmd))
if err != nil { if err != nil {
cmd.setErr(err) cmd.setErr(err)
return return
} }
cl.baseClient.process(cmd) cl.baseClient.Process(cmd)
} }
// rebalance removes dead shards from the ring. // rebalance removes dead shards from the ring.
@ -273,11 +273,12 @@ func (ring *Ring) Close() (retErr error) {
} }
func (ring *Ring) Pipeline() *Pipeline { func (ring *Ring) Pipeline() *Pipeline {
pipe := &Pipeline{ pipe := Pipeline{
exec: ring.pipelineExec, exec: ring.pipelineExec,
} }
pipe.commandable.process = pipe.process pipe.cmdable.process = pipe.Process
return pipe pipe.statefulCmdable.process = pipe.Process
return &pipe
} }
func (ring *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (ring *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {

View File

@ -25,7 +25,7 @@ type FailoverOptions struct {
// Following options are copied from Options struct. // Following options are copied from Options struct.
Password string Password string
DB int64 DB int
MaxRetries int MaxRetries int
@ -70,43 +70,41 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
opt: opt, opt: opt,
} }
base := baseClient{ client := Client{
baseClient: baseClient{
opt: opt, opt: opt,
connPool: failover.Pool(), connPool: failover.Pool(),
onClose: func() error { onClose: func() error {
return failover.Close() return failover.Close()
}, },
}
return &Client{
baseClient: base,
commandable: commandable{
process: base.process,
}, },
} }
client.cmdable.process = client.Process
return &client
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type sentinelClient struct { type sentinelClient struct {
cmdable
baseClient baseClient
commandable
} }
func newSentinel(opt *Options) *sentinelClient { func newSentinel(opt *Options) *sentinelClient {
base := baseClient{ client := sentinelClient{
baseClient: baseClient{
opt: opt, opt: opt,
connPool: newConnPool(opt), connPool: newConnPool(opt),
},
} }
return &sentinelClient{ client.cmdable = cmdable{client.Process}
baseClient: base, return &client
commandable: commandable{process: base.process},
}
} }
func (c *sentinelClient) PubSub() *PubSub { func (c *sentinelClient) PubSub() *PubSub {
return &PubSub{ return &PubSub{
base: &baseClient{ base: baseClient{
opt: c.opt, opt: c.opt,
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false), connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
}, },

25
tx.go
View File

@ -15,23 +15,24 @@ var errDiscard = errors.New("redis: Discard can be used only inside Exec")
// by multiple goroutines, because Exec resets list of watched keys. // by multiple goroutines, because Exec resets list of watched keys.
// If you don't need WATCH it is better to use Pipeline. // If you don't need WATCH it is better to use Pipeline.
type Tx struct { type Tx struct {
commandable cmdable
statefulCmdable
base *baseClient baseClient
cmds []Cmder cmds []Cmder
closed bool closed bool
} }
func (c *Client) newTx() *Tx { func (c *Client) newTx() *Tx {
tx := &Tx{ tx := Tx{
base: &baseClient{ baseClient: baseClient{
opt: c.opt, opt: c.opt,
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true), connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true),
}, },
} }
tx.commandable.process = tx.process tx.cmdable.process = tx.Process
return tx tx.statefulCmdable.process = tx.Process
return &tx
} }
func (c *Client) Watch(fn func(*Tx) error, keys ...string) error { func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
@ -49,9 +50,9 @@ func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
return retErr return retErr
} }
func (tx *Tx) process(cmd Cmder) { func (tx *Tx) Process(cmd Cmder) {
if tx.cmds == nil { if tx.cmds == nil {
tx.base.process(cmd) tx.baseClient.Process(cmd)
} else { } else {
tx.cmds = append(tx.cmds, cmd) tx.cmds = append(tx.cmds, cmd)
} }
@ -66,7 +67,7 @@ func (tx *Tx) close() error {
if err := tx.Unwatch().Err(); err != nil { if err := tx.Unwatch().Err(); err != nil {
internal.Logf("Unwatch failed: %s", err) internal.Logf("Unwatch failed: %s", err)
} }
return tx.base.Close() return tx.baseClient.Close()
} }
// Watch marks the keys to be watched for conditional execution // Watch marks the keys to be watched for conditional execution
@ -133,14 +134,14 @@ func (tx *Tx) MultiExec(fn func() error) ([]Cmder, error) {
// Strip MULTI and EXEC commands. // Strip MULTI and EXEC commands.
retCmds := cmds[1 : len(cmds)-1] retCmds := cmds[1 : len(cmds)-1]
cn, err := tx.base.conn() cn, err := tx.conn()
if err != nil { if err != nil {
setCmdsErr(retCmds, err) setCmdsErr(retCmds, err)
return retCmds, err return retCmds, err
} }
err = tx.execCmds(cn, cmds) err = tx.execCmds(cn, cmds)
tx.base.putConn(cn, err, false) tx.putConn(cn, err, false)
return retCmds, err return retCmds, err
} }