refactor, can not run at all

This commit is contained in:
siddontang 2014-09-22 17:50:51 +08:00
commit 22109ed0f1
52 changed files with 1667 additions and 1863 deletions

View File

@ -15,3 +15,5 @@ go get github.com/ugorji/go/codec
go get github.com/BurntSushi/toml
go get github.com/siddontang/go-bson/bson
go get github.com/siddontang/go/num

View File

@ -9,8 +9,7 @@ from ledis.exceptions import (
ConnectionError,
DataError,
LedisError,
ResponseError,
TxNotBeginError
ResponseError
)
SYM_EMPTY = b('')
@ -200,11 +199,6 @@ class Ledis(object):
"Set a custom Response Callback"
self.response_callbacks[command] = callback
def tx(self):
return Transaction(
self.connection_pool,
self.response_callbacks)
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
def execute_command(self, *args, **options):
@ -970,43 +964,3 @@ class Ledis(object):
def scriptflush(self):
return self.execute_command('SCRIPT', 'FLUSH')
class Transaction(Ledis):
def __init__(self, connection_pool, response_callbacks):
self.connection_pool = connection_pool
self.response_callbacks = response_callbacks
self.connection = None
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
command_name = args[0]
connection = self.connection
if self.connection is None:
raise TxNotBeginError
try:
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
except ConnectionError:
connection.disconnect()
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
def begin(self):
self.connection = self.connection_pool.get_connection('begin')
return self.execute_command("BEGIN")
def commit(self):
res = self.execute_command("COMMIT")
self.connection_pool.release(self.connection)
self.connection = None
return res
def rollback(self):
res = self.execute_command("ROLLBACK")
self.connection_pool.release(self.connection)
self.connection = None
return res

View File

@ -35,5 +35,3 @@ class DataError(LedisError):
class ExecAbortError(ResponseError):
pass
class TxNotBeginError(LedisError):
pass

View File

@ -1,46 +0,0 @@
import unittest
import sys
sys.path.append("..")
import ledis
global_l = ledis.Ledis()
#db that do not support transaction
dbs = ["leveldb", "rocksdb", "hyperleveldb", "goleveldb"]
check = global_l.info().get("db_name") in dbs
class TestTx(unittest.TestCase):
def setUp(self):
self.l = ledis.Ledis(port=6380)
def tearDown(self):
self.l.flushdb()
@unittest.skipIf(check, reason="db not support transaction")
def test_commit(self):
tx = self.l.tx()
self.l.set("a", "no-tx")
assert self.l.get("a") == "no-tx"
tx.begin()
tx.set("a", "tx")
assert self.l.get("a") == "no-tx"
assert tx.get("a") == "tx"
tx.commit()
assert self.l.get("a") == "tx"
@unittest.skipIf(check, reason="db not support transaction")
def test_rollback(self):
tx = self.l.tx()
self.l.set("a", "no-tx")
assert self.l.get("a") == "no-tx"
tx.begin()
tx.set("a", "tx")
assert tx.get("a") == "tx"
assert self.l.get("a") == "no-tx"
tx.rollback()
assert self.l.get("a") == "no-tx"

View File

@ -125,10 +125,6 @@ module.exports = [
"spersist",
"sxscan",
"begin",
"rollback",
"commit",
"eval",
"evalsha",
"script",

View File

@ -148,11 +148,6 @@ local commands = {
"flushall",
"flushdb",
-- [[transaction]]
"begin",
"commit",
"rollback",
-- [[script]]
"eval",
"evalsha",

View File

@ -1,96 +0,0 @@
local ledis = require "ledis"
local lds = ledis:new()
lds:set_timeout(1000)
-- connect
local ok, err = lds:connect("127.0.0.1", "6380")
if not ok then
ngx.say("failed to connect:", err)
return
end
lds:del("tx")
-- transaction
ok, err = lds:set("tx", "a")
if not ok then
ngx.say("failed to execute set in tx: ", err)
return
end
ngx.say("SET should be OK <=>", ok)
res, err = lds:get("tx")
if not res then
ngx.say("failed to execute get in tx: ", err)
return
end
ngx.say("GET should be a <=>", res)
ok, err = lds:begin()
if not ok then
ngx.say("failed to run begin: ", err)
return
end
ngx.say("BEGIN should be OK <=>", ok)
ok, err = lds:set("tx", "b")
if not ok then
ngx.say("failed to execute set in tx: ", err)
return
end
ngx.say("SET should be OK <=>", ok)
res, err = lds:get("tx")
if not res then
ngx.say("failed to execute get in tx: ", err)
return
end
ngx.say("GET should be b <=>", res)
ok, err = lds:rollback()
if not ok then
ngx.say("failed to rollback", err)
return
end
ngx.say("ROLLBACK should be OK <=>", ok)
res, err = lds:get("tx")
if not res then
ngx.say("failed to execute get in tx: ", err)
return
end
ngx.say("GET should be a <=>", res)
lds:begin()
lds:set("tx", "c")
lds:commit()
res, err = lds:get("tx")
if not res then
ngx.say("failed to execute get in tx: ", err)
return
end
ngx.say("GET should be c <=>", res)
local ok, err = lds:close()
if not ok then
ngx.say("failed to close: ", err)
return
end
ngx.say("close success")

View File

@ -1,85 +0,0 @@
package main
import (
"bufio"
"flag"
"fmt"
"github.com/siddontang/ledisdb/ledis"
"os"
"time"
)
var TimeFormat = "2006-01-02 15:04:05"
var startDateTime = flag.String("start-datetime", "",
"Start reading the binary log at the first event having a timestamp equal to or later than the datetime argument.")
var stopDateTime = flag.String("stop-datetime", "",
"Stop reading the binary log at the first event having a timestamp equal to or earlier than the datetime argument.")
var startTime uint32 = 0
var stopTime uint32 = 0xFFFFFFFF
func main() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage of %s [options] log_file\n", os.Args[0])
flag.PrintDefaults()
}
flag.Parse()
logFile := flag.Arg(0)
f, err := os.Open(logFile)
if err != nil {
println(err.Error())
return
}
defer f.Close()
var t time.Time
if len(*startDateTime) > 0 {
if t, err = time.Parse(TimeFormat, *startDateTime); err != nil {
println("parse start-datetime error: ", err.Error())
return
}
startTime = uint32(t.Unix())
}
if len(*stopDateTime) > 0 {
if t, err = time.Parse(TimeFormat, *stopDateTime); err != nil {
println("parse stop-datetime error: ", err.Error())
return
}
stopTime = uint32(t.Unix())
}
rb := bufio.NewReaderSize(f, 4096)
err = ledis.ReadEventFromReader(rb, printEvent)
if err != nil {
println("read event error: ", err.Error())
return
}
}
func printEvent(head *ledis.BinLogHead, event []byte) error {
if head.CreateTime < startTime || head.CreateTime > stopTime {
return nil
}
t := time.Unix(int64(head.CreateTime), 0)
fmt.Printf("%s ", t.Format(TimeFormat))
s, err := ledis.FormatBinLogEvent(event)
if err != nil {
fmt.Printf("%s", err.Error())
} else {
fmt.Printf(s)
}
fmt.Printf("\n")
return nil
}

View File

