From be265447ac22c5495c472ad5b82ea410eef57ea8 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 31 Oct 2014 15:40:47 +0800 Subject: [PATCH] reduce replication interface --- rpl/file_store.go | 33 ++++++++++++++--------------- rpl/file_table.go | 1 + rpl/goleveldb_store.go | 48 ++++++++++++------------------------------ rpl/rpl.go | 6 +----- rpl/store.go | 6 +----- rpl/store_test.go | 11 +++++----- 6 files changed, 37 insertions(+), 68 deletions(-) create mode 100644 rpl/file_table.go diff --git a/rpl/file_store.go b/rpl/file_store.go index 91ea418..d6b9d26 100644 --- a/rpl/file_store.go +++ b/rpl/file_store.go @@ -14,14 +14,23 @@ import ( ) const ( - defaultMaxLogFileSize = 1024 * 1024 * 1024 + defaultMaxLogFileSize = uint32(1024 * 1024 * 1024) + + //why 4G, we can use uint32 as offset, reduce memory useage + maxLogFileSize = uint32(4*1024*1024*1024 - 1) ) /* -index file format: -ledis-bin.00001 -ledis-bin.00002 -ledis-bin.00003 + File Store: + 0000001.data + 0000001.meta + 0000002.data + 0000002.meta + + data: log1 data | log2 data | magic data + meta: log1 pos in data | log2 pos in data + + we use table to mangage data + meta pair */ type FileStore struct { @@ -29,7 +38,7 @@ type FileStore struct { m sync.Mutex - maxFileSize int + maxFileSize uint32 first uint64 last uint64 @@ -66,7 +75,7 @@ func NewFileStore(path string) (*FileStore, error) { return s, nil } -func (s *FileStore) SetMaxFileSize(size int) { +func (s *FileStore) SetMaxFileSize(size uint32) { s.maxFileSize = size } @@ -75,11 +84,6 @@ func (s *FileStore) GetLog(id uint64, log *Log) error { return nil } -func (s *FileStore) SeekLog(id uint64, log *Log) error { - panic("not implementation") - return nil -} - func (s *FileStore) FirstID() (uint64, error) { panic("not implementation") return 0, nil @@ -95,11 +99,6 @@ func (s *FileStore) StoreLog(log *Log) error { return nil } -func (s *FileStore) StoreLogs(logs []*Log) error { - panic("not implementation") - return nil -} - func (s *FileStore) Purge(n uint64) error { panic("not implementation") return nil diff --git a/rpl/file_table.go b/rpl/file_table.go new file mode 100644 index 0000000..def4355 --- /dev/null +++ b/rpl/file_table.go @@ -0,0 +1 @@ +package rpl diff --git a/rpl/goleveldb_store.go b/rpl/goleveldb_store.go index 39bf63a..349cb4a 100644 --- a/rpl/goleveldb_store.go +++ b/rpl/goleveldb_store.go @@ -21,6 +21,8 @@ type GoLevelDBStore struct { first uint64 last uint64 + + buf bytes.Buffer } func (s *GoLevelDBStore) FirstID() (uint64, error) { @@ -84,30 +86,10 @@ func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error { } } -func (s *GoLevelDBStore) SeekLog(id uint64, log *Log) error { - it := s.db.NewIterator() - defer it.Close() - - it.Seek(num.Uint64ToBytes(id)) - - if !it.Valid() { - return ErrLogNotFound - } else { - return log.Decode(bytes.NewBuffer(it.RawValue())) - } -} - func (s *GoLevelDBStore) StoreLog(log *Log) error { - return s.StoreLogs([]*Log{log}) -} - -func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { s.m.Lock() defer s.m.Unlock() - w := s.db.NewWriteBatch() - defer w.Rollback() - last, err := s.lastID() if err != nil { return err @@ -115,24 +97,20 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { s.last = InvalidLogID - var buf bytes.Buffer - for _, log := range logs { - buf.Reset() + s.buf.Reset() - if log.ID <= last { - return ErrLessLogID - } - - last = log.ID - key := num.Uint64ToBytes(log.ID) - - if err := log.Encode(&buf); err != nil { - return err - } - w.Put(key, buf.Bytes()) + if log.ID != last+1 { + return ErrStoreLogID } - if err = w.Commit(); err != nil { + last = log.ID + key := num.Uint64ToBytes(log.ID) + + if err := log.Encode(&s.buf); err != nil { + return err + } + + if err = s.db.Put(key, s.buf.Bytes()); err != nil { return err } diff --git a/rpl/rpl.go b/rpl/rpl.go index d862132..151d17c 100644 --- a/rpl/rpl.go +++ b/rpl/rpl.go @@ -135,14 +135,10 @@ func (r *Replication) WaitLog() <-chan struct{} { } func (r *Replication) StoreLog(log *Log) error { - return r.StoreLogs([]*Log{log}) -} - -func (r *Replication) StoreLogs(logs []*Log) error { r.m.Lock() defer r.m.Unlock() - return r.s.StoreLogs(logs) + return r.s.StoreLog(log) } func (r *Replication) FirstLogID() (uint64, error) { diff --git a/rpl/store.go b/rpl/store.go index 8d5e8ec..00d9594 100644 --- a/rpl/store.go +++ b/rpl/store.go @@ -10,22 +10,18 @@ const ( var ( ErrLogNotFound = errors.New("log not found") - ErrLessLogID = errors.New("log id is less") + ErrStoreLogID = errors.New("log id is less") ErrNoBehindLog = errors.New("no behind commit log") ) type LogStore interface { GetLog(id uint64, log *Log) error - // Get the first log which ID is equal or larger than id - SeekLog(id uint64, log *Log) error - FirstID() (uint64, error) LastID() (uint64, error) // if log id is less than current last id, return error StoreLog(log *Log) error - StoreLogs(logs []*Log) error // Delete first n logs Purge(n uint64) error diff --git a/rpl/store_test.go b/rpl/store_test.go index 0dda1ce..f572317 100644 --- a/rpl/store_test.go +++ b/rpl/store_test.go @@ -63,16 +63,15 @@ func testLogs(t *testing.T, l LogStore) { } // Attempt to write multiple logs - var logs []*Log for i := 11; i <= 20; i++ { nl := &Log{ ID: uint64(i), Data: []byte("first"), } - logs = append(logs, nl) - } - if err := l.StoreLogs(logs); err != nil { - t.Fatalf("err: %v", err) + + if err := l.StoreLog(nl); err != nil { + t.Fatalf("err: %v", err) + } } // Try to fetch @@ -157,7 +156,7 @@ func testLogs(t *testing.T, l LogStore) { } now := uint32(time.Now().Unix()) - logs = []*Log{} + logs := []*Log{} for i := 1; i <= 20; i++ { nl := &Log{ ID: uint64(i),