grpc support

This commit is contained in:
Josh Baker 2016-09-11 21:09:02 -07:00
parent 89cfedeb08
commit f2026e1d8d
7 changed files with 325 additions and 43 deletions

View File

@ -5,15 +5,21 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"os" "os"
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/tidwall/tile38/controller" "github.com/tidwall/tile38/controller"
"github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/controller/log"
"github.com/tidwall/tile38/core" "github.com/tidwall/tile38/core"
"github.com/tidwall/tile38/hservice"
) )
var ( var (
@ -26,15 +32,21 @@ var (
quiet bool quiet bool
) )
// Fire up a webhook test server by using the --webhook-consumer-port // Fire up a webhook test server by using the --webhook-consumer-http-port
// for example // for example
// $ ./tile38-server --webhook-consumer-port 9999 // $ ./tile38-server --webhook-http-consumer-port 9999
// //
// The create hooks like such... // The create hooks like such...
// SETHOOK myhook http://localhost:9999/myhook NEARBY mykey FENCE POINT 33.5 -115.5 1000 // SETHOOK myhook http://localhost:9999/myhook NEARBY mykey FENCE POINT 33.5 -115.5 1000
type hserver struct{}
func (s *hserver) Send(ctx context.Context, in *hservice.MessageRequest) (*hservice.MessageReply, error) {
return &hservice.MessageReply{true}, nil
}
func main() { func main() {
if len(os.Args) == 3 && os.Args[1] == "--webhook-consumer-port" { if len(os.Args) == 3 && os.Args[1] == "--webhook-http-consumer-port" {
log.Default = log.New(os.Stderr, &log.Config{}) log.Default = log.New(os.Stderr, &log.Config{})
port, err := strconv.ParseUint(os.Args[2], 10, 16) port, err := strconv.ParseUint(os.Args[2], 10, 16)
if err != nil { if err != nil {
@ -43,8 +55,7 @@ func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
data, err := ioutil.ReadAll(r.Body) data, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
log.Error(err) log.Fatal(err)
return
} }
log.HTTPf("http: %s : %s", r.URL.Path, string(data)) log.HTTPf("http: %s : %s", r.URL.Path, string(data))
}) })
@ -55,6 +66,26 @@ func main() {
return return
} }
if len(os.Args) == 3 && os.Args[1] == "--webhook-grpc-consumer-port" {
log.Default = log.New(os.Stderr, &log.Config{})
port, err := strconv.ParseUint(os.Args[2], 10, 16)
if err != nil {
log.Fatal(err)
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatal(err)
}
s := grpc.NewServer()
hservice.RegisterHookServiceServer(s, &hserver{})
log.Infof("webhook server grpc://localhost:%d/", port)
if err := s.Serve(lis); err != nil {
log.Fatal(err)
}
return
}
// parse non standard args. // parse non standard args.
nargs := []string{os.Args[0]} nargs := []string{os.Args[0]}
for i := 1; i < len(os.Args); i++ { for i := 1; i < len(os.Args); i++ {

View File

@ -13,7 +13,6 @@ import (
const ( const (
disqueExpiresAfter = time.Second * 30 disqueExpiresAfter = time.Second * 30
disqueDialTimeout = time.Second * 10
) )
type DisqueEndpointConn struct { type DisqueEndpointConn struct {
@ -64,7 +63,7 @@ func (conn *DisqueEndpointConn) Send(msg string) error {
if conn.conn == nil { if conn.conn == nil {
addr := fmt.Sprintf("%s:%d", conn.ep.Disque.Host, conn.ep.Disque.Port) addr := fmt.Sprintf("%s:%d", conn.ep.Disque.Host, conn.ep.Disque.Port)
var err error var err error
conn.conn, err = net.DialTimeout("tcp", addr, disqueDialTimeout) conn.conn, err = net.Dial("tcp", addr)
if err != nil { if err != nil {
return err return err
} }

View File

@ -15,12 +15,17 @@ type EndpointProtocol string
const ( const (
HTTP = EndpointProtocol("http") // HTTP HTTP = EndpointProtocol("http") // HTTP
Disque = EndpointProtocol("disque") // Disque Disque = EndpointProtocol("disque") // Disque
GRPC = EndpointProtocol("grpc") // GRPC
) )
// Endpoint represents an endpoint. // Endpoint represents an endpoint.
type Endpoint struct { type Endpoint struct {
Protocol EndpointProtocol Protocol EndpointProtocol
Original string Original string
GRPC struct {
Host string
Port int
}
Disque struct { Disque struct {
Host string Host string
Port int Port int
@ -86,6 +91,8 @@ func (epc *EndpointManager) Send(endpoint, val string) error {
conn = newHTTPEndpointConn(ep) conn = newHTTPEndpointConn(ep)
case Disque: case Disque:
conn = newDisqueEndpointConn(ep) conn = newDisqueEndpointConn(ep)
case GRPC:
conn = newGRPCEndpointConn(ep)
} }
epc.conns[endpoint] = conn epc.conns[endpoint] = conn
} }
@ -93,41 +100,6 @@ func (epc *EndpointManager) Send(endpoint, val string) error {
return conn.Send(val) return conn.Send(val)
} }
/*
func (conn *endpointConn) Expired() bool {
conn.mu.Lock()
defer conn.mu.Unlock()
println("is expired?", conn.ex)
return conn.ex
}
func (conn *endpointConn) Send(val string) error {
conn.mu.Lock()
defer conn.mu.Unlock()
return nil
}
*/
/*
func (ep *Endpoint) Open() {
ep.mu.Lock()
defer ep.mu.Unlock()
println("open " + ep.Original)
// Even though open is called we should wait until the a messages
// is sent before establishing a network connection.
}
func (ep *Endpoint) Close() {
ep.mu.Lock()
defer ep.mu.Unlock()
println("close " + ep.Original)
// Make sure to forece close the network connection here.
}
func (ep *Endpoint) Send() error {
return nil
}
*/
func parseEndpoint(s string) (Endpoint, error) { func parseEndpoint(s string) (Endpoint, error) {
var endpoint Endpoint var endpoint Endpoint
endpoint.Original = s endpoint.Original = s
@ -140,6 +112,8 @@ func parseEndpoint(s string) (Endpoint, error) {
endpoint.Protocol = HTTP endpoint.Protocol = HTTP
case strings.HasPrefix(s, "disque:"): case strings.HasPrefix(s, "disque:"):
endpoint.Protocol = Disque endpoint.Protocol = Disque
case strings.HasPrefix(s, "grpc:"):
endpoint.Protocol = GRPC
} }
s = s[strings.Index(s, ":")+1:] s = s[strings.Index(s, ":")+1:]
if !strings.HasPrefix(s, "//") { if !strings.HasPrefix(s, "//") {
@ -151,6 +125,23 @@ func parseEndpoint(s string) (Endpoint, error) {
if s == "" { if s == "" {
return endpoint, errors.New("missing host") return endpoint, errors.New("missing host")
} }
if endpoint.Protocol == GRPC {
dp := strings.Split(s, ":")
switch len(dp) {
default:
return endpoint, errors.New("invalid grpc url")
case 1:
endpoint.GRPC.Host = dp[0]
endpoint.GRPC.Port = 80
case 2:
endpoint.GRPC.Host = dp[0]
n, err := strconv.ParseUint(dp[1], 10, 16)
if err != nil {
return endpoint, errors.New("invalid grpc url")
}
endpoint.GRPC.Port = int(n)
}
}
if endpoint.Protocol == Disque { if endpoint.Protocol == Disque {
dp := strings.Split(s, ":") dp := strings.Split(s, ":")
switch len(dp) { switch len(dp) {

View File

@ -0,0 +1,83 @@
package endpoint
import (
"errors"
"fmt"
"sync"
"time"
"golang.org/x/net/context"
"github.com/tidwall/tile38/hservice"
"google.golang.org/grpc"
)
const (
grpcExpiresAfter = time.Second * 30
)
type GRPCEndpointConn struct {
mu sync.Mutex
ep Endpoint
ex bool
t time.Time
conn *grpc.ClientConn
sconn hservice.HookServiceClient
}
func newGRPCEndpointConn(ep Endpoint) *GRPCEndpointConn {
return &GRPCEndpointConn{
ep: ep,
t: time.Now(),
}
}
func (conn *GRPCEndpointConn) Expired() bool {
conn.mu.Lock()
defer conn.mu.Unlock()
if !conn.ex {
if time.Now().Sub(conn.t) > grpcExpiresAfter {
if conn.conn != nil {
conn.close()
}
conn.ex = true
}
}
return conn.ex
}
func (conn *GRPCEndpointConn) close() {
if conn.conn != nil {
conn.conn.Close()
conn.conn = nil
}
}
func (conn *GRPCEndpointConn) Send(msg string) error {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ex {
return errors.New("expired")
}
conn.t = time.Now()
if conn.conn == nil {
addr := fmt.Sprintf("%s:%d", conn.ep.GRPC.Host, conn.ep.GRPC.Port)
var err error
conn.conn, err = grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
conn.close()
return err
}
conn.sconn = hservice.NewHookServiceClient(conn.conn)
}
r, err := conn.sconn.Send(context.Background(), &hservice.MessageRequest{msg})
if err != nil {
conn.close()
return err
}
if !r.Ok {
conn.close()
return errors.New("invalid grpc reply")
}
return nil
}

4
hservice/gen.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
cd $(dirname "${BASH_SOURCE[0]}")
protoc --go_out=plugins=grpc,import_path=hservice:. *.proto

151
hservice/hservice.pb.go Normal file
View File

@ -0,0 +1,151 @@
// Code generated by protoc-gen-go.
// source: hservice.proto
// DO NOT EDIT!
/*
Package hservice is a generated protocol buffer package.
It is generated from these files:
hservice.proto
It has these top-level messages:
MessageRequest
MessageReply
*/
package hservice
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// The request message containing the message value
type MessageRequest struct {
Value string `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"`
}
func (m *MessageRequest) Reset() { *m = MessageRequest{} }
func (m *MessageRequest) String() string { return proto.CompactTextString(m) }
func (*MessageRequest) ProtoMessage() {}
func (*MessageRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
// The response message containing an ok (true or false)
type MessageReply struct {
Ok bool `protobuf:"varint,1,opt,name=ok" json:"ok,omitempty"`
}
func (m *MessageReply) Reset() { *m = MessageReply{} }
func (m *MessageReply) String() string { return proto.CompactTextString(m) }
func (*MessageReply) ProtoMessage() {}
func (*MessageReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func init() {
proto.RegisterType((*MessageRequest)(nil), "hservice.MessageRequest")
proto.RegisterType((*MessageReply)(nil), "hservice.MessageReply")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion3
// Client API for HookService service
type HookServiceClient interface {
// Sends a greeting
Send(ctx context.Context, in *MessageRequest, opts ...grpc.CallOption) (*MessageReply, error)
}
type hookServiceClient struct {
cc *grpc.ClientConn
}
func NewHookServiceClient(cc *grpc.ClientConn) HookServiceClient {
return &hookServiceClient{cc}
}
func (c *hookServiceClient) Send(ctx context.Context, in *MessageRequest, opts ...grpc.CallOption) (*MessageReply, error) {
out := new(MessageReply)
err := grpc.Invoke(ctx, "/hservice.HookService/Send", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for HookService service
type HookServiceServer interface {
// Sends a greeting
Send(context.Context, *MessageRequest) (*MessageReply, error)
}
func RegisterHookServiceServer(s *grpc.Server, srv HookServiceServer) {
s.RegisterService(&_HookService_serviceDesc, srv)
}
func _HookService_Send_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(MessageRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HookServiceServer).Send(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/hservice.HookService/Send",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HookServiceServer).Send(ctx, req.(*MessageRequest))
}
return interceptor(ctx, in, info, handler)
}
var _HookService_serviceDesc = grpc.ServiceDesc{
ServiceName: "hservice.HookService",
HandlerType: (*HookServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Send",
Handler: _HookService_Send_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: fileDescriptor0,
}
func init() { proto.RegisterFile("hservice.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 168 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0x28, 0x4e, 0x2d,
0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0x95, 0xd4,
0xb8, 0xf8, 0x7c, 0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x53, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b,
0x84, 0x44, 0xb8, 0x58, 0xcb, 0x12, 0x73, 0x4a, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83,
0x20, 0x1c, 0x25, 0x39, 0x2e, 0x1e, 0xb8, 0xba, 0x82, 0x9c, 0x4a, 0x21, 0x3e, 0x2e, 0xa6, 0xfc,
0x6c, 0xb0, 0x12, 0x8e, 0x20, 0xa6, 0xfc, 0x6c, 0x23, 0x4f, 0x2e, 0x6e, 0x8f, 0xfc, 0xfc, 0xec,
0x60, 0x88, 0xb1, 0x42, 0x56, 0x5c, 0x2c, 0xc1, 0xa9, 0x79, 0x29, 0x42, 0x12, 0x7a, 0x70, 0x9b,
0x51, 0xad, 0x91, 0x12, 0xc3, 0x22, 0x53, 0x90, 0x53, 0xa9, 0xc4, 0xe0, 0xa4, 0xc9, 0x25, 0x9c,
0x9c, 0x9f, 0xab, 0x57, 0x92, 0x99, 0x93, 0x6a, 0x6c, 0x01, 0x57, 0xe5, 0x24, 0x80, 0x64, 0x7e,
0x00, 0xc8, 0x17, 0x01, 0x8c, 0x49, 0x6c, 0x60, 0xef, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff,
0x6d, 0xd0, 0x2b, 0x13, 0xe0, 0x00, 0x00, 0x00,
}

23
hservice/hservice.proto Normal file
View File

@ -0,0 +1,23 @@
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.tile38.hservice";
option java_outer_classname = "HookServiceProto";
package hservice;
// The greeting service definition.
service HookService {
// Sends a greeting
rpc Send (MessageRequest) returns (MessageReply) {}
}
// The request message containing the message value
message MessageRequest {
string value = 1;
}
// The response message containing an ok (true or false)
message MessageReply {
bool ok = 1;
}