forked from mirror/redis
Merge pull request #51 from go-redis/fix/fix-rl
Fix rate limiter and add test.
This commit is contained in:
commit
9778c1acf5
2
Makefile
2
Makefile
|
@ -1,2 +1,2 @@
|
|||
all:
|
||||
go test gopkg.in/redis.v2
|
||||
go test gopkg.in/redis.v2 -cpu=1,2,4
|
||||
|
|
12
commands.go
12
commands.go
|
@ -1,6 +1,7 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"io"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
@ -1120,7 +1121,16 @@ func (c *Client) shutdown(modifier string) *StatusCmd {
|
|||
}
|
||||
cmd := NewStatusCmd(args...)
|
||||
c.Process(cmd)
|
||||
c.Close()
|
||||
if err := cmd.Err(); err != nil {
|
||||
if err == io.EOF {
|
||||
// Server quit as expected.
|
||||
cmd.err = nil
|
||||
}
|
||||
} else {
|
||||
// Server did not quit. String reply contains the reason.
|
||||
cmd.err = errorf(cmd.val)
|
||||
cmd.val = ""
|
||||
}
|
||||
return cmd
|
||||
}
|
||||
|
||||
|
|
|
@ -37,9 +37,10 @@ func (rl *rateLimiter) Check() bool {
|
|||
if atomic.CompareAndSwapInt64(&rl.v, v, v-1) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rl *rateLimiter) Close() error {
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRateLimiter(t *testing.T) {
|
||||
const n = 100000
|
||||
rl := newRateLimiter(time.Second, n)
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
for i := 0; i < n; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
if !rl.Check() {
|
||||
panic("check failed")
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if rl.Check() && rl.Check() {
|
||||
t.Fatal("check passed")
|
||||
}
|
||||
}
|
|
@ -88,10 +88,13 @@ func TestSentinel(t *testing.T) {
|
|||
slavePort := "8124"
|
||||
sentinelPort := "8125"
|
||||
|
||||
_, err := startRedis(masterPort)
|
||||
masterCmd, err := startRedis(masterPort)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer masterCmd.Process.Kill()
|
||||
|
||||
// Wait for master to start.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
master := redis.NewTCPClient(&redis.Options{
|
||||
|
@ -100,12 +103,14 @@ func TestSentinel(t *testing.T) {
|
|||
if err := master.Ping().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer master.Shutdown()
|
||||
|
||||
_, err = startRedisSlave(slavePort, masterPort)
|
||||
slaveCmd, err := startRedisSlave(slavePort, masterPort)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer slaveCmd.Process.Kill()
|
||||
|
||||
// Wait for slave to start.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
slave := redis.NewTCPClient(&redis.Options{
|
||||
|
@ -114,12 +119,14 @@ func TestSentinel(t *testing.T) {
|
|||
if err := slave.Ping().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer slave.Shutdown()
|
||||
|
||||
_, err = startRedisSentinel(sentinelPort, masterName, masterPort)
|
||||
sentinelCmd, err := startRedisSentinel(sentinelPort, masterName, masterPort)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer sentinelCmd.Process.Kill()
|
||||
|
||||
// Wait for sentinel to start.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
sentinel := redis.NewTCPClient(&redis.Options{
|
||||
|
@ -147,8 +154,13 @@ func TestSentinel(t *testing.T) {
|
|||
t.Fatalf(`got %q, expected "master"`, val)
|
||||
}
|
||||
|
||||
// Kill master.
|
||||
master.Shutdown()
|
||||
// Kill Redis master.
|
||||
if err := masterCmd.Process.Kill(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := master.Ping().Err(); err == nil {
|
||||
t.Fatalf("master was not killed")
|
||||
}
|
||||
|
||||
// Wait for Redis sentinel to elect new master.
|
||||
time.Sleep(5 * time.Second)
|
||||
|
|
Loading…
Reference in New Issue