Close connection on network timeout.

This commit is contained in:
Vladimir Mihailenco 2016-03-08 17:18:52 +02:00
parent 0ea1bdd306
commit 673e999431
8 changed files with 19 additions and 30 deletions

View File

@ -79,7 +79,7 @@ func (pipe *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
if err != nil { if err != nil {
retErr = err retErr = err
} }
client.putConn(cn, err) client.putConn(cn, err, false)
} }
cmdsMap = failedCmds cmdsMap = failedCmds

View File

@ -32,7 +32,6 @@ type Cmder interface {
setErr(error) setErr(error)
reset() reset()
writeTimeout() *time.Duration
readTimeout() *time.Duration readTimeout() *time.Duration
clusterKey() string clusterKey() string
@ -82,7 +81,7 @@ type baseCmd struct {
_clusterKeyPos int _clusterKeyPos int
_writeTimeout, _readTimeout *time.Duration _readTimeout *time.Duration
} }
func (cmd *baseCmd) Err() error { func (cmd *baseCmd) Err() error {
@ -104,10 +103,6 @@ func (cmd *baseCmd) setReadTimeout(d time.Duration) {
cmd._readTimeout = &d cmd._readTimeout = &d
} }
func (cmd *baseCmd) writeTimeout() *time.Duration {
return cmd._writeTimeout
}
func (cmd *baseCmd) clusterKey() string { func (cmd *baseCmd) clusterKey() string {
if cmd._clusterKeyPos > 0 && cmd._clusterKeyPos < len(cmd._args) { if cmd._clusterKeyPos > 0 && cmd._clusterKeyPos < len(cmd._args) {
return fmt.Sprint(cmd._args[cmd._clusterKeyPos]) return fmt.Sprint(cmd._args[cmd._clusterKeyPos])
@ -115,10 +110,6 @@ func (cmd *baseCmd) clusterKey() string {
return "" return ""
} }
func (cmd *baseCmd) setWriteTimeout(d time.Duration) {
cmd._writeTimeout = &d
}
func (cmd *baseCmd) setErr(e error) { func (cmd *baseCmd) setErr(e error) {
cmd.err = e cmd.err = e
} }

View File

@ -33,15 +33,17 @@ func isNetworkError(err error) bool {
return ok return ok
} }
func isBadConn(err error) bool { func isBadConn(err error, allowTimeout bool) bool {
if err == nil { if err == nil {
return false return false
} }
if _, ok := err.(redisError); ok { if _, ok := err.(redisError); ok {
return false return false
} }
if netErr, ok := err.(net.Error); ok && netErr.Timeout() { if allowTimeout {
return false if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
return false
}
} }
return true return true
} }

View File

@ -133,7 +133,7 @@ func (c *Multi) Exec(f func() error) ([]Cmder, error) {
} }
err = c.execCmds(cn, cmds) err = c.execCmds(cn, cmds)
c.base.putConn(cn, err) c.base.putConn(cn, err, false)
return retCmds, err return retCmds, err
} }

View File

@ -98,7 +98,7 @@ func (pipe *Pipeline) Exec() (cmds []Cmder, retErr error) {
resetCmds(failedCmds) resetCmds(failedCmds)
} }
failedCmds, err = execCmds(cn, failedCmds) failedCmds, err = execCmds(cn, failedCmds)
pipe.client.putConn(cn, err) pipe.client.putConn(cn, err, false)
if err != nil && retErr == nil { if err != nil && retErr == nil {
retErr = err retErr = err
} }

View File

@ -297,7 +297,7 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
} }
func (c *PubSub) putConn(cn *conn, err error) { func (c *PubSub) putConn(cn *conn, err error) {
if !c.base.putConn(cn, err) { if !c.base.putConn(cn, err, true) {
c.nsub = 0 c.nsub = 0
} }
} }

View File

@ -23,8 +23,8 @@ func (c *baseClient) conn() (*conn, bool, error) {
return c.connPool.Get() return c.connPool.Get()
} }
func (c *baseClient) putConn(cn *conn, err error) bool { func (c *baseClient) putConn(cn *conn, err error, allowTimeout bool) bool {
if isBadConn(err) { if isBadConn(err, allowTimeout) {
err = c.connPool.Remove(cn, err) err = c.connPool.Remove(cn, err)
if err != nil { if err != nil {
Logger.Printf("pool.Remove failed: %s", err) Logger.Printf("pool.Remove failed: %s", err)
@ -51,20 +51,16 @@ func (c *baseClient) process(cmd Cmder) {
return return
} }
if timeout := cmd.writeTimeout(); timeout != nil { readTimeout := cmd.readTimeout()
cn.WriteTimeout = *timeout if readTimeout != nil {
} else { cn.ReadTimeout = *readTimeout
cn.WriteTimeout = c.opt.WriteTimeout
}
if timeout := cmd.readTimeout(); timeout != nil {
cn.ReadTimeout = *timeout
} else { } else {
cn.ReadTimeout = c.opt.ReadTimeout cn.ReadTimeout = c.opt.ReadTimeout
} }
cn.WriteTimeout = c.opt.WriteTimeout
if err := cn.writeCmds(cmd); err != nil { if err := cn.writeCmds(cmd); err != nil {
c.putConn(cn, err) c.putConn(cn, err, false)
cmd.setErr(err) cmd.setErr(err)
if shouldRetry(err) { if shouldRetry(err) {
continue continue
@ -73,7 +69,7 @@ func (c *baseClient) process(cmd Cmder) {
} }
err = cmd.readReply(cn) err = cmd.readReply(cn)
c.putConn(cn, err) c.putConn(cn, err, readTimeout != nil)
if shouldRetry(err) { if shouldRetry(err) {
continue continue
} }

View File

@ -326,7 +326,7 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) {
resetCmds(cmds) resetCmds(cmds)
} }
failedCmds, err := execCmds(cn, cmds) failedCmds, err := execCmds(cn, cmds)
client.putConn(cn, err) client.putConn(cn, err, false)
if err != nil && retErr == nil { if err != nil && retErr == nil {
retErr = err retErr = err
} }