forked from mirror/ledisdb
use goredis to parse RESP request
This commit is contained in:
parent
c412943bf8
commit
6b7ab2b02d
|
@ -60,7 +60,7 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/goredis",
|
||||
"Rev": "ca4c5d7500bcc6850c52824caf2c49a6015c8a03"
|
||||
"Rev": "802ef3bdd5f642335f9ed132e024e5e2fd3d03ce"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/rdb",
|
||||
|
|
|
@ -21,6 +21,11 @@ func (c *PoolConn) Close() {
|
|||
c.c.put(c.Conn)
|
||||
}
|
||||
|
||||
// force close inner connection and not put it into pool
|
||||
func (c *PoolConn) Finalize() {
|
||||
c.Conn.Close()
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
sync.Mutex
|
||||
|
||||
|
|
|
@ -2,21 +2,12 @@ package goredis
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Error represents an error returned in a command reply.
|
||||
type Error string
|
||||
|
||||
func (err Error) Error() string { return string(err) }
|
||||
|
||||
type sizeWriter int64
|
||||
|
||||
func (s *sizeWriter) Write(p []byte) (int, error) {
|
||||
|
@ -29,12 +20,8 @@ type Conn struct {
|
|||
br *bufio.Reader
|
||||
bw *bufio.Writer
|
||||
|
||||
// Scratch space for formatting argument length.
|
||||
// '*' or '$', length, "\r\n"
|
||||
lenScratch [32]byte
|
||||
|
||||
// Scratch space for formatting integers and floats.
|
||||
numScratch [40]byte
|
||||
respReader *RespReader
|
||||
respWriter *RespWriter
|
||||
|
||||
totalReadSize sizeWriter
|
||||
totalWriteSize sizeWriter
|
||||
|
@ -58,6 +45,9 @@ func ConnectWithSize(addr string, readSize int, writeSize int) (*Conn, error) {
|
|||
c.br = bufio.NewReaderSize(io.TeeReader(c.c, &c.totalReadSize), readSize)
|
||||
c.bw = bufio.NewWriterSize(io.MultiWriter(c.c, &c.totalWriteSize), writeSize)
|
||||
|
||||
c.respReader = NewRespReader(c.br)
|
||||
c.respWriter = NewRespWriter(c.bw)
|
||||
|
||||
atomic.StoreInt32(&c.closed, 0)
|
||||
|
||||
return c, nil
|
||||
|
@ -102,20 +92,16 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
|||
}
|
||||
|
||||
func (c *Conn) Send(cmd string, args ...interface{}) error {
|
||||
if err := c.writeCommand(cmd, args); err != nil {
|
||||
if err := c.respWriter.WriteCommand(cmd, args...); err != nil {
|
||||
c.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.bw.Flush(); err != nil {
|
||||
c.Close()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Conn) Receive() (interface{}, error) {
|
||||
if reply, err := c.readReply(); err != nil {
|
||||
if reply, err := c.respReader.Parse(); err != nil {
|
||||
c.Close()
|
||||
return nil, err
|
||||
} else {
|
||||
|
@ -128,7 +114,7 @@ func (c *Conn) Receive() (interface{}, error) {
|
|||
}
|
||||
|
||||
func (c *Conn) ReceiveBulkTo(w io.Writer) error {
|
||||
err := c.readBulkReplyTo(w)
|
||||
err := c.respReader.ParseBulkTo(w)
|
||||
if err != nil {
|
||||
if _, ok := err.(Error); !ok {
|
||||
c.Close()
|
||||
|
@ -137,245 +123,6 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (c *Conn) writeLen(prefix byte, n int) error {
|
||||
c.lenScratch[len(c.lenScratch)-1] = '\n'
|
||||
c.lenScratch[len(c.lenScratch)-2] = '\r'
|
||||
i := len(c.lenScratch) - 3
|
||||
for {
|
||||
c.lenScratch[i] = byte('0' + n%10)
|
||||
i -= 1
|
||||
n = n / 10
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
c.lenScratch[i] = prefix
|
||||
_, err := c.bw.Write(c.lenScratch[i:])
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Conn) writeString(s string) error {
|
||||
c.writeLen('$', len(s))
|
||||
c.bw.WriteString(s)
|
||||
_, err := c.bw.WriteString("\r\n")
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Conn) writeBytes(p []byte) error {
|
||||
c.writeLen('$', len(p))
|
||||
c.bw.Write(p)
|
||||
_, err := c.bw.WriteString("\r\n")
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Conn) writeInt64(n int64) error {
|
||||
return c.writeBytes(strconv.AppendInt(c.numScratch[:0], n, 10))
|
||||
}
|
||||
|
||||
func (c *Conn) writeFloat64(n float64) error {
|
||||
return c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64))
|
||||
}
|
||||
|
||||
func (c *Conn) writeCommand(cmd string, args []interface{}) (err error) {
|
||||
c.writeLen('*', 1+len(args))
|
||||
err = c.writeString(cmd)
|
||||
for _, arg := range args {
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
switch arg := arg.(type) {
|
||||
case string:
|
||||
err = c.writeString(arg)
|
||||
case []byte:
|
||||
err = c.writeBytes(arg)
|
||||
case int:
|
||||
err = c.writeInt64(int64(arg))
|
||||
case int64:
|
||||
err = c.writeInt64(arg)
|
||||
case float64:
|
||||
err = c.writeFloat64(arg)
|
||||
case bool:
|
||||
if arg {
|
||||
err = c.writeString("1")
|
||||
} else {
|
||||
err = c.writeString("0")
|
||||
}
|
||||
case nil:
|
||||
err = c.writeString("")
|
||||
default:
|
||||
var buf bytes.Buffer
|
||||
fmt.Fprint(&buf, arg)
|
||||
err = c.writeBytes(buf.Bytes())
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Conn) readLine() ([]byte, error) {
|
||||
p, err := c.br.ReadSlice('\n')
|
||||
if err == bufio.ErrBufferFull {
|
||||
return nil, errors.New("long response line")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
i := len(p) - 2
|
||||
if i < 0 || p[i] != '\r' {
|
||||
return nil, errors.New("bad response line terminator")
|
||||
}
|
||||
return p[:i], nil
|
||||
}
|
||||
|
||||
// parseLen parses bulk string and array lengths.
|
||||
func parseLen(p []byte) (int, error) {
|
||||
if len(p) == 0 {
|
||||
return -1, errors.New("malformed length")
|
||||
}
|
||||
|
||||
if p[0] == '-' && len(p) == 2 && p[1] == '1' {
|
||||
// handle $-1 and $-1 null replies.
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
var n int
|
||||
for _, b := range p {
|
||||
n *= 10
|
||||
if b < '0' || b > '9' {
|
||||
return -1, errors.New("illegal bytes in length")
|
||||
}
|
||||
n += int(b - '0')
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// parseInt parses an integer reply.
|
||||
func parseInt(p []byte) (interface{}, error) {
|
||||
if len(p) == 0 {
|
||||
return 0, errors.New("malformed integer")
|
||||
}
|
||||
|
||||
var negate bool
|
||||
if p[0] == '-' {
|
||||
negate = true
|
||||
p = p[1:]
|
||||
if len(p) == 0 {
|
||||
return 0, errors.New("malformed integer")
|
||||
}
|
||||
}
|
||||
|
||||
var n int64
|
||||
for _, b := range p {
|
||||
n *= 10
|
||||
if b < '0' || b > '9' {
|
||||
return 0, errors.New("illegal bytes in length")
|
||||
}
|
||||
n += int64(b - '0')
|
||||
}
|
||||
|
||||
if negate {
|
||||
n = -n
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
var (
|
||||
okReply interface{} = "OK"
|
||||
pongReply interface{} = "PONG"
|
||||
)
|
||||
|
||||
func (c *Conn) readBulkReplyTo(w io.Writer) error {
|
||||
line, err := c.readLine()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(line) == 0 {
|
||||
return errors.New("ledis: short response line")
|
||||
}
|
||||
switch line[0] {
|
||||
case '-':
|
||||
return Error(string(line[1:]))
|
||||
case '$':
|
||||
n, err := parseLen(line[1:])
|
||||
if n < 0 || err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var nn int64
|
||||
if nn, err = io.CopyN(w, c.br, int64(n)); err != nil {
|
||||
return err
|
||||
} else if nn != int64(n) {
|
||||
return io.ErrShortWrite
|
||||
}
|
||||
|
||||
if line, err := c.readLine(); err != nil {
|
||||
return err
|
||||
} else if len(line) != 0 {
|
||||
return errors.New("bad bulk string format")
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("not invalid bulk string type, but %c", line[0])
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) readReply() (interface{}, error) {
|
||||
line, err := c.readLine()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(line) == 0 {
|
||||
return nil, errors.New("short response line")
|
||||
}
|
||||
switch line[0] {
|
||||
case '+':
|
||||
switch {
|
||||
case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
|
||||
// Avoid allocation for frequent "+OK" response.
|
||||
return okReply, nil
|
||||
case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
|
||||
// Avoid allocation in PING command benchmarks :)
|
||||
return pongReply, nil
|
||||
default:
|
||||
return string(line[1:]), nil
|
||||
}
|
||||
case '-':
|
||||
return Error(string(line[1:])), nil
|
||||
case ':':
|
||||
return parseInt(line[1:])
|
||||
case '$':
|
||||
n, err := parseLen(line[1:])
|
||||
if n < 0 || err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p := make([]byte, n)
|
||||
_, err = io.ReadFull(c.br, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if line, err := c.readLine(); err != nil {
|
||||
return nil, err
|
||||
} else if len(line) != 0 {
|
||||
return nil, errors.New("bad bulk string format")
|
||||
}
|
||||
return p, nil
|
||||
case '*':
|
||||
n, err := parseLen(line[1:])
|
||||
if n < 0 || err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r := make([]interface{}, n)
|
||||
for i := range r {
|
||||
r[i], err = c.readReply()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
return nil, errors.New("unexpected response line")
|
||||
}
|
||||
|
||||
func (c *Client) newConn(addr string, pass string) (*Conn, error) {
|
||||
co, err := ConnectWithSize(addr, c.readBufferSize, c.writeBufferSize)
|
||||
if err != nil {
|
||||
|
|
|
@ -0,0 +1,456 @@
|
|||
package goredis
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type Error string
|
||||
|
||||
func (err Error) Error() string { return string(err) }
|
||||
|
||||
var (
|
||||
okReply interface{} = "OK"
|
||||
pongReply interface{} = "PONG"
|
||||
)
|
||||
|
||||
type RespReader struct {
|
||||
br *bufio.Reader
|
||||
}
|
||||
|
||||
func NewRespReader(br *bufio.Reader) *RespReader {
|
||||
r := &RespReader{br}
|
||||
return r
|
||||
}
|
||||
|
||||
// Parse RESP
|
||||
func (resp *RespReader) Parse() (interface{}, error) {
|
||||
line, err := readLine(resp.br)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(line) == 0 {
|
||||
return nil, errors.New("short resp line")
|
||||
}
|
||||
switch line[0] {
|
||||
case '+':
|
||||
switch {
|
||||
case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
|
||||
// Avoid allocation for frequent "+OK" response.
|
||||
return okReply, nil
|
||||
case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
|
||||
// Avoid allocation in PING command benchmarks :)
|
||||
return pongReply, nil
|
||||
default:
|
||||
return string(line[1:]), nil
|
||||
}
|
||||
case '-':
|
||||
return Error(string(line[1:])), nil
|
||||
case ':':
|
||||
n, err := parseInt(line[1:])
|
||||
return n, err
|
||||
case '$':
|
||||
n, err := parseLen(line[1:])
|
||||
if n < 0 || err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p := make([]byte, n)
|
||||
_, err = io.ReadFull(resp.br, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if line, err := readLine(resp.br); err != nil {
|
||||
return nil, err
|
||||
} else if len(line) != 0 {
|
||||
return nil, errors.New("bad bulk string format")
|
||||
}
|
||||
return p, nil
|
||||
case '*':
|
||||
n, err := parseLen(line[1:])
|
||||
if n < 0 || err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r := make([]interface{}, n)
|
||||
for i := range r {
|
||||
r[i], err = resp.Parse()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
return nil, errors.New("unexpected response line")
|
||||
}
|
||||
|
||||
// Parse client -> server command request, must be array of bulk strings
|
||||
func (resp *RespReader) ParseRequest() ([][]byte, error) {
|
||||
line, err := readLine(resp.br)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(line) == 0 {
|
||||
return nil, errors.New("short resp line")
|
||||
}
|
||||
switch line[0] {
|
||||
case '*':
|
||||
n, err := parseLen(line[1:])
|
||||
if n < 0 || err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r := make([][]byte, n)
|
||||
for i := range r {
|
||||
r[i], err = parseBulk(resp.br)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return r, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("not invalid array of bulk string type, but %c", line[0])
|
||||
}
|
||||
}
|
||||
|
||||
// Parse bulk string and write it with writer w
|
||||
func (resp *RespReader) ParseBulkTo(w io.Writer) error {
|
||||
line, err := readLine(resp.br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(line) == 0 {
|
||||
return errors.New("ledis: short response line")
|
||||
}
|
||||
|
||||
switch line[0] {
|
||||
case '-':
|
||||
return Error(string(line[1:]))
|
||||
case '$':
|
||||
n, err := parseLen(line[1:])
|
||||
if n < 0 || err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var nn int64
|
||||
if nn, err = io.CopyN(w, resp.br, int64(n)); err != nil {
|
||||
return err
|
||||
} else if nn != int64(n) {
|
||||
return io.ErrShortWrite
|
||||
}
|
||||
|
||||
if line, err := readLine(resp.br); err != nil {
|
||||
return err
|
||||
} else if len(line) != 0 {
|
||||
return errors.New("bad bulk string format")
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("not invalid bulk string type, but %c", line[0])
|
||||
}
|
||||
}
|
||||
|
||||
func readLine(br *bufio.Reader) ([]byte, error) {
|
||||
p, err := br.ReadSlice('\n')
|
||||
if err == bufio.ErrBufferFull {
|
||||
return nil, errors.New("long resp line")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
i := len(p) - 2
|
||||
if i < 0 || p[i] != '\r' {
|
||||
return nil, errors.New("bad resp line terminator")
|
||||
}
|
||||
return p[:i], nil
|
||||
}
|
||||
|
||||
// parseLen parses bulk string and array lengths.
|
||||
func parseLen(p []byte) (int, error) {
|
||||
if len(p) == 0 {
|
||||
return -1, errors.New("malformed length")
|
||||
}
|
||||
|
||||
if p[0] == '-' && len(p) == 2 && p[1] == '1' {
|
||||
// handle $-1 and $-1 null replies.
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
var n int
|
||||
for _, b := range p {
|
||||
n *= 10
|
||||
if b < '0' || b > '9' {
|
||||
return -1, errors.New("illegal bytes in length")
|
||||
}
|
||||
n += int(b - '0')
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// parseInt parses an integer reply.
|
||||
func parseInt(p []byte) (int64, error) {
|
||||
if len(p) == 0 {
|
||||
return 0, errors.New("malformed integer")
|
||||
}
|
||||
|
||||
var negate bool
|
||||
if p[0] == '-' {
|
||||
negate = true
|
||||
p = p[1:]
|
||||
if len(p) == 0 {
|
||||
return 0, errors.New("malformed integer")
|
||||
}
|
||||
}
|
||||
|
||||
var n int64
|
||||
for _, b := range p {
|
||||
n *= 10
|
||||
if b < '0' || b > '9' {
|
||||
return 0, errors.New("illegal bytes in length")
|
||||
}
|
||||
n += int64(b - '0')
|
||||
}
|
||||
|
||||
if negate {
|
||||
n = -n
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func parseBulk(br *bufio.Reader) ([]byte, error) {
|
||||
line, err := readLine(br)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(line) == 0 {
|
||||
return nil, errors.New("short resp line")
|
||||
}
|
||||
switch line[0] {
|
||||
case '$':
|
||||
n, err := parseLen(line[1:])
|
||||
if n < 0 || err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p := make([]byte, n)
|
||||
_, err = io.ReadFull(br, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if line, err := readLine(br); err != nil {
|
||||
return nil, err
|
||||
} else if len(line) != 0 {
|
||||
return nil, errors.New("bad bulk string format")
|
||||
}
|
||||
return p, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("not invalid bulk string type, but %c", line[0])
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
intBuffer [][]byte
|
||||
respTerm = []byte("\r\n")
|
||||
nullBulk = []byte("-1")
|
||||
nullArray = []byte("-1")
|
||||
)
|
||||
|
||||
func init() {
|
||||
cnt := 10000
|
||||
intBuffer = make([][]byte, cnt)
|
||||
for i := 0; i < cnt; i++ {
|
||||
intBuffer[i] = []byte(strconv.Itoa(i))
|
||||
}
|
||||
}
|
||||
|
||||
type RespWriter struct {
|
||||
bw *bufio.Writer
|
||||
// Scratch space for formatting integers and floats.
|
||||
numScratch [40]byte
|
||||
}
|
||||
|
||||
func NewRespWriter(bw *bufio.Writer) *RespWriter {
|
||||
r := &RespWriter{bw: bw}
|
||||
return r
|
||||
}
|
||||
|
||||
func (resp *RespWriter) Flush() error {
|
||||
return resp.bw.Flush()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) writeTerm() error {
|
||||
_, err := resp.bw.Write(respTerm)
|
||||
return err
|
||||
}
|
||||
|
||||
func (resp *RespWriter) writeInteger(n int64) error {
|
||||
var err error
|
||||
if n < int64(len(intBuffer)) {
|
||||
_, err = resp.bw.Write(intBuffer[n])
|
||||
} else {
|
||||
_, err = resp.bw.Write(strconv.AppendInt(nil, n, 10))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (resp *RespWriter) WriteInteger(n int64) error {
|
||||
resp.bw.WriteByte(':')
|
||||
|
||||
resp.writeInteger(n)
|
||||
|
||||
return resp.writeTerm()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) FlushInteger(n int64) error {
|
||||
resp.WriteInteger(n)
|
||||
return resp.Flush()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) WriteString(s string) error {
|
||||
resp.bw.WriteByte('+')
|
||||
resp.bw.WriteString(s)
|
||||
return resp.writeTerm()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) FlushString(s string) error {
|
||||
resp.WriteString(s)
|
||||
return resp.Flush()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) WriteError(e error) error {
|
||||
resp.bw.WriteByte('-')
|
||||
|
||||
if e != nil {
|
||||
resp.bw.WriteString(e.Error())
|
||||
} else {
|
||||
resp.bw.WriteString("error is nil, invalid")
|
||||
}
|
||||
|
||||
return resp.writeTerm()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) FlushError(e error) error {
|
||||
resp.WriteError(e)
|
||||
return resp.Flush()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) WriteBulk(b []byte) error {
|
||||
resp.bw.WriteByte('$')
|
||||
if b == nil {
|
||||
resp.bw.Write(nullBulk)
|
||||
} else {
|
||||
resp.writeInteger(int64(len(b)))
|
||||
resp.writeTerm()
|
||||
resp.bw.Write(b)
|
||||
}
|
||||
return resp.writeTerm()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) FlushBulk(b []byte) error {
|
||||
resp.WriteBulk(b)
|
||||
return resp.Flush()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) WriteArray(ay []interface{}) error {
|
||||
resp.bw.WriteByte('*')
|
||||
if ay == nil {
|
||||
resp.bw.Write(nullArray)
|
||||
return resp.writeTerm()
|
||||
} else {
|
||||
resp.writeInteger(int64(len(ay)))
|
||||
resp.writeTerm()
|
||||
|
||||
var err error
|
||||
for i := 0; i < len(ay); i++ {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch v := ay[i].(type) {
|
||||
case []interface{}:
|
||||
err = resp.WriteArray(v)
|
||||
case []byte:
|
||||
err = resp.WriteBulk(v)
|
||||
case nil:
|
||||
err = resp.WriteBulk(nil)
|
||||
case int64:
|
||||
err = resp.WriteInteger(v)
|
||||
case string:
|
||||
err = resp.WriteString(v)
|
||||
case error:
|
||||
err = resp.WriteError(v)
|
||||
default:
|
||||
err = fmt.Errorf("invalid array type %T %v", ay[i], v)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (resp *RespWriter) FlushArray(ay []interface{}) error {
|
||||
resp.WriteArray(ay)
|
||||
return resp.Flush()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) writeBulkString(s string) error {
|
||||
resp.bw.WriteByte('$')
|
||||
resp.writeInteger(int64(len(s)))
|
||||
resp.writeTerm()
|
||||
resp.bw.WriteString(s)
|
||||
return resp.writeTerm()
|
||||
}
|
||||
|
||||
func (resp *RespWriter) writeBulkInt64(n int64) error {
|
||||
return resp.WriteBulk(strconv.AppendInt(resp.numScratch[:0], n, 10))
|
||||
}
|
||||
|
||||
func (resp *RespWriter) writeBulkFloat64(n float64) error {
|
||||
return resp.WriteBulk(strconv.AppendFloat(resp.numScratch[:0], n, 'g', -1, 64))
|
||||
}
|
||||
|
||||
// RESP command is array of bulk string
|
||||
func (resp *RespWriter) WriteCommand(cmd string, args ...interface{}) error {
|
||||
resp.bw.WriteByte('*')
|
||||
|
||||
resp.writeInteger(int64(1 + len(args)))
|
||||
resp.writeTerm()
|
||||
|
||||
err := resp.writeBulkString(cmd)
|
||||
|
||||
for _, arg := range args {
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
switch arg := arg.(type) {
|
||||
case string:
|
||||
err = resp.writeBulkString(arg)
|
||||
case []byte:
|
||||
err = resp.WriteBulk(arg)
|
||||
case int:
|
||||
err = resp.writeBulkInt64(int64(arg))
|
||||
case int64:
|
||||
err = resp.writeBulkInt64(arg)
|
||||
case float64:
|
||||
err = resp.writeBulkFloat64(arg)
|
||||
case bool:
|
||||
if arg {
|
||||
err = resp.writeBulkString("1")
|
||||
} else {
|
||||
err = resp.writeBulkString("0")
|
||||
}
|
||||
case nil:
|
||||
err = resp.writeBulkString("")
|
||||
default:
|
||||
var buf bytes.Buffer
|
||||
fmt.Fprint(&buf, arg)
|
||||
err = resp.WriteBulk(buf.Bytes())
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return resp.Flush()
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package goredis
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestResp(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
|
||||
reader := NewRespReader(bufio.NewReader(&buf))
|
||||
writer := NewRespWriter(bufio.NewWriter(&buf))
|
||||
|
||||
if err := writer.WriteCommand("SELECT", 1); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if reqs, err := reader.ParseRequest(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(reqs) != 2 {
|
||||
t.Fatal(len(reqs))
|
||||
} else if string(reqs[0]) != "SELECT" {
|
||||
t.Fatal(string(reqs[0]))
|
||||
} else if string(reqs[1]) != "1" {
|
||||
t.Fatal(string(reqs[1]))
|
||||
}
|
||||
}
|
||||
|
||||
if err := writer.FlushInteger(10); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if n, err := Int64(reader.Parse()); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if n != 10 {
|
||||
t.Fatal(n)
|
||||
}
|
||||
}
|
||||
|
||||
if err := writer.FlushString("abc"); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if s, err := String(reader.Parse()); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if s != "abc" {
|
||||
t.Fatal(s)
|
||||
}
|
||||
}
|
||||
|
||||
if err := writer.FlushBulk([]byte("abc")); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if s, err := String(reader.Parse()); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if s != "abc" {
|
||||
t.Fatal(s)
|
||||
}
|
||||
}
|
||||
|
||||
ay := []interface{}{[]byte("SET"), []byte("a"), []byte("1")}
|
||||
if err := writer.FlushArray(ay); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if oy, err := reader.Parse(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !reflect.DeepEqual(oy, ay) {
|
||||
t.Fatalf("%#v", oy)
|
||||
}
|
||||
}
|
||||
|
||||
e := Error("hello world")
|
||||
if err := writer.FlushError(e); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if ee, err := reader.Parse(); err != nil {
|
||||
t.Fatal("must error")
|
||||
} else if !reflect.DeepEqual(e, ee) {
|
||||
t.Fatal(ee)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -4,10 +4,10 @@ import (
|
|||
"bufio"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/siddontang/go/arena"
|
||||
"github.com/siddontang/go/hack"
|
||||
"github.com/siddontang/go/log"
|
||||
"github.com/siddontang/go/num"
|
||||
"github.com/siddontang/goredis"
|
||||
"github.com/siddontang/ledisdb/ledis"
|
||||
"io"
|
||||
"net"
|
||||
|
@ -23,9 +23,8 @@ type respClient struct {
|
|||
*client
|
||||
|
||||
conn net.Conn
|
||||
rb *bufio.Reader
|
||||
|
||||
ar *arena.Arena
|
||||
respReader *goredis.RespReader
|
||||
|
||||
activeQuit bool
|
||||
}
|
||||
|
@ -76,14 +75,12 @@ func newClientRESP(conn net.Conn, app *App) {
|
|||
tcpConn.SetWriteBuffer(app.cfg.ConnWriteBufferSize)
|
||||
}
|
||||
|
||||
c.rb = bufio.NewReaderSize(conn, app.cfg.ConnReadBufferSize)
|
||||
br := bufio.NewReaderSize(conn, app.cfg.ConnReadBufferSize)
|
||||
c.respReader = goredis.NewRespReader(br)
|
||||
|
||||
c.resp = newWriterRESP(conn, app.cfg.ConnWriteBufferSize)
|
||||
c.remoteAddr = conn.RemoteAddr().String()
|
||||
|
||||
//maybe another config?
|
||||
c.ar = arena.NewArena(app.cfg.ConnReadBufferSize)
|
||||
|
||||
app.connWait.Add(1)
|
||||
|
||||
app.addRespClient(c)
|
||||
|
@ -131,14 +128,12 @@ func (c *respClient) run() {
|
|||
c.conn.SetReadDeadline(time.Now().Add(kc))
|
||||
}
|
||||
|
||||
reqData, err := c.readRequest()
|
||||
if err == nil {
|
||||
err = c.handleRequest(reqData)
|
||||
|
||||
c.cmd = ""
|
||||
c.args = nil
|
||||
|
||||
c.ar.Reset()
|
||||
reqData, err := c.respReader.ParseRequest()
|
||||
if err == nil {
|
||||
err = c.handleRequest(reqData)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
@ -147,10 +142,6 @@ func (c *respClient) run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *respClient) readRequest() ([][]byte, error) {
|
||||
return ReadRequest(c.rb, c.ar)
|
||||
}
|
||||
|
||||
func (c *respClient) handleRequest(reqData [][]byte) error {
|
||||
if len(reqData) == 0 {
|
||||
c.cmd = ""
|
||||
|
@ -221,7 +212,7 @@ func newWriterRESP(conn net.Conn, size int) *respWriter {
|
|||
}
|
||||
|
||||
func (w *respWriter) writeError(err error) {
|
||||
w.buff.Write(hack.Slice("-ERR"))
|
||||
w.buff.Write(hack.Slice("-"))
|
||||
if err != nil {
|
||||
w.buff.WriteByte(' ')
|
||||
w.buff.Write(hack.Slice(err.Error()))
|
||||
|
|
124
server/util.go
124
server/util.go
|
@ -1,128 +1,6 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/siddontang/go/arena"
|
||||
"io"
|
||||
)
|
||||
|
||||
var (
|
||||
errLineFormat = errors.New("bad response line format")
|
||||
)
|
||||
|
||||
func ReadLine(rb *bufio.Reader) ([]byte, error) {
|
||||
p, err := rb.ReadSlice('\n')
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
i := len(p) - 2
|
||||
if i < 0 || p[i] != '\r' {
|
||||
return nil, errLineFormat
|
||||
}
|
||||
|
||||
return p[:i], nil
|
||||
}
|
||||
|
||||
func readBytes(br *bufio.Reader, a *arena.Arena) (bytes []byte, err error) {
|
||||
size, err := readLong(br)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if size == -1 {
|
||||
return nil, nil
|
||||
}
|
||||
if size < 0 {
|
||||
return nil, errors.New("Invalid size: " + fmt.Sprint("%d", size))
|
||||
}
|
||||
|
||||
buf := a.Make(int(size) + 2)
|
||||
if _, err = io.ReadFull(br, buf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if buf[len(buf)-2] != '\r' && buf[len(buf)-1] != '\n' {
|
||||
return nil, errors.New("bad bulk string format")
|
||||
}
|
||||
|
||||
bytes = buf[0 : len(buf)-2]
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func readLong(in *bufio.Reader) (result int64, err error) {
|
||||
read, err := in.ReadByte()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
var sign int
|
||||
if read == '-' {
|
||||
read, err = in.ReadByte()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
sign = -1
|
||||
} else {
|
||||
sign = 1
|
||||
}
|
||||
var number int64
|
||||
for number = 0; err == nil; read, err = in.ReadByte() {
|
||||
if read == '\r' {
|
||||
read, err = in.ReadByte()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
if read == '\n' {
|
||||
return number * int64(sign), nil
|
||||
} else {
|
||||
return -1, errors.New("Bad line ending")
|
||||
}
|
||||
}
|
||||
value := read - '0'
|
||||
if value >= 0 && value < 10 {
|
||||
number *= 10
|
||||
number += int64(value)
|
||||
} else {
|
||||
return -1, errors.New("Invalid digit")
|
||||
}
|
||||
}
|
||||
return -1, err
|
||||
}
|
||||
|
||||
func ReadRequest(in *bufio.Reader, a *arena.Arena) ([][]byte, error) {
|
||||
code, err := in.ReadByte()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if code != '*' {
|
||||
return nil, errReadRequest
|
||||
}
|
||||
|
||||
var nparams int64
|
||||
if nparams, err = readLong(in); err != nil {
|
||||
return nil, err
|
||||
} else if nparams <= 0 {
|
||||
return nil, errReadRequest
|
||||
}
|
||||
|
||||
req := make([][]byte, nparams)
|
||||
for i := range req {
|
||||
if code, err = in.ReadByte(); err != nil {
|
||||
return nil, err
|
||||
} else if code != '$' {
|
||||
return nil, errReadRequest
|
||||
}
|
||||
|
||||
if req[i], err = readBytes(in, a); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
import ()
|
||||
|
||||
func lowerSlice(buf []byte) []byte {
|
||||
for i, r := range buf {
|
||||
|
|
Loading…
Reference in New Issue