From 8906d8e65aa99c884edfa99c87167410da3b8596 Mon Sep 17 00:00:00 2001 From: tidwall Date: Fri, 23 Nov 2018 02:14:26 -0700 Subject: [PATCH] Refactor and consolidate variables --- internal/server/expire.go | 2 +- internal/server/live.go | 51 +++++++++++++++++++++------------------ internal/server/server.go | 29 +++++++++++----------- 3 files changed, 43 insertions(+), 39 deletions(-) diff --git a/internal/server/expire.go b/internal/server/expire.go index 39448c26..823106cc 100644 --- a/internal/server/expire.go +++ b/internal/server/expire.go @@ -105,7 +105,7 @@ func (c *Server) backgroundExpiring() { rand.Seed(time.Now().UnixNano()) var purgelist []exitem for { - if c.stopBackgroundExpiring.on() { + if c.stopServer.on() { return } now := time.Now() diff --git a/internal/server/live.go b/internal/server/live.go index b7177011..042e071e 100644 --- a/internal/server/live.go +++ b/internal/server/live.go @@ -19,16 +19,20 @@ type liveBuffer struct { cond *sync.Cond } -func (c *Server) processLives() { +func (server *Server) processLives() { + server.lcond.L.Lock() + defer server.lcond.L.Unlock() for { - c.lcond.L.Lock() - for len(c.lstack) > 0 { - item := c.lstack[0] - c.lstack = c.lstack[1:] - if len(c.lstack) == 0 { - c.lstack = nil + if server.stopServer.on() { + return + } + for len(server.lstack) > 0 { + item := server.lstack[0] + server.lstack = server.lstack[1:] + if len(server.lstack) == 0 { + server.lstack = nil } - for lb := range c.lives { + for lb := range server.lives { lb.cond.L.Lock() if lb.key != "" && lb.key == item.key { lb.details = append(lb.details, item) @@ -37,8 +41,7 @@ func (c *Server) processLives() { lb.cond.L.Unlock() } } - c.lcond.Wait() - c.lcond.L.Unlock() + server.lcond.Wait() } } @@ -68,7 +71,7 @@ func writeLiveMessage( return err } -func (c *Server) goLive( +func (server *Server) goLive( inerr error, conn net.Conn, rd *PipelineReader, msg *Message, websocket bool, ) error { addr := conn.RemoteAddr().String() @@ -80,9 +83,9 @@ func (c *Server) goLive( default: return errors.New("invalid live type switches") case liveAOFSwitches: - return c.liveAOF(s.pos, conn, rd, msg) + return server.liveAOF(s.pos, conn, rd, msg) case liveSubscriptionSwitches: - return c.liveSubscription(conn, rd, msg, websocket) + return server.liveSubscription(conn, rd, msg, websocket) case liveFenceSwitches: // fallthrough } @@ -98,23 +101,23 @@ func (c *Server) goLive( lb.glob = s.glob lb.key = s.key lb.fence = &s - c.mu.RLock() - sw, err = c.newScanWriter( + server.mu.RLock() + sw, err = server.newScanWriter( &wr, msg, s.key, s.output, s.precision, s.glob, false, s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) - c.mu.RUnlock() + server.mu.RUnlock() // everything below if for live SCAN, NEARBY, WITHIN, INTERSECTS if err != nil { return err } - c.lcond.L.Lock() - c.lives[lb] = true - c.lcond.L.Unlock() + server.lcond.L.Lock() + server.lives[lb] = true + server.lcond.L.Unlock() defer func() { - c.lcond.L.Lock() - delete(c.lives, lb) - c.lcond.L.Unlock() + server.lcond.L.Lock() + delete(server.lives, lb) + server.lcond.L.Unlock() conn.Close() }() @@ -181,8 +184,8 @@ func (c *Server) goLive( var msgs []string func() { // safely lock the fence because we are outside the main loop - c.mu.RLock() - defer c.mu.RUnlock() + server.mu.RLock() + defer server.mu.RUnlock() msgs = FenceMatch("", sw, fence, nil, details) }() for _, msg := range msgs { diff --git a/internal/server/server.go b/internal/server/server.go index ea4b83f3..9e208794 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -75,15 +75,13 @@ type Server struct { geomParseOpts geojson.ParseOptions // atomics - followc aint // counter increases when follow property changes - statsTotalConns aint // counter for total connections - statsTotalCommands aint // counter for total commands - statsExpired aint // item expiration counter - lastShrinkDuration aint - stopBackgroundExpiring abool - stopWatchingMemory abool - stopWatchingAutoGC abool - outOfMemory abool + followc aint // counter increases when follow property changes + statsTotalConns aint // counter for total connections + statsTotalCommands aint // counter for total commands + statsExpired aint // item expiration counter + lastShrinkDuration aint + stopServer abool + outOfMemory abool connsmu sync.RWMutex conns map[int]*Client @@ -263,10 +261,13 @@ func Serve(host string, port int, dir string, http bool) error { go server.backgroundExpiring() defer func() { // Stop background routines - server.stopBackgroundExpiring.set(true) - server.stopWatchingMemory.set(true) - server.stopWatchingAutoGC.set(true) server.followc.add(1) // this will force any follow communication to die + server.stopServer.set(true) + + // notify the live geofence connections that we are stopping. + server.lcond.L.Lock() + server.lcond.Wait() + server.lcond.L.Lock() }() // Start the network server @@ -714,7 +715,7 @@ func (server *Server) watchAutoGC() { defer t.Stop() s := time.Now() for range t.C { - if server.stopWatchingAutoGC.on() { + if server.stopServer.on() { return } autoGC := server.config.autoGC() @@ -746,7 +747,7 @@ func (server *Server) watchOutOfMemory() { var mem runtime.MemStats for range t.C { func() { - if server.stopWatchingMemory.on() { + if server.stopServer.on() { return } oom := server.outOfMemory.on()