Change API and update tests.

This commit is contained in:
Vladimir Mihailenco 2012-08-06 15:09:48 +03:00
parent c5c8ec6b0c
commit 05223145e0
6 changed files with 1184 additions and 965 deletions

View File

@ -33,14 +33,14 @@ Example 2:
} }
initConn := func(client *redis.Client) error { initConn := func(client *redis.Client) error {
_, err := client.Auth("foo").Reply() auth := client.Auth("foo")
if err != nil { if auth.Err() != nil {
return err return auth.Err()
} }
_, err = client.Ping().Reply() ping := client.Ping()
if err != nil { if ping.Err() != nil {
return err return ping.Err()
} }
return nil return nil
@ -53,17 +53,19 @@ Both `closeConn` and `initConn` functions can be `nil`.
Running commands Running commands
---------------- ----------------
_, err := redisClient.Set("foo", "bar").Reply() set := redisClient.Set("foo", "bar")
if err != nil { if set.Err() != nil {
panic(err) panic(set.Err())
} }
ok := set.Val()
value, err := redisClient.Get("foo").Reply() get := redisClient.Get("foo")
if err != nil { if get.Err() != nil {
if err != redis.Nil { if get.Err() != redis.Nil {
panic(err) panic(get.Err())
} }
} }
val := get.Val()
Pipelining Pipelining
---------- ----------
@ -80,16 +82,12 @@ Client has ability to run several commands with one read/write:
panic(err) panic(err)
} }
ok, err := setReq.Reply() if setReq.Err() != nil {
if err != nil { panic(setReq.Err())
panic(err)
} }
value, err := getReq.Reply() if getReq.Err() != nil && getReq.Err() != redis.Nil {
if err != nil { panic(getReq.Err())
if err != redis.Nil {
panic(err)
}
} }
Multi/Exec Multi/Exec
@ -99,26 +97,22 @@ Example 1:
multiClient := redisClient.Multi() multiClient := redisClient.Multi()
futureGet1 := multiClient.Get("foo1") get1 := multiClient.Get("foo1")
futureGet2 := multiClient.Get("foo2") get2 := multiClient.Get("foo2")
_, err := multiClient.Exec() _, err := multiClient.Exec()
if err != nil { if err != nil {
panic(err) panic(err)
} }
value1, err := futureGet1.Reply() if get1.Err() != nil && get1.Err() != redis.Nil {
if err != nil { panic(get1.Err())
if err != redis.Nil {
panic(err)
}
} }
val1 := get1.Val()
value2, err := futureGet2.Reply() if get2.Err() != nil && get2.Err() != redis.Nil {
if err != nil { panic(get2.Err())
if err != redis.Nil {
panic(err)
}
} }
val2 := get2.Val()
Example 2: Example 2:
@ -132,11 +126,8 @@ Example 2:
} }
for req := range reqs { for req := range reqs {
value, err := req.Reply() if req.Err() != nil && req.Err() != redis.Nil {
if err != nil { panic(req.Err())
if err != redis.Nil {
panic(err)
}
} }
} }
@ -145,14 +136,17 @@ Pub/sub
Publish: Publish:
_, err := redisClient.Publish("mychannel", "hello").Reply() pub := redisClient.Publish("mychannel", "hello")
if err != nil { if pub.Err() != nil {
panic(err) panic(pub.Err())
} }
Subscribe: Subscribe:
pubsub := redisClient.PubSubClient() pubsub, err := redisClient.PubSubClient()
if err != nil {
panic(err)
}
ch, err := pubsub.Subscribe("mychannel") ch, err := pubsub.Subscribe("mychannel")
if err != nil { if err != nil {
@ -164,18 +158,18 @@ Subscribe:
if msg.Err != nil { if msg.Err != nil {
panic(err) panic(err)
} }
fmt.Println(msg.Message) message := msg.Message
} }
} }
Thread safety Thread safety
------------- -------------
redis.Client methods are thread safe. Following code is correct: Commands are thread safe. Following code is correct:
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
go func() { go func() {
redisClient.Incr("foo").Reply() redisClient.Incr("foo")
}() }()
} }
@ -186,15 +180,13 @@ Example:
func Get(client *redis.Client, key string) *redis.BulkReq { func Get(client *redis.Client, key string) *redis.BulkReq {
req := redis.NewBulkReq("GET", key) req := redis.NewBulkReq("GET", key)
client.Queue(req) client.Process(req)
return req return req
} }
value, err := Get(redisClient, "foo").Reply() get := Get(redisClient, "foo")
if err != nil { if get.Err() != nil && get.Err() != redis.Nil {
if err != redis.Nil { panic(get.Err())
panic(err)
}
} }
Connection pool Connection pool

