Reuse buffer for constructing request.

This commit is contained in:
Vladimir Mihailenco 2012-08-14 18:20:22 +03:00
parent 625adac9ab
commit a2f5423658
8 changed files with 32 additions and 44 deletions

View File

@ -12,12 +12,14 @@ import (
type Conn struct { type Conn struct {
RW io.ReadWriteCloser RW io.ReadWriteCloser
Rd *bufio.Reader Rd *bufio.Reader
ReqBuf []byte
} }
func NewConn(rw io.ReadWriteCloser) *Conn { func NewConn(rw io.ReadWriteCloser) *Conn {
return &Conn{ return &Conn{
RW: rw, RW: rw,
Rd: bufio.NewReaderSize(rw, 1024), Rd: bufio.NewReaderSize(rw, 1024),
ReqBuf: make([]byte, 0, 1024),
} }
} }

View File

@ -35,21 +35,20 @@ func (c *MultiClient) Unwatch(keys ...string) *StatusReq {
func (c *MultiClient) Discard() { func (c *MultiClient) Discard() {
c.mtx.Lock() c.mtx.Lock()
c.reqs = c.reqs[:0] c.reqs = []Req{NewStatusReq("MULTI")}
c.mtx.Unlock() c.mtx.Unlock()
} }
func (c *MultiClient) Exec(do func()) ([]Req, error) { func (c *MultiClient) Exec(do func()) ([]Req, error) {
c.mtx.Lock() c.Discard()
c.reqs = make([]Req, 0)
c.mtx.Unlock()
do() do()
c.mtx.Lock() c.mtx.Lock()
if len(c.reqs) == 0 { c.reqs = append(c.reqs, NewMultiBulkReq("EXEC"))
if len(c.reqs) == 2 {
c.mtx.Unlock() c.mtx.Unlock()
return c.reqs, nil return []Req{}, nil
} }
reqs := c.reqs reqs := c.reqs
c.reqs = nil c.reqs = nil
@ -67,18 +66,11 @@ func (c *MultiClient) Exec(do func()) ([]Req, error) {
} }
c.ConnPool.Add(conn) c.ConnPool.Add(conn)
return reqs, nil return reqs[1 : len(reqs)-1], nil
} }
func (c *MultiClient) ExecReqs(reqs []Req, conn *Conn) error { func (c *MultiClient) ExecReqs(reqs []Req, conn *Conn) error {
multiReq := make([]byte, 0, 1024) err := c.WriteReq(conn, reqs...)
multiReq = append(multiReq, PackReq([]string{"MULTI"})...)
for _, req := range reqs {
multiReq = append(multiReq, req.Req()...)
}
multiReq = append(multiReq, PackReq([]string{"EXEC"})...)
err := c.WriteReq(multiReq, conn)
if err != nil { if err != nil {
return err return err
} }
@ -92,7 +84,7 @@ func (c *MultiClient) ExecReqs(reqs []Req, conn *Conn) error {
} }
// Parse queued replies. // Parse queued replies.
for _ = range reqs { for i := 1; i < len(reqs)-1; i++ {
_, err = statusReq.ParseReply(conn.Rd) _, err = statusReq.ParseReply(conn.Rd)
if err != nil { if err != nil {
return err return err
@ -112,7 +104,7 @@ func (c *MultiClient) ExecReqs(reqs []Req, conn *Conn) error {
} }
// Parse replies. // Parse replies.
for i := 0; i < len(reqs); i++ { for i := 1; i < len(reqs)-1; i++ {
req := reqs[i] req := reqs[i]
val, err := req.ParseReply(conn.Rd) val, err := req.ParseReply(conn.Rd)
if err != nil { if err != nil {

View File

@ -21,8 +21,7 @@ var (
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func PackReq(args []string) []byte { func AppendReq(buf []byte, args []string) []byte {
buf := make([]byte, 0, 1024)
buf = append(buf, '*') buf = append(buf, '*')
buf = strconv.AppendUint(buf, uint64(len(args)), 10) buf = strconv.AppendUint(buf, uint64(len(args)), 10)
buf = append(buf, '\r', '\n') buf = append(buf, '\r', '\n')

View File

@ -46,17 +46,7 @@ func (c *PipelineClient) RunQueued() ([]Req, error) {
} }
func (c *PipelineClient) RunReqs(reqs []Req, conn *Conn) error { func (c *PipelineClient) RunReqs(reqs []Req, conn *Conn) error {
var multiReq []byte err := c.WriteReq(conn, reqs...)
if len(reqs) == 1 {
multiReq = reqs[0].Req()
} else {
multiReq = make([]byte, 0, 1024)
for _, req := range reqs {
multiReq = append(multiReq, req.Req()...)
}
}
err := c.WriteReq(multiReq, conn)
if err != nil { if err != nil {
return err return err
} }

View File

@ -89,7 +89,7 @@ func (c *PubSubClient) subscribe(cmd string, channels ...string) (chan *Message,
return nil, err return nil, err
} }
if err := c.WriteReq(req.Req(), conn); err != nil { if err := c.WriteReq(conn, req); err != nil {
return nil, err return nil, err
} }
@ -117,7 +117,7 @@ func (c *PubSubClient) unsubscribe(cmd string, channels ...string) error {
return err return err
} }
return c.WriteReq(req.Req(), conn) return c.WriteReq(conn, req)
} }
func (c *PubSubClient) Unsubscribe(channels ...string) error { func (c *PubSubClient) Unsubscribe(channels ...string) error {

View File

@ -56,8 +56,13 @@ type BaseClient struct {
reqs []Req reqs []Req
} }
func (c *BaseClient) WriteReq(buf []byte, conn *Conn) error { func (c *BaseClient) WriteReq(conn *Conn, reqs ...Req) error {
_, err := conn.RW.Write(buf) conn.ReqBuf = conn.ReqBuf[:0]
for _, req := range reqs {
conn.ReqBuf = AppendReq(conn.ReqBuf, req.Args())
}
_, err := conn.RW.Write(conn.ReqBuf)
return err return err
} }
@ -96,7 +101,7 @@ func (c *BaseClient) Run(req Req) {
return return
} }
err = c.WriteReq(req.Req(), conn) err = c.WriteReq(conn, req)
if err != nil { if err != nil {
c.ConnPool.Remove(conn) c.ConnPool.Remove(conn)
req.SetErr(err) req.SetErr(err)

View File

@ -2412,7 +2412,7 @@ func (t *RedisTest) BenchmarkRedisWriteRead(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
err := t.client.WriteReq([]byte("PING\r\n"), conn) err := t.client.WriteReq(conn, redis.NewStatusReq("PING"))
c.Assert(err, IsNil) c.Assert(err, IsNil)
line, _, err := conn.Rd.ReadLine() line, _, err := conn.Rd.ReadLine()
@ -2423,7 +2423,7 @@ func (t *RedisTest) BenchmarkRedisWriteRead(c *C) {
c.StartTimer() c.StartTimer()
for i := 0; i < c.N; i++ { for i := 0; i < c.N; i++ {
t.client.WriteReq([]byte("PING\r\n"), conn) t.client.WriteReq(conn, redis.NewStatusReq("PING"))
conn.Rd.ReadLine() conn.Rd.ReadLine()
} }

View File

@ -5,7 +5,7 @@ import (
) )
type Req interface { type Req interface {
Req() []byte Args() []string
ParseReply(ReadLiner) (interface{}, error) ParseReply(ReadLiner) (interface{}, error)
SetErr(error) SetErr(error)
Err() error Err() error
@ -28,8 +28,8 @@ func NewBaseReq(args ...string) *BaseReq {
} }
} }
func (r *BaseReq) Req() []byte { func (r *BaseReq) Args() []string {
return PackReq(r.args) return r.args
} }
func (r *BaseReq) SetErr(err error) { func (r *BaseReq) SetErr(err error) {