@ -4,7 +4,6 @@ package main
var helpCommands = [][]string{
{"BCOUNT", "key [start end]", "Bitmap"},
{"BDELETE", "key", "ZSet"},
{"BEGIN", "-", "Transaction"},
{"BEXPIRE", "key seconds", "Bitmap"},
{"BEXPIREAT", "key timestamp", "Bitmap"},
{"BGET", "key", "Bitmap"},
@ -15,7 +14,6 @@ var helpCommands = [][]string{
{"BSETBIT", "key offset value", "Bitmap"},
{"BTTL", "key", "Bitmap"},
{"BXSCAN", "key [MATCH match] [COUNT count]", "Bitmap"},
{"COMMIT", "-", "Transaction"},
{"DECR", "key", "KV"},
{"DECRBY", "key decrement", "KV"},
{"DEL", "key [key ...]", "KV"},
@ -67,7 +65,6 @@ var helpCommands = [][]string{
{"MSET", "key value [key value ...]", "KV"},
{"PERSIST", "key", "KV"},
{"PING", "-", "Server"},
{"ROLLBACK", "-", "Transaction"},
{"RPOP", "key", "List"},
{"RPUSH", "key value [value ...]", "List"},
{"SADD", "key member [member ...]", "Set"},

View File

@ -57,18 +57,6 @@ func loadDump(cfg *config.Config, ldb *ledis.Ledis) error {
return err
}
var head *ledis.BinLogAnchor
head, err = ldb.LoadDumpFile(*dumpPath)
if err != nil {
_, err = ldb.LoadDumpFile(*dumpPath)
return err
}
//master enable binlog, here output this like mysql
if head.LogFileIndex != 0 && head.LogPos != 0 {
format := "MASTER_LOG_FILE='binlog.%07d', MASTER_LOG_POS=%d;\n"
fmt.Printf(format, head.LogFileIndex, head.LogPos)
}
return nil
}

View File

@ -16,14 +16,6 @@ const (
DefaultDataDir string = "./var"
)
const (
MaxBinLogFileSize int = 1024 * 1024 * 1024
MaxBinLogFileNum int = 10000
DefaultBinLogFileSize int = MaxBinLogFileSize
DefaultBinLogFileNum int = 10
)
type LevelDBConfig struct {
Compression bool `toml:"compression"`
BlockSize int `toml:"block_size"`
@ -37,9 +29,10 @@ type LMDBConfig struct {
NoSync bool `toml:"nosync"`
}
type BinLogConfig struct {
MaxFileSize int `toml:"max_file_size"`
MaxFileNum int `toml:"max_file_num"`
type ReplicationConfig struct {
Use bool `toml:"use"`
Path string `toml:"path"`
ExpiredLogDays int `toml:"expired_log_days"`
}
type Config struct {
@ -47,6 +40,8 @@ type Config struct {
HttpAddr string `toml:"http_addr"`
SlaveOf string `toml:"slaveof"`
DataDir string `toml:"data_dir"`
DBName string `toml:"db_name"`
@ -57,12 +52,9 @@ type Config struct {
LMDB LMDBConfig `toml:"lmdb"`
UseBinLog bool `toml:"use_binlog"`
BinLog BinLogConfig `toml:"binlog"`
SlaveOf string `toml:"slaveof"`
AccessLog string `toml:"access_log"`
Replication ReplicationConfig `toml:"replication"`
}
func NewConfigWithFile(fileName string) (*Config, error) {
@ -95,11 +87,6 @@ func NewConfigDefault() *Config {
cfg.DBName = DefaultDBName
// disable binlog
cfg.BinLog.MaxFileNum = 0
cfg.BinLog.MaxFileSize = 0
// disable replication
cfg.SlaveOf = ""
// disable access log
@ -128,17 +115,3 @@ func (cfg *LevelDBConfig) Adjust() {
cfg.MaxOpenFiles = 1024
}
}
func (cfg *BinLogConfig) Adjust() {
if cfg.MaxFileSize <= 0 {
cfg.MaxFileSize = DefaultBinLogFileSize
} else if cfg.MaxFileSize > MaxBinLogFileSize {
cfg.MaxFileSize = MaxBinLogFileSize
}
if cfg.MaxFileNum <= 0 {
cfg.MaxFileNum = DefaultBinLogFileNum
} else if cfg.MaxFileNum > MaxBinLogFileNum {
cfg.MaxFileNum = MaxBinLogFileNum
}
}

View File

@ -30,9 +30,6 @@ db_name = "leveldb"
# If not set, use data_dir/"db_name"_data
db_path = ""
# enable binlog or not
use_binlog = true
[leveldb]
compression = false
block_size = 32768
@ -44,8 +41,15 @@ max_open_files = 1024
map_size = 524288000
nosync = true
[binlog]
max_file_size = 0
max_file_num = 0
[replication]
# enable replication or not
use = true
# Path to store replication information(write ahead log, commit log, etc.)
# if not set, use data_dir/rpl
path = ""
# Expire write ahead logs after the given days
expired_log_days = 7

View File

@ -11,7 +11,6 @@ func TestConfig(t *testing.T) {
dstCfg.HttpAddr = "127.0.0.1:11181"
dstCfg.DataDir = "/tmp/ledis_server"
dstCfg.DBName = "leveldb"
dstCfg.UseBinLog = true
dstCfg.LevelDB.Compression = false
dstCfg.LevelDB.BlockSize = 32768
@ -21,6 +20,9 @@ func TestConfig(t *testing.T) {
dstCfg.LMDB.MapSize = 524288000
dstCfg.LMDB.NoSync = true
dstCfg.Replication.Use = true
dstCfg.Replication.ExpiredLogDays = 7
cfg, err := NewConfigWithFile("./config.toml")
if err != nil {
t.Fatal(err)

View File

@ -35,15 +35,6 @@ The same for Del.
ZSet only support int64 score, not double in Redis.
## Transaction
LedisDB supports ACID transaction using LMDB or BoltDB, maybe later it will support `multi`, `exec`, `discard`.
Transaction API:
+ `begin`
+ `commit`
+ `rollback`
## Scan

View File

@ -512,22 +512,6 @@
"readonly": false
},
"BEGIN": {
"arguments": "-",
"group": "Transaction",
"readonly": false
},
"COMMIT": {
"arguments": "-",
"group": "Transaction",
"readonly": false
},
"ROLLBACK": {
"arguments": "-",
"group": "Transaction",
"readonly": false
},
"XSCAN": {
"arguments": "key [MATCH match] [COUNT count]",
"group": "KV",

View File

@ -121,7 +121,7 @@ Table of Contents
- [Replication](#replication)
- [SLAVEOF host port](#slaveof-host-port)
- [FULLSYNC](#fullsync)
- [SYNC index offset](#sync-index-offset)
- [SYNC logid](#sync-logid)
- [Server](#server)
- [PING](#ping)
- [ECHO message](#echo-message)
@ -129,10 +129,6 @@ Table of Contents
- [FLUSHALL](#flushall)
- [FLUSHDB](#flushdb)
- [INFO [section]](#info-section)
- [Transaction](#transaction)
- [BEGIN](#begin)
- [ROLLBACK](#rollback)
- [COMMIT](#commit)
- [Script](#script)
- [EVAL script numkeys key [key ...] arg [arg ...]](#eval-script-numkeys-key-key--arg-arg-)
- [EVALSHA sha1 numkeys key [key ...] arg [arg ...]](#evalsha-sha1-numkeys-key-key--arg-arg-)
@ -2416,9 +2412,9 @@ FULLSYNC will first try to sync all data from the master, save in local disk, th
**Examples**
### SYNC index offset
### SYNC logid
Inner command, syncs the new changed from master set by SLAVEOF at offset in binlog.index file.
Inner command, syncs the new changed from master set by SLAVEOF with logid.
**Return value**
@ -2502,69 +2498,6 @@ The optional parameter can be used to select a specific section of information:
When no parameter is provided, all will return.
## Transaction
### BEGIN
Marks the start of a transaction block. Subsequent commands will be in a transaction context util using COMMIT or ROLLBACK.
You must known that `BEGIN` will block any other write operators before you `COMMIT` or `ROLLBACK`. Don't use long-time transaction.
**Return value**
Returns `OK` if the backend store engine in use supports transaction, otherwise, returns `Err`.
**Examples**
```
ledis> BEGIN
OK
ledis> SET HELLO WORLD
OK
ledis> COMMIT
OK
```
### ROLLBACK
Discards all the changes of previously commands in a transaction and restores the connection state to normal.
**Return value**
Returns `OK` if in a transaction context, otherwise, `Err`
**Examples**
```
ledis> BEGIN
OK
ledis> SET HELLO WORLD
OK
ledis> GET HELLO
"WORLD"
ledis> ROLLBACK
OK
ledis> GET HELLO
(nil)
```
### COMMIT
Persists the changes of all the commands in a transaction and restores the connection state to normal.
**Return value**
Returns `OK` if in a transaction context, otherwise, `Err`
**Examples**
```
ledis> BEGIN
OK
ledis> SET HELLO WORLD
OK
ledis> GET HELLO
"WORLD"
ledis> COMMIT
OK
ledis> GET HELLO
"WORLD"
```
## Script

View File

@ -29,6 +29,9 @@ slaveof = ""
#
db_name = "leveldb"
# if not set, use data_dir/"db_name"_data
db_path = ""
[leveldb]
compression = false
block_size = 32768
@ -40,9 +43,8 @@ max_open_files = 1024
map_size = 524288000
nosync = true
[binlog]
# Set either size or num to 0 to disable binlog
max_file_size = 0
max_file_num = 0
[wal]
# if not set, use data_dir/wal
path = ""

View File

@ -1,6 +1,8 @@
package ledis
import (
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/rpl"
"github.com/siddontang/ledisdb/store"
"sync"
)
@ -12,56 +14,65 @@ type batch struct {
sync.Locker
logs [][]byte
tx *Tx
eb *eventBatch
}
func (b *batch) Commit() error {
if b.l.IsReadOnly() {
return ErrWriteInROnly
}
b.l.commitLock.Lock()
defer b.l.commitLock.Unlock()
err := b.WriteBatch.Commit()
if b.l.binlog != nil {
if err == nil {
if b.tx == nil {
b.l.binlog.Log(b.logs...)
} else {
b.tx.logs = append(b.tx.logs, b.logs...)
}
}
b.logs = [][]byte{}
}
var err error
if b.l.r != nil {
var l *rpl.Log
if l, err = b.l.r.Log(b.eb.Bytes()); err != nil {
log.Fatal("write wal error %s", err.Error())
return err
}
if err = b.WriteBatch.Commit(); err != nil {
log.Fatal("commit error %s", err.Error())
return err
}
if err = b.l.r.UpdateCommitID(l.ID); err != nil {
log.Fatal("update commit id error %s", err.Error())
return err
}
return nil
} else {
return b.WriteBatch.Commit()
}
}
func (b *batch) Lock() {
b.Locker.Lock()
}
func (b *batch) Unlock() {
if b.l.binlog != nil {
b.logs = [][]byte{}
}
b.eb.Reset()
b.WriteBatch.Rollback()
b.Locker.Unlock()
}
func (b *batch) Put(key []byte, value []byte) {
if b.l.binlog != nil {
buf := encodeBinLogPut(key, value)
b.logs = append(b.logs, buf)
if b.l.r != nil {
b.eb.Put(key, value)
}
b.WriteBatch.Put(key, value)
}
func (b *batch) Delete(key []byte) {
if b.l.binlog != nil {
buf := encodeBinLogDelete(key)
b.logs = append(b.logs, buf)
if b.l.r != nil {
b.Delete(key)
}
b.WriteBatch.Delete(key)
}
@ -80,26 +91,20 @@ func (l *dbBatchLocker) Unlock() {
l.wrLock.RUnlock()
}
type txBatchLocker struct {
}
func (l *txBatchLocker) Lock() {}
func (l *txBatchLocker) Unlock() {}
type multiBatchLocker struct {
}
func (l *multiBatchLocker) Lock() {}
func (l *multiBatchLocker) Unlock() {}
func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch {
func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker) *batch {
b := new(batch)
b.l = l
b.WriteBatch = wb
b.tx = tx
b.Locker = locker
b.logs = [][]byte{}
b.eb = new(eventBatch)
return b
}

View File

@ -1,400 +0,0 @@
package ledis
import (
"bufio"
"encoding/binary"
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/config"
"io"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
)
type BinLogHead struct {
CreateTime uint32
BatchId uint32
PayloadLen uint32
}
func (h *BinLogHead) Len() int {
return 12
}
func (h *BinLogHead) Write(w io.Writer) error {
if err := binary.Write(w, binary.BigEndian, h.CreateTime); err != nil {
return err
}
if err := binary.Write(w, binary.BigEndian, h.BatchId); err != nil {
return err
}
if err := binary.Write(w, binary.BigEndian, h.PayloadLen); err != nil {
return err
}
return nil
}
func (h *BinLogHead) handleReadError(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
} else {
return err
}
}
func (h *BinLogHead) Read(r io.Reader) error {
var err error
if err = binary.Read(r, binary.BigEndian, &h.CreateTime); err != nil {
return err
}
if err = binary.Read(r, binary.BigEndian, &h.BatchId); err != nil {
return h.handleReadError(err)
}
if err = binary.Read(r, binary.BigEndian, &h.PayloadLen); err != nil {
return h.handleReadError(err)
}
return nil
}
func (h *BinLogHead) InSameBatch(ho *BinLogHead) bool {
if h.CreateTime == ho.CreateTime && h.BatchId == ho.BatchId {
return true
} else {
return false
}
}
/*
index file format:
ledis-bin.00001
ledis-bin.00002
ledis-bin.00003
log file format
Log: Head|PayloadData
Head: createTime|batchId|payloadData
*/
type BinLog struct {
sync.Mutex
path string
cfg *config.BinLogConfig
logFile *os.File
logWb *bufio.Writer
indexName string
logNames []string
lastLogIndex int64
batchId uint32
ch chan struct{}
}
func NewBinLog(cfg *config.Config) (*BinLog, error) {
l := new(BinLog)
l.cfg = &cfg.BinLog
l.cfg.Adjust()
l.path = path.Join(cfg.DataDir, "binlog")
if err := os.MkdirAll(l.path, 0755); err != nil {
return nil, err
}
l.logNames = make([]string, 0, 16)
l.ch = make(chan struct{})
if err := l.loadIndex(); err != nil {
return nil, err
}
return l, nil
}
func (l *BinLog) flushIndex() error {
data := strings.Join(l.logNames, "\n")
bakName := fmt.Sprintf("%s.bak", l.indexName)
f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
log.Error("create binlog bak index error %s", err.Error())
return err
}
if _, err := f.WriteString(data); err != nil {
log.Error("write binlog index error %s", err.Error())
f.Close()
return err
}
f.Close()
if err := os.Rename(bakName, l.indexName); err != nil {
log.Error("rename binlog bak index error %s", err.Error())
return err
}
return nil
}
func (l *BinLog) loadIndex() error {
l.indexName = path.Join(l.path, fmt.Sprintf("ledis-bin.index"))
if _, err := os.Stat(l.indexName); os.IsNotExist(err) {
//no index file, nothing to do
} else {
indexData, err := ioutil.ReadFile(l.indexName)
if err != nil {
return err
}
lines := strings.Split(string(indexData), "\n")
for _, line := range lines {
line = strings.Trim(line, "\r\n ")
if len(line) == 0 {
continue
}
if _, err := os.Stat(path.Join(l.path, line)); err != nil {
log.Error("load index line %s error %s", line, err.Error())
return err
} else {
l.logNames = append(l.logNames, line)
}
}
}
if l.cfg.MaxFileNum > 0 && len(l.logNames) > l.cfg.MaxFileNum {
//remove oldest logfile
if err := l.Purge(len(l.logNames) - l.cfg.MaxFileNum); err != nil {
return err
}
}
var err error
if len(l.logNames) == 0 {
l.lastLogIndex = 1
} else {
lastName := l.logNames[len(l.logNames)-1]
if l.lastLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil {
log.Error("invalid logfile name %s", err.Error())
return err
}
//like mysql, if server restart, a new binlog will create
l.lastLogIndex++
}
return nil
}
func (l *BinLog) getLogFile() string {
return l.FormatLogFileName(l.lastLogIndex)
}
func (l *BinLog) openNewLogFile() error {
var err error
lastName := l.getLogFile()
logPath := path.Join(l.path, lastName)
if l.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644); err != nil {
log.Error("open new logfile error %s", err.Error())
return err
}
if l.cfg.MaxFileNum > 0 && len(l.logNames) == l.cfg.MaxFileNum {
l.purge(1)
}
l.logNames = append(l.logNames, lastName)
if l.logWb == nil {
l.logWb = bufio.NewWriterSize(l.logFile, 1024)
} else {
l.logWb.Reset(l.logFile)
}
if err = l.flushIndex(); err != nil {
return err
}
return nil
}
func (l *BinLog) checkLogFileSize() bool {
if l.logFile == nil {
return false
}
st, _ := l.logFile.Stat()
if st.Size() >= int64(l.cfg.MaxFileSize) {
l.closeLog()
return true
}
return false
}
func (l *BinLog) closeLog() {
if l.logFile == nil {
return
}
l.lastLogIndex++
l.logFile.Close()
l.logFile = nil
}
func (l *BinLog) purge(n int) {
if len(l.logNames) < n {
n = len(l.logNames)
}
for i := 0; i < n; i++ {
logPath := path.Join(l.path, l.logNames[i])
os.Remove(logPath)
}
copy(l.logNames[0:], l.logNames[n:])
l.logNames = l.logNames[0 : len(l.logNames)-n]
}
func (l *BinLog) Close() {
if l.logFile != nil {
l.logFile.Close()
l.logFile = nil
}
}
func (l *BinLog) LogNames() []string {
return l.logNames
}
func (l *BinLog) LogFileName() string {
return l.getLogFile()
}
func (l *BinLog) LogFilePos() int64 {
if l.logFile == nil {
return 0
} else {
st, _ := l.logFile.Stat()
return st.Size()
}
}
func (l *BinLog) LogFileIndex() int64 {
return l.lastLogIndex
}
func (l *BinLog) FormatLogFileName(index int64) string {
return fmt.Sprintf("ledis-bin.%07d", index)
}
func (l *BinLog) FormatLogFilePath(index int64) string {
return path.Join(l.path, l.FormatLogFileName(index))
}
func (l *BinLog) LogPath() string {
return l.path
}
func (l *BinLog) Purge(n int) error {
l.Lock()
defer l.Unlock()
if len(l.logNames) == 0 {
return nil
}
if n >= len(l.logNames) {
n = len(l.logNames)
//can not purge current log file
if l.logNames[n-1] == l.getLogFile() {
n = n - 1
}
}
l.purge(n)
return l.flushIndex()
}
func (l *BinLog) PurgeAll() error {
l.Lock()
defer l.Unlock()
l.closeLog()
l.purge(len(l.logNames))
return l.openNewLogFile()
}
func (l *BinLog) Log(args ...[]byte) error {
l.Lock()
defer l.Unlock()
var err error
if l.logFile == nil {
if err = l.openNewLogFile(); err != nil {
return err
}
}
head := &BinLogHead{}
head.CreateTime = uint32(time.Now().Unix())
head.BatchId = l.batchId
l.batchId++
for _, data := range args {
head.PayloadLen = uint32(len(data))
if err := head.Write(l.logWb); err != nil {
return err
}
if _, err := l.logWb.Write(data); err != nil {
return err
}
}
if err = l.logWb.Flush(); err != nil {
log.Error("write log error %s", err.Error())
return err
}
l.checkLogFileSize()
close(l.ch)
l.ch = make(chan struct{})
return nil
}
func (l *BinLog) Wait() <-chan struct{} {
return l.ch
}

View File

@ -1,49 +0,0 @@
package ledis
import (
"github.com/siddontang/ledisdb/config"
"io/ioutil"
"os"
"testing"
)
func TestBinLog(t *testing.T) {
cfg := new(config.Config)
cfg.BinLog.MaxFileNum = 1
cfg.BinLog.MaxFileSize = 1024
cfg.DataDir = "/tmp/ledis_binlog"
os.RemoveAll(cfg.DataDir)
b, err := NewBinLog(cfg)
if err != nil {
t.Fatal(err)
}
if err := b.Log(make([]byte, 1024)); err != nil {
t.Fatal(err)
}
if err := b.Log(make([]byte, 1024)); err != nil {
t.Fatal(err)
}
if fs, err := ioutil.ReadDir(b.LogPath()); err != nil {
t.Fatal(err)
} else if len(fs) != 2 {
t.Fatal(len(fs))
}
if err := b.PurgeAll(); err != nil {
t.Fatal(err)
}
if fs, err := ioutil.ReadDir(b.LogPath()); err != nil {
t.Fatal(err)
} else if len(fs) != 2 {
t.Fatal(len(fs))
} else if b.LogFilePos() != 0 {
t.Fatal(b.LogFilePos())
}
}

View File

@ -23,6 +23,8 @@ const (
ExpTimeType byte = 101
ExpMetaType byte = 102
MetaType byte = 201
)
var (
@ -44,6 +46,11 @@ var (
}
)
const (
RDWRMode = 0
ROnlyMode = 1
)
const (
defaultScanCount int = 10
)
@ -79,16 +86,10 @@ const (
var (
ErrScoreMiss = errors.New("zset score miss")
)
const (
BinLogTypeDeletion uint8 = 0x0
BinLogTypePut uint8 = 0x1
BinLogTypeCommand uint8 = 0x2
ErrWriteInROnly = errors.New("write in readonly mode")
)
const (
DBAutoCommit uint8 = 0x0
DBInTransaction uint8 = 0x1
DBInMulti uint8 = 0x2
)

View File

@ -2,7 +2,7 @@
//
// Ledis supports various data structure like kv, list, hash and zset like redis.
//
// Other features include binlog replication, data with a limited time-to-live.
// Other features include replication, data with a limited time-to-live.
//
// Usage
//
@ -54,8 +54,5 @@
// n, err := db.ZAdd(key, ScorePair{score1, member1}, ScorePair{score2, member2})
// ay, err := db.ZRangeByScore(key, minScore, maxScore, 0, -1)
//
// Binlog
//
// ledis supports binlog, so you can sync binlog to another server for replication. If you want to open binlog support, set UseBinLog to true in config.
//
package ledis

View File

@ -4,41 +4,27 @@ import (
"bufio"
"bytes"
"encoding/binary"
"github.com/siddontang/go-log/log"
"github.com/siddontang/go-snappy/snappy"
"github.com/siddontang/ledisdb/store"
"io"
"os"
)
//dump format
// fileIndex(bigendian int64)|filePos(bigendian int64)
// |keylen(bigendian int32)|key|valuelen(bigendian int32)|value......
//
//key and value are both compressed for fast transfer dump on network using snappy
type BinLogAnchor struct {
LogFileIndex int64
LogPos int64
type DumpHead struct {
CommitID uint64
}
func (m *BinLogAnchor) WriteTo(w io.Writer) error {
if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil {
func (h *DumpHead) Read(r io.Reader) error {
if err := binary.Read(r, binary.BigEndian, &h.CommitID); err != nil {
return err
}
if err := binary.Write(w, binary.BigEndian, m.LogPos); err != nil {
return err
}
return nil
}
func (m *BinLogAnchor) ReadFrom(r io.Reader) error {
err := binary.Read(r, binary.BigEndian, &m.LogFileIndex)
if err != nil {
return err
}
err = binary.Read(r, binary.BigEndian, &m.LogPos)
if err != nil {
func (h *DumpHead) Write(w io.Writer) error {
if err := binary.Write(w, binary.BigEndian, h.CommitID); err != nil {
return err
}
@ -56,24 +42,35 @@ func (l *Ledis) DumpFile(path string) error {
}
func (l *Ledis) Dump(w io.Writer) error {
m := new(BinLogAnchor)
var err error
var commitID uint64
var snap *store.Snapshot
{
l.wLock.Lock()
defer l.wLock.Unlock()
if l.binlog != nil {
m.LogFileIndex = l.binlog.LogFileIndex()
m.LogPos = l.binlog.LogFilePos()
if l.r != nil {
if commitID, err = l.r.LastCommitID(); err != nil {
return err
}
}
if snap, err = l.ldb.NewSnapshot(); err != nil {
return err
}
}
wb := bufio.NewWriterSize(w, 4096)
if err = m.WriteTo(wb); err != nil {
h := &DumpHead{commitID}
if err = h.Write(wb); err != nil {
return err
}
it := l.ldb.NewIterator()
it := snap.NewIterator()
it.SeekToFirst()
compressBuf := make([]byte, 4096)
@ -118,7 +115,8 @@ func (l *Ledis) Dump(w io.Writer) error {
return nil
}
func (l *Ledis) LoadDumpFile(path string) (*BinLogAnchor, error) {
// clear all data and load dump file to db
func (l *Ledis) LoadDumpFile(path string) (*DumpHead, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
@ -128,16 +126,42 @@ func (l *Ledis) LoadDumpFile(path string) (*BinLogAnchor, error) {
return l.LoadDump(f)
}
func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) {
func (l *Ledis) clearAllWhenLoad() error {
it := l.ldb.NewIterator()
defer it.Close()
w := l.ldb.NewWriteBatch()
defer w.Rollback()
n := 0
for ; it.Valid(); it.Next() {
n++
if n == 10000 {
w.Commit()
n = 0
}
w.Delete(it.RawKey())
}
return w.Commit()
}
// clear all data and load dump file to db
func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) {
l.wLock.Lock()
defer l.wLock.Unlock()
info := new(BinLogAnchor)
var err error
if err = l.clearAllWhenLoad(); err != nil {
log.Fatal("clear all error when loaddump, err :%s", err.Error())
return nil, err
}
rb := bufio.NewReaderSize(r, 4096)
err := info.ReadFrom(rb)
if err != nil {
h := new(DumpHead)
if err = h.Read(rb); err != nil {
return nil, err
}
@ -190,10 +214,11 @@ func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) {
deKeyBuf = nil
deValueBuf = nil
//if binlog enable, we will delete all binlogs and open a new one for handling simply
if l.binlog != nil {
l.binlog.PurgeAll()
if l.r != nil {
if err := l.r.UpdateCommitID(h.CommitID); err != nil {
return nil, err
}
}
return info, nil
return h, nil
}

View File

@ -1,97 +1,104 @@
package ledis
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"strconv"
)
var (
errBinLogDeleteType = errors.New("invalid bin log delete type")
errBinLogPutType = errors.New("invalid bin log put type")
errBinLogCommandType = errors.New("invalid bin log command type")
const (
kTypeDeleteEvent uint8 = 0
kTypePutEvent uint8 = 1
)
func encodeBinLogDelete(key []byte) []byte {
buf := make([]byte, 1+len(key))
buf[0] = BinLogTypeDeletion
copy(buf[1:], key)
return buf
var (
errInvalidPutEvent = errors.New("invalid put event")
errInvalidDeleteEvent = errors.New("invalid delete event")
errInvalidEvent = errors.New("invalid event")
)
type eventBatch struct {
bytes.Buffer
}
func decodeBinLogDelete(sz []byte) ([]byte, error) {
if len(sz) < 1 || sz[0] != BinLogTypeDeletion {
return nil, errBinLogDeleteType
func (b *eventBatch) Put(key []byte, value []byte) {
l := uint32(len(key) + len(value) + 1 + 2)
binary.Write(b, binary.BigEndian, l)
b.WriteByte(kTypePutEvent)
keyLen := uint16(len(key))
binary.Write(b, binary.BigEndian, keyLen)
b.Write(key)
b.Write(value)
}
return sz[1:], nil
func (b *eventBatch) Delete(key []byte) {
l := uint32(len(key) + 1)
binary.Write(b, binary.BigEndian, l)
b.WriteByte(kTypeDeleteEvent)
b.Write(key)
}
func encodeBinLogPut(key []byte, value []byte) []byte {
buf := make([]byte, 3+len(key)+len(value))
buf[0] = BinLogTypePut
pos := 1
binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
pos += 2
copy(buf[pos:], key)
pos += len(key)
copy(buf[pos:], value)
return buf
type eventWriter interface {
Put(key []byte, value []byte)
Delete(key []byte)
}
func decodeBinLogPut(sz []byte) ([]byte, []byte, error) {
if len(sz) < 3 || sz[0] != BinLogTypePut {
return nil, nil, errBinLogPutType
func decodeEventBatch(w eventWriter, data []byte) error {
for {
if len(data) == 0 {
return nil
}
keyLen := int(binary.BigEndian.Uint16(sz[1:]))
if 3+keyLen > len(sz) {
return nil, nil, errBinLogPutType
if len(data) < 4 {
return io.ErrUnexpectedEOF
}
return sz[3 : 3+keyLen], sz[3+keyLen:], nil
l := binary.BigEndian.Uint32(data)
data = data[4:]
if uint32(len(data)) < l {
return io.ErrUnexpectedEOF
}
func FormatBinLogEvent(event []byte) (string, error) {
logType := uint8(event[0])
if err := decodeEvent(w, data[0:l]); err != nil {
return err
}
data = data[l:]
}
}
var err error
var k []byte
var v []byte
func decodeEvent(w eventWriter, b []byte) error {
if len(b) == 0 {
return errInvalidEvent
}
var buf []byte = make([]byte, 0, 1024)
switch b[0] {
case kTypePutEvent:
if len(b[1:]) < 2 {
return errInvalidPutEvent
}
switch logType {
case BinLogTypePut:
k, v, err = decodeBinLogPut(event)
buf = append(buf, "PUT "...)
case BinLogTypeDeletion:
k, err = decodeBinLogDelete(event)
buf = append(buf, "DELETE "...)
keyLen := binary.BigEndian.Uint16(b[1:3])
b = b[3:]
if len(b) < int(keyLen) {
return errInvalidPutEvent
}
w.Put(b[0:keyLen], b[keyLen:])
case kTypeDeleteEvent:
w.Delete(b[1:])
default:
err = errInvalidBinLogEvent
return errInvalidEvent
}
if err != nil {
return "", err
return nil
}
if buf, err = formatDataKey(buf, k); err != nil {
return "", err
}
if v != nil && len(v) != 0 {
buf = append(buf, fmt.Sprintf(" %q", v)...)
}
return String(buf), nil
}
func formatDataKey(buf []byte, k []byte) ([]byte, error) {
func formatEventKey(buf []byte, k []byte) ([]byte, error) {
if len(k) < 2 {
return nil, errInvalidBinLogEvent
return nil, errInvalidEvent
}
buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...)
@ -208,7 +215,7 @@ func formatDataKey(buf []byte, k []byte) ([]byte, error) {
buf = strconv.AppendQuote(buf, String(key))
}
default:
return nil, errInvalidBinLogEvent
return nil, errInvalidEvent
}
return buf, nil

56
ledis/event_test.go Normal file
View File

@ -0,0 +1,56 @@
package ledis
import (
"reflect"
"testing"
)
type testEvent struct {
Key []byte
Value []byte
}
type testEventWriter struct {
evs []testEvent
}
func (w *testEventWriter) Put(key []byte, value []byte) {
e := testEvent{key, value}
w.evs = append(w.evs, e)
}
func (w *testEventWriter) Delete(key []byte) {
e := testEvent{key, nil}
w.evs = append(w.evs, e)
}
func TestEvent(t *testing.T) {
k1 := []byte("k1")
v1 := []byte("v1")
k2 := []byte("k2")
k3 := []byte("k3")
v3 := []byte("v3")
b := new(eventBatch)
b.Put(k1, v1)
b.Delete(k2)
b.Put(k3, v3)
buf := b.Bytes()
w := &testEventWriter{}
ev2 := &testEventWriter{
evs: []testEvent{
testEvent{k1, v1},
testEvent{k2, nil},
testEvent{k3, v3}},
}
if err := decodeEventBatch(w, buf); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(w, ev2) {
t.Fatal("not equal")
}
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/rpl"
"github.com/siddontang/ledisdb/store"
"sync"
"time"
@ -16,15 +17,25 @@ type Ledis struct {
dbs [MaxDBNumber]*DB
quit chan struct{}
jobs *sync.WaitGroup
wg sync.WaitGroup
binlog *BinLog
r *rpl.Replication
rc chan struct{}
rbatch store.WriteBatch
rwg sync.WaitGroup
wLock sync.RWMutex //allow one write at same time
commitLock sync.Mutex //allow one write commit at same time
// for readonly mode, only replication can write
readOnly bool
}
func Open(cfg *config.Config) (*Ledis, error) {
return Open2(cfg, RDWRMode)
}
func Open2(cfg *config.Config, flags int) (*Ledis, error) {
if len(cfg.DataDir) == 0 {
cfg.DataDir = config.DefaultDataDir
}
@ -36,39 +47,43 @@ func Open(cfg *config.Config) (*Ledis, error) {
l := new(Ledis)
l.readOnly = (flags&ROnlyMode > 0)
l.quit = make(chan struct{})
l.jobs = new(sync.WaitGroup)
l.ldb = ldb
if cfg.UseBinLog {
println("binlog will be refactored later, use your own risk!!!")
l.binlog, err = NewBinLog(cfg)
if err != nil {
if cfg.Replication.Use {
if l.r, err = rpl.NewReplication(cfg); err != nil {
return nil, err
}
l.rc = make(chan struct{})
l.rbatch = l.ldb.NewWriteBatch()
go l.onReplication()
} else {
l.binlog = nil
l.r = nil
}
for i := uint8(0); i < MaxDBNumber; i++ {
l.dbs[i] = l.newDB(i)
}
l.activeExpireCycle()
go l.onDataExpired()
return l, nil
}
func (l *Ledis) Close() {
close(l.quit)
l.jobs.Wait()
l.wg.Wait()
l.ldb.Close()
if l.binlog != nil {
l.binlog.Close()
l.binlog = nil
if l.r != nil {
l.r.Close()
l.r = nil
}
}
@ -90,20 +105,42 @@ func (l *Ledis) FlushAll() error {
return nil
}
func (l *Ledis) activeExpireCycle() {
func (l *Ledis) IsReadOnly() bool {
if l.readOnly {
return true
} else if l.r != nil {
if b, _ := l.r.CommitIDBehind(); b {
return true
}
}
return false
}
func (l *Ledis) SetReadOnly(b bool) {
l.readOnly = b
}
func (l *Ledis) onDataExpired() {
l.wg.Add(1)
defer l.wg.Done()
var executors []*elimination = make([]*elimination, len(l.dbs))
for i, db := range l.dbs {
executors[i] = db.newEliminator()
}
l.jobs.Add(1)
go func() {
tick := time.NewTicker(1 * time.Second)
end := false
defer tick.Stop()
done := make(chan struct{})
for !end {
for {
select {
case <-tick.C:
if l.IsReadOnly() {
break
}
go func() {
for _, eli := range executors {
eli.active()
@ -112,12 +149,8 @@ func (l *Ledis) activeExpireCycle() {
}()
<-done
case <-l.quit:
end = true
break
return
}
}
tick.Stop()
l.jobs.Done()
}()
}

View File

@ -64,7 +64,7 @@ func (l *Ledis) newDB(index uint8) *DB {
}
func (db *DB) newBatch() *batch {
return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}, nil)
return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock})
}
func (db *DB) Index() int {
@ -109,21 +109,6 @@ func (db *DB) newEliminator() *elimination {
return eliminator
}
func (db *DB) flushRegion(t *batch, minKey []byte, maxKey []byte) (drop int64, err error) {
it := db.bucket.RangeIterator(minKey, maxKey, store.RangeROpen)
for ; it.Valid(); it.Next() {
t.Delete(it.RawKey())
drop++
if drop&1023 == 0 {
if err = t.Commit(); err != nil {
return
}
}
}
it.Close()
return
}
func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) {
var deleteFunc func(t *batch, key []byte) int64
var metaDataType byte

View File

@ -14,8 +14,6 @@ func getTestDB() *DB {
f := func() {
cfg := new(config.Config)
cfg.DataDir = "/tmp/test_ledis"
// cfg.BinLog.MaxFileSize = 1073741824
// cfg.BinLog.MaxFileNum = 3
os.RemoveAll(cfg.DataDir)

View File

@ -51,7 +51,7 @@ func (db *DB) Multi() (*Multi, error) {
}
func (m *Multi) newBatch() *batch {
return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}, nil)
return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{})
}
func (m *Multi) Close() error {

View File

@ -1,311 +1,167 @@
package ledis
import (
"bufio"
"bytes"
"errors"
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/store/driver"
"github.com/siddontang/ledisdb/rpl"
"io"
"os"
"time"
)
const (
maxReplBatchNum = 100
maxReplLogSize = 1 * 1024 * 1024
)
var (
ErrSkipEvent = errors.New("skip to next event")
ErrLogMissed = errors.New("log is pured in server")
)
var (
errInvalidBinLogEvent = errors.New("invalid binglog event")
errInvalidBinLogFile = errors.New("invalid binlog file")
)
type replBatch struct {
wb driver.IWriteBatch
events [][]byte
l *Ledis
lastHead *BinLogHead
func (l *Ledis) handleReplication() {
l.rwg.Add(1)
var rl *rpl.Log
for {
if err := l.r.NextCommitLog(rl); err != nil {
if err != rpl.ErrNoBehindLog {
log.Error("get next commit log err, %s", err.Error)
} else {
l.rwg.Done()
return
}
} else {
l.rbatch.Rollback()
decodeEventBatch(l.rbatch, rl.Data)
func (b *replBatch) Commit() error {
b.l.commitLock.Lock()
defer b.l.commitLock.Unlock()
err := b.wb.Commit()
if err != nil {
b.Rollback()
return err
}
if b.l.binlog != nil {
if err = b.l.binlog.Log(b.events...); err != nil {
b.Rollback()
return err
if err := l.rbatch.Commit(); err != nil {
log.Error("commit log error %s", err.Error())
} else if err = l.r.UpdateCommitID(rl.ID); err != nil {
log.Error("update commit id error %s", err.Error())
}
}
b.events = [][]byte{}
b.lastHead = nil
return nil
}
func (b *replBatch) Rollback() error {
b.wb.Rollback()
b.events = [][]byte{}
b.lastHead = nil
return nil
}
func (l *Ledis) replicateEvent(b *replBatch, event []byte) error {
if len(event) == 0 {
return errInvalidBinLogEvent
}
b.events = append(b.events, event)
logType := uint8(event[0])
switch logType {
case BinLogTypePut:
return l.replicatePutEvent(b, event)
case BinLogTypeDeletion:
return l.replicateDeleteEvent(b, event)
default:
return errInvalidBinLogEvent
}
}
func (l *Ledis) replicatePutEvent(b *replBatch, event []byte) error {
key, value, err := decodeBinLogPut(event)
if err != nil {
return err
func (l *Ledis) onReplication() {
if l.r == nil {
return
}
b.wb.Put(key, value)
return nil
}
func (l *Ledis) replicateDeleteEvent(b *replBatch, event []byte) error {
key, err := decodeBinLogDelete(event)
if err != nil {
return err
}
b.wb.Delete(key)
return nil
}
func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error {
head := &BinLogHead{}
var err error
for {
if err = head.Read(rb); err != nil {
select {
case <-l.rc:
l.handleReplication()
case <-time.After(5 * time.Second):
l.handleReplication()
}
}
}
func (l *Ledis) WaitReplication() error {
l.rwg.Wait()
return nil
}
func (l *Ledis) StoreLogsFromReader(rb io.Reader) (uint64, error) {
if l.r == nil {
return 0, fmt.Errorf("replication not enable")
}
var log *rpl.Log
var n uint64
for {
if err := log.Decode(rb); err != nil {
if err == io.EOF {
break
} else {
return err
return 0, err
}
}
var dataBuf bytes.Buffer
if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil {
return err
if err := l.r.StoreLog(log); err != nil {
return 0, err
}
err = f(head, dataBuf.Bytes())
if err != nil && err != ErrSkipEvent {
return err
}
n = log.ID
}
return nil
select {
case l.rc <- struct{}{}:
default:
break
}
func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
b := new(replBatch)
b.wb = l.ldb.NewWriteBatch()
b.l = l
f := func(head *BinLogHead, event []byte) error {
if b.lastHead == nil {
b.lastHead = head
} else if !b.lastHead.InSameBatch(head) {
if err := b.Commit(); err != nil {
log.Fatal("replication error %s, skip to next", err.Error())
return ErrSkipEvent
}
b.lastHead = head
return n, nil
}
err := l.replicateEvent(b, event)
if err != nil {
log.Fatal("replication error %s, skip to next", err.Error())
return ErrSkipEvent
}
return nil
}
err := ReadEventFromReader(rb, f)
if err != nil {
b.Rollback()
return err
}
return b.Commit()
}
func (l *Ledis) ReplicateFromData(data []byte) error {
func (l *Ledis) StoreLogsFromData(data []byte) (uint64, error) {
rb := bytes.NewReader(data)
err := l.ReplicateFromReader(rb)
return err
return l.StoreLogsFromReader(rb)
}
func (l *Ledis) ReplicateFromBinLog(filePath string) error {
f, err := os.Open(filePath)
func (l *Ledis) ReadLogsTo(startLogID uint64, w io.Writer) (n int, nextLogID uint64, err error) {
if l.r == nil {
// no replication log
nextLogID = 0
return
}
var firtID, lastID uint64
firtID, err = l.r.FirstLogID()
if err != nil {
return err
return
}
rb := bufio.NewReaderSize(f, 4096)
if startLogID < firtID {
err = ErrLogMissed
return
}
err = l.ReplicateFromReader(rb)
lastID, err = l.r.LastLogID()
if err != nil {
return
}
f.Close()
var log *rpl.Log
for i := startLogID; i <= lastID; i++ {
if err = l.r.GetLog(i, log); err != nil {
return
}
return err
if err = log.Encode(w); err != nil {
return
}
nextLogID = i + 1
n += log.Size()
if n > maxReplLogSize {
break
}
}
return
}
// try to read events, if no events read, try to wait the new event singal until timeout seconds
func (l *Ledis) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) {
lastIndex := info.LogFileIndex
lastPos := info.LogPos
n = 0
if l.binlog == nil {
//binlog not supported
info.LogFileIndex = 0
info.LogPos = 0
func (l *Ledis) ReadLogsToTimeout(startLogID uint64, w io.Writer, timeout int) (n int, nextLogID uint64, err error) {
n, nextLogID, err = l.ReadLogsTo(startLogID, w)
if err != nil {
return
} else if n == 0 || nextLogID == 0 {
return
}
n, err = l.ReadEventsTo(info, w)
if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos {
//no events read
select {
case <-l.binlog.Wait():
//case <-l.binlog.Wait():
case <-time.After(time.Duration(timeout) * time.Second):
}
return l.ReadEventsTo(info, w)
}
return
}
func (l *Ledis) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
n = 0
if l.binlog == nil {
//binlog not supported
info.LogFileIndex = 0
info.LogPos = 0
return
}
index := info.LogFileIndex
offset := info.LogPos
filePath := l.binlog.FormatLogFilePath(index)
var f *os.File
f, err = os.Open(filePath)
if os.IsNotExist(err) {
lastIndex := l.binlog.LogFileIndex()
if index == lastIndex {
//no binlog at all
info.LogPos = 0
} else {
//slave binlog info had lost
info.LogFileIndex = -1
}
}
if err != nil {
if os.IsNotExist(err) {
err = nil
}
return
}
defer f.Close()
var fileSize int64
st, _ := f.Stat()
fileSize = st.Size()
if fileSize == info.LogPos {
return
}
if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
//may be invliad seek offset
return
}
var lastHead *BinLogHead = nil
head := &BinLogHead{}
batchNum := 0
for {
if err = head.Read(f); err != nil {
if err == io.EOF {
//we will try to use next binlog
if index < l.binlog.LogFileIndex() {
info.LogFileIndex += 1
info.LogPos = 0
}
err = nil
return
} else {
return
}
return l.ReadLogsTo(startLogID, w)
}
if lastHead == nil {
lastHead = head
batchNum++
} else if !lastHead.InSameBatch(head) {
lastHead = head
batchNum++
if batchNum > maxReplBatchNum || n > maxReplLogSize {
return
}
}
if err = head.Write(w); err != nil {
return
}
if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil {
return
}
n += (head.Len() + int(head.PayloadLen))
info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen)
}
return
}

View File

@ -6,7 +6,6 @@ import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store"
"os"
"path"
"testing"
)
@ -34,9 +33,7 @@ func TestReplication(t *testing.T) {
cfgM := new(config.Config)
cfgM.DataDir = "/tmp/test_repl/master"
cfgM.UseBinLog = true
cfgM.BinLog.MaxFileNum = 10
cfgM.BinLog.MaxFileSize = 50
cfgM.Replication.Use = true
os.RemoveAll(cfgM.DataDir)
@ -47,6 +44,7 @@ func TestReplication(t *testing.T) {
cfgS := new(config.Config)
cfgS.DataDir = "/tmp/test_repl/slave"
cfgS.Replication.Use = true
os.RemoveAll(cfgS.DataDir)
@ -60,16 +58,9 @@ func TestReplication(t *testing.T) {
db.Set([]byte("b"), []byte("value"))
db.Set([]byte("c"), []byte("value"))
if tx, err := db.Begin(); err == nil {
tx.HSet([]byte("a"), []byte("1"), []byte("value"))
tx.HSet([]byte("b"), []byte("2"), []byte("value"))
tx.HSet([]byte("c"), []byte("3"), []byte("value"))
tx.Commit()
} else {
db.HSet([]byte("a"), []byte("1"), []byte("value"))
db.HSet([]byte("b"), []byte("2"), []byte("value"))
db.HSet([]byte("c"), []byte("3"), []byte("value"))
}
m, _ := db.Multi()
m.Set([]byte("a1"), []byte("value"))
@ -77,19 +68,6 @@ func TestReplication(t *testing.T) {
m.Set([]byte("c1"), []byte("value"))
m.Close()
for _, name := range master.binlog.LogNames() {
p := path.Join(master.binlog.LogPath(), name)
err = slave.ReplicateFromBinLog(p)
if err != nil {
t.Fatal(err)
}
}
if err = checkLedisEqual(master, slave); err != nil {
t.Fatal(err)
}
slave.FlushAll()
db.Set([]byte("a1"), []byte("value"))
@ -100,37 +78,27 @@ func TestReplication(t *testing.T) {
db.HSet([]byte("b1"), []byte("2"), []byte("value"))
db.HSet([]byte("c1"), []byte("3"), []byte("value"))
if tx, err := db.Begin(); err == nil {
tx.HSet([]byte("a1"), []byte("1"), []byte("value1"))
tx.HSet([]byte("b1"), []byte("2"), []byte("value1"))
tx.HSet([]byte("c1"), []byte("3"), []byte("value1"))
tx.Rollback()
}
info := new(BinLogAnchor)
info.LogFileIndex = 1
info.LogPos = 0
var buf bytes.Buffer
var n int
var id uint64 = 1
var nid uint64
for {
buf.Reset()
n, err = master.ReadEventsTo(info, &buf)
n, id, err = master.ReadLogsTo(id, &buf)
if err != nil {
t.Fatal(err)
} else if info.LogFileIndex == -1 {
t.Fatal("invalid log file index -1")
} else if info.LogFileIndex == 0 {
t.Fatal("invalid log file index 0")
} else {
if err = slave.ReplicateFromReader(&buf); err != nil {
} else if n != 0 {
if nid, err = slave.StoreLogsFromReader(&buf); err != nil {
t.Fatal(err)
} else if nid != id {
t.Fatal(nid, id)
}
if n == 0 {
} else if n == 0 {
break
}
}
}
slave.WaitReplication()
if err = checkLedisEqual(master, slave); err != nil {
t.Fatal(err)

View File

@ -24,17 +24,17 @@ func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match s
if err = checkKeySize(key); err != nil {
return nil, err
}
if minKey, err = db.encodeMetaKey(dataType, key); err != nil {
if minKey, err = db.encodeScanKey(dataType, key); err != nil {
return nil, err
}
} else {
if minKey, err = db.encodeMinKey(dataType); err != nil {
if minKey, err = db.encodeScanMinKey(dataType); err != nil {
return nil, err
}
}
if maxKey, err = db.encodeMaxKey(dataType); err != nil {
if maxKey, err = db.encodeScanMaxKey(dataType); err != nil {
return nil, err
}
@ -54,7 +54,7 @@ func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match s
}
for i := 0; it.Valid() && i < count && bytes.Compare(it.RawKey(), maxKey) < 0; it.Next() {
if k, err := db.decodeMetaKey(dataType, it.Key()); err != nil {
if k, err := db.decodeScanKey(dataType, it.Key()); err != nil {
continue
} else if r != nil && !r.Match(k) {
continue
@ -67,12 +67,12 @@ func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match s
return v, nil
}
func (db *DB) encodeMinKey(dataType byte) ([]byte, error) {
return db.encodeMetaKey(dataType, nil)
func (db *DB) encodeScanMinKey(dataType byte) ([]byte, error) {
return db.encodeScanKey(dataType, nil)
}
func (db *DB) encodeMaxKey(dataType byte) ([]byte, error) {
k, err := db.encodeMetaKey(dataType, nil)
func (db *DB) encodeScanMaxKey(dataType byte) ([]byte, error) {
k, err := db.encodeScanKey(dataType, nil)
if err != nil {
return nil, err
}
@ -80,7 +80,7 @@ func (db *DB) encodeMaxKey(dataType byte) ([]byte, error) {
return k, nil
}
func (db *DB) encodeMetaKey(dataType byte, key []byte) ([]byte, error) {
func (db *DB) encodeScanKey(dataType byte, key []byte) ([]byte, error) {
switch dataType {
case KVType:
return db.encodeKVKey(key), nil
@ -98,7 +98,7 @@ func (db *DB) encodeMetaKey(dataType byte, key []byte) ([]byte, error) {
return nil, errDataType
}
}
func (db *DB) decodeMetaKey(dataType byte, ek []byte) ([]byte, error) {
func (db *DB) decodeScanKey(dataType byte, ek []byte) ([]byte, error) {
if len(ek) < 2 || ek[0] != db.index || ek[1] != dataType {
return nil, errMetaKey
}

View File

@ -183,8 +183,6 @@ func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) {
return 0, err
}
//todo add binlog
err = t.Commit()
return n, err
}

View File

@ -77,8 +77,6 @@ func (db *DB) incr(key []byte, delta int64) (int64, error) {
t.Put(key, StrPutInt64(n))
//todo binlog
err = t.Commit()
return n, err
}
@ -244,7 +242,6 @@ func (db *DB) MSet(args ...KVPair) error {
t.Put(key, value)
//todo binlog
}
err = t.Commit()
@ -297,8 +294,6 @@ func (db *DB) SetNX(key []byte, value []byte) (int64, error) {
} else {
t.Put(key, value)
//todo binlog
err = t.Commit()
}

View File

@ -111,22 +111,6 @@ func (db *DB) rmExpire(t *batch, dataType byte, key []byte) (int64, error) {
}
}
func (db *DB) expFlush(t *batch, dataType byte) (err error) {
minKey := make([]byte, 3)
minKey[0] = db.index
minKey[1] = ExpTimeType
minKey[2] = dataType
maxKey := make([]byte, 3)
maxKey[0] = db.index
maxKey[1] = ExpMetaType
maxKey[2] = dataType + 1
_, err = db.flushRegion(t, minKey, maxKey)
err = t.Commit()
return
}
//////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////

View File

@ -305,7 +305,6 @@ func (db *DB) ZAdd(key []byte, args ...ScorePair) (int64, error) {
return 0, err
}
//todo add binlog
err := t.Commit()
return num, err
}
@ -862,7 +861,6 @@ func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, agg
sk := db.zEncodeSizeKey(destKey)
t.Put(sk, PutInt64(num))
//todo add binlog
if err := t.Commit(); err != nil {
return 0, err
}
@ -930,7 +928,7 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg
var num int64 = int64(len(destMap))
sk := db.zEncodeSizeKey(destKey)
t.Put(sk, PutInt64(num))
//todo add binlog
if err := t.Commit(); err != nil {
return 0, err
}

View File

@ -1,112 +0,0 @@
package ledis
import (
"errors"
"fmt"
"github.com/siddontang/ledisdb/store"
)
var (
ErrNestTx = errors.New("nest transaction not supported")
ErrTxDone = errors.New("Transaction has already been committed or rolled back")
)
type Tx struct {
*DB
tx *store.Tx
logs [][]byte
}
func (db *DB) IsTransaction() bool {
return db.status == DBInTransaction
}
// Begin a transaction, it will block all other write operations before calling Commit or Rollback.
// You must be very careful to prevent long-time transaction.
func (db *DB) Begin() (*Tx, error) {
if db.IsTransaction() {
return nil, ErrNestTx
}
tx := new(Tx)
tx.DB = new(DB)
tx.DB.l = db.l
tx.l.wLock.Lock()
tx.DB.sdb = db.sdb
var err error
tx.tx, err = db.sdb.Begin()
if err != nil {
tx.l.wLock.Unlock()
return nil, err
}
tx.DB.bucket = tx.tx
tx.DB.status = DBInTransaction
tx.DB.index = db.index
tx.DB.kvBatch = tx.newBatch()
tx.DB.listBatch = tx.newBatch()
tx.DB.hashBatch = tx.newBatch()
tx.DB.zsetBatch = tx.newBatch()
tx.DB.binBatch = tx.newBatch()
tx.DB.setBatch = tx.newBatch()
return tx, nil
}
func (tx *Tx) Commit() error {
if tx.tx == nil {
return ErrTxDone
}
tx.l.commitLock.Lock()
err := tx.tx.Commit()
tx.tx = nil
if len(tx.logs) > 0 {
tx.l.binlog.Log(tx.logs...)
}
tx.l.commitLock.Unlock()
tx.l.wLock.Unlock()
tx.DB.bucket = nil
return err
}
func (tx *Tx) Rollback() error {
if tx.tx == nil {
return ErrTxDone
}
err := tx.tx.Rollback()
tx.tx = nil
tx.l.wLock.Unlock()
tx.DB.bucket = nil
return err
}
func (tx *Tx) newBatch() *batch {
return tx.l.newBatch(tx.tx.NewWriteBatch(), &txBatchLocker{}, tx)
}
func (tx *Tx) Select(index int) error {
if index < 0 || index >= int(MaxDBNumber) {
return fmt.Errorf("invalid db index %d", index)
}
tx.DB.index = uint8(index)
return nil
}

View File

@ -1,219 +0,0 @@
package ledis
import (
"github.com/siddontang/ledisdb/config"
"os"
"testing"
)
func testTxRollback(t *testing.T, db *DB) {
var err error
key1 := []byte("tx_key1")
key2 := []byte("tx_key2")
field2 := []byte("tx_field2")
err = db.Set(key1, []byte("value"))
if err != nil {
t.Fatal(err)
}
_, err = db.HSet(key2, field2, []byte("value"))
if err != nil {
t.Fatal(err)
}
var tx *Tx
tx, err = db.Begin()
if err != nil {
t.Fatal(err)
}
defer tx.Rollback()
err = tx.Set(key1, []byte("1"))
if err != nil {
t.Fatal(err)
}
_, err = tx.HSet(key2, field2, []byte("2"))
if err != nil {
t.Fatal(err)
}
_, err = tx.HSet([]byte("no_key"), field2, []byte("2"))
if err != nil {
t.Fatal(err)
}
if v, err := tx.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "1" {
t.Fatal(string(v))
}
if v, err := tx.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "2" {
t.Fatal(string(v))
}
err = tx.Rollback()
if err != nil {
t.Fatal(err)
}
if v, err := db.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "value" {
t.Fatal(string(v))
}
if v, err := db.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "value" {
t.Fatal(string(v))
}
}
func testTxCommit(t *testing.T, db *DB) {
var err error
key1 := []byte("tx_key1")
key2 := []byte("tx_key2")
field2 := []byte("tx_field2")
err = db.Set(key1, []byte("value"))
if err != nil {
t.Fatal(err)
}
_, err = db.HSet(key2, field2, []byte("value"))
if err != nil {
t.Fatal(err)
}
var tx *Tx
tx, err = db.Begin()
if err != nil {
t.Fatal(err)
}
defer tx.Rollback()
err = tx.Set(key1, []byte("1"))
if err != nil {
t.Fatal(err)
}
_, err = tx.HSet(key2, field2, []byte("2"))
if err != nil {
t.Fatal(err)
}
if v, err := tx.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "1" {
t.Fatal(string(v))
}
if v, err := tx.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "2" {
t.Fatal(string(v))
}
err = tx.Commit()
if err != nil {
t.Fatal(err)
}
if v, err := db.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "1" {
t.Fatal(string(v))
}
if v, err := db.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "2" {
t.Fatal(string(v))
}
}
func testTxSelect(t *testing.T, db *DB) {
tx, err := db.Begin()
if err != nil {
t.Fatal(err)
}
defer tx.Rollback()
tx.Set([]byte("tx_select_1"), []byte("a"))
tx.Select(1)
tx.Set([]byte("tx_select_2"), []byte("b"))
if err = tx.Commit(); err != nil {
t.Fatal(err)
}
if v, err := db.Get([]byte("tx_select_1")); err != nil {
t.Fatal(err)
} else if string(v) != "a" {
t.Fatal(string(v))
}
if v, err := db.Get([]byte("tx_select_2")); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatal("must nil")
}
db, _ = db.l.Select(1)
if v, err := db.Get([]byte("tx_select_2")); err != nil {
t.Fatal(err)
} else if string(v) != "b" {
t.Fatal(string(v))
}
if v, err := db.Get([]byte("tx_select_1")); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatal("must nil")
}
}
func testTx(t *testing.T, name string) {
cfg := new(config.Config)
cfg.DataDir = "/tmp/ledis_test_tx"
cfg.DBName = name
cfg.LMDB.MapSize = 10 * 1024 * 1024
os.RemoveAll(cfg.DataDir)
l, err := Open(cfg)
if err != nil {
t.Fatal(err)
}
defer l.Close()
db, _ := l.Select(0)
testTxRollback(t, db)
testTxCommit(t, db)
testTxSelect(t, db)
}
//only lmdb, boltdb support Transaction
func TestTx(t *testing.T) {
testTx(t, "lmdb")
testTx(t, "boltdb")
}

View File

@ -43,6 +43,18 @@ func Int64(v []byte, err error) (int64, error) {
return int64(binary.LittleEndian.Uint64(v)), nil
}
func Uint64(v []byte, err error) (uint64, error) {
if err != nil {
return 0, err
} else if v == nil || len(v) == 0 {
return 0, nil
} else if len(v) != 8 {
return 0, errIntNumber
}
return binary.LittleEndian.Uint64(v), nil
}
func PutInt64(v int64) []byte {
var b []byte
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))

242
rpl/file_store.go Normal file
View File

@ -0,0 +1,242 @@
package rpl
import (
"fmt"
"github.com/siddontang/go-log/log"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"sync"
)
const (
defaultMaxLogFileSize = 1024 * 1024 * 1024
)
/*
index file format:
ledis-bin.00001
ledis-bin.00002
ledis-bin.00003
*/
type FileStore struct {
LogStore
m sync.Mutex
maxFileSize int
first uint64
last uint64
logFile *os.File
logNames []string
nextLogIndex int64
indexName string
path string
}
func NewFileStore(path string) (*FileStore, error) {
s := new(FileStore)
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
s.path = path
s.maxFileSize = defaultMaxLogFileSize
s.first = 0
s.last = 0
s.logNames = make([]string, 0, 16)
if err := s.loadIndex(); err != nil {
return nil, err
}
return s, nil
}
func (s *FileStore) SetMaxFileSize(size int) {
s.maxFileSize = size
}
func (s *FileStore) GetLog(id uint64, log *Log) error {
panic("not implementation")
return nil
}
func (s *FileStore) SeekLog(id uint64, log *Log) error {
panic("not implementation")
return nil
}
func (s *FileStore) FirstID() (uint64, error) {
panic("not implementation")
return 0, nil
}
func (s *FileStore) LastID() (uint64, error) {
panic("not implementation")
return 0, nil
}
func (s *FileStore) StoreLog(log *Log) error {
panic("not implementation")
return nil
}
func (s *FileStore) StoreLogs(logs []*Log) error {
panic("not implementation")
return nil
}
func (s *FileStore) Purge(n uint64) error {
panic("not implementation")
return nil
}
func (s *FileStore) PuregeExpired(n int64) error {
panic("not implementation")
return nil
}
func (s *FileStore) Clear() error {
panic("not implementation")
return nil
}
func (s *FileStore) Close() error {
panic("not implementation")
return nil
}
func (s *FileStore) flushIndex() error {
data := strings.Join(s.logNames, "\n")
bakName := fmt.Sprintf("%s.bak", s.indexName)
f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
log.Error("create bak index error %s", err.Error())
return err
}
if _, err := f.WriteString(data); err != nil {
log.Error("write index error %s", err.Error())
f.Close()
return err
}
f.Close()
if err := os.Rename(bakName, s.indexName); err != nil {
log.Error("rename bak index error %s", err.Error())
return err
}
return nil
}
func (s *FileStore) fileExists(name string) bool {
p := path.Join(s.path, name)
_, err := os.Stat(p)
return !os.IsNotExist(err)
}
func (s *FileStore) loadIndex() error {
s.indexName = path.Join(s.path, fmt.Sprintf("ledis-bin.index"))
if _, err := os.Stat(s.indexName); os.IsNotExist(err) {
//no index file, nothing to do
} else {
indexData, err := ioutil.ReadFile(s.indexName)
if err != nil {
return err
}
lines := strings.Split(string(indexData), "\n")
for _, line := range lines {
line = strings.Trim(line, "\r\n ")
if len(line) == 0 {
continue
}
if s.fileExists(line) {
s.logNames = append(s.logNames, line)
} else {
log.Info("log %s has not exists", line)
}
}
}
var err error
if len(s.logNames) == 0 {
s.nextLogIndex = 1
} else {
lastName := s.logNames[len(s.logNames)-1]
if s.nextLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil {
log.Error("invalid logfile name %s", err.Error())
return err
}
//like mysql, if server restart, a new log will create
s.nextLogIndex++
}
return nil
}
func (s *FileStore) openNewLogFile() error {
var err error
lastName := s.formatLogFileName(s.nextLogIndex)
logPath := path.Join(s.path, lastName)
if s.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644); err != nil {
log.Error("open new logfile error %s", err.Error())
return err
}
s.logNames = append(s.logNames, lastName)
if err = s.flushIndex(); err != nil {
return err
}
return nil
}
func (s *FileStore) checkLogFileSize() bool {
if s.logFile == nil {
return false
}
st, _ := s.logFile.Stat()
if st.Size() >= int64(s.maxFileSize) {
s.closeLog()
return true
}
return false
}
func (s *FileStore) closeLog() {
if s.logFile == nil {
return
}
s.nextLogIndex++
s.logFile.Close()
s.logFile = nil
}
func (s *FileStore) formatLogFileName(index int64) string {
return fmt.Sprintf("ledis-bin.%07d", index)
}

276
rpl/goleveldb_store.go Normal file
View File

@ -0,0 +1,276 @@
package rpl
import (
"bytes"
"fmt"
"github.com/siddontang/go/num"
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store"
"os"
"sync"
"time"
)
type GoLevelDBStore struct {
LogStore
m sync.Mutex
db *store.DB
cfg *config.Config
first uint64
last uint64
}
func (s *GoLevelDBStore) FirstID() (uint64, error) {
s.m.Lock()
id, err := s.firstID()
s.m.Unlock()
return id, err
}
func (s *GoLevelDBStore) LastID() (uint64, error) {
s.m.Lock()
id, err := s.lastID()
s.m.Unlock()
return id, err
}
func (s *GoLevelDBStore) firstID() (uint64, error) {
if s.first != InvalidLogID {
return s.first, nil
}
it := s.db.NewIterator()
defer it.Close()
it.SeekToFirst()
if it.Valid() {
s.first = num.BytesToUint64(it.RawKey())
}
return s.first, nil
}
func (s *GoLevelDBStore) lastID() (uint64, error) {
if s.last != InvalidLogID {
return s.last, nil
}
it := s.db.NewIterator()
defer it.Close()
it.SeekToLast()
if it.Valid() {
s.last = num.BytesToUint64(it.RawKey())
}
return s.last, nil
}
func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error {
v, err := s.db.Get(num.Uint64ToBytes(id))
if err != nil {
return err
} else if v == nil {
return ErrLogNotFound
} else {
return log.Decode(bytes.NewBuffer(v))
}
}
func (s *GoLevelDBStore) SeekLog(id uint64, log *Log) error {
it := s.db.NewIterator()
defer it.Close()
it.Seek(num.Uint64ToBytes(id))
if !it.Valid() {
return ErrLogNotFound
} else {
return log.Decode(bytes.NewBuffer(it.RawValue()))
}
}
func (s *GoLevelDBStore) StoreLog(log *Log) error {
return s.StoreLogs([]*Log{log})
}
func (s *GoLevelDBStore) StoreLogs(logs []*Log) error {
s.m.Lock()
defer s.m.Unlock()
w := s.db.NewWriteBatch()
defer w.Rollback()
last, err := s.lastID()
if err != nil {
return err
}
s.last = InvalidLogID
var buf bytes.Buffer
for _, log := range logs {
buf.Reset()
if log.ID <= last {
return ErrLessLogID
}
last = log.ID
key := num.Uint64ToBytes(log.ID)
if err := log.Encode(&buf); err != nil {
return err
}
w.Put(key, buf.Bytes())
}
if err := w.Commit(); err != nil {
return err
}
s.last = last
return nil
}
func (s *GoLevelDBStore) Purge(n uint64) error {
s.m.Lock()
defer s.m.Unlock()
var first, last uint64
var err error
first, err = s.firstID()
if err != nil {
return err
}
last, err = s.lastID()
if err != nil {
return err
}
start := first
stop := num.MinUint64(last, first+n)
w := s.db.NewWriteBatch()
defer w.Rollback()
s.reset()
for i := start; i < stop; i++ {
w.Delete(num.Uint64ToBytes(i))
}
if err = w.Commit(); err != nil {
return err
}
return nil
}
func (s *GoLevelDBStore) PurgeExpired(n int64) error {
if n <= 0 {
return fmt.Errorf("invalid expired time %d", n)
}
t := uint32(time.Now().Unix() - int64(n))
s.m.Lock()
defer s.m.Unlock()
s.reset()
it := s.db.NewIterator()
it.SeekToFirst()
w := s.db.NewWriteBatch()
defer w.Rollback()
l := new(Log)
for ; it.Valid(); it.Next() {
v := it.RawValue()
if err := l.Unmarshal(v); err != nil {
return err
} else if l.CreateTime > t {
break
} else {
w.Delete(it.RawKey())
}
}
if err := w.Commit(); err != nil {
return err
}
return nil
}
func (s *GoLevelDBStore) Clear() error {
s.m.Lock()
defer s.m.Unlock()
if s.db != nil {
s.db.Close()
}
s.reset()
os.RemoveAll(s.cfg.DBPath)
return s.open()
}
func (s *GoLevelDBStore) reset() {
s.first = InvalidLogID
s.last = InvalidLogID
}
func (s *GoLevelDBStore) Close() error {
s.m.Lock()
defer s.m.Unlock()
if s.db == nil {
return nil
}
err := s.db.Close()
s.db = nil
return err
}
func (s *GoLevelDBStore) open() error {
var err error
s.first = InvalidLogID
s.last = InvalidLogID
s.db, err = store.Open(s.cfg)
return err
}
func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) {
cfg := new(config.Config)
cfg.DBName = "goleveldb"
cfg.DBPath = base
cfg.LevelDB.BlockSize = 4 * 1024 * 1024
cfg.LevelDB.CacheSize = 16 * 1024 * 1024
cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024
cfg.LevelDB.Compression = false
s := new(GoLevelDBStore)
s.cfg = cfg
if err := s.open(); err != nil {
return nil, err
}
return s, nil
}

99
rpl/log.go Normal file
View File

@ -0,0 +1,99 @@
package rpl
import (
"bytes"
"encoding/binary"
"io"
"time"
)
type Log struct {
ID uint64
CreateTime uint32
Data []byte
}
func NewLog(id uint64, data []byte) *Log {
l := new(Log)
l.ID = id
l.CreateTime = uint32(time.Now().Unix())
l.Data = data
return l
}
func (l *Log) HeadSize() int {
return 16
}
func (l *Log) Size() int {
return l.HeadSize() + len(l.Data)
}
func (l *Log) Marshal() ([]byte, error) {
buf := bytes.NewBuffer(make([]byte, l.HeadSize()+len(l.Data)))
buf.Reset()
if err := l.Encode(buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (l *Log) Unmarshal(b []byte) error {
buf := bytes.NewBuffer(b)
return l.Decode(buf)
}
func (l *Log) Encode(w io.Writer) error {
buf := make([]byte, l.HeadSize())
pos := 0
binary.BigEndian.PutUint64(buf[pos:], l.ID)
pos += 8
binary.BigEndian.PutUint32(buf[pos:], l.CreateTime)
pos += 4
binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data)))
if n, err := w.Write(buf); err != nil {
return err
} else if n != len(buf) {
return io.ErrShortWrite
}
if n, err := w.Write(l.Data); err != nil {
return err
} else if n != len(l.Data) {
return io.ErrShortWrite
}
return nil
}
func (l *Log) Decode(r io.Reader) error {
buf := make([]byte, l.HeadSize())
if _, err := io.ReadFull(r, buf); err != nil {
return err
}
pos := 0
l.ID = binary.BigEndian.Uint64(buf[pos:])
pos += 8
l.CreateTime = binary.BigEndian.Uint32(buf[pos:])
pos += 4
length := binary.BigEndian.Uint32(buf[pos:])
l.Data = make([]byte, length)
if _, err := io.ReadFull(r, l.Data); err != nil {
return err
}
return nil
}

39
rpl/log_test.go Normal file
View File

@ -0,0 +1,39 @@
package rpl
import (
"bytes"
"reflect"
"testing"
)
func TestLog(t *testing.T) {
l1 := &Log{ID: 1, CreateTime: 100, Data: []byte("hello world")}
var buf bytes.Buffer
if err := l1.Encode(&buf); err != nil {
t.Fatal(err)
}
l2 := &Log{}
if err := l2.Decode(&buf); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(l1, l2) {
t.Fatal("must equal")
}
if buf, err := l1.Marshal(); err != nil {
t.Fatal(err)
} else {
if err = l2.Unmarshal(buf); err != nil {
t.Fatal(err)
}
}
if !reflect.DeepEqual(l1, l2) {
t.Fatal("must equal")
}
}

226
rpl/rpl.go Normal file
View File

@ -0,0 +1,226 @@
package rpl
import (
"encoding/binary"
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/config"
"os"
"path"
"sync"
"time"
)
type Replication struct {
m sync.Mutex
cfg *config.Config
s LogStore
commitID uint64
commitLog *os.File
quit chan struct{}
wg sync.WaitGroup
}
func NewReplication(cfg *config.Config) (*Replication, error) {
if !cfg.Replication.Use {
return nil, fmt.Errorf("replication not enalbed")
}
if len(cfg.Replication.Path) == 0 {
cfg.Replication.Path = path.Join(cfg.DataDir, "rpl")
}
base := cfg.Replication.Path
r := new(Replication)
r.quit = make(chan struct{})
r.cfg = cfg
var err error
if r.s, err = NewGoLevelDBStore(path.Join(base, "wal")); err != nil {
return nil, err
}
if r.commitLog, err = os.OpenFile(path.Join(base, "commit.log"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
return nil, err
}
if s, _ := r.commitLog.Stat(); s.Size() == 0 {
r.commitID = 0
} else if err = binary.Read(r.commitLog, binary.BigEndian, &r.commitID); err != nil {
return nil, err
}
go r.onPurgeExpired()
return r, nil
}
func (r *Replication) Close() error {
close(r.quit)
r.wg.Wait()
if r.s != nil {
r.s.Close()
r.s = nil
}
if r.commitLog != nil {
r.commitLog.Close()
r.commitLog = nil
}
return nil
}
func (r *Replication) Log(data []byte) (*Log, error) {
r.m.Lock()
defer r.m.Unlock()
lastID, err := r.s.LastID()
if err != nil {
return nil, err
}
commitId := r.commitID
if lastID < commitId {
lastID = commitId
}
l := new(Log)
l.ID = lastID + 1
l.CreateTime = uint32(time.Now().Unix())
l.Data = data
if err = r.s.StoreLog(l); err != nil {
return nil, err
}
return l, nil
}
func (r *Replication) StoreLog(log *Log) error {
return r.StoreLogs([]*Log{log})
}
func (r *Replication) StoreLogs(logs []*Log) error {
r.m.Lock()
defer r.m.Unlock()
return r.s.StoreLogs(logs)
}
func (r *Replication) FirstLogID() (uint64, error) {
r.m.Lock()
defer r.m.Unlock()
id, err := r.s.FirstID()
return id, err
}
func (r *Replication) LastLogID() (uint64, error) {
r.m.Lock()
defer r.m.Unlock()
id, err := r.s.LastID()
return id, err
}
func (r *Replication) NextSyncID() (uint64, error) {
r.m.Lock()
defer r.m.Unlock()
lastId, err := r.s.LastID()
if err != nil {
return 0, err
}
if lastId > r.commitID {
return lastId + 1, nil
} else {
return r.commitID + 1, nil
}
}
func (r *Replication) LastCommitID() (uint64, error) {
r.m.Lock()
id := r.commitID
r.m.Unlock()
return id, nil
}
func (r *Replication) UpdateCommitID(id uint64) error {
r.m.Lock()
defer r.m.Unlock()
if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil {
return err
}
if err := binary.Write(r.commitLog, binary.BigEndian, id); err != nil {
return err
}
r.commitID = id
return nil
}
func (r *Replication) CommitIDBehind() (bool, error) {
r.m.Lock()
defer r.m.Unlock()
id, err := r.s.LastID()
if err != nil {
return false, err
}
return id > r.commitID, nil
}
func (r *Replication) GetLog(id uint64, log *Log) error {
return r.s.GetLog(id, log)
}
func (r *Replication) NextCommitLog(log *Log) error {
r.m.Lock()
defer r.m.Unlock()
id, err := r.s.LastID()
if err != nil {
return err
}
if id <= r.commitID {
return ErrNoBehindLog
}
return r.s.GetLog(r.commitID+1, log)
}
func (r *Replication) onPurgeExpired() {
r.wg.Add(1)
defer r.wg.Done()
for {
select {
case <-time.After(1 * time.Hour):
n := (r.cfg.Replication.ExpiredLogDays * 24 * 3600)
r.m.Lock()
if err := r.s.PurgeExpired(int64(n)); err != nil {
log.Error("purge expired log error %s", err.Error())
}
r.m.Unlock()
case <-r.quit:
return
}
}
}

45
rpl/rpl_test.go Normal file
View File

@ -0,0 +1,45 @@
package rpl
import (
"github.com/siddontang/ledisdb/config"
"io/ioutil"
"os"
"testing"
)
func TestReplication(t *testing.T) {
dir, err := ioutil.TempDir("", "rpl")
if err != nil {
t.Fatalf("err: %v ", err)
}
defer os.RemoveAll(dir)
c := new(config.Config)
c.Replication.Use = true
c.Replication.Path = dir
r, err := NewReplication(c)
if err != nil {
t.Fatal(err)
}
if l1, err := r.Log([]byte("hello world")); err != nil {
t.Fatal(err)
} else if l1.ID != 1 {
t.Fatal(l1.ID)
}
if b, _ := r.CommitIDBehind(); !b {
t.Fatal("must backward")
}
if err := r.UpdateCommitID(1); err != nil {
t.Fatal(err)
}
if b, _ := r.CommitIDBehind(); b {
t.Fatal("must not backward")
}
r.Close()
}

40
rpl/store.go Normal file
View File

@ -0,0 +1,40 @@
package rpl
import (
"errors"
)
const (
InvalidLogID uint64 = 0
)
var (
ErrLogNotFound = errors.New("log not found")
ErrLessLogID = errors.New("log id is less")
ErrNoBehindLog = errors.New("no behind commit log")
)
type LogStore interface {
GetLog(id uint64, log *Log) error
// Get the first log which ID is equal or larger than id
SeekLog(id uint64, log *Log) error
FirstID() (uint64, error)
LastID() (uint64, error)
// if log id is less than current last id, return error
StoreLog(log *Log) error
StoreLogs(logs []*Log) error
// Delete first n logs
Purge(n uint64) error
// Delete logs before n seconds
PurgeExpired(n int64) error
// Clear all logs
Clear() error
Close() error
}

189
rpl/store_test.go Normal file
View File

@ -0,0 +1,189 @@
package rpl
import (
"io/ioutil"
"os"
"testing"
"time"
)
func TestGoLevelDBStore(t *testing.T) {
// Create a test dir
dir, err := ioutil.TempDir("", "wal")
if err != nil {
t.Fatalf("err: %v ", err)
}
defer os.RemoveAll(dir)
// New level
l, err := NewGoLevelDBStore(dir)
if err != nil {
t.Fatalf("err: %v ", err)
}
defer l.Close()
testLogs(t, l)
}
func testLogs(t *testing.T, l LogStore) {
// Should be no first index
idx, err := l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
// Should be no last index
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
// Try a filed fetch
var out Log
if err := l.GetLog(10, &out); err.Error() != "log not found" {
t.Fatalf("err: %v ", err)
}
// Write out a log
log := Log{
ID: 1,
Data: []byte("first"),
}
for i := 1; i <= 10; i++ {
log.ID = uint64(i)
if err := l.StoreLog(&log); err != nil {
t.Fatalf("err: %v", err)
}
}
// Attempt to write multiple logs
var logs []*Log
for i := 11; i <= 20; i++ {
nl := &Log{
ID: uint64(i),
Data: []byte("first"),
}
logs = append(logs, nl)
}
if err := l.StoreLogs(logs); err != nil {
t.Fatalf("err: %v", err)
}
// Try to fetch
if err := l.GetLog(10, &out); err != nil {
t.Fatalf("err: %v ", err)
}
// Try to fetch
if err := l.GetLog(20, &out); err != nil {
t.Fatalf("err: %v ", err)
}
// Check the lowest index
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 1 {
t.Fatalf("bad idx: %d", idx)
}
// Check the highest index
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 20 {
t.Fatalf("bad idx: %d", idx)
}
// Delete a suffix
if err := l.Purge(5); err != nil {
t.Fatalf("err: %v ", err)
}
// Verify they are all deleted
for i := 1; i <= 5; i++ {
if err := l.GetLog(uint64(i), &out); err != ErrLogNotFound {
t.Fatalf("err: %v ", err)
}
}
// Index should be one
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 6 {
t.Fatalf("bad idx: %d", idx)
}
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 20 {
t.Fatalf("bad idx: %d", idx)
}
// Should not be able to fetch
if err := l.GetLog(5, &out); err != ErrLogNotFound {
t.Fatalf("err: %v ", err)
}
if err := l.Clear(); err != nil {
t.Fatal(err)
}
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
now := uint32(time.Now().Unix())
logs = []*Log{}
for i := 1; i <= 20; i++ {
nl := &Log{
ID: uint64(i),
CreateTime: now - 20,
Data: []byte("first"),
}
logs = append(logs, nl)
}
if err := l.PurgeExpired(1); err != nil {
t.Fatal(err)
}
idx, err = l.FirstID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
idx, err = l.LastID()
if err != nil {
t.Fatalf("err: %v ", err)
}
if idx != 0 {
t.Fatalf("bad idx: %d", idx)
}
}

View File

@ -8,17 +8,6 @@ import (
"time"
)
var txUnsupportedCmds = map[string]struct{}{
"select": struct{}{},
"slaveof": struct{}{},
"fullsync": struct{}{},
"sync": struct{}{},
"begin": struct{}{},
"flushall": struct{}{},
"flushdb": struct{}{},
"eval": struct{}{},
}
var scriptUnsupportedCmds = map[string]struct{}{
"slaveof": struct{}{},
"fullsync": struct{}{},
@ -62,7 +51,6 @@ type client struct {
buf bytes.Buffer
tx *ledis.Tx
script *ledis.Multi
}
@ -89,11 +77,7 @@ func (c *client) perform() {
} else if exeCmd, ok := regCmds[c.cmd]; !ok {
err = ErrNotFound
} else {
if c.db.IsTransaction() {
if _, ok := txUnsupportedCmds[c.cmd]; ok {
err = fmt.Errorf("%s not supported in transaction", c.cmd)
}
} else if c.db.IsInMulti() {
if c.db.IsInMulti() {
if _, ok := scriptUnsupportedCmds[c.cmd]; ok {
err = fmt.Errorf("%s not supported in multi", c.cmd)
}

View File

@ -23,9 +23,6 @@ var httpUnsupportedCommands = map[string]struct{}{
"fullsync": struct{}{},
"sync": struct{}{},
"quit": struct{}{},
"begin": struct{}{},
"commit": struct{}{},
"rollback": struct{}{},
}
type httpClient struct {

View File

@ -70,14 +70,14 @@ var reserveInfoSpace = make([]byte, 16)
func syncCommand(c *client) error {
args := c.args
if len(args) != 2 {
if len(args) != 1 {
return ErrCmdParams
}
var logIndex int64
var logPos int64
var err error
logIndex, err = ledis.StrInt64(args[0], nil)
logIndex, err = ledis.Str(args[0], nil)
if err != nil {
return ErrCmdParams
}

View File

@ -1,57 +0,0 @@
package server
import (
"errors"
)
var errTxMiss = errors.New("transaction miss")
func beginCommand(c *client) error {
tx, err := c.db.Begin()
if err == nil {
c.tx = tx
c.db = tx.DB
c.resp.writeStatus(OK)
}
return err
}
func commitCommand(c *client) error {
if c.tx == nil {
return errTxMiss
}
err := c.tx.Commit()
c.db, _ = c.ldb.Select(c.tx.Index())
c.tx = nil
if err == nil {
c.resp.writeStatus(OK)
}
return err
}
func rollbackCommand(c *client) error {
if c.tx == nil {
return errTxMiss
}
err := c.tx.Rollback()
c.db, _ = c.ldb.Select(c.tx.Index())
c.tx = nil
if err == nil {
c.resp.writeStatus(OK)
}
return err
}
func init() {
register("begin", beginCommand)
register("commit", commitCommand)
register("rollback", rollbackCommand)
}

View File

@ -41,13 +41,7 @@ func selectCommand(c *client) error {
if index, err := strconv.Atoi(ledis.String(c.args[0])); err != nil {
return err
} else {
if c.db.IsTransaction() {
if err := c.tx.Select(index); err != nil {
return err
} else {
c.db = c.tx.DB
}
} else if c.db.IsInMulti() {
if c.db.IsInMulti() {
if err := c.script.Select(index); err != nil {
return err
} else {