diff --git a/controller/endpoint/amqp.go b/controller/endpoint/amqp.go index 290ed51f..db5dbdd2 100644 --- a/controller/endpoint/amqp.go +++ b/controller/endpoint/amqp.go @@ -1,10 +1,12 @@ package endpoint import ( + "net" "sync" "time" "fmt" + "github.com/streadway/amqp" ) @@ -56,7 +58,11 @@ func (conn *AMQPEndpointConn) Send(msg string) error { prefix = "amqps://" } - c, err := amqp.Dial(fmt.Sprintf("%s%s", prefix, conn.ep.AMQP.URI)) + var cfg amqp.Config + cfg.Dial = func(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, time.Second) + } + c, err := amqp.DialConfig(fmt.Sprintf("%s%s", prefix, conn.ep.AMQP.URI), cfg) if err != nil { return err @@ -70,11 +76,11 @@ func (conn *AMQPEndpointConn) Send(msg string) error { // Declare new exchange if err := channel.ExchangeDeclare( conn.ep.AMQP.QueueName, - "direct", - true, - false, - false, - false, + conn.ep.AMQP.Type, + conn.ep.AMQP.Durable, + conn.ep.AMQP.AutoDelete, + conn.ep.AMQP.Internal, + conn.ep.AMQP.NoWait, nil, ); err != nil { return err @@ -83,11 +89,10 @@ func (conn *AMQPEndpointConn) Send(msg string) error { // Create queue if queue don't exists if _, err := channel.QueueDeclare( conn.ep.AMQP.QueueName, - true, - - false, - false, + conn.ep.AMQP.Durable, + conn.ep.AMQP.AutoDelete, false, + conn.ep.AMQP.NoWait, nil, ); err != nil { return err @@ -98,7 +103,7 @@ func (conn *AMQPEndpointConn) Send(msg string) error { conn.ep.AMQP.QueueName, conn.ep.AMQP.RouteKey, conn.ep.AMQP.QueueName, - false, + conn.ep.AMQP.NoWait, nil, ); err != nil { return err @@ -111,14 +116,14 @@ func (conn *AMQPEndpointConn) Send(msg string) error { if err := conn.channel.Publish( conn.ep.AMQP.QueueName, conn.ep.AMQP.RouteKey, - false, - false, + conn.ep.AMQP.Mandatory, + conn.ep.AMQP.Immediate, amqp.Publishing{ Headers: amqp.Table{}, ContentType: "application/json", ContentEncoding: "", Body: []byte(msg), - DeliveryMode: amqp.Transient, + DeliveryMode: conn.ep.AMQP.DeliveryMode, Priority: 0, }, ); err != nil { diff --git a/controller/endpoint/endpoint.go b/controller/endpoint/endpoint.go index f4e12435..5df104e5 100644 --- a/controller/endpoint/endpoint.go +++ b/controller/endpoint/endpoint.go @@ -7,6 +7,8 @@ import ( "strings" "sync" "time" + + "github.com/streadway/amqp" ) var errExpired = errors.New("expired") @@ -52,10 +54,18 @@ type Endpoint struct { QueueName string } AMQP struct { - URI string - SSL bool - QueueName string - RouteKey string + URI string + SSL bool + QueueName string + RouteKey string + Type string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Mandatory bool + Immediate bool + DeliveryMode uint8 } MQTT struct { Host string @@ -462,6 +472,9 @@ func parseEndpoint(s string) (Endpoint, error) { if endpoint.Protocol == AMQP { // Bind connection information endpoint.AMQP.URI = s + endpoint.AMQP.Type = "direct" + endpoint.AMQP.Durable = true + endpoint.AMQP.DeliveryMode = amqp.Transient // Bind queue name if len(sp) > 1 { @@ -485,6 +498,22 @@ func parseEndpoint(s string) (Endpoint, error) { switch key { case "route": endpoint.AMQP.RouteKey = val[0] + case "type": + endpoint.AMQP.Type = val[0] + case "durable": + endpoint.AMQP.Durable = queryBool(val[0]) + case "internal": + endpoint.AMQP.Internal = queryBool(val[0]) + case "no_wait": + endpoint.AMQP.NoWait = queryBool(val[0]) + case "auto_delete": + endpoint.AMQP.AutoDelete = queryBool(val[0]) + case "immediate": + endpoint.AMQP.Immediate = queryBool(val[0]) + case "mandatory": + endpoint.AMQP.Mandatory = queryBool(val[0]) + case "delivery_mode": + endpoint.AMQP.DeliveryMode = uint8(queryInt(val[0])) } } } @@ -504,3 +533,21 @@ func parseEndpoint(s string) (Endpoint, error) { return endpoint, nil } + +func queryInt(s string) int { + x, _ := strconv.ParseInt(s, 10, 64) + return int(x) +} + +func queryBool(s string) bool { + if len(s) > 0 { + if s[0] >= '1' && s[0] <= '9' { + return true + } + switch s[0] { + case 'Y', 'y', 'T', 't': + return true + } + } + return false +}