forked from mirror/ledisdb
add replconf for replication
This commit is contained in:
parent
f09e43ad99
commit
c3824552b6
|
@ -32,8 +32,9 @@ type App struct {
|
|||
s *script
|
||||
|
||||
// handle slaves
|
||||
slock sync.Mutex
|
||||
slaves map[*client]struct{}
|
||||
slock sync.Mutex
|
||||
slaves map[string]*client
|
||||
slaveSyncAck chan uint64
|
||||
|
||||
snap *snapshotStore
|
||||
}
|
||||
|
@ -60,7 +61,8 @@ func NewApp(cfg *config.Config) (*App, error) {
|
|||
|
||||
app.cfg = cfg
|
||||
|
||||
app.slaves = make(map[*client]struct{})
|
||||
app.slaves = make(map[string]*client)
|
||||
app.slaveSyncAck = make(chan uint64)
|
||||
|
||||
var err error
|
||||
|
||||
|
|
|
@ -64,8 +64,6 @@ type client struct {
|
|||
|
||||
lastLogID uint64
|
||||
|
||||
ack *syncAck
|
||||
|
||||
reqErr chan error
|
||||
|
||||
buf bytes.Buffer
|
||||
|
@ -73,6 +71,8 @@ type client struct {
|
|||
tx *ledis.Tx
|
||||
|
||||
script *ledis.Multi
|
||||
|
||||
slaveListeningAddr string
|
||||
}
|
||||
|
||||
func newClient(app *App) *client {
|
||||
|
|
|
@ -3,7 +3,9 @@ package server
|
|||
import (
|
||||
"fmt"
|
||||
"github.com/siddontang/go/hack"
|
||||
"github.com/siddontang/go/num"
|
||||
"github.com/siddontang/ledisdb/ledis"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -107,10 +109,7 @@ func syncCommand(c *client) error {
|
|||
|
||||
c.lastLogID = logId - 1
|
||||
|
||||
if c.ack != nil && logId > c.ack.id {
|
||||
asyncNotifyUint64(c.ack.ch, logId)
|
||||
c.ack = nil
|
||||
}
|
||||
c.app.slaveAck(c)
|
||||
|
||||
c.syncBuf.Reset()
|
||||
|
||||
|
@ -122,8 +121,39 @@ func syncCommand(c *client) error {
|
|||
c.resp.writeBulk(buf)
|
||||
}
|
||||
|
||||
c.app.addSlave(c)
|
||||
return nil
|
||||
}
|
||||
|
||||
//inner command, only for replication
|
||||
//REPLCONF <option> <value> <option> <value> ...
|
||||
func replconfCommand(c *client) error {
|
||||
args := c.args
|
||||
if len(args)%2 != 0 {
|
||||
return ErrCmdParams
|
||||
}
|
||||
|
||||
//now only support "listening-port"
|
||||
for i := 0; i < len(args); i += 2 {
|
||||
switch strings.ToLower(hack.String(args[i])) {
|
||||
case "listening-port":
|
||||
var host string
|
||||
var err error
|
||||
if _, err = num.ParseUint16(hack.String(args[i+1])); err != nil {
|
||||
return err
|
||||
}
|
||||
if host, _, err = net.SplitHostPort(c.remoteAddr); err != nil {
|
||||
return err
|
||||
} else {
|
||||
c.slaveListeningAddr = net.JoinHostPort(host, hack.String(args[i+1]))
|
||||
}
|
||||
|
||||
c.app.addSlave(c)
|
||||
default:
|
||||
return ErrSyntax
|
||||
}
|
||||
}
|
||||
|
||||
c.resp.writeStatus(OK)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -131,4 +161,5 @@ func init() {
|
|||
register("slaveof", slaveofCommand)
|
||||
register("fullsync", fullsyncCommand)
|
||||
register("sync", syncCommand)
|
||||
register("replconf", replconfCommand)
|
||||
}
|
||||
|
|
|
@ -149,10 +149,12 @@ func (i *info) dumpReplication(buf *bytes.Buffer) {
|
|||
buf.WriteString("# Replication\r\n")
|
||||
|
||||
p := []infoPair{}
|
||||
i.app.slock.Lock()
|
||||
slaves := make([]string, 0, len(i.app.slaves))
|
||||
for s, _ := range i.app.slaves {
|
||||
slaves = append(slaves, s.remoteAddr)
|
||||
for _, s := range i.app.slaves {
|
||||
slaves = append(slaves, s.slaveListeningAddr)
|
||||
}
|
||||
i.app.slock.Unlock()
|
||||
|
||||
num := i.Replication.PubLogNum
|
||||
p = append(p, infoPair{"pub_log_num", num})
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -77,6 +78,7 @@ func (m *master) connect() error {
|
|||
|
||||
m.rb = bufio.NewReaderSize(m.conn, 4096)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -116,6 +118,11 @@ func (m *master) runReplication(restart bool) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := m.replConf(); err != nil {
|
||||
log.Error("replconf error %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if restart {
|
||||
if err := m.fullSync(); err != nil {
|
||||
if m.conn != nil {
|
||||
|
@ -148,10 +155,32 @@ func (m *master) runReplication(restart bool) {
|
|||
}
|
||||
|
||||
var (
|
||||
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
|
||||
syncCmdFormat = "*2\r\n$4\r\nsync\r\n$%d\r\n%s\r\n" //sync logid
|
||||
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
|
||||
syncCmdFormat = "*2\r\n$4\r\nsync\r\n$%d\r\n%s\r\n" //sync logid
|
||||
replconfCmdFormat = "*3\r\n$8\r\nreplconf\r\n$14\r\nlistening-port\r\n$%d\r\n%s\r\n" //replconf listening-port port
|
||||
)
|
||||
|
||||
func (m *master) replConf() error {
|
||||
_, port, err := net.SplitHostPort(m.app.cfg.Addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd := hack.Slice(fmt.Sprintf(replconfCmdFormat, len(port), port))
|
||||
|
||||
if _, err := m.conn.Write(cmd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s, err := ReadStatus(m.rb); err != nil {
|
||||
return err
|
||||
} else if strings.ToLower(s) != "ok" {
|
||||
return fmt.Errorf("not ok but %s", s)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *master) fullSync() error {
|
||||
log.Info("begin full sync")
|
||||
|
||||
|
@ -284,24 +313,39 @@ func (app *App) tryReSlaveof() error {
|
|||
}
|
||||
|
||||
func (app *App) addSlave(c *client) {
|
||||
addr := c.slaveListeningAddr
|
||||
|
||||
app.slock.Lock()
|
||||
defer app.slock.Unlock()
|
||||
|
||||
app.slaves[c] = struct{}{}
|
||||
app.slaves[addr] = c
|
||||
}
|
||||
|
||||
func (app *App) removeSlave(c *client) {
|
||||
addr := c.slaveListeningAddr
|
||||
|
||||
app.slock.Lock()
|
||||
defer app.slock.Unlock()
|
||||
|
||||
if _, ok := app.slaves[c]; ok {
|
||||
delete(app.slaves, c)
|
||||
log.Info("remove slave %s", c.remoteAddr)
|
||||
if _, ok := app.slaves[addr]; ok {
|
||||
delete(app.slaves, addr)
|
||||
log.Info("remove slave %s", addr)
|
||||
}
|
||||
}
|
||||
|
||||
func (app *App) slaveAck(c *client) {
|
||||
addr := c.slaveListeningAddr
|
||||
|
||||
app.slock.Lock()
|
||||
defer app.slock.Unlock()
|
||||
|
||||
if _, ok := app.slaves[addr]; !ok {
|
||||
//slave not add
|
||||
return
|
||||
}
|
||||
|
||||
if c.ack != nil {
|
||||
asyncNotifyUint64(c.ack.ch, c.lastLogID)
|
||||
}
|
||||
println("ack", c.lastLogID)
|
||||
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID)
|
||||
}
|
||||
|
||||
func asyncNotifyUint64(ch chan uint64, v uint64) {
|
||||
|
@ -317,47 +361,36 @@ func (app *App) publishNewLog(l *rpl.Log) {
|
|||
return
|
||||
}
|
||||
|
||||
ss := make([]*client, 0, 4)
|
||||
app.slock.Lock()
|
||||
|
||||
total := (len(app.slaves) + 1) / 2
|
||||
if app.cfg.Replication.WaitMaxSlaveAcks > 0 {
|
||||
total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks)
|
||||
}
|
||||
|
||||
n := 0
|
||||
logId := l.ID
|
||||
for s, _ := range app.slaves {
|
||||
for _, s := range app.slaves {
|
||||
if s.lastLogID >= logId {
|
||||
//slave has already owned this log
|
||||
ss = []*client{}
|
||||
break
|
||||
} else {
|
||||
ss = append(ss, s)
|
||||
n++
|
||||
}
|
||||
}
|
||||
|
||||
app.slock.Unlock()
|
||||
|
||||
if len(ss) == 0 {
|
||||
if n >= total {
|
||||
//at least total slaves have owned this log
|
||||
return
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
ack := &syncAck{
|
||||
logId, make(chan uint64, len(ss)),
|
||||
}
|
||||
|
||||
for _, s := range ss {
|
||||
s.ack = ack
|
||||
}
|
||||
|
||||
total := (len(ss) + 1) / 2
|
||||
if app.cfg.Replication.WaitMaxSlaveAcks > 0 {
|
||||
total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks)
|
||||
}
|
||||
|
||||
done := make(chan struct{}, 1)
|
||||
go func(total int) {
|
||||
n := 0
|
||||
for i := 0; i < len(ss); i++ {
|
||||
id := <-ack.ch
|
||||
if id > logId {
|
||||
for {
|
||||
id := <-app.slaveSyncAck
|
||||
if id >= logId {
|
||||
n++
|
||||
if n >= total {
|
||||
break
|
||||
|
|
|
@ -9,9 +9,10 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
errArrayFormat = errors.New("bad array format")
|
||||
errBulkFormat = errors.New("bad bulk string format")
|
||||
errLineFormat = errors.New("bad response line format")
|
||||
errArrayFormat = errors.New("bad array format")
|
||||
errBulkFormat = errors.New("bad bulk string format")
|
||||
errLineFormat = errors.New("bad response line format")
|
||||
errStatusFormat = errors.New("bad status format")
|
||||
)
|
||||
|
||||
func ReadLine(rb *bufio.Reader) ([]byte, error) {
|
||||
|
@ -54,9 +55,26 @@ func ReadBulkTo(rb *bufio.Reader, w io.Writer) error {
|
|||
return errBulkFormat
|
||||
}
|
||||
}
|
||||
} else if l[0] == '-' {
|
||||
return errors.New(string(l[1:]))
|
||||
} else {
|
||||
return errBulkFormat
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ReadStatus(rb *bufio.Reader) (string, error) {
|
||||
l, err := ReadLine(rb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if len(l) == 0 {
|
||||
return "", errStatusFormat
|
||||
} else if l[0] == '+' {
|
||||
return string(l[1:]), nil
|
||||
} else if l[0] == '-' {
|
||||
return "", errors.New(string(l[1:]))
|
||||
} else {
|
||||
return "", errStatusFormat
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue