From 4e802b06e32047b812a4f0ed4b910cd60ebcd46d Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 12 Nov 2014 11:52:10 +0800 Subject: [PATCH] replication write table add lru cache --- rpl/file_table.go | 22 +++++++++- rpl/loglrucache.go | 95 +++++++++++++++++++++++++++++++++++++++++ rpl/loglrucache_test.go | 48 +++++++++++++++++++++ 3 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 rpl/loglrucache.go create mode 100644 rpl/loglrucache_test.go diff --git a/rpl/file_table.go b/rpl/file_table.go index bfff239..3c07635 100644 --- a/rpl/file_table.go +++ b/rpl/file_table.go @@ -380,6 +380,8 @@ type tableWriter struct { syncType int lastTime uint32 + + cache *logLRUCache } func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { @@ -397,6 +399,9 @@ func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { t.closed = false + //maybe use config later + t.cache = newLogLRUCache(1024*1024, 1000) + return t } @@ -451,6 +456,7 @@ func (t *tableWriter) reset() { t.index = t.index + 1 t.name = path.Join(t.base, fmtTableName(t.index)) t.offsetBuf = t.offsetBuf[0:0] + t.cache.Reset() } func (t *tableWriter) Flush() (*tableReader, error) { @@ -565,8 +571,11 @@ func (t *tableWriter) StoreLog(l *Log) error { offsetPos := uint32(st.Size()) - if err := l.Encode(t.wf); err != nil { + buf, _ := l.Marshal() + if n, err := t.wf.Write(buf); err != nil { return err + } else if n != len(buf) { + return io.ErrShortWrite } t.offsetBuf = append(t.offsetBuf, num.Uint32ToBytes(offsetPos)...) @@ -578,7 +587,7 @@ func (t *tableWriter) StoreLog(l *Log) error { t.lastTime = l.CreateTime - //todo add LRU cache + t.cache.Set(l.ID, buf) if t.syncType == 2 { if err := t.wf.Sync(); err != nil { @@ -598,6 +607,13 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error { } //todo memory cache + if cl := t.cache.Get(id); cl != nil { + if err := l.Unmarshal(cl); err == nil && l.ID == id { + return nil + } else { + t.cache.Delete(id) + } + } offset := binary.BigEndian.Uint32(t.offsetBuf[(id-t.first)*4:]) @@ -607,6 +623,8 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error { return fmt.Errorf("invalid log id %d != %d", id, l.ID) } + //todo add cache here? + return nil } diff --git a/rpl/loglrucache.go b/rpl/loglrucache.go new file mode 100644 index 0000000..3dcbaf3 --- /dev/null +++ b/rpl/loglrucache.go @@ -0,0 +1,95 @@ +package rpl + +import ( + "container/list" + "encoding/binary" +) + +type logLRUCache struct { + itemsList *list.List + itemsMap map[uint64]*list.Element + size int + capability int + maxNum int +} + +func newLogLRUCache(capability int, maxNum int) *logLRUCache { + if capability <= 0 { + capability = 1024 * 1024 + } + + if maxNum <= 0 { + maxNum = 16 + } + + return &logLRUCache{ + itemsList: list.New(), + itemsMap: make(map[uint64]*list.Element), + size: 0, + capability: capability, + maxNum: maxNum, + } +} + +func (cache *logLRUCache) Set(id uint64, data []byte) { + elem, ok := cache.itemsMap[id] + if ok { + //we may not enter here + // item already exists, so move it to the front of the list and update the data + cache.itemsList.MoveToFront(elem) + ol := elem.Value.([]byte) + elem.Value = data + cache.size += (len(data) - len(ol)) + } else { + cache.size += len(data) + + // item doesn't exist, so add it to front of list + elem = cache.itemsList.PushFront(data) + cache.itemsMap[id] = elem + } + + // evict LRU entry if the cache is full + for cache.size > cache.capability || cache.itemsList.Len() > cache.maxNum { + removedElem := cache.itemsList.Back() + l := removedElem.Value.([]byte) + cache.itemsList.Remove(removedElem) + delete(cache.itemsMap, binary.BigEndian.Uint64(l[0:8])) + + cache.size -= len(l) + if cache.size <= 0 { + cache.size = 0 + } + } +} + +func (cache *logLRUCache) Get(id uint64) []byte { + elem, ok := cache.itemsMap[id] + if !ok { + return nil + } + + // item exists, so move it to front of list and return it + cache.itemsList.MoveToFront(elem) + l := elem.Value.([]byte) + return l +} + +func (cache *logLRUCache) Delete(id uint64) { + elem, ok := cache.itemsMap[id] + if !ok { + return + } + + cache.itemsList.Remove(elem) + delete(cache.itemsMap, id) +} + +func (cache *logLRUCache) Len() int { + return cache.itemsList.Len() +} + +func (cache *logLRUCache) Reset() { + cache.itemsList = list.New() + cache.itemsMap = make(map[uint64]*list.Element) + cache.size = 0 +} diff --git a/rpl/loglrucache_test.go b/rpl/loglrucache_test.go new file mode 100644 index 0000000..88a2923 --- /dev/null +++ b/rpl/loglrucache_test.go @@ -0,0 +1,48 @@ +package rpl + +import ( + "testing" +) + +func TestLogLRUCache(t *testing.T) { + c := newLogLRUCache(180, 10) + + var i uint64 + for i = 1; i <= 10; i++ { + l := &Log{i, 0, 0, []byte("0")} + b, _ := l.Marshal() + c.Set(l.ID, b) + } + + for i = 1; i <= 10; i++ { + if l := c.Get(i); l == nil { + t.Fatal("must exist", i) + } + } + + for i = 11; i <= 20; i++ { + l := &Log{i, 0, 0, []byte("0")} + b, _ := l.Marshal() + c.Set(l.ID, b) + } + + for i = 1; i <= 10; i++ { + if l := c.Get(i); l != nil { + t.Fatal("must not exist", i) + } + } + + c.Get(11) + + l := &Log{21, 0, 0, []byte("0")} + b, _ := l.Marshal() + c.Set(l.ID, b) + + if l := c.Get(12); l != nil { + t.Fatal("must nil", 12) + } + + if l := c.Get(11); l == nil { + t.Fatal("must not nil", 11) + } +}