mirror of https://github.com/ledisdb/ledisdb.git
add send and receive interface for client
This commit is contained in:
parent
041a1737c8
commit
d442a3b185
|
@ -20,22 +20,23 @@ type Client struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
cfg *Config
|
cfg *Config
|
||||||
proto string
|
|
||||||
|
|
||||||
conns *list.List
|
conns *list.List
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getProto(addr string) string {
|
||||||
|
if strings.Contains(addr, "/") {
|
||||||
|
return "unix"
|
||||||
|
} else {
|
||||||
|
return "tcp"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewClient(cfg *Config) *Client {
|
func NewClient(cfg *Config) *Client {
|
||||||
c := new(Client)
|
c := new(Client)
|
||||||
|
|
||||||
c.cfg = cfg
|
c.cfg = cfg
|
||||||
|
|
||||||
if strings.Contains(cfg.Addr, "/") {
|
|
||||||
c.proto = "unix"
|
|
||||||
} else {
|
|
||||||
c.proto = "tcp"
|
|
||||||
}
|
|
||||||
|
|
||||||
c.conns = list.New()
|
c.conns = list.New()
|
||||||
|
|
||||||
return c
|
return c
|
||||||
|
@ -71,7 +72,7 @@ func (c *Client) get() *Conn {
|
||||||
if c.conns.Len() == 0 {
|
if c.conns.Len() == 0 {
|
||||||
c.Unlock()
|
c.Unlock()
|
||||||
|
|
||||||
return c.newConn()
|
return c.newConn(c.cfg.Addr)
|
||||||
} else {
|
} else {
|
||||||
e := c.conns.Front()
|
e := c.conns.Front()
|
||||||
co := e.Value.(*Conn)
|
co := e.Value.(*Conn)
|
||||||
|
|
|
@ -19,6 +19,8 @@ func (err Error) Error() string { return string(err) }
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
client *Client
|
client *Client
|
||||||
|
|
||||||
|
addr string
|
||||||
|
|
||||||
c net.Conn
|
c net.Conn
|
||||||
br *bufio.Reader
|
br *bufio.Reader
|
||||||
bw *bufio.Writer
|
bw *bufio.Writer
|
||||||
|
@ -33,25 +35,47 @@ type Conn struct {
|
||||||
numScratch [40]byte
|
numScratch [40]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewConn(addr string) *Conn {
|
||||||
|
co := new(Conn)
|
||||||
|
co.addr = addr
|
||||||
|
|
||||||
|
return co
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Conn) Close() {
|
func (c *Conn) Close() {
|
||||||
|
if c.client != nil {
|
||||||
c.client.put(c)
|
c.client.put(c)
|
||||||
|
} else {
|
||||||
|
c.finalize()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
||||||
if err := c.connect(); err != nil {
|
if err := c.Send(cmd, args...); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return c.Receive()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Send(cmd string, args ...interface{}) error {
|
||||||
|
if err := c.connect(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if err := c.writeCommand(cmd, args); err != nil {
|
if err := c.writeCommand(cmd, args); err != nil {
|
||||||
c.finalize()
|
c.finalize()
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.bw.Flush(); err != nil {
|
if err := c.bw.Flush(); err != nil {
|
||||||
c.finalize()
|
c.finalize()
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Receive() (interface{}, error) {
|
||||||
if reply, err := c.readReply(); err != nil {
|
if reply, err := c.readReply(); err != nil {
|
||||||
c.finalize()
|
c.finalize()
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -64,6 +88,16 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Conn) ReceiveBulkTo(w io.Writer) error {
|
||||||
|
err := c.readBulkReplyTo(w)
|
||||||
|
if err != nil {
|
||||||
|
if _, ok := err.(Error); !ok {
|
||||||
|
c.finalize()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Conn) finalize() {
|
func (c *Conn) finalize() {
|
||||||
if c.c != nil {
|
if c.c != nil {
|
||||||
c.c.Close()
|
c.c.Close()
|
||||||
|
@ -77,7 +111,7 @@ func (c *Conn) connect() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
c.c, err = net.Dial(c.client.proto, c.client.cfg.Addr)
|
c.c, err = net.Dial(getProto(c.addr), c.addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -244,6 +278,41 @@ var (
|
||||||
pongReply interface{} = "PONG"
|
pongReply interface{} = "PONG"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (c *Conn) readBulkReplyTo(w io.Writer) error {
|
||||||
|
line, err := c.readLine()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(line) == 0 {
|
||||||
|
return errors.New("ledis: short response line")
|
||||||
|
}
|
||||||
|
switch line[0] {
|
||||||
|
case '-':
|
||||||
|
return Error(string(line[1:]))
|
||||||
|
case '$':
|
||||||
|
n, err := parseLen(line[1:])
|
||||||
|
if n < 0 || err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var nn int64
|
||||||
|
if nn, err = io.CopyN(w, c.br, int64(n)); err != nil {
|
||||||
|
return err
|
||||||
|
} else if nn != int64(n) {
|
||||||
|
return io.ErrShortWrite
|
||||||
|
}
|
||||||
|
|
||||||
|
if line, err := c.readLine(); err != nil {
|
||||||
|
return err
|
||||||
|
} else if len(line) != 0 {
|
||||||
|
return errors.New("ledis: bad bulk string format")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("ledis: not invalid bulk string type, but %c", line[0])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Conn) readReply() (interface{}, error) {
|
func (c *Conn) readReply() (interface{}, error) {
|
||||||
line, err := c.readLine()
|
line, err := c.readLine()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -301,9 +370,10 @@ func (c *Conn) readReply() (interface{}, error) {
|
||||||
return nil, errors.New("ledis: unexpected response line")
|
return nil, errors.New("ledis: unexpected response line")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) newConn() *Conn {
|
func (c *Client) newConn(addr string) *Conn {
|
||||||
co := new(Conn)
|
co := new(Conn)
|
||||||
co.client = c
|
co.client = c
|
||||||
|
co.addr = addr
|
||||||
|
|
||||||
return co
|
return co
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue