From a0872036d4f8bea5fcb9e6aed03eb6f11b00eb0e Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sun, 11 Sep 2016 19:01:24 -0700 Subject: [PATCH] persistent endpoints --- controller/controller.go | 5 +- controller/disque.go | 59 ----------- controller/endpoint/disque.go | 123 ++++++++++++++++++++++ controller/{ => endpoint}/endpoint.go | 142 ++++++++++++++------------ controller/endpoint/http.go | 81 +++++++++++++++ controller/hooks.go | 10 +- controller/http.go | 43 -------- 7 files changed, 293 insertions(+), 170 deletions(-) delete mode 100644 controller/disque.go create mode 100644 controller/endpoint/disque.go rename controller/{ => endpoint}/endpoint.go (72%) create mode 100644 controller/endpoint/http.go delete mode 100644 controller/http.go diff --git a/controller/controller.go b/controller/controller.go index 92ebb9c1..171bb420 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -18,6 +18,7 @@ import ( "github.com/tidwall/buntdb" "github.com/tidwall/resp" "github.com/tidwall/tile38/controller/collection" + "github.com/tidwall/tile38/controller/endpoint" "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/controller/server" "github.com/tidwall/tile38/core" @@ -78,7 +79,7 @@ type Controller struct { conns map[*server.Conn]bool started time.Time - epc *EndpointManager + epc *endpoint.EndpointManager statsTotalConns int statsTotalCommands int @@ -113,7 +114,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener) error expires: make(map[string]map[string]time.Time), started: time.Now(), conns: make(map[*server.Conn]bool), - epc: NewEndpointCollection(), + epc: endpoint.NewEndpointManager(), } if err := os.MkdirAll(dir, 0700); err != nil { return err diff --git a/controller/disque.go b/controller/disque.go deleted file mode 100644 index 3964407e..00000000 --- a/controller/disque.go +++ /dev/null @@ -1,59 +0,0 @@ -package controller - -import ( - "errors" - "fmt" - "strings" - "sync" - "time" -) - -// TODO: add one connection pool per endpoint. Use Redigo. -// The current implementation is too slow. -var endpointDisqueMu sync.Mutex - -type endpointDisqueConn struct { - mu sync.Mutex -} - -var endpointDisqueM = make(map[string]*endpointDisqueConn) - -func sendDisqueMessage(endpoint Endpoint, msg []byte) error { - endpointDisqueMu.Lock() - conn, ok := endpointDisqueM[endpoint.Original] - if !ok { - conn = &endpointDisqueConn{ - //client: &http.Client{Transport: &http.Transport{}}, - } - endpointDisqueM[endpoint.Original] = conn - } - endpointDisqueMu.Unlock() - conn.mu.Lock() - defer conn.mu.Unlock() - - addr := fmt.Sprintf("%s:%d", endpoint.Disque.Host, endpoint.Disque.Port) - dconn, err := DialTimeout(addr, time.Second/4) - if err != nil { - return err - } - defer dconn.Close() - options := []interface{}{endpoint.Disque.QueueName, msg, 0} - replicate := endpoint.Disque.Options.Replicate - if replicate > 0 { - options = append(options, "REPLICATE") - options = append(options, endpoint.Disque.Options.Replicate) - } - v, err := dconn.Do("ADDJOB", options...) - if err != nil { - return err - } - if v.Error() != nil { - return v.Error() - } - id := v.String() - p := strings.Split(id, "-") - if len(p) != 4 { - return errors.New("invalid disque reply") - } - return nil -} diff --git a/controller/endpoint/disque.go b/controller/endpoint/disque.go new file mode 100644 index 00000000..98daa92d --- /dev/null +++ b/controller/endpoint/disque.go @@ -0,0 +1,123 @@ +package endpoint + +import ( + "bufio" + "errors" + "fmt" + "net" + "strconv" + "strings" + "sync" + "time" +) + +const ( + disqueExpiresAfter = time.Second * 30 + disqueDialTimeout = time.Second * 10 +) + +type DisqueEndpointConn struct { + mu sync.Mutex + ep Endpoint + ex bool + t time.Time + conn net.Conn + rd *bufio.Reader +} + +func newDisqueEndpointConn(ep Endpoint) *DisqueEndpointConn { + return &DisqueEndpointConn{ + ep: ep, + t: time.Now(), + } +} + +func (conn *DisqueEndpointConn) Expired() bool { + conn.mu.Lock() + defer conn.mu.Unlock() + if !conn.ex { + if time.Now().Sub(conn.t) > httpExpiresAfter { + if conn.conn != nil { + conn.close() + } + conn.ex = true + } + } + return conn.ex +} + +func (conn *DisqueEndpointConn) close() { + if conn.conn != nil { + conn.conn.Close() + conn.conn = nil + } + conn.rd = nil +} + +func (conn *DisqueEndpointConn) Send(msg string) error { + conn.mu.Lock() + defer conn.mu.Unlock() + if conn.ex { + return errors.New("expired") + } + conn.t = time.Now() + if conn.conn == nil { + addr := fmt.Sprintf("%s:%d", conn.ep.Disque.Host, conn.ep.Disque.Port) + var err error + conn.conn, err = net.DialTimeout("tcp", addr, disqueDialTimeout) + if err != nil { + return err + } + conn.rd = bufio.NewReader(conn.conn) + } + var args []string + args = append(args, "ADDJOB", conn.ep.Disque.QueueName, msg, "0") + if conn.ep.Disque.Options.Replicate > 0 { + args = append(args, "REPLICATE", strconv.FormatInt(int64(conn.ep.Disque.Options.Replicate), 10)) + } + cmd := buildRedisCommand(args) + if _, err := conn.conn.Write(cmd); err != nil { + conn.close() + return err + } + c, err := conn.rd.ReadByte() + if err != nil { + conn.close() + return err + } + if c != '-' && c != '+' { + conn.close() + return errors.New("invalid disque reply") + } + ln, err := conn.rd.ReadBytes('\n') + if err != nil { + conn.close() + return err + } + if len(ln) < 2 || ln[len(ln)-2] != '\r' { + conn.close() + return errors.New("invalid disque reply") + } + id := string(ln[:len(ln)-2]) + p := strings.Split(id, "-") + if len(p) != 4 { + conn.close() + return errors.New("invalid disque reply") + } + return nil +} + +func buildRedisCommand(args []string) []byte { + var cmd []byte + cmd = append(cmd, '*') + cmd = append(cmd, strconv.FormatInt(int64(len(args)), 10)...) + cmd = append(cmd, '\r', '\n') + for _, arg := range args { + cmd = append(cmd, '$') + cmd = append(cmd, strconv.FormatInt(int64(len(arg)), 10)...) + cmd = append(cmd, '\r', '\n') + cmd = append(cmd, arg...) + cmd = append(cmd, '\r', '\n') + } + return cmd +} diff --git a/controller/endpoint.go b/controller/endpoint/endpoint.go similarity index 72% rename from controller/endpoint.go rename to controller/endpoint/endpoint.go index c891c82f..877f85e2 100644 --- a/controller/endpoint.go +++ b/controller/endpoint/endpoint.go @@ -1,4 +1,4 @@ -package controller +package endpoint import ( "errors" @@ -6,6 +6,7 @@ import ( "strconv" "strings" "sync" + "time" ) // EndpointProtocol is the type of protocol that the endpoint represents. @@ -16,68 +17,6 @@ const ( Disque = EndpointProtocol("disque") // Disque ) -type EndpointManager struct { - mu sync.RWMutex // this is intentionally exposed - Endpoints map[string]*Endpoint -} - -func NewEndpointCollection() *EndpointManager { - return &EndpointManager{ - Endpoints: make(map[string]*Endpoint), - } -} - -// Get finds an endpoint based on its url. If the enpoint does not -// exist a new only is created. -func (epc *EndpointManager) Validate(url string) error { - _, err := parseEndpoint(url) - return err - /* - pendpoint := epc.Endpoints[url] - if pendpoint == nil { - pendpoint = endpoint - epc.Endpoints[url] = pendpoint - } - return pendpoint, nil - */ -} - -// We use a retain/relase on endoints. -// The calls are directed to the collection instead of -// endpoint itself to avoid having a circular reference to -// the collection. -func (epc *EndpointManager) Open(endpoint string) { - epc.mu.Lock() - defer epc.mu.Unlock() - /* - ep.referenceCount++ - if ep.referenceCount == 1 { - ep.Open() - } - */ -} - -func (epc *EndpointManager) Close(endpoint string) { - epc.mu.Lock() - defer epc.mu.Unlock() - /* - ep.referenceCount-- - if ep.referenceCount < 0 { - panic("reference count below zero") - } - if ep.referenceCount == 0 { - ep.Close() - delete(epc.Endpoints, ep.Original) - } - */ -} -func (epc *EndpointManager) Send(endpoint, val string) error { - epc.mu.Lock() - defer epc.mu.Unlock() - return errors.New("unavailable") - return nil -} - // Endpoint represents an endpoint. type Endpoint struct { Protocol EndpointProtocol @@ -92,6 +31,83 @@ type Endpoint struct { } } +type EndpointConn interface { + Expired() bool + Send(val string) error +} + +type EndpointManager struct { + mu sync.RWMutex // this is intentionally exposed + conns map[string]EndpointConn +} + +func NewEndpointManager() *EndpointManager { + epc := &EndpointManager{ + conns: make(map[string]EndpointConn), + } + go epc.Run() + return epc +} +func (epc *EndpointManager) Run() { + for { + time.Sleep(time.Second) + func() { + epc.mu.Lock() + defer epc.mu.Unlock() + for endpoint, conn := range epc.conns { + if conn.Expired() { + delete(epc.conns, endpoint) + } + } + }() + } +} + +// Get finds an endpoint based on its url. If the enpoint does not +// exist a new only is created. +func (epc *EndpointManager) Validate(url string) error { + _, err := parseEndpoint(url) + return err +} + +func (epc *EndpointManager) Send(endpoint, val string) error { + epc.mu.Lock() + conn, ok := epc.conns[endpoint] + if !ok || conn.Expired() { + ep, err := parseEndpoint(endpoint) + if err != nil { + epc.mu.Unlock() + return err + } + switch ep.Protocol { + default: + return errors.New("invalid protocol") + case HTTP: + conn = newHTTPEndpointConn(ep) + case Disque: + conn = newDisqueEndpointConn(ep) + } + epc.conns[endpoint] = conn + } + epc.mu.Unlock() + return conn.Send(val) +} + +/* +func (conn *endpointConn) Expired() bool { + conn.mu.Lock() + defer conn.mu.Unlock() + println("is expired?", conn.ex) + return conn.ex +} + +func (conn *endpointConn) Send(val string) error { + conn.mu.Lock() + defer conn.mu.Unlock() + + return nil +} +*/ /* func (ep *Endpoint) Open() { ep.mu.Lock() diff --git a/controller/endpoint/http.go b/controller/endpoint/http.go new file mode 100644 index 00000000..c15a07bc --- /dev/null +++ b/controller/endpoint/http.go @@ -0,0 +1,81 @@ +package endpoint + +import ( + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "sync" + "time" +) + +const ( + httpExpiresAfter = time.Second * 30 + httpRequestTimeout = time.Second * 5 + httpMaxIdleConnections = 20 +) + +type HTTPEndpointConn struct { + mu sync.Mutex + ep Endpoint + ex bool + t time.Time + client *http.Client +} + +func newHTTPEndpointConn(ep Endpoint) *HTTPEndpointConn { + return &HTTPEndpointConn{ + ep: ep, + t: time.Now(), + } +} + +func (conn *HTTPEndpointConn) Expired() bool { + conn.mu.Lock() + defer conn.mu.Unlock() + if !conn.ex { + if time.Now().Sub(conn.t) > httpExpiresAfter { + conn.ex = true + conn.client = nil + } + } + return conn.ex +} + +func (conn *HTTPEndpointConn) Send(msg string) error { + conn.mu.Lock() + defer conn.mu.Unlock() + if conn.ex { + return errors.New("expired") + } + conn.t = time.Now() + if conn.client == nil { + conn.client = &http.Client{ + Transport: &http.Transport{ + MaxIdleConnsPerHost: httpMaxIdleConnections, + }, + Timeout: httpRequestTimeout, + } + } + req, err := http.NewRequest("POST", conn.ep.Original, bytes.NewBufferString(msg)) + if err != nil { + return err + } + resp, err := conn.client.Do(req) + if err != nil { + return err + } + // close the connection to reuse it + defer resp.Body.Close() + // discard response + if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil { + return err + } + // we only care about the 200 response + if resp.StatusCode != 200 { + return fmt.Errorf("invalid status: %s", resp.Status) + } + return nil +} diff --git a/controller/hooks.go b/controller/hooks.go index 1a33e22a..23f00a15 100644 --- a/controller/hooks.go +++ b/controller/hooks.go @@ -11,6 +11,7 @@ import ( "github.com/tidwall/buntdb" "github.com/tidwall/resp" + "github.com/tidwall/tile38/controller/endpoint" "github.com/tidwall/tile38/controller/glob" "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/controller/server" @@ -284,7 +285,7 @@ type Hook struct { closed bool opened bool query string - epm *EndpointManager + epm *endpoint.EndpointManager } // Open is called when a hook is first created. It calls the manager @@ -342,12 +343,14 @@ func (h *Hook) manager() { } } -// proc processes queued hook logs. returning true will indicate that -// there's more to do and proc() should be called again asap. +// proc processes queued hook logs. +// returning true will indicate that all log entries have been +// successfully handled. func (h *Hook) proc() (ok bool) { var keys, vals []string var ttls []time.Duration err := h.db.Update(func(tx *buntdb.Tx) error { + // get keys and vals err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool { if strings.HasPrefix(key, hookLogPrefix) { @@ -359,6 +362,7 @@ func (h *Hook) proc() (ok bool) { if err != nil { return err } + // delete the keys for _, key := range keys { if hookLogTTL > 0 { diff --git a/controller/http.go b/controller/http.go deleted file mode 100644 index 49f0c84a..00000000 --- a/controller/http.go +++ /dev/null @@ -1,43 +0,0 @@ -package controller - -import ( - "bytes" - "fmt" - "io" - "io/ioutil" - "net/http" - "sync" -) - -var endpointHTTPMu sync.Mutex - -type endpointHTTPConn struct { - mu sync.Mutex - client *http.Client -} - -var endpointHTTPM = make(map[string]*endpointHTTPConn) - -func sendHTTPMessage(endpoint Endpoint, msg []byte) error { - endpointHTTPMu.Lock() - conn, ok := endpointHTTPM[endpoint.Original] - if !ok { - conn = &endpointHTTPConn{ - client: &http.Client{Transport: &http.Transport{}}, - } - endpointHTTPM[endpoint.Original] = conn - } - endpointHTTPMu.Unlock() - conn.mu.Lock() - defer conn.mu.Unlock() - res, err := conn.client.Post(endpoint.Original, "application/json", bytes.NewBuffer(msg)) - if err != nil { - return err - } - io.Copy(ioutil.Discard, res.Body) - res.Body.Close() - if res.StatusCode != 200 { - return fmt.Errorf("endpoint returned status code %d", res.StatusCode) - } - return nil -}