Change PubSub.Channel

This commit is contained in:
Vladimir Mihailenco 2019-07-01 17:21:32 +03:00
parent 73d3c18522
commit 11ef80b162
2 changed files with 153 additions and 70 deletions

View File

@ -7,6 +7,7 @@
- New methods ProcessContext, DoContext, and ExecContext. - New methods ProcessContext, DoContext, and ExecContext.
- Client respects Context.Deadline when setting net.Conn deadline. - Client respects Context.Deadline when setting net.Conn deadline.
- Client listens on Context.Done while waiting for a connection from the pool. - Client listens on Context.Done while waiting for a connection from the pool.
- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to `*Message` to allow detecting reconnections.
## v6.15 ## v6.15

222
pubsub.go
View File

@ -13,6 +13,8 @@ import (
"github.com/go-redis/redis/internal/proto" "github.com/go-redis/redis/internal/proto"
) )
const pingTimeout = 30 * time.Second
var errPingTimeout = errors.New("redis: ping timeout") var errPingTimeout = errors.New("redis: ping timeout")
// PubSub implements Pub/Sub commands as described in // PubSub implements Pub/Sub commands as described in
@ -38,7 +40,8 @@ type PubSub struct {
cmd *Cmd cmd *Cmd
chOnce sync.Once chOnce sync.Once
ch chan *Message msgCh chan *Message
allCh chan interface{}
ping chan struct{} ping chan struct{}
} }
@ -394,95 +397,64 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
} }
// Channel returns a Go channel for concurrently receiving messages. // Channel returns a Go channel for concurrently receiving messages.
// It periodically sends Ping messages to test connection health. // The channel is closed together with the PubSub. If the Go channel
// The channel is closed with PubSub. Receive* APIs can not be used // is blocked full for 30 seconds the message is dropped.
// after channel is created. // Receive* APIs can not be used after channel is created.
// //
// If the Go channel is full for 30 seconds the message is dropped. // go-redis periodically sends ping messages to test connection health
// and re-subscribes if ping can not not received for 30 seconds.
func (c *PubSub) Channel() <-chan *Message { func (c *PubSub) Channel() <-chan *Message {
return c.channel(100) return c.ChannelSize(100)
} }
// ChannelSize is like Channel, but creates a Go channel // ChannelSize is like Channel, but creates a Go channel
// with specified buffer size. // with specified buffer size.
func (c *PubSub) ChannelSize(size int) <-chan *Message { func (c *PubSub) ChannelSize(size int) <-chan *Message {
return c.channel(size)
}
func (c *PubSub) channel(size int) <-chan *Message {
c.chOnce.Do(func() { c.chOnce.Do(func() {
c.initChannel(size) c.initPing()
c.initMsgChan(size)
}) })
if cap(c.ch) != size { if c.msgCh == nil {
err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size") err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
panic(err) panic(err)
} }
return c.ch if cap(c.msgCh) != size {
err := fmt.Errorf("redis: PubSub.Channel size can not be changed once created")
panic(err)
}
return c.msgCh
} }
func (c *PubSub) initChannel(size int) { // ChannelWithSubscriptions is like Channel, but message type can be either
const timeout = 30 * time.Second // *Subscription or *Message. Subscription messages can be used to detect
// reconnections.
//
// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
func (c *PubSub) ChannelWithSubscriptions(size int) <-chan interface{} {
c.chOnce.Do(func() {
c.initPing()
c.initAllChan(size)
})
if c.allCh == nil {
err := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
panic(err)
}
if cap(c.allCh) != size {
err := fmt.Errorf("redis: PubSub.Channel size can not be changed once created")
panic(err)
}
return c.allCh
}
c.ch = make(chan *Message, size) func (c *PubSub) initPing() {
c.ping = make(chan struct{}, 1) c.ping = make(chan struct{}, 1)
go func() { go func() {
timer := time.NewTimer(timeout) timer := time.NewTimer(pingTimeout)
timer.Stop()
var errCount int
for {
msg, err := c.Receive()
if err != nil {
if err == pool.ErrClosed {
close(c.ch)
return
}
if errCount > 0 {
time.Sleep(c.retryBackoff(errCount))
}
errCount++
continue
}
errCount = 0
// Any message is as good as a ping.
select {
case c.ping <- struct{}{}:
default:
}
switch msg := msg.(type) {
case *Subscription:
// Ignore.
case *Pong:
// Ignore.
case *Message:
timer.Reset(timeout)
select {
case c.ch <- msg:
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
internal.Logger.Printf(
"redis: %s channel is full for %s (message is dropped)",
c, timeout)
}
default:
internal.Logger.Printf("redis: unknown message type: %T", msg)
}
}
}()
go func() {
timer := time.NewTimer(timeout)
timer.Stop() timer.Stop()
healthy := true healthy := true
for { for {
timer.Reset(timeout) timer.Reset(pingTimeout)
select { select {
case <-c.ping: case <-c.ping:
healthy = true healthy = true
@ -508,6 +480,116 @@ func (c *PubSub) initChannel(size int) {
}() }()
} }
// initMsgChan must be in sync with initAllChan.
func (c *PubSub) initMsgChan(size int) {
c.msgCh = make(chan *Message, size)
go func() {
timer := time.NewTimer(pingTimeout)
timer.Stop()
var errCount int
for {
msg, err := c.Receive()
if err != nil {
if err == pool.ErrClosed {
close(c.msgCh)
return
}
if errCount > 0 {
time.Sleep(c.retryBackoff(errCount))
}
errCount++
continue
}
errCount = 0
// Any message is as good as a ping.
select {
case c.ping <- struct{}{}:
default:
}
switch msg := msg.(type) {
case *Subscription:
// Ignore.
case *Pong:
// Ignore.
case *Message:
timer.Reset(pingTimeout)
select {
case c.msgCh <- msg:
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
internal.Logger.Printf(
"redis: %s channel is full for %s (message is dropped)", c, pingTimeout)
}
default:
internal.Logger.Printf("redis: unknown message type: %T", msg)
}
}
}()
}
// initAllChan must be in sync with initMsgChan.
func (c *PubSub) initAllChan(size int) {
c.allCh = make(chan interface{}, size)
go func() {
timer := time.NewTimer(pingTimeout)
timer.Stop()
var errCount int
for {
msg, err := c.Receive()
if err != nil {
if err == pool.ErrClosed {
close(c.allCh)
return
}
if errCount > 0 {
time.Sleep(c.retryBackoff(errCount))
}
errCount++
continue
}
errCount = 0
// Any message is as good as a ping.
select {
case c.ping <- struct{}{}:
default:
}
switch msg := msg.(type) {
case *Subscription:
c.sendMessage(msg, timer)
case *Pong:
// Ignore.
case *Message:
c.sendMessage(msg, timer)
default:
internal.Logger.Printf("redis: unknown message type: %T", msg)
}
}
}()
}
func (c *PubSub) sendMessage(msg interface{}, timer *time.Timer) {
timer.Reset(pingTimeout)
select {
case c.allCh <- msg:
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
internal.Logger.Printf(
"redis: %s channel is full for %s (message is dropped)", c, pingTimeout)
}
}
func (c *PubSub) retryBackoff(attempt int) time.Duration { func (c *PubSub) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
} }