forked from mirror/redis
Fix test.
This commit is contained in:
parent
f130ab6161
commit
b6b689904a
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"testing"
|
||||
|
@ -116,23 +117,23 @@ func startCluster(scenario *clusterScenario) error {
|
|||
// Wait until all nodes have consistent info
|
||||
for _, client := range scenario.clients {
|
||||
err := eventually(func() error {
|
||||
s := client.ClusterNodes().Val()
|
||||
nodes := strings.Split(s, "\n")
|
||||
if len(nodes) < 6 {
|
||||
return fmt.Errorf("got %d nodes, wanted 6", len(nodes))
|
||||
res, err := client.ClusterSlots().Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, node := range nodes {
|
||||
if node == "" {
|
||||
continue
|
||||
}
|
||||
parts := strings.Split(node, " ")
|
||||
var flags string
|
||||
if len(parts) >= 3 {
|
||||
flags = parts[2]
|
||||
}
|
||||
if !strings.Contains(flags, "master") && !strings.Contains(flags, "slave") {
|
||||
return fmt.Errorf("node flags are %q", flags)
|
||||
wanted := []redis.ClusterSlotInfo{
|
||||
{0, 4999, []string{"127.0.0.1:8220", "127.0.0.1:8223"}},
|
||||
{5000, 9999, []string{"127.0.0.1:8221", "127.0.0.1:8224"}},
|
||||
{10000, 16383, []string{"127.0.0.1:8222", "127.0.0.1:8225"}},
|
||||
}
|
||||
loop:
|
||||
for _, info := range res {
|
||||
for _, info2 := range wanted {
|
||||
if reflect.DeepEqual(info, info2) {
|
||||
continue loop
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("cluster did not reach consistent state (%v)", res)
|
||||
}
|
||||
return nil
|
||||
}, 10*time.Second)
|
||||
|
|
|
@ -20,10 +20,8 @@ var _ = Describe("Commands", func() {
|
|||
|
||||
BeforeEach(func() {
|
||||
client = redis.NewClient(&redis.Options{
|
||||
Addr: redisAddr,
|
||||
ReadTimeout: 500 * time.Millisecond,
|
||||
WriteTimeout: 500 * time.Millisecond,
|
||||
PoolTimeout: 30 * time.Second,
|
||||
Addr: redisAddr,
|
||||
PoolTimeout: 30 * time.Second,
|
||||
})
|
||||
})
|
||||
|
||||
|
@ -81,13 +79,10 @@ var _ = Describe("Commands", func() {
|
|||
err := client.ClientPause(time.Second).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
Consistently(func() error {
|
||||
return client.Ping().Err()
|
||||
}, "400ms").Should(HaveOccurred()) // pause time - read timeout
|
||||
|
||||
Eventually(func() error {
|
||||
return client.Ping().Err()
|
||||
}, "1s").ShouldNot(HaveOccurred())
|
||||
start := time.Now()
|
||||
err = client.Ping().Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(time.Now()).To(BeTemporally("~", start.Add(time.Second), 800*time.Millisecond))
|
||||
})
|
||||
|
||||
It("should ClientSetName and ClientGetName", func() {
|
||||
|
|
|
@ -238,6 +238,7 @@ func ExamplePubSub_Receive() {
|
|||
}
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
// ReceiveTimeout is a low level API. Use ReceiveMessage instead.
|
||||
msgi, err := pubsub.ReceiveTimeout(100 * time.Millisecond)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
|
@ -80,6 +80,9 @@ func (c *PubSub) PSubscribe(patterns ...string) error {
|
|||
}
|
||||
|
||||
func remove(ss []string, es ...string) []string {
|
||||
if len(es) == 0 {
|
||||
return ss[:0]
|
||||
}
|
||||
for _, e := range es {
|
||||
for i, s := range ss {
|
||||
if s == e {
|
||||
|
@ -231,7 +234,9 @@ func (c *PubSub) Receive() (interface{}, error) {
|
|||
}
|
||||
|
||||
func (c *PubSub) reconnect() {
|
||||
// Close current connection.
|
||||
c.connPool.Remove(nil) // nil to force removal
|
||||
|
||||
if len(c.channels) > 0 {
|
||||
if err := c.Subscribe(c.channels...); err != nil {
|
||||
log.Printf("redis: Subscribe failed: %s", err)
|
||||
|
|
|
@ -2,6 +2,7 @@ package redis_test
|
|||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
|
@ -261,19 +262,23 @@ var _ = Describe("PubSub", func() {
|
|||
writeErr: errTimeout,
|
||||
})
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
defer wg.Done()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
n, err := client.Publish("mychannel", "hello").Result()
|
||||
err := client.Publish("mychannel", "hello").Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(n).To(Equal(int64(2)))
|
||||
}()
|
||||
|
||||
msg, err := pubsub.ReceiveMessage()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(msg.Channel).To(Equal("mychannel"))
|
||||
Expect(msg.Payload).To(Equal("hello"))
|
||||
|
||||
wg.Wait()
|
||||
})
|
||||
|
||||
})
|
||||
|
|
|
@ -56,7 +56,7 @@ var _ = Describe("Redis ring", func() {
|
|||
// Ring needs 5 * heartbeat time to detect that node is down.
|
||||
// Give it more to be sure.
|
||||
heartbeat := 100 * time.Millisecond
|
||||
time.Sleep(5*heartbeat + 2*heartbeat)
|
||||
time.Sleep(2 * 5 * heartbeat)
|
||||
|
||||
setRingKeys()
|
||||
|
||||
|
|
Loading…
Reference in New Issue