Merge branch 'rpl-feature' into develop

siddontang 2014-11-11 09:08:33 +08:00
commit a5dcdbf6c3
35 changed files with 1661 additions and 584 deletions

6
Godeps/Godeps.json generated
View File

@ -14,6 +14,10 @@
"Comment": "data/v1-228-g8fb50d5",
"Rev": "8fb50d5ee57110936b904a7539d4c5f2bf2359db"
},
{
"ImportPath": "github.com/edsrzf/mmap-go",
"Rev": "6c75090c55983bef2e129e173681b20d24871ef8"
},
{
"ImportPath": "github.com/siddontang/go/arena",
"Rev": "ecf49fc0738105e87d20e29aa82c403b666ff0b4"
@ -52,7 +56,7 @@
},
{
"ImportPath": "github.com/siddontang/goleveldb/leveldb",
"Rev": "c1f6d721561c48f467b26a277741e55fd224df1e"
"Rev": "41805642b981fb3d9462f6641bcb94b8609ca791"
},
{
"ImportPath": "github.com/szferi/gomdb",

View File

@ -10,20 +10,19 @@ if [ "$?" = 0 ]; then
exit 0
fi
go get github.com/siddontang/goleveldb/leveldb
go get github.com/szferi/gomdb
go get github.com/boltdb/bolt
go get github.com/ugorji/go/codec
go get github.com/BurntSushi/toml
go get github.com/edsrzf/mmap-go
go get github.com/siddontang/goleveldb/leveldb
go get github.com/siddontang/go/bson
go get github.com/siddontang/go/log
go get github.com/siddontang/go/snappy
go get github.com/siddontang/go/num
go get github.com/siddontang/go/filelock
go get github.com/siddontang/go/sync2
go get github.com/siddontang/go/arena
go get github.com/siddontang/go/arena

View File

@ -74,6 +74,8 @@ type ReplicationConfig struct {
WaitSyncTime int `toml:"wait_sync_time"`
WaitMaxSlaveAcks int `toml:"wait_max_slave_acks"`
ExpiredLogDays int `toml:"expired_log_days"`
StoreName string `toml:"store_name"`
MaxLogFileSize int64 `toml:"max_log_file_size"`
SyncLog int `toml:"sync_log"`
Compression bool `toml:"compression"`
}
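The two new fields, together with SyncLog, are consumed by rpl.NewReplication when it opens the write-ahead log store. Roughly, as a sketch (openLogStore is a hypothetical helper name; the switch mirrors NewReplication in rpl/replication.go further down in this diff):

package rpl

import (
	"path"

	"github.com/siddontang/ledisdb/config"
)

// openLogStore is illustrative only: pick the WAL backend from the
// replication config, falling back to the new file store by default.
func openLogStore(base string, cfg *config.Config) (s LogStore, err error) {
	switch cfg.Replication.StoreName {
	case "goleveldb":
		s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog)
	default: // "file"
		s, err = NewFileStore(path.Join(base, "ldb"),
			cfg.Replication.MaxLogFileSize, cfg.Replication.SyncLog)
	}
	return s, err
}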

View File

@ -72,11 +72,11 @@ max_open_files = 1024
# 0:no, 1:snappy, 2:zlib, 3:bz2, 4:lz4, 5:lz4hc
compression = 0
block_size = 65536
write_buffer_size = 67108864
cache_size = 524288000
write_buffer_size = 134217728
cache_size = 1073741824
max_open_files = 1024
max_write_buffer_num = 2
min_write_buffer_number_to_merge = 1
max_write_buffer_num = 6
min_write_buffer_number_to_merge = 2
num_levels = 7
level0_file_num_compaction_trigger = 8
level0_slowdown_writes_trigger = 16
@ -88,9 +88,9 @@ max_bytes_for_level_multiplier = 8
disable_auto_compactions = false
disable_data_sync = false
use_fsync = false
background_theads = 4
background_theads = 16
high_priority_background_threads = 1
max_background_compactions = 3
max_background_compactions = 15
max_background_flushes = 1
allow_os_buffer = true
enable_statistics = false
@ -121,9 +121,16 @@ wait_sync_time = 500
# If 0, wait (n + 1) / 2 acks, e.g. 2 acks for 3 slaves.
wait_max_slave_acks = 2
# store name: file, goleveldb
# changing it at runtime is very dangerous
store_name = "file"
# Expire write-ahead logs after the given number of days
expired_log_days = 7
# for the file store: if 0, use the default 1G; the max is 4G
max_log_file_size = 0
# Sync log to disk if possible
# 0: no sync
# 1: sync every second

View File

@ -72,11 +72,11 @@ max_open_files = 1024
# 0:no, 1:snappy, 2:zlib, 3:bz2, 4:lz4, 5:lz4hc
compression = 0
block_size = 65536
write_buffer_size = 67108864
cache_size = 524288000
write_buffer_size = 134217728
cache_size = 1073741824
max_open_files = 1024
max_write_buffer_num = 2
min_write_buffer_number_to_merge = 1
max_write_buffer_num = 6
min_write_buffer_number_to_merge = 2
num_levels = 7
level0_file_num_compaction_trigger = 8
level0_slowdown_writes_trigger = 16
@ -88,9 +88,9 @@ max_bytes_for_level_multiplier = 8
disable_auto_compactions = false
disable_data_sync = false
use_fsync = false
background_theads = 4
background_theads = 16
high_priority_background_threads = 1
max_background_compactions = 3
max_background_compactions = 15
max_background_flushes = 1
allow_os_buffer = true
enable_statistics = false
@ -121,9 +121,16 @@ wait_sync_time = 500
# If 0, wait (n + 1) / 2 acks, e.g. 2 acks for 3 slaves.
wait_max_slave_acks = 2
# store name: file, goleveldb
# changing it at runtime is very dangerous
store_name = "file"
# Expire write-ahead logs after the given number of days
expired_log_days = 7
# for the file store: if 0, use the default 1G; the max is 4G
max_log_file_size = 0
# Sync log to disk if possible
# 0: no sync
# 1: sync every second

View File

@ -15,8 +15,6 @@ type batch struct {
sync.Locker
tx *Tx
eb *eventBatch
}
func (b *batch) Commit() error {
@ -25,10 +23,12 @@ func (b *batch) Commit() error {
}
if b.tx == nil {
return b.l.handleCommit(b.eb, b.WriteBatch)
return b.l.handleCommit(b.WriteBatch, b.WriteBatch)
} else {
if b.l.r != nil {
b.tx.eb.Write(b.eb.Bytes())
if err := b.tx.data.Append(b.WriteBatch.BatchData()); err != nil {
return err
}
}
return b.WriteBatch.Commit()
}
@ -39,25 +39,15 @@ func (b *batch) Lock() {
}
func (b *batch) Unlock() {
b.eb.Reset()
b.WriteBatch.Rollback()
b.Locker.Unlock()
}
func (b *batch) Put(key []byte, value []byte) {
if b.l.r != nil {
b.eb.Put(key, value)
}
b.WriteBatch.Put(key, value)
}
func (b *batch) Delete(key []byte) {
if b.l.r != nil {
b.eb.Delete(key)
}
b.WriteBatch.Delete(key)
}
@ -96,7 +86,6 @@ func (l *Ledis) newBatch(wb *store.WriteBatch, locker sync.Locker, tx *Tx) *batc
b.Locker = locker
b.tx = tx
b.eb = new(eventBatch)
return b
}
@ -105,14 +94,18 @@ type commiter interface {
Commit() error
}
func (l *Ledis) handleCommit(eb *eventBatch, c commiter) error {
type commitDataGetter interface {
Data() []byte
}
func (l *Ledis) handleCommit(g commitDataGetter, c commiter) error {
l.commitLock.Lock()
defer l.commitLock.Unlock()
var err error
if l.r != nil {
var rl *rpl.Log
if rl, err = l.r.Log(eb.Bytes()); err != nil {
if rl, err = l.r.Log(g.Data()); err != nil {
log.Fatal("write wal error %s", err.Error())
return err
}

View File

@ -155,6 +155,11 @@ func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) {
var key, value []byte
wb := l.ldb.NewWriteBatch()
defer wb.Close()
n := 0
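//commit the write batch every 1024 entries to keep memory bounded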
for {
if err = binary.Read(rb, binary.BigEndian, &keyLen); err != nil && err != io.EOF {
return nil, err
@ -182,14 +187,26 @@ func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) {
return nil, err
}
if err = l.ldb.Put(key, value); err != nil {
return nil, err
wb.Put(key, value)
n++
if n%1024 == 0 {
if err = wb.Commit(); err != nil {
return nil, err
}
}
// if err = l.ldb.Put(key, value); err != nil {
// return nil, err
// }
keyBuf.Reset()
valueBuf.Reset()
}
if err = wb.Commit(); err != nil {
return nil, err
}
deKeyBuf = nil
deValueBuf = nil

View File

@ -1,101 +1,13 @@
package ledis
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/siddontang/go/hack"
"io"
"strconv"
)
const (
kTypeDeleteEvent uint8 = 0
kTypePutEvent uint8 = 1
)
var (
errInvalidPutEvent = errors.New("invalid put event")
errInvalidDeleteEvent = errors.New("invalid delete event")
errInvalidEvent = errors.New("invalid event")
)
type eventBatch struct {
bytes.Buffer
}
func (b *eventBatch) Put(key []byte, value []byte) {
l := uint32(len(key) + len(value) + 1 + 2)
binary.Write(b, binary.BigEndian, l)
b.WriteByte(kTypePutEvent)
keyLen := uint16(len(key))
binary.Write(b, binary.BigEndian, keyLen)
b.Write(key)
b.Write(value)
}
func (b *eventBatch) Delete(key []byte) {
l := uint32(len(key) + 1)
binary.Write(b, binary.BigEndian, l)
b.WriteByte(kTypeDeleteEvent)
b.Write(key)
}
type eventWriter interface {
Put(key []byte, value []byte)
Delete(key []byte)
}
func decodeEventBatch(w eventWriter, data []byte) error {
for {
if len(data) == 0 {
return nil
}
if len(data) < 4 {
return io.ErrUnexpectedEOF
}
l := binary.BigEndian.Uint32(data)
data = data[4:]
if uint32(len(data)) < l {
return io.ErrUnexpectedEOF
}
if err := decodeEvent(w, data[0:l]); err != nil {
return err
}
data = data[l:]
}
}
func decodeEvent(w eventWriter, b []byte) error {
if len(b) == 0 {
return errInvalidEvent
}
switch b[0] {
case kTypePutEvent:
if len(b[1:]) < 2 {
return errInvalidPutEvent
}
keyLen := binary.BigEndian.Uint16(b[1:3])
b = b[3:]
if len(b) < int(keyLen) {
return errInvalidPutEvent
}
w.Put(b[0:keyLen], b[keyLen:])
case kTypeDeleteEvent:
w.Delete(b[1:])
default:
return errInvalidEvent
}
return nil
}
var errInvalidEvent = errors.New("invalid event")
func formatEventKey(buf []byte, k []byte) ([]byte, error) {
if len(k) < 2 {

View File

@ -1,56 +0,0 @@
package ledis
import (
"reflect"
"testing"
)
type testEvent struct {
Key []byte
Value []byte
}
type testEventWriter struct {
evs []testEvent
}
func (w *testEventWriter) Put(key []byte, value []byte) {
e := testEvent{key, value}
w.evs = append(w.evs, e)
}
func (w *testEventWriter) Delete(key []byte) {
e := testEvent{key, nil}
w.evs = append(w.evs, e)
}
func TestEvent(t *testing.T) {
k1 := []byte("k1")
v1 := []byte("v1")
k2 := []byte("k2")
k3 := []byte("k3")
v3 := []byte("v3")
b := new(eventBatch)
b.Put(k1, v1)
b.Delete(k2)
b.Put(k3, v3)
buf := b.Bytes()
w := &testEventWriter{}
ev2 := &testEventWriter{
evs: []testEvent{
testEvent{k1, v1},
testEvent{k2, nil},
testEvent{k3, v3}},
}
if err := decodeEventBatch(w, buf); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(w, ev2) {
t.Fatal("not equal")
}
}

View File

@ -6,6 +6,7 @@ import (
"github.com/siddontang/go/log"
"github.com/siddontang/go/snappy"
"github.com/siddontang/ledisdb/rpl"
"github.com/siddontang/ledisdb/store"
"io"
"time"
)
@ -49,7 +50,12 @@ func (l *Ledis) handleReplication() error {
}
}
decodeEventBatch(l.rbatch, rl.Data)
if bd, err := store.NewBatchData(rl.Data); err != nil {
log.Error("decode batch log error %s", err.Error())
return err
} else if err = bd.Replay(l.rbatch); err != nil {
log.Error("replay batch log error %s", err.Error())
}
l.commitLock.Lock()
if err = l.rbatch.Commit(); err != nil {

View File

@ -16,7 +16,7 @@ type Tx struct {
tx *store.Tx
eb *eventBatch
data *store.BatchData
}
func (db *DB) IsTransaction() bool {
@ -32,7 +32,7 @@ func (db *DB) Begin() (*Tx, error) {
tx := new(Tx)
tx.eb = new(eventBatch)
tx.data = &store.BatchData{}
tx.DB = new(DB)
tx.DB.l = db.l
@ -71,7 +71,8 @@ func (tx *Tx) Commit() error {
return ErrTxDone
}
err := tx.l.handleCommit(tx.eb, tx.tx)
err := tx.l.handleCommit(tx.data, tx.tx)
tx.data.Reset()
tx.tx = nil
@ -88,7 +89,7 @@ func (tx *Tx) Rollback() error {
}
err := tx.tx.Rollback()
tx.eb.Reset()
tx.data.Reset()
tx.tx = nil
tx.l.wLock.Unlock()

View File

@ -2,228 +2,349 @@ package rpl
import (
"fmt"
"github.com/siddontang/go/hack"
"github.com/siddontang/go/ioutil2"
"github.com/siddontang/go/log"
"github.com/siddontang/go/num"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"sort"
"sync"
"time"
)
const (
defaultMaxLogFileSize = 1024 * 1024 * 1024
defaultMaxLogFileSize = int64(1024 * 1024 * 1024)
//why 4G? so we can use uint32 offsets and reduce memory usage
maxLogFileSize = int64(uint32(4*1024*1024*1024 - 1))
maxLogNumInFile = uint64(10000000)
)
/*
index file format:
ledis-bin.00001
ledis-bin.00002
ledis-bin.00003
File Store:
00000001.ldb
00000002.ldb
log: log1 data | log2 data | split data | log1 offset | log2 offset | offset start pos | offset length | magic data
log id cannot be 0; id 0 is reserved for the split data
if the file has no magic data, it means replication was not closed gracefully,
so we must repair the log data
log data: id (bigendian uint64), create time (bigendian uint32), compression (byte), data len(bigendian uint32), data
split data = log0 data + [padding 0] -> file % pagesize() == 0
log0: id 0, create time 0, compression 0, data len 7, data "ledisdb"
log offset: bigendian uint32 | bigendian uint32
offset start pos: bigendian uint64
offset length: bigendian uint32
//sha1 of github.com/siddontang/ledisdb 20 bytes
magic data = "\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17"
we must guarantee that log ids increase strictly monotonically:
if log1's id is 1, log2's id must be 2
*/
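For concreteness, the fixed-size record head described above could be encoded like this (a sketch; encodeLogHead is not part of the commit, and DecodeHead in rpl/log.go below reads the same layout back):

package rpl

import "encoding/binary"

// encodeLogHead writes the 17-byte head: id (8) + create time (4) +
// compression (1) + data len (4), all big-endian; buf must hold 17 bytes.
func encodeLogHead(buf []byte, id uint64, createTime uint32, compression uint8, dataLen uint32) {
	binary.BigEndian.PutUint64(buf[0:8], id)
	binary.BigEndian.PutUint32(buf[8:12], createTime)
	buf[12] = compression
	binary.BigEndian.PutUint32(buf[13:17], dataLen)
}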
type FileStore struct {
LogStore
m sync.Mutex
maxFileSize int64
maxFileSize int
base string
first uint64
last uint64
rm sync.RWMutex
wm sync.Mutex
logFile *os.File
logNames []string
nextLogIndex int64
indexName string
path string
rs tableReaders
w *tableWriter
}
func NewFileStore(path string) (*FileStore, error) {
func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error) {
s := new(FileStore)
if err := os.MkdirAll(path, 0755); err != nil {
var err error
if err = os.MkdirAll(base, 0755); err != nil {
return nil, err
}
s.path = path
s.base = base
s.maxFileSize = defaultMaxLogFileSize
s.maxFileSize = num.MinInt64(maxLogFileSize, maxSize)
if s.maxFileSize == 0 {
s.maxFileSize = defaultMaxLogFileSize
}
s.first = 0
s.last = 0
s.logNames = make([]string, 0, 16)
if err := s.loadIndex(); err != nil {
if err = s.load(); err != nil {
return nil, err
}
index := int64(1)
if len(s.rs) != 0 {
index = s.rs[len(s.rs)-1].index + 1
}
s.w = newTableWriter(s.base, index, s.maxFileSize)
s.w.SetSyncType(syncType)
return s, nil
}
func (s *FileStore) SetMaxFileSize(size int) {
s.maxFileSize = size
}
func (s *FileStore) GetLog(id uint64, l *Log) error {
//first search in table writer
if err := s.w.GetLog(id, l); err == nil {
return nil
} else if err != ErrLogNotFound {
return err
}
func (s *FileStore) GetLog(id uint64, log *Log) error {
panic("not implementation")
return nil
}
s.rm.RLock()
t := s.rs.Search(id)
func (s *FileStore) SeekLog(id uint64, log *Log) error {
panic("not implementation")
return nil
if t == nil {
s.rm.RUnlock()
return ErrLogNotFound
}
err := t.GetLog(id, l)
s.rm.RUnlock()
return err
}
func (s *FileStore) FirstID() (uint64, error) {
panic("not implementation")
return 0, nil
id := uint64(0)
s.rm.RLock()
if len(s.rs) > 0 {
id = s.rs[0].first
} else {
id = 0
}
s.rm.RUnlock()
if id > 0 {
return id, nil
}
//if id is 0 there are no table readers yet, so fall back to the table writer
return s.w.First(), nil
}
func (s *FileStore) LastID() (uint64, error) {
panic("not implementation")
return 0, nil
id := s.w.Last()
if id > 0 {
return id, nil
}
//if the table writer has no last id, we may find it in the last table reader
s.rm.RLock()
if len(s.rs) > 0 {
id = s.rs[len(s.rs)-1].last
}
s.rm.RUnlock()
return id, nil
}
func (s *FileStore) StoreLog(log *Log) error {
panic("not implementation")
return nil
}
func (s *FileStore) StoreLog(l *Log) error {
s.wm.Lock()
defer s.wm.Unlock()
func (s *FileStore) StoreLogs(logs []*Log) error {
panic("not implementation")
return nil
}
err := s.w.StoreLog(l)
if err == nil {
return nil
} else if err != errTableNeedFlush {
return err
}
func (s *FileStore) Purge(n uint64) error {
panic("not implementation")
return nil
s.rm.Lock()
var r *tableReader
if r, err = s.w.Flush(); err != nil {
log.Error("write table flush error %s, can not store now", err.Error())
s.w.Close()
s.rm.Unlock()
return err
}
s.rs = append(s.rs, r)
s.rm.Unlock()
return s.w.StoreLog(l)
}
func (s *FileStore) PurgeExpired(n int64) error {
panic("not implementation")
s.rm.Lock()
purges := []*tableReader{}
t := uint32(time.Now().Unix() - int64(n))
for i, r := range s.rs {
if r.lastTime > t {
purges = s.rs[0:i]
s.rs = s.rs[i:]
break
}
}
s.rm.Unlock()
for _, r := range purges {
name := r.name
r.Close()
if err := os.Remove(name); err != nil {
log.Error("purge table %s err: %s", name, err.Error())
}
}
return nil
}
func (s *FileStore) Clear() error {
panic("not implementation")
s.wm.Lock()
s.rm.Lock()
defer func() {
s.rm.Unlock()
s.wm.Unlock()
}()
s.w.Close()
for i := range s.rs {
s.rs[i].Close()
}
s.rs = tableReaders{}
if err := os.RemoveAll(s.base); err != nil {
return err
}
if err := os.MkdirAll(s.base, 0755); err != nil {
return err
}
s.w = newTableWriter(s.base, 1, s.maxFileSize)
return nil
}
func (s *FileStore) Close() error {
panic("not implementation")
s.wm.Lock()
s.rm.Lock()
if r, err := s.w.Flush(); err != nil {
log.Error("close err: %s", err.Error())
} else {
r.Close()
s.w.Close()
}
for i := range s.rs {
s.rs[i].Close()
}
s.rs = tableReaders{}
s.rm.Unlock()
s.wm.Unlock()
return nil
}
func (s *FileStore) flushIndex() error {
data := strings.Join(s.logNames, "\n")
if err := ioutil2.WriteFileAtomic(s.indexName, hack.Slice(data), 0644); err != nil {
log.Error("flush index error %s", err.Error())
func (s *FileStore) load() error {
fs, err := ioutil.ReadDir(s.base)
if err != nil {
return err
}
return nil
}
func (s *FileStore) fileExists(name string) bool {
p := path.Join(s.path, name)
_, err := os.Stat(p)
return !os.IsNotExist(err)
}
func (s *FileStore) loadIndex() error {
s.indexName = path.Join(s.path, fmt.Sprintf("ledis-bin.index"))
if _, err := os.Stat(s.indexName); os.IsNotExist(err) {
//no index file, nothing to do
} else {
indexData, err := ioutil.ReadFile(s.indexName)
if err != nil {
return err
}
lines := strings.Split(string(indexData), "\n")
for _, line := range lines {
line = strings.Trim(line, "\r\n ")
if len(line) == 0 {
continue
}
if s.fileExists(line) {
s.logNames = append(s.logNames, line)
var r *tableReader
var index int64
for _, f := range fs {
if _, err := fmt.Sscanf(f.Name(), "%08d.ldb", &index); err == nil {
if r, err = newTableReader(s.base, index); err != nil {
log.Error("load table %s err: %s", f.Name(), err.Error())
} else {
log.Info("log %s has not exists", line)
s.rs = append(s.rs, r)
}
}
}
var err error
if len(s.logNames) == 0 {
s.nextLogIndex = 1
} else {
lastName := s.logNames[len(s.logNames)-1]
if err := s.rs.check(); err != nil {
return err
}
if s.nextLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil {
log.Error("invalid logfile name %s", err.Error())
return err
return nil
}
type tableReaders []*tableReader
func (ts tableReaders) Len() int {
return len(ts)
}
func (ts tableReaders) Swap(i, j int) {
ts[i], ts[j] = ts[j], ts[i]
}
func (ts tableReaders) Less(i, j int) bool {
return ts[i].first < ts[j].first
}
func (ts tableReaders) Search(id uint64) *tableReader {
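//binary search: check() below guarantees the readers are sorted by first id with non-overlapping [first, last] ranges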
i, j := 0, len(ts)-1
for i <= j {
h := i + (j-i)/2
if ts[h].first <= id && id <= ts[h].last {
return ts[h]
} else if ts[h].last < id {
i = h + 1
} else {
j = h - 1
}
}
return nil
}
func (ts tableReaders) check() error {
if len(ts) == 0 {
return nil
}
sort.Sort(ts)
first := ts[0].first
last := ts[0].last
index := ts[0].index
if first == 0 || first > last {
return fmt.Errorf("invalid log in table %s", ts[0].name)
}
for i := 1; i < len(ts); i++ {
if ts[i].first <= last {
return fmt.Errorf("invalid first log id %d in table %s", ts[i].first, ts[i].name)
}
//like mysql, if server restart, a new log will create
s.nextLogIndex++
}
if ts[i].index <= index {
return fmt.Errorf("invalid index %d in table %s", ts[i].index, ts[i].name)
}
first = ts[i].first
last = ts[i].last
index = ts[i].index
}
return nil
}
func (s *FileStore) openNewLogFile() error {
var err error
lastName := s.formatLogFileName(s.nextLogIndex)
logPath := path.Join(s.path, lastName)
if s.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644); err != nil {
log.Error("open new logfile error %s", err.Error())
return err
}
s.logNames = append(s.logNames, lastName)
if err = s.flushIndex(); err != nil {
return err
}
return nil
}
func (s *FileStore) checkLogFileSize() bool {
if s.logFile == nil {
return false
}
st, _ := s.logFile.Stat()
if st.Size() >= int64(s.maxFileSize) {
s.closeLog()
return true
}
return false
}
func (s *FileStore) closeLog() {
if s.logFile == nil {
return
}
s.nextLogIndex++
s.logFile.Close()
s.logFile = nil
}
func (s *FileStore) formatLogFileName(index int64) string {
return fmt.Sprintf("ledis-bin.%07d", index)
}

617
rpl/file_table.go Normal file
View File

@ -0,0 +1,617 @@
package rpl
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/edsrzf/mmap-go"
"github.com/siddontang/go/log"
"github.com/siddontang/go/num"
"github.com/siddontang/go/sync2"
"io"
"os"
"path"
"reflect"
"sync"
"time"
)
var (
magic = []byte("\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17")
log0 = Log{0, 1, 1, []byte("ledisdb")}
log0Data = []byte{}
errTableNeedFlush = errors.New("write table need flush")
pageSize = int64(4096)
)
func init() {
log0Data, _ = log0.Marshal()
pageSize = int64(os.Getpagesize())
}
const tableReaderKeepaliveInterval int64 = 30
func fmtTableName(index int64) string {
return fmt.Sprintf("%08d.ldb", index)
}
type tableReader struct {
sync.Mutex
name string
index int64
f *os.File
m mmap.MMap
pf *os.File
first uint64
last uint64
lastTime uint32
offsetStartPos int64
offsetLen uint32
lastReadTime sync2.AtomicInt64
}
func newTableReader(base string, index int64) (*tableReader, error) {
if index <= 0 {
return nil, fmt.Errorf("invalid index %d", index)
}
t := new(tableReader)
t.name = path.Join(base, fmtTableName(index))
t.index = index
var err error
if err = t.check(); err != nil {
log.Error("check %s error: %s, try to repair", t.name, err.Error())
if err = t.repair(); err != nil {
log.Error("repair %s error: %s", t.name, err.Error())
return nil, err
}
}
t.close()
return t, nil
}
func (t *tableReader) Close() {
t.Lock()
defer t.Unlock()
t.close()
}
func (t *tableReader) close() {
if t.m != nil {
t.m.Unmap()
t.m = nil
}
if t.f != nil {
t.f.Close()
t.f = nil
}
}
func (t *tableReader) Keepalived() bool {
l := t.lastReadTime.Get()
if l > 0 && time.Now().Unix()-l > tableReaderKeepaliveInterval {
return false
}
return true
}
func (t *tableReader) getLogPos(index int) (uint32, error) {
// if _, err := t.pf.Seek(t.offsetStartPos+int64(index*4), os.SEEK_SET); err != nil {
// return 0, err
// }
// var pos uint32
// if err := binary.Read(t.pf, binary.BigEndian, &pos); err != nil {
// return 0, err
// }
// return pos, nil
return binary.BigEndian.Uint32(t.m[index*4:]), nil
}
func (t *tableReader) check() error {
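//read the 32-byte tail (offset start pos, offset len, magic) and validate the first, last, and log0 records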
var err error
if t.f, err = os.Open(t.name); err != nil {
return err
}
st, _ := t.f.Stat()
if st.Size() < 32 {
return fmt.Errorf("file size %d too short", st.Size())
}
var pos int64
if pos, err = t.f.Seek(-32, os.SEEK_END); err != nil {
return err
}
if err = binary.Read(t.f, binary.BigEndian, &t.offsetStartPos); err != nil {
return err
} else if t.offsetStartPos >= st.Size() {
return fmt.Errorf("invalid offset start pos %d, file size %d", t.offsetStartPos, st.Size())
} else if t.offsetStartPos%pageSize != 0 {
return fmt.Errorf("invalid offset start pos %d, must page size %d multi", t.offsetStartPos, pageSize)
}
if err = binary.Read(t.f, binary.BigEndian, &t.offsetLen); err != nil {
return err
} else if int64(t.offsetLen) >= st.Size() || t.offsetLen == 0 {
return fmt.Errorf("invalid offset len %d, file size %d", t.offsetLen, st.Size())
} else if t.offsetLen%4 != 0 {
return fmt.Errorf("invalid offset len %d, must 4 multiple", t.offsetLen)
}
if t.offsetStartPos+int64(t.offsetLen) != pos {
return fmt.Errorf("invalid offset %d %d", t.offsetStartPos, t.offsetLen)
}
b := make([]byte, 20)
if _, err = t.f.Read(b); err != nil {
return err
} else if !bytes.Equal(b, magic) {
return fmt.Errorf("invalid magic data %q", b)
}
if t.m, err = mmap.MapRegion(t.f, int(t.offsetLen), mmap.RDONLY, 0, t.offsetStartPos); err != nil {
return err
}
firstLogPos, _ := t.getLogPos(0)
lastLogPos, _ := t.getLogPos(int(t.offsetLen/4 - 1))
if firstLogPos != 0 {
return fmt.Errorf("invalid first log pos %d, must 0", firstLogPos)
} else if int64(lastLogPos) > t.offsetStartPos {
return fmt.Errorf("invalid last log pos %d", lastLogPos)
}
var l Log
if _, err = t.decodeLogHead(&l, int64(firstLogPos)); err != nil {
return fmt.Errorf("decode first log err %s", err.Error())
}
t.first = l.ID
var n int64
if n, err = t.decodeLogHead(&l, int64(lastLogPos)); err != nil {
return fmt.Errorf("decode last log err %s", err.Error())
} else {
var l0 Log
if _, err := t.f.Seek(n, os.SEEK_SET); err != nil {
return fmt.Errorf("seek logo err %s", err.Error())
} else if err = l0.Decode(t.f); err != nil {
println(lastLogPos, n, l0.ID, l0.CreateTime, l0.Compression)
return fmt.Errorf("decode log0 err %s", err.Error())
} else if !reflect.DeepEqual(l0, log0) {
return fmt.Errorf("invalid log0 %#v != %#v", l0, log0)
}
}
t.last = l.ID
t.lastTime = l.CreateTime
if t.first > t.last {
return fmt.Errorf("invalid log table first %d > last %d", t.first, t.last)
} else if (t.last - t.first + 1) != uint64(t.offsetLen/4) {
return fmt.Errorf("invalid log table, first %d, last %d, and log num %d", t.first, t.last, t.offsetLen/4)
}
return nil
}
func (t *tableReader) repair() error {
t.close()
var err error
if t.f, err = os.Open(t.name); err != nil {
return err
}
defer t.close()
tw := newTableWriter(path.Dir(t.name), t.index, maxLogFileSize)
tmpName := tw.name + ".tmp"
tw.name = tmpName
os.Remove(tmpName)
defer func() {
tw.Close()
os.Remove(tmpName)
}()
var l Log
for {
if err := l.Decode(t.f); err != nil {
return err
}
if l.ID == 0 {
break
}
t.lastTime = l.CreateTime
if err := tw.StoreLog(&l); err != nil {
return err
}
}
t.close()
var tr *tableReader
if tr, err = tw.Flush(); err != nil {
return err
}
t.first = tr.first
t.last = tr.last
t.offsetStartPos = tr.offsetStartPos
t.offsetLen = tr.offsetLen
defer tr.Close()
os.Remove(t.name)
if err := os.Rename(tmpName, t.name); err != nil {
return err
}
return nil
}
func (t *tableReader) decodeLogHead(l *Log, pos int64) (int64, error) {
_, err := t.f.Seek(int64(pos), os.SEEK_SET)
if err != nil {
return 0, err
}
dataLen, err := l.DecodeHead(t.f)
if err != nil {
return 0, err
}
return pos + int64(l.HeadSize()) + int64(dataLen), nil
}
func (t *tableReader) GetLog(id uint64, l *Log) error {
if id < t.first || id > t.last {
return ErrLogNotFound
}
t.lastReadTime.Set(time.Now().Unix())
t.Lock()
defer t.Unlock()
if err := t.openTable(); err != nil {
t.close()
return err
}
pos, err := t.getLogPos(int(id - t.first))
if err != nil {
return err
}
if _, err := t.f.Seek(int64(pos), os.SEEK_SET); err != nil {
return err
}
if err := l.Decode(t.f); err != nil {
return err
} else if l.ID != id {
return fmt.Errorf("invalid log id %d != %d", l.ID, id)
}
return nil
}
func (t *tableReader) openTable() error {
var err error
if t.f == nil {
if t.f, err = os.Open(t.name); err != nil {
return err
}
}
if t.m == nil {
if t.m, err = mmap.MapRegion(t.f, int(t.offsetLen), mmap.RDONLY, 0, t.offsetStartPos); err != nil {
return err
}
}
return nil
}
type tableWriter struct {
sync.RWMutex
wf *os.File
rf *os.File
rm sync.Mutex
base string
name string
index int64
first uint64
last uint64
offsetBuf []byte
maxLogSize int64
closed bool
syncType int
lastTime uint32
}
func newTableWriter(base string, index int64, maxLogSize int64) *tableWriter {
if index <= 0 {
panic(fmt.Errorf("invalid index %d", index))
}
t := new(tableWriter)
t.base = base
t.name = path.Join(base, fmtTableName(index))
t.index = index
t.maxLogSize = maxLogSize
t.closed = false
return t
}
func (t *tableWriter) SetMaxLogSize(s int64) {
t.maxLogSize = s
}
func (t *tableWriter) SetSyncType(tp int) {
t.syncType = tp
}
func (t *tableWriter) close() {
if t.rf != nil {
t.rf.Close()
t.rf = nil
}
if t.wf != nil {
t.wf.Close()
t.wf = nil
}
}
func (t *tableWriter) Close() {
t.Lock()
defer t.Unlock()
t.closed = true
t.close()
}
func (t *tableWriter) First() uint64 {
t.Lock()
id := t.first
t.Unlock()
return id
}
func (t *tableWriter) Last() uint64 {
t.Lock()
id := t.last
t.Unlock()
return id
}
func (t *tableWriter) reset() {
t.close()
t.first = 0
t.last = 0
t.index = t.index + 1
t.name = path.Join(t.base, fmtTableName(t.index))
t.offsetBuf = t.offsetBuf[0:0]
}
func (t *tableWriter) Flush() (*tableReader, error) {
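//seal the table: append log0 as the split marker, pad to a page boundary, then write the offset table, its start pos and length, and the magic data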
t.Lock()
defer t.Unlock()
if t.wf == nil {
return nil, fmt.Errorf("nil write handler")
}
defer t.reset()
tr := new(tableReader)
tr.name = t.name
tr.index = t.index
st, _ := t.wf.Stat()
tr.first = t.first
tr.last = t.last
if n, err := t.wf.Write(log0Data); err != nil {
return nil, fmt.Errorf("flush log0data error %s", err.Error())
} else if n != len(log0Data) {
return nil, fmt.Errorf("flush log0data only %d != %d", n, len(log0Data))
}
st, _ = t.wf.Stat()
if m := st.Size() % pageSize; m != 0 {
padding := pageSize - m
if n, err := t.wf.Write(make([]byte, padding)); err != nil {
return nil, fmt.Errorf("flush log padding error %s", err.Error())
} else if n != int(padding) {
return nil, fmt.Errorf("flush log padding error %d != %d", n, padding)
}
}
st, _ = t.wf.Stat()
if st.Size()%pageSize != 0 {
return nil, fmt.Errorf("invalid offset start pos, %d", st.Size())
}
tr.offsetStartPos = st.Size()
tr.offsetLen = uint32(len(t.offsetBuf))
if n, err := t.wf.Write(t.offsetBuf); err != nil {
log.Error("flush offset buffer error %s", err.Error())
return nil, err
} else if n != len(t.offsetBuf) {
log.Error("flush offset buffer only %d != %d", n, len(t.offsetBuf))
return nil, io.ErrShortWrite
}
if err := binary.Write(t.wf, binary.BigEndian, tr.offsetStartPos); err != nil {
log.Error("flush offset start pos error %s", err.Error())
return nil, err
}
if err := binary.Write(t.wf, binary.BigEndian, tr.offsetLen); err != nil {
log.Error("flush offset len error %s", err.Error())
return nil, err
}
if n, err := t.wf.Write(magic); err != nil {
log.Error("flush magic data error %s", err.Error())
return nil, err
} else if n != len(magic) {
log.Error("flush magic data only %d != %d", n, len(magic))
return nil, io.ErrShortWrite
}
return tr, nil
}
func (t *tableWriter) StoreLog(l *Log) error {
if l.ID == 0 {
return ErrStoreLogID
}
t.Lock()
defer t.Unlock()
if t.closed {
return fmt.Errorf("table writer is closed")
}
if t.last > 0 && l.ID != t.last+1 {
return ErrStoreLogID
}
if t.last-t.first+1 > maxLogNumInFile {
return errTableNeedFlush
}
var err error
if t.wf == nil {
if t.wf, err = os.OpenFile(t.name, os.O_CREATE|os.O_WRONLY, 0644); err != nil {
return err
}
}
if t.offsetBuf == nil {
t.offsetBuf = make([]byte, 0, maxLogNumInFile*4)
}
st, _ := t.wf.Stat()
if st.Size() >= t.maxLogSize {
return errTableNeedFlush
}
offsetPos := uint32(st.Size())
if err := l.Encode(t.wf); err != nil {
return err
}
t.offsetBuf = append(t.offsetBuf, num.Uint32ToBytes(offsetPos)...)
if t.first == 0 {
t.first = l.ID
}
t.last = l.ID
t.lastTime = l.CreateTime
//todo add LRU cache
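//sync policy: 2 = sync on every store, 1 = sync at most about once per second, 0 = leave syncing to the OS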
if t.syncType == 2 || (t.syncType == 1 && time.Now().Unix()-int64(t.lastTime) > 1) {
if err := t.wf.Sync(); err != nil {
log.Error("sync table error %s", err.Error())
}
}
return nil
}
func (t *tableWriter) GetLog(id uint64, l *Log) error {
t.RLock()
defer t.RUnlock()
if id < t.first || id > t.last {
return ErrLogNotFound
}
//todo memory cache
offset := binary.BigEndian.Uint32(t.offsetBuf[(id-t.first)*4:])
if err := t.getLog(l, int64(offset)); err != nil {
return err
} else if l.ID != id {
return fmt.Errorf("invalid log id %d != %d", id, l.ID)
}
return nil
}
func (t *tableWriter) getLog(l *Log, pos int64) error {
t.rm.Lock()
defer t.rm.Unlock()
var err error
if t.rf == nil {
if t.rf, err = os.Open(t.name); err != nil {
return err
}
}
if _, err = t.rf.Seek(pos, os.SEEK_SET); err != nil {
return err
}
if err = l.Decode(t.rf); err != nil {
return err
}
return nil
}

198
rpl/file_table_test.go Normal file
View File

@ -0,0 +1,198 @@
package rpl
import (
"github.com/siddontang/go/log"
"io/ioutil"
"os"
"path"
"testing"
"time"
)
func TestFileTable(t *testing.T) {
base, err := ioutil.TempDir("", "test_table")
if err != nil {
t.Fatal(err)
}
os.MkdirAll(base, 0755)
defer os.RemoveAll(base)
l := new(Log)
l.Compression = 0
l.Data = make([]byte, 4096)
w := newTableWriter(base, 1, 1024*1024)
defer w.Close()
for i := 0; i < 10; i++ {
l.ID = uint64(i + 1)
l.CreateTime = uint32(time.Now().Unix())
l.Data[0] = byte(i + 1)
if err := w.StoreLog(l); err != nil {
t.Fatal(err)
}
}
if w.first != 1 {
t.Fatal(w.first)
} else if w.last != 10 {
t.Fatal(w.last)
}
l.ID = 10
if err := w.StoreLog(l); err == nil {
t.Fatal("must err")
}
var ll Log
if err = ll.Unmarshal(log0Data); err != nil {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
if err := w.GetLog(uint64(i+1), &ll); err != nil {
t.Fatal(err)
} else if len(ll.Data) != 4096 {
t.Fatal(len(ll.Data))
} else if ll.Data[0] != byte(i+1) {
t.Fatal(ll.Data[0])
}
}
if err := w.GetLog(12, &ll); err == nil {
t.Fatal("must nil")
}
var r *tableReader
name := w.name
if r, err = w.Flush(); err != nil {
t.Fatal(err)
}
for i := 10; i < 20; i++ {
l.ID = uint64(i + 1)
l.CreateTime = uint32(time.Now().Unix())
l.Data[0] = byte(i + 1)
if err := w.StoreLog(l); err != nil {
t.Fatal(err)
}
}
if w.first != 11 {
t.Fatal(w.first)
} else if w.last != 20 {
t.Fatal(w.last)
}
defer r.Close()
for i := 0; i < 10; i++ {
if err := r.GetLog(uint64(i+1), &ll); err != nil {
t.Fatal(err)
} else if len(ll.Data) != 4096 {
t.Fatal(len(ll.Data))
} else if ll.Data[0] != byte(i+1) {
t.Fatal(ll.Data[0])
}
}
if err := r.GetLog(12, &ll); err == nil {
t.Fatal("must nil")
}
r.Close()
if r, err = newTableReader(base, 1); err != nil {
t.Fatal(err)
}
defer r.Close()
for i := 0; i < 10; i++ {
if err := r.GetLog(uint64(i+1), &ll); err != nil {
t.Fatal(err)
} else if len(ll.Data) != 4096 {
t.Fatal(len(ll.Data))
} else if ll.Data[0] != byte(i+1) {
t.Fatal(ll.Data[0])
}
}
if err := r.GetLog(12, &ll); err == nil {
t.Fatal("must nil")
}
st, _ := r.f.Stat()
s := st.Size()
r.Close()
log.SetLevel(log.LevelFatal)
testRepair(t, name, 1, s, 11)
testRepair(t, name, 1, s, 32)
testRepair(t, name, 1, s, 42)
testRepair(t, name, 1, s, 72)
if err := os.Truncate(name, s-(73+4096)); err != nil {
t.Fatal(err)
}
if r, err = newTableReader(base, 1); err == nil {
t.Fatal("can not repair")
}
if r, err := w.Flush(); err != nil {
t.Fatal(err)
} else {
r.Close()
}
if r, err = newTableReader(base, 2); err != nil {
t.Fatal(err)
}
defer r.Close()
}
func testRepair(t *testing.T, name string, index int64, s int64, cutSize int64) {
var r *tableReader
var err error
if err := os.Truncate(name, s-cutSize); err != nil {
t.Fatal(err)
}
if r, err = newTableReader(path.Dir(name), index); err != nil {
t.Fatal(err)
}
defer r.Close()
var ll Log
for i := 0; i < 10; i++ {
if err := r.GetLog(uint64(i+1), &ll); err != nil {
t.Fatal(err)
} else if len(ll.Data) != 4096 {
t.Fatal(len(ll.Data))
} else if ll.Data[0] != byte(i+1) {
t.Fatal(ll.Data[0])
}
}
if err := r.GetLog(12, &ll); err == nil {
t.Fatal("must nil")
}
st, _ := r.f.Stat()
if s != st.Size() {
t.Fatalf("repair error size %d != %d", s, st.Size())
}
}

View File

@ -21,6 +21,8 @@ type GoLevelDBStore struct {
first uint64
last uint64
buf bytes.Buffer
}
func (s *GoLevelDBStore) FirstID() (uint64, error) {
@ -84,30 +86,10 @@ func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error {
}
}
func (s *GoLevelDBStore) SeekLog(id uint64, log *Log) error {
it := s.db.NewIterator()
defer it.Close()
it.Seek(num.Uint64ToBytes(id))
if !it.Valid() {
return ErrLogNotFound
} else {
return log.Decode(bytes.NewBuffer(it.RawValue()))
}
}
func (s *GoLevelDBStore) StoreLog(log *Log) error {
return s.StoreLogs([]*Log{log})
}
func (s *GoLevelDBStore) StoreLogs(logs []*Log) error {
s.m.Lock()
defer s.m.Unlock()
w := s.db.NewWriteBatch()
defer w.Rollback()
last, err := s.lastID()
if err != nil {
return err
@ -115,24 +97,20 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error {
s.last = InvalidLogID
var buf bytes.Buffer
for _, log := range logs {
buf.Reset()
s.buf.Reset()
if log.ID <= last {
return ErrLessLogID
}
last = log.ID
key := num.Uint64ToBytes(log.ID)
if err := log.Encode(&buf); err != nil {
return err
}
w.Put(key, buf.Bytes())
if log.ID != last+1 {
return ErrStoreLogID
}
if err = w.Commit(); err != nil {
last = log.ID
key := num.Uint64ToBytes(log.ID)
if err := log.Encode(&s.buf); err != nil {
return err
}
if err = s.db.Put(key, s.buf.Bytes()); err != nil {
return err
}
@ -140,42 +118,6 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error {
return nil
}
func (s *GoLevelDBStore) Purge(n uint64) error {
s.m.Lock()
defer s.m.Unlock()
var first, last uint64
var err error
first, err = s.firstID()
if err != nil {
return err
}
last, err = s.lastID()
if err != nil {
return err
}
start := first
stop := num.MinUint64(last, first+n)
w := s.db.NewWriteBatch()
defer w.Rollback()
s.reset()
for i := start; i < stop; i++ {
w.Delete(num.Uint64ToBytes(i))
}
if err = w.Commit(); err != nil {
return err
}
return nil
}
func (s *GoLevelDBStore) PurgeExpired(n int64) error {
if n <= 0 {
return fmt.Errorf("invalid expired time %d", n)
@ -214,6 +156,11 @@ func (s *GoLevelDBStore) PurgeExpired(n int64) error {
return nil
}
func (s *GoLevelDBStore) reset() {
s.first = InvalidLogID
s.last = InvalidLogID
}
func (s *GoLevelDBStore) Clear() error {
s.m.Lock()
defer s.m.Unlock()
@ -228,11 +175,6 @@ func (s *GoLevelDBStore) Clear() error {
return s.open()
}
func (s *GoLevelDBStore) reset() {
s.first = InvalidLogID
s.last = InvalidLogID
}
func (s *GoLevelDBStore) Close() error {
s.m.Lock()
defer s.m.Unlock()

View File

@ -69,24 +69,11 @@ func (l *Log) Encode(w io.Writer) error {
}
func (l *Log) Decode(r io.Reader) error {
buf := make([]byte, l.HeadSize())
if _, err := io.ReadFull(r, buf); err != nil {
length, err := l.DecodeHead(r)
if err != nil {
return err
}
pos := 0
l.ID = binary.BigEndian.Uint64(buf[pos:])
pos += 8
l.CreateTime = binary.BigEndian.Uint32(buf[pos:])
pos += 4
l.Compression = uint8(buf[pos])
pos++
length := binary.BigEndian.Uint32(buf[pos:])
l.Data = l.Data[0:0]
if cap(l.Data) >= int(length) {
@ -100,3 +87,25 @@ func (l *Log) Decode(r io.Reader) error {
return nil
}
func (l *Log) DecodeHead(r io.Reader) (uint32, error) {
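//the head is 17 bytes: id (8) + create time (4) + compression (1) + data len (4), big-endian; returns the data length without reading the data itself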
buf := make([]byte, l.HeadSize())
if _, err := io.ReadFull(r, buf); err != nil {
return 0, err
}
pos := 0
l.ID = binary.BigEndian.Uint64(buf[pos:])
pos += 8
l.CreateTime = binary.BigEndian.Uint32(buf[pos:])
pos += 4
l.Compression = uint8(buf[pos])
pos++
length := binary.BigEndian.Uint32(buf[pos:])
return length, nil
}

View File

@ -24,8 +24,9 @@ type Replication struct {
s LogStore
commitID uint64
commitLog *os.File
commitID uint64
commitLog *os.File
commitLastTime time.Time
quit chan struct{}
@ -51,8 +52,16 @@ func NewReplication(cfg *config.Config) (*Replication, error) {
r.cfg = cfg
var err error
if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil {
return nil, err
switch cfg.Replication.StoreName {
case "goleveldb":
if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil {
return nil, err
}
default:
if r.s, err = NewFileStore(path.Join(base, "ldb"), cfg.Replication.MaxLogFileSize, cfg.Replication.SyncLog); err != nil {
return nil, err
}
}
if r.commitLog, err = os.OpenFile(path.Join(base, "commit.log"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
@ -84,6 +93,10 @@ func (r *Replication) Close() error {
r.s = nil
}
if err := r.updateCommitID(r.commitID, true); err != nil {
log.Error("update commit id err %s", err.Error())
}
if r.commitLog != nil {
r.commitLog.Close()
r.commitLog = nil
@ -146,14 +159,10 @@ func (r *Replication) WaitLog() <-chan struct{} {
}
func (r *Replication) StoreLog(log *Log) error {
return r.StoreLogs([]*Log{log})
}
func (r *Replication) StoreLogs(logs []*Log) error {
r.m.Lock()
defer r.m.Unlock()
return r.s.StoreLogs(logs)
return r.s.StoreLog(log)
}
func (r *Replication) FirstLogID() (uint64, error) {
@ -181,7 +190,7 @@ func (r *Replication) UpdateCommitID(id uint64) error {
r.m.Lock()
defer r.m.Unlock()
return r.updateCommitID(id)
return r.updateCommitID(id, r.cfg.Replication.SyncLog == 2)
}
func (r *Replication) Stat() (*Stat, error) {
@ -203,17 +212,23 @@ func (r *Replication) Stat() (*Stat, error) {
return s, nil
}
func (r *Replication) updateCommitID(id uint64) error {
if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil {
return err
}
func (r *Replication) updateCommitID(id uint64, force bool) error {
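//persist the commit id to disk at most once per second unless force is set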
n := time.Now()
if err := binary.Write(r.commitLog, binary.BigEndian, id); err != nil {
return err
if force || n.Sub(r.commitLastTime) > time.Second {
if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil {
return err
}
if err := binary.Write(r.commitLog, binary.BigEndian, id); err != nil {
return err
}
}
r.commitID = id
r.commitLastTime = n
return nil
}
@ -262,7 +277,7 @@ func (r *Replication) ClearWithCommitID(id uint64) error {
return err
}
return r.updateCommitID(id)
return r.updateCommitID(id, true)
}
func (r *Replication) onPurgeExpired() {

View File

@ -10,25 +10,18 @@ const (
var (
ErrLogNotFound = errors.New("log not found")
ErrLessLogID = errors.New("log id is less")
ErrStoreLogID = errors.New("invalid store log id")
ErrNoBehindLog = errors.New("no behind commit log")
)
type LogStore interface {
GetLog(id uint64, log *Log) error
// Get the first log which ID is equal or larger than id
SeekLog(id uint64, log *Log) error
FirstID() (uint64, error)
LastID() (uint64, error)
// if log id is not the current last id + 1, return error
StoreLog(log *Log) error
StoreLogs(logs []*Log) error
// Delete first n logs
Purge(n uint64) error
// Delete logs before n seconds
PurgeExpired(n int64) error
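With SeekLog, StoreLogs, and Purge dropped from the interface, the contract is simpler: ids are caller-assigned and must be strictly continuous. A minimal usage sketch against either backend (demoLogStore is an illustrative name; compare rpl/store_test.go below):

package rpl

// demoLogStore is illustrative only; it works with both the file
// store and the goleveldb store.
func demoLogStore(s LogStore) error {
	data := make([]byte, 1024)
	for i := 1; i <= 10; i++ {
		//each id must be exactly last+1, otherwise the store returns ErrStoreLogID
		if err := s.StoreLog(&Log{ID: uint64(i), Data: data}); err != nil {
			return err
		}
	}
	var out Log
	return s.GetLog(5, &out)
}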

View File

@ -4,7 +4,6 @@ import (
"io/ioutil"
"os"
"testing"
"time"
)
func TestGoLevelDBStore(t *testing.T) {
@ -25,6 +24,24 @@ func TestGoLevelDBStore(t *testing.T) {
testLogs(t, l)
}
func TestFileStore(t *testing.T) {
// Create a test dir
dir, err := ioutil.TempDir("", "ldb")
if err != nil {
t.Fatalf("err: %v ", err)
}
defer os.RemoveAll(dir)
// New file store
l, err := NewFileStore(dir, 4096, 0)
if err != nil {
t.Fatalf("err: %v ", err)
}
defer l.Close()
testLogs(t, l)
}
func testLogs(t *testing.T, l LogStore) {
// Should be no first index
idx, err := l.FirstID()
@ -46,14 +63,16 @@ func testLogs(t *testing.T, l LogStore) {
// Try a failed fetch
var out Log
if err := l.GetLog(10, &out); err.Error() != "log not found" {
if err := l.GetLog(10, &out); err != ErrLogNotFound {
t.Fatalf("err: %v ", err)
}
data := make([]byte, 1024)
// Write out a log
log := Log{
ID: 1,
Data: []byte("first"),
Data: data,
}
for i := 1; i <= 10; i++ {
log.ID = uint64(i)
@ -63,16 +82,20 @@ func testLogs(t *testing.T, l LogStore) {
}
// Attempt to write multiple logs
var logs []*Log
for i := 11; i <= 20; i++ {
nl := &Log{
ID: uint64(i),
Data: []byte("first"),
Data: data,
}
if err := l.StoreLog(nl); err != nil {
t.Fatalf("err: %v", err)
}
logs = append(logs, nl)
}
if err := l.StoreLogs(logs); err != nil {
t.Fatalf("err: %v", err)
// Try to fetch
if err := l.GetLog(1, &out); err != nil {
t.Fatalf("err: %v ", err)
}
// Try to fetch
@ -103,87 +126,38 @@ func testLogs(t *testing.T, l LogStore) {
t.Fatalf("bad idx: %d", idx)
}
// Delete a suffix
if err := l.Purge(5); err != nil {
t.Fatalf("err: %v ", err)
if err = l.Clear(); err != nil {
t.Fatalf("err :%v", err)
}
// Verify they are all deleted
for i := 1; i <= 5; i++ {
if err := l.GetLog(uint64(i), &out); err != ErrLogNotFound {
t.Fatalf("err: %v ", err)
// Check the lowest index
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
// Check the highest index
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
// Write out a log
log = Log{
ID: 1,
Data: data,
}
for i := 1; i <= 10; i++ {
log.ID = uint64(i)
if err := l.StoreLog(&log); err != nil {
t.Fatalf("err: %v", err)
}
}
// Index should be one
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 6 {
t.Fatalf("bad idx: %d", idx)
}
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 20 {
t.Fatalf("bad idx: %d", idx)
}
// Should not be able to fetch
if err := l.GetLog(5, &out); err != ErrLogNotFound {
t.Fatalf("err: %v ", err)
}
if err := l.Clear(); err != nil {
t.Fatal(err)
}
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
now := uint32(time.Now().Unix())
logs = []*Log{}
for i := 1; i <= 20; i++ {
nl := &Log{
ID: uint64(i),
CreateTime: now - 20,
Data: []byte("first"),
}
logs = append(logs, nl)
}
if err := l.PurgeExpired(1); err != nil {
t.Fatal(err)
}
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
}

38
rpl/table_readers_test.go Normal file
View File

@ -0,0 +1,38 @@
package rpl
import (
"testing"
)
func TestTableReaders(t *testing.T) {
ts := make(tableReaders, 0, 10)
for i := uint64(0); i < 10; i++ {
r := new(tableReader)
r.index = int64(i) + 1
r.first = i*10 + 1
r.last = i*10 + 10
ts = append(ts, r)
}
if err := ts.check(); err != nil {
t.Fatal(err)
}
for i := 1; i <= 100; i++ {
if r := ts.Search(uint64(i)); r == nil {
t.Fatal("must hit", i)
} else if r.index != int64((i-1)/10)+1 {
t.Fatal("invalid index", r.index, i)
}
}
if r := ts.Search(1000); r != nil {
t.Fatal("must not hit")
}
if r := ts.Search(0); r != nil {
t.Fatal("must not hit")
}
}

View File

@ -6,6 +6,7 @@ import (
"github.com/siddontang/go/sync2"
"github.com/siddontang/ledisdb/ledis"
"io"
"sync"
"time"
)
@ -74,6 +75,13 @@ type client struct {
script *ledis.Multi
slaveListeningAddr string
quit chan struct{}
done chan error
wg sync.WaitGroup
fc chan CommandFunc
}
func newClient(app *App) *client {
@ -85,9 +93,35 @@ func newClient(app *App) *client {
// c.reqErr = make(chan error)
c.quit = make(chan struct{})
c.done = make(chan error, 1)
c.fc = make(chan CommandFunc, 1)
c.wg.Add(1)
go c.run()
return c
}
func (c *client) close() {
close(c.quit)
c.wg.Wait()
}
func (c *client) run() {
defer c.wg.Done()
for {
select {
case <-c.quit:
return
case f := <-c.fc:
c.done <- f(c)
}
}
}
func (c *client) perform() {
var err error
@ -114,7 +148,9 @@ func (c *client) perform() {
// }()
// err = <-c.reqErr
err = exeCmd(c)
c.fc <- exeCmd
err = <-c.done
}
}
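The new quit/fc/done fields give every client a dedicated worker goroutine: perform() hands the command function to fc and blocks on done, and close() stops the loop via quit. Stripped of server details, the pattern looks like this (a sketch; the worker type and its names are mine, not the server's API):

package main

import (
	"fmt"
	"sync"
)

type worker struct {
	quit chan struct{}
	fc   chan func() error
	done chan error
	wg   sync.WaitGroup
}

func (w *worker) run() {
	defer w.wg.Done()
	for {
		select {
		case <-w.quit:
			return
		case f := <-w.fc:
			w.done <- f() //run one command, hand its error back
		}
	}
}

func main() {
	w := &worker{
		quit: make(chan struct{}),
		fc:   make(chan func() error, 1),
		done: make(chan error, 1),
	}
	w.wg.Add(1)
	go w.run()

	w.fc <- func() error { fmt.Println("PING"); return nil } //like perform()
	fmt.Println(<-w.done)

	close(w.quit) //like close()
	w.wg.Wait()
}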

View File

@ -50,6 +50,7 @@ func newClientHTTP(app *App, w http.ResponseWriter, r *http.Request) {
return
}
c.perform()
c.client.close()
}
func (c *httpClient) addr(r *http.Request) string {

View File

@ -56,6 +56,8 @@ func (c *respClient) run() {
c.app.info.addClients(1)
defer func() {
c.client.close()
c.app.info.addClients(-1)
if e := recover(); e != nil {
@ -82,32 +84,20 @@ func (c *respClient) run() {
}()
kc := time.Duration(c.app.cfg.ConnKeepaliveInterval) * time.Second
done := make(chan error)
for {
if kc > 0 {
c.conn.SetReadDeadline(time.Now().Add(kc))
}
// I still don't know why using a goroutine can improve performance
// if someone knows, or benchmarks a different result without the goroutine, please tell me
go func() {
reqData, err := c.readRequest()
if err == nil {
c.handleRequest(reqData)
}
reqData, err := c.readRequest()
if err == nil {
c.handleRequest(reqData)
}
done <- err
}()
// reqData, err := c.readRequest()
// if err == nil {
// c.handleRequest(reqData)
// }
err := <-done
if err != nil {
return
}
if c.conn == nil {
return
}

View File

@ -1,5 +1,9 @@
package driver
import (
"github.com/siddontang/goleveldb/leveldb"
)
type BatchPuter interface {
BatchPut([]Write) error
SyncBatchPut([]Write) error
@ -11,34 +15,56 @@ type Write struct {
}
type WriteBatch struct {
batch BatchPuter
wb []Write
d *leveldb.Batch
wb []Write
w BatchPuter
}
func (w *WriteBatch) Put(key, value []byte) {
func (wb *WriteBatch) Close() {
wb.d.Reset()
wb.wb = wb.wb[0:0]
}
func (wb *WriteBatch) Put(key, value []byte) {
if value == nil {
value = []byte{}
}
w.wb = append(w.wb, Write{key, value})
wb.wb = append(wb.wb, Write{key, value})
}
func (w *WriteBatch) Delete(key []byte) {
w.wb = append(w.wb, Write{key, nil})
func (wb *WriteBatch) Delete(key []byte) {
wb.wb = append(wb.wb, Write{key, nil})
}
func (w *WriteBatch) Commit() error {
return w.batch.BatchPut(w.wb)
func (wb *WriteBatch) Commit() error {
return wb.w.BatchPut(wb.wb)
}
func (w *WriteBatch) SyncCommit() error {
return w.batch.SyncBatchPut(w.wb)
func (wb *WriteBatch) SyncCommit() error {
return wb.w.SyncBatchPut(wb.wb)
}
func (w *WriteBatch) Rollback() error {
w.wb = w.wb[0:0]
func (wb *WriteBatch) Rollback() error {
wb.wb = wb.wb[0:0]
return nil
}
func NewWriteBatch(puter BatchPuter) IWriteBatch {
return &WriteBatch{puter, []Write{}}
func (wb *WriteBatch) Data() []byte {
wb.d.Reset()
for _, w := range wb.wb {
if w.Value == nil {
wb.d.Delete(w.Key)
} else {
wb.d.Put(w.Key, w.Value)
}
}
return wb.d.Dump()
}
func NewWriteBatch(puter BatchPuter) *WriteBatch {
return &WriteBatch{
&leveldb.Batch{},
[]Write{},
puter}
}

View File

@ -58,6 +58,8 @@ type IWriteBatch interface {
Commit() error
SyncCommit() error
Rollback() error
Data() []byte
Close()
}
type Tx interface {

View File

@ -29,3 +29,11 @@ func (w *WriteBatch) Rollback() error {
w.wbatch.Reset()
return nil
}
func (w *WriteBatch) Close() {
w.wbatch.Reset()
}
func (w *WriteBatch) Data() []byte {
return w.wbatch.Dump()
}

View File

@ -4,22 +4,37 @@ package leveldb
// #cgo LDFLAGS: -lleveldb
// #include "leveldb/c.h"
// #include "leveldb_ext.h"
import "C"
import (
"github.com/siddontang/goleveldb/leveldb"
"unsafe"
)
type WriteBatch struct {
db *DB
wbatch *C.leveldb_writebatch_t
gbatch *leveldb.Batch
}
func (w *WriteBatch) Close() error {
C.leveldb_writebatch_destroy(w.wbatch)
w.wbatch = nil
func newWriteBatch(db *DB) *WriteBatch {
w := new(WriteBatch)
w.db = db
w.wbatch = C.leveldb_writebatch_create()
w.gbatch = new(leveldb.Batch)
return nil
return w
}
func (w *WriteBatch) Close() {
if w.wbatch != nil {
C.leveldb_writebatch_destroy(w.wbatch)
w.wbatch = nil
}
w.gbatch = nil
}
func (w *WriteBatch) Put(key, value []byte) {
@ -52,6 +67,7 @@ func (w *WriteBatch) SyncCommit() error {
func (w *WriteBatch) Rollback() error {
C.leveldb_writebatch_clear(w.wbatch)
return nil
}
@ -63,3 +79,26 @@ func (w *WriteBatch) commit(wb *WriteOptions) error {
}
return nil
}
//export leveldb_writebatch_iterate_put
func leveldb_writebatch_iterate_put(p unsafe.Pointer, k *C.char, klen C.size_t, v *C.char, vlen C.size_t) {
b := (*leveldb.Batch)(p)
key := slice(unsafe.Pointer(k), int(klen))
value := slice(unsafe.Pointer(v), int(vlen))
b.Put(key, value)
}
//export leveldb_writebatch_iterate_delete
func leveldb_writebatch_iterate_delete(p unsafe.Pointer, k *C.char, klen C.size_t) {
b := (*leveldb.Batch)(p)
key := slice(unsafe.Pointer(k), int(klen))
b.Delete(key)
}
func (w *WriteBatch) Data() []byte {
w.gbatch.Reset()
C.leveldb_writebatch_iterate_ext(w.wbatch,
unsafe.Pointer(w.gbatch))
b := w.gbatch.Dump()
return b
}

View File

@ -14,6 +14,7 @@ import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store/driver"
"os"
"runtime"
"unsafe"
)
@ -182,10 +183,11 @@ func (db *DB) SyncDelete(key []byte) error {
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,
wbatch: C.leveldb_writebatch_create(),
}
wb := newWriteBatch(db)
runtime.SetFinalizer(wb, func(w *WriteBatch) {
w.Close()
})
return wb
}

View File

@ -84,5 +84,12 @@ unsigned char leveldb_iter_prev_ext(leveldb_iterator_t* iter) {
return leveldb_iter_valid(iter);
}
extern void leveldb_writebatch_iterate_put(void*, const char* k, size_t klen, const char* v, size_t vlen);
extern void leveldb_writebatch_iterate_delete(void*, const char* k, size_t klen);
void leveldb_writebatch_iterate_ext(leveldb_writebatch_t* w, void *p) {
leveldb_writebatch_iterate(w, p,
leveldb_writebatch_iterate_put, leveldb_writebatch_iterate_delete);
}
}

View File

@ -32,6 +32,7 @@ extern unsigned char leveldb_iter_seek_ext(leveldb_iterator_t*, const char* k, s
extern unsigned char leveldb_iter_next_ext(leveldb_iterator_t*);
extern unsigned char leveldb_iter_prev_ext(leveldb_iterator_t*);
extern void leveldb_writebatch_iterate_ext(leveldb_writebatch_t*, void* p);
#ifdef __cplusplus
}

View File

@ -17,10 +17,11 @@ type WriteBatch struct {
commitOk bool
}
func (w *WriteBatch) Close() error {
C.rocksdb_writebatch_destroy(w.wbatch)
w.wbatch = nil
return nil
func (w *WriteBatch) Close() {
if w.wbatch != nil {
C.rocksdb_writebatch_destroy(w.wbatch)
w.wbatch = nil
}
}
func (w *WriteBatch) Put(key, value []byte) {
@ -73,3 +74,10 @@ func (w *WriteBatch) commit(wb *WriteOptions) error {
}
return nil
}
func (w *WriteBatch) Data() []byte {
var vallen C.size_t
value := C.rocksdb_writebatch_data(w.wbatch, &vallen)
return slice(unsafe.Pointer(value), int(vallen))
}

View File

@ -15,6 +15,7 @@ import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store/driver"
"os"
"runtime"
"unsafe"
)
@ -216,6 +217,10 @@ func (db *DB) NewWriteBatch() driver.IWriteBatch {
wbatch: C.rocksdb_writebatch_create(),
}
runtime.SetFinalizer(wb, func(w *WriteBatch) {
w.Close()
})
return wb
}

View File

@ -57,6 +57,14 @@ func (o *Options) Close() {
C.rocksdb_options_destroy(o.Opt)
}
func (o *Options) IncreaseParallelism(n int) {
C.rocksdb_options_increase_parallelism(o.Opt, C.int(n))
}
func (o *Options) OptimizeLevelStyleCompaction(n int) {
C.rocksdb_options_optimize_level_style_compaction(o.Opt, C.uint64_t(n))
}
func (o *Options) SetComparator(cmp *C.rocksdb_comparator_t) {
C.rocksdb_options_set_comparator(o.Opt, cmp)
}

View File

@ -6,6 +6,7 @@ import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store/driver"
"os"
"reflect"
"testing"
)
@ -38,6 +39,7 @@ func testStore(db *DB, t *testing.T) {
testBatch(db, t)
testIterator(db, t)
testSnapshot(db, t)
testBatchData(db, t)
}
func testClear(db *DB, t *testing.T) {
@ -342,3 +344,49 @@ func testSnapshot(db *DB, t *testing.T) {
}
}
func testBatchData(db *DB, t *testing.T) {
w := db.NewWriteBatch()
w.Put([]byte("a"), []byte("1"))
w.Put([]byte("b"), nil)
w.Delete([]byte("c"))
d := w.BatchData()
if kvs, err := d.Items(); err != nil {
t.Fatal(err)
} else if len(kvs) != 3 {
t.Fatal(len(kvs))
} else if !reflect.DeepEqual(kvs[0], BatchItem{[]byte("a"), []byte("1")}) {
t.Fatal("must equal")
} else if !reflect.DeepEqual(kvs[1], BatchItem{[]byte("b"), []byte{}}) {
t.Fatal("must equal")
} else if !reflect.DeepEqual(kvs[2], BatchItem{[]byte("c"), nil}) {
t.Fatal("must equal")
}
if err := d.Append(d); err != nil {
t.Fatal(err)
} else if d.Len() != 6 {
t.Fatal(d.Len())
}
if kvs, err := d.Items(); err != nil {
t.Fatal(err)
} else if len(kvs) != 6 {
t.Fatal(len(kvs))
} else if !reflect.DeepEqual(kvs[0], BatchItem{[]byte("a"), []byte("1")}) {
t.Fatal("must equal")
} else if !reflect.DeepEqual(kvs[1], BatchItem{[]byte("b"), []byte{}}) {
t.Fatal("must equal")
} else if !reflect.DeepEqual(kvs[2], BatchItem{[]byte("c"), nil}) {
t.Fatal("must equal")
} else if !reflect.DeepEqual(kvs[3], BatchItem{[]byte("a"), []byte("1")}) {
t.Fatal("must equal")
} else if !reflect.DeepEqual(kvs[4], BatchItem{[]byte("b"), []byte{}}) {
t.Fatal("must equal")
} else if !reflect.DeepEqual(kvs[5], BatchItem{[]byte("c"), nil}) {
t.Fatal("must equal")
}
}

View File

@ -1,6 +1,8 @@
package store
import (
"encoding/binary"
"github.com/siddontang/goleveldb/leveldb"
"github.com/siddontang/ledisdb/store/driver"
"time"
)
@ -14,6 +16,10 @@ type WriteBatch struct {
db *DB
}
func (wb *WriteBatch) Close() {
wb.wb.Close()
}
func (wb *WriteBatch) Put(key []byte, value []byte) {
wb.putNum++
wb.wb.Put(key, value)
@ -50,3 +56,94 @@ func (wb *WriteBatch) Rollback() error {
return wb.wb.Rollback()
}
// the data will be undefined after commit or rollback
func (wb *WriteBatch) BatchData() *BatchData {
data := wb.wb.Data()
d, err := NewBatchData(data)
if err != nil {
//should not happen: the data came straight from the driver
panic(err)
}
return d
}
func (wb *WriteBatch) Data() []byte {
b := wb.BatchData()
return b.Data()
}
const BatchDataHeadLen = 12
/*
see the leveldb batch data format for more information: the head is an 8-byte little-endian sequence number plus a 4-byte little-endian record count (12 bytes, hence BatchDataHeadLen), followed by the records
*/
type BatchData struct {
leveldb.Batch
}
func NewBatchData(data []byte) (*BatchData, error) {
b := new(BatchData)
if err := b.Load(data); err != nil {
return nil, err
}
return b, nil
}
func (d *BatchData) Append(do *BatchData) error {
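//concatenate both dumps, drop the second 12-byte head, then patch the record count at bytes 8..12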
d1 := d.Dump()
d2 := do.Dump()
n := d.Len() + do.Len()
d1 = append(d1, d2[BatchDataHeadLen:]...)
binary.LittleEndian.PutUint32(d1[8:], uint32(n))
return d.Load(d1)
}
func (d *BatchData) Data() []byte {
return d.Dump()
}
func (d *BatchData) Reset() {
d.Batch.Reset()
}
type BatchDataReplay interface {
Put(key, value []byte)
Delete(key []byte)
}
type BatchItem struct {
Key []byte
Value []byte
}
type batchItems []BatchItem
func (bs *batchItems) Put(key, value []byte) {
*bs = append(*bs, BatchItem{key, value})
}
func (bs *batchItems) Delete(key []byte) {
*bs = append(*bs, BatchItem{key, nil})
}
func (d *BatchData) Replay(r BatchDataReplay) error {
return d.Batch.Replay(r)
}
func (d *BatchData) Items() ([]BatchItem, error) {
is := make(batchItems, 0, d.Len())
if err := d.Replay(&is); err != nil {
return nil, err
}
return []BatchItem(is), nil
}