diff --git a/server/app.go b/server/app.go index 781e097..4cb4b05 100644 --- a/server/app.go +++ b/server/app.go @@ -3,7 +3,6 @@ package server import ( "fmt" "github.com/siddontang/ledisdb/ledis" - . "github.com/siddontang/ledisdb/server/http" "net" "net/http" "path" @@ -134,7 +133,9 @@ func (app *App) httpServe() { mux := http.NewServeMux() - mux.Handle("/", &CmdHandler{app.Ledis()}) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + newClientHTTP(app, w, r) + }) svr := http.Server{Handler: mux} svr.Serve(app.httpListener) diff --git a/server/client_http.go b/server/client_http.go new file mode 100644 index 0000000..a2ae729 --- /dev/null +++ b/server/client_http.go @@ -0,0 +1,254 @@ +package server + +import ( + "fmt" + "github.com/siddontang/go-log/log" + "github.com/siddontang/ledisdb/ledis" + "io" + "net/http" + "strconv" + "strings" + + "encoding/json" + "github.com/ugorji/go/codec" + "gopkg.in/mgo.v2/bson" +) + +var allowedContentTypes = map[string]struct{}{ + "json": struct{}{}, + "bson": struct{}{}, + "msgpack": struct{}{}, +} + +type httpClient struct { + app *App + db *ledis.DB + ldb *ledis.Ledis + + resp responseWriter + req *requestContext +} + +type httpWriter struct { + contentType string + cmd string + w http.ResponseWriter +} + +// http context + +func newClientHTTP(app *App, w http.ResponseWriter, r *http.Request) { + var err error + c := new(httpClient) + c.app = app + c.ldb = app.ldb + c.db, err = c.ldb.Select(0) + if err != nil { + w.Write([]byte(err.Error())) + return + } + + c.req, err = c.makeRequest(app, r, w) + if err != nil { + w.Write([]byte(err.Error())) + return + } + c.req.perform() +} + +func (c *httpClient) addr(r *http.Request) string { + addr := r.Header.Get("X-Forwarded-For") + if addr == "" { + addr = r.Header.Get("X-Real-IP") + if addr == "" { + addr = r.Header.Get("Remote-Addr") + } + } + return addr +} + +func (c *httpClient) makeRequest(app *App, r *http.Request, w http.ResponseWriter) (*requestContext, error) { + var err error + + db, cmd, argsStr, contentType := c.parseReqPath(r) + + c.db, err = app.ldb.Select(db) + if err != nil { + return nil, err + } + + contentType = strings.ToLower(contentType) + + if _, ok := allowedContentTypes[contentType]; !ok { + return nil, fmt.Errorf("unsupported content type: '%s', only json, bson, msgpack are supported", contentType) + } + + req := newRequestContext(app) + args := make([][]byte, len(argsStr)) + for i, arg := range argsStr { + args[i] = []byte(arg) + } + + req.cmd = cmd + req.args = args + req.remoteAddr = c.addr(r) + req.resp = &httpWriter{contentType, cmd, w} + return req, nil +} + +func (c *httpClient) parseReqPath(r *http.Request) (db int, cmd string, args []string, contentType string) { + + contentType = r.FormValue("type") + if contentType == "" { + contentType = "json" + } + + substrings := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/") + if len(substrings) == 1 { + return 0, substrings[0], substrings[1:], contentType + } + db, err := strconv.Atoi(substrings[0]) + if err != nil { + cmd = substrings[0] + args = substrings[1:] + } else { + cmd = substrings[1] + args = substrings[2:] + } + + return +} + +// http writer + +func (w *httpWriter) genericWrite(result interface{}) { + + m := map[string]interface{}{ + w.cmd: result, + } + switch w.contentType { + case "json": + writeJSON(&m, w.w) + case "bson": + writeBSON(&m, w.w) + case "msgpack": + writeMsgPack(&m, w.w) + default: + log.Error("invalid content type %s", w.contentType) + } +} + +func (w *httpWriter) writeError(err error) { + result := [2]interface{}{ + false, + fmt.Sprintf("ERR %s", err.Error()), + } + w.genericWrite(result) +} + +func (w *httpWriter) writeStatus(status string) { + w.genericWrite(status) +} + +func (w *httpWriter) writeInteger(n int64) { + w.genericWrite(n) +} + +func (w *httpWriter) writeBulk(b []byte) { + if b == nil { + w.genericWrite(nil) + } else { + w.genericWrite(ledis.String(b)) + } +} + +func (w *httpWriter) writeArray(lst []interface{}) { + w.genericWrite(lst) +} + +func (w *httpWriter) writeSliceArray(lst [][]byte) { + arr := make([]interface{}, len(lst)) + for i, elem := range lst { + if elem == nil { + arr[i] = nil + } else { + arr[i] = ledis.String(elem) + } + } + w.genericWrite(arr) +} + +func (w *httpWriter) writeFVPairArray(lst []ledis.FVPair) { + m := make(map[string]string) + for _, elem := range lst { + m[ledis.String(elem.Field)] = ledis.String(elem.Value) + } + w.genericWrite(m) +} + +func (w *httpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) { + var arr []string + if withScores { + arr = make([]string, 2*len(lst)) + for i, data := range lst { + arr[2*i] = ledis.String(data.Member) + arr[2*i+1] = strconv.FormatInt(data.Score, 10) + } + } else { + arr = make([]string, len(lst)) + for i, data := range lst { + arr[i] = ledis.String(data.Member) + } + } + w.genericWrite(arr) +} + +func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) { + w.writeError(fmt.Errorf("unsuport")) +} + +func (w *httpWriter) flush() { + +} + +func writeJSON(resutl interface{}, w http.ResponseWriter) { + buf, err := json.Marshal(resutl) + if err != nil { + log.Error(err.Error()) + return + } + + w.Header().Set("Content-type", "application/json; charset=utf-8") + w.Header().Set("Content-Length", strconv.Itoa(len(buf))) + + _, err = w.Write(buf) + if err != nil { + log.Error(err.Error()) + } +} + +func writeBSON(result interface{}, w http.ResponseWriter) { + buf, err := bson.Marshal(result) + if err != nil { + log.Error(err.Error()) + return + } + + w.Header().Set("Content-type", "application/octet-stream") + w.Header().Set("Content-Length", strconv.Itoa(len(buf))) + + _, err = w.Write(buf) + if err != nil { + log.Error(err.Error()) + } +} + +func writeMsgPack(result interface{}, w http.ResponseWriter) { + w.Header().Set("Content-type", "application/octet-stream") + + var mh codec.MsgpackHandle + enc := codec.NewEncoder(w, &mh) + if err := enc.Encode(result); err != nil { + log.Error(err.Error()) + } +} diff --git a/server/client_resp.go b/server/client_resp.go index e7a4114..5252c6a 100644 --- a/server/client_resp.go +++ b/server/client_resp.go @@ -23,7 +23,6 @@ type respClient struct { rb *bufio.Reader req *requestContext - hdl *requestHandler } type respWriter struct { @@ -44,8 +43,6 @@ func newClientRESP(conn net.Conn, app *App) { c.req.resp = newWriterRESP(conn) c.req.remoteAddr = conn.RemoteAddr().String() - c.hdl = newRequestHandler(app) - go c.run() } @@ -144,7 +141,7 @@ func (c *respClient) handleRequest(reqData [][]byte) { req.db = c.db - c.hdl.handle(req) + c.req.perform() c.db = req.db // "SELECT" diff --git a/server/request.go b/server/request.go index f74d162..5c80284 100644 --- a/server/request.go +++ b/server/request.go @@ -33,10 +33,6 @@ type requestContext struct { syncBuf bytes.Buffer compressBuf []byte -} - -type requestHandler struct { - app *App reqErr chan error @@ -51,20 +47,12 @@ func newRequestContext(app *App) *requestContext { req.db, _ = app.ldb.Select(0) //use default db req.compressBuf = make([]byte, 256) + req.reqErr = make(chan error) return req } -func newRequestHandler(app *App) *requestHandler { - hdl := new(requestHandler) - - hdl.app = app - hdl.reqErr = make(chan error) - - return hdl -} - -func (h *requestHandler) handle(req *requestContext) { +func (req *requestContext) perform() { var err error start := time.Now() @@ -75,26 +63,30 @@ func (h *requestHandler) handle(req *requestContext) { err = ErrNotFound } else { go func() { - h.reqErr <- exeCmd(req) + req.reqErr <- exeCmd(req) }() - err = <-h.reqErr + err = <-req.reqErr } duration := time.Since(start) - if h.app.access != nil { - fullCmd := h.catGenericCommand(req) + if req.app.access != nil { + fullCmd := req.catGenericCommand() cost := duration.Nanoseconds() / 1000000 - h.app.access.Log(req.remoteAddr, cost, fullCmd[:256], err) + truncateLen := len(fullCmd) + if truncateLen > 256 { + truncateLen = 256 + } + + req.app.access.Log(req.remoteAddr, cost, fullCmd[:truncateLen], err) } if err != nil { req.resp.writeError(err) } req.resp.flush() - return } @@ -109,8 +101,8 @@ func (h *requestHandler) handle(req *requestContext) { // return h.catGenericCommand(req) // } -func (h *requestHandler) catGenericCommand(req *requestContext) []byte { - buffer := h.buf +func (req *requestContext) catGenericCommand() []byte { + buffer := req.buf buffer.Reset() buffer.Write([]byte(req.cmd))