Refactor and consolidate variables

This commit is contained in:
tidwall 2018-11-23 02:14:26 -07:00
parent 37531f9350
commit 8906d8e65a
3 changed files with 43 additions and 39 deletions

View File

@ -105,7 +105,7 @@ func (c *Server) backgroundExpiring() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
var purgelist []exitem var purgelist []exitem
for { for {
if c.stopBackgroundExpiring.on() { if c.stopServer.on() {
return return
} }
now := time.Now() now := time.Now()

View File

@ -19,16 +19,20 @@ type liveBuffer struct {
cond *sync.Cond cond *sync.Cond
} }
func (c *Server) processLives() { func (server *Server) processLives() {
server.lcond.L.Lock()
defer server.lcond.L.Unlock()
for { for {
c.lcond.L.Lock() if server.stopServer.on() {
for len(c.lstack) > 0 { return
item := c.lstack[0] }
c.lstack = c.lstack[1:] for len(server.lstack) > 0 {
if len(c.lstack) == 0 { item := server.lstack[0]
c.lstack = nil server.lstack = server.lstack[1:]
if len(server.lstack) == 0 {
server.lstack = nil
} }
for lb := range c.lives { for lb := range server.lives {
lb.cond.L.Lock() lb.cond.L.Lock()
if lb.key != "" && lb.key == item.key { if lb.key != "" && lb.key == item.key {
lb.details = append(lb.details, item) lb.details = append(lb.details, item)
@ -37,8 +41,7 @@ func (c *Server) processLives() {
lb.cond.L.Unlock() lb.cond.L.Unlock()
} }
} }
c.lcond.Wait() server.lcond.Wait()
c.lcond.L.Unlock()
} }
} }
@ -68,7 +71,7 @@ func writeLiveMessage(
return err return err
} }
func (c *Server) goLive( func (server *Server) goLive(
inerr error, conn net.Conn, rd *PipelineReader, msg *Message, websocket bool, inerr error, conn net.Conn, rd *PipelineReader, msg *Message, websocket bool,
) error { ) error {
addr := conn.RemoteAddr().String() addr := conn.RemoteAddr().String()
@ -80,9 +83,9 @@ func (c *Server) goLive(
default: default:
return errors.New("invalid live type switches") return errors.New("invalid live type switches")
case liveAOFSwitches: case liveAOFSwitches:
return c.liveAOF(s.pos, conn, rd, msg) return server.liveAOF(s.pos, conn, rd, msg)
case liveSubscriptionSwitches: case liveSubscriptionSwitches:
return c.liveSubscription(conn, rd, msg, websocket) return server.liveSubscription(conn, rd, msg, websocket)
case liveFenceSwitches: case liveFenceSwitches:
// fallthrough // fallthrough
} }
@ -98,23 +101,23 @@ func (c *Server) goLive(
lb.glob = s.glob lb.glob = s.glob
lb.key = s.key lb.key = s.key
lb.fence = &s lb.fence = &s
c.mu.RLock() server.mu.RLock()
sw, err = c.newScanWriter( sw, err = server.newScanWriter(
&wr, msg, s.key, s.output, s.precision, s.glob, false, &wr, msg, s.key, s.output, s.precision, s.glob, false,
s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields)
c.mu.RUnlock() server.mu.RUnlock()
// everything below if for live SCAN, NEARBY, WITHIN, INTERSECTS // everything below if for live SCAN, NEARBY, WITHIN, INTERSECTS
if err != nil { if err != nil {
return err return err
} }
c.lcond.L.Lock() server.lcond.L.Lock()
c.lives[lb] = true server.lives[lb] = true
c.lcond.L.Unlock() server.lcond.L.Unlock()
defer func() { defer func() {
c.lcond.L.Lock() server.lcond.L.Lock()
delete(c.lives, lb) delete(server.lives, lb)
c.lcond.L.Unlock() server.lcond.L.Unlock()
conn.Close() conn.Close()
}() }()
@ -181,8 +184,8 @@ func (c *Server) goLive(
var msgs []string var msgs []string
func() { func() {
// safely lock the fence because we are outside the main loop // safely lock the fence because we are outside the main loop
c.mu.RLock() server.mu.RLock()
defer c.mu.RUnlock() defer server.mu.RUnlock()
msgs = FenceMatch("", sw, fence, nil, details) msgs = FenceMatch("", sw, fence, nil, details)
}() }()
for _, msg := range msgs { for _, msg := range msgs {

View File

@ -75,15 +75,13 @@ type Server struct {
geomParseOpts geojson.ParseOptions geomParseOpts geojson.ParseOptions
// atomics // atomics
followc aint // counter increases when follow property changes followc aint // counter increases when follow property changes
statsTotalConns aint // counter for total connections statsTotalConns aint // counter for total connections
statsTotalCommands aint // counter for total commands statsTotalCommands aint // counter for total commands
statsExpired aint // item expiration counter statsExpired aint // item expiration counter
lastShrinkDuration aint lastShrinkDuration aint
stopBackgroundExpiring abool stopServer abool
stopWatchingMemory abool outOfMemory abool
stopWatchingAutoGC abool
outOfMemory abool
connsmu sync.RWMutex connsmu sync.RWMutex
conns map[int]*Client conns map[int]*Client
@ -263,10 +261,13 @@ func Serve(host string, port int, dir string, http bool) error {
go server.backgroundExpiring() go server.backgroundExpiring()
defer func() { defer func() {
// Stop background routines // Stop background routines
server.stopBackgroundExpiring.set(true)
server.stopWatchingMemory.set(true)
server.stopWatchingAutoGC.set(true)
server.followc.add(1) // this will force any follow communication to die server.followc.add(1) // this will force any follow communication to die
server.stopServer.set(true)
// notify the live geofence connections that we are stopping.
server.lcond.L.Lock()
server.lcond.Wait()
server.lcond.L.Lock()
}() }()
// Start the network server // Start the network server
@ -714,7 +715,7 @@ func (server *Server) watchAutoGC() {
defer t.Stop() defer t.Stop()
s := time.Now() s := time.Now()
for range t.C { for range t.C {
if server.stopWatchingAutoGC.on() { if server.stopServer.on() {
return return
} }
autoGC := server.config.autoGC() autoGC := server.config.autoGC()
@ -746,7 +747,7 @@ func (server *Server) watchOutOfMemory() {
var mem runtime.MemStats var mem runtime.MemStats
for range t.C { for range t.C {
func() { func() {
if server.stopWatchingMemory.on() { if server.stopServer.on() {
return return
} }
oom := server.outOfMemory.on() oom := server.outOfMemory.on()