finish the ReSize() and Release() methods

This commit is contained in:
andy pan 2018-05-30 12:57:20 +08:00
parent 92a7dec196
commit d56ebb1a29
4 changed files with 22 additions and 23 deletions

20
pool.go
View File

@ -102,12 +102,22 @@ func (p *Pool) Cap() int {
func (p *Pool) Release() error { func (p *Pool) Release() error {
p.once.Do(func() { p.once.Do(func() {
p.release <- sig{} p.release <- sig{}
running := p.Running()
for i := 0; i < running; i++ {
p.getWorker().stop()
}
}) })
return nil return nil
} }
// ReSize change the capacity of this pool // ReSize change the capacity of this pool
func (p *Pool) ReSize(size int) { func (p *Pool) ReSize(size int) {
if size < p.Cap() {
diff := p.Cap() - size
for i := 0; i < diff; i++ {
p.getWorker().stop()
}
}
atomic.StoreInt32(&p.capacity, int32(size)) atomic.StoreInt32(&p.capacity, int32(size))
} }
@ -151,21 +161,11 @@ func (p *Pool) getWorker() *Worker {
break break
} }
} else if w == nil { } else if w == nil {
//wp := p.workerPool.Get()
//if wp == nil {
// w = &Worker{
// pool: p,
// task: make(chan f, workerArgsCap),
// }
//} else {
// w = wp.(*Worker)
//}
w = &Worker{ w = &Worker{
pool: p, pool: p,
task: make(chan f), task: make(chan f),
} }
w.run() w.run()
//p.workerPool.Put(w)
} }
return w return w
} }

View File

@ -107,12 +107,22 @@ func (p *PoolWithFunc) Cap() int {
func (p *PoolWithFunc) Release() error { func (p *PoolWithFunc) Release() error {
p.once.Do(func() { p.once.Do(func() {
p.release <- sig{} p.release <- sig{}
running := p.Running()
for i := 0; i < running; i++ {
p.getWorker().stop()
}
}) })
return nil return nil
} }
// ReSize change the capacity of this pool // ReSize change the capacity of this pool
func (p *PoolWithFunc) ReSize(size int) { func (p *PoolWithFunc) ReSize(size int) {
if size < p.Cap() {
diff := p.Cap() - size
for i := 0; i < diff; i++ {
p.getWorker().stop()
}
}
atomic.StoreInt32(&p.capacity, int32(size)) atomic.StoreInt32(&p.capacity, int32(size))
} }
@ -156,28 +166,17 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
break break
} }
} else if w == nil { } else if w == nil {
//wp := p.workerPool.Get()
//if wp == nil {
// w = &WorkerWithFunc{
// pool: p,
// args: make(chan interface{}, workerArgsCap),
// }
//} else {
// w = wp.(*WorkerWithFunc)
//}
w = &WorkerWithFunc{ w = &WorkerWithFunc{
pool: p, pool: p,
args: make(chan interface{}), args: make(chan interface{}),
} }
w.run() w.run()
//p.workerPool.Put(w)
} }
return w return w
} }
// putWorker puts a worker back into free pool, recycling the goroutines. // putWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) {
//p.workerPool.Put(worker)
p.lock.Lock() p.lock.Lock()
p.workers = append(p.workers, worker) p.workers = append(p.workers, worker)
p.lock.Unlock() p.lock.Unlock()

View File

@ -43,7 +43,7 @@ func (w *Worker) run() {
//atomic.AddInt32(&w.pool.running, 1) //atomic.AddInt32(&w.pool.running, 1)
go func() { go func() {
for f := range w.task { for f := range w.task {
if f == nil || len(w.pool.release) > 0 { if f == nil {
atomic.AddInt32(&w.pool.running, -1) atomic.AddInt32(&w.pool.running, -1)
return return
} }

View File

@ -43,7 +43,7 @@ func (w *WorkerWithFunc) run() {
//atomic.AddInt32(&w.pool.running, 1) //atomic.AddInt32(&w.pool.running, 1)
go func() { go func() {
for args := range w.args { for args := range w.args {
if args == nil || len(w.pool.release) > 0 { if args == nil {
atomic.AddInt32(&w.pool.running, -1) atomic.AddInt32(&w.pool.running, -1)
return return
} }