Add experimental V2 client.

This commit is contained in:
Vladimir Mihailenco 2013-07-02 14:17:31 +03:00
parent 45e45f8422
commit 49c91385e0
14 changed files with 5988 additions and 0 deletions

1155
v2/commands.go Normal file

File diff suppressed because it is too large Load Diff

133
v2/doc.go Normal file
View File

@ -0,0 +1,133 @@
/*
Package github.com/vmihailenco/redis implements a Redis client.
Let's start with connecting to Redis using TCP:
password := "" // no password set
db := int64(-1) // use default DB
client := redis.NewTCPClient("localhost:6379", password, db)
defer client.Close()
ping := client.Ping()
fmt.Println(ping.Err(), ping.Val())
// Output: <nil> PONG
or using Unix socket:
client := redis.NewUnixClient("/tmp/redis.sock", "", -1)
defer client.Close()
ping := client.Ping()
fmt.Println(ping.Err(), ping.Val())
// Output: <nil> PONG
Then we can start sending commands:
set := client.Set("foo", "bar")
fmt.Println(set.Err(), set.Val())
get := client.Get("foo")
fmt.Println(get.Err(), get.Val())
// Output: <nil> OK
// <nil> bar
We can also pipeline two commands together:
var set *redis.StatusReq
var get *redis.StringReq
reqs, err := client.Pipelined(func(c *redis.PipelineClient) {
set = c.Set("key1", "hello1")
get = c.Get("key2")
})
fmt.Println(err, reqs)
fmt.Println(set)
fmt.Println(get)
// Output: <nil> [SET key1 hello1: OK GET key2: (nil)]
// SET key1 hello1: OK
// GET key2: (nil)
or:
var set *redis.StatusReq
var get *redis.StringReq
reqs, err := client.Pipelined(func(c *redis.PipelineClient) {
set = c.Set("key1", "hello1")
get = c.Get("key2")
})
fmt.Println(err, reqs)
fmt.Println(set)
fmt.Println(get)
// Output: <nil> [SET key1 hello1 GET key2]
// SET key1 hello1
// GET key2
We can also send several commands in transaction:
func transaction(multi *redis.MultiClient) ([]redis.Req, error) {
get := multi.Get("key")
if err := get.Err(); err != nil && err != redis.Nil {
return nil, err
}
val, _ := strconv.ParseInt(get.Val(), 10, 64)
reqs, err := multi.Exec(func() {
multi.Set("key", strconv.FormatInt(val+1, 10))
})
// Transaction failed. Repeat.
if err == redis.Nil {
return transaction(multi)
}
return reqs, err
}
multi, err := client.MultiClient()
_ = err
defer multi.Close()
watch := multi.Watch("key")
_ = watch.Err()
reqs, err := transaction(multi)
fmt.Println(err, reqs)
// Output: <nil> [SET key 1: OK]
To subscribe to the channel:
pubsub, err := client.PubSubClient()
defer pubsub.Close()
ch, err := pubsub.Subscribe("mychannel")
_ = err
subscribeMsg := <-ch
fmt.Println(subscribeMsg.Err, subscribeMsg.Name)
pub := client.Publish("mychannel", "hello")
_ = pub.Err()
msg := <-ch
fmt.Println(msg.Err, msg.Message)
// Output: <nil> subscribe
// <nil> hello
You can also write custom commands:
func Get(client *redis.Client, key string) *redis.StringReq {
req := redis.NewStringReq("GET", key)
client.Process(req)
return req
}
get := Get(client, "key_does_not_exist")
fmt.Println(get.Err(), get.Val())
// Output: (nil)
Client uses connection pool to send commands. You can change maximum number of connections with:
client.ConnPool.(*redis.MultiConnPool).MaxCap = 1
*/
package redis

135
v2/example_test.go Normal file
View File

