2012-07-25 17:00:50 +04:00
|
|
|
package redis
|
|
|
|
|
|
|
|
import (
|
2019-06-08 15:02:51 +03:00
|
|
|
"context"
|
2012-07-25 17:00:50 +04:00
|
|
|
"fmt"
|
2019-03-12 13:40:08 +03:00
|
|
|
"strings"
|
2017-04-17 15:43:58 +03:00
|
|
|
"sync"
|
2014-05-11 11:42:40 +04:00
|
|
|
"time"
|
2016-03-12 11:52:13 +03:00
|
|
|
|
2023-01-23 09:48:54 +03:00
|
|
|
"github.com/redis/go-redis/v9/internal"
|
|
|
|
"github.com/redis/go-redis/v9/internal/pool"
|
|
|
|
"github.com/redis/go-redis/v9/internal/proto"
|
2012-07-25 17:00:50 +04:00
|
|
|
)
|
|
|
|
|
2019-04-08 15:06:31 +03:00
|
|
|
// PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. Message receiving is NOT safe
// for concurrent use by multiple goroutines.
//
// PubSub automatically reconnects to Redis Server and resubscribes
// to the channels in case of network errors.
type PubSub struct {
	opt *Options

	// newConn creates a connection; conn passes it the tracked channel
	// names plus the names that are about to be subscribed.
	newConn func(ctx context.Context, channels []string) (*pool.Conn, error)
	// closeConn is called to dispose of a connection returned by newConn.
	closeConn func(*pool.Conn) error

	// mu is locked by the Subscribe/Unsubscribe variants, Ping, Close and
	// the *WithLock helpers around their use of cn, the subscription sets
	// and closed.
	mu sync.Mutex
	// cn is the current connection. It is nil until conn establishes one
	// and is reset to nil by closeTheCn.
	cn *pool.Conn
	// channels, patterns and schannels are the names that resubscribe
	// replays with "subscribe", "psubscribe" and "ssubscribe" respectively.
	channels  map[string]struct{}
	patterns  map[string]struct{}
	schannels map[string]struct{}

	// closed is set by Close; conn returns pool.ErrClosed afterwards.
	closed bool
	// exit is created by init and closed by Close; the health-check
	// goroutine returns when it is closed.
	exit chan struct{}

	// cmd is created on the first ReceiveTimeout call and reused to read
	// every subsequent reply.
	cmd *Cmd

	// chOnce makes sure that only the first call to Channel or
	// ChannelWithSubscriptions sets up a Go channel.
	chOnce sync.Once
	// msgCh is set by Channel, allCh by ChannelWithSubscriptions.
	msgCh *channel
	allCh *channel
}
|
|
|
|
|
|
|
|
func (c *PubSub) init() {
|
|
|
|
c.exit = make(chan struct{})
|
2016-09-29 15:07:04 +03:00
|
|
|
}
|
|
|
|
|
2019-03-12 13:40:08 +03:00
|
|
|
func (c *PubSub) String() string {
|
|
|
|
channels := mapKeys(c.channels)
|
|
|
|
channels = append(channels, mapKeys(c.patterns)...)
|
2022-08-03 18:10:03 +03:00
|
|
|
channels = append(channels, mapKeys(c.schannels)...)
|
2019-03-12 13:40:08 +03:00
|
|
|
return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) connWithLock(ctx context.Context) (*pool.Conn, error) {
|
2017-04-24 12:43:15 +03:00
|
|
|
c.mu.Lock()
|
2020-03-11 17:26:42 +03:00
|
|
|
cn, err := c.conn(ctx, nil)
|
2017-06-29 17:05:08 +03:00
|
|
|
c.mu.Unlock()
|
|
|
|
return cn, err
|
|
|
|
}
|
2017-04-24 12:43:15 +03:00
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, error) {
|
2017-04-24 12:43:15 +03:00
|
|
|
if c.closed {
|
2017-06-29 17:05:08 +03:00
|
|
|
return nil, pool.ErrClosed
|
2017-04-24 12:43:15 +03:00
|
|
|
}
|
|
|
|
if c.cn != nil {
|
2017-06-29 17:05:08 +03:00
|
|
|
return c.cn, nil
|
2017-04-24 12:43:15 +03:00
|
|
|
}
|
|
|
|
|
2018-08-04 12:19:19 +03:00
|
|
|
channels := mapKeys(c.channels)
|
|
|
|
channels = append(channels, newChannels...)
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
cn, err := c.newConn(ctx, channels)
|
2016-09-29 15:07:04 +03:00
|
|
|
if err != nil {
|
2017-06-29 17:05:08 +03:00
|
|
|
return nil, err
|
2016-09-29 15:07:04 +03:00
|
|
|
}
|
2017-04-17 15:43:58 +03:00
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
if err := c.resubscribe(ctx, cn); err != nil {
|
2017-07-09 10:07:20 +03:00
|
|
|
_ = c.closeConn(cn)
|
2017-06-29 17:05:08 +03:00
|
|
|
return nil, err
|
2017-04-24 12:43:15 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
c.cn = cn
|
2017-06-29 17:05:08 +03:00
|
|
|
return cn, nil
|
2017-04-17 15:43:58 +03:00
|
|
|
}
|
|
|
|
|
2019-06-08 15:02:51 +03:00
|
|
|
func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error {
|
2024-07-05 05:27:44 +03:00
|
|
|
return cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
2018-08-17 13:56:37 +03:00
|
|
|
return writeCmd(wr, cmd)
|
2018-08-15 11:53:15 +03:00
|
|
|
})
|
2018-08-04 12:19:19 +03:00
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) resubscribe(ctx context.Context, cn *pool.Conn) error {
|
2017-04-17 16:05:01 +03:00
|
|
|
var firstErr error
|
2018-07-23 15:55:13 +03:00
|
|
|
|
2017-04-24 12:43:15 +03:00
|
|
|
if len(c.channels) > 0 {
|
2020-03-11 17:26:42 +03:00
|
|
|
firstErr = c._subscribe(ctx, cn, "subscribe", mapKeys(c.channels))
|
2017-04-17 15:43:58 +03:00
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
|
2017-04-24 12:43:15 +03:00
|
|
|
if len(c.patterns) > 0 {
|
2020-03-11 17:26:42 +03:00
|
|
|
err := c._subscribe(ctx, cn, "psubscribe", mapKeys(c.patterns))
|
2018-07-23 15:55:13 +03:00
|
|
|
if err != nil && firstErr == nil {
|
2017-04-17 16:05:01 +03:00
|
|
|
firstErr = err
|
2017-04-17 15:43:58 +03:00
|
|
|
}
|
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
|
2022-08-03 18:10:03 +03:00
|
|
|
if len(c.schannels) > 0 {
|
|
|
|
err := c._subscribe(ctx, cn, "ssubscribe", mapKeys(c.schannels))
|
|
|
|
if err != nil && firstErr == nil {
|
|
|
|
firstErr = err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-17 16:05:01 +03:00
|
|
|
return firstErr
|
2017-04-17 15:43:58 +03:00
|
|
|
}
|
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
// mapKeys returns the keys of m as a freshly allocated, non-nil slice.
// The order follows map iteration and is therefore unspecified.
func mapKeys(m map[string]struct{}) []string {
	keys := make([]string, 0, len(m))
	for key := range m {
		keys = append(keys, key)
	}
	return keys
}
|
|
|
|
|
2018-08-04 12:19:19 +03:00
|
|
|
func (c *PubSub) _subscribe(
|
2020-03-11 17:26:42 +03:00
|
|
|
ctx context.Context, cn *pool.Conn, redisCmd string, channels []string,
|
2018-08-04 12:19:19 +03:00
|
|
|
) error {
|
|
|
|
args := make([]interface{}, 0, 1+len(channels))
|
|
|
|
args = append(args, redisCmd)
|
|
|
|
for _, channel := range channels {
|
|
|
|
args = append(args, channel)
|
2017-06-29 16:53:49 +03:00
|
|
|
}
|
2020-03-11 17:26:42 +03:00
|
|
|
cmd := NewSliceCmd(ctx, args...)
|
|
|
|
return c.writeCmd(ctx, cn, cmd)
|
2017-06-29 16:53:49 +03:00
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) releaseConnWithLock(
|
|
|
|
ctx context.Context,
|
|
|
|
cn *pool.Conn,
|
|
|
|
err error,
|
|
|
|
allowTimeout bool,
|
|
|
|
) {
|
2017-04-24 12:43:15 +03:00
|
|
|
c.mu.Lock()
|
2020-03-11 17:26:42 +03:00
|
|
|
c.releaseConn(ctx, cn, err, allowTimeout)
|
2017-04-24 12:43:15 +03:00
|
|
|
c.mu.Unlock()
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) releaseConn(ctx context.Context, cn *pool.Conn, err error, allowTimeout bool) {
|
2017-08-31 15:22:47 +03:00
|
|
|
if c.cn != cn {
|
|
|
|
return
|
|
|
|
}
|
2021-10-04 13:10:42 +03:00
|
|
|
if isBadConn(err, allowTimeout, c.opt.Addr) {
|
2020-03-11 17:26:42 +03:00
|
|
|
c.reconnect(ctx, err)
|
2017-08-01 14:21:26 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
// reconnect discards the current connection, logging reason, and
// immediately tries to establish a new one.
func (c *PubSub) reconnect(ctx context.Context, reason error) {
	// The close error is dropped: the connection is being discarded anyway.
	_ = c.closeTheCn(reason)
	// A failure here leaves c.cn nil, so the next conn call tries again.
	_, _ = c.conn(ctx, nil)
}
|
|
|
|
|
2019-06-17 12:32:40 +03:00
|
|
|
func (c *PubSub) closeTheCn(reason error) error {
|
2018-08-07 10:33:07 +03:00
|
|
|
if c.cn == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
if !c.closed {
|
2020-07-18 09:04:36 +03:00
|
|
|
internal.Logger.Printf(c.getContext(), "redis: discarding bad PubSub connection: %s", reason)
|
2018-08-07 10:33:07 +03:00
|
|
|
}
|
|
|
|
err := c.closeConn(c.cn)
|
|
|
|
c.cn = nil
|
|
|
|
return err
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
|
|
|
|
2017-04-24 12:43:15 +03:00
|
|
|
// Close marks the PubSub as closed, closes the exit channel and closes the
// current connection, if any. Calling Close again returns pool.ErrClosed.
func (c *PubSub) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return pool.ErrClosed
	}
	// closed is set before closeTheCn runs so that the discarded
	// connection is not logged as a bad one.
	c.closed = true
	// Signals the health-check goroutine, when one is running, to return.
	close(c.exit)

	return c.closeTheCn(pool.ErrClosed)
}
|
2017-04-17 15:43:58 +03:00
|
|
|
|
2018-01-24 21:38:47 +03:00
|
|
|
// Subscribe the client to the specified channels. It returns
|
2017-05-11 17:02:26 +03:00
|
|
|
// empty subscription if there are no channels.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) Subscribe(ctx context.Context, channels ...string) error {
|
2017-06-29 16:53:49 +03:00
|
|
|
c.mu.Lock()
|
2018-07-23 15:55:13 +03:00
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err := c.subscribe(ctx, "subscribe", channels...)
|
2018-03-14 13:42:51 +03:00
|
|
|
if c.channels == nil {
|
|
|
|
c.channels = make(map[string]struct{})
|
|
|
|
}
|
2018-08-04 12:19:19 +03:00
|
|
|
for _, s := range channels {
|
|
|
|
c.channels[s] = struct{}{}
|
2018-03-14 13:42:51 +03:00
|
|
|
}
|
2017-06-29 17:05:08 +03:00
|
|
|
return err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
|
2018-01-24 21:38:47 +03:00
|
|
|
// PSubscribe the client to the given patterns. It returns
|
2017-05-11 17:02:26 +03:00
|
|
|
// empty subscription if there are no patterns.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) PSubscribe(ctx context.Context, patterns ...string) error {
|
2017-06-29 16:53:49 +03:00
|
|
|
c.mu.Lock()
|
2018-07-23 15:55:13 +03:00
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err := c.subscribe(ctx, "psubscribe", patterns...)
|
2018-03-14 13:42:51 +03:00
|
|
|
if c.patterns == nil {
|
|
|
|
c.patterns = make(map[string]struct{})
|
|
|
|
}
|
2018-08-04 12:19:19 +03:00
|
|
|
for _, s := range patterns {
|
|
|
|
c.patterns[s] = struct{}{}
|
2018-03-14 13:42:51 +03:00
|
|
|
}
|
2017-06-29 17:05:08 +03:00
|
|
|
return err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
|
2022-08-03 18:10:03 +03:00
|
|
|
// SSubscribe Subscribes the client to the specified shard channels.
|
|
|
|
func (c *PubSub) SSubscribe(ctx context.Context, channels ...string) error {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
err := c.subscribe(ctx, "ssubscribe", channels...)
|
|
|
|
if c.schannels == nil {
|
|
|
|
c.schannels = make(map[string]struct{})
|
|
|
|
}
|
|
|
|
for _, s := range channels {
|
|
|
|
c.schannels[s] = struct{}{}
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-01-24 21:38:47 +03:00
|
|
|
// Unsubscribe the client from the given channels, or from all of
|
2015-09-06 13:50:16 +03:00
|
|
|
// them if none is given.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) Unsubscribe(ctx context.Context, channels ...string) error {
|
2017-06-29 16:53:49 +03:00
|
|
|
c.mu.Lock()
|
2018-07-23 15:55:13 +03:00
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2022-10-14 15:15:06 +03:00
|
|
|
if len(channels) > 0 {
|
|
|
|
for _, channel := range channels {
|
|
|
|
delete(c.channels, channel)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// Unsubscribe from all channels.
|
|
|
|
for channel := range c.channels {
|
|
|
|
delete(c.channels, channel)
|
|
|
|
}
|
2018-03-14 13:42:51 +03:00
|
|
|
}
|
2022-10-14 15:15:06 +03:00
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err := c.subscribe(ctx, "unsubscribe", channels...)
|
2017-06-29 17:05:08 +03:00
|
|
|
return err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
|
2018-01-24 21:38:47 +03:00
|
|
|
// PUnsubscribe the client from the given patterns, or from all of
|
2015-09-06 13:50:16 +03:00
|
|
|
// them if none is given.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) PUnsubscribe(ctx context.Context, patterns ...string) error {
|
2017-06-29 16:53:49 +03:00
|
|
|
c.mu.Lock()
|
2018-07-23 15:55:13 +03:00
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2022-10-14 15:15:06 +03:00
|
|
|
if len(patterns) > 0 {
|
|
|
|
for _, pattern := range patterns {
|
|
|
|
delete(c.patterns, pattern)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// Unsubscribe from all patterns.
|
|
|
|
for pattern := range c.patterns {
|
|
|
|
delete(c.patterns, pattern)
|
|
|
|
}
|
2018-03-14 13:42:51 +03:00
|
|
|
}
|
2022-10-14 15:15:06 +03:00
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err := c.subscribe(ctx, "punsubscribe", patterns...)
|
2017-06-29 17:05:08 +03:00
|
|
|
return err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
|
2022-08-03 18:10:03 +03:00
|
|
|
// SUnsubscribe unsubscribes the client from the given shard channels,
|
|
|
|
// or from all of them if none is given.
|
|
|
|
func (c *PubSub) SUnsubscribe(ctx context.Context, channels ...string) error {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2022-10-14 15:15:06 +03:00
|
|
|
if len(channels) > 0 {
|
|
|
|
for _, channel := range channels {
|
|
|
|
delete(c.schannels, channel)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// Unsubscribe from all channels.
|
|
|
|
for channel := range c.schannels {
|
|
|
|
delete(c.schannels, channel)
|
|
|
|
}
|
2022-08-03 18:10:03 +03:00
|
|
|
}
|
2022-10-14 15:15:06 +03:00
|
|
|
|
2022-08-03 18:10:03 +03:00
|
|
|
err := c.subscribe(ctx, "sunsubscribe", channels...)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) subscribe(ctx context.Context, redisCmd string, channels ...string) error {
|
|
|
|
cn, err := c.conn(ctx, channels)
|
2017-06-29 16:53:49 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err = c._subscribe(ctx, cn, redisCmd, channels)
|
|
|
|
c.releaseConn(ctx, cn, err, false)
|
2017-06-29 16:53:49 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) Ping(ctx context.Context, payload ...string) error {
|
2017-04-17 15:43:58 +03:00
|
|
|
args := []interface{}{"ping"}
|
2017-02-23 16:29:38 +03:00
|
|
|
if len(payload) == 1 {
|
|
|
|
args = append(args, payload[0])
|
2015-07-11 13:12:47 +03:00
|
|
|
}
|
2020-03-11 17:26:42 +03:00
|
|
|
cmd := NewCmd(ctx, args...)
|
2016-12-03 18:30:13 +03:00
|
|
|
|
2021-09-03 12:57:34 +03:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
cn, err := c.conn(ctx, nil)
|
2016-12-03 18:30:13 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
err = c.writeCmd(ctx, cn, cmd)
|
2021-09-03 12:57:34 +03:00
|
|
|
c.releaseConn(ctx, cn, err, false)
|
2016-12-03 18:30:13 +03:00
|
|
|
return err
|
2015-07-11 13:12:47 +03:00
|
|
|
}
|
|
|
|
|
2018-01-24 21:38:47 +03:00
|
|
|
// Subscription is the confirmation received after a subscribe or
// unsubscribe command has been processed for a channel.
type Subscription struct {
	// Kind is the confirmation type: "subscribe", "unsubscribe",
	// "psubscribe", "punsubscribe", "ssubscribe" or "sunsubscribe".
	Kind string
	// Channel is the channel name the confirmation refers to.
	Channel string
	// Count is the number of channels we are currently subscribed to.
	Count int
}

// String formats the subscription as "<kind>: <channel>".
func (s *Subscription) String() string {
	kind, channel := s.Kind, s.Channel
	return fmt.Sprintf("%s: %s", kind, channel)
}
|
|
|
|
|
2015-05-23 18:17:45 +03:00
|
|
|
// Message is a message received as the result of a PUBLISH command issued
// by another client.
type Message struct {
	// Channel is the channel the message was delivered on.
	Channel string
	// Pattern is filled in for "pmessage" deliveries only.
	Pattern string
	// Payload holds the message body when it arrives as a string.
	Payload string
	// PayloadSlice holds the message body when it arrives as an array.
	PayloadSlice []string
}

// String formats the message as "Message<channel: payload>".
func (m *Message) String() string {
	return fmt.Sprint("Message<", m.Channel, ": ", m.Payload, ">")
}
|
|
|
|
|
2015-07-11 13:12:47 +03:00
|
|
|
// Pong is the reply received for a PING command.
type Pong struct {
	// Payload is the text carried by the reply; it may be empty.
	Payload string
}

// String returns "Pong" when there is no payload and "Pong<payload>"
// otherwise.
func (p *Pong) String() string {
	if p.Payload == "" {
		return "Pong"
	}
	return fmt.Sprintf("Pong<%s>", p.Payload)
}
|
|
|
|
|
2017-02-08 12:24:09 +03:00
|
|
|
func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
|
|
|
|
switch reply := reply.(type) {
|
|
|
|
case string:
|
2015-07-11 13:12:47 +03:00
|
|
|
return &Pong{
|
2017-02-08 12:24:09 +03:00
|
|
|
Payload: reply,
|
2015-07-11 13:12:47 +03:00
|
|
|
}, nil
|
2017-02-08 12:24:09 +03:00
|
|
|
case []interface{}:
|
|
|
|
switch kind := reply[0].(string); kind {
|
2022-08-03 18:10:03 +03:00
|
|
|
case "subscribe", "unsubscribe", "psubscribe", "punsubscribe", "ssubscribe", "sunsubscribe":
|
2020-05-21 09:10:43 +03:00
|
|
|
// Can be nil in case of "unsubscribe".
|
|
|
|
channel, _ := reply[1].(string)
|
2017-02-08 12:24:09 +03:00
|
|
|
return &Subscription{
|
|
|
|
Kind: kind,
|
2020-05-21 09:10:43 +03:00
|
|
|
Channel: channel,
|
2017-02-08 12:24:09 +03:00
|
|
|
Count: int(reply[2].(int64)),
|
|
|
|
}, nil
|
2022-08-03 18:10:03 +03:00
|
|
|
case "message", "smessage":
|
2020-08-21 12:19:31 +03:00
|
|
|
switch payload := reply[2].(type) {
|
|
|
|
case string:
|
|
|
|
return &Message{
|
|
|
|
Channel: reply[1].(string),
|
|
|
|
Payload: payload,
|
|
|
|
}, nil
|
|
|
|
case []interface{}:
|
|
|
|
ss := make([]string, len(payload))
|
|
|
|
for i, s := range payload {
|
|
|
|
ss[i] = s.(string)
|
|
|
|
}
|
|
|
|
return &Message{
|
|
|
|
Channel: reply[1].(string),
|
|
|
|
PayloadSlice: ss,
|
|
|
|
}, nil
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("redis: unsupported pubsub message payload: %T", payload)
|
|
|
|
}
|
2017-02-08 12:24:09 +03:00
|
|
|
case "pmessage":
|
|
|
|
return &Message{
|
|
|
|
Pattern: reply[1].(string),
|
|
|
|
Channel: reply[2].(string),
|
|
|
|
Payload: reply[3].(string),
|
|
|
|
}, nil
|
|
|
|
case "pong":
|
|
|
|
return &Pong{
|
|
|
|
Payload: reply[1].(string),
|
|
|
|
}, nil
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
|
|
|
|
}
|
2015-07-11 13:12:47 +03:00
|
|
|
default:
|
2017-02-08 12:24:09 +03:00
|
|
|
return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
|
2015-07-11 13:12:47 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-07-11 13:42:44 +03:00
|
|
|
// ReceiveTimeout acts like Receive but returns an error if message
|
2018-07-24 09:41:14 +03:00
|
|
|
// is not received in time. This is low-level API and in most cases
|
|
|
|
// Channel should be used instead.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (interface{}, error) {
|
2017-02-08 12:24:09 +03:00
|
|
|
if c.cmd == nil {
|
2020-03-11 17:26:42 +03:00
|
|
|
c.cmd = NewCmd(ctx)
|
2017-02-08 12:24:09 +03:00
|
|
|
}
|
2016-12-03 18:30:13 +03:00
|
|
|
|
2021-09-03 12:57:34 +03:00
|
|
|
// Don't hold the lock to allow subscriptions and pings.
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
cn, err := c.connWithLock(ctx)
|
2015-07-11 13:12:47 +03:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2014-05-11 11:42:40 +04:00
|
|
|
}
|
2015-05-10 15:33:04 +03:00
|
|
|
|
2022-10-11 10:22:42 +03:00
|
|
|
err = cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error {
|
2018-08-15 11:53:15 +03:00
|
|
|
return c.cmd.readReply(rd)
|
|
|
|
})
|
|
|
|
|
2020-03-11 17:26:42 +03:00
|
|
|
c.releaseConnWithLock(ctx, cn, err, timeout > 0)
|
2021-09-03 12:57:34 +03:00
|
|
|
|
2015-11-14 15:44:16 +03:00
|
|
|
if err != nil {
|
2015-07-11 13:12:47 +03:00
|
|
|
return nil, err
|
|
|
|
}
|
2016-03-01 13:31:06 +03:00
|
|
|
|
2017-02-08 12:24:09 +03:00
|
|
|
return c.newMessage(c.cmd.Val())
|
2014-05-11 11:42:40 +04:00
|
|
|
}
|
2012-07-25 17:00:50 +04:00
|
|
|
|
2016-04-09 11:45:56 +03:00
|
|
|
// Receive returns a message as a Subscription, Message, Pong or error.
|
2018-07-24 09:41:14 +03:00
|
|
|
// See PubSub example for details. This is low-level API and in most cases
|
|
|
|
// Channel should be used instead.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) Receive(ctx context.Context) (interface{}, error) {
|
|
|
|
return c.ReceiveTimeout(ctx, 0)
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
2014-05-11 11:42:40 +04:00
|
|
|
|
2018-07-24 09:41:14 +03:00
|
|
|
// ReceiveMessage returns a Message or error ignoring Subscription and Pong
|
|
|
|
// messages. This is low-level API and in most cases Channel should be used
|
|
|
|
// instead.
|
2020-03-11 17:26:42 +03:00
|
|
|
func (c *PubSub) ReceiveMessage(ctx context.Context) (*Message, error) {
|
2015-09-06 13:50:16 +03:00
|
|
|
for {
|
2020-03-11 17:26:42 +03:00
|
|
|
msg, err := c.Receive(ctx)
|
2015-09-06 13:50:16 +03:00
|
|
|
if err != nil {
|
2018-07-23 15:55:13 +03:00
|
|
|
return nil, err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
switch msg := msg.(type) {
|
2015-09-06 13:50:16 +03:00
|
|
|
case *Subscription:
|
|
|
|
// Ignore.
|
|
|
|
case *Pong:
|
|
|
|
// Ignore.
|
|
|
|
case *Message:
|
|
|
|
return msg, nil
|
|
|
|
default:
|
2018-07-23 15:55:13 +03:00
|
|
|
err := fmt.Errorf("redis: unknown message: %T", msg)
|
|
|
|
return nil, err
|
2015-09-06 13:50:16 +03:00
|
|
|
}
|
|
|
|
}
|
2012-08-09 16:27:06 +04:00
|
|
|
}
|
2016-03-01 13:31:06 +03:00
|
|
|
|
2021-05-26 06:25:18 +03:00
|
|
|
func (c *PubSub) getContext() context.Context {
|
|
|
|
if c.cmd != nil {
|
|
|
|
return c.cmd.ctx
|
|
|
|
}
|
|
|
|
return context.Background()
|
|
|
|
}
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
2017-10-30 13:09:57 +03:00
|
|
|
// Channel returns a Go channel for concurrently receiving messages.
|
2019-07-01 17:21:32 +03:00
|
|
|
// The channel is closed together with the PubSub. If the Go channel
|
2023-10-30 15:35:02 +03:00
|
|
|
// is blocked full for 1 minute the message is dropped.
|
2019-07-01 17:21:32 +03:00
|
|
|
// Receive* APIs can not be used after channel is created.
|
2019-03-12 13:40:08 +03:00
|
|
|
//
|
2019-07-01 17:21:32 +03:00
|
|
|
// go-redis periodically sends ping messages to test connection health
|
2024-04-13 20:15:23 +03:00
|
|
|
// and re-subscribes if ping can not received for 1 minute.
|
2021-05-26 06:25:18 +03:00
|
|
|
func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
|
2019-07-01 17:21:32 +03:00
|
|
|
c.chOnce.Do(func() {
|
2021-05-26 06:25:18 +03:00
|
|
|
c.msgCh = newChannel(c, opts...)
|
|
|
|
c.msgCh.initMsgChan()
|
2019-07-01 17:21:32 +03:00
|
|
|
})
|
|
|
|
if c.msgCh == nil {
|
|
|
|
err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
|
|
|
|
panic(err)
|
|
|
|
}
|
2021-05-26 06:25:18 +03:00
|
|
|
return c.msgCh.msgCh
|
|
|
|
}
|
|
|
|
|
|
|
|
// ChannelSize is like Channel, but creates a Go channel
|
|
|
|
// with specified buffer size.
|
|
|
|
//
|
|
|
|
// Deprecated: use Channel(WithChannelSize(size)), remove in v9.
|
|
|
|
func (c *PubSub) ChannelSize(size int) <-chan *Message {
|
|
|
|
return c.Channel(WithChannelSize(size))
|
2019-03-12 13:48:32 +03:00
|
|
|
}
|
|
|
|
|
2019-07-01 17:21:32 +03:00
|
|
|
// ChannelWithSubscriptions is like Channel, but message type can be either
|
|
|
|
// *Subscription or *Message. Subscription messages can be used to detect
|
|
|
|
// reconnections.
|
|
|
|
//
|
|
|
|
// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
|
2022-06-04 16:20:10 +03:00
|
|
|
func (c *PubSub) ChannelWithSubscriptions(opts ...ChannelOption) <-chan interface{} {
|
2019-03-12 13:48:32 +03:00
|
|
|
c.chOnce.Do(func() {
|
2022-06-04 16:20:10 +03:00
|
|
|
c.allCh = newChannel(c, opts...)
|
2021-05-26 06:25:18 +03:00
|
|
|
c.allCh.initAllChan()
|
2019-03-12 13:48:32 +03:00
|
|
|
})
|
2019-07-01 17:21:32 +03:00
|
|
|
if c.allCh == nil {
|
|
|
|
err := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
|
|
|
|
panic(err)
|
|
|
|
}
|
2021-05-26 06:25:18 +03:00
|
|
|
return c.allCh.allCh
|
|
|
|
}
|
|
|
|
|
|
|
|
// ChannelOption configures the channel created by Channel or
// ChannelWithSubscriptions. Options are applied by newChannel after the
// defaults have been set.
type ChannelOption func(c *channel)
|
|
|
|
|
|
|
|
// WithChannelSize specifies the Go chan size that is used to buffer incoming messages.
|
|
|
|
//
|
|
|
|
// The default is 100 messages.
|
|
|
|
func WithChannelSize(size int) ChannelOption {
|
|
|
|
return func(c *channel) {
|
|
|
|
c.chanSize = size
|
2019-03-12 13:48:32 +03:00
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
|
|
|
|
2021-05-26 06:25:18 +03:00
|
|
|
// WithChannelHealthCheckInterval specifies the health check interval.
|
|
|
|
// PubSub will ping Redis Server if it does not receive any messages within the interval.
|
|
|
|
// To disable health check, use zero interval.
|
|
|
|
//
|
|
|
|
// The default is 3 seconds.
|
|
|
|
func WithChannelHealthCheckInterval(d time.Duration) ChannelOption {
|
|
|
|
return func(c *channel) {
|
|
|
|
c.checkInterval = d
|
2020-07-18 09:04:36 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-26 15:12:25 +03:00
|
|
|
// WithChannelSendTimeout specifies the channel send timeout after which
|
2021-05-26 06:25:18 +03:00
|
|
|
// the message is dropped.
|
|
|
|
//
|
|
|
|
// The default is 60 seconds.
|
|
|
|
func WithChannelSendTimeout(d time.Duration) ChannelOption {
|
|
|
|
return func(c *channel) {
|
|
|
|
c.chanSendTimeout = d
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// channel delivers what a PubSub receives to a Go channel and optionally
// runs a health check on the connection.
type channel struct {
	// pubSub is the PubSub messages are received from.
	pubSub *PubSub

	// msgCh is created by initMsgChan, allCh by initAllChan.
	msgCh chan *Message
	allCh chan interface{}
	// ping is created by initHealthCheck. The receiving goroutine does a
	// non-blocking send on it for every reply it receives.
	ping chan struct{}

	// chanSize is the buffer size of msgCh/allCh.
	chanSize int
	// chanSendTimeout is how long a send may block before the message is
	// dropped.
	chanSendTimeout time.Duration
	// checkInterval is the health-check period; newChannel starts the
	// health check only when it is positive.
	checkInterval time.Duration
}
|
|
|
|
|
|
|
|
func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
|
|
|
|
c := &channel{
|
|
|
|
pubSub: pubSub,
|
|
|
|
|
|
|
|
chanSize: 100,
|
|
|
|
chanSendTimeout: time.Minute,
|
|
|
|
checkInterval: 3 * time.Second,
|
|
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt(c)
|
|
|
|
}
|
|
|
|
if c.checkInterval > 0 {
|
|
|
|
c.initHealthCheck()
|
|
|
|
}
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
|
|
|
// initHealthCheck creates the ping channel and starts a goroutine that
// pings the server whenever nothing has been signalled on c.ping for
// checkInterval. If the ping fails the PubSub reconnects. The goroutine
// returns when the PubSub's exit channel is closed.
func (c *channel) initHealthCheck() {
	ctx := context.TODO()
	// Buffer of one: the receiving goroutine signals with a non-blocking
	// send, so at most one pending signal is kept.
	c.ping = make(chan struct{}, 1)

	go func() {
		// The timer is created stopped; it is armed by Reset at the top of
		// every iteration.
		timer := time.NewTimer(time.Minute)
		timer.Stop()

		for {
			timer.Reset(c.checkInterval)
			select {
			case <-c.ping:
				// Something was received in time. Stop the timer and, if
				// it has already fired, drain it so the next Reset starts
				// from a clean state.
				if !timer.Stop() {
					<-timer.C
				}
			case <-timer.C:
				if pingErr := c.pubSub.Ping(ctx); pingErr != nil {
					// reconnect expects the lock to be held by the caller.
					c.pubSub.mu.Lock()
					c.pubSub.reconnect(ctx, pingErr)
					c.pubSub.mu.Unlock()
				}
			case <-c.pubSub.exit:
				return
			}
		}
	}()
}
|
|
|
|
|
|
|
|
// initMsgChan must be in sync with initAllChan.
|
2021-05-26 06:25:18 +03:00
|
|
|
func (c *channel) initMsgChan() {
|
2020-03-11 17:26:42 +03:00
|
|
|
ctx := context.TODO()
|
2021-05-26 06:25:18 +03:00
|
|
|
c.msgCh = make(chan *Message, c.chanSize)
|
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
go func() {
|
2020-09-05 10:56:09 +03:00
|
|
|
timer := time.NewTimer(time.Minute)
|
2019-03-12 13:40:08 +03:00
|
|
|
timer.Stop()
|
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
var errCount int
|
|
|
|
for {
|
2021-05-26 06:25:18 +03:00
|
|
|
msg, err := c.pubSub.Receive(ctx)
|
2018-07-23 15:55:13 +03:00
|
|
|
if err != nil {
|
|
|
|
if err == pool.ErrClosed {
|
2019-07-01 17:21:32 +03:00
|
|
|
close(c.msgCh)
|
2018-07-23 15:55:13 +03:00
|
|
|
return
|
|
|
|
}
|
|
|
|
if errCount > 0 {
|
2020-09-17 12:54:48 +03:00
|
|
|
time.Sleep(100 * time.Millisecond)
|
2017-04-11 16:18:35 +03:00
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
errCount++
|
|
|
|
continue
|
2017-04-11 16:18:35 +03:00
|
|
|
}
|
2019-03-12 13:40:08 +03:00
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
errCount = 0
|
2018-07-24 09:41:14 +03:00
|
|
|
|
|
|
|
// Any message is as good as a ping.
|
|
|
|
select {
|
|
|
|
case c.ping <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
switch msg := msg.(type) {
|
|
|
|
case *Subscription:
|
|
|
|
// Ignore.
|
|
|
|
case *Pong:
|
|
|
|
// Ignore.
|
|
|
|
case *Message:
|
2021-05-26 06:25:18 +03:00
|
|
|
timer.Reset(c.chanSendTimeout)
|
2019-03-12 13:40:08 +03:00
|
|
|
select {
|
2019-07-01 17:21:32 +03:00
|
|
|
case c.msgCh <- msg:
|
2019-03-12 13:40:08 +03:00
|
|
|
if !timer.Stop() {
|
|
|
|
<-timer.C
|
|
|
|
}
|
|
|
|
case <-timer.C:
|
2019-06-17 12:32:40 +03:00
|
|
|
internal.Logger.Printf(
|
2021-05-26 06:25:18 +03:00
|
|
|
ctx, "redis: %s channel is full for %s (message is dropped)",
|
|
|
|
c, c.chanSendTimeout)
|
2019-03-12 13:40:08 +03:00
|
|
|
}
|
2018-07-24 09:41:14 +03:00
|
|
|
default:
|
2021-05-26 06:25:18 +03:00
|
|
|
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
|
2018-07-24 09:41:14 +03:00
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
|
|
|
}()
|
2019-07-01 17:21:32 +03:00
|
|
|
}
|
2018-07-23 15:55:13 +03:00
|
|
|
|
2019-07-01 17:21:32 +03:00
|
|
|
// initAllChan must be in sync with initMsgChan.
|
2021-05-26 06:25:18 +03:00
|
|
|
func (c *channel) initAllChan() {
|
2020-03-11 17:26:42 +03:00
|
|
|
ctx := context.TODO()
|
2021-05-26 06:25:18 +03:00
|
|
|
c.allCh = make(chan interface{}, c.chanSize)
|
|
|
|
|
2018-07-23 15:55:13 +03:00
|
|
|
go func() {
|
2021-05-26 06:25:18 +03:00
|
|
|
timer := time.NewTimer(time.Minute)
|
2018-07-23 15:55:13 +03:00
|
|
|
timer.Stop()
|
|
|
|
|
2019-07-01 17:21:32 +03:00
|
|
|
var errCount int
|
2018-07-23 15:55:13 +03:00
|
|
|
for {
|
2021-05-26 06:25:18 +03:00
|
|
|
msg, err := c.pubSub.Receive(ctx)
|
2019-07-01 17:21:32 +03:00
|
|
|
if err != nil {
|
|
|
|
if err == pool.ErrClosed {
|
|
|
|
close(c.allCh)
|
|
|
|
return
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
2019-07-01 17:21:32 +03:00
|
|
|
if errCount > 0 {
|
2020-09-17 12:54:48 +03:00
|
|
|
time.Sleep(100 * time.Millisecond)
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
2019-07-01 17:21:32 +03:00
|
|
|
errCount++
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
errCount = 0
|
|
|
|
|
|
|
|
// Any message is as good as a ping.
|
|
|
|
select {
|
|
|
|
case c.ping <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
switch msg := msg.(type) {
|
|
|
|
case *Pong:
|
|
|
|
// Ignore.
|
2021-05-26 06:25:18 +03:00
|
|
|
case *Subscription, *Message:
|
|
|
|
timer.Reset(c.chanSendTimeout)
|
|
|
|
select {
|
|
|
|
case c.allCh <- msg:
|
|
|
|
if !timer.Stop() {
|
|
|
|
<-timer.C
|
|
|
|
}
|
|
|
|
case <-timer.C:
|
|
|
|
internal.Logger.Printf(
|
|
|
|
ctx, "redis: %s channel is full for %s (message is dropped)",
|
|
|
|
c, c.chanSendTimeout)
|
|
|
|
}
|
2019-07-01 17:21:32 +03:00
|
|
|
default:
|
2021-05-26 06:25:18 +03:00
|
|
|
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
|
2018-07-23 15:55:13 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|