From d442a3b185942ae431aa364487fe4be7a012fd2b Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 23 Oct 2014 13:34:03 +0800 Subject: [PATCH] add send and receive interface for client --- client/go/ledis/client.go | 19 ++++----- client/go/ledis/conn.go | 82 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 86 insertions(+), 15 deletions(-) diff --git a/client/go/ledis/client.go b/client/go/ledis/client.go index dfe35b8..ee1aca2 100644 --- a/client/go/ledis/client.go +++ b/client/go/ledis/client.go @@ -19,23 +19,24 @@ type Config struct { type Client struct { sync.Mutex - cfg *Config - proto string + cfg *Config conns *list.List } +func getProto(addr string) string { + if strings.Contains(addr, "/") { + return "unix" + } else { + return "tcp" + } +} + func NewClient(cfg *Config) *Client { c := new(Client) c.cfg = cfg - if strings.Contains(cfg.Addr, "/") { - c.proto = "unix" - } else { - c.proto = "tcp" - } - c.conns = list.New() return c @@ -71,7 +72,7 @@ func (c *Client) get() *Conn { if c.conns.Len() == 0 { c.Unlock() - return c.newConn() + return c.newConn(c.cfg.Addr) } else { e := c.conns.Front() co := e.Value.(*Conn) diff --git a/client/go/ledis/conn.go b/client/go/ledis/conn.go index 688460e..527b1f3 100644 --- a/client/go/ledis/conn.go +++ b/client/go/ledis/conn.go @@ -19,6 +19,8 @@ func (err Error) Error() string { return string(err) } type Conn struct { client *Client + addr string + c net.Conn br *bufio.Reader bw *bufio.Writer @@ -33,25 +35,47 @@ type Conn struct { numScratch [40]byte } +func NewConn(addr string) *Conn { + co := new(Conn) + co.addr = addr + + return co +} + func (c *Conn) Close() { - c.client.put(c) + if c.client != nil { + c.client.put(c) + } else { + c.finalize() + } } func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { - if err := c.connect(); err != nil { + if err := c.Send(cmd, args...); err != nil { return nil, err } + return c.Receive() +} + +func (c *Conn) Send(cmd string, args ...interface{}) error { + if err := c.connect(); err != nil { + return err + } + if err := c.writeCommand(cmd, args); err != nil { c.finalize() - return nil, err + return err } if err := c.bw.Flush(); err != nil { c.finalize() - return nil, err + return err } + return nil +} +func (c *Conn) Receive() (interface{}, error) { if reply, err := c.readReply(); err != nil { c.finalize() return nil, err @@ -64,6 +88,16 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { } } +func (c *Conn) ReceiveBulkTo(w io.Writer) error { + err := c.readBulkReplyTo(w) + if err != nil { + if _, ok := err.(Error); !ok { + c.finalize() + } + } + return err +} + func (c *Conn) finalize() { if c.c != nil { c.c.Close() @@ -77,7 +111,7 @@ func (c *Conn) connect() error { } var err error - c.c, err = net.Dial(c.client.proto, c.client.cfg.Addr) + c.c, err = net.Dial(getProto(c.addr), c.addr) if err != nil { return err } @@ -244,6 +278,41 @@ var ( pongReply interface{} = "PONG" ) +func (c *Conn) readBulkReplyTo(w io.Writer) error { + line, err := c.readLine() + if err != nil { + return err + } + if len(line) == 0 { + return errors.New("ledis: short response line") + } + switch line[0] { + case '-': + return Error(string(line[1:])) + case '$': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return err + } + + var nn int64 + if nn, err = io.CopyN(w, c.br, int64(n)); err != nil { + return err + } else if nn != int64(n) { + return io.ErrShortWrite + } + + if line, err := c.readLine(); err != nil { + return err + } else if len(line) != 0 { + return errors.New("ledis: bad bulk string format") + } + return nil + default: + return fmt.Errorf("ledis: not invalid bulk string type, but %c", line[0]) + } +} + func (c *Conn) readReply() (interface{}, error) { line, err := c.readLine() if err != nil { @@ -301,9 +370,10 @@ func (c *Conn) readReply() (interface{}, error) { return nil, errors.New("ledis: unexpected response line") } -func (c *Client) newConn() *Conn { +func (c *Client) newConn(addr string) *Conn { co := new(Conn) co.client = c + co.addr = addr return co }