Merge pull request #1234 from go-redis/fix/tx-pipeline

Change Tx.Pipeline and Tx.TxPipeline meaning
This commit is contained in:
Vladimir Mihailenco 2020-01-12 14:45:23 +02:00 committed by GitHub
commit a4bad76db9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 48 additions and 42 deletions

View File

@ -2,6 +2,7 @@
## v7 WIP
- Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a transactional pipeline.
- WrapProcess is replaced with more convenient AddHook that has access to context.Context.
- WithContext now can not be used to create a shallow copy of the client.
- New methods ProcessContext, DoContext, and ExecContext.

View File

@ -360,7 +360,7 @@ var _ = Describe("ClusterClient", func() {
return err
}
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil
})
@ -1009,7 +1009,7 @@ var _ = Describe("ClusterClient timeout", func() {
It("Tx Pipeline timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})

View File

@ -314,7 +314,7 @@ func ExampleClient_Watch() {
n++
// runs only if the watched keys remain unchanged
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
// pipe handles the error case
pipe.Set(key, n, 0)
return nil

View File

@ -194,7 +194,7 @@ var _ = Describe("races", func() {
num, err := strconv.ParseInt(val, 10, 64)
Expect(err).NotTo(HaveOccurred())
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
return nil
})

View File

@ -79,7 +79,7 @@ var _ = Describe("Client", func() {
It("should close Tx without closing the client", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})
@ -286,7 +286,7 @@ var _ = Describe("Client timeout", func() {
It("Tx Pipeline timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})

View File

@ -253,7 +253,7 @@ var _ = Describe("Ring watch", func() {
return err
}
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil
})
@ -285,7 +285,7 @@ var _ = Describe("Ring watch", func() {
It("should discard", func() {
err := ring.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key1", "hello1", 0)
pipe.Discard()
pipe.Set("key2", "hello2", 0)
@ -308,7 +308,7 @@ var _ = Describe("Ring watch", func() {
It("returns no error when there are no commands", func() {
err := ring.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(redis.Pipeliner) error { return nil })
_, err := tx.TxPipelined(func(redis.Pipeliner) error { return nil })
return err
}, "key")
Expect(err).NotTo(HaveOccurred())
@ -322,7 +322,7 @@ var _ = Describe("Ring watch", func() {
const N = 20000
err := ring.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
for i := 0; i < N; i++ {
pipe.Incr("key")
}
@ -358,7 +358,7 @@ var _ = Describe("Ring watch", func() {
num, err := strconv.ParseInt(val, 10, 64)
Expect(err).NotTo(HaveOccurred())
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
return nil
})
@ -380,7 +380,7 @@ var _ = Describe("Ring watch", func() {
It("should close Tx without closing the client", func() {
err := ring.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})
@ -396,7 +396,7 @@ var _ = Describe("Ring watch", func() {
var ping *redis.StatusCmd
err := ring.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
ping = pipe.Ping()
return nil
})
@ -443,7 +443,7 @@ var _ = Describe("Ring Tx timeout", func() {
It("Tx Pipeline timeouts", func() {
err := ring.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})

51
tx.go
View File

@ -116,8 +116,35 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd {
return cmd
}
// Pipeline creates a new pipeline. It is more convenient to use Pipelined.
func (c *Tx) Pipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: func(ctx context.Context, cmds []Cmder) error {
return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline)
},
}
pipe.init()
return &pipe
}
func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}
// TxPipelined executes commands queued in the fn in a transaction.
//
// When using WATCH, EXEC will execute commands only if the watched keys
// were not modified, allowing for a check-and-set mechanism.
//
// Exec always returns list of commands. If transaction fails
// TxFailedErr is returned. Otherwise Exec returns an error of the first
// failed command or nil.
func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.TxPipeline().Pipelined(fn)
}
// TxPipeline creates a new pipeline. Usually it is more convenient to use TxPipelined.
func (c *Tx) TxPipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: func(ctx context.Context, cmds []Cmder) error {
@ -127,25 +154,3 @@ func (c *Tx) Pipeline() Pipeliner {
pipe.init()
return &pipe
}
// Pipelined executes commands queued in the fn in a transaction.
//
// When using WATCH, EXEC will execute commands only if the watched keys
// were not modified, allowing for a check-and-set mechanism.
//
// Exec always returns list of commands. If transaction fails
// TxFailedErr is returned. Otherwise Exec returns an error of the first
// failed command or nil.
func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}
// TxPipelined is an alias for Pipelined.
func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipelined(fn)
}
// TxPipeline is an alias for Pipeline.
func (c *Tx) TxPipeline() Pipeliner {
return c.Pipeline()
}

View File

@ -34,7 +34,7 @@ var _ = Describe("Tx", func() {
return err
}
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil
})
@ -66,7 +66,7 @@ var _ = Describe("Tx", func() {
It("should discard", func() {
err := client.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key1", "hello1", 0)
pipe.Discard()
pipe.Set("key2", "hello2", 0)
@ -89,7 +89,7 @@ var _ = Describe("Tx", func() {
It("returns no error when there are no commands", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(redis.Pipeliner) error { return nil })
_, err := tx.TxPipelined(func(redis.Pipeliner) error { return nil })
return err
})
Expect(err).NotTo(HaveOccurred())
@ -103,7 +103,7 @@ var _ = Describe("Tx", func() {
const N = 20000
err := client.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
for i := 0; i < N; i++ {
pipe.Incr("key")
}
@ -133,7 +133,7 @@ var _ = Describe("Tx", func() {
do := func() error {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})