forked from mirror/ledisdb
reduce replication code
This commit is contained in:
parent
4db3dbb6f5
commit
e3765035ce
|
@ -1,19 +1,17 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/siddontang/go/hack"
|
||||
"github.com/siddontang/go/log"
|
||||
"github.com/siddontang/go/num"
|
||||
goledis "github.com/siddontang/ledisdb/client/go/ledis"
|
||||
"github.com/siddontang/ledisdb/ledis"
|
||||
"github.com/siddontang/ledisdb/rpl"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -26,8 +24,7 @@ var (
|
|||
type master struct {
|
||||
sync.Mutex
|
||||
|
||||
conn net.Conn
|
||||
rb *bufio.Reader
|
||||
conn *goledis.Conn
|
||||
|
||||
app *App
|
||||
|
||||
|
@ -49,16 +46,12 @@ func newMaster(app *App) *master {
|
|||
return m
|
||||
}
|
||||
|
||||
var (
|
||||
quitCmd = []byte("*1\r\n$4\r\nquit\r\n")
|
||||
)
|
||||
|
||||
func (m *master) Close() {
|
||||
ledis.AsyncNotify(m.quit)
|
||||
|
||||
if m.conn != nil {
|
||||
//for replication, we send quit command to close gracefully
|
||||
m.conn.Write(quitCmd)
|
||||
m.conn.Send("quit")
|
||||
|
||||
m.conn.Close()
|
||||
m.conn = nil
|
||||
|
@ -67,7 +60,7 @@ func (m *master) Close() {
|
|||
m.wg.Wait()
|
||||
}
|
||||
|
||||
func (m *master) connect() error {
|
||||
func (m *master) resetConn() error {
|
||||
if len(m.addr) == 0 {
|
||||
return fmt.Errorf("no assign master addr")
|
||||
}
|
||||
|
@ -77,13 +70,7 @@ func (m *master) connect() error {
|
|||
m.conn = nil
|
||||
}
|
||||
|
||||
if conn, err := net.Dial("tcp", m.addr); err != nil {
|
||||
return err
|
||||
} else {
|
||||
m.conn = conn
|
||||
|
||||
m.rb = bufio.NewReaderSize(m.conn, 4096)
|
||||
}
|
||||
m.conn = goledis.NewConn(m.addr)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -112,13 +99,18 @@ func (m *master) startReplication(masterAddr string, restart bool) error {
|
|||
func (m *master) runReplication(restart bool) {
|
||||
defer m.wg.Done()
|
||||
|
||||
if err := m.resetConn(); err != nil {
|
||||
log.Error("reset conn error %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-m.quit:
|
||||
return
|
||||
default:
|
||||
if err := m.connect(); err != nil {
|
||||
log.Error("connect master %s error %s, try 2s later", m.addr, err.Error())
|
||||
if _, err := m.conn.Do("ping"); err != nil {
|
||||
log.Error("ping master %s error %s, try 2s later", m.addr, err.Error())
|
||||
time.Sleep(2 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
@ -160,27 +152,15 @@ func (m *master) runReplication(restart bool) {
|
|||
return
|
||||
}
|
||||
|
||||
var (
|
||||
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
|
||||
syncCmdFormat = "*2\r\n$4\r\nsync\r\n$%d\r\n%s\r\n" //sync logid
|
||||
replconfCmdFormat = "*3\r\n$8\r\nreplconf\r\n$14\r\nlistening-port\r\n$%d\r\n%s\r\n" //replconf listening-port port
|
||||
)
|
||||
|
||||
func (m *master) replConf() error {
|
||||
_, port, err := net.SplitHostPort(m.app.cfg.Addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd := hack.Slice(fmt.Sprintf(replconfCmdFormat, len(port), port))
|
||||
|
||||
if _, err := m.conn.Write(cmd); err != nil {
|
||||
if s, err := goledis.String(m.conn.Do("replconf", "listening-port", port)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s, err := ReadStatus(m.rb); err != nil {
|
||||
return err
|
||||
} else if strings.ToLower(s) != "ok" {
|
||||
} else if strings.ToUpper(s) != "OK" {
|
||||
return fmt.Errorf("not ok but %s", s)
|
||||
}
|
||||
|
||||
|
@ -190,7 +170,7 @@ func (m *master) replConf() error {
|
|||
func (m *master) fullSync() error {
|
||||
log.Info("begin full sync")
|
||||
|
||||
if _, err := m.conn.Write(fullSyncCmd); err != nil {
|
||||
if err := m.conn.Send("fullsync"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -202,7 +182,7 @@ func (m *master) fullSync() error {
|
|||
|
||||
defer os.Remove(dumpPath)
|
||||
|
||||
err = ReadBulkTo(m.rb, f)
|
||||
err = m.conn.ReceiveBulkTo(f)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
log.Error("read dump data error %s", err.Error())
|
||||
|
@ -237,18 +217,13 @@ func (m *master) sync() error {
|
|||
return err
|
||||
}
|
||||
|
||||
logIDStr := strconv.FormatUint(syncID, 10)
|
||||
|
||||
cmd := hack.Slice(fmt.Sprintf(syncCmdFormat, len(logIDStr),
|
||||
logIDStr))
|
||||
|
||||
if _, err := m.conn.Write(cmd); err != nil {
|
||||
if err := m.conn.Send("sync", syncID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.syncBuf.Reset()
|
||||
|
||||
if err = ReadBulkTo(m.rb, &m.syncBuf); err != nil {
|
||||
if err = m.conn.ReceiveBulkTo(&m.syncBuf); err != nil {
|
||||
switch err.Error() {
|
||||
case ledis.ErrLogMissed.Error():
|
||||
return m.fullSync()
|
||||
|
|
|
@ -3,16 +3,10 @@ package server
|
|||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"github.com/siddontang/go/hack"
|
||||
"io"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
var (
|
||||
errArrayFormat = errors.New("bad array format")
|
||||
errBulkFormat = errors.New("bad bulk string format")
|
||||
errLineFormat = errors.New("bad response line format")
|
||||
errStatusFormat = errors.New("bad status format")
|
||||
errLineFormat = errors.New("bad response line format")
|
||||
)
|
||||
|
||||
func ReadLine(rb *bufio.Reader) ([]byte, error) {
|
||||
|
@ -27,54 +21,3 @@ func ReadLine(rb *bufio.Reader) ([]byte, error) {
|
|||
}
|
||||
return p[:i], nil
|
||||
}
|
||||
|
||||
func ReadBulkTo(rb *bufio.Reader, w io.Writer) error {
|
||||
l, err := ReadLine(rb)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(l) == 0 {
|
||||
return errBulkFormat
|
||||
} else if l[0] == '$' {
|
||||
var n int
|
||||
//handle resp string
|
||||
if n, err = strconv.Atoi(hack.String(l[1:])); err != nil {
|
||||
return err
|
||||
} else if n == -1 {
|
||||
return nil
|
||||
} else {
|
||||
var nn int64
|
||||
if nn, err = io.CopyN(w, rb, int64(n)); err != nil {
|
||||
return err
|
||||
} else if nn != int64(n) {
|
||||
return io.ErrShortWrite
|
||||
}
|
||||
|
||||
if l, err = ReadLine(rb); err != nil {
|
||||
return err
|
||||
} else if len(l) != 0 {
|
||||
return errBulkFormat
|
||||
}
|
||||
}
|
||||
} else if l[0] == '-' {
|
||||
return errors.New(string(l[1:]))
|
||||
} else {
|
||||
return errBulkFormat
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ReadStatus(rb *bufio.Reader) (string, error) {
|
||||
l, err := ReadLine(rb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if len(l) == 0 {
|
||||
return "", errStatusFormat
|
||||
} else if l[0] == '+' {
|
||||
return string(l[1:]), nil
|
||||
} else if l[0] == '-' {
|
||||
return "", errors.New(string(l[1:]))
|
||||
} else {
|
||||
return "", errStatusFormat
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue