mirror of https://github.com/ledisdb/ledisdb.git
cut async feature from request handler
This commit is contained in:
parent
715e35098c
commit
d0e7698984
|
@ -42,6 +42,7 @@ func newClientRESP(conn net.Conn, app *App) {
|
|||
|
||||
c.req = newRequestContext(app)
|
||||
c.req.resp = newWriterRESP(conn)
|
||||
c.req.remoteAddr = conn.RemoteAddr().String()
|
||||
|
||||
c.hdl = newRequestHandler(app)
|
||||
|
||||
|
@ -133,9 +134,6 @@ func (c *respClient) readRequest() ([][]byte, error) {
|
|||
func (c *respClient) handleRequest(reqData [][]byte) {
|
||||
req := c.req
|
||||
|
||||
req.db = c.db
|
||||
req.remoteAddr = c.conn.RemoteAddr().String()
|
||||
|
||||
if len(reqData) == 0 {
|
||||
c.req.cmd = ""
|
||||
c.req.args = reqData[0:0]
|
||||
|
@ -144,7 +142,13 @@ func (c *respClient) handleRequest(reqData [][]byte) {
|
|||
c.req.args = reqData[1:]
|
||||
}
|
||||
|
||||
c.hdl.postRequest(req)
|
||||
req.db = c.db
|
||||
|
||||
c.hdl.handle(req)
|
||||
|
||||
c.db = req.db // "SELECT"
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// response writer
|
||||
|
|
|
@ -2,11 +2,8 @@ package server
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/siddontang/go-log/log"
|
||||
"github.com/siddontang/ledisdb/ledis"
|
||||
"io"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -36,18 +33,11 @@ type requestContext struct {
|
|||
|
||||
syncBuf bytes.Buffer
|
||||
compressBuf []byte
|
||||
|
||||
finish chan interface{}
|
||||
}
|
||||
|
||||
type requestHandler struct {
|
||||
app *App
|
||||
|
||||
async bool
|
||||
quit chan struct{}
|
||||
jobs *sync.WaitGroup
|
||||
|
||||
reqs chan *requestContext
|
||||
reqErr chan error
|
||||
|
||||
buf bytes.Buffer
|
||||
|
@ -61,7 +51,6 @@ func newRequestContext(app *App) *requestContext {
|
|||
req.db, _ = app.ldb.Select(0) //use default db
|
||||
|
||||
req.compressBuf = make([]byte, 256)
|
||||
req.finish = make(chan interface{}, 1)
|
||||
|
||||
return req
|
||||
}
|
||||
|
@ -70,73 +59,12 @@ func newRequestHandler(app *App) *requestHandler {
|
|||
hdl := new(requestHandler)
|
||||
|
||||
hdl.app = app
|
||||
|
||||
hdl.async = false
|
||||
hdl.jobs = new(sync.WaitGroup)
|
||||
hdl.quit = make(chan struct{})
|
||||
|
||||
hdl.reqs = make(chan *requestContext)
|
||||
hdl.reqErr = make(chan error)
|
||||
|
||||
return hdl
|
||||
}
|
||||
|
||||
func (h *requestHandler) asyncRun() {
|
||||
if !h.async {
|
||||
// todo ... not safe
|
||||
h.async = true
|
||||
go h.run()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *requestHandler) close() {
|
||||
if h.async {
|
||||
close(h.quit)
|
||||
h.jobs.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *requestHandler) run() {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
buf := make([]byte, 4096)
|
||||
n := runtime.Stack(buf, false)
|
||||
buf = buf[0:n]
|
||||
|
||||
log.Fatal("request handler run panic %s:%v", buf, e)
|
||||
}
|
||||
}()
|
||||
|
||||
h.jobs.Add(1)
|
||||
|
||||
var req *requestContext
|
||||
for !h.async {
|
||||
select {
|
||||
case req = <-h.reqs:
|
||||
if req != nil {
|
||||
h.performance(req)
|
||||
}
|
||||
case <-h.quit:
|
||||
h.async = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
h.jobs.Done()
|
||||
return
|
||||
}
|
||||
|
||||
func (h *requestHandler) postRequest(req *requestContext) {
|
||||
if h.async {
|
||||
h.reqs <- req
|
||||
} else {
|
||||
h.performance(req)
|
||||
}
|
||||
|
||||
<-req.finish
|
||||
}
|
||||
|
||||
func (h *requestHandler) performance(req *requestContext) {
|
||||
func (h *requestHandler) handle(req *requestContext) {
|
||||
var err error
|
||||
|
||||
start := time.Now()
|
||||
|
@ -167,7 +95,6 @@ func (h *requestHandler) performance(req *requestContext) {
|
|||
}
|
||||
req.resp.flush()
|
||||
|
||||
req.finish <- nil
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue