atomic clients

This commit is contained in:
Josh Baker 2017-09-30 08:00:29 -07:00
parent 0db57db1f1
commit 770c6ad322
4 changed files with 120 additions and 63 deletions

View File

@ -1,6 +1,10 @@
package controller package controller
import "sync/atomic" import (
"sync"
"sync/atomic"
"time"
)
type aint struct{ v int64 } type aint struct{ v int64 }
@ -25,3 +29,41 @@ func (a *abool) set(t bool) bool {
} }
return atomic.SwapInt64(&a.v, 0) != 0 return atomic.SwapInt64(&a.v, 0) != 0
} }
type astring struct {
mu sync.Mutex
v string
}
func (a *astring) get() string {
a.mu.Lock()
p := a.v
a.mu.Unlock()
return p
}
func (a *astring) set(s string) string {
a.mu.Lock()
p := a.v
a.v = s
a.mu.Unlock()
return p
}
type atime struct {
mu sync.Mutex
v time.Time
}
func (a *atime) get() time.Time {
a.mu.Lock()
p := a.v
a.mu.Unlock()
return p
}
func (a *atime) set(t time.Time) time.Time {
a.mu.Lock()
p := a.v
a.v = t
a.mu.Unlock()
return p
}

View File

@ -19,6 +19,14 @@ type Conn struct {
wr *resp.Writer wr *resp.Writer
} }
type clientConn struct {
id int
name astring
opened atime
last atime
conn *server.Conn
}
// DialTimeout dials a resp server. // DialTimeout dials a resp server.
func DialTimeout(address string, timeout time.Duration) (*Conn, error) { func DialTimeout(address string, timeout time.Duration) (*Conn, error) {
tcpconn, err := net.DialTimeout("tcp", address, timeout) tcpconn, err := net.DialTimeout("tcp", address, timeout)
@ -73,18 +81,20 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string,
return "", errInvalidNumberOfArguments return "", errInvalidNumberOfArguments
} }
var list []*clientConn var list []*clientConn
for _, cc := range c.conns { c.connsmu.RLock()
for _, cc := range c.conns2 {
list = append(list, cc) list = append(list, cc)
} }
c.connsmu.RUnlock()
sort.Sort(byID(list)) sort.Sort(byID(list))
now := time.Now() now := time.Now()
var buf []byte var buf []byte
for _, cc := range list { for _, cc := range list {
buf = append(buf, buf = append(buf,
fmt.Sprintf("id=%d addr=%s name=%s age=%d idle=%d\n", fmt.Sprintf("id=%d addr=%s name=%s age=%d idle=%d\n",
cc.id, cc.conn.RemoteAddr().String(), cc.name, cc.id, cc.conn.RemoteAddr().String(), cc.name.get(),
now.Sub(cc.opened)/time.Second, now.Sub(cc.opened.get())/time.Second,
now.Sub(cc.last)/time.Second, now.Sub(cc.last.get())/time.Second,
)..., )...,
) )
} }
@ -104,9 +114,11 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string,
return "", errInvalidNumberOfArguments return "", errInvalidNumberOfArguments
} }
name := "" name := ""
if cc, ok := c.conns[conn]; ok { c.connsmu.RLock()
name = cc.name if cc, ok := c.conns2[conn]; ok {
name = cc.name.get()
} }
c.connsmu.RUnlock()
switch msg.OutputType { switch msg.OutputType {
case server.JSON: case server.JSON:
return `{"ok":true,"name":` + jsonString(name) + `,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil return `{"ok":true,"name":` + jsonString(name) + `,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil
@ -124,12 +136,15 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string,
name := msg.Values[2].String() name := msg.Values[2].String()
for i := 0; i < len(name); i++ { for i := 0; i < len(name); i++ {
if name[i] < '!' || name[i] > '~' { if name[i] < '!' || name[i] > '~' {
return "", errors.New("Client names cannot contain spaces, newlines or special characters.") errstr := "Client names cannot contain spaces, newlines or special characters."
return "", errors.New(errstr)
} }
} }
if cc, ok := c.conns[conn]; ok { c.connsmu.RLock()
cc.name = name if cc, ok := c.conns2[conn]; ok {
cc.name.set(name)
} }
c.connsmu.RUnlock()
switch msg.OutputType { switch msg.OutputType {
case server.JSON: case server.JSON:
return `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil return `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil
@ -171,7 +186,8 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string,
} }
} }
var cclose *clientConn var cclose *clientConn
for _, cc := range c.conns { c.connsmu.RLock()
for _, cc := range c.conns2 {
if useID && fmt.Sprintf("%d", cc.id) == id { if useID && fmt.Sprintf("%d", cc.id) == id {
cclose = cc cclose = cc
break break
@ -180,6 +196,7 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string,
break break
} }
} }
c.connsmu.RUnlock()
if cclose == nil { if cclose == nil {
return "", errors.New("No such client") return "", errors.New("No such client")
} }

View File

@ -58,27 +58,38 @@ func (col *collectionT) Less(item btree.Item, ctx interface{}) bool {
return col.Key < item.(*collectionT).Key return col.Key < item.(*collectionT).Key
} }
type clientConn struct {
id uint64
name string
opened time.Time
last time.Time
conn *server.Conn
}
// Controller is a tile38 controller // Controller is a tile38 controller
type Controller struct { type Controller struct {
mu sync.RWMutex // static values
host string host string
port int port int
http bool
started time.Time
config *Config
epc *endpoint.EndpointManager
// atomics
followc aint // counter increases when follow property changes
statsTotalConns aint
statsTotalCommands aint
statsExpired aint
lastShrinkDuration aint
currentShrinkStart atime
stopBackgroundExpiring abool
stopWatchingMemory abool
stopWatchingAutoGC abool
outOfMemory abool
connsmu sync.RWMutex
conns2 map[*server.Conn]*clientConn
mu sync.RWMutex
f *os.File f *os.File
qdb *buntdb.DB // hook queue log qdb *buntdb.DB // hook queue log
qidx uint64 // hook queue log last idx qidx uint64 // hook queue log last idx
cols *btree.BTree cols *btree.BTree
aofsz int aofsz int
dir string dir string
config *Config
followc aint // counter increases when follow property changes
follows map[*bytes.Buffer]bool follows map[*bytes.Buffer]bool
fcond *sync.Cond fcond *sync.Cond
lstack []*commandDetailsT lstack []*commandDetailsT
@ -93,24 +104,6 @@ type Controller struct {
aofconnM map[net.Conn]bool aofconnM map[net.Conn]bool
expires map[string]map[string]time.Time expires map[string]map[string]time.Time
exlist []exitem exlist []exitem
conns map[*server.Conn]*clientConn
started time.Time
http bool
epc *endpoint.EndpointManager
// counters
statsTotalConns aint
statsTotalCommands aint
statsExpired aint
lastShrinkDuration time.Duration
currentShrinkStart time.Time
stopBackgroundExpiring abool
stopWatchingMemory abool
stopWatchingAutoGC abool
outOfMemory abool
} }
// ListenAndServe starts a new tile38 server // ListenAndServe starts a new tile38 server
@ -133,7 +126,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
aofconnM: make(map[net.Conn]bool), aofconnM: make(map[net.Conn]bool),
expires: make(map[string]map[string]time.Time), expires: make(map[string]map[string]time.Time),
started: time.Now(), started: time.Now(),
conns: make(map[*server.Conn]*clientConn), conns2: make(map[*server.Conn]*clientConn),
epc: endpoint.NewEndpointManager(), epc: endpoint.NewEndpointManager(),
http: http, http: http,
} }
@ -198,11 +191,11 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
c.stopWatchingAutoGC.set(true) c.stopWatchingAutoGC.set(true)
}() }()
handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error { handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error {
c.mu.Lock() c.connsmu.RLock()
if cc, ok := c.conns[conn]; ok { if cc, ok := c.conns2[conn]; ok {
cc.last = time.Now() cc.last.set(time.Now())
} }
c.mu.Unlock() c.connsmu.RUnlock()
c.statsTotalCommands.add(1) c.statsTotalCommands.add(1)
err := c.handleInputCommand(conn, msg, w) err := c.handleInputCommand(conn, msg, w)
if err != nil { if err != nil {
@ -228,9 +221,8 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
return is return is
} }
var clientID uint64 var clientID aint
opened := func(conn *server.Conn) { opened := func(conn *server.Conn) {
c.mu.Lock()
if c.config.keepAlive() > 0 { if c.config.keepAlive() > 0 {
err := conn.SetKeepAlive( err := conn.SetKeepAlive(
time.Duration(c.config.keepAlive()) * time.Second) time.Duration(c.config.keepAlive()) * time.Second)
@ -239,20 +231,23 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
conn.RemoteAddr().String()) conn.RemoteAddr().String())
} }
} }
clientID++
c.conns[conn] = &clientConn{ cc := &clientConn{}
id: clientID, cc.id = clientID.add(1)
opened: time.Now(), cc.opened.set(time.Now())
conn: conn, cc.conn = conn
}
c.connsmu.Lock()
c.conns2[conn] = cc
c.connsmu.Unlock()
c.statsTotalConns.add(1) c.statsTotalConns.add(1)
c.mu.Unlock()
} }
closed := func(conn *server.Conn) { closed := func(conn *server.Conn) {
c.mu.Lock() c.connsmu.Lock()
delete(c.conns, conn) delete(c.conns2, conn)
c.mu.Unlock() c.connsmu.Unlock()
} }
return server.ListenAndServe(host, port, protected, handler, opened, closed, ln, http) return server.ListenAndServe(host, port, protected, handler, opened, closed, ln, http)
} }

