Reuse connections to Redis during tests.

This commit is contained in:
Vladimir Mihailenco 2012-08-19 23:59:52 +03:00
parent a654224ced
commit 9764065750
3 changed files with 61 additions and 23 deletions

View File

@ -19,6 +19,11 @@ func (c *Client) MultiClient() (*MultiClient, error) {
}, nil }, nil
} }
func (c *MultiClient) Close() error {
c.Unwatch()
return c.Client.Close()
}
func (c *MultiClient) Watch(keys ...string) *StatusReq { func (c *MultiClient) Watch(keys ...string) *StatusReq {
args := append([]string{"WATCH"}, keys...) args := append([]string{"WATCH"}, keys...)
req := NewStatusReq(args...) req := NewStatusReq(args...)

View File

@ -41,13 +41,18 @@ func (c *PubSubClient) consumeMessages(conn *Conn) {
for { for {
msg := &Message{} msg := &Message{}
replyI, err := req.ParseReply(conn.Rd) replyIface, err := req.ParseReply(conn.Rd)
if err != nil { if err != nil {
msg.Err = err msg.Err = err
c.ch <- msg c.ch <- msg
break break
} }
reply := replyI.([]interface{}) reply, ok := replyIface.([]interface{})
if !ok {
msg.Err = fmt.Errorf("redis: unexpected reply type %T", replyIface)
c.ch <- msg
return
}
msgName := reply[0].(string) msgName := reply[0].(string)
switch msgName { switch msgName {

View File

@ -39,33 +39,50 @@ func sortStrings(slice []string) []string {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func (t *RedisTest) SetUpTest(c *C) { func (t *RedisTest) SetUpTest(c *C) {
if t.client == nil {
openConn := func() (io.ReadWriteCloser, error) {
t.openedConnCount++
return net.Dial("tcp", redisAddr)
}
initConn := func(c *redis.Client) error {
t.initedConnCount++
return nil
}
closeConn := func(conn io.ReadWriteCloser) error {
t.closedConnCount++
return nil
}
t.client = redis.NewClient(openConn, closeConn, initConn)
t.client.ConnPool.(*redis.MultiConnPool).MaxCap = 10
}
t.openedConnCount = 0 t.openedConnCount = 0
openConn := func() (io.ReadWriteCloser, error) {
t.openedConnCount++
return net.Dial("tcp", redisAddr)
}
t.closedConnCount = 0 t.closedConnCount = 0
closeConn := func(conn io.ReadWriteCloser) error {
t.closedConnCount++
return nil
}
t.initedConnCount = 0 t.initedConnCount = 0
initConn := func(c *redis.Client) error { t.resetRedis(c)
t.initedConnCount++
return nil
}
t.client = redis.NewClient(openConn, closeConn, initConn)
t.client.ConnPool.(*redis.MultiConnPool).MaxCap = 10
c.Assert(t.client.FlushDb().Err(), IsNil)
} }
func (t *RedisTest) TearDownTest(c *C) { func (t *RedisTest) TearDownTest(c *C) {
c.Assert(t.client.FlushDb().Err(), IsNil) t.resetRedis(c)
c.Assert(t.client.Close(), IsNil)
c.Assert(t.openedConnCount, Equals, t.closedConnCount)
c.Assert(t.openedConnCount, Equals, t.initedConnCount) c.Assert(t.openedConnCount, Equals, t.initedConnCount)
} }
func (t *RedisTest) resetRedis(c *C) {
c.Assert(t.client.Select(1).Err(), IsNil)
c.Assert(t.client.FlushDb().Err(), IsNil)
c.Assert(t.client.Select(0).Err(), IsNil)
c.Assert(t.client.FlushDb().Err(), IsNil)
}
func (t *RedisTest) resetClient(c *C) {
c.Assert(t.client.Close(), IsNil)
t.openedConnCount = 0
t.initedConnCount = 0
t.closedConnCount = 0
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func (t *RedisTest) TestRunWithouthCheckingErrVal(c *C) { func (t *RedisTest) TestRunWithouthCheckingErrVal(c *C) {
@ -106,6 +123,8 @@ func (t *RedisTest) TestGetBigVal(c *C) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func (t *RedisTest) TestConnPoolMaxCap(c *C) { func (t *RedisTest) TestConnPoolMaxCap(c *C) {
t.resetClient(c)
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
wg.Add(1) wg.Add(1)
@ -124,6 +143,8 @@ func (t *RedisTest) TestConnPoolMaxCap(c *C) {
} }
func (t *RedisTest) TestConnPoolMaxCapOnPipelineClient(c *C) { func (t *RedisTest) TestConnPoolMaxCapOnPipelineClient(c *C) {
t.resetClient(c)
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
wg.Add(1) wg.Add(1)
@ -151,6 +172,8 @@ func (t *RedisTest) TestConnPoolMaxCapOnPipelineClient(c *C) {
} }
func (t *RedisTest) TestConnPoolMaxCapOnMultiClient(c *C) { func (t *RedisTest) TestConnPoolMaxCapOnMultiClient(c *C) {
t.resetClient(c)
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
wg.Add(1) wg.Add(1)
@ -180,6 +203,8 @@ func (t *RedisTest) TestConnPoolMaxCapOnMultiClient(c *C) {
} }
func (t *RedisTest) TestConnPoolMaxCapOnPubSubClient(c *C) { func (t *RedisTest) TestConnPoolMaxCapOnPubSubClient(c *C) {
t.resetClient(c)
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
wg.Add(1) wg.Add(1)
@ -2477,9 +2502,12 @@ func (t *RedisTest) TestCmdBgRewriteAOF(c *C) {
} }
func (t *RedisTest) TestCmdBgSave(c *C) { func (t *RedisTest) TestCmdBgSave(c *C) {
// workaround for "ERR Can't BGSAVE while AOF log rewriting is in progress"
time.Sleep(time.Second)
r := t.client.BgSave() r := t.client.BgSave()
c.Assert(r.Err(), ErrorMatches, "ERR Can't BGSAVE while AOF log rewriting is in progress") c.Assert(r.Err(), IsNil)
c.Assert(r.Val(), Equals, "") c.Assert(r.Val(), Equals, "Background saving started")
} }
func (t *RedisTest) TestCmdClientKill(c *C) { func (t *RedisTest) TestCmdClientKill(c *C) {
@ -2662,7 +2690,7 @@ func (t *RedisTest) BenchmarkRedisMGet(c *C) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
mGet := t.client.MGet("key1", "key2") mGet := t.client.MGet("key1", "key2")
c.Assert(mGet.Err(), IsNil) c.Assert(mGet.Err(), IsNil)
c.Assert(mGet.Val(), DeepEquals, []string{"hello1", "hello2"}) c.Assert(mGet.Val(), DeepEquals, []interface{}{"hello1", "hello2"})
} }
c.StartTimer() c.StartTimer()