@ -0,0 +1,135 @@
package redis_test
import (
"fmt"
"strconv"
"github.com/vmihailenco/redis/v2"
)
func ExampleTCPClient() {
password := "" // no password set
db := int64(-1) // use default DB
client := redis.NewTCPClient("localhost:6379", password, db)
defer client.Close()
ping := client.Ping()
fmt.Println(ping.Err(), ping.Val())
// Output: <nil> PONG
}
func ExampleUnixClient() {
client := redis.NewUnixClient("/tmp/redis.sock", "", -1)
defer client.Close()
ping := client.Ping()
fmt.Println(ping.Err(), ping.Val())
// Output: <nil> PONG
}
func ExampleSetGet() {
client := redis.NewTCPClient(":6379", "", -1)
defer client.Close()
set := client.Set("foo", "bar")
fmt.Println(set.Err(), set.Val())
get := client.Get("foo")
fmt.Println(get.Err(), get.Val())
// Output: <nil> OK
// <nil> bar
}
func ExamplePipeline() {
client := redis.NewTCPClient(":6379", "", -1)
defer client.Close()
var set *redis.StatusReq
var get *redis.StringReq
reqs, err := client.Pipelined(func(c *redis.PipelineClient) {
set = c.Set("key1", "hello1")
get = c.Get("key2")
})
fmt.Println(err, reqs)
fmt.Println(set)
fmt.Println(get)
// Output: <nil> [SET key1 hello1: OK GET key2: (nil)]
// SET key1 hello1: OK
// GET key2: (nil)
}
func transaction(multi *redis.MultiClient) ([]redis.Req, error) {
get := multi.Get("key")
if err := get.Err(); err != nil && err != redis.Nil {
return nil, err
}
val, _ := strconv.ParseInt(get.Val(), 10, 64)
reqs, err := multi.Exec(func() {
multi.Set("key", strconv.FormatInt(val+1, 10))
})
// Transaction failed. Repeat.
if err == redis.Nil {
return transaction(multi)
}
return reqs, err
}
func ExampleTransaction() {
client := redis.NewTCPClient(":6379", "", -1)
defer client.Close()
client.Del("key")
multi, err := client.MultiClient()
_ = err
defer multi.Close()
watch := multi.Watch("key")
_ = watch.Err()
reqs, err := transaction(multi)
fmt.Println(err, reqs)
// Output: <nil> [SET key 1: OK]
}
func ExamplePubSub() {
client := redis.NewTCPClient(":6379", "", -1)
defer client.Close()
pubsub, err := client.PubSubClient()
defer pubsub.Close()
ch, err := pubsub.Subscribe("mychannel")
_ = err
subscribeMsg := <-ch
fmt.Println(subscribeMsg.Err, subscribeMsg.Name)
pub := client.Publish("mychannel", "hello")
_ = pub.Err()
msg := <-ch
fmt.Println(msg.Err, msg.Message)
// Output: <nil> subscribe
// <nil> hello
}
func Get(client *redis.Client, key string) *redis.StringReq {
req := redis.NewStringReq("GET", key)
client.Process(req)
return req
}
func ExampleCustomCommand() {
client := redis.NewTCPClient(":6379", "", -1)
defer client.Close()
get := Get(client, "key_does_not_exist")
fmt.Println(get.Err(), get.Val())
// Output: (nil)
}

134
v2/multi.go Normal file
View File

@ -0,0 +1,134 @@
package redis
import (
"fmt"
"sync"
)
type MultiClient struct {
*Client
execMtx sync.Mutex
}
func (c *Client) MultiClient() (*MultiClient, error) {
return &MultiClient{
Client: &Client{
baseClient: &baseClient{
connPool: newSingleConnPool(c.connPool, nil, true),
password: c.password,
db: c.db,
},
},
}, nil
}
func (c *MultiClient) Close() error {
c.Unwatch()
return c.Client.Close()
}
func (c *MultiClient) Watch(keys ...string) *StatusReq {
args := append([]string{"WATCH"}, keys...)
req := NewStatusReq(args...)
c.Process(req)
return req
}
func (c *MultiClient) Unwatch(keys ...string) *StatusReq {
args := append([]string{"UNWATCH"}, keys...)
req := NewStatusReq(args...)
c.Process(req)
return req
}
func (c *MultiClient) Discard() {
c.reqsMtx.Lock()
if c.reqs == nil {
panic("Discard can be used only inside Exec")
}
c.reqs = c.reqs[:1]
c.reqsMtx.Unlock()
}
func (c *MultiClient) Exec(do func()) ([]Req, error) {
c.reqsMtx.Lock()
c.reqs = []Req{NewStatusReq("MULTI")}
c.reqsMtx.Unlock()
do()
c.queue(NewIfaceSliceReq("EXEC"))
c.reqsMtx.Lock()
reqs := c.reqs
c.reqs = nil
c.reqsMtx.Unlock()
if len(reqs) == 2 {
return []Req{}, nil
}
cn, err := c.conn()
if err != nil {
return nil, err
}
// Synchronize writes and reads to the connection using mutex.
c.execMtx.Lock()
err = c.execReqs(reqs, cn)
c.execMtx.Unlock()
if err != nil {
c.removeConn(cn)
return nil, err
}
c.putConn(cn)
return reqs[1 : len(reqs)-1], nil
}
func (c *MultiClient) execReqs(reqs []Req, cn *conn) error {
err := c.writeReq(cn, reqs...)
if err != nil {
return err
}
statusReq := NewStatusReq()
// Omit last request (EXEC).
reqsLen := len(reqs) - 1
// Parse queued replies.
for i := 0; i < reqsLen; i++ {
_, err = statusReq.ParseReply(cn.Rd)
if err != nil {
return err
}
}
// Parse number of replies.
line, err := readLine(cn.Rd)
if err != nil {
return err
}
if line[0] != '*' {
return fmt.Errorf("Expected '*', but got line %q", line)
}
if len(line) == 3 && line[1] == '-' && line[2] == '1' {
return Nil
}
// Parse replies.
// Loop starts from 1 to omit first request (MULTI).
for i := 1; i < reqsLen; i++ {
req := reqs[i]
val, err := req.ParseReply(cn.Rd)
if err != nil {
req.SetErr(err)
} else {
req.SetVal(val)
}
}
return nil
}

304
v2/parser.go Normal file
View File

