Fix AMQP uri custom params not working

issue #301
This commit is contained in:
Josh Baker 2018-04-17 14:56:43 -07:00
parent d20900ccc3
commit cd08d7fa7d
2 changed files with 70 additions and 18 deletions

View File

@ -1,10 +1,12 @@
package endpoint
import (
"net"
"sync"
"time"
"fmt"
"github.com/streadway/amqp"
)
@ -56,7 +58,11 @@ func (conn *AMQPEndpointConn) Send(msg string) error {
prefix = "amqps://"
}
c, err := amqp.Dial(fmt.Sprintf("%s%s", prefix, conn.ep.AMQP.URI))
var cfg amqp.Config
cfg.Dial = func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, time.Second)
}
c, err := amqp.DialConfig(fmt.Sprintf("%s%s", prefix, conn.ep.AMQP.URI), cfg)
if err != nil {
return err
@ -70,11 +76,11 @@ func (conn *AMQPEndpointConn) Send(msg string) error {
// Declare new exchange
if err := channel.ExchangeDeclare(
conn.ep.AMQP.QueueName,
"direct",
true,
false,
false,
false,
conn.ep.AMQP.Type,
conn.ep.AMQP.Durable,
conn.ep.AMQP.AutoDelete,
conn.ep.AMQP.Internal,
conn.ep.AMQP.NoWait,
nil,
); err != nil {
return err
@ -83,11 +89,10 @@ func (conn *AMQPEndpointConn) Send(msg string) error {
// Create queue if queue don't exists
if _, err := channel.QueueDeclare(
conn.ep.AMQP.QueueName,
true,
false,
false,
conn.ep.AMQP.Durable,
conn.ep.AMQP.AutoDelete,
false,
conn.ep.AMQP.NoWait,
nil,
); err != nil {
return err
@ -98,7 +103,7 @@ func (conn *AMQPEndpointConn) Send(msg string) error {
conn.ep.AMQP.QueueName,
conn.ep.AMQP.RouteKey,
conn.ep.AMQP.QueueName,
false,
conn.ep.AMQP.NoWait,
nil,
); err != nil {
return err
@ -111,14 +116,14 @@ func (conn *AMQPEndpointConn) Send(msg string) error {
if err := conn.channel.Publish(
conn.ep.AMQP.QueueName,
conn.ep.AMQP.RouteKey,
false,
false,
conn.ep.AMQP.Mandatory,
conn.ep.AMQP.Immediate,
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "application/json",
ContentEncoding: "",
Body: []byte(msg),
DeliveryMode: amqp.Transient,
DeliveryMode: conn.ep.AMQP.DeliveryMode,
Priority: 0,
},
); err != nil {

View File

@ -7,6 +7,8 @@ import (
"strings"
"sync"
"time"
"github.com/streadway/amqp"
)
var errExpired = errors.New("expired")
@ -52,10 +54,18 @@ type Endpoint struct {
QueueName string
}
AMQP struct {
URI string
SSL bool
QueueName string
RouteKey string
URI string
SSL bool
QueueName string
RouteKey string
Type string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Mandatory bool
Immediate bool
DeliveryMode uint8
}
MQTT struct {
Host string
@ -462,6 +472,9 @@ func parseEndpoint(s string) (Endpoint, error) {
if endpoint.Protocol == AMQP {
// Bind connection information
endpoint.AMQP.URI = s
endpoint.AMQP.Type = "direct"
endpoint.AMQP.Durable = true
endpoint.AMQP.DeliveryMode = amqp.Transient
// Bind queue name
if len(sp) > 1 {
@ -485,6 +498,22 @@ func parseEndpoint(s string) (Endpoint, error) {
switch key {
case "route":
endpoint.AMQP.RouteKey = val[0]
case "type":
endpoint.AMQP.Type = val[0]
case "durable":
endpoint.AMQP.Durable = queryBool(val[0])
case "internal":
endpoint.AMQP.Internal = queryBool(val[0])
case "no_wait":
endpoint.AMQP.NoWait = queryBool(val[0])
case "auto_delete":
endpoint.AMQP.AutoDelete = queryBool(val[0])
case "immediate":
endpoint.AMQP.Immediate = queryBool(val[0])
case "mandatory":
endpoint.AMQP.Mandatory = queryBool(val[0])
case "delivery_mode":
endpoint.AMQP.DeliveryMode = uint8(queryInt(val[0]))
}
}
}
@ -504,3 +533,21 @@ func parseEndpoint(s string) (Endpoint, error) {
return endpoint, nil
}
func queryInt(s string) int {
x, _ := strconv.ParseInt(s, 10, 64)
return int(x)
}
func queryBool(s string) bool {
if len(s) > 0 {
if s[0] >= '1' && s[0] <= '9' {
return true
}
switch s[0] {
case 'Y', 'y', 'T', 't':
return true
}
}
return false
}