replication write table add lru cache

This commit is contained in:
siddontang 2014-11-12 11:52:10 +08:00
parent 6f39cb17a9
commit 4e802b06e3
3 changed files with 163 additions and 2 deletions

View File

@ -380,6 +380,8 @@ type tableWriter struct {
syncType int syncType int
lastTime uint32 lastTime uint32
cache *logLRUCache
} }
func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
@ -397,6 +399,9 @@ func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
t.closed = false t.closed = false
//maybe use config later
t.cache = newLogLRUCache(1024*1024, 1000)
return t return t
} }
@ -451,6 +456,7 @@ func (t *tableWriter) reset() {
t.index = t.index + 1 t.index = t.index + 1
t.name = path.Join(t.base, fmtTableName(t.index)) t.name = path.Join(t.base, fmtTableName(t.index))
t.offsetBuf = t.offsetBuf[0:0] t.offsetBuf = t.offsetBuf[0:0]
t.cache.Reset()
} }
func (t *tableWriter) Flush() (*tableReader, error) { func (t *tableWriter) Flush() (*tableReader, error) {
@ -565,8 +571,11 @@ func (t *tableWriter) StoreLog(l *Log) error {
offsetPos := uint32(st.Size()) offsetPos := uint32(st.Size())
if err := l.Encode(t.wf); err != nil { buf, _ := l.Marshal()
if n, err := t.wf.Write(buf); err != nil {
return err return err
} else if n != len(buf) {
return io.ErrShortWrite
} }
t.offsetBuf = append(t.offsetBuf, num.Uint32ToBytes(offsetPos)...) t.offsetBuf = append(t.offsetBuf, num.Uint32ToBytes(offsetPos)...)
@ -578,7 +587,7 @@ func (t *tableWriter) StoreLog(l *Log) error {
t.lastTime = l.CreateTime t.lastTime = l.CreateTime
//todo add LRU cache t.cache.Set(l.ID, buf)
if t.syncType == 2 { if t.syncType == 2 {
if err := t.wf.Sync(); err != nil { if err := t.wf.Sync(); err != nil {
@ -598,6 +607,13 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error {
} }
//todo memory cache //todo memory cache
if cl := t.cache.Get(id); cl != nil {
if err := l.Unmarshal(cl); err == nil && l.ID == id {
return nil
} else {
t.cache.Delete(id)
}
}
offset := binary.BigEndian.Uint32(t.offsetBuf[(id-t.first)*4:]) offset := binary.BigEndian.Uint32(t.offsetBuf[(id-t.first)*4:])
@ -607,6 +623,8 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error {
return fmt.Errorf("invalid log id %d != %d", id, l.ID) return fmt.Errorf("invalid log id %d != %d", id, l.ID)
} }
//todo add cache here?
return nil return nil
} }

95
rpl/loglrucache.go Normal file
View File

@ -0,0 +1,95 @@
package rpl
import (
"container/list"
"encoding/binary"
)
type logLRUCache struct {
itemsList *list.List
itemsMap map[uint64]*list.Element
size int
capability int
maxNum int
}
func newLogLRUCache(capability int, maxNum int) *logLRUCache {
if capability <= 0 {
capability = 1024 * 1024
}
if maxNum <= 0 {
maxNum = 16
}
return &logLRUCache{
itemsList: list.New(),
itemsMap: make(map[uint64]*list.Element),
size: 0,
capability: capability,
maxNum: maxNum,
}
}
func (cache *logLRUCache) Set(id uint64, data []byte) {
elem, ok := cache.itemsMap[id]
if ok {
//we may not enter here
// item already exists, so move it to the front of the list and update the data
cache.itemsList.MoveToFront(elem)
ol := elem.Value.([]byte)
elem.Value = data
cache.size += (len(data) - len(ol))
} else {
cache.size += len(data)
// item doesn't exist, so add it to front of list
elem = cache.itemsList.PushFront(data)
cache.itemsMap[id] = elem
}
// evict LRU entry if the cache is full
for cache.size > cache.capability || cache.itemsList.Len() > cache.maxNum {
removedElem := cache.itemsList.Back()
l := removedElem.Value.([]byte)
cache.itemsList.Remove(removedElem)
delete(cache.itemsMap, binary.BigEndian.Uint64(l[0:8]))
cache.size -= len(l)
if cache.size <= 0 {
cache.size = 0
}
}
}
func (cache *logLRUCache) Get(id uint64) []byte {
elem, ok := cache.itemsMap[id]
if !ok {
return nil
}
// item exists, so move it to front of list and return it
cache.itemsList.MoveToFront(elem)
l := elem.Value.([]byte)
return l
}
func (cache *logLRUCache) Delete(id uint64) {
elem, ok := cache.itemsMap[id]
if !ok {
return
}
cache.itemsList.Remove(elem)
delete(cache.itemsMap, id)
}
func (cache *logLRUCache) Len() int {
return cache.itemsList.Len()
}
func (cache *logLRUCache) Reset() {
cache.itemsList = list.New()
cache.itemsMap = make(map[uint64]*list.Element)
cache.size = 0
}

48
rpl/loglrucache_test.go Normal file
View File

@ -0,0 +1,48 @@
package rpl
import (
"testing"
)
func TestLogLRUCache(t *testing.T) {
c := newLogLRUCache(180, 10)
var i uint64
for i = 1; i <= 10; i++ {
l := &Log{i, 0, 0, []byte("0")}
b, _ := l.Marshal()
c.Set(l.ID, b)
}
for i = 1; i <= 10; i++ {
if l := c.Get(i); l == nil {
t.Fatal("must exist", i)
}
}
for i = 11; i <= 20; i++ {
l := &Log{i, 0, 0, []byte("0")}
b, _ := l.Marshal()
c.Set(l.ID, b)
}
for i = 1; i <= 10; i++ {
if l := c.Get(i); l != nil {
t.Fatal("must not exist", i)
}
}
c.Get(11)
l := &Log{21, 0, 0, []byte("0")}
b, _ := l.Marshal()
c.Set(l.ID, b)
if l := c.Get(12); l != nil {
t.Fatal("must nil", 12)
}
if l := c.Get(11); l == nil {
t.Fatal("must not nil", 11)
}
}