// Source path: go/timingwheel/timingwheel.go

// Package timingwheel provides a coarse timer that serves many waiters from a
// single ticker by grouping timeouts into a ring of channels.
package timingwheel
import (
2014-03-28 10:11:28 +04:00
"sync"
2014-01-09 08:37:01 +04:00
"time"
)
// TimingWheel is a coarse timer that serves many waiters from one ticker.
//
// It keeps a ring of channels ("buckets"). On every tick the bucket at pos is
// closed, which wakes everything waiting on it, and a fresh channel takes its
// place. The embedded mutex guards cs and pos.
type TimingWheel struct {
	sync.Mutex

	interval   time.Duration // tick period; also the time span covered by one bucket
	ticker     *time.Ticker  // drives the wheel, one tick per interval
	quit       chan struct{} // closed by Stop to end the run loop
	maxTimeout time.Duration // interval * number of buckets; After rejects timeouts >= this

	cs  []chan struct{} // ring of buckets; each channel is closed once, when its turn comes
	pos int             // index of the bucket the next tick will close
}
func NewTimingWheel(interval time.Duration, buckets int) *TimingWheel {
w := new(TimingWheel)
w.interval = interval
2014-03-28 10:11:28 +04:00
w.quit = make(chan struct{})
2014-01-09 08:37:01 +04:00
w.pos = 0
w.maxTimeout = time.Duration(interval * (time.Duration(buckets)))
w.cs = make([]chan struct{}, buckets)
2014-01-09 08:37:01 +04:00
for i := range w.cs {
w.cs[i] = make(chan struct{})
2014-01-09 08:37:01 +04:00
}
w.ticker = time.NewTicker(interval)
go w.run()
return w
}
func (w *TimingWheel) Stop() {
2014-03-28 10:11:28 +04:00
close(w.quit)
}
func (w *TimingWheel) After(timeout time.Duration) <-chan struct{} {
if timeout >= w.maxTimeout {
panic("timeout too much, over maxtimeout")
}
2016-10-05 14:08:31 +03:00
index := int(timeout / w.interval)
if 0 < index {
index--
}
2014-03-28 10:11:28 +04:00
w.Lock()
2016-10-05 14:08:31 +03:00
index = (w.pos + index) % len(w.cs)
2014-03-28 10:11:28 +04:00
b := w.cs[index]
2014-03-28 10:11:28 +04:00
w.Unlock()
return b
2014-01-09 08:37:01 +04:00
}
// run is the wheel's driver loop: it advances the wheel on every ticker tick
// and, once the quit channel is closed, stops the ticker and returns.
func (w *TimingWheel) run() {
loop:
	for {
		select {
		case <-w.quit:
			break loop
		case <-w.ticker.C:
			w.onTicker()
		}
	}

	// Single exit path: release the ticker before the goroutine ends.
	w.ticker.Stop()
}
2014-03-28 10:11:28 +04:00
func (w *TimingWheel) onTicker() {
w.Lock()
2014-01-09 08:37:01 +04:00
lastC := w.cs[w.pos]
w.cs[w.pos] = make(chan struct{})
2014-01-09 08:37:01 +04:00
w.pos = (w.pos + 1) % len(w.cs)
2014-01-09 08:37:01 +04:00
2014-03-28 10:11:28 +04:00
w.Unlock()
2014-01-09 08:37:01 +04:00
2014-03-28 10:11:28 +04:00
close(lastC)
2014-01-09 08:37:01 +04:00
}