add ring and timingwheel

This commit is contained in:
siddontang 2014-01-09 12:37:01 +08:00
parent eead8cb809
commit 097357a9f9
4 changed files with 399 additions and 0 deletions

123
ring/ring.go Normal file
View File

@ -0,0 +1,123 @@
package ring
import (
"errors"
)
var (
ErrRingLenNotEnough = errors.New("ring has not enough items for pop n")
ErrRingCapNotEnough = errors.New("ring has not enough space for push n")
)
type Ring struct {
items []interface{}
head int
tail int
size int
maxSize int
}
func NewRing(maxSize int) *Ring {
r := new(Ring)
r.size = maxSize
r.head = 0
r.tail = 0
//for a empty item
r.maxSize = r.size + 1
r.items = make([]interface{}, r.maxSize)
return r
}
func (r *Ring) Len() int {
if r.head == r.tail {
return 0
} else if r.tail > r.head {
return r.tail - r.head
} else {
return r.tail + r.maxSize - r.head
}
}
func (r *Ring) Cap() int {
return r.size - r.Len()
}
func (r *Ring) MPop(n int) ([]interface{}, error) {
if r.Len() < n {
return nil, ErrRingLenNotEnough
}
items := make([]interface{}, n)
for i := 0; i < n; i++ {
head := (r.head + i) % r.maxSize
items[i] = r.items[head]
r.items[head] = nil
}
r.head = (r.head + n) % r.maxSize
return items, nil
}
func (r *Ring) Pop() (interface{}, error) {
if items, err := r.MPop(1); err != nil {
return nil, err
} else {
return items[0], nil
}
}
func (r *Ring) MPush(items []interface{}) error {
n := len(items)
if r.Cap() < n {
return ErrRingCapNotEnough
}
for i := 0; i < n; i++ {
tail := (r.tail + i) % r.maxSize
r.items[tail] = items[i]
}
r.tail = (r.tail + n) % r.maxSize
return nil
}
func (r *Ring) Push(item interface{}) error {
items := []interface{}{item}
return r.MPush(items)
}
func (r *Ring) Full() bool {
return r.Cap() == 0
}
func (r *Ring) Empty() bool {
return r.Len() == 0
}
func (r *Ring) Gets(n int) []interface{} {
if r.Len() < n {
n = r.Len()
}
result := make([]interface{}, n)
for i := 0; i < n; i++ {
result[i] = r.items[(r.head+i)%r.maxSize]
}
return result
}
func (r *Ring) Get() interface{} {
if r.Empty() {
return ErrRingLenNotEnough
}
return r.items[r.head]
}
func (r *Ring) GetAll() []interface{} {
return r.Gets(r.Len())
}

162
ring/ring_test.go Normal file
View File

