diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 594eeae..de55e1d 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -48,7 +48,7 @@ }, { "ImportPath": "github.com/siddontang/goleveldb/leveldb", - "Rev": "614b6126cf79eee8be803e7e65a3ef24fc12e44a" + "Rev": "71404b29ccd98b94ec2278afa806d59a11cd0d28" }, { "ImportPath": "github.com/szferi/gomdb", diff --git a/client/go/ledis/client.go b/client/go/ledis/client.go index dfe35b8..ee1aca2 100644 --- a/client/go/ledis/client.go +++ b/client/go/ledis/client.go @@ -19,23 +19,24 @@ type Config struct { type Client struct { sync.Mutex - cfg *Config - proto string + cfg *Config conns *list.List } +func getProto(addr string) string { + if strings.Contains(addr, "/") { + return "unix" + } else { + return "tcp" + } +} + func NewClient(cfg *Config) *Client { c := new(Client) c.cfg = cfg - if strings.Contains(cfg.Addr, "/") { - c.proto = "unix" - } else { - c.proto = "tcp" - } - c.conns = list.New() return c @@ -71,7 +72,7 @@ func (c *Client) get() *Conn { if c.conns.Len() == 0 { c.Unlock() - return c.newConn() + return c.newConn(c.cfg.Addr) } else { e := c.conns.Front() co := e.Value.(*Conn) diff --git a/client/go/ledis/conn.go b/client/go/ledis/conn.go index 688460e..3d91925 100644 --- a/client/go/ledis/conn.go +++ b/client/go/ledis/conn.go @@ -19,10 +19,15 @@ func (err Error) Error() string { return string(err) } type Conn struct { client *Client + addr string + c net.Conn br *bufio.Reader bw *bufio.Writer + rSize int + wSize int + lastActive time.Time // Scratch space for formatting argument length. @@ -33,25 +38,57 @@ type Conn struct { numScratch [40]byte } +func NewConn(addr string) *Conn { + co := new(Conn) + co.addr = addr + + co.rSize = 4096 + co.wSize = 4096 + + return co +} + +func NewConnSize(addr string, readSize int, writeSize int) *Conn { + co := NewConn(addr) + co.rSize = readSize + co.wSize = writeSize + return co +} + func (c *Conn) Close() { - c.client.put(c) + if c.client != nil { + c.client.put(c) + } else { + c.finalize() + } } func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { - if err := c.connect(); err != nil { + if err := c.Send(cmd, args...); err != nil { return nil, err } + return c.Receive() +} + +func (c *Conn) Send(cmd string, args ...interface{}) error { + if err := c.connect(); err != nil { + return err + } + if err := c.writeCommand(cmd, args); err != nil { c.finalize() - return nil, err + return err } if err := c.bw.Flush(); err != nil { c.finalize() - return nil, err + return err } + return nil +} +func (c *Conn) Receive() (interface{}, error) { if reply, err := c.readReply(); err != nil { c.finalize() return nil, err @@ -64,6 +101,16 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { } } +func (c *Conn) ReceiveBulkTo(w io.Writer) error { + err := c.readBulkReplyTo(w) + if err != nil { + if _, ok := err.(Error); !ok { + c.finalize() + } + } + return err +} + func (c *Conn) finalize() { if c.c != nil { c.c.Close() @@ -77,7 +124,7 @@ func (c *Conn) connect() error { } var err error - c.c, err = net.Dial(c.client.proto, c.client.cfg.Addr) + c.c, err = net.Dial(getProto(c.addr), c.addr) if err != nil { return err } @@ -85,13 +132,13 @@ func (c *Conn) connect() error { if c.br != nil { c.br.Reset(c.c) } else { - c.br = bufio.NewReader(c.c) + c.br = bufio.NewReaderSize(c.c, c.rSize) } if c.bw != nil { c.bw.Reset(c.c) } else { - c.bw = bufio.NewWriter(c.c) + c.bw = bufio.NewWriterSize(c.c, c.wSize) } return nil @@ -244,6 +291,41 @@ var ( pongReply interface{} = "PONG" ) +func (c *Conn) readBulkReplyTo(w io.Writer) error { + line, err := c.readLine() + if err != nil { + return err + } + if len(line) == 0 { + return errors.New("ledis: short response line") + } + switch line[0] { + case '-': + return Error(string(line[1:])) + case '$': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return err + } + + var nn int64 + if nn, err = io.CopyN(w, c.br, int64(n)); err != nil { + return err + } else if nn != int64(n) { + return io.ErrShortWrite + } + + if line, err := c.readLine(); err != nil { + return err + } else if len(line) != 0 { + return errors.New("ledis: bad bulk string format") + } + return nil + default: + return fmt.Errorf("ledis: not invalid bulk string type, but %c", line[0]) + } +} + func (c *Conn) readReply() (interface{}, error) { line, err := c.readLine() if err != nil { @@ -301,8 +383,8 @@ func (c *Conn) readReply() (interface{}, error) { return nil, errors.New("ledis: unexpected response line") } -func (c *Client) newConn() *Conn { - co := new(Conn) +func (c *Client) newConn(addr string) *Conn { + co := NewConn(addr) co.client = c return co diff --git a/cmd/ledis-benchmark/main.go b/cmd/ledis-benchmark/main.go index 9948f5b..379f6f9 100644 --- a/cmd/ledis-benchmark/main.go +++ b/cmd/ledis-benchmark/main.go @@ -18,7 +18,7 @@ var clients = flag.Int("c", 50, "number of clients") var reverse = flag.Bool("rev", false, "enable zset rev benchmark") var round = flag.Int("r", 1, "benchmark round number") var del = flag.Bool("del", true, "enable del benchmark") - +var valueSize = flag.Int("vsize", 100, "kv value size") var wg sync.WaitGroup var client *ledis.Client @@ -65,7 +65,7 @@ var kvDelBase int64 = 0 func benchSet() { f := func() { - value := make([]byte, 100) + value := make([]byte, *valueSize) crand.Read(value) n := atomic.AddInt64(&kvSetBase, 1) waitBench("set", n, value) @@ -103,7 +103,7 @@ func benchDel() { func benchPushList() { f := func() { - value := make([]byte, 10) + value := make([]byte, 100) crand.Read(value) waitBench("rpush", "mytestlist", value) } diff --git a/cmd/ledis-dump/main.go b/cmd/ledis-dump/main.go index 6f9bac4..87be176 100644 --- a/cmd/ledis-dump/main.go +++ b/cmd/ledis-dump/main.go @@ -1,11 +1,9 @@ package main import ( - "bufio" "flag" "fmt" - "github.com/siddontang/ledisdb/server" - "net" + "github.com/siddontang/ledisdb/client/go/ledis" "os" ) @@ -14,12 +12,9 @@ var port = flag.Int("port", 6380, "ledis server port") var sock = flag.String("sock", "", "ledis unix socket domain") var dumpFile = flag.String("o", "./ledis.dump", "dump file to save") -var fullSyncCmd = []byte("*2\r\n$8\r\nfullsync\r\n$3\r\nnew\r\n") //fullsync - func main() { flag.Parse() - var c net.Conn var err error var f *os.File @@ -30,30 +25,25 @@ func main() { defer f.Close() + var addr string if len(*sock) != 0 { - c, err = net.Dial("unix", *sock) + addr = *sock } else { - addr := fmt.Sprintf("%s:%d", *host, *port) - c, err = net.Dial("tcp", addr) + addr = fmt.Sprintf("%s:%d", *host, *port) } - if err != nil { - println(err.Error()) - return - } + c := ledis.NewConnSize(addr, 16*1024, 4096) defer c.Close() println("dump begin") - if _, err = c.Write(fullSyncCmd); err != nil { + if err = c.Send("fullsync"); err != nil { println(err.Error()) return } - rb := bufio.NewReaderSize(c, 16*1024) - - if err = server.ReadBulkTo(rb, f); err != nil { + if err = c.ReceiveBulkTo(f); err != nil { println(err.Error()) return } diff --git a/ledis/dump_test.go b/ledis/dump_test.go index 98e4c9e..30c7ba5 100644 --- a/ledis/dump_test.go +++ b/ledis/dump_test.go @@ -21,7 +21,7 @@ func TestDump(t *testing.T) { cfgS := config.NewConfigDefault() cfgS.DataDir = "/tmp/test_ledis_slave" - os.RemoveAll(cfgM.DataDir) + os.RemoveAll(cfgS.DataDir) var slave *Ledis if slave, err = Open(cfgS); err != nil { diff --git a/server/replication.go b/server/replication.go index 47230b1..24a5c1a 100644 --- a/server/replication.go +++ b/server/replication.go @@ -1,19 +1,17 @@ package server import ( - "bufio" "bytes" "errors" "fmt" - "github.com/siddontang/go/hack" "github.com/siddontang/go/log" "github.com/siddontang/go/num" + goledis "github.com/siddontang/ledisdb/client/go/ledis" "github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/rpl" "net" "os" "path" - "strconv" "strings" "sync" "time" @@ -26,8 +24,7 @@ var ( type master struct { sync.Mutex - conn net.Conn - rb *bufio.Reader + conn *goledis.Conn app *App @@ -49,16 +46,12 @@ func newMaster(app *App) *master { return m } -var ( - quitCmd = []byte("*1\r\n$4\r\nquit\r\n") -) - func (m *master) Close() { ledis.AsyncNotify(m.quit) if m.conn != nil { //for replication, we send quit command to close gracefully - m.conn.Write(quitCmd) + m.conn.Send("quit") m.conn.Close() m.conn = nil @@ -67,7 +60,7 @@ func (m *master) Close() { m.wg.Wait() } -func (m *master) connect() error { +func (m *master) resetConn() error { if len(m.addr) == 0 { return fmt.Errorf("no assign master addr") } @@ -77,13 +70,7 @@ func (m *master) connect() error { m.conn = nil } - if conn, err := net.Dial("tcp", m.addr); err != nil { - return err - } else { - m.conn = conn - - m.rb = bufio.NewReaderSize(m.conn, 4096) - } + m.conn = goledis.NewConn(m.addr) return nil } @@ -112,13 +99,18 @@ func (m *master) startReplication(masterAddr string, restart bool) error { func (m *master) runReplication(restart bool) { defer m.wg.Done() + if err := m.resetConn(); err != nil { + log.Error("reset conn error %s", err.Error()) + return + } + for { select { case <-m.quit: return default: - if err := m.connect(); err != nil { - log.Error("connect master %s error %s, try 2s later", m.addr, err.Error()) + if _, err := m.conn.Do("ping"); err != nil { + log.Error("ping master %s error %s, try 2s later", m.addr, err.Error()) time.Sleep(2 * time.Second) continue } @@ -160,27 +152,15 @@ func (m *master) runReplication(restart bool) { return } -var ( - fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync - syncCmdFormat = "*2\r\n$4\r\nsync\r\n$%d\r\n%s\r\n" //sync logid - replconfCmdFormat = "*3\r\n$8\r\nreplconf\r\n$14\r\nlistening-port\r\n$%d\r\n%s\r\n" //replconf listening-port port -) - func (m *master) replConf() error { _, port, err := net.SplitHostPort(m.app.cfg.Addr) if err != nil { return err } - cmd := hack.Slice(fmt.Sprintf(replconfCmdFormat, len(port), port)) - - if _, err := m.conn.Write(cmd); err != nil { + if s, err := goledis.String(m.conn.Do("replconf", "listening-port", port)); err != nil { return err - } - - if s, err := ReadStatus(m.rb); err != nil { - return err - } else if strings.ToLower(s) != "ok" { + } else if strings.ToUpper(s) != "OK" { return fmt.Errorf("not ok but %s", s) } @@ -190,7 +170,7 @@ func (m *master) replConf() error { func (m *master) fullSync() error { log.Info("begin full sync") - if _, err := m.conn.Write(fullSyncCmd); err != nil { + if err := m.conn.Send("fullsync"); err != nil { return err } @@ -202,7 +182,7 @@ func (m *master) fullSync() error { defer os.Remove(dumpPath) - err = ReadBulkTo(m.rb, f) + err = m.conn.ReceiveBulkTo(f) f.Close() if err != nil { log.Error("read dump data error %s", err.Error()) @@ -237,18 +217,13 @@ func (m *master) sync() error { return err } - logIDStr := strconv.FormatUint(syncID, 10) - - cmd := hack.Slice(fmt.Sprintf(syncCmdFormat, len(logIDStr), - logIDStr)) - - if _, err := m.conn.Write(cmd); err != nil { + if err := m.conn.Send("sync", syncID); err != nil { return err } m.syncBuf.Reset() - if err = ReadBulkTo(m.rb, &m.syncBuf); err != nil { + if err = m.conn.ReceiveBulkTo(&m.syncBuf); err != nil { switch err.Error() { case ledis.ErrLogMissed.Error(): return m.fullSync() @@ -384,7 +359,9 @@ func (app *App) publishNewLog(l *rpl.Log) { app.slock.Lock() - total := (len(app.slaves) + 1) / 2 + slaveNum := len(app.slaves) + + total := (slaveNum + 1) / 2 if app.cfg.Replication.WaitMaxSlaveAcks > 0 { total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks) } @@ -409,15 +386,21 @@ func (app *App) publishNewLog(l *rpl.Log) { startTime := time.Now() done := make(chan struct{}, 1) - go func(total int) { - for i := 0; i < total; i++ { + go func() { + n := 0 + for i := 0; i < slaveNum; i++ { id := <-app.slaveSyncAck if id < logId { log.Info("some slave may close with last logid %d < %d", id, logId) + } else { + n++ + if n >= total { + break + } } } done <- struct{}{} - }(total) + }() select { case <-done: diff --git a/server/util.go b/server/util.go index 15e6d7b..44b289c 100644 --- a/server/util.go +++ b/server/util.go @@ -3,16 +3,10 @@ package server import ( "bufio" "errors" - "github.com/siddontang/go/hack" - "io" - "strconv" ) var ( - errArrayFormat = errors.New("bad array format") - errBulkFormat = errors.New("bad bulk string format") - errLineFormat = errors.New("bad response line format") - errStatusFormat = errors.New("bad status format") + errLineFormat = errors.New("bad response line format") ) func ReadLine(rb *bufio.Reader) ([]byte, error) { @@ -27,54 +21,3 @@ func ReadLine(rb *bufio.Reader) ([]byte, error) { } return p[:i], nil } - -func ReadBulkTo(rb *bufio.Reader, w io.Writer) error { - l, err := ReadLine(rb) - if err != nil { - return err - } else if len(l) == 0 { - return errBulkFormat - } else if l[0] == '$' { - var n int - //handle resp string - if n, err = strconv.Atoi(hack.String(l[1:])); err != nil { - return err - } else if n == -1 { - return nil - } else { - var nn int64 - if nn, err = io.CopyN(w, rb, int64(n)); err != nil { - return err - } else if nn != int64(n) { - return io.ErrShortWrite - } - - if l, err = ReadLine(rb); err != nil { - return err - } else if len(l) != 0 { - return errBulkFormat - } - } - } else if l[0] == '-' { - return errors.New(string(l[1:])) - } else { - return errBulkFormat - } - - return nil -} - -func ReadStatus(rb *bufio.Reader) (string, error) { - l, err := ReadLine(rb) - if err != nil { - return "", err - } else if len(l) == 0 { - return "", errStatusFormat - } else if l[0] == '+' { - return string(l[1:]), nil - } else if l[0] == '-' { - return "", errors.New(string(l[1:])) - } else { - return "", errStatusFormat - } -}