From 770c6ad322bb6d339a32bf8280a7c8c5f16686bc Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sat, 30 Sep 2017 08:00:29 -0700 Subject: [PATCH] atomic clients --- controller/atomic.go | 44 +++++++++++++++++++- controller/client.go | 37 ++++++++++++----- controller/controller.go | 89 +++++++++++++++++++--------------------- controller/stats.go | 13 +++--- 4 files changed, 120 insertions(+), 63 deletions(-) diff --git a/controller/atomic.go b/controller/atomic.go index ca0aabb3..449050aa 100644 --- a/controller/atomic.go +++ b/controller/atomic.go @@ -1,6 +1,10 @@ package controller -import "sync/atomic" +import ( + "sync" + "sync/atomic" + "time" +) type aint struct{ v int64 } @@ -25,3 +29,41 @@ func (a *abool) set(t bool) bool { } return atomic.SwapInt64(&a.v, 0) != 0 } + +type astring struct { + mu sync.Mutex + v string +} + +func (a *astring) get() string { + a.mu.Lock() + p := a.v + a.mu.Unlock() + return p +} +func (a *astring) set(s string) string { + a.mu.Lock() + p := a.v + a.v = s + a.mu.Unlock() + return p +} + +type atime struct { + mu sync.Mutex + v time.Time +} + +func (a *atime) get() time.Time { + a.mu.Lock() + p := a.v + a.mu.Unlock() + return p +} +func (a *atime) set(t time.Time) time.Time { + a.mu.Lock() + p := a.v + a.v = t + a.mu.Unlock() + return p +} diff --git a/controller/client.go b/controller/client.go index 716c11c7..43a6e04d 100644 --- a/controller/client.go +++ b/controller/client.go @@ -19,6 +19,14 @@ type Conn struct { wr *resp.Writer } +type clientConn struct { + id int + name astring + opened atime + last atime + conn *server.Conn +} + // DialTimeout dials a resp server. func DialTimeout(address string, timeout time.Duration) (*Conn, error) { tcpconn, err := net.DialTimeout("tcp", address, timeout) @@ -73,18 +81,20 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, return "", errInvalidNumberOfArguments } var list []*clientConn - for _, cc := range c.conns { + c.connsmu.RLock() + for _, cc := range c.conns2 { list = append(list, cc) } + c.connsmu.RUnlock() sort.Sort(byID(list)) now := time.Now() var buf []byte for _, cc := range list { buf = append(buf, fmt.Sprintf("id=%d addr=%s name=%s age=%d idle=%d\n", - cc.id, cc.conn.RemoteAddr().String(), cc.name, - now.Sub(cc.opened)/time.Second, - now.Sub(cc.last)/time.Second, + cc.id, cc.conn.RemoteAddr().String(), cc.name.get(), + now.Sub(cc.opened.get())/time.Second, + now.Sub(cc.last.get())/time.Second, )..., ) } @@ -104,9 +114,11 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, return "", errInvalidNumberOfArguments } name := "" - if cc, ok := c.conns[conn]; ok { - name = cc.name + c.connsmu.RLock() + if cc, ok := c.conns2[conn]; ok { + name = cc.name.get() } + c.connsmu.RUnlock() switch msg.OutputType { case server.JSON: return `{"ok":true,"name":` + jsonString(name) + `,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil @@ -124,12 +136,15 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, name := msg.Values[2].String() for i := 0; i < len(name); i++ { if name[i] < '!' || name[i] > '~' { - return "", errors.New("Client names cannot contain spaces, newlines or special characters.") + errstr := "Client names cannot contain spaces, newlines or special characters." + return "", errors.New(errstr) } } - if cc, ok := c.conns[conn]; ok { - cc.name = name + c.connsmu.RLock() + if cc, ok := c.conns2[conn]; ok { + cc.name.set(name) } + c.connsmu.RUnlock() switch msg.OutputType { case server.JSON: return `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil @@ -171,7 +186,8 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, } } var cclose *clientConn - for _, cc := range c.conns { + c.connsmu.RLock() + for _, cc := range c.conns2 { if useID && fmt.Sprintf("%d", cc.id) == id { cclose = cc break @@ -180,6 +196,7 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, break } } + c.connsmu.RUnlock() if cclose == nil { return "", errors.New("No such client") } diff --git a/controller/controller.go b/controller/controller.go index 7e658f40..0d667787 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -58,27 +58,38 @@ func (col *collectionT) Less(item btree.Item, ctx interface{}) bool { return col.Key < item.(*collectionT).Key } -type clientConn struct { - id uint64 - name string - opened time.Time - last time.Time - conn *server.Conn -} - // Controller is a tile38 controller type Controller struct { + // static values + host string + port int + http bool + started time.Time + config *Config + epc *endpoint.EndpointManager + + // atomics + followc aint // counter increases when follow property changes + statsTotalConns aint + statsTotalCommands aint + statsExpired aint + lastShrinkDuration aint + currentShrinkStart atime + stopBackgroundExpiring abool + stopWatchingMemory abool + stopWatchingAutoGC abool + outOfMemory abool + + connsmu sync.RWMutex + conns2 map[*server.Conn]*clientConn + mu sync.RWMutex - host string - port int f *os.File qdb *buntdb.DB // hook queue log qidx uint64 // hook queue log last idx cols *btree.BTree aofsz int dir string - config *Config - followc aint // counter increases when follow property changes follows map[*bytes.Buffer]bool fcond *sync.Cond lstack []*commandDetailsT @@ -93,24 +104,6 @@ type Controller struct { aofconnM map[net.Conn]bool expires map[string]map[string]time.Time exlist []exitem - conns map[*server.Conn]*clientConn - started time.Time - http bool - - epc *endpoint.EndpointManager - - // counters - statsTotalConns aint - statsTotalCommands aint - statsExpired aint - - lastShrinkDuration time.Duration - currentShrinkStart time.Time - - stopBackgroundExpiring abool - stopWatchingMemory abool - stopWatchingAutoGC abool - outOfMemory abool } // ListenAndServe starts a new tile38 server @@ -133,7 +126,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http aofconnM: make(map[net.Conn]bool), expires: make(map[string]map[string]time.Time), started: time.Now(), - conns: make(map[*server.Conn]*clientConn), + conns2: make(map[*server.Conn]*clientConn), epc: endpoint.NewEndpointManager(), http: http, } @@ -198,11 +191,11 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http c.stopWatchingAutoGC.set(true) }() handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error { - c.mu.Lock() - if cc, ok := c.conns[conn]; ok { - cc.last = time.Now() + c.connsmu.RLock() + if cc, ok := c.conns2[conn]; ok { + cc.last.set(time.Now()) } - c.mu.Unlock() + c.connsmu.RUnlock() c.statsTotalCommands.add(1) err := c.handleInputCommand(conn, msg, w) if err != nil { @@ -228,9 +221,8 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http return is } - var clientID uint64 + var clientID aint opened := func(conn *server.Conn) { - c.mu.Lock() if c.config.keepAlive() > 0 { err := conn.SetKeepAlive( time.Duration(c.config.keepAlive()) * time.Second) @@ -239,20 +231,23 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http conn.RemoteAddr().String()) } } - clientID++ - c.conns[conn] = &clientConn{ - id: clientID, - opened: time.Now(), - conn: conn, - } + + cc := &clientConn{} + cc.id = clientID.add(1) + cc.opened.set(time.Now()) + cc.conn = conn + + c.connsmu.Lock() + c.conns2[conn] = cc + c.connsmu.Unlock() + c.statsTotalConns.add(1) - c.mu.Unlock() } closed := func(conn *server.Conn) { - c.mu.Lock() - delete(c.conns, conn) - c.mu.Unlock() + c.connsmu.Lock() + delete(c.conns2, conn) + c.connsmu.Unlock() } return server.ListenAndServe(host, port, protected, handler, opened, closed, ln, http) } diff --git a/controller/stats.go b/controller/stats.go index deb908d7..5c7aea9b 100644 --- a/controller/stats.go +++ b/controller/stats.go @@ -145,7 +145,9 @@ func (c *Controller) writeInfoServer(w *bytes.Buffer) { fmt.Fprintf(w, "uptime_in_seconds:%d\r\n", time.Now().Sub(c.started)/time.Second) //Number of seconds since Redis server start } func (c *Controller) writeInfoClients(w *bytes.Buffer) { - fmt.Fprintf(w, "connected_clients:%d\r\n", len(c.conns)) // Number of client connections (excluding connections from slaves) + c.connsmu.RLock() + fmt.Fprintf(w, "connected_clients:%d\r\n", len(c.conns2)) // Number of client connections (excluding connections from slaves) + c.connsmu.RUnlock() } func (c *Controller) writeInfoMemory(w *bytes.Buffer) { var mem runtime.MemStats @@ -160,12 +162,13 @@ func boolInt(t bool) int { } func (c *Controller) writeInfoPersistence(w *bytes.Buffer) { fmt.Fprintf(w, "aof_enabled:1\r\n") - fmt.Fprintf(w, "aof_rewrite_in_progress:%d\r\n", boolInt(c.shrinking)) // Flag indicating a AOF rewrite operation is on-going - fmt.Fprintf(w, "aof_last_rewrite_time_sec:%d\r\n", c.lastShrinkDuration/time.Second) // Duration of the last AOF rewrite operation in seconds - if c.currentShrinkStart.IsZero() { + fmt.Fprintf(w, "aof_rewrite_in_progress:%d\r\n", boolInt(c.shrinking)) // Flag indicating a AOF rewrite operation is on-going + fmt.Fprintf(w, "aof_last_rewrite_time_sec:%d\r\n", c.lastShrinkDuration.get()/int(time.Second)) // Duration of the last AOF rewrite operation in seconds + currentShrinkStart := c.currentShrinkStart.get() + if currentShrinkStart.IsZero() { fmt.Fprintf(w, "aof_current_rewrite_time_sec:0\r\n") // Duration of the on-going AOF rewrite operation if any } else { - fmt.Fprintf(w, "aof_current_rewrite_time_sec:%d\r\n", time.Now().Sub(c.currentShrinkStart)/time.Second) // Duration of the on-going AOF rewrite operation if any + fmt.Fprintf(w, "aof_current_rewrite_time_sec:%d\r\n", time.Now().Sub(currentShrinkStart)/time.Second) // Duration of the on-going AOF rewrite operation if any } }