forked from mirror/ledisdb
parent
d058c20094
commit
f0475b635e
|
@ -6,6 +6,7 @@ import (
|
||||||
"github.com/siddontang/go/sync2"
|
"github.com/siddontang/go/sync2"
|
||||||
"github.com/siddontang/ledisdb/ledis"
|
"github.com/siddontang/ledisdb/ledis"
|
||||||
"io"
|
"io"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -74,6 +75,13 @@ type client struct {
|
||||||
script *ledis.Multi
|
script *ledis.Multi
|
||||||
|
|
||||||
slaveListeningAddr string
|
slaveListeningAddr string
|
||||||
|
|
||||||
|
quit chan struct{}
|
||||||
|
done chan error
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
fc chan CommandFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClient(app *App) *client {
|
func newClient(app *App) *client {
|
||||||
|
@ -85,9 +93,35 @@ func newClient(app *App) *client {
|
||||||
|
|
||||||
// c.reqErr = make(chan error)
|
// c.reqErr = make(chan error)
|
||||||
|
|
||||||
|
c.quit = make(chan struct{})
|
||||||
|
c.done = make(chan error, 1)
|
||||||
|
c.fc = make(chan CommandFunc, 1)
|
||||||
|
|
||||||
|
c.wg.Add(1)
|
||||||
|
go c.run()
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *client) close() {
|
||||||
|
close(c.quit)
|
||||||
|
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) run() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.quit:
|
||||||
|
return
|
||||||
|
case f := <-c.fc:
|
||||||
|
c.done <- f(c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *client) perform() {
|
func (c *client) perform() {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -114,7 +148,9 @@ func (c *client) perform() {
|
||||||
// }()
|
// }()
|
||||||
|
|
||||||
// err = <-c.reqErr
|
// err = <-c.reqErr
|
||||||
err = exeCmd(c)
|
c.fc <- exeCmd
|
||||||
|
|
||||||
|
err = <-c.done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ func newClientHTTP(app *App, w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.perform()
|
c.perform()
|
||||||
|
c.client.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) addr(r *http.Request) string {
|
func (c *httpClient) addr(r *http.Request) string {
|
||||||
|
|
|
@ -56,6 +56,8 @@ func (c *respClient) run() {
|
||||||
c.app.info.addClients(1)
|
c.app.info.addClients(1)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
c.client.close()
|
||||||
|
|
||||||
c.app.info.addClients(-1)
|
c.app.info.addClients(-1)
|
||||||
|
|
||||||
if e := recover(); e != nil {
|
if e := recover(); e != nil {
|
||||||
|
@ -82,32 +84,20 @@ func (c *respClient) run() {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
kc := time.Duration(c.app.cfg.ConnKeepaliveInterval) * time.Second
|
kc := time.Duration(c.app.cfg.ConnKeepaliveInterval) * time.Second
|
||||||
done := make(chan error)
|
|
||||||
for {
|
for {
|
||||||
if kc > 0 {
|
if kc > 0 {
|
||||||
c.conn.SetReadDeadline(time.Now().Add(kc))
|
c.conn.SetReadDeadline(time.Now().Add(kc))
|
||||||
}
|
}
|
||||||
|
|
||||||
// I still don't know why use goroutine can improve performance
|
reqData, err := c.readRequest()
|
||||||
// if someone knows and benchamrks with another different result without goroutine, please tell me
|
if err == nil {
|
||||||
go func() {
|
c.handleRequest(reqData)
|
||||||
reqData, err := c.readRequest()
|
}
|
||||||
if err == nil {
|
|
||||||
c.handleRequest(reqData)
|
|
||||||
}
|
|
||||||
|
|
||||||
done <- err
|
|
||||||
}()
|
|
||||||
|
|
||||||
// reqData, err := c.readRequest()
|
|
||||||
// if err == nil {
|
|
||||||
// c.handleRequest(reqData)
|
|
||||||
// }
|
|
||||||
|
|
||||||
err := <-done
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.conn == nil {
|
if c.conn == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue