ledisdb/client/nodejs/ledis/lib/parser/javascript.js

302 lines
7.9 KiB
JavaScript
Raw Normal View History

2014-08-05 06:52:39 +04:00
var events = require("events"),
util = require("../util");
function Packet(type, size) {
this.type = type;
this.size = +size;
}
exports.name = "javascript";
exports.debug_mode = false;
function ReplyParser(options) {
this.name = exports.name;
this.options = options || { };
this._buffer = null;
this._offset = 0;
this._encoding = "utf-8";
this._debug_mode = options.debug_mode;
this._reply_type = null;
}
util.inherits(ReplyParser, events.EventEmitter);
exports.Parser = ReplyParser;
function IncompleteReadBuffer(message) {
this.name = "IncompleteReadBuffer";
this.message = message;
}
util.inherits(IncompleteReadBuffer, Error);
// Buffer.toString() is quite slow for small strings
function small_toString(buf, start, end) {
var tmp = "", i;
for (i = start; i < end; i++) {
tmp += String.fromCharCode(buf[i]);
}
return tmp;
}
ReplyParser.prototype._parseResult = function (type) {
var start, end, offset, packetHeader;
if (type === 43 || type === 45) { // + or -
// up to the delimiter
end = this._packetEndOffset() - 1;
start = this._offset;
// include the delimiter
this._offset = end + 2;
if (end > this._buffer.length) {
this._offset = start;
throw new IncompleteReadBuffer("Wait for more data.");
}
if (this.options.return_buffers) {
return this._buffer.slice(start, end);
} else {
if (end - start < 65536) { // completely arbitrary
return small_toString(this._buffer, start, end);
} else {
return this._buffer.toString(this._encoding, start, end);
}
}
} else if (type === 58) { // :
// up to the delimiter
end = this._packetEndOffset() - 1;
start = this._offset;
// include the delimiter
this._offset = end + 2;
if (end > this._buffer.length) {
this._offset = start;
throw new IncompleteReadBuffer("Wait for more data.");
}
if (this.options.return_buffers) {
return this._buffer.slice(start, end);
}
// return the coerced numeric value
return +small_toString(this._buffer, start, end);
} else if (type === 36) { // $
// set a rewind point, as the packet could be larger than the
// buffer in memory
offset = this._offset - 1;
packetHeader = new Packet(type, this.parseHeader());
// packets with a size of -1 are considered null
if (packetHeader.size === -1) {
return undefined;
}
end = this._offset + packetHeader.size;
start = this._offset;
// set the offset to after the delimiter
this._offset = end + 2;
if (end > this._buffer.length) {
this._offset = offset;
throw new IncompleteReadBuffer("Wait for more data.");
}
if (this.options.return_buffers) {
return this._buffer.slice(start, end);
} else {
return this._buffer.toString(this._encoding, start, end);
}
} else if (type === 42) { // *
offset = this._offset;
packetHeader = new Packet(type, this.parseHeader());
if (packetHeader.size < 0) {
return null;
}
if (packetHeader.size > this._bytesRemaining()) {
this._offset = offset - 1;
throw new IncompleteReadBuffer("Wait for more data.");
}
var reply = [ ];
var ntype, i, res;
offset = this._offset - 1;
for (i = 0; i < packetHeader.size; i++) {
ntype = this._buffer[this._offset++];
if (this._offset > this._buffer.length) {
throw new IncompleteReadBuffer("Wait for more data.");
}
res = this._parseResult(ntype);
if (res === undefined) {
res = null;
}
reply.push(res);
}
return reply;
}
};
ReplyParser.prototype.execute = function (buffer) {
this.append(buffer);
var type, ret, offset;
while (true) {
offset = this._offset;
try {
// at least 4 bytes: :1\r\n
if (this._bytesRemaining() < 4) {
break;
}
type = this._buffer[this._offset++];
if (type === 43) { // +
ret = this._parseResult(type);
if (ret === null) {
break;
}
this.send_reply(ret);
} else if (type === 45) { // -
ret = this._parseResult(type);
if (ret === null) {
break;
}
this.send_error(ret);
} else if (type === 58) { // :
ret = this._parseResult(type);
if (ret === null) {
break;
}
this.send_reply(ret);
} else if (type === 36) { // $
ret = this._parseResult(type);
if (ret === null) {
break;
}
// check the state for what is the result of
// a -1, set it back up for a null reply
if (ret === undefined) {
ret = null;
}
this.send_reply(ret);
} else if (type === 42) { // *
// set a rewind point. if a failure occurs,
// wait for the next execute()/append() and try again
offset = this._offset - 1;
ret = this._parseResult(type);
this.send_reply(ret);
}
} catch (err) {
// catch the error (not enough data), rewind, and wait
// for the next packet to appear
if (! (err instanceof IncompleteReadBuffer)) {
throw err;
}
this._offset = offset;
break;
}
}
};
ReplyParser.prototype.append = function (newBuffer) {
if (!newBuffer) {
return;
}
// first run
if (this._buffer === null) {
this._buffer = newBuffer;
return;
}
// out of data
if (this._offset >= this._buffer.length) {
this._buffer = newBuffer;
this._offset = 0;
return;
}
// very large packet
// check for concat, if we have it, use it
if (Buffer.concat !== undefined) {
this._buffer = Buffer.concat([this._buffer.slice(this._offset), newBuffer]);
} else {
var remaining = this._bytesRemaining(),
newLength = remaining + newBuffer.length,
tmpBuffer = new Buffer(newLength);
this._buffer.copy(tmpBuffer, 0, this._offset);
newBuffer.copy(tmpBuffer, remaining, 0);
this._buffer = tmpBuffer;
}
this._offset = 0;
};
ReplyParser.prototype.parseHeader = function () {
var end = this._packetEndOffset(),
value = small_toString(this._buffer, this._offset, end - 1);
this._offset = end + 1;
return value;
};
ReplyParser.prototype._packetEndOffset = function () {
var offset = this._offset;
while (this._buffer[offset] !== 0x0d && this._buffer[offset + 1] !== 0x0a) {
offset++;
if (offset >= this._buffer.length) {
throw new IncompleteReadBuffer("didn't see LF after NL reading multi bulk count (" + offset + " => " + this._buffer.length + ", " + this._offset + ")");
}
}
offset++;
return offset;
};
ReplyParser.prototype._bytesRemaining = function () {
return (this._buffer.length - this._offset) < 0 ? 0 : (this._buffer.length - this._offset);
};
ReplyParser.prototype.parser_error = function (message) {
this.emit("error", message);
};
ReplyParser.prototype.send_error = function (reply) {
this.emit("reply error", reply);
};
ReplyParser.prototype.send_reply = function (reply) {
this.emit("reply", reply);
};