atomic ints/bools

This commit is contained in:
Josh Baker 2017-09-30 07:29:03 -07:00
parent 99307da6ea
commit 920dc3adb6
5 changed files with 52 additions and 26 deletions

27
controller/atomic.go Normal file
View File

@ -0,0 +1,27 @@
package controller
import "sync/atomic"
type aint struct{ v int64 }
func (a *aint) add(d int) int {
return int(atomic.AddInt64(&a.v, int64(d)))
}
func (a *aint) get() int {
return int(atomic.LoadInt64(&a.v))
}
func (a *aint) set(i int) int {
return int(atomic.SwapInt64(&a.v, int64(i)))
}
type abool struct{ v int64 }
func (a *abool) on() bool {
return atomic.LoadInt64(&a.v) != 0
}
func (a *abool) set(t bool) bool {
if t {
return atomic.SwapInt64(&a.v, 1) != 0
}
return atomic.SwapInt64(&a.v, 0) != 0
}

View File

@ -99,17 +99,18 @@ type Controller struct {
epc *endpoint.EndpointManager epc *endpoint.EndpointManager
statsTotalConns int // counters
statsTotalCommands int statsTotalConns aint
statsExpired int statsTotalCommands aint
statsExpired aint
lastShrinkDuration time.Duration lastShrinkDuration time.Duration
currentShrinkStart time.Time currentShrinkStart time.Time
stopBackgroundExpiring bool stopBackgroundExpiring abool
stopWatchingMemory bool stopWatchingMemory abool
stopWatchingAutoGC bool stopWatchingAutoGC abool
outOfMemory bool outOfMemory abool
} }
// ListenAndServe starts a new tile38 server // ListenAndServe starts a new tile38 server
@ -180,12 +181,10 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
if err := c.loadAOF(); err != nil { if err := c.loadAOF(); err != nil {
return err return err
} }
c.mu.Lock()
c.fillExpiresList() c.fillExpiresList()
if c.config.followHost() != "" { if c.config.followHost() != "" {
go c.follow(c.config.followHost(), c.config.followPort(), c.followc) go c.follow(c.config.followHost(), c.config.followPort(), c.followc)
} }
c.mu.Unlock()
defer func() { defer func() {
c.mu.Lock() c.mu.Lock()
c.followc++ // this will force any follow communication to die c.followc++ // this will force any follow communication to die
@ -196,18 +195,16 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
go c.watchGC() go c.watchGC()
go c.backgroundExpiring() go c.backgroundExpiring()
defer func() { defer func() {
c.mu.Lock() c.stopBackgroundExpiring.set(true)
c.stopBackgroundExpiring = true c.stopWatchingMemory.set(true)
c.stopWatchingMemory = true c.stopWatchingAutoGC.set(true)
c.stopWatchingAutoGC = true
c.mu.Unlock()
}() }()
handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error { handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error {
c.mu.Lock() c.mu.Lock()
if cc, ok := c.conns[conn]; ok { if cc, ok := c.conns[conn]; ok {
cc.last = time.Now() cc.last = time.Now()
} }
c.statsTotalCommands++ c.statsTotalCommands.add(1)
c.mu.Unlock() c.mu.Unlock()
err := c.handleInputCommand(conn, msg, w) err := c.handleInputCommand(conn, msg, w)
if err != nil { if err != nil {
@ -232,6 +229,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
c.mu.RUnlock() c.mu.RUnlock()
return is return is
} }
var clientID uint64 var clientID uint64
opened := func(conn *server.Conn) { opened := func(conn *server.Conn) {
c.mu.Lock() c.mu.Lock()
@ -249,9 +247,10 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
opened: time.Now(), opened: time.Now(),
conn: conn, conn: conn,
} }
c.statsTotalConns++ c.statsTotalConns.add(1)
c.mu.Unlock() c.mu.Unlock()
} }
closed := func(conn *server.Conn) { closed := func(conn *server.Conn) {
c.mu.Lock() c.mu.Lock()
delete(c.conns, conn) delete(c.conns, conn)
@ -268,7 +267,7 @@ func (c *Controller) watchGC() {
for range t.C { for range t.C {
c.mu.RLock() c.mu.RLock()
if c.stopWatchingAutoGC { if c.stopWatchingAutoGC.on() {
c.mu.RUnlock() c.mu.RUnlock()
return return
} }
@ -307,16 +306,16 @@ func (c *Controller) watchMemory() {
for range t.C { for range t.C {
func() { func() {
c.mu.RLock() c.mu.RLock()
if c.stopWatchingMemory { if c.stopWatchingMemory.on() {
c.mu.RUnlock() c.mu.RUnlock()
return return
} }
oom := c.outOfMemory oom := c.outOfMemory.on()
c.mu.RUnlock() c.mu.RUnlock()
if c.config.maxMemory() == 0 { if c.config.maxMemory() == 0 {
if oom { if oom {
c.mu.Lock() c.mu.Lock()
c.outOfMemory = false c.outOfMemory.set(false)
c.mu.Unlock() c.mu.Unlock()
} }
return return
@ -326,7 +325,7 @@ func (c *Controller) watchMemory() {
} }
runtime.ReadMemStats(&mem) runtime.ReadMemStats(&mem)
c.mu.Lock() c.mu.Lock()
c.outOfMemory = int(mem.HeapAlloc) > c.config.maxMemory() c.outOfMemory.set(int(mem.HeapAlloc) > c.config.maxMemory())
c.mu.Unlock() c.mu.Unlock()
}() }()
} }

View File

@ -716,7 +716,7 @@ func (c *Controller) parseSetArgs(vs []resp.Value) (
} }
func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT, err error) { func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT, err error) {
if c.config.maxMemory() > 0 && c.outOfMemory { if c.config.maxMemory() > 0 && c.outOfMemory.on() {
err = errOOM err = errOOM
return return
} }

View File

@ -109,7 +109,7 @@ func (c *Controller) backgroundExpiring() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
for { for {
c.mu.Lock() c.mu.Lock()
if c.stopBackgroundExpiring { if c.stopBackgroundExpiring.on() {
c.mu.Unlock() c.mu.Unlock()
return return
} }

View File

@ -170,9 +170,9 @@ func (c *Controller) writeInfoPersistence(w *bytes.Buffer) {
} }
func (c *Controller) writeInfoStats(w *bytes.Buffer) { func (c *Controller) writeInfoStats(w *bytes.Buffer) {
fmt.Fprintf(w, "total_connections_received:%d\r\n", c.statsTotalConns) // Total number of connections accepted by the server fmt.Fprintf(w, "total_connections_received:%d\r\n", c.statsTotalConns.get()) // Total number of connections accepted by the server
fmt.Fprintf(w, "total_commands_processed:%d\r\n", c.statsTotalCommands) // Total number of commands processed by the server fmt.Fprintf(w, "total_commands_processed:%d\r\n", c.statsTotalCommands.get()) // Total number of commands processed by the server
fmt.Fprintf(w, "expired_keys:%d\r\n", c.statsExpired) // Total number of key expiration events fmt.Fprintf(w, "expired_keys:%d\r\n", c.statsExpired.get()) // Total number of key expiration events
} }
func (c *Controller) writeInfoReplication(w *bytes.Buffer) { func (c *Controller) writeInfoReplication(w *bytes.Buffer) {
fmt.Fprintf(w, "connected_slaves:%d\r\n", len(c.aofconnM)) // Number of connected slaves fmt.Fprintf(w, "connected_slaves:%d\r\n", len(c.aofconnM)) // Number of connected slaves