Merge branch 'develop'

This commit is contained in:
siddontang 2014-08-05 14:12:30 +08:00
commit 89c2a1678c
36 changed files with 3126 additions and 703 deletions

View File

@ -1,18 +1,19 @@
# LedisDB
Ledisdb is a high performance NoSQL like Redis written by go. It supports some advanced data structure like kv, list, hash and zset, and may be alternative for Redis.
Ledisdb is a high performance NoSQL like Redis written by go. It supports some advanced data structure like kv, list, hash, zset, bitmap, and may be alternative for Redis.
LedisDB now supports multi database as backend to store data, you can test and choose the proper one for you.
## Features
+ Rich advanced data structure: KV, List, Hash, ZSet, Bit.
+ Rich advanced data structure: KV, List, Hash, ZSet, Bitmap.
+ Stores lots of data, over the memory limit.
+ Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB.
+ Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB, BoltDB.
+ Supports expiration and ttl.
+ Redis clients, like redis-cli, are supported directly.
+ Multi client API supports, including Golang, Python, Lua(Openresty).
+ Easy to embed in Golang application.
+ Restful API support, json/bson/msgpack output.
+ Replication to guarantee data safe.
+ Supplies tools to load, dump, repair database.
@ -94,6 +95,14 @@ You must known that changing store database runtime is very dangerous, LedisDB w
ledis 127.0.0.1:6380> get a
"1"
//use curl
curl http://127.0.0.1:11181/SET/hello/world
→ {"SET":[true,"OK"]}
curl http://127.0.0.1:11181/0/GET/hello?type=json
→ {"GET":"world"}
## Package Example
import "github.com/siddontang/ledisdb/ledis"

View File

@ -8,4 +8,10 @@ go get github.com/siddontang/copier
go get github.com/siddontang/goleveldb/leveldb
go get -d github.com/siddontang/gomdb
go get github.com/szferi/gomdb
go get github.com/boltdb/bolt
go get gopkg.in/mgo.v2/bson
go get github.com/ugorji/go/codec

18
client/nodejs/LICENSE Normal file
View File

@ -0,0 +1,18 @@
The MIT License (MIT)
Copyright (c) 2010 Matthew Ranney, http://ranney.com/
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

77
client/nodejs/README.md Normal file
View File