@ -0,0 +1,304 @@
package redis
import (
"errors"
"fmt"
"strconv"
"github.com/vmihailenco/bufio"
)
type replyType int
const (
ifaceSlice replyType = iota
stringSlice
boolSlice
stringStringMap
stringFloatMap
)
// Represents Redis nil reply.
var Nil = errors.New("(nil)")
var (
errReaderTooSmall = errors.New("redis: reader is too small")
errValNotSet = errors.New("redis: value is not set")
errInvalidReplyType = errors.New("redis: invalid reply type")
)
//------------------------------------------------------------------------------
type parserError struct {
err error
}
func (e *parserError) Error() string {
return e.err.Error()
}
//------------------------------------------------------------------------------
func appendReq(buf []byte, args []string) []byte {
buf = append(buf, '*')
buf = strconv.AppendUint(buf, uint64(len(args)), 10)
buf = append(buf, '\r', '\n')
for _, arg := range args {
buf = append(buf, '$')
buf = strconv.AppendUint(buf, uint64(len(arg)), 10)
buf = append(buf, '\r', '\n')
buf = append(buf, arg...)
buf = append(buf, '\r', '\n')
}
return buf
}
//------------------------------------------------------------------------------
type reader interface {
ReadLine() ([]byte, bool, error)
Read([]byte) (int, error)
ReadN(n int) ([]byte, error)
Buffered() int
Peek(int) ([]byte, error)
}
func readLine(rd reader) ([]byte, error) {
line, isPrefix, err := rd.ReadLine()
if err != nil {
return line, err
}
if isPrefix {
return line, errReaderTooSmall
}
return line, nil
}
func readN(rd reader, n int) ([]byte, error) {
b, err := rd.ReadN(n)
if err == bufio.ErrBufferFull {
newB := make([]byte, n)
r := copy(newB, b)
b = newB
for {
nn, err := rd.Read(b[r:])
r += nn
if r >= n {
// Ignore error if we read enough.
break
}
if err != nil {
return nil, err
}
}
} else if err != nil {
return nil, err
}
return b, nil
}
//------------------------------------------------------------------------------
func ParseReq(rd reader) ([]string, error) {
line, err := readLine(rd)
if err != nil {
return nil, err
}
if line[0] != '*' {
return []string{string(line)}, nil
}
numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return nil, err
}
args := make([]string, 0, numReplies)
for i := int64(0); i < numReplies; i++ {
line, err = readLine(rd)
if err != nil {
return nil, err
}
if line[0] != '$' {
return nil, fmt.Errorf("redis: expected '$', but got %q", line)
}
argLen, err := strconv.ParseInt(string(line[1:]), 10, 32)
if err != nil {
return nil, err
}
arg, err := readN(rd, int(argLen)+2)
if err != nil {
return nil, err
}
args = append(args, string(arg[:argLen]))
}
return args, nil
}
//------------------------------------------------------------------------------
func parseReply(rd reader) (interface{}, error) {
return _parseReply(rd, ifaceSlice)
}
func parseStringSliceReply(rd reader) (interface{}, error) {
return _parseReply(rd, stringSlice)
}
func parseBoolSliceReply(rd reader) (interface{}, error) {
return _parseReply(rd, boolSlice)
}
func parseStringStringMapReply(rd reader) (interface{}, error) {
return _parseReply(rd, stringStringMap)
}
func parseStringFloatMapReply(rd reader) (interface{}, error) {
return _parseReply(rd, stringFloatMap)
}
func _parseReply(rd reader, typ replyType) (interface{}, error) {
line, err := readLine(rd)
if err != nil {
return 0, &parserError{err}
}
switch line[0] {
case '-':
return nil, errors.New(string(line[1:]))
case '+':
return string(line[1:]), nil
case ':':
v, err := strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return 0, &parserError{err}
}
return v, nil
case '$':
if len(line) == 3 && line[1] == '-' && line[2] == '1' {
return "", Nil
}
replyLenInt32, err := strconv.ParseInt(string(line[1:]), 10, 32)
if err != nil {
return "", &parserError{err}
}
replyLen := int(replyLenInt32) + 2
line, err = readN(rd, replyLen)
if err != nil {
return "", &parserError{err}
}
return string(line[:len(line)-2]), nil
case '*':
if len(line) == 3 && line[1] == '-' && line[2] == '1' {
return nil, Nil
}
repliesNum, err := strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return nil, &parserError{err}
}
switch typ {
case stringSlice:
vals := make([]string, 0, repliesNum)
for i := int64(0); i < repliesNum; i++ {
vi, err := parseReply(rd)
if err != nil {
return nil, err
}
if v, ok := vi.(string); ok {
vals = append(vals, v)
} else {
return nil, errInvalidReplyType
}
}
return vals, nil
case boolSlice:
vals := make([]bool, 0, repliesNum)
for i := int64(0); i < repliesNum; i++ {
vi, err := parseReply(rd)
if err != nil {
return nil, err
}
if v, ok := vi.(int64); ok {
vals = append(vals, v == 1)
} else {
return nil, errInvalidReplyType
}
}
return vals, nil
case stringStringMap: // TODO: Consider handling Nil values.
m := make(map[string]string, repliesNum/2)
for i := int64(0); i < repliesNum; i += 2 {
keyI, err := parseReply(rd)
if err != nil {
return nil, err
}
key, ok := keyI.(string)
if !ok {
return nil, errInvalidReplyType
}
valueI, err := parseReply(rd)
if err != nil {
return nil, err
}
value, ok := valueI.(string)
if !ok {
return nil, errInvalidReplyType
}
m[key] = value
}
return m, nil
case stringFloatMap: // TODO: Consider handling Nil values.
m := make(map[string]float64, repliesNum/2)
for i := int64(0); i < repliesNum; i += 2 {
keyI, err := parseReply(rd)
if err != nil {
return nil, err
}
key, ok := keyI.(string)
if !ok {
return nil, errInvalidReplyType
}
valueI, err := parseReply(rd)
if err != nil {
return nil, err
}
valueS, ok := valueI.(string)
if !ok {
return nil, errInvalidReplyType
}
value, err := strconv.ParseFloat(valueS, 64)
if err != nil {
return nil, &parserError{err}
}
m[key] = value
}
return m, nil
default:
vals := make([]interface{}, 0, repliesNum)
for i := int64(0); i < repliesNum; i++ {
v, err := parseReply(rd)
if err == Nil {
vals = append(vals, nil)
} else if err != nil {
return nil, err
} else {
vals = append(vals, v)
}
}
return vals, nil
}
default:
return nil, &parserError{fmt.Errorf("redis: can't parse %q", line)}
}
}

