forked from mirror/redis
Improve pool tests by verifying number of created connections.
This commit is contained in:
parent
4edc7a059c
commit
0db1d730c8
|
@ -1303,6 +1303,9 @@ var _ = Describe("Commands", func() {
|
||||||
bLPop := client.BLPop(time.Second, "list1")
|
bLPop := client.BLPop(time.Second, "list1")
|
||||||
Expect(bLPop.Val()).To(BeNil())
|
Expect(bLPop.Val()).To(BeNil())
|
||||||
Expect(bLPop.Err()).To(Equal(redis.Nil))
|
Expect(bLPop.Err()).To(Equal(redis.Nil))
|
||||||
|
|
||||||
|
stats := client.Pool().Stats()
|
||||||
|
Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(1)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should BRPop", func() {
|
It("should BRPop", func() {
|
||||||
|
|
4
pool.go
4
pool.go
|
@ -18,7 +18,8 @@ var (
|
||||||
// PoolStats contains pool state information and accumulated stats.
|
// PoolStats contains pool state information and accumulated stats.
|
||||||
type PoolStats struct {
|
type PoolStats struct {
|
||||||
Requests uint32 // number of times a connection was requested by the pool
|
Requests uint32 // number of times a connection was requested by the pool
|
||||||
Waits uint32 // number of times our pool had to wait for a connection
|
Hits uint32 // number of times free connection was found in the pool
|
||||||
|
Waits uint32 // number of times the pool had to wait for a connection
|
||||||
Timeouts uint32 // number of times a wait timeout occurred
|
Timeouts uint32 // number of times a wait timeout occurred
|
||||||
|
|
||||||
TotalConns uint32 // the number of total connections in the pool
|
TotalConns uint32 // the number of total connections in the pool
|
||||||
|
@ -241,6 +242,7 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) {
|
||||||
|
|
||||||
// Fetch first non-idle connection, if available.
|
// Fetch first non-idle connection, if available.
|
||||||
if cn = p.First(); cn != nil {
|
if cn = p.First(); cn != nil {
|
||||||
|
atomic.AddUint32(&p.stats.Hits, 1)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
12
pool_test.go
12
pool_test.go
|
@ -123,6 +123,12 @@ var _ = Describe("pool", func() {
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
Expect(pool.Len()).To(Equal(1))
|
Expect(pool.Len()).To(Equal(1))
|
||||||
Expect(pool.FreeLen()).To(Equal(1))
|
Expect(pool.FreeLen()).To(Equal(1))
|
||||||
|
|
||||||
|
stats := pool.Stats()
|
||||||
|
Expect(stats.Requests).To(Equal(uint32(3)))
|
||||||
|
Expect(stats.Hits).To(Equal(uint32(2)))
|
||||||
|
Expect(stats.Waits).To(Equal(uint32(0)))
|
||||||
|
Expect(stats.Timeouts).To(Equal(uint32(0)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should reuse connections", func() {
|
It("should reuse connections", func() {
|
||||||
|
@ -135,6 +141,12 @@ var _ = Describe("pool", func() {
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
Expect(pool.Len()).To(Equal(1))
|
Expect(pool.Len()).To(Equal(1))
|
||||||
Expect(pool.FreeLen()).To(Equal(1))
|
Expect(pool.FreeLen()).To(Equal(1))
|
||||||
|
|
||||||
|
stats := pool.Stats()
|
||||||
|
Expect(stats.Requests).To(Equal(uint32(100)))
|
||||||
|
Expect(stats.Hits).To(Equal(uint32(99)))
|
||||||
|
Expect(stats.Waits).To(Equal(uint32(0)))
|
||||||
|
Expect(stats.Timeouts).To(Equal(uint32(0)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should unblock client when connection is removed", func() {
|
It("should unblock client when connection is removed", func() {
|
||||||
|
|
12
pubsub.go
12
pubsub.go
|
@ -245,10 +245,11 @@ func (c *PubSub) Receive() (interface{}, error) {
|
||||||
return c.ReceiveTimeout(0)
|
return c.ReceiveTimeout(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReceiveMessage returns a message or error. It automatically
|
// ReceiveMessage returns a Message or error ignoring Subscription or Pong
|
||||||
// reconnects to Redis in case of network errors.
|
// messages. It automatically reconnects to Redis Server and resubscribes
|
||||||
|
// to channels in case of network errors.
|
||||||
func (c *PubSub) ReceiveMessage() (*Message, error) {
|
func (c *PubSub) ReceiveMessage() (*Message, error) {
|
||||||
var errNum int
|
var errNum uint
|
||||||
for {
|
for {
|
||||||
msgi, err := c.ReceiveTimeout(5 * time.Second)
|
msgi, err := c.ReceiveTimeout(5 * time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -260,10 +261,9 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
|
||||||
if errNum < 3 {
|
if errNum < 3 {
|
||||||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||||
err := c.Ping("")
|
err := c.Ping("")
|
||||||
if err == nil {
|
if err != nil {
|
||||||
continue
|
Logger.Printf("PubSub.Ping failed: %s", err)
|
||||||
}
|
}
|
||||||
Logger.Printf("PubSub.Ping failed: %s", err)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// 3 consequent errors - connection is bad
|
// 3 consequent errors - connection is bad
|
||||||
|
|
|
@ -33,12 +33,6 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
defer pubsub.Close()
|
defer pubsub.Close()
|
||||||
|
|
||||||
n, err := client.Publish("mychannel1", "hello").Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(n).To(Equal(int64(1)))
|
|
||||||
|
|
||||||
Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
{
|
{
|
||||||
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -48,6 +42,18 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(subscr.Count).To(Equal(1))
|
Expect(subscr.Count).To(Equal(1))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
||||||
|
Expect(err.(net.Error).Timeout()).To(Equal(true))
|
||||||
|
Expect(msgi).To(BeNil())
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := client.Publish("mychannel1", "hello").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(1)))
|
||||||
|
|
||||||
|
Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred())
|
||||||
|
|
||||||
{
|
{
|
||||||
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -66,11 +72,8 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(subscr.Count).To(Equal(0))
|
Expect(subscr.Count).To(Equal(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
stats := client.Pool().Stats()
|
||||||
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2)))
|
||||||
Expect(err.(net.Error).Timeout()).To(Equal(true))
|
|
||||||
Expect(msgi).NotTo(HaveOccurred())
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should pub/sub channels", func() {
|
It("should pub/sub channels", func() {
|
||||||
|
@ -128,16 +131,6 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
defer pubsub.Close()
|
defer pubsub.Close()
|
||||||
|
|
||||||
n, err := client.Publish("mychannel", "hello").Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(n).To(Equal(int64(1)))
|
|
||||||
|
|
||||||
n, err = client.Publish("mychannel2", "hello2").Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(n).To(Equal(int64(1)))
|
|
||||||
|
|
||||||
Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
{
|
{
|
||||||
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -156,6 +149,22 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(subscr.Count).To(Equal(2))
|
Expect(subscr.Count).To(Equal(2))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
||||||
|
Expect(err.(net.Error).Timeout()).To(Equal(true))
|
||||||
|
Expect(msgi).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := client.Publish("mychannel", "hello").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(1)))
|
||||||
|
|
||||||
|
n, err = client.Publish("mychannel2", "hello2").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(1)))
|
||||||
|
|
||||||
|
Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
|
||||||
|
|
||||||
{
|
{
|
||||||
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -190,11 +199,8 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(subscr.Count).To(Equal(0))
|
Expect(subscr.Count).To(Equal(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
stats := client.Pool().Stats()
|
||||||
msgi, err := pubsub.ReceiveTimeout(time.Second)
|
Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2)))
|
||||||
Expect(err.(net.Error).Timeout()).To(Equal(true))
|
|
||||||
Expect(msgi).NotTo(HaveOccurred())
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should ping/pong", func() {
|
It("should ping/pong", func() {
|
||||||
|
@ -277,6 +283,9 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(msg.Payload).To(Equal("hello"))
|
Expect(msg.Payload).To(Equal("hello"))
|
||||||
|
|
||||||
Eventually(done).Should(Receive())
|
Eventually(done).Should(Receive())
|
||||||
|
|
||||||
|
stats := client.Pool().Stats()
|
||||||
|
Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2)))
|
||||||
})
|
})
|
||||||
|
|
||||||
expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
|
expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
|
||||||
|
@ -305,6 +314,9 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(msg.Payload).To(Equal("hello"))
|
Expect(msg.Payload).To(Equal("hello"))
|
||||||
|
|
||||||
Eventually(done).Should(Receive())
|
Eventually(done).Should(Receive())
|
||||||
|
|
||||||
|
stats := client.Pool().Stats()
|
||||||
|
Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2)))
|
||||||
}
|
}
|
||||||
|
|
||||||
It("Subscribe should reconnect on ReceiveMessage error", func() {
|
It("Subscribe should reconnect on ReceiveMessage error", func() {
|
||||||
|
|
Loading…
Reference in New Issue