redis/pubsub_test.go

366 lines
9.5 KiB
Go
Raw Normal View History

2015-01-15 18:51:22 +03:00
package redis_test
import (
"io"
2015-01-15 18:51:22 +03:00
"net"
2015-11-22 15:44:38 +03:00
"sync"
2015-01-15 18:51:22 +03:00
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
2015-05-14 15:19:29 +03:00
"gopkg.in/redis.v3"
2015-01-15 18:51:22 +03:00
)
var _ = Describe("PubSub", func() {
var client *redis.Client
2015-09-06 13:50:16 +03:00
readTimeout := 3 * time.Second
2015-01-15 18:51:22 +03:00
BeforeEach(func() {
2015-05-02 16:19:22 +03:00
client = redis.NewClient(&redis.Options{
2015-09-06 13:50:16 +03:00
Addr: redisAddr,
ReadTimeout: readTimeout,
2015-01-15 18:51:22 +03:00
})
2015-07-11 13:12:47 +03:00
Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
2015-01-15 18:51:22 +03:00
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should support pattern matching", func() {
2015-07-11 13:42:44 +03:00
pubsub, err := client.PSubscribe("mychannel*")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
2015-01-15 18:51:22 +03:00
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
subscr := msgi.(*redis.Subscription)
Expect(subscr.Kind).To(Equal("psubscribe"))
Expect(subscr.Channel).To(Equal("mychannel*"))
Expect(subscr.Count).To(Equal(1))
}
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err.(net.Error).Timeout()).To(Equal(true))
Expect(msgi).To(BeNil())
}
n, err := client.Publish("mychannel1", "hello").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))
Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred())
2015-01-15 18:51:22 +03:00
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
subscr := msgi.(*redis.PMessage)
Expect(subscr.Channel).To(Equal("mychannel1"))
Expect(subscr.Pattern).To(Equal("mychannel*"))
Expect(subscr.Payload).To(Equal("hello"))
}
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
subscr := msgi.(*redis.Subscription)
Expect(subscr.Kind).To(Equal("punsubscribe"))
Expect(subscr.Channel).To(Equal("mychannel*"))
Expect(subscr.Count).To(Equal(0))
}
stats := client.Pool().Stats()
Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2)))
2015-01-15 18:51:22 +03:00
})
It("should pub/sub channels", func() {
channels, err := client.PubSubChannels("mychannel*").Result()
Expect(err).NotTo(HaveOccurred())
Expect(channels).To(BeEmpty())
2015-07-11 13:42:44 +03:00
pubsub, err := client.Subscribe("mychannel", "mychannel2")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
2015-01-15 18:51:22 +03:00
channels, err = client.PubSubChannels("mychannel*").Result()
Expect(err).NotTo(HaveOccurred())
Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
channels, err = client.PubSubChannels("").Result()
Expect(err).NotTo(HaveOccurred())
Expect(channels).To(BeEmpty())
channels, err = client.PubSubChannels("*").Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(channels)).To(BeNumerically(">=", 2))
})
It("should return the numbers of subscribers", func() {
2015-07-11 13:42:44 +03:00
pubsub, err := client.Subscribe("mychannel", "mychannel2")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
2015-01-15 18:51:22 +03:00
channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result()
Expect(err).NotTo(HaveOccurred())
2015-01-25 15:05:19 +03:00
Expect(channels).To(Equal(map[string]int64{
"mychannel": 1,
"mychannel2": 1,
"mychannel3": 0,
2015-01-15 18:51:22 +03:00
}))
})
It("should return the numbers of subscribers by pattern", func() {
num, err := client.PubSubNumPat().Result()
Expect(err).NotTo(HaveOccurred())
Expect(num).To(Equal(int64(0)))
2015-07-11 13:42:44 +03:00
pubsub, err := client.PSubscribe("*")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
2015-01-15 18:51:22 +03:00
num, err = client.PubSubNumPat().Result()
Expect(err).NotTo(HaveOccurred())
Expect(num).To(Equal(int64(1)))
})
It("should pub/sub", func() {
2015-07-11 13:42:44 +03:00
pubsub, err := client.Subscribe("mychannel", "mychannel2")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
2015-01-15 18:51:22 +03:00
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
subscr := msgi.(*redis.Subscription)
Expect(subscr.Kind).To(Equal("subscribe"))
Expect(subscr.Channel).To(Equal("mychannel"))
Expect(subscr.Count).To(Equal(1))
}
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
subscr := msgi.(*redis.Subscription)
Expect(subscr.Kind).To(Equal("subscribe"))
Expect(subscr.Channel).To(Equal("mychannel2"))
Expect(subscr.Count).To(Equal(2))
}
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err.(net.Error).Timeout()).To(Equal(true))
Expect(msgi).NotTo(HaveOccurred())
}
n, err := client.Publish("mychannel", "hello").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))
n, err = client.Publish("mychannel2", "hello2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))
Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
2015-01-15 18:51:22 +03:00
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
subscr := msgi.(*redis.Message)
Expect(subscr.Channel).To(Equal("mychannel"))
Expect(subscr.Payload).To(Equal("hello"))
}
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
msg := msgi.(*redis.Message)
Expect(msg.Channel).To(Equal("mychannel2"))
Expect(msg.Payload).To(Equal("hello2"))
}
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
subscr := msgi.(*redis.Subscription)
Expect(subscr.Kind).To(Equal("unsubscribe"))
Expect(subscr.Channel).To(Equal("mychannel"))
Expect(subscr.Count).To(Equal(1))
}
{
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
subscr := msgi.(*redis.Subscription)
Expect(subscr.Kind).To(Equal("unsubscribe"))
Expect(subscr.Channel).To(Equal("mychannel2"))
Expect(subscr.Count).To(Equal(0))
}
stats := client.Pool().Stats()
Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2)))
2015-01-15 18:51:22 +03:00
})
2015-07-11 13:12:47 +03:00
It("should ping/pong", func() {
2015-07-11 13:42:44 +03:00
pubsub, err := client.Subscribe("mychannel")
2015-07-11 13:12:47 +03:00
Expect(err).NotTo(HaveOccurred())
2015-07-11 13:42:44 +03:00
defer pubsub.Close()
2015-07-11 13:12:47 +03:00
_, err = pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
err = pubsub.Ping("")
Expect(err).NotTo(HaveOccurred())
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
pong := msgi.(*redis.Pong)
Expect(pong.Payload).To(Equal(""))
})
It("should ping/pong with payload", func() {
2015-07-11 13:42:44 +03:00
pubsub, err := client.Subscribe("mychannel")
2015-07-11 13:12:47 +03:00
Expect(err).NotTo(HaveOccurred())
2015-07-11 13:42:44 +03:00
defer pubsub.Close()
2015-07-11 13:12:47 +03:00
_, err = pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
err = pubsub.Ping("hello")
Expect(err).NotTo(HaveOccurred())
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
pong := msgi.(*redis.Pong)
Expect(pong.Payload).To(Equal("hello"))
})
It("should multi-ReceiveMessage", func() {
2015-09-06 13:50:16 +03:00
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
err = client.Publish("mychannel", "hello").Err()
Expect(err).NotTo(HaveOccurred())
err = client.Publish("mychannel", "world").Err()
Expect(err).NotTo(HaveOccurred())
msg, err := pubsub.ReceiveMessage()
Expect(err).NotTo(HaveOccurred())
Expect(msg.Channel).To(Equal("mychannel"))
Expect(msg.Payload).To(Equal("hello"))
msg, err = pubsub.ReceiveMessage()
Expect(err).NotTo(HaveOccurred())
Expect(msg.Channel).To(Equal("mychannel"))
Expect(msg.Payload).To(Equal("world"))
})
It("should ReceiveMessage after timeout", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
done := make(chan bool, 1)
2015-09-06 13:50:16 +03:00
go func() {
defer GinkgoRecover()
defer func() {
done <- true
}()
2015-09-06 13:50:16 +03:00
time.Sleep(5*time.Second + 100*time.Millisecond)
2015-09-06 13:50:16 +03:00
n, err := client.Publish("mychannel", "hello").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))
}()
msg, err := pubsub.ReceiveMessage()
Expect(err).NotTo(HaveOccurred())
Expect(msg.Channel).To(Equal("mychannel"))
Expect(msg.Payload).To(Equal("hello"))
2015-11-26 18:04:26 +03:00
Eventually(done).Should(Receive())
stats := client.Pool().Stats()
Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2)))
2015-09-06 13:50:16 +03:00
})
expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
2015-12-02 16:40:44 +03:00
cn1, _, err := pubsub.Pool().Get()
2015-09-06 13:50:16 +03:00
Expect(err).NotTo(HaveOccurred())
2015-12-02 16:40:44 +03:00
cn1.SetNetConn(&badConn{
readErr: io.EOF,
writeErr: io.EOF,
2015-09-06 13:50:16 +03:00
})
done := make(chan bool, 1)
2015-09-06 13:50:16 +03:00
go func() {
defer GinkgoRecover()
defer func() {
done <- true
}()
2015-09-06 13:50:16 +03:00
time.Sleep(100 * time.Millisecond)
2015-11-22 15:44:38 +03:00
err := client.Publish("mychannel", "hello").Err()
2015-09-06 13:50:16 +03:00
Expect(err).NotTo(HaveOccurred())
}()
msg, err := pubsub.ReceiveMessage()
Expect(err).NotTo(HaveOccurred())
Expect(msg.Channel).To(Equal("mychannel"))
Expect(msg.Payload).To(Equal("hello"))
2015-11-22 15:44:38 +03:00
Eventually(done).Should(Receive())
stats := client.Pool().Stats()
Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2)))
}
It("Subscribe should reconnect on ReceiveMessage error", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
expectReceiveMessageOnError(pubsub)
})
It("PSubscribe should reconnect on ReceiveMessage error", func() {
pubsub, err := client.PSubscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
expectReceiveMessageOnError(pubsub)
2015-09-06 13:50:16 +03:00
})
2015-12-02 16:40:44 +03:00
It("should return on Close", func() {
2015-11-26 18:04:26 +03:00
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer GinkgoRecover()
wg.Done()
2015-12-02 16:40:44 +03:00
2015-11-26 18:04:26 +03:00
_, err := pubsub.ReceiveMessage()
Expect(err).To(MatchError("redis: client is closed"))
2015-12-02 16:40:44 +03:00
wg.Done()
2015-11-26 18:04:26 +03:00
}()
2015-12-02 16:40:44 +03:00
2015-11-26 18:04:26 +03:00
wg.Wait()
2015-12-02 16:40:44 +03:00
wg.Add(1)
2015-11-26 18:04:26 +03:00
err = pubsub.Close()
Expect(err).NotTo(HaveOccurred())
2015-12-02 16:40:44 +03:00
wg.Wait()
2015-11-26 18:04:26 +03:00
})
2015-01-15 18:51:22 +03:00
})