23
v2/parser_test.go Normal file
View File

@ -0,0 +1,23 @@
package redis_test
import (
"bytes"
"github.com/vmihailenco/bufio"
. "launchpad.net/gocheck"
"github.com/vmihailenco/redis/v2"
)
type ParserTest struct{}
var _ = Suite(&ParserTest{})
func (t *ParserTest) TestParseReq(c *C) {
buf := bytes.NewBufferString("*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nhello\r\n")
rd := bufio.NewReaderSize(buf, 1024)
args, err := redis.ParseReq(rd)
c.Check(err, IsNil)
c.Check(args, DeepEquals, []string{"SET", "key", "hello"})
}

87
v2/pipeline.go Normal file
View File

@ -0,0 +1,87 @@
package redis
type PipelineClient struct {
*Client
}
func (c *Client) PipelineClient() (*PipelineClient, error) {
return &PipelineClient{
Client: &Client{
baseClient: &baseClient{
connPool: c.connPool,
password: c.password,
db: c.db,
reqs: make([]Req, 0),
},
},
}, nil
}
func (c *Client) Pipelined(do func(*PipelineClient)) ([]Req, error) {
pc, err := c.PipelineClient()
if err != nil {
return nil, err
}
defer pc.Close()
do(pc)
return pc.RunQueued()
}
func (c *PipelineClient) Close() error {
return nil
}
func (c *PipelineClient) DiscardQueued() {
c.reqsMtx.Lock()
c.reqs = c.reqs[:0]
c.reqsMtx.Unlock()
}
func (c *PipelineClient) RunQueued() ([]Req, error) {
c.reqsMtx.Lock()
reqs := c.reqs
c.reqs = make([]Req, 0)
c.reqsMtx.Unlock()
if len(reqs) == 0 {
return []Req{}, nil
}
conn, err := c.conn()
if err != nil {
return nil, err
}
err = c.runReqs(reqs, conn)
if err != nil {
c.removeConn(conn)
return nil, err
}
c.putConn(conn)
return reqs, nil
}
func (c *PipelineClient) runReqs(reqs []Req, cn *conn) error {
err := c.writeReq(cn, reqs...)
if err != nil {
return err
}
reqsLen := len(reqs)
for i := 0; i < reqsLen; i++ {
req := reqs[i]
val, err := req.ParseReply(cn.Rd)
if err != nil {
req.SetErr(err)
} else {
req.SetVal(val)
}
}
return nil
}

257
v2/pool.go Normal file
View File

