diff --git a/ledis/binlog.go b/ledis/binlog.go index c5df748..c7f534c 100644 --- a/ledis/binlog.go +++ b/ledis/binlog.go @@ -35,7 +35,6 @@ timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData */ type BinLogConfig struct { - Name string `json:"name"` Path string `json:"path"` MaxFileSize int `json:"max_file_size"` MaxFileNum int `json:"max_file_num"` @@ -53,10 +52,6 @@ func (cfg *BinLogConfig) adjust() { } else if cfg.MaxFileNum > MaxBinLogFileNum { cfg.MaxFileNum = MaxBinLogFileNum } - - if len(cfg.Name) == 0 { - cfg.Name = "ledis" - } } type BinLog struct { @@ -68,7 +63,7 @@ type BinLog struct { indexName string logNames []string - lastLogIndex int + lastLogIndex int64 } func NewBinLog(data json.RawMessage) (*BinLog, error) { @@ -128,7 +123,7 @@ func (l *BinLog) flushIndex() error { } func (l *BinLog) loadIndex() error { - l.indexName = path.Join(l.cfg.Path, fmt.Sprintf("%s-bin.index", l.cfg.Name)) + l.indexName = path.Join(l.cfg.Path, fmt.Sprintf("ledis-bin.index")) if _, err := os.Stat(l.indexName); os.IsNotExist(err) { //no index file, nothing to do } else { @@ -165,7 +160,7 @@ func (l *BinLog) loadIndex() error { } else { lastName := l.logNames[len(l.logNames)-1] - if l.lastLogIndex, err = strconv.Atoi(path.Ext(lastName)[1:]); err != nil { + if l.lastLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil { log.Error("invalid logfile name %s", err.Error()) return err } @@ -178,7 +173,7 @@ func (l *BinLog) loadIndex() error { } func (l *BinLog) getLogFile() string { - return fmt.Sprintf("%s-bin.%07d", l.cfg.Name, l.lastLogIndex) + return l.FormatLogFileName(l.lastLogIndex) } func (l *BinLog) openNewLogFile() error { @@ -261,6 +256,22 @@ func (l *BinLog) LogFilePos() int64 { } } +func (l *BinLog) LogFileIndex() int64 { + return l.lastLogIndex +} + +func (l *BinLog) FormatLogFileName(index int64) string { + return fmt.Sprintf("ledis-bin.%07d", index) +} + +func (l *BinLog) FormatLogFilePath(index int64) string { + return path.Join(l.cfg.Path, fmt.Sprintf("ledis-bin.%07d", index)) +} + +func (l *BinLog) LogPath() string { + return l.cfg.Path +} + func (l *BinLog) Purge(n int) error { if len(l.logNames) == 0 { return nil diff --git a/ledis/binlog_test.go b/ledis/binlog_test.go index 995a8d1..7fc89b4 100644 --- a/ledis/binlog_test.go +++ b/ledis/binlog_test.go @@ -12,7 +12,6 @@ func TestBinLog(t *testing.T) { cfg.MaxFileNum = 1 cfg.MaxFileSize = 1024 cfg.Path = "/tmp/ledis_binlog" - cfg.Name = "ledis" os.RemoveAll(cfg.Path) diff --git a/ledis/dump.go b/ledis/dump.go index 2cb08af..47bca19 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -4,19 +4,43 @@ import ( "bufio" "bytes" "encoding/binary" - "encoding/json" "github.com/siddontang/go-leveldb/leveldb" "io" "os" ) //dump format -// head len(bigendian int32)|head(json format) +// fileIndex(bigendian int64)|filePos(bigendian int64) // |keylen(bigendian int32)|key|valuelen(bigendian int32)|value...... -type DumpHead struct { - LogFile string `json:"log_file"` - LogPos int64 `json:"log_pos"` +type MasterInfo struct { + LogFileIndex int64 + LogPos int64 +} + +func (m *MasterInfo) WriteTo(w io.Writer) error { + if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil { + return err + } + + if err := binary.Write(w, binary.BigEndian, m.LogPos); err != nil { + return err + } + return nil +} + +func (m *MasterInfo) ReadFrom(r io.Reader) error { + err := binary.Read(r, binary.BigEndian, &m.LogFileIndex) + if err != nil { + return err + } + + err = binary.Read(r, binary.BigEndian, &m.LogPos) + if err != nil { + return err + } + + return nil } func (l *Ledis) DumpFile(path string) error { @@ -31,34 +55,21 @@ func (l *Ledis) DumpFile(path string) error { func (l *Ledis) Dump(w io.Writer) error { var sp *leveldb.Snapshot - var logFileName string - var logPos int64 + var m *MasterInfo = new(MasterInfo) if l.binlog == nil { sp = l.ldb.NewSnapshot() } else { l.Lock() sp = l.ldb.NewSnapshot() - logFileName = l.binlog.LogFileName() - logPos = l.binlog.LogFilePos() + m.LogFileIndex = l.binlog.LogFileIndex() + m.LogPos = l.binlog.LogFilePos() l.Unlock() } - var head = DumpHead{ - LogFile: logFileName, - LogPos: logPos, - } - - data, err := json.Marshal(&head) - if err != nil { - return err - } + var err error wb := bufio.NewWriterSize(w, 4096) - if err = binary.Write(wb, binary.BigEndian, uint32(len(data))); err != nil { - return err - } - - if _, err = wb.Write(data); err != nil { + if err = m.WriteTo(wb); err != nil { return err } @@ -93,7 +104,7 @@ func (l *Ledis) Dump(w io.Writer) error { return nil } -func (l *Ledis) LoadDumpFile(path string) (*DumpHead, error) { +func (l *Ledis) LoadDumpFile(path string) (*MasterInfo, error) { f, err := os.Open(path) if err != nil { return nil, err @@ -103,28 +114,19 @@ func (l *Ledis) LoadDumpFile(path string) (*DumpHead, error) { return l.LoadDump(f) } -func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) { +func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) { l.Lock() defer l.Unlock() + info := new(MasterInfo) + rb := bufio.NewReaderSize(r, 4096) - var headLen uint32 - err := binary.Read(rb, binary.BigEndian, &headLen) + err := info.ReadFrom(rb) if err != nil { return nil, err } - buf := make([]byte, headLen) - if _, err = io.ReadFull(rb, buf); err != nil { - return nil, err - } - - var head DumpHead - if err = json.Unmarshal(buf, &head); err != nil { - return nil, err - } - var keyLen uint16 var valueLen uint32 @@ -161,5 +163,5 @@ func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) { valueBuf.Reset() } - return &head, nil + return info, nil } diff --git a/ledis/replication.go b/ledis/replication.go index 9a2cb1b..045ee01 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -12,6 +12,7 @@ import ( var ( errInvalidBinLogEvent = errors.New("invalid binglog event") + errInvalidBinLogFile = errors.New("invalid binlog file") ) func (l *Ledis) ReplicateEvent(event []byte) error { @@ -70,63 +71,154 @@ func (l *Ledis) replicateCommandEvent(event []byte) error { return errors.New("command event not supported now") } -func (l *Ledis) RepliateFromBinLog(filePath string, offset int64) (int64, error) { - f, err := os.Open(filePath) - if err != nil { - return 0, err +func (l *Ledis) ReplicateFromReader(rb io.Reader) error { + var createTime uint32 + var dataLen uint32 + var dataBuf bytes.Buffer + var err error + + for { + if err = binary.Read(rb, binary.BigEndian, &createTime); err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + if err = binary.Read(rb, binary.BigEndian, &dataLen); err != nil { + return err + } + + if _, err = io.CopyN(&dataBuf, rb, int64(dataLen)); err != nil { + return err + } + + err = l.ReplicateEvent(dataBuf.Bytes()) + if err != nil { + log.Fatal("replication error %s, skip to next", err.Error()) + } + + dataBuf.Reset() } - defer f.Close() + return nil +} - st, _ := f.Stat() - totalSize := st.Size() +func (l *Ledis) ReplicateFromData(data []byte) error { + rb := bytes.NewReader(data) - if _, err = f.Seek(offset, os.SEEK_SET); err != nil { - return 0, err + l.Lock() + err := l.ReplicateFromReader(rb) + l.Unlock() + + return err +} + +func (l *Ledis) ReplicateFromBinLog(filePath string) error { + f, err := os.Open(filePath) + if err != nil { + return err } rb := bufio.NewReaderSize(f, 4096) - var createTime uint32 - var dataLen uint32 - var dataBuf bytes.Buffer + err = l.ReplicateFromReader(rb) - for { - if offset+8 > totalSize { - //event may not sync completely - return f.Seek(offset, os.SEEK_SET) - } + f.Close() - if err = binary.Read(rb, binary.BigEndian, &createTime); err != nil { - return 0, err - } + return err +} - if err = binary.Read(rb, binary.BigEndian, &dataLen); err != nil { - return 0, err - } +const maxSyncEvents = 16 - if offset+8+int64(dataLen) > totalSize { - //event may not sync completely - return f.Seek(offset, os.SEEK_SET) - } else { - if _, err = io.CopyN(&dataBuf, rb, int64(dataLen)); err != nil { - return 0, err - } +//events data format +// nextfileIndex(bigendian int64)|nextfilePos(bigendian int64)|binlogevents +func (l *Ledis) ReadEventsTo(index int64, offset int64, w io.Writer) (info *MasterInfo, err error) { + info = new(MasterInfo) - l.Lock() - err = l.ReplicateEvent(dataBuf.Bytes()) - l.Unlock() - if err != nil { - log.Fatal("replication error %s, skip to next", err.Error()) - } - - dataBuf.Reset() - - offset += (8 + int64(dataLen)) - } + if l.binlog == nil { + //binlog not supported + info.LogFileIndex = 0 + return } - //can not go here??? - log.Error("can not go here") - return offset, nil + info.LogFileIndex = index + info.LogPos = offset + + filePath := l.binlog.FormatLogFilePath(index) + + var f *os.File + f, err = os.Open(filePath) + if err != nil && !os.IsNotExist(err) { + return + } else if os.IsNotExist(err) { + l.Lock() + lastIndex := l.binlog.LogFileIndex() + if index == lastIndex { + //no binlog at all + l.Unlock() + return + } + l.Unlock() + + //slave binlog info had lost + info.LogFileIndex = -1 + } + + defer f.Close() + + if _, err = f.Seek(offset, os.SEEK_SET); err != nil { + //may be invliad seek offset + return + } + + var lastCreateTime uint32 = 0 + var createTime uint32 + var dataLen uint32 + + var n int = 0 + + for { + if err = binary.Read(f, binary.BigEndian, &createTime); err != nil { + if err == io.EOF { + //we will try to use next binlog + info.LogFileIndex = index + 1 + info.LogPos = 0 + + return + } else { + return + } + } + + n++ + if lastCreateTime == 0 { + lastCreateTime = createTime + } else if lastCreateTime != createTime { + return + } else if n > maxSyncEvents { + return + } + + if err = binary.Read(f, binary.BigEndian, &dataLen); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, createTime); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, dataLen); err != nil { + return + } + + if _, err = io.CopyN(w, f, int64(dataLen)); err != nil { + return + } + + info.LogPos = info.LogPos + 8 + int64(dataLen) + } + + return } diff --git a/ledis/replication_test.go b/ledis/replication_test.go index 2e4b977..f722bc0 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -40,16 +40,9 @@ func TestReplication(t *testing.T) { binLogName := "/tmp/test_repl/master/bin_log/ledis-bin.0000001" - var offset int64 - offset, err = slave.RepliateFromBinLog(binLogName, 0) + err = slave.ReplicateFromBinLog(binLogName) if err != nil { t.Fatal(err) - } else { - if st, err := os.Stat(binLogName); err != nil { - t.Fatal(err) - } else if st.Size() != offset { - t.Fatal(st.Size(), offset) - } } it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1) diff --git a/server/app.go b/server/app.go index 1b745b6..3b28e7d 100644 --- a/server/app.go +++ b/server/app.go @@ -19,7 +19,7 @@ type App struct { quit chan struct{} //for slave replication - master masterInfo + m *master } func NewApp(cfg *Config) (*App, error) { @@ -55,6 +55,8 @@ func NewApp(cfg *Config) (*App, error) { return nil, err } + app.m = newMaster(app) + return app, nil } @@ -69,12 +71,14 @@ func (app *App) Close() { app.listener.Close() + app.m.Close() + app.ldb.Close() } func (app *App) Run() { if len(app.cfg.SlaveOf) > 0 { - app.runReplication() + app.slaveof(app.cfg.SlaveOf) } for !app.closed { diff --git a/server/client.go b/server/client.go index 40533b0..8a7ed6e 100644 --- a/server/client.go +++ b/server/client.go @@ -2,6 +2,7 @@ package server import ( "bufio" + "bytes" "errors" "github.com/siddontang/go-log/log" "github.com/siddontang/ledisdb/ledis" @@ -28,6 +29,8 @@ type client struct { args [][]byte reqC chan error + + syncBuf bytes.Buffer } func newClient(c net.Conn, app *App) { @@ -71,22 +74,7 @@ func (c *client) run() { } func (c *client) readLine() ([]byte, error) { - var line []byte - for { - l, more, err := c.rb.ReadLine() - if err != nil { - return nil, err - } - - if line == nil && !more { - return l, nil - } - line = append(line, l...) - if !more { - break - } - } - return line, nil + return readLine(c.rb) } //A client sends to the Redis server a RESP Array consisting of just Bulk Strings. @@ -121,15 +109,19 @@ func (c *client) readRequest() ([][]byte, error) { } else if n == -1 { req = append(req, nil) } else { - buf := make([]byte, n+2) + buf := make([]byte, n) if _, err = io.ReadFull(c.rb, buf); err != nil { return nil, err - } else if buf[len(buf)-2] != '\r' || buf[len(buf)-1] != '\n' { - return nil, errReadRequest - - } else { - req = append(req, buf[0:len(buf)-2]) } + + if l, err = c.readLine(); err != nil { + return nil, err + } else if len(l) != 0 { + return nil, errors.New("bad bulk string format") + } + + req = append(req, buf) + } } else { @@ -226,3 +218,12 @@ func (c *client) writeArray(ay []interface{}) { } } } + +func (c *client) writeBulkFrom(n int64, rb io.Reader) { + c.wb.WriteByte('$') + c.wb.Write(ledis.Slice(strconv.FormatInt(n, 10))) + c.wb.Write(Delims) + + io.Copy(c.wb, rb) + c.wb.Write(Delims) +} diff --git a/server/cmd_replication.go b/server/cmd_replication.go index 8588877..7e1c881 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -1,8 +1,11 @@ package server import ( + "encoding/binary" "fmt" "github.com/siddontang/ledisdb/ledis" + "io/ioutil" + "os" "strconv" "strings" ) @@ -35,3 +38,77 @@ func slaveofCommand(c *client) error { return nil } + +func fullsyncCommand(c *client) error { + //todo, multi fullsync may use same dump file + dumpFile, err := ioutil.TempFile(c.app.cfg.DataDir, "dump_") + if err != nil { + return err + } + + if err = c.app.ldb.Dump(dumpFile); err != nil { + return err + } + + st, _ := dumpFile.Stat() + n := st.Size() + + dumpFile.Seek(0, os.SEEK_SET) + + c.writeBulkFrom(n, dumpFile) + + name := dumpFile.Name() + dumpFile.Close() + + os.Remove(name) + + return nil +} + +var reserveInfoSpace = make([]byte, 16) + +func syncCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + var logIndex int64 + var logPos int64 + var err error + logIndex, err = ledis.StrInt64(args[0], nil) + if err != nil { + return ErrCmdParams + } + + logPos, err = ledis.StrInt64(args[1], nil) + if err != nil { + return ErrCmdParams + } + + c.syncBuf.Reset() + + //reserve space to write master info + if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil { + return err + } + + if m, err := c.app.ldb.ReadEventsTo(logIndex, logPos, &c.syncBuf); err != nil { + return err + } else { + buf := c.syncBuf.Bytes() + + binary.BigEndian.PutUint64(buf[0:], uint64(m.LogFileIndex)) + binary.BigEndian.PutUint64(buf[8:], uint64(m.LogPos)) + + c.writeBulk(buf) + } + + return nil +} + +func init() { + register("slaveof", slaveofCommand) + register("fullsync", fullsyncCommand) + register("sync", syncCommand) +} diff --git a/server/replication.go b/server/replication.go index 81ba85e..bc64496 100644 --- a/server/replication.go +++ b/server/replication.go @@ -1,25 +1,77 @@ package server import ( + "bufio" + "bytes" + "encoding/binary" "encoding/json" + "errors" + "fmt" "github.com/siddontang/go-log/log" + "github.com/siddontang/ledisdb/ledis" "io/ioutil" + "net" "os" "path" + "strconv" + "sync" + "time" ) -type masterInfo struct { - Addr string `json:"addr"` - LogFile string `json:"log_name"` - LogPos int64 `json:"log_pos"` +var ( + errConnectMaster = errors.New("connect master error") +) + +type master struct { + sync.Mutex + + addr string `json:"addr"` + logFileIndex int64 `json:"log_file_index"` + logPos int64 `json:"log_pos"` + + c net.Conn + rb *bufio.Reader + + app *App + + quit chan struct{} + + infoName string + infoNameBak string + + wg sync.WaitGroup + + syncBuf bytes.Buffer } -func (app *App) getMasterInfoName() string { - return path.Join(app.cfg.DataDir, "master.info") +func newMaster(app *App) *master { + m := new(master) + m.app = app + + m.infoName = path.Join(m.app.cfg.DataDir, "master.info") + m.infoNameBak = fmt.Sprintf("%s.bak", m.infoName) + + m.quit = make(chan struct{}) + + //if load error, we will start a fullsync later + m.loadInfo() + + return m } -func (app *App) loadMasterInfo() error { - data, err := ioutil.ReadFile(app.getMasterInfoName()) +func (m *master) Close() { + close(m.quit) + + if m.c != nil { + m.c.Close() + m.c = nil + } + + m.wg.Wait() +} + +func (m *master) loadInfo() error { + data, err := ioutil.ReadFile(m.infoName) if err != nil { if os.IsNotExist(err) { return nil @@ -28,23 +80,21 @@ func (app *App) loadMasterInfo() error { } } - if err = json.Unmarshal(data, &app.master); err != nil { + if err = json.Unmarshal(data, m); err != nil { return err } return nil } -func (app *App) saveMasterInfo() error { - bakName := path.Join(app.cfg.DataDir, "master.info.bak") - - data, err := json.Marshal(&app.master) +func (m *master) saveInfo() error { + data, err := json.Marshal(m) if err != nil { return err } var fd *os.File - fd, err = os.OpenFile(bakName, os.O_CREATE|os.O_WRONLY, os.ModePerm) + fd, err = os.OpenFile(m.infoNameBak, os.O_CREATE|os.O_WRONLY, os.ModePerm) if err != nil { return err } @@ -55,32 +105,208 @@ func (app *App) saveMasterInfo() error { } fd.Close() - return os.Rename(bakName, app.getMasterInfoName()) + return os.Rename(m.infoNameBak, m.infoName) +} + +func (m *master) connect() error { + if len(m.addr) == 0 { + return fmt.Errorf("no assign master addr") + } + + if m.c != nil { + m.c.Close() + m.c = nil + } + + if c, err := net.Dial("tcp", m.addr); err != nil { + return err + } else { + m.c = c + + m.rb = bufio.NewReaderSize(m.c, 4096) + } + return nil +} + +func (m *master) resetInfo(addr string) { + m.addr = addr + m.logFileIndex = 0 + m.logPos = 0 +} + +func (m *master) stopReplication() error { + m.Close() + + if err := m.saveInfo(); err != nil { + log.Error("save master info error %s", err.Error()) + return err + } + + return nil +} + +func (m *master) startReplication(masterAddr string) error { + //stop last replcation, if avaliable + m.Close() + + if masterAddr != m.addr { + m.resetInfo(masterAddr) + if err := m.saveInfo(); err != nil { + log.Error("save master info error %s", err.Error()) + return err + } + } + + m.quit = make(chan struct{}) + + go m.runReplication() + return nil +} + +func (m *master) runReplication() { + m.wg.Add(1) + defer m.wg.Done() + + for { + select { + case <-m.quit: + return + default: + if err := m.connect(); err != nil { + log.Error("connect master %s error %s, try 2s later", m.addr, err.Error()) + time.Sleep(2 * time.Second) + continue + } + } + + if m.logFileIndex == 0 { + //try a fullsync + if err := m.fullSync(); err != nil { + log.Warn("full sync error %s", err.Error()) + return + } + + if m.logFileIndex == 0 { + //master not support binlog, we cannot sync, so stop replication + m.stopReplication() + return + } + } + + t := time.NewTicker(1 * time.Second) + + //then we will try sync every 1 seconds + for { + select { + case <-t.C: + if err := m.sync(); err != nil { + log.Warn("sync error %s", err.Error()) + return + } + case <-m.quit: + return + } + } + } + + return +} + +var ( + fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync + syncCmdFormat = "*3\r\n$4\r\nsync\r\n$%d\r\n%s\r\n%d\r\n%s\r\n" //sync file pos +) + +func (m *master) fullSync() error { + if _, err := m.c.Write(fullSyncCmd); err != nil { + return err + } + + dumpPath := path.Join(m.app.cfg.DataDir, "master.dump") + f, err := os.OpenFile(dumpPath, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + return err + } + + defer os.Remove(dumpPath) + + err = readBulkTo(m.rb, f) + f.Close() + if err != nil { + log.Error("read dump data error %s", err.Error()) + return err + } + + if err = m.app.ldb.FlushAll(); err != nil { + return err + } + + var head *ledis.MasterInfo + head, err = m.app.ldb.LoadDumpFile(dumpPath) + + if err != nil { + log.Error("load dump file error %s", err.Error()) + return err + } + + m.logFileIndex = head.LogFileIndex + m.logPos = head.LogPos + + return nil +} + +func (m *master) sync() error { + logIndexStr := strconv.FormatInt(m.logFileIndex, 10) + logPosStr := strconv.FormatInt(m.logPos, 10) + + if _, err := m.c.Write(ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr), + logIndexStr, len(logPosStr), logPosStr))); err != nil { + return err + } + + m.syncBuf.Reset() + + err := readBulkTo(m.rb, &m.syncBuf) + if err != nil { + return err + } + + err = binary.Read(&m.syncBuf, binary.BigEndian, &m.logFileIndex) + if err != nil { + return err + } + + err = binary.Read(&m.syncBuf, binary.BigEndian, &m.logPos) + if err != nil { + return err + } + + if m.logFileIndex == 0 { + //master now not support binlog, stop replication + m.stopReplication() + return nil + } else if m.logFileIndex == -1 { + //-1 means than binlog index and pos are lost, we must start a full sync instead + return m.fullSync() + } + + err = m.app.ldb.ReplicateFromReader(&m.syncBuf) + if err != nil { + return err + } + + return nil + } func (app *App) slaveof(masterAddr string) error { + app.m.Lock() + defer app.m.Unlock() + if len(masterAddr) == 0 { - //stop replication + return app.m.stopReplication() } else { - } - - return nil -} - -func (app *App) runReplication() { -} - -func (app *App) startReplication(masterAddr string) error { - if err := app.loadMasterInfo(); err != nil { - log.Error("load master.info error %s, use fullsync", err.Error()) - app.master = masterInfo{masterAddr, "", 0} - } else if app.master.Addr != masterAddr { - if err := app.ldb.FlushAll(); err != nil { - log.Error("replication flush old data error %s", err.Error()) - return err - } - - app.master = masterInfo{masterAddr, "", 0} + return app.m.startReplication(masterAddr) } return nil diff --git a/server/util.go b/server/util.go new file mode 100644 index 0000000..e2a6738 --- /dev/null +++ b/server/util.go @@ -0,0 +1,57 @@ +package server + +import ( + "bufio" + "errors" + "github.com/siddontang/ledisdb/ledis" + "io" + "strconv" +) + +var ( + errArrayFormat = errors.New("bad array format") + errBulkFormat = errors.New("bad bulk string format") + errLineFormat = errors.New("bad response line format") +) + +func readLine(rb *bufio.Reader) ([]byte, error) { + p, err := rb.ReadSlice('\n') + + if err != nil { + return nil, err + } + i := len(p) - 2 + if i < 0 || p[i] != '\r' { + return nil, errLineFormat + } + return p[:i], nil +} + +func readBulkTo(rb *bufio.Reader, w io.Writer) error { + l, err := readLine(rb) + if len(l) == 0 { + return errArrayFormat + } else if l[0] == '$' { + var n int + //handle resp string + if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil { + return err + } else if n == -1 { + return nil + } else { + if _, err = io.CopyN(w, rb, int64(n)); err != nil { + return err + } + + if l, err = readLine(rb); err != nil { + return err + } else if len(l) != 0 { + return errBulkFormat + } + } + } else { + return errArrayFormat + } + + return nil +}