diff --git a/controller/atomic.go b/controller/atomic.go new file mode 100644 index 00000000..ca0aabb3 --- /dev/null +++ b/controller/atomic.go @@ -0,0 +1,27 @@ +package controller + +import "sync/atomic" + +type aint struct{ v int64 } + +func (a *aint) add(d int) int { + return int(atomic.AddInt64(&a.v, int64(d))) +} +func (a *aint) get() int { + return int(atomic.LoadInt64(&a.v)) +} +func (a *aint) set(i int) int { + return int(atomic.SwapInt64(&a.v, int64(i))) +} + +type abool struct{ v int64 } + +func (a *abool) on() bool { + return atomic.LoadInt64(&a.v) != 0 +} +func (a *abool) set(t bool) bool { + if t { + return atomic.SwapInt64(&a.v, 1) != 0 + } + return atomic.SwapInt64(&a.v, 0) != 0 +} diff --git a/controller/controller.go b/controller/controller.go index d0cbce9f..e84ff733 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -99,17 +99,18 @@ type Controller struct { epc *endpoint.EndpointManager - statsTotalConns int - statsTotalCommands int - statsExpired int + // counters + statsTotalConns aint + statsTotalCommands aint + statsExpired aint lastShrinkDuration time.Duration currentShrinkStart time.Time - stopBackgroundExpiring bool - stopWatchingMemory bool - stopWatchingAutoGC bool - outOfMemory bool + stopBackgroundExpiring abool + stopWatchingMemory abool + stopWatchingAutoGC abool + outOfMemory abool } // ListenAndServe starts a new tile38 server @@ -180,12 +181,10 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http if err := c.loadAOF(); err != nil { return err } - c.mu.Lock() c.fillExpiresList() if c.config.followHost() != "" { go c.follow(c.config.followHost(), c.config.followPort(), c.followc) } - c.mu.Unlock() defer func() { c.mu.Lock() c.followc++ // this will force any follow communication to die @@ -196,18 +195,16 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http go c.watchGC() go c.backgroundExpiring() defer func() { - c.mu.Lock() - c.stopBackgroundExpiring = true - c.stopWatchingMemory = true - c.stopWatchingAutoGC = true - c.mu.Unlock() + c.stopBackgroundExpiring.set(true) + c.stopWatchingMemory.set(true) + c.stopWatchingAutoGC.set(true) }() handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error { c.mu.Lock() if cc, ok := c.conns[conn]; ok { cc.last = time.Now() } - c.statsTotalCommands++ + c.statsTotalCommands.add(1) c.mu.Unlock() err := c.handleInputCommand(conn, msg, w) if err != nil { @@ -232,6 +229,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http c.mu.RUnlock() return is } + var clientID uint64 opened := func(conn *server.Conn) { c.mu.Lock() @@ -249,9 +247,10 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http opened: time.Now(), conn: conn, } - c.statsTotalConns++ + c.statsTotalConns.add(1) c.mu.Unlock() } + closed := func(conn *server.Conn) { c.mu.Lock() delete(c.conns, conn) @@ -268,7 +267,7 @@ func (c *Controller) watchGC() { for range t.C { c.mu.RLock() - if c.stopWatchingAutoGC { + if c.stopWatchingAutoGC.on() { c.mu.RUnlock() return } @@ -307,16 +306,16 @@ func (c *Controller) watchMemory() { for range t.C { func() { c.mu.RLock() - if c.stopWatchingMemory { + if c.stopWatchingMemory.on() { c.mu.RUnlock() return } - oom := c.outOfMemory + oom := c.outOfMemory.on() c.mu.RUnlock() if c.config.maxMemory() == 0 { if oom { c.mu.Lock() - c.outOfMemory = false + c.outOfMemory.set(false) c.mu.Unlock() } return @@ -326,7 +325,7 @@ func (c *Controller) watchMemory() { } runtime.ReadMemStats(&mem) c.mu.Lock() - c.outOfMemory = int(mem.HeapAlloc) > c.config.maxMemory() + c.outOfMemory.set(int(mem.HeapAlloc) > c.config.maxMemory()) c.mu.Unlock() }() } diff --git a/controller/crud.go b/controller/crud.go index 02242118..ce028dc0 100644 --- a/controller/crud.go +++ b/controller/crud.go @@ -716,7 +716,7 @@ func (c *Controller) parseSetArgs(vs []resp.Value) ( } func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT, err error) { - if c.config.maxMemory() > 0 && c.outOfMemory { + if c.config.maxMemory() > 0 && c.outOfMemory.on() { err = errOOM return } diff --git a/controller/expire.go b/controller/expire.go index 186a5c77..e49538a1 100644 --- a/controller/expire.go +++ b/controller/expire.go @@ -109,7 +109,7 @@ func (c *Controller) backgroundExpiring() { rand.Seed(time.Now().UnixNano()) for { c.mu.Lock() - if c.stopBackgroundExpiring { + if c.stopBackgroundExpiring.on() { c.mu.Unlock() return } diff --git a/controller/stats.go b/controller/stats.go index 386ef06e..deb908d7 100644 --- a/controller/stats.go +++ b/controller/stats.go @@ -170,9 +170,9 @@ func (c *Controller) writeInfoPersistence(w *bytes.Buffer) { } func (c *Controller) writeInfoStats(w *bytes.Buffer) { - fmt.Fprintf(w, "total_connections_received:%d\r\n", c.statsTotalConns) // Total number of connections accepted by the server - fmt.Fprintf(w, "total_commands_processed:%d\r\n", c.statsTotalCommands) // Total number of commands processed by the server - fmt.Fprintf(w, "expired_keys:%d\r\n", c.statsExpired) // Total number of key expiration events + fmt.Fprintf(w, "total_connections_received:%d\r\n", c.statsTotalConns.get()) // Total number of connections accepted by the server + fmt.Fprintf(w, "total_commands_processed:%d\r\n", c.statsTotalCommands.get()) // Total number of commands processed by the server + fmt.Fprintf(w, "expired_keys:%d\r\n", c.statsExpired.get()) // Total number of key expiration events } func (c *Controller) writeInfoReplication(w *bytes.Buffer) { fmt.Fprintf(w, "connected_slaves:%d\r\n", len(c.aofconnM)) // Number of connected slaves