tile38/controller/endpoint/endpoint.go

295 lines
6.4 KiB
Go
Raw Normal View History

2016-09-12 05:01:24 +03:00
package endpoint
2016-09-11 17:49:48 +03:00
import (
"errors"
"net/url"
"strconv"
"strings"
"sync"
2016-09-12 05:01:24 +03:00
"time"
2016-09-11 17:49:48 +03:00
)
2017-02-10 15:27:02 +03:00
var errExpired = errors.New("expired")
2016-09-11 17:49:48 +03:00
// EndpointProtocol is the type of protocol that the endpoint represents.
type EndpointProtocol string
const (
HTTP = EndpointProtocol("http") // HTTP
Disque = EndpointProtocol("disque") // Disque
2016-09-12 07:09:02 +03:00
GRPC = EndpointProtocol("grpc") // GRPC
Redis = EndpointProtocol("redis") // Redis
Kafka = EndpointProtocol("kafka") // Kafka
2016-09-11 17:49:48 +03:00
)
2016-09-12 05:01:24 +03:00
// Endpoint represents an endpoint.
type Endpoint struct {
Protocol EndpointProtocol
Original string
2016-09-12 07:09:02 +03:00
GRPC struct {
Host string
Port int
}
Disque struct {
2016-09-12 05:01:24 +03:00
Host string
Port int
QueueName string
Options struct {
Replicate int
}
}
Redis struct {
2017-02-10 15:27:02 +03:00
Host string
Port int
Channel string
}
Kafka struct {
Host string
Port int
QueueName string
}
2016-09-12 05:01:24 +03:00
}
type EndpointConn interface {
Expired() bool
Send(val string) error
}
2016-09-11 17:49:48 +03:00
type EndpointManager struct {
2016-09-12 05:01:24 +03:00
mu sync.RWMutex // this is intentionally exposed
conns map[string]EndpointConn
2016-09-11 17:49:48 +03:00
}
2016-09-12 05:01:24 +03:00
func NewEndpointManager() *EndpointManager {
epc := &EndpointManager{
conns: make(map[string]EndpointConn),
}
go epc.Run()
return epc
}
// Manage connection at enpoints
// If some connection expired we should delete it
2016-09-12 05:01:24 +03:00
func (epc *EndpointManager) Run() {
for {
time.Sleep(time.Second)
func() {
epc.mu.Lock()
defer epc.mu.Unlock()
for endpoint, conn := range epc.conns {
if conn.Expired() {
delete(epc.conns, endpoint)
}
}
}()
2016-09-11 17:49:48 +03:00
}
}
// Get finds an endpoint based on its url. If the enpoint does not
// exist a new only is created.
func (epc *EndpointManager) Validate(url string) error {
_, err := parseEndpoint(url)
return err
}
2016-09-12 05:01:24 +03:00
func (epc *EndpointManager) Send(endpoint, val string) error {
2017-02-10 15:27:02 +03:00
for {
epc.mu.Lock()
conn, ok := epc.conns[endpoint]
if !ok || conn.Expired() {
ep, err := parseEndpoint(endpoint)
if err != nil {
epc.mu.Unlock()
return err
}
switch ep.Protocol {
default:
return errors.New("invalid protocol")
case HTTP:
conn = newHTTPEndpointConn(ep)
case Disque:
conn = newDisqueEndpointConn(ep)
case GRPC:
conn = newGRPCEndpointConn(ep)
case Redis:
conn = newRedisEndpointConn(ep)
case Kafka:
conn = newKafkaEndpointConn(ep)
2017-02-10 15:27:02 +03:00
}
epc.conns[endpoint] = conn
}
epc.mu.Unlock()
err := conn.Send(val)
2016-09-12 05:01:24 +03:00
if err != nil {
2017-02-10 15:27:02 +03:00
if err == errExpired {
// it's possible that the connection has expired in-between
// the last conn.Expired() check and now. If so, we should
// just try the send again.
continue
}
2016-09-12 05:01:24 +03:00
return err
2016-09-11 17:49:48 +03:00
}
2017-02-10 15:27:02 +03:00
return nil
2016-09-12 05:01:24 +03:00
}
2016-09-11 17:49:48 +03:00
}
func parseEndpoint(s string) (Endpoint, error) {
var endpoint Endpoint
endpoint.Original = s
switch {
default:
return endpoint, errors.New("unknown scheme")
case strings.HasPrefix(s, "http:"):
endpoint.Protocol = HTTP
case strings.HasPrefix(s, "https:"):
endpoint.Protocol = HTTP
case strings.HasPrefix(s, "disque:"):
endpoint.Protocol = Disque
2016-09-12 07:09:02 +03:00
case strings.HasPrefix(s, "grpc:"):
endpoint.Protocol = GRPC
case strings.HasPrefix(s, "redis:"):
endpoint.Protocol = Redis
case strings.HasPrefix(s, "kafka:"):
endpoint.Protocol = Kafka
2016-09-11 17:49:48 +03:00
}
2016-09-11 17:49:48 +03:00
s = s[strings.Index(s, ":")+1:]
if !strings.HasPrefix(s, "//") {
return endpoint, errors.New("missing the two slashes")
}
2016-09-11 17:49:48 +03:00
sqp := strings.Split(s[2:], "?")
sp := strings.Split(sqp[0], "/")
s = sp[0]
if s == "" {
return endpoint, errors.New("missing host")
}
2016-09-12 07:09:02 +03:00
if endpoint.Protocol == GRPC {
dp := strings.Split(s, ":")
switch len(dp) {
default:
return endpoint, errors.New("invalid grpc url")
case 1:
endpoint.GRPC.Host = dp[0]
endpoint.GRPC.Port = 80
case 2:
endpoint.GRPC.Host = dp[0]
n, err := strconv.ParseUint(dp[1], 10, 16)
if err != nil {
return endpoint, errors.New("invalid grpc url")
}
endpoint.GRPC.Port = int(n)
}
}
if endpoint.Protocol == Redis {
dp := strings.Split(s, ":")
switch len(dp) {
default:
return endpoint, errors.New("invalid redis url")
case 1:
endpoint.Redis.Host = dp[0]
endpoint.Redis.Port = 6379
case 2:
endpoint.Redis.Host = dp[0]
n, err := strconv.ParseUint(dp[1], 10, 16)
if err != nil {
return endpoint, errors.New("invalid redis url port")
}
endpoint.Redis.Port = int(n)
}
if len(sp) > 1 {
var err error
endpoint.Redis.Channel, err = url.QueryUnescape(sp[1])
if err != nil {
return endpoint, errors.New("invalid redis channel name")
}
}
}
2016-09-11 17:49:48 +03:00
if endpoint.Protocol == Disque {
dp := strings.Split(s, ":")
switch len(dp) {
default:
return endpoint, errors.New("invalid disque url")
case 1:
endpoint.Disque.Host = dp[0]
endpoint.Disque.Port = 7711
case 2:
endpoint.Disque.Host = dp[0]
n, err := strconv.ParseUint(dp[1], 10, 16)
if err != nil {
return endpoint, errors.New("invalid disque url")
}
endpoint.Disque.Port = int(n)
}
if len(sp) > 1 {
var err error
endpoint.Disque.QueueName, err = url.QueryUnescape(sp[1])
if err != nil {
return endpoint, errors.New("invalid disque queue name")
}
}
if len(sqp) > 1 {
m, err := url.ParseQuery(sqp[1])
if err != nil {
return endpoint, errors.New("invalid disque url")
}
for key, val := range m {
if len(val) == 0 {
continue
}
switch key {
case "replicate":
n, err := strconv.ParseUint(val[0], 10, 8)
if err != nil {
return endpoint, errors.New("invalid disque replicate value")
}
endpoint.Disque.Options.Replicate = int(n)
}
}
}
if endpoint.Disque.QueueName == "" {
return endpoint, errors.New("missing disque queue name")
}
}
if endpoint.Protocol == Kafka {
// Parsing connection from URL string
hp := strings.Split(s, ":")
switch len(hp) {
default:
return endpoint, errors.New("invalid kafka url")
case 1:
endpoint.Kafka.Host = hp[0]
endpoint.Kafka.Port = 9092
case 2:
n, err := strconv.ParseUint(hp[1], 10, 16)
if err != nil {
return endpoint, errors.New("invalid kafka url port")
}
endpoint.Kafka.Host = hp[0]
endpoint.Kafka.Port = int(n)
}
// Parsing Kafka queue name
if len(sp) > 1 {
var err error
endpoint.Kafka.QueueName, err = url.QueryUnescape(sp[1])
if err != nil {
return endpoint, errors.New("invalid kafka topic name")
}
}
// Throw error if we not provide any queue name
if endpoint.Kafka.QueueName == "" {
return endpoint, errors.New("missing kafka topic name")
}
}
2016-09-11 17:49:48 +03:00
return endpoint, nil
}