Added NATS endpoint

This commit is contained in:
Lenny-Campino Hartmann 2018-08-07 21:04:04 +02:00
parent f31cead207
commit 53271ebad6
2 changed files with 157 additions and 10 deletions

View File

@ -33,6 +33,8 @@ const (
AMQP = Protocol("amqp")
// SQS protocol
SQS = Protocol("sqs")
// NATS protocol
NATS = Protocol("nats")
)
// Endpoint represents an endpoint.
@ -90,6 +92,14 @@ type Endpoint struct {
CredProfile string
QueueName string
}
NATS struct {
Host string
Port int
User string
Pass string
Topic string
}
}
// Conn is an endpoint connection
@ -165,6 +175,8 @@ func (epc *Manager) Send(endpoint, msg string) error {
conn = newAMQPConn(ep)
case SQS:
conn = newSQSConn(ep)
case NATS:
conn = newNATSConn(ep)
}
epc.conns[endpoint] = conn
}
@ -209,6 +221,8 @@ func parseEndpoint(s string) (Endpoint, error) {
endpoint.Protocol = MQTT
case strings.HasPrefix(s, "sqs:"):
endpoint.Protocol = SQS
case strings.HasPrefix(s, "nats:"):
endpoint.Protocol = NATS
}
s = s[strings.Index(s, ":")+1:]
@ -469,7 +483,7 @@ func parseEndpoint(s string) (Endpoint, error) {
// Basic AMQP connection strings in HOOKS interface
// amqp://guest:guest@localhost:5672/<queue_name>/?params=value
// or amqp://guest:guest@localhost:5672/<namespace>/<queue_name>/?params=value
// or amqp://guest:guest@localhost:5672/<namespace>/<queue_name>/?params=value
//
// Default params are:
//
@ -487,15 +501,15 @@ func parseEndpoint(s string) (Endpoint, error) {
endpoint.AMQP.Durable = true
endpoint.AMQP.DeliveryMode = amqp.Transient
// Fix incase of namespace, e.g. example.com/namespace/queue
// but not example.com/queue/ - with an endslash.
if len(sp) > 2 && len(sp[2]) > 0 {
endpoint.AMQP.URI = endpoint.AMQP.URI + "/" + sp[1]
sp = append([]string{endpoint.AMQP.URI}, sp[2:]...)
}
// Bind queue name with no namespace
if len(sp) > 1 {
// Fix incase of namespace, e.g. example.com/namespace/queue
// but not example.com/queue/ - with an endslash.
if len(sp) > 2 && len(sp[2]) > 0 {
endpoint.AMQP.URI = endpoint.AMQP.URI + "/" + sp[1]
sp = append([]string{endpoint.AMQP.URI}, sp[2:]...)
}
// Bind queue name with no namespace
if len(sp) > 1 {
var err error
endpoint.AMQP.QueueName, err = url.QueryUnescape(sp[1])
if err != nil {
@ -549,6 +563,59 @@ func parseEndpoint(s string) (Endpoint, error) {
}
}
// Basic NATS connection strings in HOOKS interface
// nats://<host>:<port>/<topic_name>/?params=value
//
// params are:
//
// user - username
// pass - password
// when user or pass is not set then login without password is used
if endpoint.Protocol == NATS {
// Parsing connection from URL string
hp := strings.Split(s, ":")
switch len(hp) {
default:
return endpoint, errors.New("invalid SQS url")
case 2:
endpoint.NATS.Host = hp[0]
port, err := strconv.Atoi(hp[1])
if err != nil {
endpoint.NATS.Port = 4222 // default nats port
} else {
endpoint.NATS.Port = port
}
}
// Parsing NATS topic name
if len(sp) > 1 {
var err error
endpoint.NATS.Topic, err = url.QueryUnescape(sp[1])
if err != nil {
return endpoint, errors.New("invalid NATS topic name")
}
}
// Parsing additional params
if len(sqp) > 1 {
m, err := url.ParseQuery(sqp[1])
if err != nil {
return endpoint, errors.New("invalid NATS url")
}
for key, val := range m {
if len(val) == 0 {
continue
}
switch key {
case "user":
endpoint.NATS.User = val[0]
case "pass":
endpoint.NATS.Pass = val[0]
}
}
}
}
return endpoint, nil
}

80
pkg/endpoint/nats.go Normal file
View File

@ -0,0 +1,80 @@
package endpoint
import (
"fmt"
"sync"
"time"
"github.com/nats-io/go-nats"
)
const (
natsExpiresAfter = time.Second * 30
)
// NATSConn is an endpoint connection
type NATSConn struct {
mu sync.Mutex
ep Endpoint
ex bool
t time.Time
conn *nats.Conn
}
func newNATSConn(ep Endpoint) *NATSConn {
return &NATSConn{
ep: ep,
t: time.Now(),
}
}
// Expired returns true if the connection has expired
func (conn *NATSConn) Expired() bool {
conn.mu.Lock()
defer conn.mu.Unlock()
if !conn.ex {
if time.Now().Sub(conn.t) > natsExpiresAfter {
if conn.conn != nil {
conn.close()
}
conn.ex = true
}
}
return conn.ex
}
func (conn *NATSConn) close() {
if conn.conn != nil {
conn.conn.Close()
conn.conn = nil
}
}
// Send sends a message
func (conn *NATSConn) Send(msg string) error {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ex {
return errExpired
}
conn.t = time.Now()
if conn.conn == nil {
addr := fmt.Sprintf("nats://%s:%d", conn.ep.NATS.Host, conn.ep.NATS.Port)
var err error
if conn.ep.NATS.User != "" && conn.ep.NATS.Pass != "" {
conn.conn, err = nats.Connect(addr, nats.UserInfo(conn.ep.NATS.User, conn.ep.NATS.Pass))
}
conn.conn, err = nats.Connect(addr)
if err != nil {
conn.close()
return err
}
}
err := conn.conn.Publish(conn.ep.NATS.Topic, []byte(msg))
if err != nil {
conn.close()
return err
}
return nil
}