Merge pull request #557 from felipejfc/master

pipeline now has its own interface "Pipeliner"
This commit is contained in:
Vladimir Mihailenco 2017-05-03 12:04:13 +03:00 committed by GitHub
commit e1263fea7b
16 changed files with 72 additions and 42 deletions

View File

@ -188,7 +188,7 @@ func BenchmarkPipeline(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error { _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key", "hello", 0) pipe.Set("key", "hello", 0)
pipe.Expire("key", time.Second) pipe.Expire("key", time.Second)
return nil return nil

View File

@ -674,7 +674,7 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
} }
} }
func (c *ClusterClient) Pipeline() *Pipeline { func (c *ClusterClient) Pipeline() Pipeliner {
pipe := Pipeline{ pipe := Pipeline{
exec: c.pipelineExec, exec: c.pipelineExec,
} }
@ -683,7 +683,7 @@ func (c *ClusterClient) Pipeline() *Pipeline {
return &pipe return &pipe
} }
func (c *ClusterClient) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn) return c.Pipeline().pipelined(fn)
} }
@ -797,7 +797,7 @@ func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]C
} }
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() *Pipeline { func (c *ClusterClient) TxPipeline() Pipeliner {
pipe := Pipeline{ pipe := Pipeline{
exec: c.txPipelineExec, exec: c.txPipelineExec,
} }
@ -806,7 +806,7 @@ func (c *ClusterClient) TxPipeline() *Pipeline {
return &pipe return &pipe
} }
func (c *ClusterClient) TxPipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.TxPipeline().pipelined(fn) return c.TxPipeline().pipelined(fn)
} }

View File