@ -0,0 +1,77 @@
###Ledis - a node.js LedisDB client
This is a modification of [simplegeo/nodejs-redis](https://github.com/simplegeo/nodejs-redis) , aiming to be compatible with LedisDB.
###Setup
Just copy (or move) the ledis directory into your project's **node_modules** directory.
cp -r /path/to/ledisdb/client/nodejs/ledis /path/to/your/node_modules/
###Example
Below is the total content of example.js, including the ledisDB's special commands.
var ledis = require("ledis"),
client = ledis.createClient();
client.on("error", function (err) {
console.log("Error " + err);
});
client.set("string key", "string val", ledis.print);
client.get("string key", ledis.print);
client.hset("hash key", "hashtest 1", "some value", ledis.print);
client.hset(["hash key", "hashtest 2", "some other value"], ledis.print);
client.hkeys("hash key", function (err, replies) {
console.log(replies.length + " replies:");
replies.forEach(function (reply, i) {
console.log(" " + i + ": " + reply);
});
});
//ledis special commands
client.lpush("list key", "1", "2", "3", ledis.print);
client.lrange("list key", "0", "2", ledis.print);
client.lclear("list key", ledis.print);
client.lrange("list key", "0", "2", ledis.print);
client.zadd("zset key", 100, "m", ledis.print);
client.zexpire("zset key", 40, ledis.print);
client.zttl("zset key", ledis.print);
client.bsetbit("bit key 1", 1, 1, ledis.print);
client.bsetbit("bit key 2", 1, 1, ledis.print);
client.bopt("and", "bit key 3", "bit key 1", "bit key 2", ledis.print);
client.bget("bit key 3", function(err, result){
if (result=="\x02"){
console.log("Reply: \\x02")
}
});
client.quit();
Run the example in your project directory, and will display
wyk:~/my/project/dir/$ node example.js
Reply: OK
Reply: string val
Reply: 0
Reply: 0
2 replies:
0: hashtest 1
1: hashtest 2
Reply: 3
Reply: 3,2,1
Reply: 3
Reply:
Reply: 1
Reply: 1
Reply: 40
Reply: 1
Reply: 1
Reply: 2
Reply: \x02

38
client/nodejs/example.js Normal file
View File

@ -0,0 +1,38 @@
var ledis = require("ledis"),
client = ledis.createClient();
client.on("error", function (err) {
console.log("Error " + err);
});
client.set("string key", "string val", ledis.print);
client.get("string key", ledis.print);
client.hset("hash key", "hashtest 1", "some value", ledis.print);
client.hset(["hash key", "hashtest 2", "some other value"], ledis.print);
client.hkeys("hash key", function (err, replies) {
console.log(replies.length + " replies:");
replies.forEach(function (reply, i) {
console.log(" " + i + ": " + reply);
});
});
//ledis special commands
client.lpush("list key", "1", "2", "3", ledis.print);
client.lrange("list key", "0", "2", ledis.print);
client.lclear("list key", ledis.print);
client.lrange("list key", "0", "2", ledis.print);
client.zadd("zset key", 100, "m", ledis.print);
client.zexpire("zset key", 40, ledis.print);
client.zttl("zset key", ledis.print);
client.bsetbit("bit key 1", 1, 1, ledis.print);
client.bsetbit("bit key 2", 1, 1, ledis.print);
client.bopt("and", "bit key 3", "bit key 1", "bit key 2", ledis.print);
client.bget("bit key 3", function(err, result){
if (result=="\x02"){
console.log("Reply: \\x02")
}
});
client.quit();

View File

@ -0,0 +1,8 @@
examples/
benches/
test.js
diff_multi_bench_output.js
generate_commands.js
multi_bench.js
test-unref.js
changelog.md

View File

@ -0,0 +1,80 @@
var net = require('net');
var proxyPort = 6379;
var counter = 0;
function breaker(conn) {
conn.end();
conn.destroy();
}
var server = net.createServer(function(conn) {
counter++;
var proxyConn = net.createConnection({
port: proxyPort
});
conn.pipe(proxyConn);
proxyConn.pipe(conn);
proxyConn.on('end', function() {
conn.end();
});
conn.on('end', function() {
proxyConn.end();
});
conn.on('close', function() {
proxyConn.end();
});
proxyConn.on('close', function() {
conn.end();
});
proxyConn.on('error', function() {
conn.end();
});
conn.on('error', function() {
proxyConn.end();
});
setTimeout(breaker.bind(null, conn), Math.floor(Math.random() * 2000));
});
server.listen(6479);
var redis = require('./');
var port = 6479;
var client = redis.createClient(6479, 'localhost');
function iter() {
var k = "k" + Math.floor(Math.random() * 10);
var coinflip = Math.random() > 0.5;
if (coinflip) {
client.set(k, k, function(err, resp) {
if (!err && resp !== "OK") {
console.log("Unexpected set response " + resp);
}
});
} else {
client.get(k, function(err, resp) {
if (!err) {
if (k !== resp) {
console.log("Key response mismatch: " + k + " " + resp);
}
}
});
}
}
function iters() {
for (var i = 0; i < 100; ++i) {
iter();
}
setTimeout(iters, 10);
}
client.on("connect", function () {
iters();
});
client.on("error", function (err) {
console.log("Client error " + err);
});

1245
client/nodejs/ledis/index.js Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,94 @@
// This file was generated by ./generate_commands.js on Wed Apr 23 2014 14:51:21 GMT-0700 (PDT)
module.exports = [
"quit",
"bget",
"bdelete",
"bsetbit",
"bgetbit",
"bmsetbit",
"bcount",
"bopt",
"bexpire",
"bexpireat",
"bttl",
"bpersist",
"hdel",
"hexists",
"hget",
"hgetall",
"hincrby",
"hkeys",
"hlen",
"hmget",
"hmset",
"hset",
"hvals",
"hclear",
"hmclear",
"hexpire",
"hexpireat",
"httl",
"hpersist",
"decr",
"decrby",
"del",
"exists",
"get",
"getset",
"incr",
"incrby",
"mget",
"mset",
"set",
"setnx",
"expire",
"expireat",
"ttl",
"persist",
"lindex",
"llen",
"lpop",
"lrange",
"lpush",
"rpop",
"rpush",
"lclear",
"lmclear",
"lexpire",
"lexpireat",
"lttl",
"lpersist",
"zadd",
"zcard",
"zcount",
"zincrby",
"zrange",
"zrangebyscore",
"zrank",
"zrem",
"zremrangebyrank",
"zremrangebyscore",
"zrevrange",
"zrevrank",
"zrevrangebyscore",
"zscore",
"zclear",
"zmclear",
"zexpire",
"zexpireat",
"zttl",
"zpersist",
];

View File

@ -0,0 +1,46 @@
var events = require("events"),
util = require("../util"),
hiredis = require("hiredis");
exports.debug_mode = false;
exports.name = "hiredis";
function HiredisReplyParser(options) {
this.name = exports.name;
this.options = options || {};
this.reset();
events.EventEmitter.call(this);
}
util.inherits(HiredisReplyParser, events.EventEmitter);
exports.Parser = HiredisReplyParser;
HiredisReplyParser.prototype.reset = function () {
this.reader = new hiredis.Reader({
return_buffers: this.options.return_buffers || false
});
};
HiredisReplyParser.prototype.execute = function (data) {
var reply;
this.reader.feed(data);
while (true) {
try {
reply = this.reader.get();
} catch (err) {
this.emit("error", err);
break;
}
if (reply === undefined) {
break;
}
if (reply && reply.constructor === Error) {
this.emit("reply error", reply);
} else {
this.emit("reply", reply);
}
}
};

View File

@ -0,0 +1,301 @@
var events = require("events"),
util = require("../util");
function Packet(type, size) {
this.type = type;
this.size = +size;
}
exports.name = "javascript";
exports.debug_mode = false;
function ReplyParser(options) {
this.name = exports.name;
this.options = options || { };
this._buffer = null;
this._offset = 0;
this._encoding = "utf-8";
this._debug_mode = options.debug_mode;
this._reply_type = null;
}
util.inherits(ReplyParser, events.EventEmitter);
exports.Parser = ReplyParser;
function IncompleteReadBuffer(message) {
this.name = "IncompleteReadBuffer";
this.message = message;
}
util.inherits(IncompleteReadBuffer, Error);
// Buffer.toString() is quite slow for small strings
function small_toString(buf, start, end) {
var tmp = "", i;
for (i = start; i < end; i++) {
tmp += String.fromCharCode(buf[i]);
}
return tmp;
}
ReplyParser.prototype._parseResult = function (type) {
var start, end, offset, packetHeader;
if (type === 43 || type === 45) { // + or -
// up to the delimiter
end = this._packetEndOffset() - 1;
start = this._offset;
// include the delimiter
this._offset = end + 2;
if (end > this._buffer.length) {
this._offset = start;
throw new IncompleteReadBuffer("Wait for more data.");
}
if (this.options.return_buffers) {
return this._buffer.slice(start, end);
} else {
if (end - start < 65536) { // completely arbitrary
return small_toString(this._buffer, start, end);
} else {
return this._buffer.toString(this._encoding, start, end);
}
}
} else if (type === 58) { // :
// up to the delimiter
end = this._packetEndOffset() - 1;
start = this._offset;
// include the delimiter
this._offset = end + 2;
if (end > this._buffer.length) {
this._offset = start;
throw new IncompleteReadBuffer("Wait for more data.");
}
if (this.options.return_buffers) {
return this._buffer.slice(start, end);
}
// return the coerced numeric value
return +small_toString(this._buffer, start, end);
} else if (type === 36) { // $
// set a rewind point, as the packet could be larger than the
// buffer in memory
offset = this._offset - 1;
packetHeader = new Packet(type, this.parseHeader());
// packets with a size of -1 are considered null
if (packetHeader.size === -1) {
return undefined;
}
end = this._offset + packetHeader.size;
start = this._offset;
// set the offset to after the delimiter
this._offset = end + 2;
if (end > this._buffer.length) {
this._offset = offset;
throw new IncompleteReadBuffer("Wait for more data.");
}
if (this.options.return_buffers) {
return this._buffer.slice(start, end);
} else {
return this._buffer.toString(this._encoding, start, end);
}
} else if (type === 42) { // *
offset = this._offset;
packetHeader = new Packet(type, this.parseHeader());
if (packetHeader.size < 0) {
return null;
}
if (packetHeader.size > this._bytesRemaining()) {
this._offset = offset - 1;
throw new IncompleteReadBuffer("Wait for more data.");
}
var reply = [ ];
var ntype, i, res;
offset = this._offset - 1;
for (i = 0; i < packetHeader.size; i++) {
ntype = this._buffer[this._offset++];
if (this._offset > this._buffer.length) {
throw new IncompleteReadBuffer("Wait for more data.");
}
res = this._parseResult(ntype);
if (res === undefined) {
res = null;
}
reply.push(res);
}
return reply;
}
};
ReplyParser.prototype.execute = function (buffer) {
this.append(buffer);
var type, ret, offset;
while (true) {
offset = this._offset;
try {
// at least 4 bytes: :1\r\n
if (this._bytesRemaining() < 4) {
break;
}
type = this._buffer[this._offset++];
if (type === 43) { // +
ret = this._parseResult(type);
if (ret === null) {
break;
}
this.send_reply(ret);
} else if (type === 45) { // -
ret = this._parseResult(type);
if (ret === null) {
break;
}
this.send_error(ret);
} else if (type === 58) { // :
ret = this._parseResult(type);
if (ret === null) {
break;
}
this.send_reply(ret);
} else if (type === 36) { // $
ret = this._parseResult(type);
if (ret === null) {
break;
}
// check the state for what is the result of
// a -1, set it back up for a null reply
if (ret === undefined) {
ret = null;
}
this.send_reply(ret);
} else if (type === 42) { // *
// set a rewind point. if a failure occurs,
// wait for the next execute()/append() and try again
offset = this._offset - 1;
ret = this._parseResult(type);
this.send_reply(ret);
}
} catch (err) {
// catch the error (not enough data), rewind, and wait
// for the next packet to appear
if (! (err instanceof IncompleteReadBuffer)) {
throw err;
}
this._offset = offset;
break;
}
}
};
ReplyParser.prototype.append = function (newBuffer) {
if (!newBuffer) {
return;
}
// first run
if (this._buffer === null) {
this._buffer = newBuffer;
return;
}
// out of data
if (this._offset >= this._buffer.length) {
this._buffer = newBuffer;
this._offset = 0;
return;
}
// very large packet
// check for concat, if we have it, use it
if (Buffer.concat !== undefined) {
this._buffer = Buffer.concat([this._buffer.slice(this._offset), newBuffer]);
} else {
var remaining = this._bytesRemaining(),
newLength = remaining + newBuffer.length,
tmpBuffer = new Buffer(newLength);
this._buffer.copy(tmpBuffer, 0, this._offset);
newBuffer.copy(tmpBuffer, remaining, 0);
this._buffer = tmpBuffer;
}
this._offset = 0;
};
ReplyParser.prototype.parseHeader = function () {
var end = this._packetEndOffset(),
value = small_toString(this._buffer, this._offset, end - 1);
this._offset = end + 1;
return value;
};
ReplyParser.prototype._packetEndOffset = function () {
var offset = this._offset;
while (this._buffer[offset] !== 0x0d && this._buffer[offset + 1] !== 0x0a) {
offset++;
if (offset >= this._buffer.length) {
throw new IncompleteReadBuffer("didn't see LF after NL reading multi bulk count (" + offset + " => " + this._buffer.length + ", " + this._offset + ")");
}
}
offset++;
return offset;
};
ReplyParser.prototype._bytesRemaining = function () {
return (this._buffer.length - this._offset) < 0 ? 0 : (this._buffer.length - this._offset);
};
ReplyParser.prototype.parser_error = function (message) {
this.emit("error", message);
};
ReplyParser.prototype.send_error = function (reply) {
this.emit("reply error", reply);
};
ReplyParser.prototype.send_reply = function (reply) {
this.emit("reply", reply);
};

View File

@ -0,0 +1,59 @@
// Queue class adapted from Tim Caswell's pattern library
// http://github.com/creationix/pattern/blob/master/lib/pattern/queue.js
function Queue() {
this.tail = [];
this.head = [];
this.offset = 0;
}
Queue.prototype.shift = function () {
if (this.offset === this.head.length) {
var tmp = this.head;
tmp.length = 0;
this.head = this.tail;
this.tail = tmp;
this.offset = 0;
if (this.head.length === 0) {
return;
}
}
return this.head[this.offset++]; // sorry, JSLint
};
Queue.prototype.push = function (item) {
return this.tail.push(item);
};
Queue.prototype.forEach = function (fn, thisv) {
var array = this.head.slice(this.offset), i, il;
array.push.apply(array, this.tail);
if (thisv) {
for (i = 0, il = array.length; i < il; i += 1) {
fn.call(thisv, array[i], i, array);
}
} else {
for (i = 0, il = array.length; i < il; i += 1) {
fn(array[i], i, array);
}
}
return array;
};
Queue.prototype.getLength = function () {
return this.head.length - this.offset + this.tail.length;
};
Object.defineProperty(Queue.prototype, "length", {
get: function () {
return this.getLength();
}
});
if (typeof module !== "undefined" && module.exports) {
module.exports = Queue;
}

View File

@ -0,0 +1,12 @@
function to_array(args) {
var len = args.length,
arr = new Array(len), i;
for (i = 0; i < len; i += 1) {
arr[i] = args[i];
}
return arr;
}
module.exports = to_array;

View File

@ -0,0 +1,11 @@
// Support for very old versions of node where the module was called "sys". At some point, we should abandon this.
var util;
try {
util = require("util");
} catch (err) {
util = require("sys");
}
module.exports = util;

View File

@ -0,0 +1,15 @@
{
"name": "ledis",
"version": "0.1",
"description": "Ledis client library",
"keywords": [
"ledis",
"nosql"
],
"main": "./index.js",
"devDependencies": {
"metrics": ">=0.1.5",
"colors": "~0.6.0-1",
"underscore": "~1.4.4"
}
}

View File

@ -3,6 +3,9 @@ package main
import (
"flag"
"github.com/siddontang/ledisdb/server"
"log"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
@ -52,5 +55,9 @@ func main() {
app.Close()
}()
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
app.Run()
}

View File

@ -1,5 +1,6 @@
{
"addr": "127.0.0.1:6380",
"http_addr": "127.0.0.1:11181",
"data_dir": "/tmp/ledis_server",
"db": {

View File

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/siddontang/ledisdb/ledis"
"net"
"net/http"
"path"
"strings"
)
@ -12,6 +13,7 @@ type App struct {
cfg *Config
listener net.Listener
httpListener net.Listener
ldb *ledis.Ledis
@ -25,6 +27,14 @@ type App struct {
m *master
}
func netType(s string) string {
if strings.Contains(s, "/") {
return "unix"
} else {
return "tcp"
}
}
func NewApp(cfg *Config) (*App, error) {
if len(cfg.DataDir) == 0 {
return nil, fmt.Errorf("must set data_dir first")
@ -40,15 +50,15 @@ func NewApp(cfg *Config) (*App, error) {
var err error
if strings.Contains(cfg.Addr, "/") {
app.listener, err = net.Listen("unix", cfg.Addr)
} else {
app.listener, err = net.Listen("tcp", cfg.Addr)
if app.listener, err = net.Listen(netType(cfg.Addr), cfg.Addr); err != nil {
return nil, err
}
if err != nil {
if len(cfg.HttpAddr) > 0 {
if app.httpListener, err = net.Listen(netType(cfg.HttpAddr), cfg.HttpAddr); err != nil {
return nil, err
}
}
if len(cfg.AccessLog) > 0 {
if path.Dir(cfg.AccessLog) == "." {
@ -82,6 +92,10 @@ func (app *App) Close() {
app.listener.Close()
if app.httpListener != nil {
app.httpListener.Close()
}
app.m.Close()
if app.access != nil {
@ -96,16 +110,33 @@ func (app *App) Run() {
app.slaveof(app.cfg.SlaveOf)
}
go app.httpServe()
for !app.closed {
conn, err := app.listener.Accept()
if err != nil {
continue
}
newClient(conn, app)
newClientRESP(conn, app)
}
}
func (app *App) httpServe() {
if app.httpListener == nil {
return
}
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
newClientHTTP(app, w, r)
})
svr := http.Server{Handler: mux}
svr.Serve(app.httpListener)
}
func (app *App) Ledis() *ledis.Ledis {
return app.ldb
}

View File

@ -1,315 +0,0 @@
package server
import (
"bufio"
"bytes"
"errors"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/ledis"
"io"
"net"
"runtime"
"strconv"
"strings"
"time"
)
var errReadRequest = errors.New("invalid request protocol")
type client struct {
app *App
ldb *ledis.Ledis
db *ledis.DB
c net.Conn
rb *bufio.Reader
wb *bufio.Writer
cmd string
args [][]byte
reqC chan error
syncBuf bytes.Buffer
compressBuf []byte
logBuf bytes.Buffer
}
func newClient(c net.Conn, app *App) {
co := new(client)
co.app = app
co.ldb = app.ldb
//use default db
co.db, _ = app.ldb.Select(0)
co.c = c
co.rb = bufio.NewReaderSize(c, 256)
co.wb = bufio.NewWriterSize(c, 256)
co.reqC = make(chan error, 1)
co.compressBuf = make([]byte, 256)
go co.run()
}
func (c *client) run() {
defer func() {
if e := recover(); e != nil {
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
buf = buf[0:n]
log.Fatal("client run panic %s:%v", buf, e)
}
c.c.Close()
}()
for {
req, err := c.readRequest()
if err != nil {
return
}
c.handleRequest(req)
}
}
func (c *client) readLine() ([]byte, error) {
return ReadLine(c.rb)
}
//A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
func (c *client) readRequest() ([][]byte, error) {
l, err := c.readLine()
if err != nil {
return nil, err
} else if len(l) == 0 || l[0] != '*' {
return nil, errReadRequest
}
var nparams int
if nparams, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if nparams <= 0 {
return nil, errReadRequest
}
req := make([][]byte, 0, nparams)
var n int
for i := 0; i < nparams; i++ {
if l, err = c.readLine(); err != nil {
return nil, err
}
if len(l) == 0 {
return nil, errReadRequest
} else if l[0] == '$' {
//handle resp string
if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if n == -1 {
req = append(req, nil)
} else {
buf := make([]byte, n)
if _, err = io.ReadFull(c.rb, buf); err != nil {
return nil, err
}
if l, err = c.readLine(); err != nil {
return nil, err
} else if len(l) != 0 {
return nil, errors.New("bad bulk string format")
}
req = append(req, buf)
}
} else {
return nil, errReadRequest
}
}
return req, nil
}
func (c *client) handleRequest(req [][]byte) {
var err error
start := time.Now()
if len(req) == 0 {
err = ErrEmptyCommand
} else {
c.cmd = strings.ToLower(ledis.String(req[0]))
c.args = req[1:]
f, ok := regCmds[c.cmd]
if !ok {
err = ErrNotFound
} else {
go func() {
c.reqC <- f(c)
}()
err = <-c.reqC
}
}
duration := time.Since(start)
if c.app.access != nil {
c.logBuf.Reset()
for i, r := range req {
left := 256 - c.logBuf.Len()
if left <= 0 {
break
} else if len(r) <= left {
c.logBuf.Write(r)
if i != len(req)-1 {
c.logBuf.WriteByte(' ')
}
} else {
c.logBuf.Write(r[0:left])
}
}
c.app.access.Log(c.c.RemoteAddr().String(), duration.Nanoseconds()/1000000, c.logBuf.Bytes(), err)
}
if err != nil {
c.writeError(err)
}
c.wb.Flush()
}
func (c *client) writeError(err error) {
c.wb.Write(ledis.Slice("-ERR"))
if err != nil {
c.wb.WriteByte(' ')
c.wb.Write(ledis.Slice(err.Error()))
}
c.wb.Write(Delims)
}
func (c *client) writeStatus(status string) {
c.wb.WriteByte('+')
c.wb.Write(ledis.Slice(status))
c.wb.Write(Delims)
}
func (c *client) writeInteger(n int64) {
c.wb.WriteByte(':')
c.wb.Write(ledis.StrPutInt64(n))
c.wb.Write(Delims)
}
func (c *client) writeBulk(b []byte) {
c.wb.WriteByte('$')
if b == nil {
c.wb.Write(NullBulk)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(b))))
c.wb.Write(Delims)
c.wb.Write(b)
}
c.wb.Write(Delims)
}
func (c *client) writeArray(ay []interface{}) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
switch v := ay[i].(type) {
case []interface{}:
c.writeArray(v)
case []byte:
c.writeBulk(v)
case nil:
c.writeBulk(nil)
case int64:
c.writeInteger(v)
default:
panic("invalid array type")
}
}
}
}
func (c *client) writeSliceArray(ay [][]byte) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i])
}
}
}
func (c *client) writeFVPairArray(ay []ledis.FVPair) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay) * 2)))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i].Field)
c.writeBulk(ay[i].Value)
}
}
}
func (c *client) writeScorePairArray(ay []ledis.ScorePair, withScores bool) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
if withScores {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay) * 2)))
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
}
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i].Member)
if withScores {
c.writeBulk(ledis.StrPutInt64(ay[i].Score))
}
}
}
}
func (c *client) writeBulkFrom(n int64, rb io.Reader) {
c.wb.WriteByte('$')
c.wb.Write(ledis.Slice(strconv.FormatInt(n, 10)))
c.wb.Write(Delims)
io.Copy(c.wb, rb)
c.wb.Write(Delims)
}

