Merge branch 'replconf-feature' into develop

This commit is contained in:
siddontang 2014-10-22 09:11:49 +08:00
commit 8ad28fb592
9 changed files with 215 additions and 65 deletions

2
.gitignore vendored
View File

@ -3,5 +3,5 @@ build
.DS_Store .DS_Store
nohup.out nohup.out
build_config.mk build_config.mk
var var*
_workspace _workspace

View File

@ -15,12 +15,15 @@ import (
) )
var configFile = flag.String("config", "", "ledisdb config file") var configFile = flag.String("config", "", "ledisdb config file")
var addr = flag.String("addr", "", "ledisdb listen address")
var dataDir = flag.String("data_dir", "", "ledisdb base data dir")
var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name") var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name")
var usePprof = flag.Bool("pprof", false, "enable pprof") var usePprof = flag.Bool("pprof", false, "enable pprof")
var pprofPort = flag.Int("pprof_port", 6060, "pprof http port") var pprofPort = flag.Int("pprof_port", 6060, "pprof http port")
var slaveof = flag.String("slaveof", "", "make the server a slave of another instance") var slaveof = flag.String("slaveof", "", "make the server a slave of another instance")
var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly") var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly")
var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled") var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled")
var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not")
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
@ -42,6 +45,14 @@ func main() {
return return
} }
if len(*addr) > 0 {
cfg.Addr = *addr
}
if len(*dataDir) > 0 {
cfg.DataDir = *dataDir
}
if len(*dbName) > 0 { if len(*dbName) > 0 {
cfg.DBName = *dbName cfg.DBName = *dbName
} }
@ -53,6 +64,7 @@ func main() {
} else { } else {
cfg.Readonly = *readonly cfg.Readonly = *readonly
cfg.UseReplication = *rpl cfg.UseReplication = *rpl
cfg.Replication.Sync = *rplSync
} }
var app *server.App var app *server.App

View File

