diff --git a/.travis.yml b/.travis.yml index 22cc2068..1d3148f7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,18 @@ -language: go sudo: false +language: go services: -- redis-server + - redis-server go: - 1.3 - 1.4 - 1.5 + - tip + +matrix: + allow_failures: + - go: tip install: - go get gopkg.in/bsm/ratelimit.v1 diff --git a/Makefile b/Makefile index 1b43765b..bda28e35 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,7 @@ all: testdeps go test ./... -test.v -test.cpu=1,2,4 - go test ./... -test.short -test.race - -test: testdeps - go test ./... -test.v=1 + sleep 3 # give Redis time to exit + go test ./... -test.v -test.short -test.race testdeps: .test/redis/src/redis-server diff --git a/cluster_test.go b/cluster_test.go index bc395b44..d875735f 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/rand" "net" + "reflect" "strings" "testing" @@ -116,12 +117,23 @@ func startCluster(scenario *clusterScenario) error { // Wait until all nodes have consistent info for _, client := range scenario.clients { err := eventually(func() error { - for _, masterId := range scenario.nodeIds[:3] { - s := client.ClusterNodes().Val() - wanted := "slave " + masterId - if !strings.Contains(s, wanted) { - return fmt.Errorf("%q does not contain %q", s, wanted) + res, err := client.ClusterSlots().Result() + if err != nil { + return err + } + wanted := []redis.ClusterSlotInfo{ + {0, 4999, []string{"127.0.0.1:8220", "127.0.0.1:8223"}}, + {5000, 9999, []string{"127.0.0.1:8221", "127.0.0.1:8224"}}, + {10000, 16383, []string{"127.0.0.1:8222", "127.0.0.1:8225"}}, + } + loop: + for _, info := range res { + for _, info2 := range wanted { + if reflect.DeepEqual(info, info2) { + continue loop + } } + return fmt.Errorf("cluster did not reach consistent state (%v)", res) } return nil }, 10*time.Second) diff --git a/commands_test.go b/commands_test.go index b931d05e..4e3488dd 100644 --- a/commands_test.go +++ b/commands_test.go @@ -20,10 +20,8 @@ var _ = Describe("Commands", func() { BeforeEach(func() { client = redis.NewClient(&redis.Options{ - Addr: redisAddr, - ReadTimeout: 500 * time.Millisecond, - WriteTimeout: 500 * time.Millisecond, - PoolTimeout: 30 * time.Second, + Addr: redisAddr, + PoolTimeout: 30 * time.Second, }) }) @@ -81,13 +79,10 @@ var _ = Describe("Commands", func() { err := client.ClientPause(time.Second).Err() Expect(err).NotTo(HaveOccurred()) - Consistently(func() error { - return client.Ping().Err() - }, "400ms").Should(HaveOccurred()) // pause time - read timeout - - Eventually(func() error { - return client.Ping().Err() - }, "1s").ShouldNot(HaveOccurred()) + start := time.Now() + err = client.Ping().Err() + Expect(err).NotTo(HaveOccurred()) + Expect(time.Now()).To(BeTemporally("~", start.Add(time.Second), 800*time.Millisecond)) }) It("should ClientSetName and ClientGetName", func() { diff --git a/example_test.go b/example_test.go index 8de20827..c7969914 100644 --- a/example_test.go +++ b/example_test.go @@ -238,6 +238,7 @@ func ExamplePubSub_Receive() { } for i := 0; i < 2; i++ { + // ReceiveTimeout is a low level API. Use ReceiveMessage instead. msgi, err := pubsub.ReceiveTimeout(100 * time.Millisecond) if err != nil { panic(err) diff --git a/pubsub.go b/pubsub.go index a3792d3f..223f8b96 100644 --- a/pubsub.go +++ b/pubsub.go @@ -80,6 +80,9 @@ func (c *PubSub) PSubscribe(patterns ...string) error { } func remove(ss []string, es ...string) []string { + if len(es) == 0 { + return ss[:0] + } for _, e := range es { for i, s := range ss { if s == e { @@ -231,7 +234,9 @@ func (c *PubSub) Receive() (interface{}, error) { } func (c *PubSub) reconnect() { + // Close current connection. c.connPool.Remove(nil) // nil to force removal + if len(c.channels) > 0 { if err := c.Subscribe(c.channels...); err != nil { log.Printf("redis: Subscribe failed: %s", err) diff --git a/pubsub_test.go b/pubsub_test.go index 411c6431..1506a8eb 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -2,6 +2,7 @@ package redis_test import ( "net" + "sync" "time" . "github.com/onsi/ginkgo" @@ -261,19 +262,23 @@ var _ = Describe("PubSub", func() { writeErr: errTimeout, }) + var wg sync.WaitGroup + wg.Add(1) go func() { defer GinkgoRecover() + defer wg.Done() time.Sleep(100 * time.Millisecond) - n, err := client.Publish("mychannel", "hello").Result() + err := client.Publish("mychannel", "hello").Err() Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(2))) }() msg, err := pubsub.ReceiveMessage() Expect(err).NotTo(HaveOccurred()) Expect(msg.Channel).To(Equal("mychannel")) Expect(msg.Payload).To(Equal("hello")) + + wg.Wait() }) }) diff --git a/redis.go b/redis.go index 6d654b10..cd88cefd 100644 --- a/redis.go +++ b/redis.go @@ -121,7 +121,7 @@ type Options struct { PoolSize int // Specifies amount of time client waits for connection if all // connections are busy before returning an error. - // Default is 5 seconds. + // Default is 1 seconds. PoolTimeout time.Duration // Specifies amount of time after which client closes idle // connections. Should be less than server's timeout. diff --git a/ring.go b/ring.go index facf3e61..ff77f4d6 100644 --- a/ring.go +++ b/ring.go @@ -95,7 +95,7 @@ func (shard *ringShard) Vote(up bool) bool { // keys across multiple Redis servers (shards). It's safe for // concurrent use by multiple goroutines. // -// It monitors the state of each shard and removes dead shards from +// Ring monitors the state of each shard and removes dead shards from // the ring. When shard comes online it is added back to the ring. This // gives you maximum availability and partition tolerance, but no // consistency between different shards or even clients. Each client diff --git a/ring_test.go b/ring_test.go index 5b52b320..0de37dc4 100644 --- a/ring_test.go +++ b/ring_test.go @@ -56,7 +56,7 @@ var _ = Describe("Redis ring", func() { // Ring needs 5 * heartbeat time to detect that node is down. // Give it more to be sure. heartbeat := 100 * time.Millisecond - time.Sleep(5*heartbeat + heartbeat) + time.Sleep(2 * 5 * heartbeat) setRingKeys()