From 5cd336a6a1072e8ca1624fcf2bb4a1a1f941c127 Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 15 Apr 2014 16:36:20 +0800 Subject: [PATCH] add timer module refer linux timer --- timer/sleep.go | 34 +++++ timer/ticker.go | 30 +++++ timer/wheel.go | 308 ++++++++++++++++++++++++++++++++++++++++++++ timer/wheel_test.go | 43 +++++++ 4 files changed, 415 insertions(+) create mode 100644 timer/sleep.go create mode 100644 timer/ticker.go create mode 100644 timer/wheel.go create mode 100644 timer/wheel_test.go diff --git a/timer/sleep.go b/timer/sleep.go new file mode 100644 index 0000000..379abea --- /dev/null +++ b/timer/sleep.go @@ -0,0 +1,34 @@ +package timer + +import ( + "time" +) + +type Timer struct { + C <-chan time.Time + r *timer +} + +func After(d time.Duration) <-chan time.Time { + return defaultWheel.After(d) +} + +func Sleep(d time.Duration) { + defaultWheel.Sleep(d) +} + +func AfterFunc(d time.Duration, f func()) *Timer { + return defaultWheel.AfterFunc(d, f) +} + +func NewTimer(d time.Duration) *Timer { + return defaultWheel.NewTimer(d) +} + +func (t *Timer) Reset(d time.Duration) { + t.r.w.resetTimer(t.r, d, 0) +} + +func (t *Timer) Stop() { + t.r.w.delTimer(t.r) +} diff --git a/timer/ticker.go b/timer/ticker.go new file mode 100644 index 0000000..810f908 --- /dev/null +++ b/timer/ticker.go @@ -0,0 +1,30 @@ +package timer + +import ( + "time" +) + +type Ticker struct { + C <-chan time.Time + r *timer +} + +func NewTicker(d time.Duration) *Ticker { + return defaultWheel.NewTicker(d) +} + +func TickFunc(d time.Duration, f func()) *Ticker { + return defaultWheel.TickFunc(d, f) +} + +func Tick(d time.Duration) <-chan time.Time { + return defaultWheel.Tick(d) +} + +func (t *Ticker) Stop() { + t.r.w.delTimer(t.r) +} + +func (t *Ticker) Reset(d time.Duration) { + t.r.w.resetTimer(t.r, d, d) +} diff --git a/timer/wheel.go b/timer/wheel.go new file mode 100644 index 0000000..29ca387 --- /dev/null +++ b/timer/wheel.go @@ -0,0 +1,308 @@ +package timer + +import ( + "sync" + "time" +) + +const ( + tvn_bits uint64 = 6 + tvr_bits uint64 = 8 + tvn_size uint64 = 64 //1 << tvn_bits + tvr_size uint64 = 256 //1 << tvr_bits + + tvn_mask uint64 = 63 //tvn_size - 1 + tvr_mask uint64 = 255 //tvr_size -1 +) + +const ( + defaultTimerSize = 128 +) + +type timer struct { + expires uint64 + period uint64 + + f func(time.Time, interface{}) + arg interface{} + + w *Wheel + + vec []*timer + index int +} + +type Wheel struct { + sync.Mutex + + jiffies uint64 + + tv1 [][]*timer + tv2 [][]*timer + tv3 [][]*timer + tv4 [][]*timer + tv5 [][]*timer + + tick time.Duration + + quit chan struct{} +} + +//tick is the time for a jiffies +func NewWheel(tick time.Duration) *Wheel { + w := new(Wheel) + + w.quit = make(chan struct{}) + + f := func(size int) [][]*timer { + tv := make([][]*timer, size) + for i := range tv { + tv[i] = make([]*timer, 0, defaultTimerSize) + } + + return tv + } + + w.tv1 = f(int(tvr_size)) + w.tv2 = f(int(tvn_size)) + w.tv3 = f(int(tvn_size)) + w.tv4 = f(int(tvn_size)) + w.tv5 = f(int(tvn_size)) + + w.jiffies = 0 + w.tick = tick + + go w.run() + return w +} + +func (w *Wheel) addTimerInternal(t *timer) { + idx := t.expires - w.jiffies + + var tv [][]*timer + var i uint64 + + if idx < tvr_size { + i = t.expires & tvr_mask + tv = w.tv1 + } else if idx < (1 << (tvr_bits + tvn_bits)) { + i = (t.expires >> tvr_bits) & tvn_mask + tv = w.tv2 + } else if idx < (1 << (tvr_bits + 2*tvn_bits)) { + i = (t.expires >> (tvr_bits + tvn_bits)) & tvn_mask + tv = w.tv3 + } else if idx < (1 << (tvr_bits + 3*tvn_bits)) { + i = (t.expires >> (tvr_bits + 2*tvn_bits)) & tvn_mask + tv = w.tv4 + } else if int64(idx) < 0 { + i = w.jiffies & tvr_mask + tv = w.tv1 + } else { + if idx > 0x00000000ffffffff { + idx = 0x00000000ffffffff + + t.expires = idx + w.jiffies + } + + i = (t.expires >> (tvr_bits + 3*tvn_bits)) & tvn_mask + tv = w.tv5 + } + + tv[i] = append(tv[i], t) + + t.vec = tv[i] + t.index = len(tv[i]) - 1 +} + +func (w *Wheel) cascade(tv [][]*timer, index int) int { + vec := tv[index] + tv[index] = vec[0:0:defaultTimerSize] + + for _, t := range vec { + w.addTimerInternal(t) + } + + return index +} + +func (w *Wheel) getIndex(n int) int { + return int((w.jiffies >> (tvr_bits + uint64(n)*tvn_bits)) & tvn_mask) +} + +func (w *Wheel) onTick() { + w.Lock() + + index := int(w.jiffies & tvr_mask) + + if index == 0 && (w.cascade(w.tv2, w.getIndex(0))) == 0 && + (w.cascade(w.tv3, w.getIndex(1))) == 0 && + (w.cascade(w.tv4, w.getIndex(2))) == 0 && + (w.cascade(w.tv5, w.getIndex(3)) == 0) { + + } + + w.jiffies++ + + vec := w.tv1[index] + w.tv1[index] = vec[0:0:defaultTimerSize] + + w.Unlock() + + f := func(vec []*timer) { + now := time.Now() + for _, t := range vec { + if t == nil { + continue + } + + t.f(now, t.arg) + + if t.period > 0 { + t.expires = t.period + w.jiffies + w.addTimer(t) + } + } + } + + if len(vec) > 0 { + go f(vec) + } +} + +func (w *Wheel) addTimer(t *timer) { + w.Lock() + w.addTimerInternal(t) + w.Unlock() +} + +func (w *Wheel) delTimer(t *timer) { + w.Lock() + vec := t.vec + index := t.index + + if len(vec) > index && vec[index] == t { + vec[index] = nil + } + + w.Unlock() +} + +func (w *Wheel) resetTimer(t *timer, when time.Duration, period time.Duration) { + w.delTimer(t) + + t.expires = w.jiffies + uint64(when/w.tick) + t.period = uint64(period / w.tick) + + w.addTimer(t) +} + +func (w *Wheel) newTimer(when time.Duration, period time.Duration, + f func(time.Time, interface{}), arg interface{}) *timer { + t := new(timer) + + t.expires = w.jiffies + uint64(when/w.tick) + t.period = uint64(period / w.tick) + + t.f = f + t.arg = arg + + t.w = w + + return t +} + +func (w *Wheel) run() { + ticker := time.NewTicker(w.tick) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + w.onTick() + case <-w.quit: + return + } + } +} + +func (w *Wheel) Stop() { + close(w.quit) +} + +func sendTime(t time.Time, arg interface{}) { + select { + case arg.(chan time.Time) <- t: + default: + } +} + +func goFunc(t time.Time, arg interface{}) { + go arg.(func())() +} + +func dummyFunc(t time.Time, arg interface{}) { + +} + +func (w *Wheel) After(d time.Duration) <-chan time.Time { + return w.NewTimer(d).C +} + +func (w *Wheel) Sleep(d time.Duration) { + <-w.NewTimer(d).C +} + +func (w *Wheel) Tick(d time.Duration) <-chan time.Time { + return w.NewTicker(d).C +} + +func (w *Wheel) TickFunc(d time.Duration, f func()) *Ticker { + t := &Ticker{ + r: w.newTimer(d, d, goFunc, f), + } + + w.addTimer(t.r) + + return t + +} + +func (w *Wheel) AfterFunc(d time.Duration, f func()) *Timer { + t := &Timer{ + r: w.newTimer(d, 0, goFunc, f), + } + + w.addTimer(t.r) + + return t +} + +func (w *Wheel) NewTimer(d time.Duration) *Timer { + c := make(chan time.Time, 1) + t := &Timer{ + C: c, + r: w.newTimer(d, 0, sendTime, c), + } + + w.addTimer(t.r) + + return t +} + +func (w *Wheel) NewTicker(d time.Duration) *Ticker { + c := make(chan time.Time, 1) + t := &Ticker{ + C: c, + r: w.newTimer(d, d, sendTime, c), + } + + w.addTimer(t.r) + + return t +} + +var defaultWheel *Wheel + +func init() { + defaultWheel = NewWheel(500 * time.Millisecond) +} diff --git a/timer/wheel_test.go b/timer/wheel_test.go new file mode 100644 index 0000000..d8bf13d --- /dev/null +++ b/timer/wheel_test.go @@ -0,0 +1,43 @@ +package timer + +import ( + "testing" + "time" +) + +var testWheel = NewWheel(100 * time.Millisecond) + +func TestTimer(t *testing.T) { + t1 := testWheel.NewTimer(500 * time.Millisecond) + + before := time.Now() + <-t1.C + + after := time.Now() + + println(after.Sub(before).String()) +} + +func TestTicker(t *testing.T) { + wait := make(chan struct{}, 100) + i := 0 + f := func() { + println(time.Now().Unix()) + i++ + if i >= 10 { + wait <- struct{}{} + } + } + before := time.Now() + + t1 := testWheel.TickFunc(1000*time.Millisecond, f) + + <-wait + + t1.Stop() + + after := time.Now() + + println(after.Sub(before).String()) + +}