package redis import ( "fmt" "net" "sync" "time" "github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal/pool" ) // PubSub implements Pub/Sub commands as described in // http://redis.io/topics/pubsub. It's NOT safe for concurrent use by // multiple goroutines. type PubSub struct { base baseClient mu sync.Mutex cn *pool.Conn closed bool cmd *Cmd subMu sync.Mutex channels []string patterns []string } func (c *PubSub) conn() (*pool.Conn, error) { cn, isNew, err := c._conn() if err != nil { return nil, err } if isNew { if err := c.resubscribe(); err != nil { internal.Logf("resubscribe failed: %s", err) } } return cn, nil } func (c *PubSub) resubscribe() error { c.subMu.Lock() channels := c.channels patterns := c.patterns c.subMu.Unlock() var firstErr error if len(channels) > 0 { if err := c.subscribe("subscribe", channels...); err != nil && firstErr == nil { firstErr = err } } if len(patterns) > 0 { if err := c.subscribe("psubscribe", patterns...); err != nil && firstErr == nil { firstErr = err } } return firstErr } func (c *PubSub) _conn() (*pool.Conn, bool, error) { c.mu.Lock() defer c.mu.Unlock() if c.closed { return nil, false, pool.ErrClosed } if c.cn != nil { return c.cn, false, nil } cn, err := c.base.connPool.NewConn() if err != nil { return nil, false, err } c.cn = cn return cn, true, nil } func (c *PubSub) putConn(cn *pool.Conn, err error) { if internal.IsBadConn(err, true) { c.mu.Lock() if c.cn == cn { _ = c.closeConn() } c.mu.Unlock() } } func (c *PubSub) subscribe(redisCmd string, channels ...string) error { args := make([]interface{}, 1+len(channels)) args[0] = redisCmd for i, channel := range channels { args[1+i] = channel } cmd := NewSliceCmd(args...) cn, isNew, err := c._conn() if err != nil { return err } if isNew { return c.resubscribe() } cn.SetWriteTimeout(c.base.opt.WriteTimeout) err = writeCmd(cn, cmd) c.putConn(cn, err) return err } // Subscribes the client to the specified channels. func (c *PubSub) Subscribe(channels ...string) error { c.subMu.Lock() c.channels = appendIfNotExists(c.channels, channels...) c.subMu.Unlock() return c.subscribe("subscribe", channels...) } // Subscribes the client to the given patterns. func (c *PubSub) PSubscribe(patterns ...string) error { c.subMu.Lock() c.patterns = appendIfNotExists(c.patterns, patterns...) c.subMu.Unlock() return c.subscribe("psubscribe", patterns...) } // Unsubscribes the client from the given channels, or from all of // them if none is given. func (c *PubSub) Unsubscribe(channels ...string) error { c.subMu.Lock() c.channels = remove(c.channels, channels...) c.subMu.Unlock() return c.subscribe("unsubscribe", channels...) } // Unsubscribes the client from the given patterns, or from all of // them if none is given. func (c *PubSub) PUnsubscribe(patterns ...string) error { c.subMu.Lock() c.patterns = remove(c.patterns, patterns...) c.subMu.Unlock() return c.subscribe("punsubscribe", patterns...) } func (c *PubSub) Close() error { c.mu.Lock() defer c.mu.Unlock() if c.closed { return pool.ErrClosed } c.closed = true if c.cn != nil { _ = c.closeConn() } return nil } func (c *PubSub) closeConn() error { err := c.base.connPool.CloseConn(c.cn) c.cn = nil return err } func (c *PubSub) Ping(payload ...string) error { args := []interface{}{"ping"} if len(payload) == 1 { args = append(args, payload[0]) } cmd := NewCmd(args...) cn, err := c.conn() if err != nil { return err } cn.SetWriteTimeout(c.base.opt.WriteTimeout) err = writeCmd(cn, cmd) c.putConn(cn, err) return err } // Message received after a successful subscription to channel. type Subscription struct { // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe". Kind string // Channel name we have subscribed to. Channel string // Number of channels we are currently subscribed to. Count int } func (m *Subscription) String() string { return fmt.Sprintf("%s: %s", m.Kind, m.Channel) } // Message received as result of a PUBLISH command issued by another client. type Message struct { Channel string Pattern string Payload string } func (m *Message) String() string { return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload) } // Pong received as result of a PING command issued by another client. type Pong struct { Payload string } func (p *Pong) String() string { if p.Payload != "" { return fmt.Sprintf("Pong<%s>", p.Payload) } return "Pong" } func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { switch reply := reply.(type) { case string: return &Pong{ Payload: reply, }, nil case []interface{}: switch kind := reply[0].(string); kind { case "subscribe", "unsubscribe", "psubscribe", "punsubscribe": return &Subscription{ Kind: kind, Channel: reply[1].(string), Count: int(reply[2].(int64)), }, nil case "message": return &Message{ Channel: reply[1].(string), Payload: reply[2].(string), }, nil case "pmessage": return &Message{ Pattern: reply[1].(string), Channel: reply[2].(string), Payload: reply[3].(string), }, nil case "pong": return &Pong{ Payload: reply[1].(string), }, nil default: return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind) } default: return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply) } } // ReceiveTimeout acts like Receive but returns an error if message // is not received in time. This is low-level API and most clients // should use ReceiveMessage. func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { if c.cmd == nil { c.cmd = NewCmd() } cn, err := c.conn() if err != nil { return nil, err } cn.SetReadTimeout(timeout) err = c.cmd.readReply(cn) c.putConn(cn, err) if err != nil { return nil, err } return c.newMessage(c.cmd.Val()) } // Receive returns a message as a Subscription, Message, Pong or error. // See PubSub example for details. This is low-level API and most clients // should use ReceiveMessage. func (c *PubSub) Receive() (interface{}, error) { return c.ReceiveTimeout(0) } // ReceiveMessage returns a Message or error ignoring Subscription or Pong // messages. It automatically reconnects to Redis Server and resubscribes // to channels in case of network errors. func (c *PubSub) ReceiveMessage() (*Message, error) { return c.receiveMessage(5 * time.Second) } func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) { var errNum uint for { msgi, err := c.ReceiveTimeout(timeout) if err != nil { if !internal.IsNetworkError(err) { return nil, err } errNum++ if errNum < 3 { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { err := c.Ping() if err != nil { internal.Logf("PubSub.Ping failed: %s", err) } } } else { // 3 consequent errors - connection is broken or // Redis Server is down. // Sleep to not exceed max number of open connections. time.Sleep(time.Second) } continue } // Reset error number, because we received a message. errNum = 0 switch msg := msgi.(type) { case *Subscription: // Ignore. case *Pong: // Ignore. case *Message: return msg, nil default: return nil, fmt.Errorf("redis: unknown message: %T", msgi) } } } // Channel returns a channel for concurrently receiving messages. // The channel is closed with PubSub. func (c *PubSub) Channel() <-chan *Message { ch := make(chan *Message, 100) go func() { for { msg, err := c.ReceiveMessage() if err != nil { if err == pool.ErrClosed { break } continue } ch <- msg } close(ch) }() return ch } func appendIfNotExists(ss []string, es ...string) []string { loop: for _, e := range es { for _, s := range ss { if s == e { continue loop } } ss = append(ss, e) } return ss } func remove(ss []string, es ...string) []string { if len(es) == 0 { return ss[:0] } for _, e := range es { for i, s := range ss { if s == e { ss = append(ss[:i], ss[i+1:]...) break } } } return ss }