diff --git a/parser.go b/parser.go index ab2a4bce..4a2a20c6 100644 --- a/parser.go +++ b/parser.go @@ -22,6 +22,16 @@ var ( //------------------------------------------------------------------------------ +type parserError struct { + err error +} + +func (e *parserError) Error() string { + return e.err.Error() +} + +//------------------------------------------------------------------------------ + func appendReq(buf []byte, args []string) []byte { buf = append(buf, '*') buf = strconv.AppendUint(buf, uint64(len(args)), 10) @@ -139,7 +149,7 @@ func parseBoolSliceReply(rd reader) (interface{}, error) { func _parseReply(rd reader, multiBulkType int) (interface{}, error) { line, err := readLine(rd) if err != nil { - return 0, err + return 0, &parserError{err} } switch line[0] { @@ -148,7 +158,11 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) { case '+': return string(line[1:]), nil case ':': - return strconv.ParseInt(string(line[1:]), 10, 64) + v, err := strconv.ParseInt(string(line[1:]), 10, 64) + if err != nil { + return 0, &parserError{err} + } + return v, nil case '$': if len(line) == 3 && line[1] == '-' && line[2] == '1' { return "", Nil @@ -156,13 +170,13 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) { replyLenInt32, err := strconv.ParseInt(string(line[1:]), 10, 32) if err != nil { - return "", err + return "", &parserError{err} } replyLen := int(replyLenInt32) + 2 line, err = readN(rd, replyLen) if err != nil { - return "", err + return "", &parserError{err} } return string(line[:len(line)-2]), nil case '*': @@ -170,13 +184,13 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) { return nil, Nil } + numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64) + if err != nil { + return nil, &parserError{err} + } + switch multiBulkType { case stringSlice: - numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64) - if err != nil { - return nil, err - } - vals := make([]string, 0, numReplies) for i := int64(0); i < numReplies; i++ { v, err := parseReply(rd) @@ -189,11 +203,6 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) { return vals, nil case boolSlice: - numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64) - if err != nil { - return nil, err - } - vals := make([]bool, 0, numReplies) for i := int64(0); i < numReplies; i++ { v, err := parseReply(rd) @@ -206,11 +215,6 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) { return vals, nil default: - numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64) - if err != nil { - return nil, err - } - vals := make([]interface{}, 0, numReplies) for i := int64(0); i < numReplies; i++ { v, err := parseReply(rd) @@ -226,7 +230,7 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) { return vals, nil } default: - return nil, fmt.Errorf("redis: can't parse %q", line) + return nil, &parserError{fmt.Errorf("redis: can't parse %q", line)} } panic("not reachable") } diff --git a/redis.go b/redis.go index 56cae7e5..0a1e3d45 100644 --- a/redis.go +++ b/redis.go @@ -119,14 +119,14 @@ func (c *BaseClient) Run(req Req) { val, err := req.ParseReply(conn.Rd) if err != nil { - if err == Nil { - if err := c.ConnPool.Add(conn); err != nil { - Logger.Printf("ConnPool.Add error: %v", err) - } - } else { + if _, ok := err.(*parserError); ok { if err := c.ConnPool.Remove(conn); err != nil { Logger.Printf("ConnPool.Remove error: %v", err) } + } else { + if err := c.ConnPool.Add(conn); err != nil { + Logger.Printf("ConnPool.Add error: %v", err) + } } req.SetErr(err) return diff --git a/redis_test.go b/redis_test.go index 682b8723..1c3c8a99 100644 --- a/redis_test.go +++ b/redis_test.go @@ -52,11 +52,157 @@ func (t *RedisShutdownTest) TestShutdown(c *C) { //------------------------------------------------------------------------------ -type RedisTest struct { +type RedisConnPoolTest struct { openedConnCount, closedConnCount, initedConnCount int64 client *redis.Client } +var _ = Suite(&RedisConnPoolTest{}) + +func (t *RedisConnPoolTest) SetUpTest(c *C) { + if t.client == nil { + openConn := func() (net.Conn, error) { + t.openedConnCount++ + return net.Dial("tcp", redisAddr) + } + initConn := func(c *redis.Client) error { + t.initedConnCount++ + return nil + } + closeConn := func(conn net.Conn) error { + t.closedConnCount++ + return nil + } + + t.client = redis.NewClient(openConn, closeConn, initConn) + t.client.ConnPool.(*redis.MultiConnPool).MaxCap = 10 + } +} + +func (t *RedisConnPoolTest) TearDownTest(c *C) { + t.resetRedis(c) + t.resetClient(c) +} + +func (t *RedisConnPoolTest) resetRedis(c *C) { + // This is much faster than Flushall. + c.Assert(t.client.Select(1).Err(), IsNil) + c.Assert(t.client.FlushDb().Err(), IsNil) + c.Assert(t.client.Select(0).Err(), IsNil) + c.Assert(t.client.FlushDb().Err(), IsNil) +} + +func (t *RedisConnPoolTest) resetClient(c *C) { + t.client.Close() + c.Check(t.closedConnCount, Equals, t.openedConnCount) + c.Check(t.initedConnCount, Equals, t.openedConnCount) + t.openedConnCount, t.closedConnCount, t.initedConnCount = 0, 0, 0 +} + +func (t *RedisConnPoolTest) TestConnPoolMaxCap(c *C) { + wg := &sync.WaitGroup{} + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + ping := t.client.Ping() + c.Assert(ping.Err(), IsNil) + c.Assert(ping.Val(), Equals, "PONG") + wg.Done() + }() + } + wg.Wait() + + c.Assert(t.client.Close(), IsNil) + c.Assert(t.openedConnCount, Equals, int64(10)) + c.Assert(t.closedConnCount, Equals, int64(10)) +} + +func (t *RedisConnPoolTest) TestConnPoolMaxCapOnPipelineClient(c *C) { + wg := &sync.WaitGroup{} + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + pipeline, err := t.client.PipelineClient() + c.Assert(err, IsNil) + + ping := pipeline.Ping() + reqs, err := pipeline.RunQueued() + c.Assert(err, IsNil) + c.Assert(reqs, HasLen, 1) + c.Assert(ping.Err(), IsNil) + c.Assert(ping.Val(), Equals, "PONG") + + c.Assert(pipeline.Close(), IsNil) + + wg.Done() + }() + } + wg.Wait() + + c.Assert(t.client.Close(), IsNil) + c.Assert(t.openedConnCount, Equals, int64(10)) + c.Assert(t.closedConnCount, Equals, int64(10)) +} + +func (t *RedisConnPoolTest) TestConnPoolMaxCapOnMultiClient(c *C) { + wg := &sync.WaitGroup{} + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + multi, err := t.client.MultiClient() + c.Assert(err, IsNil) + + var ping *redis.StatusReq + reqs, err := multi.Exec(func() { + ping = multi.Ping() + }) + c.Assert(err, IsNil) + c.Assert(reqs, HasLen, 1) + c.Assert(ping.Err(), IsNil) + c.Assert(ping.Val(), Equals, "PONG") + + c.Assert(multi.Close(), IsNil) + + wg.Done() + }() + } + wg.Wait() + + c.Assert(t.client.Close(), IsNil) + c.Assert(t.openedConnCount, Equals, int64(10)) + c.Assert(t.closedConnCount, Equals, int64(10)) +} + +func (t *RedisConnPoolTest) TestConnPoolMaxCapOnPubSubClient(c *C) { + wg := &sync.WaitGroup{} + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + pubsub, err := t.client.PubSubClient() + c.Assert(err, IsNil) + + _, err = pubsub.Subscribe() + c.Assert(err, IsNil) + + c.Assert(pubsub.Close(), IsNil) + + wg.Done() + }() + } + wg.Wait() + + c.Assert(t.client.Close(), IsNil) + c.Assert(t.openedConnCount, Equals, int64(1000)) + c.Assert(t.closedConnCount, Equals, int64(1000)) +} + +//------------------------------------------------------------------------------ + +type RedisTest struct { + openedConnCount, closedConnCount, initedConnCount int + client *redis.Client +} + var _ = Suite(&RedisTest{}) func Test(t *testing.T) { TestingT(t) } @@ -79,33 +225,28 @@ func (t *RedisTest) SetUpTest(c *C) { t.client = redis.NewClient(openConn, closeConn, initConn) t.client.ConnPool.(*redis.MultiConnPool).MaxCap = 10 } - - t.openedConnCount = 0 - t.closedConnCount = 0 - t.initedConnCount = 0 - t.resetRedis(c) } func (t *RedisTest) TearDownTest(c *C) { - t.resetRedis(c) + // Assert that all connections are in pool. + c.Assert( + t.client.ConnPool.(*redis.MultiConnPool).Len(), + Equals, + t.openedConnCount-t.closedConnCount, + ) + c.Assert(t.openedConnCount, Equals, t.initedConnCount) + t.resetRedis(c) } func (t *RedisTest) resetRedis(c *C) { + // This is much faster than Flushall. c.Assert(t.client.Select(1).Err(), IsNil) c.Assert(t.client.FlushDb().Err(), IsNil) - c.Assert(t.client.Select(0).Err(), IsNil) c.Assert(t.client.FlushDb().Err(), IsNil) } -func (t *RedisTest) resetClient(c *C) { - c.Assert(t.client.Close(), IsNil) - t.openedConnCount = 0 - t.initedConnCount = 0 - t.closedConnCount = 0 -} - //------------------------------------------------------------------------------ func (t *RedisTest) TestRunWithouthCheckingErrVal(c *C) { @@ -145,111 +286,6 @@ func (t *RedisTest) TestGetBigVal(c *C) { //------------------------------------------------------------------------------ -func (t *RedisTest) TestConnPoolMaxCap(c *C) { - t.resetClient(c) - - wg := &sync.WaitGroup{} - for i := 0; i < 1000; i++ { - wg.Add(1) - go func() { - ping := t.client.Ping() - c.Assert(ping.Err(), IsNil) - c.Assert(ping.Val(), Equals, "PONG") - wg.Done() - }() - } - wg.Wait() - - c.Assert(t.client.Close(), IsNil) - c.Assert(t.openedConnCount, Equals, int64(10)) - c.Assert(t.closedConnCount, Equals, int64(10)) -} - -func (t *RedisTest) TestConnPoolMaxCapOnPipelineClient(c *C) { - t.resetClient(c) - - wg := &sync.WaitGroup{} - for i := 0; i < 1000; i++ { - wg.Add(1) - go func() { - pipeline, err := t.client.PipelineClient() - c.Assert(err, IsNil) - - ping := pipeline.Ping() - reqs, err := pipeline.RunQueued() - c.Assert(err, IsNil) - c.Assert(reqs, HasLen, 1) - c.Assert(ping.Err(), IsNil) - c.Assert(ping.Val(), Equals, "PONG") - - c.Assert(pipeline.Close(), IsNil) - - wg.Done() - }() - } - wg.Wait() - - c.Assert(t.client.Close(), IsNil) - c.Assert(t.openedConnCount, Equals, int64(10)) - c.Assert(t.closedConnCount, Equals, int64(10)) -} - -func (t *RedisTest) TestConnPoolMaxCapOnMultiClient(c *C) { - t.resetClient(c) - - wg := &sync.WaitGroup{} - for i := 0; i < 1000; i++ { - wg.Add(1) - go func() { - multi, err := t.client.MultiClient() - c.Assert(err, IsNil) - - var ping *redis.StatusReq - reqs, err := multi.Exec(func() { - ping = multi.Ping() - }) - c.Assert(err, IsNil) - c.Assert(reqs, HasLen, 1) - c.Assert(ping.Err(), IsNil) - c.Assert(ping.Val(), Equals, "PONG") - - c.Assert(multi.Close(), IsNil) - - wg.Done() - }() - } - wg.Wait() - - c.Assert(t.client.Close(), IsNil) - c.Assert(t.openedConnCount, Equals, int64(10)) - c.Assert(t.closedConnCount, Equals, int64(10)) -} - -func (t *RedisTest) TestConnPoolMaxCapOnPubSubClient(c *C) { - t.resetClient(c) - - wg := &sync.WaitGroup{} - for i := 0; i < 1000; i++ { - wg.Add(1) - go func() { - pubsub, err := t.client.PubSubClient() - c.Assert(err, IsNil) - - _, err = pubsub.Subscribe() - c.Assert(err, IsNil) - - c.Assert(pubsub.Close(), IsNil) - - wg.Done() - }() - } - wg.Wait() - - c.Assert(t.client.Close(), IsNil) - c.Assert(t.openedConnCount, Equals, int64(1000)) - c.Assert(t.closedConnCount, Equals, int64(1000)) -} - func (t *RedisTest) TestConnPoolRemovesBrokenConn(c *C) { conn, err := net.Dial("tcp", redisAddr) c.Assert(err, IsNil)