package redis_test import ( "strconv" "sync" "gopkg.in/redis.v5" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) var _ = Describe("Pipeline", func() { var client *redis.Client BeforeEach(func() { client = redis.NewClient(redisOptions()) Expect(client.FlushDb().Err()).NotTo(HaveOccurred()) }) AfterEach(func() { Expect(client.Close()).NotTo(HaveOccurred()) }) It("should pipeline", func() { set := client.Set("key2", "hello2", 0) Expect(set.Err()).NotTo(HaveOccurred()) Expect(set.Val()).To(Equal("OK")) pipeline := client.Pipeline() set = pipeline.Set("key1", "hello1", 0) get := pipeline.Get("key2") incr := pipeline.Incr("key3") getNil := pipeline.Get("key4") cmds, err := pipeline.Exec() Expect(err).To(Equal(redis.Nil)) Expect(cmds).To(HaveLen(4)) Expect(pipeline.Close()).NotTo(HaveOccurred()) Expect(set.Err()).NotTo(HaveOccurred()) Expect(set.Val()).To(Equal("OK")) Expect(get.Err()).NotTo(HaveOccurred()) Expect(get.Val()).To(Equal("hello2")) Expect(incr.Err()).NotTo(HaveOccurred()) Expect(incr.Val()).To(Equal(int64(1))) Expect(getNil.Err()).To(Equal(redis.Nil)) Expect(getNil.Val()).To(Equal("")) }) It("discards queued commands", func() { pipeline := client.Pipeline() pipeline.Get("key") pipeline.Discard() _, err := pipeline.Exec() Expect(err).To(MatchError("redis: pipeline is empty")) }) It("should support block style", func() { var get *redis.StringCmd cmds, err := client.Pipelined(func(pipe *redis.Pipeline) error { get = pipe.Get("foo") return nil }) Expect(err).To(Equal(redis.Nil)) Expect(cmds).To(HaveLen(1)) Expect(cmds[0]).To(Equal(get)) Expect(get.Err()).To(Equal(redis.Nil)) Expect(get.Val()).To(Equal("")) }) It("should handle vals/err", func() { pipeline := client.Pipeline() get := pipeline.Get("key") Expect(get.Err()).NotTo(HaveOccurred()) Expect(get.Val()).To(Equal("")) Expect(pipeline.Close()).NotTo(HaveOccurred()) }) It("returns an error when there are no commands", func() { pipeline := client.Pipeline() _, err := pipeline.Exec() Expect(err).To(MatchError("redis: pipeline is empty")) }) It("should increment correctly", func() { const N = 20000 key := "TestPipelineIncr" pipeline := client.Pipeline() for i := 0; i < N; i++ { pipeline.Incr(key) } cmds, err := pipeline.Exec() Expect(err).NotTo(HaveOccurred()) Expect(pipeline.Close()).NotTo(HaveOccurred()) Expect(len(cmds)).To(Equal(20000)) for _, cmd := range cmds { Expect(cmd.Err()).NotTo(HaveOccurred()) } get := client.Get(key) Expect(get.Err()).NotTo(HaveOccurred()) Expect(get.Val()).To(Equal(strconv.Itoa(N))) }) It("should PipelineEcho", func() { const N = 1000 wg := &sync.WaitGroup{} wg.Add(N) for i := 0; i < N; i++ { go func(i int) { defer GinkgoRecover() defer wg.Done() pipeline := client.Pipeline() msg1 := "echo" + strconv.Itoa(i) msg2 := "echo" + strconv.Itoa(i+1) echo1 := pipeline.Echo(msg1) echo2 := pipeline.Echo(msg2) cmds, err := pipeline.Exec() Expect(err).NotTo(HaveOccurred()) Expect(cmds).To(HaveLen(2)) Expect(echo1.Err()).NotTo(HaveOccurred()) Expect(echo1.Val()).To(Equal(msg1)) Expect(echo2.Err()).NotTo(HaveOccurred()) Expect(echo2.Val()).To(Equal(msg2)) Expect(pipeline.Close()).NotTo(HaveOccurred()) }(i) } wg.Wait() }) It("should be thread-safe", func() { const N = 1000 pipeline := client.Pipeline() var wg sync.WaitGroup wg.Add(N) for i := 0; i < N; i++ { go func() { defer GinkgoRecover() pipeline.Ping() wg.Done() }() } wg.Wait() cmds, err := pipeline.Exec() Expect(err).NotTo(HaveOccurred()) Expect(cmds).To(HaveLen(N)) Expect(pipeline.Close()).NotTo(HaveOccurred()) }) })