support add delay task

This commit is contained in:
siddontang 2014-03-28 14:11:28 +08:00
parent 333d680f38
commit 1a716a9b03
2 changed files with 100 additions and 34 deletions

View File

@ -1,25 +1,36 @@
package timingwheel package timingwheel
import ( import (
"github.com/siddontang/golib/log"
"sync"
"time" "time"
) )
type regItem struct { type TaskFunc func()
timeout time.Duration
reply chan chan bool type bucket struct {
c chan struct{}
tasks []TaskFunc
} }
const defaultTasksSize = 16
const defaultTaskPool = 4
type TimingWheel struct { type TimingWheel struct {
sync.Mutex
interval time.Duration interval time.Duration
ticker *time.Ticker ticker *time.Ticker
quit chan bool quit chan struct{}
reg chan *regItem
maxTimeout time.Duration maxTimeout time.Duration
buckets []chan bool
buckets []bucket
pos int pos int
tasks chan []TaskFunc
} }
func NewTimingWheel(interval time.Duration, buckets int) *TimingWheel { func NewTimingWheel(interval time.Duration, buckets int) *TimingWheel {
@ -27,46 +38,67 @@ func NewTimingWheel(interval time.Duration, buckets int) *TimingWheel {
w.interval = interval w.interval = interval
w.reg = make(chan *regItem, 128) w.quit = make(chan struct{})
w.quit = make(chan bool)
w.pos = 0 w.pos = 0
w.maxTimeout = time.Duration(interval * (time.Duration(buckets))) w.maxTimeout = time.Duration(interval * (time.Duration(buckets)))
w.buckets = make([]chan bool, buckets) w.buckets = make([]bucket, buckets)
for i := range w.buckets { for i := range w.buckets {
w.buckets[i] = make(chan bool) w.buckets[i].c = make(chan struct{})
w.buckets[i].tasks = make([]TaskFunc, 0, defaultTasksSize)
} }
w.ticker = time.NewTicker(interval) w.ticker = time.NewTicker(interval)
go w.run() go w.run()
w.tasks = make(chan []TaskFunc, 1024)
for i := 0; i < defaultTaskPool; i++ {
go w.taskPool()
}
return w return w
} }
func (w *TimingWheel) Stop() { func (w *TimingWheel) Stop() {
w.quit <- true close(w.quit)
} }
func (w *TimingWheel) After(timeout time.Duration) <-chan bool { func (w *TimingWheel) After(timeout time.Duration) <-chan struct{} {
if timeout >= w.maxTimeout { if timeout >= w.maxTimeout {
panic("timeout too much, over maxtimeout") panic("timeout too much, over maxtimeout")
} }
reply := make(chan chan bool) w.Lock()
w.reg <- &regItem{timeout: timeout, reply: reply} index := (w.pos + int(timeout/w.interval)) % len(w.buckets)
return <-reply b := w.buckets[index].c
w.Unlock()
return b
}
func (w *TimingWheel) AddTask(timeout time.Duration, f TaskFunc) {
if timeout >= w.maxTimeout {
panic("timeout too much, over maxtimeout")
}
w.Lock()
index := (w.pos + int(timeout/w.interval)) % len(w.buckets)
w.buckets[index].tasks = append(w.buckets[index].tasks, f)
w.Unlock()
} }
func (w *TimingWheel) run() { func (w *TimingWheel) run() {
for { for {
select { select {
case item := <-w.reg:
w.register(item)
case <-w.ticker.C: case <-w.ticker.C:
w.onTicker() w.onTicker()
case <-w.quit: case <-w.quit:
@ -76,20 +108,39 @@ func (w *TimingWheel) run() {
} }
} }
func (w *TimingWheel) register(item *regItem) {
timeout := item.timeout
index := (w.pos + int(timeout/w.interval)) % len(w.buckets)
b := w.buckets[index]
item.reply <- b
}
func (w *TimingWheel) onTicker() { func (w *TimingWheel) onTicker() {
close(w.buckets[w.pos]) w.Lock()
w.buckets[w.pos] = make(chan bool) lastC := w.buckets[w.pos].c
tasks := w.buckets[w.pos].tasks
w.buckets[w.pos].c = make(chan struct{})
w.buckets[w.pos].tasks = w.buckets[w.pos].tasks[0:0:defaultTasksSize]
w.pos = (w.pos + 1) % len(w.buckets) w.pos = (w.pos + 1) % len(w.buckets)
w.Unlock()
close(lastC)
w.tasks <- tasks
}
func (w *TimingWheel) taskPool() {
defer func() {
if e := recover(); e != nil {
log.Fatal("task pool fatal %v", e)
}
}()
for {
select {
case tasks := <-w.tasks:
for _, task := range tasks {
task()
}
case <-w.quit:
return
}
}
} }

View File

@ -8,12 +8,27 @@ import (
func TestTimingWheel(t *testing.T) { func TestTimingWheel(t *testing.T) {
w := NewTimingWheel(1*time.Second, 10) w := NewTimingWheel(1*time.Second, 10)
t.Log(time.Now().Unix()) println(time.Now().Unix())
for { for {
select { select {
case <-w.After(1 * time.Second): case <-w.After(1 * time.Second):
t.Log(time.Now().Unix()) println(time.Now().Unix())
return return
} }
} }
} }
func TestTask(t *testing.T) {
w := NewTimingWheel(1*time.Second, 10)
r := make(chan struct{})
f := func() {
println("hello world")
r <- struct{}{}
}
w.AddTask(1*time.Second, f)
<-r
println("over")
}