Merge branch 'restful-feature' into develop

Conflicts:
	bootstrap.sh
	server/cmd_bit.go
	server/cmd_kv.go
	server/cmd_zset.go
This commit is contained in:
wenyekui 2014-08-04 11:26:08 +08:00
commit 5c3872ce68
18 changed files with 1134 additions and 689 deletions

View File

@ -2,12 +2,16 @@
. ./dev.sh
go get github.com/siddontang/go-log/log
go get github.com/siddontang/go-snappy/snappy
go get github.com/siddontang/copier
go get -u github.com/siddontang/go-log/log
go get -u github.com/siddontang/go-snappy/snappy
go get -u github.com/siddontang/copier
go get github.com/siddontang/goleveldb/leveldb
go get -u github.com/siddontang/goleveldb/leveldb
go get github.com/szferi/gomdb
go get -u github.com/szferi/gomdb
go get -u github.com/boltdb/bolt
go get -u gopkg.in/mgo.v2/bson
go get -u github.com/ugorji/go/codec
go get github.com/boltdb/bolt

View File

@ -1,5 +1,6 @@
{
"addr": "127.0.0.1:6380",
"http_addr": "127.0.0.1:11181",
"data_dir": "/tmp/ledis_server",
"db": {

View File

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/siddontang/ledisdb/ledis"
"net"
"net/http"
"path"
"strings"
)
@ -11,7 +12,8 @@ import (
type App struct {
cfg *Config
listener net.Listener
listener net.Listener
httpListener net.Listener
ldb *ledis.Ledis
@ -25,6 +27,14 @@ type App struct {
m *master
}
func netType(s string) string {
if strings.Contains(s, "/") {
return "unix"
} else {
return "tcp"
}
}
func NewApp(cfg *Config) (*App, error) {
if len(cfg.DataDir) == 0 {
return nil, fmt.Errorf("must set data_dir first")
@ -40,14 +50,14 @@ func NewApp(cfg *Config) (*App, error) {
var err error
if strings.Contains(cfg.Addr, "/") {
app.listener, err = net.Listen("unix", cfg.Addr)
} else {
app.listener, err = net.Listen("tcp", cfg.Addr)
if app.listener, err = net.Listen(netType(cfg.Addr), cfg.Addr); err != nil {
return nil, err
}
if err != nil {
return nil, err
if len(cfg.HttpAddr) > 0 {
if app.httpListener, err = net.Listen(netType(cfg.HttpAddr), cfg.HttpAddr); err != nil {
return nil, err
}
}
if len(cfg.AccessLog) > 0 {
@ -82,6 +92,10 @@ func (app *App) Close() {
app.listener.Close()
if app.httpListener != nil {
app.httpListener.Close()
}
app.m.Close()
if app.access != nil {
@ -96,16 +110,33 @@ func (app *App) Run() {
app.slaveof(app.cfg.SlaveOf)
}
go app.httpServe()
for !app.closed {
conn, err := app.listener.Accept()
if err != nil {
continue
}
newClient(conn, app)
newClientRESP(conn, app)
}
}
func (app *App) httpServe() {
if app.httpListener == nil {
return
}
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
newClientHTTP(app, w, r)
})
svr := http.Server{Handler: mux}
svr.Serve(app.httpListener)
}
func (app *App) Ledis() *ledis.Ledis {
return app.ldb
}

View File

@ -1,315 +0,0 @@
package server
import (
"bufio"
"bytes"
"errors"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/ledis"
"io"
"net"
"runtime"
"strconv"
"strings"
"time"
)
var errReadRequest = errors.New("invalid request protocol")
type client struct {
app *App
ldb *ledis.Ledis
db *ledis.DB
c net.Conn
rb *bufio.Reader
wb *bufio.Writer
cmd string
args [][]byte
reqC chan error
syncBuf bytes.Buffer
compressBuf []byte
logBuf bytes.Buffer
}
func newClient(c net.Conn, app *App) {
co := new(client)
co.app = app
co.ldb = app.ldb
//use default db
co.db, _ = app.ldb.Select(0)
co.c = c
co.rb = bufio.NewReaderSize(c, 256)
co.wb = bufio.NewWriterSize(c, 256)
co.reqC = make(chan error, 1)
co.compressBuf = make([]byte, 256)
go co.run()
}
func (c *client) run() {
defer func() {
if e := recover(); e != nil {
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
buf = buf[0:n]
log.Fatal("client run panic %s:%v", buf, e)
}
c.c.Close()
}()
for {
req, err := c.readRequest()
if err != nil {
return
}
c.handleRequest(req)
}
}
func (c *client) readLine() ([]byte, error) {
return ReadLine(c.rb)
}
//A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
func (c *client) readRequest() ([][]byte, error) {
l, err := c.readLine()
if err != nil {
return nil, err
} else if len(l) == 0 || l[0] != '*' {
return nil, errReadRequest
}
var nparams int
if nparams, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if nparams <= 0 {
return nil, errReadRequest
}
req := make([][]byte, 0, nparams)
var n int
for i := 0; i < nparams; i++ {
if l, err = c.readLine(); err != nil {
return nil, err
}
if len(l) == 0 {
return nil, errReadRequest
} else if l[0] == '$' {
//handle resp string
if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if n == -1 {
req = append(req, nil)
} else {
buf := make([]byte, n)
if _, err = io.ReadFull(c.rb, buf); err != nil {
return nil, err
}
if l, err = c.readLine(); err != nil {
return nil, err
} else if len(l) != 0 {
return nil, errors.New("bad bulk string format")
}
req = append(req, buf)
}
} else {
return nil, errReadRequest
}
}
return req, nil
}
func (c *client) handleRequest(req [][]byte) {
var err error
start := time.Now()
if len(req) == 0 {
err = ErrEmptyCommand
} else {
c.cmd = strings.ToLower(ledis.String(req[0]))
c.args = req[1:]
f, ok := regCmds[c.cmd]
if !ok {
err = ErrNotFound
} else {
go func() {
c.reqC <- f(c)
}()
err = <-c.reqC
}
}
duration := time.Since(start)
if c.app.access != nil {
c.logBuf.Reset()
for i, r := range req {
left := 256 - c.logBuf.Len()
if left <= 0 {
break
} else if len(r) <= left {
c.logBuf.Write(r)
if i != len(req)-1 {
c.logBuf.WriteByte(' ')
}
} else {
c.logBuf.Write(r[0:left])
}
}
c.app.access.Log(c.c.RemoteAddr().String(), duration.Nanoseconds()/1000000, c.logBuf.Bytes(), err)
}
if err != nil {
c.writeError(err)
}
c.wb.Flush()
}
func (c *client) writeError(err error) {
c.wb.Write(ledis.Slice("-ERR"))
if err != nil {
c.wb.WriteByte(' ')
c.wb.Write(ledis.Slice(err.Error()))
}
c.wb.Write(Delims)
}
func (c *client) writeStatus(status string) {
c.wb.WriteByte('+')
c.wb.Write(ledis.Slice(status))
c.wb.Write(Delims)
}
func (c *client) writeInteger(n int64) {
c.wb.WriteByte(':')
c.wb.Write(ledis.StrPutInt64(n))
c.wb.Write(Delims)
}
func (c *client) writeBulk(b []byte) {
c.wb.WriteByte('$')
if b == nil {
c.wb.Write(NullBulk)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(b))))
c.wb.Write(Delims)
c.wb.Write(b)
}
c.wb.Write(Delims)
}
func (c *client) writeArray(ay []interface{}) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
switch v := ay[i].(type) {
case []interface{}:
c.writeArray(v)
case []byte:
c.writeBulk(v)
case nil:
c.writeBulk(nil)
case int64:
c.writeInteger(v)
default:
panic("invalid array type")
}
}
}
}
func (c *client) writeSliceArray(ay [][]byte) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i])
}
}
}
func (c *client) writeFVPairArray(ay []ledis.FVPair) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay) * 2)))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i].Field)
c.writeBulk(ay[i].Value)
}
}
}
func (c *client) writeScorePairArray(ay []ledis.ScorePair, withScores bool) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
if withScores {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay) * 2)))
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
}
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i].Member)
if withScores {
c.writeBulk(ledis.StrPutInt64(ay[i].Score))
}
}
}
}
func (c *client) writeBulkFrom(n int64, rb io.Reader) {
c.wb.WriteByte('$')
c.wb.Write(ledis.Slice(strconv.FormatInt(n, 10)))
c.wb.Write(Delims)
io.Copy(c.wb, rb)
c.wb.Write(Delims)
}