@ -0,0 +1,257 @@
package redis
import (
"container/list"
"net"
"sync"
"time"
"github.com/vmihailenco/bufio"
)
type pool interface {
Get() (*conn, bool, error)
Put(*conn) error
Remove(*conn) error
Len() int
Close() error
}
//------------------------------------------------------------------------------
type conn struct {
Cn net.Conn
Rd reader
UsedAt time.Time
readTimeout, writeTimeout time.Duration
}
func newConn(netcn net.Conn, readTimeout, writeTimeout time.Duration) *conn {
cn := &conn{
Cn: netcn,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
}
cn.Rd = bufio.NewReaderSize(netcn, 1024)
return cn
}
func (cn *conn) Read(b []byte) (int, error) {
if cn.readTimeout > 0 {
cn.Cn.SetReadDeadline(time.Now().Add(cn.readTimeout))
}
return cn.Cn.Read(b)
}
func (cn *conn) Write(b []byte) (int, error) {
if cn.writeTimeout > 0 {
cn.Cn.SetWriteDeadline(time.Now().Add(cn.writeTimeout))
}
return cn.Cn.Write(b)
}
//------------------------------------------------------------------------------
type connPool struct {
dial func() (net.Conn, error)
close func(net.Conn) error
cond *sync.Cond
conns *list.List
readTimeout, writeTimeout time.Duration
size, maxSize int
idleTimeout time.Duration
}
func newConnPool(
dial func() (net.Conn, error),
close func(net.Conn) error,
maxSize int,
readTimeout, writeTimeout, idleTimeout time.Duration,
) *connPool {
return &connPool{
dial: dial,
close: close,
cond: sync.NewCond(&sync.Mutex{}),
conns: list.New(),
maxSize: maxSize,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
idleTimeout: idleTimeout,
}
}
func (p *connPool) Get() (*conn, bool, error) {
defer p.cond.L.Unlock()
p.cond.L.Lock()
for p.conns.Len() == 0 && p.size >= p.maxSize {
p.cond.Wait()
}
if p.idleTimeout > 0 {
for e := p.conns.Front(); e != nil; e = e.Next() {
cn := e.Value.(*conn)
if time.Since(cn.UsedAt) > p.idleTimeout {
p.conns.Remove(e)
}
}
}
if p.conns.Len() == 0 {
rw, err := p.dial()
if err != nil {
return nil, false, err
}
p.size++
return newConn(rw, p.readTimeout, p.writeTimeout), true, nil
}
elem := p.conns.Front()
p.conns.Remove(elem)
return elem.Value.(*conn), false, nil
}
func (p *connPool) Put(cn *conn) error {
p.cond.L.Lock()
cn.UsedAt = time.Now()
p.conns.PushFront(cn)
p.cond.Signal()
p.cond.L.Unlock()
return nil
}
func (p *connPool) Remove(cn *conn) error {
var err error
if cn != nil {
err = p.closeConn(cn)
}
p.cond.L.Lock()
p.size--
p.cond.Signal()
p.cond.L.Unlock()
return err
}
func (p *connPool) Len() int {
return p.conns.Len()
}
func (p *connPool) Size() int {
return p.size
}
func (p *connPool) Close() error {
defer p.cond.L.Unlock()
p.cond.L.Lock()
for e := p.conns.Front(); e != nil; e = e.Next() {
if err := p.closeConn(e.Value.(*conn)); err != nil {
return err
}
}
p.conns.Init()
p.size = 0
return nil
}
func (p *connPool) closeConn(cn *conn) error {
if p.close != nil {
return p.close(cn.Cn)
} else {
return cn.Cn.Close()
}
}
//------------------------------------------------------------------------------
type singleConnPool struct {
pool pool
l sync.RWMutex
cn *conn
reusable bool
}
func newSingleConnPool(pool pool, cn *conn, reusable bool) *singleConnPool {
return &singleConnPool{
pool: pool,
cn: cn,
reusable: reusable,
}
}
func (p *singleConnPool) Get() (*conn, bool, error) {
p.l.RLock()
if p.cn != nil {
p.l.RUnlock()
return p.cn, false, nil
}
p.l.RUnlock()
defer p.l.Unlock()
p.l.Lock()
cn, isNew, err := p.pool.Get()
if err != nil {
return nil, false, err
}
p.cn = cn
return cn, isNew, nil
}
func (p *singleConnPool) Put(cn *conn) error {
defer p.l.Unlock()
p.l.Lock()
if p.cn != cn {
panic("p.cn != cn")
}
return nil
}
func (p *singleConnPool) Remove(cn *conn) error {
defer p.l.Unlock()
p.l.Lock()
if p.cn != cn {
panic("p.cn != cn")
}
p.cn = nil
return nil
}
func (p *singleConnPool) Len() int {
defer p.l.Unlock()
p.l.Lock()
if p.cn == nil {
return 0
}
return 1
}
func (p *singleConnPool) Close() error {
defer p.l.Unlock()
p.l.Lock()
var err error
if p.cn != nil {
if p.reusable {
err = p.pool.Put(p.cn)
} else {
err = p.pool.Remove(p.cn)
}
}
p.cn = nil
return err
}

128
v2/pubsub.go Normal file
View File