260
server/client_http.go Normal file
View File

@ -0,0 +1,260 @@
package server
import (
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/ledis"
"io"
"net/http"
"strconv"
"strings"
"encoding/json"
"github.com/ugorji/go/codec"
"gopkg.in/mgo.v2/bson"
)
var allowedContentTypes = map[string]struct{}{
"json": struct{}{},
"bson": struct{}{},
"msgpack": struct{}{},
}
var unsopportedCommands = map[string]struct{}{
"slaveof": struct{}{},
"fullsync": struct{}{},
"sync": struct{}{},
"quit": struct{}{},
}
type httpClient struct {
app *App
db *ledis.DB
ldb *ledis.Ledis
resp responseWriter
req *requestContext
}
type httpWriter struct {
contentType string
cmd string
w http.ResponseWriter
}
func newClientHTTP(app *App, w http.ResponseWriter, r *http.Request) {
var err error
c := new(httpClient)
c.app = app
c.ldb = app.ldb
c.db, err = c.ldb.Select(0)
if err != nil {
w.Write([]byte(err.Error()))
return
}
c.req, err = c.makeRequest(app, r, w)
if err != nil {
w.Write([]byte(err.Error()))
return
}
c.req.perform()
}
func (c *httpClient) addr(r *http.Request) string {
return r.RemoteAddr
}
func (c *httpClient) makeRequest(app *App, r *http.Request, w http.ResponseWriter) (*requestContext, error) {
var err error
db, cmd, argsStr, contentType := c.parseReqPath(r)
c.db, err = app.ldb.Select(db)
if err != nil {
return nil, err
}
contentType = strings.ToLower(contentType)
if _, ok := allowedContentTypes[contentType]; !ok {
return nil, fmt.Errorf("unsupported content type: '%s', only json, bson, msgpack are supported", contentType)
}
req := newRequestContext(app)
args := make([][]byte, len(argsStr))
for i, arg := range argsStr {
args[i] = []byte(arg)
}
req.cmd = strings.ToLower(cmd)
if _, ok := unsopportedCommands[req.cmd]; ok {
return nil, fmt.Errorf("unsupported command: '%s'", cmd)
}
req.args = args
req.remoteAddr = c.addr(r)
req.resp = &httpWriter{contentType, cmd, w}
return req, nil
}
func (c *httpClient) parseReqPath(r *http.Request) (db int, cmd string, args []string, contentType string) {
contentType = r.FormValue("type")
if contentType == "" {
contentType = "json"
}
substrings := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/")
if len(substrings) == 1 {
return 0, substrings[0], substrings[1:], contentType
}
db, err := strconv.Atoi(substrings[0])
if err != nil {
cmd = substrings[0]
args = substrings[1:]
} else {
cmd = substrings[1]
args = substrings[2:]
}
return
}
// http writer
func (w *httpWriter) genericWrite(result interface{}) {
m := map[string]interface{}{
w.cmd: result,
}
switch w.contentType {
case "json":
writeJSON(&m, w.w)
case "bson":
writeBSON(&m, w.w)
case "msgpack":
writeMsgPack(&m, w.w)
default:
log.Error("invalid content type %s", w.contentType)
}
}
func (w *httpWriter) writeError(err error) {
result := [2]interface{}{
false,
fmt.Sprintf("ERR %s", err.Error()),
}
w.genericWrite(result)
}
func (w *httpWriter) writeStatus(status string) {
var success bool
if status == OK || status == PONG {
success = true
}
w.genericWrite([]interface{}{success, status})
}
func (w *httpWriter) writeInteger(n int64) {
w.genericWrite(n)
}
func (w *httpWriter) writeBulk(b []byte) {
if b == nil {
w.genericWrite(nil)
} else {
w.genericWrite(ledis.String(b))
}
}
func (w *httpWriter) writeArray(lst []interface{}) {
w.genericWrite(lst)
}
func (w *httpWriter) writeSliceArray(lst [][]byte) {
arr := make([]interface{}, len(lst))
for i, elem := range lst {
if elem == nil {
arr[i] = nil
} else {
arr[i] = ledis.String(elem)
}
}
w.genericWrite(arr)
}
func (w *httpWriter) writeFVPairArray(lst []ledis.FVPair) {
m := make(map[string]string)
for _, elem := range lst {
m[ledis.String(elem.Field)] = ledis.String(elem.Value)
}
w.genericWrite(m)
}
func (w *httpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) {
var arr []string
if withScores {
arr = make([]string, 2*len(lst))
for i, data := range lst {
arr[2*i] = ledis.String(data.Member)
arr[2*i+1] = strconv.FormatInt(data.Score, 10)
}
} else {
arr = make([]string, len(lst))
for i, data := range lst {
arr[i] = ledis.String(data.Member)
}
}
w.genericWrite(arr)
}
func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) {
w.writeError(fmt.Errorf("unsuport"))
}
func (w *httpWriter) flush() {
}
func writeJSON(resutl interface{}, w http.ResponseWriter) {
buf, err := json.Marshal(resutl)
if err != nil {
log.Error(err.Error())
return
}
w.Header().Set("Content-type", "application/json; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
_, err = w.Write(buf)
if err != nil {
log.Error(err.Error())
}
}
func writeBSON(result interface{}, w http.ResponseWriter) {
buf, err := bson.Marshal(result)
if err != nil {
log.Error(err.Error())
return
}
w.Header().Set("Content-type", "application/octet-stream")
w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
_, err = w.Write(buf)
if err != nil {
log.Error(err.Error())
}
}
func writeMsgPack(result interface{}, w http.ResponseWriter) {
w.Header().Set("Content-type", "application/octet-stream")
var mh codec.MsgpackHandle
enc := codec.NewEncoder(w, &mh)
if err := enc.Encode(result); err != nil {
log.Error(err.Error())
}
}

292
server/client_resp.go Normal file
View File

@ -0,0 +1,292 @@
package server
import (
"bufio"
"errors"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/ledis"
"io"
"net"
"runtime"
"strconv"
"strings"
)
var errReadRequest = errors.New("invalid request protocol")
type respClient struct {
app *App
ldb *ledis.Ledis
db *ledis.DB
conn net.Conn
rb *bufio.Reader
req *requestContext
}
type respWriter struct {
buff *bufio.Writer
}
func newClientRESP(conn net.Conn, app *App) {
c := new(respClient)
c.app = app
c.conn = conn
c.ldb = app.ldb
c.db, _ = app.ldb.Select(0)
c.rb = bufio.NewReaderSize(conn, 256)
c.req = newRequestContext(app)
c.req.resp = newWriterRESP(conn)
c.req.remoteAddr = conn.RemoteAddr().String()
go c.run()
}
func (c *respClient) run() {
defer func() {
if e := recover(); e != nil {
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
buf = buf[0:n]
log.Fatal("client run panic %s:%v", buf, e)
}
c.conn.Close()
}()
for {
reqData, err := c.readRequest()
if err != nil {
return
}
c.handleRequest(reqData)
}
}
func (c *respClient) readLine() ([]byte, error) {
return ReadLine(c.rb)
}
//A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
func (c *respClient) readRequest() ([][]byte, error) {
l, err := c.readLine()
if err != nil {
return nil, err
} else if len(l) == 0 || l[0] != '*' {
return nil, errReadRequest
}
var nparams int
if nparams, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if nparams <= 0 {
return nil, errReadRequest
}
req := make([][]byte, 0, nparams)
var n int
for i := 0; i < nparams; i++ {
if l, err = c.readLine(); err != nil {
return nil, err
}
if len(l) == 0 {
return nil, errReadRequest
} else if l[0] == '$' {
//handle resp string
if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if n == -1 {
req = append(req, nil)
} else {
buf := make([]byte, n)
if _, err = io.ReadFull(c.rb, buf); err != nil {
return nil, err
}
if l, err = c.readLine(); err != nil {
return nil, err
} else if len(l) != 0 {
return nil, errors.New("bad bulk string format")
}
req = append(req, buf)
}
} else {
return nil, errReadRequest
}
}
return req, nil
}
func (c *respClient) handleRequest(reqData [][]byte) {
req := c.req
if len(reqData) == 0 {
c.req.cmd = ""
c.req.args = reqData[0:0]
} else {
c.req.cmd = strings.ToLower(ledis.String(reqData[0]))
c.req.args = reqData[1:]
}
if c.req.cmd == "quit" {
c.req.resp.writeStatus(OK)
c.req.resp.flush()
c.conn.Close()
return
}
req.db = c.db
c.req.perform()
c.db = req.db // "SELECT"
return
}
// response writer
func newWriterRESP(conn net.Conn) *respWriter {
w := new(respWriter)
w.buff = bufio.NewWriterSize(conn, 256)
return w
}
func (w *respWriter) writeError(err error) {
w.buff.Write(ledis.Slice("-ERR"))
if err != nil {
w.buff.WriteByte(' ')
w.buff.Write(ledis.Slice(err.Error()))
}
w.buff.Write(Delims)
}
func (w *respWriter) writeStatus(status string) {
w.buff.WriteByte('+')
w.buff.Write(ledis.Slice(status))
w.buff.Write(Delims)
}
func (w *respWriter) writeInteger(n int64) {
w.buff.WriteByte(':')
w.buff.Write(ledis.StrPutInt64(n))
w.buff.Write(Delims)
}
func (w *respWriter) writeBulk(b []byte) {
w.buff.WriteByte('$')
if b == nil {
w.buff.Write(NullBulk)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(b))))
w.buff.Write(Delims)
w.buff.Write(b)
}
w.buff.Write(Delims)
}
func (w *respWriter) writeArray(lst []interface{}) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst))))
w.buff.Write(Delims)
for i := 0; i < len(lst); i++ {
switch v := lst[i].(type) {
case []interface{}:
w.writeArray(v)
case []byte:
w.writeBulk(v)
case nil:
w.writeBulk(nil)
case int64:
w.writeInteger(v)
default:
panic("invalid array type")
}
}
}
}
func (w *respWriter) writeSliceArray(lst [][]byte) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst))))
w.buff.Write(Delims)
for i := 0; i < len(lst); i++ {
w.writeBulk(lst[i])
}
}
}
func (w *respWriter) writeFVPairArray(lst []ledis.FVPair) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst) * 2)))
w.buff.Write(Delims)
for i := 0; i < len(lst); i++ {
w.writeBulk(lst[i].Field)
w.writeBulk(lst[i].Value)
}
}
}
func (w *respWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
if withScores {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst) * 2)))
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst))))
w.buff.Write(Delims)
}
for i := 0; i < len(lst); i++ {
w.writeBulk(lst[i].Member)
if withScores {
w.writeBulk(ledis.StrPutInt64(lst[i].Score))
}
}
}
}
func (w *respWriter) writeBulkFrom(n int64, rb io.Reader) {
w.buff.WriteByte('$')
w.buff.Write(ledis.Slice(strconv.FormatInt(n, 10)))
w.buff.Write(Delims)
io.Copy(w.buff, rb)
w.buff.Write(Delims)
}
func (w *respWriter) flush() {
w.buff.Flush()
}

