Retry if node is closed; close nodes with delay

Vladimir Mihailenco 2018-01-17 12:55:20 +02:00
parent abb85b0fb8
commit a84876237b
1 changed file with 49 additions and 13 deletions

@@ -226,7 +226,7 @@ func (c *clusterNodes) NextGeneration() uint32 {
 }
 
 // GC removes unused nodes.
-func (c *clusterNodes) GC(generation uint32) error {
+func (c *clusterNodes) GC(generation uint32) {
 	var collected []*clusterNode
 	c.mu.Lock()
 	for i := 0; i < len(c.addrs); {
@@ -243,14 +243,11 @@ func (c *clusterNodes) GC(generation uint32) error {
 	}
 	c.mu.Unlock()
 
-	var firstErr error
-	for _, node := range collected {
-		if err := node.Client.Close(); err != nil && firstErr == nil {
-			firstErr = err
-		}
-	}
-
-	return firstErr
+	time.AfterFunc(time.Minute, func() {
+		for _, node := range collected {
+			_ = node.Client.Close()
+		}
+	})
 }
 
 func (c *clusterNodes) All() ([]*clusterNode, error) {
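The delayed close is the first half of this commit: GC unlinks a node from the address map immediately, but its client is only closed a minute later, so commands that already resolved to that node can still complete. A minimal standalone sketch of the same pattern, with a stand-in closer type instead of the real clusterNode:

package main

import (
	"fmt"
	"sync"
	"time"
)

// closer stands in for node.Client in this sketch; only Close matters.
type closer struct{ addr string }

func (c *closer) Close() error {
	fmt.Println("closed", c.addr)
	return nil
}

// gc unlinks stale entries under the lock, then closes them after a
// delay so callers that still hold a reference can finish using them.
func gc(mu *sync.Mutex, nodes map[string]*closer, stale []string, delay time.Duration) {
	var collected []*closer
	mu.Lock()
	for _, addr := range stale {
		if n, ok := nodes[addr]; ok {
			collected = append(collected, n)
			delete(nodes, addr)
		}
	}
	mu.Unlock()

	// As in GC above: close later, and drop Close errors on purpose,
	// since no caller is left to handle them.
	time.AfterFunc(delay, func() {
		for _, n := range collected {
			_ = n.Close()
		}
	})
}

func main() {
	var mu sync.Mutex
	nodes := map[string]*closer{
		"a:6379": {addr: "a:6379"},
		"b:6379": {addr: "b:6379"},
	}
	gc(&mu, nodes, []string{"a:6379"}, 10*time.Millisecond)
	time.Sleep(50 * time.Millisecond) // let AfterFunc fire in this demo
}

Dropping the error return from GC follows from the same change: Close now runs long after the caller has returned, so there is nobody left to hand firstErr to.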
@@ -596,6 +593,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
 			break
 		}
 
+		if internal.IsRetryableError(err, true) {
+			continue
+		}
+
 		moved, ask, addr := internal.IsMovedError(err)
 		if moved || ask {
 			c.lazyReloadState()
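For context, lazyReloadState is invoked on every MOVED/ASK redirect, so it has to coalesce a burst of redirects into a single state reload. One common shape for such a guard is an atomic flag, sketched below; this illustrates the idea only and is not the actual go-redis implementation:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type reloader struct {
	reloading uint32 // 0 = idle, 1 = reload in flight
}

// lazyReload starts at most one background reload at a time; calls that
// arrive while one is running are coalesced into it.
func (r *reloader) lazyReload(load func()) {
	if !atomic.CompareAndSwapUint32(&r.reloading, 0, 1) {
		return // a reload is already in flight
	}
	go func() {
		defer atomic.StoreUint32(&r.reloading, 0)
		load()
	}()
}

func main() {
	var r reloader
	for i := 0; i < 5; i++ { // five redirects, one reload
		r.lazyReload(func() {
			fmt.Println("reloading cluster state")
			time.Sleep(10 * time.Millisecond)
		})
	}
	time.Sleep(50 * time.Millisecond)
}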
@@ -606,6 +607,13 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
 			continue
 		}
 
+		if err == pool.ErrClosed {
+			node, err = state.slotMasterNode(slot)
+			if err != nil {
+				return err
+			}
+			continue
+		}
 		return err
 	}
 
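This new branch ties the two halves of the commit together: a node can be garbage-collected (and its client closed) between slot lookup and use, and Watch now re-resolves the slot's master and retries instead of surfacing the error. The control flow, reduced to a runnable sketch; errClosed, lookupMaster, and run are stand-ins for pool.ErrClosed, state.slotMasterNode, and the watched transaction:

package main

import (
	"errors"
	"fmt"
)

var errClosed = errors.New("redis: client is closed")

type node struct{ addr string }

// lookupMaster stands in for state.slotMasterNode.
func lookupMaster() (*node, error) { return &node{addr: "127.0.0.1:7001"}, nil }

// watch retries when the chosen node turns out to be closed, up to
// maxRedirects attempts, re-resolving the slot's master each time.
func watch(n *node, maxRedirects int, run func(*node) error) error {
	var err error
	for attempt := 0; attempt <= maxRedirects; attempt++ {
		err = run(n)
		if err == nil {
			return nil
		}
		if errors.Is(err, errClosed) {
			// The node was collected by GC; pick the current master
			// and go around the loop again.
			if n, err = lookupMaster(); err != nil {
				return err
			}
			continue
		}
		return err // not recoverable here
	}
	return err
}

func main() {
	calls := 0
	err := watch(&node{addr: "127.0.0.1:7000"}, 3, func(n *node) error {
		if calls++; calls == 1 {
			return errClosed // first node was already closed
		}
		fmt.Println("ran on", n.addr)
		return nil
	})
	fmt.Println("err:", err)
}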
@@ -641,10 +649,10 @@ func (c *ClusterClient) Process(cmd Cmder) error {
 		if ask {
 			pipe := node.Client.Pipeline()
-			pipe.Process(NewCmd("ASKING"))
-			pipe.Process(cmd)
+			_ = pipe.Process(NewCmd("ASKING"))
+			_ = pipe.Process(cmd)
 			_, err = pipe.Exec()
-			pipe.Close()
+			_ = pipe.Close()
 			ask = false
 		} else {
 			err = node.Client.Process(cmd)
 		}
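The `_ =` assignments in this hunk only make the dropped errors explicit; the surrounding pattern is standard Redis Cluster behaviour: an -ASK redirect must be answered by sending ASKING to the target node immediately before the redirected command, which is why both are queued on one pipeline. Sketched with a stand-in cmd type, not go-redis internals:

package main

import "fmt"

// cmd is a stand-in for go-redis's Cmder: just a name and arguments.
type cmd struct{ args []string }

// askRedirect shows the wire-level shape of ASK handling: ASKING is a
// one-shot permission valid only for the next command on the same
// connection, so the two commands travel as a single pipeline.
// (A MOVED redirect, by contrast, is permanent and triggers a cluster
// state reload instead.)
func askRedirect(redirected cmd) []cmd {
	return []cmd{
		{args: []string{"ASKING"}},
		redirected,
	}
}

func main() {
	for _, c := range askRedirect(cmd{args: []string{"GET", "key"}}) {
		fmt.Println(c.args)
	}
}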
@@ -685,6 +693,14 @@ func (c *ClusterClient) Process(cmd Cmder) error {
 			continue
 		}
 
+		if err == pool.ErrClosed {
+			_, node, err = c.cmdSlotAndNode(state, cmd)
+			if err != nil {
+				cmd.setErr(err)
+				return err
+			}
+			continue
+		}
 		break
 	}
 
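cmdSlotAndNode routes a command by its first key's hash slot: slot = CRC16(key) mod 16384, where only the hash-tag portion of the key is hashed if one is present. The tag rule that this slot mapping rests on, as a standalone sketch (the CRC16 step is omitted):

package main

import "fmt"

// hashTag implements the Redis Cluster hash-tag rule: if a key contains
// a non-empty "{...}" section, only that section is hashed, so related
// keys can be forced onto the same slot and therefore the same node.
func hashTag(key string) string {
	for i := 0; i < len(key); i++ {
		if key[i] != '{' {
			continue
		}
		for j := i + 1; j < len(key); j++ {
			if key[j] == '}' {
				if j == i+1 {
					return key // "{}" is empty: hash the whole key
				}
				return key[i+1 : j]
			}
		}
		break // '{' without '}': hash the whole key
	}
	return key
}

func main() {
	fmt.Println(hashTag("user:1000"))        // user:1000
	fmt.Println(hashTag("{user:1000}.cart")) // user:1000 — same slot as above
	fmt.Println(hashTag("foo{}{bar}"))       // foo{}{bar} — first braces are empty
}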
@@ -921,7 +937,11 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
 	for node, cmds := range cmdsMap {
 		cn, _, err := node.Client.getConn()
 		if err != nil {
-			setCmdsErr(cmds, err)
+			if err == pool.ErrClosed {
+				c.remapCmds(cmds, failedCmds)
+			} else {
+				setCmdsErr(cmds, err)
+			}
 			continue
 		}
 
@@ -961,6 +981,18 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
 	return cmdsMap, nil
 }
 
+func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
+	remappedCmds, err := c.mapCmdsByNode(cmds)
+	if err != nil {
+		setCmdsErr(cmds, err)
+		return
+	}
+
+	for node, cmds := range remappedCmds {
+		failedCmds[node] = cmds
+	}
+}
+
 func (c *ClusterClient) pipelineProcessCmds(
 	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
 ) error {
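remapCmds feeds the pipeline retry loop: when a node's pool is closed, its batch gets fresh slot-to-node assignments and is queued into failedCmds, which the caller uses as the next round's cmdsMap. The shape of that loop as a self-contained sketch; every type and helper here is a stand-in, not a go-redis internal:

package main

import "fmt"

type nodeT struct{ addr string }
type cmdT struct{ name string }

// execRound pretends to run each node's batch; a closed node reports
// its whole batch back for remapping, as remapCmds does above.
func execRound(batch map[*nodeT][]cmdT, closed map[*nodeT]bool, remap func(cmdT) *nodeT) map[*nodeT][]cmdT {
	failed := make(map[*nodeT][]cmdT)
	for node, cmds := range batch {
		if closed[node] {
			for _, c := range cmds {
				n := remap(c) // recompute slot -> node for each command
				failed[n] = append(failed[n], c)
			}
			continue
		}
		fmt.Println("ran", len(cmds), "cmds on", node.addr)
	}
	return failed
}

func main() {
	old := &nodeT{addr: "a:7000"}
	fresh := &nodeT{addr: "b:7000"}
	batch := map[*nodeT][]cmdT{old: {{name: "GET"}, {name: "SET"}}}
	closed := map[*nodeT]bool{old: true}

	// retry loop: failed commands become the next round's batch
	for attempt := 0; attempt < 3 && len(batch) > 0; attempt++ {
		batch = execRound(batch, closed, func(cmdT) *nodeT { return fresh })
	}
}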
@@ -1067,7 +1099,11 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
 	for node, cmds := range cmdsMap {
 		cn, _, err := node.Client.getConn()
 		if err != nil {
-			setCmdsErr(cmds, err)
+			if err == pool.ErrClosed {
+				c.remapCmds(cmds, failedCmds)
+			} else {
+				setCmdsErr(cmds, err)
+			}
 			continue
 		}
 