Merge branch 'develop'

This commit is contained in:
andy pan 2018-05-30 12:57:34 +08:00
commit d30d625580
4 changed files with 24 additions and 25 deletions

20
pool.go
View File

@ -102,12 +102,22 @@ func (p *Pool) Cap() int {
func (p *Pool) Release() error {
p.once.Do(func() {
p.release <- sig{}
running := p.Running()
for i := 0; i < running; i++ {
p.getWorker().stop()
}
})
return nil
}
// ReSize change the capacity of this pool
func (p *Pool) ReSize(size int) {
if size < p.Cap() {
diff := p.Cap() - size
for i := 0; i < diff; i++ {
p.getWorker().stop()
}
}
atomic.StoreInt32(&p.capacity, int32(size))
}
@ -151,21 +161,11 @@ func (p *Pool) getWorker() *Worker {
break
}
} else if w == nil {
//wp := p.workerPool.Get()
//if wp == nil {
// w = &Worker{
// pool: p,
// task: make(chan f, workerArgsCap),
// }
//} else {
// w = wp.(*Worker)
//}
w = &Worker{
pool: p,
task: make(chan f),
}
w.run()
//p.workerPool.Put(w)
}
return w
}

View File

@ -107,12 +107,22 @@ func (p *PoolWithFunc) Cap() int {
func (p *PoolWithFunc) Release() error {
p.once.Do(func() {
p.release <- sig{}
running := p.Running()
for i := 0; i < running; i++ {
p.getWorker().stop()
}
})
return nil
}
// ReSize change the capacity of this pool
func (p *PoolWithFunc) ReSize(size int) {
if size < p.Cap() {
diff := p.Cap() - size
for i := 0; i < diff; i++ {
p.getWorker().stop()
}
}
atomic.StoreInt32(&p.capacity, int32(size))
}
@ -156,28 +166,17 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
break
}
} else if w == nil {
//wp := p.workerPool.Get()
//if wp == nil {
// w = &WorkerWithFunc{
// pool: p,
// args: make(chan interface{}, workerArgsCap),
// }
//} else {
// w = wp.(*WorkerWithFunc)
//}
w = &WorkerWithFunc{
pool: p,
args: make(chan interface{}),
}
w.run()
//p.workerPool.Put(w)
}
return w
}
// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) {
//p.workerPool.Put(worker)
p.lock.Lock()
p.workers = append(p.workers, worker)
p.lock.Unlock()

View File

@ -43,7 +43,7 @@ func (w *Worker) run() {
//atomic.AddInt32(&w.pool.running, 1)
go func() {
for f := range w.task {
if f == nil || len(w.pool.release) > 0 {
if f == nil {
atomic.AddInt32(&w.pool.running, -1)
return
}
@ -55,7 +55,7 @@ func (w *Worker) run() {
// stop this worker.
func (w *Worker) stop() {
w.task <- nil
w.sendTask(nil)
}
// sendTask sends a task to this worker.

View File

@ -43,7 +43,7 @@ func (w *WorkerWithFunc) run() {
//atomic.AddInt32(&w.pool.running, 1)
go func() {
for args := range w.args {
if args == nil || len(w.pool.release) > 0 {
if args == nil {
atomic.AddInt32(&w.pool.running, -1)
return
}
@ -55,7 +55,7 @@ func (w *WorkerWithFunc) run() {
// stop this worker.
func (w *WorkerWithFunc) stop() {
w.args <- nil
w.sendTask(nil)
}
// sendTask sends a task to this worker.