Change ReceiveMessage to not use Ping

This commit is contained in:
Vladimir Mihailenco 2018-07-24 09:41:14 +03:00
parent c696191b80
commit 9bb7bb3cde
3 changed files with 38 additions and 38 deletions

View File

@ -4,7 +4,7 @@
- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards. - Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards.
- Cluster client was optimized to use much less memory when reloading cluster state. - Cluster client was optimized to use much less memory when reloading cluster state.
- ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout occurres. - PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout occurres. In most cases it is recommended to use PubSub.Channel instead.
## v6.12 ## v6.12

View File

@ -324,26 +324,18 @@ func ExamplePubSub() {
pubsub := client.Subscribe("mychannel1") pubsub := client.Subscribe("mychannel1")
defer pubsub.Close() defer pubsub.Close()
// Wait for subscription to be created before publishing message. // Go channel which receives messages.
subscr, err := pubsub.ReceiveTimeout(time.Second) ch := pubsub.Channel()
if err != nil {
panic(err)
}
fmt.Println(subscr)
err = client.Publish("mychannel1", "hello").Err() // Publish a message.
if err != nil { err := client.Publish("mychannel1", "hello").Err()
panic(err)
}
msg, err := pubsub.ReceiveMessage()
if err != nil { if err != nil {
panic(err) panic(err)
} }
msg := <-ch
fmt.Println(msg.Channel, msg.Payload) fmt.Println(msg.Channel, msg.Payload)
// Output: subscribe: mychannel1 // Output: mychannel1 hello
// mychannel1 hello
} }
func ExamplePubSub_Receive() { func ExamplePubSub_Receive() {

View File

@ -30,11 +30,9 @@ type PubSub struct {
cmd *Cmd cmd *Cmd
pingOnce sync.Once
ping chan struct{}
chOnce sync.Once chOnce sync.Once
ch chan *Message ch chan *Message
ping chan struct{}
} }
func (c *PubSub) init() { func (c *PubSub) init() {
@ -326,8 +324,8 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
} }
// ReceiveTimeout acts like Receive but returns an error if message // ReceiveTimeout acts like Receive but returns an error if message
// is not received in time. This is low-level API and most clients // is not received in time. This is low-level API and in most cases
// should use ReceiveMessage instead. // Channel should be used instead.
func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
if c.cmd == nil { if c.cmd == nil {
c.cmd = NewCmd() c.cmd = NewCmd()
@ -349,28 +347,22 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
} }
// Receive returns a message as a Subscription, Message, Pong or error. // Receive returns a message as a Subscription, Message, Pong or error.
// See PubSub example for details. This is low-level API and most clients // See PubSub example for details. This is low-level API and in most cases
// should use ReceiveMessage instead. // Channel should be used instead.
func (c *PubSub) Receive() (interface{}, error) { func (c *PubSub) Receive() (interface{}, error) {
return c.ReceiveTimeout(0) return c.ReceiveTimeout(0)
} }
// ReceiveMessage returns a Message or error ignoring Subscription or Pong // ReceiveMessage returns a Message or error ignoring Subscription and Pong
// messages. It periodically sends Ping messages to test connection health. // messages. This is low-level API and in most cases Channel should be used
// instead.
func (c *PubSub) ReceiveMessage() (*Message, error) { func (c *PubSub) ReceiveMessage() (*Message, error) {
c.pingOnce.Do(c.initPing)
for { for {
msg, err := c.Receive() msg, err := c.Receive()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Any message is as good as a ping.
select {
case c.ping <- struct{}{}:
default:
}
switch msg := msg.(type) { switch msg := msg.(type) {
case *Subscription: case *Subscription:
// Ignore. // Ignore.
@ -386,6 +378,7 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
} }
// Channel returns a Go channel for concurrently receiving messages. // Channel returns a Go channel for concurrently receiving messages.
// It periodically sends Ping messages to test connection health.
// The channel is closed with PubSub. Receive* APIs can not be used // The channel is closed with PubSub. Receive* APIs can not be used
// after channel is created. // after channel is created.
func (c *PubSub) Channel() <-chan *Message { func (c *PubSub) Channel() <-chan *Message {
@ -395,10 +388,12 @@ func (c *PubSub) Channel() <-chan *Message {
func (c *PubSub) initChannel() { func (c *PubSub) initChannel() {
c.ch = make(chan *Message, 100) c.ch = make(chan *Message, 100)
c.ping = make(chan struct{}, 10)
go func() { go func() {
var errCount int var errCount int
for { for {
msg, err := c.ReceiveMessage() msg, err := c.Receive()
if err != nil { if err != nil {
if err == pool.ErrClosed { if err == pool.ErrClosed {
close(c.ch) close(c.ch)
@ -411,16 +406,29 @@ func (c *PubSub) initChannel() {
continue continue
} }
errCount = 0 errCount = 0
c.ch <- msg
// Any message is as good as a ping.
select {
case c.ping <- struct{}{}:
default:
}
switch msg := msg.(type) {
case *Subscription:
// Ignore.
case *Pong:
// Ignore.
case *Message:
c.ch <- msg
default:
internal.Logf("redis: unknown message: %T", msg)
}
} }
}() }()
}
func (c *PubSub) initPing() {
const timeout = 5 * time.Second
c.ping = make(chan struct{}, 10)
go func() { go func() {
const timeout = 5 * time.Second
timer := time.NewTimer(timeout) timer := time.NewTimer(timeout)
timer.Stop() timer.Stop()