diff --git a/ring.go b/ring.go index eb5add1..69715b6 100644 --- a/ring.go +++ b/ring.go @@ -177,6 +177,34 @@ func (c *Ring) PoolStats() *PoolStats { return &acc } +// Subscribe subscribes the client to the specified channels. +func (c *Ring) Subscribe(channels ...string) *PubSub { + if len(channels) == 0 { + panic("at least one channel is required") + } + + shard, err := c.shardByKey(channels[0]) + if err != nil { + // TODO: return PubSub with sticky error + panic(err) + } + return shard.Client.Subscribe(channels...) +} + +// PSubscribe subscribes the client to the given patterns. +func (c *Ring) PSubscribe(channels ...string) *PubSub { + if len(channels) == 0 { + panic("at least one channel is required") + } + + shard, err := c.shardByKey(channels[0]) + if err != nil { + // TODO: return PubSub with sticky error + panic(err) + } + return shard.Client.PSubscribe(channels...) +} + // ForEachShard concurrently calls the fn on each live shard in the ring. // It returns the first error if any. func (c *Ring) ForEachShard(fn func(client *Client) error) error {