Make Script and Pipeline interoperable. Fixes #18.

This commit is contained in:
Vladimir Mihailenco 2013-12-30 13:02:14 +02:00
parent af4ea559fe
commit 82a41314a0
5 changed files with 57 additions and 23 deletions

View File

@ -1,7 +1,3 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package redis package redis
func (c *baseClient) Pool() pool { func (c *baseClient) Pool() pool {

View File

@ -71,7 +71,6 @@ func (c *Multi) Exec(f func()) ([]Cmder, error) {
return cmds[1 : len(cmds)-1], err return cmds[1 : len(cmds)-1], err
} }
// Synchronize writes and reads to the connection using mutex.
err = c.execCmds(cn, cmds) err = c.execCmds(cn, cmds)
if err != nil { if err != nil {
c.freeConn(cn, err) c.freeConn(cn, err)
@ -122,7 +121,7 @@ func (c *Multi) execCmds(cn *conn, cmds []Cmder) error {
var firstCmdErr error var firstCmdErr error
// Parse replies. // Parse replies.
// Loop starts from 1 to omit first cmduest (MULTI). // Loop starts from 1 to omit MULTI cmd.
for i := 1; i < cmdsLen; i++ { for i := 1; i < cmdsLen; i++ {
cmd := cmds[i] cmd := cmds[i]
val, err := cmd.parseReply(cn.rd) val, err := cmd.parseReply(cn.rd)

View File

@ -82,7 +82,7 @@ type connPool struct {
cond *sync.Cond cond *sync.Cond
conns *list.List conns *list.List
len int idleNum int
maxSize int maxSize int
idleTimeout time.Duration idleTimeout time.Duration
@ -114,8 +114,8 @@ func (p *connPool) Get() (*conn, bool, error) {
} }
if p.idleTimeout > 0 { if p.idleTimeout > 0 {
for e := p.conns.Front(); e != nil; e = e.Next() { for el := p.conns.Front(); el != nil; el = el.Next() {
cn := e.Value.(*conn) cn := el.Value.(*conn)
if cn.inUse { if cn.inUse {
break break
} }
@ -127,11 +127,11 @@ func (p *connPool) Get() (*conn, bool, error) {
} }
} }
for p.conns.Len() >= p.maxSize && p.len == 0 { for p.conns.Len() >= p.maxSize && p.idleNum == 0 {
p.cond.Wait() p.cond.Wait()
} }
if p.len > 0 { if p.idleNum > 0 {
elem := p.conns.Front() elem := p.conns.Front()
cn := elem.Value.(*conn) cn := elem.Value.(*conn)
if cn.inUse { if cn.inUse {
@ -139,7 +139,7 @@ func (p *connPool) Get() (*conn, bool, error) {
} }
cn.inUse = true cn.inUse = true
p.conns.MoveToBack(elem) p.conns.MoveToBack(elem)
p.len-- p.idleNum--
p.cond.L.Unlock() p.cond.L.Unlock()
return cn, false, nil return cn, false, nil
@ -166,15 +166,18 @@ func (p *connPool) Put(cn *conn) error {
if cn.rd.Buffered() != 0 { if cn.rd.Buffered() != 0 {
panic("redis: attempt to put connection with buffered data") panic("redis: attempt to put connection with buffered data")
} }
if p.idleTimeout > 0 {
cn.usedAt = time.Now()
}
p.cond.L.Lock() p.cond.L.Lock()
if p.closed { if p.closed {
p.cond.L.Unlock() p.cond.L.Unlock()
return errPoolClosed return errPoolClosed
} }
cn.inUse = false cn.inUse = false
cn.usedAt = time.Now()
p.conns.MoveToFront(cn.elem) p.conns.MoveToFront(cn.elem)
p.len++ p.idleNum++
p.cond.Signal() p.cond.Signal()
p.cond.L.Unlock() p.cond.L.Unlock()
return nil return nil
@ -201,10 +204,10 @@ func (p *connPool) Remove(cn *conn) (err error) {
func (p *connPool) Len() int { func (p *connPool) Len() int {
defer p.cond.L.Unlock() defer p.cond.L.Unlock()
p.cond.L.Lock() p.cond.L.Lock()
return p.len return p.idleNum
} }
// Returns size of the pool. // Returns number of connections in the pool.
func (p *connPool) Size() int { func (p *connPool) Size() int {
defer p.cond.L.Unlock() defer p.cond.L.Unlock()
p.cond.L.Lock() p.cond.L.Lock()

View File

@ -2844,15 +2844,15 @@ func (t *RedisTest) TestScriptingEvalSha(c *C) {
c.Assert(set.Err(), IsNil) c.Assert(set.Err(), IsNil)
c.Assert(set.Val(), Equals, "OK") c.Assert(set.Val(), Equals, "OK")
eval := t.client.Eval("return redis.call('get','foo')", []string{}, []string{}) eval := t.client.Eval("return redis.call('get','foo')", nil, nil)
c.Assert(eval.Err(), IsNil) c.Assert(eval.Err(), IsNil)
c.Assert(eval.Val(), Equals, "bar") c.Assert(eval.Val(), Equals, "bar")
evalSha := t.client.EvalSha("6b1bf486c81ceb7edf3c093f4c48582e38c0e791", []string{}, []string{}) evalSha := t.client.EvalSha("6b1bf486c81ceb7edf3c093f4c48582e38c0e791", nil, nil)
c.Assert(evalSha.Err(), IsNil) c.Assert(evalSha.Err(), IsNil)
c.Assert(evalSha.Val(), Equals, "bar") c.Assert(evalSha.Val(), Equals, "bar")
evalSha = t.client.EvalSha("ffffffffffffffffffffffffffffffffffffffff", []string{}, []string{}) evalSha = t.client.EvalSha("ffffffffffffffffffffffffffffffffffffffff", nil, nil)
c.Assert(evalSha.Err(), ErrorMatches, "NOSCRIPT No matching script. Please use EVAL.") c.Assert(evalSha.Err(), ErrorMatches, "NOSCRIPT No matching script. Please use EVAL.")
c.Assert(evalSha.Val(), Equals, nil) c.Assert(evalSha.Val(), Equals, nil)
} }
@ -2888,6 +2888,35 @@ func (t *RedisTest) TestScriptingScriptLoad(c *C) {
c.Assert(scriptLoad.Val(), Equals, "6b1bf486c81ceb7edf3c093f4c48582e38c0e791") c.Assert(scriptLoad.Val(), Equals, "6b1bf486c81ceb7edf3c093f4c48582e38c0e791")
} }
func (t *RedisTest) TestNewScript(c *C) {
s := redis.NewScript("return 1")
run := s.Run(t.client, nil, nil)
c.Assert(run.Err(), IsNil)
c.Assert(run.Val(), Equals, int64(1))
}
func (t *RedisTest) TestEvalAndPipeline(c *C) {
pipeline := t.client.Pipeline()
s := redis.NewScript("return 1")
run := s.Eval(pipeline, nil, nil)
_, err := pipeline.Exec()
c.Assert(err, IsNil)
c.Assert(run.Err(), IsNil)
c.Assert(run.Val(), Equals, int64(1))
}
func (t *RedisTest) TestEvalShaAndPipeline(c *C) {
s := redis.NewScript("return 1")
c.Assert(s.Load(t.client).Err(), IsNil)
pipeline := t.client.Pipeline()
run := s.Eval(pipeline, nil, nil)
_, err := pipeline.Exec()
c.Assert(err, IsNil)
c.Assert(run.Err(), IsNil)
c.Assert(run.Val(), Equals, int64(1))
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func (t *RedisTest) BenchmarkRedisPing(c *C) { func (t *RedisTest) BenchmarkRedisPing(c *C) {

View File

@ -7,6 +7,13 @@ import (
"strings" "strings"
) )
type scripter interface {
Eval(script string, keys []string, args []string) *Cmd
EvalSha(sha1 string, keys []string, args []string) *Cmd
ScriptExists(scripts ...string) *BoolSliceCmd
ScriptLoad(script string) *StringCmd
}
type Script struct { type Script struct {
src, hash string src, hash string
} }
@ -20,19 +27,19 @@ func NewScript(src string) *Script {
} }
} }
func (s *Script) Load(c *Client) *StringCmd { func (s *Script) Load(c scripter) *StringCmd {
return c.ScriptLoad(s.src) return c.ScriptLoad(s.src)
} }
func (s *Script) Exists(c *Client) *BoolSliceCmd { func (s *Script) Exists(c scripter) *BoolSliceCmd {
return c.ScriptExists(s.src) return c.ScriptExists(s.src)
} }
func (s *Script) Eval(c *Client, keys []string, args []string) *Cmd { func (s *Script) Eval(c scripter, keys []string, args []string) *Cmd {
return c.Eval(s.src, keys, args) return c.Eval(s.src, keys, args)
} }
func (s *Script) EvalSha(c *Client, keys []string, args []string) *Cmd { func (s *Script) EvalSha(c scripter, keys []string, args []string) *Cmd {
return c.EvalSha(s.hash, keys, args) return c.EvalSha(s.hash, keys, args)
} }