tile38/vendor/github.com/streadway/amqp/confirms_test.go

154 lines
2.8 KiB
Go
Raw Permalink Normal View History

2017-03-15 20:45:35 +03:00
package amqp
import (
"testing"
"time"
)
// TestConfirmOneResequences checks that confirmations handed to One out of
// order are not forwarded to the listener until the earliest outstanding
// confirmation has been supplied, and that they then arrive in fixture order.
func TestConfirmOneResequences(t *testing.T) {
	var (
		fixtures = []Confirmation{
			{1, true},
			{2, false},
			{3, true},
		}
		c = newConfirms()
		l = make(chan Confirmation, len(fixtures))
	)

	c.Listen(l)

	for i := range fixtures {
		if want, got := uint64(i+1), c.Publish(); want != got {
			t.Fatalf("expected publish to return the 1 based delivery tag published, want: %d, got: %d", want, got)
		}
	}

	// Supply the second and third fixtures first; the first is still missing.
	c.One(fixtures[1])
	c.One(fixtures[2])

	// Nothing may be delivered yet: a non-blocking receive must find the
	// listener channel empty.
	select {
	case confirm := <-l:
		t.Fatalf("expected to wait in order to properly resequence results, got: %+v", confirm)
	default:
	}

	c.One(fixtures[0])

	for i, fix := range fixtures {
		want := fix

		// Bound each receive so that a resequencing regression fails the test
		// quickly instead of blocking until the global test timeout.
		var got Confirmation
		select {
		case got = <-l:
		case <-time.After(1 * time.Second):
			t.Fatalf("timeout waiting for confirmation %d, want: %+v", i, want)
		}

		if want != got {
			t.Fatalf("expected to return confirmations in sequence for %d, want: %+v, got: %+v", i, want, got)
		}
	}
}
// TestConfirmMixedResequences interleaves One and Multiple calls out of order
// and expects the listener to receive every fixture in its original order,
// failing if any single receive takes longer than one second.
func TestConfirmMixedResequences(t *testing.T) {
	expected := []Confirmation{
		{1, true},
		{2, true},
		{3, true},
	}
	confirms := newConfirms()
	listener := make(chan Confirmation, len(expected))

	confirms.Listen(listener)

	// One publish per fixture.
	for range expected {
		confirms.Publish()
	}

	confirms.One(expected[0])
	confirms.One(expected[2])
	confirms.Multiple(expected[1])

	for idx := range expected {
		var received Confirmation

		select {
		case received = <-listener:
		case <-time.After(1 * time.Second):
			t.Fatalf("timeout on reading confirmations")
		}

		if expected[idx] != received {
			t.Fatalf("expected to confirm in sequence for %d, want: %+v, got: %+v", idx, expected[idx], received)
		}
	}
}
// TestConfirmMultipleResequences checks that a single Multiple call carrying
// the last fixture results in every fixture being delivered to the listener,
// in order.
func TestConfirmMultipleResequences(t *testing.T) {
	var (
		fixtures = []Confirmation{
			{1, true},
			{2, true},
			{3, true},
			{4, true},
		}
		c = newConfirms()
		l = make(chan Confirmation, len(fixtures))
	)

	c.Listen(l)

	// One publish per fixture.
	for range fixtures {
		c.Publish()
	}

	c.Multiple(fixtures[len(fixtures)-1])

	for i, fix := range fixtures {
		want := fix

		// Bound each receive so that a regression fails the test quickly
		// instead of blocking until the global test timeout.
		var got Confirmation
		select {
		case got = <-l:
		case <-time.After(1 * time.Second):
			t.Fatalf("timeout waiting for confirmation %d, want: %+v", i, want)
		}

		if want != got {
			t.Fatalf("expected to confirm multiple in sequence for %d, want: %+v, got: %+v", i, want, got)
		}
	}
}
// BenchmarkSequentialBufferedConfirms measures publishing and immediately
// confirming one message per iteration against a listener channel with a
// buffer of 10.
func BenchmarkSequentialBufferedConfirms(t *testing.B) {
	confirms := newConfirms()
	listener := make(chan Confirmation, 10)

	confirms.Listen(listener)

	for n := 0; n < t.N; n++ {
		// After the first cap(listener) iterations, take one confirmation off
		// the channel per iteration, presumably so that the confirmation
		// delivered below never blocks on a full buffer.
		if n >= cap(listener) {
			<-listener
		}

		tag := confirms.Publish()
		confirms.One(Confirmation{tag, true})
	}
}
// TestConfirmsIsThreadSafe runs count publishers, count confirmers and count
// listener readers in separate goroutines and fails unless count
// confirmations have been received before the timeout elapses.
func TestConfirmsIsThreadSafe(t *testing.T) {
	const (
		count   = 1000
		timeout = 5 * time.Second
	)

	confirms := newConfirms()
	listener := make(chan Confirmation)
	published := make(chan Confirmation)
	finished := make(chan Confirmation)
	// A single deadline shared by the whole test, started before any work.
	deadline := time.After(timeout)

	// publish obtains a tag from Publish and hands it on for confirmation.
	publish := func() { published <- Confirmation{confirms.Publish(), true} }
	// confirm feeds one published confirmation to One.
	confirm := func() { confirms.One(<-published) }
	// collect forwards one confirmation from the listener channel.
	collect := func() { finished <- <-listener }

	confirms.Listen(listener)

	for n := 0; n < count; n++ {
		go publish()
	}

	for n := 0; n < count; n++ {
		go confirm()
	}

	for n := 0; n < count; n++ {
		go collect()
	}

	for remaining := count; remaining > 0; remaining-- {
		select {
		case <-finished:
		case <-deadline:
			t.Fatalf("expected all publish/confirms to finish after %s", timeout)
		}
	}
}