add replication, test later

This commit is contained in:
siddontang 2014-06-09 17:23:32 +08:00
parent 993ccdd052
commit ef22d7000d
10 changed files with 621 additions and 159 deletions

View File

@ -35,7 +35,6 @@ timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData
*/ */
type BinLogConfig struct { type BinLogConfig struct {
Name string `json:"name"`
Path string `json:"path"` Path string `json:"path"`
MaxFileSize int `json:"max_file_size"` MaxFileSize int `json:"max_file_size"`
MaxFileNum int `json:"max_file_num"` MaxFileNum int `json:"max_file_num"`
@ -53,10 +52,6 @@ func (cfg *BinLogConfig) adjust() {
} else if cfg.MaxFileNum > MaxBinLogFileNum { } else if cfg.MaxFileNum > MaxBinLogFileNum {
cfg.MaxFileNum = MaxBinLogFileNum cfg.MaxFileNum = MaxBinLogFileNum
} }
if len(cfg.Name) == 0 {
cfg.Name = "ledis"
}
} }
type BinLog struct { type BinLog struct {
@ -68,7 +63,7 @@ type BinLog struct {
indexName string indexName string
logNames []string logNames []string
lastLogIndex int lastLogIndex int64
} }
func NewBinLog(data json.RawMessage) (*BinLog, error) { func NewBinLog(data json.RawMessage) (*BinLog, error) {
@ -128,7 +123,7 @@ func (l *BinLog) flushIndex() error {
} }
func (l *BinLog) loadIndex() error { func (l *BinLog) loadIndex() error {
l.indexName = path.Join(l.cfg.Path, fmt.Sprintf("%s-bin.index", l.cfg.Name)) l.indexName = path.Join(l.cfg.Path, fmt.Sprintf("ledis-bin.index"))
if _, err := os.Stat(l.indexName); os.IsNotExist(err) { if _, err := os.Stat(l.indexName); os.IsNotExist(err) {
//no index file, nothing to do //no index file, nothing to do
} else { } else {
@ -165,7 +160,7 @@ func (l *BinLog) loadIndex() error {
} else { } else {
lastName := l.logNames[len(l.logNames)-1] lastName := l.logNames[len(l.logNames)-1]
if l.lastLogIndex, err = strconv.Atoi(path.Ext(lastName)[1:]); err != nil { if l.lastLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil {
log.Error("invalid logfile name %s", err.Error()) log.Error("invalid logfile name %s", err.Error())
return err return err
} }
@ -178,7 +173,7 @@ func (l *BinLog) loadIndex() error {
} }
func (l *BinLog) getLogFile() string { func (l *BinLog) getLogFile() string {
return fmt.Sprintf("%s-bin.%07d", l.cfg.Name, l.lastLogIndex) return l.FormatLogFileName(l.lastLogIndex)
} }
func (l *BinLog) openNewLogFile() error { func (l *BinLog) openNewLogFile() error {
@ -261,6 +256,22 @@ func (l *BinLog) LogFilePos() int64 {
} }
} }
func (l *BinLog) LogFileIndex() int64 {
return l.lastLogIndex
}
func (l *BinLog) FormatLogFileName(index int64) string {
return fmt.Sprintf("ledis-bin.%07d", index)
}
func (l *BinLog) FormatLogFilePath(index int64) string {
return path.Join(l.cfg.Path, fmt.Sprintf("ledis-bin.%07d", index))
}
func (l *BinLog) LogPath() string {
return l.cfg.Path
}
func (l *BinLog) Purge(n int) error { func (l *BinLog) Purge(n int) error {
if len(l.logNames) == 0 { if len(l.logNames) == 0 {
return nil return nil

View File

@ -12,7 +12,6 @@ func TestBinLog(t *testing.T) {
cfg.MaxFileNum = 1 cfg.MaxFileNum = 1
cfg.MaxFileSize = 1024 cfg.MaxFileSize = 1024
cfg.Path = "/tmp/ledis_binlog" cfg.Path = "/tmp/ledis_binlog"
cfg.Name = "ledis"
os.RemoveAll(cfg.Path) os.RemoveAll(cfg.Path)

View File

@ -4,19 +4,43 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"encoding/json"
"github.com/siddontang/go-leveldb/leveldb" "github.com/siddontang/go-leveldb/leveldb"
"io" "io"
"os" "os"
) )
//dump format //dump format
// head len(bigendian int32)|head(json format) // fileIndex(bigendian int64)|filePos(bigendian int64)
// |keylen(bigendian int32)|key|valuelen(bigendian int32)|value...... // |keylen(bigendian int32)|key|valuelen(bigendian int32)|value......
type DumpHead struct { type MasterInfo struct {
LogFile string `json:"log_file"` LogFileIndex int64
LogPos int64 `json:"log_pos"` LogPos int64
}
func (m *MasterInfo) WriteTo(w io.Writer) error {
if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil {
return err
}
if err := binary.Write(w, binary.BigEndian, m.LogPos); err != nil {
return err
}
return nil
}
func (m *MasterInfo) ReadFrom(r io.Reader) error {
err := binary.Read(r, binary.BigEndian, &m.LogFileIndex)
if err != nil {
return err
}
err = binary.Read(r, binary.BigEndian, &m.LogPos)
if err != nil {
return err
}
return nil
} }
func (l *Ledis) DumpFile(path string) error { func (l *Ledis) DumpFile(path string) error {
@ -31,34 +55,21 @@ func (l *Ledis) DumpFile(path string) error {
func (l *Ledis) Dump(w io.Writer) error { func (l *Ledis) Dump(w io.Writer) error {
var sp *leveldb.Snapshot var sp *leveldb.Snapshot
var logFileName string var m *MasterInfo = new(MasterInfo)
var logPos int64
if l.binlog == nil { if l.binlog == nil {
sp = l.ldb.NewSnapshot() sp = l.ldb.NewSnapshot()
} else { } else {
l.Lock() l.Lock()
sp = l.ldb.NewSnapshot() sp = l.ldb.NewSnapshot()
logFileName = l.binlog.LogFileName() m.LogFileIndex = l.binlog.LogFileIndex()
logPos = l.binlog.LogFilePos() m.LogPos = l.binlog.LogFilePos()
l.Unlock() l.Unlock()
} }
var head = DumpHead{ var err error
LogFile: logFileName,
LogPos: logPos,
}
data, err := json.Marshal(&head)
if err != nil {
return err
}
wb := bufio.NewWriterSize(w, 4096) wb := bufio.NewWriterSize(w, 4096)
if err = binary.Write(wb, binary.BigEndian, uint32(len(data))); err != nil { if err = m.WriteTo(wb); err != nil {
return err
}
if _, err = wb.Write(data); err != nil {
return err return err
} }
@ -93,7 +104,7 @@ func (l *Ledis) Dump(w io.Writer) error {
return nil return nil
} }
func (l *Ledis) LoadDumpFile(path string) (*DumpHead, error) { func (l *Ledis) LoadDumpFile(path string) (*MasterInfo, error) {
f, err := os.Open(path) f, err := os.Open(path)
if err != nil { if err != nil {
return nil, err return nil, err
@ -103,28 +114,19 @@ func (l *Ledis) LoadDumpFile(path string) (*DumpHead, error) {
return l.LoadDump(f) return l.LoadDump(f)
} }
func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) { func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
info := new(MasterInfo)
rb := bufio.NewReaderSize(r, 4096) rb := bufio.NewReaderSize(r, 4096)
var headLen uint32 err := info.ReadFrom(rb)
err := binary.Read(rb, binary.BigEndian, &headLen)
if err != nil { if err != nil {
return nil, err return nil, err
} }
buf := make([]byte, headLen)
if _, err = io.ReadFull(rb, buf); err != nil {
return nil, err
}
var head DumpHead
if err = json.Unmarshal(buf, &head); err != nil {
return nil, err
}
var keyLen uint16 var keyLen uint16
var valueLen uint32 var valueLen uint32
@ -161,5 +163,5 @@ func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) {
valueBuf.Reset() valueBuf.Reset()
} }
return &head, nil return info, nil
} }

View File

@ -12,6 +12,7 @@ import (
var ( var (
errInvalidBinLogEvent = errors.New("invalid binglog event") errInvalidBinLogEvent = errors.New("invalid binglog event")
errInvalidBinLogFile = errors.New("invalid binlog file")
) )
func (l *Ledis) ReplicateEvent(event []byte) error { func (l *Ledis) ReplicateEvent(event []byte) error {
@ -70,63 +71,154 @@ func (l *Ledis) replicateCommandEvent(event []byte) error {
return errors.New("command event not supported now") return errors.New("command event not supported now")
} }
func (l *Ledis) RepliateFromBinLog(filePath string, offset int64) (int64, error) { func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
f, err := os.Open(filePath)
if err != nil {
return 0, err
}
defer f.Close()
st, _ := f.Stat()
totalSize := st.Size()
if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
return 0, err
}
rb := bufio.NewReaderSize(f, 4096)
var createTime uint32 var createTime uint32
var dataLen uint32 var dataLen uint32
var dataBuf bytes.Buffer var dataBuf bytes.Buffer
var err error
for { for {
if offset+8 > totalSize {
//event may not sync completely
return f.Seek(offset, os.SEEK_SET)
}
if err = binary.Read(rb, binary.BigEndian, &createTime); err != nil { if err = binary.Read(rb, binary.BigEndian, &createTime); err != nil {
return 0, err if err == io.EOF {
break
} else {
return err
}
} }
if err = binary.Read(rb, binary.BigEndian, &dataLen); err != nil { if err = binary.Read(rb, binary.BigEndian, &dataLen); err != nil {
return 0, err return err
} }
if offset+8+int64(dataLen) > totalSize {
//event may not sync completely
return f.Seek(offset, os.SEEK_SET)
} else {
if _, err = io.CopyN(&dataBuf, rb, int64(dataLen)); err != nil { if _, err = io.CopyN(&dataBuf, rb, int64(dataLen)); err != nil {
return 0, err return err
} }
l.Lock()
err = l.ReplicateEvent(dataBuf.Bytes()) err = l.ReplicateEvent(dataBuf.Bytes())
l.Unlock()
if err != nil { if err != nil {
log.Fatal("replication error %s, skip to next", err.Error()) log.Fatal("replication error %s, skip to next", err.Error())
} }
dataBuf.Reset() dataBuf.Reset()
}
offset += (8 + int64(dataLen)) return nil
}
func (l *Ledis) ReplicateFromData(data []byte) error {
rb := bytes.NewReader(data)
l.Lock()
err := l.ReplicateFromReader(rb)
l.Unlock()
return err
}
func (l *Ledis) ReplicateFromBinLog(filePath string) error {
f, err := os.Open(filePath)
if err != nil {
return err
}
rb := bufio.NewReaderSize(f, 4096)
err = l.ReplicateFromReader(rb)
f.Close()
return err
}
const maxSyncEvents = 16
//events data format
// nextfileIndex(bigendian int64)|nextfilePos(bigendian int64)|binlogevents
func (l *Ledis) ReadEventsTo(index int64, offset int64, w io.Writer) (info *MasterInfo, err error) {
info = new(MasterInfo)
if l.binlog == nil {
//binlog not supported
info.LogFileIndex = 0
return
}
info.LogFileIndex = index
info.LogPos = offset
filePath := l.binlog.FormatLogFilePath(index)
var f *os.File
f, err = os.Open(filePath)
if err != nil && !os.IsNotExist(err) {
return
} else if os.IsNotExist(err) {
l.Lock()
lastIndex := l.binlog.LogFileIndex()
if index == lastIndex {
//no binlog at all
l.Unlock()
return
}
l.Unlock()
//slave binlog info had lost
info.LogFileIndex = -1
}
defer f.Close()
if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
//may be invliad seek offset
return
}
var lastCreateTime uint32 = 0
var createTime uint32
var dataLen uint32
var n int = 0
for {
if err = binary.Read(f, binary.BigEndian, &createTime); err != nil {
if err == io.EOF {
//we will try to use next binlog
info.LogFileIndex = index + 1
info.LogPos = 0
return
} else {
return
} }
} }
//can not go here??? n++
log.Error("can not go here") if lastCreateTime == 0 {
return offset, nil lastCreateTime = createTime
} else if lastCreateTime != createTime {
return
} else if n > maxSyncEvents {
return
}
if err = binary.Read(f, binary.BigEndian, &dataLen); err != nil {
return
}
if err = binary.Write(w, binary.BigEndian, createTime); err != nil {
return
}
if err = binary.Write(w, binary.BigEndian, dataLen); err != nil {
return
}
if _, err = io.CopyN(w, f, int64(dataLen)); err != nil {
return
}
info.LogPos = info.LogPos + 8 + int64(dataLen)
}
return
} }

View File

@ -40,16 +40,9 @@ func TestReplication(t *testing.T) {
binLogName := "/tmp/test_repl/master/bin_log/ledis-bin.0000001" binLogName := "/tmp/test_repl/master/bin_log/ledis-bin.0000001"
var offset int64 err = slave.ReplicateFromBinLog(binLogName)
offset, err = slave.RepliateFromBinLog(binLogName, 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} else {
if st, err := os.Stat(binLogName); err != nil {
t.Fatal(err)
} else if st.Size() != offset {
t.Fatal(st.Size(), offset)
}
} }
it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1) it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1)

View File

@ -19,7 +19,7 @@ type App struct {
quit chan struct{} quit chan struct{}
//for slave replication //for slave replication
master masterInfo m *master
} }
func NewApp(cfg *Config) (*App, error) { func NewApp(cfg *Config) (*App, error) {
@ -55,6 +55,8 @@ func NewApp(cfg *Config) (*App, error) {
return nil, err return nil, err
} }
app.m = newMaster(app)
return app, nil return app, nil
} }
@ -69,12 +71,14 @@ func (app *App) Close() {
app.listener.Close() app.listener.Close()
app.m.Close()
app.ldb.Close() app.ldb.Close()
} }
func (app *App) Run() { func (app *App) Run() {
if len(app.cfg.SlaveOf) > 0 { if len(app.cfg.SlaveOf) > 0 {
app.runReplication() app.slaveof(app.cfg.SlaveOf)
} }
for !app.closed { for !app.closed {

View File

@ -2,6 +2,7 @@ package server
import ( import (
"bufio" "bufio"
"bytes"
"errors" "errors"
"github.com/siddontang/go-log/log" "github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/ledis"
@ -28,6 +29,8 @@ type client struct {
args [][]byte args [][]byte
reqC chan error reqC chan error
syncBuf bytes.Buffer
} }
func newClient(c net.Conn, app *App) { func newClient(c net.Conn, app *App) {
@ -71,22 +74,7 @@ func (c *client) run() {
} }
func (c *client) readLine() ([]byte, error) { func (c *client) readLine() ([]byte, error) {
var line []byte return readLine(c.rb)
for {
l, more, err := c.rb.ReadLine()
if err != nil {
return nil, err
}
if line == nil && !more {
return l, nil
}
line = append(line, l...)
if !more {
break
}
}
return line, nil
} }
//A client sends to the Redis server a RESP Array consisting of just Bulk Strings. //A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
@ -121,15 +109,19 @@ func (c *client) readRequest() ([][]byte, error) {
} else if n == -1 { } else if n == -1 {
req = append(req, nil) req = append(req, nil)
} else { } else {
buf := make([]byte, n+2) buf := make([]byte, n)
if _, err = io.ReadFull(c.rb, buf); err != nil { if _, err = io.ReadFull(c.rb, buf); err != nil {
return nil, err return nil, err
} else if buf[len(buf)-2] != '\r' || buf[len(buf)-1] != '\n' {
return nil, errReadRequest
} else {
req = append(req, buf[0:len(buf)-2])
} }
if l, err = c.readLine(); err != nil {
return nil, err
} else if len(l) != 0 {
return nil, errors.New("bad bulk string format")
}
req = append(req, buf)
} }
} else { } else {
@ -226,3 +218,12 @@ func (c *client) writeArray(ay []interface{}) {
} }
} }
} }
func (c *client) writeBulkFrom(n int64, rb io.Reader) {
c.wb.WriteByte('$')
c.wb.Write(ledis.Slice(strconv.FormatInt(n, 10)))
c.wb.Write(Delims)
io.Copy(c.wb, rb)
c.wb.Write(Delims)
}

View File

@ -1,8 +1,11 @@
package server package server
import ( import (
"encoding/binary"
"fmt" "fmt"
"github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/ledis"
"io/ioutil"
"os"
"strconv" "strconv"
"strings" "strings"
) )
@ -35,3 +38,77 @@ func slaveofCommand(c *client) error {
return nil return nil
} }
func fullsyncCommand(c *client) error {
//todo, multi fullsync may use same dump file
dumpFile, err := ioutil.TempFile(c.app.cfg.DataDir, "dump_")
if err != nil {
return err
}
if err = c.app.ldb.Dump(dumpFile); err != nil {
return err
}
st, _ := dumpFile.Stat()
n := st.Size()
dumpFile.Seek(0, os.SEEK_SET)
c.writeBulkFrom(n, dumpFile)
name := dumpFile.Name()
dumpFile.Close()
os.Remove(name)
return nil
}
var reserveInfoSpace = make([]byte, 16)
func syncCommand(c *client) error {
args := c.args
if len(args) != 2 {
return ErrCmdParams
}
var logIndex int64
var logPos int64
var err error
logIndex, err = ledis.StrInt64(args[0], nil)
if err != nil {
return ErrCmdParams
}
logPos, err = ledis.StrInt64(args[1], nil)
if err != nil {
return ErrCmdParams
}
c.syncBuf.Reset()
//reserve space to write master info
if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil {
return err
}
if m, err := c.app.ldb.ReadEventsTo(logIndex, logPos, &c.syncBuf); err != nil {
return err
} else {
buf := c.syncBuf.Bytes()
binary.BigEndian.PutUint64(buf[0:], uint64(m.LogFileIndex))
binary.BigEndian.PutUint64(buf[8:], uint64(m.LogPos))
c.writeBulk(buf)
}
return nil
}
func init() {
register("slaveof", slaveofCommand)
register("fullsync", fullsyncCommand)
register("sync", syncCommand)
}

View File

@ -1,25 +1,77 @@
package server package server
import ( import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json" "encoding/json"
"errors"
"fmt"
"github.com/siddontang/go-log/log" "github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/ledis"
"io/ioutil" "io/ioutil"
"net"
"os" "os"
"path" "path"
"strconv"
"sync"
"time"
) )
type masterInfo struct { var (
Addr string `json:"addr"` errConnectMaster = errors.New("connect master error")
LogFile string `json:"log_name"` )
LogPos int64 `json:"log_pos"`
type master struct {
sync.Mutex
addr string `json:"addr"`
logFileIndex int64 `json:"log_file_index"`
logPos int64 `json:"log_pos"`
c net.Conn
rb *bufio.Reader
app *App
quit chan struct{}
infoName string
infoNameBak string
wg sync.WaitGroup
syncBuf bytes.Buffer
} }
func (app *App) getMasterInfoName() string { func newMaster(app *App) *master {
return path.Join(app.cfg.DataDir, "master.info") m := new(master)
m.app = app
m.infoName = path.Join(m.app.cfg.DataDir, "master.info")
m.infoNameBak = fmt.Sprintf("%s.bak", m.infoName)
m.quit = make(chan struct{})
//if load error, we will start a fullsync later
m.loadInfo()
return m
} }
func (app *App) loadMasterInfo() error { func (m *master) Close() {
data, err := ioutil.ReadFile(app.getMasterInfoName()) close(m.quit)
if m.c != nil {
m.c.Close()
m.c = nil
}
m.wg.Wait()
}
func (m *master) loadInfo() error {
data, err := ioutil.ReadFile(m.infoName)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil return nil
@ -28,23 +80,21 @@ func (app *App) loadMasterInfo() error {
} }
} }
if err = json.Unmarshal(data, &app.master); err != nil { if err = json.Unmarshal(data, m); err != nil {
return err return err
} }
return nil return nil
} }
func (app *App) saveMasterInfo() error { func (m *master) saveInfo() error {
bakName := path.Join(app.cfg.DataDir, "master.info.bak") data, err := json.Marshal(m)
data, err := json.Marshal(&app.master)
if err != nil { if err != nil {
return err return err
} }
var fd *os.File var fd *os.File
fd, err = os.OpenFile(bakName, os.O_CREATE|os.O_WRONLY, os.ModePerm) fd, err = os.OpenFile(m.infoNameBak, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil { if err != nil {
return err return err
} }
@ -55,32 +105,208 @@ func (app *App) saveMasterInfo() error {
} }
fd.Close() fd.Close()
return os.Rename(bakName, app.getMasterInfoName()) return os.Rename(m.infoNameBak, m.infoName)
} }
func (app *App) slaveof(masterAddr string) error { func (m *master) connect() error {
if len(masterAddr) == 0 { if len(m.addr) == 0 {
//stop replication return fmt.Errorf("no assign master addr")
}
if m.c != nil {
m.c.Close()
m.c = nil
}
if c, err := net.Dial("tcp", m.addr); err != nil {
return err
} else { } else {
} m.c = c
m.rb = bufio.NewReaderSize(m.c, 4096)
}
return nil return nil
} }
func (app *App) runReplication() { func (m *master) resetInfo(addr string) {
m.addr = addr
m.logFileIndex = 0
m.logPos = 0
} }
func (app *App) startReplication(masterAddr string) error { func (m *master) stopReplication() error {
if err := app.loadMasterInfo(); err != nil { m.Close()
log.Error("load master.info error %s, use fullsync", err.Error())
app.master = masterInfo{masterAddr, "", 0} if err := m.saveInfo(); err != nil {
} else if app.master.Addr != masterAddr { log.Error("save master info error %s", err.Error())
if err := app.ldb.FlushAll(); err != nil {
log.Error("replication flush old data error %s", err.Error())
return err return err
} }
app.master = masterInfo{masterAddr, "", 0} return nil
}
func (m *master) startReplication(masterAddr string) error {
//stop last replcation, if avaliable
m.Close()
if masterAddr != m.addr {
m.resetInfo(masterAddr)
if err := m.saveInfo(); err != nil {
log.Error("save master info error %s", err.Error())
return err
}
}
m.quit = make(chan struct{})
go m.runReplication()
return nil
}
func (m *master) runReplication() {
m.wg.Add(1)
defer m.wg.Done()
for {
select {
case <-m.quit:
return
default:
if err := m.connect(); err != nil {
log.Error("connect master %s error %s, try 2s later", m.addr, err.Error())
time.Sleep(2 * time.Second)
continue
}
}
if m.logFileIndex == 0 {
//try a fullsync
if err := m.fullSync(); err != nil {
log.Warn("full sync error %s", err.Error())
return
}
if m.logFileIndex == 0 {
//master not support binlog, we cannot sync, so stop replication
m.stopReplication()
return
}
}
t := time.NewTicker(1 * time.Second)
//then we will try sync every 1 seconds
for {
select {
case <-t.C:
if err := m.sync(); err != nil {
log.Warn("sync error %s", err.Error())
return
}
case <-m.quit:
return
}
}
}
return
}
var (
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
syncCmdFormat = "*3\r\n$4\r\nsync\r\n$%d\r\n%s\r\n%d\r\n%s\r\n" //sync file pos
)
func (m *master) fullSync() error {
if _, err := m.c.Write(fullSyncCmd); err != nil {
return err
}
dumpPath := path.Join(m.app.cfg.DataDir, "master.dump")
f, err := os.OpenFile(dumpPath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return err
}
defer os.Remove(dumpPath)
err = readBulkTo(m.rb, f)
f.Close()
if err != nil {
log.Error("read dump data error %s", err.Error())
return err
}
if err = m.app.ldb.FlushAll(); err != nil {
return err
}
var head *ledis.MasterInfo
head, err = m.app.ldb.LoadDumpFile(dumpPath)
if err != nil {
log.Error("load dump file error %s", err.Error())
return err
}
m.logFileIndex = head.LogFileIndex
m.logPos = head.LogPos
return nil
}
func (m *master) sync() error {
logIndexStr := strconv.FormatInt(m.logFileIndex, 10)
logPosStr := strconv.FormatInt(m.logPos, 10)
if _, err := m.c.Write(ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr),
logIndexStr, len(logPosStr), logPosStr))); err != nil {
return err
}
m.syncBuf.Reset()
err := readBulkTo(m.rb, &m.syncBuf)
if err != nil {
return err
}
err = binary.Read(&m.syncBuf, binary.BigEndian, &m.logFileIndex)
if err != nil {
return err
}
err = binary.Read(&m.syncBuf, binary.BigEndian, &m.logPos)
if err != nil {
return err
}
if m.logFileIndex == 0 {
//master now not support binlog, stop replication
m.stopReplication()
return nil
} else if m.logFileIndex == -1 {
//-1 means than binlog index and pos are lost, we must start a full sync instead
return m.fullSync()
}
err = m.app.ldb.ReplicateFromReader(&m.syncBuf)
if err != nil {
return err
}
return nil
}
func (app *App) slaveof(masterAddr string) error {
app.m.Lock()
defer app.m.Unlock()
if len(masterAddr) == 0 {
return app.m.stopReplication()
} else {
return app.m.startReplication(masterAddr)
} }
return nil return nil

57
server/util.go Normal file
View File

@ -0,0 +1,57 @@
package server
import (
"bufio"
"errors"
"github.com/siddontang/ledisdb/ledis"
"io"
"strconv"
)
var (
errArrayFormat = errors.New("bad array format")
errBulkFormat = errors.New("bad bulk string format")
errLineFormat = errors.New("bad response line format")
)
func readLine(rb *bufio.Reader) ([]byte, error) {
p, err := rb.ReadSlice('\n')
if err != nil {
return nil, err
}
i := len(p) - 2
if i < 0 || p[i] != '\r' {
return nil, errLineFormat
}
return p[:i], nil
}
func readBulkTo(rb *bufio.Reader, w io.Writer) error {
l, err := readLine(rb)
if len(l) == 0 {
return errArrayFormat
} else if l[0] == '$' {
var n int
//handle resp string
if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return err
} else if n == -1 {
return nil
} else {
if _, err = io.CopyN(w, rb, int64(n)); err != nil {
return err
}
if l, err = readLine(rb); err != nil {
return err
} else if len(l) != 0 {
return errBulkFormat
}
}
} else {
return errArrayFormat
}
return nil
}