From 53271ebad6788fe34f55d49444b822c83292664a Mon Sep 17 00:00:00 2001 From: Lenny-Campino Hartmann Date: Tue, 7 Aug 2018 21:04:04 +0200 Subject: [PATCH] Added NATS endpoint --- pkg/endpoint/endpoint.go | 87 +++++++++++++++++++++++++++++++++++----- pkg/endpoint/nats.go | 80 ++++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+), 10 deletions(-) create mode 100644 pkg/endpoint/nats.go diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index 6c60ec65..0b5ace8e 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -33,6 +33,8 @@ const ( AMQP = Protocol("amqp") // SQS protocol SQS = Protocol("sqs") + // NATS protocol + NATS = Protocol("nats") ) // Endpoint represents an endpoint. @@ -90,6 +92,14 @@ type Endpoint struct { CredProfile string QueueName string } + + NATS struct { + Host string + Port int + User string + Pass string + Topic string + } } // Conn is an endpoint connection @@ -165,6 +175,8 @@ func (epc *Manager) Send(endpoint, msg string) error { conn = newAMQPConn(ep) case SQS: conn = newSQSConn(ep) + case NATS: + conn = newNATSConn(ep) } epc.conns[endpoint] = conn } @@ -209,6 +221,8 @@ func parseEndpoint(s string) (Endpoint, error) { endpoint.Protocol = MQTT case strings.HasPrefix(s, "sqs:"): endpoint.Protocol = SQS + case strings.HasPrefix(s, "nats:"): + endpoint.Protocol = NATS } s = s[strings.Index(s, ":")+1:] @@ -469,7 +483,7 @@ func parseEndpoint(s string) (Endpoint, error) { // Basic AMQP connection strings in HOOKS interface // amqp://guest:guest@localhost:5672//?params=value - // or amqp://guest:guest@localhost:5672///?params=value + // or amqp://guest:guest@localhost:5672///?params=value // // Default params are: // @@ -487,15 +501,15 @@ func parseEndpoint(s string) (Endpoint, error) { endpoint.AMQP.Durable = true endpoint.AMQP.DeliveryMode = amqp.Transient - // Fix incase of namespace, e.g. example.com/namespace/queue - // but not example.com/queue/ - with an endslash. - if len(sp) > 2 && len(sp[2]) > 0 { - endpoint.AMQP.URI = endpoint.AMQP.URI + "/" + sp[1] - sp = append([]string{endpoint.AMQP.URI}, sp[2:]...) - } - - // Bind queue name with no namespace - if len(sp) > 1 { + // Fix incase of namespace, e.g. example.com/namespace/queue + // but not example.com/queue/ - with an endslash. + if len(sp) > 2 && len(sp[2]) > 0 { + endpoint.AMQP.URI = endpoint.AMQP.URI + "/" + sp[1] + sp = append([]string{endpoint.AMQP.URI}, sp[2:]...) + } + + // Bind queue name with no namespace + if len(sp) > 1 { var err error endpoint.AMQP.QueueName, err = url.QueryUnescape(sp[1]) if err != nil { @@ -549,6 +563,59 @@ func parseEndpoint(s string) (Endpoint, error) { } } + // Basic NATS connection strings in HOOKS interface + // nats://://?params=value + // + // params are: + // + // user - username + // pass - password + // when user or pass is not set then login without password is used + if endpoint.Protocol == NATS { + // Parsing connection from URL string + hp := strings.Split(s, ":") + switch len(hp) { + default: + return endpoint, errors.New("invalid SQS url") + case 2: + endpoint.NATS.Host = hp[0] + port, err := strconv.Atoi(hp[1]) + if err != nil { + endpoint.NATS.Port = 4222 // default nats port + } else { + endpoint.NATS.Port = port + } + } + + // Parsing NATS topic name + if len(sp) > 1 { + var err error + endpoint.NATS.Topic, err = url.QueryUnescape(sp[1]) + if err != nil { + return endpoint, errors.New("invalid NATS topic name") + } + } + + // Parsing additional params + if len(sqp) > 1 { + m, err := url.ParseQuery(sqp[1]) + if err != nil { + return endpoint, errors.New("invalid NATS url") + } + for key, val := range m { + if len(val) == 0 { + continue + } + switch key { + case "user": + endpoint.NATS.User = val[0] + case "pass": + endpoint.NATS.Pass = val[0] + } + } + } + } + return endpoint, nil } diff --git a/pkg/endpoint/nats.go b/pkg/endpoint/nats.go new file mode 100644 index 00000000..f63e4d05 --- /dev/null +++ b/pkg/endpoint/nats.go @@ -0,0 +1,80 @@ +package endpoint + +import ( + "fmt" + "sync" + "time" + + "github.com/nats-io/go-nats" +) + +const ( + natsExpiresAfter = time.Second * 30 +) + +// NATSConn is an endpoint connection +type NATSConn struct { + mu sync.Mutex + ep Endpoint + ex bool + t time.Time + conn *nats.Conn +} + +func newNATSConn(ep Endpoint) *NATSConn { + return &NATSConn{ + ep: ep, + t: time.Now(), + } +} + +// Expired returns true if the connection has expired +func (conn *NATSConn) Expired() bool { + conn.mu.Lock() + defer conn.mu.Unlock() + if !conn.ex { + if time.Now().Sub(conn.t) > natsExpiresAfter { + if conn.conn != nil { + conn.close() + } + conn.ex = true + } + } + return conn.ex +} + +func (conn *NATSConn) close() { + if conn.conn != nil { + conn.conn.Close() + conn.conn = nil + } +} + +// Send sends a message +func (conn *NATSConn) Send(msg string) error { + conn.mu.Lock() + defer conn.mu.Unlock() + if conn.ex { + return errExpired + } + conn.t = time.Now() + if conn.conn == nil { + addr := fmt.Sprintf("nats://%s:%d", conn.ep.NATS.Host, conn.ep.NATS.Port) + var err error + if conn.ep.NATS.User != "" && conn.ep.NATS.Pass != "" { + conn.conn, err = nats.Connect(addr, nats.UserInfo(conn.ep.NATS.User, conn.ep.NATS.Pass)) + } + conn.conn, err = nats.Connect(addr) + if err != nil { + conn.close() + return err + } + } + err := conn.conn.Publish(conn.ep.NATS.Topic, []byte(msg)) + if err != nil { + conn.close() + return err + } + + return nil +}