diff --git a/v2/pipeline.go b/v2/pipeline.go index a491b7d..e021d1f 100644 --- a/v2/pipeline.go +++ b/v2/pipeline.go @@ -3,6 +3,8 @@ package redis // Not thread-safe. type Pipeline struct { *Client + + closed bool } func (c *Client) Pipeline() *Pipeline { @@ -27,10 +29,14 @@ func (c *Client) Pipelined(f func(*Pipeline)) ([]Cmder, error) { } func (c *Pipeline) Close() error { + c.closed = true return nil } func (c *Pipeline) Discard() error { + if c.closed { + return errClosed + } c.cmds = c.cmds[:0] return nil } @@ -38,6 +44,10 @@ func (c *Pipeline) Discard() error { // Exec always returns list of commands and error of the first failed // command if any. func (c *Pipeline) Exec() ([]Cmder, error) { + if c.closed { + return nil, errClosed + } + cmds := c.cmds c.cmds = make([]Cmder, 0) diff --git a/v2/pool.go b/v2/pool.go index 10dbcb5..cb3ca59 100644 --- a/v2/pool.go +++ b/v2/pool.go @@ -12,7 +12,7 @@ import ( ) var ( - errPoolClosed = errors.New("attempt to use closed connection pool") + errClosed = errors.New("redis: client is closed") ) type pool interface { @@ -110,7 +110,7 @@ func (p *connPool) Get() (*conn, bool, error) { if p.closed { p.cond.L.Unlock() - return nil, false, errPoolClosed + return nil, false, errClosed } if p.idleTimeout > 0 { @@ -173,7 +173,7 @@ func (p *connPool) Put(cn *conn) error { p.cond.L.Lock() if p.closed { p.cond.L.Unlock() - return errPoolClosed + return errClosed } cn.inUse = false p.conns.MoveToFront(cn.elem) @@ -242,6 +242,8 @@ type singleConnPool struct { l sync.RWMutex cn *conn reusable bool + + closed bool } func newSingleConnPool(pool pool, cn *conn, reusable bool) *singleConnPool { @@ -254,6 +256,10 @@ func newSingleConnPool(pool pool, cn *conn, reusable bool) *singleConnPool { func (p *singleConnPool) Get() (*conn, bool, error) { p.l.RLock() + if p.closed { + p.l.RUnlock() + return nil, false, errClosed + } if p.cn != nil { p.l.RUnlock() return p.cn, false, nil @@ -276,6 +282,10 @@ func (p *singleConnPool) Put(cn *conn) error { if p.cn != cn { panic("p.cn != cn") } + if p.closed { + p.l.Unlock() + return errClosed + } p.l.Unlock() return nil } @@ -285,6 +295,10 @@ func (p *singleConnPool) Remove(cn *conn) error { if p.cn != cn { panic("p.cn != cn") } + if p.closed { + p.l.Unlock() + return errClosed + } p.cn = nil p.l.Unlock() return nil @@ -312,6 +326,11 @@ func (p *singleConnPool) Close() error { defer p.l.Unlock() p.l.Lock() + if p.closed { + return nil + } + p.closed = true + var err error if p.cn != nil { if p.reusable { diff --git a/v2/redis_test.go b/v2/redis_test.go index 269e0af..3193c62 100644 --- a/v2/redis_test.go +++ b/v2/redis_test.go @@ -27,45 +27,29 @@ func sortStrings(slice []string) []string { //------------------------------------------------------------------------------ -type RedisConnectionTest struct { - opt *redis.Options - client *redis.Client -} - -var _ = Suite(&RedisConnectionTest{}) - -func (t *RedisConnectionTest) SetUpTest(c *C) { - t.opt = &redis.Options{ - Addr: redisAddr, - } - t.client = redis.NewTCPClient(t.opt) -} - -func (t *RedisConnectionTest) TearDownTest(c *C) { - c.Assert(t.client.Close(), IsNil) -} - -func (t *RedisConnectionTest) TestShutdown(c *C) { - c.Skip("shutdowns server") - - shutdown := t.client.Shutdown() - c.Check(shutdown.Err(), Equals, io.EOF) - c.Check(shutdown.Val(), Equals, "") - - ping := t.client.Ping() - c.Check(ping.Err(), ErrorMatches, "dial tcp :[0-9]+: connection refused") - c.Check(ping.Val(), Equals, "") -} - -//------------------------------------------------------------------------------ - type RedisConnectorTest struct{} var _ = Suite(&RedisConnectorTest{}) +func (t *RedisConnectorTest) TestShutdown(c *C) { + c.Skip("shutdowns server") + + client := redis.NewTCPClient(&redis.Options{ + Addr: redisAddr, + }) + + shutdown := client.Shutdown() + c.Check(shutdown.Err(), Equals, io.EOF) + c.Check(shutdown.Val(), Equals, "") + + ping := client.Ping() + c.Check(ping.Err(), ErrorMatches, "dial tcp :[0-9]+: connection refused") + c.Check(ping.Val(), Equals, "") +} + func (t *RedisConnectorTest) TestNewTCPClient(c *C) { client := redis.NewTCPClient(&redis.Options{ - Addr: ":6379", + Addr: redisAddr, }) ping := client.Ping() c.Check(ping.Err(), IsNil) @@ -85,6 +69,75 @@ func (t *RedisConnectorTest) TestNewUnixClient(c *C) { c.Assert(client.Close(), IsNil) } +func (t *RedisConnectorTest) TestClose(c *C) { + client := redis.NewTCPClient(&redis.Options{ + Addr: redisAddr, + }) + c.Assert(client.Close(), IsNil) + + ping := client.Ping() + c.Assert(ping.Err(), Not(IsNil)) + c.Assert(ping.Err().Error(), Equals, "redis: client is closed") + + c.Assert(client.Close(), IsNil) +} + +func (t *RedisConnectorTest) TestPubSubClose(c *C) { + client := redis.NewTCPClient(&redis.Options{ + Addr: redisAddr, + }) + + pubsub := client.PubSub() + c.Assert(pubsub.Close(), IsNil) + + _, err := pubsub.Receive() + c.Assert(err, Not(IsNil)) + c.Assert(err.Error(), Equals, "redis: client is closed") + + ping := client.Ping() + c.Assert(ping.Err(), IsNil) + + c.Assert(client.Close(), IsNil) +} + +func (t *RedisConnectorTest) TestMultiClose(c *C) { + client := redis.NewTCPClient(&redis.Options{ + Addr: redisAddr, + }) + + multi := client.Multi() + c.Assert(multi.Close(), IsNil) + + _, err := multi.Exec(func() { + multi.Ping() + }) + c.Assert(err, Not(IsNil)) + c.Assert(err.Error(), Equals, "redis: client is closed") + + ping := client.Ping() + c.Assert(ping.Err(), IsNil) + + c.Assert(client.Close(), IsNil) +} + +func (t *RedisConnectorTest) TestPipelineClose(c *C) { + client := redis.NewTCPClient(&redis.Options{ + Addr: redisAddr, + }) + + _, err := client.Pipelined(func(pipeline *redis.Pipeline) { + c.Assert(pipeline.Close(), IsNil) + pipeline.Ping() + }) + c.Assert(err, Not(IsNil)) + c.Assert(err.Error(), Equals, "redis: client is closed") + + ping := client.Ping() + c.Assert(ping.Err(), IsNil) + + c.Assert(client.Close(), IsNil) +} + //------------------------------------------------------------------------------ type RedisConnPoolTest struct {