260
server/client_http.go Normal file
View File

@ -0,0 +1,260 @@
package server
import (
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/ledis"
"io"
"net/http"
"strconv"
"strings"
"encoding/json"
"github.com/ugorji/go/codec"
"gopkg.in/mgo.v2/bson"
)
var allowedContentTypes = map[string]struct{}{
"json": struct{}{},
"bson": struct{}{},
"msgpack": struct{}{},
}
var unsopportedCommands = map[string]struct{}{
"slaveof": struct{}{},
"fullsync": struct{}{},
"sync": struct{}{},
"quit": struct{}{},
}
type httpClient struct {
app *App
db *ledis.DB
ldb *ledis.Ledis
resp responseWriter
req *requestContext
}
type httpWriter struct {
contentType string
cmd string
w http.ResponseWriter
}
func newClientHTTP(app *App, w http.ResponseWriter, r *http.Request) {
var err error
c := new(httpClient)
c.app = app
c.ldb = app.ldb
c.db, err = c.ldb.Select(0)
if err != nil {
w.Write([]byte(err.Error()))
return
}
c.req, err = c.makeRequest(app, r, w)
if err != nil {
w.Write([]byte(err.Error()))
return
}
c.req.perform()
}
func (c *httpClient) addr(r *http.Request) string {
return r.RemoteAddr
}
func (c *httpClient) makeRequest(app *App, r *http.Request, w http.ResponseWriter) (*requestContext, error) {
var err error
db, cmd, argsStr, contentType := c.parseReqPath(r)
c.db, err = app.ldb.Select(db)
if err != nil {
return nil, err
}
contentType = strings.ToLower(contentType)
if _, ok := allowedContentTypes[contentType]; !ok {
return nil, fmt.Errorf("unsupported content type: '%s', only json, bson, msgpack are supported", contentType)
}
req := newRequestContext(app)
args := make([][]byte, len(argsStr))
for i, arg := range argsStr {
args[i] = []byte(arg)
}
req.cmd = strings.ToLower(cmd)
if _, ok := unsopportedCommands[req.cmd]; ok {
return nil, fmt.Errorf("unsupported command: '%s'", cmd)
}
req.args = args
req.remoteAddr = c.addr(r)
req.resp = &httpWriter{contentType, cmd, w}
return req, nil
}
func (c *httpClient) parseReqPath(r *http.Request) (db int, cmd string, args []string, contentType string) {
contentType = r.FormValue("type")
if contentType == "" {
contentType = "json"
}
substrings := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/")
if len(substrings) == 1 {
return 0, substrings[0], substrings[1:], contentType
}
db, err := strconv.Atoi(substrings[0])
if err != nil {
cmd = substrings[0]
args = substrings[1:]
} else {
cmd = substrings[1]
args = substrings[2:]
}
return
}
// http writer
func (w *httpWriter) genericWrite(result interface{}) {
m := map[string]interface{}{
w.cmd: result,
}
switch w.contentType {
case "json":
writeJSON(&m, w.w)
case "bson":
writeBSON(&m, w.w)
case "msgpack":
writeMsgPack(&m, w.w)
default:
log.Error("invalid content type %s", w.contentType)
}
}
func (w *httpWriter) writeError(err error) {
result := [2]interface{}{
false,
fmt.Sprintf("ERR %s", err.Error()),
}
w.genericWrite(result)
}
func (w *httpWriter) writeStatus(status string) {
var success bool
if status == OK || status == PONG {
success = true
}
w.genericWrite([]interface{}{success, status})
}
func (w *httpWriter) writeInteger(n int64) {
w.genericWrite(n)
}
func (w *httpWriter) writeBulk(b []byte) {
if b == nil {
w.genericWrite(nil)
} else {
w.genericWrite(ledis.String(b))
}
}
func (w *httpWriter) writeArray(lst []interface{}) {
w.genericWrite(lst)
}
func (w *httpWriter) writeSliceArray(lst [][]byte) {
arr := make([]interface{}, len(lst))
for i, elem := range lst {
if elem == nil {
arr[i] = nil
} else {
arr[i] = ledis.String(elem)
}
}
w.genericWrite(arr)
}
func (w *httpWriter) writeFVPairArray(lst []ledis.FVPair) {
m := make(map[string]string)
for _, elem := range lst {
m[ledis.String(elem.Field)] = ledis.String(elem.Value)
}
w.genericWrite(m)
}
func (w *httpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) {
var arr []string
if withScores {
arr = make([]string, 2*len(lst))
for i, data := range lst {
arr[2*i] = ledis.String(data.Member)
arr[2*i+1] = strconv.FormatInt(data.Score, 10)
}
} else {
arr = make([]string, len(lst))
for i, data := range lst {
arr[i] = ledis.String(data.Member)
}
}
w.genericWrite(arr)
}
func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) {
w.writeError(fmt.Errorf("unsuport"))
}
func (w *httpWriter) flush() {
}
func writeJSON(resutl interface{}, w http.ResponseWriter) {
buf, err := json.Marshal(resutl)
if err != nil {
log.Error(err.Error())
return
}
w.Header().Set("Content-type", "application/json; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
_, err = w.Write(buf)
if err != nil {
log.Error(err.Error())
}
}
func writeBSON(result interface{}, w http.ResponseWriter) {
buf, err := bson.Marshal(result)
if err != nil {
log.Error(err.Error())
return
}
w.Header().Set("Content-type", "application/octet-stream")
w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
_, err = w.Write(buf)
if err != nil {
log.Error(err.Error())
}
}
func writeMsgPack(result interface{}, w http.ResponseWriter) {
w.Header().Set("Content-type", "application/octet-stream")
var mh codec.MsgpackHandle
enc := codec.NewEncoder(w, &mh)
if err := enc.Encode(result); err != nil {
log.Error(err.Error())
}
}

291
server/client_resp.go Normal file
View File

@ -0,0 +1,291 @@
package server
import (
"bufio"
"errors"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/ledis"
"io"
"net"
"runtime"
"strconv"
"strings"
)
var errReadRequest = errors.New("invalid request protocol")
type respClient struct {
app *App
ldb *ledis.Ledis
db *ledis.DB
conn net.Conn
rb *bufio.Reader
req *requestContext
}
type respWriter struct {
buff *bufio.Writer
}
func newClientRESP(conn net.Conn, app *App) {
c := new(respClient)
c.app = app
c.conn = conn
c.ldb = app.ldb
c.db, _ = app.ldb.Select(0)
c.rb = bufio.NewReaderSize(conn, 256)
c.req = newRequestContext(app)
c.req.resp = newWriterRESP(conn)
c.req.remoteAddr = conn.RemoteAddr().String()
go c.run()
}
func (c *respClient) run() {
defer func() {
if e := recover(); e != nil {
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
buf = buf[0:n]
log.Fatal("client run panic %s:%v", buf, e)
}
c.conn.Close()
}()
for {
reqData, err := c.readRequest()
if err != nil {
return
}
c.handleRequest(reqData)
}
}
func (c *respClient) readLine() ([]byte, error) {
return ReadLine(c.rb)
}
//A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
func (c *respClient) readRequest() ([][]byte, error) {
l, err := c.readLine()
if err != nil {
return nil, err
} else if len(l) == 0 || l[0] != '*' {
return nil, errReadRequest
}
var nparams int
if nparams, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if nparams <= 0 {
return nil, errReadRequest
}
req := make([][]byte, 0, nparams)
var n int
for i := 0; i < nparams; i++ {
if l, err = c.readLine(); err != nil {
return nil, err
}
if len(l) == 0 {
return nil, errReadRequest
} else if l[0] == '$' {
//handle resp string
if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if n == -1 {
req = append(req, nil)
} else {
buf := make([]byte, n)
if _, err = io.ReadFull(c.rb, buf); err != nil {
return nil, err
}
if l, err = c.readLine(); err != nil {
return nil, err
} else if len(l) != 0 {
return nil, errors.New("bad bulk string format")
}
req = append(req, buf)
}
} else {
return nil, errReadRequest
}
}
return req, nil
}
func (c *respClient) handleRequest(reqData [][]byte) {
req := c.req
if len(reqData) == 0 {
c.req.cmd = ""
c.req.args = reqData[0:0]
} else {
c.req.cmd = strings.ToLower(ledis.String(reqData[0]))
c.req.args = reqData[1:]
}
if c.req.cmd == "quit" {
c.req.resp.writeStatus(OK)
c.req.resp.flush()
c.conn.Close()
}
req.db = c.db
c.req.perform()
c.db = req.db // "SELECT"
return
}
// response writer
func newWriterRESP(conn net.Conn) *respWriter {
w := new(respWriter)
w.buff = bufio.NewWriterSize(conn, 256)
return w
}
func (w *respWriter) writeError(err error) {
w.buff.Write(ledis.Slice("-ERR"))
if err != nil {
w.buff.WriteByte(' ')
w.buff.Write(ledis.Slice(err.Error()))
}
w.buff.Write(Delims)
}
func (w *respWriter) writeStatus(status string) {
w.buff.WriteByte('+')
w.buff.Write(ledis.Slice(status))
w.buff.Write(Delims)
}
func (w *respWriter) writeInteger(n int64) {
w.buff.WriteByte(':')
w.buff.Write(ledis.StrPutInt64(n))
w.buff.Write(Delims)
}
func (w *respWriter) writeBulk(b []byte) {
w.buff.WriteByte('$')
if b == nil {
w.buff.Write(NullBulk)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(b))))
w.buff.Write(Delims)
w.buff.Write(b)
}
w.buff.Write(Delims)
}
func (w *respWriter) writeArray(lst []interface{}) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst))))
w.buff.Write(Delims)
for i := 0; i < len(lst); i++ {
switch v := lst[i].(type) {
case []interface{}:
w.writeArray(v)
case []byte:
w.writeBulk(v)
case nil:
w.writeBulk(nil)
case int64:
w.writeInteger(v)
default:
panic("invalid array type")
}
}
}
}
func (w *respWriter) writeSliceArray(lst [][]byte) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst))))
w.buff.Write(Delims)
for i := 0; i < len(lst); i++ {
w.writeBulk(lst[i])
}
}
}
func (w *respWriter) writeFVPairArray(lst []ledis.FVPair) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst) * 2)))
w.buff.Write(Delims)
for i := 0; i < len(lst); i++ {
w.writeBulk(lst[i].Field)
w.writeBulk(lst[i].Value)
}
}
}
func (w *respWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
if withScores {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst) * 2)))
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst))))
w.buff.Write(Delims)
}
for i := 0; i < len(lst); i++ {
w.writeBulk(lst[i].Member)
if withScores {
w.writeBulk(ledis.StrPutInt64(lst[i].Score))
}
}
}
}
func (w *respWriter) writeBulkFrom(n int64, rb io.Reader) {
w.buff.WriteByte('$')
w.buff.Write(ledis.Slice(strconv.FormatInt(n, 10)))
w.buff.Write(Delims)
io.Copy(w.buff, rb)
w.buff.Write(Delims)
}
func (w *respWriter) flush() {
w.buff.Flush()
}

