update replication

This commit is contained in:
siddontang 2014-10-22 09:11:14 +08:00
parent e92dd80442
commit 9b58e105b2
5 changed files with 54 additions and 10 deletions

2
.gitignore vendored
View File

@ -3,5 +3,5 @@ build
.DS_Store .DS_Store
nohup.out nohup.out
build_config.mk build_config.mk
var var*
_workspace _workspace

View File

@ -15,12 +15,15 @@ import (
) )
var configFile = flag.String("config", "", "ledisdb config file") var configFile = flag.String("config", "", "ledisdb config file")
var addr = flag.String("addr", "", "ledisdb listen address")
var dataDir = flag.String("data_dir", "", "ledisdb base data dir")
var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name") var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name")
var usePprof = flag.Bool("pprof", false, "enable pprof") var usePprof = flag.Bool("pprof", false, "enable pprof")
var pprofPort = flag.Int("pprof_port", 6060, "pprof http port") var pprofPort = flag.Int("pprof_port", 6060, "pprof http port")
var slaveof = flag.String("slaveof", "", "make the server a slave of another instance") var slaveof = flag.String("slaveof", "", "make the server a slave of another instance")
var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly") var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly")
var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled") var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled")
var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not")
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
@ -42,6 +45,14 @@ func main() {
return return
} }
if len(*addr) > 0 {
cfg.Addr = *addr
}
if len(*dataDir) > 0 {
cfg.DataDir = *dataDir
}
if len(*dbName) > 0 { if len(*dbName) > 0 {
cfg.DBName = *dbName cfg.DBName = *dbName
} }
@ -53,6 +64,7 @@ func main() {
} else { } else {
cfg.Readonly = *readonly cfg.Readonly = *readonly
cfg.UseReplication = *rpl cfg.UseReplication = *rpl
cfg.Replication.Sync = *rplSync
} }
var app *server.App var app *server.App

View File

@ -1,6 +1,7 @@
package server package server
import ( import (
"encoding/binary"
"fmt" "fmt"
"github.com/siddontang/go/hack" "github.com/siddontang/go/hack"
"github.com/siddontang/go/num" "github.com/siddontang/go/num"
@ -94,6 +95,8 @@ func fullsyncCommand(c *client) error {
return nil return nil
} }
var dummyBuf = make([]byte, 8)
func syncCommand(c *client) error { func syncCommand(c *client) error {
args := c.args args := c.args
if len(args) != 1 { if len(args) != 1 {
@ -122,11 +125,20 @@ func syncCommand(c *client) error {
c.syncBuf.Reset() c.syncBuf.Reset()
c.syncBuf.Write(dummyBuf)
if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30); err != nil { if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30); err != nil {
return err return err
} else { } else {
buf := c.syncBuf.Bytes() buf := c.syncBuf.Bytes()
stat, err = c.app.ldb.ReplicationStat()
if err != nil {
return err
}
binary.BigEndian.PutUint64(buf, stat.LastID)
c.resp.writeBulk(buf) c.resp.writeBulk(buf)
} }

View File

@ -3,6 +3,7 @@ package server
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/siddontang/go/sync2"
"os" "os"
"runtime" "runtime"
"strings" "strings"
@ -25,8 +26,11 @@ type info struct {
} }
Replication struct { Replication struct {
PubLogNum int64 PubLogNum sync2.AtomicInt64
PubLogTotalTime int64 //milliseconds PubLogAckNum sync2.AtomicInt64
PubLogTotalAckTime sync2.AtomicDuration
MasterLastLogID sync2.AtomicUint64
} }
} }
@ -156,13 +160,15 @@ func (i *info) dumpReplication(buf *bytes.Buffer) {
} }
i.app.slock.Unlock() i.app.slock.Unlock()
num := i.Replication.PubLogNum num := i.Replication.PubLogNum.Get()
p = append(p, infoPair{"pub_log_num", num}) p = append(p, infoPair{"pub_log_num", num})
if num != 0 { ackNum := i.Replication.PubLogAckNum.Get()
p = append(p, infoPair{"pub_log_per_time", i.Replication.PubLogTotalTime / num}) totalTime := i.Replication.PubLogTotalAckTime.Get().Nanoseconds() / 1e6
if ackNum != 0 {
p = append(p, infoPair{"pub_log_ack_per_time", totalTime / ackNum})
} else { } else {
p = append(p, infoPair{"pub_log_per_time", 0}) p = append(p, infoPair{"pub_log_ack_per_time", 0})
} }
p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf}) p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf})
@ -181,6 +187,8 @@ func (i *info) dumpReplication(buf *bytes.Buffer) {
p = append(p, infoPair{"commit_log_id", 0}) p = append(p, infoPair{"commit_log_id", 0})
} }
p = append(p, infoPair{"master_last_log_id", i.Replication.MasterLastLogID.Get()})
i.dumpPairs(buf, p...) i.dumpPairs(buf, p...)
} }

View File

@ -16,7 +16,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -263,6 +262,17 @@ func (m *master) sync() error {
buf := m.syncBuf.Bytes() buf := m.syncBuf.Bytes()
if len(buf) < 8 {
return fmt.Errorf("inavlid sync size %d", len(buf))
}
m.app.info.Replication.MasterLastLogID.Set(num.BytesToUint64(buf))
var t bytes.Buffer
m.app.info.dumpReplication(&t)
buf = buf[8:]
if len(buf) == 0 { if len(buf) == 0 {
return nil return nil
} }
@ -370,6 +380,8 @@ func (app *App) publishNewLog(l *rpl.Log) {
return return
} }
app.info.Replication.PubLogNum.Add(1)
app.slock.Lock() app.slock.Lock()
total := (len(app.slaves) + 1) / 2 total := (len(app.slaves) + 1) / 2
@ -414,6 +426,6 @@ func (app *App) publishNewLog(l *rpl.Log) {
} }
stopTime := time.Now() stopTime := time.Now()
atomic.AddInt64(&app.info.Replication.PubLogNum, 1) app.info.Replication.PubLogAckNum.Add(1)
atomic.AddInt64(&app.info.Replication.PubLogTotalTime, stopTime.Sub(startTime).Nanoseconds()/1e6) app.info.Replication.PubLogTotalAckTime.Add(stopTime.Sub(startTime))
} }