diff --git a/commands_test.go b/commands_test.go index 334ddd18..49d8488b 100644 --- a/commands_test.go +++ b/commands_test.go @@ -1303,6 +1303,9 @@ var _ = Describe("Commands", func() { bLPop := client.BLPop(time.Second, "list1") Expect(bLPop.Val()).To(BeNil()) Expect(bLPop.Err()).To(Equal(redis.Nil)) + + stats := client.Pool().Stats() + Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(1))) }) It("should BRPop", func() { diff --git a/pool.go b/pool.go index bb713bf4..3725c408 100644 --- a/pool.go +++ b/pool.go @@ -18,7 +18,8 @@ var ( // PoolStats contains pool state information and accumulated stats. type PoolStats struct { Requests uint32 // number of times a connection was requested by the pool - Waits uint32 // number of times our pool had to wait for a connection + Hits uint32 // number of times free connection was found in the pool + Waits uint32 // number of times the pool had to wait for a connection Timeouts uint32 // number of times a wait timeout occurred TotalConns uint32 // the number of total connections in the pool @@ -241,6 +242,7 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) { // Fetch first non-idle connection, if available. if cn = p.First(); cn != nil { + atomic.AddUint32(&p.stats.Hits, 1) return } diff --git a/pool_test.go b/pool_test.go index 4d787a68..bc88c5fc 100644 --- a/pool_test.go +++ b/pool_test.go @@ -123,6 +123,12 @@ var _ = Describe("pool", func() { pool := client.Pool() Expect(pool.Len()).To(Equal(1)) Expect(pool.FreeLen()).To(Equal(1)) + + stats := pool.Stats() + Expect(stats.Requests).To(Equal(uint32(3))) + Expect(stats.Hits).To(Equal(uint32(2))) + Expect(stats.Waits).To(Equal(uint32(0))) + Expect(stats.Timeouts).To(Equal(uint32(0))) }) It("should reuse connections", func() { @@ -135,6 +141,12 @@ var _ = Describe("pool", func() { pool := client.Pool() Expect(pool.Len()).To(Equal(1)) Expect(pool.FreeLen()).To(Equal(1)) + + stats := pool.Stats() + Expect(stats.Requests).To(Equal(uint32(100))) + Expect(stats.Hits).To(Equal(uint32(99))) + Expect(stats.Waits).To(Equal(uint32(0))) + Expect(stats.Timeouts).To(Equal(uint32(0))) }) It("should unblock client when connection is removed", func() { diff --git a/pubsub.go b/pubsub.go index 1b422ec8..c1fb4628 100644 --- a/pubsub.go +++ b/pubsub.go @@ -245,10 +245,11 @@ func (c *PubSub) Receive() (interface{}, error) { return c.ReceiveTimeout(0) } -// ReceiveMessage returns a message or error. It automatically -// reconnects to Redis in case of network errors. +// ReceiveMessage returns a Message or error ignoring Subscription or Pong +// messages. It automatically reconnects to Redis Server and resubscribes +// to channels in case of network errors. func (c *PubSub) ReceiveMessage() (*Message, error) { - var errNum int + var errNum uint for { msgi, err := c.ReceiveTimeout(5 * time.Second) if err != nil { @@ -260,10 +261,9 @@ func (c *PubSub) ReceiveMessage() (*Message, error) { if errNum < 3 { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { err := c.Ping("") - if err == nil { - continue + if err != nil { + Logger.Printf("PubSub.Ping failed: %s", err) } - Logger.Printf("PubSub.Ping failed: %s", err) } } else { // 3 consequent errors - connection is bad diff --git a/pubsub_test.go b/pubsub_test.go index 36c75c38..669c0737 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -33,12 +33,6 @@ var _ = Describe("PubSub", func() { Expect(err).NotTo(HaveOccurred()) defer pubsub.Close() - n, err := client.Publish("mychannel1", "hello").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(1))) - - Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred()) - { msgi, err := pubsub.ReceiveTimeout(time.Second) Expect(err).NotTo(HaveOccurred()) @@ -48,6 +42,18 @@ var _ = Describe("PubSub", func() { Expect(subscr.Count).To(Equal(1)) } + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err.(net.Error).Timeout()).To(Equal(true)) + Expect(msgi).To(BeNil()) + } + + n, err := client.Publish("mychannel1", "hello").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred()) + { msgi, err := pubsub.ReceiveTimeout(time.Second) Expect(err).NotTo(HaveOccurred()) @@ -66,11 +72,8 @@ var _ = Describe("PubSub", func() { Expect(subscr.Count).To(Equal(0)) } - { - msgi, err := pubsub.ReceiveTimeout(time.Second) - Expect(err.(net.Error).Timeout()).To(Equal(true)) - Expect(msgi).NotTo(HaveOccurred()) - } + stats := client.Pool().Stats() + Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2))) }) It("should pub/sub channels", func() { @@ -128,16 +131,6 @@ var _ = Describe("PubSub", func() { Expect(err).NotTo(HaveOccurred()) defer pubsub.Close() - n, err := client.Publish("mychannel", "hello").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(1))) - - n, err = client.Publish("mychannel2", "hello2").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(1))) - - Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred()) - { msgi, err := pubsub.ReceiveTimeout(time.Second) Expect(err).NotTo(HaveOccurred()) @@ -156,6 +149,22 @@ var _ = Describe("PubSub", func() { Expect(subscr.Count).To(Equal(2)) } + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err.(net.Error).Timeout()).To(Equal(true)) + Expect(msgi).NotTo(HaveOccurred()) + } + + n, err := client.Publish("mychannel", "hello").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + n, err = client.Publish("mychannel2", "hello2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred()) + { msgi, err := pubsub.ReceiveTimeout(time.Second) Expect(err).NotTo(HaveOccurred()) @@ -190,11 +199,8 @@ var _ = Describe("PubSub", func() { Expect(subscr.Count).To(Equal(0)) } - { - msgi, err := pubsub.ReceiveTimeout(time.Second) - Expect(err.(net.Error).Timeout()).To(Equal(true)) - Expect(msgi).NotTo(HaveOccurred()) - } + stats := client.Pool().Stats() + Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2))) }) It("should ping/pong", func() { @@ -277,6 +283,9 @@ var _ = Describe("PubSub", func() { Expect(msg.Payload).To(Equal("hello")) Eventually(done).Should(Receive()) + + stats := client.Pool().Stats() + Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2))) }) expectReceiveMessageOnError := func(pubsub *redis.PubSub) { @@ -305,6 +314,9 @@ var _ = Describe("PubSub", func() { Expect(msg.Payload).To(Equal("hello")) Eventually(done).Should(Receive()) + + stats := client.Pool().Stats() + Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(2))) } It("Subscribe should reconnect on ReceiveMessage error", func() {