Merge remote-tracking branch 'upstream/master'

This commit is contained in:
mike kabischev 2017-02-10 16:20:12 +03:00
commit 009c52d365
8 changed files with 82 additions and 29 deletions

View File

@ -20,9 +20,10 @@ const (
ProtectedMode = "protected-mode" ProtectedMode = "protected-mode"
MaxMemory = "maxmemory" MaxMemory = "maxmemory"
AutoGC = "autogc" AutoGC = "autogc"
KeepAlive = "keepalive"
) )
var validProperties = []string{RequirePass, LeaderAuth, ProtectedMode, MaxMemory, AutoGC} var validProperties = []string{RequirePass, LeaderAuth, ProtectedMode, MaxMemory, AutoGC, KeepAlive}
// Config is a tile38 config // Config is a tile38 config
type Config struct { type Config struct {
@ -44,6 +45,8 @@ type Config struct {
MaxMemory int `json:"-"` MaxMemory int `json:"-"`
AutoGCP string `json:"autogc,omitempty"` AutoGCP string `json:"autogc,omitempty"`
AutoGC uint64 `json:"-"` AutoGC uint64 `json:"-"`
KeepAliveP string `json:"keepalive,omitempty"`
KeepAlive int `json:"-"`
} }
func (c *Controller) loadConfig() error { func (c *Controller) loadConfig() error {
@ -74,6 +77,9 @@ func (c *Controller) loadConfig() error {
if err := c.setConfigProperty(AutoGC, c.config.AutoGCP, true); err != nil { if err := c.setConfigProperty(AutoGC, c.config.AutoGCP, true); err != nil {
return err return err
} }
if err := c.setConfigProperty(KeepAlive, c.config.KeepAliveP, true); err != nil {
return err
}
return nil return nil
} }
@ -161,7 +167,19 @@ func (c *Controller) setConfigProperty(name, value string, fromLoad bool) error
default: default:
invalid = true invalid = true
} }
case KeepAlive:
if value == "" {
c.config.KeepAlive = 300
} else {
keepalive, err := strconv.ParseUint(value, 10, 64)
if err != nil {
invalid = true
} else {
c.config.KeepAlive = int(keepalive)
}
}
} }
if invalid { if invalid {
return fmt.Errorf("Invalid argument '%s' for CONFIG SET '%s'", value, name) return fmt.Errorf("Invalid argument '%s' for CONFIG SET '%s'", value, name)
} }
@ -192,6 +210,8 @@ func (c *Controller) getConfigProperty(name string) string {
return c.config.ProtectedMode return c.config.ProtectedMode
case MaxMemory: case MaxMemory:
return formatMemSize(c.config.MaxMemory) return formatMemSize(c.config.MaxMemory)
case KeepAlive:
return strconv.FormatUint(uint64(c.config.KeepAlive), 10)
} }
} }
@ -216,6 +236,7 @@ func (c *Controller) writeConfig(writeProperties bool) error {
c.config.ProtectedModeP = c.config.ProtectedMode c.config.ProtectedModeP = c.config.ProtectedMode
c.config.MaxMemoryP = formatMemSize(c.config.MaxMemory) c.config.MaxMemoryP = formatMemSize(c.config.MaxMemory)
c.config.AutoGCP = strconv.FormatUint(c.config.AutoGC, 10) c.config.AutoGCP = strconv.FormatUint(c.config.AutoGC, 10)
c.config.KeepAliveP = strconv.FormatUint(uint64(c.config.KeepAlive), 10)
} }
var data []byte var data []byte
data, err = json.MarshalIndent(c.config, "", "\t") data, err = json.MarshalIndent(c.config, "", "\t")

View File

