mirror of https://github.com/tidwall/tile38.git
403 lines
12 KiB
Go
403 lines
12 KiB
Go
package amqp_test
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"io/ioutil"
|
|
"log"
|
|
"net"
|
|
"runtime"
|
|
"time"
|
|
|
|
"github.com/streadway/amqp"
|
|
)
|
|
|
|
func ExampleConfig_timeout() {
|
|
// Provide your own anonymous Dial function that delgates to net.DialTimout
|
|
// for custom timeouts
|
|
|
|
conn, err := amqp.DialConfig("amqp:///", amqp.Config{
|
|
Dial: func(network, addr string) (net.Conn, error) {
|
|
return net.DialTimeout(network, addr, 2*time.Second)
|
|
},
|
|
})
|
|
|
|
log.Printf("conn: %v, err: %v", conn, err)
|
|
}
|
|
|
|
func ExampleDialTLS() {
|
|
// This example assume you have a RabbitMQ node running on localhost
|
|
// with TLS enabled.
|
|
//
|
|
// The easiest way to create the CA, certificates and keys required for these
|
|
// examples is by using tls-gen: https://github.com/michaelklishin/tls-gen
|
|
//
|
|
// A comprehensive RabbitMQ TLS guide can be found at
|
|
// http://www.rabbitmq.com/ssl.html
|
|
//
|
|
// Once you have the required TLS files in place, use the following
|
|
// rabbitmq.config example for the RabbitMQ node that you will run on
|
|
// localhost:
|
|
//
|
|
// [
|
|
// {rabbit, [
|
|
// {tcp_listeners, []}, % listens on 127.0.0.1:5672
|
|
// {ssl_listeners, [5671]}, % listens on 0.0.0.0:5671
|
|
// {ssl_options, [{cacertfile,"/path/to/your/testca/cacert.pem"},
|
|
// {certfile,"/path/to/your/server/cert.pem"},
|
|
// {keyfile,"/path/to/your/server/key.pem"},
|
|
// {verify,verify_peer},
|
|
// {fail_if_no_peer_cert,true}]}
|
|
// ]}
|
|
// ].
|
|
//
|
|
//
|
|
// In the above rabbitmq.config example, we are disabling the plain AMQP port
|
|
// and verifying that clients and fail if no certificate is presented.
|
|
//
|
|
// The self-signing certificate authority's certificate (cacert.pem) must be
|
|
// included in the RootCAs to be trusted, otherwise the server certificate
|
|
// will fail certificate verification.
|
|
//
|
|
// Alternatively to adding it to the tls.Config. you can add the CA's cert to
|
|
// your system's root CAs. The tls package will use the system roots
|
|
// specific to each support OS. Under OS X, add (drag/drop) cacert.pem
|
|
// file to the 'Certificates' section of KeyChain.app to add and always
|
|
// trust. You can also add it via the command line:
|
|
//
|
|
// security add-certificate testca/cacert.pem
|
|
// security add-trusted-cert testca/cacert.pem
|
|
//
|
|
// If you depend on the system root CAs, then use nil for the RootCAs field
|
|
// so the system roots will be loaded instead.
|
|
//
|
|
// Server names are validated by the crypto/tls package, so the server
|
|
// certificate must be made for the hostname in the URL. Find the commonName
|
|
// (CN) and make sure the hostname in the URL matches this common name. Per
|
|
// the RabbitMQ instructions (or tls-gen) for a self-signed cert, this defaults to the
|
|
// current hostname.
|
|
//
|
|
// openssl x509 -noout -in /path/to/certificate.pem -subject
|
|
//
|
|
// If your server name in your certificate is different than the host you are
|
|
// connecting to, set the hostname used for verification in
|
|
// ServerName field of the tls.Config struct.
|
|
cfg := new(tls.Config)
|
|
|
|
// see at the top
|
|
cfg.RootCAs = x509.NewCertPool()
|
|
|
|
if ca, err := ioutil.ReadFile("testca/cacert.pem"); err == nil {
|
|
cfg.RootCAs.AppendCertsFromPEM(ca)
|
|
}
|
|
|
|
// Move the client cert and key to a location specific to your application
|
|
// and load them here.
|
|
|
|
if cert, err := tls.LoadX509KeyPair("client/cert.pem", "client/key.pem"); err == nil {
|
|
cfg.Certificates = append(cfg.Certificates, cert)
|
|
}
|
|
|
|
// see a note about Common Name (CN) at the top
|
|
conn, err := amqp.DialTLS("amqps://server-name-from-certificate/", cfg)
|
|
|
|
log.Printf("conn: %v, err: %v", conn, err)
|
|
}
|
|
|
|
func ExampleChannel_Confirm_bridge() {
|
|
// This example acts as a bridge, shoveling all messages sent from the source
|
|
// exchange "log" to destination exchange "log".
|
|
|
|
// Confirming publishes can help from overproduction and ensure every message
|
|
// is delivered.
|
|
|
|
// Setup the source of the store and forward
|
|
source, err := amqp.Dial("amqp://source/")
|
|
if err != nil {
|
|
log.Fatalf("connection.open source: %s", err)
|
|
}
|
|
defer source.Close()
|
|
|
|
chs, err := source.Channel()
|
|
if err != nil {
|
|
log.Fatalf("channel.open source: %s", err)
|
|
}
|
|
|
|
if err := chs.ExchangeDeclare("log", "topic", true, false, false, false, nil); err != nil {
|
|
log.Fatalf("exchange.declare destination: %s", err)
|
|
}
|
|
|
|
if _, err := chs.QueueDeclare("remote-tee", true, true, false, false, nil); err != nil {
|
|
log.Fatalf("queue.declare source: %s", err)
|
|
}
|
|
|
|
if err := chs.QueueBind("remote-tee", "#", "logs", false, nil); err != nil {
|
|
log.Fatalf("queue.bind source: %s", err)
|
|
}
|
|
|
|
shovel, err := chs.Consume("remote-tee", "shovel", false, false, false, false, nil)
|
|
if err != nil {
|
|
log.Fatalf("basic.consume source: %s", err)
|
|
}
|
|
|
|
// Setup the destination of the store and forward
|
|
destination, err := amqp.Dial("amqp://destination/")
|
|
if err != nil {
|
|
log.Fatalf("connection.open destination: %s", err)
|
|
}
|
|
defer destination.Close()
|
|
|
|
chd, err := destination.Channel()
|
|
if err != nil {
|
|
log.Fatalf("channel.open destination: %s", err)
|
|
}
|
|
|
|
if err := chd.ExchangeDeclare("log", "topic", true, false, false, false, nil); err != nil {
|
|
log.Fatalf("exchange.declare destination: %s", err)
|
|
}
|
|
|
|
// Buffer of 1 for our single outstanding publishing
|
|
confirms := chd.NotifyPublish(make(chan amqp.Confirmation, 1))
|
|
|
|
if err := chd.Confirm(false); err != nil {
|
|
log.Fatalf("confirm.select destination: %s", err)
|
|
}
|
|
|
|
// Now pump the messages, one by one, a smarter implementation
|
|
// would batch the deliveries and use multiple ack/nacks
|
|
for {
|
|
msg, ok := <-shovel
|
|
if !ok {
|
|
log.Fatalf("source channel closed, see the reconnect example for handling this")
|
|
}
|
|
|
|
err = chd.Publish("logs", msg.RoutingKey, false, false, amqp.Publishing{
|
|
// Copy all the properties
|
|
ContentType: msg.ContentType,
|
|
ContentEncoding: msg.ContentEncoding,
|
|
DeliveryMode: msg.DeliveryMode,
|
|
Priority: msg.Priority,
|
|
CorrelationId: msg.CorrelationId,
|
|
ReplyTo: msg.ReplyTo,
|
|
Expiration: msg.Expiration,
|
|
MessageId: msg.MessageId,
|
|
Timestamp: msg.Timestamp,
|
|
Type: msg.Type,
|
|
UserId: msg.UserId,
|
|
AppId: msg.AppId,
|
|
|
|
// Custom headers
|
|
Headers: msg.Headers,
|
|
|
|
// And the body
|
|
Body: msg.Body,
|
|
})
|
|
|
|
if err != nil {
|
|
msg.Nack(false, false)
|
|
log.Fatalf("basic.publish destination: %+v", msg)
|
|
}
|
|
|
|
// only ack the source delivery when the destination acks the publishing
|
|
if confirmed := <-confirms; confirmed.Ack {
|
|
msg.Ack(false)
|
|
} else {
|
|
msg.Nack(false, false)
|
|
}
|
|
}
|
|
}
|
|
|
|
func ExampleChannel_Consume() {
|
|
// Connects opens an AMQP connection from the credentials in the URL.
|
|
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
|
if err != nil {
|
|
log.Fatalf("connection.open: %s", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
c, err := conn.Channel()
|
|
if err != nil {
|
|
log.Fatalf("channel.open: %s", err)
|
|
}
|
|
|
|
// We declare our topology on both the publisher and consumer to ensure they
|
|
// are the same. This is part of AMQP being a programmable messaging model.
|
|
//
|
|
// See the Channel.Publish example for the complimentary declare.
|
|
err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
|
|
if err != nil {
|
|
log.Fatalf("exchange.declare: %s", err)
|
|
}
|
|
|
|
// Establish our queue topologies that we are responsible for
|
|
type bind struct {
|
|
queue string
|
|
key string
|
|
}
|
|
|
|
bindings := []bind{
|
|
bind{"page", "alert"},
|
|
bind{"email", "info"},
|
|
bind{"firehose", "#"},
|
|
}
|
|
|
|
for _, b := range bindings {
|
|
_, err = c.QueueDeclare(b.queue, true, false, false, false, nil)
|
|
if err != nil {
|
|
log.Fatalf("queue.declare: %v", err)
|
|
}
|
|
|
|
err = c.QueueBind(b.queue, b.key, "logs", false, nil)
|
|
if err != nil {
|
|
log.Fatalf("queue.bind: %v", err)
|
|
}
|
|
}
|
|
|
|
// Set our quality of service. Since we're sharing 3 consumers on the same
|
|
// channel, we want at least 3 messages in flight.
|
|
err = c.Qos(3, 0, false)
|
|
if err != nil {
|
|
log.Fatalf("basic.qos: %v", err)
|
|
}
|
|
|
|
// Establish our consumers that have different responsibilities. Our first
|
|
// two queues do not ack the messages on the server, so require to be acked
|
|
// on the client.
|
|
|
|
pages, err := c.Consume("page", "pager", false, false, false, false, nil)
|
|
if err != nil {
|
|
log.Fatalf("basic.consume: %v", err)
|
|
}
|
|
|
|
go func() {
|
|
for log := range pages {
|
|
// ... this consumer is responsible for sending pages per log
|
|
log.Ack(false)
|
|
}
|
|
}()
|
|
|
|
// Notice how the concern for which messages arrive here are in the AMQP
|
|
// topology and not in the queue. We let the server pick a consumer tag this
|
|
// time.
|
|
|
|
emails, err := c.Consume("email", "", false, false, false, false, nil)
|
|
if err != nil {
|
|
log.Fatalf("basic.consume: %v", err)
|
|
}
|
|
|
|
go func() {
|
|
for log := range emails {
|
|
// ... this consumer is responsible for sending emails per log
|
|
log.Ack(false)
|
|
}
|
|
}()
|
|
|
|
// This consumer requests that every message is acknowledged as soon as it's
|
|
// delivered.
|
|
|
|
firehose, err := c.Consume("firehose", "", true, false, false, false, nil)
|
|
if err != nil {
|
|
log.Fatalf("basic.consume: %v", err)
|
|
}
|
|
|
|
// To show how to process the items in parallel, we'll use a work pool.
|
|
for i := 0; i < runtime.NumCPU(); i++ {
|
|
go func(work <-chan amqp.Delivery) {
|
|
for _ = range work {
|
|
// ... this consumer pulls from the firehose and doesn't need to acknowledge
|
|
}
|
|
}(firehose)
|
|
}
|
|
|
|
// Wait until you're ready to finish, could be a signal handler here.
|
|
time.Sleep(10 * time.Second)
|
|
|
|
// Cancelling a consumer by name will finish the range and gracefully end the
|
|
// goroutine
|
|
err = c.Cancel("pager", false)
|
|
if err != nil {
|
|
log.Fatalf("basic.cancel: %v", err)
|
|
}
|
|
|
|
// deferred closing the Connection will also finish the consumer's ranges of
|
|
// their delivery chans. If you need every delivery to be processed, make
|
|
// sure to wait for all consumers goroutines to finish before exiting your
|
|
// process.
|
|
}
|
|
|
|
func ExampleChannel_Publish() {
|
|
// Connects opens an AMQP connection from the credentials in the URL.
|
|
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
|
if err != nil {
|
|
log.Fatalf("connection.open: %s", err)
|
|
}
|
|
|
|
// This waits for a server acknowledgment which means the sockets will have
|
|
// flushed all outbound publishings prior to returning. It's important to
|
|
// block on Close to not lose any publishings.
|
|
defer conn.Close()
|
|
|
|
c, err := conn.Channel()
|
|
if err != nil {
|
|
log.Fatalf("channel.open: %s", err)
|
|
}
|
|
|
|
// We declare our topology on both the publisher and consumer to ensure they
|
|
// are the same. This is part of AMQP being a programmable messaging model.
|
|
//
|
|
// See the Channel.Consume example for the complimentary declare.
|
|
err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
|
|
if err != nil {
|
|
log.Fatalf("exchange.declare: %v", err)
|
|
}
|
|
|
|
// Prepare this message to be persistent. Your publishing requirements may
|
|
// be different.
|
|
msg := amqp.Publishing{
|
|
DeliveryMode: amqp.Persistent,
|
|
Timestamp: time.Now(),
|
|
ContentType: "text/plain",
|
|
Body: []byte("Go Go AMQP!"),
|
|
}
|
|
|
|
// This is not a mandatory delivery, so it will be dropped if there are no
|
|
// queues bound to the logs exchange.
|
|
err = c.Publish("logs", "info", false, false, msg)
|
|
if err != nil {
|
|
// Since publish is asynchronous this can happen if the network connection
|
|
// is reset or if the server has run out of resources.
|
|
log.Fatalf("basic.publish: %v", err)
|
|
}
|
|
}
|
|
|
|
func publishAllTheThings(conn *amqp.Connection) {
|
|
// ... snarf snarf, barf barf
|
|
}
|
|
|
|
func ExampleConnection_NotifyBlocked() {
|
|
// Simply logs when the server throttles the TCP connection for publishers
|
|
|
|
// Test this by tuning your server to have a low memory watermark:
|
|
// rabbitmqctl set_vm_memory_high_watermark 0.00000001
|
|
|
|
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
|
if err != nil {
|
|
log.Fatalf("connection.open: %s", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
blockings := conn.NotifyBlocked(make(chan amqp.Blocking))
|
|
go func() {
|
|
for b := range blockings {
|
|
if b.Active {
|
|
log.Printf("TCP blocked: %q", b.Reason)
|
|
} else {
|
|
log.Printf("TCP unblocked")
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Your application domain channel setup publishings
|
|
publishAllTheThings(conn)
|
|
}
|