Merge pull request #993 from go-redis/fix/full-chan-drop-message

pubsub: drop a message when the channel is full
This commit is contained in:
Vladimir Mihailenco 2019-03-12 13:07:51 +02:00 committed by GitHub
commit 9ecae37814
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 10 deletions

View File

@ -3,6 +3,7 @@ package redis
import ( import (
"errors" "errors"
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
@ -29,8 +30,9 @@ type PubSub struct {
cn *pool.Conn cn *pool.Conn
channels map[string]struct{} channels map[string]struct{}
patterns map[string]struct{} patterns map[string]struct{}
closed bool
exit chan struct{} closed bool
exit chan struct{}
cmd *Cmd cmd *Cmd
@ -39,6 +41,12 @@ type PubSub struct {
ping chan struct{} ping chan struct{}
} }
func (c *PubSub) String() string {
channels := mapKeys(c.channels)
channels = append(channels, mapKeys(c.patterns)...)
return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
}
func (c *PubSub) init() { func (c *PubSub) init() {
c.exit = make(chan struct{}) c.exit = make(chan struct{})
} }
@ -389,16 +397,39 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
// It periodically sends Ping messages to test connection health. // It periodically sends Ping messages to test connection health.
// The channel is closed with PubSub. Receive* APIs can not be used // The channel is closed with PubSub. Receive* APIs can not be used
// after channel is created. // after channel is created.
//
// If the Go channel is full for 30 seconds the message is dropped.
func (c *PubSub) Channel() <-chan *Message { func (c *PubSub) Channel() <-chan *Message {
c.chOnce.Do(c.initChannel) return c.channel(100)
}
// ChannelSize is like Channel, but creates a Go channel
// with specified buffer size.
func (c *PubSub) ChannelSize(size int) <-chan *Message {
return c.channel(size)
}
func (c *PubSub) channel(size int) <-chan *Message {
c.chOnce.Do(func() {
c.initChannel(size)
})
if cap(c.ch) != size {
err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size")
panic(err)
}
return c.ch return c.ch
} }
func (c *PubSub) initChannel() { func (c *PubSub) initChannel(size int) {
c.ch = make(chan *Message, 100) const timeout = 30 * time.Second
c.ping = make(chan struct{}, 10)
c.ch = make(chan *Message, size)
c.ping = make(chan struct{}, 1)
go func() { go func() {
timer := time.NewTimer(timeout)
timer.Stop()
var errCount int var errCount int
for { for {
msg, err := c.Receive() msg, err := c.Receive()
@ -413,6 +444,7 @@ func (c *PubSub) initChannel() {
errCount++ errCount++
continue continue
} }
errCount = 0 errCount = 0
// Any message is as good as a ping. // Any message is as good as a ping.
@ -427,16 +459,24 @@ func (c *PubSub) initChannel() {
case *Pong: case *Pong:
// Ignore. // Ignore.
case *Message: case *Message:
c.ch <- msg timer.Reset(timeout)
select {
case c.ch <- msg:
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
internal.Logf(
"redis: %s channel is full for %s (message is dropped)",
c, timeout)
}
default: default:
internal.Logf("redis: unknown message: %T", msg) internal.Logf("redis: unknown message type: %T", msg)
} }
} }
}() }()
go func() { go func() {
const timeout = 5 * time.Second
timer := time.NewTimer(timeout) timer := time.NewTimer(timeout)
timer.Stop() timer.Stop()

View File

@ -27,6 +27,13 @@ var _ = Describe("PubSub", func() {
Expect(client.Close()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred())
}) })
It("implements Stringer", func() {
pubsub := client.PSubscribe("mychannel*")
defer pubsub.Close()
Expect(pubsub.String()).To(Equal("PubSub(mychannel*)"))
})
It("should support pattern matching", func() { It("should support pattern matching", func() {
pubsub := client.PSubscribe("mychannel*") pubsub := client.PSubscribe("mychannel*")
defer pubsub.Close() defer pubsub.Close()