forked from mirror/redis
Add releaseConnStrict
This commit is contained in:
parent
77f7b88603
commit
fa76dd0e87
12
cluster.go
12
cluster.go
|
@ -1272,11 +1272,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
|
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
|
||||||
if err == nil || internal.IsRedisError(err) {
|
node.Client.releaseConnStrict(cn, err)
|
||||||
node.Client.connPool.Put(cn)
|
|
||||||
} else {
|
|
||||||
node.Client.connPool.Remove(cn)
|
|
||||||
}
|
|
||||||
}(node, cmds)
|
}(node, cmds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1466,11 +1462,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
|
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
|
||||||
if err == nil || internal.IsRedisError(err) {
|
node.Client.releaseConnStrict(cn, err)
|
||||||
node.Client.connPool.Put(cn)
|
|
||||||
} else {
|
|
||||||
node.Client.connPool.Remove(cn)
|
|
||||||
}
|
|
||||||
}(node, cmds)
|
}(node, cmds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func IsRetryableError(err error, retryTimeout bool) bool {
|
func IsRetryableError(err error, retryTimeout bool) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
19
redis.go
19
redis.go
|
@ -77,14 +77,20 @@ func (c *baseClient) getConn() (*pool.Conn, error) {
|
||||||
return cn, nil
|
return cn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
|
func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
|
||||||
if internal.IsBadConn(err, false) {
|
if internal.IsBadConn(err, false) {
|
||||||
c.connPool.Remove(cn)
|
c.connPool.Remove(cn)
|
||||||
return false
|
} else {
|
||||||
|
c.connPool.Put(cn)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
|
||||||
|
if err == nil || internal.IsRedisError(err) {
|
||||||
c.connPool.Put(cn)
|
c.connPool.Put(cn)
|
||||||
return true
|
} else {
|
||||||
|
c.connPool.Remove(cn)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) initConn(cn *pool.Conn) error {
|
func (c *baseClient) initConn(cn *pool.Conn) error {
|
||||||
|
@ -248,12 +254,7 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
|
||||||
}
|
}
|
||||||
|
|
||||||
canRetry, err := p(cn, cmds)
|
canRetry, err := p(cn, cmds)
|
||||||
|
c.releaseConnStrict(cn, err)
|
||||||
if err == nil || internal.IsRedisError(err) {
|
|
||||||
c.connPool.Put(cn)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
c.connPool.Remove(cn)
|
|
||||||
|
|
||||||
if !canRetry || !internal.IsRetryableError(err, true) {
|
if !canRetry || !internal.IsRetryableError(err, true) {
|
||||||
break
|
break
|
||||||
|
|
6
ring.go
6
ring.go
|
@ -614,11 +614,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
|
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
|
||||||
if err == nil || internal.IsRedisError(err) {
|
shard.Client.releaseConnStrict(cn, err)
|
||||||
shard.Client.connPool.Put(cn)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
shard.Client.connPool.Remove(cn)
|
|
||||||
|
|
||||||
if canRetry && internal.IsRetryableError(err, true) {
|
if canRetry && internal.IsRetryableError(err, true) {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
|
|
Loading…
Reference in New Issue