simplify client connection

fix
https://github.com/siddontang/redis-failover/issues/3#issuecomment-76665
133
This commit is contained in:
siddontang 2015-03-02 17:12:55 +08:00
parent b1ccbb9c75
commit c4380b6406
11 changed files with 122 additions and 181 deletions

View File

@ -2,6 +2,7 @@ package ledis
import (
"container/list"
"net"
"strings"
"sync"
)
@ -46,11 +47,34 @@ func NewClient(cfg *Config) *Client {
}
func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error) {
co := c.get()
r, err := co.Do(cmd, args...)
c.put(co)
var co *Conn
var err error
var r interface{}
return r, err
for i := 0; i < 2; i++ {
co, err = c.get()
if err != nil {
return nil, err
}
r, err = co.Do(cmd, args...)
if err != nil {
co.finalize()
if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") {
//send to a closed connection, try again
continue
}
return nil, err
} else {
c.put(co)
}
return r, nil
}
return nil, err
}
func (c *Client) Close() {
@ -66,11 +90,11 @@ func (c *Client) Close() {
}
}
func (c *Client) Get() *Conn {
func (c *Client) Get() (*Conn, error) {
return c.get()
}
func (c *Client) get() *Conn {
func (c *Client) get() (*Conn, error) {
c.Lock()
if c.conns.Len() == 0 {
c.Unlock()
@ -83,7 +107,7 @@ func (c *Client) get() *Conn {
c.Unlock()
return co
return co, nil
}
}

View File

@ -8,8 +8,6 @@ import (
"io"
"net"
"strconv"
"strings"
"sync"
"time"
)
@ -19,50 +17,37 @@ type Error string
func (err Error) Error() string { return string(err) }
type Conn struct {
cm sync.Mutex
wm sync.Mutex
rm sync.Mutex
closed bool
client *Client
addr string
c net.Conn
br *bufio.Reader
bw *bufio.Writer
rSize int
wSize int
// Scratch space for formatting argument length.
// '*' or '$', length, "\r\n"
lenScratch [32]byte
// Scratch space for formatting integers and floats.
numScratch [40]byte
connectTimeout time.Duration
}
func NewConn(addr string) *Conn {
co := new(Conn)
co.addr = addr
co.rSize = 4096
co.wSize = 4096
co.closed = false
return co
func Connect(addr string) (*Conn, error) {
return ConnectWithSize(addr, 4096, 4096)
}
func NewConnSize(addr string, readSize int, writeSize int) *Conn {
co := NewConn(addr)
co.rSize = readSize
co.wSize = writeSize
return co
func ConnectWithSize(addr string, readSize int, writeSize int) (*Conn, error) {
c := new(Conn)
var err error
c.c, err = net.Dial(getProto(addr), addr)
if err != nil {
return nil, err
}
c.br = bufio.NewReaderSize(c.c, readSize)
c.bw = bufio.NewWriterSize(c.c, writeSize)
return c, nil
}
func (c *Conn) Close() {
@ -73,27 +58,13 @@ func (c *Conn) Close() {
}
}
func (c *Conn) SetConnectTimeout(t time.Duration) {
c.cm.Lock()
c.connectTimeout = t
c.cm.Unlock()
}
func (c *Conn) SetReadDeadline(t time.Time) {
c.cm.Lock()
if c.c != nil {
c.c.SetReadDeadline(t)
}
c.cm.Unlock()
}
func (c *Conn) SetWriteDeadline(t time.Time) {
c.cm.Lock()
if c.c != nil {
c.c.SetWriteDeadline(t)
}
c.cm.Unlock()
}
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
if err := c.Send(cmd, args...); err != nil {
@ -104,28 +75,6 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
}
func (c *Conn) Send(cmd string, args ...interface{}) error {
var err error
for i := 0; i < 2; i++ {
if err = c.send(cmd, args...); err != nil {
if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") {
//send to a closed connection, try again
continue
}
} else {
return nil
}
}
return err
}
func (c *Conn) send(cmd string, args ...interface{}) error {
if err := c.connect(); err != nil {
return err
}
c.wm.Lock()
defer c.wm.Unlock()
if err := c.writeCommand(cmd, args); err != nil {
c.finalize()
return err
@ -139,9 +88,6 @@ func (c *Conn) send(cmd string, args ...interface{}) error {
}
func (c *Conn) Receive() (interface{}, error) {
c.rm.Lock()
defer c.rm.Unlock()
if reply, err := c.readReply(); err != nil {
c.finalize()
return nil, err
@ -155,9 +101,6 @@ func (c *Conn) Receive() (interface{}, error) {
}
func (c *Conn) ReceiveBulkTo(w io.Writer) error {
c.rm.Lock()
defer c.rm.Unlock()
err := c.readBulkReplyTo(w)
if err != nil {
if _, ok := err.(Error); !ok {
@ -168,45 +111,8 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error {
}
func (c *Conn) finalize() {
c.cm.Lock()
if !c.closed {
if c.c != nil {
c.c.Close()
}
c.closed = true
}
c.cm.Unlock()
}
func (c *Conn) connect() error {
c.cm.Lock()
defer c.cm.Unlock()
if !c.closed && c.c != nil {
return nil
}
var err error
c.c, err = net.DialTimeout(getProto(c.addr), c.addr, c.connectTimeout)
if err != nil {
c.c = nil
return err
}
if c.br != nil {
c.br.Reset(c.c)
} else {
c.br = bufio.NewReaderSize(c.c, c.rSize)
}
if c.bw != nil {
c.bw.Reset(c.c)
} else {
c.bw = bufio.NewWriterSize(c.c, c.wSize)
}
return nil
}
func (c *Conn) writeLen(prefix byte, n int) error {
c.lenScratch[len(c.lenScratch)-1] = '\n'
@ -447,9 +353,12 @@ func (c *Conn) readReply() (interface{}, error) {
return nil, errors.New("ledis: unexpected response line")
}
func (c *Client) newConn(addr string) *Conn {
co := NewConnSize(addr, c.cfg.ReadBufferSize, c.cfg.WriteBufferSize)
func (c *Client) newConn(addr string) (*Conn, error) {
co, err := ConnectWithSize(addr, c.cfg.ReadBufferSize, c.cfg.WriteBufferSize)
if err != nil {
return nil, err
}
co.client = c
return co
return co, nil
}

View File

@ -38,7 +38,7 @@ func bench(cmd string, f func(c *ledis.Conn)) {
t1 := time.Now()
for i := 0; i < *clients; i++ {
go func() {
c := client.Get()
c, _ := client.Get()
for j := 0; j < loop; j++ {
f(c)
}
@ -277,7 +277,7 @@ func main() {
client = ledis.NewClient(cfg)
for i := 0; i < *clients; i++ {
c := client.Get()
c, _ := client.Get()
c.Close()
}

View File

@ -32,7 +32,11 @@ func main() {
addr = fmt.Sprintf("%s:%d", *host, *port)
}
c := ledis.NewConnSize(addr, 16*1024, 4096)
c, err := ledis.ConnectWithSize(addr, 16*1024, 4096)
if err != nil {
println(err.Error())
return
}
defer c.Close()

View File

@ -157,7 +157,9 @@ func (app *App) Close() {
app.closeScript()
app.m.Lock()
app.m.Close()
app.m.Unlock()
app.snap.Close()

View File

@ -22,7 +22,8 @@ func newTestLedisClient() {
func getTestConn() *ledis.Conn {
startTestApp()
return testLedisClient.Get()
conn, _ := testLedisClient.Get()
return conn
}
func startTestApp() {

View File

@ -266,11 +266,13 @@ func xmigratedbCommand(c *client) error {
mc := c.app.getMigrateClient(addr)
conn := mc.Get()
conn, err := mc.Get()
if err != nil {
return err
}
//timeout is milliseconds
t := time.Duration(timeout) * time.Millisecond
conn.SetConnectTimeout(t)
if _, err = conn.Do("select", db); err != nil {
return err
@ -358,11 +360,13 @@ func xmigrateCommand(c *client) error {
mc := c.app.getMigrateClient(addr)
conn := mc.Get()
conn, err := mc.Get()
if err != nil {
return err
}
//timeout is milliseconds
t := time.Duration(timeout) * time.Millisecond
conn.SetConnectTimeout(t)
if _, err = conn.Do("select", db); err != nil {
return err

View File

@ -80,10 +80,10 @@ func TestMigrate(t *testing.T) {
time.Sleep(1 * time.Second)
c1 := ledis.NewConn(s1Cfg.Addr)
c1, _ := ledis.Connect(s1Cfg.Addr)
defer c1.Close()
c2 := ledis.NewConn(s2Cfg.Addr)
c2, _ := ledis.Connect(s2Cfg.Addr)
defer c2.Close()
if _, err = c1.Do("set", "a", "1"); err != nil {

View File

@ -131,7 +131,7 @@ func syncCommand(c *client) error {
c.syncBuf.Write(dummyBuf)
if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30, c.app.quit); err != nil {
if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 1, c.app.quit); err != nil {
return err
} else {
buf := c.syncBuf.Bytes()

View File

@ -159,7 +159,7 @@ func TestReplication(t *testing.T) {
}
func checkTestRole(addr string, checkRoles []interface{}) error {
conn := goledis.NewConn(addr)
conn, _ := goledis.Connect(addr)
defer conn.Close()
roles, err := goledis.MultiBulk(conn.Do("ROLE"))
if err != nil {

View File

@ -48,6 +48,7 @@ func (b *syncBuffer) Write(data []byte) (int, error) {
type master struct {
sync.Mutex
connLock sync.Mutex
conn *goledis.Conn
app *App
@ -76,45 +77,45 @@ func newMaster(app *App) *master {
}
func (m *master) Close() {
select {
case m.quit <- struct{}{}:
default:
break
m.state.Set(replConnectState)
if !m.isQuited() {
close(m.quit)
}
m.closeConn()
m.wg.Wait()
select {
case <-m.quit:
default:
}
m.state.Set(replConnectState)
}
func (m *master) resetConn() error {
if len(m.addr) == 0 {
return fmt.Errorf("no assign master addr")
}
if m.conn != nil {
m.conn.Close()
}
m.conn = goledis.NewConn(m.addr)
return nil
}
func (m *master) closeConn() {
m.connLock.Lock()
defer m.connLock.Unlock()
if m.conn != nil {
//for replication, we send quit command to close gracefully
m.conn.Send("quit")
m.conn.SetReadDeadline(time.Now().Add(1 * time.Second))
m.conn.Close()
}
m.conn = nil
}
func (m *master) checkConn() error {
m.connLock.Lock()
defer m.connLock.Unlock()
var err error
if m.conn == nil {
m.conn, err = goledis.Connect(m.addr)
} else {
if _, err = m.conn.Do("PING"); err != nil {
m.conn.Close()
m.conn = nil
}
}
return err
}
func (m *master) stopReplication() error {
@ -131,12 +132,18 @@ func (m *master) startReplication(masterAddr string, restart bool) error {
m.app.cfg.SetReadonly(true)
m.quit = make(chan struct{}, 1)
if len(m.addr) == 0 {
return fmt.Errorf("no assign master addr")
}
m.wg.Add(1)
go m.runReplication(restart)
return nil
}
func (m *master) needQuit() bool {
func (m *master) isQuited() bool {
select {
case <-m.quit:
return true
@ -151,20 +158,15 @@ func (m *master) runReplication(restart bool) {
m.wg.Done()
}()
if err := m.resetConn(); err != nil {
log.Errorf("reset conn error %s", err.Error())
return
}
for {
m.state.Set(replConnectState)
if m.needQuit() {
if m.isQuited() {
return
}
if _, err := m.conn.Do("ping"); err != nil {
log.Errorf("ping master %s error %s, try 3s later", m.addr, err.Error())
if err := m.checkConn(); err != nil {
log.Errorf("check master %s connection error %s, try 3s later", m.addr, err.Error())
select {
case <-time.After(3 * time.Second):
@ -174,7 +176,7 @@ func (m *master) runReplication(restart bool) {
continue
}
if m.needQuit() {
if m.isQuited() {
return
}
@ -210,7 +212,7 @@ func (m *master) runReplication(restart bool) {
}
m.state.Set(replConnectedState)
if m.needQuit() {
if m.isQuited() {
return
}
}
@ -295,13 +297,9 @@ func (m *master) sync() error {
m.syncBuf.Reset()
if err = m.conn.ReceiveBulkTo(&m.syncBuf); err != nil {
switch err.Error() {
case ledis.ErrLogMissed.Error():
if strings.Contains(err.Error(), ledis.ErrLogMissed.Error()) {
return m.fullSync()
case ledis.ErrRplNotSupport.Error():
m.stopReplication()
return nil
default:
} else {
return err
}
}
@ -350,6 +348,7 @@ func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error {
app.cfg.SlaveOf = masterAddr
if len(masterAddr) == 0 {
log.Infof("slaveof no one, stop replication")
if err := app.m.stopReplication(); err != nil {
return err
}
@ -395,11 +394,9 @@ func (app *App) removeSlave(c *client, activeQuit bool) {
if _, ok := app.slaves[addr]; ok {
delete(app.slaves, addr)
log.Infof("remove slave %s", addr)
if activeQuit {
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get())
}
}
}
func (app *App) slaveAck(c *client) {
addr := c.slaveListeningAddr