View File

@ -5,36 +5,36 @@ import (
"strings"
)
func bgetCommand(c *client) error {
args := c.args
func bgetCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.BGet(args[0]); err != nil {
if v, err := req.db.BGet(args[0]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func bdeleteCommand(c *client) error {
args := c.args
func bdeleteCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.BDelete(args[0]); err != nil {
if n, err := req.db.BDelete(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func bsetbitCommand(c *client) error {
args := c.args
func bsetbitCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -58,16 +58,16 @@ func bsetbitCommand(c *client) error {
return ErrBool
}
if ori, err := c.db.BSetBit(args[0], offset, uint8(val)); err != nil {
if ori, err := req.db.BSetBit(args[0], offset, uint8(val)); err != nil {
return err
} else {
c.writeInteger(int64(ori))
req.resp.writeInteger(int64(ori))
}
return nil
}
func bgetbitCommand(c *client) error {
args := c.args
func bgetbitCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -78,16 +78,16 @@ func bgetbitCommand(c *client) error {
return ErrOffset
}
if v, err := c.db.BGetBit(args[0], offset); err != nil {
if v, err := req.db.BGetBit(args[0], offset); err != nil {
return err
} else {
c.writeInteger(int64(v))
req.resp.writeInteger(int64(v))
}
return nil
}
func bmsetbitCommand(c *client) error {
args := c.args
func bmsetbitCommand(req *requestContext) error {
args := req.args
if len(args) < 3 {
return ErrCmdParams
}
@ -124,16 +124,16 @@ func bmsetbitCommand(c *client) error {
pairs[i].Val = uint8(val)
}
if place, err := c.db.BMSetBit(key, pairs...); err != nil {
if place, err := req.db.BMSetBit(key, pairs...); err != nil {
return err
} else {
c.writeInteger(place)
req.resp.writeInteger(place)
}
return nil
}
func bcountCommand(c *client) error {
args := c.args
func bcountCommand(req *requestContext) error {
args := req.args
argCnt := len(args)
if !(argCnt > 0 && argCnt <= 3) {
@ -159,16 +159,16 @@ func bcountCommand(c *client) error {
}
}
if cnt, err := c.db.BCount(args[0], start, end); err != nil {
if cnt, err := req.db.BCount(args[0], start, end); err != nil {
return err
} else {
c.writeInteger(int64(cnt))
req.resp.writeInteger(int64(cnt))
}
return nil
}
func boptCommand(c *client) error {
args := c.args
func boptCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
@ -194,16 +194,16 @@ func boptCommand(c *client) error {
if len(srcKeys) == 0 {
return ErrCmdParams
}
if blen, err := c.db.BOperation(op, dstKey, srcKeys...); err != nil {
if blen, err := req.db.BOperation(op, dstKey, srcKeys...); err != nil {
return err
} else {
c.writeInteger(int64(blen))
req.resp.writeInteger(int64(blen))
}
return nil
}
func bexpireCommand(c *client) error {
args := c.args
func bexpireCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -213,17 +213,17 @@ func bexpireCommand(c *client) error {
return ErrValue
}
if v, err := c.db.BExpire(args[0], duration); err != nil {
if v, err := req.db.BExpire(args[0], duration); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func bexpireAtCommand(c *client) error {
args := c.args
func bexpireAtCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -233,40 +233,40 @@ func bexpireAtCommand(c *client) error {
return ErrValue
}
if v, err := c.db.BExpireAt(args[0], when); err != nil {
if v, err := req.db.BExpireAt(args[0], when); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func bttlCommand(c *client) error {
args := c.args
func bttlCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.BTTL(args[0]); err != nil {
if v, err := req.db.BTTL(args[0]); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func bpersistCommand(c *client) error {
args := c.args
func bpersistCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.BPersist(args[0]); err != nil {
if n, err := req.db.BPersist(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil

View File

@ -4,87 +4,87 @@ import (
"github.com/siddontang/ledisdb/ledis"
)
func hsetCommand(c *client) error {
args := c.args
func hsetCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
if n, err := c.db.HSet(args[0], args[1], args[2]); err != nil {
if n, err := req.db.HSet(args[0], args[1], args[2]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hgetCommand(c *client) error {
args := c.args
func hgetCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if v, err := c.db.HGet(args[0], args[1]); err != nil {
if v, err := req.db.HGet(args[0], args[1]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func hexistsCommand(c *client) error {
args := c.args
func hexistsCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
var n int64 = 1
if v, err := c.db.HGet(args[0], args[1]); err != nil {
if v, err := req.db.HGet(args[0], args[1]); err != nil {
return err
} else {
if v == nil {
n = 0
}
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hdelCommand(c *client) error {
args := c.args
func hdelCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
if n, err := c.db.HDel(args[0], args[1:]...); err != nil {
if n, err := req.db.HDel(args[0], args[1:]...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hlenCommand(c *client) error {
args := c.args
func hlenCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.HLen(args[0]); err != nil {
if n, err := req.db.HLen(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hincrbyCommand(c *client) error {
args := c.args
func hincrbyCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -95,16 +95,16 @@ func hincrbyCommand(c *client) error {
}
var n int64
if n, err = c.db.HIncrBy(args[0], args[1], delta); err != nil {
if n, err = req.db.HIncrBy(args[0], args[1], delta); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hmsetCommand(c *client) error {
args := c.args
func hmsetCommand(req *requestContext) error {
args := req.args
if len(args) < 3 {
return ErrCmdParams
}
@ -123,107 +123,107 @@ func hmsetCommand(c *client) error {
kvs[i].Value = args[2*i+1]
}
if err := c.db.HMset(key, kvs...); err != nil {
if err := req.db.HMset(key, kvs...); err != nil {
return err
} else {
c.writeStatus(OK)
req.resp.writeStatus(OK)
}
return nil
}
func hmgetCommand(c *client) error {
args := c.args
func hmgetCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
if v, err := c.db.HMget(args[0], args[1:]...); err != nil {
if v, err := req.db.HMget(args[0], args[1:]...); err != nil {
return err
} else {
c.writeSliceArray(v)
req.resp.writeSliceArray(v)
}
return nil
}
func hgetallCommand(c *client) error {
args := c.args
func hgetallCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.HGetAll(args[0]); err != nil {
if v, err := req.db.HGetAll(args[0]); err != nil {
return err
} else {
c.writeFVPairArray(v)
req.resp.writeFVPairArray(v)
}
return nil
}
func hkeysCommand(c *client) error {
args := c.args
func hkeysCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.HKeys(args[0]); err != nil {
if v, err := req.db.HKeys(args[0]); err != nil {
return err
} else {
c.writeSliceArray(v)
req.resp.writeSliceArray(v)
}
return nil
}
func hvalsCommand(c *client) error {
args := c.args
func hvalsCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.HValues(args[0]); err != nil {
if v, err := req.db.HValues(args[0]); err != nil {
return err
} else {
c.writeSliceArray(v)
req.resp.writeSliceArray(v)
}
return nil
}
func hclearCommand(c *client) error {
args := c.args
func hclearCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.HClear(args[0]); err != nil {
if n, err := req.db.HClear(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hmclearCommand(c *client) error {
args := c.args
func hmclearCommand(req *requestContext) error {
args := req.args
if len(args) < 1 {
return ErrCmdParams
}
if n, err := c.db.HMclear(args...); err != nil {
if n, err := req.db.HMclear(args...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hexpireCommand(c *client) error {
args := c.args
func hexpireCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -233,17 +233,17 @@ func hexpireCommand(c *client) error {
return ErrValue
}
if v, err := c.db.HExpire(args[0], duration); err != nil {
if v, err := req.db.HExpire(args[0], duration); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func hexpireAtCommand(c *client) error {
args := c.args
func hexpireAtCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -253,40 +253,40 @@ func hexpireAtCommand(c *client) error {
return ErrValue
}
if v, err := c.db.HExpireAt(args[0], when); err != nil {
if v, err := req.db.HExpireAt(args[0], when); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func httlCommand(c *client) error {
args := c.args
func httlCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.HTTL(args[0]); err != nil {
if v, err := req.db.HTTL(args[0]); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func hpersistCommand(c *client) error {
args := c.args
func hpersistCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.HPersist(args[0]); err != nil {
if n, err := req.db.HPersist(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil

View File

@ -4,112 +4,112 @@ import (
"github.com/siddontang/ledisdb/ledis"
)
func getCommand(c *client) error {
args := c.args
func getCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.Get(args[0]); err != nil {
if v, err := req.db.Get(args[0]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func setCommand(c *client) error {
args := c.args
func setCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if err := c.db.Set(args[0], args[1]); err != nil {
if err := req.db.Set(args[0], args[1]); err != nil {
return err
} else {
c.writeStatus(OK)
req.resp.writeStatus(OK)
}
return nil
}
func getsetCommand(c *client) error {
args := c.args
func getsetCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if v, err := c.db.GetSet(args[0], args[1]); err != nil {
if v, err := req.db.GetSet(args[0], args[1]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func setnxCommand(c *client) error {
args := c.args
func setnxCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if n, err := c.db.SetNX(args[0], args[1]); err != nil {
if n, err := req.db.SetNX(args[0], args[1]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func existsCommand(c *client) error {
args := c.args
func existsCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.Exists(args[0]); err != nil {
if n, err := req.db.Exists(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func incrCommand(c *client) error {
args := c.args
func incrCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.Incr(c.args[0]); err != nil {
if n, err := req.db.Incr(req.args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func decrCommand(c *client) error {
args := c.args
func decrCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.Decr(c.args[0]); err != nil {
if n, err := req.db.Decr(req.args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func incrbyCommand(c *client) error {
args := c.args
func incrbyCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -119,17 +119,17 @@ func incrbyCommand(c *client) error {
return ErrValue
}
if n, err := c.db.IncrBy(c.args[0], delta); err != nil {
if n, err := req.db.IncrBy(req.args[0], delta); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func decrbyCommand(c *client) error {
args := c.args
func decrbyCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -139,32 +139,32 @@ func decrbyCommand(c *client) error {
return ErrValue
}
if n, err := c.db.DecrBy(c.args[0], delta); err != nil {
if n, err := req.db.DecrBy(req.args[0], delta); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func delCommand(c *client) error {
args := c.args
func delCommand(req *requestContext) error {
args := req.args
if len(args) == 0 {
return ErrCmdParams
}
if n, err := c.db.Del(args...); err != nil {
if n, err := req.db.Del(args...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func msetCommand(c *client) error {
args := c.args
func msetCommand(req *requestContext) error {
args := req.args
if len(args) == 0 || len(args)%2 != 0 {
return ErrCmdParams
}
@ -175,36 +175,36 @@ func msetCommand(c *client) error {
kvs[i].Value = args[2*i+1]
}
if err := c.db.MSet(kvs...); err != nil {
if err := req.db.MSet(kvs...); err != nil {
return err
} else {
c.writeStatus(OK)
req.resp.writeStatus(OK)
}
return nil
}
// func setexCommand(c *client) error {
// func setexCommand(req *requestContext) error {
// return nil
// }
func mgetCommand(c *client) error {
args := c.args
func mgetCommand(req *requestContext) error {
args := req.args
if len(args) == 0 {
return ErrCmdParams
}
if v, err := c.db.MGet(args...); err != nil {
if v, err := req.db.MGet(args...); err != nil {
return err
} else {
c.writeSliceArray(v)
req.resp.writeSliceArray(v)
}
return nil
}
func expireCommand(c *client) error {
args := c.args
func expireCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -214,17 +214,17 @@ func expireCommand(c *client) error {
return ErrValue
}
if v, err := c.db.Expire(args[0], duration); err != nil {
if v, err := req.db.Expire(args[0], duration); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func expireAtCommand(c *client) error {
args := c.args
func expireAtCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -234,40 +234,40 @@ func expireAtCommand(c *client) error {
return ErrValue
}
if v, err := c.db.ExpireAt(args[0], when); err != nil {
if v, err := req.db.ExpireAt(args[0], when); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func ttlCommand(c *client) error {
args := c.args
func ttlCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.TTL(args[0]); err != nil {
if v, err := req.db.TTL(args[0]); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func persistCommand(c *client) error {
args := c.args
func persistCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.Persist(args[0]); err != nil {
if n, err := req.db.Persist(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil

View File

@ -4,83 +4,83 @@ import (
"github.com/siddontang/ledisdb/ledis"
)
func lpushCommand(c *client) error {
args := c.args
func lpushCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
if n, err := c.db.LPush(args[0], args[1:]...); err != nil {
if n, err := req.db.LPush(args[0], args[1:]...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func rpushCommand(c *client) error {
args := c.args
func rpushCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
if n, err := c.db.RPush(args[0], args[1:]...); err != nil {
if n, err := req.db.RPush(args[0], args[1:]...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func lpopCommand(c *client) error {
args := c.args
func lpopCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.LPop(args[0]); err != nil {
if v, err := req.db.LPop(args[0]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func rpopCommand(c *client) error {
args := c.args
func rpopCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.RPop(args[0]); err != nil {
if v, err := req.db.RPop(args[0]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func llenCommand(c *client) error {
args := c.args
func llenCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.LLen(args[0]); err != nil {
if n, err := req.db.LLen(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func lindexCommand(c *client) error {
args := c.args
func lindexCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -90,17 +90,17 @@ func lindexCommand(c *client) error {
return ErrValue
}
if v, err := c.db.LIndex(args[0], int32(index)); err != nil {
if v, err := req.db.LIndex(args[0], int32(index)); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func lrangeCommand(c *client) error {
args := c.args
func lrangeCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -119,47 +119,47 @@ func lrangeCommand(c *client) error {
return ErrValue
}
if v, err := c.db.LRange(args[0], int32(start), int32(stop)); err != nil {
if v, err := req.db.LRange(args[0], int32(start), int32(stop)); err != nil {
return err
} else {
c.writeSliceArray(v)
req.resp.writeSliceArray(v)
}
return nil
}
func lclearCommand(c *client) error {
args := c.args
func lclearCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.LClear(args[0]); err != nil {
if n, err := req.db.LClear(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func lmclearCommand(c *client) error {
args := c.args
func lmclearCommand(req *requestContext) error {
args := req.args
if len(args) < 1 {
return ErrCmdParams
}
if n, err := c.db.LMclear(args...); err != nil {
if n, err := req.db.LMclear(args...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func lexpireCommand(c *client) error {
args := c.args
func lexpireCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -169,17 +169,17 @@ func lexpireCommand(c *client) error {
return ErrValue
}
if v, err := c.db.LExpire(args[0], duration); err != nil {
if v, err := req.db.LExpire(args[0], duration); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func lexpireAtCommand(c *client) error {
args := c.args
func lexpireAtCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -189,40 +189,40 @@ func lexpireAtCommand(c *client) error {
return ErrValue
}
if v, err := c.db.LExpireAt(args[0], when); err != nil {
if v, err := req.db.LExpireAt(args[0], when); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func lttlCommand(c *client) error {
args := c.args
func lttlCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.LTTL(args[0]); err != nil {
if v, err := req.db.LTTL(args[0]); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func lpersistCommand(c *client) error {
args := c.args
func lpersistCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.LPersist(args[0]); err != nil {
if n, err := req.db.LPersist(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil

View File

@ -11,8 +11,8 @@ import (
"strings"
)
func slaveofCommand(c *client) error {
args := c.args
func slaveofCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
@ -31,23 +31,23 @@ func slaveofCommand(c *client) error {
masterAddr = fmt.Sprintf("%s:%s", args[0], args[1])
}
if err := c.app.slaveof(masterAddr); err != nil {
if err := req.app.slaveof(masterAddr); err != nil {
return err
}
c.writeStatus(OK)
req.resp.writeStatus(OK)
return nil
}
func fullsyncCommand(c *client) error {
func fullsyncCommand(req *requestContext) error {
//todo, multi fullsync may use same dump file
dumpFile, err := ioutil.TempFile(c.app.cfg.DataDir, "dump_")
dumpFile, err := ioutil.TempFile(req.app.cfg.DataDir, "dump_")
if err != nil {
return err
}
if err = c.app.ldb.Dump(dumpFile); err != nil {
if err = req.app.ldb.Dump(dumpFile); err != nil {
return err
}
@ -56,7 +56,7 @@ func fullsyncCommand(c *client) error {
dumpFile.Seek(0, os.SEEK_SET)
c.writeBulkFrom(n, dumpFile)
req.resp.writeBulkFrom(n, dumpFile)
name := dumpFile.Name()
dumpFile.Close()
@ -68,8 +68,8 @@ func fullsyncCommand(c *client) error {
var reserveInfoSpace = make([]byte, 16)
func syncCommand(c *client) error {
args := c.args
func syncCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -87,32 +87,32 @@ func syncCommand(c *client) error {
return ErrCmdParams
}
c.syncBuf.Reset()
req.syncBuf.Reset()
//reserve space to write master info
if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil {
if _, err := req.syncBuf.Write(reserveInfoSpace); err != nil {
return err
}
m := &ledis.MasterInfo{logIndex, logPos}
if _, err := c.app.ldb.ReadEventsTo(m, &c.syncBuf); err != nil {
if _, err := req.app.ldb.ReadEventsTo(m, &req.syncBuf); err != nil {
return err
} else {
buf := c.syncBuf.Bytes()
buf := req.syncBuf.Bytes()
binary.BigEndian.PutUint64(buf[0:], uint64(m.LogFileIndex))
binary.BigEndian.PutUint64(buf[8:], uint64(m.LogPos))
if len(c.compressBuf) < snappy.MaxEncodedLen(len(buf)) {
c.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf)))
if len(req.compressBuf) < snappy.MaxEncodedLen(len(buf)) {
req.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf)))
}
if buf, err = snappy.Encode(c.compressBuf, buf); err != nil {
if buf, err = snappy.Encode(req.compressBuf, buf); err != nil {
return err
}
c.writeBulk(buf)
req.resp.writeBulk(buf)
}
return nil

View File

@ -12,8 +12,8 @@ import (
var errScoreOverflow = errors.New("zset score overflow")
func zaddCommand(c *client) error {
args := c.args
func zaddCommand(req *requestContext) error {
args := req.args
if len(args) < 3 {
return ErrCmdParams
}
@ -36,66 +36,66 @@ func zaddCommand(c *client) error {
params[i].Member = args[2*i+1]
}
if n, err := c.db.ZAdd(key, params...); err != nil {
if n, err := req.db.ZAdd(key, params...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zcardCommand(c *client) error {
args := c.args
func zcardCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.ZCard(args[0]); err != nil {
if n, err := req.db.ZCard(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zscoreCommand(c *client) error {
args := c.args
func zscoreCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if s, err := c.db.ZScore(args[0], args[1]); err != nil {
if s, err := req.db.ZScore(args[0], args[1]); err != nil {
if err == ledis.ErrScoreMiss {
c.writeBulk(nil)
req.resp.writeBulk(nil)
} else {
return err
}
} else {
c.writeBulk(ledis.StrPutInt64(s))
req.resp.writeBulk(ledis.StrPutInt64(s))
}
return nil
}
func zremCommand(c *client) error {
args := c.args
func zremCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
if n, err := c.db.ZRem(args[0], args[1:]...); err != nil {
if n, err := req.db.ZRem(args[0], args[1:]...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zincrbyCommand(c *client) error {
args := c.args
func zincrbyCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -107,10 +107,10 @@ func zincrbyCommand(c *client) error {
return ErrValue
}
if v, err := c.db.ZIncrBy(key, delta, args[2]); err != nil {
if v, err := req.db.ZIncrBy(key, delta, args[2]); err != nil {
return err
} else {
c.writeBulk(ledis.StrPutInt64(v))
req.resp.writeBulk(ledis.StrPutInt64(v))
}
return nil
@ -157,6 +157,10 @@ func zparseScoreRange(minBuf []byte, maxBuf []byte) (min int64, max int64, err e
err = ErrCmdParams
return
}
if maxBuf[0] == '(' {
ropen = true
maxBuf = maxBuf[1:]
}
if maxBuf[0] == '(' {
ropen = true
@ -182,8 +186,8 @@ func zparseScoreRange(minBuf []byte, maxBuf []byte) (min int64, max int64, err e
return
}
func zcountCommand(c *client) error {
args := c.args
func zcountCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -194,77 +198,77 @@ func zcountCommand(c *client) error {
}
if min > max {
c.writeInteger(0)
req.resp.writeInteger(0)
return nil
}
if n, err := c.db.ZCount(args[0], min, max); err != nil {
if n, err := req.db.ZCount(args[0], min, max); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zrankCommand(c *client) error {
args := c.args
func zrankCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if n, err := c.db.ZRank(args[0], args[1]); err != nil {
if n, err := req.db.ZRank(args[0], args[1]); err != nil {
return err
} else if n == -1 {
c.writeBulk(nil)
req.resp.writeBulk(nil)
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zrevrankCommand(c *client) error {
args := c.args
func zrevrankCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if n, err := c.db.ZRevRank(args[0], args[1]); err != nil {
if n, err := req.db.ZRevRank(args[0], args[1]); err != nil {
return err
} else if n == -1 {
c.writeBulk(nil)
req.resp.writeBulk(nil)
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zremrangebyrankCommand(c *client) error {
args := c.args
func zremrangebyrankCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
key := args[0]
start, stop, err := zparseRange(c, args[1], args[2])
start, stop, err := zparseRange(req, args[1], args[2])
if err != nil {
return ErrValue
}
if n, err := c.db.ZRemRangeByRank(key, start, stop); err != nil {
if n, err := req.db.ZRemRangeByRank(key, start, stop); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zremrangebyscoreCommand(c *client) error {
args := c.args
func zremrangebyscoreCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -275,16 +279,16 @@ func zremrangebyscoreCommand(c *client) error {
return err
}
if n, err := c.db.ZRemRangeByScore(key, min, max); err != nil {
if n, err := req.db.ZRemRangeByScore(key, min, max); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zparseRange(c *client, a1 []byte, a2 []byte) (start int, stop int, err error) {
func zparseRange(req *requestContext, a1 []byte, a2 []byte) (start int, stop int, err error) {
if start, err = strconv.Atoi(ledis.String(a1)); err != nil {
return
}
@ -296,15 +300,15 @@ func zparseRange(c *client, a1 []byte, a2 []byte) (start int, stop int, err erro
return
}
func zrangeGeneric(c *client, reverse bool) error {
args := c.args
func zrangeGeneric(req *requestContext, reverse bool) error {
args := req.args
if len(args) < 3 {
return ErrCmdParams
}
key := args[0]
start, stop, err := zparseRange(c, args[1], args[2])
start, stop, err := zparseRange(req, args[1], args[2])
if err != nil {
return ErrValue
}
@ -323,24 +327,24 @@ func zrangeGeneric(c *client, reverse bool) error {
}
}
if datas, err := c.db.ZRangeGeneric(key, start, stop, reverse); err != nil {
if datas, err := req.db.ZRangeGeneric(key, start, stop, reverse); err != nil {
return err
} else {
c.writeScorePairArray(datas, withScores)
req.resp.writeScorePairArray(datas, withScores)
}
return nil
}
func zrangeCommand(c *client) error {
return zrangeGeneric(c, false)
func zrangeCommand(req *requestContext) error {
return zrangeGeneric(req, false)
}
func zrevrangeCommand(c *client) error {
return zrangeGeneric(c, true)
func zrevrangeCommand(req *requestContext) error {
return zrangeGeneric(req, true)
}
func zrangebyscoreGeneric(c *client, reverse bool) error {
args := c.args
func zrangebyscoreGeneric(req *requestContext, reverse bool) error {
args := req.args
if len(args) < 3 {
return ErrCmdParams
}
@ -396,59 +400,59 @@ func zrangebyscoreGeneric(c *client, reverse bool) error {
if offset < 0 {
//for ledis, if offset < 0, a empty will return
//so here we directly return a empty array
c.writeArray([]interface{}{})
req.resp.writeArray([]interface{}{})
return nil
}
if datas, err := c.db.ZRangeByScoreGeneric(key, min, max, offset, count, reverse); err != nil {
if datas, err := req.db.ZRangeByScoreGeneric(key, min, max, offset, count, reverse); err != nil {
return err
} else {
c.writeScorePairArray(datas, withScores)
req.resp.writeScorePairArray(datas, withScores)
}
return nil
}
func zrangebyscoreCommand(c *client) error {
return zrangebyscoreGeneric(c, false)
func zrangebyscoreCommand(req *requestContext) error {
return zrangebyscoreGeneric(req, false)
}
func zrevrangebyscoreCommand(c *client) error {
return zrangebyscoreGeneric(c, true)
func zrevrangebyscoreCommand(req *requestContext) error {
return zrangebyscoreGeneric(req, true)
}
func zclearCommand(c *client) error {
args := c.args
func zclearCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.ZClear(args[0]); err != nil {
if n, err := req.db.ZClear(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zmclearCommand(c *client) error {
args := c.args
func zmclearCommand(req *requestContext) error {
args := req.args
if len(args) < 1 {
return ErrCmdParams
}
if n, err := c.db.ZMclear(args...); err != nil {
if n, err := req.db.ZMclear(args...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zexpireCommand(c *client) error {
args := c.args
func zexpireCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -458,17 +462,17 @@ func zexpireCommand(c *client) error {
return ErrValue
}
if v, err := c.db.ZExpire(args[0], duration); err != nil {
if v, err := req.db.ZExpire(args[0], duration); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func zexpireAtCommand(c *client) error {
args := c.args
func zexpireAtCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -478,39 +482,39 @@ func zexpireAtCommand(c *client) error {
return ErrValue
}
if v, err := c.db.ZExpireAt(args[0], when); err != nil {
if v, err := req.db.ZExpireAt(args[0], when); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func zttlCommand(c *client) error {
args := c.args
func zttlCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.ZTTL(args[0]); err != nil {
if v, err := req.db.ZTTL(args[0]); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func zpersistCommand(c *client) error {
args := c.args
func zpersistCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.ZPersist(args[0]); err != nil {
if n, err := req.db.ZPersist(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil

View File

@ -8,7 +8,7 @@ import (
"strings"
)
type CommandFunc func(c *client) error
type CommandFunc func(req *requestContext) error
var regCmds = map[string]CommandFunc{}
@ -20,33 +20,33 @@ func register(name string, f CommandFunc) {
regCmds[name] = f
}
func pingCommand(c *client) error {
c.writeStatus(PONG)
func pingCommand(req *requestContext) error {
req.resp.writeStatus(PONG)
return nil
}
func echoCommand(c *client) error {
if len(c.args) != 1 {
func echoCommand(req *requestContext) error {
if len(req.args) != 1 {
return ErrCmdParams
}
c.writeBulk(c.args[0])
req.resp.writeBulk(req.args[0])
return nil
}
func selectCommand(c *client) error {
if len(c.args) != 1 {
func selectCommand(req *requestContext) error {
if len(req.args) != 1 {
return ErrCmdParams
}
if index, err := strconv.Atoi(ledis.String(c.args[0])); err != nil {
if index, err := strconv.Atoi(ledis.String(req.args[0])); err != nil {
return err
} else {
if db, err := c.ldb.Select(index); err != nil {
if db, err := req.ldb.Select(index); err != nil {
return err
} else {
c.db = db
c.writeStatus(OK)
req.db = db
req.resp.writeStatus(OK)
}
}
return nil

View File

@ -10,6 +10,8 @@ import (
type Config struct {
Addr string `json:"addr"`
HttpAddr string `json:"http_addr"`
DataDir string `json:"data_dir"`
DB struct {

View File

@ -26,4 +26,13 @@
//
// After you send slaveof command, the slave will start to sync master's binlog and replicate from binlog.
//
// HTTP Interface
// LedisDB provides http interfaces for most commands(except the replication commands)
//
// curl http://127.0.0.1:11181/SET/hello/world
// → {"SET":[true,"OK"]}
//
// curl http://127.0.0.1:11181/0/GET/hello?type=json
// → {"GET":"world"}
//
package server

42
server/http_interface.md Normal file
View File

@ -0,0 +1,42 @@
##HTTP Interface
LedisDB provides http interfaces for most commands.
####Request
The proper url format is
http://host:port[/db]/cmd/arg1/arg2/.../argN[?type=type]
'db' and 'type' are optional. 'db' stands for ledis db index, ranges from 0 to 15, its default value is 0. 'type' is a custom content type, can be json, bson or msgpack, json is default.
####Response
The response format is
{ cmd: return_value }
or
{ cmd: [success, message] }
'return_value' stands for the output of 'cmd', it can be a number, a string, a list, or a hash. If the return value is just a descriptive message, the second format will be taken, and 'success', a boolean value, indicates whether it is successful.
####Example
#####Curl
curl http://127.0.0.1:11181/SET/hello/world
→ {"SET":[true,"OK"]}
curl http://127.0.0.1:11181/0/GET/hello?type=json
→ {"GET":"world"}
#####Python
Requires [msgpack-python](https://pypi.python.org/pypi/msgpack-python) and [requests](https://pypi.python.org/pypi/requests/)
>>> import requests
>>> import msgpack
>>> requests.get("http://127.0.0.1:11181/0/SET/hello/world")
>>> r = requests.get("http://127.0.0.1:11181/0/GET/hello?type=msgpack")
>>> msgpack.unpackb(r.content)
>>> {"GET":"world"}

View File

@ -72,8 +72,8 @@ func (m *MasterInfo) Load(filePath string) error {
type master struct {
sync.Mutex
c net.Conn
rb *bufio.Reader
conn net.Conn
rb *bufio.Reader
app *App
@ -114,9 +114,9 @@ func (m *master) Close() {
default:
}
if m.c != nil {
m.c.Close()
m.c = nil
if m.conn != nil {
m.conn.Close()
m.conn = nil
}
m.wg.Wait()
@ -135,17 +135,17 @@ func (m *master) connect() error {
return fmt.Errorf("no assign master addr")
}
if m.c != nil {
m.c.Close()
m.c = nil
if m.conn != nil {
m.conn.Close()
m.conn = nil
}
if c, err := net.Dial("tcp", m.info.Addr); err != nil {
if conn, err := net.Dial("tcp", m.info.Addr); err != nil {
return err
} else {
m.c = c
m.conn = conn
m.rb = bufio.NewReaderSize(m.c, 4096)
m.rb = bufio.NewReaderSize(m.conn, 4096)
}
return nil
}
@ -248,7 +248,7 @@ var (
)
func (m *master) fullSync() error {
if _, err := m.c.Write(fullSyncCmd); err != nil {
if _, err := m.conn.Write(fullSyncCmd); err != nil {
return err
}
@ -291,7 +291,7 @@ func (m *master) sync() error {
cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr),
logIndexStr, len(logPosStr), logPosStr))
if _, err := m.c.Write(cmd); err != nil {
if _, err := m.conn.Write(cmd); err != nil {
return err
}

116
server/request.go Normal file
View File

@ -0,0 +1,116 @@
package server
import (
"bytes"
"github.com/siddontang/ledisdb/ledis"
"io"
"time"
)
type responseWriter interface {
writeError(error)
writeStatus(string)
writeInteger(int64)
writeBulk([]byte)
writeArray([]interface{})
writeSliceArray([][]byte)
writeFVPairArray([]ledis.FVPair)
writeScorePairArray([]ledis.ScorePair, bool)
writeBulkFrom(int64, io.Reader)
flush()
}
type requestContext struct {
app *App
ldb *ledis.Ledis
db *ledis.DB
remoteAddr string
cmd string
args [][]byte
resp responseWriter
syncBuf bytes.Buffer
compressBuf []byte
reqErr chan error
buf bytes.Buffer
}
func newRequestContext(app *App) *requestContext {
req := new(requestContext)
req.app = app
req.ldb = app.ldb
req.db, _ = app.ldb.Select(0) //use default db
req.compressBuf = make([]byte, 256)
req.reqErr = make(chan error)
return req
}
func (req *requestContext) perform() {
var err error
start := time.Now()
if len(req.cmd) == 0 {
err = ErrEmptyCommand
} else if exeCmd, ok := regCmds[req.cmd]; !ok {
err = ErrNotFound
} else {
go func() {
req.reqErr <- exeCmd(req)
}()
err = <-req.reqErr
}
duration := time.Since(start)
if req.app.access != nil {
fullCmd := req.catGenericCommand()
cost := duration.Nanoseconds() / 1000000
truncateLen := len(fullCmd)
if truncateLen > 256 {
truncateLen = 256
}
req.app.access.Log(req.remoteAddr, cost, fullCmd[:truncateLen], err)
}
if err != nil {
req.resp.writeError(err)
}
req.resp.flush()
return
}
// func (h *requestHandler) catFullCommand(req *requestContext) []byte {
//
// // if strings.HasSuffix(cmd, "expire") {
// // catExpireCommand(c, buffer)
// // } else {
// // catGenericCommand(c, buffer)
// // }
//
// return h.catGenericCommand(req)
// }
func (req *requestContext) catGenericCommand() []byte {
buffer := req.buf
buffer.Reset()
buffer.Write([]byte(req.cmd))
for _, arg := range req.args {
buffer.WriteByte(' ')
buffer.Write(arg)
}
return buffer.Bytes()
}