@ -0,0 +1,128 @@
package redis
import (
"fmt"
"sync"
)
type PubSubClient struct {
*baseClient
ch chan *Message
once sync.Once
}
func (c *Client) PubSubClient() (*PubSubClient, error) {
return &PubSubClient{
baseClient: &baseClient{
connPool: newSingleConnPool(c.connPool, nil, false),
password: c.password,
db: c.db,
},
ch: make(chan *Message),
}, nil
}
func (c *Client) Publish(channel, message string) *IntReq {
req := NewIntReq("PUBLISH", channel, message)
c.Process(req)
return req
}
type Message struct {
Name, Channel, ChannelPattern, Message string
Number int64
Err error
}
func (c *PubSubClient) consumeMessages(cn *conn) {
req := NewIfaceSliceReq()
for {
msg := &Message{}
replyIface, err := req.ParseReply(cn.Rd)
if err != nil {
msg.Err = err
c.ch <- msg
return
}
reply, ok := replyIface.([]interface{})
if !ok {
msg.Err = fmt.Errorf("redis: unexpected reply type %T", replyIface)
c.ch <- msg
return
}
msgName := reply[0].(string)
switch msgName {
case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
msg.Name = msgName
msg.Channel = reply[1].(string)
msg.Number = reply[2].(int64)
case "message":
msg.Name = msgName
msg.Channel = reply[1].(string)
msg.Message = reply[2].(string)
case "pmessage":
msg.Name = msgName
msg.ChannelPattern = reply[1].(string)
msg.Channel = reply[2].(string)
msg.Message = reply[3].(string)
default:
msg.Err = fmt.Errorf("redis: unsupported message name: %q", msgName)
}
c.ch <- msg
}
}
func (c *PubSubClient) subscribe(cmd string, channels ...string) (chan *Message, error) {
args := append([]string{cmd}, channels...)
req := NewIfaceSliceReq(args...)
cn, err := c.conn()
if err != nil {
return nil, err
}
if err := c.writeReq(cn, req); err != nil {
return nil, err
}
c.once.Do(func() {
go c.consumeMessages(cn)
})
return c.ch, nil
}
func (c *PubSubClient) Subscribe(channels ...string) (chan *Message, error) {
return c.subscribe("SUBSCRIBE", channels...)
}
func (c *PubSubClient) PSubscribe(patterns ...string) (chan *Message, error) {
return c.subscribe("PSUBSCRIBE", patterns...)
}
func (c *PubSubClient) unsubscribe(cmd string, channels ...string) error {
args := append([]string{cmd}, channels...)
req := NewIfaceSliceReq(args...)
cn, err := c.conn()
if err != nil {
return err
}
return c.writeReq(cn, req)
}
func (c *PubSubClient) Unsubscribe(channels ...string) error {
return c.unsubscribe("UNSUBSCRIBE", channels...)
}
func (c *PubSubClient) PUnsubscribe(patterns ...string) error {
return c.unsubscribe("PUNSUBSCRIBE", patterns...)
}

226
v2/redis.go Normal file
View File

@ -0,0 +1,226 @@
package redis
import (
"crypto/tls"
"log"
"net"
"os"
"sync"
"time"
)
// Package logger.
var Logger = log.New(os.Stdout, "redis: ", log.Ldate|log.Ltime)
//------------------------------------------------------------------------------
type baseClient struct {
connPool pool
password string
db int64
reqs []Req
reqsMtx sync.Mutex
}
func (c *baseClient) writeReq(cn *conn, reqs ...Req) error {
buf := make([]byte, 0, 1000)
for _, req := range reqs {
buf = appendReq(buf, req.Args())
}
_, err := cn.Write(buf)
return err
}
func (c *baseClient) conn() (*conn, error) {
cn, isNew, err := c.connPool.Get()
if err != nil {
return nil, err
}
if isNew && (c.password != "" || c.db > 0) {
if err = c.init(cn, c.password, c.db); err != nil {
c.removeConn(cn)
return nil, err
}
}
return cn, nil
}
func (c *baseClient) init(cn *conn, password string, db int64) error {
// Client is not closed on purpose.
client := &Client{
baseClient: &baseClient{
connPool: newSingleConnPool(c.connPool, cn, false),
},
}
if password != "" {
auth := client.Auth(password)
if auth.Err() != nil {
return auth.Err()
}
}
if db > 0 {
sel := client.Select(db)
if sel.Err() != nil {
return sel.Err()
}
}
return nil
}
func (c *baseClient) removeConn(cn *conn) {
if err := c.connPool.Remove(cn); err != nil {
Logger.Printf("connPool.Remove error: %v", err)
}
}
func (c *baseClient) putConn(cn *conn) {
if err := c.connPool.Put(cn); err != nil {
Logger.Printf("connPool.Add error: %v", err)
}
}
func (c *baseClient) Process(req Req) {
if c.reqs == nil {
c.run(req)
} else {
c.queue(req)
}
}
func (c *baseClient) run(req Req) {
cn, err := c.conn()
if err != nil {
req.SetErr(err)
return
}
err = c.writeReq(cn, req)
if err != nil {
c.removeConn(cn)
req.SetErr(err)
return
}
val, err := req.ParseReply(cn.Rd)
if err != nil {
if _, ok := err.(*parserError); ok {
c.removeConn(cn)
} else {
c.putConn(cn)
}
req.SetErr(err)
return
}
c.putConn(cn)
req.SetVal(val)
}
// Queues request to be executed later.
func (c *baseClient) queue(req Req) {
c.reqsMtx.Lock()
c.reqs = append(c.reqs, req)
c.reqsMtx.Unlock()
}
func (c *baseClient) Close() error {
return c.connPool.Close()
}
//------------------------------------------------------------------------------
type ClientFactory struct {
Dial func() (net.Conn, error)
Close func(net.Conn) error
Password string
DB int64
PoolSize int
ReadTimeout, WriteTimeout, IdleTimeout time.Duration
}
func (f *ClientFactory) New() *Client {
return &Client{
baseClient: &baseClient{
password: f.Password,
db: f.DB,
connPool: newConnPool(
f.Dial, f.getClose(), f.getPoolSize(),
f.ReadTimeout, f.WriteTimeout, f.IdleTimeout,
),
},
}
}
func (f *ClientFactory) getClose() func(net.Conn) error {
if f.Close == nil {
return func(conn net.Conn) error {
return conn.Close()
}
}
return f.Close
}
func (f *ClientFactory) getPoolSize() int {
if f.PoolSize == 0 {
return 10
}
return f.PoolSize
}
//------------------------------------------------------------------------------
type Client struct {
*baseClient
}
func NewTCPClient(addr string, password string, db int64) *Client {
dial := func() (net.Conn, error) {
return net.DialTimeout("tcp", addr, 3*time.Second)
}
return (&ClientFactory{
Dial: dial,
Password: password,
DB: db,
}).New()
}
func NewTLSClient(addr string, tlsConfig *tls.Config, password string, db int64) *Client {
dial := func() (net.Conn, error) {
conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
if err != nil {
return nil, err
}
return tls.Client(conn, tlsConfig), nil
}
return (&ClientFactory{
Dial: dial,
Password: password,
DB: db,
}).New()
}
func NewUnixClient(addr string, password string, db int64) *Client {
dial := func() (net.Conn, error) {
return net.DialTimeout("unix", addr, 3*time.Second)
}
return (&ClientFactory{
Dial: dial,
Password: password,
DB: db,
}).New()
}

