commit a74d9a5801560e5e95c67a276df30ee127ca9e8b Author: Vladimir Mihailenco Date: Wed Jul 25 16:00:50 2012 +0300 Initial commit. diff --git a/README.md b/README.md new file mode 100644 index 0000000..ffba868 --- /dev/null +++ b/README.md @@ -0,0 +1,104 @@ +Readme +====== + +Sync Redis client for Golang. + +Usage +----- + +Example: + + connect := func() (io.ReadWriter, error) { + fmt.Println("Connecting...") + return net.Dial("tcp", "localhost:6379") + } + + disconnect := func(conn io.ReadWriter) error { + fmt.Println("Disconnecting...") + conn.Close() + return nil + } + + redisClient = redis.NewClient(connect, disconnect) + + _, err := redisClient.Set("foo", "bar").Reply() + if err != nil { + panic(err) + } + + value, err := redisClient.Get("foo").Reply() + if err != nil { + panic(err) + } + +Multi/Exec +---------- + +Example 1: + + multiClient := redisClient.Multi() + futureGet1 := multiClient.Get("foo1") + futureGet2 := multiClient.Get("foo2") + _, err := multiClient.Exec() + if err != nil { + panic(err) + } + + value1, err := futureGet1.Reply() + if err != nil { + panic(err) + } + + value2, err := futureGet2.Reply() + if err != nil { + panic(err) + } + +Example 2: + + multiClient := redisClient.Multi() + multiClient.Get("foo1") + multiClient.Get("foo2") + reqs, err := multiClient.Exec() + if err != nil { + panic(err) + } + + for req := range reqs { + value, err := req.Reply() + if err != nil { + panic(err) + } + } + +Pub/sub +------- + +Publish: + + _, err := redisClient.Publish("mychannel", "hello").Reply() + if err != nil { + panic(err) + } + +Subscribe: + + pubsub := redisClient.PubSubClient() + ch, err := pubsub.Subscribe("mychannel") + if err != nil { + panic(err) + } + + go func() { + for msg := range ch { + if msg.Err != nil { + panic(err) + } + fmt.Println(msg.Message) + } + } + +Thread safety +------------- + +Client is thread safe. Internally sync.Mutex is used to synchronize writes and reads. diff --git a/commands.go b/commands.go new file mode 100644 index 0000000..2e28b7a --- /dev/null +++ b/commands.go @@ -0,0 +1,162 @@ +package redis + +import ( + "strconv" +) + +//------------------------------------------------------------------------------ + +func (c *Client) Ping() *StatusReq { + req := NewStatusReq("PING") + c.Run(req) + return req +} + +func (c *Client) Flushall() *StatusReq { + req := NewStatusReq("FLUSHALL") + c.Run(req) + return req +} + +func (c *Client) Flushdb() *StatusReq { + req := NewStatusReq("FLUSHDB") + c.Run(req) + return req +} + +//------------------------------------------------------------------------------ + +func (c *Client) Get(key string) *BulkReq { + req := NewBulkReq("GET", key) + c.Run(req) + return req +} + +func (c *Client) Set(key, value string) *StatusReq { + req := NewStatusReq("SET", key, value) + c.Run(req) + return req +} + +func (c *Client) Auth(password string) *StatusReq { + req := NewStatusReq("AUTH", password) + c.Run(req) + return req +} + +//------------------------------------------------------------------------------ + +func (c *Client) Sadd(key string, members ...string) *IntReq { + args := append([]string{"SADD", key}, members...) + req := NewIntReq(args...) + c.Run(req) + return req +} + +func (c *Client) Srem(key string, members ...string) *IntReq { + args := append([]string{"SREM", key}, members...) + req := NewIntReq(args...) + c.Run(req) + return req +} + +func (c *Client) Smembers(key string) *MultiBulkReq { + req := NewMultiBulkReq("SMEMBERS", key) + c.Run(req) + return req +} + +//------------------------------------------------------------------------------ + +func (c *Client) Multi() *MultiClient { + return NewMultiClient(c.connect, c.disconnect) +} + +//------------------------------------------------------------------------------ + +func (c *Client) PubSubClient() *PubSubClient { + return NewPubSubClient(c.connect, c.disconnect) +} + +func (c *Client) Publish(channel, message string) *IntReq { + req := NewIntReq("PUBLISH", channel, message) + c.Run(req) + return req +} + +//------------------------------------------------------------------------------ + +func (c *Client) Hset(key, field, value string) *BoolReq { + req := NewBoolReq("HSET", key, field, value) + c.Run(req) + return req +} + +func (c *Client) Hsetnx(key, field, value string) *BoolReq { + req := NewBoolReq("HSETNX", key, field, value) + c.Run(req) + return req +} + +func (c *Client) Hmset(key, field, value string, pairs ...string) *StatusReq { + args := append([]string{"HMSET", key, field, value}, pairs...) + req := NewStatusReq(args...) + c.Run(req) + return req +} + +func (c *Client) Hget(key, field string) *BulkReq { + req := NewBulkReq("HGET", key, field) + c.Run(req) + return req +} + +func (c *Client) Hmget(key string, fields ...string) *MultiBulkReq { + args := append([]string{"HMGET", key}, fields...) + req := NewMultiBulkReq(args...) + c.Run(req) + return req +} + +func (c *Client) Hexists(key, field string) *BoolReq { + req := NewBoolReq("HEXISTS", key, field) + c.Run(req) + return req +} + +func (c *Client) Hdel(key string, fields ...string) *IntReq { + args := append([]string{"HDEL", key}, fields...) + req := NewIntReq(args...) + c.Run(req) + return req +} + +func (c *Client) Hlen(key string) *IntReq { + req := NewIntReq("HLEN", key) + c.Run(req) + return req +} + +func (c *Client) Hgetall(key string) *MultiBulkReq { + req := NewMultiBulkReq("HGETALL", key) + c.Run(req) + return req +} + +func (c *Client) Hkeys(key string) *MultiBulkReq { + req := NewMultiBulkReq("HKEYS", key) + c.Run(req) + return req +} + +func (c *Client) Hvals(key string) *MultiBulkReq { + req := NewMultiBulkReq("HVALS", key) + c.Run(req) + return req +} + +func (c *Client) Hincrby(key, field string, incr int64) *IntReq { + req := NewIntReq("HINCRBY", key, field, strconv.FormatInt(incr, 10)) + c.Run(req) + return req +} diff --git a/multi.go b/multi.go new file mode 100644 index 0000000..20b2dcc --- /dev/null +++ b/multi.go @@ -0,0 +1,48 @@ +package redis + +type MultiClient struct { + *Client + reqs []Req +} + +func NewMultiClient(connect connectFunc, disconnect disconnectFunc) *MultiClient { + return &MultiClient{ + Client: NewClient(connect, disconnect), + reqs: make([]Req, 0), + } + +} + +func (c *MultiClient) queueReq(req Req) { + c.reqs = append(c.reqs, req) +} + +func (c *MultiClient) run(req Req) { + c.queueReq(req) +} + +func (c *MultiClient) Exec() ([]Req, error) { + c.mtx.Lock() + defer c.mtx.Unlock() + + reqs := c.reqs + c.reqs = make([]Req, 0) + + multiReq := make([]byte, 0, 8192) + multiReq = append(multiReq, PackReq([]string{"MULTI"})...) + for _, req := range reqs { + multiReq = append(multiReq, req.Req()...) + } + multiReq = append(multiReq, PackReq([]string{"EXEC"})...) + + buf, err := c.WriteRead(multiReq) + if err != nil { + return nil, err + } + + for _, req := range reqs { + req.ParseReply(buf) + } + + return reqs, nil +} diff --git a/pubsub.go b/pubsub.go new file mode 100644 index 0000000..fd3f022 --- /dev/null +++ b/pubsub.go @@ -0,0 +1,97 @@ +package redis + +import ( + "fmt" +) + +type PubSubClient struct { + *Client + isSubscribed bool + ch chan *Message +} + +func NewPubSubClient(connect connectFunc, disconnect disconnectFunc) *PubSubClient { + c := &PubSubClient{ + Client: NewClient(connect, disconnect), + ch: make(chan *Message), + } + return c +} + +type Message struct { + Name, Channel, Message string + Number int64 + + Err error +} + +func (c *PubSubClient) consumeMessages() { + req := NewMultiBulkReq() + + for { + // Replies can arrive in batches. + // Read whole reply and parse messages one by one. + rd, err := c.ReadReply() + if err != nil { + msg := &Message{} + msg.Err = err + c.ch <- msg + return + } + + for { + msg := &Message{} + + req.ParseReply(rd) + reply, err := req.Reply() + if err != nil { + msg.Err = err + c.ch <- msg + break + } + + msgName := reply[0].(string) + switch msgName { + case "subscribe", "unsubscribe": + msg.Name = msgName + msg.Channel = reply[1].(string) + msg.Number = reply[2].(int64) + case "message": + msg.Name = msgName + msg.Channel = reply[1].(string) + msg.Message = reply[2].(string) + default: + msg.Err = fmt.Errorf("Unsupported message name: %q.", msgName) + } + c.ch <- msg + + if !rd.HasUnread() { + break + } + } + } +} + +func (c *PubSubClient) Subscribe(channels ...string) (chan *Message, error) { + args := append([]string{"SUBSCRIBE"}, channels...) + req := NewMultiBulkReq(args...) + + if err := c.WriteReq(req.Req()); err != nil { + return nil, err + } + + c.mtx.Lock() + if !c.isSubscribed { + c.isSubscribed = true + go c.consumeMessages() + } + c.mtx.Unlock() + + return c.ch, nil +} + +func (c *PubSubClient) Unsubscribe(channels ...string) error { + args := append([]string{"UNSUBSCRIBE"}, channels...) + req := NewMultiBulkReq(args...) + return c.WriteReq(req.Req()) +} diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..4b606a1 --- /dev/null +++ b/redis.go @@ -0,0 +1,93 @@ +package redis + +import ( + "io" + "sync" + + "github.com/togoio/redisgoproxy/bufreader" +) + +type connectFunc func() (io.ReadWriter, error) +type disconnectFunc func(io.ReadWriter) + +type Client struct { + mtx sync.Mutex + connect connectFunc + disconnect disconnectFunc + currConn io.ReadWriter + rd *bufreader.Reader +} + +func NewClient(connect connectFunc, disconnect disconnectFunc) *Client { + return &Client{ + rd: bufreader.NewSizedReader(8192), + connect: connect, + disconnect: disconnect, + } +} + +func (c *Client) Close() error { + if c.disconnect != nil { + c.disconnect(c.currConn) + } + c.currConn = nil + return nil +} + +func (c *Client) conn() (io.ReadWriter, error) { + if c.currConn == nil { + currConn, err := c.connect() + if err != nil { + return nil, err + } + c.currConn = currConn + } + return c.currConn, nil +} + +func (c *Client) WriteReq(buf []byte) error { + conn, err := c.conn() + if err != nil { + return err + } + _, err = conn.Write(buf) + if err != nil { + c.Close() + } + return err +} + +func (c *Client) ReadReply() (*bufreader.Reader, error) { + conn, err := c.conn() + if err != nil { + return nil, err + } + _, err = c.rd.ReadFrom(conn) + if err != nil { + c.Close() + return nil, err + } + return c.rd, nil +} + +func (c *Client) WriteRead(buf []byte) (*bufreader.Reader, error) { + if err := c.WriteReq(buf); err != nil { + return nil, err + } + return c.ReadReply() +} + +func (c *Client) Run(req Req) { + c.mtx.Lock() + c.run(req) + c.mtx.Unlock() +} + +func (c *Client) run(req Req) { + buf, err := c.WriteRead(req.Req()) + if err != nil { + req.SetErr(err) + return + } + req.ParseReply(buf) +} diff --git a/redis_test.go b/redis_test.go new file mode 100644 index 0000000..0ee5c3f --- /dev/null +++ b/redis_test.go @@ -0,0 +1,262 @@ +package redis_test + +import ( + "io" + "net" + "testing" + "time" + + . "launchpad.net/gocheck" + + "github.com/togoio/redisgoproxy/redis" +) + +//------------------------------------------------------------------------------ + +type RedisTest struct { + redisC *redis.Client +} + +var _ = Suite(&RedisTest{}) + +func Test(t *testing.T) { TestingT(t) } + +//------------------------------------------------------------------------------ + +func (t *RedisTest) SetUpTest(c *C) { + connect := func() (io.ReadWriter, error) { + return net.Dial("tcp", "localhost:6379") + } + + t.redisC = redis.NewClient(connect, nil) + t.redisC.Flushdb() +} + +func (t *RedisTest) TearDownTest(c *C) { + t.redisC.Flushdb() +} + +func (t *RedisTest) TestPing(c *C) { + _, err := t.redisC.Ping().Reply() + c.Check(err, IsNil) +} + +func (t *RedisTest) TestSetGet(c *C) { + key := "foo" + value := "bar" + + _, err := t.redisC.Set(key, value).Reply() + c.Check(err, IsNil) + + v, err := t.redisC.Get("foo").Reply() + c.Check(err, IsNil) + c.Check(v, Equals, value) + + _, err = t.redisC.Get("_").Reply() + c.Check(err, Equals, redis.Nil) +} + +func (t *RedisTest) TestHmsetHmget(c *C) { + _, err := t.redisC.Hmset("myhash", "foo1", "bar1", "foo2", "bar2").Reply() + c.Check(err, IsNil) + + pairs, err := t.redisC.Hmget("myhash", "foo1", "foo3", "foo2").Reply() + c.Check(err, IsNil) + c.Check(pairs, HasLen, 3) + c.Check(pairs[0], Equals, "bar1") + c.Check(pairs[1], Equals, nil) + c.Check(pairs[2], Equals, "bar2") +} + +func (t *RedisTest) TestSet(c *C) { + n, err := t.redisC.Sadd("myset", "foo").Reply() + c.Check(err, IsNil) + c.Check(n, Equals, int64(1)) + + members, err := t.redisC.Smembers("myset").Reply() + c.Check(err, IsNil) + c.Check(members, HasLen, 1) + c.Check(members[0], Equals, "foo") + + n, err = t.redisC.Srem("myset", "foo").Reply() + c.Check(err, IsNil) + c.Check(n, Equals, int64(1)) +} + +func (t *RedisTest) TestHsetnx(c *C) { + wasSet, err := t.redisC.Hsetnx("myhash", "foo1", "bar1").Reply() + c.Check(err, IsNil) + c.Check(wasSet, Equals, true) + + wasSet, err = t.redisC.Hsetnx("myhash", "foo1", "bar1").Reply() + c.Check(err, IsNil) + c.Check(wasSet, Equals, false) +} + +func (t *RedisTest) TestHash(c *C) { + _, err := t.redisC.Hset("myhash", "foo", "bar").Reply() + c.Check(err, IsNil) + + v, err := t.redisC.Hget("myhash", "foo").Reply() + c.Check(err, IsNil) + c.Check(v, Equals, "bar") + + exists, err := t.redisC.Hexists("myhash", "foo").Reply() + c.Check(err, IsNil) + c.Check(exists, Equals, true) + + hlen, err := t.redisC.Hlen("myhash").Reply() + c.Check(err, IsNil) + c.Check(hlen, Equals, int64(1)) + + res, err := t.redisC.Hgetall("myhash").Reply() + c.Check(err, IsNil) + c.Check(res, HasLen, 2) + c.Check(res[0], Equals, "foo") + c.Check(res[1], Equals, "bar") + + keys, err := t.redisC.Hkeys("myhash").Reply() + c.Check(err, IsNil) + c.Check(keys, HasLen, 1) + c.Check(keys[0], Equals, "foo") + + vals, err := t.redisC.Hvals("myhash").Reply() + c.Check(err, IsNil) + c.Check(vals, HasLen, 1) + c.Check(vals[0], Equals, "bar") + + n, err := t.redisC.Hdel("myhash", "foo").Reply() + c.Check(err, IsNil) + c.Check(n, Equals, int64(1)) + + wasSet, err := t.redisC.Hset("myhash", "counter", "0").Reply() + c.Check(err, IsNil) + c.Check(wasSet, Equals, true) + + counter, err := t.redisC.Hincrby("myhash", "counter", 1).Reply() + c.Check(err, IsNil) + c.Check(counter, Equals, int64(1)) +} + +func (t *RedisTest) TestPubSub(c *C) { + pubsub := t.redisC.PubSubClient() + + ch, err := pubsub.Subscribe("mychannel") + c.Check(err, IsNil) + c.Check(ch, Not(Equals), nil) + + ch, err = pubsub.Subscribe("mychannel2") + c.Check(err, IsNil) + c.Check(ch, Not(Equals), nil) + + n, err := t.redisC.Publish("mychannel", "hello").Reply() + c.Check(err, IsNil) + c.Check(n, Equals, int64(1)) + + n, err = t.redisC.Publish("mychannel2", "hello2").Reply() + c.Check(err, IsNil) + c.Check(n, Equals, int64(1)) + + err = pubsub.Unsubscribe("mychannel") + c.Check(err, IsNil) + + err = pubsub.Unsubscribe("mychannel2") + c.Check(err, IsNil) + + select { + case msg := <-ch: + c.Check(msg.Err, Equals, nil) + c.Check(msg.Name, Equals, "subscribe") + c.Check(msg.Channel, Equals, "mychannel") + c.Check(msg.Number, Equals, int64(1)) + case <-time.After(time.Second): + c.Error("Channel is empty.") + } + + select { + case msg := <-ch: + c.Check(msg.Err, Equals, nil) + c.Check(msg.Name, Equals, "subscribe") + c.Check(msg.Channel, Equals, "mychannel2") + c.Check(msg.Number, Equals, int64(2)) + case <-time.After(time.Second): + c.Error("Channel is empty.") + } + + select { + case msg := <-ch: + c.Check(msg.Err, Equals, nil) + c.Check(msg.Name, Equals, "message") + c.Check(msg.Channel, Equals, "mychannel") + c.Check(msg.Message, Equals, "hello") + case <-time.After(time.Second): + c.Error("Channel is empty.") + } + + select { + case msg := <-ch: + c.Check(msg.Err, Equals, nil) + c.Check(msg.Name, Equals, "message") + c.Check(msg.Channel, Equals, "mychannel2") + c.Check(msg.Message, Equals, "hello2") + case <-time.After(time.Second): + c.Error("Channel is empty.") + } + + select { + case msg := <-ch: + c.Check(msg.Err, Equals, nil) + c.Check(msg.Name, Equals, "unsubscribe") + c.Check(msg.Channel, Equals, "mychannel") + c.Check(msg.Number, Equals, int64(1)) + case <-time.After(time.Second): + c.Error("Channel is empty.") + } + + select { + case msg := <-ch: + c.Check(msg.Err, Equals, nil) + c.Check(msg.Name, Equals, "unsubscribe") + c.Check(msg.Channel, Equals, "mychannel2") + c.Check(msg.Number, Equals, int64(0)) + case <-time.After(time.Second): + c.Error("Channel is empty.") + } +} + +func (t *RedisTest) TestMultiExec(c *C) { + multiC := t.redisC.Multi() + + setR := multiC.Set("foo", "bar") + getR := multiC.Get("foo") + + _, err := multiC.Exec() + c.Check(err, IsNil) + + _, err = setR.Reply() + c.Check(err, IsNil) + + v, err := getR.Reply() + c.Check(err, IsNil) + c.Check(v, Equals, "bar") +} + +//------------------------------------------------------------------------------ + +func (t *RedisTest) BenchmarkPing(c *C) { + for i := 0; i < c.N; i++ { + t.redisC.Ping().Reply() + } +} + +func (t *RedisTest) BenchmarkSet(c *C) { + for i := 0; i < c.N; i++ { + t.redisC.Set("foo", "bar").Reply() + } +} + +func (t *RedisTest) BenchmarkGet(c *C) { + for i := 0; i < c.N; i++ { + t.redisC.Get("foo").Reply() + } +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..2ecb3a6 --- /dev/null +++ b/request.go @@ -0,0 +1,334 @@ +package redis + +import ( + "errors" + "fmt" + "io" + "strconv" + + "github.com/togoio/redisgoproxy/bufreader" +) + +var Nil = errors.New("(nil)") + +//------------------------------------------------------------------------------ + +func ParseReq(rd *bufreader.Reader) ([]string, error) { + line, err := rd.ReadLine('\n') + if err != nil { + return nil, err + } + if line[0] != '*' { + return []string{string(line)}, nil + } + + args := make([]string, 0) + for { + line, err = rd.ReadLine('\n') + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + if line[0] != '$' { + return nil, fmt.Errorf("Expected '$', but got %q of %q.", line, rd.Bytes()) + } + + line, err = rd.ReadLine('\n') + args = append(args, string(line)) + } + return args, nil +} + +//------------------------------------------------------------------------------ + +func PackReq(args []string) []byte { + buf := make([]byte, 0, 1024) + buf = append(buf, '*') + buf = strconv.AppendUint(buf, uint64(len(args)), 10) + buf = append(buf, '\r', '\n') + for _, arg := range args { + buf = append(buf, '$') + buf = strconv.AppendUint(buf, uint64(len(arg)), 10) + buf = append(buf, '\r', '\n') + buf = append(buf, []byte(arg)...) + buf = append(buf, '\r', '\n') + } + return buf +} + +//------------------------------------------------------------------------------ + +type Req interface { + Req() []byte + ParseReply(*bufreader.Reader) + SetErr(error) + Err() error +} + +//------------------------------------------------------------------------------ + +type BaseReq struct { + args []string + err error +} + +func NewBaseReq(args ...string) *BaseReq { + return &BaseReq{ + args: args, + } +} + +func (r *BaseReq) Req() []byte { + return PackReq(r.args) +} + +func (r *BaseReq) SetErr(err error) { + r.err = err +} + +func (r *BaseReq) Err() error { + return r.err +} + +//------------------------------------------------------------------------------ + +type StatusReq struct { + *BaseReq + val string +} + +func NewStatusReq(args ...string) *StatusReq { + return &StatusReq{ + BaseReq: NewBaseReq(args...), + } +} + +func (r *StatusReq) ParseReply(rd *bufreader.Reader) { + var line []byte + + line, r.err = rd.ReadLine('\n') + if r.err != nil { + return + } + + if line[0] == '-' { + r.err = errors.New(string(line[1:])) + return + } else if line[0] != '+' { + r.err = fmt.Errorf("Expected '+', but got %q of %q.", line, rd.Bytes()) + return + } + + r.val = string(line[1:]) +} + +func (r *StatusReq) Reply() (string, error) { + return r.val, r.err +} + +//------------------------------------------------------------------------------ + +type IntReq struct { + *BaseReq + val int64 +} + +func NewIntReq(args ...string) *IntReq { + return &IntReq{ + BaseReq: NewBaseReq(args...), + } +} + +func (r *IntReq) ParseReply(rd *bufreader.Reader) { + var line []byte + + line, r.err = rd.ReadLine('\n') + if r.err != nil { + return + } + + if line[0] == '-' { + r.err = errors.New(string(line[1:])) + return + } else if line[0] != ':' { + r.err = fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) + return + } + + r.val, r.err = strconv.ParseInt(string(line[1:]), 10, 64) +} + +func (r *IntReq) Reply() (int64, error) { + return r.val, r.err +} + +//------------------------------------------------------------------------------ + +type BoolReq struct { + *BaseReq + val bool +} + +func NewBoolReq(args ...string) *BoolReq { + return &BoolReq{ + BaseReq: NewBaseReq(args...), + } +} + +func (r *BoolReq) ParseReply(rd *bufreader.Reader) { + var line []byte + + line, r.err = rd.ReadLine('\n') + if r.err != nil { + return + } + + if line[0] == '-' { + r.err = errors.New(string(line[1:])) + return + } else if line[0] != ':' { + r.err = fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) + return + } + + r.val = line[1] == '1' +} + +func (r *BoolReq) Reply() (bool, error) { + return r.val, r.err +} + +//------------------------------------------------------------------------------ + +type BulkReq struct { + *BaseReq + val string +} + +func NewBulkReq(args ...string) *BulkReq { + return &BulkReq{ + BaseReq: NewBaseReq(args...), + } +} + +func (r *BulkReq) ParseReply(rd *bufreader.Reader) { + var line []byte + + line, r.err = rd.ReadLine('\n') + if r.err != nil { + return + } + + if line[0] == '-' { + r.err = errors.New(string(line[1:])) + return + } else if line[0] != '$' { + r.err = fmt.Errorf("Expected '$', but got %q of %q.", line, rd.Bytes()) + return + } + + if len(line) >= 3 && line[1] == '-' && line[2] == '1' { + r.err = Nil + return + } + + line, r.err = rd.ReadLine('\n') + if r.err != nil { + return + } + + r.val = string(line) +} + +func (r *BulkReq) Reply() (string, error) { + return r.val, r.err +} + +//------------------------------------------------------------------------------ + +type MultiBulkReq struct { + *BaseReq + val []interface{} +} + +func NewMultiBulkReq(args ...string) *MultiBulkReq { + return &MultiBulkReq{ + BaseReq: NewBaseReq(args...), + } +} + +func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) { + var line []byte + + line, r.err = rd.ReadLine('\n') + if r.err != nil { + return + } + + if line[0] == '-' { + r.err = errors.New(string(line[1:])) + return + } else if line[0] != '*' { + r.err = fmt.Errorf("Expected '*', but got line %q of %q.", line, rd.Bytes()) + return + } + + val := make([]interface{}, 0) + + if len(line) >= 2 && line[1] == '0' { + r.val = val + return + } else if len(line) >= 3 && line[1] == '-' && line[2] == '1' { + r.err = Nil + return + } + + line, r.err = rd.ReadLine('\n') + if r.err != nil { + return + } + for { + if line[0] == ':' { + var n int64 + n, r.err = strconv.ParseInt(string(line[1:]), 10, 64) + if r.err != nil { + return + } + val = append(val, n) + } else if line[0] == '$' { + if len(line) >= 2 && line[1] == '0' { + val = append(val, "") + } else if len(line) >= 3 && line[1] == '-' && line[2] == '1' { + val = append(val, nil) + } else { + line, r.err = rd.ReadLine('\n') + if r.err != nil { + return + } + val = append(val, string(line)) + } + } else { + r.err = fmt.Errorf("Expected '$', but got line %q of %q.", line, rd.Bytes()) + return + } + + line, r.err = rd.ReadLine('\n') + if r.err == io.EOF { + r.err = nil + break + } + // Check for start of another reply. + if line[0] == '*' { + rd.UnreadLine('\n') + break + } + } + + r.val = val +} + +func (r *MultiBulkReq) Reply() ([]interface{}, error) { + return r.val, r.err +}