reduce replication interface

This commit is contained in:
siddontang 2014-10-31 15:40:47 +08:00
parent b787ddbbd1
commit be265447ac
6 changed files with 37 additions and 68 deletions

View File

@ -14,14 +14,23 @@ import (
) )
const ( const (
defaultMaxLogFileSize = 1024 * 1024 * 1024 defaultMaxLogFileSize = uint32(1024 * 1024 * 1024)
//why 4G, we can use uint32 as offset, reduce memory useage
maxLogFileSize = uint32(4*1024*1024*1024 - 1)
) )
/* /*
index file format: File Store:
ledis-bin.00001 0000001.data
ledis-bin.00002 0000001.meta
ledis-bin.00003 0000002.data
0000002.meta
data: log1 data | log2 data | magic data
meta: log1 pos in data | log2 pos in data
we use table to mangage data + meta pair
*/ */
type FileStore struct { type FileStore struct {
@ -29,7 +38,7 @@ type FileStore struct {
m sync.Mutex m sync.Mutex
maxFileSize int maxFileSize uint32
first uint64 first uint64
last uint64 last uint64
@ -66,7 +75,7 @@ func NewFileStore(path string) (*FileStore, error) {
return s, nil return s, nil
} }
func (s *FileStore) SetMaxFileSize(size int) { func (s *FileStore) SetMaxFileSize(size uint32) {
s.maxFileSize = size s.maxFileSize = size
} }
@ -75,11 +84,6 @@ func (s *FileStore) GetLog(id uint64, log *Log) error {
return nil return nil
} }
func (s *FileStore) SeekLog(id uint64, log *Log) error {
panic("not implementation")
return nil
}
func (s *FileStore) FirstID() (uint64, error) { func (s *FileStore) FirstID() (uint64, error) {
panic("not implementation") panic("not implementation")
return 0, nil return 0, nil
@ -95,11 +99,6 @@ func (s *FileStore) StoreLog(log *Log) error {
return nil return nil
} }
func (s *FileStore) StoreLogs(logs []*Log) error {
panic("not implementation")
return nil
}
func (s *FileStore) Purge(n uint64) error { func (s *FileStore) Purge(n uint64) error {
panic("not implementation") panic("not implementation")
return nil return nil

1
rpl/file_table.go Normal file
View File

@ -0,0 +1 @@
package rpl

View File

@ -21,6 +21,8 @@ type GoLevelDBStore struct {
first uint64 first uint64
last uint64 last uint64
buf bytes.Buffer
} }
func (s *GoLevelDBStore) FirstID() (uint64, error) { func (s *GoLevelDBStore) FirstID() (uint64, error) {
@ -84,30 +86,10 @@ func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error {
} }
} }
func (s *GoLevelDBStore) SeekLog(id uint64, log *Log) error {
it := s.db.NewIterator()
defer it.Close()
it.Seek(num.Uint64ToBytes(id))
if !it.Valid() {
return ErrLogNotFound
} else {
return log.Decode(bytes.NewBuffer(it.RawValue()))
}
}
func (s *GoLevelDBStore) StoreLog(log *Log) error { func (s *GoLevelDBStore) StoreLog(log *Log) error {
return s.StoreLogs([]*Log{log})
}
func (s *GoLevelDBStore) StoreLogs(logs []*Log) error {
s.m.Lock() s.m.Lock()
defer s.m.Unlock() defer s.m.Unlock()
w := s.db.NewWriteBatch()
defer w.Rollback()
last, err := s.lastID() last, err := s.lastID()
if err != nil { if err != nil {
return err return err
@ -115,24 +97,20 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error {
s.last = InvalidLogID s.last = InvalidLogID
var buf bytes.Buffer s.buf.Reset()
for _, log := range logs {
buf.Reset()
if log.ID <= last { if log.ID != last+1 {
return ErrLessLogID return ErrStoreLogID
} }
last = log.ID last = log.ID
key := num.Uint64ToBytes(log.ID) key := num.Uint64ToBytes(log.ID)
if err := log.Encode(&buf); err != nil { if err := log.Encode(&s.buf); err != nil {
return err return err
} }
w.Put(key, buf.Bytes())
}
if err = w.Commit(); err != nil { if err = s.db.Put(key, s.buf.Bytes()); err != nil {
return err return err
} }

View File

@ -135,14 +135,10 @@ func (r *Replication) WaitLog() <-chan struct{} {
} }
func (r *Replication) StoreLog(log *Log) error { func (r *Replication) StoreLog(log *Log) error {
return r.StoreLogs([]*Log{log})
}
func (r *Replication) StoreLogs(logs []*Log) error {
r.m.Lock() r.m.Lock()
defer r.m.Unlock() defer r.m.Unlock()
return r.s.StoreLogs(logs) return r.s.StoreLog(log)
} }
func (r *Replication) FirstLogID() (uint64, error) { func (r *Replication) FirstLogID() (uint64, error) {

View File

@ -10,22 +10,18 @@ const (
var ( var (
ErrLogNotFound = errors.New("log not found") ErrLogNotFound = errors.New("log not found")
ErrLessLogID = errors.New("log id is less") ErrStoreLogID = errors.New("log id is less")
ErrNoBehindLog = errors.New("no behind commit log") ErrNoBehindLog = errors.New("no behind commit log")
) )
type LogStore interface { type LogStore interface {
GetLog(id uint64, log *Log) error GetLog(id uint64, log *Log) error
// Get the first log which ID is equal or larger than id
SeekLog(id uint64, log *Log) error
FirstID() (uint64, error) FirstID() (uint64, error)
LastID() (uint64, error) LastID() (uint64, error)
// if log id is less than current last id, return error // if log id is less than current last id, return error
StoreLog(log *Log) error StoreLog(log *Log) error
StoreLogs(logs []*Log) error
// Delete first n logs // Delete first n logs
Purge(n uint64) error Purge(n uint64) error

View File

@ -63,17 +63,16 @@ func testLogs(t *testing.T, l LogStore) {
} }
// Attempt to write multiple logs // Attempt to write multiple logs
var logs []*Log
for i := 11; i <= 20; i++ { for i := 11; i <= 20; i++ {
nl := &Log{ nl := &Log{
ID: uint64(i), ID: uint64(i),
Data: []byte("first"), Data: []byte("first"),
} }
logs = append(logs, nl)
} if err := l.StoreLog(nl); err != nil {
if err := l.StoreLogs(logs); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
}
// Try to fetch // Try to fetch
if err := l.GetLog(10, &out); err != nil { if err := l.GetLog(10, &out); err != nil {
@ -157,7 +156,7 @@ func testLogs(t *testing.T, l LogStore) {
} }
now := uint32(time.Now().Unix()) now := uint32(time.Now().Unix())
logs = []*Log{} logs := []*Log{}
for i := 1; i <= 20; i++ { for i := 1; i <= 20; i++ {
nl := &Log{ nl := &Log{
ID: uint64(i), ID: uint64(i),