Improve API and perfomance.

This commit is contained in:
Vladimir Mihailenco 2012-07-26 21:43:21 +03:00
parent 153bd05fb7
commit e73e87fc11
3 changed files with 154 additions and 116 deletions

View File

@ -31,7 +31,17 @@ func (c *PubSubClient) consumeMessages() {
for { for {
// Replies can arrive in batches. // Replies can arrive in batches.
// Read whole reply and parse messages one by one. // Read whole reply and parse messages one by one.
rd, err := c.ReadReply()
rd, err := c.readerPool.Get()
if err != nil {
msg := &Message{}
msg.Err = err
c.ch <- msg
return
}
defer c.readerPool.Add(rd)
err = c.ReadReply(rd)
if err != nil { if err != nil {
msg := &Message{} msg := &Message{}
msg.Err = err msg.Err = err
@ -42,13 +52,13 @@ func (c *PubSubClient) consumeMessages() {
for { for {
msg := &Message{} msg := &Message{}
req.ParseReply(rd) replyI, err := req.ParseReply(rd)
reply, err := req.Reply()
if err != nil { if err != nil {
msg.Err = err msg.Err = err
c.ch <- msg c.ch <- msg
break break
} }
reply := replyI.([]interface{})
msgName := reply[0].(string) msgName := reply[0].(string)
switch msgName { switch msgName {

View File

@ -11,19 +11,23 @@ import (
type connectFunc func() (io.ReadWriter, error) type connectFunc func() (io.ReadWriter, error)
type disconnectFunc func(io.ReadWriter) type disconnectFunc func(io.ReadWriter)
func createReader() (*bufreader.Reader, error) {
return bufreader.NewSizedReader(8192), nil
}
type Client struct { type Client struct {
mtx sync.Mutex mtx sync.Mutex
connect connectFunc connect connectFunc
disconnect disconnectFunc disconnect disconnectFunc
currConn io.ReadWriter currConn io.ReadWriter
rd *bufreader.Reader readerPool *bufreader.ReaderPool
reqs []Req reqs []Req
} }
func NewClient(connect connectFunc, disconnect disconnectFunc) *Client { func NewClient(connect connectFunc, disconnect disconnectFunc) *Client {
return &Client{ return &Client{
rd: bufreader.NewSizedReader(8192), readerPool: bufreader.NewReaderPool(10, createReader),
connect: connect, connect: connect,
disconnect: disconnect, disconnect: disconnect,
} }
@ -31,7 +35,7 @@ func NewClient(connect connectFunc, disconnect disconnectFunc) *Client {
func NewMultiClient(connect connectFunc, disconnect disconnectFunc) *Client { func NewMultiClient(connect connectFunc, disconnect disconnectFunc) *Client {
return &Client{ return &Client{
rd: bufreader.NewSizedReader(8192), readerPool: bufreader.NewReaderPool(10, createReader),
connect: connect, connect: connect,
disconnect: disconnect, disconnect: disconnect,
@ -63,6 +67,7 @@ func (c *Client) WriteReq(buf []byte) error {
if err != nil { if err != nil {
return err return err
} }
_, err = conn.Write(buf) _, err = conn.Write(buf)
if err != nil { if err != nil {
c.Close() c.Close()
@ -70,26 +75,28 @@ func (c *Client) WriteReq(buf []byte) error {
return err return err
} }
func (c *Client) ReadReply() (*bufreader.Reader, error) { func (c *Client) ReadReply(rd *bufreader.Reader) error {
conn, err := c.conn() conn, err := c.conn()
if err != nil { if err != nil {
return nil, err return err
} }
_, err = c.rd.ReadFrom(conn)
_, err = rd.ReadFrom(conn)
if err != nil { if err != nil {
c.Close() c.Close()
return nil, err return err
} }
return c.rd, nil
return nil
} }
func (c *Client) WriteRead(buf []byte) (*bufreader.Reader, error) { func (c *Client) WriteRead(buf []byte, rd *bufreader.Reader) error {
c.mtx.Lock() c.mtx.Lock()
defer c.mtx.Unlock() defer c.mtx.Unlock()
if err := c.WriteReq(buf); err != nil { if err := c.WriteReq(buf); err != nil {
return nil, err return err
} }
return c.ReadReply() return c.ReadReply(rd)
} }
func (c *Client) Run(req Req) { func (c *Client) Run(req Req) {
@ -100,12 +107,25 @@ func (c *Client) Run(req Req) {
return return
} }
rd, err := c.WriteRead(req.Req()) rd, err := c.readerPool.Get()
if err != nil { if err != nil {
req.SetErr(err) req.SetErr(err)
return return
} }
req.ParseReply(rd) defer c.readerPool.Add(rd)
err = c.WriteRead(req.Req(), rd)
if err != nil {
req.SetErr(err)
return
}
val, err := req.ParseReply(rd)
if err != nil {
req.SetErr(err)
return
}
req.SetVal(val)
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -137,7 +157,13 @@ func (c *Client) Exec() ([]Req, error) {
} }
multiReq = append(multiReq, PackReq([]string{"EXEC"})...) multiReq = append(multiReq, PackReq([]string{"EXEC"})...)
rd, err := c.WriteRead(multiReq) rd, err := c.readerPool.Get()
if err != nil {
return nil, err
}
defer c.readerPool.Add(rd)
err = c.WriteRead(multiReq, rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -145,16 +171,14 @@ func (c *Client) Exec() ([]Req, error) {
statusReq := NewStatusReq() statusReq := NewStatusReq()
// multi // multi
statusReq.ParseReply(rd) _, err = statusReq.ParseReply(rd)
_, err = statusReq.Reply()
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _ = range reqs { for _ = range reqs {
// queue // queue
statusReq.ParseReply(rd) _, err = statusReq.ParseReply(rd)
_, err = statusReq.Reply()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -169,7 +193,11 @@ func (c *Client) Exec() ([]Req, error) {
} }
for _, req := range reqs { for _, req := range reqs {
req.ParseReply(rd) val, err := req.ParseReply(rd)
if err != nil {
req.SetErr(err)
}
req.SetVal(val)
} }
return reqs, nil return reqs, nil

View File

@ -62,15 +62,16 @@ func PackReq(args []string) []byte {
type Req interface { type Req interface {
Req() []byte Req() []byte
ParseReply(*bufreader.Reader) ParseReply(*bufreader.Reader) (interface{}, error)
SetErr(error) SetErr(error)
Err() error SetVal(interface{})
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type BaseReq struct { type BaseReq struct {
args []string args []string
val interface{}
err error err error
} }
@ -85,18 +86,27 @@ func (r *BaseReq) Req() []byte {
} }
func (r *BaseReq) SetErr(err error) { func (r *BaseReq) SetErr(err error) {
if err == nil {
panic("non-nil value expected")
}
r.err = err r.err = err
} }
func (r *BaseReq) Err() error { func (r *BaseReq) SetVal(val interface{}) {
return r.err if val == nil {
panic("non-nil value expected")
}
r.val = val
}
func (r *BaseReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
panic("abstract")
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type StatusReq struct { type StatusReq struct {
*BaseReq *BaseReq
val string
} }
func NewStatusReq(args ...string) *StatusReq { func NewStatusReq(args ...string) *StatusReq {
@ -105,34 +115,32 @@ func NewStatusReq(args ...string) *StatusReq {
} }
} }
func (r *StatusReq) ParseReply(rd *bufreader.Reader) { func (r *StatusReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
var line []byte line, err := rd.ReadLine('\n')
if err != nil {
line, r.err = rd.ReadLine('\n') return nil, err
if r.err != nil {
return
} }
if line[0] == '-' { if line[0] == '-' {
r.err = errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
return
} else if line[0] != '+' { } else if line[0] != '+' {
r.err = fmt.Errorf("Expected '+', but got %q of %q.", line, rd.Bytes()) return nil, fmt.Errorf("Expected '+', but got %q of %q.", line, rd.Bytes())
return
} }
r.val = string(line[1:]) return string(line[1:]), nil
} }
func (r *StatusReq) Reply() (string, error) { func (r *StatusReq) Reply() (string, error) {
return r.val, r.err if r.err != nil {
return "", r.err
}
return r.val.(string), nil
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type IntReq struct { type IntReq struct {
*BaseReq *BaseReq
val int64
} }
func NewIntReq(args ...string) *IntReq { func NewIntReq(args ...string) *IntReq {
@ -141,34 +149,32 @@ func NewIntReq(args ...string) *IntReq {
} }
} }
func (r *IntReq) ParseReply(rd *bufreader.Reader) { func (r *IntReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
var line []byte line, err := rd.ReadLine('\n')
if err != nil {
line, r.err = rd.ReadLine('\n') return nil, err
if r.err != nil {
return
} }
if line[0] == '-' { if line[0] == '-' {
r.err = errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
return
} else if line[0] != ':' { } else if line[0] != ':' {
r.err = fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes())
return
} }
r.val, r.err = strconv.ParseInt(string(line[1:]), 10, 64) return strconv.ParseInt(string(line[1:]), 10, 64)
} }
func (r *IntReq) Reply() (int64, error) { func (r *IntReq) Reply() (int64, error) {
return r.val, r.err if r.err != nil {
return 0, r.err
}
return r.val.(int64), nil
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type BoolReq struct { type BoolReq struct {
*BaseReq *BaseReq
val bool
} }
func NewBoolReq(args ...string) *BoolReq { func NewBoolReq(args ...string) *BoolReq {
@ -177,34 +183,32 @@ func NewBoolReq(args ...string) *BoolReq {
} }
} }
func (r *BoolReq) ParseReply(rd *bufreader.Reader) { func (r *BoolReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
var line []byte line, err := rd.ReadLine('\n')
if err != nil {
line, r.err = rd.ReadLine('\n') return nil, err
if r.err != nil {
return
} }
if line[0] == '-' { if line[0] == '-' {
r.err = errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
return
} else if line[0] != ':' { } else if line[0] != ':' {
r.err = fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes())
return
} }
r.val = line[1] == '1' return line[1] == '1', nil
} }
func (r *BoolReq) Reply() (bool, error) { func (r *BoolReq) Reply() (bool, error) {
return r.val, r.err if r.err != nil {
return false, r.err
}
return r.val.(bool), nil
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type BulkReq struct { type BulkReq struct {
*BaseReq *BaseReq
val string
} }
func NewBulkReq(args ...string) *BulkReq { func NewBulkReq(args ...string) *BulkReq {
@ -213,44 +217,41 @@ func NewBulkReq(args ...string) *BulkReq {
} }
} }
func (r *BulkReq) ParseReply(rd *bufreader.Reader) { func (r *BulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
var line []byte line, err := rd.ReadLine('\n')
if err != nil {
line, r.err = rd.ReadLine('\n') return nil, err
if r.err != nil {
return
} }
if line[0] == '-' { if line[0] == '-' {
r.err = errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
return
} else if line[0] != '$' { } else if line[0] != '$' {
r.err = fmt.Errorf("Expected '$', but got %q of %q.", line, rd.Bytes()) return nil, fmt.Errorf("Expected '$', but got %q of %q.", line, rd.Bytes())
return
} }
if len(line) >= 3 && line[1] == '-' && line[2] == '1' { if len(line) >= 3 && line[1] == '-' && line[2] == '1' {
r.err = Nil return nil, Nil
return
} }
line, r.err = rd.ReadLine('\n') line, err = rd.ReadLine('\n')
if r.err != nil { if err != nil {
return return nil, err
} }
r.val = string(line) return string(line), nil
} }
func (r *BulkReq) Reply() (string, error) { func (r *BulkReq) Reply() (string, error) {
return r.val, r.err if r.err != nil {
return "", r.err
}
return r.val.(string), nil
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type MultiBulkReq struct { type MultiBulkReq struct {
*BaseReq *BaseReq
val []interface{}
} }
func NewMultiBulkReq(args ...string) *MultiBulkReq { func NewMultiBulkReq(args ...string) *MultiBulkReq {
@ -259,42 +260,36 @@ func NewMultiBulkReq(args ...string) *MultiBulkReq {
} }
} }
func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) { func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
var line []byte line, err := rd.ReadLine('\n')
if err != nil {
line, r.err = rd.ReadLine('\n') return nil, err
if r.err != nil {
return
} }
if line[0] == '-' { if line[0] == '-' {
r.err = errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
return
} else if line[0] != '*' { } else if line[0] != '*' {
r.err = fmt.Errorf("Expected '*', but got line %q of %q.", line, rd.Bytes()) return nil, fmt.Errorf("Expected '*', but got line %q of %q.", line, rd.Bytes())
return
} }
val := make([]interface{}, 0) val := make([]interface{}, 0)
if len(line) >= 2 && line[1] == '0' { if len(line) >= 2 && line[1] == '0' {
r.val = val return val, nil
return
} else if len(line) >= 3 && line[1] == '-' && line[2] == '1' { } else if len(line) >= 3 && line[1] == '-' && line[2] == '1' {
r.err = Nil return nil, Nil
return
} }
line, r.err = rd.ReadLine('\n') line, err = rd.ReadLine('\n')
if r.err != nil { if err != nil {
return return nil, err
} }
for { for {
if line[0] == ':' { if line[0] == ':' {
var n int64 var n int64
n, r.err = strconv.ParseInt(string(line[1:]), 10, 64) n, err = strconv.ParseInt(string(line[1:]), 10, 64)
if r.err != nil { if err != nil {
return return nil, err
} }
val = append(val, n) val = append(val, n)
} else if line[0] == '$' { } else if line[0] == '$' {
@ -303,32 +298,37 @@ func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) {
} else if len(line) >= 3 && line[1] == '-' && line[2] == '1' { } else if len(line) >= 3 && line[1] == '-' && line[2] == '1' {
val = append(val, nil) val = append(val, nil)
} else { } else {
line, r.err = rd.ReadLine('\n') line, err = rd.ReadLine('\n')
if r.err != nil { if err != nil {
return return nil, err
} }
val = append(val, string(line)) val = append(val, string(line))
} }
} else { } else {
r.err = fmt.Errorf("Expected '$', but got line %q of %q.", line, rd.Bytes()) return nil, fmt.Errorf("Expected '$', but got line %q of %q.", line, rd.Bytes())
return
} }
line, r.err = rd.ReadLine('\n') line, err = rd.ReadLine('\n')
if r.err == io.EOF { if err != nil {
r.err = nil if err == io.EOF {
break break
}
return nil, err
} }
// Check for start of another reply.
// Check for the header of another reply.
if line[0] == '*' { if line[0] == '*' {
rd.UnreadLine('\n') rd.UnreadLine('\n')
break break
} }
} }
r.val = val return val, nil
} }
func (r *MultiBulkReq) Reply() ([]interface{}, error) { func (r *MultiBulkReq) Reply() ([]interface{}, error) {
return r.val, r.err if r.err != nil {
return nil, r.err
}
return r.val.([]interface{}), nil
} }