tile38/vendor/github.com/nats-io/go-nats/test/reconnect_test.go

652 lines
15 KiB
Go

// Copyright 2013-2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/nats-io/gnatsd/server"
"github.com/nats-io/go-nats"
)
// startReconnectServer starts a test server on port 22222, the port used by
// the "nats://localhost:22222" URLs in this file, and returns it. The t
// argument is currently unused.
func startReconnectServer(t *testing.T) *server.Server {
	srv := RunServerOnPort(22222)
	return srv
}
// TestReconnectTotalTime checks that the default options give a client at
// least two minutes of cumulative reconnect attempts
// (MaxReconnect * ReconnectWait).
func TestReconnectTotalTime(t *testing.T) {
	defaults := nats.GetDefaultOptions()
	budget := defaults.ReconnectWait * time.Duration(defaults.MaxReconnect)
	if budget >= 2*time.Minute {
		return
	}
	t.Fatalf("Total reconnect time should be at least 2 mins: Currently %v\n",
		budget)
}
// TestReconnectDisallowedFlags verifies that a connection created with
// AllowReconnect disabled fires its ClosedCB once the server goes away.
func TestReconnectDisallowedFlags(t *testing.T) {
	srv := startReconnectServer(t)
	defer srv.Shutdown()

	closedCh := make(chan bool)

	o := nats.GetDefaultOptions()
	o.Url = "nats://localhost:22222"
	o.AllowReconnect = false
	o.ClosedCB = func(_ *nats.Conn) {
		closedCh <- true
	}

	conn, err := o.Connect()
	if err != nil {
		t.Fatalf("Should have connected ok: %v", err)
	}
	defer conn.Close()

	// Take the server down; the closed callback is expected to follow.
	srv.Shutdown()

	if waitErr := Wait(closedCh); waitErr != nil {
		t.Fatal("Did not trigger ClosedCB correctly")
	}
}
// TestReconnectAllowedFlags verifies that, with reconnect enabled, a server
// shutdown triggers the disconnected callback and leaves the connection in
// the reconnecting state without firing the closed callback.
func TestReconnectAllowedFlags(t *testing.T) {
	srv := startReconnectServer(t)
	defer srv.Shutdown()

	closedCh := make(chan bool)
	disconnectedCh := make(chan bool)

	o := nats.GetDefaultOptions()
	o.Url = "nats://localhost:22222"
	o.AllowReconnect = true
	o.MaxReconnect = 2
	o.ReconnectWait = 1 * time.Second
	o.ClosedCB = func(_ *nats.Conn) {
		closedCh <- true
	}
	o.DisconnectedCB = func(_ *nats.Conn) {
		disconnectedCh <- true
	}

	conn, err := o.Connect()
	if err != nil {
		t.Fatalf("Should have connected ok: %v", err)
	}
	defer conn.Close()

	srv.Shutdown()

	// This wait is expected to time out: a nil result means the closed
	// callback fired even though reconnecting is allowed.
	if waitErr := WaitTime(closedCh, 500*time.Millisecond); waitErr == nil {
		t.Fatal("Triggered ClosedCB incorrectly")
	}

	// The disconnected callback confirms that the client has noticed the
	// outage and has entered its reconnect processing.
	if waitErr := Wait(disconnectedCh); waitErr != nil {
		t.Fatal("DisconnectedCB should have been triggered")
	}

	if reconnecting := conn.IsReconnecting(); !reconnecting {
		t.Fatal("Expected to be in a reconnecting state")
	}

	// Nothing reads closedCh from here on, so remove the callback before
	// the deferred Close runs; otherwise its send would block.
	conn.Opts.ClosedCB = nil
}
// reconnectOpts is the base option set copied by several tests in this file:
// reconnect enabled against the server on port 22222, up to 10 attempts
// spaced 100ms apart.
var reconnectOpts = nats.Options{
	Url:            "nats://localhost:22222",
	Timeout:        nats.DefaultTimeout,
	AllowReconnect: true,
	ReconnectWait:  100 * time.Millisecond,
	MaxReconnect:   10,
}
// TestConnCloseBreaksReconnectLoop verifies that calling Close on a
// connection that is busy reconnecting terminates the reconnect loop and
// delivers the closed callback promptly.
func TestConnCloseBreaksReconnectLoop(t *testing.T) {
	srv := startReconnectServer(t)
	defer srv.Shutdown()

	closedCh := make(chan bool)

	o := reconnectOpts
	// Raise the attempt limit so the loop is still running when we close.
	o.MaxReconnect = 100
	o.ClosedCB = func(_ *nats.Conn) {
		closedCh <- true
	}

	conn, err := o.Connect()
	if err != nil {
		t.Fatalf("Should have connected ok: %v", err)
	}
	defer conn.Close()
	conn.Flush()

	// Stop the server, then give the client a second before closing.
	srv.Shutdown()
	time.Sleep(time.Second)

	// Close runs in its own goroutine because the original problem was
	// Close() blocking until the reconnect loop had finished.
	go conn.Close()

	// Three seconds is enough even on Windows, where a single createConn
	// takes more than a second.
	if waitErr := WaitTime(closedCh, 3*time.Second); waitErr != nil {
		t.Fatal("Did not get a closed callback")
	}
}
// TestBasicReconnectFunctionality publishes a message while the server is
// down and verifies that, once the server is restarted, the message reaches
// the existing subscription and exactly one reconnect is recorded.
func TestBasicReconnectFunctionality(t *testing.T) {
	ts := startReconnectServer(t)
	defer ts.Shutdown()

	ch := make(chan bool)
	dch := make(chan bool)

	opts := reconnectOpts
	opts.DisconnectedCB = func(_ *nats.Conn) {
		dch <- true
	}

	nc, err := opts.Connect()
	if err != nil {
		t.Fatalf("Should have connected ok: %v\n", err)
	}
	defer nc.Close()

	ec, err := nats.NewEncodedConn(nc, nats.DEFAULT_ENCODER)
	if err != nil {
		t.Fatalf("Failed to create an encoded connection: %v\n", err)
	}

	testString := "bar"
	_, err = ec.Subscribe("foo", func(s string) {
		// The handler is invoked by the client library rather than by the
		// test goroutine, and FailNow (t.Fatal) is only valid on the test
		// goroutine. Record the failure with t.Error and still signal so
		// the test finishes promptly instead of waiting for a timeout.
		if s != testString {
			t.Error("String doesn't match")
		}
		ch <- true
	})
	if err != nil {
		t.Fatalf("Failed to subscribe: %v", err)
	}
	ec.Flush()

	ts.Shutdown()
	// server is stopped here...

	if err := Wait(dch); err != nil {
		t.Fatalf("Did not get the disconnected callback on time\n")
	}

	// Publishing while disconnected must succeed; the message is expected
	// to be delivered after the reconnect below.
	if err := ec.Publish("foo", testString); err != nil {
		t.Fatalf("Failed to publish message: %v\n", err)
	}

	ts = startReconnectServer(t)
	defer ts.Shutdown()

	if err := ec.FlushTimeout(5 * time.Second); err != nil {
		t.Fatalf("Error on Flush: %v", err)
	}

	if e := Wait(ch); e != nil {
		t.Fatal("Did not receive our message")
	}

	expectedReconnectCount := uint64(1)
	reconnectCount := ec.Conn.Stats().Reconnects

	if reconnectCount != expectedReconnectCount {
		t.Fatalf("Reconnect count incorrect: %d vs %d\n",
			reconnectCount, expectedReconnectCount)
	}
}
// TestExtendedReconnectFunctionality exercises subscribe, unsubscribe and
// publish calls made while the client is disconnected, then checks after the
// server restart that exactly four messages were delivered in total.
func TestExtendedReconnectFunctionality(t *testing.T) {
	ts := startReconnectServer(t)
	defer ts.Shutdown()

	opts := reconnectOpts
	dch := make(chan bool)
	opts.DisconnectedCB = func(_ *nats.Conn) {
		dch <- true
	}
	rch := make(chan bool)
	opts.ReconnectedCB = func(_ *nats.Conn) {
		rch <- true
	}
	nc, err := opts.Connect()
	if err != nil {
		t.Fatalf("Should have connected ok: %v", err)
	}
	defer nc.Close()
	ec, err := nats.NewEncodedConn(nc, nats.DEFAULT_ENCODER)
	if err != nil {
		t.Fatalf("Failed to create an encoded connection: %v\n", err)
	}
	testString := "bar"
	// received is updated from the subscription handlers, so it is only
	// ever accessed through sync/atomic.
	received := int32(0)

	ec.Subscribe("foo", func(s string) {
		atomic.AddInt32(&received, 1)
	})

	// Check the error: sub is dereferenced below and would be unusable if
	// the subscription had failed.
	sub, err := ec.Subscribe("foobar", func(s string) {
		atomic.AddInt32(&received, 1)
	})
	if err != nil {
		t.Fatalf("Failed to subscribe to foobar: %v", err)
	}

	ec.Publish("foo", testString)
	ec.Flush()

	ts.Shutdown()
	// server is stopped here..

	// wait for disconnect
	if e := WaitTime(dch, 2*time.Second); e != nil {
		t.Fatal("Did not receive a disconnect callback message")
	}

	// Sub while disconnected
	ec.Subscribe("bar", func(s string) {
		atomic.AddInt32(&received, 1)
	})

	// Unsub foobar while disconnected
	sub.Unsubscribe()

	if err = ec.Publish("foo", testString); err != nil {
		t.Fatalf("Received an error after disconnect: %v\n", err)
	}

	if err = ec.Publish("bar", testString); err != nil {
		t.Fatalf("Received an error after disconnect: %v\n", err)
	}

	ts = startReconnectServer(t)
	defer ts.Shutdown()

	// server is restarted here..
	// wait for reconnect
	if e := WaitTime(rch, 2*time.Second); e != nil {
		t.Fatal("Did not receive a reconnect callback message")
	}

	if err = ec.Publish("foobar", testString); err != nil {
		t.Fatalf("Received an error after server restarted: %v\n", err)
	}

	if err = ec.Publish("foo", testString); err != nil {
		t.Fatalf("Received an error after server restarted: %v\n", err)
	}

	// Use a final "done" message as a marker that the earlier messages
	// have been pushed through the connection.
	ch := make(chan bool)
	ec.Subscribe("done", func(b bool) {
		ch <- true
	})
	ec.Publish("done", true)

	if e := Wait(ch); e != nil {
		t.Fatal("Did not receive our message")
	}

	// Sleep a bit to guarantee scheduler runs and process all subs.
	time.Sleep(50 * time.Millisecond)

	// Expected tally: three "foo" messages plus one "bar"; "foobar" was
	// unsubscribed before anything was published to it. Load the counter
	// once so the comparison and the failure message use the same value
	// and no non-atomic read of received takes place.
	if got := atomic.LoadInt32(&received); got != 4 {
		t.Fatalf("Received != %d, equals %d\n", 4, got)
	}
}
// TestQueueSubsOnReconnect verifies that two queue subscribers in the same
// group receive every published sequence number exactly once, both before
// and after the server has been restarted.
func TestQueueSubsOnReconnect(t *testing.T) {
	srv := startReconnectServer(t)
	defer srv.Shutdown()

	o := reconnectOpts

	// Signalled by the reconnected callback so the test can block on it.
	reconnected := make(chan bool)
	o.ReconnectedCB = func(_ *nats.Conn) {
		reconnected <- true
	}

	conn, err := o.Connect()
	if err != nil {
		t.Fatalf("Should have connected ok: %v\n", err)
	}
	defer conn.Close()

	enc, err := nats.NewEncodedConn(conn, nats.JSON_ENCODER)
	if err != nil {
		t.Fatalf("Failed to create an encoded connection: %v\n", err)
	}

	// counts maps a sequence number to how often it was delivered; it is
	// shared with the subscription handlers and guarded by mu.
	var mu sync.Mutex
	counts := make(map[int]int)

	// verify requires each of the first numSent sequence numbers to have
	// been seen exactly once, then resets the tally for the next round.
	verify := func(numSent int) {
		mu.Lock()
		defer mu.Unlock()

		for seq := 0; seq < numSent; seq++ {
			if counts[seq] != 1 {
				t.Fatalf("Received incorrect number of messages, [%d] for seq: %d\n", counts[seq], seq)
			}
		}
		counts = make(map[int]int)
	}

	subject := "foo.bar"
	group := "workers"

	handler := func(seqno int) {
		mu.Lock()
		defer mu.Unlock()
		counts[seqno]++
	}

	// Two members of the same queue group share the subject.
	enc.QueueSubscribe(subject, group, handler)
	enc.QueueSubscribe(subject, group, handler)
	enc.Flush()

	// publishAndVerify sends numToSend sequence numbers, allows a short
	// time for delivery, then validates the tally.
	publishAndVerify := func(numToSend int) {
		for seq := 0; seq < numToSend; seq++ {
			enc.Publish(subject, seq)
		}
		enc.Flush()
		time.Sleep(50 * time.Millisecond)
		verify(numToSend)
	}

	// Baseline before any disruption.
	publishAndVerify(10)

	// Bounce the server.
	srv.Shutdown()
	srv = startReconnectServer(t)
	defer srv.Shutdown()

	if waitErr := Wait(reconnected); waitErr != nil {
		t.Fatal("Did not get the ReconnectedCB!")
	}

	// Same check again on the reconnected client.
	publishAndVerify(10)
}
// TestIsClosed verifies that IsClosed stays false while the server is up,
// shut down and restarted, and only becomes true after Close is called.
func TestIsClosed(t *testing.T) {
	srv := startReconnectServer(t)
	defer srv.Shutdown()

	conn := NewConnection(t, 22222)
	defer conn.Close()

	// requireOpen fails the test if the connection reports itself closed.
	requireOpen := func() {
		if conn.IsClosed() {
			t.Fatalf("IsClosed returned true when the connection is still open.")
		}
	}

	// Freshly connected.
	requireOpen()

	// Server gone.
	srv.Shutdown()
	requireOpen()

	// Server back.
	srv = startReconnectServer(t)
	defer srv.Shutdown()
	requireOpen()

	conn.Close()
	if closed := conn.IsClosed(); !closed {
		t.Fatalf("IsClosed returned false after Close() was called.")
	}
}
// TestIsReconnectingAndStatus walks a connection through the connected,
// reconnecting, reconnected and closed phases and checks IsReconnecting and
// Status at each step.
func TestIsReconnectingAndStatus(t *testing.T) {
	ts := startReconnectServer(t)
	defer ts.Shutdown()

	disconnectedch := make(chan bool)
	reconnectch := make(chan bool)
	opts := nats.GetDefaultOptions()
	opts.Url = "nats://localhost:22222"
	opts.AllowReconnect = true
	opts.MaxReconnect = 10000
	opts.ReconnectWait = 100 * time.Millisecond

	opts.DisconnectedCB = func(_ *nats.Conn) {
		disconnectedch <- true
	}
	opts.ReconnectedCB = func(_ *nats.Conn) {
		reconnectch <- true
	}

	// Connect, verify initial reconnecting state check, then stop the server
	nc, err := opts.Connect()
	if err != nil {
		t.Fatalf("Should have connected ok: %v", err)
	}
	defer nc.Close()

	if nc.IsReconnecting() {
		t.Fatalf("IsReconnecting returned true when the connection is still open.")
	}
	if status := nc.Status(); status != nats.CONNECTED {
		t.Fatalf("Status returned %d when connected instead of CONNECTED", status)
	}
	ts.Shutdown()

	// Wait until we get the disconnected callback
	if e := Wait(disconnectedch); e != nil {
		t.Fatalf("Disconnect callback wasn't triggered: %v", e)
	}
	if !nc.IsReconnecting() {
		t.Fatalf("IsReconnecting returned false when the client is reconnecting.")
	}
	// The message names RECONNECTING, the status this check expects.
	if status := nc.Status(); status != nats.RECONNECTING {
		t.Fatalf("Status returned %d when reconnecting instead of RECONNECTING", status)
	}

	ts = startReconnectServer(t)
	defer ts.Shutdown()

	// Wait until we get the reconnect callback
	if e := Wait(reconnectch); e != nil {
		t.Fatalf("Reconnect callback wasn't triggered: %v", e)
	}
	if nc.IsReconnecting() {
		t.Fatalf("IsReconnecting returned true after the connection was reconnected.")
	}
	if status := nc.Status(); status != nats.CONNECTED {
		t.Fatalf("Status returned %d when reconnected instead of CONNECTED", status)
	}

	// Close the connection, reconnecting should still be false
	nc.Close()
	if nc.IsReconnecting() {
		t.Fatalf("IsReconnecting returned true after Close() was called.")
	}
	if status := nc.Status(); status != nats.CLOSED {
		t.Fatalf("Status returned %d after Close() was called instead of CLOSED", status)
	}
}
// TestFullFlushChanDuringReconnect keeps publishing at a high rate while the
// server is stopped and restarted, and verifies that the client still
// reconnects within five seconds of the restart.
func TestFullFlushChanDuringReconnect(t *testing.T) {
	ts := startReconnectServer(t)
	defer ts.Shutdown()

	reconnectch := make(chan bool)

	opts := nats.GetDefaultOptions()
	opts.Url = "nats://localhost:22222"
	opts.AllowReconnect = true
	opts.MaxReconnect = 10000
	opts.ReconnectWait = 100 * time.Millisecond

	opts.ReconnectedCB = func(_ *nats.Conn) {
		reconnectch <- true
	}

	// Connect
	nc, err := opts.Connect()
	if err != nil {
		t.Fatalf("Should have connected ok: %v", err)
	}
	defer nc.Close()

	// Channel used to make the go routine sending messages to stop.
	stop := make(chan bool)

	// Stop the publisher and wait for it on every exit path, so it does
	// not outlive the test. This defer is registered after nc.Close's, so
	// it runs first and the publisher is gone before the connection closes.
	var wg sync.WaitGroup
	wg.Add(1)
	defer func() {
		close(stop)
		wg.Wait()
	}()

	// While connected, publish as fast as we can
	go func() {
		defer wg.Done()
		for i := 0; ; i++ {
			// Publish errors are deliberately ignored: the goal is only
			// to keep traffic flowing, including while disconnected.
			_ = nc.Publish("foo", []byte("hello"))

			// Make sure we are sending at least flushChanSize (1024) messages
			// before potentially pausing.
			if i%2000 == 0 {
				select {
				case <-stop:
					return
				default:
					time.Sleep(100 * time.Millisecond)
				}
			}
		}
	}()

	// Send a bit...
	time.Sleep(500 * time.Millisecond)

	// Shut down the server
	ts.Shutdown()

	// Continue sending while we are disconnected
	time.Sleep(time.Second)

	// Restart the server
	ts = startReconnectServer(t)
	defer ts.Shutdown()

	// Wait for the reconnect CB to be invoked (but not for too long)
	if e := WaitTime(reconnectch, 5*time.Second); e != nil {
		t.Fatalf("Reconnect callback wasn't triggered: %v", e)
	}
}
// TestReconnectVerbose verifies that a connection created with the Verbose
// option reconnects after a server restart and can flush afterwards.
func TestReconnectVerbose(t *testing.T) {
	srv := RunDefaultServer()
	defer srv.Shutdown()

	reconnected := make(chan bool)

	opts := nats.GetDefaultOptions()
	opts.Verbose = true
	opts.ReconnectedCB = func(_ *nats.Conn) {
		reconnected <- true
	}

	conn, err := opts.Connect()
	if err != nil {
		t.Fatalf("Should have connected ok: %v", err)
	}
	defer conn.Close()

	if err = conn.Flush(); err != nil {
		t.Fatalf("Error during flush: %v", err)
	}

	// Bounce the server and wait for the client to come back.
	srv.Shutdown()
	srv = RunDefaultServer()
	defer srv.Shutdown()

	if waitErr := Wait(reconnected); waitErr != nil {
		t.Fatal("Should have reconnected ok")
	}

	if err = conn.Flush(); err != nil {
		t.Fatalf("Error during flush: %v", err)
	}
}
// TestReconnectBufSizeOption verifies that the nats.ReconnectBufSize option
// passed to nats.Connect ends up in the connection's Opts.
func TestReconnectBufSizeOption(t *testing.T) {
	srv := RunDefaultServer()
	defer srv.Shutdown()

	conn, err := nats.Connect("nats://localhost:4222", nats.ReconnectBufSize(32))
	if err != nil {
		t.Fatalf("Should have connected ok: %v", err)
	}
	defer conn.Close()

	if conn.Opts.ReconnectBufSize == 32 {
		return
	}
	t.Fatalf("ReconnectBufSize should be 32 but it is %d", conn.Opts.ReconnectBufSize)
}
// TestReconnectBufSize verifies that, while disconnected, publishes succeed
// until the 32-byte reconnect buffer is used up and fail afterwards.
func TestReconnectBufSize(t *testing.T) {
	srv := RunDefaultServer()
	defer srv.Shutdown()

	disconnected := make(chan bool)

	opts := nats.GetDefaultOptions()
	opts.ReconnectBufSize = 32 // bytes
	opts.DisconnectedCB = func(_ *nats.Conn) {
		disconnected <- true
	}

	conn, err := opts.Connect()
	if err != nil {
		t.Fatalf("Should have connected ok: %v", err)
	}
	defer conn.Close()

	if err = conn.Flush(); err != nil {
		t.Fatalf("Error during flush: %v", err)
	}

	// Stop the server to push the client into the disconnected state.
	srv.Shutdown()
	if waitErr := Wait(disconnected); waitErr != nil {
		t.Fatal("DisconnectedCB should have been triggered")
	}

	// 4-byte payload; the test expects exactly two of these messages to
	// fit into the 32-byte reconnect buffer.
	payload := []byte("food")

	for attempt := 0; attempt < 2; attempt++ {
		if pubErr := conn.Publish("foo", payload); pubErr != nil {
			t.Fatalf("Failed to publish message: %v\n", pubErr)
		}
	}

	// The third publish no longer fits and has to be rejected.
	if pubErr := conn.Publish("foo", payload); pubErr == nil {
		t.Fatalf("Expected to fail to publish message: got no error\n")
	}

	// Buffered is called here only for its execution; the result is unused.
	conn.Buffered()
}