refactor http interface

This commit is contained in:
wenyekui 2014-08-01 11:42:16 +08:00
parent d0e7698984
commit e46554e9f6
4 changed files with 272 additions and 28 deletions

View File

@ -3,7 +3,6 @@ package server
import (
"fmt"
"github.com/siddontang/ledisdb/ledis"
. "github.com/siddontang/ledisdb/server/http"
"net"
"net/http"
"path"
@ -134,7 +133,9 @@ func (app *App) httpServe() {
mux := http.NewServeMux()
mux.Handle("/", &CmdHandler{app.Ledis()})
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
newClientHTTP(app, w, r)
})
svr := http.Server{Handler: mux}
svr.Serve(app.httpListener)

254
server/client_http.go Normal file
View File

@ -0,0 +1,254 @@
package server
import (
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/ledis"
"io"
"net/http"
"strconv"
"strings"
"encoding/json"
"github.com/ugorji/go/codec"
"gopkg.in/mgo.v2/bson"
)
var allowedContentTypes = map[string]struct{}{
"json": struct{}{},
"bson": struct{}{},
"msgpack": struct{}{},
}
type httpClient struct {
app *App
db *ledis.DB
ldb *ledis.Ledis
resp responseWriter
req *requestContext
}
type httpWriter struct {
contentType string
cmd string
w http.ResponseWriter
}
// http context
func newClientHTTP(app *App, w http.ResponseWriter, r *http.Request) {
var err error
c := new(httpClient)
c.app = app
c.ldb = app.ldb
c.db, err = c.ldb.Select(0)
if err != nil {
w.Write([]byte(err.Error()))
return
}
c.req, err = c.makeRequest(app, r, w)
if err != nil {
w.Write([]byte(err.Error()))
return
}
c.req.perform()
}
func (c *httpClient) addr(r *http.Request) string {
addr := r.Header.Get("X-Forwarded-For")
if addr == "" {
addr = r.Header.Get("X-Real-IP")
if addr == "" {
addr = r.Header.Get("Remote-Addr")
}
}
return addr
}
func (c *httpClient) makeRequest(app *App, r *http.Request, w http.ResponseWriter) (*requestContext, error) {
var err error
db, cmd, argsStr, contentType := c.parseReqPath(r)
c.db, err = app.ldb.Select(db)
if err != nil {
return nil, err
}
contentType = strings.ToLower(contentType)
if _, ok := allowedContentTypes[contentType]; !ok {
return nil, fmt.Errorf("unsupported content type: '%s', only json, bson, msgpack are supported", contentType)
}
req := newRequestContext(app)
args := make([][]byte, len(argsStr))
for i, arg := range argsStr {
args[i] = []byte(arg)
}
req.cmd = cmd
req.args = args
req.remoteAddr = c.addr(r)
req.resp = &httpWriter{contentType, cmd, w}
return req, nil
}
func (c *httpClient) parseReqPath(r *http.Request) (db int, cmd string, args []string, contentType string) {
contentType = r.FormValue("type")
if contentType == "" {
contentType = "json"
}
substrings := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/")
if len(substrings) == 1 {
return 0, substrings[0], substrings[1:], contentType
}
db, err := strconv.Atoi(substrings[0])
if err != nil {
cmd = substrings[0]
args = substrings[1:]
} else {
cmd = substrings[1]
args = substrings[2:]
}
return
}
// http writer
func (w *httpWriter) genericWrite(result interface{}) {
m := map[string]interface{}{
w.cmd: result,
}
switch w.contentType {
case "json":
writeJSON(&m, w.w)
case "bson":
writeBSON(&m, w.w)
case "msgpack":
writeMsgPack(&m, w.w)
default:
log.Error("invalid content type %s", w.contentType)
}
}
func (w *httpWriter) writeError(err error) {
result := [2]interface{}{
false,
fmt.Sprintf("ERR %s", err.Error()),
}
w.genericWrite(result)
}
func (w *httpWriter) writeStatus(status string) {
w.genericWrite(status)
}
func (w *httpWriter) writeInteger(n int64) {
w.genericWrite(n)
}
func (w *httpWriter) writeBulk(b []byte) {
if b == nil {
w.genericWrite(nil)
} else {
w.genericWrite(ledis.String(b))
}
}
func (w *httpWriter) writeArray(lst []interface{}) {
w.genericWrite(lst)
}
func (w *httpWriter) writeSliceArray(lst [][]byte) {
arr := make([]interface{}, len(lst))
for i, elem := range lst {
if elem == nil {
arr[i] = nil
} else {
arr[i] = ledis.String(elem)
}
}
w.genericWrite(arr)
}
func (w *httpWriter) writeFVPairArray(lst []ledis.FVPair) {
m := make(map[string]string)
for _, elem := range lst {
m[ledis.String(elem.Field)] = ledis.String(elem.Value)
}
w.genericWrite(m)
}
func (w *httpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) {
var arr []string
if withScores {
arr = make([]string, 2*len(lst))
for i, data := range lst {
arr[2*i] = ledis.String(data.Member)
arr[2*i+1] = strconv.FormatInt(data.Score, 10)
}
} else {
arr = make([]string, len(lst))
for i, data := range lst {
arr[i] = ledis.String(data.Member)
}
}
w.genericWrite(arr)
}
func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) {
w.writeError(fmt.Errorf("unsuport"))
}
func (w *httpWriter) flush() {
}
func writeJSON(resutl interface{}, w http.ResponseWriter) {
buf, err := json.Marshal(resutl)
if err != nil {
log.Error(err.Error())
return
}
w.Header().Set("Content-type", "application/json; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
_, err = w.Write(buf)
if err != nil {
log.Error(err.Error())
}
}
func writeBSON(result interface{}, w http.ResponseWriter) {
buf, err := bson.Marshal(result)
if err != nil {
log.Error(err.Error())
return
}
w.Header().Set("Content-type", "application/octet-stream")
w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
_, err = w.Write(buf)
if err != nil {
log.Error(err.Error())
}
}
func writeMsgPack(result interface{}, w http.ResponseWriter) {
w.Header().Set("Content-type", "application/octet-stream")
var mh codec.MsgpackHandle
enc := codec.NewEncoder(w, &mh)
if err := enc.Encode(result); err != nil {
log.Error(err.Error())
}
}

View File

@ -23,7 +23,6 @@ type respClient struct {
rb *bufio.Reader
req *requestContext
hdl *requestHandler
}
type respWriter struct {
@ -44,8 +43,6 @@ func newClientRESP(conn net.Conn, app *App) {
c.req.resp = newWriterRESP(conn)
c.req.remoteAddr = conn.RemoteAddr().String()
c.hdl = newRequestHandler(app)
go c.run()
}
@ -144,7 +141,7 @@ func (c *respClient) handleRequest(reqData [][]byte) {
req.db = c.db
c.hdl.handle(req)
c.req.perform()
c.db = req.db // "SELECT"

View File

@ -33,10 +33,6 @@ type requestContext struct {
syncBuf bytes.Buffer
compressBuf []byte
}
type requestHandler struct {
app *App
reqErr chan error
@ -51,20 +47,12 @@ func newRequestContext(app *App) *requestContext {
req.db, _ = app.ldb.Select(0) //use default db
req.compressBuf = make([]byte, 256)
req.reqErr = make(chan error)
return req
}
func newRequestHandler(app *App) *requestHandler {
hdl := new(requestHandler)
hdl.app = app
hdl.reqErr = make(chan error)
return hdl
}
func (h *requestHandler) handle(req *requestContext) {
func (req *requestContext) perform() {
var err error
start := time.Now()
@ -75,26 +63,30 @@ func (h *requestHandler) handle(req *requestContext) {
err = ErrNotFound
} else {
go func() {
h.reqErr <- exeCmd(req)
req.reqErr <- exeCmd(req)
}()
err = <-h.reqErr
err = <-req.reqErr
}
duration := time.Since(start)
if h.app.access != nil {
fullCmd := h.catGenericCommand(req)
if req.app.access != nil {
fullCmd := req.catGenericCommand()
cost := duration.Nanoseconds() / 1000000
h.app.access.Log(req.remoteAddr, cost, fullCmd[:256], err)
truncateLen := len(fullCmd)
if truncateLen > 256 {
truncateLen = 256
}
req.app.access.Log(req.remoteAddr, cost, fullCmd[:truncateLen], err)
}
if err != nil {
req.resp.writeError(err)
}
req.resp.flush()
return
}
@ -109,8 +101,8 @@ func (h *requestHandler) handle(req *requestContext) {
// return h.catGenericCommand(req)
// }
func (h *requestHandler) catGenericCommand(req *requestContext) []byte {
buffer := h.buf
func (req *requestContext) catGenericCommand() []byte {
buffer := req.buf
buffer.Reset()
buffer.Write([]byte(req.cmd))