mirror of https://github.com/go-redis/redis.git
parser: Introduce parserError.
This commit is contained in:
parent
037888ee0f
commit
a94daf0c96
44
parser.go
44
parser.go
|
@ -22,6 +22,16 @@ var (
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type parserError struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *parserError) Error() string {
|
||||||
|
return e.err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
func appendReq(buf []byte, args []string) []byte {
|
func appendReq(buf []byte, args []string) []byte {
|
||||||
buf = append(buf, '*')
|
buf = append(buf, '*')
|
||||||
buf = strconv.AppendUint(buf, uint64(len(args)), 10)
|
buf = strconv.AppendUint(buf, uint64(len(args)), 10)
|
||||||
|
@ -139,7 +149,7 @@ func parseBoolSliceReply(rd reader) (interface{}, error) {
|
||||||
func _parseReply(rd reader, multiBulkType int) (interface{}, error) {
|
func _parseReply(rd reader, multiBulkType int) (interface{}, error) {
|
||||||
line, err := readLine(rd)
|
line, err := readLine(rd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, &parserError{err}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch line[0] {
|
switch line[0] {
|
||||||
|
@ -148,7 +158,11 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) {
|
||||||
case '+':
|
case '+':
|
||||||
return string(line[1:]), nil
|
return string(line[1:]), nil
|
||||||
case ':':
|
case ':':
|
||||||
return strconv.ParseInt(string(line[1:]), 10, 64)
|
v, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, &parserError{err}
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
case '$':
|
case '$':
|
||||||
if len(line) == 3 && line[1] == '-' && line[2] == '1' {
|
if len(line) == 3 && line[1] == '-' && line[2] == '1' {
|
||||||
return "", Nil
|
return "", Nil
|
||||||
|
@ -156,13 +170,13 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) {
|
||||||
|
|
||||||
replyLenInt32, err := strconv.ParseInt(string(line[1:]), 10, 32)
|
replyLenInt32, err := strconv.ParseInt(string(line[1:]), 10, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", &parserError{err}
|
||||||
}
|
}
|
||||||
replyLen := int(replyLenInt32) + 2
|
replyLen := int(replyLenInt32) + 2
|
||||||
|
|
||||||
line, err = readN(rd, replyLen)
|
line, err = readN(rd, replyLen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", &parserError{err}
|
||||||
}
|
}
|
||||||
return string(line[:len(line)-2]), nil
|
return string(line[:len(line)-2]), nil
|
||||||
case '*':
|
case '*':
|
||||||
|
@ -170,13 +184,13 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) {
|
||||||
return nil, Nil
|
return nil, Nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, &parserError{err}
|
||||||
|
}
|
||||||
|
|
||||||
switch multiBulkType {
|
switch multiBulkType {
|
||||||
case stringSlice:
|
case stringSlice:
|
||||||
numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
vals := make([]string, 0, numReplies)
|
vals := make([]string, 0, numReplies)
|
||||||
for i := int64(0); i < numReplies; i++ {
|
for i := int64(0); i < numReplies; i++ {
|
||||||
v, err := parseReply(rd)
|
v, err := parseReply(rd)
|
||||||
|
@ -189,11 +203,6 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) {
|
||||||
|
|
||||||
return vals, nil
|
return vals, nil
|
||||||
case boolSlice:
|
case boolSlice:
|
||||||
numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
vals := make([]bool, 0, numReplies)
|
vals := make([]bool, 0, numReplies)
|
||||||
for i := int64(0); i < numReplies; i++ {
|
for i := int64(0); i < numReplies; i++ {
|
||||||
v, err := parseReply(rd)
|
v, err := parseReply(rd)
|
||||||
|
@ -206,11 +215,6 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) {
|
||||||
|
|
||||||
return vals, nil
|
return vals, nil
|
||||||
default:
|
default:
|
||||||
numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
vals := make([]interface{}, 0, numReplies)
|
vals := make([]interface{}, 0, numReplies)
|
||||||
for i := int64(0); i < numReplies; i++ {
|
for i := int64(0); i < numReplies; i++ {
|
||||||
v, err := parseReply(rd)
|
v, err := parseReply(rd)
|
||||||
|
@ -226,7 +230,7 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) {
|
||||||
return vals, nil
|
return vals, nil
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("redis: can't parse %q", line)
|
return nil, &parserError{fmt.Errorf("redis: can't parse %q", line)}
|
||||||
}
|
}
|
||||||
panic("not reachable")
|
panic("not reachable")
|
||||||
}
|
}
|
||||||
|
|
10
redis.go
10
redis.go
|
@ -119,14 +119,14 @@ func (c *BaseClient) Run(req Req) {
|
||||||
|
|
||||||
val, err := req.ParseReply(conn.Rd)
|
val, err := req.ParseReply(conn.Rd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == Nil {
|
if _, ok := err.(*parserError); ok {
|
||||||
if err := c.ConnPool.Add(conn); err != nil {
|
|
||||||
Logger.Printf("ConnPool.Add error: %v", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if err := c.ConnPool.Remove(conn); err != nil {
|
if err := c.ConnPool.Remove(conn); err != nil {
|
||||||
Logger.Printf("ConnPool.Remove error: %v", err)
|
Logger.Printf("ConnPool.Remove error: %v", err)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if err := c.ConnPool.Add(conn); err != nil {
|
||||||
|
Logger.Printf("ConnPool.Add error: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
req.SetErr(err)
|
req.SetErr(err)
|
||||||
return
|
return
|
||||||
|
|
276
redis_test.go
276
redis_test.go
|
@ -52,11 +52,157 @@ func (t *RedisShutdownTest) TestShutdown(c *C) {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type RedisTest struct {
|
type RedisConnPoolTest struct {
|
||||||
openedConnCount, closedConnCount, initedConnCount int64
|
openedConnCount, closedConnCount, initedConnCount int64
|
||||||
client *redis.Client
|
client *redis.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ = Suite(&RedisConnPoolTest{})
|
||||||
|
|
||||||
|
func (t *RedisConnPoolTest) SetUpTest(c *C) {
|
||||||
|
if t.client == nil {
|
||||||
|
openConn := func() (net.Conn, error) {
|
||||||
|
t.openedConnCount++
|
||||||
|
return net.Dial("tcp", redisAddr)
|
||||||
|
}
|
||||||
|
initConn := func(c *redis.Client) error {
|
||||||
|
t.initedConnCount++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
closeConn := func(conn net.Conn) error {
|
||||||
|
t.closedConnCount++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
t.client = redis.NewClient(openConn, closeConn, initConn)
|
||||||
|
t.client.ConnPool.(*redis.MultiConnPool).MaxCap = 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RedisConnPoolTest) TearDownTest(c *C) {
|
||||||
|
t.resetRedis(c)
|
||||||
|
t.resetClient(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RedisConnPoolTest) resetRedis(c *C) {
|
||||||
|
// This is much faster than Flushall.
|
||||||
|
c.Assert(t.client.Select(1).Err(), IsNil)
|
||||||
|
c.Assert(t.client.FlushDb().Err(), IsNil)
|
||||||
|
c.Assert(t.client.Select(0).Err(), IsNil)
|
||||||
|
c.Assert(t.client.FlushDb().Err(), IsNil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RedisConnPoolTest) resetClient(c *C) {
|
||||||
|
t.client.Close()
|
||||||
|
c.Check(t.closedConnCount, Equals, t.openedConnCount)
|
||||||
|
c.Check(t.initedConnCount, Equals, t.openedConnCount)
|
||||||
|
t.openedConnCount, t.closedConnCount, t.initedConnCount = 0, 0, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RedisConnPoolTest) TestConnPoolMaxCap(c *C) {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
ping := t.client.Ping()
|
||||||
|
c.Assert(ping.Err(), IsNil)
|
||||||
|
c.Assert(ping.Val(), Equals, "PONG")
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
c.Assert(t.client.Close(), IsNil)
|
||||||
|
c.Assert(t.openedConnCount, Equals, int64(10))
|
||||||
|
c.Assert(t.closedConnCount, Equals, int64(10))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RedisConnPoolTest) TestConnPoolMaxCapOnPipelineClient(c *C) {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
pipeline, err := t.client.PipelineClient()
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
ping := pipeline.Ping()
|
||||||
|
reqs, err := pipeline.RunQueued()
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(reqs, HasLen, 1)
|
||||||
|
c.Assert(ping.Err(), IsNil)
|
||||||
|
c.Assert(ping.Val(), Equals, "PONG")
|
||||||
|
|
||||||
|
c.Assert(pipeline.Close(), IsNil)
|
||||||
|
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
c.Assert(t.client.Close(), IsNil)
|
||||||
|
c.Assert(t.openedConnCount, Equals, int64(10))
|
||||||
|
c.Assert(t.closedConnCount, Equals, int64(10))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RedisConnPoolTest) TestConnPoolMaxCapOnMultiClient(c *C) {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
multi, err := t.client.MultiClient()
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
var ping *redis.StatusReq
|
||||||
|
reqs, err := multi.Exec(func() {
|
||||||
|
ping = multi.Ping()
|
||||||
|
})
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(reqs, HasLen, 1)
|
||||||
|
c.Assert(ping.Err(), IsNil)
|
||||||
|
c.Assert(ping.Val(), Equals, "PONG")
|
||||||
|
|
||||||
|
c.Assert(multi.Close(), IsNil)
|
||||||
|
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
c.Assert(t.client.Close(), IsNil)
|
||||||
|
c.Assert(t.openedConnCount, Equals, int64(10))
|
||||||
|
c.Assert(t.closedConnCount, Equals, int64(10))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RedisConnPoolTest) TestConnPoolMaxCapOnPubSubClient(c *C) {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
pubsub, err := t.client.PubSubClient()
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
_, err = pubsub.Subscribe()
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
c.Assert(pubsub.Close(), IsNil)
|
||||||
|
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
c.Assert(t.client.Close(), IsNil)
|
||||||
|
c.Assert(t.openedConnCount, Equals, int64(1000))
|
||||||
|
c.Assert(t.closedConnCount, Equals, int64(1000))
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type RedisTest struct {
|
||||||
|
openedConnCount, closedConnCount, initedConnCount int
|
||||||
|
client *redis.Client
|
||||||
|
}
|
||||||
|
|
||||||
var _ = Suite(&RedisTest{})
|
var _ = Suite(&RedisTest{})
|
||||||
|
|
||||||
func Test(t *testing.T) { TestingT(t) }
|
func Test(t *testing.T) { TestingT(t) }
|
||||||
|
@ -79,33 +225,28 @@ func (t *RedisTest) SetUpTest(c *C) {
|
||||||
t.client = redis.NewClient(openConn, closeConn, initConn)
|
t.client = redis.NewClient(openConn, closeConn, initConn)
|
||||||
t.client.ConnPool.(*redis.MultiConnPool).MaxCap = 10
|
t.client.ConnPool.(*redis.MultiConnPool).MaxCap = 10
|
||||||
}
|
}
|
||||||
|
|
||||||
t.openedConnCount = 0
|
|
||||||
t.closedConnCount = 0
|
|
||||||
t.initedConnCount = 0
|
|
||||||
t.resetRedis(c)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *RedisTest) TearDownTest(c *C) {
|
func (t *RedisTest) TearDownTest(c *C) {
|
||||||
t.resetRedis(c)
|
// Assert that all connections are in pool.
|
||||||
|
c.Assert(
|
||||||
|
t.client.ConnPool.(*redis.MultiConnPool).Len(),
|
||||||
|
Equals,
|
||||||
|
t.openedConnCount-t.closedConnCount,
|
||||||
|
)
|
||||||
|
|
||||||
c.Assert(t.openedConnCount, Equals, t.initedConnCount)
|
c.Assert(t.openedConnCount, Equals, t.initedConnCount)
|
||||||
|
t.resetRedis(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *RedisTest) resetRedis(c *C) {
|
func (t *RedisTest) resetRedis(c *C) {
|
||||||
|
// This is much faster than Flushall.
|
||||||
c.Assert(t.client.Select(1).Err(), IsNil)
|
c.Assert(t.client.Select(1).Err(), IsNil)
|
||||||
c.Assert(t.client.FlushDb().Err(), IsNil)
|
c.Assert(t.client.FlushDb().Err(), IsNil)
|
||||||
|
|
||||||
c.Assert(t.client.Select(0).Err(), IsNil)
|
c.Assert(t.client.Select(0).Err(), IsNil)
|
||||||
c.Assert(t.client.FlushDb().Err(), IsNil)
|
c.Assert(t.client.FlushDb().Err(), IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *RedisTest) resetClient(c *C) {
|
|
||||||
c.Assert(t.client.Close(), IsNil)
|
|
||||||
t.openedConnCount = 0
|
|
||||||
t.initedConnCount = 0
|
|
||||||
t.closedConnCount = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
func (t *RedisTest) TestRunWithouthCheckingErrVal(c *C) {
|
func (t *RedisTest) TestRunWithouthCheckingErrVal(c *C) {
|
||||||
|
@ -145,111 +286,6 @@ func (t *RedisTest) TestGetBigVal(c *C) {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
func (t *RedisTest) TestConnPoolMaxCap(c *C) {
|
|
||||||
t.resetClient(c)
|
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
for i := 0; i < 1000; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
ping := t.client.Ping()
|
|
||||||
c.Assert(ping.Err(), IsNil)
|
|
||||||
c.Assert(ping.Val(), Equals, "PONG")
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
c.Assert(t.client.Close(), IsNil)
|
|
||||||
c.Assert(t.openedConnCount, Equals, int64(10))
|
|
||||||
c.Assert(t.closedConnCount, Equals, int64(10))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *RedisTest) TestConnPoolMaxCapOnPipelineClient(c *C) {
|
|
||||||
t.resetClient(c)
|
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
for i := 0; i < 1000; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
pipeline, err := t.client.PipelineClient()
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
|
|
||||||
ping := pipeline.Ping()
|
|
||||||
reqs, err := pipeline.RunQueued()
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
c.Assert(reqs, HasLen, 1)
|
|
||||||
c.Assert(ping.Err(), IsNil)
|
|
||||||
c.Assert(ping.Val(), Equals, "PONG")
|
|
||||||
|
|
||||||
c.Assert(pipeline.Close(), IsNil)
|
|
||||||
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
c.Assert(t.client.Close(), IsNil)
|
|
||||||
c.Assert(t.openedConnCount, Equals, int64(10))
|
|
||||||
c.Assert(t.closedConnCount, Equals, int64(10))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *RedisTest) TestConnPoolMaxCapOnMultiClient(c *C) {
|
|
||||||
t.resetClient(c)
|
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
for i := 0; i < 1000; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
multi, err := t.client.MultiClient()
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
|
|
||||||
var ping *redis.StatusReq
|
|
||||||
reqs, err := multi.Exec(func() {
|
|
||||||
ping = multi.Ping()
|
|
||||||
})
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
c.Assert(reqs, HasLen, 1)
|
|
||||||
c.Assert(ping.Err(), IsNil)
|
|
||||||
c.Assert(ping.Val(), Equals, "PONG")
|
|
||||||
|
|
||||||
c.Assert(multi.Close(), IsNil)
|
|
||||||
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
c.Assert(t.client.Close(), IsNil)
|
|
||||||
c.Assert(t.openedConnCount, Equals, int64(10))
|
|
||||||
c.Assert(t.closedConnCount, Equals, int64(10))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *RedisTest) TestConnPoolMaxCapOnPubSubClient(c *C) {
|
|
||||||
t.resetClient(c)
|
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
for i := 0; i < 1000; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
pubsub, err := t.client.PubSubClient()
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
|
|
||||||
_, err = pubsub.Subscribe()
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
|
|
||||||
c.Assert(pubsub.Close(), IsNil)
|
|
||||||
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
c.Assert(t.client.Close(), IsNil)
|
|
||||||
c.Assert(t.openedConnCount, Equals, int64(1000))
|
|
||||||
c.Assert(t.closedConnCount, Equals, int64(1000))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *RedisTest) TestConnPoolRemovesBrokenConn(c *C) {
|
func (t *RedisTest) TestConnPoolRemovesBrokenConn(c *C) {
|
||||||
conn, err := net.Dial("tcp", redisAddr)
|
conn, err := net.Dial("tcp", redisAddr)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
Loading…
Reference in New Issue