timingwheel remove task pool
This commit is contained in:
parent
1a716a9b03
commit
e009b5fbd8
|
@ -14,7 +14,6 @@ type bucket struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultTasksSize = 16
|
const defaultTasksSize = 16
|
||||||
const defaultTaskPool = 4
|
|
||||||
|
|
||||||
type TimingWheel struct {
|
type TimingWheel struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
@ -29,8 +28,6 @@ type TimingWheel struct {
|
||||||
buckets []bucket
|
buckets []bucket
|
||||||
|
|
||||||
pos int
|
pos int
|
||||||
|
|
||||||
tasks chan []TaskFunc
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTimingWheel(interval time.Duration, buckets int) *TimingWheel {
|
func NewTimingWheel(interval time.Duration, buckets int) *TimingWheel {
|
||||||
|
@ -53,12 +50,6 @@ func NewTimingWheel(interval time.Duration, buckets int) *TimingWheel {
|
||||||
w.ticker = time.NewTicker(interval)
|
w.ticker = time.NewTicker(interval)
|
||||||
go w.run()
|
go w.run()
|
||||||
|
|
||||||
w.tasks = make(chan []TaskFunc, 1024)
|
|
||||||
|
|
||||||
for i := 0; i < defaultTaskPool; i++ {
|
|
||||||
go w.taskPool()
|
|
||||||
}
|
|
||||||
|
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,24 +114,18 @@ func (w *TimingWheel) onTicker() {
|
||||||
|
|
||||||
close(lastC)
|
close(lastC)
|
||||||
|
|
||||||
w.tasks <- tasks
|
if len(tasks) > 0 {
|
||||||
}
|
f := func(tasks []TaskFunc) {
|
||||||
|
|
||||||
func (w *TimingWheel) taskPool() {
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if e := recover(); e != nil {
|
if e := recover(); e != nil {
|
||||||
log.Fatal("task pool fatal %v", e)
|
log.Fatal("run task fatal %v", e)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case tasks := <-w.tasks:
|
|
||||||
for _, task := range tasks {
|
for _, task := range tasks {
|
||||||
task()
|
task()
|
||||||
}
|
}
|
||||||
case <-w.quit:
|
}
|
||||||
return
|
|
||||||
}
|
go f(tasks)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue