From 29a6d05f3f3e2e587e5e41e6da03b23b5232a5d7 Mon Sep 17 00:00:00 2001 From: tidwall Date: Thu, 9 Dec 2021 09:24:26 -0700 Subject: [PATCH] Minor refactor --- internal/server/aofshrink.go | 70 +++--- internal/server/crud.go | 104 ++++----- internal/server/live.go | 68 +++--- internal/server/scanner.go | 7 +- internal/server/search.go | 160 +++++++------- internal/server/server.go | 400 +++++++++++++++++------------------ 6 files changed, 405 insertions(+), 404 deletions(-) diff --git a/internal/server/aofshrink.go b/internal/server/aofshrink.go index fa0f4a6a..66dea327 100644 --- a/internal/server/aofshrink.go +++ b/internal/server/aofshrink.go @@ -18,25 +18,25 @@ const maxkeys = 8 const maxids = 32 const maxchunk = 4 * 1024 * 1024 -func (server *Server) aofshrink() { - if server.aof == nil { +func (s *Server) aofshrink() { + if s.aof == nil { return } start := time.Now() - server.mu.Lock() - if server.shrinking { - server.mu.Unlock() + s.mu.Lock() + if s.shrinking { + s.mu.Unlock() return } - server.shrinking = true - server.shrinklog = nil - server.mu.Unlock() + s.shrinking = true + s.shrinklog = nil + s.mu.Unlock() defer func() { - server.mu.Lock() - server.shrinking = false - server.shrinklog = nil - server.mu.Unlock() + s.mu.Lock() + s.shrinking = false + s.shrinklog = nil + s.mu.Unlock() log.Infof("aof shrink ended %v", time.Since(start)) }() @@ -59,9 +59,9 @@ func (server *Server) aofshrink() { } keysdone = true func() { - server.mu.Lock() - defer server.mu.Unlock() - server.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool { + s.mu.Lock() + defer s.mu.Unlock() + s.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool { if len(keys) == maxkeys { keysdone = false nextkey = key @@ -85,9 +85,9 @@ func (server *Server) aofshrink() { // load more objects func() { idsdone = true - server.mu.Lock() - defer server.mu.Unlock() - col := server.getCol(keys[0]) + s.mu.Lock() + defer s.mu.Unlock() + col := s.getCol(keys[0]) if col == nil { return } @@ -167,10 +167,10 @@ func (server *Server) aofshrink() { // first load the names of the hooks var hnames []string func() { - server.mu.Lock() - defer server.mu.Unlock() - hnames = make([]string, 0, server.hooks.Len()) - server.hooks.Walk(func(v []interface{}) { + s.mu.Lock() + defer s.mu.Unlock() + hnames = make([]string, 0, s.hooks.Len()) + s.hooks.Walk(func(v []interface{}) { for _, v := range v { hnames = append(hnames, v.(*Hook).Name) } @@ -179,9 +179,9 @@ func (server *Server) aofshrink() { var hookHint btree.PathHint for _, name := range hnames { func() { - server.mu.Lock() - defer server.mu.Unlock() - hook, _ := server.hooks.GetHint(&Hook{Name: name}, &hookHint).(*Hook) + s.mu.Lock() + defer s.mu.Unlock() + hook, _ := s.hooks.GetHint(&Hook{Name: name}, &hookHint).(*Hook) if hook == nil { return } @@ -230,26 +230,26 @@ func (server *Server) aofshrink() { // finally grab any new data that may have been written since // the aofshrink has started and swap out the files. return func() error { - server.mu.Lock() - defer server.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() // kill all followers connections and close their files. This // ensures that there is only one opened AOF at a time which is // what Windows requires in order to perform the Rename function // below. - for conn, f := range server.aofconnM { + for conn, f := range s.aofconnM { conn.Close() f.Close() } // send a broadcast to all sleeping followers - server.fcond.Broadcast() + s.fcond.Broadcast() // flush the aof buffer - server.flushAOF(false) + s.flushAOF(false) aofbuf = aofbuf[:0] - for _, values := range server.shrinklog { + for _, values := range s.shrinklog { // append the values to the aof buffer aofbuf = append(aofbuf, '*') aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...) @@ -274,7 +274,7 @@ func (server *Server) aofshrink() { // anything below this point is unrecoverable. just log and exit process // back up the live aof, just in case of fatal error - if err := server.aof.Close(); err != nil { + if err := s.aof.Close(); err != nil { log.Fatalf("shrink live aof close fatal operation: %v", err) } if err := f.Close(); err != nil { @@ -286,16 +286,16 @@ func (server *Server) aofshrink() { if err := os.Rename(core.AppendFileName+"-shrink", core.AppendFileName); err != nil { log.Fatalf("shrink rename fatal operation: %v", err) } - server.aof, err = os.OpenFile(core.AppendFileName, os.O_CREATE|os.O_RDWR, 0600) + s.aof, err = os.OpenFile(core.AppendFileName, os.O_CREATE|os.O_RDWR, 0600) if err != nil { log.Fatalf("shrink openfile fatal operation: %v", err) } var n int64 - n, err = server.aof.Seek(0, 2) + n, err = s.aof.Seek(0, 2) if err != nil { log.Fatalf("shrink seek end fatal operation: %v", err) } - server.aofsz = int(n) + s.aofsz = int(n) os.Remove(core.AppendFileName + "-bak") // ignore error diff --git a/internal/server/crud.go b/internal/server/crud.go index 4d90a700..86561ad3 100644 --- a/internal/server/crud.go +++ b/internal/server/crud.go @@ -37,7 +37,7 @@ func orderFields(fmap map[string]int, farr []string, fields []float64) []fvt { } return fvs } -func (server *Server) cmdBounds(msg *Message) (resp.Value, error) { +func (s *Server) cmdBounds(msg *Message) (resp.Value, error) { start := time.Now() vs := msg.Args[1:] @@ -50,7 +50,7 @@ func (server *Server) cmdBounds(msg *Message) (resp.Value, error) { return NOMessage, errInvalidNumberOfArguments } - col := server.getCol(key) + col := s.getCol(key) if col == nil { if msg.OutputType == RESP { return resp.NullValue(), nil @@ -94,7 +94,7 @@ func (server *Server) cmdBounds(msg *Message) (resp.Value, error) { return NOMessage, nil } -func (server *Server) cmdType(msg *Message) (resp.Value, error) { +func (s *Server) cmdType(msg *Message) (resp.Value, error) { start := time.Now() vs := msg.Args[1:] @@ -104,7 +104,7 @@ func (server *Server) cmdType(msg *Message) (resp.Value, error) { return NOMessage, errInvalidNumberOfArguments } - col := server.getCol(key) + col := s.getCol(key) if col == nil { if msg.OutputType == RESP { return resp.SimpleStringValue("none"), nil @@ -123,7 +123,7 @@ func (server *Server) cmdType(msg *Message) (resp.Value, error) { return NOMessage, nil } -func (server *Server) cmdGet(msg *Message) (resp.Value, error) { +func (s *Server) cmdGet(msg *Message) (resp.Value, error) { start := time.Now() vs := msg.Args[1:] @@ -142,7 +142,7 @@ func (server *Server) cmdGet(msg *Message) (resp.Value, error) { vs = vs[1:] } - col := server.getCol(key) + col := s.getCol(key) if col == nil { if msg.OutputType == RESP { return resp.NullValue(), nil @@ -278,7 +278,7 @@ func (server *Server) cmdGet(msg *Message) (resp.Value, error) { return NOMessage, nil } -func (server *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, err error) { +func (s *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, err error) { start := time.Now() vs := msg.Args[1:] var ok bool @@ -295,17 +295,17 @@ func (server *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, er return } found := false - col := server.getCol(d.key) + col := s.getCol(d.key) if col != nil { d.obj, d.fields, ok = col.Delete(d.id) if ok { if col.Count() == 0 { - server.deleteCol(d.key) + s.deleteCol(d.key) } found = true } } - server.groupDisconnectObject(d.key, d.id) + s.groupDisconnectObject(d.key, d.id) d.command = "del" d.updated = found d.timestamp = time.Now() @@ -322,7 +322,7 @@ func (server *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, er return } -func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, err error) { +func (s *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, err error) { start := time.Now() vs := msg.Args[1:] var ok bool @@ -353,7 +353,7 @@ func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, e } var expired int - col := server.getCol(d.key) + col := s.getCol(d.key) if col != nil { g := glob.Parse(d.pattern, false) if g.Limits[0] == "" && g.Limits[1] == "" { @@ -370,7 +370,7 @@ func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, e } else { d.children[i] = dc } - server.groupDisconnectObject(dc.key, dc.id) + s.groupDisconnectObject(dc.key, dc.id) } if atLeastOneNotDeleted { var nchildren []*commandDetails @@ -382,7 +382,7 @@ func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, e d.children = nchildren } if col.Count() == 0 { - server.deleteCol(d.key) + s.deleteCol(d.key) } } d.command = "pdel" @@ -402,7 +402,7 @@ func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, e return } -func (server *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, err error) { +func (s *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, err error) { start := time.Now() vs := msg.Args[1:] var ok bool @@ -414,15 +414,15 @@ func (server *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, e err = errInvalidNumberOfArguments return } - col := server.getCol(d.key) + col := s.getCol(d.key) if col != nil { - server.deleteCol(d.key) + s.deleteCol(d.key) d.updated = true } else { d.key = "" // ignore the details d.updated = false } - server.groupDisconnectCollection(d.key) + s.groupDisconnectCollection(d.key) d.command = "drop" d.timestamp = time.Now() switch msg.OutputType { @@ -438,7 +438,7 @@ func (server *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, e return } -func (server *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails, err error) { +func (s *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails, err error) { nx := msg.Command() == "renamenx" start := time.Now() vs := msg.Args[1:] @@ -455,12 +455,12 @@ func (server *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails, err = errInvalidNumberOfArguments return } - col := server.getCol(d.key) + col := s.getCol(d.key) if col == nil { err = errKeyNotFound return } - server.hooks.Ascend(nil, func(v interface{}) bool { + s.hooks.Ascend(nil, func(v interface{}) bool { h := v.(*Hook) if h.Key == d.key || h.Key == d.newKey { err = errKeyHasHooksSet @@ -469,18 +469,18 @@ func (server *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails, return true }) d.command = "rename" - newCol := server.getCol(d.newKey) + newCol := s.getCol(d.newKey) if newCol == nil { d.updated = true } else if nx { d.updated = false } else { - server.deleteCol(d.newKey) + s.deleteCol(d.newKey) d.updated = true } if d.updated { - server.deleteCol(d.key) - server.setCol(d.newKey, col) + s.deleteCol(d.key) + s.setCol(d.newKey, col) } d.timestamp = time.Now() switch msg.OutputType { @@ -498,7 +498,7 @@ func (server *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails, return } -func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails, err error) { +func (s *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails, err error) { start := time.Now() vs := msg.Args[1:] if len(vs) != 0 { @@ -507,14 +507,14 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails } // clear the entire database - server.cols = btree.NewNonConcurrent(byCollectionKey) - server.groupHooks = btree.NewNonConcurrent(byGroupHook) - server.groupObjects = btree.NewNonConcurrent(byGroupObject) - server.hookExpires = btree.NewNonConcurrent(byHookExpires) - server.hooks = btree.NewNonConcurrent(byHookName) - server.hooksOut = btree.NewNonConcurrent(byHookName) - server.hookTree = &rtree.RTree{} - server.hookCross = &rtree.RTree{} + s.cols = btree.NewNonConcurrent(byCollectionKey) + s.groupHooks = btree.NewNonConcurrent(byGroupHook) + s.groupObjects = btree.NewNonConcurrent(byGroupObject) + s.hookExpires = btree.NewNonConcurrent(byHookExpires) + s.hooks = btree.NewNonConcurrent(byHookName) + s.hooksOut = btree.NewNonConcurrent(byHookName) + s.hookTree = &rtree.RTree{} + s.hookCross = &rtree.RTree{} d.command = "flushdb" d.updated = true @@ -528,7 +528,7 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails return } -func (server *Server) parseSetArgs(vs []string) ( +func (s *Server) parseSetArgs(vs []string) ( d commandDetails, fields []string, values []float64, xx, nx bool, ex int64, etype []byte, evs []string, err error, @@ -737,7 +737,7 @@ func (server *Server) parseSetArgs(vs []string) ( err = errInvalidNumberOfArguments return } - d.obj, err = geojson.Parse(object, &server.geomParseOpts) + d.obj, err = geojson.Parse(object, &s.geomParseOpts) if err != nil { return } @@ -748,8 +748,8 @@ func (server *Server) parseSetArgs(vs []string) ( return } -func (server *Server) cmdSet(msg *Message) (res resp.Value, d commandDetails, err error) { - if server.config.maxMemory() > 0 && server.outOfMemory.on() { +func (s *Server) cmdSet(msg *Message) (res resp.Value, d commandDetails, err error) { + if s.config.maxMemory() > 0 && s.outOfMemory.on() { err = errOOM return } @@ -760,17 +760,17 @@ func (server *Server) cmdSet(msg *Message) (res resp.Value, d commandDetails, er var values []float64 var xx, nx bool var ex int64 - d, fields, values, xx, nx, ex, _, _, err = server.parseSetArgs(vs) + d, fields, values, xx, nx, ex, _, _, err = s.parseSetArgs(vs) if err != nil { return } - col := server.getCol(d.key) + col := s.getCol(d.key) if col == nil { if xx { goto notok } col = collection.New() - server.setCol(d.key, col) + s.setCol(d.key, col) } if xx || nx { _, _, _, ok := col.Get(d.id) @@ -817,7 +817,7 @@ notok: return } -func (server *Server) parseFSetArgs(vs []string) ( +func (s *Server) parseFSetArgs(vs []string) ( d commandDetails, fields []string, values []float64, xx bool, err error, ) { var ok bool @@ -860,8 +860,8 @@ func (server *Server) parseFSetArgs(vs []string) ( return } -func (server *Server) cmdFset(msg *Message) (res resp.Value, d commandDetails, err error) { - if server.config.maxMemory() > 0 && server.outOfMemory.on() { +func (s *Server) cmdFset(msg *Message) (res resp.Value, d commandDetails, err error) { + if s.config.maxMemory() > 0 && s.outOfMemory.on() { err = errOOM return } @@ -871,9 +871,9 @@ func (server *Server) cmdFset(msg *Message) (res resp.Value, d commandDetails, e var values []float64 var xx bool var updateCount int - d, fields, values, xx, err = server.parseFSetArgs(vs) + d, fields, values, xx, err = s.parseFSetArgs(vs) - col := server.getCol(d.key) + col := s.getCol(d.key) if col == nil { err = errKeyNotFound return @@ -904,7 +904,7 @@ func (server *Server) cmdFset(msg *Message) (res resp.Value, d commandDetails, e return } -func (server *Server) cmdExpire(msg *Message) (res resp.Value, d commandDetails, err error) { +func (s *Server) cmdExpire(msg *Message) (res resp.Value, d commandDetails, err error) { start := time.Now() vs := msg.Args[1:] var key, id, svalue string @@ -932,7 +932,7 @@ func (server *Server) cmdExpire(msg *Message) (res resp.Value, d commandDetails, return } ok = false - col := server.getCol(key) + col := s.getCol(key) if col != nil { ex := time.Now().Add(time.Duration(float64(time.Second) * value)).UnixNano() ok = col.SetExpires(id, ex) @@ -957,7 +957,7 @@ func (server *Server) cmdExpire(msg *Message) (res resp.Value, d commandDetails, return } -func (server *Server) cmdPersist(msg *Message) (res resp.Value, d commandDetails, err error) { +func (s *Server) cmdPersist(msg *Message) (res resp.Value, d commandDetails, err error) { start := time.Now() vs := msg.Args[1:] var key, id string @@ -976,7 +976,7 @@ func (server *Server) cmdPersist(msg *Message) (res resp.Value, d commandDetails } var cleared bool ok = false - col := server.getCol(key) + col := s.getCol(key) if col != nil { var ex int64 _, _, ex, ok = col.Get(id) @@ -1009,7 +1009,7 @@ func (server *Server) cmdPersist(msg *Message) (res resp.Value, d commandDetails return } -func (server *Server) cmdTTL(msg *Message) (res resp.Value, err error) { +func (s *Server) cmdTTL(msg *Message) (res resp.Value, err error) { start := time.Now() vs := msg.Args[1:] var key, id string @@ -1029,7 +1029,7 @@ func (server *Server) cmdTTL(msg *Message) (res resp.Value, err error) { var v float64 ok = false var ok2 bool - col := server.getCol(key) + col := s.getCol(key) if col != nil { var ex int64 _, _, ex, ok = col.Get(id) diff --git a/internal/server/live.go b/internal/server/live.go index a49bb4a2..90d0a116 100644 --- a/internal/server/live.go +++ b/internal/server/live.go @@ -20,20 +20,20 @@ type liveBuffer struct { cond *sync.Cond } -func (server *Server) processLives() { - server.lcond.L.Lock() - defer server.lcond.L.Unlock() +func (s *Server) processLives() { + s.lcond.L.Lock() + defer s.lcond.L.Unlock() for { - if server.stopServer.on() { + if s.stopServer.on() { return } - for len(server.lstack) > 0 { - item := server.lstack[0] - server.lstack = server.lstack[1:] - if len(server.lstack) == 0 { - server.lstack = nil + for len(s.lstack) > 0 { + item := s.lstack[0] + s.lstack = s.lstack[1:] + if len(s.lstack) == 0 { + s.lstack = nil } - for lb := range server.lives { + for lb := range s.lives { lb.cond.L.Lock() if lb.key != "" && lb.key == item.key { lb.details = append(lb.details, item) @@ -42,7 +42,7 @@ func (server *Server) processLives() { lb.cond.L.Unlock() } } - server.lcond.Wait() + s.lcond.Wait() } } @@ -72,7 +72,7 @@ func writeLiveMessage( return err } -func (server *Server) goLive( +func (s *Server) goLive( inerr error, conn net.Conn, rd *PipelineReader, msg *Message, websocket bool, ) error { addr := conn.RemoteAddr().String() @@ -80,15 +80,15 @@ func (server *Server) goLive( defer func() { log.Info("not live " + addr) }() - switch s := inerr.(type) { + switch lfs := inerr.(type) { default: return errors.New("invalid live type switches") case liveAOFSwitches: - return server.liveAOF(s.pos, conn, rd, msg) + return s.liveAOF(lfs.pos, conn, rd, msg) case liveSubscriptionSwitches: - return server.liveSubscription(conn, rd, msg, websocket) + return s.liveSubscription(conn, rd, msg, websocket) case liveMonitorSwitches: - return server.liveMonitor(conn, rd, msg) + return s.liveMonitor(conn, rd, msg) case liveFenceSwitches: // fallthrough } @@ -100,27 +100,27 @@ func (server *Server) goLive( var err error var sw *scanWriter var wr bytes.Buffer - s := inerr.(liveFenceSwitches) - lb.glob = s.glob - lb.key = s.key - lb.fence = &s - server.mu.RLock() - sw, err = server.newScanWriter( - &wr, msg, s.key, s.output, s.precision, s.glob, false, - s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) - server.mu.RUnlock() + lfs := inerr.(liveFenceSwitches) + lb.glob = lfs.glob + lb.key = lfs.key + lb.fence = &lfs + s.mu.RLock() + sw, err = s.newScanWriter( + &wr, msg, lfs.key, lfs.output, lfs.precision, lfs.glob, false, + lfs.cursor, lfs.limit, lfs.wheres, lfs.whereins, lfs.whereevals, lfs.nofields) + s.mu.RUnlock() // everything below if for live SCAN, NEARBY, WITHIN, INTERSECTS if err != nil { return err } - server.lcond.L.Lock() - server.lives[lb] = true - server.lcond.L.Unlock() + s.lcond.L.Lock() + s.lives[lb] = true + s.lcond.L.Unlock() defer func() { - server.lcond.L.Lock() - delete(server.lives, lb) - server.lcond.L.Unlock() + s.lcond.L.Lock() + delete(s.lives, lb) + s.lcond.L.Unlock() conn.Close() }() @@ -187,8 +187,8 @@ func (server *Server) goLive( var msgs []string func() { // safely lock the fence because we are outside the main loop - server.mu.RLock() - defer server.mu.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() msgs = FenceMatch("", sw, fence, nil, details) }() for _, msg := range msgs { @@ -196,7 +196,7 @@ func (server *Server) goLive( return nil // nil return is fine here } } - server.statsTotalMsgsSent.add(len(msgs)) + s.statsTotalMsgsSent.add(len(msgs)) lb.cond.L.Lock() } diff --git a/internal/server/scanner.go b/internal/server/scanner.go index d6b7955b..7f73a649 100644 --- a/internal/server/scanner.go +++ b/internal/server/scanner.go @@ -76,7 +76,8 @@ type ScanWriterParams struct { func (s *Server) newScanWriter( wr *bytes.Buffer, msg *Message, key string, output outputT, precision uint64, globPattern string, matchValues bool, - cursor, limit uint64, wheres []whereT, whereins []whereinT, whereevals []whereevalT, nofields bool, + cursor, limit uint64, wheres []whereT, whereins []whereinT, + whereevals []whereevalT, nofields bool, ) ( *scanWriter, error, ) { @@ -96,12 +97,12 @@ func (s *Server) newScanWriter( s: s, wr: wr, msg: msg, - cursor: cursor, limit: limit, - whereevals: whereevals, + cursor: cursor, output: output, nofields: nofields, precision: precision, + whereevals: whereevals, globPattern: globPattern, matchValues: matchValues, } diff --git a/internal/server/search.go b/internal/server/search.go index a62f89ca..e9318b9d 100644 --- a/internal/server/search.go +++ b/internal/server/search.go @@ -42,18 +42,18 @@ type roamMatch struct { meters float64 } -func (s liveFenceSwitches) Error() string { +func (lfs liveFenceSwitches) Error() string { return goingLive } -func (s liveFenceSwitches) Close() { - for _, whereeval := range s.whereevals { +func (lfs liveFenceSwitches) Close() { + for _, whereeval := range lfs.whereevals { whereeval.Close() } } -func (s liveFenceSwitches) usingLua() bool { - return len(s.whereevals) > 0 +func (lfs liveFenceSwitches) usingLua() bool { + return len(lfs.whereevals) > 0 } func parseRectArea(ltyp string, vs []string) (nvs []string, rect *geojson.Rect, err error) { @@ -169,29 +169,29 @@ func parseRectArea(ltyp string, vs []string) (nvs []string, rect *geojson.Rect, return } -func (server *Server) cmdSearchArgs( +func (s *Server) cmdSearchArgs( fromFenceCmd bool, cmd string, vs []string, types []string, -) (s liveFenceSwitches, err error) { +) (lfs liveFenceSwitches, err error) { var t searchScanBaseTokens if fromFenceCmd { t.fence = true } - vs, t, err = server.parseSearchScanBaseTokens(cmd, t, vs) + vs, t, err = s.parseSearchScanBaseTokens(cmd, t, vs) if err != nil { return } - s.searchScanBaseTokens = t + lfs.searchScanBaseTokens = t var typ string var ok bool if vs, typ, ok = tokenval(vs); !ok || typ == "" { err = errInvalidNumberOfArguments return } - if s.searchScanBaseTokens.output == outputBounds { + if lfs.searchScanBaseTokens.output == outputBounds { if cmd == "within" || cmd == "intersects" { if _, err := strconv.ParseFloat(typ, 64); err == nil { // It's likely that the output was not specified, but rather the search bounds. - s.searchScanBaseTokens.output = defaultSearchOutput + lfs.searchScanBaseTokens.output = defaultSearchOutput vs = append([]string{typ}, vs...) typ = "BOUNDS" } @@ -205,7 +205,7 @@ func (server *Server) cmdSearchArgs( break } } - if !found && s.searchScanBaseTokens.fence && ltyp == "roam" && cmd == "nearby" { + if !found && lfs.searchScanBaseTokens.fence && ltyp == "roam" && cmd == "nearby" { // allow roaming for nearby fence searches. found = true } @@ -217,7 +217,7 @@ func (server *Server) cmdSearchArgs( case "point": fallthrough case "circle": - if s.clip { + if lfs.clip { err = errInvalidArgument("cannot clip with " + ltyp) return } @@ -267,9 +267,9 @@ func (server *Server) cmdSearchArgs( return } } - s.obj = geojson.NewCircle(geometry.Point{X: lon, Y: lat}, meters, defaultCircleSteps) + lfs.obj = geojson.NewCircle(geometry.Point{X: lon, Y: lat}, meters, defaultCircleSteps) case "object": - if s.clip { + if lfs.clip { err = errInvalidArgument("cannot clip with object") return } @@ -278,12 +278,12 @@ func (server *Server) cmdSearchArgs( err = errInvalidNumberOfArguments return } - s.obj, err = geojson.Parse(obj, &server.geomParseOpts) + lfs.obj, err = geojson.Parse(obj, &s.geomParseOpts) if err != nil { return } case "sector": - if s.clip { + if lfs.clip { err = errInvalidArgument("cannot clip with " + ltyp) return } @@ -338,17 +338,17 @@ func (server *Server) cmdSearchArgs( origin := sectr.Point{Lng: lon, Lat: lat} sector := sectr.NewSector(origin, meters, b1, b2) - s.obj, err = geojson.Parse(string(sector.JSON()), &server.geomParseOpts) + lfs.obj, err = geojson.Parse(string(sector.JSON()), &s.geomParseOpts) if err != nil { return } case "bounds", "hash", "tile", "quadkey": - vs, s.obj, err = parseRectArea(ltyp, vs) + vs, lfs.obj, err = parseRectArea(ltyp, vs) if err != nil { return } case "get": - if s.clip { + if lfs.clip { err = errInvalidArgument("cannot clip with get") } var key, id string @@ -360,33 +360,33 @@ func (server *Server) cmdSearchArgs( err = errInvalidNumberOfArguments return } - col := server.getCol(key) + col := s.getCol(key) if col == nil { err = errKeyNotFound return } - s.obj, _, _, ok = col.Get(id) + lfs.obj, _, _, ok = col.Get(id) if !ok { err = errIDNotFound return } case "roam": - s.roam.on = true - if vs, s.roam.key, ok = tokenval(vs); !ok || s.roam.key == "" { + lfs.roam.on = true + if vs, lfs.roam.key, ok = tokenval(vs); !ok || lfs.roam.key == "" { err = errInvalidNumberOfArguments return } - if vs, s.roam.id, ok = tokenval(vs); !ok || s.roam.id == "" { + if vs, lfs.roam.id, ok = tokenval(vs); !ok || lfs.roam.id == "" { err = errInvalidNumberOfArguments return } - s.roam.pattern = glob.IsGlob(s.roam.id) + lfs.roam.pattern = glob.IsGlob(lfs.roam.id) var smeters string if vs, smeters, ok = tokenval(vs); !ok || smeters == "" { err = errInvalidNumberOfArguments return } - if s.roam.meters, err = strconv.ParseFloat(smeters, 64); err != nil { + if lfs.roam.meters, err = strconv.ParseFloat(smeters, 64); err != nil { err = errInvalidArgument(smeters) return } @@ -400,7 +400,7 @@ func (server *Server) cmdSearchArgs( err = errInvalidNumberOfArguments return } - s.roam.scan = scan + lfs.roam.scan = scan } } @@ -430,7 +430,7 @@ func (server *Server) cmdSearchArgs( if err != nil { return } - s.obj = clip.Clip(s.obj, clip_rect, &server.geomIndexOpts) + lfs.obj = clip.Clip(lfs.obj, clip_rect, &s.geomIndexOpts) default: err = errInvalidArgument("cannot clipby " + ltok) return @@ -443,13 +443,13 @@ var nearbyTypes = []string{"point"} var withinOrIntersectsTypes = []string{ "geo", "bounds", "hash", "tile", "quadkey", "get", "object", "circle", "sector"} -func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) { +func (s *Server) cmdNearby(msg *Message) (res resp.Value, err error) { start := time.Now() vs := msg.Args[1:] wr := &bytes.Buffer{} - s, err := server.cmdSearchArgs(false, "nearby", vs, nearbyTypes) - if s.usingLua() { - defer s.Close() + sargs, err := s.cmdSearchArgs(false, "nearby", vs, nearbyTypes) + if sargs.usingLua() { + defer sargs.Close() defer func() { if r := recover(); r != nil { res = NOMessage @@ -461,13 +461,13 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) { if err != nil { return NOMessage, err } - s.cmd = "nearby" - if s.fence { - return NOMessage, s + sargs.cmd = "nearby" + if sargs.fence { + return NOMessage, sargs } - sw, err := server.newScanWriter( - wr, msg, s.key, s.output, s.precision, s.glob, false, - s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) + sw, err := s.newScanWriter( + wr, msg, sargs.key, sargs.output, sargs.precision, sargs.glob, false, + sargs.cursor, sargs.limit, sargs.wheres, sargs.whereins, sargs.whereevals, sargs.nofields) if err != nil { return NOMessage, err } @@ -482,14 +482,14 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) { o: o, fields: fields, distance: meters, - distOutput: s.distance, + distOutput: sargs.distance, noLock: true, ignoreGlobMatch: true, skipTesting: true, }) } - maxDist := s.obj.(*geojson.Circle).Meters() - if s.sparse > 0 { + maxDist := sargs.obj.(*geojson.Circle).Meters() + if sargs.sparse > 0 { if maxDist < 0 { // error cannot use SPARSE and KNN together return NOMessage, @@ -498,24 +498,24 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) { // An intersects operation is required for SPARSE iter := func(id string, o geojson.Object, fields []float64) bool { var meters float64 - if s.distance { - meters = o.Distance(s.obj) + if sargs.distance { + meters = o.Distance(sargs.obj) } return iterStep(id, o, fields, meters) } - sw.col.Intersects(s.obj, s.sparse, sw, msg.Deadline, iter) + sw.col.Intersects(sargs.obj, sargs.sparse, sw, msg.Deadline, iter) } else { iter := func(id string, o geojson.Object, fields []float64, dist float64) bool { if maxDist > 0 && dist > maxDist { return false } var meters float64 - if s.distance { + if sargs.distance { meters = dist } return iterStep(id, o, fields, meters) } - sw.col.Nearby(s.obj, sw, msg.Deadline, iter) + sw.col.Nearby(sargs.obj, sw, msg.Deadline, iter) } } sw.writeFoot() @@ -526,22 +526,22 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) { return sw.respOut, nil } -func (server *Server) cmdWithin(msg *Message) (res resp.Value, err error) { - return server.cmdWithinOrIntersects("within", msg) +func (s *Server) cmdWithin(msg *Message) (res resp.Value, err error) { + return s.cmdWithinOrIntersects("within", msg) } -func (server *Server) cmdIntersects(msg *Message) (res resp.Value, err error) { - return server.cmdWithinOrIntersects("intersects", msg) +func (s *Server) cmdIntersects(msg *Message) (res resp.Value, err error) { + return s.cmdWithinOrIntersects("intersects", msg) } -func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp.Value, err error) { +func (s *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp.Value, err error) { start := time.Now() vs := msg.Args[1:] wr := &bytes.Buffer{} - s, err := server.cmdSearchArgs(false, cmd, vs, withinOrIntersectsTypes) - if s.usingLua() { - defer s.Close() + sargs, err := s.cmdSearchArgs(false, cmd, vs, withinOrIntersectsTypes) + if sargs.usingLua() { + defer sargs.Close() defer func() { if r := recover(); r != nil { res = NOMessage @@ -553,13 +553,13 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp. if err != nil { return NOMessage, err } - s.cmd = cmd - if s.fence { - return NOMessage, s + sargs.cmd = cmd + if sargs.fence { + return NOMessage, sargs } - sw, err := server.newScanWriter( - wr, msg, s.key, s.output, s.precision, s.glob, false, - s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) + sw, err := s.newScanWriter( + wr, msg, sargs.key, sargs.output, sargs.precision, sargs.glob, false, + sargs.cursor, sargs.limit, sargs.wheres, sargs.whereins, sargs.whereevals, sargs.nofields) if err != nil { return NOMessage, err } @@ -569,7 +569,7 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp. sw.writeHead() if sw.col != nil { if cmd == "within" { - sw.col.Within(s.obj, s.sparse, sw, msg.Deadline, func( + sw.col.Within(sargs.obj, sargs.sparse, sw, msg.Deadline, func( id string, o geojson.Object, fields []float64, ) bool { return sw.writeObject(ScanWriterParams{ @@ -580,7 +580,7 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp. }) }) } else if cmd == "intersects" { - sw.col.Intersects(s.obj, s.sparse, sw, msg.Deadline, func( + sw.col.Intersects(sargs.obj, sargs.sparse, sw, msg.Deadline, func( id string, o geojson.Object, fields []float64, @@ -591,8 +591,8 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp. fields: fields, noLock: true, } - if s.clip { - params.clip = s.obj + if sargs.clip { + params.clip = sargs.obj } return sw.writeObject(params) }) @@ -606,15 +606,15 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp. return sw.respOut, nil } -func (server *Server) cmdSeachValuesArgs(vs []string) ( - s liveFenceSwitches, err error, +func (s *Server) cmdSeachValuesArgs(vs []string) ( + lfs liveFenceSwitches, err error, ) { var t searchScanBaseTokens - vs, t, err = server.parseSearchScanBaseTokens("search", t, vs) + vs, t, err = s.parseSearchScanBaseTokens("search", t, vs) if err != nil { return } - s.searchScanBaseTokens = t + lfs.searchScanBaseTokens = t if len(vs) != 0 { err = errInvalidNumberOfArguments return @@ -622,14 +622,14 @@ func (server *Server) cmdSeachValuesArgs(vs []string) ( return } -func (server *Server) cmdSearch(msg *Message) (res resp.Value, err error) { +func (s *Server) cmdSearch(msg *Message) (res resp.Value, err error) { start := time.Now() vs := msg.Args[1:] wr := &bytes.Buffer{} - s, err := server.cmdSeachValuesArgs(vs) - if s.usingLua() { - defer s.Close() + sargs, err := s.cmdSeachValuesArgs(vs) + if sargs.usingLua() { + defer sargs.Close() defer func() { if r := recover(); r != nil { res = NOMessage @@ -641,9 +641,9 @@ func (server *Server) cmdSearch(msg *Message) (res resp.Value, err error) { if err != nil { return NOMessage, err } - sw, err := server.newScanWriter( - wr, msg, s.key, s.output, s.precision, s.glob, true, - s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) + sw, err := s.newScanWriter( + wr, msg, sargs.key, sargs.output, sargs.precision, sargs.glob, true, + sargs.cursor, sargs.limit, sargs.wheres, sargs.whereins, sargs.whereevals, sargs.nofields) if err != nil { return NOMessage, err } @@ -653,15 +653,15 @@ func (server *Server) cmdSearch(msg *Message) (res resp.Value, err error) { sw.writeHead() if sw.col != nil { if sw.output == outputCount && len(sw.wheres) == 0 && sw.globEverything { - count := sw.col.Count() - int(s.cursor) + count := sw.col.Count() - int(sargs.cursor) if count < 0 { count = 0 } sw.count = uint64(count) } else { - g := glob.Parse(sw.globPattern, s.desc) + g := glob.Parse(sw.globPattern, sargs.desc) if g.Limits[0] == "" && g.Limits[1] == "" { - sw.col.SearchValues(s.desc, sw, msg.Deadline, + sw.col.SearchValues(sargs.desc, sw, msg.Deadline, func(id string, o geojson.Object, fields []float64) bool { return sw.writeObject(ScanWriterParams{ id: id, @@ -675,7 +675,7 @@ func (server *Server) cmdSearch(msg *Message) (res resp.Value, err error) { // must disable globSingle for string value type matching because // globSingle is only for ID matches, not values. sw.globSingle = false - sw.col.SearchValuesRange(g.Limits[0], g.Limits[1], s.desc, sw, + sw.col.SearchValuesRange(g.Limits[0], g.Limits[1], sargs.desc, sw, msg.Deadline, func(id string, o geojson.Object, fields []float64) bool { return sw.writeObject(ScanWriterParams{ diff --git a/internal/server/server.go b/internal/server/server.go index f5446530..2bbaaa81 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -155,8 +155,8 @@ func Serve(opts Options) error { } log.Infof("Server started, Tile38 version %s, git %s", core.Version, core.GitSHA) - // Initialize the server - server := &Server{ + // Initialize the s + s := &Server{ unix: opts.UnixSocketPath, host: opts.Host, port: opts.Port, @@ -182,42 +182,42 @@ func Serve(opts Options) error { hookExpires: btree.NewNonConcurrent(byHookExpires), } - server.epc = endpoint.NewManager(server) - server.luascripts = server.newScriptMap() - server.luapool = server.newPool() - defer server.luapool.Shutdown() + s.epc = endpoint.NewManager(s) + s.luascripts = s.newScriptMap() + s.luapool = s.newPool() + defer s.luapool.Shutdown() if err := os.MkdirAll(opts.Dir, 0700); err != nil { return err } var err error - server.config, err = loadConfig(filepath.Join(opts.Dir, "config")) + s.config, err = loadConfig(filepath.Join(opts.Dir, "config")) if err != nil { return err } // Send "500 Internal Server" error instead of "200 OK" for json responses // with `"ok":false`. T38HTTP500ERRORS=1 - server.http500Errors, _ = strconv.ParseBool(os.Getenv("T38HTTP500ERRORS")) + s.http500Errors, _ = strconv.ParseBool(os.Getenv("T38HTTP500ERRORS")) // Allow for geometry indexing options through environment variables: // T38IDXGEOMKIND -- None, RTree, QuadTree // T38IDXGEOM -- Min number of points in a geometry for indexing. // T38IDXMULTI -- Min number of object in a Multi/Collection for indexing. - server.geomParseOpts = *geojson.DefaultParseOptions - server.geomIndexOpts = *geometry.DefaultIndexOptions + s.geomParseOpts = *geojson.DefaultParseOptions + s.geomIndexOpts = *geometry.DefaultIndexOptions n, err := strconv.ParseUint(os.Getenv("T38IDXGEOM"), 10, 32) if err == nil { - server.geomParseOpts.IndexGeometry = int(n) - server.geomIndexOpts.MinPoints = int(n) + s.geomParseOpts.IndexGeometry = int(n) + s.geomIndexOpts.MinPoints = int(n) } n, err = strconv.ParseUint(os.Getenv("T38IDXMULTI"), 10, 32) if err == nil { - server.geomParseOpts.IndexChildren = int(n) + s.geomParseOpts.IndexChildren = int(n) } requireValid := os.Getenv("REQUIREVALID") if requireValid != "" { - server.geomParseOpts.RequireValid = true + s.geomParseOpts.RequireValid = true } indexKind := os.Getenv("T38IDXGEOMKIND") switch indexKind { @@ -225,31 +225,31 @@ func Serve(opts Options) error { log.Errorf("Unknown index kind: %s", indexKind) case "": case "None": - server.geomParseOpts.IndexGeometryKind = geometry.None - server.geomIndexOpts.Kind = geometry.None + s.geomParseOpts.IndexGeometryKind = geometry.None + s.geomIndexOpts.Kind = geometry.None case "RTree": - server.geomParseOpts.IndexGeometryKind = geometry.RTree - server.geomIndexOpts.Kind = geometry.RTree + s.geomParseOpts.IndexGeometryKind = geometry.RTree + s.geomIndexOpts.Kind = geometry.RTree case "QuadTree": - server.geomParseOpts.IndexGeometryKind = geometry.QuadTree - server.geomIndexOpts.Kind = geometry.QuadTree + s.geomParseOpts.IndexGeometryKind = geometry.QuadTree + s.geomIndexOpts.Kind = geometry.QuadTree } - if server.geomParseOpts.IndexGeometryKind == geometry.None { + if s.geomParseOpts.IndexGeometryKind == geometry.None { log.Debugf("Geom indexing: %s", - server.geomParseOpts.IndexGeometryKind, + s.geomParseOpts.IndexGeometryKind, ) } else { log.Debugf("Geom indexing: %s (%d points)", - server.geomParseOpts.IndexGeometryKind, - server.geomParseOpts.IndexGeometry, + s.geomParseOpts.IndexGeometryKind, + s.geomParseOpts.IndexGeometry, ) } - log.Debugf("Multi indexing: RTree (%d points)", server.geomParseOpts.IndexChildren) + log.Debugf("Multi indexing: RTree (%d points)", s.geomParseOpts.IndexChildren) nerr := make(chan error) go func() { // Start the server in the background - nerr <- server.netServe() + nerr <- s.netServe() }() // Load the queue before the aof @@ -276,9 +276,9 @@ func Serve(opts Options) error { return err } - server.qdb = qdb - server.qidx = qidx - if err := server.migrateAOF(); err != nil { + s.qdb = qdb + s.qidx = qidx + if err := s.migrateAOF(); err != nil { return err } if core.AppendOnly { @@ -286,75 +286,75 @@ func Serve(opts Options) error { if err != nil { return err } - server.aof = f - if err := server.loadAOF(); err != nil { + s.aof = f + if err := s.loadAOF(); err != nil { return err } defer func() { - server.flushAOF(false) - server.aof.Sync() + s.flushAOF(false) + s.aof.Sync() }() } // Start background routines - if server.config.followHost() != "" { - go server.follow(server.config.followHost(), server.config.followPort(), - server.followc.get()) + if s.config.followHost() != "" { + go s.follow(s.config.followHost(), s.config.followPort(), + s.followc.get()) } if opts.MetricsAddr != "" { log.Infof("Listening for metrics at: %s", opts.MetricsAddr) go func() { - http.HandleFunc("/", server.MetricsIndexHandler) - http.HandleFunc("/metrics", server.MetricsHandler) + http.HandleFunc("/", s.MetricsIndexHandler) + http.HandleFunc("/metrics", s.MetricsHandler) log.Fatal(http.ListenAndServe(opts.MetricsAddr, nil)) }() } - go server.processLives() - go server.watchOutOfMemory() - go server.watchLuaStatePool() - go server.watchAutoGC() - go server.backgroundExpiring() - go server.backgroundSyncAOF() + go s.processLives() + go s.watchOutOfMemory() + go s.watchLuaStatePool() + go s.watchAutoGC() + go s.backgroundExpiring() + go s.backgroundSyncAOF() defer func() { // Stop background routines - server.followc.add(1) // this will force any follow communication to die - server.stopServer.set(true) + s.followc.add(1) // this will force any follow communication to die + s.stopServer.set(true) // notify the live geofence connections that we are stopping. - server.lcond.L.Lock() - server.lcond.Wait() - server.lcond.L.Lock() + s.lcond.L.Lock() + s.lcond.Wait() + s.lcond.L.Lock() }() // Server is now loaded and ready. Wait for network error messages. - server.loadedAndReady.set(true) + s.loadedAndReady.set(true) return <-nerr } -func (server *Server) isProtected() bool { +func (s *Server) isProtected() bool { if core.ProtectedMode == "no" { // --protected-mode no return false } - if server.host != "" && server.host != "127.0.0.1" && - server.host != "::1" && server.host != "localhost" { + if s.host != "" && s.host != "127.0.0.1" && + s.host != "::1" && s.host != "localhost" { // -h address return false } - is := server.config.protectedMode() != "no" && server.config.requirePass() == "" + is := s.config.protectedMode() != "no" && s.config.requirePass() == "" return is } -func (server *Server) netServe() error { +func (s *Server) netServe() error { var ln net.Listener var err error - if server.unix != "" { - os.RemoveAll(server.unix) - ln, err = net.Listen("unix", server.unix) + if s.unix != "" { + os.RemoveAll(s.unix) + ln, err = net.Listen("unix", s.unix) } else { - tcpAddr := fmt.Sprintf("%s:%d", server.host, server.port) + tcpAddr := fmt.Sprintf("%s:%d", s.host, s.port) ln, err = net.Listen("tcp", tcpAddr) } if err != nil { @@ -378,17 +378,17 @@ func (server *Server) netServe() error { client.remoteAddr = conn.RemoteAddr().String() // add client to server map - server.connsmu.Lock() - server.conns[client.id] = client - server.connsmu.Unlock() - server.statsTotalConns.add(1) + s.connsmu.Lock() + s.conns[client.id] = client + s.connsmu.Unlock() + s.statsTotalConns.add(1) // set the client keep-alive, if needed - if server.config.keepAlive() > 0 { + if s.config.keepAlive() > 0 { if conn, ok := conn.(*net.TCPConn); ok { conn.SetKeepAlive(true) conn.SetKeepAlivePeriod( - time.Duration(server.config.keepAlive()) * time.Second, + time.Duration(s.config.keepAlive()) * time.Second, ) } } @@ -397,9 +397,9 @@ func (server *Server) netServe() error { defer func() { // close connection // delete from server map - server.connsmu.Lock() - delete(server.conns, client.id) - server.connsmu.Unlock() + s.connsmu.Lock() + delete(s.conns, client.id) + s.connsmu.Unlock() log.Debugf("Closed connection: %s", client.remoteAddr) conn.Close() }() @@ -410,7 +410,7 @@ func (server *Server) netServe() error { // check if the connection is protected if !strings.HasPrefix(client.remoteAddr, "127.0.0.1:") && !strings.HasPrefix(client.remoteAddr, "[::1]:") { - if server.isProtected() { + if s.isProtected() { // This is a protected server. Only loopback is allowed. conn.Write(deniedMessage) return // close connection @@ -437,7 +437,7 @@ func (server *Server) netServe() error { for _, msg := range msgs { // Just closing connection if we have deprecated HTTP or WS connection, // And --http-transport = false - if !server.http && (msg.ConnType == WebSocket || + if !s.http && (msg.ConnType == WebSocket || msg.ConnType == HTTP) { close = true // close connection break @@ -460,10 +460,10 @@ func (server *Server) netServe() error { client.mu.Unlock() // update total command count - server.statsTotalCommands.add(1) + s.statsTotalCommands.add(1) // handle the command - err := server.handleInputCommand(client, msg) + err := s.handleInputCommand(client, msg) if err != nil { if err.Error() == goingLive { client.goLiveErr = err @@ -484,7 +484,7 @@ func (server *Server) netServe() error { wg.Add(1) go func() { defer wg.Done() - err := server.goLive( + err := s.goLive( client.goLiveErr, &liveConn{conn.RemoteAddr(), rwc}, &client.pr, @@ -520,14 +520,14 @@ func (server *Server) netServe() error { // write to client if len(client.out) > 0 { - if atomic.LoadInt32(&server.aofdirty) != 0 { + if atomic.LoadInt32(&s.aofdirty) != 0 { func() { // prewrite - server.mu.Lock() - defer server.mu.Unlock() - server.flushAOF(false) + s.mu.Lock() + defer s.mu.Unlock() + s.flushAOF(false) }() - atomic.StoreInt32(&server.aofdirty, 0) + atomic.StoreInt32(&s.aofdirty, 0) } conn.Write(client.out) client.out = nil @@ -592,19 +592,19 @@ func (conn *liveConn) SetWriteDeadline(deadline time.Time) error { panic("not supported") } -func (server *Server) watchAutoGC() { +func (s *Server) watchAutoGC() { t := time.NewTicker(time.Second) defer t.Stop() - s := time.Now() + start := time.Now() for range t.C { - if server.stopServer.on() { + if s.stopServer.on() { return } - autoGC := server.config.autoGC() + autoGC := s.config.autoGC() if autoGC == 0 { continue } - if time.Since(s) < time.Second*time.Duration(autoGC) { + if time.Since(start) < time.Second*time.Duration(autoGC) { continue } var mem1, mem2 runtime.MemStats @@ -619,23 +619,23 @@ func (server *Server) watchAutoGC() { log.Debugf("autogc(after): "+ "alloc: %v, heap_alloc: %v, heap_released: %v", mem2.Alloc, mem2.HeapAlloc, mem2.HeapReleased) - s = time.Now() + start = time.Now() } } -func (server *Server) watchOutOfMemory() { +func (s *Server) watchOutOfMemory() { t := time.NewTicker(time.Second * 2) defer t.Stop() var mem runtime.MemStats for range t.C { func() { - if server.stopServer.on() { + if s.stopServer.on() { return } - oom := server.outOfMemory.on() - if server.config.maxMemory() == 0 { + oom := s.outOfMemory.on() + if s.config.maxMemory() == 0 { if oom { - server.outOfMemory.set(false) + s.outOfMemory.set(false) } return } @@ -643,33 +643,33 @@ func (server *Server) watchOutOfMemory() { runtime.GC() } runtime.ReadMemStats(&mem) - server.outOfMemory.set(int(mem.HeapAlloc) > server.config.maxMemory()) + s.outOfMemory.set(int(mem.HeapAlloc) > s.config.maxMemory()) }() } } -func (server *Server) watchLuaStatePool() { +func (s *Server) watchLuaStatePool() { t := time.NewTicker(time.Second * 10) defer t.Stop() for range t.C { func() { - server.luapool.Prune() + s.luapool.Prune() }() } } // backgroundSyncAOF ensures that the aof buffer is does not grow too big. -func (server *Server) backgroundSyncAOF() { +func (s *Server) backgroundSyncAOF() { t := time.NewTicker(time.Second) defer t.Stop() for range t.C { - if server.stopServer.on() { + if s.stopServer.on() { return } func() { - server.mu.Lock() - defer server.mu.Unlock() - server.flushAOF(true) + s.mu.Lock() + defer s.mu.Unlock() + s.flushAOF(true) }() } } @@ -686,21 +686,21 @@ func byCollectionKey(a, b interface{}) bool { return a.(*collectionKeyContainer).key < b.(*collectionKeyContainer).key } -func (server *Server) setCol(key string, col *collection.Collection) { - server.cols.Set(&collectionKeyContainer{key, col}) +func (s *Server) setCol(key string, col *collection.Collection) { + s.cols.Set(&collectionKeyContainer{key, col}) } -func (server *Server) getCol(key string) *collection.Collection { - if v := server.cols.Get(&collectionKeyContainer{key: key}); v != nil { +func (s *Server) getCol(key string) *collection.Collection { + if v := s.cols.Get(&collectionKeyContainer{key: key}); v != nil { return v.(*collectionKeyContainer).col } return nil } -func (server *Server) scanGreaterOrEqual( +func (s *Server) scanGreaterOrEqual( key string, iterator func(key string, col *collection.Collection) bool, ) { - server.cols.Ascend(&collectionKeyContainer{key: key}, + s.cols.Ascend(&collectionKeyContainer{key: key}, func(v interface{}) bool { vcol := v.(*collectionKeyContainer) return iterator(vcol.key, vcol.col) @@ -708,8 +708,8 @@ func (server *Server) scanGreaterOrEqual( ) } -func (server *Server) deleteCol(key string) *collection.Collection { - if v := server.cols.Delete(&collectionKeyContainer{key: key}); v != nil { +func (s *Server) deleteCol(key string) *collection.Collection { + if v := s.cols.Delete(&collectionKeyContainer{key: key}); v != nil { return v.(*collectionKeyContainer).col } return nil @@ -743,7 +743,7 @@ func rewriteTimeoutMsg(msg *Message) (err error) { return } -func (server *Server) handleInputCommand(client *Client, msg *Message) error { +func (s *Server) handleInputCommand(client *Client, msg *Message) error { start := time.Now() serializeOutput := func(res resp.Value) (string, error) { var resStr string @@ -768,7 +768,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { return WriteWebSocketMessage(client, []byte(res)) case HTTP: status := "200 OK" - if (server.http500Errors || msg._command == "healthz") && + if (s.http500Errors || msg._command == "healthz") && !gjson.Get(res, "ok").Bool() { status = "500 Internal Server Error" } @@ -821,7 +821,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { } return writeOutput("+PONG\r\n") } - server.sendMonitor(nil, msg, client, false) + s.sendMonitor(nil, msg, client, false) return nil } @@ -853,7 +853,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { return nil } - if !server.loadedAndReady.on() { + if !s.loadedAndReady.on() { switch msg.Command() { case "output", "ping", "echo": default: @@ -870,7 +870,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { var write bool if (!client.authd || cmd == "auth") && cmd != "output" { - if server.config.requirePass() != "" { + if s.config.requirePass() != "" { password := "" // This better be an AUTH command or the Message should contain an Auth if cmd != "auth" && msg.Auth == "" { @@ -884,7 +884,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { password = msg.Args[1] } } - if server.config.requirePass() != strings.TrimSpace(password) { + if s.config.requirePass() != strings.TrimSpace(password) { return writeErr("invalid password") } client.authd = true @@ -900,30 +900,30 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { // choose the locking strategy switch msg.Command() { default: - server.mu.RLock() - defer server.mu.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() case "set", "del", "drop", "fset", "flushdb", "setchan", "pdelchan", "delchan", "sethook", "pdelhook", "delhook", "expire", "persist", "jset", "pdel", "rename", "renamenx": // write operations write = true - server.mu.Lock() - defer server.mu.Unlock() - if server.config.followHost() != "" { + s.mu.Lock() + defer s.mu.Unlock() + if s.config.followHost() != "" { return writeErr("not the leader") } - if server.config.readOnly() { + if s.config.readOnly() { return writeErr("read only") } case "eval", "evalsha": // write operations (potentially) but no AOF for the script command itself - server.mu.Lock() - defer server.mu.Unlock() - if server.config.followHost() != "" { + s.mu.Lock() + defer s.mu.Unlock() + if s.config.followHost() != "" { return writeErr("not the leader") } - if server.config.readOnly() { + if s.config.readOnly() { return writeErr("read only") } case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", @@ -931,16 +931,16 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { "evalro", "evalrosha", "healthz": // read operations - server.mu.RLock() - defer server.mu.RUnlock() - if server.config.followHost() != "" && !server.fcuponce { + s.mu.RLock() + defer s.mu.RUnlock() + if s.config.followHost() != "" && !s.fcuponce { return writeErr("catching up to leader") } case "follow", "slaveof", "replconf", "readonly", "config": // system operations // does not write to aof, but requires a write lock. - server.mu.Lock() - defer server.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() case "output": // this is local connection operation. Locks not needed. case "echo": @@ -948,18 +948,18 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { // dev operation case "sleep": // dev operation - server.mu.RLock() - defer server.mu.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() case "shutdown": // dev operation - server.mu.Lock() - defer server.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() case "aofshrink": - server.mu.RLock() - defer server.mu.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() case "client": - server.mu.Lock() - defer server.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() case "evalna", "evalnasha": // No locking for scripts, otherwise writes cannot happen within scripts case "subscribe", "psubscribe", "publish": @@ -987,7 +987,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { } }() } - res, d, err = server.command(msg, client) + res, d, err = s.command(msg, client) if msg.Deadline != nil { msg.Deadline.Check() } @@ -1003,7 +1003,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { return writeErr(err.Error()) } if write { - if err := server.writeAOF(msg.Args, &d); err != nil { + if err := s.writeAOF(msg.Args, &d); err != nil { if _, ok := err.(errAOFHook); ok { return writeErr(err.Error()) } @@ -1040,55 +1040,55 @@ func randomKey(n int) string { return fmt.Sprintf("%x", b) } -func (server *Server) reset() { - server.aofsz = 0 - server.cols = btree.NewNonConcurrent(byCollectionKey) +func (s *Server) reset() { + s.aofsz = 0 + s.cols = btree.NewNonConcurrent(byCollectionKey) } -func (server *Server) command(msg *Message, client *Client) ( +func (s *Server) command(msg *Message, client *Client) ( res resp.Value, d commandDetails, err error, ) { switch msg.Command() { default: err = fmt.Errorf("unknown command '%s'", msg.Args[0]) case "set": - res, d, err = server.cmdSet(msg) + res, d, err = s.cmdSet(msg) case "fset": - res, d, err = server.cmdFset(msg) + res, d, err = s.cmdFset(msg) case "del": - res, d, err = server.cmdDel(msg) + res, d, err = s.cmdDel(msg) case "pdel": - res, d, err = server.cmdPdel(msg) + res, d, err = s.cmdPdel(msg) case "drop": - res, d, err = server.cmdDrop(msg) + res, d, err = s.cmdDrop(msg) case "flushdb": - res, d, err = server.cmdFlushDB(msg) + res, d, err = s.cmdFlushDB(msg) case "rename": - res, d, err = server.cmdRename(msg) + res, d, err = s.cmdRename(msg) case "renamenx": - res, d, err = server.cmdRename(msg) + res, d, err = s.cmdRename(msg) case "sethook": - res, d, err = server.cmdSetHook(msg) + res, d, err = s.cmdSetHook(msg) case "delhook": - res, d, err = server.cmdDelHook(msg) + res, d, err = s.cmdDelHook(msg) case "pdelhook": - res, d, err = server.cmdPDelHook(msg) + res, d, err = s.cmdPDelHook(msg) case "hooks": - res, err = server.cmdHooks(msg) + res, err = s.cmdHooks(msg) case "setchan": - res, d, err = server.cmdSetHook(msg) + res, d, err = s.cmdSetHook(msg) case "delchan": - res, d, err = server.cmdDelHook(msg) + res, d, err = s.cmdDelHook(msg) case "pdelchan": - res, d, err = server.cmdPDelHook(msg) + res, d, err = s.cmdPDelHook(msg) case "chans": - res, err = server.cmdHooks(msg) + res, err = s.cmdHooks(msg) case "expire": - res, d, err = server.cmdExpire(msg) + res, d, err = s.cmdExpire(msg) case "persist": - res, d, err = server.cmdPersist(msg) + res, d, err = s.cmdPersist(msg) case "ttl": - res, err = server.cmdTTL(msg) + res, err = s.cmdTTL(msg) case "shutdown": if !core.DevMode { err = fmt.Errorf("unknown command '%s'", msg.Args[0]) @@ -1100,70 +1100,70 @@ func (server *Server) command(msg *Message, client *Client) ( err = fmt.Errorf("unknown command '%s'", msg.Args[0]) return } - res, err = server.cmdMassInsert(msg) + res, err = s.cmdMassInsert(msg) case "sleep": if !core.DevMode { err = fmt.Errorf("unknown command '%s'", msg.Args[0]) return } - res, err = server.cmdSleep(msg) + res, err = s.cmdSleep(msg) case "follow", "slaveof": - res, err = server.cmdFollow(msg) + res, err = s.cmdFollow(msg) case "replconf": - res, err = server.cmdReplConf(msg, client) + res, err = s.cmdReplConf(msg, client) case "readonly": - res, err = server.cmdReadOnly(msg) + res, err = s.cmdReadOnly(msg) case "stats": - res, err = server.cmdStats(msg) + res, err = s.cmdStats(msg) case "server": - res, err = server.cmdServer(msg) + res, err = s.cmdServer(msg) case "healthz": - res, err = server.cmdHealthz(msg) + res, err = s.cmdHealthz(msg) case "info": - res, err = server.cmdInfo(msg) + res, err = s.cmdInfo(msg) case "scan": - res, err = server.cmdScan(msg) + res, err = s.cmdScan(msg) case "nearby": - res, err = server.cmdNearby(msg) + res, err = s.cmdNearby(msg) case "within": - res, err = server.cmdWithin(msg) + res, err = s.cmdWithin(msg) case "intersects": - res, err = server.cmdIntersects(msg) + res, err = s.cmdIntersects(msg) case "search": - res, err = server.cmdSearch(msg) + res, err = s.cmdSearch(msg) case "bounds": - res, err = server.cmdBounds(msg) + res, err = s.cmdBounds(msg) case "get": - res, err = server.cmdGet(msg) + res, err = s.cmdGet(msg) case "jget": - res, err = server.cmdJget(msg) + res, err = s.cmdJget(msg) case "jset": - res, d, err = server.cmdJset(msg) + res, d, err = s.cmdJset(msg) case "jdel": - res, d, err = server.cmdJdel(msg) + res, d, err = s.cmdJdel(msg) case "type": - res, err = server.cmdType(msg) + res, err = s.cmdType(msg) case "keys": - res, err = server.cmdKeys(msg) + res, err = s.cmdKeys(msg) case "output": - res, err = server.cmdOutput(msg) + res, err = s.cmdOutput(msg) case "aof": - res, err = server.cmdAOF(msg) + res, err = s.cmdAOF(msg) case "aofmd5": - res, err = server.cmdAOFMD5(msg) + res, err = s.cmdAOFMD5(msg) case "gc": runtime.GC() debug.FreeOSMemory() res = OKMessage(msg, time.Now()) case "aofshrink": - go server.aofshrink() + go s.aofshrink() res = OKMessage(msg, time.Now()) case "config get": - res, err = server.cmdConfigGet(msg) + res, err = s.cmdConfigGet(msg) case "config set": - res, err = server.cmdConfigSet(msg) + res, err = s.cmdConfigSet(msg) case "config rewrite": - res, err = server.cmdConfigRewrite(msg) + res, err = s.cmdConfigRewrite(msg) case "config", "script": // These get rewritten into "config foo" and "script bar" err = fmt.Errorf("unknown command '%s'", msg.Args[0]) @@ -1171,32 +1171,32 @@ func (server *Server) command(msg *Message, client *Client) ( msg.Args[1] = msg.Args[0] + " " + msg.Args[1] msg.Args = msg.Args[1:] msg._command = "" - return server.command(msg, client) + return s.command(msg, client) } case "client": - res, err = server.cmdClient(msg, client) + res, err = s.cmdClient(msg, client) case "eval", "evalro", "evalna": - res, err = server.cmdEvalUnified(false, msg) + res, err = s.cmdEvalUnified(false, msg) case "evalsha", "evalrosha", "evalnasha": - res, err = server.cmdEvalUnified(true, msg) + res, err = s.cmdEvalUnified(true, msg) case "script load": - res, err = server.cmdScriptLoad(msg) + res, err = s.cmdScriptLoad(msg) case "script exists": - res, err = server.cmdScriptExists(msg) + res, err = s.cmdScriptExists(msg) case "script flush": - res, err = server.cmdScriptFlush(msg) + res, err = s.cmdScriptFlush(msg) case "subscribe": - res, err = server.cmdSubscribe(msg) + res, err = s.cmdSubscribe(msg) case "psubscribe": - res, err = server.cmdPsubscribe(msg) + res, err = s.cmdPsubscribe(msg) case "publish": - res, err = server.cmdPublish(msg) + res, err = s.cmdPublish(msg) case "test": - res, err = server.cmdTest(msg) + res, err = s.cmdTest(msg) case "monitor": - res, err = server.cmdMonitor(msg) + res, err = s.cmdMonitor(msg) } - server.sendMonitor(err, msg, client, false) + s.sendMonitor(err, msg, client, false) return }