mirror of https://github.com/tidwall/tile38.git
397 lines
9.5 KiB
Go
397 lines
9.5 KiB
Go
// Copyright 2018 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package test
|
|
|
|
import (
|
|
"fmt"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
nats "github.com/nats-io/go-nats"
|
|
)
|
|
|
|
// Drain can be very useful for graceful shutdown of subscribers.
|
|
// Especially queue subscribers.
|
|
func TestDrain(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
done := make(chan bool)
|
|
received := int32(0)
|
|
expected := int32(100)
|
|
|
|
cb := func(_ *nats.Msg) {
|
|
// Allow this to back up.
|
|
time.Sleep(time.Millisecond)
|
|
rcvd := atomic.AddInt32(&received, 1)
|
|
if rcvd >= expected {
|
|
done <- true
|
|
}
|
|
}
|
|
|
|
sub, err := nc.Subscribe("foo", cb)
|
|
if err != nil {
|
|
t.Fatalf("Error creating subscription; %v", err)
|
|
}
|
|
|
|
for i := int32(0); i < expected; i++ {
|
|
nc.Publish("foo", []byte("Don't forget about me"))
|
|
}
|
|
|
|
// Drain it and make sure we receive all messages.
|
|
sub.Drain()
|
|
select {
|
|
case <-done:
|
|
break
|
|
case <-time.After(2 * time.Second):
|
|
r := atomic.LoadInt32(&received)
|
|
if r != expected {
|
|
t.Fatalf("Did not receive all messages: %d of %d", r, expected)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestDrainQueueSub(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
done := make(chan bool)
|
|
received := int32(0)
|
|
expected := int32(4096)
|
|
numSubs := int32(32)
|
|
|
|
checkDone := func() int32 {
|
|
rcvd := atomic.AddInt32(&received, 1)
|
|
if rcvd >= expected {
|
|
done <- true
|
|
}
|
|
return rcvd
|
|
}
|
|
|
|
callback := func(m *nats.Msg) {
|
|
rcvd := checkDone()
|
|
// Randomly replace this sub from time to time.
|
|
if rcvd%3 == 0 {
|
|
m.Sub.Drain()
|
|
// Create a new one that we will not drain.
|
|
nc.QueueSubscribe("foo", "bar", func(m *nats.Msg) { checkDone() })
|
|
}
|
|
}
|
|
|
|
for i := int32(0); i < numSubs; i++ {
|
|
_, err := nc.QueueSubscribe("foo", "bar", callback)
|
|
if err != nil {
|
|
t.Fatalf("Error creating subscription; %v", err)
|
|
}
|
|
}
|
|
|
|
for i := int32(0); i < expected; i++ {
|
|
nc.Publish("foo", []byte("Don't forget about me"))
|
|
}
|
|
|
|
select {
|
|
case <-done:
|
|
break
|
|
case <-time.After(5 * time.Second):
|
|
r := atomic.LoadInt32(&received)
|
|
if r != expected {
|
|
t.Fatalf("Did not receive all messages: %d of %d", r, expected)
|
|
}
|
|
}
|
|
}
|
|
|
|
// waitFor polls f until it returns nil or totalWait has elapsed, sleeping
// sleepDur between attempts. If the deadline passes while f is still failing,
// the test is failed with the most recent error from f.
//
// f is always evaluated at least once, even when totalWait is zero or
// negative, and it is evaluated one final time after the last sleep so a
// condition that became true during that sleep is not reported as a failure.
// waitFor must be called from the goroutine running the test, because it may
// call t.Fatal.
func waitFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
	t.Helper()
	deadline := time.Now().Add(totalWait)
	for {
		err := f()
		if err == nil {
			return
		}
		// Only give up after a check that ran at or past the deadline.
		if !time.Now().Before(deadline) {
			t.Fatal(err.Error())
		}
		time.Sleep(sleepDur)
	}
}
|
|
|
|
func TestDrainUnSubs(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
num := 100
|
|
subs := make([]*nats.Subscription, num)
|
|
|
|
// Normal Unsubscribe
|
|
for i := 0; i < num; i++ {
|
|
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {})
|
|
if err != nil {
|
|
t.Fatalf("Error creating subscription; %v", err)
|
|
}
|
|
subs[i] = sub
|
|
}
|
|
|
|
if numSubs := nc.NumSubscriptions(); numSubs != num {
|
|
t.Fatalf("Expected %d subscriptions, got %d", num, numSubs)
|
|
}
|
|
for i := 0; i < num; i++ {
|
|
subs[i].Unsubscribe()
|
|
}
|
|
if numSubs := nc.NumSubscriptions(); numSubs != 0 {
|
|
t.Fatalf("Expected no subscriptions, got %d", numSubs)
|
|
}
|
|
|
|
// Drain version
|
|
for i := 0; i < num; i++ {
|
|
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {})
|
|
if err != nil {
|
|
t.Fatalf("Error creating subscription; %v", err)
|
|
}
|
|
subs[i] = sub
|
|
}
|
|
|
|
if numSubs := nc.NumSubscriptions(); numSubs != num {
|
|
t.Fatalf("Expected %d subscriptions, got %d", num, numSubs)
|
|
}
|
|
for i := 0; i < num; i++ {
|
|
subs[i].Drain()
|
|
}
|
|
// Should happen quickly that we get to zero, so do not need to wait long.
|
|
waitFor(t, 2*time.Second, 10*time.Millisecond, func() error {
|
|
if numSubs := nc.NumSubscriptions(); numSubs != 0 {
|
|
return fmt.Errorf("Expected no subscriptions, got %d\n", numSubs)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestDrainSlowSubscriber(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
received := int32(0)
|
|
|
|
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
atomic.AddInt32(&received, 1)
|
|
time.Sleep(100 * time.Millisecond)
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error creating subscription; %v", err)
|
|
}
|
|
|
|
total := 10
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Slow Slow"))
|
|
}
|
|
nc.Flush()
|
|
|
|
pmsgs, _, _ := sub.Pending()
|
|
if pmsgs != total && pmsgs != total-1 {
|
|
t.Fatalf("Expected most messages to be pending, but got %d vs %d", pmsgs, total)
|
|
}
|
|
sub.Drain()
|
|
|
|
// Should take a second or so to drain away.
|
|
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
|
|
// Wait for it to become invalid. Once drained it is unsubscribed.
|
|
_, _, err := sub.Pending()
|
|
if err != nats.ErrBadSubscription {
|
|
return fmt.Errorf("Still valid")
|
|
}
|
|
r := int(atomic.LoadInt32(&received))
|
|
if r != total {
|
|
t.Fatalf("Did not receive all messages, got %d vs %d", r, total)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestDrainConnection(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
done := make(chan bool)
|
|
rdone := make(chan bool)
|
|
|
|
closed := func(nc *nats.Conn) {
|
|
done <- true
|
|
}
|
|
|
|
url := fmt.Sprintf("nats://127.0.0.1:%d", nats.DefaultPort)
|
|
nc, err := nats.Connect(url, nats.ClosedHandler(closed))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create default connection: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
nc2, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create default connection: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
received := int32(0)
|
|
responses := int32(0)
|
|
expected := int32(50)
|
|
sleep := 10 * time.Millisecond
|
|
|
|
// Create the listner for responses on "bar"
|
|
_, err = nc2.Subscribe("bar", func(_ *nats.Msg) {
|
|
r := atomic.AddInt32(&responses, 1)
|
|
if r == expected {
|
|
rdone <- true
|
|
}
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error creating subscription for responses: %v", err)
|
|
}
|
|
|
|
// Create a slow subscriber for the responder
|
|
sub, err := nc.Subscribe("foo", func(m *nats.Msg) {
|
|
time.Sleep(sleep)
|
|
atomic.AddInt32(&received, 1)
|
|
err := nc.Publish(m.Reply, []byte("Stop bugging me"))
|
|
if err != nil {
|
|
t.Errorf("Publisher received an error sending response: %v\n", err)
|
|
}
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error creating subscription; %v", err)
|
|
}
|
|
|
|
// Publish some messages
|
|
for i := int32(0); i < expected; i++ {
|
|
nc.PublishRequest("foo", "bar", []byte("Slow Slow"))
|
|
}
|
|
|
|
drainStart := time.Now()
|
|
nc.Drain()
|
|
|
|
// Sub should be disabled immediately
|
|
if err := sub.Unsubscribe(); err == nil {
|
|
t.Fatalf("Expected to receive an error on Unsubscribe after drain")
|
|
}
|
|
// Also can not create any new subs
|
|
if _, err := nc.Subscribe("foo", func(_ *nats.Msg) {}); err == nil {
|
|
t.Fatalf("Expected to receive an error on new Subscription after drain")
|
|
}
|
|
|
|
// Make sure we can still publish, this is for any responses.
|
|
if err := nc.Publish("baz", []byte("Slow Slow")); err != nil {
|
|
t.Fatalf("Expected to not receive an error on Publish after drain, got %v", err)
|
|
}
|
|
|
|
// Wait for the closed state from nc
|
|
select {
|
|
case <-done:
|
|
if time.Since(drainStart) < (sleep * time.Duration(expected)) {
|
|
t.Fatalf("Drain exited too soon\n")
|
|
}
|
|
r := atomic.LoadInt32(&received)
|
|
if r != expected {
|
|
t.Fatalf("Did not receive all messages from Drain, %d vs %d", r, expected)
|
|
}
|
|
break
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatalf("Timeout waiting for closed state for connection")
|
|
}
|
|
|
|
// Now make sure all responses were received.
|
|
select {
|
|
case <-rdone:
|
|
r := atomic.LoadInt32(&responses)
|
|
if r != expected {
|
|
t.Fatalf("Did not receive all responses, %d vs %d", r, expected)
|
|
}
|
|
break
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatalf("Timeout waiting for all the responses")
|
|
}
|
|
}
|
|
|
|
func TestDrainConnectionAutoUnsub(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
errors := int32(0)
|
|
received := int32(0)
|
|
expected := int32(10)
|
|
|
|
done := make(chan bool)
|
|
|
|
closed := func(nc *nats.Conn) {
|
|
done <- true
|
|
}
|
|
|
|
errCb := func(nc *nats.Conn, s *nats.Subscription, err error) {
|
|
atomic.AddInt32(&errors, 1)
|
|
}
|
|
|
|
url := fmt.Sprintf("nats://127.0.0.1:%d", nats.DefaultPort)
|
|
nc, err := nats.Connect(url, nats.ErrorHandler(errCb), nats.ClosedHandler(closed))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create default connection: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
// So they back up a bit in client and allow drain to do its thing.
|
|
time.Sleep(10 * time.Millisecond)
|
|
atomic.AddInt32(&received, 1)
|
|
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error creating subscription; %v", err)
|
|
}
|
|
|
|
sub.AutoUnsubscribe(int(expected))
|
|
|
|
// Publish some messages
|
|
for i := 0; i < 50; i++ {
|
|
nc.Publish("foo", []byte("Only 10 please!"))
|
|
}
|
|
// Flush here so messages coming back into client.
|
|
nc.Flush()
|
|
|
|
// Now add drain state.
|
|
time.Sleep(10 * time.Millisecond)
|
|
nc.Drain()
|
|
|
|
// Wait for the closed state from nc
|
|
select {
|
|
case <-done:
|
|
errs := atomic.LoadInt32(&errors)
|
|
if errs > 0 {
|
|
t.Fatalf("Did not expect any errors, got %d", errs)
|
|
}
|
|
r := atomic.LoadInt32(&received)
|
|
if r != expected {
|
|
t.Fatalf("Did not receive all messages from Drain, %d vs %d", r, expected)
|
|
}
|
|
break
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatalf("Timeout waiting for closed state for connection")
|
|
}
|
|
}
|