@ -227,8 +227,17 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
return is return is
} }
var clientId uint64 var clientId uint64
opened := func(conn *server.Conn) { opened := func(conn *server.Conn) {
c.mu.Lock() c.mu.Lock()
if c.config.KeepAlive > 0 {
err := conn.SetKeepAlive(
time.Duration(c.config.KeepAlive) * time.Second)
if err != nil {
log.Warnf("could not set keepalive for connection: %v",
conn.RemoteAddr().String())
}
}
clientId++ clientId++
c.conns[conn] = &clientConn{ c.conns[conn] = &clientConn{
id: clientId, id: clientId,

View File

@ -57,7 +57,7 @@ func (conn *DisqueEndpointConn) Send(msg string) error {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if conn.ex { if conn.ex {
return errors.New("expired") return errExpired
} }
conn.t = time.Now() conn.t = time.Now()
if conn.conn == nil { if conn.conn == nil {

View File

@ -9,6 +9,8 @@ import (
"time" "time"
) )
var errExpired = errors.New("expired")
// EndpointProtocol is the type of protocol that the endpoint represents. // EndpointProtocol is the type of protocol that the endpoint represents.
type EndpointProtocol string type EndpointProtocol string
@ -36,9 +38,9 @@ type Endpoint struct {
} }
} }
Redis struct { Redis struct {
Host string Host string
Port int Port int
Channel string Channel string
} }
} }
@ -82,30 +84,42 @@ func (epc *EndpointManager) Validate(url string) error {
} }
func (epc *EndpointManager) Send(endpoint, val string) error { func (epc *EndpointManager) Send(endpoint, val string) error {
epc.mu.Lock() for {
conn, ok := epc.conns[endpoint] epc.mu.Lock()
if !ok || conn.Expired() { conn, ok := epc.conns[endpoint]
ep, err := parseEndpoint(endpoint) if !ok || conn.Expired() {
ep, err := parseEndpoint(endpoint)
if err != nil {
epc.mu.Unlock()
return err
}
switch ep.Protocol {
default:
return errors.New("invalid protocol")
case HTTP:
conn = newHTTPEndpointConn(ep)
case Disque:
conn = newDisqueEndpointConn(ep)
case GRPC:
conn = newGRPCEndpointConn(ep)
case Redis:
conn = newRedisEndpointConn(ep)
}
epc.conns[endpoint] = conn
}
epc.mu.Unlock()
err := conn.Send(val)
if err != nil { if err != nil {
epc.mu.Unlock() if err == errExpired {
// it's possible that the connection has expired in-between
// the last conn.Expired() check and now. If so, we should
// just try the send again.
continue
}
return err return err
} }
switch ep.Protocol { return nil
default:
return errors.New("invalid protocol")
case HTTP:
conn = newHTTPEndpointConn(ep)
case Disque:
conn = newDisqueEndpointConn(ep)
case GRPC:
conn = newGRPCEndpointConn(ep)
case Redis:
conn = newRedisEndpointConn(ep)
}
epc.conns[endpoint] = conn
} }
epc.mu.Unlock()
return conn.Send(val)
} }
func parseEndpoint(s string) (Endpoint, error) { func parseEndpoint(s string) (Endpoint, error) {

View File

@ -57,7 +57,7 @@ func (conn *GRPCEndpointConn) Send(msg string) error {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if conn.ex { if conn.ex {
return errors.New("expired") return errExpired
} }
conn.t = time.Now() conn.t = time.Now()
if conn.conn == nil { if conn.conn == nil {

View File

@ -2,7 +2,6 @@ package endpoint
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -40,7 +39,7 @@ func (conn *HTTPEndpointConn) Send(msg string) error {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
if conn.ex { if conn.ex {
return errors.New("expired") return errExpired
} }
conn.t = time.Now() conn.t = time.Now()
if conn.client == nil { if conn.client == nil {

View File

@ -56,7 +56,7 @@ func (conn *RedisEndpointConn) Send(msg string) error {
defer conn.mu.Unlock() defer conn.mu.Unlock()
if conn.ex { if conn.ex {
return errors.New("expired") return errExpired
} }
conn.t = time.Now() conn.t = time.Now()

View File

@ -45,6 +45,16 @@ type Conn struct {
Authenticated bool Authenticated bool
} }
func (conn Conn) SetKeepAlive(period time.Duration) error {
if tcp, ok := conn.Conn.(*net.TCPConn); ok {
if err := tcp.SetKeepAlive(true); err != nil {
return err
}
return tcp.SetKeepAlivePeriod(period)
}
return nil
}
var errCloseHTTP = errors.New("close http") var errCloseHTTP = errors.New("close http")
// ListenAndServe starts a tile38 server at the specified address. // ListenAndServe starts a tile38 server at the specified address.