Refactor endpoint package

This commit is contained in:
Josh Baker 2018-04-19 09:25:39 -07:00
parent 19167b3a46
commit 22be725b45
11 changed files with 128 additions and 102 deletions

View File

@ -67,7 +67,7 @@ type Controller struct {
dir string dir string
started time.Time started time.Time
config *Config config *Config
epc *endpoint.EndpointManager epc *endpoint.Manager
// atomics // atomics
followc aint // counter increases when follow property changes followc aint // counter increases when follow property changes
@ -136,7 +136,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
expires: make(map[string]map[string]time.Time), expires: make(map[string]map[string]time.Time),
started: time.Now(), started: time.Now(),
conns: make(map[*server.Conn]*clientConn), conns: make(map[*server.Conn]*clientConn),
epc: endpoint.NewEndpointManager(), epc: endpoint.NewManager(),
http: http, http: http,
} }

View File

@ -345,7 +345,7 @@ type Hook struct {
closed bool closed bool
opened bool opened bool
query string query string
epm *endpoint.EndpointManager epm *endpoint.Manager
} }
func (h *Hook) Equals(hook *Hook) bool { func (h *Hook) Equals(hook *Hook) bool {

View File

@ -1,20 +1,20 @@
package endpoint package endpoint
import ( import (
"fmt"
"net" "net"
"sync" "sync"
"time" "time"
"fmt"
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
const ( const (
AMQPExpiresAfter = time.Second * 30 amqpExpiresAfter = time.Second * 30
) )
type AMQPEndpointConn struct { // AMQPConn is an endpoint connection
type AMQPConn struct {
mu sync.Mutex mu sync.Mutex
ep Endpoint ep Endpoint
conn *amqp.Connection conn *amqp.Connection
@ -23,11 +23,12 @@ type AMQPEndpointConn struct {
t time.Time t time.Time
} }
func (conn *AMQPEndpointConn) Expired() bool { // Expired returns true if the connection has expired
func (conn *AMQPConn) Expired() bool {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if !conn.ex { if !conn.ex {
if time.Now().Sub(conn.t) > kafkaExpiresAfter { if time.Now().Sub(conn.t) > amqpExpiresAfter {
conn.ex = true conn.ex = true
conn.close() conn.close()
} }
@ -35,7 +36,7 @@ func (conn *AMQPEndpointConn) Expired() bool {
return conn.ex return conn.ex
} }
func (conn *AMQPEndpointConn) close() { func (conn *AMQPConn) close() {
if conn.conn != nil { if conn.conn != nil {
conn.conn.Close() conn.conn.Close()
conn.conn = nil conn.conn = nil
@ -43,7 +44,8 @@ func (conn *AMQPEndpointConn) close() {
} }
} }
func (conn *AMQPEndpointConn) Send(msg string) error { // Send sends a message
func (conn *AMQPConn) Send(msg string) error {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
@ -113,7 +115,7 @@ func (conn *AMQPEndpointConn) Send(msg string) error {
conn.channel = channel conn.channel = channel
} }
if err := conn.channel.Publish( return conn.channel.Publish(
conn.ep.AMQP.QueueName, conn.ep.AMQP.QueueName,
conn.ep.AMQP.RouteKey, conn.ep.AMQP.RouteKey,
conn.ep.AMQP.Mandatory, conn.ep.AMQP.Mandatory,
@ -126,15 +128,11 @@ func (conn *AMQPEndpointConn) Send(msg string) error {
DeliveryMode: conn.ep.AMQP.DeliveryMode, DeliveryMode: conn.ep.AMQP.DeliveryMode,
Priority: 0, Priority: 0,
}, },
); err != nil { )
return err
} }
return nil func newAMQPConn(ep Endpoint) *AMQPConn {
} return &AMQPConn{
func newAMQPEndpointConn(ep Endpoint) *AMQPEndpointConn {
return &AMQPEndpointConn{
ep: ep, ep: ep,
t: time.Now(), t: time.Now(),
} }

View File

@ -15,7 +15,8 @@ const (
disqueExpiresAfter = time.Second * 30 disqueExpiresAfter = time.Second * 30
) )
type DisqueEndpointConn struct { // DisqueConn is an endpoint connection
type DisqueConn struct {
mu sync.Mutex mu sync.Mutex
ep Endpoint ep Endpoint
ex bool ex bool
@ -24,14 +25,15 @@ type DisqueEndpointConn struct {
rd *bufio.Reader rd *bufio.Reader
} }
func newDisqueEndpointConn(ep Endpoint) *DisqueEndpointConn { func newDisqueConn(ep Endpoint) *DisqueConn {
return &DisqueEndpointConn{ return &DisqueConn{
ep: ep, ep: ep,
t: time.Now(), t: time.Now(),
} }
} }
func (conn *DisqueEndpointConn) Expired() bool { // Expired returns true if the connection has expired
func (conn *DisqueConn) Expired() bool {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if !conn.ex { if !conn.ex {
@ -45,7 +47,7 @@ func (conn *DisqueEndpointConn) Expired() bool {
return conn.ex return conn.ex
} }
func (conn *DisqueEndpointConn) close() { func (conn *DisqueConn) close() {
if conn.conn != nil { if conn.conn != nil {
conn.conn.Close() conn.conn.Close()
conn.conn = nil conn.conn = nil
@ -53,7 +55,8 @@ func (conn *DisqueEndpointConn) close() {
conn.rd = nil conn.rd = nil
} }
func (conn *DisqueEndpointConn) Send(msg string) error { // Send sends a message
func (conn *DisqueConn) Send(msg string) error {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if conn.ex { if conn.ex {

View File

@ -13,23 +13,31 @@ import (
var errExpired = errors.New("expired") var errExpired = errors.New("expired")
// EndpointProtocol is the type of protocol that the endpoint represents. // Protocol is the type of protocol that the endpoint represents.
type EndpointProtocol string type Protocol string
const ( const (
HTTP = EndpointProtocol("http") // HTTP // HTTP protocol
Disque = EndpointProtocol("disque") // Disque HTTP = Protocol("http")
GRPC = EndpointProtocol("grpc") // GRPC // Disque protocol
Redis = EndpointProtocol("redis") // Redis Disque = Protocol("disque")
Kafka = EndpointProtocol("kafka") // Kafka // GRPC protocol
MQTT = EndpointProtocol("mqtt") // MQTT GRPC = Protocol("grpc")
AMQP = EndpointProtocol("amqp") // AMQP // Redis protocol
SQS = EndpointProtocol("sqs") // SQS Redis = Protocol("redis")
// Kafka protocol
Kafka = Protocol("kafka")
// MQTT protocol
MQTT = Protocol("mqtt")
// AMQP protocol
AMQP = Protocol("amqp")
// SQS protocol
SQS = Protocol("sqs")
) )
// Endpoint represents an endpoint. // Endpoint represents an endpoint.
type Endpoint struct { type Endpoint struct {
Protocol EndpointProtocol Protocol Protocol
Original string Original string
GRPC struct { GRPC struct {
Host string Host string
@ -84,27 +92,29 @@ type Endpoint struct {
} }
} }
type EndpointConn interface { // Conn is an endpoint connection
type Conn interface {
Expired() bool Expired() bool
Send(val string) error Send(val string) error
} }
type EndpointManager struct { // Manager manages all endpoints
mu sync.RWMutex // this is intentionally exposed type Manager struct {
conns map[string]EndpointConn mu sync.RWMutex
conns map[string]Conn
} }
func NewEndpointManager() *EndpointManager { // NewManager returns a new manager
epc := &EndpointManager{ func NewManager() *Manager {
conns: make(map[string]EndpointConn), epc := &Manager{
conns: make(map[string]Conn),
} }
go epc.Run() go epc.Run()
return epc return epc
} }
// Manage connection at enpoints // Run starts the managing of endpoints
// If some connection expired we should delete it func (epc *Manager) Run() {
func (epc *EndpointManager) Run() {
for { for {
time.Sleep(time.Second) time.Sleep(time.Second)
func() { func() {
@ -119,14 +129,14 @@ func (epc *EndpointManager) Run() {
} }
} }
// Get finds an endpoint based on its url. If the enpoint does not // Validate an endpoint url
// exist a new only is created. func (epc *Manager) Validate(url string) error {
func (epc *EndpointManager) Validate(url string) error {
_, err := parseEndpoint(url) _, err := parseEndpoint(url)
return err return err
} }
func (epc *EndpointManager) Send(endpoint, val string) error { // Send send a message to an endpoint
func (epc *Manager) Send(endpoint, msg string) error {
for { for {
epc.mu.Lock() epc.mu.Lock()
conn, ok := epc.conns[endpoint] conn, ok := epc.conns[endpoint]
@ -140,26 +150,26 @@ func (epc *EndpointManager) Send(endpoint, val string) error {
default: default:
return errors.New("invalid protocol") return errors.New("invalid protocol")
case HTTP: case HTTP:
conn = newHTTPEndpointConn(ep) conn = newHTTPConn(ep)
case Disque: case Disque:
conn = newDisqueEndpointConn(ep) conn = newDisqueConn(ep)
case GRPC: case GRPC:
conn = newGRPCEndpointConn(ep) conn = newGRPCConn(ep)
case Redis: case Redis:
conn = newRedisEndpointConn(ep) conn = newRedisConn(ep)
case Kafka: case Kafka:
conn = newKafkaEndpointConn(ep) conn = newKafkaConn(ep)
case MQTT: case MQTT:
conn = newMQTTEndpointConn(ep) conn = newMQTTConn(ep)
case AMQP: case AMQP:
conn = newAMQPEndpointConn(ep) conn = newAMQPConn(ep)
case SQS: case SQS:
conn = newSQSEndpointConn(ep) conn = newSQSConn(ep)
} }
epc.conns[endpoint] = conn epc.conns[endpoint] = conn
} }
epc.mu.Unlock() epc.mu.Unlock()
err := conn.Send(val) err := conn.Send(msg)
if err != nil { if err != nil {
if err == errExpired { if err == errExpired {
// it's possible that the connection has expired in-between // it's possible that the connection has expired in-between

View File

@ -6,10 +6,8 @@ import (
"sync" "sync"
"time" "time"
"golang.org/x/net/context"
"github.com/tidwall/tile38/pkg/hservice" "github.com/tidwall/tile38/pkg/hservice"
"golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -17,7 +15,8 @@ const (
grpcExpiresAfter = time.Second * 30 grpcExpiresAfter = time.Second * 30
) )
type GRPCEndpointConn struct { // GRPCConn is an endpoint connection
type GRPCConn struct {
mu sync.Mutex mu sync.Mutex
ep Endpoint ep Endpoint
ex bool ex bool
@ -26,14 +25,15 @@ type GRPCEndpointConn struct {
sconn hservice.HookServiceClient sconn hservice.HookServiceClient
} }
func newGRPCEndpointConn(ep Endpoint) *GRPCEndpointConn { func newGRPCConn(ep Endpoint) *GRPCConn {
return &GRPCEndpointConn{ return &GRPCConn{
ep: ep, ep: ep,
t: time.Now(), t: time.Now(),
} }
} }
func (conn *GRPCEndpointConn) Expired() bool { // Expired returns true if the connection has expired
func (conn *GRPCConn) Expired() bool {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if !conn.ex { if !conn.ex {
@ -46,14 +46,15 @@ func (conn *GRPCEndpointConn) Expired() bool {
} }
return conn.ex return conn.ex
} }
func (conn *GRPCEndpointConn) close() { func (conn *GRPCConn) close() {
if conn.conn != nil { if conn.conn != nil {
conn.conn.Close() conn.conn.Close()
conn.conn = nil conn.conn = nil
} }
} }
func (conn *GRPCEndpointConn) Send(msg string) error { // Send sends a message
func (conn *GRPCConn) Send(msg string) error {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if conn.ex { if conn.ex {
@ -70,7 +71,7 @@ func (conn *GRPCEndpointConn) Send(msg string) error {
} }
conn.sconn = hservice.NewHookServiceClient(conn.conn) conn.sconn = hservice.NewHookServiceClient(conn.conn)
} }
r, err := conn.sconn.Send(context.Background(), &hservice.MessageRequest{msg}) r, err := conn.sconn.Send(context.Background(), &hservice.MessageRequest{Value: msg})
if err != nil { if err != nil {
conn.close() conn.close()
return err return err

View File

@ -15,13 +15,14 @@ const (
httpMaxIdleConnections = 20 httpMaxIdleConnections = 20
) )
type HTTPEndpointConn struct { // HTTPConn is an endpoint connection
type HTTPConn struct {
ep Endpoint ep Endpoint
client *http.Client client *http.Client
} }
func newHTTPEndpointConn(ep Endpoint) *HTTPEndpointConn { func newHTTPConn(ep Endpoint) *HTTPConn {
return &HTTPEndpointConn{ return &HTTPConn{
ep: ep, ep: ep,
client: &http.Client{ client: &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
@ -33,11 +34,13 @@ func newHTTPEndpointConn(ep Endpoint) *HTTPEndpointConn {
} }
} }
func (conn *HTTPEndpointConn) Expired() bool { // Expired returns true if the connection has expired
func (conn *HTTPConn) Expired() bool {
return false return false
} }
func (conn *HTTPEndpointConn) Send(msg string) error { // Send sends a message
func (conn *HTTPConn) Send(msg string) error {
req, err := http.NewRequest("POST", conn.ep.Original, bytes.NewBufferString(msg)) req, err := http.NewRequest("POST", conn.ep.Original, bytes.NewBufferString(msg))
if err != nil { if err != nil {
return err return err

View File

@ -13,7 +13,8 @@ const (
kafkaExpiresAfter = time.Second * 30 kafkaExpiresAfter = time.Second * 30
) )
type KafkaEndpointConn struct { // KafkaConn is an endpoint connection
type KafkaConn struct {
mu sync.Mutex mu sync.Mutex
ep Endpoint ep Endpoint
conn sarama.SyncProducer conn sarama.SyncProducer
@ -21,7 +22,8 @@ type KafkaEndpointConn struct {
t time.Time t time.Time
} }
func (conn *KafkaEndpointConn) Expired() bool { // Expired returns true if the connection has expired
func (conn *KafkaConn) Expired() bool {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if !conn.ex { if !conn.ex {
@ -35,14 +37,15 @@ func (conn *KafkaEndpointConn) Expired() bool {
return conn.ex return conn.ex
} }
func (conn *KafkaEndpointConn) close() { func (conn *KafkaConn) close() {
if conn.conn != nil { if conn.conn != nil {
conn.conn.Close() conn.conn.Close()
conn.conn = nil conn.conn = nil
} }
} }
func (conn *KafkaEndpointConn) Send(msg string) error { // Send sends a message
func (conn *KafkaConn) Send(msg string) error {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
@ -84,8 +87,8 @@ func (conn *KafkaEndpointConn) Send(msg string) error {
return nil return nil
} }
func newKafkaEndpointConn(ep Endpoint) *KafkaEndpointConn { func newKafkaConn(ep Endpoint) *KafkaConn {
return &KafkaEndpointConn{ return &KafkaConn{
ep: ep, ep: ep,
t: time.Now(), t: time.Now(),
} }

View File

@ -12,7 +12,8 @@ const (
mqttExpiresAfter = time.Second * 30 mqttExpiresAfter = time.Second * 30
) )
type MQTTEndpointConn struct { // MQTTConn is an endpoint connection
type MQTTConn struct {
mu sync.Mutex mu sync.Mutex
ep Endpoint ep Endpoint
conn paho.Client conn paho.Client
@ -20,7 +21,8 @@ type MQTTEndpointConn struct {
t time.Time t time.Time
} }
func (conn *MQTTEndpointConn) Expired() bool { // Expired returns true if the connection has expired
func (conn *MQTTConn) Expired() bool {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if !conn.ex { if !conn.ex {
@ -32,7 +34,7 @@ func (conn *MQTTEndpointConn) Expired() bool {
return conn.ex return conn.ex
} }
func (conn *MQTTEndpointConn) close() { func (conn *MQTTConn) close() {
if conn.conn != nil { if conn.conn != nil {
if conn.conn.IsConnected() { if conn.conn.IsConnected() {
conn.conn.Disconnect(250) conn.conn.Disconnect(250)
@ -42,7 +44,8 @@ func (conn *MQTTEndpointConn) close() {
} }
} }
func (conn *MQTTEndpointConn) Send(msg string) error { // Send sends a message
func (conn *MQTTConn) Send(msg string) error {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
@ -74,8 +77,8 @@ func (conn *MQTTEndpointConn) Send(msg string) error {
return nil return nil
} }
func newMQTTEndpointConn(ep Endpoint) *MQTTEndpointConn { func newMQTTConn(ep Endpoint) *MQTTConn {
return &MQTTEndpointConn{ return &MQTTConn{
ep: ep, ep: ep,
t: time.Now(), t: time.Now(),
} }

View File

@ -13,7 +13,8 @@ const (
redisExpiresAfter = time.Second * 30 redisExpiresAfter = time.Second * 30
) )
type RedisEndpointConn struct { // RedisConn is an endpoint connection
type RedisConn struct {
mu sync.Mutex mu sync.Mutex
ep Endpoint ep Endpoint
ex bool ex bool
@ -22,14 +23,15 @@ type RedisEndpointConn struct {
rd *bufio.Reader rd *bufio.Reader
} }
func newRedisEndpointConn(ep Endpoint) *RedisEndpointConn { func newRedisConn(ep Endpoint) *RedisConn {
return &RedisEndpointConn{ return &RedisConn{
ep: ep, ep: ep,
t: time.Now(), t: time.Now(),
} }
} }
func (conn *RedisEndpointConn) Expired() bool { // Expired returns true if the connection has expired
func (conn *RedisConn) Expired() bool {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if !conn.ex { if !conn.ex {
@ -43,7 +45,7 @@ func (conn *RedisEndpointConn) Expired() bool {
return conn.ex return conn.ex
} }
func (conn *RedisEndpointConn) close() { func (conn *RedisConn) close() {
if conn.conn != nil { if conn.conn != nil {
conn.conn.Close() conn.conn.Close()
conn.conn = nil conn.conn = nil
@ -51,7 +53,8 @@ func (conn *RedisEndpointConn) close() {
conn.rd = nil conn.rd = nil
} }
func (conn *RedisEndpointConn) Send(msg string) error { // Send sends a message
func (conn *RedisConn) Send(msg string) error {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()

View File

@ -10,17 +10,17 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go/service/sqs"
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
var errCreateQueue = errors.New("Error while creating queue") var errCreateQueue = errors.New("Error while creating queue")
const ( const (
SQSExpiresAfter = time.Second * 30 sqsExpiresAfter = time.Second * 30
) )
type SQSEndpointConn struct { // SQSConn is an endpoint connection
type SQSConn struct {
mu sync.Mutex mu sync.Mutex
ep Endpoint ep Endpoint
session *session.Session session *session.Session
@ -30,15 +30,16 @@ type SQSEndpointConn struct {
t time.Time t time.Time
} }
func (conn *SQSEndpointConn) generateSQSURL() string { func (conn *SQSConn) generateSQSURL() string {
return "https://sqs." + conn.ep.SQS.Region + "amazonaws.com/" + conn.ep.SQS.QueueID + "/" + conn.ep.SQS.QueueName return "https://sqs." + conn.ep.SQS.Region + "amazonaws.com/" + conn.ep.SQS.QueueID + "/" + conn.ep.SQS.QueueName
} }
func (conn *SQSEndpointConn) Expired() bool { // Expired returns true if the connection has expired
func (conn *SQSConn) Expired() bool {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if !conn.ex { if !conn.ex {
if time.Now().Sub(conn.t) > SQSExpiresAfter { if time.Now().Sub(conn.t) > sqsExpiresAfter {
conn.ex = true conn.ex = true
conn.close() conn.close()
} }
@ -46,14 +47,15 @@ func (conn *SQSEndpointConn) Expired() bool {
return conn.ex return conn.ex
} }
func (conn *SQSEndpointConn) close() { func (conn *SQSConn) close() {
if conn.svc != nil { if conn.svc != nil {
conn.svc = nil conn.svc = nil
conn.session = nil conn.session = nil
} }
} }
func (conn *SQSEndpointConn) Send(msg string) error { // Send sends a message
func (conn *SQSConn) Send(msg string) error {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
@ -113,8 +115,8 @@ func (conn *SQSEndpointConn) Send(msg string) error {
return nil return nil
} }
func newSQSEndpointConn(ep Endpoint) *SQSEndpointConn { func newSQSConn(ep Endpoint) *SQSConn {
return &SQSEndpointConn{ return &SQSConn{
ep: ep, ep: ep,
t: time.Now(), t: time.Now(),
} }