// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Source code and contact info at http://github.com/streadway/amqp // +build integration package amqp import ( "bytes" devrand "crypto/rand" "encoding/binary" "fmt" "hash/crc32" "io" "math/rand" "net" "os" "reflect" "strconv" "sync" "testing" "testing/quick" "time" ) func TestIntegrationOpenClose(t *testing.T) { if c := integrationConnection(t, "open-close"); c != nil { t.Logf("have connection, calling connection close") if err := c.Close(); err != nil { t.Fatalf("connection close: %s", err) } t.Logf("connection close OK") } } func TestIntegrationOpenCloseChannel(t *testing.T) { if c := integrationConnection(t, "channel"); c != nil { defer c.Close() ch, err := c.Channel() if err != nil { t.Fatalf("create channel 1: %s", err) } ch.Close() } } func TestIntegrationHighChannelChurnInTightLoop(t *testing.T) { if c := integrationConnection(t, "channel churn"); c != nil { defer c.Close() for i := 0; i < 1000; i++ { ch, err := c.Channel() if err != nil { t.Fatalf("create channel 1: %s", err) } ch.Close() } } } func TestIntegrationOpenConfig(t *testing.T) { config := Config{} c, err := DialConfig(integrationURLFromEnv(), config) if err != nil { t.Fatalf("expected to dial with config %+v integration server: %s", config, err) } if _, err := c.Channel(); err != nil { t.Fatalf("expected to open channel: %s", err) } if err := c.Close(); err != nil { t.Fatalf("connection close: %s", err) } } func TestIntegrationOpenConfigWithNetDial(t *testing.T) { config := Config{Dial: net.Dial} c, err := DialConfig(integrationURLFromEnv(), config) if err != nil { t.Errorf("expected to dial with config %+v integration server: %s", config, err) } if _, err := c.Channel(); err != nil { t.Fatalf("expected to open channel: %s", err) } if err := c.Close(); err != nil { t.Fatalf("connection close: %s", err) } } func TestIntegrationLocalAddr(t *testing.T) { config := Config{} c, err := DialConfig(integrationURLFromEnv(), config) defer c.Close() if err != nil { t.Errorf("expected to dial with config %+v integration server: %s", config, err) } a := c.LocalAddr() _, portString, err := net.SplitHostPort(a.String()) if err != nil { t.Errorf("expected to get a local network address with config %+v integration server: %s", config, a.String()) } port, err := strconv.Atoi(portString) if err != nil { t.Errorf("expected to get a TCP port number with config %+v integration server: %s", config, err) } t.Logf("Connected to port %d\n", port) } // https://github.com/streadway/amqp/issues/94 func TestExchangePassiveOnMissingExchangeShouldError(t *testing.T) { c := integrationConnection(t, "exch") if c != nil { defer c.Close() ch, err := c.Channel() if err != nil { t.Fatalf("create channel 1: %s", err) } defer ch.Close() if err := ch.ExchangeDeclarePassive( "test-integration-missing-passive-exchange", "direct", // type false, // duration (note: is durable) true, // auto-delete false, // internal false, // nowait nil, // args ); err == nil { t.Fatal("ExchangeDeclarePassive of a missing exchange should return error") } } } // https://github.com/streadway/amqp/issues/94 func TestIntegrationExchangeDeclarePassiveOnDeclaredShouldNotError(t *testing.T) { c := integrationConnection(t, "exch") if c != nil { defer c.Close() exchange := "test-integration-decalred-passive-exchange" ch, err := c.Channel() if err != nil { t.Fatalf("create channel: %s", err) } defer ch.Close() if err := ch.ExchangeDeclare( exchange, // name "direct", // type false, // durable true, // auto-delete false, // internal false, // nowait nil, // args ); err != nil { t.Fatalf("declare exchange: %s", err) } if err := ch.ExchangeDeclarePassive( exchange, // name "direct", // type false, // durable true, // auto-delete false, // internal false, // nowait nil, // args ); err != nil { t.Fatalf("ExchangeDeclarePassive on a declared exchange should not error, got: %q", err) } } } func TestIntegrationExchange(t *testing.T) { c := integrationConnection(t, "exch") if c != nil { defer c.Close() channel, err := c.Channel() if err != nil { t.Fatalf("create channel: %s", err) } t.Logf("create channel OK") exchange := "test-integration-exchange" if err := channel.ExchangeDeclare( exchange, // name "direct", // type false, // duration true, // auto-delete false, // internal false, // nowait nil, // args ); err != nil { t.Fatalf("declare exchange: %s", err) } t.Logf("declare exchange OK") if err := channel.ExchangeDelete(exchange, false, false); err != nil { t.Fatalf("delete exchange: %s", err) } t.Logf("delete exchange OK") if err := channel.Close(); err != nil { t.Fatalf("close channel: %s", err) } t.Logf("close channel OK") } } // https://github.com/streadway/amqp/issues/94 func TestIntegrationQueueDeclarePassiveOnMissingExchangeShouldError(t *testing.T) { c := integrationConnection(t, "queue") if c != nil { defer c.Close() ch, err := c.Channel() if err != nil { t.Fatalf("create channel1: %s", err) } defer ch.Close() if _, err := ch.QueueDeclarePassive( "test-integration-missing-passive-queue", // name false, // duration (note: not durable) true, // auto-delete false, // exclusive false, // noWait nil, // arguments ); err == nil { t.Fatal("QueueDeclarePassive of a missing queue should error") } } } // https://github.com/streadway/amqp/issues/94 func TestIntegrationPassiveQueue(t *testing.T) { c := integrationConnection(t, "queue") if c != nil { defer c.Close() name := "test-integration-declared-passive-queue" ch, err := c.Channel() if err != nil { t.Fatalf("create channel1: %s", err) } defer ch.Close() if _, err := ch.QueueDeclare( name, // name false, // durable true, // auto-delete false, // exclusive false, // noWait nil, // arguments ); err != nil { t.Fatalf("queue declare: %s", err) } if _, err := ch.QueueDeclarePassive( name, // name false, // durable true, // auto-delete false, // exclusive false, // noWait nil, // arguments ); err != nil { t.Fatalf("QueueDeclarePassive on declared queue should not error, got: %q", err) } if _, err := ch.QueueDeclarePassive( name, // name true, // durable (note: differs) true, // auto-delete false, // exclusive false, // noWait nil, // arguments ); err != nil { t.Fatalf("QueueDeclarePassive on declared queue with different flags should error") } } } func TestIntegrationBasicQueueOperations(t *testing.T) { c := integrationConnection(t, "queue") if c != nil { defer c.Close() channel, err := c.Channel() if err != nil { t.Fatalf("create channel: %s", err) } t.Logf("create channel OK") exchangeName := "test-basic-ops-exchange" queueName := "test-basic-ops-queue" deleteQueueFirstOptions := []bool{true, false} for _, deleteQueueFirst := range deleteQueueFirstOptions { if err := channel.ExchangeDeclare( exchangeName, // name "direct", // type true, // duration (note: is durable) false, // auto-delete false, // internal false, // nowait nil, // args ); err != nil { t.Fatalf("declare exchange: %s", err) } t.Logf("declare exchange OK") if _, err := channel.QueueDeclare( queueName, // name true, // duration (note: durable) false, // auto-delete false, // exclusive false, // noWait nil, // arguments ); err != nil { t.Fatalf("queue declare: %s", err) } t.Logf("declare queue OK") if err := channel.QueueBind( queueName, // name "", // routingKey exchangeName, // sourceExchange false, // noWait nil, // arguments ); err != nil { t.Fatalf("queue bind: %s", err) } t.Logf("queue bind OK") if deleteQueueFirst { if _, err := channel.QueueDelete( queueName, // name false, // ifUnused (false=be aggressive) false, // ifEmpty (false=be aggressive) false, // noWait ); err != nil { t.Fatalf("delete queue (first): %s", err) } t.Logf("delete queue (first) OK") if err := channel.ExchangeDelete(exchangeName, false, false); err != nil { t.Fatalf("delete exchange (after delete queue): %s", err) } t.Logf("delete exchange (after delete queue) OK") } else { // deleteExchangeFirst if err := channel.ExchangeDelete(exchangeName, false, false); err != nil { t.Fatalf("delete exchange (first): %s", err) } t.Logf("delete exchange (first) OK") if _, err := channel.QueueInspect(queueName); err != nil { t.Fatalf("inspect queue state after deleting exchange: %s", err) } t.Logf("queue properly remains after exchange is deleted") if _, err := channel.QueueDelete( queueName, false, // ifUnused false, // ifEmpty false, // noWait ); err != nil { t.Fatalf("delete queue (after delete exchange): %s", err) } t.Logf("delete queue (after delete exchange) OK") } } if err := channel.Close(); err != nil { t.Fatalf("close channel: %s", err) } t.Logf("close channel OK") } } func TestIntegrationConnectionNegotiatesMaxChannels(t *testing.T) { config := Config{ChannelMax: 0} c, err := DialConfig(integrationURLFromEnv(), config) if err != nil { t.Errorf("expected to dial with config %+v integration server: %s", config, err) } defer c.Close() if want, got := defaultChannelMax, c.Config.ChannelMax; want != got { t.Fatalf("expected connection to negotiate uint16 (%d) channels, got: %d", want, got) } } func TestIntegrationConnectionNegotiatesClientMaxChannels(t *testing.T) { config := Config{ChannelMax: 16} c, err := DialConfig(integrationURLFromEnv(), config) if err != nil { t.Errorf("expected to dial with config %+v integration server: %s", config, err) } defer c.Close() if want, got := config.ChannelMax, c.Config.ChannelMax; want != got { t.Fatalf("expected client specified channel limit after handshake %d, got: %d", want, got) } } func TestIntegrationChannelIDsExhausted(t *testing.T) { config := Config{ChannelMax: 16} c, err := DialConfig(integrationURLFromEnv(), config) if err != nil { t.Errorf("expected to dial with config %+v integration server: %s", config, err) } defer c.Close() for i := 1; i <= c.Config.ChannelMax; i++ { if _, err := c.Channel(); err != nil { t.Fatalf("expected allocating all channel ids to succed, failed on %d with %v", i, err) } } if _, err := c.Channel(); err != ErrChannelMax { t.Fatalf("expected allocating all channels to produce the client side error %#v, got: %#v", ErrChannelMax, err) } } func TestIntegrationChannelClosing(t *testing.T) { c := integrationConnection(t, "closings") if c != nil { defer c.Close() // This function is run on every channel after it is successfully // opened. It can do something to verify something. It should be // quick; many channels may be opened! f := func(t *testing.T, c *Channel) { return } // open and close channel, err := c.Channel() if err != nil { t.Fatalf("basic create channel: %s", err) } t.Logf("basic create channel OK") if err := channel.Close(); err != nil { t.Fatalf("basic close channel: %s", err) } t.Logf("basic close channel OK") // deferred close signal := make(chan bool) go func() { channel, err := c.Channel() if err != nil { t.Fatalf("second create channel: %s", err) } t.Logf("second create channel OK") <-signal // a bit of synchronization f(t, channel) defer func() { if err := channel.Close(); err != nil { t.Fatalf("deferred close channel: %s", err) } t.Logf("deferred close channel OK") signal <- true }() }() signal <- true select { case <-signal: t.Logf("(got close signal OK)") break case <-time.After(250 * time.Millisecond): t.Fatalf("deferred close: timeout") } // multiple channels for _, n := range []int{2, 4, 8, 16, 32, 64, 128, 256} { channels := make([]*Channel, n) for i := 0; i < n; i++ { var err error if channels[i], err = c.Channel(); err != nil { t.Fatalf("create channel %d/%d: %s", i+1, n, err) } } f(t, channel) for i, channel := range channels { if err := channel.Close(); err != nil { t.Fatalf("close channel %d/%d: %s", i+1, n, err) } } t.Logf("created/closed %d channels OK", n) } } } func TestIntegrationMeaningfulChannelErrors(t *testing.T) { c := integrationConnection(t, "pub") if c != nil { defer c.Close() ch, err := c.Channel() if err != nil { t.Fatalf("Could not create channel") } queue := "test.integration.channel.error" _, err = ch.QueueDeclare(queue, false, true, false, false, nil) if err != nil { t.Fatalf("Could not declare") } _, err = ch.QueueDeclare(queue, true, false, false, false, nil) if err == nil { t.Fatalf("Expected error, got nil") } e, ok := err.(*Error) if !ok { t.Fatalf("Expected type Error response, got %T", err) } if e.Code != PreconditionFailed { t.Fatalf("Expected PreconditionFailed, got: %+v", e) } _, err = ch.QueueDeclare(queue, false, true, false, false, nil) if err != ErrClosed { t.Fatalf("Expected channel to be closed, got: %T", err) } } } // https://github.com/streadway/amqp/issues/6 func TestIntegrationNonBlockingClose(t *testing.T) { c := integrationConnection(t, "#6") if c != nil { defer c.Close() ch, err := c.Channel() if err != nil { t.Fatalf("Could not create channel") } queue := "test.integration.blocking.close" _, err = ch.QueueDeclare(queue, false, true, false, false, nil) if err != nil { t.Fatalf("Could not declare") } msgs, err := ch.Consume(queue, "", false, false, false, false, nil) if err != nil { t.Fatalf("Could not consume") } // Simulate a consumer go func() { for _ = range msgs { t.Logf("Oh my, received message on an empty queue") } }() succeed := make(chan bool) go func() { if err = ch.Close(); err != nil { t.Fatalf("Close produced an error when it shouldn't") } succeed <- true }() select { case <-succeed: break case <-time.After(1 * time.Second): t.Fatalf("Close timed out after 1s") } } } func TestIntegrationPublishConsume(t *testing.T) { queue := "test.integration.publish.consume" c1 := integrationConnection(t, "pub") c2 := integrationConnection(t, "sub") if c1 != nil && c2 != nil { defer c1.Close() defer c2.Close() pub, _ := c1.Channel() sub, _ := c2.Channel() pub.QueueDeclare(queue, false, true, false, false, nil) sub.QueueDeclare(queue, false, true, false, false, nil) defer pub.QueueDelete(queue, false, false, false) messages, _ := sub.Consume(queue, "", false, false, false, false, nil) pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 1")}) pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 2")}) pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 3")}) assertConsumeBody(t, messages, []byte("pub 1")) assertConsumeBody(t, messages, []byte("pub 2")) assertConsumeBody(t, messages, []byte("pub 3")) } } func TestIntegrationConsumeFlow(t *testing.T) { queue := "test.integration.consumer-flow" c1 := integrationConnection(t, "pub-flow") c2 := integrationConnection(t, "sub-flow") if c1 != nil && c2 != nil { defer c1.Close() defer c2.Close() pub, _ := c1.Channel() sub, _ := c2.Channel() pub.QueueDeclare(queue, false, true, false, false, nil) sub.QueueDeclare(queue, false, true, false, false, nil) defer pub.QueueDelete(queue, false, false, false) sub.Qos(1, 0, false) messages, _ := sub.Consume(queue, "", false, false, false, false, nil) pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 1")}) pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 2")}) msg := assertConsumeBody(t, messages, []byte("pub 1")) if err := sub.Flow(false); err.(*Error).Code == NotImplemented { t.Log("flow control is not supported on this version of rabbitmq") return } msg.Ack(false) select { case <-messages: t.Fatalf("message was delivered when flow was not active") default: } sub.Flow(true) msg = assertConsumeBody(t, messages, []byte("pub 2")) msg.Ack(false) } } func TestIntegrationRecoverNotImplemented(t *testing.T) { queue := "test.recover" if c, ch := integrationQueue(t, queue); c != nil { if product, ok := c.Properties["product"]; ok && product.(string) == "RabbitMQ" { defer c.Close() err := ch.Recover(false) if ex, ok := err.(*Error); !ok || ex.Code != 540 { t.Fatalf("Expected NOT IMPLEMENTED got: %v", ex) } } } } // This test is driven by a private API to simulate the server sending a channelFlow message func TestIntegrationPublishFlow(t *testing.T) { // TODO - no idea how to test without affecting the server or mucking internal APIs // i'd like to make sure the RW lock can be held by multiple publisher threads // and that multiple channelFlow messages do not block the dispatch thread } func TestIntegrationConsumeCancel(t *testing.T) { queue := "test.integration.consume-cancel" c := integrationConnection(t, "pub") if c != nil { defer c.Close() ch, _ := c.Channel() ch.QueueDeclare(queue, false, true, false, false, nil) defer ch.QueueDelete(queue, false, false, false) messages, _ := ch.Consume(queue, "integration-tag", false, false, false, false, nil) ch.Publish("", queue, false, false, Publishing{Body: []byte("1")}) assertConsumeBody(t, messages, []byte("1")) err := ch.Cancel("integration-tag", false) if err != nil { t.Fatalf("error cancelling the consumer: %v", err) } ch.Publish("", queue, false, false, Publishing{Body: []byte("2")}) select { case <-time.After(100 * time.Millisecond): t.Fatalf("Timeout on Close") case _, ok := <-messages: if ok { t.Fatalf("Extra message on consumer when consumer should have been closed") } } } } func (c *Connection) Generate(r *rand.Rand, _ int) reflect.Value { urlStr := os.Getenv("AMQP_URL") if urlStr == "" { return reflect.ValueOf(nil) } conn, err := Dial(urlStr) if err != nil { return reflect.ValueOf(nil) } return reflect.ValueOf(conn) } func (c Publishing) Generate(r *rand.Rand, _ int) reflect.Value { var ok bool var t reflect.Value p := Publishing{} //p.DeliveryMode = uint8(r.Intn(3)) //p.Priority = uint8(r.Intn(8)) if r.Intn(2) > 0 { p.ContentType = "application/octet-stream" } if r.Intn(2) > 0 { p.ContentEncoding = "gzip" } if r.Intn(2) > 0 { p.CorrelationId = fmt.Sprintf("%d", r.Int()) } if r.Intn(2) > 0 { p.ReplyTo = fmt.Sprintf("%d", r.Int()) } if r.Intn(2) > 0 { p.MessageId = fmt.Sprintf("%d", r.Int()) } if r.Intn(2) > 0 { p.Type = fmt.Sprintf("%d", r.Int()) } if r.Intn(2) > 0 { p.AppId = fmt.Sprintf("%d", r.Int()) } if r.Intn(2) > 0 { p.Timestamp = time.Unix(r.Int63(), r.Int63()) } if t, ok = quick.Value(reflect.TypeOf(p.Body), r); ok { p.Body = t.Bytes() } return reflect.ValueOf(p) } func TestQuickPublishOnly(t *testing.T) { if c := integrationConnection(t, "quick"); c != nil { defer c.Close() pub, err := c.Channel() queue := "test-publish" if _, err = pub.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Errorf("Failed to declare: %s", err) return } defer pub.QueueDelete(queue, false, false, false) quick.Check(func(msg Publishing) bool { return pub.Publish("", queue, false, false, msg) == nil }, nil) } } func TestPublishEmptyBody(t *testing.T) { c := integrationConnection(t, "empty") if c != nil { defer c.Close() ch, err := c.Channel() if err != nil { t.Errorf("Failed to create channel") return } queue := "test-TestPublishEmptyBody" if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Fatalf("Could not declare") } messages, err := ch.Consume(queue, "", false, false, false, false, nil) if err != nil { t.Fatalf("Could not consume") } err = ch.Publish("", queue, false, false, Publishing{}) if err != nil { t.Fatalf("Could not publish") } select { case msg := <-messages: if len(msg.Body) != 0 { t.Errorf("Received non empty body") } case <-time.After(200 * time.Millisecond): t.Errorf("Timeout on receive") } } } func TestPublishEmptyBodyWithHeadersIssue67(t *testing.T) { c := integrationConnection(t, "issue67") if c != nil { defer c.Close() ch, err := c.Channel() if err != nil { t.Errorf("Failed to create channel") return } queue := "test-TestPublishEmptyBodyWithHeaders" if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Fatalf("Could not declare") } messages, err := ch.Consume(queue, "", false, false, false, false, nil) if err != nil { t.Fatalf("Could not consume") } headers := Table{ "ham": "spam", } err = ch.Publish("", queue, false, false, Publishing{Headers: headers}) if err != nil { t.Fatalf("Could not publish") } select { case msg := <-messages: if msg.Headers["ham"] == nil { t.Fatalf("Headers aren't sent") } if msg.Headers["ham"] != "spam" { t.Fatalf("Headers are wrong") } case <-time.After(200 * time.Millisecond): t.Errorf("Timeout on receive") } } } func TestQuickPublishConsumeOnly(t *testing.T) { c1 := integrationConnection(t, "quick-pub") c2 := integrationConnection(t, "quick-sub") if c1 != nil && c2 != nil { defer c1.Close() defer c2.Close() pub, err := c1.Channel() sub, err := c2.Channel() queue := "TestPublishConsumeOnly" if _, err = pub.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Errorf("Failed to declare: %s", err) return } if _, err = sub.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Errorf("Failed to declare: %s", err) return } defer sub.QueueDelete(queue, false, false, false) ch, err := sub.Consume(queue, "", false, false, false, false, nil) if err != nil { t.Errorf("Could not sub: %s", err) } quick.CheckEqual( func(msg Publishing) []byte { empty := Publishing{Body: msg.Body} if pub.Publish("", queue, false, false, empty) != nil { return []byte{'X'} } return msg.Body }, func(msg Publishing) []byte { out := <-ch out.Ack(false) return out.Body }, nil) } } func TestQuickPublishConsumeBigBody(t *testing.T) { c1 := integrationConnection(t, "big-pub") c2 := integrationConnection(t, "big-sub") if c1 != nil && c2 != nil { defer c1.Close() defer c2.Close() pub, err := c1.Channel() sub, err := c2.Channel() queue := "test-pubsub" if _, err = sub.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Errorf("Failed to declare: %s", err) return } ch, err := sub.Consume(queue, "", false, false, false, false, nil) if err != nil { t.Errorf("Could not sub: %s", err) } fixture := Publishing{ Body: make([]byte, 1e4+1000), } if _, err = pub.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Errorf("Failed to declare: %s", err) return } err = pub.Publish("", queue, false, false, fixture) if err != nil { t.Errorf("Could not publish big body") } select { case msg := <-ch: if bytes.Compare(msg.Body, fixture.Body) != 0 { t.Errorf("Consumed big body didn't match") } case <-time.After(200 * time.Millisecond): t.Errorf("Timeout on receive") } } } func TestIntegrationGetOk(t *testing.T) { if c := integrationConnection(t, "getok"); c != nil { defer c.Close() queue := "test.get-ok" ch, _ := c.Channel() ch.QueueDeclare(queue, false, true, false, false, nil) ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")}) msg, ok, err := ch.Get(queue, false) if err != nil { t.Fatalf("Failed get: %v", err) } if !ok { t.Fatalf("Get on a queued message did not find the message") } if string(msg.Body) != "ok" { t.Fatalf("Get did not get the correct message") } } } func TestIntegrationGetEmpty(t *testing.T) { if c := integrationConnection(t, "getok"); c != nil { defer c.Close() queue := "test.get-ok" ch, _ := c.Channel() ch.QueueDeclare(queue, false, true, false, false, nil) _, ok, err := ch.Get(queue, false) if err != nil { t.Fatalf("Failed get: %v", err) } if !ok { t.Fatalf("Get on a queued message retrieved a message when it shouldn't have") } } } func TestIntegrationTxCommit(t *testing.T) { if c := integrationConnection(t, "txcommit"); c != nil { defer c.Close() queue := "test.tx.commit" ch, _ := c.Channel() ch.QueueDeclare(queue, false, true, false, false, nil) if err := ch.Tx(); err != nil { t.Fatalf("tx.select failed") } ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")}) if err := ch.TxCommit(); err != nil { t.Fatalf("tx.commit failed") } msg, ok, err := ch.Get(queue, false) if err != nil || !ok { t.Fatalf("Failed get: %v", err) } if string(msg.Body) != "ok" { t.Fatalf("Get did not get the correct message from the transaction") } } } func TestIntegrationTxRollback(t *testing.T) { if c := integrationConnection(t, "txrollback"); c != nil { defer c.Close() queue := "test.tx.rollback" ch, _ := c.Channel() ch.QueueDeclare(queue, false, true, false, false, nil) if err := ch.Tx(); err != nil { t.Fatalf("tx.select failed") } ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")}) if err := ch.TxRollback(); err != nil { t.Fatalf("tx.rollback failed") } _, ok, err := ch.Get(queue, false) if err != nil { t.Fatalf("Failed get: %v", err) } if ok { t.Fatalf("message was published when it should have been rolled back") } } } func TestIntegrationReturn(t *testing.T) { if c, ch := integrationQueue(t, "return"); c != nil { defer c.Close() ret := make(chan Return, 1) ch.NotifyReturn(ret) // mandatory publish to an exchange without a binding should be returned ch.Publish("", "return-without-binding", true, false, Publishing{Body: []byte("mandatory")}) select { case res := <-ret: if string(res.Body) != "mandatory" { t.Fatalf("expected return of the same message") } if res.ReplyCode != NoRoute { t.Fatalf("expected no consumers reply code on the Return result, got: %v", res.ReplyCode) } case <-time.After(200 * time.Millisecond): t.Fatalf("no return was received within 200ms") } } } func TestIntegrationCancel(t *testing.T) { queue := "cancel" consumerTag := "test.cancel" if c, ch := integrationQueue(t, queue); c != nil { defer c.Close() cancels := ch.NotifyCancel(make(chan string, 1)) go func() { if _, err := ch.Consume(queue, consumerTag, false, false, false, false, nil); err != nil { t.Fatalf("cannot consume from %q to test NotifyCancel: %v", queue, err) } if _, err := ch.QueueDelete(queue, false, false, false); err != nil { t.Fatalf("cannot delete integration queue: %v", err) } }() select { case tag := <-cancels: if want, got := consumerTag, tag; want != got { t.Fatalf("expected to be notified of deleted queue with consumer tag, got: %q", got) } case <-time.After(200 * time.Millisecond): t.Fatalf("expected to be notified of deleted queue with 200ms") } } } func TestIntegrationConfirm(t *testing.T) { if c, ch := integrationQueue(t, "confirm"); c != nil { defer c.Close() confirms := ch.NotifyPublish(make(chan Confirmation, 1)) if err := ch.Confirm(false); err != nil { t.Fatalf("could not confirm") } ch.Publish("", "confirm", false, false, Publishing{Body: []byte("confirm")}) select { case confirmed := <-confirms: if confirmed.DeliveryTag != 1 { t.Fatalf("expected ack starting with delivery tag of 1") } case <-time.After(200 * time.Millisecond): t.Fatalf("no ack was received within 200ms") } } } // https://github.com/streadway/amqp/issues/61 func TestRoundTripAllFieldValueTypes61(t *testing.T) { if conn := integrationConnection(t, "issue61"); conn != nil { defer conn.Close() timestamp := time.Unix(100000000, 0) headers := Table{ "A": []interface{}{ []interface{}{"nested array", int32(3)}, Decimal{2, 1}, Table{"S": "nested table in array"}, int32(2 << 20), string("array string"), timestamp, nil, byte(2), float64(2.64), float32(2.32), int64(2 << 60), int16(2 << 10), bool(true), []byte{'b', '2'}, }, "D": Decimal{1, 1}, "F": Table{"S": "nested table in table"}, "I": int32(1 << 20), "S": string("string"), "T": timestamp, "V": nil, "b": byte(1), "d": float64(1.64), "f": float32(1.32), "l": int64(1 << 60), "s": int16(1 << 10), "t": bool(true), "x": []byte{'b', '1'}, } queue := "test.issue61-roundtrip" ch, _ := conn.Channel() if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Fatalf("Could not declare") } msgs, err := ch.Consume(queue, "", false, false, false, false, nil) if err != nil { t.Fatalf("Could not consume") } err = ch.Publish("", queue, false, false, Publishing{Body: []byte("ignored"), Headers: headers}) if err != nil { t.Fatalf("Could not publish: %v", err) } msg, ok := <-msgs if !ok { t.Fatalf("Channel closed prematurely likely due to publish exception") } for k, v := range headers { if !reflect.DeepEqual(v, msg.Headers[k]) { t.Errorf("Round trip header not the same for key %q: expected: %#v, got %#v", k, v, msg.Headers[k]) } } } } // Declares a queue with the x-message-ttl extension to exercise integer // serialization. // // Relates to https://github.com/streadway/amqp/issues/60 // func TestDeclareArgsXMessageTTL(t *testing.T) { if conn := integrationConnection(t, "declareTTL"); conn != nil { defer conn.Close() ch, _ := conn.Channel() args := Table{"x-message-ttl": int32(9000000)} // should not drop the connection if _, err := ch.QueueDeclare("declareWithTTL", false, true, false, false, args); err != nil { t.Fatalf("cannot declare with TTL: got: %v", err) } } } // Sets up the topology where rejected messages will be forwarded // to a fanout exchange, with a single queue bound. // // Relates to https://github.com/streadway/amqp/issues/56 // func TestDeclareArgsRejectToDeadLetterQueue(t *testing.T) { if conn := integrationConnection(t, "declareArgs"); conn != nil { defer conn.Close() ex, q := "declareArgs", "declareArgs-deliveries" dlex, dlq := ex+"-dead-letter", q+"-dead-letter" ch, _ := conn.Channel() if err := ch.ExchangeDeclare(ex, "fanout", false, true, false, false, nil); err != nil { t.Fatalf("cannot declare %v: got: %v", ex, err) } if err := ch.ExchangeDeclare(dlex, "fanout", false, true, false, false, nil); err != nil { t.Fatalf("cannot declare %v: got: %v", dlex, err) } if _, err := ch.QueueDeclare(dlq, false, true, false, false, nil); err != nil { t.Fatalf("cannot declare %v: got: %v", dlq, err) } if err := ch.QueueBind(dlq, "#", dlex, false, nil); err != nil { t.Fatalf("cannot bind %v to %v: got: %v", dlq, dlex, err) } if _, err := ch.QueueDeclare(q, false, true, false, false, Table{ "x-dead-letter-exchange": dlex, }); err != nil { t.Fatalf("cannot declare %v with dlq %v: got: %v", q, dlex, err) } if err := ch.QueueBind(q, "#", ex, false, nil); err != nil { t.Fatalf("cannot bind %v: got: %v", ex, err) } fails, err := ch.Consume(q, "", false, false, false, false, nil) if err != nil { t.Fatalf("cannot consume %v: got: %v", q, err) } // Reject everything consumed go func() { for d := range fails { d.Reject(false) } }() // Publish the 'poison' if err := ch.Publish(ex, q, true, false, Publishing{Body: []byte("ignored")}); err != nil { t.Fatalf("publishing failed") } // spin-get until message arrives on the dead-letter queue with a // synchronous parse to exercise the array field (x-death) set by the // server relating to issue-56 for i := 0; i < 10; i++ { d, got, err := ch.Get(dlq, false) if !got && err == nil { continue } else if err != nil { t.Fatalf("expected success in parsing reject, got: %v", err) } else { // pass if we've parsed an array if v, ok := d.Headers["x-death"]; ok { if _, ok := v.([]interface{}); ok { return } } t.Fatalf("array field x-death expected in the headers, got: %v (%T)", d.Headers, d.Headers["x-death"]) } } t.Fatalf("expectd dead-letter after 10 get attempts") } } // https://github.com/streadway/amqp/issues/48 func TestDeadlockConsumerIssue48(t *testing.T) { if conn := integrationConnection(t, "issue48"); conn != nil { defer conn.Close() deadline := make(chan bool) go func() { select { case <-time.After(5 * time.Second): panic("expected to receive 2 deliveries while in an RPC, got a deadlock") case <-deadline: // pass } }() ch, err := conn.Channel() if err != nil { t.Fatalf("got error on channel.open: %v", err) } queue := "test-issue48" if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Fatalf("expected to declare a queue: %v", err) } if err := ch.Confirm(false); err != nil { t.Fatalf("got error on confirm: %v", err) } confirms := ch.NotifyPublish(make(chan Confirmation, 2)) for i := 0; i < cap(confirms); i++ { // Fill the queue with some new or remaining publishings ch.Publish("", queue, false, false, Publishing{Body: []byte("")}) } for i := 0; i < cap(confirms); i++ { // Wait for them to land on the queue so they'll be delivered on consume <-confirms } // Consuming should send them all on the wire msgs, err := ch.Consume(queue, "", false, false, false, false, nil) if err != nil { t.Fatalf("got error on consume: %v", err) } // We pop one off the chan, the other is on the wire <-msgs // Opening a new channel (any RPC) while another delivery is on the wire if _, err := conn.Channel(); err != nil { t.Fatalf("got error on consume: %v", err) } // We pop the next off the chan <-msgs deadline <- true } } // https://github.com/streadway/amqp/issues/46 func TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46(t *testing.T) { conn := integrationConnection(t, "issue46") if conn != nil { for i := 0; i < 100; i++ { ch, err := conn.Channel() if err != nil { t.Fatalf("expected error only on publish, got error on channel.open: %v", err) } for j := 0; j < 10; j++ { err = ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")}) if err, ok := err.(Error); ok { if err.Code != 504 { t.Fatalf("expected channel only exception, got: %v", err) } } } } } } // https://github.com/streadway/amqp/issues/43 func TestChannelExceptionWithCloseIssue43(t *testing.T) { conn := integrationConnection(t, "issue43") if conn != nil { go func() { for err := range conn.NotifyClose(make(chan *Error)) { t.Log(err.Error()) } }() c1, err := conn.Channel() if err != nil { panic(err) } go func() { for err := range c1.NotifyClose(make(chan *Error)) { t.Log("Channel1 Close: " + err.Error()) } }() c2, err := conn.Channel() if err != nil { panic(err) } go func() { for err := range c2.NotifyClose(make(chan *Error)) { t.Log("Channel2 Close: " + err.Error()) } }() // Cause an asynchronous channel exception causing the server // to send a "channel.close" method either before or after the next // asynchronous method. err = c1.Publish("nonexisting-exchange", "", false, false, Publishing{}) if err != nil { panic(err) } // Receive or send the channel close method, the channel shuts down // but this expects a channel.close-ok to be received. c1.Close() // This ensures that the 2nd channel is unaffected by the channel exception // on channel 1. err = c2.ExchangeDeclare("test-channel-still-exists", "direct", false, true, false, false, nil) if err != nil { panic(err) } } } // https://github.com/streadway/amqp/issues/7 func TestCorruptedMessageIssue7(t *testing.T) { messageCount := 1024 c1 := integrationConnection(t, "") c2 := integrationConnection(t, "") if c1 != nil && c2 != nil { defer c1.Close() defer c2.Close() pub, err := c1.Channel() if err != nil { t.Fatalf("Cannot create Channel") } sub, err := c2.Channel() if err != nil { t.Fatalf("Cannot create Channel") } queue := "test-corrupted-message-regression" if _, err := pub.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Fatalf("Cannot declare") } if _, err := sub.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Fatalf("Cannot declare") } msgs, err := sub.Consume(queue, "", false, false, false, false, nil) if err != nil { t.Fatalf("Cannot consume") } for i := 0; i < messageCount; i++ { err := pub.Publish("", queue, false, false, Publishing{ Body: generateCrc32Random(7 * i), }) if err != nil { t.Fatalf("Failed to publish") } } for i := 0; i < messageCount; i++ { select { case msg := <-msgs: assertMessageCrc32(t, msg.Body, fmt.Sprintf("missed match at %d", i)) case <-time.After(200 * time.Millisecond): t.Fatalf("Timeout on recv") } } } } // https://github.com/streadway/amqp/issues/136 func TestChannelCounterShouldNotPanicIssue136(t *testing.T) { if c := integrationConnection(t, "issue136"); c != nil { defer c.Close() var wg sync.WaitGroup // exceeds 65535 channels for i := 0; i < 8; i++ { wg.Add(1) go func(i int) { for j := 0; j < 10000; j++ { ch, err := c.Channel() if err != nil { t.Fatalf("failed to create channel %d:%d, got: %v", i, j, err) } if err := ch.Close(); err != nil { t.Fatalf("failed to close channel %d:%d, got: %v", i, j, err) } } wg.Done() }(i) } wg.Wait() } } func TestExchangeDeclarePrecondition(t *testing.T) { c1 := integrationConnection(t, "exchange-double-declare") c2 := integrationConnection(t, "exchange-double-declare-cleanup") if c1 != nil && c2 != nil { defer c1.Close() defer c2.Close() ch, err := c1.Channel() if err != nil { t.Fatalf("Create channel") } exchange := "test-mismatched-redeclare" err = ch.ExchangeDeclare( exchange, "direct", // exchangeType false, // durable true, // auto-delete false, // internal false, // noWait nil, // arguments ) if err != nil { t.Fatalf("Could not initially declare exchange") } err = ch.ExchangeDeclare( exchange, "direct", true, // different durability true, false, false, nil, ) if err == nil { t.Fatalf("Expected to fail a redeclare with different durability, didn't receive an error") } if err, ok := err.(Error); ok { if err.Code != PreconditionFailed { t.Fatalf("Expected precondition error") } if !err.Recover { t.Fatalf("Expected to be able to recover") } } ch2, _ := c2.Channel() if err = ch2.ExchangeDelete(exchange, false, false); err != nil { t.Fatalf("Could not delete exchange: %v", err) } } } func TestRabbitMQQueueTTLGet(t *testing.T) { if c := integrationRabbitMQ(t, "ttl"); c != nil { defer c.Close() queue := "test.rabbitmq-message-ttl" channel, err := c.Channel() if err != nil { t.Fatalf("channel: %v", err) } if _, err = channel.QueueDeclare( queue, false, true, false, false, Table{"x-message-ttl": int32(100)}, // in ms ); err != nil { t.Fatalf("queue declare: %s", err) } channel.Publish("", queue, false, false, Publishing{Body: []byte("ttl")}) time.Sleep(200 * time.Millisecond) _, ok, err := channel.Get(queue, false) if ok { t.Fatalf("Expected the message to expire in 100ms, it didn't expire after 200ms") } if err != nil { t.Fatalf("Failed to get on ttl queue") } } } func TestRabbitMQQueueNackMultipleRequeue(t *testing.T) { if c := integrationRabbitMQ(t, "nack"); c != nil { defer c.Close() if c.isCapable("basic.nack") { queue := "test.rabbitmq-basic-nack" channel, err := c.Channel() if err != nil { t.Fatalf("channel: %v", err) } if _, err = channel.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Fatalf("queue declare: %s", err) } channel.Publish("", queue, false, false, Publishing{Body: []byte("1")}) channel.Publish("", queue, false, false, Publishing{Body: []byte("2")}) m1, ok, err := channel.Get(queue, false) if !ok || err != nil || m1.Body[0] != '1' { t.Fatalf("could not get message %v", m1) } m2, ok, err := channel.Get(queue, false) if !ok || err != nil || m2.Body[0] != '2' { t.Fatalf("could not get message %v", m2) } m2.Nack(true, true) m1, ok, err = channel.Get(queue, false) if !ok || err != nil || m1.Body[0] != '1' { t.Fatalf("could not get message %v", m1) } m2, ok, err = channel.Get(queue, false) if !ok || err != nil || m2.Body[0] != '2' { t.Fatalf("could not get message %v", m2) } } } } func TestConsumerCancelNotification(t *testing.T) { c := integrationConnection(t, "consumer cancel notification") if c != nil { defer c.Close() ch, err := c.Channel() if err != nil { t.Fatalf("got error on channel.open: %v", err) } queue := "test-consumer-cancel-notification" if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil { t.Fatalf("expected to declare a queue: %v", err) } if _, err := ch.Consume(queue, "", false, false, false, false, nil); err != nil { t.Fatalf("basic.consume failed") } // consumer cancel notification channel ccnChan := make(chan string, 1) ch.NotifyCancel(ccnChan) if _, err := ch.QueueDelete(queue, false, false, true); err != nil { t.Fatalf("queue.delete failed: %s", err) } select { case <-ccnChan: // do nothing case <-time.After(time.Second * 10): t.Errorf("basic.cancel wasn't received") t.Fail() } // we don't close ccnChan because channel shutdown // does it } } func TestConcurrentChannelAndConnectionClose(t *testing.T) { c := integrationConnection(t, "concurrent channel and connection test") if c != nil { ch, err := c.Channel() if err != nil { t.Fatalf("got error on channel.open: %v", err) } var wg sync.WaitGroup wg.Add(2) starter := make(chan struct{}) go func() { defer wg.Done() <-starter c.Close() }() go func() { defer wg.Done() <-starter ch.Close() }() close(starter) wg.Wait() } } /* * Support for integration tests */ func integrationURLFromEnv() string { url := os.Getenv("AMQP_URL") if url == "" { url = "amqp://" } return url } func loggedConnection(t *testing.T, conn *Connection, name string) *Connection { if name != "" { conn.conn = &logIO{t, name, conn.conn} } return conn } // Returns a connection to the AMQP if the AMQP_URL environment // variable is set and a connection can be established. func integrationConnection(t *testing.T, name string) *Connection { conn, err := Dial(integrationURLFromEnv()) if err != nil { t.Errorf("dial integration server: %s", err) return nil } return loggedConnection(t, conn, name) } // Returns a connection, channel and delcares a queue when the AMQP_URL is in the environment func integrationQueue(t *testing.T, name string) (*Connection, *Channel) { if conn := integrationConnection(t, name); conn != nil { if channel, err := conn.Channel(); err == nil { if _, err = channel.QueueDeclare(name, false, true, false, false, nil); err == nil { return conn, channel } } } return nil, nil } // Delegates to integrationConnection and only returns a connection if the // product is RabbitMQ func integrationRabbitMQ(t *testing.T, name string) *Connection { if conn := integrationConnection(t, "connect"); conn != nil { if server, ok := conn.Properties["product"]; ok && server == "RabbitMQ" { return conn } } return nil } func assertConsumeBody(t *testing.T, messages <-chan Delivery, want []byte) (msg *Delivery) { select { case got := <-messages: if bytes.Compare(want, got.Body) != 0 { t.Fatalf("Message body does not match want: %v, got: %v, for: %+v", want, got.Body, got) } msg = &got case <-time.After(200 * time.Millisecond): t.Fatalf("Timeout waiting for %v", want) } return msg } // Pulls out the CRC and verifies the remaining content against the CRC func assertMessageCrc32(t *testing.T, msg []byte, assert string) { size := binary.BigEndian.Uint32(msg[:4]) crc := crc32.NewIEEE() crc.Write(msg[8:]) if binary.BigEndian.Uint32(msg[4:8]) != crc.Sum32() { t.Fatalf("Message does not match CRC: %s", assert) } if int(size) != len(msg)-8 { t.Fatalf("Message does not match size, should=%d, is=%d: %s", size, len(msg)-8, assert) } } // Creates a random body size with a leading 32-bit CRC in network byte order // that verifies the remaining slice func generateCrc32Random(size int) []byte { msg := make([]byte, size+8) if _, err := io.ReadFull(devrand.Reader, msg); err != nil { panic(err) } crc := crc32.NewIEEE() crc.Write(msg[8:]) binary.BigEndian.PutUint32(msg[0:4], uint32(size)) binary.BigEndian.PutUint32(msg[4:8], crc.Sum32()) return msg }