2953
v2/redis_test.go Normal file

File diff suppressed because it is too large Load Diff

315
v2/req.go Normal file
View File

@ -0,0 +1,315 @@
package redis
import (
"fmt"
"strconv"
"strings"
)
type Req interface {
Args() []string
ParseReply(reader) (interface{}, error)
SetErr(error)
Err() error
SetVal(interface{})
IfaceVal() interface{}
}
//------------------------------------------------------------------------------
type BaseReq struct {
args []string
val interface{}
err error
}
func NewBaseReq(args ...string) *BaseReq {
return &BaseReq{
args: args,
}
}
func (r *BaseReq) Args() []string {
return r.args
}
func (r *BaseReq) SetErr(err error) {
if err == nil {
panic("non-nil value expected")
}
r.err = err
}
func (r *BaseReq) Err() error {
if r.err != nil {
return r.err
}
if r.val == nil {
return errValNotSet
}
return nil
}
func (r *BaseReq) SetVal(val interface{}) {
if val == nil {
panic("non-nil value expected")
}
r.val = val
}
func (r *BaseReq) IfaceVal() interface{} {
return r.val
}
func (r *BaseReq) ParseReply(rd reader) (interface{}, error) {
return parseReply(rd)
}
func (r *BaseReq) String() string {
args := strings.Join(r.args, " ")
if r.err != nil {
return args + ": " + r.err.Error()
} else if r.val != nil {
return args + ": " + fmt.Sprint(r.val)
}
return args
}
//------------------------------------------------------------------------------
type IfaceReq struct {
*BaseReq
}
func NewIfaceReq(args ...string) *IfaceReq {
return &IfaceReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *IfaceReq) Val() interface{} {
return r.val
}
//------------------------------------------------------------------------------
type StatusReq struct {
*BaseReq
}
func NewStatusReq(args ...string) *StatusReq {
return &StatusReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *StatusReq) Val() string {
if r.val == nil {
return ""
}
return r.val.(string)
}
//------------------------------------------------------------------------------
type IntReq struct {
*BaseReq
}
func NewIntReq(args ...string) *IntReq {
return &IntReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *IntReq) Val() int64 {
if r.val == nil {
return 0
}
return r.val.(int64)
}
//------------------------------------------------------------------------------
type BoolReq struct {
*BaseReq
}
func NewBoolReq(args ...string) *BoolReq {
return &BoolReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *BoolReq) ParseReply(rd reader) (interface{}, error) {
v, err := parseReply(rd)
if err != nil {
return nil, err
}
return v.(int64) == 1, nil
}
func (r *BoolReq) Val() bool {
if r.val == nil {
return false
}
return r.val.(bool)
}
//------------------------------------------------------------------------------
type StringReq struct {
*BaseReq
}
func NewStringReq(args ...string) *StringReq {
return &StringReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *StringReq) Val() string {
if r.val == nil {
return ""
}
return r.val.(string)
}
//------------------------------------------------------------------------------
type FloatReq struct {
*BaseReq
}
func NewFloatReq(args ...string) *FloatReq {
return &FloatReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *FloatReq) ParseReply(rd reader) (interface{}, error) {
v, err := parseReply(rd)
if err != nil {
return nil, err
}
return strconv.ParseFloat(v.(string), 64)
}
func (r *FloatReq) Val() float64 {
if r.val == nil {
return 0
}
return r.val.(float64)
}
//------------------------------------------------------------------------------
type IfaceSliceReq struct {
*BaseReq
}
func NewIfaceSliceReq(args ...string) *IfaceSliceReq {
return &IfaceSliceReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *IfaceSliceReq) Val() []interface{} {
if r.val == nil {
return nil
}
return r.val.([]interface{})
}
//------------------------------------------------------------------------------
type StringSliceReq struct {
*BaseReq
}
func NewStringSliceReq(args ...string) *StringSliceReq {
return &StringSliceReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *StringSliceReq) ParseReply(rd reader) (interface{}, error) {
return parseStringSliceReply(rd)
}
func (r *StringSliceReq) Val() []string {
if r.val == nil {
return nil
}
return r.val.([]string)
}
//------------------------------------------------------------------------------
type BoolSliceReq struct {
*BaseReq
}
func NewBoolSliceReq(args ...string) *BoolSliceReq {
return &BoolSliceReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *BoolSliceReq) ParseReply(rd reader) (interface{}, error) {
return parseBoolSliceReply(rd)
}
func (r *BoolSliceReq) Val() []bool {
if r.val == nil {
return nil
}
return r.val.([]bool)
}
//------------------------------------------------------------------------------
type StringStringMapReq struct {
*BaseReq
}
func NewStringStringMapReq(args ...string) *StringStringMapReq {
return &StringStringMapReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *StringStringMapReq) ParseReply(rd reader) (interface{}, error) {
return parseStringStringMapReply(rd)
}
func (r *StringStringMapReq) Val() map[string]string {
if r.val == nil {
return nil
}
return r.val.(map[string]string)
}
//------------------------------------------------------------------------------
type StringFloatMapReq struct {
*BaseReq
}
func NewStringFloatMapReq(args ...string) *StringFloatMapReq {
return &StringFloatMapReq{
BaseReq: NewBaseReq(args...),
}
}
func (r *StringFloatMapReq) ParseReply(rd reader) (interface{}, error) {
return parseStringFloatMapReply(rd)
}
func (r *StringFloatMapReq) Val() map[string]float64 {
if r.val == nil {
return nil
}
return r.val.(map[string]float64)
}

