diff --git a/v2/example_test.go b/v2/example_test.go index 75af13c..190a6e5 100644 --- a/v2/example_test.go +++ b/v2/example_test.go @@ -112,18 +112,19 @@ func ExamplePubSub() { }) defer client.Close() - pubsub, err := client.PubSubClient() + pubsub := client.PubSub() defer pubsub.Close() - err = pubsub.Subscribe("mychannel") + err := pubsub.Subscribe("mychannel") + _ = err - msg, err := pubsub.Receive(0) + msg, err := pubsub.Receive() fmt.Println(msg, err) pub := client.Publish("mychannel", "hello") _ = pub.Err() - msg, err = pubsub.Receive(0) + msg, err = pubsub.Receive() fmt.Println(msg, err) // Output: &{subscribe mychannel 1} diff --git a/v2/pubsub.go b/v2/pubsub.go index 5280fd4..290fe99 100644 --- a/v2/pubsub.go +++ b/v2/pubsub.go @@ -5,17 +5,17 @@ import ( "time" ) -type PubSubClient struct { +type PubSub struct { *baseClient } -func (c *Client) PubSubClient() (*PubSubClient, error) { - return &PubSubClient{ +func (c *Client) PubSub() *PubSub { + return &PubSub{ baseClient: &baseClient{ opt: c.opt, connPool: newSingleConnPool(c.connPool, nil, false), }, - }, nil + } } func (c *Client) Publish(channel, message string) *IntReq { @@ -41,7 +41,11 @@ type Subscription struct { Count int } -func (c *PubSubClient) Receive(timeout time.Duration) (interface{}, error) { +func (c *PubSub) Receive() (interface{}, error) { + return c.ReceiveTimeout(0) +} + +func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { cn, err := c.conn() if err != nil { return nil, err @@ -80,7 +84,7 @@ func (c *PubSubClient) Receive(timeout time.Duration) (interface{}, error) { } } -func (c *PubSubClient) subscribe(cmd string, channels ...string) error { +func (c *PubSub) subscribe(cmd string, channels ...string) error { cn, err := c.conn() if err != nil { return err @@ -91,15 +95,15 @@ func (c *PubSubClient) subscribe(cmd string, channels ...string) error { return c.writeReq(cn, req) } -func (c *PubSubClient) Subscribe(channels ...string) error { +func (c *PubSub) Subscribe(channels ...string) error { return c.subscribe("SUBSCRIBE", channels...) } -func (c *PubSubClient) PSubscribe(patterns ...string) error { +func (c *PubSub) PSubscribe(patterns ...string) error { return c.subscribe("PSUBSCRIBE", patterns...) } -func (c *PubSubClient) unsubscribe(cmd string, channels ...string) error { +func (c *PubSub) unsubscribe(cmd string, channels ...string) error { cn, err := c.conn() if err != nil { return err @@ -110,10 +114,10 @@ func (c *PubSubClient) unsubscribe(cmd string, channels ...string) error { return c.writeReq(cn, req) } -func (c *PubSubClient) Unsubscribe(channels ...string) error { +func (c *PubSub) Unsubscribe(channels ...string) error { return c.unsubscribe("UNSUBSCRIBE", channels...) } -func (c *PubSubClient) PUnsubscribe(patterns ...string) error { +func (c *PubSub) PUnsubscribe(patterns ...string) error { return c.unsubscribe("PUNSUBSCRIBE", patterns...) } diff --git a/v2/redis_test.go b/v2/redis_test.go index 7336008..88a2f78 100644 --- a/v2/redis_test.go +++ b/v2/redis_test.go @@ -202,12 +202,12 @@ func (t *RedisConnectorTest) TestUnixConnector(c *C) { // c.Assert(t.closedConns, Equals, int64(10)) // } -// func (t *RedisConnPoolTest) TestConnPoolMaxSizeOnPubSubClient(c *C) { +// func (t *RedisConnPoolTest) TestConnPoolMaxSizeOnPubSub(c *C) { // wg := &sync.WaitGroup{} // for i := 0; i < 1000; i++ { // wg.Add(1) // go func() { -// pubsub, err := t.client.PubSubClient() +// pubsub, err := t.client.PubSub() // c.Assert(err, IsNil) // _, err = pubsub.Subscribe() @@ -2262,8 +2262,7 @@ func (t *RedisTest) TestZUnionStore(c *C) { //------------------------------------------------------------------------------ func (t *RedisTest) TestPatternPubSub(c *C) { - pubsub, err := t.client.PubSubClient() - c.Assert(err, IsNil) + pubsub := t.client.PubSub() defer func() { c.Assert(pubsub.Close(), IsNil) }() @@ -2277,7 +2276,7 @@ func (t *RedisTest) TestPatternPubSub(c *C) { c.Assert(pubsub.PUnsubscribe("mychannel*"), IsNil) { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err, IsNil) subscr := msgi.(*redis.Subscription) c.Assert(subscr.Kind, Equals, "psubscribe") @@ -2286,7 +2285,7 @@ func (t *RedisTest) TestPatternPubSub(c *C) { } { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err, IsNil) subscr := msgi.(*redis.PMessage) c.Assert(subscr.Channel, Equals, "mychannel1") @@ -2295,7 +2294,7 @@ func (t *RedisTest) TestPatternPubSub(c *C) { } { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err, IsNil) subscr := msgi.(*redis.Subscription) c.Assert(subscr.Kind, Equals, "punsubscribe") @@ -2304,15 +2303,14 @@ func (t *RedisTest) TestPatternPubSub(c *C) { } { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err.(net.Error).Timeout(), Equals, true) c.Assert(msgi, IsNil) } } func (t *RedisTest) TestPubSub(c *C) { - pubsub, err := t.client.PubSubClient() - c.Assert(err, IsNil) + pubsub := t.client.PubSub() defer func() { c.Assert(pubsub.Close(), IsNil) }() @@ -2330,7 +2328,7 @@ func (t *RedisTest) TestPubSub(c *C) { c.Assert(pubsub.Unsubscribe("mychannel", "mychannel2"), IsNil) { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err, IsNil) subscr := msgi.(*redis.Subscription) c.Assert(subscr.Kind, Equals, "subscribe") @@ -2339,7 +2337,7 @@ func (t *RedisTest) TestPubSub(c *C) { } { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err, IsNil) subscr := msgi.(*redis.Subscription) c.Assert(subscr.Kind, Equals, "subscribe") @@ -2348,7 +2346,7 @@ func (t *RedisTest) TestPubSub(c *C) { } { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err, IsNil) subscr := msgi.(*redis.Message) c.Assert(subscr.Channel, Equals, "mychannel") @@ -2356,7 +2354,7 @@ func (t *RedisTest) TestPubSub(c *C) { } { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err, IsNil) msg := msgi.(*redis.Message) c.Assert(msg.Channel, Equals, "mychannel2") @@ -2364,7 +2362,7 @@ func (t *RedisTest) TestPubSub(c *C) { } { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err, IsNil) subscr := msgi.(*redis.Subscription) c.Assert(subscr.Kind, Equals, "unsubscribe") @@ -2373,7 +2371,7 @@ func (t *RedisTest) TestPubSub(c *C) { } { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err, IsNil) subscr := msgi.(*redis.Subscription) c.Assert(subscr.Kind, Equals, "unsubscribe") @@ -2382,7 +2380,7 @@ func (t *RedisTest) TestPubSub(c *C) { } { - msgi, err := pubsub.Receive(time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) c.Assert(err.(net.Error).Timeout(), Equals, true) c.Assert(msgi, IsNil) }