View File

@ -326,7 +326,7 @@ func (c *Client) SetEx(key string, seconds int64, value string) *StatusReq {
return req return req
} }
func (c *Client) SetNx(key, value string) *BoolReq { func (c *Client) SetNX(key, value string) *BoolReq {
req := NewBoolReq("SETNX", key, value) req := NewBoolReq("SETNX", key, value)
c.Process(req) c.Process(req)
return req return req
@ -679,8 +679,8 @@ func (c *Client) ZCount(key, min, max string) *IntReq {
return req return req
} }
func (c *Client) ZIncrBy(key string, increment int64, member string) *IntReq { func (c *Client) ZIncrBy(key string, increment int64, member string) *FloatReq {
req := NewIntReq("ZINCRBY", key, strconv.FormatInt(increment, 10), member) req := NewFloatReq("ZINCRBY", key, strconv.FormatInt(increment, 10), member)
c.Process(req) c.Process(req)
return req return req
} }

View File

@ -6,7 +6,6 @@ import (
"io" "io"
"net" "net"
"sync" "sync"
"time"
"github.com/vmihailenco/bufreader" "github.com/vmihailenco/bufreader"
) )
@ -34,16 +33,16 @@ func AuthSelectFunc(password string, db int64) InitConnFunc {
return func(client *Client) error { return func(client *Client) error {
if password != "" { if password != "" {
_, err := client.Auth(password).Reply() auth := client.Auth(password)
if err != nil { if auth.Err() != nil {
return err return auth.Err()
} }
} }
if db >= 0 { if db >= 0 {
_, err := client.Select(db).Reply() sel := client.Select(db)
if err != nil { if sel.Err() != nil {
return err return sel.Err()
} }
} }
@ -105,13 +104,6 @@ func (c *Client) WriteReq(buf []byte, conn *Conn) error {
} }
func (c *Client) ReadReply(conn *Conn) error { func (c *Client) ReadReply(conn *Conn) error {
if false {
err := conn.RW.(*net.TCPConn).SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
return err
}
}
_, err := conn.Rd.ReadFrom(conn.RW) _, err := conn.Rd.ReadFrom(conn.RW)
if err != nil { if err != nil {
return err return err
@ -120,8 +112,6 @@ func (c *Client) ReadReply(conn *Conn) error {
} }
func (c *Client) WriteRead(buf []byte, conn *Conn) error { func (c *Client) WriteRead(buf []byte, conn *Conn) error {
c.mtx.Lock()
defer c.mtx.Unlock()
if err := c.WriteReq(buf, conn); err != nil { if err := c.WriteReq(buf, conn); err != nil {
return err return err
} }

File diff suppressed because it is too large Load Diff

View File

@ -82,7 +82,7 @@ type Req interface {
SetErr(error) SetErr(error)
Err() error Err() error
SetVal(interface{}) SetVal(interface{})
Val() interface{} InterfaceVal() interface{}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -112,8 +112,14 @@ func (r *BaseReq) SetErr(err error) {
} }
func (r *BaseReq) Err() error { func (r *BaseReq) Err() error {
if r.err != nil {
return r.err return r.err
} }
if r.val == nil {
return errResultMissing
}
return nil
}
func (r *BaseReq) SetVal(val interface{}) { func (r *BaseReq) SetVal(val interface{}) {
if val == nil { if val == nil {
@ -122,7 +128,7 @@ func (r *BaseReq) SetVal(val interface{}) {
r.val = val r.val = val
} }
func (r *BaseReq) Val() interface{} { func (r *BaseReq) InterfaceVal() interface{} {
return r.val return r.val
} }
@ -157,13 +163,11 @@ func (r *StatusReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
return string(line[1:]), nil return string(line[1:]), nil
} }
func (r *StatusReq) Reply() (string, error) { func (r *StatusReq) Val() string {
if r.val == nil && r.err == nil { if r.val == nil {
return "", errResultMissing return ""
} else if r.err != nil {
return "", r.err
} }
return r.val.(string), nil return r.val.(string)
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -193,13 +197,11 @@ func (r *IntReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
return strconv.ParseInt(string(line[1:]), 10, 64) return strconv.ParseInt(string(line[1:]), 10, 64)
} }
func (r *IntReq) Reply() (int64, error) { func (r *IntReq) Val() int64 {
if r.val == nil && r.err == nil { if r.val == nil {
return 0, errResultMissing return 0
} else if r.err != nil {
return 0, r.err
} }
return r.val.(int64), nil return r.val.(int64)
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -231,13 +233,11 @@ func (r *IntNilReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes())
} }
func (r *IntNilReq) Reply() (int64, error) { func (r *IntNilReq) Val() int64 {
if r.val == nil && r.err == nil { if r.val == nil {
return 0, errResultMissing return 0
} else if r.err != nil {
return 0, r.err
} }
return r.val.(int64), nil return r.val.(int64)
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -267,13 +267,11 @@ func (r *BoolReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
return line[1] == '1', nil return line[1] == '1', nil
} }
func (r *BoolReq) Reply() (bool, error) { func (r *BoolReq) Val() bool {
if r.val == nil && r.err == nil { if r.val == nil {
return false, errResultMissing return false
} else if r.err != nil {
return false, r.err
} }
return r.val.(bool), nil return r.val.(bool)
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -312,13 +310,11 @@ func (r *BulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
return string(line), nil return string(line), nil
} }
func (r *BulkReq) Reply() (string, error) { func (r *BulkReq) Val() string {
if r.val == nil && r.err == nil { if r.val == nil {
return "", errResultMissing return ""
} else if r.err != nil {
return "", r.err
} }
return r.val.(string), nil return r.val.(string)
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -357,13 +353,11 @@ func (r *FloatReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
return strconv.ParseFloat(string(line), 64) return strconv.ParseFloat(string(line), 64)
} }
func (r *FloatReq) Reply() (float64, error) { func (r *FloatReq) Val() float64 {
if r.val == nil && r.err == nil { if r.val == nil {
return 0, errResultMissing return 0
} else if r.err != nil {
return 0, r.err
} }
return r.val.(float64), nil return r.val.(float64)
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -434,11 +428,9 @@ func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
return val, nil return val, nil
} }
func (r *MultiBulkReq) Reply() ([]interface{}, error) { func (r *MultiBulkReq) Val() []interface{} {
if r.val == nil && r.err == nil { if r.val == nil {
return nil, errResultMissing return nil
} else if r.err != nil {
return nil, r.err
} }
return r.val.([]interface{}), nil return r.val.([]interface{})
} }

View File

@ -35,9 +35,8 @@ func (t *RequestTest) BenchmarkStatusReq(c *C) {
c.Check(vI, Equals, "OK") c.Check(vI, Equals, "OK")
req.SetVal(vI) req.SetVal(vI)
v, err := req.Reply() c.Check(req.Err(), IsNil)
c.Check(err, IsNil) c.Check(req.Val(), Equals, "OK")
c.Check(v, Equals, "OK")
} }
c.StartTimer() c.StartTimer()
@ -46,6 +45,7 @@ func (t *RequestTest) BenchmarkStatusReq(c *C) {
rd.ResetPos() rd.ResetPos()
v, _ := req.ParseReply(rd) v, _ := req.ParseReply(rd)
req.SetVal(v) req.SetVal(v)
req.Reply() req.Err()
req.Val()
} }
} }