93
v2/req_test.go Normal file
View File

@ -0,0 +1,93 @@
package redis_test
import (
"github.com/vmihailenco/bufio"
. "launchpad.net/gocheck"
"github.com/vmihailenco/redis"
)
//------------------------------------------------------------------------------
type LineReader struct {
line []byte
}
func NewLineReader(line []byte) *LineReader {
return &LineReader{line: line}
}
func (r *LineReader) Read(buf []byte) (int, error) {
return copy(buf, r.line), nil
}
//------------------------------------------------------------------------------
type RequestTest struct{}
var _ = Suite(&RequestTest{})
//------------------------------------------------------------------------------
func (t *RequestTest) SetUpTest(c *C) {}
func (t *RequestTest) TearDownTest(c *C) {}
//------------------------------------------------------------------------------
func (t *RequestTest) benchmarkReq(c *C, reqString string, req redis.Req, checker Checker, expected interface{}) {
c.StopTimer()
lineReader := NewLineReader([]byte(reqString))
rd := bufio.NewReaderSize(lineReader, 1024)
for i := 0; i < 10; i++ {
vIface, err := req.ParseReply(rd)
c.Check(err, IsNil)
c.Check(vIface, checker, expected)
req.SetVal(vIface)
}
c.StartTimer()
for i := 0; i < c.N; i++ {
v, _ := req.ParseReply(rd)
req.SetVal(v)
}
}
func (t *RequestTest) BenchmarkStatusReq(c *C) {
t.benchmarkReq(c, "+OK\r\n", redis.NewStatusReq(), Equals, "OK")
}
func (t *RequestTest) BenchmarkIntReq(c *C) {
t.benchmarkReq(c, ":1\r\n", redis.NewIntReq(), Equals, int64(1))
}
func (t *RequestTest) BenchmarkStringReq(c *C) {
t.benchmarkReq(c, "$5\r\nhello\r\n", redis.NewStringReq(), Equals, "hello")
}
func (t *RequestTest) BenchmarkFloatReq(c *C) {
t.benchmarkReq(c, "$5\r\n1.111\r\n", redis.NewFloatReq(), Equals, 1.111)
}
func (t *RequestTest) BenchmarkStringSliceReq(c *C) {
t.benchmarkReq(
c,
"*2\r\n$5\r\nhello\r\n$5\r\nhello\r\n",
redis.NewStringSliceReq(),
DeepEquals,
[]string{"hello", "hello"},
)
}
func (t *RequestTest) BenchmarkIfaceSliceReq(c *C) {
t.benchmarkReq(
c,
"*2\r\n$5\r\nhello\r\n$5\r\nhello\r\n",
redis.NewIfaceSliceReq(),
DeepEquals,
[]interface{}{"hello", "hello"},
)
}

45
v2/script.go Normal file
View File

@ -0,0 +1,45 @@
package redis
import (
"crypto/sha1"
"encoding/hex"
"io"
"strings"
)
type Script struct {
src, hash string
}
func NewScript(src string) *Script {
h := sha1.New()
io.WriteString(h, src)
return &Script{
src: src,
hash: hex.EncodeToString(h.Sum(nil)),
}
}
func (s *Script) Load(c *Client) *StringReq {
return c.ScriptLoad(s.src)
}
func (s *Script) Exists(c *Client) *BoolSliceReq {
return c.ScriptExists(s.src)
}
func (s *Script) Eval(c *Client, keys []string, args []string) *IfaceReq {
return c.Eval(s.src, keys, args)
}
func (s *Script) EvalSha(c *Client, keys []string, args []string) *IfaceReq {
return c.EvalSha(s.hash, keys, args)
}
func (s *Script) Run(c *Client, keys []string, args []string) *IfaceReq {
r := s.EvalSha(c, keys, args)
if err := r.Err(); err != nil && strings.HasPrefix(err.Error(), "NOSCRIPT ") {
return s.Eval(c, keys, args)
}
return r
}