mirror of https://github.com/ledisdb/ledisdb.git
Merge branch 'develop'
This commit is contained in:
commit
124756cf8b
|
@ -3,5 +3,5 @@ build
|
||||||
.DS_Store
|
.DS_Store
|
||||||
nohup.out
|
nohup.out
|
||||||
build_config.mk
|
build_config.mk
|
||||||
var
|
var*
|
||||||
_workspace
|
_workspace
|
|
@ -16,35 +16,35 @@
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/bson",
|
"ImportPath": "github.com/siddontang/go/bson",
|
||||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
"Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/filelock",
|
"ImportPath": "github.com/siddontang/go/filelock",
|
||||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
"Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/hack",
|
"ImportPath": "github.com/siddontang/go/hack",
|
||||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
"Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/ioutil2",
|
"ImportPath": "github.com/siddontang/go/ioutil2",
|
||||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
"Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/log",
|
"ImportPath": "github.com/siddontang/go/log",
|
||||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
"Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/num",
|
"ImportPath": "github.com/siddontang/go/num",
|
||||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
"Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/snappy",
|
"ImportPath": "github.com/siddontang/go/snappy",
|
||||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
"Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/sync2",
|
"ImportPath": "github.com/siddontang/go/sync2",
|
||||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
"Rev": "c7a17e4e4a1b72e4bc38b8b52cac8558aff4a4b1"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/goleveldb/leveldb",
|
"ImportPath": "github.com/siddontang/goleveldb/leveldb",
|
||||||
|
|
|
@ -15,12 +15,15 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var configFile = flag.String("config", "", "ledisdb config file")
|
var configFile = flag.String("config", "", "ledisdb config file")
|
||||||
|
var addr = flag.String("addr", "", "ledisdb listen address")
|
||||||
|
var dataDir = flag.String("data_dir", "", "ledisdb base data dir")
|
||||||
var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name")
|
var dbName = flag.String("db_name", "", "select a db to use, it will overwrite the config's db name")
|
||||||
var usePprof = flag.Bool("pprof", false, "enable pprof")
|
var usePprof = flag.Bool("pprof", false, "enable pprof")
|
||||||
var pprofPort = flag.Int("pprof_port", 6060, "pprof http port")
|
var pprofPort = flag.Int("pprof_port", 6060, "pprof http port")
|
||||||
var slaveof = flag.String("slaveof", "", "make the server a slave of another instance")
|
var slaveof = flag.String("slaveof", "", "make the server a slave of another instance")
|
||||||
var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly")
|
var readonly = flag.Bool("readonly", false, "set readonly mode, salve server is always readonly")
|
||||||
var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled")
|
var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled")
|
||||||
|
var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not")
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||||
|
@ -42,6 +45,14 @@ func main() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(*addr) > 0 {
|
||||||
|
cfg.Addr = *addr
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(*dataDir) > 0 {
|
||||||
|
cfg.DataDir = *dataDir
|
||||||
|
}
|
||||||
|
|
||||||
if len(*dbName) > 0 {
|
if len(*dbName) > 0 {
|
||||||
cfg.DBName = *dbName
|
cfg.DBName = *dbName
|
||||||
}
|
}
|
||||||
|
@ -53,6 +64,7 @@ func main() {
|
||||||
} else {
|
} else {
|
||||||
cfg.Readonly = *readonly
|
cfg.Readonly = *readonly
|
||||||
cfg.UseReplication = *rpl
|
cfg.UseReplication = *rpl
|
||||||
|
cfg.Replication.Sync = *rplSync
|
||||||
}
|
}
|
||||||
|
|
||||||
var app *server.App
|
var app *server.App
|
||||||
|
|
|
@ -33,7 +33,8 @@ type App struct {
|
||||||
|
|
||||||
// handle slaves
|
// handle slaves
|
||||||
slock sync.Mutex
|
slock sync.Mutex
|
||||||
slaves map[*client]struct{}
|
slaves map[string]*client
|
||||||
|
slaveSyncAck chan uint64
|
||||||
|
|
||||||
snap *snapshotStore
|
snap *snapshotStore
|
||||||
}
|
}
|
||||||
|
@ -60,7 +61,8 @@ func NewApp(cfg *config.Config) (*App, error) {
|
||||||
|
|
||||||
app.cfg = cfg
|
app.cfg = cfg
|
||||||
|
|
||||||
app.slaves = make(map[*client]struct{})
|
app.slaves = make(map[string]*client)
|
||||||
|
app.slaveSyncAck = make(chan uint64)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
|
|
@ -64,8 +64,6 @@ type client struct {
|
||||||
|
|
||||||
lastLogID uint64
|
lastLogID uint64
|
||||||
|
|
||||||
ack *syncAck
|
|
||||||
|
|
||||||
reqErr chan error
|
reqErr chan error
|
||||||
|
|
||||||
buf bytes.Buffer
|
buf bytes.Buffer
|
||||||
|
@ -73,6 +71,8 @@ type client struct {
|
||||||
tx *ledis.Tx
|
tx *ledis.Tx
|
||||||
|
|
||||||
script *ledis.Multi
|
script *ledis.Multi
|
||||||
|
|
||||||
|
slaveListeningAddr string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClient(app *App) *client {
|
func newClient(app *App) *client {
|
||||||
|
|
|
@ -51,7 +51,10 @@ func (c *respClient) run() {
|
||||||
log.Fatal("client run panic %s:%v", buf, e)
|
log.Fatal("client run panic %s:%v", buf, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handleQuit := true
|
||||||
if c.conn != nil {
|
if c.conn != nil {
|
||||||
|
//if handle quit command before, conn is nil
|
||||||
|
handleQuit = false
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +63,7 @@ func (c *respClient) run() {
|
||||||
c.tx = nil
|
c.tx = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
c.app.removeSlave(c.client)
|
c.app.removeSlave(c.client, handleQuit)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -144,6 +147,7 @@ func (c *respClient) handleRequest(reqData [][]byte) {
|
||||||
c.resp.writeStatus(OK)
|
c.resp.writeStatus(OK)
|
||||||
c.resp.flush()
|
c.resp.flush()
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
|
c.conn = nil
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,12 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/siddontang/go/hack"
|
"github.com/siddontang/go/hack"
|
||||||
|
"github.com/siddontang/go/num"
|
||||||
"github.com/siddontang/ledisdb/ledis"
|
"github.com/siddontang/ledisdb/ledis"
|
||||||
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -92,6 +95,8 @@ func fullsyncCommand(c *client) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var dummyBuf = make([]byte, 8)
|
||||||
|
|
||||||
func syncCommand(c *client) error {
|
func syncCommand(c *client) error {
|
||||||
args := c.args
|
args := c.args
|
||||||
if len(args) != 1 {
|
if len(args) != 1 {
|
||||||
|
@ -107,23 +112,69 @@ func syncCommand(c *client) error {
|
||||||
|
|
||||||
c.lastLogID = logId - 1
|
c.lastLogID = logId - 1
|
||||||
|
|
||||||
if c.ack != nil && logId > c.ack.id {
|
stat, err := c.app.ldb.ReplicationStat()
|
||||||
asyncNotifyUint64(c.ack.ch, logId)
|
if err != nil {
|
||||||
c.ack = nil
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.lastLogID > stat.LastID {
|
||||||
|
return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID)
|
||||||
|
} else if c.lastLogID == stat.LastID {
|
||||||
|
c.app.slaveAck(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.syncBuf.Reset()
|
c.syncBuf.Reset()
|
||||||
|
|
||||||
|
c.syncBuf.Write(dummyBuf)
|
||||||
|
|
||||||
if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30); err != nil {
|
if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30); err != nil {
|
||||||
return err
|
return err
|
||||||
} else {
|
} else {
|
||||||
buf := c.syncBuf.Bytes()
|
buf := c.syncBuf.Bytes()
|
||||||
|
|
||||||
|
stat, err = c.app.ldb.ReplicationStat()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
binary.BigEndian.PutUint64(buf, stat.LastID)
|
||||||
|
|
||||||
c.resp.writeBulk(buf)
|
c.resp.writeBulk(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.app.addSlave(c)
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//inner command, only for replication
|
||||||
|
//REPLCONF <option> <value> <option> <value> ...
|
||||||
|
func replconfCommand(c *client) error {
|
||||||
|
args := c.args
|
||||||
|
if len(args)%2 != 0 {
|
||||||
|
return ErrCmdParams
|
||||||
|
}
|
||||||
|
|
||||||
|
//now only support "listening-port"
|
||||||
|
for i := 0; i < len(args); i += 2 {
|
||||||
|
switch strings.ToLower(hack.String(args[i])) {
|
||||||
|
case "listening-port":
|
||||||
|
var host string
|
||||||
|
var err error
|
||||||
|
if _, err = num.ParseUint16(hack.String(args[i+1])); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if host, _, err = net.SplitHostPort(c.remoteAddr); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
c.slaveListeningAddr = net.JoinHostPort(host, hack.String(args[i+1]))
|
||||||
|
}
|
||||||
|
|
||||||
|
c.app.addSlave(c)
|
||||||
|
default:
|
||||||
|
return ErrSyntax
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.resp.writeStatus(OK)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,4 +182,5 @@ func init() {
|
||||||
register("slaveof", slaveofCommand)
|
register("slaveof", slaveofCommand)
|
||||||
register("fullsync", fullsyncCommand)
|
register("fullsync", fullsyncCommand)
|
||||||
register("sync", syncCommand)
|
register("sync", syncCommand)
|
||||||
|
register("replconf", replconfCommand)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package server
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/siddontang/go/sync2"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -25,8 +26,11 @@ type info struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
Replication struct {
|
Replication struct {
|
||||||
PubLogNum int64
|
PubLogNum sync2.AtomicInt64
|
||||||
PubLogTotalTime int64 //milliseconds
|
PubLogAckNum sync2.AtomicInt64
|
||||||
|
PubLogTotalAckTime sync2.AtomicDuration
|
||||||
|
|
||||||
|
MasterLastLogID sync2.AtomicUint64
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,18 +153,22 @@ func (i *info) dumpReplication(buf *bytes.Buffer) {
|
||||||
buf.WriteString("# Replication\r\n")
|
buf.WriteString("# Replication\r\n")
|
||||||
|
|
||||||
p := []infoPair{}
|
p := []infoPair{}
|
||||||
|
i.app.slock.Lock()
|
||||||
slaves := make([]string, 0, len(i.app.slaves))
|
slaves := make([]string, 0, len(i.app.slaves))
|
||||||
for s, _ := range i.app.slaves {
|
for _, s := range i.app.slaves {
|
||||||
slaves = append(slaves, s.remoteAddr)
|
slaves = append(slaves, s.slaveListeningAddr)
|
||||||
}
|
}
|
||||||
|
i.app.slock.Unlock()
|
||||||
|
|
||||||
num := i.Replication.PubLogNum
|
num := i.Replication.PubLogNum.Get()
|
||||||
p = append(p, infoPair{"pub_log_num", num})
|
p = append(p, infoPair{"pub_log_num", num})
|
||||||
|
|
||||||
if num != 0 {
|
ackNum := i.Replication.PubLogAckNum.Get()
|
||||||
p = append(p, infoPair{"pub_log_per_time", i.Replication.PubLogTotalTime / num})
|
totalTime := i.Replication.PubLogTotalAckTime.Get().Nanoseconds() / 1e6
|
||||||
|
if ackNum != 0 {
|
||||||
|
p = append(p, infoPair{"pub_log_ack_per_time", totalTime / ackNum})
|
||||||
} else {
|
} else {
|
||||||
p = append(p, infoPair{"pub_log_per_time", 0})
|
p = append(p, infoPair{"pub_log_ack_per_time", 0})
|
||||||
}
|
}
|
||||||
|
|
||||||
p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf})
|
p = append(p, infoPair{"slaveof", i.app.cfg.SlaveOf})
|
||||||
|
@ -179,6 +187,8 @@ func (i *info) dumpReplication(buf *bytes.Buffer) {
|
||||||
p = append(p, infoPair{"commit_log_id", 0})
|
p = append(p, infoPair{"commit_log_id", 0})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p = append(p, infoPair{"master_last_log_id", i.Replication.MasterLastLogID.Get()})
|
||||||
|
|
||||||
i.dumpPairs(buf, p...)
|
i.dumpPairs(buf, p...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,8 +14,8 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -49,10 +49,17 @@ func newMaster(app *App) *master {
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
quitCmd = []byte("*1\r\n$4\r\nquit\r\n")
|
||||||
|
)
|
||||||
|
|
||||||
func (m *master) Close() {
|
func (m *master) Close() {
|
||||||
ledis.AsyncNotify(m.quit)
|
ledis.AsyncNotify(m.quit)
|
||||||
|
|
||||||
if m.conn != nil {
|
if m.conn != nil {
|
||||||
|
//for replication, we send quit command to close gracefully
|
||||||
|
m.conn.Write(quitCmd)
|
||||||
|
|
||||||
m.conn.Close()
|
m.conn.Close()
|
||||||
m.conn = nil
|
m.conn = nil
|
||||||
}
|
}
|
||||||
|
@ -77,6 +84,7 @@ func (m *master) connect() error {
|
||||||
|
|
||||||
m.rb = bufio.NewReaderSize(m.conn, 4096)
|
m.rb = bufio.NewReaderSize(m.conn, 4096)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,6 +124,11 @@ func (m *master) runReplication(restart bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := m.replConf(); err != nil {
|
||||||
|
log.Error("replconf error %s", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if restart {
|
if restart {
|
||||||
if err := m.fullSync(); err != nil {
|
if err := m.fullSync(); err != nil {
|
||||||
if m.conn != nil {
|
if m.conn != nil {
|
||||||
|
@ -150,8 +163,30 @@ func (m *master) runReplication(restart bool) {
|
||||||
var (
|
var (
|
||||||
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
|
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
|
||||||
syncCmdFormat = "*2\r\n$4\r\nsync\r\n$%d\r\n%s\r\n" //sync logid
|
syncCmdFormat = "*2\r\n$4\r\nsync\r\n$%d\r\n%s\r\n" //sync logid
|
||||||
|
replconfCmdFormat = "*3\r\n$8\r\nreplconf\r\n$14\r\nlistening-port\r\n$%d\r\n%s\r\n" //replconf listening-port port
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (m *master) replConf() error {
|
||||||
|
_, port, err := net.SplitHostPort(m.app.cfg.Addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := hack.Slice(fmt.Sprintf(replconfCmdFormat, len(port), port))
|
||||||
|
|
||||||
|
if _, err := m.conn.Write(cmd); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if s, err := ReadStatus(m.rb); err != nil {
|
||||||
|
return err
|
||||||
|
} else if strings.ToLower(s) != "ok" {
|
||||||
|
return fmt.Errorf("not ok but %s", s)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *master) fullSync() error {
|
func (m *master) fullSync() error {
|
||||||
log.Info("begin full sync")
|
log.Info("begin full sync")
|
||||||
|
|
||||||
|
@ -227,6 +262,17 @@ func (m *master) sync() error {
|
||||||
|
|
||||||
buf := m.syncBuf.Bytes()
|
buf := m.syncBuf.Bytes()
|
||||||
|
|
||||||
|
if len(buf) < 8 {
|
||||||
|
return fmt.Errorf("inavlid sync size %d", len(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
m.app.info.Replication.MasterLastLogID.Set(num.BytesToUint64(buf))
|
||||||
|
|
||||||
|
var t bytes.Buffer
|
||||||
|
m.app.info.dumpReplication(&t)
|
||||||
|
|
||||||
|
buf = buf[8:]
|
||||||
|
|
||||||
if len(buf) == 0 {
|
if len(buf) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -284,24 +330,41 @@ func (app *App) tryReSlaveof() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *App) addSlave(c *client) {
|
func (app *App) addSlave(c *client) {
|
||||||
|
addr := c.slaveListeningAddr
|
||||||
|
|
||||||
app.slock.Lock()
|
app.slock.Lock()
|
||||||
defer app.slock.Unlock()
|
defer app.slock.Unlock()
|
||||||
|
|
||||||
app.slaves[c] = struct{}{}
|
app.slaves[addr] = c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *App) removeSlave(c *client) {
|
func (app *App) removeSlave(c *client, activeQuit bool) {
|
||||||
|
addr := c.slaveListeningAddr
|
||||||
|
|
||||||
app.slock.Lock()
|
app.slock.Lock()
|
||||||
defer app.slock.Unlock()
|
defer app.slock.Unlock()
|
||||||
|
|
||||||
if _, ok := app.slaves[c]; ok {
|
if _, ok := app.slaves[addr]; ok {
|
||||||
delete(app.slaves, c)
|
delete(app.slaves, addr)
|
||||||
log.Info("remove slave %s", c.remoteAddr)
|
log.Info("remove slave %s", addr)
|
||||||
|
if activeQuit {
|
||||||
|
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.ack != nil {
|
func (app *App) slaveAck(c *client) {
|
||||||
asyncNotifyUint64(c.ack.ch, c.lastLogID)
|
addr := c.slaveListeningAddr
|
||||||
|
|
||||||
|
app.slock.Lock()
|
||||||
|
defer app.slock.Unlock()
|
||||||
|
|
||||||
|
if _, ok := app.slaves[addr]; !ok {
|
||||||
|
//slave not add
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func asyncNotifyUint64(ch chan uint64, v uint64) {
|
func asyncNotifyUint64(ch chan uint64, v uint64) {
|
||||||
|
@ -317,51 +380,40 @@ func (app *App) publishNewLog(l *rpl.Log) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ss := make([]*client, 0, 4)
|
app.info.Replication.PubLogNum.Add(1)
|
||||||
|
|
||||||
app.slock.Lock()
|
app.slock.Lock()
|
||||||
|
|
||||||
|
total := (len(app.slaves) + 1) / 2
|
||||||
|
if app.cfg.Replication.WaitMaxSlaveAcks > 0 {
|
||||||
|
total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks)
|
||||||
|
}
|
||||||
|
|
||||||
|
n := 0
|
||||||
logId := l.ID
|
logId := l.ID
|
||||||
for s, _ := range app.slaves {
|
for _, s := range app.slaves {
|
||||||
if s.lastLogID >= logId {
|
if s.lastLogID == logId {
|
||||||
//slave has already owned this log
|
//slave has already owned this log
|
||||||
ss = []*client{}
|
n++
|
||||||
break
|
} else if s.lastLogID > logId {
|
||||||
} else {
|
log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, s.lastLogID, logId)
|
||||||
ss = append(ss, s)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
app.slock.Unlock()
|
app.slock.Unlock()
|
||||||
|
|
||||||
if len(ss) == 0 {
|
if n >= total {
|
||||||
|
//at least total slaves have owned this log
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
ack := &syncAck{
|
|
||||||
logId, make(chan uint64, len(ss)),
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, s := range ss {
|
|
||||||
s.ack = ack
|
|
||||||
}
|
|
||||||
|
|
||||||
total := (len(ss) + 1) / 2
|
|
||||||
if app.cfg.Replication.WaitMaxSlaveAcks > 0 {
|
|
||||||
total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks)
|
|
||||||
}
|
|
||||||
|
|
||||||
done := make(chan struct{}, 1)
|
done := make(chan struct{}, 1)
|
||||||
go func(total int) {
|
go func(total int) {
|
||||||
n := 0
|
for i := 0; i < total; i++ {
|
||||||
for i := 0; i < len(ss); i++ {
|
id := <-app.slaveSyncAck
|
||||||
id := <-ack.ch
|
if id < logId {
|
||||||
if id > logId {
|
log.Info("some slave may close with last logid %d < %d", id, logId)
|
||||||
n++
|
|
||||||
if n >= total {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
done <- struct{}{}
|
done <- struct{}{}
|
||||||
|
@ -374,6 +426,6 @@ func (app *App) publishNewLog(l *rpl.Log) {
|
||||||
}
|
}
|
||||||
|
|
||||||
stopTime := time.Now()
|
stopTime := time.Now()
|
||||||
atomic.AddInt64(&app.info.Replication.PubLogNum, 1)
|
app.info.Replication.PubLogAckNum.Add(1)
|
||||||
atomic.AddInt64(&app.info.Replication.PubLogTotalTime, stopTime.Sub(startTime).Nanoseconds()/1e6)
|
app.info.Replication.PubLogTotalAckTime.Add(stopTime.Sub(startTime))
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ var (
|
||||||
errArrayFormat = errors.New("bad array format")
|
errArrayFormat = errors.New("bad array format")
|
||||||
errBulkFormat = errors.New("bad bulk string format")
|
errBulkFormat = errors.New("bad bulk string format")
|
||||||
errLineFormat = errors.New("bad response line format")
|
errLineFormat = errors.New("bad response line format")
|
||||||
|
errStatusFormat = errors.New("bad status format")
|
||||||
)
|
)
|
||||||
|
|
||||||
func ReadLine(rb *bufio.Reader) ([]byte, error) {
|
func ReadLine(rb *bufio.Reader) ([]byte, error) {
|
||||||
|
@ -54,9 +55,26 @@ func ReadBulkTo(rb *bufio.Reader, w io.Writer) error {
|
||||||
return errBulkFormat
|
return errBulkFormat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if l[0] == '-' {
|
||||||
|
return errors.New(string(l[1:]))
|
||||||
} else {
|
} else {
|
||||||
return errBulkFormat
|
return errBulkFormat
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ReadStatus(rb *bufio.Reader) (string, error) {
|
||||||
|
l, err := ReadLine(rb)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
} else if len(l) == 0 {
|
||||||
|
return "", errStatusFormat
|
||||||
|
} else if l[0] == '+' {
|
||||||
|
return string(l[1:]), nil
|
||||||
|
} else if l[0] == '-' {
|
||||||
|
return "", errors.New(string(l[1:]))
|
||||||
|
} else {
|
||||||
|
return "", errStatusFormat
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue