From dd4ef4e9cbc18066a373ec4115517e363d25d276 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sun, 12 Jan 2020 14:19:21 +0200 Subject: [PATCH] Change Tx.Pipeline and Tx.TxPipeline meaning --- CHANGELOG.md | 1 + cluster_test.go | 4 ++-- example_test.go | 2 +- race_test.go | 2 +- redis_test.go | 4 ++-- ring_test.go | 16 ++++++++-------- tx.go | 51 +++++++++++++++++++++++++++---------------------- tx_test.go | 10 +++++----- 8 files changed, 48 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd9af8f8..5a5038fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## v7 WIP +- Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a transactional pipeline. - WrapProcess is replaced with more convenient AddHook that has access to context.Context. - WithContext now can not be used to create a shallow copy of the client. - New methods ProcessContext, DoContext, and ExecContext. diff --git a/cluster_test.go b/cluster_test.go index f4b7e366..05eaeb4d 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -360,7 +360,7 @@ var _ = Describe("ClusterClient", func() { return err } - _, err = tx.Pipelined(func(pipe redis.Pipeliner) error { + _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Set(key, strconv.FormatInt(n+1, 10), 0) return nil }) @@ -1009,7 +1009,7 @@ var _ = Describe("ClusterClient timeout", func() { It("Tx Pipeline timeouts", func() { err := client.Watch(func(tx *redis.Tx) error { - _, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Ping() return nil }) diff --git a/example_test.go b/example_test.go index d643b941..b38f71f6 100644 --- a/example_test.go +++ b/example_test.go @@ -314,7 +314,7 @@ func ExampleClient_Watch() { n++ // runs only if the watched keys remain unchanged - _, err = tx.Pipelined(func(pipe redis.Pipeliner) error { + _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { // pipe handles the error case pipe.Set(key, n, 0) return nil diff --git a/race_test.go b/race_test.go index 088b0086..afe06cf8 100644 --- a/race_test.go +++ b/race_test.go @@ -194,7 +194,7 @@ var _ = Describe("races", func() { num, err := strconv.ParseInt(val, 10, 64) Expect(err).NotTo(HaveOccurred()) - cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Set("key", strconv.FormatInt(num+1, 10), 0) return nil }) diff --git a/redis_test.go b/redis_test.go index cd9cd1be..b74281d5 100644 --- a/redis_test.go +++ b/redis_test.go @@ -79,7 +79,7 @@ var _ = Describe("Client", func() { It("should close Tx without closing the client", func() { err := client.Watch(func(tx *redis.Tx) error { - _, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Ping() return nil }) @@ -286,7 +286,7 @@ var _ = Describe("Client timeout", func() { It("Tx Pipeline timeouts", func() { err := client.Watch(func(tx *redis.Tx) error { - _, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Ping() return nil }) diff --git a/ring_test.go b/ring_test.go index e35daeb8..ac9d9a9e 100644 --- a/ring_test.go +++ b/ring_test.go @@ -253,7 +253,7 @@ var _ = Describe("Ring watch", func() { return err } - _, err = tx.Pipelined(func(pipe redis.Pipeliner) error { + _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Set(key, strconv.FormatInt(n+1, 10), 0) return nil }) @@ -285,7 +285,7 @@ var _ = Describe("Ring watch", func() { It("should discard", func() { err := ring.Watch(func(tx *redis.Tx) error { - cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Set("key1", "hello1", 0) pipe.Discard() pipe.Set("key2", "hello2", 0) @@ -308,7 +308,7 @@ var _ = Describe("Ring watch", func() { It("returns no error when there are no commands", func() { err := ring.Watch(func(tx *redis.Tx) error { - _, err := tx.Pipelined(func(redis.Pipeliner) error { return nil }) + _, err := tx.TxPipelined(func(redis.Pipeliner) error { return nil }) return err }, "key") Expect(err).NotTo(HaveOccurred()) @@ -322,7 +322,7 @@ var _ = Describe("Ring watch", func() { const N = 20000 err := ring.Watch(func(tx *redis.Tx) error { - cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { for i := 0; i < N; i++ { pipe.Incr("key") } @@ -358,7 +358,7 @@ var _ = Describe("Ring watch", func() { num, err := strconv.ParseInt(val, 10, 64) Expect(err).NotTo(HaveOccurred()) - cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Set("key", strconv.FormatInt(num+1, 10), 0) return nil }) @@ -380,7 +380,7 @@ var _ = Describe("Ring watch", func() { It("should close Tx without closing the client", func() { err := ring.Watch(func(tx *redis.Tx) error { - _, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Ping() return nil }) @@ -396,7 +396,7 @@ var _ = Describe("Ring watch", func() { var ping *redis.StatusCmd err := ring.Watch(func(tx *redis.Tx) error { - cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { ping = pipe.Ping() return nil }) @@ -443,7 +443,7 @@ var _ = Describe("Ring Tx timeout", func() { It("Tx Pipeline timeouts", func() { err := ring.Watch(func(tx *redis.Tx) error { - _, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Ping() return nil }) diff --git a/tx.go b/tx.go index 15de9276..fd89a7e6 100644 --- a/tx.go +++ b/tx.go @@ -116,8 +116,35 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd { return cmd } -// Pipeline creates a new pipeline. It is more convenient to use Pipelined. func (c *Tx) Pipeline() Pipeliner { + pipe := Pipeline{ + ctx: c.ctx, + exec: func(ctx context.Context, cmds []Cmder) error { + return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline) + }, + } + pipe.init() + return &pipe +} + +func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.Pipeline().Pipelined(fn) +} + +// TxPipelined executes commands queued in the fn in a transaction. +// +// When using WATCH, EXEC will execute commands only if the watched keys +// were not modified, allowing for a check-and-set mechanism. +// +// Exec always returns list of commands. If transaction fails +// TxFailedErr is returned. Otherwise Exec returns an error of the first +// failed command or nil. +func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.TxPipeline().Pipelined(fn) +} + +// TxPipeline creates a new pipeline. Usually it is more convenient to use TxPipelined. +func (c *Tx) TxPipeline() Pipeliner { pipe := Pipeline{ ctx: c.ctx, exec: func(ctx context.Context, cmds []Cmder) error { @@ -127,25 +154,3 @@ func (c *Tx) Pipeline() Pipeliner { pipe.init() return &pipe } - -// Pipelined executes commands queued in the fn in a transaction. -// -// When using WATCH, EXEC will execute commands only if the watched keys -// were not modified, allowing for a check-and-set mechanism. -// -// Exec always returns list of commands. If transaction fails -// TxFailedErr is returned. Otherwise Exec returns an error of the first -// failed command or nil. -func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.Pipeline().Pipelined(fn) -} - -// TxPipelined is an alias for Pipelined. -func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.Pipelined(fn) -} - -// TxPipeline is an alias for Pipeline. -func (c *Tx) TxPipeline() Pipeliner { - return c.Pipeline() -} diff --git a/tx_test.go b/tx_test.go index dd109722..e8e7d83a 100644 --- a/tx_test.go +++ b/tx_test.go @@ -34,7 +34,7 @@ var _ = Describe("Tx", func() { return err } - _, err = tx.Pipelined(func(pipe redis.Pipeliner) error { + _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Set(key, strconv.FormatInt(n+1, 10), 0) return nil }) @@ -66,7 +66,7 @@ var _ = Describe("Tx", func() { It("should discard", func() { err := client.Watch(func(tx *redis.Tx) error { - cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Set("key1", "hello1", 0) pipe.Discard() pipe.Set("key2", "hello2", 0) @@ -89,7 +89,7 @@ var _ = Describe("Tx", func() { It("returns no error when there are no commands", func() { err := client.Watch(func(tx *redis.Tx) error { - _, err := tx.Pipelined(func(redis.Pipeliner) error { return nil }) + _, err := tx.TxPipelined(func(redis.Pipeliner) error { return nil }) return err }) Expect(err).NotTo(HaveOccurred()) @@ -103,7 +103,7 @@ var _ = Describe("Tx", func() { const N = 20000 err := client.Watch(func(tx *redis.Tx) error { - cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { for i := 0; i < N; i++ { pipe.Incr("key") } @@ -133,7 +133,7 @@ var _ = Describe("Tx", func() { do := func() error { err := client.Watch(func(tx *redis.Tx) error { - _, err := tx.Pipelined(func(pipe redis.Pipeliner) error { + _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error { pipe.Ping() return nil })