diff --git a/bootstrap.sh b/bootstrap.sh index ee260b7..a93c219 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -15,3 +15,5 @@ go get github.com/ugorji/go/codec go get github.com/BurntSushi/toml go get github.com/siddontang/go-bson/bson + +go get github.com/siddontang/go/num diff --git a/client/ledis-py/ledis/client.py b/client/ledis-py/ledis/client.py index 0247d86..420900f 100644 --- a/client/ledis-py/ledis/client.py +++ b/client/ledis-py/ledis/client.py @@ -9,8 +9,7 @@ from ledis.exceptions import ( ConnectionError, DataError, LedisError, - ResponseError, - TxNotBeginError + ResponseError ) SYM_EMPTY = b('') @@ -199,11 +198,6 @@ class Ledis(object): def set_response_callback(self, command, callback): "Set a custom Response Callback" self.response_callbacks[command] = callback - - def tx(self): - return Transaction( - self.connection_pool, - self.response_callbacks) #### COMMAND EXECUTION AND PROTOCOL PARSING #### @@ -970,43 +964,3 @@ class Ledis(object): def scriptflush(self): return self.execute_command('SCRIPT', 'FLUSH') - - -class Transaction(Ledis): - def __init__(self, connection_pool, response_callbacks): - self.connection_pool = connection_pool - self.response_callbacks = response_callbacks - self.connection = None - - def execute_command(self, *args, **options): - "Execute a command and return a parsed response" - command_name = args[0] - - connection = self.connection - if self.connection is None: - raise TxNotBeginError - - try: - connection.send_command(*args) - return self.parse_response(connection, command_name, **options) - except ConnectionError: - connection.disconnect() - connection.send_command(*args) - return self.parse_response(connection, command_name, **options) - - def begin(self): - self.connection = self.connection_pool.get_connection('begin') - return self.execute_command("BEGIN") - - def commit(self): - res = self.execute_command("COMMIT") - self.connection_pool.release(self.connection) - self.connection = None - return res - - def rollback(self): - res = self.execute_command("ROLLBACK") - self.connection_pool.release(self.connection) - self.connection = None - return res - diff --git a/client/ledis-py/ledis/exceptions.py b/client/ledis-py/ledis/exceptions.py index 9150db6..f92e530 100644 --- a/client/ledis-py/ledis/exceptions.py +++ b/client/ledis-py/ledis/exceptions.py @@ -35,5 +35,3 @@ class DataError(LedisError): class ExecAbortError(ResponseError): pass -class TxNotBeginError(LedisError): - pass \ No newline at end of file diff --git a/client/ledis-py/tests/test_tx.py b/client/ledis-py/tests/test_tx.py deleted file mode 100644 index cfbab20..0000000 --- a/client/ledis-py/tests/test_tx.py +++ /dev/null @@ -1,46 +0,0 @@ -import unittest -import sys -sys.path.append("..") - -import ledis - -global_l = ledis.Ledis() - -#db that do not support transaction -dbs = ["leveldb", "rocksdb", "hyperleveldb", "goleveldb"] -check = global_l.info().get("db_name") in dbs - - -class TestTx(unittest.TestCase): - def setUp(self): - self.l = ledis.Ledis(port=6380) - - def tearDown(self): - self.l.flushdb() - - @unittest.skipIf(check, reason="db not support transaction") - def test_commit(self): - tx = self.l.tx() - self.l.set("a", "no-tx") - assert self.l.get("a") == "no-tx" - tx.begin() - tx.set("a", "tx") - assert self.l.get("a") == "no-tx" - assert tx.get("a") == "tx" - - tx.commit() - assert self.l.get("a") == "tx" - - @unittest.skipIf(check, reason="db not support transaction") - def test_rollback(self): - tx = self.l.tx() - self.l.set("a", "no-tx") - assert self.l.get("a") == "no-tx" - - tx.begin() - tx.set("a", "tx") - assert tx.get("a") == "tx" - assert self.l.get("a") == "no-tx" - - tx.rollback() - assert self.l.get("a") == "no-tx" \ No newline at end of file diff --git a/client/nodejs/ledis/lib/commands.js b/client/nodejs/ledis/lib/commands.js index 1b77c00..41dc97e 100644 --- a/client/nodejs/ledis/lib/commands.js +++ b/client/nodejs/ledis/lib/commands.js @@ -125,10 +125,6 @@ module.exports = [ "spersist", "sxscan", - "begin", - "rollback", - "commit", - "eval", "evalsha", "script", diff --git a/client/openresty/ledis.lua b/client/openresty/ledis.lua index 07c3f2b..26a384a 100644 --- a/client/openresty/ledis.lua +++ b/client/openresty/ledis.lua @@ -148,11 +148,6 @@ local commands = { "flushall", "flushdb", - -- [[transaction]] - "begin", - "commit", - "rollback", - -- [[script]] "eval", "evalsha", diff --git a/client/openresty/tx_test.lua b/client/openresty/tx_test.lua deleted file mode 100644 index 069ddf9..0000000 --- a/client/openresty/tx_test.lua +++ /dev/null @@ -1,96 +0,0 @@ -local ledis = require "ledis" -local lds = ledis:new() - -lds:set_timeout(1000) - - - - --- connect -local ok, err = lds:connect("127.0.0.1", "6380") -if not ok then - ngx.say("failed to connect:", err) - return -end - -lds:del("tx") - --- transaction - -ok, err = lds:set("tx", "a") -if not ok then - ngx.say("failed to execute set in tx: ", err) - return -end - -ngx.say("SET should be OK <=>", ok) - -res, err = lds:get("tx") -if not res then - ngx.say("failed to execute get in tx: ", err) - return -end - -ngx.say("GET should be a <=>", res) - - - -ok, err = lds:begin() -if not ok then - ngx.say("failed to run begin: ", err) - return -end - -ngx.say("BEGIN should be OK <=>", ok) - -ok, err = lds:set("tx", "b") -if not ok then - ngx.say("failed to execute set in tx: ", err) - return -end - -ngx.say("SET should be OK <=>", ok) - - -res, err = lds:get("tx") -if not res then - ngx.say("failed to execute get in tx: ", err) - return -end - -ngx.say("GET should be b <=>", res) - -ok, err = lds:rollback() -if not ok then - ngx.say("failed to rollback", err) - return -end -ngx.say("ROLLBACK should be OK <=>", ok) - -res, err = lds:get("tx") -if not res then - ngx.say("failed to execute get in tx: ", err) - return -end - -ngx.say("GET should be a <=>", res) - - -lds:begin() -lds:set("tx", "c") -lds:commit() -res, err = lds:get("tx") -if not res then - ngx.say("failed to execute get in tx: ", err) - return -end - -ngx.say("GET should be c <=>", res) - - -local ok, err = lds:close() -if not ok then - ngx.say("failed to close: ", err) - return -end -ngx.say("close success") diff --git a/cmd/ledis-binlog/main.go b/cmd/ledis-binlog/main.go deleted file mode 100644 index 3725920..0000000 --- a/cmd/ledis-binlog/main.go +++ /dev/null @@ -1,85 +0,0 @@ -package main - -import ( - "bufio" - "flag" - "fmt" - "github.com/siddontang/ledisdb/ledis" - "os" - "time" -) - -var TimeFormat = "2006-01-02 15:04:05" - -var startDateTime = flag.String("start-datetime", "", - "Start reading the binary log at the first event having a timestamp equal to or later than the datetime argument.") -var stopDateTime = flag.String("stop-datetime", "", - "Stop reading the binary log at the first event having a timestamp equal to or earlier than the datetime argument.") - -var startTime uint32 = 0 -var stopTime uint32 = 0xFFFFFFFF - -func main() { - flag.Usage = func() { - fmt.Fprintf(os.Stderr, "Usage of %s [options] log_file\n", os.Args[0]) - flag.PrintDefaults() - } - - flag.Parse() - - logFile := flag.Arg(0) - f, err := os.Open(logFile) - if err != nil { - println(err.Error()) - return - } - defer f.Close() - - var t time.Time - - if len(*startDateTime) > 0 { - if t, err = time.Parse(TimeFormat, *startDateTime); err != nil { - println("parse start-datetime error: ", err.Error()) - return - } - - startTime = uint32(t.Unix()) - } - - if len(*stopDateTime) > 0 { - if t, err = time.Parse(TimeFormat, *stopDateTime); err != nil { - println("parse stop-datetime error: ", err.Error()) - return - } - - stopTime = uint32(t.Unix()) - } - - rb := bufio.NewReaderSize(f, 4096) - err = ledis.ReadEventFromReader(rb, printEvent) - if err != nil { - println("read event error: ", err.Error()) - return - } -} - -func printEvent(head *ledis.BinLogHead, event []byte) error { - if head.CreateTime < startTime || head.CreateTime > stopTime { - return nil - } - - t := time.Unix(int64(head.CreateTime), 0) - - fmt.Printf("%s ", t.Format(TimeFormat)) - - s, err := ledis.FormatBinLogEvent(event) - if err != nil { - fmt.Printf("%s", err.Error()) - } else { - fmt.Printf(s) - } - - fmt.Printf("\n") - - return nil -} diff --git a/cmd/ledis-cli/const.go b/cmd/ledis-cli/const.go index f816c5e..4fc7f67 100644 --- a/cmd/ledis-cli/const.go +++ b/cmd/ledis-cli/const.go @@ -1,10 +1,9 @@ -//This file was generated by .tools/generate_commands.py on Tue Sep 09 2014 09:48:57 +0800 +//This file was generated by .tools/generate_commands.py on Tue Sep 09 2014 09:48:57 +0800 package main var helpCommands = [][]string{ {"BCOUNT", "key [start end]", "Bitmap"}, {"BDELETE", "key", "ZSet"}, - {"BEGIN", "-", "Transaction"}, {"BEXPIRE", "key seconds", "Bitmap"}, {"BEXPIREAT", "key timestamp", "Bitmap"}, {"BGET", "key", "Bitmap"}, @@ -15,7 +14,6 @@ var helpCommands = [][]string{ {"BSETBIT", "key offset value", "Bitmap"}, {"BTTL", "key", "Bitmap"}, {"BXSCAN", "key [MATCH match] [COUNT count]", "Bitmap"}, - {"COMMIT", "-", "Transaction"}, {"DECR", "key", "KV"}, {"DECRBY", "key decrement", "KV"}, {"DEL", "key [key ...]", "KV"}, @@ -67,7 +65,6 @@ var helpCommands = [][]string{ {"MSET", "key value [key value ...]", "KV"}, {"PERSIST", "key", "KV"}, {"PING", "-", "Server"}, - {"ROLLBACK", "-", "Transaction"}, {"RPOP", "key", "List"}, {"RPUSH", "key value [value ...]", "List"}, {"SADD", "key member [member ...]", "Set"}, diff --git a/cmd/ledis-load/main.go b/cmd/ledis-load/main.go index 34165b8..57e85fb 100644 --- a/cmd/ledis-load/main.go +++ b/cmd/ledis-load/main.go @@ -57,18 +57,6 @@ func loadDump(cfg *config.Config, ldb *ledis.Ledis) error { return err } - var head *ledis.BinLogAnchor - head, err = ldb.LoadDumpFile(*dumpPath) - - if err != nil { - return err - } - - //master enable binlog, here output this like mysql - if head.LogFileIndex != 0 && head.LogPos != 0 { - format := "MASTER_LOG_FILE='binlog.%07d', MASTER_LOG_POS=%d;\n" - fmt.Printf(format, head.LogFileIndex, head.LogPos) - } - - return nil + _, err = ldb.LoadDumpFile(*dumpPath) + return err } diff --git a/config/config.go b/config/config.go index c5736ee..eb0f0fc 100644 --- a/config/config.go +++ b/config/config.go @@ -16,14 +16,6 @@ const ( DefaultDataDir string = "./var" ) -const ( - MaxBinLogFileSize int = 1024 * 1024 * 1024 - MaxBinLogFileNum int = 10000 - - DefaultBinLogFileSize int = MaxBinLogFileSize - DefaultBinLogFileNum int = 10 -) - type LevelDBConfig struct { Compression bool `toml:"compression"` BlockSize int `toml:"block_size"` @@ -37,9 +29,10 @@ type LMDBConfig struct { NoSync bool `toml:"nosync"` } -type BinLogConfig struct { - MaxFileSize int `toml:"max_file_size"` - MaxFileNum int `toml:"max_file_num"` +type ReplicationConfig struct { + Use bool `toml:"use"` + Path string `toml:"path"` + ExpiredLogDays int `toml:"expired_log_days"` } type Config struct { @@ -47,6 +40,8 @@ type Config struct { HttpAddr string `toml:"http_addr"` + SlaveOf string `toml:"slaveof"` + DataDir string `toml:"data_dir"` DBName string `toml:"db_name"` @@ -57,12 +52,9 @@ type Config struct { LMDB LMDBConfig `toml:"lmdb"` - UseBinLog bool `toml:"use_binlog"` - BinLog BinLogConfig `toml:"binlog"` - - SlaveOf string `toml:"slaveof"` - AccessLog string `toml:"access_log"` + + Replication ReplicationConfig `toml:"replication"` } func NewConfigWithFile(fileName string) (*Config, error) { @@ -95,11 +87,6 @@ func NewConfigDefault() *Config { cfg.DBName = DefaultDBName - // disable binlog - cfg.BinLog.MaxFileNum = 0 - cfg.BinLog.MaxFileSize = 0 - - // disable replication cfg.SlaveOf = "" // disable access log @@ -128,17 +115,3 @@ func (cfg *LevelDBConfig) Adjust() { cfg.MaxOpenFiles = 1024 } } - -func (cfg *BinLogConfig) Adjust() { - if cfg.MaxFileSize <= 0 { - cfg.MaxFileSize = DefaultBinLogFileSize - } else if cfg.MaxFileSize > MaxBinLogFileSize { - cfg.MaxFileSize = MaxBinLogFileSize - } - - if cfg.MaxFileNum <= 0 { - cfg.MaxFileNum = DefaultBinLogFileNum - } else if cfg.MaxFileNum > MaxBinLogFileNum { - cfg.MaxFileNum = MaxBinLogFileNum - } -} diff --git a/config/config.toml b/config/config.toml index 29e5c5d..848f52a 100644 --- a/config/config.toml +++ b/config/config.toml @@ -30,9 +30,6 @@ db_name = "leveldb" # If not set, use data_dir/"db_name"_data db_path = "" -# enable binlog or not -use_binlog = true - [leveldb] compression = false block_size = 32768 @@ -44,8 +41,15 @@ max_open_files = 1024 map_size = 524288000 nosync = true -[binlog] -max_file_size = 0 -max_file_num = 0 +[replication] +# enable replication or not +use = true + +# Path to store replication information(write ahead log, commit log, etc.) +# if not set, use data_dir/rpl +path = "" + +# Expire write ahead logs after the given days +expired_log_days = 7 diff --git a/config/config_test.go b/config/config_test.go index 70b4c9c..ff98a7b 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -11,7 +11,6 @@ func TestConfig(t *testing.T) { dstCfg.HttpAddr = "127.0.0.1:11181" dstCfg.DataDir = "/tmp/ledis_server" dstCfg.DBName = "leveldb" - dstCfg.UseBinLog = true dstCfg.LevelDB.Compression = false dstCfg.LevelDB.BlockSize = 32768 @@ -21,6 +20,9 @@ func TestConfig(t *testing.T) { dstCfg.LMDB.MapSize = 524288000 dstCfg.LMDB.NoSync = true + dstCfg.Replication.Use = true + dstCfg.Replication.ExpiredLogDays = 7 + cfg, err := NewConfigWithFile("./config.toml") if err != nil { t.Fatal(err) diff --git a/doc/DiffRedis.md b/doc/DiffRedis.md index ee1618c..5c597fe 100644 --- a/doc/DiffRedis.md +++ b/doc/DiffRedis.md @@ -35,15 +35,6 @@ The same for Del. ZSet only support int64 score, not double in Redis. -## Transaction - -LedisDB supports ACID transaction using LMDB or BoltDB, maybe later it will support `multi`, `exec`, `discard`. - -Transaction API: - -+ `begin` -+ `commit` -+ `rollback` ## Scan diff --git a/doc/commands.json b/doc/commands.json index 2ef0bbb..921a688 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -511,22 +511,6 @@ "group": "ZSet", "readonly": false }, - - "BEGIN": { - "arguments": "-", - "group": "Transaction", - "readonly": false - }, - "COMMIT": { - "arguments": "-", - "group": "Transaction", - "readonly": false - }, - "ROLLBACK": { - "arguments": "-", - "group": "Transaction", - "readonly": false - }, "XSCAN": { "arguments": "key [MATCH match] [COUNT count]", diff --git a/doc/commands.md b/doc/commands.md index 183ab81..99c8d7e 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -121,7 +121,7 @@ Table of Contents - [Replication](#replication) - [SLAVEOF host port](#slaveof-host-port) - [FULLSYNC](#fullsync) - - [SYNC index offset](#sync-index-offset) + - [SYNC logid](#sync-logid) - [Server](#server) - [PING](#ping) - [ECHO message](#echo-message) @@ -129,10 +129,6 @@ Table of Contents - [FLUSHALL](#flushall) - [FLUSHDB](#flushdb) - [INFO [section]](#info-section) -- [Transaction](#transaction) - - [BEGIN](#begin) - - [ROLLBACK](#rollback) - - [COMMIT](#commit) - [Script](#script) - [EVAL script numkeys key [key ...] arg [arg ...]](#eval-script-numkeys-key-key--arg-arg-) - [EVALSHA sha1 numkeys key [key ...] arg [arg ...]](#evalsha-sha1-numkeys-key-key--arg-arg-) @@ -2416,9 +2412,9 @@ FULLSYNC will first try to sync all data from the master, save in local disk, th **Examples** -### SYNC index offset +### SYNC logid -Inner command, syncs the new changed from master set by SLAVEOF at offset in binlog.index file. +Inner command, syncs the new changed from master set by SLAVEOF with logid. **Return value** @@ -2502,69 +2498,6 @@ The optional parameter can be used to select a specific section of information: When no parameter is provided, all will return. -## Transaction - -### BEGIN - -Marks the start of a transaction block. Subsequent commands will be in a transaction context util using COMMIT or ROLLBACK. - -You must known that `BEGIN` will block any other write operators before you `COMMIT` or `ROLLBACK`. Don't use long-time transaction. - -**Return value** - -Returns `OK` if the backend store engine in use supports transaction, otherwise, returns `Err`. - -**Examples** -``` -ledis> BEGIN -OK -ledis> SET HELLO WORLD -OK -ledis> COMMIT -OK -``` - -### ROLLBACK - -Discards all the changes of previously commands in a transaction and restores the connection state to normal. - -**Return value** -Returns `OK` if in a transaction context, otherwise, `Err` - -**Examples** -``` -ledis> BEGIN -OK -ledis> SET HELLO WORLD -OK -ledis> GET HELLO -"WORLD" -ledis> ROLLBACK -OK -ledis> GET HELLO -(nil) -``` - -### COMMIT - -Persists the changes of all the commands in a transaction and restores the connection state to normal. - -**Return value** -Returns `OK` if in a transaction context, otherwise, `Err` - -**Examples** -``` -ledis> BEGIN -OK -ledis> SET HELLO WORLD -OK -ledis> GET HELLO -"WORLD" -ledis> COMMIT -OK -ledis> GET HELLO -"WORLD" -``` ## Script diff --git a/etc/ledis.conf b/etc/ledis.conf index d3adbd8..0d46aee 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -29,6 +29,9 @@ slaveof = "" # db_name = "leveldb" +# if not set, use data_dir/"db_name"_data +db_path = "" + [leveldb] compression = false block_size = 32768 @@ -40,9 +43,8 @@ max_open_files = 1024 map_size = 524288000 nosync = true -[binlog] -# Set either size or num to 0 to disable binlog -max_file_size = 0 -max_file_num = 0 +[wal] +# if not set, use data_dir/wal +path = "" diff --git a/ledis/batch.go b/ledis/batch.go index b23cc47..9086a8b 100644 --- a/ledis/batch.go +++ b/ledis/batch.go @@ -1,6 +1,8 @@ package ledis import ( + "github.com/siddontang/go-log/log" + "github.com/siddontang/ledisdb/rpl" "github.com/siddontang/ledisdb/store" "sync" ) @@ -12,29 +14,39 @@ type batch struct { sync.Locker - logs [][]byte - - tx *Tx + eb *eventBatch } func (b *batch) Commit() error { + if b.l.IsReadOnly() { + return ErrWriteInROnly + } + b.l.commitLock.Lock() defer b.l.commitLock.Unlock() - err := b.WriteBatch.Commit() - - if b.l.binlog != nil { - if err == nil { - if b.tx == nil { - b.l.binlog.Log(b.logs...) - } else { - b.tx.logs = append(b.tx.logs, b.logs...) - } + var err error + if b.l.r != nil { + var l *rpl.Log + if l, err = b.l.r.Log(b.eb.Bytes()); err != nil { + log.Fatal("write wal error %s", err.Error()) + return err } - b.logs = [][]byte{} - } - return err + if err = b.WriteBatch.Commit(); err != nil { + log.Fatal("commit error %s", err.Error()) + return err + } + + if err = b.l.r.UpdateCommitID(l.ID); err != nil { + log.Fatal("update commit id error %s", err.Error()) + return err + } + + return nil + } else { + return b.WriteBatch.Commit() + } } func (b *batch) Lock() { @@ -42,26 +54,25 @@ func (b *batch) Lock() { } func (b *batch) Unlock() { - if b.l.binlog != nil { - b.logs = [][]byte{} - } + b.eb.Reset() + b.WriteBatch.Rollback() b.Locker.Unlock() } func (b *batch) Put(key []byte, value []byte) { - if b.l.binlog != nil { - buf := encodeBinLogPut(key, value) - b.logs = append(b.logs, buf) + if b.l.r != nil { + b.eb.Put(key, value) } + b.WriteBatch.Put(key, value) } func (b *batch) Delete(key []byte) { - if b.l.binlog != nil { - buf := encodeBinLogDelete(key) - b.logs = append(b.logs, buf) + if b.l.r != nil { + b.Delete(key) } + b.WriteBatch.Delete(key) } @@ -80,26 +91,20 @@ func (l *dbBatchLocker) Unlock() { l.wrLock.RUnlock() } -type txBatchLocker struct { -} - -func (l *txBatchLocker) Lock() {} -func (l *txBatchLocker) Unlock() {} - type multiBatchLocker struct { } func (l *multiBatchLocker) Lock() {} func (l *multiBatchLocker) Unlock() {} -func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch { +func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker) *batch { b := new(batch) b.l = l b.WriteBatch = wb - b.tx = tx b.Locker = locker - b.logs = [][]byte{} + b.eb = new(eventBatch) + return b } diff --git a/ledis/binlog.go b/ledis/binlog.go deleted file mode 100644 index 0d9c251..0000000 --- a/ledis/binlog.go +++ /dev/null @@ -1,400 +0,0 @@ -package ledis - -import ( - "bufio" - "encoding/binary" - "fmt" - "github.com/siddontang/go-log/log" - "github.com/siddontang/ledisdb/config" - "io" - "io/ioutil" - "os" - "path" - "strconv" - "strings" - "sync" - "time" -) - -type BinLogHead struct { - CreateTime uint32 - BatchId uint32 - PayloadLen uint32 -} - -func (h *BinLogHead) Len() int { - return 12 -} - -func (h *BinLogHead) Write(w io.Writer) error { - if err := binary.Write(w, binary.BigEndian, h.CreateTime); err != nil { - return err - } - - if err := binary.Write(w, binary.BigEndian, h.BatchId); err != nil { - return err - } - - if err := binary.Write(w, binary.BigEndian, h.PayloadLen); err != nil { - return err - } - - return nil -} - -func (h *BinLogHead) handleReadError(err error) error { - if err == io.EOF { - return io.ErrUnexpectedEOF - } else { - return err - } -} - -func (h *BinLogHead) Read(r io.Reader) error { - var err error - if err = binary.Read(r, binary.BigEndian, &h.CreateTime); err != nil { - return err - } - - if err = binary.Read(r, binary.BigEndian, &h.BatchId); err != nil { - return h.handleReadError(err) - } - - if err = binary.Read(r, binary.BigEndian, &h.PayloadLen); err != nil { - return h.handleReadError(err) - } - - return nil -} - -func (h *BinLogHead) InSameBatch(ho *BinLogHead) bool { - if h.CreateTime == ho.CreateTime && h.BatchId == ho.BatchId { - return true - } else { - return false - } -} - -/* -index file format: -ledis-bin.00001 -ledis-bin.00002 -ledis-bin.00003 - -log file format - -Log: Head|PayloadData - -Head: createTime|batchId|payloadData - -*/ - -type BinLog struct { - sync.Mutex - - path string - - cfg *config.BinLogConfig - - logFile *os.File - - logWb *bufio.Writer - - indexName string - logNames []string - lastLogIndex int64 - - batchId uint32 - - ch chan struct{} -} - -func NewBinLog(cfg *config.Config) (*BinLog, error) { - l := new(BinLog) - - l.cfg = &cfg.BinLog - l.cfg.Adjust() - - l.path = path.Join(cfg.DataDir, "binlog") - - if err := os.MkdirAll(l.path, 0755); err != nil { - return nil, err - } - - l.logNames = make([]string, 0, 16) - - l.ch = make(chan struct{}) - - if err := l.loadIndex(); err != nil { - return nil, err - } - - return l, nil -} - -func (l *BinLog) flushIndex() error { - data := strings.Join(l.logNames, "\n") - - bakName := fmt.Sprintf("%s.bak", l.indexName) - f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - log.Error("create binlog bak index error %s", err.Error()) - return err - } - - if _, err := f.WriteString(data); err != nil { - log.Error("write binlog index error %s", err.Error()) - f.Close() - return err - } - - f.Close() - - if err := os.Rename(bakName, l.indexName); err != nil { - log.Error("rename binlog bak index error %s", err.Error()) - return err - } - - return nil -} - -func (l *BinLog) loadIndex() error { - l.indexName = path.Join(l.path, fmt.Sprintf("ledis-bin.index")) - if _, err := os.Stat(l.indexName); os.IsNotExist(err) { - //no index file, nothing to do - } else { - indexData, err := ioutil.ReadFile(l.indexName) - if err != nil { - return err - } - - lines := strings.Split(string(indexData), "\n") - for _, line := range lines { - line = strings.Trim(line, "\r\n ") - if len(line) == 0 { - continue - } - - if _, err := os.Stat(path.Join(l.path, line)); err != nil { - log.Error("load index line %s error %s", line, err.Error()) - return err - } else { - l.logNames = append(l.logNames, line) - } - } - } - if l.cfg.MaxFileNum > 0 && len(l.logNames) > l.cfg.MaxFileNum { - //remove oldest logfile - if err := l.Purge(len(l.logNames) - l.cfg.MaxFileNum); err != nil { - return err - } - } - - var err error - if len(l.logNames) == 0 { - l.lastLogIndex = 1 - } else { - lastName := l.logNames[len(l.logNames)-1] - - if l.lastLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil { - log.Error("invalid logfile name %s", err.Error()) - return err - } - - //like mysql, if server restart, a new binlog will create - l.lastLogIndex++ - } - - return nil -} - -func (l *BinLog) getLogFile() string { - return l.FormatLogFileName(l.lastLogIndex) -} - -func (l *BinLog) openNewLogFile() error { - var err error - lastName := l.getLogFile() - - logPath := path.Join(l.path, lastName) - if l.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644); err != nil { - log.Error("open new logfile error %s", err.Error()) - return err - } - - if l.cfg.MaxFileNum > 0 && len(l.logNames) == l.cfg.MaxFileNum { - l.purge(1) - } - - l.logNames = append(l.logNames, lastName) - - if l.logWb == nil { - l.logWb = bufio.NewWriterSize(l.logFile, 1024) - } else { - l.logWb.Reset(l.logFile) - } - - if err = l.flushIndex(); err != nil { - return err - } - - return nil -} - -func (l *BinLog) checkLogFileSize() bool { - if l.logFile == nil { - return false - } - - st, _ := l.logFile.Stat() - if st.Size() >= int64(l.cfg.MaxFileSize) { - l.closeLog() - return true - } - - return false -} - -func (l *BinLog) closeLog() { - if l.logFile == nil { - return - } - - l.lastLogIndex++ - - l.logFile.Close() - l.logFile = nil -} - -func (l *BinLog) purge(n int) { - if len(l.logNames) < n { - n = len(l.logNames) - } - for i := 0; i < n; i++ { - logPath := path.Join(l.path, l.logNames[i]) - os.Remove(logPath) - } - - copy(l.logNames[0:], l.logNames[n:]) - l.logNames = l.logNames[0 : len(l.logNames)-n] -} - -func (l *BinLog) Close() { - if l.logFile != nil { - l.logFile.Close() - l.logFile = nil - } -} - -func (l *BinLog) LogNames() []string { - return l.logNames -} - -func (l *BinLog) LogFileName() string { - return l.getLogFile() -} - -func (l *BinLog) LogFilePos() int64 { - if l.logFile == nil { - return 0 - } else { - st, _ := l.logFile.Stat() - return st.Size() - } -} - -func (l *BinLog) LogFileIndex() int64 { - return l.lastLogIndex -} - -func (l *BinLog) FormatLogFileName(index int64) string { - return fmt.Sprintf("ledis-bin.%07d", index) -} - -func (l *BinLog) FormatLogFilePath(index int64) string { - return path.Join(l.path, l.FormatLogFileName(index)) -} - -func (l *BinLog) LogPath() string { - return l.path -} - -func (l *BinLog) Purge(n int) error { - l.Lock() - defer l.Unlock() - - if len(l.logNames) == 0 { - return nil - } - - if n >= len(l.logNames) { - n = len(l.logNames) - //can not purge current log file - if l.logNames[n-1] == l.getLogFile() { - n = n - 1 - } - } - - l.purge(n) - - return l.flushIndex() -} - -func (l *BinLog) PurgeAll() error { - l.Lock() - defer l.Unlock() - - l.closeLog() - - l.purge(len(l.logNames)) - - return l.openNewLogFile() -} - -func (l *BinLog) Log(args ...[]byte) error { - l.Lock() - defer l.Unlock() - - var err error - - if l.logFile == nil { - if err = l.openNewLogFile(); err != nil { - return err - } - } - - head := &BinLogHead{} - - head.CreateTime = uint32(time.Now().Unix()) - head.BatchId = l.batchId - - l.batchId++ - - for _, data := range args { - head.PayloadLen = uint32(len(data)) - - if err := head.Write(l.logWb); err != nil { - return err - } - - if _, err := l.logWb.Write(data); err != nil { - return err - } - } - - if err = l.logWb.Flush(); err != nil { - log.Error("write log error %s", err.Error()) - return err - } - - l.checkLogFileSize() - - close(l.ch) - l.ch = make(chan struct{}) - - return nil -} - -func (l *BinLog) Wait() <-chan struct{} { - return l.ch -} diff --git a/ledis/binlog_test.go b/ledis/binlog_test.go deleted file mode 100644 index ea62bd9..0000000 --- a/ledis/binlog_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package ledis - -import ( - "github.com/siddontang/ledisdb/config" - "io/ioutil" - "os" - "testing" -) - -func TestBinLog(t *testing.T) { - cfg := new(config.Config) - - cfg.BinLog.MaxFileNum = 1 - cfg.BinLog.MaxFileSize = 1024 - cfg.DataDir = "/tmp/ledis_binlog" - - os.RemoveAll(cfg.DataDir) - - b, err := NewBinLog(cfg) - if err != nil { - t.Fatal(err) - } - - if err := b.Log(make([]byte, 1024)); err != nil { - t.Fatal(err) - } - - if err := b.Log(make([]byte, 1024)); err != nil { - t.Fatal(err) - } - - if fs, err := ioutil.ReadDir(b.LogPath()); err != nil { - t.Fatal(err) - } else if len(fs) != 2 { - t.Fatal(len(fs)) - } - - if err := b.PurgeAll(); err != nil { - t.Fatal(err) - } - - if fs, err := ioutil.ReadDir(b.LogPath()); err != nil { - t.Fatal(err) - } else if len(fs) != 2 { - t.Fatal(len(fs)) - } else if b.LogFilePos() != 0 { - t.Fatal(b.LogFilePos()) - } -} diff --git a/ledis/const.go b/ledis/const.go index e889f4e..9ad7033 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -23,6 +23,8 @@ const ( ExpTimeType byte = 101 ExpMetaType byte = 102 + + MetaType byte = 201 ) var ( @@ -44,6 +46,11 @@ var ( } ) +const ( + RDWRMode = 0 + ROnlyMode = 1 +) + const ( defaultScanCount int = 10 ) @@ -78,17 +85,11 @@ const ( ) var ( - ErrScoreMiss = errors.New("zset score miss") + ErrScoreMiss = errors.New("zset score miss") + ErrWriteInROnly = errors.New("write in readonly mode") ) const ( - BinLogTypeDeletion uint8 = 0x0 - BinLogTypePut uint8 = 0x1 - BinLogTypeCommand uint8 = 0x2 -) - -const ( - DBAutoCommit uint8 = 0x0 - DBInTransaction uint8 = 0x1 - DBInMulti uint8 = 0x2 + DBAutoCommit uint8 = 0x0 + DBInMulti uint8 = 0x2 ) diff --git a/ledis/doc.go b/ledis/doc.go index 6b04c52..c6bfe78 100644 --- a/ledis/doc.go +++ b/ledis/doc.go @@ -2,7 +2,7 @@ // // Ledis supports various data structure like kv, list, hash and zset like redis. // -// Other features include binlog replication, data with a limited time-to-live. +// Other features include replication, data with a limited time-to-live. // // Usage // @@ -54,8 +54,5 @@ // n, err := db.ZAdd(key, ScorePair{score1, member1}, ScorePair{score2, member2}) // ay, err := db.ZRangeByScore(key, minScore, maxScore, 0, -1) // -// Binlog -// -// ledis supports binlog, so you can sync binlog to another server for replication. If you want to open binlog support, set UseBinLog to true in config. // package ledis diff --git a/ledis/dump.go b/ledis/dump.go index f162481..9b5e439 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -4,41 +4,27 @@ import ( "bufio" "bytes" "encoding/binary" + "github.com/siddontang/go-log/log" "github.com/siddontang/go-snappy/snappy" + "github.com/siddontang/ledisdb/store" "io" "os" ) -//dump format -// fileIndex(bigendian int64)|filePos(bigendian int64) -// |keylen(bigendian int32)|key|valuelen(bigendian int32)|value...... -// -//key and value are both compressed for fast transfer dump on network using snappy - -type BinLogAnchor struct { - LogFileIndex int64 - LogPos int64 +type DumpHead struct { + CommitID uint64 } -func (m *BinLogAnchor) WriteTo(w io.Writer) error { - if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil { +func (h *DumpHead) Read(r io.Reader) error { + if err := binary.Read(r, binary.BigEndian, &h.CommitID); err != nil { return err } - if err := binary.Write(w, binary.BigEndian, m.LogPos); err != nil { - return err - } return nil } -func (m *BinLogAnchor) ReadFrom(r io.Reader) error { - err := binary.Read(r, binary.BigEndian, &m.LogFileIndex) - if err != nil { - return err - } - - err = binary.Read(r, binary.BigEndian, &m.LogPos) - if err != nil { +func (h *DumpHead) Write(w io.Writer) error { + if err := binary.Write(w, binary.BigEndian, h.CommitID); err != nil { return err } @@ -56,24 +42,35 @@ func (l *Ledis) DumpFile(path string) error { } func (l *Ledis) Dump(w io.Writer) error { - m := new(BinLogAnchor) - var err error - l.wLock.Lock() - defer l.wLock.Unlock() + var commitID uint64 + var snap *store.Snapshot - if l.binlog != nil { - m.LogFileIndex = l.binlog.LogFileIndex() - m.LogPos = l.binlog.LogFilePos() + { + l.wLock.Lock() + defer l.wLock.Unlock() + + if l.r != nil { + if commitID, err = l.r.LastCommitID(); err != nil { + return err + } + } + + if snap, err = l.ldb.NewSnapshot(); err != nil { + return err + } } wb := bufio.NewWriterSize(w, 4096) - if err = m.WriteTo(wb); err != nil { + + h := &DumpHead{commitID} + + if err = h.Write(wb); err != nil { return err } - it := l.ldb.NewIterator() + it := snap.NewIterator() it.SeekToFirst() compressBuf := make([]byte, 4096) @@ -118,7 +115,8 @@ func (l *Ledis) Dump(w io.Writer) error { return nil } -func (l *Ledis) LoadDumpFile(path string) (*BinLogAnchor, error) { +// clear all data and load dump file to db +func (l *Ledis) LoadDumpFile(path string) (*DumpHead, error) { f, err := os.Open(path) if err != nil { return nil, err @@ -128,16 +126,42 @@ func (l *Ledis) LoadDumpFile(path string) (*BinLogAnchor, error) { return l.LoadDump(f) } -func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) { +func (l *Ledis) clearAllWhenLoad() error { + it := l.ldb.NewIterator() + defer it.Close() + + w := l.ldb.NewWriteBatch() + defer w.Rollback() + + n := 0 + for ; it.Valid(); it.Next() { + n++ + if n == 10000 { + w.Commit() + n = 0 + } + w.Delete(it.RawKey()) + } + + return w.Commit() +} + +// clear all data and load dump file to db +func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) { l.wLock.Lock() defer l.wLock.Unlock() - info := new(BinLogAnchor) + var err error + if err = l.clearAllWhenLoad(); err != nil { + log.Fatal("clear all error when loaddump, err :%s", err.Error()) + return nil, err + } rb := bufio.NewReaderSize(r, 4096) - err := info.ReadFrom(rb) - if err != nil { + h := new(DumpHead) + + if err = h.Read(rb); err != nil { return nil, err } @@ -190,10 +214,11 @@ func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) { deKeyBuf = nil deValueBuf = nil - //if binlog enable, we will delete all binlogs and open a new one for handling simply - if l.binlog != nil { - l.binlog.PurgeAll() + if l.r != nil { + if err := l.r.UpdateCommitID(h.CommitID); err != nil { + return nil, err + } } - return info, nil + return h, nil } diff --git a/ledis/binlog_util.go b/ledis/event.go similarity index 64% rename from ledis/binlog_util.go rename to ledis/event.go index da058bd..6421766 100644 --- a/ledis/binlog_util.go +++ b/ledis/event.go @@ -1,97 +1,104 @@ package ledis import ( + "bytes" "encoding/binary" "errors" "fmt" + "io" "strconv" ) -var ( - errBinLogDeleteType = errors.New("invalid bin log delete type") - errBinLogPutType = errors.New("invalid bin log put type") - errBinLogCommandType = errors.New("invalid bin log command type") +const ( + kTypeDeleteEvent uint8 = 0 + kTypePutEvent uint8 = 1 ) -func encodeBinLogDelete(key []byte) []byte { - buf := make([]byte, 1+len(key)) - buf[0] = BinLogTypeDeletion - copy(buf[1:], key) - return buf +var ( + errInvalidPutEvent = errors.New("invalid put event") + errInvalidDeleteEvent = errors.New("invalid delete event") + errInvalidEvent = errors.New("invalid event") +) + +type eventBatch struct { + bytes.Buffer } -func decodeBinLogDelete(sz []byte) ([]byte, error) { - if len(sz) < 1 || sz[0] != BinLogTypeDeletion { - return nil, errBinLogDeleteType +func (b *eventBatch) Put(key []byte, value []byte) { + l := uint32(len(key) + len(value) + 1 + 2) + binary.Write(b, binary.BigEndian, l) + b.WriteByte(kTypePutEvent) + keyLen := uint16(len(key)) + binary.Write(b, binary.BigEndian, keyLen) + b.Write(key) + b.Write(value) +} + +func (b *eventBatch) Delete(key []byte) { + l := uint32(len(key) + 1) + binary.Write(b, binary.BigEndian, l) + b.WriteByte(kTypeDeleteEvent) + b.Write(key) +} + +type eventWriter interface { + Put(key []byte, value []byte) + Delete(key []byte) +} + +func decodeEventBatch(w eventWriter, data []byte) error { + for { + if len(data) == 0 { + return nil + } + + if len(data) < 4 { + return io.ErrUnexpectedEOF + } + + l := binary.BigEndian.Uint32(data) + data = data[4:] + if uint32(len(data)) < l { + return io.ErrUnexpectedEOF + } + + if err := decodeEvent(w, data[0:l]); err != nil { + return err + } + data = data[l:] + } +} + +func decodeEvent(w eventWriter, b []byte) error { + if len(b) == 0 { + return errInvalidEvent } - return sz[1:], nil -} + switch b[0] { + case kTypePutEvent: + if len(b[1:]) < 2 { + return errInvalidPutEvent + } -func encodeBinLogPut(key []byte, value []byte) []byte { - buf := make([]byte, 3+len(key)+len(value)) - buf[0] = BinLogTypePut - pos := 1 - binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) - pos += 2 - copy(buf[pos:], key) - pos += len(key) - copy(buf[pos:], value) + keyLen := binary.BigEndian.Uint16(b[1:3]) + b = b[3:] + if len(b) < int(keyLen) { + return errInvalidPutEvent + } - return buf -} - -func decodeBinLogPut(sz []byte) ([]byte, []byte, error) { - if len(sz) < 3 || sz[0] != BinLogTypePut { - return nil, nil, errBinLogPutType - } - - keyLen := int(binary.BigEndian.Uint16(sz[1:])) - if 3+keyLen > len(sz) { - return nil, nil, errBinLogPutType - } - - return sz[3 : 3+keyLen], sz[3+keyLen:], nil -} - -func FormatBinLogEvent(event []byte) (string, error) { - logType := uint8(event[0]) - - var err error - var k []byte - var v []byte - - var buf []byte = make([]byte, 0, 1024) - - switch logType { - case BinLogTypePut: - k, v, err = decodeBinLogPut(event) - buf = append(buf, "PUT "...) - case BinLogTypeDeletion: - k, err = decodeBinLogDelete(event) - buf = append(buf, "DELETE "...) + w.Put(b[0:keyLen], b[keyLen:]) + case kTypeDeleteEvent: + w.Delete(b[1:]) default: - err = errInvalidBinLogEvent + return errInvalidEvent } - if err != nil { - return "", err - } - - if buf, err = formatDataKey(buf, k); err != nil { - return "", err - } - - if v != nil && len(v) != 0 { - buf = append(buf, fmt.Sprintf(" %q", v)...) - } - - return String(buf), nil + return nil } -func formatDataKey(buf []byte, k []byte) ([]byte, error) { +func formatEventKey(buf []byte, k []byte) ([]byte, error) { if len(k) < 2 { - return nil, errInvalidBinLogEvent + return nil, errInvalidEvent } buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...) @@ -208,7 +215,7 @@ func formatDataKey(buf []byte, k []byte) ([]byte, error) { buf = strconv.AppendQuote(buf, String(key)) } default: - return nil, errInvalidBinLogEvent + return nil, errInvalidEvent } return buf, nil diff --git a/ledis/event_test.go b/ledis/event_test.go new file mode 100644 index 0000000..d2271e2 --- /dev/null +++ b/ledis/event_test.go @@ -0,0 +1,56 @@ +package ledis + +import ( + "reflect" + "testing" +) + +type testEvent struct { + Key []byte + Value []byte +} + +type testEventWriter struct { + evs []testEvent +} + +func (w *testEventWriter) Put(key []byte, value []byte) { + e := testEvent{key, value} + w.evs = append(w.evs, e) +} + +func (w *testEventWriter) Delete(key []byte) { + e := testEvent{key, nil} + w.evs = append(w.evs, e) +} + +func TestEvent(t *testing.T) { + k1 := []byte("k1") + v1 := []byte("v1") + k2 := []byte("k2") + k3 := []byte("k3") + v3 := []byte("v3") + + b := new(eventBatch) + + b.Put(k1, v1) + b.Delete(k2) + b.Put(k3, v3) + + buf := b.Bytes() + + w := &testEventWriter{} + + ev2 := &testEventWriter{ + evs: []testEvent{ + testEvent{k1, v1}, + testEvent{k2, nil}, + testEvent{k3, v3}}, + } + + if err := decodeEventBatch(w, buf); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(w, ev2) { + t.Fatal("not equal") + } +} diff --git a/ledis/ledis.go b/ledis/ledis.go index bb46152..e33ad74 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/siddontang/go-log/log" "github.com/siddontang/ledisdb/config" + "github.com/siddontang/ledisdb/rpl" "github.com/siddontang/ledisdb/store" "sync" "time" @@ -16,15 +17,25 @@ type Ledis struct { dbs [MaxDBNumber]*DB quit chan struct{} - jobs *sync.WaitGroup + wg sync.WaitGroup - binlog *BinLog + r *rpl.Replication + rc chan struct{} + rbatch store.WriteBatch + rwg sync.WaitGroup wLock sync.RWMutex //allow one write at same time commitLock sync.Mutex //allow one write commit at same time + + // for readonly mode, only replication can write + readOnly bool } func Open(cfg *config.Config) (*Ledis, error) { + return Open2(cfg, RDWRMode) +} + +func Open2(cfg *config.Config, flags int) (*Ledis, error) { if len(cfg.DataDir) == 0 { cfg.DataDir = config.DefaultDataDir } @@ -36,39 +47,43 @@ func Open(cfg *config.Config) (*Ledis, error) { l := new(Ledis) + l.readOnly = (flags&ROnlyMode > 0) + l.quit = make(chan struct{}) - l.jobs = new(sync.WaitGroup) l.ldb = ldb - if cfg.UseBinLog { - println("binlog will be refactored later, use your own risk!!!") - l.binlog, err = NewBinLog(cfg) - if err != nil { + if cfg.Replication.Use { + if l.r, err = rpl.NewReplication(cfg); err != nil { return nil, err } + + l.rc = make(chan struct{}) + l.rbatch = l.ldb.NewWriteBatch() + + go l.onReplication() } else { - l.binlog = nil + l.r = nil } for i := uint8(0); i < MaxDBNumber; i++ { l.dbs[i] = l.newDB(i) } - l.activeExpireCycle() + go l.onDataExpired() return l, nil } func (l *Ledis) Close() { close(l.quit) - l.jobs.Wait() + l.wg.Wait() l.ldb.Close() - if l.binlog != nil { - l.binlog.Close() - l.binlog = nil + if l.r != nil { + l.r.Close() + l.r = nil } } @@ -90,34 +105,52 @@ func (l *Ledis) FlushAll() error { return nil } -func (l *Ledis) activeExpireCycle() { +func (l *Ledis) IsReadOnly() bool { + if l.readOnly { + return true + } else if l.r != nil { + if b, _ := l.r.CommitIDBehind(); b { + return true + } + } + return false +} + +func (l *Ledis) SetReadOnly(b bool) { + l.readOnly = b +} + +func (l *Ledis) onDataExpired() { + l.wg.Add(1) + defer l.wg.Done() + var executors []*elimination = make([]*elimination, len(l.dbs)) for i, db := range l.dbs { executors[i] = db.newEliminator() } - l.jobs.Add(1) - go func() { - tick := time.NewTicker(1 * time.Second) - end := false - done := make(chan struct{}) - for !end { - select { - case <-tick.C: - go func() { - for _, eli := range executors { - eli.active() - } - done <- struct{}{} - }() - <-done - case <-l.quit: - end = true + tick := time.NewTicker(1 * time.Second) + defer tick.Stop() + + done := make(chan struct{}) + + for { + select { + case <-tick.C: + if l.IsReadOnly() { break } - } - tick.Stop() - l.jobs.Done() - }() + go func() { + for _, eli := range executors { + eli.active() + } + done <- struct{}{} + }() + <-done + case <-l.quit: + return + } + } + } diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index dd8ff74..70eaf5a 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -64,7 +64,7 @@ func (l *Ledis) newDB(index uint8) *DB { } func (db *DB) newBatch() *batch { - return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}, nil) + return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}) } func (db *DB) Index() int { @@ -109,21 +109,6 @@ func (db *DB) newEliminator() *elimination { return eliminator } -func (db *DB) flushRegion(t *batch, minKey []byte, maxKey []byte) (drop int64, err error) { - it := db.bucket.RangeIterator(minKey, maxKey, store.RangeROpen) - for ; it.Valid(); it.Next() { - t.Delete(it.RawKey()) - drop++ - if drop&1023 == 0 { - if err = t.Commit(); err != nil { - return - } - } - } - it.Close() - return -} - func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) { var deleteFunc func(t *batch, key []byte) int64 var metaDataType byte diff --git a/ledis/ledis_test.go b/ledis/ledis_test.go index d5a5476..45f1c7f 100644 --- a/ledis/ledis_test.go +++ b/ledis/ledis_test.go @@ -14,8 +14,6 @@ func getTestDB() *DB { f := func() { cfg := new(config.Config) cfg.DataDir = "/tmp/test_ledis" - // cfg.BinLog.MaxFileSize = 1073741824 - // cfg.BinLog.MaxFileNum = 3 os.RemoveAll(cfg.DataDir) diff --git a/ledis/multi.go b/ledis/multi.go index a549c2c..0ae4727 100644 --- a/ledis/multi.go +++ b/ledis/multi.go @@ -51,7 +51,7 @@ func (db *DB) Multi() (*Multi, error) { } func (m *Multi) newBatch() *batch { - return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}, nil) + return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}) } func (m *Multi) Close() error { diff --git a/ledis/replication.go b/ledis/replication.go index 804573d..3473e6b 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -1,311 +1,167 @@ package ledis import ( - "bufio" "bytes" "errors" + "fmt" "github.com/siddontang/go-log/log" - "github.com/siddontang/ledisdb/store/driver" + "github.com/siddontang/ledisdb/rpl" "io" - "os" "time" ) const ( - maxReplBatchNum = 100 - maxReplLogSize = 1 * 1024 * 1024 + maxReplLogSize = 1 * 1024 * 1024 ) var ( - ErrSkipEvent = errors.New("skip to next event") + ErrLogMissed = errors.New("log is pured in server") ) -var ( - errInvalidBinLogEvent = errors.New("invalid binglog event") - errInvalidBinLogFile = errors.New("invalid binlog file") -) +func (l *Ledis) handleReplication() { + l.rwg.Add(1) + var rl *rpl.Log + for { + if err := l.r.NextCommitLog(rl); err != nil { + if err != rpl.ErrNoBehindLog { + log.Error("get next commit log err, %s", err.Error) + } else { + l.rwg.Done() + return + } + } else { + l.rbatch.Rollback() + decodeEventBatch(l.rbatch, rl.Data) -type replBatch struct { - wb driver.IWriteBatch - events [][]byte - l *Ledis - - lastHead *BinLogHead -} - -func (b *replBatch) Commit() error { - b.l.commitLock.Lock() - defer b.l.commitLock.Unlock() - - err := b.wb.Commit() - if err != nil { - b.Rollback() - return err - } - - if b.l.binlog != nil { - if err = b.l.binlog.Log(b.events...); err != nil { - b.Rollback() - return err + if err := l.rbatch.Commit(); err != nil { + log.Error("commit log error %s", err.Error()) + } else if err = l.r.UpdateCommitID(rl.ID); err != nil { + log.Error("update commit id error %s", err.Error()) + } } - } - b.events = [][]byte{} - b.lastHead = nil - - return nil -} - -func (b *replBatch) Rollback() error { - b.wb.Rollback() - b.events = [][]byte{} - b.lastHead = nil - return nil -} - -func (l *Ledis) replicateEvent(b *replBatch, event []byte) error { - if len(event) == 0 { - return errInvalidBinLogEvent - } - - b.events = append(b.events, event) - - logType := uint8(event[0]) - switch logType { - case BinLogTypePut: - return l.replicatePutEvent(b, event) - case BinLogTypeDeletion: - return l.replicateDeleteEvent(b, event) - default: - return errInvalidBinLogEvent } } -func (l *Ledis) replicatePutEvent(b *replBatch, event []byte) error { - key, value, err := decodeBinLogPut(event) - if err != nil { - return err +func (l *Ledis) onReplication() { + if l.r == nil { + return } - b.wb.Put(key, value) - - return nil -} - -func (l *Ledis) replicateDeleteEvent(b *replBatch, event []byte) error { - key, err := decodeBinLogDelete(event) - if err != nil { - return err - } - - b.wb.Delete(key) - - return nil -} - -func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error { - head := &BinLogHead{} - var err error - for { - if err = head.Read(rb); err != nil { + select { + case <-l.rc: + l.handleReplication() + case <-time.After(5 * time.Second): + l.handleReplication() + } + } +} + +func (l *Ledis) WaitReplication() error { + l.rwg.Wait() + + return nil +} + +func (l *Ledis) StoreLogsFromReader(rb io.Reader) (uint64, error) { + if l.r == nil { + return 0, fmt.Errorf("replication not enable") + } + + var log *rpl.Log + var n uint64 + + for { + if err := log.Decode(rb); err != nil { if err == io.EOF { break } else { - return err + return 0, err } } - var dataBuf bytes.Buffer - - if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil { - return err + if err := l.r.StoreLog(log); err != nil { + return 0, err } - err = f(head, dataBuf.Bytes()) - if err != nil && err != ErrSkipEvent { - return err - } + n = log.ID } - return nil + select { + case l.rc <- struct{}{}: + default: + break + } + + return n, nil } -func (l *Ledis) ReplicateFromReader(rb io.Reader) error { - b := new(replBatch) - - b.wb = l.ldb.NewWriteBatch() - b.l = l - - f := func(head *BinLogHead, event []byte) error { - if b.lastHead == nil { - b.lastHead = head - } else if !b.lastHead.InSameBatch(head) { - if err := b.Commit(); err != nil { - log.Fatal("replication error %s, skip to next", err.Error()) - return ErrSkipEvent - } - b.lastHead = head - } - - err := l.replicateEvent(b, event) - if err != nil { - log.Fatal("replication error %s, skip to next", err.Error()) - return ErrSkipEvent - } - return nil - } - - err := ReadEventFromReader(rb, f) - if err != nil { - b.Rollback() - return err - } - return b.Commit() -} - -func (l *Ledis) ReplicateFromData(data []byte) error { +func (l *Ledis) StoreLogsFromData(data []byte) (uint64, error) { rb := bytes.NewReader(data) - err := l.ReplicateFromReader(rb) - - return err + return l.StoreLogsFromReader(rb) } -func (l *Ledis) ReplicateFromBinLog(filePath string) error { - f, err := os.Open(filePath) - if err != nil { - return err +func (l *Ledis) ReadLogsTo(startLogID uint64, w io.Writer) (n int, nextLogID uint64, err error) { + if l.r == nil { + // no replication log + nextLogID = 0 + return } - rb := bufio.NewReaderSize(f, 4096) + var firtID, lastID uint64 - err = l.ReplicateFromReader(rb) + firtID, err = l.r.FirstLogID() + if err != nil { + return + } - f.Close() + if startLogID < firtID { + err = ErrLogMissed + return + } - return err + lastID, err = l.r.LastLogID() + if err != nil { + return + } + + var log *rpl.Log + for i := startLogID; i <= lastID; i++ { + if err = l.r.GetLog(i, log); err != nil { + return + } + + if err = log.Encode(w); err != nil { + return + } + + nextLogID = i + 1 + + n += log.Size() + + if n > maxReplLogSize { + break + } + } + + return } // try to read events, if no events read, try to wait the new event singal until timeout seconds -func (l *Ledis) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) { - lastIndex := info.LogFileIndex - lastPos := info.LogPos - - n = 0 - if l.binlog == nil { - //binlog not supported - info.LogFileIndex = 0 - info.LogPos = 0 - return - } - - n, err = l.ReadEventsTo(info, w) - if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos { - //no events read - select { - case <-l.binlog.Wait(): - case <-time.After(time.Duration(timeout) * time.Second): - } - return l.ReadEventsTo(info, w) - } - return -} - -func (l *Ledis) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) { - n = 0 - if l.binlog == nil { - //binlog not supported - info.LogFileIndex = 0 - info.LogPos = 0 - return - } - - index := info.LogFileIndex - offset := info.LogPos - - filePath := l.binlog.FormatLogFilePath(index) - - var f *os.File - f, err = os.Open(filePath) - if os.IsNotExist(err) { - lastIndex := l.binlog.LogFileIndex() - - if index == lastIndex { - //no binlog at all - info.LogPos = 0 - } else { - //slave binlog info had lost - info.LogFileIndex = -1 - } - } - +func (l *Ledis) ReadLogsToTimeout(startLogID uint64, w io.Writer, timeout int) (n int, nextLogID uint64, err error) { + n, nextLogID, err = l.ReadLogsTo(startLogID, w) if err != nil { - if os.IsNotExist(err) { - err = nil - } + return + } else if n == 0 || nextLogID == 0 { return } - - defer f.Close() - - var fileSize int64 - st, _ := f.Stat() - fileSize = st.Size() - - if fileSize == info.LogPos { - return + //no events read + select { + //case <-l.binlog.Wait(): + case <-time.After(time.Duration(timeout) * time.Second): } + return l.ReadLogsTo(startLogID, w) - if _, err = f.Seek(offset, os.SEEK_SET); err != nil { - //may be invliad seek offset - return - } - - var lastHead *BinLogHead = nil - - head := &BinLogHead{} - - batchNum := 0 - - for { - if err = head.Read(f); err != nil { - if err == io.EOF { - //we will try to use next binlog - if index < l.binlog.LogFileIndex() { - info.LogFileIndex += 1 - info.LogPos = 0 - } - err = nil - return - } else { - return - } - - } - - if lastHead == nil { - lastHead = head - batchNum++ - } else if !lastHead.InSameBatch(head) { - lastHead = head - batchNum++ - if batchNum > maxReplBatchNum || n > maxReplLogSize { - return - } - } - - if err = head.Write(w); err != nil { - return - } - - if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil { - return - } - - n += (head.Len() + int(head.PayloadLen)) - info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen) - } - - return } diff --git a/ledis/replication_test.go b/ledis/replication_test.go index 8515229..cc3a392 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -6,7 +6,6 @@ import ( "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/store" "os" - "path" "testing" ) @@ -34,9 +33,7 @@ func TestReplication(t *testing.T) { cfgM := new(config.Config) cfgM.DataDir = "/tmp/test_repl/master" - cfgM.UseBinLog = true - cfgM.BinLog.MaxFileNum = 10 - cfgM.BinLog.MaxFileSize = 50 + cfgM.Replication.Use = true os.RemoveAll(cfgM.DataDir) @@ -47,6 +44,7 @@ func TestReplication(t *testing.T) { cfgS := new(config.Config) cfgS.DataDir = "/tmp/test_repl/slave" + cfgS.Replication.Use = true os.RemoveAll(cfgS.DataDir) @@ -60,16 +58,9 @@ func TestReplication(t *testing.T) { db.Set([]byte("b"), []byte("value")) db.Set([]byte("c"), []byte("value")) - if tx, err := db.Begin(); err == nil { - tx.HSet([]byte("a"), []byte("1"), []byte("value")) - tx.HSet([]byte("b"), []byte("2"), []byte("value")) - tx.HSet([]byte("c"), []byte("3"), []byte("value")) - tx.Commit() - } else { - db.HSet([]byte("a"), []byte("1"), []byte("value")) - db.HSet([]byte("b"), []byte("2"), []byte("value")) - db.HSet([]byte("c"), []byte("3"), []byte("value")) - } + db.HSet([]byte("a"), []byte("1"), []byte("value")) + db.HSet([]byte("b"), []byte("2"), []byte("value")) + db.HSet([]byte("c"), []byte("3"), []byte("value")) m, _ := db.Multi() m.Set([]byte("a1"), []byte("value")) @@ -77,19 +68,6 @@ func TestReplication(t *testing.T) { m.Set([]byte("c1"), []byte("value")) m.Close() - for _, name := range master.binlog.LogNames() { - p := path.Join(master.binlog.LogPath(), name) - - err = slave.ReplicateFromBinLog(p) - if err != nil { - t.Fatal(err) - } - } - - if err = checkLedisEqual(master, slave); err != nil { - t.Fatal(err) - } - slave.FlushAll() db.Set([]byte("a1"), []byte("value")) @@ -100,38 +78,28 @@ func TestReplication(t *testing.T) { db.HSet([]byte("b1"), []byte("2"), []byte("value")) db.HSet([]byte("c1"), []byte("3"), []byte("value")) - if tx, err := db.Begin(); err == nil { - tx.HSet([]byte("a1"), []byte("1"), []byte("value1")) - tx.HSet([]byte("b1"), []byte("2"), []byte("value1")) - tx.HSet([]byte("c1"), []byte("3"), []byte("value1")) - tx.Rollback() - } - - info := new(BinLogAnchor) - info.LogFileIndex = 1 - info.LogPos = 0 var buf bytes.Buffer var n int - + var id uint64 = 1 + var nid uint64 for { buf.Reset() - n, err = master.ReadEventsTo(info, &buf) + n, id, err = master.ReadLogsTo(id, &buf) if err != nil { t.Fatal(err) - } else if info.LogFileIndex == -1 { - t.Fatal("invalid log file index -1") - } else if info.LogFileIndex == 0 { - t.Fatal("invalid log file index 0") - } else { - if err = slave.ReplicateFromReader(&buf); err != nil { + } else if n != 0 { + if nid, err = slave.StoreLogsFromReader(&buf); err != nil { t.Fatal(err) + } else if nid != id { + t.Fatal(nid, id) } - if n == 0 { - break - } + } else if n == 0 { + break } } + slave.WaitReplication() + if err = checkLedisEqual(master, slave); err != nil { t.Fatal(err) } diff --git a/ledis/scan.go b/ledis/scan.go index 09e2b5c..f7fca13 100644 --- a/ledis/scan.go +++ b/ledis/scan.go @@ -24,17 +24,17 @@ func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match s if err = checkKeySize(key); err != nil { return nil, err } - if minKey, err = db.encodeMetaKey(dataType, key); err != nil { + if minKey, err = db.encodeScanKey(dataType, key); err != nil { return nil, err } } else { - if minKey, err = db.encodeMinKey(dataType); err != nil { + if minKey, err = db.encodeScanMinKey(dataType); err != nil { return nil, err } } - if maxKey, err = db.encodeMaxKey(dataType); err != nil { + if maxKey, err = db.encodeScanMaxKey(dataType); err != nil { return nil, err } @@ -54,7 +54,7 @@ func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match s } for i := 0; it.Valid() && i < count && bytes.Compare(it.RawKey(), maxKey) < 0; it.Next() { - if k, err := db.decodeMetaKey(dataType, it.Key()); err != nil { + if k, err := db.decodeScanKey(dataType, it.Key()); err != nil { continue } else if r != nil && !r.Match(k) { continue @@ -67,12 +67,12 @@ func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match s return v, nil } -func (db *DB) encodeMinKey(dataType byte) ([]byte, error) { - return db.encodeMetaKey(dataType, nil) +func (db *DB) encodeScanMinKey(dataType byte) ([]byte, error) { + return db.encodeScanKey(dataType, nil) } -func (db *DB) encodeMaxKey(dataType byte) ([]byte, error) { - k, err := db.encodeMetaKey(dataType, nil) +func (db *DB) encodeScanMaxKey(dataType byte) ([]byte, error) { + k, err := db.encodeScanKey(dataType, nil) if err != nil { return nil, err } @@ -80,7 +80,7 @@ func (db *DB) encodeMaxKey(dataType byte) ([]byte, error) { return k, nil } -func (db *DB) encodeMetaKey(dataType byte, key []byte) ([]byte, error) { +func (db *DB) encodeScanKey(dataType byte, key []byte) ([]byte, error) { switch dataType { case KVType: return db.encodeKVKey(key), nil @@ -98,7 +98,7 @@ func (db *DB) encodeMetaKey(dataType byte, key []byte) ([]byte, error) { return nil, errDataType } } -func (db *DB) decodeMetaKey(dataType byte, ek []byte) ([]byte, error) { +func (db *DB) decodeScanKey(dataType byte, ek []byte) ([]byte, error) { if len(ek) < 2 || ek[0] != db.index || ek[1] != dataType { return nil, errMetaKey } diff --git a/ledis/t_hash.go b/ledis/t_hash.go index 8ee199e..952ddae 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -183,8 +183,6 @@ func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) { return 0, err } - //todo add binlog - err = t.Commit() return n, err } diff --git a/ledis/t_kv.go b/ledis/t_kv.go index 1dd540a..fd13436 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -77,8 +77,6 @@ func (db *DB) incr(key []byte, delta int64) (int64, error) { t.Put(key, StrPutInt64(n)) - //todo binlog - err = t.Commit() return n, err } @@ -244,7 +242,6 @@ func (db *DB) MSet(args ...KVPair) error { t.Put(key, value) - //todo binlog } err = t.Commit() @@ -297,8 +294,6 @@ func (db *DB) SetNX(key []byte, value []byte) (int64, error) { } else { t.Put(key, value) - //todo binlog - err = t.Commit() } diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index 3d12606..2d5e2d5 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -111,22 +111,6 @@ func (db *DB) rmExpire(t *batch, dataType byte, key []byte) (int64, error) { } } -func (db *DB) expFlush(t *batch, dataType byte) (err error) { - minKey := make([]byte, 3) - minKey[0] = db.index - minKey[1] = ExpTimeType - minKey[2] = dataType - - maxKey := make([]byte, 3) - maxKey[0] = db.index - maxKey[1] = ExpMetaType - maxKey[2] = dataType + 1 - - _, err = db.flushRegion(t, minKey, maxKey) - err = t.Commit() - return -} - ////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////// diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 47af6ec..50fc6aa 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -305,7 +305,6 @@ func (db *DB) ZAdd(key []byte, args ...ScorePair) (int64, error) { return 0, err } - //todo add binlog err := t.Commit() return num, err } @@ -862,7 +861,6 @@ func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, agg sk := db.zEncodeSizeKey(destKey) t.Put(sk, PutInt64(num)) - //todo add binlog if err := t.Commit(); err != nil { return 0, err } @@ -930,7 +928,7 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg var num int64 = int64(len(destMap)) sk := db.zEncodeSizeKey(destKey) t.Put(sk, PutInt64(num)) - //todo add binlog + if err := t.Commit(); err != nil { return 0, err } diff --git a/ledis/tx.go b/ledis/tx.go deleted file mode 100644 index 6339bae..0000000 --- a/ledis/tx.go +++ /dev/null @@ -1,112 +0,0 @@ -package ledis - -import ( - "errors" - "fmt" - "github.com/siddontang/ledisdb/store" -) - -var ( - ErrNestTx = errors.New("nest transaction not supported") - ErrTxDone = errors.New("Transaction has already been committed or rolled back") -) - -type Tx struct { - *DB - - tx *store.Tx - - logs [][]byte -} - -func (db *DB) IsTransaction() bool { - return db.status == DBInTransaction -} - -// Begin a transaction, it will block all other write operations before calling Commit or Rollback. -// You must be very careful to prevent long-time transaction. -func (db *DB) Begin() (*Tx, error) { - if db.IsTransaction() { - return nil, ErrNestTx - } - - tx := new(Tx) - - tx.DB = new(DB) - tx.DB.l = db.l - - tx.l.wLock.Lock() - - tx.DB.sdb = db.sdb - - var err error - tx.tx, err = db.sdb.Begin() - if err != nil { - tx.l.wLock.Unlock() - return nil, err - } - - tx.DB.bucket = tx.tx - - tx.DB.status = DBInTransaction - - tx.DB.index = db.index - - tx.DB.kvBatch = tx.newBatch() - tx.DB.listBatch = tx.newBatch() - tx.DB.hashBatch = tx.newBatch() - tx.DB.zsetBatch = tx.newBatch() - tx.DB.binBatch = tx.newBatch() - tx.DB.setBatch = tx.newBatch() - - return tx, nil -} - -func (tx *Tx) Commit() error { - if tx.tx == nil { - return ErrTxDone - } - - tx.l.commitLock.Lock() - err := tx.tx.Commit() - tx.tx = nil - - if len(tx.logs) > 0 { - tx.l.binlog.Log(tx.logs...) - } - - tx.l.commitLock.Unlock() - - tx.l.wLock.Unlock() - - tx.DB.bucket = nil - - return err -} - -func (tx *Tx) Rollback() error { - if tx.tx == nil { - return ErrTxDone - } - - err := tx.tx.Rollback() - tx.tx = nil - - tx.l.wLock.Unlock() - tx.DB.bucket = nil - - return err -} - -func (tx *Tx) newBatch() *batch { - return tx.l.newBatch(tx.tx.NewWriteBatch(), &txBatchLocker{}, tx) -} - -func (tx *Tx) Select(index int) error { - if index < 0 || index >= int(MaxDBNumber) { - return fmt.Errorf("invalid db index %d", index) - } - - tx.DB.index = uint8(index) - return nil -} diff --git a/ledis/tx_test.go b/ledis/tx_test.go deleted file mode 100644 index 026b70d..0000000 --- a/ledis/tx_test.go +++ /dev/null @@ -1,219 +0,0 @@ -package ledis - -import ( - "github.com/siddontang/ledisdb/config" - "os" - "testing" -) - -func testTxRollback(t *testing.T, db *DB) { - var err error - key1 := []byte("tx_key1") - key2 := []byte("tx_key2") - field2 := []byte("tx_field2") - - err = db.Set(key1, []byte("value")) - if err != nil { - t.Fatal(err) - } - - _, err = db.HSet(key2, field2, []byte("value")) - if err != nil { - t.Fatal(err) - } - - var tx *Tx - tx, err = db.Begin() - if err != nil { - t.Fatal(err) - } - - defer tx.Rollback() - - err = tx.Set(key1, []byte("1")) - - if err != nil { - t.Fatal(err) - } - - _, err = tx.HSet(key2, field2, []byte("2")) - - if err != nil { - t.Fatal(err) - } - - _, err = tx.HSet([]byte("no_key"), field2, []byte("2")) - - if err != nil { - t.Fatal(err) - } - - if v, err := tx.Get(key1); err != nil { - t.Fatal(err) - } else if string(v) != "1" { - t.Fatal(string(v)) - } - - if v, err := tx.HGet(key2, field2); err != nil { - t.Fatal(err) - } else if string(v) != "2" { - t.Fatal(string(v)) - } - - err = tx.Rollback() - if err != nil { - t.Fatal(err) - } - - if v, err := db.Get(key1); err != nil { - t.Fatal(err) - } else if string(v) != "value" { - t.Fatal(string(v)) - } - - if v, err := db.HGet(key2, field2); err != nil { - t.Fatal(err) - } else if string(v) != "value" { - t.Fatal(string(v)) - } -} - -func testTxCommit(t *testing.T, db *DB) { - var err error - key1 := []byte("tx_key1") - key2 := []byte("tx_key2") - field2 := []byte("tx_field2") - - err = db.Set(key1, []byte("value")) - if err != nil { - t.Fatal(err) - } - - _, err = db.HSet(key2, field2, []byte("value")) - if err != nil { - t.Fatal(err) - } - - var tx *Tx - tx, err = db.Begin() - if err != nil { - t.Fatal(err) - } - - defer tx.Rollback() - - err = tx.Set(key1, []byte("1")) - - if err != nil { - t.Fatal(err) - } - - _, err = tx.HSet(key2, field2, []byte("2")) - - if err != nil { - t.Fatal(err) - } - - if v, err := tx.Get(key1); err != nil { - t.Fatal(err) - } else if string(v) != "1" { - t.Fatal(string(v)) - } - - if v, err := tx.HGet(key2, field2); err != nil { - t.Fatal(err) - } else if string(v) != "2" { - t.Fatal(string(v)) - } - - err = tx.Commit() - if err != nil { - t.Fatal(err) - } - - if v, err := db.Get(key1); err != nil { - t.Fatal(err) - } else if string(v) != "1" { - t.Fatal(string(v)) - } - - if v, err := db.HGet(key2, field2); err != nil { - t.Fatal(err) - } else if string(v) != "2" { - t.Fatal(string(v)) - } -} - -func testTxSelect(t *testing.T, db *DB) { - tx, err := db.Begin() - if err != nil { - t.Fatal(err) - } - - defer tx.Rollback() - - tx.Set([]byte("tx_select_1"), []byte("a")) - - tx.Select(1) - - tx.Set([]byte("tx_select_2"), []byte("b")) - - if err = tx.Commit(); err != nil { - t.Fatal(err) - } - - if v, err := db.Get([]byte("tx_select_1")); err != nil { - t.Fatal(err) - } else if string(v) != "a" { - t.Fatal(string(v)) - } - - if v, err := db.Get([]byte("tx_select_2")); err != nil { - t.Fatal(err) - } else if v != nil { - t.Fatal("must nil") - } - - db, _ = db.l.Select(1) - - if v, err := db.Get([]byte("tx_select_2")); err != nil { - t.Fatal(err) - } else if string(v) != "b" { - t.Fatal(string(v)) - } - - if v, err := db.Get([]byte("tx_select_1")); err != nil { - t.Fatal(err) - } else if v != nil { - t.Fatal("must nil") - } -} - -func testTx(t *testing.T, name string) { - cfg := new(config.Config) - cfg.DataDir = "/tmp/ledis_test_tx" - - cfg.DBName = name - cfg.LMDB.MapSize = 10 * 1024 * 1024 - - os.RemoveAll(cfg.DataDir) - - l, err := Open(cfg) - if err != nil { - t.Fatal(err) - } - - defer l.Close() - - db, _ := l.Select(0) - - testTxRollback(t, db) - testTxCommit(t, db) - testTxSelect(t, db) -} - -//only lmdb, boltdb support Transaction -func TestTx(t *testing.T) { - testTx(t, "lmdb") - testTx(t, "boltdb") -} diff --git a/ledis/util.go b/ledis/util.go index 770bca1..258c972 100644 --- a/ledis/util.go +++ b/ledis/util.go @@ -43,6 +43,18 @@ func Int64(v []byte, err error) (int64, error) { return int64(binary.LittleEndian.Uint64(v)), nil } +func Uint64(v []byte, err error) (uint64, error) { + if err != nil { + return 0, err + } else if v == nil || len(v) == 0 { + return 0, nil + } else if len(v) != 8 { + return 0, errIntNumber + } + + return binary.LittleEndian.Uint64(v), nil +} + func PutInt64(v int64) []byte { var b []byte pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b)) diff --git a/rpl/file_store.go b/rpl/file_store.go new file mode 100644 index 0000000..51ca293 --- /dev/null +++ b/rpl/file_store.go @@ -0,0 +1,242 @@ +package rpl + +import ( + "fmt" + "github.com/siddontang/go-log/log" + "io/ioutil" + "os" + "path" + "strconv" + "strings" + "sync" +) + +const ( + defaultMaxLogFileSize = 1024 * 1024 * 1024 +) + +/* +index file format: +ledis-bin.00001 +ledis-bin.00002 +ledis-bin.00003 +*/ + +type FileStore struct { + LogStore + + m sync.Mutex + + maxFileSize int + + first uint64 + last uint64 + + logFile *os.File + logNames []string + nextLogIndex int64 + + indexName string + + path string +} + +func NewFileStore(path string) (*FileStore, error) { + s := new(FileStore) + + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + + s.path = path + + s.maxFileSize = defaultMaxLogFileSize + + s.first = 0 + s.last = 0 + + s.logNames = make([]string, 0, 16) + + if err := s.loadIndex(); err != nil { + return nil, err + } + + return s, nil +} + +func (s *FileStore) SetMaxFileSize(size int) { + s.maxFileSize = size +} + +func (s *FileStore) GetLog(id uint64, log *Log) error { + panic("not implementation") + return nil +} + +func (s *FileStore) SeekLog(id uint64, log *Log) error { + panic("not implementation") + return nil +} + +func (s *FileStore) FirstID() (uint64, error) { + panic("not implementation") + return 0, nil +} + +func (s *FileStore) LastID() (uint64, error) { + panic("not implementation") + return 0, nil +} + +func (s *FileStore) StoreLog(log *Log) error { + panic("not implementation") + return nil +} + +func (s *FileStore) StoreLogs(logs []*Log) error { + panic("not implementation") + return nil +} + +func (s *FileStore) Purge(n uint64) error { + panic("not implementation") + return nil +} + +func (s *FileStore) PuregeExpired(n int64) error { + panic("not implementation") + return nil +} + +func (s *FileStore) Clear() error { + panic("not implementation") + return nil +} + +func (s *FileStore) Close() error { + panic("not implementation") + return nil +} + +func (s *FileStore) flushIndex() error { + data := strings.Join(s.logNames, "\n") + + bakName := fmt.Sprintf("%s.bak", s.indexName) + f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + log.Error("create bak index error %s", err.Error()) + return err + } + + if _, err := f.WriteString(data); err != nil { + log.Error("write index error %s", err.Error()) + f.Close() + return err + } + + f.Close() + + if err := os.Rename(bakName, s.indexName); err != nil { + log.Error("rename bak index error %s", err.Error()) + return err + } + + return nil +} + +func (s *FileStore) fileExists(name string) bool { + p := path.Join(s.path, name) + _, err := os.Stat(p) + return !os.IsNotExist(err) +} + +func (s *FileStore) loadIndex() error { + s.indexName = path.Join(s.path, fmt.Sprintf("ledis-bin.index")) + if _, err := os.Stat(s.indexName); os.IsNotExist(err) { + //no index file, nothing to do + } else { + indexData, err := ioutil.ReadFile(s.indexName) + if err != nil { + return err + } + + lines := strings.Split(string(indexData), "\n") + for _, line := range lines { + line = strings.Trim(line, "\r\n ") + if len(line) == 0 { + continue + } + + if s.fileExists(line) { + s.logNames = append(s.logNames, line) + } else { + log.Info("log %s has not exists", line) + } + } + } + + var err error + if len(s.logNames) == 0 { + s.nextLogIndex = 1 + } else { + lastName := s.logNames[len(s.logNames)-1] + + if s.nextLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil { + log.Error("invalid logfile name %s", err.Error()) + return err + } + + //like mysql, if server restart, a new log will create + s.nextLogIndex++ + } + + return nil +} + +func (s *FileStore) openNewLogFile() error { + var err error + lastName := s.formatLogFileName(s.nextLogIndex) + + logPath := path.Join(s.path, lastName) + if s.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644); err != nil { + log.Error("open new logfile error %s", err.Error()) + return err + } + + s.logNames = append(s.logNames, lastName) + + if err = s.flushIndex(); err != nil { + return err + } + + return nil +} + +func (s *FileStore) checkLogFileSize() bool { + if s.logFile == nil { + return false + } + + st, _ := s.logFile.Stat() + if st.Size() >= int64(s.maxFileSize) { + s.closeLog() + return true + } + + return false +} + +func (s *FileStore) closeLog() { + if s.logFile == nil { + return + } + + s.nextLogIndex++ + + s.logFile.Close() + s.logFile = nil +} + +func (s *FileStore) formatLogFileName(index int64) string { + return fmt.Sprintf("ledis-bin.%07d", index) +} diff --git a/rpl/goleveldb_store.go b/rpl/goleveldb_store.go new file mode 100644 index 0000000..f9d2a7e --- /dev/null +++ b/rpl/goleveldb_store.go @@ -0,0 +1,276 @@ +package rpl + +import ( + "bytes" + "fmt" + "github.com/siddontang/go/num" + "github.com/siddontang/ledisdb/config" + "github.com/siddontang/ledisdb/store" + "os" + "sync" + "time" +) + +type GoLevelDBStore struct { + LogStore + + m sync.Mutex + db *store.DB + + cfg *config.Config + + first uint64 + last uint64 +} + +func (s *GoLevelDBStore) FirstID() (uint64, error) { + s.m.Lock() + id, err := s.firstID() + s.m.Unlock() + + return id, err +} + +func (s *GoLevelDBStore) LastID() (uint64, error) { + s.m.Lock() + id, err := s.lastID() + s.m.Unlock() + + return id, err +} + +func (s *GoLevelDBStore) firstID() (uint64, error) { + if s.first != InvalidLogID { + return s.first, nil + } + + it := s.db.NewIterator() + defer it.Close() + + it.SeekToFirst() + + if it.Valid() { + s.first = num.BytesToUint64(it.RawKey()) + } + + return s.first, nil +} + +func (s *GoLevelDBStore) lastID() (uint64, error) { + if s.last != InvalidLogID { + return s.last, nil + } + + it := s.db.NewIterator() + defer it.Close() + + it.SeekToLast() + + if it.Valid() { + s.last = num.BytesToUint64(it.RawKey()) + } + + return s.last, nil +} + +func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error { + v, err := s.db.Get(num.Uint64ToBytes(id)) + if err != nil { + return err + } else if v == nil { + return ErrLogNotFound + } else { + return log.Decode(bytes.NewBuffer(v)) + } +} + +func (s *GoLevelDBStore) SeekLog(id uint64, log *Log) error { + it := s.db.NewIterator() + defer it.Close() + + it.Seek(num.Uint64ToBytes(id)) + + if !it.Valid() { + return ErrLogNotFound + } else { + return log.Decode(bytes.NewBuffer(it.RawValue())) + } +} + +func (s *GoLevelDBStore) StoreLog(log *Log) error { + return s.StoreLogs([]*Log{log}) +} + +func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { + s.m.Lock() + defer s.m.Unlock() + + w := s.db.NewWriteBatch() + defer w.Rollback() + + last, err := s.lastID() + if err != nil { + return err + } + + s.last = InvalidLogID + + var buf bytes.Buffer + for _, log := range logs { + buf.Reset() + + if log.ID <= last { + return ErrLessLogID + } + + last = log.ID + key := num.Uint64ToBytes(log.ID) + + if err := log.Encode(&buf); err != nil { + return err + } + w.Put(key, buf.Bytes()) + } + + if err := w.Commit(); err != nil { + return err + } + + s.last = last + return nil +} + +func (s *GoLevelDBStore) Purge(n uint64) error { + s.m.Lock() + defer s.m.Unlock() + + var first, last uint64 + var err error + + first, err = s.firstID() + if err != nil { + return err + } + + last, err = s.lastID() + if err != nil { + return err + } + + start := first + stop := num.MinUint64(last, first+n) + + w := s.db.NewWriteBatch() + defer w.Rollback() + + s.reset() + + for i := start; i < stop; i++ { + w.Delete(num.Uint64ToBytes(i)) + } + + if err = w.Commit(); err != nil { + return err + } + + return nil +} + +func (s *GoLevelDBStore) PurgeExpired(n int64) error { + if n <= 0 { + return fmt.Errorf("invalid expired time %d", n) + } + + t := uint32(time.Now().Unix() - int64(n)) + + s.m.Lock() + defer s.m.Unlock() + + s.reset() + + it := s.db.NewIterator() + it.SeekToFirst() + + w := s.db.NewWriteBatch() + defer w.Rollback() + + l := new(Log) + for ; it.Valid(); it.Next() { + v := it.RawValue() + + if err := l.Unmarshal(v); err != nil { + return err + } else if l.CreateTime > t { + break + } else { + w.Delete(it.RawKey()) + } + } + + if err := w.Commit(); err != nil { + return err + } + + return nil +} + +func (s *GoLevelDBStore) Clear() error { + s.m.Lock() + defer s.m.Unlock() + + if s.db != nil { + s.db.Close() + } + + s.reset() + os.RemoveAll(s.cfg.DBPath) + + return s.open() +} + +func (s *GoLevelDBStore) reset() { + s.first = InvalidLogID + s.last = InvalidLogID +} + +func (s *GoLevelDBStore) Close() error { + s.m.Lock() + defer s.m.Unlock() + + if s.db == nil { + return nil + } + + err := s.db.Close() + s.db = nil + return err +} + +func (s *GoLevelDBStore) open() error { + var err error + + s.first = InvalidLogID + s.last = InvalidLogID + + s.db, err = store.Open(s.cfg) + return err +} + +func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) { + cfg := new(config.Config) + cfg.DBName = "goleveldb" + cfg.DBPath = base + cfg.LevelDB.BlockSize = 4 * 1024 * 1024 + cfg.LevelDB.CacheSize = 16 * 1024 * 1024 + cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024 + cfg.LevelDB.Compression = false + + s := new(GoLevelDBStore) + s.cfg = cfg + + if err := s.open(); err != nil { + return nil, err + } + + return s, nil +} diff --git a/rpl/log.go b/rpl/log.go new file mode 100644 index 0000000..ad637ca --- /dev/null +++ b/rpl/log.go @@ -0,0 +1,99 @@ +package rpl + +import ( + "bytes" + "encoding/binary" + "io" + "time" +) + +type Log struct { + ID uint64 + CreateTime uint32 + + Data []byte +} + +func NewLog(id uint64, data []byte) *Log { + l := new(Log) + l.ID = id + l.CreateTime = uint32(time.Now().Unix()) + l.Data = data + + return l +} + +func (l *Log) HeadSize() int { + return 16 +} + +func (l *Log) Size() int { + return l.HeadSize() + len(l.Data) +} + +func (l *Log) Marshal() ([]byte, error) { + buf := bytes.NewBuffer(make([]byte, l.HeadSize()+len(l.Data))) + buf.Reset() + + if err := l.Encode(buf); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func (l *Log) Unmarshal(b []byte) error { + buf := bytes.NewBuffer(b) + + return l.Decode(buf) +} + +func (l *Log) Encode(w io.Writer) error { + buf := make([]byte, l.HeadSize()) + + pos := 0 + binary.BigEndian.PutUint64(buf[pos:], l.ID) + pos += 8 + + binary.BigEndian.PutUint32(buf[pos:], l.CreateTime) + pos += 4 + + binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data))) + + if n, err := w.Write(buf); err != nil { + return err + } else if n != len(buf) { + return io.ErrShortWrite + } + + if n, err := w.Write(l.Data); err != nil { + return err + } else if n != len(l.Data) { + return io.ErrShortWrite + } + return nil +} + +func (l *Log) Decode(r io.Reader) error { + buf := make([]byte, l.HeadSize()) + + if _, err := io.ReadFull(r, buf); err != nil { + return err + } + + pos := 0 + l.ID = binary.BigEndian.Uint64(buf[pos:]) + pos += 8 + + l.CreateTime = binary.BigEndian.Uint32(buf[pos:]) + pos += 4 + + length := binary.BigEndian.Uint32(buf[pos:]) + + l.Data = make([]byte, length) + if _, err := io.ReadFull(r, l.Data); err != nil { + return err + } + + return nil +} diff --git a/rpl/log_test.go b/rpl/log_test.go new file mode 100644 index 0000000..7ef008a --- /dev/null +++ b/rpl/log_test.go @@ -0,0 +1,39 @@ +package rpl + +import ( + "bytes" + "reflect" + "testing" +) + +func TestLog(t *testing.T) { + l1 := &Log{ID: 1, CreateTime: 100, Data: []byte("hello world")} + + var buf bytes.Buffer + + if err := l1.Encode(&buf); err != nil { + t.Fatal(err) + } + + l2 := &Log{} + + if err := l2.Decode(&buf); err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(l1, l2) { + t.Fatal("must equal") + } + + if buf, err := l1.Marshal(); err != nil { + t.Fatal(err) + } else { + if err = l2.Unmarshal(buf); err != nil { + t.Fatal(err) + } + } + + if !reflect.DeepEqual(l1, l2) { + t.Fatal("must equal") + } +} diff --git a/rpl/rpl.go b/rpl/rpl.go new file mode 100644 index 0000000..827e825 --- /dev/null +++ b/rpl/rpl.go @@ -0,0 +1,226 @@ +package rpl + +import ( + "encoding/binary" + "fmt" + "github.com/siddontang/go-log/log" + "github.com/siddontang/ledisdb/config" + "os" + "path" + "sync" + "time" +) + +type Replication struct { + m sync.Mutex + + cfg *config.Config + + s LogStore + + commitID uint64 + commitLog *os.File + + quit chan struct{} + + wg sync.WaitGroup +} + +func NewReplication(cfg *config.Config) (*Replication, error) { + if !cfg.Replication.Use { + return nil, fmt.Errorf("replication not enalbed") + } + + if len(cfg.Replication.Path) == 0 { + cfg.Replication.Path = path.Join(cfg.DataDir, "rpl") + } + + base := cfg.Replication.Path + + r := new(Replication) + + r.quit = make(chan struct{}) + + r.cfg = cfg + + var err error + if r.s, err = NewGoLevelDBStore(path.Join(base, "wal")); err != nil { + return nil, err + } + + if r.commitLog, err = os.OpenFile(path.Join(base, "commit.log"), os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, err + } + + if s, _ := r.commitLog.Stat(); s.Size() == 0 { + r.commitID = 0 + } else if err = binary.Read(r.commitLog, binary.BigEndian, &r.commitID); err != nil { + return nil, err + } + + go r.onPurgeExpired() + + return r, nil +} + +func (r *Replication) Close() error { + close(r.quit) + + r.wg.Wait() + + if r.s != nil { + r.s.Close() + r.s = nil + } + + if r.commitLog != nil { + r.commitLog.Close() + r.commitLog = nil + } + + return nil +} + +func (r *Replication) Log(data []byte) (*Log, error) { + r.m.Lock() + defer r.m.Unlock() + + lastID, err := r.s.LastID() + if err != nil { + return nil, err + } + + commitId := r.commitID + if lastID < commitId { + lastID = commitId + } + + l := new(Log) + l.ID = lastID + 1 + l.CreateTime = uint32(time.Now().Unix()) + + l.Data = data + + if err = r.s.StoreLog(l); err != nil { + return nil, err + } + + return l, nil +} + +func (r *Replication) StoreLog(log *Log) error { + return r.StoreLogs([]*Log{log}) +} + +func (r *Replication) StoreLogs(logs []*Log) error { + r.m.Lock() + defer r.m.Unlock() + + return r.s.StoreLogs(logs) +} + +func (r *Replication) FirstLogID() (uint64, error) { + r.m.Lock() + defer r.m.Unlock() + id, err := r.s.FirstID() + return id, err +} + +func (r *Replication) LastLogID() (uint64, error) { + r.m.Lock() + defer r.m.Unlock() + id, err := r.s.LastID() + return id, err +} + +func (r *Replication) NextSyncID() (uint64, error) { + r.m.Lock() + defer r.m.Unlock() + + lastId, err := r.s.LastID() + if err != nil { + return 0, err + } + + if lastId > r.commitID { + return lastId + 1, nil + } else { + return r.commitID + 1, nil + } +} + +func (r *Replication) LastCommitID() (uint64, error) { + r.m.Lock() + id := r.commitID + r.m.Unlock() + return id, nil +} + +func (r *Replication) UpdateCommitID(id uint64) error { + r.m.Lock() + defer r.m.Unlock() + + if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil { + return err + } + + if err := binary.Write(r.commitLog, binary.BigEndian, id); err != nil { + return err + } + + r.commitID = id + + return nil +} + +func (r *Replication) CommitIDBehind() (bool, error) { + r.m.Lock() + defer r.m.Unlock() + + id, err := r.s.LastID() + if err != nil { + return false, err + } + + return id > r.commitID, nil +} + +func (r *Replication) GetLog(id uint64, log *Log) error { + return r.s.GetLog(id, log) +} + +func (r *Replication) NextCommitLog(log *Log) error { + r.m.Lock() + defer r.m.Unlock() + + id, err := r.s.LastID() + if err != nil { + return err + } + + if id <= r.commitID { + return ErrNoBehindLog + } + + return r.s.GetLog(r.commitID+1, log) + +} + +func (r *Replication) onPurgeExpired() { + r.wg.Add(1) + defer r.wg.Done() + + for { + select { + case <-time.After(1 * time.Hour): + n := (r.cfg.Replication.ExpiredLogDays * 24 * 3600) + r.m.Lock() + if err := r.s.PurgeExpired(int64(n)); err != nil { + log.Error("purge expired log error %s", err.Error()) + } + r.m.Unlock() + case <-r.quit: + return + } + } +} diff --git a/rpl/rpl_test.go b/rpl/rpl_test.go new file mode 100644 index 0000000..596f3b2 --- /dev/null +++ b/rpl/rpl_test.go @@ -0,0 +1,45 @@ +package rpl + +import ( + "github.com/siddontang/ledisdb/config" + "io/ioutil" + "os" + "testing" +) + +func TestReplication(t *testing.T) { + dir, err := ioutil.TempDir("", "rpl") + if err != nil { + t.Fatalf("err: %v ", err) + } + defer os.RemoveAll(dir) + + c := new(config.Config) + c.Replication.Use = true + c.Replication.Path = dir + + r, err := NewReplication(c) + if err != nil { + t.Fatal(err) + } + + if l1, err := r.Log([]byte("hello world")); err != nil { + t.Fatal(err) + } else if l1.ID != 1 { + t.Fatal(l1.ID) + } + + if b, _ := r.CommitIDBehind(); !b { + t.Fatal("must backward") + } + + if err := r.UpdateCommitID(1); err != nil { + t.Fatal(err) + } + + if b, _ := r.CommitIDBehind(); b { + t.Fatal("must not backward") + } + + r.Close() +} diff --git a/rpl/store.go b/rpl/store.go new file mode 100644 index 0000000..8d5e8ec --- /dev/null +++ b/rpl/store.go @@ -0,0 +1,40 @@ +package rpl + +import ( + "errors" +) + +const ( + InvalidLogID uint64 = 0 +) + +var ( + ErrLogNotFound = errors.New("log not found") + ErrLessLogID = errors.New("log id is less") + ErrNoBehindLog = errors.New("no behind commit log") +) + +type LogStore interface { + GetLog(id uint64, log *Log) error + + // Get the first log which ID is equal or larger than id + SeekLog(id uint64, log *Log) error + + FirstID() (uint64, error) + LastID() (uint64, error) + + // if log id is less than current last id, return error + StoreLog(log *Log) error + StoreLogs(logs []*Log) error + + // Delete first n logs + Purge(n uint64) error + + // Delete logs before n seconds + PurgeExpired(n int64) error + + // Clear all logs + Clear() error + + Close() error +} diff --git a/rpl/store_test.go b/rpl/store_test.go new file mode 100644 index 0000000..ddb43f0 --- /dev/null +++ b/rpl/store_test.go @@ -0,0 +1,189 @@ +package rpl + +import ( + "io/ioutil" + "os" + "testing" + "time" +) + +func TestGoLevelDBStore(t *testing.T) { + // Create a test dir + dir, err := ioutil.TempDir("", "wal") + if err != nil { + t.Fatalf("err: %v ", err) + } + defer os.RemoveAll(dir) + + // New level + l, err := NewGoLevelDBStore(dir) + if err != nil { + t.Fatalf("err: %v ", err) + } + defer l.Close() + + testLogs(t, l) +} + +func testLogs(t *testing.T, l LogStore) { + // Should be no first index + idx, err := l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + // Should be no last index + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + // Try a filed fetch + var out Log + if err := l.GetLog(10, &out); err.Error() != "log not found" { + t.Fatalf("err: %v ", err) + } + + // Write out a log + log := Log{ + ID: 1, + Data: []byte("first"), + } + for i := 1; i <= 10; i++ { + log.ID = uint64(i) + if err := l.StoreLog(&log); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Attempt to write multiple logs + var logs []*Log + for i := 11; i <= 20; i++ { + nl := &Log{ + ID: uint64(i), + Data: []byte("first"), + } + logs = append(logs, nl) + } + if err := l.StoreLogs(logs); err != nil { + t.Fatalf("err: %v", err) + } + + // Try to fetch + if err := l.GetLog(10, &out); err != nil { + t.Fatalf("err: %v ", err) + } + + // Try to fetch + if err := l.GetLog(20, &out); err != nil { + t.Fatalf("err: %v ", err) + } + + // Check the lowest index + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 1 { + t.Fatalf("bad idx: %d", idx) + } + + // Check the highest index + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 20 { + t.Fatalf("bad idx: %d", idx) + } + + // Delete a suffix + if err := l.Purge(5); err != nil { + t.Fatalf("err: %v ", err) + } + + // Verify they are all deleted + for i := 1; i <= 5; i++ { + if err := l.GetLog(uint64(i), &out); err != ErrLogNotFound { + t.Fatalf("err: %v ", err) + } + } + + // Index should be one + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 6 { + t.Fatalf("bad idx: %d", idx) + } + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 20 { + t.Fatalf("bad idx: %d", idx) + } + + // Should not be able to fetch + if err := l.GetLog(5, &out); err != ErrLogNotFound { + t.Fatalf("err: %v ", err) + } + + if err := l.Clear(); err != nil { + t.Fatal(err) + } + + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + now := uint32(time.Now().Unix()) + logs = []*Log{} + for i := 1; i <= 20; i++ { + nl := &Log{ + ID: uint64(i), + CreateTime: now - 20, + Data: []byte("first"), + } + logs = append(logs, nl) + } + + if err := l.PurgeExpired(1); err != nil { + t.Fatal(err) + } + + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } +} diff --git a/server/client.go b/server/client.go index 27e08b1..83b14f6 100644 --- a/server/client.go +++ b/server/client.go @@ -8,17 +8,6 @@ import ( "time" ) -var txUnsupportedCmds = map[string]struct{}{ - "select": struct{}{}, - "slaveof": struct{}{}, - "fullsync": struct{}{}, - "sync": struct{}{}, - "begin": struct{}{}, - "flushall": struct{}{}, - "flushdb": struct{}{}, - "eval": struct{}{}, -} - var scriptUnsupportedCmds = map[string]struct{}{ "slaveof": struct{}{}, "fullsync": struct{}{}, @@ -62,7 +51,6 @@ type client struct { buf bytes.Buffer - tx *ledis.Tx script *ledis.Multi } @@ -89,11 +77,7 @@ func (c *client) perform() { } else if exeCmd, ok := regCmds[c.cmd]; !ok { err = ErrNotFound } else { - if c.db.IsTransaction() { - if _, ok := txUnsupportedCmds[c.cmd]; ok { - err = fmt.Errorf("%s not supported in transaction", c.cmd) - } - } else if c.db.IsInMulti() { + if c.db.IsInMulti() { if _, ok := scriptUnsupportedCmds[c.cmd]; ok { err = fmt.Errorf("%s not supported in multi", c.cmd) } diff --git a/server/client_http.go b/server/client_http.go index 0db0843..28ce7d1 100644 --- a/server/client_http.go +++ b/server/client_http.go @@ -23,9 +23,6 @@ var httpUnsupportedCommands = map[string]struct{}{ "fullsync": struct{}{}, "sync": struct{}{}, "quit": struct{}{}, - "begin": struct{}{}, - "commit": struct{}{}, - "rollback": struct{}{}, } type httpClient struct { diff --git a/server/cmd_replication.go b/server/cmd_replication.go index ec501f6..5fb9fdd 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -70,14 +70,14 @@ var reserveInfoSpace = make([]byte, 16) func syncCommand(c *client) error { args := c.args - if len(args) != 2 { + if len(args) != 1 { return ErrCmdParams } var logIndex int64 var logPos int64 var err error - logIndex, err = ledis.StrInt64(args[0], nil) + logIndex, err = ledis.Str(args[0], nil) if err != nil { return ErrCmdParams } diff --git a/server/cmd_tx.go b/server/cmd_tx.go deleted file mode 100644 index 19eb5c1..0000000 --- a/server/cmd_tx.go +++ /dev/null @@ -1,57 +0,0 @@ -package server - -import ( - "errors" -) - -var errTxMiss = errors.New("transaction miss") - -func beginCommand(c *client) error { - tx, err := c.db.Begin() - if err == nil { - c.tx = tx - c.db = tx.DB - c.resp.writeStatus(OK) - } - - return err -} - -func commitCommand(c *client) error { - if c.tx == nil { - return errTxMiss - } - - err := c.tx.Commit() - c.db, _ = c.ldb.Select(c.tx.Index()) - c.tx = nil - - if err == nil { - c.resp.writeStatus(OK) - } - - return err -} - -func rollbackCommand(c *client) error { - if c.tx == nil { - return errTxMiss - } - - err := c.tx.Rollback() - - c.db, _ = c.ldb.Select(c.tx.Index()) - c.tx = nil - - if err == nil { - c.resp.writeStatus(OK) - } - - return err -} - -func init() { - register("begin", beginCommand) - register("commit", commitCommand) - register("rollback", rollbackCommand) -} diff --git a/server/command.go b/server/command.go index 458343b..0c66542 100644 --- a/server/command.go +++ b/server/command.go @@ -41,13 +41,7 @@ func selectCommand(c *client) error { if index, err := strconv.Atoi(ledis.String(c.args[0])); err != nil { return err } else { - if c.db.IsTransaction() { - if err := c.tx.Select(index); err != nil { - return err - } else { - c.db = c.tx.DB - } - } else if c.db.IsInMulti() { + if c.db.IsInMulti() { if err := c.script.Select(index); err != nil { return err } else {