View File

@ -145,7 +145,9 @@ func (c *Controller) writeInfoServer(w *bytes.Buffer) {
fmt.Fprintf(w, "uptime_in_seconds:%d\r\n", time.Now().Sub(c.started)/time.Second) //Number of seconds since Redis server start fmt.Fprintf(w, "uptime_in_seconds:%d\r\n", time.Now().Sub(c.started)/time.Second) //Number of seconds since Redis server start
} }
func (c *Controller) writeInfoClients(w *bytes.Buffer) { func (c *Controller) writeInfoClients(w *bytes.Buffer) {
fmt.Fprintf(w, "connected_clients:%d\r\n", len(c.conns)) // Number of client connections (excluding connections from slaves) c.connsmu.RLock()
fmt.Fprintf(w, "connected_clients:%d\r\n", len(c.conns2)) // Number of client connections (excluding connections from slaves)
c.connsmu.RUnlock()
} }
func (c *Controller) writeInfoMemory(w *bytes.Buffer) { func (c *Controller) writeInfoMemory(w *bytes.Buffer) {
var mem runtime.MemStats var mem runtime.MemStats
@ -161,11 +163,12 @@ func boolInt(t bool) int {
func (c *Controller) writeInfoPersistence(w *bytes.Buffer) { func (c *Controller) writeInfoPersistence(w *bytes.Buffer) {
fmt.Fprintf(w, "aof_enabled:1\r\n") fmt.Fprintf(w, "aof_enabled:1\r\n")
fmt.Fprintf(w, "aof_rewrite_in_progress:%d\r\n", boolInt(c.shrinking)) // Flag indicating a AOF rewrite operation is on-going fmt.Fprintf(w, "aof_rewrite_in_progress:%d\r\n", boolInt(c.shrinking)) // Flag indicating a AOF rewrite operation is on-going
fmt.Fprintf(w, "aof_last_rewrite_time_sec:%d\r\n", c.lastShrinkDuration/time.Second) // Duration of the last AOF rewrite operation in seconds fmt.Fprintf(w, "aof_last_rewrite_time_sec:%d\r\n", c.lastShrinkDuration.get()/int(time.Second)) // Duration of the last AOF rewrite operation in seconds
if c.currentShrinkStart.IsZero() { currentShrinkStart := c.currentShrinkStart.get()
if currentShrinkStart.IsZero() {
fmt.Fprintf(w, "aof_current_rewrite_time_sec:0\r\n") // Duration of the on-going AOF rewrite operation if any fmt.Fprintf(w, "aof_current_rewrite_time_sec:0\r\n") // Duration of the on-going AOF rewrite operation if any
} else { } else {
fmt.Fprintf(w, "aof_current_rewrite_time_sec:%d\r\n", time.Now().Sub(c.currentShrinkStart)/time.Second) // Duration of the on-going AOF rewrite operation if any fmt.Fprintf(w, "aof_current_rewrite_time_sec:%d\r\n", time.Now().Sub(currentShrinkStart)/time.Second) // Duration of the on-going AOF rewrite operation if any
} }
} }