diff --git a/bootstrap.sh b/bootstrap.sh index 6440466..e4c7a5b 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -2,12 +2,16 @@ . ./dev.sh -go get github.com/siddontang/go-log/log -go get github.com/siddontang/go-snappy/snappy -go get github.com/siddontang/copier +go get -u github.com/siddontang/go-log/log +go get -u github.com/siddontang/go-snappy/snappy +go get -u github.com/siddontang/copier -go get github.com/siddontang/goleveldb/leveldb +go get -u github.com/siddontang/goleveldb/leveldb -go get github.com/szferi/gomdb +go get -u github.com/szferi/gomdb + +go get -u github.com/boltdb/bolt + +go get -u gopkg.in/mgo.v2/bson +go get -u github.com/ugorji/go/codec -go get github.com/boltdb/bolt \ No newline at end of file diff --git a/etc/ledis.json b/etc/ledis.json index 8303f5e..d788c34 100644 --- a/etc/ledis.json +++ b/etc/ledis.json @@ -1,5 +1,6 @@ { "addr": "127.0.0.1:6380", + "http_addr": "127.0.0.1:11181", "data_dir": "/tmp/ledis_server", "db": { diff --git a/server/app.go b/server/app.go index 3f0d550..a89e7ec 100644 --- a/server/app.go +++ b/server/app.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/siddontang/ledisdb/ledis" "net" + "net/http" "path" "strings" ) @@ -11,7 +12,8 @@ import ( type App struct { cfg *Config - listener net.Listener + listener net.Listener + httpListener net.Listener ldb *ledis.Ledis @@ -25,6 +27,14 @@ type App struct { m *master } +func netType(s string) string { + if strings.Contains(s, "/") { + return "unix" + } else { + return "tcp" + } +} + func NewApp(cfg *Config) (*App, error) { if len(cfg.DataDir) == 0 { return nil, fmt.Errorf("must set data_dir first") @@ -40,14 +50,14 @@ func NewApp(cfg *Config) (*App, error) { var err error - if strings.Contains(cfg.Addr, "/") { - app.listener, err = net.Listen("unix", cfg.Addr) - } else { - app.listener, err = net.Listen("tcp", cfg.Addr) + if app.listener, err = net.Listen(netType(cfg.Addr), cfg.Addr); err != nil { + return nil, err } - if err != nil { - return nil, err + if len(cfg.HttpAddr) > 0 { + if app.httpListener, err = net.Listen(netType(cfg.HttpAddr), cfg.HttpAddr); err != nil { + return nil, err + } } if len(cfg.AccessLog) > 0 { @@ -82,6 +92,10 @@ func (app *App) Close() { app.listener.Close() + if app.httpListener != nil { + app.httpListener.Close() + } + app.m.Close() if app.access != nil { @@ -96,16 +110,33 @@ func (app *App) Run() { app.slaveof(app.cfg.SlaveOf) } + go app.httpServe() + for !app.closed { conn, err := app.listener.Accept() if err != nil { continue } - newClient(conn, app) + newClientRESP(conn, app) } } +func (app *App) httpServe() { + if app.httpListener == nil { + return + } + + mux := http.NewServeMux() + + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + newClientHTTP(app, w, r) + }) + + svr := http.Server{Handler: mux} + svr.Serve(app.httpListener) +} + func (app *App) Ledis() *ledis.Ledis { return app.ldb } diff --git a/server/client.go b/server/client.go deleted file mode 100644 index 11c9f97..0000000 --- a/server/client.go +++ /dev/null @@ -1,315 +0,0 @@ -package server - -import ( - "bufio" - "bytes" - "errors" - "github.com/siddontang/go-log/log" - "github.com/siddontang/ledisdb/ledis" - "io" - "net" - "runtime" - "strconv" - "strings" - "time" -) - -var errReadRequest = errors.New("invalid request protocol") - -type client struct { - app *App - ldb *ledis.Ledis - - db *ledis.DB - c net.Conn - - rb *bufio.Reader - wb *bufio.Writer - - cmd string - args [][]byte - - reqC chan error - - syncBuf bytes.Buffer - - compressBuf []byte - - logBuf bytes.Buffer -} - -func newClient(c net.Conn, app *App) { - co := new(client) - - co.app = app - co.ldb = app.ldb - //use default db - co.db, _ = app.ldb.Select(0) - co.c = c - - co.rb = bufio.NewReaderSize(c, 256) - co.wb = bufio.NewWriterSize(c, 256) - - co.reqC = make(chan error, 1) - - co.compressBuf = make([]byte, 256) - - go co.run() -} - -func (c *client) run() { - defer func() { - if e := recover(); e != nil { - buf := make([]byte, 4096) - n := runtime.Stack(buf, false) - buf = buf[0:n] - - log.Fatal("client run panic %s:%v", buf, e) - } - - c.c.Close() - }() - - for { - req, err := c.readRequest() - if err != nil { - return - } - - c.handleRequest(req) - } -} - -func (c *client) readLine() ([]byte, error) { - return ReadLine(c.rb) -} - -//A client sends to the Redis server a RESP Array consisting of just Bulk Strings. -func (c *client) readRequest() ([][]byte, error) { - l, err := c.readLine() - if err != nil { - return nil, err - } else if len(l) == 0 || l[0] != '*' { - return nil, errReadRequest - } - - var nparams int - if nparams, err = strconv.Atoi(ledis.String(l[1:])); err != nil { - return nil, err - } else if nparams <= 0 { - return nil, errReadRequest - } - - req := make([][]byte, 0, nparams) - var n int - for i := 0; i < nparams; i++ { - if l, err = c.readLine(); err != nil { - return nil, err - } - - if len(l) == 0 { - return nil, errReadRequest - } else if l[0] == '$' { - //handle resp string - if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil { - return nil, err - } else if n == -1 { - req = append(req, nil) - } else { - buf := make([]byte, n) - if _, err = io.ReadFull(c.rb, buf); err != nil { - return nil, err - } - - if l, err = c.readLine(); err != nil { - return nil, err - } else if len(l) != 0 { - return nil, errors.New("bad bulk string format") - } - - req = append(req, buf) - - } - - } else { - return nil, errReadRequest - } - } - - return req, nil -} - -func (c *client) handleRequest(req [][]byte) { - var err error - - start := time.Now() - - if len(req) == 0 { - err = ErrEmptyCommand - } else { - c.cmd = strings.ToLower(ledis.String(req[0])) - c.args = req[1:] - - f, ok := regCmds[c.cmd] - if !ok { - err = ErrNotFound - } else { - go func() { - c.reqC <- f(c) - }() - err = <-c.reqC - } - } - - duration := time.Since(start) - - if c.app.access != nil { - c.logBuf.Reset() - for i, r := range req { - left := 256 - c.logBuf.Len() - if left <= 0 { - break - } else if len(r) <= left { - c.logBuf.Write(r) - if i != len(req)-1 { - c.logBuf.WriteByte(' ') - } - } else { - c.logBuf.Write(r[0:left]) - } - } - - c.app.access.Log(c.c.RemoteAddr().String(), duration.Nanoseconds()/1000000, c.logBuf.Bytes(), err) - } - - if err != nil { - c.writeError(err) - } - - c.wb.Flush() -} - -func (c *client) writeError(err error) { - c.wb.Write(ledis.Slice("-ERR")) - if err != nil { - c.wb.WriteByte(' ') - c.wb.Write(ledis.Slice(err.Error())) - } - c.wb.Write(Delims) -} - -func (c *client) writeStatus(status string) { - c.wb.WriteByte('+') - c.wb.Write(ledis.Slice(status)) - c.wb.Write(Delims) -} - -func (c *client) writeInteger(n int64) { - c.wb.WriteByte(':') - c.wb.Write(ledis.StrPutInt64(n)) - c.wb.Write(Delims) -} - -func (c *client) writeBulk(b []byte) { - c.wb.WriteByte('$') - if b == nil { - c.wb.Write(NullBulk) - } else { - c.wb.Write(ledis.Slice(strconv.Itoa(len(b)))) - c.wb.Write(Delims) - c.wb.Write(b) - } - - c.wb.Write(Delims) -} - -func (c *client) writeArray(ay []interface{}) { - c.wb.WriteByte('*') - if ay == nil { - c.wb.Write(NullArray) - c.wb.Write(Delims) - } else { - c.wb.Write(ledis.Slice(strconv.Itoa(len(ay)))) - c.wb.Write(Delims) - - for i := 0; i < len(ay); i++ { - switch v := ay[i].(type) { - case []interface{}: - c.writeArray(v) - case []byte: - c.writeBulk(v) - case nil: - c.writeBulk(nil) - case int64: - c.writeInteger(v) - default: - panic("invalid array type") - } - } - } -} - -func (c *client) writeSliceArray(ay [][]byte) { - c.wb.WriteByte('*') - if ay == nil { - c.wb.Write(NullArray) - c.wb.Write(Delims) - } else { - c.wb.Write(ledis.Slice(strconv.Itoa(len(ay)))) - c.wb.Write(Delims) - - for i := 0; i < len(ay); i++ { - c.writeBulk(ay[i]) - } - } -} - -func (c *client) writeFVPairArray(ay []ledis.FVPair) { - c.wb.WriteByte('*') - if ay == nil { - c.wb.Write(NullArray) - c.wb.Write(Delims) - } else { - c.wb.Write(ledis.Slice(strconv.Itoa(len(ay) * 2))) - c.wb.Write(Delims) - - for i := 0; i < len(ay); i++ { - c.writeBulk(ay[i].Field) - c.writeBulk(ay[i].Value) - } - } -} - -func (c *client) writeScorePairArray(ay []ledis.ScorePair, withScores bool) { - c.wb.WriteByte('*') - if ay == nil { - c.wb.Write(NullArray) - c.wb.Write(Delims) - } else { - if withScores { - c.wb.Write(ledis.Slice(strconv.Itoa(len(ay) * 2))) - c.wb.Write(Delims) - } else { - c.wb.Write(ledis.Slice(strconv.Itoa(len(ay)))) - c.wb.Write(Delims) - - } - - for i := 0; i < len(ay); i++ { - c.writeBulk(ay[i].Member) - - if withScores { - c.writeBulk(ledis.StrPutInt64(ay[i].Score)) - } - } - } -} - -func (c *client) writeBulkFrom(n int64, rb io.Reader) { - c.wb.WriteByte('$') - c.wb.Write(ledis.Slice(strconv.FormatInt(n, 10))) - c.wb.Write(Delims) - - io.Copy(c.wb, rb) - c.wb.Write(Delims) -} diff --git a/server/client_http.go b/server/client_http.go new file mode 100644 index 0000000..9a4057b --- /dev/null +++ b/server/client_http.go @@ -0,0 +1,260 @@ +package server + +import ( + "fmt" + "github.com/siddontang/go-log/log" + "github.com/siddontang/ledisdb/ledis" + "io" + "net/http" + "strconv" + "strings" + + "encoding/json" + "github.com/ugorji/go/codec" + "gopkg.in/mgo.v2/bson" +) + +var allowedContentTypes = map[string]struct{}{ + "json": struct{}{}, + "bson": struct{}{}, + "msgpack": struct{}{}, +} +var unsopportedCommands = map[string]struct{}{ + "slaveof": struct{}{}, + "fullsync": struct{}{}, + "sync": struct{}{}, + "quit": struct{}{}, +} + +type httpClient struct { + app *App + db *ledis.DB + ldb *ledis.Ledis + + resp responseWriter + req *requestContext +} + +type httpWriter struct { + contentType string + cmd string + w http.ResponseWriter +} + +func newClientHTTP(app *App, w http.ResponseWriter, r *http.Request) { + var err error + c := new(httpClient) + c.app = app + c.ldb = app.ldb + c.db, err = c.ldb.Select(0) + if err != nil { + w.Write([]byte(err.Error())) + return + } + + c.req, err = c.makeRequest(app, r, w) + if err != nil { + w.Write([]byte(err.Error())) + return + } + c.req.perform() +} + +func (c *httpClient) addr(r *http.Request) string { + return r.RemoteAddr +} + +func (c *httpClient) makeRequest(app *App, r *http.Request, w http.ResponseWriter) (*requestContext, error) { + var err error + + db, cmd, argsStr, contentType := c.parseReqPath(r) + + c.db, err = app.ldb.Select(db) + if err != nil { + return nil, err + } + + contentType = strings.ToLower(contentType) + + if _, ok := allowedContentTypes[contentType]; !ok { + return nil, fmt.Errorf("unsupported content type: '%s', only json, bson, msgpack are supported", contentType) + } + + req := newRequestContext(app) + args := make([][]byte, len(argsStr)) + for i, arg := range argsStr { + args[i] = []byte(arg) + } + + req.cmd = strings.ToLower(cmd) + if _, ok := unsopportedCommands[req.cmd]; ok { + return nil, fmt.Errorf("unsupported command: '%s'", cmd) + } + + req.args = args + + req.remoteAddr = c.addr(r) + req.resp = &httpWriter{contentType, cmd, w} + return req, nil +} + +func (c *httpClient) parseReqPath(r *http.Request) (db int, cmd string, args []string, contentType string) { + + contentType = r.FormValue("type") + if contentType == "" { + contentType = "json" + } + + substrings := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/") + if len(substrings) == 1 { + return 0, substrings[0], substrings[1:], contentType + } + db, err := strconv.Atoi(substrings[0]) + if err != nil { + cmd = substrings[0] + args = substrings[1:] + } else { + cmd = substrings[1] + args = substrings[2:] + } + + return +} + +// http writer + +func (w *httpWriter) genericWrite(result interface{}) { + + m := map[string]interface{}{ + w.cmd: result, + } + switch w.contentType { + case "json": + writeJSON(&m, w.w) + case "bson": + writeBSON(&m, w.w) + case "msgpack": + writeMsgPack(&m, w.w) + default: + log.Error("invalid content type %s", w.contentType) + } +} + +func (w *httpWriter) writeError(err error) { + result := [2]interface{}{ + false, + fmt.Sprintf("ERR %s", err.Error()), + } + w.genericWrite(result) +} + +func (w *httpWriter) writeStatus(status string) { + var success bool + if status == OK || status == PONG { + success = true + } + w.genericWrite([]interface{}{success, status}) +} + +func (w *httpWriter) writeInteger(n int64) { + w.genericWrite(n) +} + +func (w *httpWriter) writeBulk(b []byte) { + if b == nil { + w.genericWrite(nil) + } else { + w.genericWrite(ledis.String(b)) + } +} + +func (w *httpWriter) writeArray(lst []interface{}) { + w.genericWrite(lst) +} + +func (w *httpWriter) writeSliceArray(lst [][]byte) { + arr := make([]interface{}, len(lst)) + for i, elem := range lst { + if elem == nil { + arr[i] = nil + } else { + arr[i] = ledis.String(elem) + } + } + w.genericWrite(arr) +} + +func (w *httpWriter) writeFVPairArray(lst []ledis.FVPair) { + m := make(map[string]string) + for _, elem := range lst { + m[ledis.String(elem.Field)] = ledis.String(elem.Value) + } + w.genericWrite(m) +} + +func (w *httpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) { + var arr []string + if withScores { + arr = make([]string, 2*len(lst)) + for i, data := range lst { + arr[2*i] = ledis.String(data.Member) + arr[2*i+1] = strconv.FormatInt(data.Score, 10) + } + } else { + arr = make([]string, len(lst)) + for i, data := range lst { + arr[i] = ledis.String(data.Member) + } + } + w.genericWrite(arr) +} + +func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) { + w.writeError(fmt.Errorf("unsuport")) +} + +func (w *httpWriter) flush() { + +} + +func writeJSON(resutl interface{}, w http.ResponseWriter) { + buf, err := json.Marshal(resutl) + if err != nil { + log.Error(err.Error()) + return + } + + w.Header().Set("Content-type", "application/json; charset=utf-8") + w.Header().Set("Content-Length", strconv.Itoa(len(buf))) + + _, err = w.Write(buf) + if err != nil { + log.Error(err.Error()) + } +} + +func writeBSON(result interface{}, w http.ResponseWriter) { + buf, err := bson.Marshal(result) + if err != nil { + log.Error(err.Error()) + return + } + + w.Header().Set("Content-type", "application/octet-stream") + w.Header().Set("Content-Length", strconv.Itoa(len(buf))) + + _, err = w.Write(buf) + if err != nil { + log.Error(err.Error()) + } +} + +func writeMsgPack(result interface{}, w http.ResponseWriter) { + w.Header().Set("Content-type", "application/octet-stream") + + var mh codec.MsgpackHandle + enc := codec.NewEncoder(w, &mh) + if err := enc.Encode(result); err != nil { + log.Error(err.Error()) + } +} diff --git a/server/client_resp.go b/server/client_resp.go new file mode 100644 index 0000000..1457b81 --- /dev/null +++ b/server/client_resp.go @@ -0,0 +1,291 @@ +package server + +import ( + "bufio" + "errors" + "github.com/siddontang/go-log/log" + "github.com/siddontang/ledisdb/ledis" + "io" + "net" + "runtime" + "strconv" + "strings" +) + +var errReadRequest = errors.New("invalid request protocol") + +type respClient struct { + app *App + ldb *ledis.Ledis + db *ledis.DB + + conn net.Conn + rb *bufio.Reader + + req *requestContext +} + +type respWriter struct { + buff *bufio.Writer +} + +func newClientRESP(conn net.Conn, app *App) { + c := new(respClient) + + c.app = app + c.conn = conn + c.ldb = app.ldb + c.db, _ = app.ldb.Select(0) + + c.rb = bufio.NewReaderSize(conn, 256) + + c.req = newRequestContext(app) + c.req.resp = newWriterRESP(conn) + c.req.remoteAddr = conn.RemoteAddr().String() + + go c.run() +} + +func (c *respClient) run() { + defer func() { + if e := recover(); e != nil { + buf := make([]byte, 4096) + n := runtime.Stack(buf, false) + buf = buf[0:n] + + log.Fatal("client run panic %s:%v", buf, e) + } + + c.conn.Close() + }() + + for { + reqData, err := c.readRequest() + if err != nil { + return + } + + c.handleRequest(reqData) + } +} + +func (c *respClient) readLine() ([]byte, error) { + return ReadLine(c.rb) +} + +//A client sends to the Redis server a RESP Array consisting of just Bulk Strings. +func (c *respClient) readRequest() ([][]byte, error) { + l, err := c.readLine() + if err != nil { + return nil, err + } else if len(l) == 0 || l[0] != '*' { + return nil, errReadRequest + } + + var nparams int + if nparams, err = strconv.Atoi(ledis.String(l[1:])); err != nil { + return nil, err + } else if nparams <= 0 { + return nil, errReadRequest + } + + req := make([][]byte, 0, nparams) + var n int + for i := 0; i < nparams; i++ { + if l, err = c.readLine(); err != nil { + return nil, err + } + + if len(l) == 0 { + return nil, errReadRequest + } else if l[0] == '$' { + //handle resp string + if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil { + return nil, err + } else if n == -1 { + req = append(req, nil) + } else { + buf := make([]byte, n) + if _, err = io.ReadFull(c.rb, buf); err != nil { + return nil, err + } + + if l, err = c.readLine(); err != nil { + return nil, err + } else if len(l) != 0 { + return nil, errors.New("bad bulk string format") + } + + req = append(req, buf) + + } + + } else { + return nil, errReadRequest + } + } + + return req, nil +} + +func (c *respClient) handleRequest(reqData [][]byte) { + req := c.req + + if len(reqData) == 0 { + c.req.cmd = "" + c.req.args = reqData[0:0] + } else { + c.req.cmd = strings.ToLower(ledis.String(reqData[0])) + c.req.args = reqData[1:] + } + if c.req.cmd == "quit" { + c.req.resp.writeStatus(OK) + c.req.resp.flush() + c.conn.Close() + } + + req.db = c.db + + c.req.perform() + + c.db = req.db // "SELECT" + + return +} + +// response writer + +func newWriterRESP(conn net.Conn) *respWriter { + w := new(respWriter) + w.buff = bufio.NewWriterSize(conn, 256) + return w +} + +func (w *respWriter) writeError(err error) { + w.buff.Write(ledis.Slice("-ERR")) + if err != nil { + w.buff.WriteByte(' ') + w.buff.Write(ledis.Slice(err.Error())) + } + w.buff.Write(Delims) +} + +func (w *respWriter) writeStatus(status string) { + w.buff.WriteByte('+') + w.buff.Write(ledis.Slice(status)) + w.buff.Write(Delims) +} + +func (w *respWriter) writeInteger(n int64) { + w.buff.WriteByte(':') + w.buff.Write(ledis.StrPutInt64(n)) + w.buff.Write(Delims) +} + +func (w *respWriter) writeBulk(b []byte) { + w.buff.WriteByte('$') + if b == nil { + w.buff.Write(NullBulk) + } else { + w.buff.Write(ledis.Slice(strconv.Itoa(len(b)))) + w.buff.Write(Delims) + w.buff.Write(b) + } + + w.buff.Write(Delims) +} + +func (w *respWriter) writeArray(lst []interface{}) { + w.buff.WriteByte('*') + if lst == nil { + w.buff.Write(NullArray) + w.buff.Write(Delims) + } else { + w.buff.Write(ledis.Slice(strconv.Itoa(len(lst)))) + w.buff.Write(Delims) + + for i := 0; i < len(lst); i++ { + switch v := lst[i].(type) { + case []interface{}: + w.writeArray(v) + case []byte: + w.writeBulk(v) + case nil: + w.writeBulk(nil) + case int64: + w.writeInteger(v) + default: + panic("invalid array type") + } + } + } +} + +func (w *respWriter) writeSliceArray(lst [][]byte) { + w.buff.WriteByte('*') + if lst == nil { + w.buff.Write(NullArray) + w.buff.Write(Delims) + } else { + w.buff.Write(ledis.Slice(strconv.Itoa(len(lst)))) + w.buff.Write(Delims) + + for i := 0; i < len(lst); i++ { + w.writeBulk(lst[i]) + } + } +} + +func (w *respWriter) writeFVPairArray(lst []ledis.FVPair) { + w.buff.WriteByte('*') + if lst == nil { + w.buff.Write(NullArray) + w.buff.Write(Delims) + } else { + w.buff.Write(ledis.Slice(strconv.Itoa(len(lst) * 2))) + w.buff.Write(Delims) + + for i := 0; i < len(lst); i++ { + w.writeBulk(lst[i].Field) + w.writeBulk(lst[i].Value) + } + } +} + +func (w *respWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) { + w.buff.WriteByte('*') + if lst == nil { + w.buff.Write(NullArray) + w.buff.Write(Delims) + } else { + if withScores { + w.buff.Write(ledis.Slice(strconv.Itoa(len(lst) * 2))) + w.buff.Write(Delims) + } else { + w.buff.Write(ledis.Slice(strconv.Itoa(len(lst)))) + w.buff.Write(Delims) + + } + + for i := 0; i < len(lst); i++ { + w.writeBulk(lst[i].Member) + + if withScores { + w.writeBulk(ledis.StrPutInt64(lst[i].Score)) + } + } + } +} + +func (w *respWriter) writeBulkFrom(n int64, rb io.Reader) { + w.buff.WriteByte('$') + w.buff.Write(ledis.Slice(strconv.FormatInt(n, 10))) + w.buff.Write(Delims) + + io.Copy(w.buff, rb) + w.buff.Write(Delims) +} + +func (w *respWriter) flush() { + w.buff.Flush() +} diff --git a/server/cmd_bit.go b/server/cmd_bit.go index cb3d593..a08a966 100644 --- a/server/cmd_bit.go +++ b/server/cmd_bit.go @@ -5,36 +5,36 @@ import ( "strings" ) -func bgetCommand(c *client) error { - args := c.args +func bgetCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.BGet(args[0]); err != nil { + if v, err := req.db.BGet(args[0]); err != nil { return err } else { - c.writeBulk(v) + req.resp.writeBulk(v) } return nil } -func bdeleteCommand(c *client) error { - args := c.args +func bdeleteCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.BDelete(args[0]); err != nil { + if n, err := req.db.BDelete(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func bsetbitCommand(c *client) error { - args := c.args +func bsetbitCommand(req *requestContext) error { + args := req.args if len(args) != 3 { return ErrCmdParams } @@ -58,16 +58,16 @@ func bsetbitCommand(c *client) error { return ErrBool } - if ori, err := c.db.BSetBit(args[0], offset, uint8(val)); err != nil { + if ori, err := req.db.BSetBit(args[0], offset, uint8(val)); err != nil { return err } else { - c.writeInteger(int64(ori)) + req.resp.writeInteger(int64(ori)) } return nil } -func bgetbitCommand(c *client) error { - args := c.args +func bgetbitCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -78,16 +78,16 @@ func bgetbitCommand(c *client) error { return ErrOffset } - if v, err := c.db.BGetBit(args[0], offset); err != nil { + if v, err := req.db.BGetBit(args[0], offset); err != nil { return err } else { - c.writeInteger(int64(v)) + req.resp.writeInteger(int64(v)) } return nil } -func bmsetbitCommand(c *client) error { - args := c.args +func bmsetbitCommand(req *requestContext) error { + args := req.args if len(args) < 3 { return ErrCmdParams } @@ -124,16 +124,16 @@ func bmsetbitCommand(c *client) error { pairs[i].Val = uint8(val) } - if place, err := c.db.BMSetBit(key, pairs...); err != nil { + if place, err := req.db.BMSetBit(key, pairs...); err != nil { return err } else { - c.writeInteger(place) + req.resp.writeInteger(place) } return nil } -func bcountCommand(c *client) error { - args := c.args +func bcountCommand(req *requestContext) error { + args := req.args argCnt := len(args) if !(argCnt > 0 && argCnt <= 3) { @@ -159,16 +159,16 @@ func bcountCommand(c *client) error { } } - if cnt, err := c.db.BCount(args[0], start, end); err != nil { + if cnt, err := req.db.BCount(args[0], start, end); err != nil { return err } else { - c.writeInteger(int64(cnt)) + req.resp.writeInteger(int64(cnt)) } return nil } -func boptCommand(c *client) error { - args := c.args +func boptCommand(req *requestContext) error { + args := req.args if len(args) < 2 { return ErrCmdParams } @@ -194,16 +194,16 @@ func boptCommand(c *client) error { if len(srcKeys) == 0 { return ErrCmdParams } - if blen, err := c.db.BOperation(op, dstKey, srcKeys...); err != nil { + if blen, err := req.db.BOperation(op, dstKey, srcKeys...); err != nil { return err } else { - c.writeInteger(int64(blen)) + req.resp.writeInteger(int64(blen)) } return nil } -func bexpireCommand(c *client) error { - args := c.args +func bexpireCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -213,17 +213,17 @@ func bexpireCommand(c *client) error { return ErrValue } - if v, err := c.db.BExpire(args[0], duration); err != nil { + if v, err := req.db.BExpire(args[0], duration); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func bexpireAtCommand(c *client) error { - args := c.args +func bexpireAtCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -233,40 +233,40 @@ func bexpireAtCommand(c *client) error { return ErrValue } - if v, err := c.db.BExpireAt(args[0], when); err != nil { + if v, err := req.db.BExpireAt(args[0], when); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func bttlCommand(c *client) error { - args := c.args +func bttlCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.BTTL(args[0]); err != nil { + if v, err := req.db.BTTL(args[0]); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func bpersistCommand(c *client) error { - args := c.args +func bpersistCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.BPersist(args[0]); err != nil { + if n, err := req.db.BPersist(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil diff --git a/server/cmd_hash.go b/server/cmd_hash.go index b705d17..0d195f6 100644 --- a/server/cmd_hash.go +++ b/server/cmd_hash.go @@ -4,87 +4,87 @@ import ( "github.com/siddontang/ledisdb/ledis" ) -func hsetCommand(c *client) error { - args := c.args +func hsetCommand(req *requestContext) error { + args := req.args if len(args) != 3 { return ErrCmdParams } - if n, err := c.db.HSet(args[0], args[1], args[2]); err != nil { + if n, err := req.db.HSet(args[0], args[1], args[2]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func hgetCommand(c *client) error { - args := c.args +func hgetCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } - if v, err := c.db.HGet(args[0], args[1]); err != nil { + if v, err := req.db.HGet(args[0], args[1]); err != nil { return err } else { - c.writeBulk(v) + req.resp.writeBulk(v) } return nil } -func hexistsCommand(c *client) error { - args := c.args +func hexistsCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } var n int64 = 1 - if v, err := c.db.HGet(args[0], args[1]); err != nil { + if v, err := req.db.HGet(args[0], args[1]); err != nil { return err } else { if v == nil { n = 0 } - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func hdelCommand(c *client) error { - args := c.args +func hdelCommand(req *requestContext) error { + args := req.args if len(args) < 2 { return ErrCmdParams } - if n, err := c.db.HDel(args[0], args[1:]...); err != nil { + if n, err := req.db.HDel(args[0], args[1:]...); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func hlenCommand(c *client) error { - args := c.args +func hlenCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.HLen(args[0]); err != nil { + if n, err := req.db.HLen(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func hincrbyCommand(c *client) error { - args := c.args +func hincrbyCommand(req *requestContext) error { + args := req.args if len(args) != 3 { return ErrCmdParams } @@ -95,16 +95,16 @@ func hincrbyCommand(c *client) error { } var n int64 - if n, err = c.db.HIncrBy(args[0], args[1], delta); err != nil { + if n, err = req.db.HIncrBy(args[0], args[1], delta); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func hmsetCommand(c *client) error { - args := c.args +func hmsetCommand(req *requestContext) error { + args := req.args if len(args) < 3 { return ErrCmdParams } @@ -123,107 +123,107 @@ func hmsetCommand(c *client) error { kvs[i].Value = args[2*i+1] } - if err := c.db.HMset(key, kvs...); err != nil { + if err := req.db.HMset(key, kvs...); err != nil { return err } else { - c.writeStatus(OK) + req.resp.writeStatus(OK) } return nil } -func hmgetCommand(c *client) error { - args := c.args +func hmgetCommand(req *requestContext) error { + args := req.args if len(args) < 2 { return ErrCmdParams } - if v, err := c.db.HMget(args[0], args[1:]...); err != nil { + if v, err := req.db.HMget(args[0], args[1:]...); err != nil { return err } else { - c.writeSliceArray(v) + req.resp.writeSliceArray(v) } return nil } -func hgetallCommand(c *client) error { - args := c.args +func hgetallCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.HGetAll(args[0]); err != nil { + if v, err := req.db.HGetAll(args[0]); err != nil { return err } else { - c.writeFVPairArray(v) + req.resp.writeFVPairArray(v) } return nil } -func hkeysCommand(c *client) error { - args := c.args +func hkeysCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.HKeys(args[0]); err != nil { + if v, err := req.db.HKeys(args[0]); err != nil { return err } else { - c.writeSliceArray(v) + req.resp.writeSliceArray(v) } return nil } -func hvalsCommand(c *client) error { - args := c.args +func hvalsCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.HValues(args[0]); err != nil { + if v, err := req.db.HValues(args[0]); err != nil { return err } else { - c.writeSliceArray(v) + req.resp.writeSliceArray(v) } return nil } -func hclearCommand(c *client) error { - args := c.args +func hclearCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.HClear(args[0]); err != nil { + if n, err := req.db.HClear(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func hmclearCommand(c *client) error { - args := c.args +func hmclearCommand(req *requestContext) error { + args := req.args if len(args) < 1 { return ErrCmdParams } - if n, err := c.db.HMclear(args...); err != nil { + if n, err := req.db.HMclear(args...); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func hexpireCommand(c *client) error { - args := c.args +func hexpireCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -233,17 +233,17 @@ func hexpireCommand(c *client) error { return ErrValue } - if v, err := c.db.HExpire(args[0], duration); err != nil { + if v, err := req.db.HExpire(args[0], duration); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func hexpireAtCommand(c *client) error { - args := c.args +func hexpireAtCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -253,40 +253,40 @@ func hexpireAtCommand(c *client) error { return ErrValue } - if v, err := c.db.HExpireAt(args[0], when); err != nil { + if v, err := req.db.HExpireAt(args[0], when); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func httlCommand(c *client) error { - args := c.args +func httlCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.HTTL(args[0]); err != nil { + if v, err := req.db.HTTL(args[0]); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func hpersistCommand(c *client) error { - args := c.args +func hpersistCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.HPersist(args[0]); err != nil { + if n, err := req.db.HPersist(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil diff --git a/server/cmd_kv.go b/server/cmd_kv.go index f526045..83b6e67 100644 --- a/server/cmd_kv.go +++ b/server/cmd_kv.go @@ -4,112 +4,112 @@ import ( "github.com/siddontang/ledisdb/ledis" ) -func getCommand(c *client) error { - args := c.args +func getCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.Get(args[0]); err != nil { + if v, err := req.db.Get(args[0]); err != nil { return err } else { - c.writeBulk(v) + req.resp.writeBulk(v) } return nil } -func setCommand(c *client) error { - args := c.args +func setCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } - if err := c.db.Set(args[0], args[1]); err != nil { + if err := req.db.Set(args[0], args[1]); err != nil { return err } else { - c.writeStatus(OK) + req.resp.writeStatus(OK) } return nil } -func getsetCommand(c *client) error { - args := c.args +func getsetCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } - if v, err := c.db.GetSet(args[0], args[1]); err != nil { + if v, err := req.db.GetSet(args[0], args[1]); err != nil { return err } else { - c.writeBulk(v) + req.resp.writeBulk(v) } return nil } -func setnxCommand(c *client) error { - args := c.args +func setnxCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } - if n, err := c.db.SetNX(args[0], args[1]); err != nil { + if n, err := req.db.SetNX(args[0], args[1]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func existsCommand(c *client) error { - args := c.args +func existsCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.Exists(args[0]); err != nil { + if n, err := req.db.Exists(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func incrCommand(c *client) error { - args := c.args +func incrCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.Incr(c.args[0]); err != nil { + if n, err := req.db.Incr(req.args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func decrCommand(c *client) error { - args := c.args +func decrCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.Decr(c.args[0]); err != nil { + if n, err := req.db.Decr(req.args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func incrbyCommand(c *client) error { - args := c.args +func incrbyCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -119,17 +119,17 @@ func incrbyCommand(c *client) error { return ErrValue } - if n, err := c.db.IncrBy(c.args[0], delta); err != nil { + if n, err := req.db.IncrBy(req.args[0], delta); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func decrbyCommand(c *client) error { - args := c.args +func decrbyCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -139,32 +139,32 @@ func decrbyCommand(c *client) error { return ErrValue } - if n, err := c.db.DecrBy(c.args[0], delta); err != nil { + if n, err := req.db.DecrBy(req.args[0], delta); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func delCommand(c *client) error { - args := c.args +func delCommand(req *requestContext) error { + args := req.args if len(args) == 0 { return ErrCmdParams } - if n, err := c.db.Del(args...); err != nil { + if n, err := req.db.Del(args...); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func msetCommand(c *client) error { - args := c.args +func msetCommand(req *requestContext) error { + args := req.args if len(args) == 0 || len(args)%2 != 0 { return ErrCmdParams } @@ -175,36 +175,36 @@ func msetCommand(c *client) error { kvs[i].Value = args[2*i+1] } - if err := c.db.MSet(kvs...); err != nil { + if err := req.db.MSet(kvs...); err != nil { return err } else { - c.writeStatus(OK) + req.resp.writeStatus(OK) } return nil } -// func setexCommand(c *client) error { +// func setexCommand(req *requestContext) error { // return nil // } -func mgetCommand(c *client) error { - args := c.args +func mgetCommand(req *requestContext) error { + args := req.args if len(args) == 0 { return ErrCmdParams } - if v, err := c.db.MGet(args...); err != nil { + if v, err := req.db.MGet(args...); err != nil { return err } else { - c.writeSliceArray(v) + req.resp.writeSliceArray(v) } return nil } -func expireCommand(c *client) error { - args := c.args +func expireCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -214,17 +214,17 @@ func expireCommand(c *client) error { return ErrValue } - if v, err := c.db.Expire(args[0], duration); err != nil { + if v, err := req.db.Expire(args[0], duration); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func expireAtCommand(c *client) error { - args := c.args +func expireAtCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -234,40 +234,40 @@ func expireAtCommand(c *client) error { return ErrValue } - if v, err := c.db.ExpireAt(args[0], when); err != nil { + if v, err := req.db.ExpireAt(args[0], when); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func ttlCommand(c *client) error { - args := c.args +func ttlCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.TTL(args[0]); err != nil { + if v, err := req.db.TTL(args[0]); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func persistCommand(c *client) error { - args := c.args +func persistCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.Persist(args[0]); err != nil { + if n, err := req.db.Persist(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil diff --git a/server/cmd_list.go b/server/cmd_list.go index cafd824..7d26893 100644 --- a/server/cmd_list.go +++ b/server/cmd_list.go @@ -4,83 +4,83 @@ import ( "github.com/siddontang/ledisdb/ledis" ) -func lpushCommand(c *client) error { - args := c.args +func lpushCommand(req *requestContext) error { + args := req.args if len(args) < 2 { return ErrCmdParams } - if n, err := c.db.LPush(args[0], args[1:]...); err != nil { + if n, err := req.db.LPush(args[0], args[1:]...); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func rpushCommand(c *client) error { - args := c.args +func rpushCommand(req *requestContext) error { + args := req.args if len(args) < 2 { return ErrCmdParams } - if n, err := c.db.RPush(args[0], args[1:]...); err != nil { + if n, err := req.db.RPush(args[0], args[1:]...); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func lpopCommand(c *client) error { - args := c.args +func lpopCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.LPop(args[0]); err != nil { + if v, err := req.db.LPop(args[0]); err != nil { return err } else { - c.writeBulk(v) + req.resp.writeBulk(v) } return nil } -func rpopCommand(c *client) error { - args := c.args +func rpopCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.RPop(args[0]); err != nil { + if v, err := req.db.RPop(args[0]); err != nil { return err } else { - c.writeBulk(v) + req.resp.writeBulk(v) } return nil } -func llenCommand(c *client) error { - args := c.args +func llenCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.LLen(args[0]); err != nil { + if n, err := req.db.LLen(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func lindexCommand(c *client) error { - args := c.args +func lindexCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -90,17 +90,17 @@ func lindexCommand(c *client) error { return ErrValue } - if v, err := c.db.LIndex(args[0], int32(index)); err != nil { + if v, err := req.db.LIndex(args[0], int32(index)); err != nil { return err } else { - c.writeBulk(v) + req.resp.writeBulk(v) } return nil } -func lrangeCommand(c *client) error { - args := c.args +func lrangeCommand(req *requestContext) error { + args := req.args if len(args) != 3 { return ErrCmdParams } @@ -119,47 +119,47 @@ func lrangeCommand(c *client) error { return ErrValue } - if v, err := c.db.LRange(args[0], int32(start), int32(stop)); err != nil { + if v, err := req.db.LRange(args[0], int32(start), int32(stop)); err != nil { return err } else { - c.writeSliceArray(v) + req.resp.writeSliceArray(v) } return nil } -func lclearCommand(c *client) error { - args := c.args +func lclearCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.LClear(args[0]); err != nil { + if n, err := req.db.LClear(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func lmclearCommand(c *client) error { - args := c.args +func lmclearCommand(req *requestContext) error { + args := req.args if len(args) < 1 { return ErrCmdParams } - if n, err := c.db.LMclear(args...); err != nil { + if n, err := req.db.LMclear(args...); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func lexpireCommand(c *client) error { - args := c.args +func lexpireCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -169,17 +169,17 @@ func lexpireCommand(c *client) error { return ErrValue } - if v, err := c.db.LExpire(args[0], duration); err != nil { + if v, err := req.db.LExpire(args[0], duration); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func lexpireAtCommand(c *client) error { - args := c.args +func lexpireAtCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -189,40 +189,40 @@ func lexpireAtCommand(c *client) error { return ErrValue } - if v, err := c.db.LExpireAt(args[0], when); err != nil { + if v, err := req.db.LExpireAt(args[0], when); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func lttlCommand(c *client) error { - args := c.args +func lttlCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.LTTL(args[0]); err != nil { + if v, err := req.db.LTTL(args[0]); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func lpersistCommand(c *client) error { - args := c.args +func lpersistCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.LPersist(args[0]); err != nil { + if n, err := req.db.LPersist(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil diff --git a/server/cmd_replication.go b/server/cmd_replication.go index c47be30..85c0861 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -11,8 +11,8 @@ import ( "strings" ) -func slaveofCommand(c *client) error { - args := c.args +func slaveofCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams @@ -31,23 +31,23 @@ func slaveofCommand(c *client) error { masterAddr = fmt.Sprintf("%s:%s", args[0], args[1]) } - if err := c.app.slaveof(masterAddr); err != nil { + if err := req.app.slaveof(masterAddr); err != nil { return err } - c.writeStatus(OK) + req.resp.writeStatus(OK) return nil } -func fullsyncCommand(c *client) error { +func fullsyncCommand(req *requestContext) error { //todo, multi fullsync may use same dump file - dumpFile, err := ioutil.TempFile(c.app.cfg.DataDir, "dump_") + dumpFile, err := ioutil.TempFile(req.app.cfg.DataDir, "dump_") if err != nil { return err } - if err = c.app.ldb.Dump(dumpFile); err != nil { + if err = req.app.ldb.Dump(dumpFile); err != nil { return err } @@ -56,7 +56,7 @@ func fullsyncCommand(c *client) error { dumpFile.Seek(0, os.SEEK_SET) - c.writeBulkFrom(n, dumpFile) + req.resp.writeBulkFrom(n, dumpFile) name := dumpFile.Name() dumpFile.Close() @@ -68,8 +68,8 @@ func fullsyncCommand(c *client) error { var reserveInfoSpace = make([]byte, 16) -func syncCommand(c *client) error { - args := c.args +func syncCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -87,32 +87,32 @@ func syncCommand(c *client) error { return ErrCmdParams } - c.syncBuf.Reset() + req.syncBuf.Reset() //reserve space to write master info - if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil { + if _, err := req.syncBuf.Write(reserveInfoSpace); err != nil { return err } m := &ledis.MasterInfo{logIndex, logPos} - if _, err := c.app.ldb.ReadEventsTo(m, &c.syncBuf); err != nil { + if _, err := req.app.ldb.ReadEventsTo(m, &req.syncBuf); err != nil { return err } else { - buf := c.syncBuf.Bytes() + buf := req.syncBuf.Bytes() binary.BigEndian.PutUint64(buf[0:], uint64(m.LogFileIndex)) binary.BigEndian.PutUint64(buf[8:], uint64(m.LogPos)) - if len(c.compressBuf) < snappy.MaxEncodedLen(len(buf)) { - c.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf))) + if len(req.compressBuf) < snappy.MaxEncodedLen(len(buf)) { + req.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf))) } - if buf, err = snappy.Encode(c.compressBuf, buf); err != nil { + if buf, err = snappy.Encode(req.compressBuf, buf); err != nil { return err } - c.writeBulk(buf) + req.resp.writeBulk(buf) } return nil diff --git a/server/cmd_zset.go b/server/cmd_zset.go index 868b6bd..e540b32 100644 --- a/server/cmd_zset.go +++ b/server/cmd_zset.go @@ -12,8 +12,8 @@ import ( var errScoreOverflow = errors.New("zset score overflow") -func zaddCommand(c *client) error { - args := c.args +func zaddCommand(req *requestContext) error { + args := req.args if len(args) < 3 { return ErrCmdParams } @@ -36,66 +36,66 @@ func zaddCommand(c *client) error { params[i].Member = args[2*i+1] } - if n, err := c.db.ZAdd(key, params...); err != nil { + if n, err := req.db.ZAdd(key, params...); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func zcardCommand(c *client) error { - args := c.args +func zcardCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.ZCard(args[0]); err != nil { + if n, err := req.db.ZCard(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func zscoreCommand(c *client) error { - args := c.args +func zscoreCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } - if s, err := c.db.ZScore(args[0], args[1]); err != nil { + if s, err := req.db.ZScore(args[0], args[1]); err != nil { if err == ledis.ErrScoreMiss { - c.writeBulk(nil) + req.resp.writeBulk(nil) } else { return err } } else { - c.writeBulk(ledis.StrPutInt64(s)) + req.resp.writeBulk(ledis.StrPutInt64(s)) } return nil } -func zremCommand(c *client) error { - args := c.args +func zremCommand(req *requestContext) error { + args := req.args if len(args) < 2 { return ErrCmdParams } - if n, err := c.db.ZRem(args[0], args[1:]...); err != nil { + if n, err := req.db.ZRem(args[0], args[1:]...); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func zincrbyCommand(c *client) error { - args := c.args +func zincrbyCommand(req *requestContext) error { + args := req.args if len(args) != 3 { return ErrCmdParams } @@ -107,10 +107,10 @@ func zincrbyCommand(c *client) error { return ErrValue } - if v, err := c.db.ZIncrBy(key, delta, args[2]); err != nil { + if v, err := req.db.ZIncrBy(key, delta, args[2]); err != nil { return err } else { - c.writeBulk(ledis.StrPutInt64(v)) + req.resp.writeBulk(ledis.StrPutInt64(v)) } return nil @@ -157,6 +157,10 @@ func zparseScoreRange(minBuf []byte, maxBuf []byte) (min int64, max int64, err e err = ErrCmdParams return } + if maxBuf[0] == '(' { + ropen = true + maxBuf = maxBuf[1:] + } if maxBuf[0] == '(' { ropen = true @@ -182,8 +186,8 @@ func zparseScoreRange(minBuf []byte, maxBuf []byte) (min int64, max int64, err e return } -func zcountCommand(c *client) error { - args := c.args +func zcountCommand(req *requestContext) error { + args := req.args if len(args) != 3 { return ErrCmdParams } @@ -194,77 +198,77 @@ func zcountCommand(c *client) error { } if min > max { - c.writeInteger(0) + req.resp.writeInteger(0) return nil } - if n, err := c.db.ZCount(args[0], min, max); err != nil { + if n, err := req.db.ZCount(args[0], min, max); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func zrankCommand(c *client) error { - args := c.args +func zrankCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } - if n, err := c.db.ZRank(args[0], args[1]); err != nil { + if n, err := req.db.ZRank(args[0], args[1]); err != nil { return err } else if n == -1 { - c.writeBulk(nil) + req.resp.writeBulk(nil) } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func zrevrankCommand(c *client) error { - args := c.args +func zrevrankCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } - if n, err := c.db.ZRevRank(args[0], args[1]); err != nil { + if n, err := req.db.ZRevRank(args[0], args[1]); err != nil { return err } else if n == -1 { - c.writeBulk(nil) + req.resp.writeBulk(nil) } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func zremrangebyrankCommand(c *client) error { - args := c.args +func zremrangebyrankCommand(req *requestContext) error { + args := req.args if len(args) != 3 { return ErrCmdParams } key := args[0] - start, stop, err := zparseRange(c, args[1], args[2]) + start, stop, err := zparseRange(req, args[1], args[2]) if err != nil { return ErrValue } - if n, err := c.db.ZRemRangeByRank(key, start, stop); err != nil { + if n, err := req.db.ZRemRangeByRank(key, start, stop); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func zremrangebyscoreCommand(c *client) error { - args := c.args +func zremrangebyscoreCommand(req *requestContext) error { + args := req.args if len(args) != 3 { return ErrCmdParams } @@ -275,16 +279,16 @@ func zremrangebyscoreCommand(c *client) error { return err } - if n, err := c.db.ZRemRangeByScore(key, min, max); err != nil { + if n, err := req.db.ZRemRangeByScore(key, min, max); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func zparseRange(c *client, a1 []byte, a2 []byte) (start int, stop int, err error) { +func zparseRange(req *requestContext, a1 []byte, a2 []byte) (start int, stop int, err error) { if start, err = strconv.Atoi(ledis.String(a1)); err != nil { return } @@ -296,15 +300,15 @@ func zparseRange(c *client, a1 []byte, a2 []byte) (start int, stop int, err erro return } -func zrangeGeneric(c *client, reverse bool) error { - args := c.args +func zrangeGeneric(req *requestContext, reverse bool) error { + args := req.args if len(args) < 3 { return ErrCmdParams } key := args[0] - start, stop, err := zparseRange(c, args[1], args[2]) + start, stop, err := zparseRange(req, args[1], args[2]) if err != nil { return ErrValue } @@ -323,24 +327,24 @@ func zrangeGeneric(c *client, reverse bool) error { } } - if datas, err := c.db.ZRangeGeneric(key, start, stop, reverse); err != nil { + if datas, err := req.db.ZRangeGeneric(key, start, stop, reverse); err != nil { return err } else { - c.writeScorePairArray(datas, withScores) + req.resp.writeScorePairArray(datas, withScores) } return nil } -func zrangeCommand(c *client) error { - return zrangeGeneric(c, false) +func zrangeCommand(req *requestContext) error { + return zrangeGeneric(req, false) } -func zrevrangeCommand(c *client) error { - return zrangeGeneric(c, true) +func zrevrangeCommand(req *requestContext) error { + return zrangeGeneric(req, true) } -func zrangebyscoreGeneric(c *client, reverse bool) error { - args := c.args +func zrangebyscoreGeneric(req *requestContext, reverse bool) error { + args := req.args if len(args) < 3 { return ErrCmdParams } @@ -396,59 +400,59 @@ func zrangebyscoreGeneric(c *client, reverse bool) error { if offset < 0 { //for ledis, if offset < 0, a empty will return //so here we directly return a empty array - c.writeArray([]interface{}{}) + req.resp.writeArray([]interface{}{}) return nil } - if datas, err := c.db.ZRangeByScoreGeneric(key, min, max, offset, count, reverse); err != nil { + if datas, err := req.db.ZRangeByScoreGeneric(key, min, max, offset, count, reverse); err != nil { return err } else { - c.writeScorePairArray(datas, withScores) + req.resp.writeScorePairArray(datas, withScores) } return nil } -func zrangebyscoreCommand(c *client) error { - return zrangebyscoreGeneric(c, false) +func zrangebyscoreCommand(req *requestContext) error { + return zrangebyscoreGeneric(req, false) } -func zrevrangebyscoreCommand(c *client) error { - return zrangebyscoreGeneric(c, true) +func zrevrangebyscoreCommand(req *requestContext) error { + return zrangebyscoreGeneric(req, true) } -func zclearCommand(c *client) error { - args := c.args +func zclearCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.ZClear(args[0]); err != nil { + if n, err := req.db.ZClear(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func zmclearCommand(c *client) error { - args := c.args +func zmclearCommand(req *requestContext) error { + args := req.args if len(args) < 1 { return ErrCmdParams } - if n, err := c.db.ZMclear(args...); err != nil { + if n, err := req.db.ZMclear(args...); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil } -func zexpireCommand(c *client) error { - args := c.args +func zexpireCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -458,17 +462,17 @@ func zexpireCommand(c *client) error { return ErrValue } - if v, err := c.db.ZExpire(args[0], duration); err != nil { + if v, err := req.db.ZExpire(args[0], duration); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func zexpireAtCommand(c *client) error { - args := c.args +func zexpireAtCommand(req *requestContext) error { + args := req.args if len(args) != 2 { return ErrCmdParams } @@ -478,39 +482,39 @@ func zexpireAtCommand(c *client) error { return ErrValue } - if v, err := c.db.ZExpireAt(args[0], when); err != nil { + if v, err := req.db.ZExpireAt(args[0], when); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func zttlCommand(c *client) error { - args := c.args +func zttlCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if v, err := c.db.ZTTL(args[0]); err != nil { + if v, err := req.db.ZTTL(args[0]); err != nil { return err } else { - c.writeInteger(v) + req.resp.writeInteger(v) } return nil } -func zpersistCommand(c *client) error { - args := c.args +func zpersistCommand(req *requestContext) error { + args := req.args if len(args) != 1 { return ErrCmdParams } - if n, err := c.db.ZPersist(args[0]); err != nil { + if n, err := req.db.ZPersist(args[0]); err != nil { return err } else { - c.writeInteger(n) + req.resp.writeInteger(n) } return nil diff --git a/server/command.go b/server/command.go index b54e11e..440a177 100644 --- a/server/command.go +++ b/server/command.go @@ -8,7 +8,7 @@ import ( "strings" ) -type CommandFunc func(c *client) error +type CommandFunc func(req *requestContext) error var regCmds = map[string]CommandFunc{} @@ -20,33 +20,33 @@ func register(name string, f CommandFunc) { regCmds[name] = f } -func pingCommand(c *client) error { - c.writeStatus(PONG) +func pingCommand(req *requestContext) error { + req.resp.writeStatus(PONG) return nil } -func echoCommand(c *client) error { - if len(c.args) != 1 { +func echoCommand(req *requestContext) error { + if len(req.args) != 1 { return ErrCmdParams } - c.writeBulk(c.args[0]) + req.resp.writeBulk(req.args[0]) return nil } -func selectCommand(c *client) error { - if len(c.args) != 1 { +func selectCommand(req *requestContext) error { + if len(req.args) != 1 { return ErrCmdParams } - if index, err := strconv.Atoi(ledis.String(c.args[0])); err != nil { + if index, err := strconv.Atoi(ledis.String(req.args[0])); err != nil { return err } else { - if db, err := c.ldb.Select(index); err != nil { + if db, err := req.ldb.Select(index); err != nil { return err } else { - c.db = db - c.writeStatus(OK) + req.db = db + req.resp.writeStatus(OK) } } return nil diff --git a/server/config.go b/server/config.go index 024eeb9..82cb570 100644 --- a/server/config.go +++ b/server/config.go @@ -10,6 +10,8 @@ import ( type Config struct { Addr string `json:"addr"` + HttpAddr string `json:"http_addr"` + DataDir string `json:"data_dir"` DB struct { diff --git a/server/doc.go b/server/doc.go index a283cab..f1ce245 100644 --- a/server/doc.go +++ b/server/doc.go @@ -26,4 +26,13 @@ // // After you send slaveof command, the slave will start to sync master's binlog and replicate from binlog. // +// HTTP Interface +// LedisDB provides http interfaces for most commands(except the replication commands) +// +// curl http://127.0.0.1:11181/SET/hello/world +// → {"SET":[true,"OK"]} +// +// curl http://127.0.0.1:11181/0/GET/hello?type=json +// → {"GET":"world"} +// package server diff --git a/server/http_interface.md b/server/http_interface.md new file mode 100644 index 0000000..6e05c43 --- /dev/null +++ b/server/http_interface.md @@ -0,0 +1,42 @@ +##HTTP Interface +LedisDB provides http interfaces for most commands. +####Request +The proper url format is + + http://host:port[/db]/cmd/arg1/arg2/.../argN[?type=type] + +'db' and 'type' are optional. 'db' stands for ledis db index, ranges from 0 to 15, its default value is 0. 'type' is a custom content type, can be json, bson or msgpack, json is default. + + +####Response + +The response format is + + { cmd: return_value } + +or + + { cmd: [success, message] } + +'return_value' stands for the output of 'cmd', it can be a number, a string, a list, or a hash. If the return value is just a descriptive message, the second format will be taken, and 'success', a boolean value, indicates whether it is successful. + +####Example +#####Curl + + curl http://127.0.0.1:11181/SET/hello/world + → {"SET":[true,"OK"]} + + curl http://127.0.0.1:11181/0/GET/hello?type=json + → {"GET":"world"} + +#####Python +Requires [msgpack-python](https://pypi.python.org/pypi/msgpack-python) and [requests](https://pypi.python.org/pypi/requests/) + + >>> import requests + >>> import msgpack + + >>> requests.get("http://127.0.0.1:11181/0/SET/hello/world") + >>> r = requests.get("http://127.0.0.1:11181/0/GET/hello?type=msgpack") + >>> msgpack.unpackb(r.content) + >>> {"GET":"world"} + diff --git a/server/replication.go b/server/replication.go index 4d185a1..d02264d 100644 --- a/server/replication.go +++ b/server/replication.go @@ -72,8 +72,8 @@ func (m *MasterInfo) Load(filePath string) error { type master struct { sync.Mutex - c net.Conn - rb *bufio.Reader + conn net.Conn + rb *bufio.Reader app *App @@ -114,9 +114,9 @@ func (m *master) Close() { default: } - if m.c != nil { - m.c.Close() - m.c = nil + if m.conn != nil { + m.conn.Close() + m.conn = nil } m.wg.Wait() @@ -135,17 +135,17 @@ func (m *master) connect() error { return fmt.Errorf("no assign master addr") } - if m.c != nil { - m.c.Close() - m.c = nil + if m.conn != nil { + m.conn.Close() + m.conn = nil } - if c, err := net.Dial("tcp", m.info.Addr); err != nil { + if conn, err := net.Dial("tcp", m.info.Addr); err != nil { return err } else { - m.c = c + m.conn = conn - m.rb = bufio.NewReaderSize(m.c, 4096) + m.rb = bufio.NewReaderSize(m.conn, 4096) } return nil } @@ -248,7 +248,7 @@ var ( ) func (m *master) fullSync() error { - if _, err := m.c.Write(fullSyncCmd); err != nil { + if _, err := m.conn.Write(fullSyncCmd); err != nil { return err } @@ -291,7 +291,7 @@ func (m *master) sync() error { cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr), logIndexStr, len(logPosStr), logPosStr)) - if _, err := m.c.Write(cmd); err != nil { + if _, err := m.conn.Write(cmd); err != nil { return err } diff --git a/server/request.go b/server/request.go new file mode 100644 index 0000000..0531a37 --- /dev/null +++ b/server/request.go @@ -0,0 +1,116 @@ +package server + +import ( + "bytes" + "github.com/siddontang/ledisdb/ledis" + "io" + "time" +) + +type responseWriter interface { + writeError(error) + writeStatus(string) + writeInteger(int64) + writeBulk([]byte) + writeArray([]interface{}) + writeSliceArray([][]byte) + writeFVPairArray([]ledis.FVPair) + writeScorePairArray([]ledis.ScorePair, bool) + writeBulkFrom(int64, io.Reader) + flush() +} + +type requestContext struct { + app *App + ldb *ledis.Ledis + db *ledis.DB + + remoteAddr string + cmd string + args [][]byte + + resp responseWriter + + syncBuf bytes.Buffer + compressBuf []byte + + reqErr chan error + + buf bytes.Buffer +} + +func newRequestContext(app *App) *requestContext { + req := new(requestContext) + + req.app = app + req.ldb = app.ldb + req.db, _ = app.ldb.Select(0) //use default db + + req.compressBuf = make([]byte, 256) + req.reqErr = make(chan error) + + return req +} + +func (req *requestContext) perform() { + var err error + + start := time.Now() + + if len(req.cmd) == 0 { + err = ErrEmptyCommand + } else if exeCmd, ok := regCmds[req.cmd]; !ok { + err = ErrNotFound + } else { + go func() { + req.reqErr <- exeCmd(req) + }() + + err = <-req.reqErr + } + + duration := time.Since(start) + + if req.app.access != nil { + fullCmd := req.catGenericCommand() + cost := duration.Nanoseconds() / 1000000 + + truncateLen := len(fullCmd) + if truncateLen > 256 { + truncateLen = 256 + } + + req.app.access.Log(req.remoteAddr, cost, fullCmd[:truncateLen], err) + } + + if err != nil { + req.resp.writeError(err) + } + req.resp.flush() + return +} + +// func (h *requestHandler) catFullCommand(req *requestContext) []byte { +// +// // if strings.HasSuffix(cmd, "expire") { +// // catExpireCommand(c, buffer) +// // } else { +// // catGenericCommand(c, buffer) +// // } +// +// return h.catGenericCommand(req) +// } + +func (req *requestContext) catGenericCommand() []byte { + buffer := req.buf + buffer.Reset() + + buffer.Write([]byte(req.cmd)) + + for _, arg := range req.args { + buffer.WriteByte(' ') + buffer.Write(arg) + } + + return buffer.Bytes() +}