View File

@ -5,36 +5,36 @@ import (
"strings"
)
func bgetCommand(c *client) error {
args := c.args
func bgetCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.BGet(args[0]); err != nil {
if v, err := req.db.BGet(args[0]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func bdeleteCommand(c *client) error {
args := c.args
func bdeleteCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.BDelete(args[0]); err != nil {
if n, err := req.db.BDelete(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func bsetbitCommand(c *client) error {
args := c.args
func bsetbitCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -58,16 +58,16 @@ func bsetbitCommand(c *client) error {
return ErrBool
}
if ori, err := c.db.BSetBit(args[0], offset, uint8(val)); err != nil {
if ori, err := req.db.BSetBit(args[0], offset, uint8(val)); err != nil {
return err
} else {
c.writeInteger(int64(ori))
req.resp.writeInteger(int64(ori))
}
return nil
}
func bgetbitCommand(c *client) error {
args := c.args
func bgetbitCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -78,16 +78,16 @@ func bgetbitCommand(c *client) error {
return ErrOffset
}
if v, err := c.db.BGetBit(args[0], offset); err != nil {
if v, err := req.db.BGetBit(args[0], offset); err != nil {
return err
} else {
c.writeInteger(int64(v))
req.resp.writeInteger(int64(v))
}
return nil
}
func bmsetbitCommand(c *client) error {
args := c.args
func bmsetbitCommand(req *requestContext) error {
args := req.args
if len(args) < 3 {
return ErrCmdParams
}
@ -124,16 +124,16 @@ func bmsetbitCommand(c *client) error {
pairs[i].Val = uint8(val)
}
if place, err := c.db.BMSetBit(key, pairs...); err != nil {
if place, err := req.db.BMSetBit(key, pairs...); err != nil {
return err
} else {
c.writeInteger(place)
req.resp.writeInteger(place)
}
return nil
}
func bcountCommand(c *client) error {
args := c.args
func bcountCommand(req *requestContext) error {
args := req.args
argCnt := len(args)
if !(argCnt > 0 && argCnt <= 3) {
@ -159,16 +159,16 @@ func bcountCommand(c *client) error {
}
}
if cnt, err := c.db.BCount(args[0], start, end); err != nil {
if cnt, err := req.db.BCount(args[0], start, end); err != nil {
return err
} else {
c.writeInteger(int64(cnt))
req.resp.writeInteger(int64(cnt))
}
return nil
}
func boptCommand(c *client) error {
args := c.args
func boptCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
@ -194,16 +194,16 @@ func boptCommand(c *client) error {
if len(srcKeys) == 0 {
return ErrCmdParams
}
if blen, err := c.db.BOperation(op, dstKey, srcKeys...); err != nil {
if blen, err := req.db.BOperation(op, dstKey, srcKeys...); err != nil {
return err
} else {
c.writeInteger(int64(blen))
req.resp.writeInteger(int64(blen))
}
return nil
}
func bexpireCommand(c *client) error {
args := c.args
func bexpireCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -213,17 +213,17 @@ func bexpireCommand(c *client) error {
return ErrValue
}
if v, err := c.db.BExpire(args[0], duration); err != nil {
if v, err := req.db.BExpire(args[0], duration); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func bexpireAtCommand(c *client) error {
args := c.args
func bexpireAtCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -233,40 +233,40 @@ func bexpireAtCommand(c *client) error {
return ErrValue
}
if v, err := c.db.BExpireAt(args[0], when); err != nil {
if v, err := req.db.BExpireAt(args[0], when); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func bttlCommand(c *client) error {
args := c.args
func bttlCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.BTTL(args[0]); err != nil {
if v, err := req.db.BTTL(args[0]); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func bpersistCommand(c *client) error {
args := c.args
func bpersistCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.BPersist(args[0]); err != nil {
if n, err := req.db.BPersist(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil

View File

@ -4,87 +4,87 @@ import (
"github.com/siddontang/ledisdb/ledis"
)
func hsetCommand(c *client) error {
args := c.args
func hsetCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
if n, err := c.db.HSet(args[0], args[1], args[2]); err != nil {
if n, err := req.db.HSet(args[0], args[1], args[2]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hgetCommand(c *client) error {
args := c.args
func hgetCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if v, err := c.db.HGet(args[0], args[1]); err != nil {
if v, err := req.db.HGet(args[0], args[1]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func hexistsCommand(c *client) error {
args := c.args
func hexistsCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
var n int64 = 1
if v, err := c.db.HGet(args[0], args[1]); err != nil {
if v, err := req.db.HGet(args[0], args[1]); err != nil {
return err
} else {
if v == nil {
n = 0
}
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hdelCommand(c *client) error {
args := c.args
func hdelCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
if n, err := c.db.HDel(args[0], args[1:]...); err != nil {
if n, err := req.db.HDel(args[0], args[1:]...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hlenCommand(c *client) error {
args := c.args
func hlenCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.HLen(args[0]); err != nil {
if n, err := req.db.HLen(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hincrbyCommand(c *client) error {
args := c.args
func hincrbyCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -95,16 +95,16 @@ func hincrbyCommand(c *client) error {
}
var n int64
if n, err = c.db.HIncrBy(args[0], args[1], delta); err != nil {
if n, err = req.db.HIncrBy(args[0], args[1], delta); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hmsetCommand(c *client) error {
args := c.args
func hmsetCommand(req *requestContext) error {
args := req.args
if len(args) < 3 {
return ErrCmdParams
}
@ -123,107 +123,107 @@ func hmsetCommand(c *client) error {
kvs[i].Value = args[2*i+1]
}
if err := c.db.HMset(key, kvs...); err != nil {
if err := req.db.HMset(key, kvs...); err != nil {
return err
} else {
c.writeStatus(OK)
req.resp.writeStatus(OK)
}
return nil
}
func hmgetCommand(c *client) error {
args := c.args
func hmgetCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
if v, err := c.db.HMget(args[0], args[1:]...); err != nil {
if v, err := req.db.HMget(args[0], args[1:]...); err != nil {
return err
} else {
c.writeSliceArray(v)
req.resp.writeSliceArray(v)
}
return nil
}
func hgetallCommand(c *client) error {
args := c.args
func hgetallCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.HGetAll(args[0]); err != nil {
if v, err := req.db.HGetAll(args[0]); err != nil {
return err
} else {
c.writeFVPairArray(v)
req.resp.writeFVPairArray(v)
}
return nil
}
func hkeysCommand(c *client) error {
args := c.args
func hkeysCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.HKeys(args[0]); err != nil {
if v, err := req.db.HKeys(args[0]); err != nil {
return err
} else {
c.writeSliceArray(v)
req.resp.writeSliceArray(v)
}
return nil
}
func hvalsCommand(c *client) error {
args := c.args
func hvalsCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.HValues(args[0]); err != nil {
if v, err := req.db.HValues(args[0]); err != nil {
return err
} else {
c.writeSliceArray(v)
req.resp.writeSliceArray(v)
}
return nil
}
func hclearCommand(c *client) error {
args := c.args
func hclearCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.HClear(args[0]); err != nil {
if n, err := req.db.HClear(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hmclearCommand(c *client) error {
args := c.args
func hmclearCommand(req *requestContext) error {
args := req.args
if len(args) < 1 {
return ErrCmdParams
}
if n, err := c.db.HMclear(args...); err != nil {
if n, err := req.db.HMclear(args...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func hexpireCommand(c *client) error {
args := c.args
func hexpireCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -233,17 +233,17 @@ func hexpireCommand(c *client) error {
return ErrValue
}
if v, err := c.db.HExpire(args[0], duration); err != nil {
if v, err := req.db.HExpire(args[0], duration); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func hexpireAtCommand(c *client) error {
args := c.args
func hexpireAtCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -253,40 +253,40 @@ func hexpireAtCommand(c *client) error {
return ErrValue
}
if v, err := c.db.HExpireAt(args[0], when); err != nil {
if v, err := req.db.HExpireAt(args[0], when); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func httlCommand(c *client) error {
args := c.args
func httlCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.HTTL(args[0]); err != nil {
if v, err := req.db.HTTL(args[0]); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func hpersistCommand(c *client) error {
args := c.args
func hpersistCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.HPersist(args[0]); err != nil {
if n, err := req.db.HPersist(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil

View File

@ -4,112 +4,112 @@ import (
"github.com/siddontang/ledisdb/ledis"
)
func getCommand(c *client) error {
args := c.args
func getCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.Get(args[0]); err != nil {
if v, err := req.db.Get(args[0]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func setCommand(c *client) error {
args := c.args
func setCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if err := c.db.Set(args[0], args[1]); err != nil {
if err := req.db.Set(args[0], args[1]); err != nil {
return err
} else {
c.writeStatus(OK)
req.resp.writeStatus(OK)
}
return nil
}
func getsetCommand(c *client) error {
args := c.args
func getsetCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if v, err := c.db.GetSet(args[0], args[1]); err != nil {
if v, err := req.db.GetSet(args[0], args[1]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func setnxCommand(c *client) error {
args := c.args
func setnxCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if n, err := c.db.SetNX(args[0], args[1]); err != nil {
if n, err := req.db.SetNX(args[0], args[1]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func existsCommand(c *client) error {
args := c.args
func existsCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.Exists(args[0]); err != nil {
if n, err := req.db.Exists(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func incrCommand(c *client) error {
args := c.args
func incrCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.Incr(c.args[0]); err != nil {
if n, err := req.db.Incr(req.args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func decrCommand(c *client) error {
args := c.args
func decrCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.Decr(c.args[0]); err != nil {
if n, err := req.db.Decr(req.args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func incrbyCommand(c *client) error {
args := c.args
func incrbyCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -119,17 +119,17 @@ func incrbyCommand(c *client) error {
return ErrValue
}
if n, err := c.db.IncrBy(c.args[0], delta); err != nil {
if n, err := req.db.IncrBy(req.args[0], delta); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func decrbyCommand(c *client) error {
args := c.args
func decrbyCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -139,32 +139,32 @@ func decrbyCommand(c *client) error {
return ErrValue
}
if n, err := c.db.DecrBy(c.args[0], delta); err != nil {
if n, err := req.db.DecrBy(req.args[0], delta); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func delCommand(c *client) error {
args := c.args
func delCommand(req *requestContext) error {
args := req.args
if len(args) == 0 {
return ErrCmdParams
}
if n, err := c.db.Del(args...); err != nil {
if n, err := req.db.Del(args...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func msetCommand(c *client) error {
args := c.args
func msetCommand(req *requestContext) error {
args := req.args
if len(args) == 0 || len(args)%2 != 0 {
return ErrCmdParams
}
@ -175,36 +175,36 @@ func msetCommand(c *client) error {
kvs[i].Value = args[2*i+1]
}
if err := c.db.MSet(kvs...); err != nil {
if err := req.db.MSet(kvs...); err != nil {
return err
} else {
c.writeStatus(OK)
req.resp.writeStatus(OK)
}
return nil
}
// func setexCommand(c *client) error {
// func setexCommand(req *requestContext) error {
// return nil
// }
func mgetCommand(c *client) error {
args := c.args
func mgetCommand(req *requestContext) error {
args := req.args
if len(args) == 0 {
return ErrCmdParams
}
if v, err := c.db.MGet(args...); err != nil {
if v, err := req.db.MGet(args...); err != nil {
return err
} else {
c.writeSliceArray(v)
req.resp.writeSliceArray(v)
}
return nil
}
func expireCommand(c *client) error {
args := c.args
func expireCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -214,17 +214,17 @@ func expireCommand(c *client) error {
return ErrValue
}
if v, err := c.db.Expire(args[0], duration); err != nil {
if v, err := req.db.Expire(args[0], duration); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func expireAtCommand(c *client) error {
args := c.args
func expireAtCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -234,40 +234,40 @@ func expireAtCommand(c *client) error {
return ErrValue
}
if v, err := c.db.ExpireAt(args[0], when); err != nil {
if v, err := req.db.ExpireAt(args[0], when); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func ttlCommand(c *client) error {
args := c.args
func ttlCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.TTL(args[0]); err != nil {
if v, err := req.db.TTL(args[0]); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func persistCommand(c *client) error {
args := c.args
func persistCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.Persist(args[0]); err != nil {
if n, err := req.db.Persist(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil

View File

@ -4,83 +4,83 @@ import (
"github.com/siddontang/ledisdb/ledis"
)
func lpushCommand(c *client) error {
args := c.args
func lpushCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
if n, err := c.db.LPush(args[0], args[1:]...); err != nil {
if n, err := req.db.LPush(args[0], args[1:]...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func rpushCommand(c *client) error {
args := c.args
func rpushCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
if n, err := c.db.RPush(args[0], args[1:]...); err != nil {
if n, err := req.db.RPush(args[0], args[1:]...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func lpopCommand(c *client) error {
args := c.args
func lpopCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.LPop(args[0]); err != nil {
if v, err := req.db.LPop(args[0]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func rpopCommand(c *client) error {
args := c.args
func rpopCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.RPop(args[0]); err != nil {
if v, err := req.db.RPop(args[0]); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func llenCommand(c *client) error {
args := c.args
func llenCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.LLen(args[0]); err != nil {
if n, err := req.db.LLen(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func lindexCommand(c *client) error {
args := c.args
func lindexCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -90,17 +90,17 @@ func lindexCommand(c *client) error {
return ErrValue
}
if v, err := c.db.LIndex(args[0], int32(index)); err != nil {
if v, err := req.db.LIndex(args[0], int32(index)); err != nil {
return err
} else {
c.writeBulk(v)
req.resp.writeBulk(v)
}
return nil
}
func lrangeCommand(c *client) error {
args := c.args
func lrangeCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -119,47 +119,47 @@ func lrangeCommand(c *client) error {
return ErrValue
}
if v, err := c.db.LRange(args[0], int32(start), int32(stop)); err != nil {
if v, err := req.db.LRange(args[0], int32(start), int32(stop)); err != nil {
return err
} else {
c.writeSliceArray(v)
req.resp.writeSliceArray(v)
}
return nil
}
func lclearCommand(c *client) error {
args := c.args
func lclearCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.LClear(args[0]); err != nil {
if n, err := req.db.LClear(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func lmclearCommand(c *client) error {
args := c.args
func lmclearCommand(req *requestContext) error {
args := req.args
if len(args) < 1 {
return ErrCmdParams
}
if n, err := c.db.LMclear(args...); err != nil {
if n, err := req.db.LMclear(args...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func lexpireCommand(c *client) error {
args := c.args
func lexpireCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -169,17 +169,17 @@ func lexpireCommand(c *client) error {
return ErrValue
}
if v, err := c.db.LExpire(args[0], duration); err != nil {
if v, err := req.db.LExpire(args[0], duration); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func lexpireAtCommand(c *client) error {
args := c.args
func lexpireAtCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -189,40 +189,40 @@ func lexpireAtCommand(c *client) error {
return ErrValue
}
if v, err := c.db.LExpireAt(args[0], when); err != nil {
if v, err := req.db.LExpireAt(args[0], when); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func lttlCommand(c *client) error {
args := c.args
func lttlCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.LTTL(args[0]); err != nil {
if v, err := req.db.LTTL(args[0]); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func lpersistCommand(c *client) error {
args := c.args
func lpersistCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.LPersist(args[0]); err != nil {
if n, err := req.db.LPersist(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil

View File

@ -11,8 +11,8 @@ import (
"strings"
)
func slaveofCommand(c *client) error {
args := c.args
func slaveofCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
@ -31,23 +31,23 @@ func slaveofCommand(c *client) error {
masterAddr = fmt.Sprintf("%s:%s", args[0], args[1])
}
if err := c.app.slaveof(masterAddr); err != nil {
if err := req.app.slaveof(masterAddr); err != nil {
return err
}
c.writeStatus(OK)
req.resp.writeStatus(OK)
return nil
}
func fullsyncCommand(c *client) error {
func fullsyncCommand(req *requestContext) error {
//todo, multi fullsync may use same dump file
dumpFile, err := ioutil.TempFile(c.app.cfg.DataDir, "dump_")
dumpFile, err := ioutil.TempFile(req.app.cfg.DataDir, "dump_")
if err != nil {
return err
}
if err = c.app.ldb.Dump(dumpFile); err != nil {
if err = req.app.ldb.Dump(dumpFile); err != nil {
return err
}
@ -56,7 +56,7 @@ func fullsyncCommand(c *client) error {
dumpFile.Seek(0, os.SEEK_SET)
c.writeBulkFrom(n, dumpFile)
req.resp.writeBulkFrom(n, dumpFile)
name := dumpFile.Name()
dumpFile.Close()
@ -68,8 +68,8 @@ func fullsyncCommand(c *client) error {
var reserveInfoSpace = make([]byte, 16)
func syncCommand(c *client) error {
args := c.args
func syncCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -87,32 +87,32 @@ func syncCommand(c *client) error {
return ErrCmdParams
}
c.syncBuf.Reset()
req.syncBuf.Reset()
//reserve space to write master info
if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil {
if _, err := req.syncBuf.Write(reserveInfoSpace); err != nil {
return err
}
m := &ledis.MasterInfo{logIndex, logPos}
if _, err := c.app.ldb.ReadEventsTo(m, &c.syncBuf); err != nil {
if _, err := req.app.ldb.ReadEventsTo(m, &req.syncBuf); err != nil {
return err
} else {
buf := c.syncBuf.Bytes()
buf := req.syncBuf.Bytes()
binary.BigEndian.PutUint64(buf[0:], uint64(m.LogFileIndex))
binary.BigEndian.PutUint64(buf[8:], uint64(m.LogPos))
if len(c.compressBuf) < snappy.MaxEncodedLen(len(buf)) {
c.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf)))
if len(req.compressBuf) < snappy.MaxEncodedLen(len(buf)) {
req.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf)))
}
if buf, err = snappy.Encode(c.compressBuf, buf); err != nil {
if buf, err = snappy.Encode(req.compressBuf, buf); err != nil {
return err
}
c.writeBulk(buf)
req.resp.writeBulk(buf)
}
return nil

View File

@ -12,8 +12,8 @@ import (
var errScoreOverflow = errors.New("zset score overflow")
func zaddCommand(c *client) error {
args := c.args
func zaddCommand(req *requestContext) error {
args := req.args
if len(args) < 3 {
return ErrCmdParams
}
@ -36,66 +36,66 @@ func zaddCommand(c *client) error {
params[i].Member = args[2*i+1]
}
if n, err := c.db.ZAdd(key, params...); err != nil {
if n, err := req.db.ZAdd(key, params...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zcardCommand(c *client) error {
args := c.args
func zcardCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.ZCard(args[0]); err != nil {
if n, err := req.db.ZCard(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zscoreCommand(c *client) error {
args := c.args
func zscoreCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if s, err := c.db.ZScore(args[0], args[1]); err != nil {
if s, err := req.db.ZScore(args[0], args[1]); err != nil {
if err == ledis.ErrScoreMiss {
c.writeBulk(nil)
req.resp.writeBulk(nil)
} else {
return err
}
} else {
c.writeBulk(ledis.StrPutInt64(s))
req.resp.writeBulk(ledis.StrPutInt64(s))
}
return nil
}
func zremCommand(c *client) error {
args := c.args
func zremCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
if n, err := c.db.ZRem(args[0], args[1:]...); err != nil {
if n, err := req.db.ZRem(args[0], args[1:]...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zincrbyCommand(c *client) error {
args := c.args
func zincrbyCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -107,10 +107,10 @@ func zincrbyCommand(c *client) error {
return ErrValue
}
if v, err := c.db.ZIncrBy(key, delta, args[2]); err != nil {
if v, err := req.db.ZIncrBy(key, delta, args[2]); err != nil {
return err
} else {
c.writeBulk(ledis.StrPutInt64(v))
req.resp.writeBulk(ledis.StrPutInt64(v))
}
return nil
@ -157,6 +157,10 @@ func zparseScoreRange(minBuf []byte, maxBuf []byte) (min int64, max int64, err e
err = ErrCmdParams
return
}
if maxBuf[0] == '(' {
ropen = true
maxBuf = maxBuf[1:]
}
if maxBuf[0] == '(' {
ropen = true
@ -182,8 +186,8 @@ func zparseScoreRange(minBuf []byte, maxBuf []byte) (min int64, max int64, err e
return
}
func zcountCommand(c *client) error {
args := c.args
func zcountCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -194,77 +198,77 @@ func zcountCommand(c *client) error {
}
if min > max {
c.writeInteger(0)
req.resp.writeInteger(0)
return nil
}
if n, err := c.db.ZCount(args[0], min, max); err != nil {
if n, err := req.db.ZCount(args[0], min, max); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zrankCommand(c *client) error {
args := c.args
func zrankCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if n, err := c.db.ZRank(args[0], args[1]); err != nil {
if n, err := req.db.ZRank(args[0], args[1]); err != nil {
return err
} else if n == -1 {
c.writeBulk(nil)
req.resp.writeBulk(nil)
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zrevrankCommand(c *client) error {
args := c.args
func zrevrankCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
if n, err := c.db.ZRevRank(args[0], args[1]); err != nil {
if n, err := req.db.ZRevRank(args[0], args[1]); err != nil {
return err
} else if n == -1 {
c.writeBulk(nil)
req.resp.writeBulk(nil)
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zremrangebyrankCommand(c *client) error {
args := c.args
func zremrangebyrankCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
key := args[0]
start, stop, err := zparseRange(c, args[1], args[2])
start, stop, err := zparseRange(req, args[1], args[2])
if err != nil {
return ErrValue
}
if n, err := c.db.ZRemRangeByRank(key, start, stop); err != nil {
if n, err := req.db.ZRemRangeByRank(key, start, stop); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zremrangebyscoreCommand(c *client) error {
args := c.args
func zremrangebyscoreCommand(req *requestContext) error {
args := req.args
if len(args) != 3 {
return ErrCmdParams
}
@ -275,16 +279,16 @@ func zremrangebyscoreCommand(c *client) error {
return err
}
if n, err := c.db.ZRemRangeByScore(key, min, max); err != nil {
if n, err := req.db.ZRemRangeByScore(key, min, max); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zparseRange(c *client, a1 []byte, a2 []byte) (start int, stop int, err error) {
func zparseRange(req *requestContext, a1 []byte, a2 []byte) (start int, stop int, err error) {
if start, err = strconv.Atoi(ledis.String(a1)); err != nil {
return
}
@ -296,15 +300,15 @@ func zparseRange(c *client, a1 []byte, a2 []byte) (start int, stop int, err erro
return
}
func zrangeGeneric(c *client, reverse bool) error {
args := c.args
func zrangeGeneric(req *requestContext, reverse bool) error {
args := req.args
if len(args) < 3 {
return ErrCmdParams
}
key := args[0]
start, stop, err := zparseRange(c, args[1], args[2])
start, stop, err := zparseRange(req, args[1], args[2])
if err != nil {
return ErrValue
}
@ -323,24 +327,24 @@ func zrangeGeneric(c *client, reverse bool) error {
}
}
if datas, err := c.db.ZRangeGeneric(key, start, stop, reverse); err != nil {
if datas, err := req.db.ZRangeGeneric(key, start, stop, reverse); err != nil {
return err
} else {
c.writeScorePairArray(datas, withScores)
req.resp.writeScorePairArray(datas, withScores)
}
return nil
}
func zrangeCommand(c *client) error {
return zrangeGeneric(c, false)
func zrangeCommand(req *requestContext) error {
return zrangeGeneric(req, false)
}
func zrevrangeCommand(c *client) error {
return zrangeGeneric(c, true)
func zrevrangeCommand(req *requestContext) error {
return zrangeGeneric(req, true)
}
func zrangebyscoreGeneric(c *client, reverse bool) error {
args := c.args
func zrangebyscoreGeneric(req *requestContext, reverse bool) error {
args := req.args
if len(args) < 3 {
return ErrCmdParams
}
@ -396,59 +400,59 @@ func zrangebyscoreGeneric(c *client, reverse bool) error {
if offset < 0 {
//for ledis, if offset < 0, a empty will return
//so here we directly return a empty array
c.writeArray([]interface{}{})
req.resp.writeArray([]interface{}{})
return nil
}
if datas, err := c.db.ZRangeByScoreGeneric(key, min, max, offset, count, reverse); err != nil {
if datas, err := req.db.ZRangeByScoreGeneric(key, min, max, offset, count, reverse); err != nil {
return err
} else {
c.writeScorePairArray(datas, withScores)
req.resp.writeScorePairArray(datas, withScores)
}
return nil
}
func zrangebyscoreCommand(c *client) error {
return zrangebyscoreGeneric(c, false)
func zrangebyscoreCommand(req *requestContext) error {
return zrangebyscoreGeneric(req, false)
}
func zrevrangebyscoreCommand(c *client) error {
return zrangebyscoreGeneric(c, true)
func zrevrangebyscoreCommand(req *requestContext) error {
return zrangebyscoreGeneric(req, true)
}
func zclearCommand(c *client) error {
args := c.args
func zclearCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.ZClear(args[0]); err != nil {
if n, err := req.db.ZClear(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zmclearCommand(c *client) error {
args := c.args
func zmclearCommand(req *requestContext) error {
args := req.args
if len(args) < 1 {
return ErrCmdParams
}
if n, err := c.db.ZMclear(args...); err != nil {
if n, err := req.db.ZMclear(args...); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil
}
func zexpireCommand(c *client) error {
args := c.args
func zexpireCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -458,17 +462,17 @@ func zexpireCommand(c *client) error {
return ErrValue
}
if v, err := c.db.ZExpire(args[0], duration); err != nil {
if v, err := req.db.ZExpire(args[0], duration); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func zexpireAtCommand(c *client) error {
args := c.args
func zexpireAtCommand(req *requestContext) error {
args := req.args
if len(args) != 2 {
return ErrCmdParams
}
@ -478,39 +482,39 @@ func zexpireAtCommand(c *client) error {
return ErrValue
}
if v, err := c.db.ZExpireAt(args[0], when); err != nil {
if v, err := req.db.ZExpireAt(args[0], when); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func zttlCommand(c *client) error {
args := c.args
func zttlCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.db.ZTTL(args[0]); err != nil {
if v, err := req.db.ZTTL(args[0]); err != nil {
return err
} else {
c.writeInteger(v)
req.resp.writeInteger(v)
}
return nil
}
func zpersistCommand(c *client) error {
args := c.args
func zpersistCommand(req *requestContext) error {
args := req.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.db.ZPersist(args[0]); err != nil {
if n, err := req.db.ZPersist(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
req.resp.writeInteger(n)
}
return nil

View File

@ -8,7 +8,7 @@ import (
"strings"
)
type CommandFunc func(c *client) error
type CommandFunc func(req *requestContext) error
var regCmds = map[string]CommandFunc{}
@ -20,33 +20,33 @@ func register(name string, f CommandFunc) {
regCmds[name] = f
}
func pingCommand(c *client) error {
c.writeStatus(PONG)
func pingCommand(req *requestContext) error {
req.resp.writeStatus(PONG)
return nil
}
func echoCommand(c *client) error {
if len(c.args) != 1 {
func echoCommand(req *requestContext) error {
if len(req.args) != 1 {
return ErrCmdParams
}
c.writeBulk(c.args[0])
req.resp.writeBulk(req.args[0])
return nil
}
func selectCommand(c *client) error {
if len(c.args) != 1 {
func selectCommand(req *requestContext) error {
if len(req.args) != 1 {
return ErrCmdParams
}
if index, err := strconv.Atoi(ledis.String(c.args[0])); err != nil {
if index, err := strconv.Atoi(ledis.String(req.args[0])); err != nil {
return err
} else {
if db, err := c.ldb.Select(index); err != nil {
if db, err := req.ldb.Select(index); err != nil {
return err
} else {
c.db = db
c.writeStatus(OK)
req.db = db
req.resp.writeStatus(OK)
}
}
return nil

View File

@ -10,6 +10,8 @@ import (
type Config struct {
Addr string `json:"addr"`
HttpAddr string `json:"http_addr"`
DataDir string `json:"data_dir"`
DB struct {

View File

@ -26,4 +26,14 @@
//
// After you send slaveof command, the slave will start to sync master's binlog and replicate from binlog.
//
// HTTP Interface
//
// LedisDB provides http interfaces for most commands(except the replication commands)
//
// curl http://127.0.0.1:11181/SET/hello/world
// → {"SET":[true,"OK"]}
//
// curl http://127.0.0.1:11181/0/GET/hello?type=json
// → {"GET":"world"}
//
package server

View File

@ -72,7 +72,7 @@ func (m *MasterInfo) Load(filePath string) error {
type master struct {
sync.Mutex
c net.Conn
conn net.Conn
rb *bufio.Reader
app *App
@ -114,9 +114,9 @@ func (m *master) Close() {
default:
}
if m.c != nil {
m.c.Close()
m.c = nil
if m.conn != nil {
m.conn.Close()
m.conn = nil
}
m.wg.Wait()
@ -135,17 +135,17 @@ func (m *master) connect() error {
return fmt.Errorf("no assign master addr")
}
if m.c != nil {
m.c.Close()
m.c = nil
if m.conn != nil {
m.conn.Close()
m.conn = nil
}
if c, err := net.Dial("tcp", m.info.Addr); err != nil {
if conn, err := net.Dial("tcp", m.info.Addr); err != nil {
return err
} else {
m.c = c
m.conn = conn
m.rb = bufio.NewReaderSize(m.c, 4096)
m.rb = bufio.NewReaderSize(m.conn, 4096)
}
return nil
}
@ -248,7 +248,7 @@ var (
)
func (m *master) fullSync() error {
if _, err := m.c.Write(fullSyncCmd); err != nil {
if _, err := m.conn.Write(fullSyncCmd); err != nil {
return err
}
@ -291,7 +291,7 @@ func (m *master) sync() error {
cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr),
logIndexStr, len(logPosStr), logPosStr))
if _, err := m.c.Write(cmd); err != nil {
if _, err := m.conn.Write(cmd); err != nil {
return err
}

116
server/request.go Normal file
View File

@ -0,0 +1,116 @@
package server
import (
"bytes"
"github.com/siddontang/ledisdb/ledis"
"io"
"time"
)
type responseWriter interface {
writeError(error)
writeStatus(string)
writeInteger(int64)
writeBulk([]byte)
writeArray([]interface{})
writeSliceArray([][]byte)
writeFVPairArray([]ledis.FVPair)
writeScorePairArray([]ledis.ScorePair, bool)
writeBulkFrom(int64, io.Reader)
flush()
}
type requestContext struct {
app *App
ldb *ledis.Ledis
db *ledis.DB
remoteAddr string
cmd string
args [][]byte
resp responseWriter
syncBuf bytes.Buffer
compressBuf []byte
reqErr chan error
buf bytes.Buffer
}
func newRequestContext(app *App) *requestContext {
req := new(requestContext)
req.app = app
req.ldb = app.ldb
req.db, _ = app.ldb.Select(0) //use default db
req.compressBuf = make([]byte, 256)
req.reqErr = make(chan error)
return req
}
func (req *requestContext) perform() {
var err error
start := time.Now()
if len(req.cmd) == 0 {
err = ErrEmptyCommand
} else if exeCmd, ok := regCmds[req.cmd]; !ok {
err = ErrNotFound
} else {
go func() {
req.reqErr <- exeCmd(req)
}()
err = <-req.reqErr
}
duration := time.Since(start)
if req.app.access != nil {
fullCmd := req.catGenericCommand()
cost := duration.Nanoseconds() / 1000000
truncateLen := len(fullCmd)
if truncateLen > 256 {
truncateLen = 256
}
req.app.access.Log(req.remoteAddr, cost, fullCmd[:truncateLen], err)
}
if err != nil {
req.resp.writeError(err)
}
req.resp.flush()
return
}
// func (h *requestHandler) catFullCommand(req *requestContext) []byte {
//
// // if strings.HasSuffix(cmd, "expire") {
// // catExpireCommand(c, buffer)
// // } else {
// // catGenericCommand(c, buffer)
// // }
//
// return h.catGenericCommand(req)
// }
func (req *requestContext) catGenericCommand() []byte {
buffer := req.buf
buffer.Reset()
buffer.Write([]byte(req.cmd))
for _, arg := range req.args {
buffer.WriteByte(' ')
buffer.Write(arg)
}
return buffer.Bytes()
}

View File

@ -1,5 +1,3 @@
// +build !windows
package store
import (

View File

@ -1,8 +1,8 @@
package mdb
import (
mdb "github.com/siddontang/gomdb"
"github.com/siddontang/ledisdb/store/driver"
mdb "github.com/szferi/gomdb"
"os"
)
@ -20,7 +20,7 @@ type MDB struct {
func Open(c *Config) (MDB, error) {
path := c.Path
if c.MapSize == 0 {
c.MapSize = 1024 * 1024 * 1024
c.MapSize = 500 * 1024 * 1024
}
env, err := mdb.NewEnv()
@ -90,7 +90,7 @@ func (db MDB) BatchPut(writes []driver.Write) error {
for _, w := range writes {
if w.Value == nil {
itr.key, itr.value, itr.err = itr.c.Get(w.Key, mdb.SET)
itr.key, itr.value, itr.err = itr.c.Get(w.Key, nil, mdb.SET)
if itr.err == nil {
itr.err = itr.c.Del(0)
}
@ -125,7 +125,7 @@ func (db MDB) Delete(key []byte) error {
itr := db.iterator(false)
defer itr.Close()
itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET)
itr.key, itr.value, itr.err = itr.c.Get(key, nil, mdb.SET)
if itr.err == nil {
itr.err = itr.c.Del(0)
}
@ -161,31 +161,31 @@ func (itr *MDBIterator) Error() error {
}
func (itr *MDBIterator) getCurrent() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.GET_CURRENT)
itr.key, itr.value, itr.err = itr.c.Get(nil, nil, mdb.GET_CURRENT)
itr.setState()
}
func (itr *MDBIterator) Seek(key []byte) {
itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET_RANGE)
itr.key, itr.value, itr.err = itr.c.Get(key, nil, mdb.SET_RANGE)
itr.setState()
}
func (itr *MDBIterator) Next() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.NEXT)
itr.key, itr.value, itr.err = itr.c.Get(nil, nil, mdb.NEXT)
itr.setState()
}
func (itr *MDBIterator) Prev() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.PREV)
itr.key, itr.value, itr.err = itr.c.Get(nil, nil, mdb.PREV)
itr.setState()
}
func (itr *MDBIterator) First() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.FIRST)
itr.key, itr.value, itr.err = itr.c.Get(nil, nil, mdb.FIRST)
itr.setState()
}
func (itr *MDBIterator) Last() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.LAST)
itr.key, itr.value, itr.err = itr.c.Get(nil, nil, mdb.LAST)
itr.setState()
}

View File

@ -1,8 +1,8 @@
package mdb
import (
mdb "github.com/siddontang/gomdb"
"github.com/siddontang/ledisdb/store/driver"
mdb "github.com/szferi/gomdb"
)
type Tx struct {
@ -53,7 +53,7 @@ func (t *Tx) BatchPut(writes []driver.Write) error {
for _, w := range writes {
if w.Value == nil {
itr.key, itr.value, itr.err = itr.c.Get(w.Key, mdb.SET)
itr.key, itr.value, itr.err = itr.c.Get(w.Key, nil, mdb.SET)
if itr.err == nil {
itr.err = itr.c.Del(0)
}

View File

@ -1,5 +1,3 @@
// +build !windows
package store
import (