🐳 Use the nil of chan to indicate the pool closed or not

This commit is contained in:
Andy Pan 2019-01-26 12:26:26 +08:00
parent 6761702a6d
commit d3bcc5d10b
2 changed files with 8 additions and 10 deletions

View File

@ -72,7 +72,7 @@ func (p *Pool) periodicallyPurge() {
currentTime := time.Now() currentTime := time.Now()
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
if len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 { if len(idleWorkers) == 0 && p.Running() == 0 && p.release != nil {
p.lock.Unlock() p.lock.Unlock()
return return
} }
@ -111,7 +111,6 @@ func NewTimingPool(size, expiry int) (*Pool, error) {
} }
p := &Pool{ p := &Pool{
capacity: int32(size), capacity: int32(size),
release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: time.Duration(expiry) * time.Second,
} }
p.cond = sync.NewCond(&p.lock) p.cond = sync.NewCond(&p.lock)
@ -123,7 +122,7 @@ func NewTimingPool(size, expiry int) (*Pool, error) {
// Submit submits a task to this pool. // Submit submits a task to this pool.
func (p *Pool) Submit(task f) error { func (p *Pool) Submit(task f) error {
if len(p.release) > 0 { if p.release == nil {
return ErrPoolClosed return ErrPoolClosed
} }
p.getWorker().task <- task p.getWorker().task <- task
@ -160,7 +159,7 @@ func (p *Pool) ReSize(size int) {
// Release Closes this pool. // Release Closes this pool.
func (p *Pool) Release() error { func (p *Pool) Release() error {
p.once.Do(func() { p.once.Do(func() {
p.release <- sig{} p.release = make(chan sig)
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
for i, w := range idleWorkers { for i, w := range idleWorkers {
@ -233,4 +232,4 @@ func (p *Pool) putWorker(worker *Worker) {
// Notify the invoker stuck in 'getWorker()' of there is an available worker in the worker queue. // Notify the invoker stuck in 'getWorker()' of there is an available worker in the worker queue.
p.cond.Signal() p.cond.Signal()
p.lock.Unlock() p.lock.Unlock()
} }

View File

@ -73,7 +73,7 @@ func (p *PoolWithFunc) periodicallyPurge() {
currentTime := time.Now() currentTime := time.Now()
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
if len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 { if len(idleWorkers) == 0 && p.Running() == 0 && p.release != nil {
p.lock.Unlock() p.lock.Unlock()
return return
} }
@ -112,7 +112,6 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
} }
p := &PoolWithFunc{ p := &PoolWithFunc{
capacity: int32(size), capacity: int32(size),
release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: time.Duration(expiry) * time.Second,
poolFunc: f, poolFunc: f,
} }
@ -125,7 +124,7 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
// Serve submits a task to pool. // Serve submits a task to pool.
func (p *PoolWithFunc) Serve(args interface{}) error { func (p *PoolWithFunc) Serve(args interface{}) error {
if len(p.release) > 0 { if p.release != nil {
return ErrPoolClosed return ErrPoolClosed
} }
p.getWorker().args <- args p.getWorker().args <- args
@ -162,7 +161,7 @@ func (p *PoolWithFunc) ReSize(size int) {
// Release Closed this pool. // Release Closed this pool.
func (p *PoolWithFunc) Release() error { func (p *PoolWithFunc) Release() error {
p.once.Do(func() { p.once.Do(func() {
p.release <- sig{} p.release = make(chan sig)
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
for i, w := range idleWorkers { for i, w := range idleWorkers {
@ -236,4 +235,4 @@ func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) {
// Notify the invoker stuck in 'getWorker()' of there is an available worker in the worker queue. // Notify the invoker stuck in 'getWorker()' of there is an available worker in the worker queue.
p.cond.Signal() p.cond.Signal()
p.lock.Unlock() p.lock.Unlock()
} }