@ -0,0 +1,162 @@
package ring
import (
"testing"
)
func TestRing(t *testing.T) {
size := 5
r := NewRing(size)
if r.Len() != 0 {
t.Fatal("len not:", 0)
}
if r.Cap() != size {
t.Fatal("cap not:", size)
}
var err error
items := []interface{}{1, 2, 3, 4}
err = r.MPush(items)
if err != nil {
t.Fatal(err)
}
if r.Len() != 4 {
t.Fatal("invalid len", r.Len())
}
if r.Cap() != 1 {
t.Fatal("invalid cap", r.Cap())
}
items, err = r.MPop(2)
if err != nil {
t.Fatal(err)
}
if v, ok := items[0].(int); ok {
if v != 1 {
t.Fatal("invalid value", v)
}
} else {
t.Fatal("invalid data", items[0])
}
if v, ok := items[1].(int); ok {
if v != 2 {
t.Fatal("invalid value", v)
}
} else {
t.Fatal("invalid data", items[1])
}
items = []interface{}{5, 6, 7}
err = r.MPush(items)
if err != nil {
t.Fatal(err)
}
if r.Len() != size {
t.Fatal("invalid size", r.Len())
}
if r.Cap() != 0 {
t.Fatal("invalid cap", r.Cap())
}
items, err = r.MPop(3)
if err != nil {
t.Fatal(err)
}
if v, ok := items[0].(int); ok {
if v != 3 {
t.Fatal("invalid value", v)
}
} else {
t.Fatal("invalid data", items[0])
}
if v, ok := items[1].(int); ok {
if v != 4 {
t.Fatal("invalid value", v)
}
} else {
t.Fatal("invalid data", items[1])
}
if v, ok := items[2].(int); ok {
if v != 5 {
t.Fatal("invalid value", v)
}
} else {
t.Fatal("invalid data", items[2])
}
if r.Len() != 2 {
t.Fatal("invalid len", r.Len())
}
if r.Cap() != 3 {
t.Fatal("invalid cap", r.Cap())
}
}
func TestRingGet(t *testing.T) {
r := NewRing(5)
if !r.Empty() {
t.Fatal(" invalid len", r.Len())
}
err := r.MPush([]interface{}{1, 2, 3, 4, 5})
if err != nil {
t.Fatal(err.Error())
}
if !r.Full() {
t.Fatal(" invalid cap", r.Cap())
}
err = r.Push(1)
if err == nil {
t.Fatal("should return a error")
}
result := r.GetAll()
if len(result) != 5 {
t.Fatal("invalid len", len(result))
}
value, _ := r.Pop()
v, _ := value.(int)
if v != 1 {
t.Fatal("invalid value", v)
}
result = r.Gets(3)
if len(result) != 3 {
t.Fatal("invalid len", len(result))
}
value, _ = result[0].(int)
v, _ = value.(int)
if v != 2 {
t.Fatal("invalid value", v)
}
value, _ = result[2].(int)
v, _ = value.(int)
if v != 4 {
t.Fatal("invalid value", v)
}
}

View File

@ -0,0 +1,95 @@
package timingwheel
import (
"time"
)
type regItem struct {
timeout time.Duration
reply chan chan bool
}
type TimingWheel struct {
interval time.Duration
ticker *time.Ticker
quit chan bool
reg chan *regItem
maxTimeout time.Duration
buckets []chan bool
pos int
}
func NewTimingWheel(interval time.Duration, buckets int) *TimingWheel {
w := new(TimingWheel)
w.interval = interval
w.reg = make(chan *regItem, 128)
w.quit = make(chan bool)
w.pos = 0
w.maxTimeout = time.Duration(interval * (time.Duration(buckets)))
w.buckets = make([]chan bool, buckets)
for i := range w.buckets {
w.buckets[i] = make(chan bool)
}
w.ticker = time.NewTicker(interval)
go w.run()
return w
}
func (w *TimingWheel) Stop() {
w.quit <- true
}
func (w *TimingWheel) After(timeout time.Duration) <-chan bool {
if timeout >= w.maxTimeout {
panic("timeout too much, over maxtimeout")
}
reply := make(chan chan bool)
w.reg <- &regItem{timeout: timeout, reply: reply}
return <-reply
}
func (w *TimingWheel) run() {
for {
select {
case item := <-w.reg:
w.register(item)
case <-w.ticker.C:
w.onTicker()
case <-w.quit:
w.ticker.Stop()
return
}
}
}
func (w *TimingWheel) register(item *regItem) {
timeout := item.timeout
index := (w.pos + int(timeout/w.interval)) % len(w.buckets)
b := w.buckets[index]
item.reply <- b
}
func (w *TimingWheel) onTicker() {
close(w.buckets[w.pos])
w.buckets[w.pos] = make(chan bool)
w.pos = (w.pos + 1) % len(w.buckets)
}

View File

@ -0,0 +1,19 @@
package timingwheel
import (
"testing"
"time"
)
func TestTimingWheel(t *testing.T) {
w := NewTimingWheel(1*time.Second, 10)
t.Log(time.Now().Unix())
for {
select {
case <-w.After(1 * time.Second):
t.Log(time.Now().Unix())
return
}
}
}