// Source file: ledisdb/ledis/client.go

package ledis
import (
	"bufio"
	"errors"
	"io"
	"net"
	"runtime"
	"strconv"
	"strings"

	"github.com/siddontang/golib/log"
)
// errReadRequest is returned by readRequest when the bytes received from the
// client do not form a well-formed request (an array of bulk strings).
var errReadRequest = errors.New("invalid request protocol")
type client struct {
db *DB
c net.Conn
2014-05-02 13:08:20 +04:00
rb *bufio.Reader
wb *bufio.Writer
2014-05-03 10:55:12 +04:00
cmd string
args [][]byte
2014-05-09 05:17:28 +04:00
reqC chan error
2014-05-02 13:08:20 +04:00
}
func newClient(c net.Conn, db *DB) {
2014-05-02 13:08:20 +04:00
co := new(client)
co.db = db
2014-05-02 13:08:20 +04:00
co.c = c
co.rb = bufio.NewReaderSize(c, 256)
co.wb = bufio.NewWriterSize(c, 256)
2014-05-09 05:17:28 +04:00
co.reqC = make(chan error, 1)
2014-05-02 13:08:20 +04:00
go co.run()
}
// run serves the connection until a request can no longer be read.
//
// Requests are read and handled one at a time, in order, and the loop stops
// at the first error returned by readRequest. On the way out the connection
// is closed. If a panic unwound this goroutine it is recovered and logged
// together with up to 4096 bytes of the goroutine's stack trace before the
// connection is closed.
func (c *client) run() {
	defer func() {
		if r := recover(); r != nil {
			trace := make([]byte, 4096)
			written := runtime.Stack(trace, false)
			trace = trace[:written]
			log.Fatal("client run panic %s:%v", trace, r)
		}

		c.c.Close()
	}()

	for req, err := c.readRequest(); err == nil; req, err = c.readRequest() {
		c.handleRequest(req)
	}
}
// readLine returns the next line from the buffered reader without its line
// terminator.
//
// A line that fits in the reader's buffer is returned as the slice handed out
// by bufio.Reader.ReadLine, which is only valid until the next read. A longer
// line arrives in several pieces; those are concatenated into a freshly
// allocated slice. Any read error is returned with a nil slice.
func (c *client) readLine() ([]byte, error) {
	chunk, partial, err := c.rb.ReadLine()
	if err != nil {
		return nil, err
	}
	if !partial {
		// Common case: the whole line was delivered in one piece.
		return chunk, nil
	}

	var joined []byte
	joined = append(joined, chunk...)
	for partial {
		if chunk, partial, err = c.rb.ReadLine(); err != nil {
			return nil, err
		}
		joined = append(joined, chunk...)
	}
	return joined, nil
}
//A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
func (c *client) readRequest() ([][]byte, error) {
l, err := c.readLine()
if err != nil {
return nil, err
} else if len(l) == 0 || l[0] != '*' {
return nil, errReadRequest
}
2014-05-05 07:37:44 +04:00
var nparams int
if nparams, err = strconv.Atoi(String(l[1:])); err != nil {
2014-05-02 13:08:20 +04:00
return nil, err
2014-05-05 07:37:44 +04:00
} else if nparams <= 0 {
2014-05-02 13:08:20 +04:00
return nil, errReadRequest
}
2014-05-05 07:37:44 +04:00
req := make([][]byte, 0, nparams)
var n int
for i := 0; i < nparams; i++ {
2014-05-02 13:08:20 +04:00
if l, err = c.readLine(); err != nil {
return nil, err
}
if len(l) == 0 {
return nil, errReadRequest
} else if l[0] == '$' {
//handle resp string
if n, err = strconv.Atoi(String(l[1:])); err != nil {
2014-05-02 13:08:20 +04:00
return nil, err
} else if n == -1 {
req = append(req, nil)
} else {
buf := make([]byte, n+2)
if _, err = io.ReadFull(c.rb, buf); err != nil {
return nil, err
} else if buf[len(buf)-2] != '\r' || buf[len(buf)-1] != '\n' {
return nil, errReadRequest
2014-05-05 07:37:44 +04:00
2014-05-02 13:08:20 +04:00
} else {
req = append(req, buf[0:len(buf)-2])
}
}
} else {
return nil, errReadRequest
}
}
return req, nil
}
func (c *client) handleRequest(req [][]byte) {
var err error
2014-05-03 10:55:12 +04:00
if len(req) == 0 {
err = ErrEmptyCommand
2014-05-02 13:08:20 +04:00
} else {
c.cmd = strings.ToLower(String(req[0]))
2014-05-03 10:55:12 +04:00
c.args = req[1:]
f, ok := regCmds[c.cmd]
if !ok {
err = ErrNotFound
} else {
2014-05-09 05:17:28 +04:00
go func() {
c.reqC <- f(c)
}()
err = <-c.reqC
2014-05-03 10:55:12 +04:00
}
2014-05-02 13:08:20 +04:00
}
if err != nil {
c.writeError(err)
}
c.wb.Flush()
}
func (c *client) writeError(err error) {
c.wb.Write(Slice("-ERR"))
2014-05-02 13:08:20 +04:00
if err != nil {
c.wb.WriteByte(' ')
c.wb.Write(Slice(err.Error()))
2014-05-02 13:08:20 +04:00
}
c.wb.Write(Delims)
}
func (c *client) writeStatus(status string) {
c.wb.WriteByte('+')
c.wb.Write(Slice(status))
2014-05-02 13:08:20 +04:00
c.wb.Write(Delims)
}
func (c *client) writeInteger(n int64) {
c.wb.WriteByte(':')
c.wb.Write(Slice(strconv.FormatInt(n, 10)))
2014-05-02 13:08:20 +04:00
c.wb.Write(Delims)
}
func (c *client) writeBulk(b []byte) {
c.wb.WriteByte('$')
if b == nil {
c.wb.Write(NullBulk)
} else {
c.wb.Write(Slice(strconv.Itoa(len(b))))
2014-05-02 13:08:20 +04:00
c.wb.Write(Delims)
c.wb.Write(b)
}
c.wb.Write(Delims)
}
func (c *client) writeArray(ay []interface{}) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(Slice(strconv.Itoa(len(ay))))
2014-05-02 13:08:20 +04:00
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
switch v := ay[i].(type) {
case []interface{}:
c.writeArray(v)
case []byte:
c.writeBulk(v)
case nil:
c.writeBulk(nil)
case int64:
c.writeInteger(v)
default:
panic("invalid array type")
}
}
}
}