replication optimize

This commit is contained in:
siddontang 2014-11-18 22:27:39 +08:00
parent b7cabcbb5f
commit 9256808dc5
2 changed files with 58 additions and 39 deletions

View File

@ -1,6 +1,7 @@
package rpl package rpl
import ( import (
"bufio"
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
@ -10,6 +11,7 @@ import (
"github.com/siddontang/go/num" "github.com/siddontang/go/num"
"github.com/siddontang/go/sync2" "github.com/siddontang/go/sync2"
"io" "io"
"io/ioutil"
"os" "os"
"path" "path"
"reflect" "reflect"
@ -363,6 +365,8 @@ type tableWriter struct {
wf *os.File wf *os.File
rf *os.File rf *os.File
wb *bufio.Writer
rm sync.Mutex rm sync.Mutex
base string base string
@ -372,6 +376,7 @@ type tableWriter struct {
first uint64 first uint64
last uint64 last uint64
offsetPos int64
offsetBuf []byte offsetBuf []byte
maxLogSize int64 maxLogSize int64
@ -381,7 +386,7 @@ type tableWriter struct {
syncType int syncType int
lastTime uint32 lastTime uint32
cache *logLRUCache // cache *logLRUCache
} }
func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter { func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
@ -395,12 +400,16 @@ func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
t.name = path.Join(base, fmtTableName(index)) t.name = path.Join(base, fmtTableName(index))
t.index = index t.index = index
t.offsetPos = 0
t.maxLogSize = maxLogSize t.maxLogSize = maxLogSize
//maybe config later?
t.wb = bufio.NewWriterSize(ioutil.Discard, 4096)
t.closed = false t.closed = false
//maybe use config later //maybe use config later
t.cache = newLogLRUCache(1024*1024, 1000) // t.cache = newLogLRUCache(1024*1024, 1000)
return t return t
} }
@ -423,6 +432,8 @@ func (t *tableWriter) close() {
t.wf.Close() t.wf.Close()
t.wf = nil t.wf = nil
} }
t.wb.Reset(ioutil.Discard)
} }
func (t *tableWriter) Close() { func (t *tableWriter) Close() {
@ -456,7 +467,8 @@ func (t *tableWriter) reset() {
t.index = t.index + 1 t.index = t.index + 1
t.name = path.Join(t.base, fmtTableName(t.index)) t.name = path.Join(t.base, fmtTableName(t.index))
t.offsetBuf = t.offsetBuf[0:0] t.offsetBuf = t.offsetBuf[0:0]
t.cache.Reset() t.offsetPos = 0
// t.cache.Reset()
} }
func (t *tableWriter) Flush() (*tableReader, error) { func (t *tableWriter) Flush() (*tableReader, error) {
@ -558,27 +570,40 @@ func (t *tableWriter) StoreLog(l *Log) error {
if t.wf, err = os.OpenFile(t.name, os.O_CREATE|os.O_WRONLY, 0644); err != nil { if t.wf, err = os.OpenFile(t.name, os.O_CREATE|os.O_WRONLY, 0644); err != nil {
return err return err
} }
t.wb.Reset(t.wf)
} }
if t.offsetBuf == nil { if t.offsetBuf == nil {
t.offsetBuf = make([]byte, 0, maxLogNumInFile*4) t.offsetBuf = make([]byte, 0, maxLogNumInFile*4)
} }
st, _ := t.wf.Stat() // st, _ := t.wf.Stat()
if st.Size() >= t.maxLogSize { // if st.Size() >= t.maxLogSize {
// return errTableNeedFlush
// }
if t.offsetPos >= t.maxLogSize {
return errTableNeedFlush return errTableNeedFlush
} }
offsetPos := uint32(st.Size()) offsetPos := t.offsetPos
buf, _ := l.Marshal() if err := l.Encode(t.wb); err != nil {
if n, err := t.wf.Write(buf); err != nil { return err
} else if err = t.wb.Flush(); err != nil {
return err return err
} else if n != len(buf) {
return io.ErrShortWrite
} }
t.offsetBuf = append(t.offsetBuf, num.Uint32ToBytes(offsetPos)...) // buf, _ := l.Marshal()
// if n, err := t.wf.Write(buf); err != nil {
// return err
// } else if n != len(buf) {
// return io.ErrShortWrite
// }
t.offsetPos += int64(l.Size())
t.offsetBuf = append(t.offsetBuf, num.Uint32ToBytes(uint32(offsetPos))...)
if t.first == 0 { if t.first == 0 {
t.first = l.ID t.first = l.ID
} }
@ -587,7 +612,7 @@ func (t *tableWriter) StoreLog(l *Log) error {
t.lastTime = l.CreateTime t.lastTime = l.CreateTime
t.cache.Set(l.ID, buf) // t.cache.Set(l.ID, buf)
if t.syncType == 2 { if t.syncType == 2 {
if err := t.wf.Sync(); err != nil { if err := t.wf.Sync(); err != nil {
@ -606,14 +631,13 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error {
return ErrLogNotFound return ErrLogNotFound
} }
//todo memory cache // if cl := t.cache.Get(id); cl != nil {
if cl := t.cache.Get(id); cl != nil { // if err := l.Unmarshal(cl); err == nil && l.ID == id {
if err := l.Unmarshal(cl); err == nil && l.ID == id { // return nil
return nil // } else {
} else { // t.cache.Delete(id)
t.cache.Delete(id) // }
} // }
}
offset := binary.BigEndian.Uint32(t.offsetBuf[(id-t.first)*4:]) offset := binary.BigEndian.Uint32(t.offsetBuf[(id-t.first)*4:])
@ -623,8 +647,6 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error {
return fmt.Errorf("invalid log id %d != %d", id, l.ID) return fmt.Errorf("invalid log id %d != %d", id, l.ID)
} }
//todo add cache here?
return nil return nil
} }

View File

@ -40,24 +40,21 @@ func (l *Log) Unmarshal(b []byte) error {
} }
func (l *Log) Encode(w io.Writer) error { func (l *Log) Encode(w io.Writer) error {
buf := make([]byte, l.HeadSize()) if err := binary.Write(w, binary.BigEndian, l.ID); err != nil {
return err
pos := 0 }
binary.BigEndian.PutUint64(buf[pos:], l.ID)
pos += 8 if err := binary.Write(w, binary.BigEndian, l.CreateTime); err != nil {
return err
binary.BigEndian.PutUint32(buf[pos:], l.CreateTime) }
pos += 4
if _, err := w.Write([]byte{l.Compression}); err != nil {
buf[pos] = l.Compression return err
pos++ }
binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data))) dataLen := uint32(len(l.Data))
if err := binary.Write(w, binary.BigEndian, dataLen); err != nil {
if n, err := w.Write(buf); err != nil {
return err return err
} else if n != len(buf) {
return io.ErrShortWrite
} }
if n, err := w.Write(l.Data); err != nil { if n, err := w.Write(l.Data); err != nil {