redis/pubsub.go

309 lines
6.9 KiB
Go
Raw Normal View History

2012-07-25 17:00:50 +04:00
package redis
import (
"fmt"
2015-09-06 13:50:16 +03:00
"net"
2014-05-11 11:42:40 +04:00
"time"
2016-04-09 14:52:01 +03:00
"gopkg.in/redis.v4/internal"
"gopkg.in/redis.v4/internal/pool"
2012-07-25 17:00:50 +04:00
)
2015-07-11 13:42:44 +03:00
// Publish posts message to the given channel by processing a PUBLISH
// command, and returns that command.
func (c *Client) Publish(channel, message string) *IntCmd {
	cmd := NewIntCmd("PUBLISH", channel, message)
	c.Process(cmd)
	return cmd
}
2015-05-23 14:15:05 +03:00
// PubSub implements Pub/Sub commands as described in
2015-09-12 09:36:03 +03:00
// http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
// multiple goroutines.
2014-05-11 11:42:40 +04:00
type PubSub struct {
base baseClient
2015-09-06 13:50:16 +03:00
channels []string
patterns []string
nsub int // number of active subscriptions
2012-07-25 17:00:50 +04:00
}
2015-07-11 13:42:44 +03:00
// Deprecated. Use Subscribe/PSubscribe instead.
2014-05-11 11:42:40 +04:00
func (c *Client) PubSub() *PubSub {
return &PubSub{
base: baseClient{
2014-05-11 11:42:40 +04:00
opt: c.opt,
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
},
2014-05-11 11:42:40 +04:00
}
}
2015-07-11 13:42:44 +03:00
// Subscribe creates a new PubSub and subscribes it to the specified
// channels. The PubSub is returned even when subscribing fails, together
// with the subscription error.
func (c *Client) Subscribe(channels ...string) (*PubSub, error) {
	ps := c.PubSub()
	err := ps.Subscribe(channels...)
	return ps, err
}
// Subscribes the client to the given patterns.
func (c *Client) PSubscribe(channels ...string) (*PubSub, error) {
pubsub := c.PubSub()
return pubsub, pubsub.PSubscribe(channels...)
2012-07-25 17:00:50 +04:00
}
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
2016-03-15 15:04:35 +03:00
cn, err := c.base.conn()
2015-09-06 13:50:16 +03:00
if err != nil {
return err
}
c.putConn(cn, err)
2015-09-06 13:50:16 +03:00
args := make([]interface{}, 1+len(channels))
args[0] = redisCmd
2015-09-06 13:50:16 +03:00
for i, channel := range channels {
args[1+i] = channel
}
cmd := NewSliceCmd(args...)
return writeCmd(cn, cmd)
2015-09-06 13:50:16 +03:00
}
// Subscribes the client to the specified channels.
func (c *PubSub) Subscribe(channels ...string) error {
err := c.subscribe("SUBSCRIBE", channels...)
if err == nil {
c.channels = append(c.channels, channels...)
c.nsub += len(channels)
2015-09-06 13:50:16 +03:00
}
return err
}
// Subscribes the client to the given patterns.
func (c *PubSub) PSubscribe(patterns ...string) error {
err := c.subscribe("PSUBSCRIBE", patterns...)
if err == nil {
c.patterns = append(c.patterns, patterns...)
c.nsub += len(patterns)
2015-09-06 13:50:16 +03:00
}
return err
}
// remove deletes the first occurrence of each element of es from ss and
// returns the shortened slice. With no es it returns ss truncated to zero
// length. The backing array of ss is modified in place.
func remove(ss []string, es ...string) []string {
	if len(es) == 0 {
		return ss[:0]
	}

	for _, target := range es {
		for i := 0; i < len(ss); i++ {
			if ss[i] != target {
				continue
			}
			// Shift the tail left over the match and drop the last slot.
			copy(ss[i:], ss[i+1:])
			ss = ss[:len(ss)-1]
			break
		}
	}
	return ss
}
// Unsubscribe unsubscribes the client from the given channels, or from all
// of them if none is given.
//
// On success the channels are dropped from the remembered channel list
// (the whole list when none is given). The nsub counter is not changed.
func (c *PubSub) Unsubscribe(channels ...string) error {
	if err := c.subscribe("UNSUBSCRIBE", channels...); err != nil {
		return err
	}
	c.channels = remove(c.channels, channels...)
	return nil
}
// PUnsubscribe unsubscribes the client from the given patterns, or from all
// of them if none is given.
//
// On success the patterns are dropped from the remembered pattern list
// (the whole list when none is given). The nsub counter is not changed.
func (c *PubSub) PUnsubscribe(patterns ...string) error {
	if err := c.subscribe("PUNSUBSCRIBE", patterns...); err != nil {
		return err
	}
	c.patterns = remove(c.patterns, patterns...)
	return nil
}
// Close closes the underlying base client and returns its error.
func (c *PubSub) Close() error {
	err := c.base.Close()
	return err
}
2015-07-11 13:12:47 +03:00
func (c *PubSub) Ping(payload string) error {
2016-03-15 15:04:35 +03:00
cn, err := c.base.conn()
2015-07-11 13:12:47 +03:00
if err != nil {
return err
}
args := []interface{}{"PING"}
if payload != "" {
args = append(args, payload)
}
cmd := NewCmd(args...)
return writeCmd(cn, cmd)
2015-07-11 13:12:47 +03:00
}
2015-07-11 13:42:44 +03:00
// Subscription is the message received after a successful subscription to
// (or unsubscription from) a channel.
type Subscription struct {
	// Kind can be "subscribe", "unsubscribe", "psubscribe" or
	// "punsubscribe".
	Kind string
	// Channel is the name of the channel we have subscribed to.
	Channel string
	// Count is the number of channels we are currently subscribed to.
	Count int
}

