ledisdb/server/client.go

316 lines
5.5 KiB
Go
Raw Normal View History

package server
2014-05-02 13:08:20 +04:00
import (
"bufio"
2014-06-09 13:23:32 +04:00
"bytes"
2014-05-02 13:08:20 +04:00
"errors"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/ledis"
2014-05-02 13:08:20 +04:00
"io"
"net"
"runtime"
"strconv"
2014-05-03 10:55:12 +04:00
"strings"
2014-06-11 12:48:11 +04:00
"time"
2014-05-02 13:08:20 +04:00
)
2014-05-05 07:37:44 +04:00
var errReadRequest = errors.New("invalid request protocol")
2014-05-02 13:08:20 +04:00
type client struct {
app *App
2014-05-20 04:41:24 +04:00
ldb *ledis.Ledis
db *ledis.DB
c net.Conn
2014-05-02 13:08:20 +04:00
rb *bufio.Reader
wb *bufio.Writer
2014-05-03 10:55:12 +04:00
cmd string
args [][]byte
2014-05-09 05:17:28 +04:00
reqC chan error
2014-06-09 13:23:32 +04:00
syncBuf bytes.Buffer
2014-06-12 10:10:29 +04:00
2014-07-04 13:55:47 +04:00
compressBuf []byte
2014-06-12 10:10:29 +04:00
logBuf bytes.Buffer
2014-05-02 13:08:20 +04:00
}
func newClient(c net.Conn, app *App) {
2014-05-02 13:08:20 +04:00
co := new(client)
co.app = app
co.ldb = app.ldb
2014-05-20 04:41:24 +04:00
//use default db
co.db, _ = app.ldb.Select(0)
2014-05-02 13:08:20 +04:00
co.c = c
co.rb = bufio.NewReaderSize(c, 256)
co.wb = bufio.NewWriterSize(c, 256)
2014-05-09 05:17:28 +04:00
co.reqC = make(chan error, 1)
2014-07-04 13:55:47 +04:00
co.compressBuf = make([]byte, 256)
2014-05-02 13:08:20 +04:00
go co.run()
}
func (c *client) run() {
defer func() {
if e := recover(); e != nil {
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
buf = buf[0:n]
log.Fatal("client run panic %s:%v", buf, e)
}
c.c.Close()
}()
for {
req, err := c.readRequest()
if err != nil {
return
}
c.handleRequest(req)
}
}
func (c *client) readLine() ([]byte, error) {
2014-06-09 13:23:32 +04:00
return readLine(c.rb)
2014-05-02 13:08:20 +04:00
}
//A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
func (c *client) readRequest() ([][]byte, error) {
l, err := c.readLine()
if err != nil {
return nil, err
} else if len(l) == 0 || l[0] != '*' {
return nil, errReadRequest
}
2014-05-05 07:37:44 +04:00
var nparams int
if nparams, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
2014-05-02 13:08:20 +04:00
return nil, err
2014-05-05 07:37:44 +04:00
} else if nparams <= 0 {
2014-05-02 13:08:20 +04:00
return nil, errReadRequest
}
2014-05-05 07:37:44 +04:00
req := make([][]byte, 0, nparams)
var n int
for i := 0; i < nparams; i++ {
2014-05-02 13:08:20 +04:00
if l, err = c.readLine(); err != nil {
return nil, err
}
if len(l) == 0 {
return nil, errReadRequest
} else if l[0] == '$' {
//handle resp string
if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
2014-05-02 13:08:20 +04:00
return nil, err
} else if n == -1 {
req = append(req, nil)
} else {
2014-06-09 13:23:32 +04:00
buf := make([]byte, n)
2014-05-02 13:08:20 +04:00
if _, err = io.ReadFull(c.rb, buf); err != nil {
return nil, err
2014-06-09 13:23:32 +04:00
}
2014-05-05 07:37:44 +04:00
2014-06-09 13:23:32 +04:00
if l, err = c.readLine(); err != nil {
return nil, err
} else if len(l) != 0 {
return nil, errors.New("bad bulk string format")
2014-05-02 13:08:20 +04:00
}
2014-06-09 13:23:32 +04:00
req = append(req, buf)
2014-05-02 13:08:20 +04:00
}
} else {
return nil, errReadRequest
}
}
return req, nil
}
func (c *client) handleRequest(req [][]byte) {
var err error
2014-05-03 10:55:12 +04:00
2014-06-11 12:48:11 +04:00
start := time.Now()
2014-05-03 10:55:12 +04:00
if len(req) == 0 {
err = ErrEmptyCommand
2014-05-02 13:08:20 +04:00
} else {
c.cmd = strings.ToLower(ledis.String(req[0]))
2014-05-03 10:55:12 +04:00
c.args = req[1:]
f, ok := regCmds[c.cmd]
if !ok {
err = ErrNotFound
} else {
2014-05-09 05:17:28 +04:00
go func() {
c.reqC <- f(c)
}()
err = <-c.reqC
2014-05-03 10:55:12 +04:00
}
2014-05-02 13:08:20 +04:00
}
2014-06-11 12:48:11 +04:00
duration := time.Since(start)
if c.app.access != nil {
2014-06-12 10:10:29 +04:00
c.logBuf.Reset()
for i, r := range req {
left := 256 - c.logBuf.Len()
if left <= 0 {
break
} else if len(r) <= left {
c.logBuf.Write(r)
if i != len(req)-1 {
c.logBuf.WriteByte(' ')
}
} else {
c.logBuf.Write(r[0:left])
}
}
c.app.access.Log(c.c.RemoteAddr().String(), duration.Nanoseconds()/1000000, c.logBuf.Bytes(), err)
2014-06-11 12:48:11 +04:00
}
2014-05-02 13:08:20 +04:00
if err != nil {
c.writeError(err)
}
c.wb.Flush()
}
func (c *client) writeError(err error) {
c.wb.Write(ledis.Slice("-ERR"))
2014-05-02 13:08:20 +04:00
if err != nil {
c.wb.WriteByte(' ')
c.wb.Write(ledis.Slice(err.Error()))
2014-05-02 13:08:20 +04:00
}
c.wb.Write(Delims)
}
func (c *client) writeStatus(status string) {
c.wb.WriteByte('+')
c.wb.Write(ledis.Slice(status))
2014-05-02 13:08:20 +04:00
c.wb.Write(Delims)
}
func (c *client) writeInteger(n int64) {
c.wb.WriteByte(':')
c.wb.Write(ledis.StrPutInt64(n))
2014-05-02 13:08:20 +04:00
c.wb.Write(Delims)
}
func (c *client) writeBulk(b []byte) {
c.wb.WriteByte('$')
if b == nil {
c.wb.Write(NullBulk)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(b))))
2014-05-02 13:08:20 +04:00
c.wb.Write(Delims)
c.wb.Write(b)
}
c.wb.Write(Delims)
}
func (c *client) writeArray(ay []interface{}) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
2014-05-02 13:08:20 +04:00
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
switch v := ay[i].(type) {
case []interface{}:
c.writeArray(v)
case []byte:
c.writeBulk(v)
case nil:
c.writeBulk(nil)
case int64:
c.writeInteger(v)
default:
panic("invalid array type")
}
}
}
}
2014-06-09 13:23:32 +04:00
func (c *client) writeSliceArray(ay [][]byte) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i])
}
}
}
func (c *client) writeFVPairArray(ay []ledis.FVPair) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay) * 2)))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i].Field)
c.writeBulk(ay[i].Value)
}
}
}
func (c *client) writeScorePairArray(ay []ledis.ScorePair, withScores bool) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
if withScores {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay) * 2)))
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
}
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i].Member)
if withScores {
c.writeBulk(ledis.StrPutInt64(ay[i].Score))
}
}
}
}
2014-06-09 13:23:32 +04:00
func (c *client) writeBulkFrom(n int64, rb io.Reader) {
c.wb.WriteByte('$')
c.wb.Write(ledis.Slice(strconv.FormatInt(n, 10)))
c.wb.Write(Delims)
io.Copy(c.wb, rb)
c.wb.Write(Delims)
}