Close all connections.

This commit is contained in:
Vladimir Mihailenco 2013-11-07 16:20:15 +02:00
parent ab4d0d6b62
commit 3f491f8a8c
5 changed files with 294 additions and 240 deletions

9
v2/export_test.go Normal file
View File

@ -0,0 +1,9 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package redis
func (c *baseClient) Pool() pool {
return c.connPool
}

View File

@ -2,18 +2,25 @@ package redis
import ( import (
"container/list" "container/list"
"errors"
"net" "net"
"sync" "sync"
"time" "time"
"github.com/golang/glog"
"github.com/vmihailenco/bufio" "github.com/vmihailenco/bufio"
) )
var (
errPoolClosed = errors.New("attempt to use closed connection pool")
)
type pool interface { type pool interface {
Get() (*conn, bool, error) Get() (*conn, bool, error)
Put(*conn) error Put(*conn) error
Remove(*conn) error Remove(*conn) error
Len() int Len() int
Size() int
Close() error Close() error
} }
@ -22,17 +29,27 @@ type pool interface {
type conn struct { type conn struct {
cn net.Conn cn net.Conn
rd reader rd reader
inUse bool
usedAt time.Time usedAt time.Time
readTimeout, writeTimeout time.Duration readTimeout, writeTimeout time.Duration
elem *list.Element
} }
func newConn(netcn net.Conn) *conn { func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {
cn := &conn{ return func() (*conn, error) {
cn: netcn, netcn, err := dial()
if err != nil {
return nil, err
}
cn := &conn{
cn: netcn,
}
cn.rd = bufio.NewReader(cn)
return cn, nil
} }
cn.rd = bufio.NewReader(cn)
return cn
} }
func (cn *conn) Read(b []byte) (int, error) { func (cn *conn) Read(b []byte) (int, error) {
@ -60,22 +77,25 @@ func (cn *conn) Close() error {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type connPool struct { type connPool struct {
dial func() (net.Conn, error) New func() (*conn, error)
cond *sync.Cond cond *sync.Cond
conns *list.List conns *list.List
size, maxSize int len int
idleTimeout time.Duration maxSize int
idleTimeout time.Duration
closed bool
} }
func newConnPool( func newConnPool(
dial func() (net.Conn, error), dial func() (*conn, error),
maxSize int, maxSize int,
idleTimeout time.Duration, idleTimeout time.Duration,
) *connPool { ) *connPool {
return &connPool{ return &connPool{
dial: dial, New: dial,
cond: sync.NewCond(&sync.Mutex{}), cond: sync.NewCond(&sync.Mutex{}),
conns: list.New(), conns: list.New(),
@ -86,87 +106,129 @@ func newConnPool(
} }
func (p *connPool) Get() (*conn, bool, error) { func (p *connPool) Get() (*conn, bool, error) {
defer p.cond.L.Unlock()
p.cond.L.Lock() p.cond.L.Lock()
for p.conns.Len() == 0 && p.size >= p.maxSize { if p.closed {
p.cond.Wait() p.cond.L.Unlock()
return nil, false, errPoolClosed
} }
if p.idleTimeout > 0 { if p.idleTimeout > 0 {
for e := p.conns.Front(); e != nil; e = e.Next() { for e := p.conns.Front(); e != nil; e = e.Next() {
cn := e.Value.(*conn) cn := e.Value.(*conn)
if cn.inUse {
break
}
if time.Since(cn.usedAt) > p.idleTimeout { if time.Since(cn.usedAt) > p.idleTimeout {
p.conns.Remove(e) if err := p.Remove(cn); err != nil {
glog.Errorf("Remove failed: %s", err)
}
} }
} }
} }
if p.conns.Len() == 0 { for p.conns.Len() >= p.maxSize && p.len == 0 {
rw, err := p.dial() p.cond.Wait()
}
if p.len > 0 {
elem := p.conns.Front()
cn := elem.Value.(*conn)
if cn.inUse {
panic("pool: precondition failed")
}
cn.inUse = true
p.conns.MoveToBack(elem)
p.len--
p.cond.L.Unlock()
return cn, false, nil
}
if p.conns.Len() < p.maxSize {
cn, err := p.New()
if err != nil { if err != nil {
p.cond.L.Unlock()
return nil, false, err return nil, false, err
} }
p.size++ cn.inUse = true
return newConn(rw), true, nil cn.elem = p.conns.PushBack(cn)
p.cond.L.Unlock()
return cn, true, nil
} }
elem := p.conns.Front() panic("not reached")
p.conns.Remove(elem)
return elem.Value.(*conn), false, nil
} }
func (p *connPool) Put(cn *conn) error { func (p *connPool) Put(cn *conn) error {
if cn.rd.Buffered() != 0 { if cn.rd.Buffered() != 0 {
panic("redis: attempt to put connection with buffered data") panic("redis: attempt to put connection with buffered data")
} }
p.cond.L.Lock() p.cond.L.Lock()
if p.closed {
p.cond.L.Unlock()
return errPoolClosed
}
cn.inUse = false
cn.usedAt = time.Now() cn.usedAt = time.Now()
p.conns.PushFront(cn) p.conns.MoveToFront(cn.elem)
p.len++
p.cond.Signal() p.cond.Signal()
p.cond.L.Unlock() p.cond.L.Unlock()
return nil return nil
} }
func (p *connPool) Remove(cn *conn) error { func (p *connPool) Remove(cn *conn) (err error) {
var err error p.cond.L.Lock()
if p.closed {
// Noop, connection is already closed.
p.cond.L.Unlock()
return nil
}
if cn != nil { if cn != nil {
err = cn.Close() err = cn.Close()
} }
p.cond.L.Lock() p.conns.Remove(cn.elem)
p.size-- cn.elem = nil
p.cond.Signal() p.cond.Signal()
p.cond.L.Unlock() p.cond.L.Unlock()
return err return err
} }
// Returns number of idle connections.
func (p *connPool) Len() int { func (p *connPool) Len() int {
defer p.cond.L.Unlock()
p.cond.L.Lock()
return p.len
}
// Returns size of the pool.
func (p *connPool) Size() int {
defer p.cond.L.Unlock() defer p.cond.L.Unlock()
p.cond.L.Lock() p.cond.L.Lock()
return p.conns.Len() return p.conns.Len()
} }
func (p *connPool) Size() int {
defer p.cond.L.Unlock()
p.cond.L.Lock()
return p.size
}
func (p *connPool) Close() error { func (p *connPool) Close() error {
defer p.cond.L.Unlock() defer p.cond.L.Unlock()
p.cond.L.Lock() p.cond.L.Lock()
if p.closed {
for e := p.conns.Front(); e != nil; e = e.Next() { return nil
if err := e.Value.(*conn).Close(); err != nil {
return err
}
} }
p.conns.Init() p.closed = true
p.size = 0 var retErr error
for e := p.conns.Front(); e != nil; e = e.Next() {
return nil cn := e.Value.(*conn)
if err := cn.Close(); err != nil {
glog.Errorf("cn.Close failed: %s", err)
retErr = err
}
cn.elem = nil
}
p.conns = nil
return retErr
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -195,34 +257,33 @@ func (p *singleConnPool) Get() (*conn, bool, error) {
} }
p.l.RUnlock() p.l.RUnlock()
defer p.l.Unlock()
p.l.Lock() p.l.Lock()
cn, isNew, err := p.pool.Get() cn, isNew, err := p.pool.Get()
if err != nil { if err != nil {
p.l.Unlock()
return nil, false, err return nil, false, err
} }
p.cn = cn p.cn = cn
p.l.Unlock()
return cn, isNew, nil return cn, isNew, nil
} }
func (p *singleConnPool) Put(cn *conn) error { func (p *singleConnPool) Put(cn *conn) error {
defer p.l.Unlock()
p.l.Lock() p.l.Lock()
if p.cn != cn { if p.cn != cn {
panic("p.cn != cn") panic("p.cn != cn")
} }
p.l.Unlock()
return nil return nil
} }
func (p *singleConnPool) Remove(cn *conn) error { func (p *singleConnPool) Remove(cn *conn) error {
defer p.l.Unlock()
p.l.Lock() p.l.Lock()
if p.cn != cn { if p.cn != cn {
panic("p.cn != cn") panic("p.cn != cn")
} }
p.cn = nil p.cn = nil
p.l.Unlock()
return nil return nil
} }
@ -235,6 +296,15 @@ func (p *singleConnPool) Len() int {
return 1 return 1
} }
func (p *singleConnPool) Size() int {
defer p.l.Unlock()
p.l.Lock()
if p.cn == nil {
return 0
}
return 1
}
func (p *singleConnPool) Close() error { func (p *singleConnPool) Close() error {
defer p.l.Unlock() defer p.l.Unlock()
p.l.Lock() p.l.Lock()

View File

@ -5,6 +5,7 @@ import (
"time" "time"
) )
// Not thread-safe.
type PubSub struct { type PubSub struct {
*baseClient *baseClient
} }

View File

@ -113,7 +113,7 @@ func (c *baseClient) run(cmd Cmder) {
} }
if err := c.writeCmd(cn, cmd); err != nil { if err := c.writeCmd(cn, cmd); err != nil {
c.removeConn(cn) c.freeConn(cn, err)
cmd.setErr(err) cmd.setErr(err)
return return
} }
@ -173,10 +173,7 @@ func newClient(opt *Options, dial func() (net.Conn, error)) *Client {
baseClient: &baseClient{ baseClient: &baseClient{
opt: opt, opt: opt,
connPool: newConnPool( connPool: newConnPool(newConnFunc(dial), opt.getPoolSize(), opt.IdleTimeout),
dial, opt.getPoolSize(),
opt.IdleTimeout,
),
}, },
} }
} }

View File

@ -83,147 +83,141 @@ func (t *RedisConnectorTest) TestUnixConnector(c *C) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// type RedisConnPoolTest struct { type RedisConnPoolTest struct {
// dialedConns, closedConns int64 client *redis.Client
}
// client *redis.Client var _ = Suite(&RedisConnPoolTest{})
// }
// var _ = Suite(&RedisConnPoolTest{}) func (t *RedisConnPoolTest) SetUpTest(c *C) {
t.client = redis.NewTCPClient(&redis.Options{
Addr: redisAddr,
})
}
// func (t *RedisConnPoolTest) SetUpTest(c *C) { func (t *RedisConnPoolTest) TearDownTest(c *C) {
// if t.client == nil { c.Assert(t.client.FlushDb().Err(), IsNil)
// dial := func() (net.Conn, error) { c.Assert(t.client.Close(), IsNil)
// t.dialedConns++ }
// return net.Dial("tcp", redisAddr)
// }
// close := func(conn net.Conn) error {
// t.closedConns++
// return nil
// }
// t.client = (&redis.ClientFactory{ func (t *RedisConnPoolTest) TestConnPoolMaxSize(c *C) {
// Dial: dial, wg := &sync.WaitGroup{}
// Close: close, for i := 0; i < 1000; i++ {
// }).New() wg.Add(1)
// } go func() {
// } ping := t.client.Ping()
c.Assert(ping.Err(), IsNil)
c.Assert(ping.Val(), Equals, "PONG")
wg.Done()
}()
}
wg.Wait()
// func (t *RedisConnPoolTest) TearDownTest(c *C) { c.Assert(t.client.Pool().Size(), Equals, 10)
// t.resetRedis(c) c.Assert(t.client.Pool().Len(), Equals, 10)
// t.resetClient(c) }
// }
// func (t *RedisConnPoolTest) resetRedis(c *C) { func (t *RedisConnPoolTest) TestConnPoolMaxSizeOnPipelineClient(c *C) {
// // This is much faster than Flushall. const N = 1000
// c.Assert(t.client.Select(1).Err(), IsNil)
// c.Assert(t.client.FlushDb().Err(), IsNil)
// c.Assert(t.client.Select(0).Err(), IsNil)
// c.Assert(t.client.FlushDb().Err(), IsNil)
// }
// func (t *RedisConnPoolTest) resetClient(c *C) { wg := &sync.WaitGroup{}
// t.client.Close() wg.Add(N)
// c.Check(t.closedConns, Equals, t.dialedConns) for i := 0; i < N; i++ {
// t.dialedConns, t.closedConns = 0, 0 go func() {
// } pipeline := t.client.Pipeline()
ping := pipeline.Ping()
cmds, err := pipeline.Exec()
c.Assert(err, IsNil)
c.Assert(cmds, HasLen, 1)
c.Assert(ping.Err(), IsNil)
c.Assert(ping.Val(), Equals, "PONG")
// func (t *RedisConnPoolTest) TestConnPoolMaxSize(c *C) { c.Assert(pipeline.Close(), IsNil)
// wg := &sync.WaitGroup{}
// for i := 0; i < 1000; i++ {
// wg.Add(1)
// go func() {
// ping := t.client.Ping()
// c.Assert(ping.Err(), IsNil)
// c.Assert(ping.Val(), Equals, "PONG")
// wg.Done()
// }()
// }
// wg.Wait()
// c.Assert(t.client.Close(), IsNil) wg.Done()
// c.Assert(t.dialedConns, Equals, int64(10)) }()
// c.Assert(t.closedConns, Equals, int64(10)) }
// } wg.Wait()
// func (t *RedisConnPoolTest) TestConnPoolMaxSizeOnPipelineClient(c *C) { c.Assert(t.client.Pool().Size(), Equals, 10)
// wg := &sync.WaitGroup{} c.Assert(t.client.Pool().Len(), Equals, 10)
// for i := 0; i < 1000; i++ { }
// wg.Add(1)
// go func() {
// pipeline, err := t.client.PipelineClient()
// c.Assert(err, IsNil)
// ping := pipeline.Ping() func (t *RedisConnPoolTest) TestConnPoolMaxSizeOnMultiClient(c *C) {
// cmds, err := pipeline.RunQueued() const N = 1000
// c.Assert(err, IsNil)
// c.Assert(cmds, HasLen, 1)
// c.Assert(ping.Err(), IsNil)
// c.Assert(ping.Val(), Equals, "PONG")
// c.Assert(pipeline.Close(), IsNil) wg := &sync.WaitGroup{}
wg.Add(N)
for i := 0; i < N; i++ {
go func() {
multi := t.client.Multi()
var ping *redis.StatusCmd
cmds, err := multi.Exec(func() {
ping = multi.Ping()
})
c.Assert(err, IsNil)
c.Assert(cmds, HasLen, 1)
c.Assert(ping.Err(), IsNil)
c.Assert(ping.Val(), Equals, "PONG")
// wg.Done() c.Assert(multi.Close(), IsNil)
// }()
// }
// wg.Wait()
// c.Assert(t.client.Close(), IsNil) wg.Done()
// c.Assert(t.dialedConns, Equals, int64(10)) }()
// c.Assert(t.closedConns, Equals, int64(10)) }
// } wg.Wait()
// func (t *RedisConnPoolTest) TestConnPoolMaxSizeOnMultiClient(c *C) { c.Assert(t.client.Pool().Size(), Equals, 10)
// wg := &sync.WaitGroup{} c.Assert(t.client.Pool().Len(), Equals, 10)
// for i := 0; i < 1000; i++ { }
// wg.Add(1)
// go func() {
// multi, err := t.client.MultiClient()
// c.Assert(err, IsNil)
// var ping *redis.StatusCmd func (t *RedisConnPoolTest) TestConnPoolMaxSizeOnPubSub(c *C) {
// cmds, err := multi.Exec(func() { const N = 1000
// ping = multi.Ping()
// })
// c.Assert(err, IsNil)
// c.Assert(cmds, HasLen, 1)
// c.Assert(ping.Err(), IsNil)
// c.Assert(ping.Val(), Equals, "PONG")
// c.Assert(multi.Close(), IsNil) wg := &sync.WaitGroup{}
wg.Add(N)
for i := 0; i < N; i++ {
go func() {
pubsub := t.client.PubSub()
c.Assert(pubsub.Subscribe(), IsNil)
c.Assert(pubsub.Close(), IsNil)
wg.Done()
}()
}
wg.Wait()
// wg.Done() c.Assert(t.client.Pool().Size(), Equals, 0)
// }() c.Assert(t.client.Pool().Len(), Equals, 0)
// } }
// wg.Wait()
// c.Assert(t.client.Close(), IsNil) func (t *RedisConnPoolTest) TestConnPoolRemovesBrokenConn(c *C) {
// c.Assert(t.dialedConns, Equals, int64(10)) cn, _, err := t.client.Pool().Get()
// c.Assert(t.closedConns, Equals, int64(10)) c.Assert(err, IsNil)
// } c.Assert(cn.Close(), IsNil)
c.Assert(t.client.Pool().Put(cn), IsNil)
// func (t *RedisConnPoolTest) TestConnPoolMaxSizeOnPubSub(c *C) { ping := t.client.Ping()
// wg := &sync.WaitGroup{} c.Assert(ping.Err().Error(), Equals, "use of closed network connection")
// for i := 0; i < 1000; i++ { c.Assert(ping.Val(), Equals, "")
// wg.Add(1)
// go func() {
// pubsub, err := t.client.PubSub()
// c.Assert(err, IsNil)
// _, err = pubsub.Subscribe() ping = t.client.Ping()
// c.Assert(err, IsNil) c.Assert(ping.Err(), IsNil)
c.Assert(ping.Val(), Equals, "PONG")
// c.Assert(pubsub.Close(), IsNil) c.Assert(t.client.Pool().Size(), Equals, 1)
c.Assert(t.client.Pool().Len(), Equals, 1)
}
// wg.Done() func (t *RedisConnPoolTest) TestConnPoolReusesConn(c *C) {
// }() for i := 0; i < 1000; i++ {
// } ping := t.client.Ping()
// wg.Wait() c.Assert(ping.Err(), IsNil)
c.Assert(ping.Val(), Equals, "PONG")
}
// c.Assert(t.client.Close(), IsNil) c.Assert(t.client.Pool().Size(), Equals, 1)
// c.Assert(t.dialedConns, Equals, int64(1000)) c.Assert(t.client.Pool().Len(), Equals, 1)
// c.Assert(t.closedConns, Equals, int64(1000)) }
// }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -235,12 +229,14 @@ var _ = Suite(&RedisTest{})
func Test(t *testing.T) { TestingT(t) } func Test(t *testing.T) { TestingT(t) }
func (t *RedisTest) SetUpSuite(c *C) {
t.client = redis.NewTCPClient(&redis.Options{
Addr: ":6379",
})
}
func (t *RedisTest) SetUpTest(c *C) { func (t *RedisTest) SetUpTest(c *C) {
if t.client == nil { t.resetRedis(c)
t.client = redis.NewTCPClient(&redis.Options{
Addr: ":6379",
})
}
} }
func (t *RedisTest) TearDownTest(c *C) { func (t *RedisTest) TearDownTest(c *C) {
@ -336,33 +332,6 @@ func (t *RedisTest) TestManyKeys2(c *C) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func (t *RedisTest) TestConnPoolRemovesBrokenConn(c *C) {
c.Skip("fix me")
conn, err := net.Dial("tcp", redisAddr)
c.Assert(err, IsNil)
c.Assert(conn.Close(), IsNil)
client := redis.NewTCPClient(&redis.Options{
Addr: redisAddr,
})
defer func() {
c.Assert(client.Close(), IsNil)
}()
// c.Assert(client.ConnPool.Add(redis.NewConn(conn)), IsNil)
ping := client.Ping()
c.Assert(ping.Err().Error(), Equals, "use of closed network connection")
c.Assert(ping.Val(), Equals, "")
ping = client.Ping()
c.Assert(ping.Err(), IsNil)
c.Assert(ping.Val(), Equals, "PONG")
}
//------------------------------------------------------------------------------
func (t *RedisTest) TestAuth(c *C) { func (t *RedisTest) TestAuth(c *C) {
auth := t.client.Auth("password") auth := t.client.Auth("password")
c.Assert(auth.Err(), ErrorMatches, "ERR Client sent AUTH, but no password is set") c.Assert(auth.Err(), ErrorMatches, "ERR Client sent AUTH, but no password is set")
@ -2446,15 +2415,14 @@ func (t *RedisTest) TestPipeline(c *C) {
func (t *RedisTest) TestPipelineDiscardQueued(c *C) { func (t *RedisTest) TestPipelineDiscardQueued(c *C) {
pipeline := t.client.Pipeline() pipeline := t.client.Pipeline()
defer func() {
c.Assert(pipeline.Close(), IsNil)
}()
pipeline.Get("key") pipeline.Get("key")
pipeline.Discard() pipeline.Discard()
cmds, err := pipeline.Exec() cmds, err := pipeline.Exec()
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(cmds, HasLen, 0) c.Assert(cmds, HasLen, 0)
c.Assert(pipeline.Close(), IsNil)
} }
func (t *RedisTest) TestPipelineFunc(c *C) { func (t *RedisTest) TestPipelineFunc(c *C) {
@ -2489,19 +2457,18 @@ func (t *RedisTest) TestPipelineRunQueuedOnEmptyQueue(c *C) {
c.Assert(cmds, HasLen, 0) c.Assert(cmds, HasLen, 0)
} }
func (t *RedisTest) TestPipelineIncrFromGoroutines(c *C) { // TODO: make thread safe?
func (t *RedisTest) TestPipelineIncr(c *C) {
const N = 20000
key := "TestPipelineIncr"
pipeline := t.client.Pipeline() pipeline := t.client.Pipeline()
defer func() {
c.Assert(pipeline.Close(), IsNil)
}()
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := int64(0); i < 20000; i++ { wg.Add(N)
wg.Add(1) for i := 0; i < N; i++ {
go func() { pipeline.Incr(key)
pipeline.Incr("TestIncrPipeliningFromGoroutinesKey") wg.Done()
wg.Done()
}()
} }
wg.Wait() wg.Wait()
@ -2514,23 +2481,24 @@ func (t *RedisTest) TestPipelineIncrFromGoroutines(c *C) {
} }
} }
get := t.client.Get("TestIncrPipeliningFromGoroutinesKey") get := t.client.Get(key)
c.Assert(get.Err(), IsNil) c.Assert(get.Err(), IsNil)
c.Assert(get.Val(), Equals, "20000") c.Assert(get.Val(), Equals, strconv.Itoa(N))
c.Assert(pipeline.Close(), IsNil)
} }
func (t *RedisTest) TestPipelineEchoFromGoroutines(c *C) { func (t *RedisTest) TestPipelineEcho(c *C) {
pipeline := t.client.Pipeline() const N = 1000
defer func() {
c.Assert(pipeline.Close(), IsNil)
}()
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := int64(0); i < 1000; i += 2 { wg.Add(N)
wg.Add(1) for i := 0; i < N; i++ {
go func() { go func(i int) {
msg1 := "echo" + strconv.FormatInt(i, 10) pipeline := t.client.Pipeline()
msg2 := "echo" + strconv.FormatInt(i+1, 10)
msg1 := "echo" + strconv.Itoa(i)
msg2 := "echo" + strconv.Itoa(i+1)
echo1 := pipeline.Echo(msg1) echo1 := pipeline.Echo(msg1)
echo2 := pipeline.Echo(msg2) echo2 := pipeline.Echo(msg2)
@ -2545,8 +2513,10 @@ func (t *RedisTest) TestPipelineEchoFromGoroutines(c *C) {
c.Assert(echo2.Err(), IsNil) c.Assert(echo2.Err(), IsNil)
c.Assert(echo2.Val(), Equals, msg2) c.Assert(echo2.Val(), Equals, msg2)
c.Assert(pipeline.Close(), IsNil)
wg.Done() wg.Done()
}() }(i)
} }
wg.Wait() wg.Wait()
} }
@ -2703,36 +2673,43 @@ func (t *RedisTest) TestWatchUnwatch(c *C) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func (t *RedisTest) TestSyncEchoFromGoroutines(c *C) { func (t *RedisTest) TestRaceEcho(c *C) {
const N = 10000
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := int64(0); i < 1000; i++ { wg.Add(N)
wg.Add(1) for i := 0; i < N; i++ {
go func() { go func(i int) {
msg := "echo" + strconv.FormatInt(i, 10) msg := "echo" + strconv.Itoa(i)
echo := t.client.Echo(msg) echo := t.client.Echo(msg)
c.Assert(echo.Err(), IsNil) c.Assert(echo.Err(), IsNil)
c.Assert(echo.Val(), Equals, msg) c.Assert(echo.Val(), Equals, msg)
wg.Done() wg.Done()
}() }(i)
} }
wg.Wait() wg.Wait()
} }
func (t *RedisTest) TestIncrFromGoroutines(c *C) { func (t *RedisTest) TestRaceIncr(c *C) {
const N = 10000
key := "TestIncrFromGoroutines"
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := int64(0); i < 20000; i++ { wg.Add(N)
wg.Add(1) for i := int64(0); i < N; i++ {
go func() { go func() {
incr := t.client.Incr("TestIncrFromGoroutinesKey") incr := t.client.Incr(key)
c.Assert(incr.Err(), IsNil) if err := incr.Err(); err != nil {
panic(err)
}
wg.Done() wg.Done()
}() }()
} }
wg.Wait() wg.Wait()
get := t.client.Get("TestIncrFromGoroutinesKey") get := t.client.Get(key)
c.Assert(get.Err(), IsNil) c.Assert(get.Err(), IsNil)
c.Assert(get.Val(), Equals, "20000") c.Assert(get.Val(), Equals, strconv.Itoa(N))
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------