@ -347,7 +347,7 @@ var _ = Describe("ClusterClient", func() {
return err return err
} }
_, err = tx.Pipelined(func(pipe *redis.Pipeline) error { _, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0) pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil return nil
}) })
@ -449,7 +449,7 @@ var _ = Describe("ClusterClient", func() {
Describe("Pipeline", func() { Describe("Pipeline", func() {
BeforeEach(func() { BeforeEach(func() {
pipe = client.Pipeline() pipe = client.Pipeline().(*redis.Pipeline)
}) })
AfterEach(func() { AfterEach(func() {
@ -461,7 +461,7 @@ var _ = Describe("ClusterClient", func() {
Describe("TxPipeline", func() { Describe("TxPipeline", func() {
BeforeEach(func() { BeforeEach(func() {
pipe = client.TxPipeline() pipe = client.TxPipeline().(*redis.Pipeline)
}) })
AfterEach(func() { AfterEach(func() {
@ -544,7 +544,7 @@ var _ = Describe("ClusterClient without nodes", func() {
}) })
It("pipeline returns an error", func() { It("pipeline returns an error", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error { _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping() pipe.Ping()
return nil return nil
}) })
@ -571,7 +571,7 @@ var _ = Describe("ClusterClient without valid nodes", func() {
}) })
It("pipeline returns an error", func() { It("pipeline returns an error", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error { _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping() pipe.Ping()
return nil return nil
}) })
@ -594,7 +594,7 @@ var _ = Describe("ClusterClient timeout", func() {
}) })
It("Pipeline timeouts", func() { It("Pipeline timeouts", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error { _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping() pipe.Ping()
return nil return nil
}) })
@ -612,7 +612,7 @@ var _ = Describe("ClusterClient timeout", func() {
It("Tx Pipeline timeouts", func() { It("Tx Pipeline timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error { err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe *redis.Pipeline) error { _, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping() pipe.Ping()
return nil return nil
}) })

View File

@ -39,8 +39,8 @@ func formatSec(dur time.Duration) int64 {
} }
type Cmdable interface { type Cmdable interface {
Pipeline() *Pipeline Pipeline() Pipeliner
Pipelined(fn func(*Pipeline) error) ([]Cmder, error) Pipelined(fn func(Pipeliner) error) ([]Cmder, error)
Echo(message interface{}) *StringCmd Echo(message interface{}) *StringCmd
Ping() *StatusCmd Ping() *StatusCmd
@ -237,6 +237,15 @@ type Cmdable interface {
Command() *CommandsInfoCmd Command() *CommandsInfoCmd
} }
type StatefulCmdable interface {
Auth(password string) *StatusCmd
Select(index int) *StatusCmd
ClientSetName(name string) *BoolCmd
ClientGetName() *StringCmd
ReadOnly() *StatusCmd
ReadWrite() *StatusCmd
}
var _ Cmdable = (*Client)(nil) var _ Cmdable = (*Client)(nil)
var _ Cmdable = (*Tx)(nil) var _ Cmdable = (*Tx)(nil)
var _ Cmdable = (*Ring)(nil) var _ Cmdable = (*Ring)(nil)

View File

@ -27,7 +27,7 @@ var _ = Describe("Commands", func() {
Describe("server", func() { Describe("server", func() {
It("should Auth", func() { It("should Auth", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error { _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Auth("password") pipe.Auth("password")
return nil return nil
}) })

View File

@ -159,7 +159,7 @@ func ExampleClient_Scan() {
func ExampleClient_Pipelined() { func ExampleClient_Pipelined() {
var incr *redis.IntCmd var incr *redis.IntCmd
_, err := client.Pipelined(func(pipe *redis.Pipeline) error { _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
incr = pipe.Incr("pipelined_counter") incr = pipe.Incr("pipelined_counter")
pipe.Expire("pipelined_counter", time.Hour) pipe.Expire("pipelined_counter", time.Hour)
return nil return nil
@ -187,7 +187,7 @@ func ExampleClient_Pipeline() {
func ExampleClient_TxPipelined() { func ExampleClient_TxPipelined() {
var incr *redis.IntCmd var incr *redis.IntCmd
_, err := client.TxPipelined(func(pipe *redis.Pipeline) error { _, err := client.TxPipelined(func(pipe redis.Pipeliner) error {
incr = pipe.Incr("tx_pipelined_counter") incr = pipe.Incr("tx_pipelined_counter")
pipe.Expire("tx_pipelined_counter", time.Hour) pipe.Expire("tx_pipelined_counter", time.Hour)
return nil return nil
@ -226,7 +226,7 @@ func ExampleClient_Watch() {
return err return err
} }
_, err = tx.Pipelined(func(pipe *redis.Pipeline) error { _, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0) pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil return nil
}) })

View File

@ -9,6 +9,19 @@ import (
type pipelineExecer func([]Cmder) error type pipelineExecer func([]Cmder) error
type Pipeliner interface {
Cmdable
StatefulCmdable
Process(cmd Cmder) error
Close() error
Discard() error
discard() error
Exec() ([]Cmder, error)
pipelined(fn func(Pipeliner) error) ([]Cmder, error)
}
var _ Pipeliner = (*Pipeline)(nil)
// Pipeline implements pipelining as described in // Pipeline implements pipelining as described in
// http://redis.io/topics/pipelining. It's safe for concurrent use // http://redis.io/topics/pipelining. It's safe for concurrent use
// by multiple goroutines. // by multiple goroutines.
@ -78,7 +91,7 @@ func (c *Pipeline) Exec() ([]Cmder, error) {
return cmds, c.exec(cmds) return cmds, c.exec(cmds)
} }
func (c *Pipeline) pipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (c *Pipeline) pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
if err := fn(c); err != nil { if err := fn(c); err != nil {
return nil, err return nil, err
} }
@ -86,3 +99,11 @@ func (c *Pipeline) pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
_ = c.Close() _ = c.Close()
return cmds, err return cmds, err
} }
func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.pipelined(fn)
}
func (c *Pipeline) Pipeline() Pipeliner {
return c
}

View File

@ -22,7 +22,7 @@ var _ = Describe("pipelining", func() {
It("supports block style", func() { It("supports block style", func() {
var get *redis.StringCmd var get *redis.StringCmd
cmds, err := client.Pipelined(func(pipe *redis.Pipeline) error { cmds, err := client.Pipelined(func(pipe redis.Pipeliner) error {
get = pipe.Get("foo") get = pipe.Get("foo")
return nil return nil
}) })
@ -63,7 +63,7 @@ var _ = Describe("pipelining", func() {
Describe("Pipeline", func() { Describe("Pipeline", func() {
BeforeEach(func() { BeforeEach(func() {
pipe = client.Pipeline() pipe = client.Pipeline().(*redis.Pipeline)
}) })
assertPipeline() assertPipeline()

View File

@ -39,7 +39,7 @@ var _ = Describe("pool", func() {
var ping *redis.StatusCmd var ping *redis.StatusCmd
err := client.Watch(func(tx *redis.Tx) error { err := client.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe *redis.Pipeline) error { cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
ping = pipe.Ping() ping = pipe.Ping()
return nil return nil
}) })

View File

@ -193,7 +193,7 @@ var _ = Describe("races", func() {
num, err := strconv.ParseInt(val, 10, 64) num, err := strconv.ParseInt(val, 10, 64)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cmds, err := tx.Pipelined(func(pipe *redis.Pipeline) error { cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key", strconv.FormatInt(num+1, 10), 0) pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
return nil return nil
}) })

View File

@ -61,7 +61,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
// Temp client for Auth and Select. // Temp client for Auth and Select.
client := newClient(c.opt, pool.NewSingleConnPool(cn)) client := newClient(c.opt, pool.NewSingleConnPool(cn))
_, err := client.Pipelined(func(pipe *Pipeline) error { _, err := client.Pipelined(func(pipe Pipeliner) error {
if c.opt.Password != "" { if c.opt.Password != "" {
pipe.Auth(c.opt.Password) pipe.Auth(c.opt.Password)
} }
@ -324,11 +324,11 @@ func (c *Client) PoolStats() *PoolStats {
} }
} }
func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn) return c.Pipeline().pipelined(fn)
} }
func (c *Client) Pipeline() *Pipeline { func (c *Client) Pipeline() Pipeliner {
pipe := Pipeline{ pipe := Pipeline{
exec: c.pipelineExecer(c.pipelineProcessCmds), exec: c.pipelineExecer(c.pipelineProcessCmds),
} }
@ -337,7 +337,7 @@ func (c *Client) Pipeline() *Pipeline {
return &pipe return &pipe
} }
func (c *Client) TxPipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.TxPipeline().pipelined(fn) return c.TxPipeline().pipelined(fn)
} }

View File

@ -68,7 +68,7 @@ var _ = Describe("Client", func() {
It("should close Tx without closing the client", func() { It("should close Tx without closing the client", func() {
err := client.Watch(func(tx *redis.Tx) error { err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe *redis.Pipeline) error { _, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping() pipe.Ping()
return nil return nil
}) })
@ -232,7 +232,7 @@ var _ = Describe("Client timeout", func() {
}) })
It("Pipeline timeouts", func() { It("Pipeline timeouts", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error { _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping() pipe.Ping()
return nil return nil
}) })
@ -263,7 +263,7 @@ var _ = Describe("Client timeout", func() {
It("Tx Pipeline timeouts", func() { It("Tx Pipeline timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error { err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe *redis.Pipeline) error { _, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping() pipe.Ping()
return nil return nil
}) })

View File

@ -381,7 +381,7 @@ func (c *Ring) Close() error {
return firstErr return firstErr
} }
func (c *Ring) Pipeline() *Pipeline { func (c *Ring) Pipeline() Pipeliner {
pipe := Pipeline{ pipe := Pipeline{
exec: c.pipelineExec, exec: c.pipelineExec,
} }
@ -390,7 +390,7 @@ func (c *Ring) Pipeline() *Pipeline {
return &pipe return &pipe
} }
func (c *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn) return c.Pipeline().pipelined(fn)
} }

View File

@ -137,7 +137,7 @@ var _ = Describe("Redis Ring", func() {
keys = append(keys, string(key)) keys = append(keys, string(key))
} }
_, err := ring.Pipelined(func(pipe *redis.Pipeline) error { _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
for _, key := range keys { for _, key := range keys {
pipe.Set(key, "value", 0).Err() pipe.Set(key, "value", 0).Err()
} }
@ -153,7 +153,7 @@ var _ = Describe("Redis Ring", func() {
}) })
It("supports hash tags", func() { It("supports hash tags", func() {
_, err := ring.Pipelined(func(pipe *redis.Pipeline) error { _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err() pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
} }
@ -184,7 +184,7 @@ var _ = Describe("empty Redis Ring", func() {
}) })
It("pipeline returns an error", func() { It("pipeline returns an error", func() {
_, err := ring.Pipelined(func(pipe *redis.Pipeline) error { _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping() pipe.Ping()
return nil return nil
}) })

4
tx.go
View File

@ -76,7 +76,7 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd {
return cmd return cmd
} }
func (c *Tx) Pipeline() *Pipeline { func (c *Tx) Pipeline() Pipeliner {
pipe := Pipeline{ pipe := Pipeline{
exec: c.pipelineExecer(c.txPipelineProcessCmds), exec: c.pipelineExecer(c.txPipelineProcessCmds),
} }
@ -94,6 +94,6 @@ func (c *Tx) Pipeline() *Pipeline {
// Exec always returns list of commands. If transaction fails // Exec always returns list of commands. If transaction fails
// TxFailedErr is returned. Otherwise Exec returns error of the first // TxFailedErr is returned. Otherwise Exec returns error of the first
// failed command or nil. // failed command or nil.
func (c *Tx) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn) return c.Pipeline().pipelined(fn)
} }

View File

@ -33,7 +33,7 @@ var _ = Describe("Tx", func() {
return err return err
} }
_, err = tx.Pipelined(func(pipe *redis.Pipeline) error { _, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0) pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil return nil
}) })
@ -65,7 +65,7 @@ var _ = Describe("Tx", func() {
It("should discard", func() { It("should discard", func() {
err := client.Watch(func(tx *redis.Tx) error { err := client.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe *redis.Pipeline) error { cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key1", "hello1", 0) pipe.Set("key1", "hello1", 0)
pipe.Discard() pipe.Discard()
pipe.Set("key2", "hello2", 0) pipe.Set("key2", "hello2", 0)
@ -88,7 +88,7 @@ var _ = Describe("Tx", func() {
It("returns an error when there are no commands", func() { It("returns an error when there are no commands", func() {
err := client.Watch(func(tx *redis.Tx) error { err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(*redis.Pipeline) error { return nil }) _, err := tx.Pipelined(func(redis.Pipeliner) error { return nil })
return err return err
}) })
Expect(err).To(MatchError("redis: pipeline is empty")) Expect(err).To(MatchError("redis: pipeline is empty"))
@ -102,7 +102,7 @@ var _ = Describe("Tx", func() {
const N = 20000 const N = 20000
err := client.Watch(func(tx *redis.Tx) error { err := client.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe *redis.Pipeline) error { cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
pipe.Incr("key") pipe.Incr("key")
} }
@ -133,7 +133,7 @@ var _ = Describe("Tx", func() {
do := func() error { do := func() error {
err := client.Watch(func(tx *redis.Tx) error { err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe *redis.Pipeline) error { _, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping() pipe.Ping()
return nil return nil
}) })