From e73e87fc11e231c51555e974466dcc0d555a30ed Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Thu, 26 Jul 2012 21:43:21 +0300 Subject: [PATCH] Improve API and perfomance. --- pubsub.go | 16 ++++- redis.go | 66 +++++++++++++------ request.go | 188 ++++++++++++++++++++++++++--------------------------- 3 files changed, 154 insertions(+), 116 deletions(-) diff --git a/pubsub.go b/pubsub.go index fd3f022..379e491 100644 --- a/pubsub.go +++ b/pubsub.go @@ -31,7 +31,17 @@ func (c *PubSubClient) consumeMessages() { for { // Replies can arrive in batches. // Read whole reply and parse messages one by one. - rd, err := c.ReadReply() + + rd, err := c.readerPool.Get() + if err != nil { + msg := &Message{} + msg.Err = err + c.ch <- msg + return + } + defer c.readerPool.Add(rd) + + err = c.ReadReply(rd) if err != nil { msg := &Message{} msg.Err = err @@ -42,13 +52,13 @@ func (c *PubSubClient) consumeMessages() { for { msg := &Message{} - req.ParseReply(rd) - reply, err := req.Reply() + replyI, err := req.ParseReply(rd) if err != nil { msg.Err = err c.ch <- msg break } + reply := replyI.([]interface{}) msgName := reply[0].(string) switch msgName { diff --git a/redis.go b/redis.go index d058238..79a7b1c 100644 --- a/redis.go +++ b/redis.go @@ -11,19 +11,23 @@ import ( type connectFunc func() (io.ReadWriter, error) type disconnectFunc func(io.ReadWriter) +func createReader() (*bufreader.Reader, error) { + return bufreader.NewSizedReader(8192), nil +} + type Client struct { mtx sync.Mutex connect connectFunc disconnect disconnectFunc currConn io.ReadWriter - rd *bufreader.Reader + readerPool *bufreader.ReaderPool reqs []Req } func NewClient(connect connectFunc, disconnect disconnectFunc) *Client { return &Client{ - rd: bufreader.NewSizedReader(8192), + readerPool: bufreader.NewReaderPool(10, createReader), connect: connect, disconnect: disconnect, } @@ -31,7 +35,7 @@ func NewClient(connect connectFunc, disconnect disconnectFunc) *Client { func NewMultiClient(connect connectFunc, disconnect disconnectFunc) *Client { return &Client{ - rd: bufreader.NewSizedReader(8192), + readerPool: bufreader.NewReaderPool(10, createReader), connect: connect, disconnect: disconnect, @@ -63,6 +67,7 @@ func (c *Client) WriteReq(buf []byte) error { if err != nil { return err } + _, err = conn.Write(buf) if err != nil { c.Close() @@ -70,26 +75,28 @@ func (c *Client) WriteReq(buf []byte) error { return err } -func (c *Client) ReadReply() (*bufreader.Reader, error) { +func (c *Client) ReadReply(rd *bufreader.Reader) error { conn, err := c.conn() if err != nil { - return nil, err + return err } - _, err = c.rd.ReadFrom(conn) + + _, err = rd.ReadFrom(conn) if err != nil { c.Close() - return nil, err + return err } - return c.rd, nil + + return nil } -func (c *Client) WriteRead(buf []byte) (*bufreader.Reader, error) { +func (c *Client) WriteRead(buf []byte, rd *bufreader.Reader) error { c.mtx.Lock() defer c.mtx.Unlock() if err := c.WriteReq(buf); err != nil { - return nil, err + return err } - return c.ReadReply() + return c.ReadReply(rd) } func (c *Client) Run(req Req) { @@ -100,12 +107,25 @@ func (c *Client) Run(req Req) { return } - rd, err := c.WriteRead(req.Req()) + rd, err := c.readerPool.Get() if err != nil { req.SetErr(err) return } - req.ParseReply(rd) + defer c.readerPool.Add(rd) + + err = c.WriteRead(req.Req(), rd) + if err != nil { + req.SetErr(err) + return + } + + val, err := req.ParseReply(rd) + if err != nil { + req.SetErr(err) + return + } + req.SetVal(val) } //------------------------------------------------------------------------------ @@ -137,7 +157,13 @@ func (c *Client) Exec() ([]Req, error) { } multiReq = append(multiReq, PackReq([]string{"EXEC"})...) - rd, err := c.WriteRead(multiReq) + rd, err := c.readerPool.Get() + if err != nil { + return nil, err + } + defer c.readerPool.Add(rd) + + err = c.WriteRead(multiReq, rd) if err != nil { return nil, err } @@ -145,16 +171,14 @@ func (c *Client) Exec() ([]Req, error) { statusReq := NewStatusReq() // multi - statusReq.ParseReply(rd) - _, err = statusReq.Reply() + _, err = statusReq.ParseReply(rd) if err != nil { return nil, err } for _ = range reqs { // queue - statusReq.ParseReply(rd) - _, err = statusReq.Reply() + _, err = statusReq.ParseReply(rd) if err != nil { return nil, err } @@ -169,7 +193,11 @@ func (c *Client) Exec() ([]Req, error) { } for _, req := range reqs { - req.ParseReply(rd) + val, err := req.ParseReply(rd) + if err != nil { + req.SetErr(err) + } + req.SetVal(val) } return reqs, nil diff --git a/request.go b/request.go index 2ecb3a6..7215009 100644 --- a/request.go +++ b/request.go @@ -62,15 +62,16 @@ func PackReq(args []string) []byte { type Req interface { Req() []byte - ParseReply(*bufreader.Reader) + ParseReply(*bufreader.Reader) (interface{}, error) SetErr(error) - Err() error + SetVal(interface{}) } //------------------------------------------------------------------------------ type BaseReq struct { args []string + val interface{} err error } @@ -85,18 +86,27 @@ func (r *BaseReq) Req() []byte { } func (r *BaseReq) SetErr(err error) { + if err == nil { + panic("non-nil value expected") + } r.err = err } -func (r *BaseReq) Err() error { - return r.err +func (r *BaseReq) SetVal(val interface{}) { + if val == nil { + panic("non-nil value expected") + } + r.val = val +} + +func (r *BaseReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { + panic("abstract") } //------------------------------------------------------------------------------ type StatusReq struct { *BaseReq - val string } func NewStatusReq(args ...string) *StatusReq { @@ -105,34 +115,32 @@ func NewStatusReq(args ...string) *StatusReq { } } -func (r *StatusReq) ParseReply(rd *bufreader.Reader) { - var line []byte - - line, r.err = rd.ReadLine('\n') - if r.err != nil { - return +func (r *StatusReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { + line, err := rd.ReadLine('\n') + if err != nil { + return nil, err } if line[0] == '-' { - r.err = errors.New(string(line[1:])) - return + return nil, errors.New(string(line[1:])) } else if line[0] != '+' { - r.err = fmt.Errorf("Expected '+', but got %q of %q.", line, rd.Bytes()) - return + return nil, fmt.Errorf("Expected '+', but got %q of %q.", line, rd.Bytes()) } - r.val = string(line[1:]) + return string(line[1:]), nil } func (r *StatusReq) Reply() (string, error) { - return r.val, r.err + if r.err != nil { + return "", r.err + } + return r.val.(string), nil } //------------------------------------------------------------------------------ type IntReq struct { *BaseReq - val int64 } func NewIntReq(args ...string) *IntReq { @@ -141,34 +149,32 @@ func NewIntReq(args ...string) *IntReq { } } -func (r *IntReq) ParseReply(rd *bufreader.Reader) { - var line []byte - - line, r.err = rd.ReadLine('\n') - if r.err != nil { - return +func (r *IntReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { + line, err := rd.ReadLine('\n') + if err != nil { + return nil, err } if line[0] == '-' { - r.err = errors.New(string(line[1:])) - return + return nil, errors.New(string(line[1:])) } else if line[0] != ':' { - r.err = fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) - return + return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) } - r.val, r.err = strconv.ParseInt(string(line[1:]), 10, 64) + return strconv.ParseInt(string(line[1:]), 10, 64) } func (r *IntReq) Reply() (int64, error) { - return r.val, r.err + if r.err != nil { + return 0, r.err + } + return r.val.(int64), nil } //------------------------------------------------------------------------------ type BoolReq struct { *BaseReq - val bool } func NewBoolReq(args ...string) *BoolReq { @@ -177,34 +183,32 @@ func NewBoolReq(args ...string) *BoolReq { } } -func (r *BoolReq) ParseReply(rd *bufreader.Reader) { - var line []byte - - line, r.err = rd.ReadLine('\n') - if r.err != nil { - return +func (r *BoolReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { + line, err := rd.ReadLine('\n') + if err != nil { + return nil, err } if line[0] == '-' { - r.err = errors.New(string(line[1:])) - return + return nil, errors.New(string(line[1:])) } else if line[0] != ':' { - r.err = fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) - return + return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) } - r.val = line[1] == '1' + return line[1] == '1', nil } func (r *BoolReq) Reply() (bool, error) { - return r.val, r.err + if r.err != nil { + return false, r.err + } + return r.val.(bool), nil } //------------------------------------------------------------------------------ type BulkReq struct { *BaseReq - val string } func NewBulkReq(args ...string) *BulkReq { @@ -213,44 +217,41 @@ func NewBulkReq(args ...string) *BulkReq { } } -func (r *BulkReq) ParseReply(rd *bufreader.Reader) { - var line []byte - - line, r.err = rd.ReadLine('\n') - if r.err != nil { - return +func (r *BulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { + line, err := rd.ReadLine('\n') + if err != nil { + return nil, err } if line[0] == '-' { - r.err = errors.New(string(line[1:])) - return + return nil, errors.New(string(line[1:])) } else if line[0] != '$' { - r.err = fmt.Errorf("Expected '$', but got %q of %q.", line, rd.Bytes()) - return + return nil, fmt.Errorf("Expected '$', but got %q of %q.", line, rd.Bytes()) } if len(line) >= 3 && line[1] == '-' && line[2] == '1' { - r.err = Nil - return + return nil, Nil } - line, r.err = rd.ReadLine('\n') - if r.err != nil { - return + line, err = rd.ReadLine('\n') + if err != nil { + return nil, err } - r.val = string(line) + return string(line), nil } func (r *BulkReq) Reply() (string, error) { - return r.val, r.err + if r.err != nil { + return "", r.err + } + return r.val.(string), nil } //------------------------------------------------------------------------------ type MultiBulkReq struct { *BaseReq - val []interface{} } func NewMultiBulkReq(args ...string) *MultiBulkReq { @@ -259,42 +260,36 @@ func NewMultiBulkReq(args ...string) *MultiBulkReq { } } -func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) { - var line []byte - - line, r.err = rd.ReadLine('\n') - if r.err != nil { - return +func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { + line, err := rd.ReadLine('\n') + if err != nil { + return nil, err } if line[0] == '-' { - r.err = errors.New(string(line[1:])) - return + return nil, errors.New(string(line[1:])) } else if line[0] != '*' { - r.err = fmt.Errorf("Expected '*', but got line %q of %q.", line, rd.Bytes()) - return + return nil, fmt.Errorf("Expected '*', but got line %q of %q.", line, rd.Bytes()) } val := make([]interface{}, 0) if len(line) >= 2 && line[1] == '0' { - r.val = val - return + return val, nil } else if len(line) >= 3 && line[1] == '-' && line[2] == '1' { - r.err = Nil - return + return nil, Nil } - line, r.err = rd.ReadLine('\n') - if r.err != nil { - return + line, err = rd.ReadLine('\n') + if err != nil { + return nil, err } for { if line[0] == ':' { var n int64 - n, r.err = strconv.ParseInt(string(line[1:]), 10, 64) - if r.err != nil { - return + n, err = strconv.ParseInt(string(line[1:]), 10, 64) + if err != nil { + return nil, err } val = append(val, n) } else if line[0] == '$' { @@ -303,32 +298,37 @@ func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) { } else if len(line) >= 3 && line[1] == '-' && line[2] == '1' { val = append(val, nil) } else { - line, r.err = rd.ReadLine('\n') - if r.err != nil { - return + line, err = rd.ReadLine('\n') + if err != nil { + return nil, err } val = append(val, string(line)) } } else { - r.err = fmt.Errorf("Expected '$', but got line %q of %q.", line, rd.Bytes()) - return + return nil, fmt.Errorf("Expected '$', but got line %q of %q.", line, rd.Bytes()) } - line, r.err = rd.ReadLine('\n') - if r.err == io.EOF { - r.err = nil - break + line, err = rd.ReadLine('\n') + if err != nil { + if err == io.EOF { + break + } + return nil, err } - // Check for start of another reply. + + // Check for the header of another reply. if line[0] == '*' { rd.UnreadLine('\n') break } } - r.val = val + return val, nil } func (r *MultiBulkReq) Reply() ([]interface{}, error) { - return r.val, r.err + if r.err != nil { + return nil, r.err + } + return r.val.([]interface{}), nil }