mirror of https://github.com/panjf2000/ants.git
opt: speed up ReleaseTimeout() for multi-pool (#332)
This commit is contained in:
parent
95dad45c7d
commit
da22980e2c
35
multipool.go
35
multipool.go
|
@ -28,6 +28,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LoadBalancingStrategy represents the type of load-balancing algorithm.
|
// LoadBalancingStrategy represents the type of load-balancing algorithm.
|
||||||
|
@ -182,14 +184,35 @@ func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error {
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
var errStr strings.Builder
|
errCh := make(chan error, len(mp.pools))
|
||||||
|
var wg errgroup.Group
|
||||||
for i, pool := range mp.pools {
|
for i, pool := range mp.pools {
|
||||||
if err := pool.ReleaseTimeout(timeout); err != nil {
|
func(p *Pool, idx int) {
|
||||||
errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err))
|
wg.Go(func() error {
|
||||||
if i < len(mp.pools)-1 {
|
err := p.ReleaseTimeout(timeout)
|
||||||
errStr.WriteString(" | ")
|
if err != nil {
|
||||||
|
err = fmt.Errorf("pool %d: %v", idx, err)
|
||||||
}
|
}
|
||||||
|
errCh <- err
|
||||||
return err
|
return err
|
||||||
|
})
|
||||||
|
}(pool, i)
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = wg.Wait()
|
||||||
|
|
||||||
|
var (
|
||||||
|
i int
|
||||||
|
errStr strings.Builder
|
||||||
|
)
|
||||||
|
for err := range errCh {
|
||||||
|
i++
|
||||||
|
if i == len(mp.pools) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
errStr.WriteString(err.Error())
|
||||||
|
errStr.WriteString(" | ")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,7 +220,7 @@ func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return errors.New(errStr.String())
|
return errors.New(strings.TrimSuffix(errStr.String(), " | "))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reboot reboots a released multi-pool.
|
// Reboot reboots a released multi-pool.
|
||||||
|
|
|
@ -28,6 +28,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MultiPoolWithFunc consists of multiple pools, from which you will benefit the
|
// MultiPoolWithFunc consists of multiple pools, from which you will benefit the
|
||||||
|
@ -172,14 +174,35 @@ func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
var errStr strings.Builder
|
errCh := make(chan error, len(mp.pools))
|
||||||
|
var wg errgroup.Group
|
||||||
for i, pool := range mp.pools {
|
for i, pool := range mp.pools {
|
||||||
if err := pool.ReleaseTimeout(timeout); err != nil {
|
func(p *PoolWithFunc, idx int) {
|
||||||
errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err))
|
wg.Go(func() error {
|
||||||
if i < len(mp.pools)-1 {
|
err := p.ReleaseTimeout(timeout)
|
||||||
errStr.WriteString(" | ")
|
if err != nil {
|
||||||
|
err = fmt.Errorf("pool %d: %v", idx, err)
|
||||||
}
|
}
|
||||||
|
errCh <- err
|
||||||
return err
|
return err
|
||||||
|
})
|
||||||
|
}(pool, i)
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = wg.Wait()
|
||||||
|
|
||||||
|
var (
|
||||||
|
i int
|
||||||
|
errStr strings.Builder
|
||||||
|
)
|
||||||
|
for err := range errCh {
|
||||||
|
i++
|
||||||
|
if i == len(mp.pools) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
errStr.WriteString(err.Error())
|
||||||
|
errStr.WriteString(" | ")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,7 +210,7 @@ func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return errors.New(errStr.String())
|
return errors.New(strings.TrimSuffix(errStr.String(), " | "))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reboot reboots a released multi-pool.
|
// Reboot reboots a released multi-pool.
|
||||||
|
|
Loading…
Reference in New Issue