ledisdb/server/client_http.go

258 lines
5.2 KiB
Go
Raw Permalink Normal View History

2014-08-01 07:42:16 +04:00
package server
import (
2014-08-14 11:24:41 +04:00
"encoding/json"
2014-08-01 07:42:16 +04:00
"fmt"
2015-05-04 17:42:28 +03:00
"io"
"net/http"
"strconv"
"strings"
2014-09-24 05:46:36 +04:00
"github.com/siddontang/go/bson"
2014-09-24 08:34:21 +04:00
"github.com/siddontang/go/hack"
2014-09-24 05:46:36 +04:00
"github.com/siddontang/go/log"
2014-08-01 07:42:16 +04:00
"github.com/siddontang/ledisdb/ledis"
2014-08-14 11:24:41 +04:00
"github.com/ugorji/go/codec"
2014-08-01 07:42:16 +04:00
)
var allowedContentTypes = map[string]struct{}{
"json": struct{}{},
"bson": struct{}{},
"msgpack": struct{}{},
}
2014-08-25 10:18:23 +04:00
var httpUnsupportedCommands = map[string]struct{}{
2014-08-04 06:38:43 +04:00
"slaveof": struct{}{},
"fullsync": struct{}{},
"sync": struct{}{},
2014-08-04 07:06:28 +04:00
"quit": struct{}{},
2014-09-25 06:44:07 +04:00
"begin": struct{}{},
"commit": struct{}{},
"rollback": struct{}{},
2014-08-04 06:28:09 +04:00
}
2014-08-01 07:42:16 +04:00
type httpClient struct {
2014-08-25 10:18:23 +04:00
*client
2014-08-01 07:42:16 +04:00
}
type httpWriter struct {
contentType string
cmd string
w http.ResponseWriter
}
func newClientHTTP(app *App, w http.ResponseWriter, r *http.Request) {
app.connWait.Add(1)
defer app.connWait.Done()
2014-08-01 07:42:16 +04:00
var err error
c := new(httpClient)
2014-12-26 05:08:37 +03:00
c.client = newClient(app)
2014-08-01 07:42:16 +04:00
2014-08-25 10:18:23 +04:00
err = c.makeRequest(app, r, w)
2014-08-01 07:42:16 +04:00
if err != nil {
2014-12-26 05:08:37 +03:00
c.client.close()
2014-08-01 07:42:16 +04:00
w.Write([]byte(err.Error()))
return
}
2014-08-25 10:18:23 +04:00
c.perform()
c.client.close()
2014-08-01 07:42:16 +04:00
}
func (c *httpClient) addr(r *http.Request) string {
2014-08-04 06:28:09 +04:00
return r.RemoteAddr
2014-08-01 07:42:16 +04:00
}
2014-08-25 10:18:23 +04:00
func (c *httpClient) makeRequest(app *App, r *http.Request, w http.ResponseWriter) error {
2014-08-01 07:42:16 +04:00
var err error
db, cmd, argsStr, contentType := c.parseReqPath(r)
c.db, err = app.ldb.Select(db)
if err != nil {
2014-08-25 10:18:23 +04:00
return err
2014-08-01 07:42:16 +04:00
}
contentType = strings.ToLower(contentType)
if _, ok := allowedContentTypes[contentType]; !ok {
2014-08-25 10:18:23 +04:00
return fmt.Errorf("unsupported content type: '%s', only json, bson, msgpack are supported", contentType)
2014-08-01 07:42:16 +04:00
}
args := make([][]byte, len(argsStr))
for i, arg := range argsStr {
args[i] = []byte(arg)
}
2014-08-25 10:18:23 +04:00
c.cmd = strings.ToLower(cmd)
if _, ok := httpUnsupportedCommands[c.cmd]; ok {
return fmt.Errorf("unsupported command: '%s'", cmd)
}
2014-08-04 06:38:43 +04:00
2014-08-25 10:18:23 +04:00
c.args = args
2014-08-25 10:18:23 +04:00
c.remoteAddr = c.addr(r)
c.resp = &httpWriter{contentType, cmd, w}
return nil
2014-08-01 07:42:16 +04:00
}
func (c *httpClient) parseReqPath(r *http.Request) (db int, cmd string, args []string, contentType string) {
contentType = r.FormValue("type")
if contentType == "" {
contentType = "json"
}
substrings := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/")
if len(substrings) == 1 {
return 0, substrings[0], substrings[1:], contentType
}
db, err := strconv.Atoi(substrings[0])
if err != nil {
cmd = substrings[0]
args = substrings[1:]
} else {
cmd = substrings[1]
args = substrings[2:]
}
return
}
// http writer
func (w *httpWriter) genericWrite(result interface{}) {
m := map[string]interface{}{
w.cmd: result,
}
switch w.contentType {
case "json":
writeJSON(&m, w.w)
case "bson":
writeBSON(&m, w.w)
case "msgpack":
writeMsgPack(&m, w.w)
default:
2015-01-12 05:32:03 +03:00
log.Errorf("invalid content type %s", w.contentType)
2014-08-01 07:42:16 +04:00
}
}
func (w *httpWriter) writeError(err error) {
result := [2]interface{}{
false,
fmt.Sprintf("ERR %s", err.Error()),
}
w.genericWrite(result)
}
func (w *httpWriter) writeStatus(status string) {
var success bool
if status == OK || status == PONG {
success = true
}
w.genericWrite([]interface{}{success, status})
2014-08-01 07:42:16 +04:00
}
func (w *httpWriter) writeInteger(n int64) {
w.genericWrite(n)
}
func (w *httpWriter) writeBulk(b []byte) {
if b == nil {
w.genericWrite(nil)
} else {
2014-09-24 08:34:21 +04:00
w.genericWrite(hack.String(b))
2014-08-01 07:42:16 +04:00
}
}
func (w *httpWriter) writeArray(lst []interface{}) {
w.genericWrite(lst)
}
func (w *httpWriter) writeSliceArray(lst [][]byte) {
arr := make([]interface{}, len(lst))
for i, elem := range lst {
if elem == nil {
arr[i] = nil
} else {
2014-09-24 08:34:21 +04:00
arr[i] = hack.String(elem)
2014-08-01 07:42:16 +04:00
}
}
w.genericWrite(arr)
}
func (w *httpWriter) writeFVPairArray(lst []ledis.FVPair) {
m := make(map[string]string)
for _, elem := range lst {
2014-09-24 08:34:21 +04:00
m[hack.String(elem.Field)] = hack.String(elem.Value)
2014-08-01 07:42:16 +04:00
}
w.genericWrite(m)
}
func (w *httpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) {
var arr []string
if withScores {
arr = make([]string, 2*len(lst))
for i, data := range lst {
2014-09-24 08:34:21 +04:00
arr[2*i] = hack.String(data.Member)
2014-08-01 07:42:16 +04:00
arr[2*i+1] = strconv.FormatInt(data.Score, 10)
}
} else {
arr = make([]string, len(lst))
for i, data := range lst {
2014-09-24 08:34:21 +04:00
arr[i] = hack.String(data.Member)
2014-08-01 07:42:16 +04:00
}
}
w.genericWrite(arr)
}
func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) {
2014-08-14 06:37:27 +04:00
w.writeError(fmt.Errorf("unsupport"))
2014-08-01 07:42:16 +04:00
}
func (w *httpWriter) flush() {
}
func writeJSON(resutl interface{}, w http.ResponseWriter) {
buf, err := json.Marshal(resutl)
if err != nil {
log.Error(err.Error())
return
}
w.Header().Set("Content-type", "application/json; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
_, err = w.Write(buf)
if err != nil {
log.Error(err.Error())
}
}
func writeBSON(result interface{}, w http.ResponseWriter) {
buf, err := bson.Marshal(result)
if err != nil {
log.Error(err.Error())
return
}
w.Header().Set("Content-type", "application/octet-stream")
w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
_, err = w.Write(buf)
if err != nil {
log.Error(err.Error())
}
}
func writeMsgPack(result interface{}, w http.ResponseWriter) {
w.Header().Set("Content-type", "application/octet-stream")
var mh codec.MsgpackHandle
enc := codec.NewEncoder(w, &mh)
if err := enc.Encode(result); err != nil {
log.Error(err.Error())
}
}