From da22980e2cb200e137ffb530882fd822d0e1b28e Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 18 Jun 2024 02:42:55 +0800 Subject: [PATCH] opt: speed up ReleaseTimeout() for multi-pool (#332) --- multipool.go | 39 +++++++++++++++++++++++++++++++-------- multipool_func.go | 39 +++++++++++++++++++++++++++++++-------- 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/multipool.go b/multipool.go index 1de75ae..ad5db10 100644 --- a/multipool.go +++ b/multipool.go @@ -28,6 +28,8 @@ import ( "strings" "sync/atomic" "time" + + "golang.org/x/sync/errgroup" ) // LoadBalancingStrategy represents the type of load-balancing algorithm. @@ -182,14 +184,35 @@ func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error { return ErrPoolClosed } - var errStr strings.Builder + errCh := make(chan error, len(mp.pools)) + var wg errgroup.Group for i, pool := range mp.pools { - if err := pool.ReleaseTimeout(timeout); err != nil { - errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err)) - if i < len(mp.pools)-1 { - errStr.WriteString(" | ") - } - return err + func(p *Pool, idx int) { + wg.Go(func() error { + err := p.ReleaseTimeout(timeout) + if err != nil { + err = fmt.Errorf("pool %d: %v", idx, err) + } + errCh <- err + return err + }) + }(pool, i) + } + + _ = wg.Wait() + + var ( + i int + errStr strings.Builder + ) + for err := range errCh { + i++ + if i == len(mp.pools) { + break + } + if err != nil { + errStr.WriteString(err.Error()) + errStr.WriteString(" | ") } } @@ -197,7 +220,7 @@ func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error { return nil } - return errors.New(errStr.String()) + return errors.New(strings.TrimSuffix(errStr.String(), " | ")) } // Reboot reboots a released multi-pool. diff --git a/multipool_func.go b/multipool_func.go index c7d31ff..257b16e 100644 --- a/multipool_func.go +++ b/multipool_func.go @@ -28,6 +28,8 @@ import ( "strings" "sync/atomic" "time" + + "golang.org/x/sync/errgroup" ) // MultiPoolWithFunc consists of multiple pools, from which you will benefit the @@ -172,14 +174,35 @@ func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error { return ErrPoolClosed } - var errStr strings.Builder + errCh := make(chan error, len(mp.pools)) + var wg errgroup.Group for i, pool := range mp.pools { - if err := pool.ReleaseTimeout(timeout); err != nil { - errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err)) - if i < len(mp.pools)-1 { - errStr.WriteString(" | ") - } - return err + func(p *PoolWithFunc, idx int) { + wg.Go(func() error { + err := p.ReleaseTimeout(timeout) + if err != nil { + err = fmt.Errorf("pool %d: %v", idx, err) + } + errCh <- err + return err + }) + }(pool, i) + } + + _ = wg.Wait() + + var ( + i int + errStr strings.Builder + ) + for err := range errCh { + i++ + if i == len(mp.pools) { + break + } + if err != nil { + errStr.WriteString(err.Error()) + errStr.WriteString(" | ") } } @@ -187,7 +210,7 @@ func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error { return nil } - return errors.New(errStr.String()) + return errors.New(strings.TrimSuffix(errStr.String(), " | ")) } // Reboot reboots a released multi-pool.