diff --git a/client/go/ledis/client.go b/client/go/ledis/client.go index bdc532f..a6811f4 100644 --- a/client/go/ledis/client.go +++ b/client/go/ledis/client.go @@ -2,6 +2,7 @@ package ledis import ( "container/list" + "net" "strings" "sync" ) @@ -46,11 +47,34 @@ func NewClient(cfg *Config) *Client { } func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error) { - co := c.get() - r, err := co.Do(cmd, args...) - c.put(co) + var co *Conn + var err error + var r interface{} - return r, err + for i := 0; i < 2; i++ { + co, err = c.get() + if err != nil { + return nil, err + } + + r, err = co.Do(cmd, args...) + if err != nil { + co.finalize() + + if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") { + //send to a closed connection, try again + continue + } + + return nil, err + } else { + c.put(co) + } + + return r, nil + } + + return nil, err } func (c *Client) Close() { @@ -66,11 +90,11 @@ func (c *Client) Close() { } } -func (c *Client) Get() *Conn { +func (c *Client) Get() (*Conn, error) { return c.get() } -func (c *Client) get() *Conn { +func (c *Client) get() (*Conn, error) { c.Lock() if c.conns.Len() == 0 { c.Unlock() @@ -83,7 +107,7 @@ func (c *Client) get() *Conn { c.Unlock() - return co + return co, nil } } diff --git a/client/go/ledis/conn.go b/client/go/ledis/conn.go index 12f5f00..d78bc03 100644 --- a/client/go/ledis/conn.go +++ b/client/go/ledis/conn.go @@ -8,8 +8,6 @@ import ( "io" "net" "strconv" - "strings" - "sync" "time" ) @@ -19,50 +17,37 @@ type Error string func (err Error) Error() string { return string(err) } type Conn struct { - cm sync.Mutex - wm sync.Mutex - rm sync.Mutex - - closed bool - client *Client - addr string - c net.Conn br *bufio.Reader bw *bufio.Writer - rSize int - wSize int - // Scratch space for formatting argument length. // '*' or '$', length, "\r\n" lenScratch [32]byte // Scratch space for formatting integers and floats. numScratch [40]byte - - connectTimeout time.Duration } -func NewConn(addr string) *Conn { - co := new(Conn) - co.addr = addr - - co.rSize = 4096 - co.wSize = 4096 - - co.closed = false - - return co +func Connect(addr string) (*Conn, error) { + return ConnectWithSize(addr, 4096, 4096) } -func NewConnSize(addr string, readSize int, writeSize int) *Conn { - co := NewConn(addr) - co.rSize = readSize - co.wSize = writeSize - return co +func ConnectWithSize(addr string, readSize int, writeSize int) (*Conn, error) { + c := new(Conn) + + var err error + c.c, err = net.Dial(getProto(addr), addr) + if err != nil { + return nil, err + } + + c.br = bufio.NewReaderSize(c.c, readSize) + c.bw = bufio.NewWriterSize(c.c, writeSize) + + return c, nil } func (c *Conn) Close() { @@ -73,26 +58,12 @@ func (c *Conn) Close() { } } -func (c *Conn) SetConnectTimeout(t time.Duration) { - c.cm.Lock() - c.connectTimeout = t - c.cm.Unlock() -} - func (c *Conn) SetReadDeadline(t time.Time) { - c.cm.Lock() - if c.c != nil { - c.c.SetReadDeadline(t) - } - c.cm.Unlock() + c.c.SetReadDeadline(t) } func (c *Conn) SetWriteDeadline(t time.Time) { - c.cm.Lock() - if c.c != nil { - c.c.SetWriteDeadline(t) - } - c.cm.Unlock() + c.c.SetWriteDeadline(t) } func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { @@ -104,28 +75,6 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { } func (c *Conn) Send(cmd string, args ...interface{}) error { - var err error - for i := 0; i < 2; i++ { - if err = c.send(cmd, args...); err != nil { - if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") { - //send to a closed connection, try again - continue - } - } else { - return nil - } - } - return err -} - -func (c *Conn) send(cmd string, args ...interface{}) error { - if err := c.connect(); err != nil { - return err - } - - c.wm.Lock() - defer c.wm.Unlock() - if err := c.writeCommand(cmd, args); err != nil { c.finalize() return err @@ -139,9 +88,6 @@ func (c *Conn) send(cmd string, args ...interface{}) error { } func (c *Conn) Receive() (interface{}, error) { - c.rm.Lock() - defer c.rm.Unlock() - if reply, err := c.readReply(); err != nil { c.finalize() return nil, err @@ -155,9 +101,6 @@ func (c *Conn) Receive() (interface{}, error) { } func (c *Conn) ReceiveBulkTo(w io.Writer) error { - c.rm.Lock() - defer c.rm.Unlock() - err := c.readBulkReplyTo(w) if err != nil { if _, ok := err.(Error); !ok { @@ -168,44 +111,7 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error { } func (c *Conn) finalize() { - c.cm.Lock() - if !c.closed { - if c.c != nil { - c.c.Close() - } - c.closed = true - } - c.cm.Unlock() -} - -func (c *Conn) connect() error { - c.cm.Lock() - defer c.cm.Unlock() - - if !c.closed && c.c != nil { - return nil - } - - var err error - c.c, err = net.DialTimeout(getProto(c.addr), c.addr, c.connectTimeout) - if err != nil { - c.c = nil - return err - } - - if c.br != nil { - c.br.Reset(c.c) - } else { - c.br = bufio.NewReaderSize(c.c, c.rSize) - } - - if c.bw != nil { - c.bw.Reset(c.c) - } else { - c.bw = bufio.NewWriterSize(c.c, c.wSize) - } - - return nil + c.c.Close() } func (c *Conn) writeLen(prefix byte, n int) error { @@ -447,9 +353,12 @@ func (c *Conn) readReply() (interface{}, error) { return nil, errors.New("ledis: unexpected response line") } -func (c *Client) newConn(addr string) *Conn { - co := NewConnSize(addr, c.cfg.ReadBufferSize, c.cfg.WriteBufferSize) +func (c *Client) newConn(addr string) (*Conn, error) { + co, err := ConnectWithSize(addr, c.cfg.ReadBufferSize, c.cfg.WriteBufferSize) + if err != nil { + return nil, err + } co.client = c - return co + return co, nil } diff --git a/cmd/ledis-benchmark/main.go b/cmd/ledis-benchmark/main.go index 640607d..537aeef 100644 --- a/cmd/ledis-benchmark/main.go +++ b/cmd/ledis-benchmark/main.go @@ -38,7 +38,7 @@ func bench(cmd string, f func(c *ledis.Conn)) { t1 := time.Now() for i := 0; i < *clients; i++ { go func() { - c := client.Get() + c, _ := client.Get() for j := 0; j < loop; j++ { f(c) } @@ -277,7 +277,7 @@ func main() { client = ledis.NewClient(cfg) for i := 0; i < *clients; i++ { - c := client.Get() + c, _ := client.Get() c.Close() } diff --git a/cmd/ledis-dump/main.go b/cmd/ledis-dump/main.go index 87be176..c7c8f9e 100644 --- a/cmd/ledis-dump/main.go +++ b/cmd/ledis-dump/main.go @@ -32,7 +32,11 @@ func main() { addr = fmt.Sprintf("%s:%d", *host, *port) } - c := ledis.NewConnSize(addr, 16*1024, 4096) + c, err := ledis.ConnectWithSize(addr, 16*1024, 4096) + if err != nil { + println(err.Error()) + return + } defer c.Close() diff --git a/server/app.go b/server/app.go index 08cd5d0..6937150 100644 --- a/server/app.go +++ b/server/app.go @@ -157,7 +157,9 @@ func (app *App) Close() { app.closeScript() + app.m.Lock() app.m.Close() + app.m.Unlock() app.snap.Close() diff --git a/server/app_test.go b/server/app_test.go index 781d6fb..80c8909 100644 --- a/server/app_test.go +++ b/server/app_test.go @@ -22,7 +22,8 @@ func newTestLedisClient() { func getTestConn() *ledis.Conn { startTestApp() - return testLedisClient.Get() + conn, _ := testLedisClient.Get() + return conn } func startTestApp() { diff --git a/server/cmd_migrate.go b/server/cmd_migrate.go index a162328..186c62d 100644 --- a/server/cmd_migrate.go +++ b/server/cmd_migrate.go @@ -266,11 +266,13 @@ func xmigratedbCommand(c *client) error { mc := c.app.getMigrateClient(addr) - conn := mc.Get() + conn, err := mc.Get() + if err != nil { + return err + } //timeout is milliseconds t := time.Duration(timeout) * time.Millisecond - conn.SetConnectTimeout(t) if _, err = conn.Do("select", db); err != nil { return err @@ -358,11 +360,13 @@ func xmigrateCommand(c *client) error { mc := c.app.getMigrateClient(addr) - conn := mc.Get() + conn, err := mc.Get() + if err != nil { + return err + } //timeout is milliseconds t := time.Duration(timeout) * time.Millisecond - conn.SetConnectTimeout(t) if _, err = conn.Do("select", db); err != nil { return err diff --git a/server/cmd_migrate_test.go b/server/cmd_migrate_test.go index 1dfb2c0..669043a 100644 --- a/server/cmd_migrate_test.go +++ b/server/cmd_migrate_test.go @@ -80,10 +80,10 @@ func TestMigrate(t *testing.T) { time.Sleep(1 * time.Second) - c1 := ledis.NewConn(s1Cfg.Addr) + c1, _ := ledis.Connect(s1Cfg.Addr) defer c1.Close() - c2 := ledis.NewConn(s2Cfg.Addr) + c2, _ := ledis.Connect(s2Cfg.Addr) defer c2.Close() if _, err = c1.Do("set", "a", "1"); err != nil { diff --git a/server/cmd_replication.go b/server/cmd_replication.go index 84fc974..390530e 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -131,7 +131,7 @@ func syncCommand(c *client) error { c.syncBuf.Write(dummyBuf) - if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30, c.app.quit); err != nil { + if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 1, c.app.quit); err != nil { return err } else { buf := c.syncBuf.Bytes() diff --git a/server/cmd_replication_test.go b/server/cmd_replication_test.go index 66b99c7..f2a47a2 100644 --- a/server/cmd_replication_test.go +++ b/server/cmd_replication_test.go @@ -159,7 +159,7 @@ func TestReplication(t *testing.T) { } func checkTestRole(addr string, checkRoles []interface{}) error { - conn := goledis.NewConn(addr) + conn, _ := goledis.Connect(addr) defer conn.Close() roles, err := goledis.MultiBulk(conn.Do("ROLE")) if err != nil { diff --git a/server/replication.go b/server/replication.go index e86589a..3cf0eb7 100644 --- a/server/replication.go +++ b/server/replication.go @@ -48,7 +48,8 @@ func (b *syncBuffer) Write(data []byte) (int, error) { type master struct { sync.Mutex - conn *goledis.Conn + connLock sync.Mutex + conn *goledis.Conn app *App @@ -76,45 +77,45 @@ func newMaster(app *App) *master { } func (m *master) Close() { - select { - case m.quit <- struct{}{}: - default: - break + m.state.Set(replConnectState) + + if !m.isQuited() { + close(m.quit) } m.closeConn() m.wg.Wait() - - select { - case <-m.quit: - default: - } - - m.state.Set(replConnectState) -} - -func (m *master) resetConn() error { - if len(m.addr) == 0 { - return fmt.Errorf("no assign master addr") - } - - if m.conn != nil { - m.conn.Close() - } - - m.conn = goledis.NewConn(m.addr) - - return nil } func (m *master) closeConn() { + m.connLock.Lock() + defer m.connLock.Unlock() + if m.conn != nil { //for replication, we send quit command to close gracefully - m.conn.Send("quit") + m.conn.SetReadDeadline(time.Now().Add(1 * time.Second)) m.conn.Close() } + + m.conn = nil +} + +func (m *master) checkConn() error { + m.connLock.Lock() + defer m.connLock.Unlock() + + var err error + if m.conn == nil { + m.conn, err = goledis.Connect(m.addr) + } else { + if _, err = m.conn.Do("PING"); err != nil { + m.conn.Close() + m.conn = nil + } + } + return err } func (m *master) stopReplication() error { @@ -131,12 +132,18 @@ func (m *master) startReplication(masterAddr string, restart bool) error { m.app.cfg.SetReadonly(true) + m.quit = make(chan struct{}, 1) + + if len(m.addr) == 0 { + return fmt.Errorf("no assign master addr") + } + m.wg.Add(1) go m.runReplication(restart) return nil } -func (m *master) needQuit() bool { +func (m *master) isQuited() bool { select { case <-m.quit: return true @@ -151,20 +158,15 @@ func (m *master) runReplication(restart bool) { m.wg.Done() }() - if err := m.resetConn(); err != nil { - log.Errorf("reset conn error %s", err.Error()) - return - } - for { m.state.Set(replConnectState) - if m.needQuit() { + if m.isQuited() { return } - if _, err := m.conn.Do("ping"); err != nil { - log.Errorf("ping master %s error %s, try 3s later", m.addr, err.Error()) + if err := m.checkConn(); err != nil { + log.Errorf("check master %s connection error %s, try 3s later", m.addr, err.Error()) select { case <-time.After(3 * time.Second): @@ -174,7 +176,7 @@ func (m *master) runReplication(restart bool) { continue } - if m.needQuit() { + if m.isQuited() { return } @@ -210,7 +212,7 @@ func (m *master) runReplication(restart bool) { } m.state.Set(replConnectedState) - if m.needQuit() { + if m.isQuited() { return } } @@ -295,13 +297,9 @@ func (m *master) sync() error { m.syncBuf.Reset() if err = m.conn.ReceiveBulkTo(&m.syncBuf); err != nil { - switch err.Error() { - case ledis.ErrLogMissed.Error(): + if strings.Contains(err.Error(), ledis.ErrLogMissed.Error()) { return m.fullSync() - case ledis.ErrRplNotSupport.Error(): - m.stopReplication() - return nil - default: + } else { return err } } @@ -350,6 +348,7 @@ func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error { app.cfg.SlaveOf = masterAddr if len(masterAddr) == 0 { + log.Infof("slaveof no one, stop replication") if err := app.m.stopReplication(); err != nil { return err } @@ -395,9 +394,7 @@ func (app *App) removeSlave(c *client, activeQuit bool) { if _, ok := app.slaves[addr]; ok { delete(app.slaves, addr) log.Infof("remove slave %s", addr) - if activeQuit { - asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get()) - } + asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get()) } }