diff --git a/CHANGELOG.md b/CHANGELOG.md index b35c4dd1..c166c617 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ - Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards. - Cluster client was optimized to use much less memory when reloading cluster state. -- ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout occurres. +- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout occurres. In most cases it is recommended to use PubSub.Channel instead. ## v6.12 diff --git a/example_test.go b/example_test.go index 2206e7f5..3d0fde0c 100644 --- a/example_test.go +++ b/example_test.go @@ -324,26 +324,18 @@ func ExamplePubSub() { pubsub := client.Subscribe("mychannel1") defer pubsub.Close() - // Wait for subscription to be created before publishing message. - subscr, err := pubsub.ReceiveTimeout(time.Second) - if err != nil { - panic(err) - } - fmt.Println(subscr) + // Go channel which receives messages. + ch := pubsub.Channel() - err = client.Publish("mychannel1", "hello").Err() - if err != nil { - panic(err) - } - - msg, err := pubsub.ReceiveMessage() + // Publish a message. + err := client.Publish("mychannel1", "hello").Err() if err != nil { panic(err) } + msg := <-ch fmt.Println(msg.Channel, msg.Payload) - // Output: subscribe: mychannel1 - // mychannel1 hello + // Output: mychannel1 hello } func ExamplePubSub_Receive() { diff --git a/pubsub.go b/pubsub.go index b76dbc6d..2cfcd150 100644 --- a/pubsub.go +++ b/pubsub.go @@ -30,11 +30,9 @@ type PubSub struct { cmd *Cmd - pingOnce sync.Once - ping chan struct{} - chOnce sync.Once ch chan *Message + ping chan struct{} } func (c *PubSub) init() { @@ -326,8 +324,8 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { } // ReceiveTimeout acts like Receive but returns an error if message -// is not received in time. This is low-level API and most clients -// should use ReceiveMessage instead. +// is not received in time. This is low-level API and in most cases +// Channel should be used instead. func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { if c.cmd == nil { c.cmd = NewCmd() @@ -349,28 +347,22 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { } // Receive returns a message as a Subscription, Message, Pong or error. -// See PubSub example for details. This is low-level API and most clients -// should use ReceiveMessage instead. +// See PubSub example for details. This is low-level API and in most cases +// Channel should be used instead. func (c *PubSub) Receive() (interface{}, error) { return c.ReceiveTimeout(0) } -// ReceiveMessage returns a Message or error ignoring Subscription or Pong -// messages. It periodically sends Ping messages to test connection health. +// ReceiveMessage returns a Message or error ignoring Subscription and Pong +// messages. This is low-level API and in most cases Channel should be used +// instead. func (c *PubSub) ReceiveMessage() (*Message, error) { - c.pingOnce.Do(c.initPing) for { msg, err := c.Receive() if err != nil { return nil, err } - // Any message is as good as a ping. - select { - case c.ping <- struct{}{}: - default: - } - switch msg := msg.(type) { case *Subscription: // Ignore. @@ -386,6 +378,7 @@ func (c *PubSub) ReceiveMessage() (*Message, error) { } // Channel returns a Go channel for concurrently receiving messages. +// It periodically sends Ping messages to test connection health. // The channel is closed with PubSub. Receive* APIs can not be used // after channel is created. func (c *PubSub) Channel() <-chan *Message { @@ -395,10 +388,12 @@ func (c *PubSub) Channel() <-chan *Message { func (c *PubSub) initChannel() { c.ch = make(chan *Message, 100) + c.ping = make(chan struct{}, 10) + go func() { var errCount int for { - msg, err := c.ReceiveMessage() + msg, err := c.Receive() if err != nil { if err == pool.ErrClosed { close(c.ch) @@ -411,16 +406,29 @@ func (c *PubSub) initChannel() { continue } errCount = 0 - c.ch <- msg + + // Any message is as good as a ping. + select { + case c.ping <- struct{}{}: + default: + } + + switch msg := msg.(type) { + case *Subscription: + // Ignore. + case *Pong: + // Ignore. + case *Message: + c.ch <- msg + default: + internal.Logf("redis: unknown message: %T", msg) + } } }() -} -func (c *PubSub) initPing() { - const timeout = 5 * time.Second - - c.ping = make(chan struct{}, 10) go func() { + const timeout = 5 * time.Second + timer := time.NewTimer(timeout) timer.Stop()