Merge branch 'develop'

This commit is contained in:
siddontang 2014-10-13 14:40:11 +08:00
commit 0b1157260e
46 changed files with 697 additions and 135 deletions

View File

@ -1,7 +1,5 @@
INSTALL_PATH ?= $(CURDIR)
$(shell ./bootstrap.sh >> /dev/null 2>&1)
$(shell ./tools/build_config.sh build_config.mk $INSTALL_PATH)
include build_config.mk

View File

@ -29,6 +29,9 @@ Create a workspace and checkout ledisdb source
cd src/github.com/siddontang/ledisdb
#install go dependences
./bootstrap.sh
#set build and run environment
source dev.sh

View File

@ -1,4 +1,4 @@
//This file was generated by .tools/generate_commands.py on Wed Oct 08 2014 16:36:20 +0800
//This file was generated by .tools/generate_commands.py on Fri Oct 10 2014 09:08:54 +0800
package main
var helpCommands = [][]string{
@ -87,7 +87,7 @@ var helpCommands = [][]string{
{"SINTER", "key [key ...]", "Set"},
{"SINTERSTORE", "destination key [key ...]", "Set"},
{"SISMEMBER", "key member", "Set"},
{"SLAVEOF", "host port [restart]", "Replication"},
{"SLAVEOF", "host port [RESTART] [READONLY]", "Replication"},
{"SMCLEAR", "key [key ...]", "Set"},
{"SMEMBERS", "key", "Set"},
{"SPERSIST", "key", "Set"},

View File

@ -14,7 +14,7 @@ var port = flag.Int("port", 6380, "ledis server port")
var sock = flag.String("sock", "", "ledis unix socket domain")
var dumpFile = flag.String("o", "./ledis.dump", "dump file to save")
var fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
var fullSyncCmd = []byte("*2\r\n$8\r\nfullsync\r\n$3\r\nnew\r\n") //fullsync
func main() {
flag.Parse()

View File

@ -18,6 +18,9 @@ var configFile = flag.String("config", "", "ledisdb config file")
var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name")
var usePprof = flag.Bool("pprof", false, "enable pprof")
var pprofPort = flag.Int("pprof_port", 6060, "pprof http port")
var slaveof = flag.String("slaveof", "", "make the server a slave of another instance")
var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly")
var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled")
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
@ -43,6 +46,15 @@ func main() {
cfg.DBName = *dbName
}
if len(*slaveof) > 0 {
cfg.SlaveOf = *slaveof
cfg.Readonly = true
cfg.UseReplication = true
} else {
cfg.Readonly = *readonly
cfg.UseReplication = *rpl
}
var app *server.App
app, err = server.NewApp(cfg)
if err != nil {

View File

@ -15,7 +15,6 @@ var (
const (
DefaultAddr string = "127.0.0.1:6380"
DefaultHttpAddr string = "127.0.0.1:11181"
DefaultDBName string = "goleveldb"
@ -37,13 +36,19 @@ type LMDBConfig struct {
type ReplicationConfig struct {
Path string `toml:"path"`
ExpiredLogDays int `toml:"expired_log_days"`
Sync bool `toml:"sync"`
WaitSyncTime int `toml:"wait_sync_time"`
WaitMaxSlaveAcks int `toml:"wait_max_slave_acks"`
ExpiredLogDays int `toml:"expired_log_days"`
SyncLog int `toml:"sync_log"`
Compression bool `toml:"compression"`
}
type SnapshotConfig struct {
Path string `toml:"path"`
MaxNum int `toml:"max_num"`
}
type Config struct {
FileName string `toml:"-"`
@ -53,6 +58,8 @@ type Config struct {
SlaveOf string `toml:"slaveof"`
Readonly bool `toml:readonly`
DataDir string `toml:"data_dir"`
DBName string `toml:"db_name"`
@ -67,6 +74,8 @@ type Config struct {
UseReplication bool `toml:"use_replication"`
Replication ReplicationConfig `toml:"replication"`
Snapshot SnapshotConfig `toml:"snapshot"`
}
func NewConfigWithFile(fileName string) (*Config, error) {
@ -91,6 +100,8 @@ func NewConfigWithData(data []byte) (*Config, error) {
return nil, err
}
cfg.adjust()
return cfg, nil
}
@ -98,13 +109,14 @@ func NewConfigDefault() *Config {
cfg := new(Config)
cfg.Addr = DefaultAddr
cfg.HttpAddr = DefaultHttpAddr
cfg.HttpAddr = ""
cfg.DataDir = DefaultDataDir
cfg.DBName = DefaultDBName
cfg.SlaveOf = ""
cfg.Readonly = false
// disable access log
cfg.AccessLog = ""
@ -112,28 +124,37 @@ func NewConfigDefault() *Config {
cfg.LMDB.MapSize = 20 * 1024 * 1024
cfg.LMDB.NoSync = true
cfg.Replication.WaitSyncTime = 1
cfg.UseReplication = false
cfg.Replication.WaitSyncTime = 500
cfg.Replication.Compression = true
cfg.Replication.WaitMaxSlaveAcks = 2
cfg.Replication.SyncLog = 0
cfg.Snapshot.MaxNum = 1
cfg.adjust()
return cfg
}
func (cfg *LevelDBConfig) Adjust() {
if cfg.CacheSize <= 0 {
cfg.CacheSize = 4 * 1024 * 1024
func (cfg *Config) adjust() {
if cfg.LevelDB.CacheSize <= 0 {
cfg.LevelDB.CacheSize = 4 * 1024 * 1024
}
if cfg.BlockSize <= 0 {
cfg.BlockSize = 4 * 1024
if cfg.LevelDB.BlockSize <= 0 {
cfg.LevelDB.BlockSize = 4 * 1024
}
if cfg.WriteBufferSize <= 0 {
cfg.WriteBufferSize = 4 * 1024 * 1024
if cfg.LevelDB.WriteBufferSize <= 0 {
cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024
}
if cfg.MaxOpenFiles < 1024 {
cfg.MaxOpenFiles = 1024
if cfg.LevelDB.MaxOpenFiles < 1024 {
cfg.LevelDB.MaxOpenFiles = 1024
}
if cfg.Replication.ExpiredLogDays <= 0 {
cfg.Replication.ExpiredLogDays = 7
}
}

View File

@ -16,6 +16,10 @@ access_log = ""
# Any write operations except flushall and replication will be disabled in slave mode.
slaveof = ""
# Readonly mode, slave server is always readonly even readonly = false
# for readonly mode, only replication and flushall can write
readonly = false
# Choose which backend storage to use, now support:
#
# leveldb
@ -50,20 +54,35 @@ nosync = true
# if not set, use data_dir/rpl
path = ""
# Expire write ahead logs after the given days
expired_log_days = 7
# If sync is true, the new log must be sent to some slaves, and then commit.
# It will reduce performance but have better high availability.
sync = true
# If sync is true, wait at last wait_sync_time seconds for slave syncing this log
wait_sync_time = 1
# If sync is true, wait at most wait_sync_time milliseconds for slave syncing this log
wait_sync_time = 500
# If sync is true, wait at most min(wait_max_slave_acks, (n + 1) / 2) to promise syncing ok.
# n is slave number
# If 0, wait (n + 1) / 2 acks.
wait_max_slave_acks = 2
# Expire write ahead logs after the given days
expired_log_days = 7
# Sync log to disk if possible
# 0: no sync
# 1: sync every second
# 2: sync every commit
sync_log = 0
# Compress the log or not
compression = true
[snapshot]
# Path to store snapshot dump file
# if not set, use data_dir/snapshot
# snapshot file name format is dmp-2006-01-02T15:04:05.999999999
path = ""
# Reserve newest max_num snapshot dump files
max_num = 1

View File

@ -90,7 +90,7 @@
"readonly": false
},
"FULLSYNC": {
"arguments": "-",
"arguments": "[NEW]",
"group": "Replication",
"readonly": false
@ -301,7 +301,7 @@
"readonly": false
},
"SLAVEOF": {
"arguments": "host port [restart]",
"arguments": "host port [RESTART] [READONLY]",
"group": "Replication",
"readonly": false
},

View File

@ -122,8 +122,8 @@ Table of Contents
- [BPERSIST key](#bpersist-key)
- [BXSCAN key [MATCH match] [COUNT count]](#bxscan-key-match-match-count-count)
- [Replication](#replication)
- [SLAVEOF host port [restart]](#slaveof-host-port-restart)
- [FULLSYNC](#fullsync)
- [SLAVEOF host port [RESTART] [READONLY]](#slaveof-host-port-restart-readonly)
- [FULLSYNC [NEW]](#fullsync-new)
- [SYNC logid](#sync-logid)
- [Server](#server)
- [PING](#ping)
@ -2466,21 +2466,25 @@ See [XSCAN](#xscan-key-match-match-count-count) for more information.
## Replication
### SLAVEOF host port [restart]
### SLAVEOF host port [RESTART] [READONLY]
Changes the replication settings of a slave on the fly. If the server is already acting as slave, SLAVEOF NO ONE will turn off the replication.
Changes the replication settings of a slave on the fly. If the server is already acting as slave, `SLAVEOF NO ONE` will turn off the replication and turn the server into master. `SLAVEOF NO ONE READONLY` will turn the server into master with readonly mode.
SLAVEOF host port will make the server a slave of another server listening at the specified host and port.
If the server is already master, `SLAVEOF NO ONE READONLY` will force the server to readonly mode, and `SLAVEOF NO ONE` will disable readonly.
If a server is already a slave of a master, SLAVEOF host port will stop the replication against the old and start the synchronization against the new one, if restart is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1.
`SLAVEOF host port` will make the server a slave of another server listening at the specified host and port.
If a server is already a slave of a master, `SLAVEOF host port` will stop the replication against the old and start the synchronization against the new one, if RESTART is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1.
### FULLSYNC
### FULLSYNC [NEW]
Inner command, starts a fullsync from the master set by SLAVEOF.
FULLSYNC will first try to sync all data from the master, save in local disk, then discard old dataset and load new one.
`FULLSYNC NEW` will generate a new snapshot and sync, otherwise it will use the latest existing snapshot if possible.
**Return value**
**Examples**

View File

@ -16,6 +16,10 @@ access_log = ""
# Any write operations except flushall and replication will be disabled in slave mode.
slaveof = ""
# Readonly mode, slave server is always readonly even readonly = false
# for readonly mode, only replication and flushall can write
readonly = false
# Choose which backend storage to use, now support:
#
# leveldb
@ -50,20 +54,35 @@ nosync = true
# if not set, use data_dir/rpl
path = ""
# Expire write ahead logs after the given days
expired_log_days = 7
# If sync is true, the new log must be sent to some slaves, and then commit.
# It will reduce performance but have better high availability.
sync = true
# If sync is true, wait at last wait_sync_time seconds for slave syncing this log
wait_sync_time = 1
# If sync is true, wait at most wait_sync_time milliseconds for slave syncing this log
wait_sync_time = 500
# If sync is true, wait at most min(wait_max_slave_acks, (n + 1) / 2) to promise syncing ok.
# n is slave number
# If 0, wait (n + 1) / 2 acks.
wait_max_slave_acks = 2
# Expire write ahead logs after the given days
expired_log_days = 7
# Sync log to disk if possible
# 0: no sync
# 1: sync every second
# 2: sync every commit
sync_log = 0
# Compress the log or not
compression = true
[snapshot]
# Path to store snapshot dump file
# if not set, use data_dir/snapshot
# snapshot file name format is dmp-2006-01-02T15:04:05.999999999
path = ""
# Reserve newest max_num snapshot dump files
max_num = 1

View File

@ -46,11 +46,6 @@ var (
}
)
const (
RDWRMode = 0
ROnlyMode = 1
)
const (
defaultScanCount int = 10
)

View File

@ -46,20 +46,21 @@ func (l *Ledis) Dump(w io.Writer) error {
var commitID uint64
var snap *store.Snapshot
{
l.wLock.Lock()
defer l.wLock.Unlock()
if l.r != nil {
if commitID, err = l.r.LastCommitID(); err != nil {
l.wLock.Unlock()
return err
}
}
if snap, err = l.ldb.NewSnapshot(); err != nil {
l.wLock.Unlock()
return err
}
}
l.wLock.Unlock()
wb := bufio.NewWriterSize(w, 4096)

View File

@ -9,7 +9,7 @@ import (
)
func TestDump(t *testing.T) {
cfgM := new(config.Config)
cfgM := config.NewConfigDefault()
cfgM.DataDir = "/tmp/test_ledis_master"
os.RemoveAll(cfgM.DataDir)
@ -19,7 +19,7 @@ func TestDump(t *testing.T) {
t.Fatal(err)
}
cfgS := new(config.Config)
cfgS := config.NewConfigDefault()
cfgS.DataDir = "/tmp/test_ledis_slave"
os.RemoveAll(cfgM.DataDir)

View File

@ -33,17 +33,10 @@ type Ledis struct {
wLock sync.RWMutex //allow one write at same time
commitLock sync.Mutex //allow one write commit at same time
// for readonly mode, only replication and flushall can write
readOnly bool
lock io.Closer
}
func Open(cfg *config.Config) (*Ledis, error) {
return Open2(cfg, RDWRMode)
}
func Open2(cfg *config.Config, flags int) (*Ledis, error) {
if len(cfg.DataDir) == 0 {
cfg.DataDir = config.DefaultDataDir
}
@ -53,13 +46,12 @@ func Open2(cfg *config.Config, flags int) (*Ledis, error) {
var err error
l := new(Ledis)
l.cfg = cfg
if l.lock, err = filelock.Lock(path.Join(cfg.DataDir, "LOCK")); err != nil {
return nil, err
}
l.readOnly = (flags&ROnlyMode > 0)
l.quit = make(chan struct{})
if l.ldb, err = store.Open(cfg); err != nil {
@ -163,7 +155,7 @@ func (l *Ledis) flushAll() error {
}
func (l *Ledis) IsReadOnly() bool {
if l.readOnly {
if l.cfg.Readonly {
return true
} else if l.r != nil {
if b, _ := l.r.CommitIDBehind(); b {
@ -173,10 +165,6 @@ func (l *Ledis) IsReadOnly() bool {
return false
}
func (l *Ledis) SetReadOnly(b bool) {
l.readOnly = b
}
func (l *Ledis) onDataExpired() {
defer l.wg.Done()

View File

@ -12,7 +12,7 @@ var testLedisOnce sync.Once
func getTestDB() *DB {
f := func() {
cfg := new(config.Config)
cfg := config.NewConfigDefault()
cfg.DataDir = "/tmp/test_ledis"
os.RemoveAll(cfg.DataDir)

View File

@ -110,7 +110,7 @@ func (l *Ledis) WaitReplication() error {
func (l *Ledis) StoreLogsFromReader(rb io.Reader) error {
if !l.ReplicationUsed() {
return ErrRplNotSupport
} else if !l.readOnly {
} else if !l.cfg.Readonly {
return ErrRplInRDWR
}

View File

@ -30,7 +30,7 @@ func TestReplication(t *testing.T) {
var slave *Ledis
var err error
cfgM := new(config.Config)
cfgM := config.NewConfigDefault()
cfgM.DataDir = "/tmp/test_repl/master"
cfgM.UseReplication = true
@ -43,13 +43,14 @@ func TestReplication(t *testing.T) {
t.Fatal(err)
}
cfgS := new(config.Config)
cfgS := config.NewConfigDefault()
cfgS.DataDir = "/tmp/test_repl/slave"
cfgS.UseReplication = true
cfgS.Readonly = true
os.RemoveAll(cfgS.DataDir)
slave, err = Open2(cfgS, ROnlyMode)
slave, err = Open(cfgS)
if err != nil {
t.Fatal(err)
}

View File

@ -190,7 +190,7 @@ func testTxSelect(t *testing.T, db *DB) {
}
func testTx(t *testing.T, name string) {
cfg := new(config.Config)
cfg := config.NewConfigDefault()
cfg.DataDir = "/tmp/ledis_test_tx"
cfg.DBName = name

View File

@ -21,6 +21,8 @@ type GoLevelDBStore struct {
first uint64
last uint64
lastCommit time.Time
}
func (s *GoLevelDBStore) FirstID() (uint64, error) {
@ -132,7 +134,17 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error {
w.Put(key, buf.Bytes())
}
if err := w.Commit(); err != nil {
n := time.Now()
if s.cfg.Replication.SyncLog == 2 ||
(s.cfg.Replication.SyncLog == 1 && n.Sub(s.lastCommit) > time.Second) {
err = w.SyncCommit()
} else {
err = w.Commit()
}
s.lastCommit = n
if err != nil {
return err
}
@ -257,12 +269,12 @@ func (s *GoLevelDBStore) open() error {
}
func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) {
cfg := new(config.Config)
cfg := config.NewConfigDefault()
cfg.DBName = "goleveldb"
cfg.DBPath = base
cfg.LevelDB.BlockSize = 4 * 1024 * 1024
cfg.LevelDB.CacheSize = 16 * 1024 * 1024
cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024
cfg.LevelDB.BlockSize = 16 * 1024 * 1024
cfg.LevelDB.CacheSize = 64 * 1024 * 1024
cfg.LevelDB.WriteBufferSize = 64 * 1024 * 1024
cfg.LevelDB.Compression = false
s := new(GoLevelDBStore)

View File

@ -14,7 +14,7 @@ func TestReplication(t *testing.T) {
}
defer os.RemoveAll(dir)
c := new(config.Config)
c := config.NewConfigDefault()
c.Replication.Path = dir
r, err := NewReplication(c)

View File

@ -34,6 +34,8 @@ type App struct {
// handle slaves
slock sync.Mutex
slaves map[*client]struct{}
snap *snapshotStore
}
func netType(s string) string {
@ -88,13 +90,16 @@ func NewApp(cfg *config.Config) (*App, error) {
}
}
flag := ledis.RDWRMode
if len(app.cfg.SlaveOf) > 0 {
//slave must readonly
flag = ledis.ROnlyMode
if app.snap, err = newSnapshotStore(cfg); err != nil {
return nil, err
}
if app.ldb, err = ledis.Open2(cfg, flag); err != nil {
if len(app.cfg.SlaveOf) > 0 {
//slave must readonly
app.cfg.Readonly = true
}
if app.ldb, err = ledis.Open(cfg); err != nil {
return nil, err
}
@ -126,6 +131,8 @@ func (app *App) Close() {
app.m.Close()
app.snap.Close()
if app.access != nil {
app.access.Close()
}
@ -135,7 +142,7 @@ func (app *App) Close() {
func (app *App) Run() {
if len(app.cfg.SlaveOf) > 0 {
app.slaveof(app.cfg.SlaveOf, false)
app.slaveof(app.cfg.SlaveOf, false, app.cfg.Readonly)
}
go app.httpServe()

View File

@ -29,7 +29,7 @@ func startTestApp() {
f := func() {
newTestLedisClient()
cfg := new(config.Config)
cfg := config.NewConfigDefault()
cfg.DataDir = "/tmp/testdb"
os.RemoveAll(cfg.DataDir)

View File

@ -4,27 +4,27 @@ import (
"fmt"
"github.com/siddontang/go/hack"
"github.com/siddontang/ledisdb/ledis"
"io/ioutil"
"os"
"strconv"
"strings"
"time"
)
func slaveofCommand(c *client) error {
args := c.args
if len(args) != 2 || len(args) != 3 {
if len(args) != 2 && len(args) != 3 {
return ErrCmdParams
}
masterAddr := ""
restart := false
readonly := false
if strings.ToLower(hack.String(args[0])) == "no" &&
strings.ToLower(hack.String(args[1])) == "one" {
//stop replication, use master = ""
if len(args) != 2 {
return ErrCmdParams
if len(args) == 3 && strings.ToLower(hack.String(args[2])) == "readonly" {
readonly = true
}
} else {
if _, err := strconv.ParseInt(hack.String(args[1]), 10, 16); err != nil {
@ -38,7 +38,7 @@ func slaveofCommand(c *client) error {
}
}
if err := c.app.slaveof(masterAddr, restart); err != nil {
if err := c.app.slaveof(masterAddr, restart, readonly); err != nil {
return err
}
@ -48,27 +48,46 @@ func slaveofCommand(c *client) error {
}
func fullsyncCommand(c *client) error {
//todo, multi fullsync may use same dump file
dumpFile, err := ioutil.TempFile(c.app.cfg.DataDir, "dump_")
args := c.args
needNew := false
if len(args) == 1 && strings.ToLower(hack.String(args[0])) == "new" {
needNew = true
}
var s *snapshot
var err error
var t time.Time
dumper := c.app.ldb
if needNew {
s, t, err = c.app.snap.Create(dumper)
} else {
if s, t, err = c.app.snap.OpenLatest(); err != nil {
return err
} else if s == nil {
s, t, err = c.app.snap.Create(dumper)
} else {
gap := time.Duration(c.app.cfg.Replication.ExpiredLogDays*24*3600) * time.Second / 2
minT := time.Now().Add(-gap)
//snapshot is too old
if t.Before(minT) {
s.Close()
s, t, err = c.app.snap.Create(dumper)
}
}
}
if err != nil {
return err
}
if err = c.app.ldb.Dump(dumpFile); err != nil {
return err
}
n := s.Size()
st, _ := dumpFile.Stat()
n := st.Size()
c.resp.writeBulkFrom(n, s)
dumpFile.Seek(0, os.SEEK_SET)
c.resp.writeBulkFrom(n, dumpFile)
name := dumpFile.Name()
dumpFile.Close()
os.Remove(name)
s.Close()
return nil
}

View File

@ -37,12 +37,12 @@ func TestReplication(t *testing.T) {
data_dir := "/tmp/test_replication"
os.RemoveAll(data_dir)
masterCfg := new(config.Config)
masterCfg := config.NewConfigDefault()
masterCfg.DataDir = fmt.Sprintf("%s/master", data_dir)
masterCfg.Addr = "127.0.0.1:11182"
masterCfg.UseReplication = true
masterCfg.Replication.Sync = true
masterCfg.Replication.WaitSyncTime = 5
masterCfg.Replication.WaitSyncTime = 5000
var master *App
var slave *App
@ -53,7 +53,7 @@ func TestReplication(t *testing.T) {
}
defer master.Close()
slaveCfg := new(config.Config)
slaveCfg := config.NewConfigDefault()
slaveCfg.DataDir = fmt.Sprintf("%s/slave", data_dir)
slaveCfg.Addr = "127.0.0.1:11183"
slaveCfg.SlaveOf = masterCfg.Addr
@ -96,7 +96,7 @@ func TestReplication(t *testing.T) {
t.Fatal(err)
}
slave.slaveof("", false)
slave.slaveof("", false, false)
db.Set([]byte("a2"), value)
db.Set([]byte("b2"), value)
@ -112,7 +112,7 @@ func TestReplication(t *testing.T) {
t.Fatal("must error")
}
slave.slaveof(masterCfg.Addr, false)
slave.slaveof(masterCfg.Addr, false, false)
time.Sleep(1 * time.Second)

View File

@ -9,7 +9,7 @@
//
// Start a ledis server is very simple:
//
// cfg := new(config.Config)
// cfg := config.NewConfigDefault()
// cfg.Addr = "127.0.0.1:6380"
// cfg.DataDir = "/tmp/ledis"
// app := server.NewApp(cfg)

View File

@ -28,6 +28,11 @@ type info struct {
Persistence struct {
DBName string
}
Replication struct {
PubLogNum int64
PubLogTotalTime int64 //milliseconds
}
}
func newInfo(app *App) (i *info, err error) {
@ -115,7 +120,8 @@ func (i *info) dumpServer(buf *bytes.Buffer) {
i.dumpPairs(buf, infoPair{"os", i.Server.OS},
infoPair{"process_id", i.Server.ProceessId},
infoPair{"addr", i.app.cfg.Addr},
infoPair{"http_addr", i.app.cfg.HttpAddr})
infoPair{"http_addr", i.app.cfg.HttpAddr},
infoPair{"readonly", i.app.cfg.Readonly})
}
func (i *info) dumpClients(buf *bytes.Buffer) {
@ -155,16 +161,29 @@ func (i *info) dumpReplication(buf *bytes.Buffer) {
slaves = append(slaves, s.remoteAddr)
}
p = append(p, infoPair{"readonly", i.app.ldb.IsReadOnly()})
num := i.Replication.PubLogNum
p = append(p, infoPair{"pub_log_num", num})
if num != 0 {
p = append(p, infoPair{"pub_log_per_time", i.Replication.PubLogTotalTime / num})
} else {
p = append(p, infoPair{"pub_log_per_time", 0})
}
p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf})
if len(slaves) > 0 {
p = append(p, infoPair{"slave", strings.Join(slaves, ",")})
p = append(p, infoPair{"slaves", strings.Join(slaves, ",")})
}
if s, _ := i.app.ldb.ReplicationStat(); s != nil {
p = append(p, infoPair{"last_log_id", s.LastID})
p = append(p, infoPair{"first_log_id", s.FirstID})
p = append(p, infoPair{"commit_log_id", s.CommitID})
} else {
p = append(p, infoPair{"last_log_id", 0})
p = append(p, infoPair{"first_log_id", 0})
p = append(p, infoPair{"commit_log_id", 0})
}
i.dumpPairs(buf, p...)

View File

@ -15,6 +15,7 @@ import (
"path"
"strconv"
"sync"
"sync/atomic"
"time"
)
@ -93,7 +94,7 @@ func (m *master) startReplication(masterAddr string, restart bool) error {
m.quit = make(chan struct{}, 1)
m.app.ldb.SetReadOnly(true)
m.app.cfg.Readonly = true
m.wg.Add(1)
go m.runReplication(restart)
@ -238,10 +239,16 @@ func (m *master) sync() error {
}
func (app *App) slaveof(masterAddr string, restart bool) error {
func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error {
app.m.Lock()
defer app.m.Unlock()
//in master mode and no slaveof, only set readonly
if len(app.cfg.SlaveOf) == 0 && len(masterAddr) == 0 {
app.cfg.Readonly = readonly
return nil
}
if !app.ldb.ReplicationUsed() {
return fmt.Errorf("slaveof must enable replication")
}
@ -253,7 +260,7 @@ func (app *App) slaveof(masterAddr string, restart bool) error {
return err
}
app.ldb.SetReadOnly(false)
app.cfg.Readonly = readonly
} else {
return app.m.startReplication(masterAddr, restart)
}
@ -287,7 +294,10 @@ func (app *App) removeSlave(c *client) {
app.slock.Lock()
defer app.slock.Unlock()
if _, ok := app.slaves[c]; ok {
delete(app.slaves, c)
log.Info("remove slave %s", c.remoteAddr)
}
if c.ack != nil {
asyncNotifyUint64(c.ack.ch, c.lastLogID)
@ -313,7 +323,7 @@ func (app *App) publishNewLog(l *rpl.Log) {
logId := l.ID
for s, _ := range app.slaves {
if s.lastLogID >= logId {
//slave has already this log
//slave has already owned this log
ss = []*client{}
break
} else {
@ -327,6 +337,8 @@ func (app *App) publishNewLog(l *rpl.Log) {
return
}
startTime := time.Now()
ack := &syncAck{
logId, make(chan uint64, len(ss)),
}
@ -357,7 +369,11 @@ func (app *App) publishNewLog(l *rpl.Log) {
select {
case <-done:
case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Second):
case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Millisecond):
log.Info("replication wait timeout")
}
stopTime := time.Now()
atomic.AddInt64(&app.info.Replication.PubLogNum, 1)
atomic.AddInt64(&app.info.Replication.PubLogTotalTime, stopTime.Sub(startTime).Nanoseconds()/1e6)
}

View File

@ -9,7 +9,7 @@ import (
)
func TestScan(t *testing.T) {
cfg := new(config.Config)
cfg := config.NewConfigDefault()
cfg.DataDir = "/tmp/test_scan"
cfg.Addr = "127.0.0.1:11185"

View File

@ -101,7 +101,7 @@ var testScript4 = `
`
func TestLuaCall(t *testing.T) {
cfg := new(config.Config)
cfg := config.NewConfigDefault()
cfg.Addr = ":11188"
cfg.DataDir = "/tmp/testscript"
cfg.DBName = "memory"

240
server/snapshot.go Normal file
View File

@ -0,0 +1,240 @@
package server
import (
"fmt"
"github.com/siddontang/go/log"
"github.com/siddontang/ledisdb/config"
"io"
"io/ioutil"
"os"
"path"
"sort"
"sync"
"time"
)
// snapshotTimeFormat is the reference layout used to embed a snapshot's
// creation time in its file name (see snapshotName / parseSnapshotName).
const (
	snapshotTimeFormat = "2006-01-02T15:04:05.999999999"
)
// snapshotStore manages the on-disk snapshot dump files: it tracks their
// names in creation order, caps how many are kept (cfg.Snapshot.MaxNum),
// and periodically rescans the snapshot directory in the background.
type snapshotStore struct {
	sync.Mutex // guards names and all directory mutations

	cfg *config.Config

	// names holds valid snapshot file names sorted from oldest to newest.
	names []string

	// quit stops the background run loop when closed.
	quit chan struct{}
}
func snapshotName(t time.Time) string {
return fmt.Sprintf("dmp-%s", t.Format(snapshotTimeFormat))
}
// parseSnapshotName extracts the creation time encoded in a snapshot file
// name produced by snapshotName. It returns the zero time and an error if
// the name does not match the "dmp-<timestamp>" pattern.
func parseSnapshotName(name string) (time.Time, error) {
	var timeString string
	// Sscanf's %s consumes the whole remainder here because the
	// timestamp contains no whitespace.
	if _, err := fmt.Sscanf(name, "dmp-%s", &timeString); err != nil {
		// NOTE: a stray debug println(err.Error()) used to sit here,
		// bypassing the project's log package; removed.
		return time.Time{}, err
	}
	when, err := time.Parse(snapshotTimeFormat, timeString)
	if err != nil {
		return time.Time{}, err
	}
	return when, nil
}
// newSnapshotStore creates the snapshot directory (defaulting to
// cfg.DataDir/snapshot when no path is configured), validates any
// existing snapshot files, and starts the background rescan goroutine.
func newSnapshotStore(cfg *config.Config) (*snapshotStore, error) {
	if len(cfg.Snapshot.Path) == 0 {
		cfg.Snapshot.Path = path.Join(cfg.DataDir, "snapshot")
	}

	if err := os.MkdirAll(cfg.Snapshot.Path, 0755); err != nil {
		return nil, err
	}

	store := &snapshotStore{
		cfg:   cfg,
		names: make([]string, 0, cfg.Snapshot.MaxNum),
		quit:  make(chan struct{}),
	}

	if err := store.checkSnapshots(); err != nil {
		return nil, err
	}

	go store.run()

	return store, nil
}
// Close stops the background snapshot-checking goroutine. It must be
// called at most once; a second call would panic on the closed channel.
func (s *snapshotStore) Close() {
	close(s.quit)
}
// checkSnapshots rescans the snapshot directory: removes leftover *.tmp
// files, skips files whose names do not parse as snapshot names, rebuilds
// the in-memory name list sorted oldest-first, and purges any excess.
// Callers must coordinate locking themselves (run() takes the lock).
func (s *snapshotStore) checkSnapshots() error {
	cfg := s.cfg
	snapshots, err := ioutil.ReadDir(cfg.Snapshot.Path)
	if err != nil {
		log.Error("read %s error: %s", cfg.Snapshot.Path, err.Error())
		return err
	}

	valid := []string{}
	for _, fi := range snapshots {
		name := fi.Name()

		if path.Ext(name) == ".tmp" {
			// A .tmp file is a dump interrupted mid-write; discard it.
			log.Error("temp snapshot file name %s, try remove", name)
			os.Remove(path.Join(cfg.Snapshot.Path, name))
			continue
		}

		if _, err := parseSnapshotName(name); err != nil {
			log.Error("invalid snapshot file name %s, err: %s", name, err.Error())
			continue
		}

		valid = append(valid, name)
	}

	//from old to new
	sort.Strings(valid)

	s.names = valid

	s.purge(false)

	return nil
}
// run re-validates the snapshot directory once an hour until Close is
// called; all shared state is accessed under the store's mutex.
func (s *snapshotStore) run() {
	ticker := time.NewTicker(60 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			s.Lock()
			if err := s.checkSnapshots(); err != nil {
				log.Error("check snapshots error %s", err.Error())
			}
			s.Unlock()
		case <-s.quit:
			return
		}
	}
}
// purge deletes the oldest snapshot files so at most cfg.Snapshot.MaxNum
// remain (one fewer when create is true, making room for a snapshot about
// to be written). s.names must be sorted oldest-to-newest and the caller
// must hold the store lock.
func (s *snapshotStore) purge(create bool) {
	var names []string

	maxNum := s.cfg.Snapshot.MaxNum
	num := len(s.names) - maxNum

	if create {
		num++
		if num > len(s.names) {
			num = len(s.names)
		}
	}

	if num > 0 {
		// BUG FIX: copy the victims out before shifting. The in-place
		// copy below reuses the same backing array, so the previous
		// plain sub-slice (names = s.names[0:num]) was overwritten by
		// the shift and the wrong (newest) files were removed while the
		// oldest were leaked on disk.
		names = append([]string(nil), s.names[0:num]...)

		n := copy(s.names, s.names[num:])
		s.names = s.names[0:n]
	}

	for _, name := range names {
		if err := os.Remove(s.snapshotPath(name)); err != nil {
			log.Error("purge snapshot %s error %s", name, err.Error())
		}
	}
}
// snapshotPath returns the on-disk location of the named snapshot inside
// the configured snapshot directory.
func (s *snapshotStore) snapshotPath(name string) string {
	return path.Join(s.cfg.Snapshot.Path, name)
}
// snapshotDumper is implemented by anything that can serialize a full
// data dump to w (fullsyncCommand passes the server's ledis DB handle).
type snapshotDumper interface {
	Dump(w io.Writer) error
}
// snapshot is a read-only handle to a finished snapshot dump file. It
// satisfies io.ReadCloser via its own Read/Close methods.
//
// The previous embedded io.ReadCloser field was never assigned (always
// nil) and was fully shadowed by the explicit methods — a dead field and
// a nil-interface trap — so it has been removed.
type snapshot struct {
	f *os.File
}
// Read reads from the underlying snapshot file, satisfying io.Reader.
func (st *snapshot) Read(b []byte) (int, error) {
	return st.f.Read(b)
}
// Close releases the underlying file handle.
func (st *snapshot) Close() error {
	return st.f.Close()
}
// Size returns the snapshot file's size in bytes, or 0 if it cannot be
// determined.
func (st *snapshot) Size() int64 {
	fi, err := st.f.Stat()
	if err != nil {
		// Previously the error was discarded and a nil FileInfo was
		// dereferenced, panicking; report zero instead.
		return 0
	}
	return fi.Size()
}
// Create dumps d into a brand-new snapshot file and returns an open,
// read-positioned handle to it along with its creation time. The caller
// must Close the returned snapshot. The store lock is held for the whole
// dump, so only one snapshot can be created at a time.
func (s *snapshotStore) Create(d snapshotDumper) (*snapshot, time.Time, error) {
	s.Lock()
	defer s.Unlock()

	// Make room up front so the file count never exceeds MaxNum.
	s.purge(true)

	now := time.Now()
	name := snapshotName(now)

	tmpName := name + ".tmp"

	if len(s.names) > 0 {
		// names is sorted oldest-to-newest; a non-increasing clock would
		// break that invariant, so refuse to create the snapshot.
		lastTime, _ := parseSnapshotName(s.names[len(s.names)-1])
		if !now.After(lastTime) {
			return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ",
				now.Format(snapshotTimeFormat), lastTime.Format(snapshotTimeFormat))
		}
	}

	// Dump into a .tmp file first and rename on success, so a crash
	// mid-dump never leaves a half-written file with a valid name
	// (checkSnapshots removes stray .tmp files).
	f, err := os.OpenFile(s.snapshotPath(tmpName), os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, time.Time{}, err
	}

	if err := d.Dump(f); err != nil {
		f.Close()
		os.Remove(s.snapshotPath(tmpName))
		return nil, time.Time{}, err
	}

	f.Close()
	if err := os.Rename(s.snapshotPath(tmpName), s.snapshotPath(name)); err != nil {
		return nil, time.Time{}, err
	}

	// Reopen read-only for the caller.
	if f, err = os.Open(s.snapshotPath(name)); err != nil {
		return nil, time.Time{}, err
	}
	s.names = append(s.names, name)

	return &snapshot{f: f}, now, nil
}
// OpenLatest opens the newest snapshot on disk and returns it with its
// creation time. It returns (nil, zero time, nil) when no snapshot
// exists yet.
func (s *snapshotStore) OpenLatest() (*snapshot, time.Time, error) {
	s.Lock()
	defer s.Unlock()

	if len(s.names) == 0 {
		return nil, time.Time{}, nil
	}

	newest := s.names[len(s.names)-1]
	when, _ := parseSnapshotName(newest)

	f, err := os.Open(s.snapshotPath(newest))
	if err != nil {
		return nil, time.Time{}, err
	}

	return &snapshot{f: f}, when, nil
}

77
server/snapshot_test.go Normal file
View File

@ -0,0 +1,77 @@
package server
import (
"github.com/siddontang/ledisdb/config"
"io"
"io/ioutil"
"os"
"path"
"testing"
)
type testSnapshotDumper struct {
}
func (d *testSnapshotDumper) Dump(w io.Writer) error {
w.Write([]byte("hello world"))
return nil
}
// TestSnapshot exercises the snapshotStore end to end: creating
// snapshots, reading their contents back, and verifying that the MaxNum
// retention limit holds for both the in-memory list and the on-disk files.
func TestSnapshot(t *testing.T) {
	cfg := config.NewConfigDefault()
	cfg.Snapshot.MaxNum = 2
	cfg.Snapshot.Path = path.Join(os.TempDir(), "snapshot")
	defer os.RemoveAll(cfg.Snapshot.Path)

	dumper := new(testSnapshotDumper)

	store, err := newSnapshotStore(cfg)
	if err != nil {
		t.Fatal(err)
	}

	// First snapshot.
	snap1, _, err := store.Create(dumper)
	if err != nil {
		t.Fatal(err)
	}
	defer snap1.Close()
	if data, _ := ioutil.ReadAll(snap1); string(data) != "hello world" {
		t.Fatal("invalid read snapshot")
	}
	if len(store.names) != 1 {
		t.Fatal("must 1 snapshot")
	}

	// Second snapshot fills the store to its limit.
	snap2, _, err := store.Create(dumper)
	if err != nil {
		t.Fatal(err)
	}
	defer snap2.Close()
	if data, _ := ioutil.ReadAll(snap2); string(data) != "hello world" {
		t.Fatal("invalid read snapshot")
	}
	if len(store.names) != 2 {
		t.Fatal("must 2 snapshot")
	}

	// Third snapshot must purge the oldest, keeping the count at two.
	snap3, _, err := store.Create(dumper)
	if err != nil {
		t.Fatal(err)
	}
	defer snap3.Close()
	if data, _ := ioutil.ReadAll(snap3); string(data) != "hello world" {
		t.Fatal("invalid read snapshot")
	}
	if len(store.names) != 2 {
		t.Fatal("must 2 snapshot")
	}

	infos, _ := ioutil.ReadDir(cfg.Snapshot.Path)
	if len(infos) != 2 {
		t.Fatal("must 2 snapshot")
	}

	store.Close()
}

View File

@ -98,7 +98,14 @@ func (db *DB) Delete(key []byte) error {
return b.Delete(key)
})
return err
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.Put(key, value)
}
func (db *DB) SyncDelete(key []byte) error {
return db.Delete(key)
}
func (db *DB) NewIterator() driver.IIterator {
@ -152,6 +159,10 @@ func (db *DB) BatchPut(writes []driver.Write) error {
return err
}
func (db *DB) SyncBatchPut(writes []driver.Write) error {
return db.BatchPut(writes)
}
func (db *DB) Compact() error {
return nil
}

View File

@ -48,6 +48,10 @@ func (t *Tx) BatchPut(writes []driver.Write) error {
return nil
}
func (t *Tx) SyncBatchPut(writes []driver.Write) error {
return t.BatchPut(writes)
}
func (t *Tx) Rollback() error {
return t.tx.Rollback()
}

View File

@ -2,6 +2,7 @@ package driver
type BatchPuter interface {
BatchPut([]Write) error
SyncBatchPut([]Write) error
}
type Write struct {
@ -29,6 +30,10 @@ func (w *WriteBatch) Commit() error {
return w.batch.BatchPut(w.wb)
}
func (w *WriteBatch) SyncCommit() error {
return w.batch.SyncBatchPut(w.wb)
}
func (w *WriteBatch) Rollback() error {
w.wb = w.wb[0:0]
return nil

View File

@ -16,6 +16,9 @@ type IDB interface {
Put(key []byte, value []byte) error
Delete(key []byte) error
SyncPut(key []byte, value []byte) error
SyncDelete(key []byte) error
NewIterator() IIterator
NewWriteBatch() IWriteBatch
@ -53,6 +56,7 @@ type IWriteBatch interface {
Put(key []byte, value []byte)
Delete(key []byte)
Commit() error
SyncCommit() error
Rollback() error
}

View File

@ -21,6 +21,10 @@ func (w *WriteBatch) Commit() error {
return w.db.db.Write(w.wbatch, nil)
}
func (w *WriteBatch) SyncCommit() error {
return w.db.db.Write(w.wbatch, w.db.syncOpts)
}
func (w *WriteBatch) Rollback() error {
w.wbatch.Reset()
return nil

View File

@ -41,6 +41,8 @@ type DB struct {
iteratorOpts *opt.ReadOptions
syncOpts *opt.WriteOptions
cache cache.Cache
filter filter.Filter
@ -102,14 +104,15 @@ func (db *DB) initOpts() {
db.iteratorOpts = &opt.ReadOptions{}
db.iteratorOpts.DontFillCache = true
db.syncOpts = &opt.WriteOptions{}
db.syncOpts.Sync = true
}
func newOptions(cfg *config.LevelDBConfig) *opt.Options {
opts := &opt.Options{}
opts.ErrorIfMissing = false
cfg.Adjust()
opts.BlockCache = cache.NewLRUCache(cfg.CacheSize)
//we must use bloomfilter
@ -147,6 +150,14 @@ func (db *DB) Delete(key []byte) error {
return db.db.Delete(key, nil)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.db.Put(key, value, db.syncOpts)
}
func (db *DB) SyncDelete(key []byte) error {
return db.db.Delete(key, db.syncOpts)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,

View File

@ -46,6 +46,10 @@ func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) SyncCommit() error {
return w.commit(w.db.syncOpts)
}
func (w *WriteBatch) Rollback() error {
C.leveldb_writebatch_clear(w.wbatch)
return nil

View File

@ -81,6 +81,8 @@ type DB struct {
writeOpts *WriteOptions
iteratorOpts *ReadOptions
syncOpts *WriteOptions
cache *Cache
filter *FilterPolicy
@ -106,8 +108,6 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
opts.SetCreateIfMissing(true)
cfg.Adjust()
db.cache = NewLRUCache(cfg.CacheSize)
opts.SetCache(db.cache)
@ -132,6 +132,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
db.readOpts = NewReadOptions()
db.writeOpts = NewWriteOptions()
db.syncOpts = NewWriteOptions()
db.syncOpts.SetSync(true)
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
}
@ -171,6 +174,14 @@ func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.put(db.syncOpts, key, value)
}
func (db *DB) SyncDelete(key []byte) error {
return db.delete(db.syncOpts, key)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,

View File

@ -46,6 +46,10 @@ func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) SyncCommit() error {
return w.commit(w.db.syncOpts)
}
func (w *WriteBatch) Rollback() error {
C.leveldb_writebatch_clear(w.wbatch)
return nil

View File

@ -81,6 +81,8 @@ type DB struct {
writeOpts *WriteOptions
iteratorOpts *ReadOptions
syncOpts *WriteOptions
cache *Cache
filter *FilterPolicy
@ -106,8 +108,6 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
opts.SetCreateIfMissing(true)
cfg.Adjust()
db.cache = NewLRUCache(cfg.CacheSize)
opts.SetCache(db.cache)
@ -132,6 +132,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
db.readOpts = NewReadOptions()
db.writeOpts = NewWriteOptions()
db.syncOpts = NewWriteOptions()
db.syncOpts.SetSync(true)
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
}
@ -171,6 +174,14 @@ func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.put(db.syncOpts, key, value)
}
func (db *DB) SyncDelete(key []byte) error {
return db.delete(db.syncOpts, key)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,

View File

@ -122,6 +122,14 @@ func (db MDB) BatchPut(writes []driver.Write) error {
return itr.err
}
func (db MDB) SyncBatchPut(writes []driver.Write) error {
if err := db.BatchPut(writes); err != nil {
return err
}
return db.env.Sync(1)
}
func (db MDB) Get(key []byte) ([]byte, error) {
tx, err := db.env.BeginTxn(nil, mdb.RDONLY)
if err != nil {
@ -148,6 +156,22 @@ func (db MDB) Delete(key []byte) error {
return itr.Error()
}
func (db MDB) SyncPut(key []byte, value []byte) error {
if err := db.Put(key, value); err != nil {
return err
}
return db.env.Sync(1)
}
func (db MDB) SyncDelete(key []byte) error {
if err := db.Delete(key); err != nil {
return err
}
return db.env.Sync(1)
}
type MDBIterator struct {
key []byte
value []byte

View File

@ -74,7 +74,10 @@ func (t *Tx) BatchPut(writes []driver.Write) error {
itr.setState()
return itr.Close()
}
func (t *Tx) SyncBatchPut(writes []driver.Write) error {
return t.BatchPut(writes)
}
func (t *Tx) Rollback() error {

View File

@ -45,6 +45,10 @@ func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) SyncCommit() error {
return w.commit(w.db.syncOpts)
}
func (w *WriteBatch) Rollback() error {
C.rocksdb_writebatch_clear(w.wbatch)
return nil

View File

@ -85,6 +85,8 @@ type DB struct {
writeOpts *WriteOptions
iteratorOpts *ReadOptions
syncOpts *WriteOptions
cache *Cache
filter *FilterPolicy
@ -111,8 +113,6 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
opts.SetCreateIfMissing(true)
cfg.Adjust()
db.env = NewDefaultEnv()
db.env.SetBackgroundThreads(runtime.NumCPU() * 2)
db.env.SetHighPriorityBackgroundThreads(1)
@ -152,6 +152,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
db.readOpts = NewReadOptions()
db.writeOpts = NewWriteOptions()
db.syncOpts = NewWriteOptions()
db.syncOpts.SetSync(true)
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
}
@ -197,6 +200,14 @@ func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.put(db.syncOpts, key, value)
}
func (db *DB) SyncDelete(key []byte) error {
return db.delete(db.syncOpts, key)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,

View File

@ -10,7 +10,7 @@ import (
)
func TestStore(t *testing.T) {
cfg := new(config.Config)
cfg := config.NewConfigDefault()
cfg.DataDir = "/tmp/testdb"
cfg.LMDB.MapSize = 10 * 1024 * 1024