// String formats the subscription as "<kind>: <channel>".
func (m *Subscription) String() string {
	return fmt.Sprint(m.Kind, ": ", m.Channel)
}
2015-05-23 18:17:45 +03:00
// Message is received as the result of a PUBLISH command issued by another
// client.
type Message struct {
	// Channel is the channel the message was published to.
	Channel string
	// Pattern is set only for messages decoded from "pmessage"
	// notifications.
	Pattern string
	// Payload is the published message body.
	Payload string
}

// String formats the message as "Message<channel: payload>".
func (m *Message) String() string {
	return fmt.Sprint("Message<", m.Channel, ": ", m.Payload, ">")
}
2015-07-11 13:12:47 +03:00
// Pong is the reply received for a PING command sent on the PubSub
// connection (see PubSub.Ping).
type Pong struct {
	// Payload is the payload carried by the pong reply; it may be empty.
	Payload string
}

// String returns "Pong" for an empty payload and "Pong<payload>" otherwise.
func (p *Pong) String() string {
	if p.Payload == "" {
		return "Pong"
	}
	return fmt.Sprintf("Pong<%s>", p.Payload)
}
func (c *PubSub) newMessage(reply []interface{}) (interface{}, error) {
2015-07-11 13:12:47 +03:00
switch kind := reply[0].(string); kind {
2014-05-11 11:42:40 +04:00
case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
return &Subscription{
Kind: kind,
2014-05-11 11:42:40 +04:00
Channel: reply[1].(string),
Count: int(reply[2].(int64)),
}, nil
case "message":
return &Message{
Channel: reply[1].(string),
Payload: reply[2].(string),
}, nil
case "pmessage":
2016-04-09 11:45:56 +03:00
return &Message{
2014-05-11 11:42:40 +04:00
Pattern: reply[1].(string),
Channel: reply[2].(string),
Payload: reply[3].(string),
}, nil
2015-07-11 13:12:47 +03:00
case "pong":
return &Pong{
Payload: reply[1].(string),
}, nil
default:
return nil, fmt.Errorf("redis: unsupported pubsub notification: %q", kind)
}
}
2015-07-11 13:42:44 +03:00
// ReceiveTimeout acts like Receive but returns an error if message
2015-09-06 13:50:16 +03:00
// is not received in time. This is low-level API and most clients
// should use ReceiveMessage.
2015-07-11 13:12:47 +03:00
func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
if c.nsub == 0 {
c.resubscribe()
}
2016-03-15 15:04:35 +03:00
cn, err := c.base.conn()
2015-07-11 13:12:47 +03:00
if err != nil {
return nil, err
2014-05-11 11:42:40 +04:00
}
2015-07-11 13:12:47 +03:00
cn.ReadTimeout = timeout
2015-07-11 13:12:47 +03:00
cmd := NewSliceCmd()
err = cmd.readReply(cn)
c.putConn(cn, err)
if err != nil {
2015-07-11 13:12:47 +03:00
return nil, err
}
return c.newMessage(cmd.Val())
2014-05-11 11:42:40 +04:00
}
2012-07-25 17:00:50 +04:00
2016-04-09 11:45:56 +03:00
// Receive returns a message as a Subscription, Message, Pong or error.
// See PubSub example for details. This is low-level API and most clients
// should use ReceiveMessage.
2015-09-06 13:50:16 +03:00
func (c *PubSub) Receive() (interface{}, error) {
return c.ReceiveTimeout(0)
}
2014-05-11 11:42:40 +04:00
// ReceiveMessage returns a Message or error ignoring Subscription or Pong
// messages. It automatically reconnects to Redis Server and resubscribes
// to channels in case of network errors.
2015-09-06 13:50:16 +03:00
func (c *PubSub) ReceiveMessage() (*Message, error) {
2016-04-09 14:52:01 +03:00
return c.receiveMessage(5 * time.Second)
}
func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
var errNum uint
2015-09-06 13:50:16 +03:00
for {
2016-04-09 14:52:01 +03:00
msgi, err := c.ReceiveTimeout(timeout)
2015-09-06 13:50:16 +03:00
if err != nil {
2015-12-02 16:40:44 +03:00
if !isNetworkError(err) {
return nil, err
}
errNum++
if errNum < 3 {
2015-12-02 16:40:44 +03:00
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
err := c.Ping("")
if err != nil {
2016-04-09 14:52:01 +03:00
internal.Logf("PubSub.Ping failed: %s", err)
2015-12-02 16:40:44 +03:00
}
2015-09-06 13:50:16 +03:00
}
} else {
// 3 consequent errors - connection is bad
// and/or Redis Server is down.
// Sleep to not exceed max number of open connections.
2015-12-02 16:40:44 +03:00
time.Sleep(time.Second)
2015-09-06 13:50:16 +03:00
}
2015-12-02 16:40:44 +03:00
continue
2015-09-06 13:50:16 +03:00
}
// Reset error number, because we received a message.
2015-12-02 16:40:44 +03:00
errNum = 0
2015-09-06 13:50:16 +03:00
switch msg := msgi.(type) {
case *Subscription:
// Ignore.
case *Pong:
// Ignore.
case *Message:
return msg, nil
default:
return nil, fmt.Errorf("redis: unknown message: %T", msgi)
}
}
}
func (c *PubSub) putConn(cn *pool.Conn, err error) {
2016-03-08 18:18:52 +03:00
if !c.base.putConn(cn, err, true) {
c.nsub = 0
}
}
func (c *PubSub) resubscribe() {
if c.base.closed() {
return
}
if len(c.channels) > 0 {
if err := c.Subscribe(c.channels...); err != nil {
2016-04-09 14:52:01 +03:00
internal.Logf("Subscribe failed: %s", err)
}
}
if len(c.patterns) > 0 {
if err := c.PSubscribe(c.patterns...); err != nil {
2016-04-09 14:52:01 +03:00
internal.Logf("PSubscribe failed: %s", err)
}
}
}