From 097357a9f9b1d1af308235042dda414c560c4d6d Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 9 Jan 2014 12:37:01 +0800 Subject: [PATCH] add ring and timingwheel --- ring/ring.go | 123 ++++++++++++++++++++++++ ring/ring_test.go | 162 ++++++++++++++++++++++++++++++++ timingwheel/timingwheel.go | 95 +++++++++++++++++++ timingwheel/timingwheel_test.go | 19 ++++ 4 files changed, 399 insertions(+) create mode 100644 ring/ring.go create mode 100644 ring/ring_test.go create mode 100644 timingwheel/timingwheel.go create mode 100644 timingwheel/timingwheel_test.go diff --git a/ring/ring.go b/ring/ring.go new file mode 100644 index 0000000..5ad3170 --- /dev/null +++ b/ring/ring.go @@ -0,0 +1,123 @@ +package ring + +import ( + "errors" +) + +var ( + ErrRingLenNotEnough = errors.New("ring has not enough items for pop n") + ErrRingCapNotEnough = errors.New("ring has not enough space for push n") +) + +type Ring struct { + items []interface{} + head int + tail int + size int + maxSize int +} + +func NewRing(maxSize int) *Ring { + r := new(Ring) + + r.size = maxSize + r.head = 0 + r.tail = 0 + + //for a empty item + r.maxSize = r.size + 1 + + r.items = make([]interface{}, r.maxSize) + + return r +} + +func (r *Ring) Len() int { + if r.head == r.tail { + return 0 + } else if r.tail > r.head { + return r.tail - r.head + } else { + return r.tail + r.maxSize - r.head + } +} + +func (r *Ring) Cap() int { + return r.size - r.Len() +} + +func (r *Ring) MPop(n int) ([]interface{}, error) { + if r.Len() < n { + return nil, ErrRingLenNotEnough + } + + items := make([]interface{}, n) + + for i := 0; i < n; i++ { + head := (r.head + i) % r.maxSize + items[i] = r.items[head] + r.items[head] = nil + } + + r.head = (r.head + n) % r.maxSize + return items, nil +} + +func (r *Ring) Pop() (interface{}, error) { + if items, err := r.MPop(1); err != nil { + return nil, err + } else { + return items[0], nil + } +} + +func (r *Ring) MPush(items []interface{}) error { + n := len(items) + + if r.Cap() < n { + return ErrRingCapNotEnough + } + + for i := 0; i < n; i++ { + tail := (r.tail + i) % r.maxSize + r.items[tail] = items[i] + } + + r.tail = (r.tail + n) % r.maxSize + return nil +} + +func (r *Ring) Push(item interface{}) error { + items := []interface{}{item} + return r.MPush(items) +} + +func (r *Ring) Full() bool { + return r.Cap() == 0 +} + +func (r *Ring) Empty() bool { + return r.Len() == 0 +} + +func (r *Ring) Gets(n int) []interface{} { + if r.Len() < n { + n = r.Len() + } + result := make([]interface{}, n) + for i := 0; i < n; i++ { + result[i] = r.items[(r.head+i)%r.maxSize] + } + return result +} + +func (r *Ring) Get() interface{} { + if r.Empty() { + return ErrRingLenNotEnough + } + return r.items[r.head] +} + +func (r *Ring) GetAll() []interface{} { + return r.Gets(r.Len()) +} diff --git a/ring/ring_test.go b/ring/ring_test.go new file mode 100644 index 0000000..0c231fc --- /dev/null +++ b/ring/ring_test.go @@ -0,0 +1,162 @@ +package ring + +import ( + "testing" +) + +func TestRing(t *testing.T) { + size := 5 + + r := NewRing(size) + + if r.Len() != 0 { + t.Fatal("len not:", 0) + } + + if r.Cap() != size { + t.Fatal("cap not:", size) + } + + var err error + + items := []interface{}{1, 2, 3, 4} + err = r.MPush(items) + + if err != nil { + t.Fatal(err) + } + + if r.Len() != 4 { + t.Fatal("invalid len", r.Len()) + } + + if r.Cap() != 1 { + t.Fatal("invalid cap", r.Cap()) + } + + items, err = r.MPop(2) + + if err != nil { + t.Fatal(err) + } + + if v, ok := items[0].(int); ok { + if v != 1 { + t.Fatal("invalid value", v) + } + } else { + t.Fatal("invalid data", items[0]) + } + + if v, ok := items[1].(int); ok { + if v != 2 { + t.Fatal("invalid value", v) + } + } else { + t.Fatal("invalid data", items[1]) + } + + items = []interface{}{5, 6, 7} + err = r.MPush(items) + + if err != nil { + t.Fatal(err) + } + + if r.Len() != size { + t.Fatal("invalid size", r.Len()) + } + + if r.Cap() != 0 { + t.Fatal("invalid cap", r.Cap()) + } + + items, err = r.MPop(3) + + if err != nil { + t.Fatal(err) + } + + if v, ok := items[0].(int); ok { + if v != 3 { + t.Fatal("invalid value", v) + } + } else { + t.Fatal("invalid data", items[0]) + } + + if v, ok := items[1].(int); ok { + if v != 4 { + t.Fatal("invalid value", v) + } + } else { + t.Fatal("invalid data", items[1]) + } + + if v, ok := items[2].(int); ok { + if v != 5 { + t.Fatal("invalid value", v) + } + } else { + t.Fatal("invalid data", items[2]) + } + + if r.Len() != 2 { + t.Fatal("invalid len", r.Len()) + } + + if r.Cap() != 3 { + t.Fatal("invalid cap", r.Cap()) + } + +} + +func TestRingGet(t *testing.T) { + r := NewRing(5) + if !r.Empty() { + t.Fatal(" invalid len", r.Len()) + } + err := r.MPush([]interface{}{1, 2, 3, 4, 5}) + if err != nil { + t.Fatal(err.Error()) + } + if !r.Full() { + t.Fatal(" invalid cap", r.Cap()) + } + + err = r.Push(1) + if err == nil { + t.Fatal("should return a error") + } + + result := r.GetAll() + if len(result) != 5 { + t.Fatal("invalid len", len(result)) + } + + value, _ := r.Pop() + v, _ := value.(int) + if v != 1 { + t.Fatal("invalid value", v) + } + + result = r.Gets(3) + + if len(result) != 3 { + t.Fatal("invalid len", len(result)) + } + + value, _ = result[0].(int) + v, _ = value.(int) + + if v != 2 { + t.Fatal("invalid value", v) + } + + value, _ = result[2].(int) + v, _ = value.(int) + + if v != 4 { + t.Fatal("invalid value", v) + } +} diff --git a/timingwheel/timingwheel.go b/timingwheel/timingwheel.go new file mode 100644 index 0000000..dc62b44 --- /dev/null +++ b/timingwheel/timingwheel.go @@ -0,0 +1,95 @@ +package timingwheel + +import ( + "time" +) + +type regItem struct { + timeout time.Duration + reply chan chan bool +} + +type TimingWheel struct { + interval time.Duration + + ticker *time.Ticker + quit chan bool + + reg chan *regItem + + maxTimeout time.Duration + buckets []chan bool + pos int +} + +func NewTimingWheel(interval time.Duration, buckets int) *TimingWheel { + w := new(TimingWheel) + + w.interval = interval + + w.reg = make(chan *regItem, 128) + + w.quit = make(chan bool) + w.pos = 0 + + w.maxTimeout = time.Duration(interval * (time.Duration(buckets))) + + w.buckets = make([]chan bool, buckets) + + for i := range w.buckets { + w.buckets[i] = make(chan bool) + } + + w.ticker = time.NewTicker(interval) + go w.run() + + return w +} + +func (w *TimingWheel) Stop() { + w.quit <- true +} + +func (w *TimingWheel) After(timeout time.Duration) <-chan bool { + if timeout >= w.maxTimeout { + panic("timeout too much, over maxtimeout") + } + + reply := make(chan chan bool) + + w.reg <- ®Item{timeout: timeout, reply: reply} + + return <-reply +} + +func (w *TimingWheel) run() { + for { + select { + case item := <-w.reg: + w.register(item) + case <-w.ticker.C: + w.onTicker() + case <-w.quit: + w.ticker.Stop() + return + } + } +} + +func (w *TimingWheel) register(item *regItem) { + timeout := item.timeout + + index := (w.pos + int(timeout/w.interval)) % len(w.buckets) + + b := w.buckets[index] + + item.reply <- b +} + +func (w *TimingWheel) onTicker() { + close(w.buckets[w.pos]) + + w.buckets[w.pos] = make(chan bool) + + w.pos = (w.pos + 1) % len(w.buckets) +} diff --git a/timingwheel/timingwheel_test.go b/timingwheel/timingwheel_test.go new file mode 100644 index 0000000..6385831 --- /dev/null +++ b/timingwheel/timingwheel_test.go @@ -0,0 +1,19 @@ +package timingwheel + +import ( + "testing" + "time" +) + +func TestTimingWheel(t *testing.T) { + w := NewTimingWheel(1*time.Second, 10) + + t.Log(time.Now().Unix()) + for { + select { + case <-w.After(1 * time.Second): + t.Log(time.Now().Unix()) + return + } + } +}