@ -33,7 +33,8 @@ type App struct {
// handle slaves // handle slaves
slock sync.Mutex slock sync.Mutex
slaves map[*client]struct{} slaves map[string]*client
slaveSyncAck chan uint64
snap *snapshotStore snap *snapshotStore
} }
@ -60,7 +61,8 @@ func NewApp(cfg *config.Config) (*App, error) {
app.cfg = cfg app.cfg = cfg
app.slaves = make(map[*client]struct{}) app.slaves = make(map[string]*client)
app.slaveSyncAck = make(chan uint64)
var err error var err error

View File

@ -64,8 +64,6 @@ type client struct {
lastLogID uint64 lastLogID uint64
ack *syncAck
reqErr chan error reqErr chan error
buf bytes.Buffer buf bytes.Buffer
@ -73,6 +71,8 @@ type client struct {
tx *ledis.Tx tx *ledis.Tx
script *ledis.Multi script *ledis.Multi
slaveListeningAddr string
} }
func newClient(app *App) *client { func newClient(app *App) *client {

View File

@ -51,7 +51,10 @@ func (c *respClient) run() {
log.Fatal("client run panic %s:%v", buf, e) log.Fatal("client run panic %s:%v", buf, e)
} }
handleQuit := true
if c.conn != nil { if c.conn != nil {
//if handle quit command before, conn is nil
handleQuit = false
c.conn.Close() c.conn.Close()
} }
@ -60,7 +63,7 @@ func (c *respClient) run() {
c.tx = nil c.tx = nil
} }
c.app.removeSlave(c.client) c.app.removeSlave(c.client, handleQuit)
}() }()
for { for {
@ -144,6 +147,7 @@ func (c *respClient) handleRequest(reqData [][]byte) {
c.resp.writeStatus(OK) c.resp.writeStatus(OK)
c.resp.flush() c.resp.flush()
c.conn.Close() c.conn.Close()
c.conn = nil
return return
} }

View File

@ -1,9 +1,12 @@
package server package server
import ( import (
"encoding/binary"
"fmt" "fmt"
"github.com/siddontang/go/hack" "github.com/siddontang/go/hack"
"github.com/siddontang/go/num"
"github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/ledis"
"net"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -92,6 +95,8 @@ func fullsyncCommand(c *client) error {
return nil return nil
} }
var dummyBuf = make([]byte, 8)
func syncCommand(c *client) error { func syncCommand(c *client) error {
args := c.args args := c.args
if len(args) != 1 { if len(args) != 1 {
@ -107,23 +112,69 @@ func syncCommand(c *client) error {
c.lastLogID = logId - 1 c.lastLogID = logId - 1
if c.ack != nil && logId > c.ack.id { stat, err := c.app.ldb.ReplicationStat()
asyncNotifyUint64(c.ack.ch, logId) if err != nil {
c.ack = nil return err
}
if c.lastLogID > stat.LastID {
return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID)
} else if c.lastLogID == stat.LastID {
c.app.slaveAck(c)
} }
c.syncBuf.Reset() c.syncBuf.Reset()
c.syncBuf.Write(dummyBuf)
if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30); err != nil { if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30); err != nil {
return err return err
} else { } else {
buf := c.syncBuf.Bytes() buf := c.syncBuf.Bytes()
stat, err = c.app.ldb.ReplicationStat()
if err != nil {
return err
}
binary.BigEndian.PutUint64(buf, stat.LastID)
c.resp.writeBulk(buf) c.resp.writeBulk(buf)
} }
c.app.addSlave(c) return nil
}
//inner command, only for replication
//REPLCONF <option> <value> <option> <value> ...
func replconfCommand(c *client) error {
args := c.args
if len(args)%2 != 0 {
return ErrCmdParams
}
//now only support "listening-port"
for i := 0; i < len(args); i += 2 {
switch strings.ToLower(hack.String(args[i])) {
case "listening-port":
var host string
var err error
if _, err = num.ParseUint16(hack.String(args[i+1])); err != nil {
return err
}
if host, _, err = net.SplitHostPort(c.remoteAddr); err != nil {
return err
} else {
c.slaveListeningAddr = net.JoinHostPort(host, hack.String(args[i+1]))
}
c.app.addSlave(c)
default:
return ErrSyntax
}
}
c.resp.writeStatus(OK)
return nil return nil
} }
@ -131,4 +182,5 @@ func init() {
register("slaveof", slaveofCommand) register("slaveof", slaveofCommand)
register("fullsync", fullsyncCommand) register("fullsync", fullsyncCommand)
register("sync", syncCommand) register("sync", syncCommand)
register("replconf", replconfCommand)
} }

View File

@ -3,6 +3,7 @@ package server
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/siddontang/go/sync2"
"os" "os"
"runtime" "runtime"
"strings" "strings"
@ -25,8 +26,11 @@ type info struct {
} }
Replication struct { Replication struct {
PubLogNum int64 PubLogNum sync2.AtomicInt64
PubLogTotalTime int64 //milliseconds PubLogAckNum sync2.AtomicInt64
PubLogTotalAckTime sync2.AtomicDuration
MasterLastLogID sync2.AtomicUint64
} }
} }
@ -149,18 +153,22 @@ func (i *info) dumpReplication(buf *bytes.Buffer) {
buf.WriteString("# Replication\r\n") buf.WriteString("# Replication\r\n")
p := []infoPair{} p := []infoPair{}
i.app.slock.Lock()
slaves := make([]string, 0, len(i.app.slaves)) slaves := make([]string, 0, len(i.app.slaves))
for s, _ := range i.app.slaves { for _, s := range i.app.slaves {
slaves = append(slaves, s.remoteAddr) slaves = append(slaves, s.slaveListeningAddr)
} }
i.app.slock.Unlock()
num := i.Replication.PubLogNum num := i.Replication.PubLogNum.Get()
p = append(p, infoPair{"pub_log_num", num}) p = append(p, infoPair{"pub_log_num", num})
if num != 0 { ackNum := i.Replication.PubLogAckNum.Get()
p = append(p, infoPair{"pub_log_per_time", i.Replication.PubLogTotalTime / num}) totalTime := i.Replication.PubLogTotalAckTime.Get().Nanoseconds() / 1e6
if ackNum != 0 {
p = append(p, infoPair{"pub_log_ack_per_time", totalTime / ackNum})
} else { } else {
p = append(p, infoPair{"pub_log_per_time", 0}) p = append(p, infoPair{"pub_log_ack_per_time", 0})
} }
p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf}) p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf})
@ -179,6 +187,8 @@ func (i *info) dumpReplication(buf *bytes.Buffer) {
p = append(p, infoPair{"commit_log_id", 0}) p = append(p, infoPair{"commit_log_id", 0})
} }
p = append(p, infoPair{"master_last_log_id", i.Replication.MasterLastLogID.Get()})
i.dumpPairs(buf, p...) i.dumpPairs(buf, p...)
} }

