diff --git a/server/client_resp.go b/server/client_resp.go index 4d0110a..e7a4114 100644 --- a/server/client_resp.go +++ b/server/client_resp.go @@ -42,6 +42,7 @@ func newClientRESP(conn net.Conn, app *App) { c.req = newRequestContext(app) c.req.resp = newWriterRESP(conn) + c.req.remoteAddr = conn.RemoteAddr().String() c.hdl = newRequestHandler(app) @@ -133,9 +134,6 @@ func (c *respClient) readRequest() ([][]byte, error) { func (c *respClient) handleRequest(reqData [][]byte) { req := c.req - req.db = c.db - req.remoteAddr = c.conn.RemoteAddr().String() - if len(reqData) == 0 { c.req.cmd = "" c.req.args = reqData[0:0] @@ -144,7 +142,13 @@ func (c *respClient) handleRequest(reqData [][]byte) { c.req.args = reqData[1:] } - c.hdl.postRequest(req) + req.db = c.db + + c.hdl.handle(req) + + c.db = req.db // "SELECT" + + return } // response writer diff --git a/server/request.go b/server/request.go index 5d6dacf..f74d162 100644 --- a/server/request.go +++ b/server/request.go @@ -2,11 +2,8 @@ package server import ( "bytes" - "github.com/siddontang/go-log/log" "github.com/siddontang/ledisdb/ledis" "io" - "runtime" - "sync" "time" ) @@ -36,18 +33,11 @@ type requestContext struct { syncBuf bytes.Buffer compressBuf []byte - - finish chan interface{} } type requestHandler struct { app *App - async bool - quit chan struct{} - jobs *sync.WaitGroup - - reqs chan *requestContext reqErr chan error buf bytes.Buffer @@ -61,7 +51,6 @@ func newRequestContext(app *App) *requestContext { req.db, _ = app.ldb.Select(0) //use default db req.compressBuf = make([]byte, 256) - req.finish = make(chan interface{}, 1) return req } @@ -70,73 +59,12 @@ func newRequestHandler(app *App) *requestHandler { hdl := new(requestHandler) hdl.app = app - - hdl.async = false - hdl.jobs = new(sync.WaitGroup) - hdl.quit = make(chan struct{}) - - hdl.reqs = make(chan *requestContext) hdl.reqErr = make(chan error) return hdl } -func (h *requestHandler) asyncRun() { - if !h.async { - // todo ... not safe - h.async = true - go h.run() - } -} - -func (h *requestHandler) close() { - if h.async { - close(h.quit) - h.jobs.Wait() - } -} - -func (h *requestHandler) run() { - defer func() { - if e := recover(); e != nil { - buf := make([]byte, 4096) - n := runtime.Stack(buf, false) - buf = buf[0:n] - - log.Fatal("request handler run panic %s:%v", buf, e) - } - }() - - h.jobs.Add(1) - - var req *requestContext - for !h.async { - select { - case req = <-h.reqs: - if req != nil { - h.performance(req) - } - case <-h.quit: - h.async = true - break - } - } - - h.jobs.Done() - return -} - -func (h *requestHandler) postRequest(req *requestContext) { - if h.async { - h.reqs <- req - } else { - h.performance(req) - } - - <-req.finish -} - -func (h *requestHandler) performance(req *requestContext) { +func (h *requestHandler) handle(req *requestContext) { var err error start := time.Now() @@ -167,7 +95,6 @@ func (h *requestHandler) performance(req *requestContext) { } req.resp.flush() - req.finish <- nil return }