tile38/vendor/github.com/nats-io/go-nats/test/sub_test.go

1496 lines
34 KiB
Go

// Copyright 2013-2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/nats-io/go-nats"
)
// More advanced tests on subscriptions
func TestServerAutoUnsub(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
received := int32(0)
max := int32(10)
// Call this to make sure that we have everything setup connection wise
nc.Flush()
base := runtime.NumGoroutine()
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
atomic.AddInt32(&received, 1)
})
if err != nil {
t.Fatal("Failed to subscribe: ", err)
}
sub.AutoUnsubscribe(int(max))
total := 100
for i := 0; i < total; i++ {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
time.Sleep(100 * time.Millisecond)
if atomic.LoadInt32(&received) != max {
t.Fatalf("Received %d msgs, wanted only %d\n", received, max)
}
if sub.IsValid() {
t.Fatal("Expected subscription to be invalid after hitting max")
}
if err := sub.AutoUnsubscribe(10); err == nil {
t.Fatal("Calling AutoUnsubscribe() on closed subscription should fail")
}
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
return fmt.Errorf("%d Go routines still exist post max subscriptions hit", delta)
}
return nil
})
}
func TestClientSyncAutoUnsub(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
received := 0
max := 10
sub, _ := nc.SubscribeSync("foo")
sub.AutoUnsubscribe(max)
total := 100
for i := 0; i < total; i++ {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
for {
_, err := sub.NextMsg(10 * time.Millisecond)
if err != nil {
if err != nats.ErrMaxMessages {
t.Fatalf("Expected '%v', but got: '%v'\n", nats.ErrBadSubscription, err.Error())
}
break
}
received++
}
if received != max {
t.Fatalf("Received %d msgs, wanted only %d\n", received, max)
}
if sub.IsValid() {
t.Fatal("Expected subscription to be invalid after hitting max")
}
if err := sub.AutoUnsubscribe(10); err == nil {
t.Fatal("Calling AutoUnsubscribe() ob closed subscription should fail")
}
}
func TestClientASyncAutoUnsub(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
received := int32(0)
max := int32(10)
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
atomic.AddInt32(&received, 1)
})
if err != nil {
t.Fatal("Failed to subscribe: ", err)
}
sub.AutoUnsubscribe(int(max))
total := 100
for i := 0; i < total; i++ {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
time.Sleep(10 * time.Millisecond)
if atomic.LoadInt32(&received) != max {
t.Fatalf("Received %d msgs, wanted only %d\n", received, max)
}
if err := sub.AutoUnsubscribe(10); err == nil {
t.Fatal("Calling AutoUnsubscribe() on closed subscription should fail")
}
}
func TestAutoUnsubAndReconnect(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
rch := make(chan bool)
nc, err := nats.Connect(nats.DefaultURL,
nats.ReconnectWait(50*time.Millisecond),
nats.ReconnectHandler(func(_ *nats.Conn) { rch <- true }))
if err != nil {
t.Fatalf("Unable to connect: %v", err)
}
defer nc.Close()
received := int32(0)
max := int32(10)
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
atomic.AddInt32(&received, 1)
})
if err != nil {
t.Fatalf("Failed to subscribe: %v", err)
}
sub.AutoUnsubscribe(int(max))
// Send less than the max
total := int(max / 2)
for i := 0; i < total; i++ {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
// Restart the server
s.Shutdown()
s = RunDefaultServer()
defer s.Shutdown()
// and wait to reconnect
if err := Wait(rch); err != nil {
t.Fatal("Failed to get the reconnect cb")
}
// Now send more than the total max.
total = int(3 * max)
for i := 0; i < total; i++ {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
// Wait a bit before checking.
time.Sleep(50 * time.Millisecond)
// We should have received only up-to-max messages.
if atomic.LoadInt32(&received) != max {
t.Fatalf("Received %d msgs, wanted only %d\n", received, max)
}
}
func TestAutoUnsubWithParallelNextMsgCalls(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
rch := make(chan bool, 1)
nc, err := nats.Connect(nats.DefaultURL,
nats.ReconnectWait(50*time.Millisecond),
nats.ReconnectHandler(func(_ *nats.Conn) { rch <- true }))
if err != nil {
t.Fatalf("Unable to connect: %v", err)
}
defer nc.Close()
numRoutines := 3
max := 100
total := max * 2
received := int64(0)
var wg sync.WaitGroup
sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Fatalf("Failed to subscribe: %v", err)
}
sub.AutoUnsubscribe(int(max))
nc.Flush()
wg.Add(numRoutines)
for i := 0; i < numRoutines; i++ {
go func(s *nats.Subscription, idx int) {
for {
// The first to reach the max delivered will cause the
// subscription to be removed, which will kick out all
// other calls to NextMsg. So don't be afraid of the long
// timeout.
_, err := s.NextMsg(3 * time.Second)
if err != nil {
break
}
atomic.AddInt64(&received, 1)
}
wg.Done()
}(sub, i)
}
msg := []byte("Hello")
for i := 0; i < max/2; i++ {
nc.Publish("foo", msg)
}
nc.Flush()
s.Shutdown()
s = RunDefaultServer()
defer s.Shutdown()
// Make sure we got the reconnected cb
if err := Wait(rch); err != nil {
t.Fatal("Failed to get reconnected cb")
}
for i := 0; i < total; i++ {
nc.Publish("foo", msg)
}
nc.Flush()
wg.Wait()
if atomic.LoadInt64(&received) != int64(max) {
t.Fatalf("Wrong number of received msg: %v instead of %v", atomic.LoadInt64(&received), max)
}
}
func TestAutoUnsubscribeFromCallback(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Unable to connect: %v", err)
}
defer nc.Close()
max := 10
resetUnsubMark := int64(max / 2)
limit := int64(100)
received := int64(0)
msg := []byte("Hello")
// Auto-unsubscribe within the callback with a value lower
// than what was already received.
sub, err := nc.Subscribe("foo", func(m *nats.Msg) {
r := atomic.AddInt64(&received, 1)
if r == resetUnsubMark {
m.Sub.AutoUnsubscribe(int(r - 1))
nc.Flush()
}
if r == limit {
// Something went wrong... fail now
t.Fatal("Got more messages than expected")
}
nc.Publish("foo", msg)
})
if err != nil {
t.Fatalf("Failed to subscribe: %v", err)
}
sub.AutoUnsubscribe(int(max))
nc.Flush()
// Trigger the first message, the other are sent from the callback.
nc.Publish("foo", msg)
nc.Flush()
waitFor(t, time.Second, 100*time.Millisecond, func() error {
recv := atomic.LoadInt64(&received)
if recv != resetUnsubMark {
return fmt.Errorf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
max, resetUnsubMark, recv)
}
return nil
})
// Now check with AutoUnsubscribe with higher value than original
received = int64(0)
newMax := int64(2 * max)
sub, err = nc.Subscribe("foo", func(m *nats.Msg) {
r := atomic.AddInt64(&received, 1)
if r == resetUnsubMark {
m.Sub.AutoUnsubscribe(int(newMax))
nc.Flush()
}
if r == limit {
// Something went wrong... fail now
t.Fatal("Got more messages than expected")
}
nc.Publish("foo", msg)
})
if err != nil {
t.Fatalf("Failed to subscribe: %v", err)
}
sub.AutoUnsubscribe(int(max))
nc.Flush()
// Trigger the first message, the other are sent from the callback.
nc.Publish("foo", msg)
nc.Flush()
waitFor(t, time.Second, 100*time.Millisecond, func() error {
recv := atomic.LoadInt64(&received)
if recv != newMax {
return fmt.Errorf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
max, newMax, recv)
}
return nil
})
}
func TestCloseSubRelease(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
sub, _ := nc.SubscribeSync("foo")
start := time.Now()
go func() {
time.Sleep(5 * time.Millisecond)
nc.Close()
}()
_, err := sub.NextMsg(50 * time.Millisecond)
if err == nil {
t.Fatalf("Expected an error from NextMsg")
}
elapsed := time.Since(start)
// On Windows, the minimum waitTime is at least 15ms.
if elapsed > 20*time.Millisecond {
t.Fatalf("Too much time has elapsed to release NextMsg: %dms",
(elapsed / time.Millisecond))
}
}
func TestIsValidSubscriber(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if !sub.IsValid() {
t.Fatalf("Subscription should be valid")
}
for i := 0; i < 10; i++ {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
_, err = sub.NextMsg(200 * time.Millisecond)
if err != nil {
t.Fatalf("NextMsg returned an error")
}
sub.Unsubscribe()
_, err = sub.NextMsg(200 * time.Millisecond)
if err == nil {
t.Fatalf("NextMsg should have returned an error")
}
}
func TestSlowSubscriber(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
sub, _ := nc.SubscribeSync("foo")
sub.SetPendingLimits(100, 1024)
for i := 0; i < 200; i++ {
nc.Publish("foo", []byte("Hello"))
}
timeout := 5 * time.Second
start := time.Now()
nc.FlushTimeout(timeout)
elapsed := time.Since(start)
if elapsed >= timeout {
t.Fatalf("Flush did not return before timeout: %d > %d", elapsed, timeout)
}
// Make sure NextMsg returns an error to indicate slow consumer
_, err := sub.NextMsg(200 * time.Millisecond)
if err == nil {
t.Fatalf("NextMsg did not return an error")
}
}
func TestSlowChanSubscriber(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
ch := make(chan *nats.Msg, 64)
sub, _ := nc.ChanSubscribe("foo", ch)
sub.SetPendingLimits(100, 1024)
for i := 0; i < 200; i++ {
nc.Publish("foo", []byte("Hello"))
}
timeout := 5 * time.Second
start := time.Now()
nc.FlushTimeout(timeout)
elapsed := time.Since(start)
if elapsed >= timeout {
t.Fatalf("Flush did not return before timeout: %d > %d", elapsed, timeout)
}
}
func TestSlowAsyncSubscriber(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
bch := make(chan bool)
sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {
// block to back us up..
<-bch
})
// Make sure these are the defaults
pm, pb, _ := sub.PendingLimits()
if pm != nats.DefaultSubPendingMsgsLimit {
t.Fatalf("Pending limit for number of msgs incorrect, expected %d, got %d\n", nats.DefaultSubPendingMsgsLimit, pm)
}
if pb != nats.DefaultSubPendingBytesLimit {
t.Fatalf("Pending limit for number of bytes incorrect, expected %d, got %d\n", nats.DefaultSubPendingBytesLimit, pb)
}
// Set new limits
pml := 100
pbl := 1024 * 1024
sub.SetPendingLimits(pml, pbl)
// Make sure the set is correct
pm, pb, _ = sub.PendingLimits()
if pm != pml {
t.Fatalf("Pending limit for number of msgs incorrect, expected %d, got %d\n", pml, pm)
}
if pb != pbl {
t.Fatalf("Pending limit for number of bytes incorrect, expected %d, got %d\n", pbl, pb)
}
for i := 0; i < (int(pml) + 100); i++ {
nc.Publish("foo", []byte("Hello"))
}
timeout := 5 * time.Second
start := time.Now()
err := nc.FlushTimeout(timeout)
elapsed := time.Since(start)
if elapsed >= timeout {
t.Fatalf("Flush did not return before timeout")
}
// We want flush to work, so expect no error for it.
if err != nil {
t.Fatalf("Expected no error from Flush()\n")
}
if nc.LastError() != nats.ErrSlowConsumer {
t.Fatal("Expected LastError to indicate slow consumer")
}
// release the sub
bch <- true
}
func TestAsyncErrHandler(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
opts := nats.GetDefaultOptions()
nc, err := opts.Connect()
if err != nil {
t.Fatalf("Could not connect to server: %v\n", err)
}
defer nc.Close()
subj := "async_test"
bch := make(chan bool)
sub, err := nc.Subscribe(subj, func(_ *nats.Msg) {
// block to back us up..
<-bch
})
if err != nil {
t.Fatalf("Could not subscribe: %v\n", err)
}
limit := 10
toSend := 100
// Limit internal subchan length to trip condition easier.
sub.SetPendingLimits(limit, 1024)
ch := make(chan bool)
aeCalled := int64(0)
nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, e error) {
atomic.AddInt64(&aeCalled, 1)
if s != sub {
t.Fatal("Did not receive proper subscription")
}
if e != nats.ErrSlowConsumer {
t.Fatalf("Did not receive proper error: %v vs %v", e, nats.ErrSlowConsumer)
}
// Suppress additional calls
if atomic.LoadInt64(&aeCalled) == 1 {
// release the sub
defer close(bch)
// release the test
ch <- true
}
})
b := []byte("Hello World!")
// First one trips the ch wait in subscription callback.
nc.Publish(subj, b)
nc.Flush()
for i := 0; i < toSend; i++ {
nc.Publish(subj, b)
}
if err := nc.Flush(); err != nil {
t.Fatalf("Got an error on Flush:%v", err)
}
if e := Wait(ch); e != nil {
t.Fatal("Failed to call async err handler")
}
// Make sure dropped stats is correct.
if d, _ := sub.Dropped(); d != toSend-limit+1 {
t.Fatalf("Expected Dropped to be %d, got %d", toSend-limit+1, d)
}
if ae := atomic.LoadInt64(&aeCalled); ae != 1 {
t.Fatalf("Expected err handler to be called only once, got %d", ae)
}
sub.Unsubscribe()
if _, err := sub.Dropped(); err == nil {
t.Fatal("Calling Dropped() on closed subscription should fail")
}
}
func TestAsyncErrHandlerChanSubscription(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
opts := nats.GetDefaultOptions()
nc, err := opts.Connect()
if err != nil {
t.Fatalf("Could not connect to server: %v", err)
}
defer nc.Close()
subj := "chan_test"
limit := 10
toSend := 100
// Create our own channel.
mch := make(chan *nats.Msg, limit)
sub, err := nc.ChanSubscribe(subj, mch)
if err != nil {
t.Fatalf("Could not subscribe: %v", err)
}
ch := make(chan bool)
aeCalled := int64(0)
nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, e error) {
atomic.AddInt64(&aeCalled, 1)
if e != nats.ErrSlowConsumer {
t.Fatalf("Did not receive proper error: %v vs %v",
e, nats.ErrSlowConsumer)
}
// Suppress additional calls
if atomic.LoadInt64(&aeCalled) == 1 {
// release the test
ch <- true
}
})
b := []byte("Hello World!")
for i := 0; i < toSend; i++ {
nc.Publish(subj, b)
}
nc.Flush()
if e := Wait(ch); e != nil {
t.Fatal("Failed to call async err handler")
}
// Make sure dropped stats is correct.
if d, _ := sub.Dropped(); d != toSend-limit {
t.Fatalf("Expected Dropped to be %d, go %d", toSend-limit, d)
}
if ae := atomic.LoadInt64(&aeCalled); ae != 1 {
t.Fatalf("Expected err handler to be called once, got %d", ae)
}
sub.Unsubscribe()
if _, err := sub.Dropped(); err == nil {
t.Fatal("Calling Dropped() on closed subscription should fail")
}
}
// Test to make sure that we can send and async receive messages on
// different subjects within a callback.
func TestAsyncSubscriberStarvation(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
// Helper
nc.Subscribe("helper", func(m *nats.Msg) {
nc.Publish(m.Reply, []byte("Hello"))
})
ch := make(chan bool)
// Kickoff
nc.Subscribe("start", func(m *nats.Msg) {
// Helper Response
response := nats.NewInbox()
nc.Subscribe(response, func(_ *nats.Msg) {
ch <- true
})
nc.PublishRequest("helper", response, []byte("Help Me!"))
})
nc.Publish("start", []byte("Begin"))
nc.Flush()
if e := Wait(ch); e != nil {
t.Fatal("Was stalled inside of callback waiting on another callback")
}
}
func TestAsyncSubscribersOnClose(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
toSend := 10
callbacks := int32(0)
ch := make(chan bool, toSend)
nc.Subscribe("foo", func(_ *nats.Msg) {
atomic.AddInt32(&callbacks, 1)
<-ch
})
for i := 0; i < toSend; i++ {
nc.Publish("foo", []byte("Hello World!"))
}
nc.Flush()
time.Sleep(10 * time.Millisecond)
nc.Close()
// Release callbacks
for i := 1; i < toSend; i++ {
ch <- true
}
// Wait for some time.
time.Sleep(10 * time.Millisecond)
seen := atomic.LoadInt32(&callbacks)
if seen != 1 {
t.Fatalf("Expected only one callback, received %d callbacks", seen)
}
}
func TestNextMsgCallOnAsyncSub(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
})
if err != nil {
t.Fatal("Failed to subscribe: ", err)
}
_, err = sub.NextMsg(time.Second)
if err == nil {
t.Fatal("Expected an error call NextMsg() on AsyncSubscriber")
}
}
func TestNextMsgCallOnClosedSub(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Fatal("Failed to subscribe: ", err)
}
if err = sub.Unsubscribe(); err != nil {
t.Fatal("Unsubscribe failed with err:", err)
}
_, err = sub.NextMsg(time.Second)
if err == nil {
t.Fatal("Expected an error calling NextMsg() on closed subscription")
} else if err != nats.ErrBadSubscription {
t.Fatalf("Expected '%v', but got: '%v'", nats.ErrBadSubscription, err.Error())
}
}
func TestChanSubscriber(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
// Create our own channel.
ch := make(chan *nats.Msg, 128)
// Channel is mandatory
if _, err := nc.ChanSubscribe("foo", nil); err == nil {
t.Fatal("Creating subscription without channel should have failed")
}
_, err := nc.ChanSubscribe("foo", ch)
if err != nil {
t.Fatal("Failed to subscribe: ", err)
}
// Send some messages to ourselves.
total := 100
for i := 0; i < total; i++ {
nc.Publish("foo", []byte("Hello"))
}
received := 0
tm := time.NewTimer(5 * time.Second)
defer tm.Stop()
// Go ahead and receive
for {
select {
case _, ok := <-ch:
if !ok {
t.Fatalf("Got an error reading from channel")
}
case <-tm.C:
t.Fatalf("Timed out waiting on messages")
}
received++
if received >= total {
return
}
}
}
func TestChanQueueSubscriber(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
// Create our own channel.
ch1 := make(chan *nats.Msg, 64)
ch2 := make(chan *nats.Msg, 64)
nc.ChanQueueSubscribe("foo", "bar", ch1)
nc.ChanQueueSubscribe("foo", "bar", ch2)
// Send some messages to ourselves.
total := 100
for i := 0; i < total; i++ {
nc.Publish("foo", []byte("Hello"))
}
received := 0
tm := time.NewTimer(5 * time.Second)
defer tm.Stop()
chk := func(ok bool) {
if !ok {
t.Fatalf("Got an error reading from channel")
} else {
received++
}
}
// Go ahead and receive
for {
select {
case _, ok := <-ch1:
chk(ok)
case _, ok := <-ch2:
chk(ok)
case <-tm.C:
t.Fatalf("Timed out waiting on messages")
}
if received >= total {
return
}
}
}
func TestChanSubscriberPendingLimits(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
ncp := NewDefaultConnection(t)
defer ncp.Close()
// There was a defect that prevented to receive more than
// the default pending message limit. Trying to send more
// than this limit.
total := nats.DefaultSubPendingMsgsLimit + 100
for typeSubs := 0; typeSubs < 3; typeSubs++ {
func() {
// Create our own channel.
ch := make(chan *nats.Msg, total)
var err error
var sub *nats.Subscription
switch typeSubs {
case 0:
sub, err = nc.ChanSubscribe("foo", ch)
case 1:
sub, err = nc.ChanQueueSubscribe("foo", "bar", ch)
case 2:
sub, err = nc.QueueSubscribeSyncWithChan("foo", "bar", ch)
}
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
defer sub.Unsubscribe()
nc.Flush()
// Send some messages
for i := 0; i < total; i++ {
if err := ncp.Publish("foo", []byte("Hello")); err != nil {
t.Fatalf("Unexpected error on publish: %v", err)
}
}
received := 0
tm := time.NewTimer(10 * time.Second)
defer tm.Stop()
chk := func(ok bool) {
if !ok {
t.Fatalf("Got an error reading from channel")
} else {
received++
}
}
// Go ahead and receive
for {
select {
case _, ok := <-ch:
chk(ok)
if received >= total {
return
}
case <-tm.C:
t.Fatalf("Timed out waiting on messages for test %d, received %d", typeSubs, received)
}
}
}()
}
}
func TestQueueChanQueueSubscriber(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
// Create our own channel.
ch1 := make(chan *nats.Msg, 64)
ch2 := make(chan *nats.Msg, 64)
nc.QueueSubscribeSyncWithChan("foo", "bar", ch1)
nc.QueueSubscribeSyncWithChan("foo", "bar", ch2)
// Send some messages to ourselves.
total := 100
for i := 0; i < total; i++ {
nc.Publish("foo", []byte("Hello"))
}
recv1 := 0
recv2 := 0
tm := time.NewTimer(5 * time.Second)
defer tm.Stop()
runTimer := time.NewTimer(500 * time.Millisecond)
defer runTimer.Stop()
chk := func(ok bool, which int) {
if !ok {
t.Fatalf("Got an error reading from channel")
} else {
if which == 1 {
recv1++
} else {
recv2++
}
}
}
// Go ahead and receive
recvLoop:
for {
select {
case _, ok := <-ch1:
chk(ok, 1)
case _, ok := <-ch2:
chk(ok, 2)
case <-tm.C:
t.Fatalf("Timed out waiting on messages")
case <-runTimer.C:
break recvLoop
}
}
if recv1+recv2 > total {
t.Fatalf("Received more messages than expected: %v vs %v", (recv1 + recv2), total)
}
}
func TestUnsubscribeChanOnSubscriber(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
// Create our own channel.
ch := make(chan *nats.Msg, 8)
sub, _ := nc.ChanSubscribe("foo", ch)
// Send some messages to ourselves.
total := 100
for i := 0; i < total; i++ {
nc.Publish("foo", []byte("Hello"))
}
sub.Unsubscribe()
for len(ch) > 0 {
<-ch
}
// Make sure we can send to the channel still.
// Test that we do not close it.
ch <- &nats.Msg{}
}
func TestCloseChanOnSubscriber(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
// Create our own channel.
ch := make(chan *nats.Msg, 8)
nc.ChanSubscribe("foo", ch)
// Send some messages to ourselves.
total := 100
for i := 0; i < total; i++ {
nc.Publish("foo", []byte("Hello"))
}
nc.Close()
for len(ch) > 0 {
<-ch
}
// Make sure we can send to the channel still.
// Test that we do not close it.
ch <- &nats.Msg{}
}
func TestAsyncSubscriptionPending(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
// Send some messages to ourselves.
total := 100
msg := []byte("0123456789")
inCb := make(chan bool)
block := make(chan bool)
sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {
inCb <- true
<-block
})
defer sub.Unsubscribe()
for i := 0; i < total; i++ {
nc.Publish("foo", msg)
}
nc.Flush()
// Wait that a message is received, so checks are safe
if err := Wait(inCb); err != nil {
t.Fatal("No message received")
}
// Test old way
q, _ := sub.QueuedMsgs()
if q != total && q != total-1 {
t.Fatalf("Expected %d or %d, got %d", total, total-1, q)
}
// New way, make sure the same and check bytes.
m, b, _ := sub.Pending()
mlen := len(msg)
totalSize := total * mlen
if m != total && m != total-1 {
t.Fatalf("Expected msgs of %d or %d, got %d", total, total-1, m)
}
if b != totalSize && b != totalSize-mlen {
t.Fatalf("Expected bytes of %d or %d, got %d",
totalSize, totalSize-mlen, b)
}
// Make sure max has been set. Since we block after the first message is
// received, MaxPending should be >= total - 1 and <= total
mm, bm, _ := sub.MaxPending()
if mm < total-1 || mm > total {
t.Fatalf("Expected max msgs (%d) to be between %d and %d",
mm, total-1, total)
}
if bm < totalSize-mlen || bm > totalSize {
t.Fatalf("Expected max bytes (%d) to be between %d and %d",
bm, totalSize, totalSize-mlen)
}
// Check that clear works.
sub.ClearMaxPending()
mm, bm, _ = sub.MaxPending()
if mm != 0 {
t.Fatalf("Expected max msgs to be 0 vs %d after clearing", mm)
}
if bm != 0 {
t.Fatalf("Expected max bytes to be 0 vs %d after clearing", bm)
}
close(block)
sub.Unsubscribe()
// These calls should fail once the subscription is closed.
if _, _, err := sub.Pending(); err == nil {
t.Fatal("Calling Pending() on closed subscription should fail")
}
if _, _, err := sub.MaxPending(); err == nil {
t.Fatal("Calling MaxPending() on closed subscription should fail")
}
if err := sub.ClearMaxPending(); err == nil {
t.Fatal("Calling ClearMaxPending() on closed subscription should fail")
}
}
func TestAsyncSubscriptionPendingDrain(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
// Send some messages to ourselves.
total := 100
msg := []byte("0123456789")
sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {})
defer sub.Unsubscribe()
for i := 0; i < total; i++ {
nc.Publish("foo", msg)
}
nc.Flush()
// Wait for all delivered.
for d, _ := sub.Delivered(); d != int64(total); d, _ = sub.Delivered() {
time.Sleep(10 * time.Millisecond)
}
m, b, _ := sub.Pending()
if m != 0 {
t.Fatalf("Expected msgs of 0, got %d", m)
}
if b != 0 {
t.Fatalf("Expected bytes of 0, got %d", b)
}
sub.Unsubscribe()
if _, err := sub.Delivered(); err == nil {
t.Fatal("Calling Delivered() on closed subscription should fail")
}
}
func TestSyncSubscriptionPendingDrain(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
// Send some messages to ourselves.
total := 100
msg := []byte("0123456789")
sub, _ := nc.SubscribeSync("foo")
defer sub.Unsubscribe()
for i := 0; i < total; i++ {
nc.Publish("foo", msg)
}
nc.Flush()
// Wait for all delivered.
for d, _ := sub.Delivered(); d != int64(total); d, _ = sub.Delivered() {
sub.NextMsg(10 * time.Millisecond)
}
m, b, _ := sub.Pending()
if m != 0 {
t.Fatalf("Expected msgs of 0, got %d", m)
}
if b != 0 {
t.Fatalf("Expected bytes of 0, got %d", b)
}
sub.Unsubscribe()
if _, err := sub.Delivered(); err == nil {
t.Fatal("Calling Delivered() on closed subscription should fail")
}
}
func TestSyncSubscriptionPending(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
sub, _ := nc.SubscribeSync("foo")
defer sub.Unsubscribe()
// Send some messages to ourselves.
total := 100
msg := []byte("0123456789")
for i := 0; i < total; i++ {
nc.Publish("foo", msg)
}
nc.Flush()
// Test old way
q, _ := sub.QueuedMsgs()
if q != total && q != total-1 {
t.Fatalf("Expected %d or %d, got %d", total, total-1, q)
}
// New way, make sure the same and check bytes.
m, b, _ := sub.Pending()
mlen := len(msg)
if m != total {
t.Fatalf("Expected msgs of %d, got %d", total, m)
}
if b != total*mlen {
t.Fatalf("Expected bytes of %d, got %d", total*mlen, b)
}
// Now drain some down and make sure pending is correct
for i := 0; i < total-1; i++ {
sub.NextMsg(10 * time.Millisecond)
}
m, b, _ = sub.Pending()
if m != 1 {
t.Fatalf("Expected msgs of 1, got %d", m)
}
if b != mlen {
t.Fatalf("Expected bytes of %d, got %d", mlen, b)
}
}
func TestSetPendingLimits(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
payload := []byte("hello")
payloadLen := len(payload)
toSend := 100
var sub *nats.Subscription
// Check for invalid values
invalid := func() error {
if err := sub.SetPendingLimits(0, 1); err == nil {
return fmt.Errorf("Setting limit with 0 should fail")
}
if err := sub.SetPendingLimits(1, 0); err == nil {
return fmt.Errorf("Setting limit with 0 should fail")
}
return nil
}
// function to send messages
send := func(subject string, count int) {
for i := 0; i < count; i++ {
if err := nc.Publish(subject, payload); err != nil {
t.Fatalf("Unexpected error on publish: %v", err)
}
}
nc.Flush()
}
// Check pending vs expected values
var limitCount, limitBytes int
var expectedCount, expectedBytes int
checkPending := func() error {
lc, lb, err := sub.PendingLimits()
if err != nil {
return err
}
if lc != limitCount || lb != limitBytes {
return fmt.Errorf("Unexpected limits, expected %v msgs %v bytes, got %v msgs %v bytes",
limitCount, limitBytes, lc, lb)
}
msgs, bytes, err := sub.Pending()
if err != nil {
return fmt.Errorf("Unexpected error getting pending counts: %v", err)
}
if (msgs != expectedCount && msgs != expectedCount-1) ||
(bytes != expectedBytes && bytes != expectedBytes-payloadLen) {
return fmt.Errorf("Unexpected counts, expected %v msgs %v bytes, got %v msgs %v bytes",
expectedCount, expectedBytes, msgs, bytes)
}
return nil
}
recv := make(chan bool)
block := make(chan bool)
cb := func(m *nats.Msg) {
recv <- true
<-block
m.Sub.Unsubscribe()
}
subj := "foo"
sub, err := nc.Subscribe(subj, cb)
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
defer sub.Unsubscribe()
if err := invalid(); err != nil {
t.Fatalf("%v", err)
}
// Check we apply limit only for size
limitCount = -1
limitBytes = (toSend / 2) * payloadLen
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
t.Fatalf("Unexpected error setting limits: %v", err)
}
// Send messages
send(subj, toSend)
// Wait for message to be received
if err := Wait(recv); err != nil {
t.Fatal("Did not get our message")
}
expectedBytes = limitBytes
expectedCount = limitBytes / payloadLen
if err := checkPending(); err != nil {
t.Fatalf("%v", err)
}
// Release callback
block <- true
subj = "bar"
sub, err = nc.Subscribe(subj, cb)
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
defer sub.Unsubscribe()
// Check we apply limit only for count
limitCount = toSend / 4
limitBytes = -1
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
t.Fatalf("Unexpected error setting limits: %v", err)
}
// Send messages
send(subj, toSend)
// Wait for message to be received
if err := Wait(recv); err != nil {
t.Fatal("Did not get our message")
}
expectedCount = limitCount
expectedBytes = limitCount * payloadLen
if err := checkPending(); err != nil {
t.Fatalf("%v", err)
}
// Release callback
block <- true
subj = "baz"
sub, err = nc.SubscribeSync(subj)
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
defer sub.Unsubscribe()
if err := invalid(); err != nil {
t.Fatalf("%v", err)
}
// Check we apply limit only for size
limitCount = -1
limitBytes = (toSend / 2) * payloadLen
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
t.Fatalf("Unexpected error setting limits: %v", err)
}
// Send messages
send(subj, toSend)
expectedBytes = limitBytes
expectedCount = limitBytes / payloadLen
if err := checkPending(); err != nil {
t.Fatalf("%v", err)
}
sub.Unsubscribe()
nc.Flush()
subj = "boz"
sub, err = nc.SubscribeSync(subj)
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
defer sub.Unsubscribe()
// Check we apply limit only for count
limitCount = toSend / 4
limitBytes = -1
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
t.Fatalf("Unexpected error setting limits: %v", err)
}
// Send messages
send(subj, toSend)
expectedCount = limitCount
expectedBytes = limitCount * payloadLen
if err := checkPending(); err != nil {
t.Fatalf("%v", err)
}
sub.Unsubscribe()
nc.Flush()
}
func TestSubscriptionTypes(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {})
defer sub.Unsubscribe()
if st := sub.Type(); st != nats.AsyncSubscription {
t.Fatalf("Expected AsyncSubscription, got %v", st)
}
// Check Pending
if err := sub.SetPendingLimits(1, 100); err != nil {
t.Fatalf("We should be able to SetPendingLimits()")
}
if _, _, err := sub.Pending(); err != nil {
t.Fatalf("We should be able to call Pending()")
}
sub.Unsubscribe()
if err := sub.SetPendingLimits(1, 100); err == nil {
t.Fatal("Calling SetPendingLimits() on closed subscription should fail")
}
if _, _, err := sub.PendingLimits(); err == nil {
t.Fatal("Calling PendingLimits() on closed subscription should fail")
}
sub, _ = nc.SubscribeSync("foo")
defer sub.Unsubscribe()
if st := sub.Type(); st != nats.SyncSubscription {
t.Fatalf("Expected SyncSubscription, got %v", st)
}
// Check Pending
if err := sub.SetPendingLimits(1, 100); err != nil {
t.Fatalf("We should be able to SetPendingLimits()")
}
if _, _, err := sub.Pending(); err != nil {
t.Fatalf("We should be able to call Pending()")
}
sub.Unsubscribe()
if err := sub.SetPendingLimits(1, 100); err == nil {
t.Fatal("Calling SetPendingLimits() on closed subscription should fail")
}
if _, _, err := sub.PendingLimits(); err == nil {
t.Fatal("Calling PendingLimits() on closed subscription should fail")
}
sub, _ = nc.ChanSubscribe("foo", make(chan *nats.Msg))
defer sub.Unsubscribe()
if st := sub.Type(); st != nats.ChanSubscription {
t.Fatalf("Expected ChanSubscription, got %v", st)
}
// Check Pending
if err := sub.SetPendingLimits(1, 100); err == nil {
t.Fatalf("We should NOT be able to SetPendingLimits() on ChanSubscriber")
}
if _, _, err := sub.Pending(); err == nil {
t.Fatalf("We should NOT be able to call Pending() on ChanSubscriber")
}
if _, _, err := sub.MaxPending(); err == nil {
t.Fatalf("We should NOT be able to call MaxPending() on ChanSubscriber")
}
if err := sub.ClearMaxPending(); err == nil {
t.Fatalf("We should NOT be able to call ClearMaxPending() on ChanSubscriber")
}
if _, _, err := sub.PendingLimits(); err == nil {
t.Fatalf("We should NOT be able to call PendingLimits() on ChanSubscriber")
}
}