diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 38c3bf1..2773e88 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -22,10 +22,6 @@ "ImportPath": "github.com/edsrzf/mmap-go", "Rev": "6c75090c55983bef2e129e173681b20d24871ef8" }, - { - "ImportPath": "github.com/siddontang/go/arena", - "Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0" - }, { "ImportPath": "github.com/siddontang/go/bson", "Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0" @@ -60,7 +56,7 @@ }, { "ImportPath": "github.com/siddontang/goredis", - "Rev": "ca4c5d7500bcc6850c52824caf2c49a6015c8a03" + "Rev": "802ef3bdd5f642335f9ed132e024e5e2fd3d03ce" }, { "ImportPath": "github.com/siddontang/rdb", diff --git a/Godeps/_workspace/src/github.com/siddontang/go/arena/arena.go b/Godeps/_workspace/src/github.com/siddontang/go/arena/arena.go deleted file mode 100644 index 2ca98aa..0000000 --- a/Godeps/_workspace/src/github.com/siddontang/go/arena/arena.go +++ /dev/null @@ -1,30 +0,0 @@ -package arena - -type Arena struct { - buf []byte - - offset int -} - -func NewArena(size int) *Arena { - a := new(Arena) - - a.buf = make([]byte, size, size) - a.offset = 0 - - return a -} - -func (a *Arena) Make(size int) []byte { - if len(a.buf) < size || len(a.buf)-a.offset < size { - return make([]byte, size) - } - - b := a.buf[a.offset : size+a.offset] - a.offset += size - return b -} - -func (a *Arena) Reset() { - a.offset = 0 -} diff --git a/Godeps/_workspace/src/github.com/siddontang/go/arena/arena_test.go b/Godeps/_workspace/src/github.com/siddontang/go/arena/arena_test.go deleted file mode 100644 index 607b2a4..0000000 --- a/Godeps/_workspace/src/github.com/siddontang/go/arena/arena_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package arena - -import ( - "fmt" - "testing" -) - -func TestArena(t *testing.T) { - a := NewArena(100) - - for i := 0; i < 50; i++ { - a.buf[i] = 1 - } - - for i := 50; i < 100; i++ { - a.buf[i] = 2 - } - - b1 := a.Make(50) - - b2 := a.Make(30) - - b3 := a.Make(40) - - fmt.Printf("%p %d\n", b1, b1[49]) - fmt.Printf("%p %d\n", b2, b2[29]) - fmt.Printf("%p %d\n", b3, b3[39]) -} diff --git a/Godeps/_workspace/src/github.com/siddontang/goredis/client.go b/Godeps/_workspace/src/github.com/siddontang/goredis/client.go index 2ada1d9..995816f 100644 --- a/Godeps/_workspace/src/github.com/siddontang/goredis/client.go +++ b/Godeps/_workspace/src/github.com/siddontang/goredis/client.go @@ -21,6 +21,11 @@ func (c *PoolConn) Close() { c.c.put(c.Conn) } +// force close inner connection and not put it into pool +func (c *PoolConn) Finalize() { + c.Conn.Close() +} + type Client struct { sync.Mutex diff --git a/Godeps/_workspace/src/github.com/siddontang/goredis/conn.go b/Godeps/_workspace/src/github.com/siddontang/goredis/conn.go index 1c84346..4a1f4e6 100644 --- a/Godeps/_workspace/src/github.com/siddontang/goredis/conn.go +++ b/Godeps/_workspace/src/github.com/siddontang/goredis/conn.go @@ -2,21 +2,12 @@ package goredis import ( "bufio" - "bytes" - "errors" - "fmt" "io" "net" - "strconv" "sync/atomic" "time" ) -// Error represents an error returned in a command reply. -type Error string - -func (err Error) Error() string { return string(err) } - type sizeWriter int64 func (s *sizeWriter) Write(p []byte) (int, error) { @@ -29,12 +20,8 @@ type Conn struct { br *bufio.Reader bw *bufio.Writer - // Scratch space for formatting argument length. - // '*' or '$', length, "\r\n" - lenScratch [32]byte - - // Scratch space for formatting integers and floats. - numScratch [40]byte + respReader *RespReader + respWriter *RespWriter totalReadSize sizeWriter totalWriteSize sizeWriter @@ -58,6 +45,9 @@ func ConnectWithSize(addr string, readSize int, writeSize int) (*Conn, error) { c.br = bufio.NewReaderSize(io.TeeReader(c.c, &c.totalReadSize), readSize) c.bw = bufio.NewWriterSize(io.MultiWriter(c.c, &c.totalWriteSize), writeSize) + c.respReader = NewRespReader(c.br) + c.respWriter = NewRespWriter(c.bw) + atomic.StoreInt32(&c.closed, 0) return c, nil @@ -102,20 +92,16 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { } func (c *Conn) Send(cmd string, args ...interface{}) error { - if err := c.writeCommand(cmd, args); err != nil { + if err := c.respWriter.WriteCommand(cmd, args...); err != nil { c.Close() return err } - if err := c.bw.Flush(); err != nil { - c.Close() - return err - } return nil } func (c *Conn) Receive() (interface{}, error) { - if reply, err := c.readReply(); err != nil { + if reply, err := c.respReader.Parse(); err != nil { c.Close() return nil, err } else { @@ -128,7 +114,7 @@ func (c *Conn) Receive() (interface{}, error) { } func (c *Conn) ReceiveBulkTo(w io.Writer) error { - err := c.readBulkReplyTo(w) + err := c.respReader.ParseBulkTo(w) if err != nil { if _, ok := err.(Error); !ok { c.Close() @@ -137,245 +123,6 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error { return err } -func (c *Conn) writeLen(prefix byte, n int) error { - c.lenScratch[len(c.lenScratch)-1] = '\n' - c.lenScratch[len(c.lenScratch)-2] = '\r' - i := len(c.lenScratch) - 3 - for { - c.lenScratch[i] = byte('0' + n%10) - i -= 1 - n = n / 10 - if n == 0 { - break - } - } - c.lenScratch[i] = prefix - _, err := c.bw.Write(c.lenScratch[i:]) - return err -} - -func (c *Conn) writeString(s string) error { - c.writeLen('$', len(s)) - c.bw.WriteString(s) - _, err := c.bw.WriteString("\r\n") - return err -} - -func (c *Conn) writeBytes(p []byte) error { - c.writeLen('$', len(p)) - c.bw.Write(p) - _, err := c.bw.WriteString("\r\n") - return err -} - -func (c *Conn) writeInt64(n int64) error { - return c.writeBytes(strconv.AppendInt(c.numScratch[:0], n, 10)) -} - -func (c *Conn) writeFloat64(n float64) error { - return c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64)) -} - -func (c *Conn) writeCommand(cmd string, args []interface{}) (err error) { - c.writeLen('*', 1+len(args)) - err = c.writeString(cmd) - for _, arg := range args { - if err != nil { - break - } - switch arg := arg.(type) { - case string: - err = c.writeString(arg) - case []byte: - err = c.writeBytes(arg) - case int: - err = c.writeInt64(int64(arg)) - case int64: - err = c.writeInt64(arg) - case float64: - err = c.writeFloat64(arg) - case bool: - if arg { - err = c.writeString("1") - } else { - err = c.writeString("0") - } - case nil: - err = c.writeString("") - default: - var buf bytes.Buffer - fmt.Fprint(&buf, arg) - err = c.writeBytes(buf.Bytes()) - } - } - return err -} - -func (c *Conn) readLine() ([]byte, error) { - p, err := c.br.ReadSlice('\n') - if err == bufio.ErrBufferFull { - return nil, errors.New("long response line") - } - if err != nil { - return nil, err - } - i := len(p) - 2 - if i < 0 || p[i] != '\r' { - return nil, errors.New("bad response line terminator") - } - return p[:i], nil -} - -// parseLen parses bulk string and array lengths. -func parseLen(p []byte) (int, error) { - if len(p) == 0 { - return -1, errors.New("malformed length") - } - - if p[0] == '-' && len(p) == 2 && p[1] == '1' { - // handle $-1 and $-1 null replies. - return -1, nil - } - - var n int - for _, b := range p { - n *= 10 - if b < '0' || b > '9' { - return -1, errors.New("illegal bytes in length") - } - n += int(b - '0') - } - - return n, nil -} - -// parseInt parses an integer reply. -func parseInt(p []byte) (interface{}, error) { - if len(p) == 0 { - return 0, errors.New("malformed integer") - } - - var negate bool - if p[0] == '-' { - negate = true - p = p[1:] - if len(p) == 0 { - return 0, errors.New("malformed integer") - } - } - - var n int64 - for _, b := range p { - n *= 10 - if b < '0' || b > '9' { - return 0, errors.New("illegal bytes in length") - } - n += int64(b - '0') - } - - if negate { - n = -n - } - return n, nil -} - -var ( - okReply interface{} = "OK" - pongReply interface{} = "PONG" -) - -func (c *Conn) readBulkReplyTo(w io.Writer) error { - line, err := c.readLine() - if err != nil { - return err - } - if len(line) == 0 { - return errors.New("ledis: short response line") - } - switch line[0] { - case '-': - return Error(string(line[1:])) - case '$': - n, err := parseLen(line[1:]) - if n < 0 || err != nil { - return err - } - - var nn int64 - if nn, err = io.CopyN(w, c.br, int64(n)); err != nil { - return err - } else if nn != int64(n) { - return io.ErrShortWrite - } - - if line, err := c.readLine(); err != nil { - return err - } else if len(line) != 0 { - return errors.New("bad bulk string format") - } - return nil - default: - return fmt.Errorf("not invalid bulk string type, but %c", line[0]) - } -} - -func (c *Conn) readReply() (interface{}, error) { - line, err := c.readLine() - if err != nil { - return nil, err - } - if len(line) == 0 { - return nil, errors.New("short response line") - } - switch line[0] { - case '+': - switch { - case len(line) == 3 && line[1] == 'O' && line[2] == 'K': - // Avoid allocation for frequent "+OK" response. - return okReply, nil - case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G': - // Avoid allocation in PING command benchmarks :) - return pongReply, nil - default: - return string(line[1:]), nil - } - case '-': - return Error(string(line[1:])), nil - case ':': - return parseInt(line[1:]) - case '$': - n, err := parseLen(line[1:]) - if n < 0 || err != nil { - return nil, err - } - p := make([]byte, n) - _, err = io.ReadFull(c.br, p) - if err != nil { - return nil, err - } - if line, err := c.readLine(); err != nil { - return nil, err - } else if len(line) != 0 { - return nil, errors.New("bad bulk string format") - } - return p, nil - case '*': - n, err := parseLen(line[1:]) - if n < 0 || err != nil { - return nil, err - } - r := make([]interface{}, n) - for i := range r { - r[i], err = c.readReply() - if err != nil { - return nil, err - } - } - return r, nil - } - return nil, errors.New("unexpected response line") -} - func (c *Client) newConn(addr string, pass string) (*Conn, error) { co, err := ConnectWithSize(addr, c.readBufferSize, c.writeBufferSize) if err != nil { diff --git a/Godeps/_workspace/src/github.com/siddontang/goredis/resp.go b/Godeps/_workspace/src/github.com/siddontang/goredis/resp.go new file mode 100644 index 0000000..5d2a926 --- /dev/null +++ b/Godeps/_workspace/src/github.com/siddontang/goredis/resp.go @@ -0,0 +1,456 @@ +package goredis + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "strconv" +) + +type Error string + +func (err Error) Error() string { return string(err) } + +var ( + okReply interface{} = "OK" + pongReply interface{} = "PONG" +) + +type RespReader struct { + br *bufio.Reader +} + +func NewRespReader(br *bufio.Reader) *RespReader { + r := &RespReader{br} + return r +} + +// Parse RESP +func (resp *RespReader) Parse() (interface{}, error) { + line, err := readLine(resp.br) + if err != nil { + return nil, err + } + if len(line) == 0 { + return nil, errors.New("short resp line") + } + switch line[0] { + case '+': + switch { + case len(line) == 3 && line[1] == 'O' && line[2] == 'K': + // Avoid allocation for frequent "+OK" response. + return okReply, nil + case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G': + // Avoid allocation in PING command benchmarks :) + return pongReply, nil + default: + return string(line[1:]), nil + } + case '-': + return Error(string(line[1:])), nil + case ':': + n, err := parseInt(line[1:]) + return n, err + case '$': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return nil, err + } + p := make([]byte, n) + _, err = io.ReadFull(resp.br, p) + if err != nil { + return nil, err + } + if line, err := readLine(resp.br); err != nil { + return nil, err + } else if len(line) != 0 { + return nil, errors.New("bad bulk string format") + } + return p, nil + case '*': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return nil, err + } + r := make([]interface{}, n) + for i := range r { + r[i], err = resp.Parse() + if err != nil { + return nil, err + } + } + return r, nil + } + return nil, errors.New("unexpected response line") +} + +// Parse client -> server command request, must be array of bulk strings +func (resp *RespReader) ParseRequest() ([][]byte, error) { + line, err := readLine(resp.br) + if err != nil { + return nil, err + } + if len(line) == 0 { + return nil, errors.New("short resp line") + } + switch line[0] { + case '*': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return nil, err + } + r := make([][]byte, n) + for i := range r { + r[i], err = parseBulk(resp.br) + if err != nil { + return nil, err + } + } + return r, nil + default: + return nil, fmt.Errorf("not invalid array of bulk string type, but %c", line[0]) + } +} + +// Parse bulk string and write it with writer w +func (resp *RespReader) ParseBulkTo(w io.Writer) error { + line, err := readLine(resp.br) + if err != nil { + return err + } + if len(line) == 0 { + return errors.New("ledis: short response line") + } + + switch line[0] { + case '-': + return Error(string(line[1:])) + case '$': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return err + } + + var nn int64 + if nn, err = io.CopyN(w, resp.br, int64(n)); err != nil { + return err + } else if nn != int64(n) { + return io.ErrShortWrite + } + + if line, err := readLine(resp.br); err != nil { + return err + } else if len(line) != 0 { + return errors.New("bad bulk string format") + } + return nil + default: + return fmt.Errorf("not invalid bulk string type, but %c", line[0]) + } +} + +func readLine(br *bufio.Reader) ([]byte, error) { + p, err := br.ReadSlice('\n') + if err == bufio.ErrBufferFull { + return nil, errors.New("long resp line") + } + if err != nil { + return nil, err + } + i := len(p) - 2 + if i < 0 || p[i] != '\r' { + return nil, errors.New("bad resp line terminator") + } + return p[:i], nil +} + +// parseLen parses bulk string and array lengths. +func parseLen(p []byte) (int, error) { + if len(p) == 0 { + return -1, errors.New("malformed length") + } + + if p[0] == '-' && len(p) == 2 && p[1] == '1' { + // handle $-1 and $-1 null replies. + return -1, nil + } + + var n int + for _, b := range p { + n *= 10 + if b < '0' || b > '9' { + return -1, errors.New("illegal bytes in length") + } + n += int(b - '0') + } + + return n, nil +} + +// parseInt parses an integer reply. +func parseInt(p []byte) (int64, error) { + if len(p) == 0 { + return 0, errors.New("malformed integer") + } + + var negate bool + if p[0] == '-' { + negate = true + p = p[1:] + if len(p) == 0 { + return 0, errors.New("malformed integer") + } + } + + var n int64 + for _, b := range p { + n *= 10 + if b < '0' || b > '9' { + return 0, errors.New("illegal bytes in length") + } + n += int64(b - '0') + } + + if negate { + n = -n + } + return n, nil +} + +func parseBulk(br *bufio.Reader) ([]byte, error) { + line, err := readLine(br) + if err != nil { + return nil, err + } + if len(line) == 0 { + return nil, errors.New("short resp line") + } + switch line[0] { + case '$': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return nil, err + } + p := make([]byte, n) + _, err = io.ReadFull(br, p) + if err != nil { + return nil, err + } + if line, err := readLine(br); err != nil { + return nil, err + } else if len(line) != 0 { + return nil, errors.New("bad bulk string format") + } + return p, nil + default: + return nil, fmt.Errorf("not invalid bulk string type, but %c", line[0]) + } +} + +var ( + intBuffer [][]byte + respTerm = []byte("\r\n") + nullBulk = []byte("-1") + nullArray = []byte("-1") +) + +func init() { + cnt := 10000 + intBuffer = make([][]byte, cnt) + for i := 0; i < cnt; i++ { + intBuffer[i] = []byte(strconv.Itoa(i)) + } +} + +type RespWriter struct { + bw *bufio.Writer + // Scratch space for formatting integers and floats. + numScratch [40]byte +} + +func NewRespWriter(bw *bufio.Writer) *RespWriter { + r := &RespWriter{bw: bw} + return r +} + +func (resp *RespWriter) Flush() error { + return resp.bw.Flush() +} + +func (resp *RespWriter) writeTerm() error { + _, err := resp.bw.Write(respTerm) + return err +} + +func (resp *RespWriter) writeInteger(n int64) error { + var err error + if n < int64(len(intBuffer)) { + _, err = resp.bw.Write(intBuffer[n]) + } else { + _, err = resp.bw.Write(strconv.AppendInt(nil, n, 10)) + } + return err +} + +func (resp *RespWriter) WriteInteger(n int64) error { + resp.bw.WriteByte(':') + + resp.writeInteger(n) + + return resp.writeTerm() +} + +func (resp *RespWriter) FlushInteger(n int64) error { + resp.WriteInteger(n) + return resp.Flush() +} + +func (resp *RespWriter) WriteString(s string) error { + resp.bw.WriteByte('+') + resp.bw.WriteString(s) + return resp.writeTerm() +} + +func (resp *RespWriter) FlushString(s string) error { + resp.WriteString(s) + return resp.Flush() +} + +func (resp *RespWriter) WriteError(e error) error { + resp.bw.WriteByte('-') + + if e != nil { + resp.bw.WriteString(e.Error()) + } else { + resp.bw.WriteString("error is nil, invalid") + } + + return resp.writeTerm() +} + +func (resp *RespWriter) FlushError(e error) error { + resp.WriteError(e) + return resp.Flush() +} + +func (resp *RespWriter) WriteBulk(b []byte) error { + resp.bw.WriteByte('$') + if b == nil { + resp.bw.Write(nullBulk) + } else { + resp.writeInteger(int64(len(b))) + resp.writeTerm() + resp.bw.Write(b) + } + return resp.writeTerm() +} + +func (resp *RespWriter) FlushBulk(b []byte) error { + resp.WriteBulk(b) + return resp.Flush() +} + +func (resp *RespWriter) WriteArray(ay []interface{}) error { + resp.bw.WriteByte('*') + if ay == nil { + resp.bw.Write(nullArray) + return resp.writeTerm() + } else { + resp.writeInteger(int64(len(ay))) + resp.writeTerm() + + var err error + for i := 0; i < len(ay); i++ { + if err != nil { + return err + } + + switch v := ay[i].(type) { + case []interface{}: + err = resp.WriteArray(v) + case []byte: + err = resp.WriteBulk(v) + case nil: + err = resp.WriteBulk(nil) + case int64: + err = resp.WriteInteger(v) + case string: + err = resp.WriteString(v) + case error: + err = resp.WriteError(v) + default: + err = fmt.Errorf("invalid array type %T %v", ay[i], v) + } + } + return err + } +} + +func (resp *RespWriter) FlushArray(ay []interface{}) error { + resp.WriteArray(ay) + return resp.Flush() +} + +func (resp *RespWriter) writeBulkString(s string) error { + resp.bw.WriteByte('$') + resp.writeInteger(int64(len(s))) + resp.writeTerm() + resp.bw.WriteString(s) + return resp.writeTerm() +} + +func (resp *RespWriter) writeBulkInt64(n int64) error { + return resp.WriteBulk(strconv.AppendInt(resp.numScratch[:0], n, 10)) +} + +func (resp *RespWriter) writeBulkFloat64(n float64) error { + return resp.WriteBulk(strconv.AppendFloat(resp.numScratch[:0], n, 'g', -1, 64)) +} + +// RESP command is array of bulk string +func (resp *RespWriter) WriteCommand(cmd string, args ...interface{}) error { + resp.bw.WriteByte('*') + + resp.writeInteger(int64(1 + len(args))) + resp.writeTerm() + + err := resp.writeBulkString(cmd) + + for _, arg := range args { + if err != nil { + break + } + switch arg := arg.(type) { + case string: + err = resp.writeBulkString(arg) + case []byte: + err = resp.WriteBulk(arg) + case int: + err = resp.writeBulkInt64(int64(arg)) + case int64: + err = resp.writeBulkInt64(arg) + case float64: + err = resp.writeBulkFloat64(arg) + case bool: + if arg { + err = resp.writeBulkString("1") + } else { + err = resp.writeBulkString("0") + } + case nil: + err = resp.writeBulkString("") + default: + var buf bytes.Buffer + fmt.Fprint(&buf, arg) + err = resp.WriteBulk(buf.Bytes()) + } + } + + if err != nil { + return err + } + + return resp.Flush() +} diff --git a/Godeps/_workspace/src/github.com/siddontang/goredis/resp_test.go b/Godeps/_workspace/src/github.com/siddontang/goredis/resp_test.go new file mode 100644 index 0000000..c271797 --- /dev/null +++ b/Godeps/_workspace/src/github.com/siddontang/goredis/resp_test.go @@ -0,0 +1,81 @@ +package goredis + +import ( + "bufio" + "bytes" + "reflect" + "testing" +) + +func TestResp(t *testing.T) { + var buf bytes.Buffer + + reader := NewRespReader(bufio.NewReader(&buf)) + writer := NewRespWriter(bufio.NewWriter(&buf)) + + if err := writer.WriteCommand("SELECT", 1); err != nil { + t.Fatal(err) + } else { + if reqs, err := reader.ParseRequest(); err != nil { + t.Fatal(err) + } else if len(reqs) != 2 { + t.Fatal(len(reqs)) + } else if string(reqs[0]) != "SELECT" { + t.Fatal(string(reqs[0])) + } else if string(reqs[1]) != "1" { + t.Fatal(string(reqs[1])) + } + } + + if err := writer.FlushInteger(10); err != nil { + t.Fatal(err) + } else { + if n, err := Int64(reader.Parse()); err != nil { + t.Fatal(err) + } else if n != 10 { + t.Fatal(n) + } + } + + if err := writer.FlushString("abc"); err != nil { + t.Fatal(err) + } else { + if s, err := String(reader.Parse()); err != nil { + t.Fatal(err) + } else if s != "abc" { + t.Fatal(s) + } + } + + if err := writer.FlushBulk([]byte("abc")); err != nil { + t.Fatal(err) + } else { + if s, err := String(reader.Parse()); err != nil { + t.Fatal(err) + } else if s != "abc" { + t.Fatal(s) + } + } + + ay := []interface{}{[]byte("SET"), []byte("a"), []byte("1")} + if err := writer.FlushArray(ay); err != nil { + t.Fatal(err) + } else { + if oy, err := reader.Parse(); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(oy, ay) { + t.Fatalf("%#v", oy) + } + } + + e := Error("hello world") + if err := writer.FlushError(e); err != nil { + t.Fatal(err) + } else { + if ee, err := reader.Parse(); err != nil { + t.Fatal("must error") + } else if !reflect.DeepEqual(e, ee) { + t.Fatal(ee) + } + } +} diff --git a/server/client_resp.go b/server/client_resp.go index eabc357..47d7d58 100644 --- a/server/client_resp.go +++ b/server/client_resp.go @@ -4,10 +4,10 @@ import ( "bufio" "errors" "fmt" - "github.com/siddontang/go/arena" "github.com/siddontang/go/hack" "github.com/siddontang/go/log" "github.com/siddontang/go/num" + "github.com/siddontang/goredis" "github.com/siddontang/ledisdb/ledis" "io" "net" @@ -23,9 +23,8 @@ type respClient struct { *client conn net.Conn - rb *bufio.Reader - ar *arena.Arena + respReader *goredis.RespReader activeQuit bool } @@ -76,14 +75,12 @@ func newClientRESP(conn net.Conn, app *App) { tcpConn.SetWriteBuffer(app.cfg.ConnWriteBufferSize) } - c.rb = bufio.NewReaderSize(conn, app.cfg.ConnReadBufferSize) + br := bufio.NewReaderSize(conn, app.cfg.ConnReadBufferSize) + c.respReader = goredis.NewRespReader(br) c.resp = newWriterRESP(conn, app.cfg.ConnWriteBufferSize) c.remoteAddr = conn.RemoteAddr().String() - //maybe another config? - c.ar = arena.NewArena(app.cfg.ConnReadBufferSize) - app.connWait.Add(1) app.addRespClient(c) @@ -131,14 +128,12 @@ func (c *respClient) run() { c.conn.SetReadDeadline(time.Now().Add(kc)) } - reqData, err := c.readRequest() + c.cmd = "" + c.args = nil + + reqData, err := c.respReader.ParseRequest() if err == nil { err = c.handleRequest(reqData) - - c.cmd = "" - c.args = nil - - c.ar.Reset() } if err != nil { @@ -147,10 +142,6 @@ func (c *respClient) run() { } } -func (c *respClient) readRequest() ([][]byte, error) { - return ReadRequest(c.rb, c.ar) -} - func (c *respClient) handleRequest(reqData [][]byte) error { if len(reqData) == 0 { c.cmd = "" @@ -221,7 +212,7 @@ func newWriterRESP(conn net.Conn, size int) *respWriter { } func (w *respWriter) writeError(err error) { - w.buff.Write(hack.Slice("-ERR")) + w.buff.Write(hack.Slice("-")) if err != nil { w.buff.WriteByte(' ') w.buff.Write(hack.Slice(err.Error())) diff --git a/server/util.go b/server/util.go index a6ef5d4..f4fe490 100644 --- a/server/util.go +++ b/server/util.go @@ -1,128 +1,6 @@ package server -import ( - "bufio" - "errors" - "fmt" - "github.com/siddontang/go/arena" - "io" -) - -var ( - errLineFormat = errors.New("bad response line format") -) - -func ReadLine(rb *bufio.Reader) ([]byte, error) { - p, err := rb.ReadSlice('\n') - - if err != nil { - return nil, err - } - i := len(p) - 2 - if i < 0 || p[i] != '\r' { - return nil, errLineFormat - } - - return p[:i], nil -} - -func readBytes(br *bufio.Reader, a *arena.Arena) (bytes []byte, err error) { - size, err := readLong(br) - if err != nil { - return nil, err - } - if size == -1 { - return nil, nil - } - if size < 0 { - return nil, errors.New("Invalid size: " + fmt.Sprint("%d", size)) - } - - buf := a.Make(int(size) + 2) - if _, err = io.ReadFull(br, buf); err != nil { - return nil, err - } - - if buf[len(buf)-2] != '\r' && buf[len(buf)-1] != '\n' { - return nil, errors.New("bad bulk string format") - } - - bytes = buf[0 : len(buf)-2] - - return -} - -func readLong(in *bufio.Reader) (result int64, err error) { - read, err := in.ReadByte() - if err != nil { - return -1, err - } - var sign int - if read == '-' { - read, err = in.ReadByte() - if err != nil { - return -1, err - } - sign = -1 - } else { - sign = 1 - } - var number int64 - for number = 0; err == nil; read, err = in.ReadByte() { - if read == '\r' { - read, err = in.ReadByte() - if err != nil { - return -1, err - } - if read == '\n' { - return number * int64(sign), nil - } else { - return -1, errors.New("Bad line ending") - } - } - value := read - '0' - if value >= 0 && value < 10 { - number *= 10 - number += int64(value) - } else { - return -1, errors.New("Invalid digit") - } - } - return -1, err -} - -func ReadRequest(in *bufio.Reader, a *arena.Arena) ([][]byte, error) { - code, err := in.ReadByte() - if err != nil { - return nil, err - } - - if code != '*' { - return nil, errReadRequest - } - - var nparams int64 - if nparams, err = readLong(in); err != nil { - return nil, err - } else if nparams <= 0 { - return nil, errReadRequest - } - - req := make([][]byte, nparams) - for i := range req { - if code, err = in.ReadByte(); err != nil { - return nil, err - } else if code != '$' { - return nil, errReadRequest - } - - if req[i], err = readBytes(in, a); err != nil { - return nil, err - } - } - - return req, nil -} +import () func lowerSlice(buf []byte) []byte { for i, r := range buf {