View File

@ -14,8 +14,8 @@ import (
"os" "os"
"path" "path"
"strconv" "strconv"
"strings"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -49,10 +49,17 @@ func newMaster(app *App) *master {
return m return m
} }
var (
quitCmd = []byte("*1\r\n$4\r\nquit\r\n")
)
func (m *master) Close() { func (m *master) Close() {
ledis.AsyncNotify(m.quit) ledis.AsyncNotify(m.quit)
if m.conn != nil { if m.conn != nil {
//for replication, we send quit command to close gracefully
m.conn.Write(quitCmd)
m.conn.Close() m.conn.Close()
m.conn = nil m.conn = nil
} }
@ -77,6 +84,7 @@ func (m *master) connect() error {
m.rb = bufio.NewReaderSize(m.conn, 4096) m.rb = bufio.NewReaderSize(m.conn, 4096)
} }
return nil return nil
} }
@ -116,6 +124,11 @@ func (m *master) runReplication(restart bool) {
} }
} }
if err := m.replConf(); err != nil {
log.Error("replconf error %s", err.Error())
return
}
if restart { if restart {
if err := m.fullSync(); err != nil { if err := m.fullSync(); err != nil {
if m.conn != nil { if m.conn != nil {
@ -150,8 +163,30 @@ func (m *master) runReplication(restart bool) {
var ( var (
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
syncCmdFormat = "*2\r\n$4\r\nsync\r\n$%d\r\n%s\r\n" //sync logid syncCmdFormat = "*2\r\n$4\r\nsync\r\n$%d\r\n%s\r\n" //sync logid
replconfCmdFormat = "*3\r\n$8\r\nreplconf\r\n$14\r\nlistening-port\r\n$%d\r\n%s\r\n" //replconf listening-port port
) )
func (m *master) replConf() error {
_, port, err := net.SplitHostPort(m.app.cfg.Addr)
if err != nil {
return err
}
cmd := hack.Slice(fmt.Sprintf(replconfCmdFormat, len(port), port))
if _, err := m.conn.Write(cmd); err != nil {
return err
}
if s, err := ReadStatus(m.rb); err != nil {
return err
} else if strings.ToLower(s) != "ok" {
return fmt.Errorf("not ok but %s", s)
}
return nil
}
func (m *master) fullSync() error { func (m *master) fullSync() error {
log.Info("begin full sync") log.Info("begin full sync")
@ -227,6 +262,17 @@ func (m *master) sync() error {
buf := m.syncBuf.Bytes() buf := m.syncBuf.Bytes()
if len(buf) < 8 {
return fmt.Errorf("inavlid sync size %d", len(buf))
}
m.app.info.Replication.MasterLastLogID.Set(num.BytesToUint64(buf))
var t bytes.Buffer
m.app.info.dumpReplication(&t)
buf = buf[8:]
if len(buf) == 0 { if len(buf) == 0 {
return nil return nil
} }
@ -284,24 +330,41 @@ func (app *App) tryReSlaveof() error {
} }
func (app *App) addSlave(c *client) { func (app *App) addSlave(c *client) {
addr := c.slaveListeningAddr
app.slock.Lock() app.slock.Lock()
defer app.slock.Unlock() defer app.slock.Unlock()
app.slaves[c] = struct{}{} app.slaves[addr] = c
} }
func (app *App) removeSlave(c *client) { func (app *App) removeSlave(c *client, activeQuit bool) {
addr := c.slaveListeningAddr
app.slock.Lock() app.slock.Lock()
defer app.slock.Unlock() defer app.slock.Unlock()
if _, ok := app.slaves[c]; ok { if _, ok := app.slaves[addr]; ok {
delete(app.slaves, c) delete(app.slaves, addr)
log.Info("remove slave %s", c.remoteAddr) log.Info("remove slave %s", addr)
if activeQuit {
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID)
}
}
}
func (app *App) slaveAck(c *client) {
addr := c.slaveListeningAddr
app.slock.Lock()
defer app.slock.Unlock()
if _, ok := app.slaves[addr]; !ok {
//slave not add
return
} }
if c.ack != nil { asyncNotifyUint64(app.slaveSyncAck, c.lastLogID)
asyncNotifyUint64(c.ack.ch, c.lastLogID)
}
} }
func asyncNotifyUint64(ch chan uint64, v uint64) { func asyncNotifyUint64(ch chan uint64, v uint64) {
@ -317,51 +380,40 @@ func (app *App) publishNewLog(l *rpl.Log) {
return return
} }
ss := make([]*client, 0, 4) app.info.Replication.PubLogNum.Add(1)
app.slock.Lock() app.slock.Lock()
total := (len(app.slaves) + 1) / 2
if app.cfg.Replication.WaitMaxSlaveAcks > 0 {
total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks)
}
n := 0
logId := l.ID logId := l.ID
for s, _ := range app.slaves { for _, s := range app.slaves {
if s.lastLogID >= logId { if s.lastLogID == logId {
//slave has already owned this log //slave has already owned this log
ss = []*client{} n++
break } else if s.lastLogID > logId {
} else { log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, s.lastLogID, logId)
ss = append(ss, s)
} }
} }
app.slock.Unlock() app.slock.Unlock()
if len(ss) == 0 { if n >= total {
//at least total slaves have owned this log
return return
} }
startTime := time.Now() startTime := time.Now()
ack := &syncAck{
logId, make(chan uint64, len(ss)),
}
for _, s := range ss {
s.ack = ack
}
total := (len(ss) + 1) / 2
if app.cfg.Replication.WaitMaxSlaveAcks > 0 {
total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks)
}
done := make(chan struct{}, 1) done := make(chan struct{}, 1)
go func(total int) { go func(total int) {
n := 0 for i := 0; i < total; i++ {
for i := 0; i < len(ss); i++ { id := <-app.slaveSyncAck
id := <-ack.ch if id < logId {
if id > logId { log.Info("some slave may close with last logid %d < %d", id, logId)
n++
if n >= total {
break
}
} }
} }
done <- struct{}{} done <- struct{}{}
@ -374,6 +426,6 @@ func (app *App) publishNewLog(l *rpl.Log) {
} }
stopTime := time.Now() stopTime := time.Now()
atomic.AddInt64(&app.info.Replication.PubLogNum, 1) app.info.Replication.PubLogAckNum.Add(1)
atomic.AddInt64(&app.info.Replication.PubLogTotalTime, stopTime.Sub(startTime).Nanoseconds()/1e6) app.info.Replication.PubLogTotalAckTime.Add(stopTime.Sub(startTime))
} }

View File

@ -12,6 +12,7 @@ var (
errArrayFormat = errors.New("bad array format") errArrayFormat = errors.New("bad array format")
errBulkFormat = errors.New("bad bulk string format") errBulkFormat = errors.New("bad bulk string format")
errLineFormat = errors.New("bad response line format") errLineFormat = errors.New("bad response line format")
errStatusFormat = errors.New("bad status format")
) )
func ReadLine(rb *bufio.Reader) ([]byte, error) { func ReadLine(rb *bufio.Reader) ([]byte, error) {
@ -54,9 +55,26 @@ func ReadBulkTo(rb *bufio.Reader, w io.Writer) error {
return errBulkFormat return errBulkFormat
} }
} }
} else if l[0] == '-' {
return errors.New(string(l[1:]))
} else { } else {
return errBulkFormat return errBulkFormat
} }
return nil return nil
} }
func ReadStatus(rb *bufio.Reader) (string, error) {
l, err := ReadLine(rb)
if err != nil {
return "", err
} else if len(l) == 0 {
return "", errStatusFormat
} else if l[0] == '+' {
return string(l[1:]), nil
} else if l[0] == '-' {
return "", errors.New(string(l[1:]))
} else {
return "", errStatusFormat
}
}