Minor refactor

This commit is contained in:
tidwall 2021-12-09 09:24:26 -07:00
parent f3947407aa
commit 29a6d05f3f
6 changed files with 405 additions and 404 deletions

View File

@ -18,25 +18,25 @@ const maxkeys = 8
const maxids = 32 const maxids = 32
const maxchunk = 4 * 1024 * 1024 const maxchunk = 4 * 1024 * 1024
func (server *Server) aofshrink() { func (s *Server) aofshrink() {
if server.aof == nil { if s.aof == nil {
return return
} }
start := time.Now() start := time.Now()
server.mu.Lock() s.mu.Lock()
if server.shrinking { if s.shrinking {
server.mu.Unlock() s.mu.Unlock()
return return
} }
server.shrinking = true s.shrinking = true
server.shrinklog = nil s.shrinklog = nil
server.mu.Unlock() s.mu.Unlock()
defer func() { defer func() {
server.mu.Lock() s.mu.Lock()
server.shrinking = false s.shrinking = false
server.shrinklog = nil s.shrinklog = nil
server.mu.Unlock() s.mu.Unlock()
log.Infof("aof shrink ended %v", time.Since(start)) log.Infof("aof shrink ended %v", time.Since(start))
}() }()
@ -59,9 +59,9 @@ func (server *Server) aofshrink() {
} }
keysdone = true keysdone = true
func() { func() {
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
server.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool { s.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool {
if len(keys) == maxkeys { if len(keys) == maxkeys {
keysdone = false keysdone = false
nextkey = key nextkey = key
@ -85,9 +85,9 @@ func (server *Server) aofshrink() {
// load more objects // load more objects
func() { func() {
idsdone = true idsdone = true
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
col := server.getCol(keys[0]) col := s.getCol(keys[0])
if col == nil { if col == nil {
return return
} }
@ -167,10 +167,10 @@ func (server *Server) aofshrink() {
// first load the names of the hooks // first load the names of the hooks
var hnames []string var hnames []string
func() { func() {
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
hnames = make([]string, 0, server.hooks.Len()) hnames = make([]string, 0, s.hooks.Len())
server.hooks.Walk(func(v []interface{}) { s.hooks.Walk(func(v []interface{}) {
for _, v := range v { for _, v := range v {
hnames = append(hnames, v.(*Hook).Name) hnames = append(hnames, v.(*Hook).Name)
} }
@ -179,9 +179,9 @@ func (server *Server) aofshrink() {
var hookHint btree.PathHint var hookHint btree.PathHint
for _, name := range hnames { for _, name := range hnames {
func() { func() {
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
hook, _ := server.hooks.GetHint(&Hook{Name: name}, &hookHint).(*Hook) hook, _ := s.hooks.GetHint(&Hook{Name: name}, &hookHint).(*Hook)
if hook == nil { if hook == nil {
return return
} }
@ -230,26 +230,26 @@ func (server *Server) aofshrink() {
// finally grab any new data that may have been written since // finally grab any new data that may have been written since
// the aofshrink has started and swap out the files. // the aofshrink has started and swap out the files.
return func() error { return func() error {
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
// kill all followers connections and close their files. This // kill all followers connections and close their files. This
// ensures that there is only one opened AOF at a time which is // ensures that there is only one opened AOF at a time which is
// what Windows requires in order to perform the Rename function // what Windows requires in order to perform the Rename function
// below. // below.
for conn, f := range server.aofconnM { for conn, f := range s.aofconnM {
conn.Close() conn.Close()
f.Close() f.Close()
} }
// send a broadcast to all sleeping followers // send a broadcast to all sleeping followers
server.fcond.Broadcast() s.fcond.Broadcast()
// flush the aof buffer // flush the aof buffer
server.flushAOF(false) s.flushAOF(false)
aofbuf = aofbuf[:0] aofbuf = aofbuf[:0]
for _, values := range server.shrinklog { for _, values := range s.shrinklog {
// append the values to the aof buffer // append the values to the aof buffer
aofbuf = append(aofbuf, '*') aofbuf = append(aofbuf, '*')
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...) aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
@ -274,7 +274,7 @@ func (server *Server) aofshrink() {
// anything below this point is unrecoverable. just log and exit process // anything below this point is unrecoverable. just log and exit process
// back up the live aof, just in case of fatal error // back up the live aof, just in case of fatal error
if err := server.aof.Close(); err != nil { if err := s.aof.Close(); err != nil {
log.Fatalf("shrink live aof close fatal operation: %v", err) log.Fatalf("shrink live aof close fatal operation: %v", err)
} }
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
@ -286,16 +286,16 @@ func (server *Server) aofshrink() {
if err := os.Rename(core.AppendFileName+"-shrink", core.AppendFileName); err != nil { if err := os.Rename(core.AppendFileName+"-shrink", core.AppendFileName); err != nil {
log.Fatalf("shrink rename fatal operation: %v", err) log.Fatalf("shrink rename fatal operation: %v", err)
} }
server.aof, err = os.OpenFile(core.AppendFileName, os.O_CREATE|os.O_RDWR, 0600) s.aof, err = os.OpenFile(core.AppendFileName, os.O_CREATE|os.O_RDWR, 0600)
if err != nil { if err != nil {
log.Fatalf("shrink openfile fatal operation: %v", err) log.Fatalf("shrink openfile fatal operation: %v", err)
} }
var n int64 var n int64
n, err = server.aof.Seek(0, 2) n, err = s.aof.Seek(0, 2)
if err != nil { if err != nil {
log.Fatalf("shrink seek end fatal operation: %v", err) log.Fatalf("shrink seek end fatal operation: %v", err)
} }
server.aofsz = int(n) s.aofsz = int(n)
os.Remove(core.AppendFileName + "-bak") // ignore error os.Remove(core.AppendFileName + "-bak") // ignore error

View File

@ -37,7 +37,7 @@ func orderFields(fmap map[string]int, farr []string, fields []float64) []fvt {
} }
return fvs return fvs
} }
func (server *Server) cmdBounds(msg *Message) (resp.Value, error) { func (s *Server) cmdBounds(msg *Message) (resp.Value, error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
@ -50,7 +50,7 @@ func (server *Server) cmdBounds(msg *Message) (resp.Value, error) {
return NOMessage, errInvalidNumberOfArguments return NOMessage, errInvalidNumberOfArguments
} }
col := server.getCol(key) col := s.getCol(key)
if col == nil { if col == nil {
if msg.OutputType == RESP { if msg.OutputType == RESP {
return resp.NullValue(), nil return resp.NullValue(), nil
@ -94,7 +94,7 @@ func (server *Server) cmdBounds(msg *Message) (resp.Value, error) {
return NOMessage, nil return NOMessage, nil
} }
func (server *Server) cmdType(msg *Message) (resp.Value, error) { func (s *Server) cmdType(msg *Message) (resp.Value, error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
@ -104,7 +104,7 @@ func (server *Server) cmdType(msg *Message) (resp.Value, error) {
return NOMessage, errInvalidNumberOfArguments return NOMessage, errInvalidNumberOfArguments
} }
col := server.getCol(key) col := s.getCol(key)
if col == nil { if col == nil {
if msg.OutputType == RESP { if msg.OutputType == RESP {
return resp.SimpleStringValue("none"), nil return resp.SimpleStringValue("none"), nil
@ -123,7 +123,7 @@ func (server *Server) cmdType(msg *Message) (resp.Value, error) {
return NOMessage, nil return NOMessage, nil
} }
func (server *Server) cmdGet(msg *Message) (resp.Value, error) { func (s *Server) cmdGet(msg *Message) (resp.Value, error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
@ -142,7 +142,7 @@ func (server *Server) cmdGet(msg *Message) (resp.Value, error) {
vs = vs[1:] vs = vs[1:]
} }
col := server.getCol(key) col := s.getCol(key)
if col == nil { if col == nil {
if msg.OutputType == RESP { if msg.OutputType == RESP {
return resp.NullValue(), nil return resp.NullValue(), nil
@ -278,7 +278,7 @@ func (server *Server) cmdGet(msg *Message) (resp.Value, error) {
return NOMessage, nil return NOMessage, nil
} }
func (server *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, err error) { func (s *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, err error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
var ok bool var ok bool
@ -295,17 +295,17 @@ func (server *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, er
return return
} }
found := false found := false
col := server.getCol(d.key) col := s.getCol(d.key)
if col != nil { if col != nil {
d.obj, d.fields, ok = col.Delete(d.id) d.obj, d.fields, ok = col.Delete(d.id)
if ok { if ok {
if col.Count() == 0 { if col.Count() == 0 {
server.deleteCol(d.key) s.deleteCol(d.key)
} }
found = true found = true
} }
} }
server.groupDisconnectObject(d.key, d.id) s.groupDisconnectObject(d.key, d.id)
d.command = "del" d.command = "del"
d.updated = found d.updated = found
d.timestamp = time.Now() d.timestamp = time.Now()
@ -322,7 +322,7 @@ func (server *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, er
return return
} }
func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, err error) { func (s *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, err error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
var ok bool var ok bool
@ -353,7 +353,7 @@ func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, e
} }
var expired int var expired int
col := server.getCol(d.key) col := s.getCol(d.key)
if col != nil { if col != nil {
g := glob.Parse(d.pattern, false) g := glob.Parse(d.pattern, false)
if g.Limits[0] == "" && g.Limits[1] == "" { if g.Limits[0] == "" && g.Limits[1] == "" {
@ -370,7 +370,7 @@ func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, e
} else { } else {
d.children[i] = dc d.children[i] = dc
} }
server.groupDisconnectObject(dc.key, dc.id) s.groupDisconnectObject(dc.key, dc.id)
} }
if atLeastOneNotDeleted { if atLeastOneNotDeleted {
var nchildren []*commandDetails var nchildren []*commandDetails
@ -382,7 +382,7 @@ func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, e
d.children = nchildren d.children = nchildren
} }
if col.Count() == 0 { if col.Count() == 0 {
server.deleteCol(d.key) s.deleteCol(d.key)
} }
} }
d.command = "pdel" d.command = "pdel"
@ -402,7 +402,7 @@ func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, e
return return
} }
func (server *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, err error) { func (s *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, err error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
var ok bool var ok bool
@ -414,15 +414,15 @@ func (server *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, e
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
col := server.getCol(d.key) col := s.getCol(d.key)
if col != nil { if col != nil {
server.deleteCol(d.key) s.deleteCol(d.key)
d.updated = true d.updated = true
} else { } else {
d.key = "" // ignore the details d.key = "" // ignore the details
d.updated = false d.updated = false
} }
server.groupDisconnectCollection(d.key) s.groupDisconnectCollection(d.key)
d.command = "drop" d.command = "drop"
d.timestamp = time.Now() d.timestamp = time.Now()
switch msg.OutputType { switch msg.OutputType {
@ -438,7 +438,7 @@ func (server *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, e
return return
} }
func (server *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails, err error) { func (s *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails, err error) {
nx := msg.Command() == "renamenx" nx := msg.Command() == "renamenx"
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
@ -455,12 +455,12 @@ func (server *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails,
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
col := server.getCol(d.key) col := s.getCol(d.key)
if col == nil { if col == nil {
err = errKeyNotFound err = errKeyNotFound
return return
} }
server.hooks.Ascend(nil, func(v interface{}) bool { s.hooks.Ascend(nil, func(v interface{}) bool {
h := v.(*Hook) h := v.(*Hook)
if h.Key == d.key || h.Key == d.newKey { if h.Key == d.key || h.Key == d.newKey {
err = errKeyHasHooksSet err = errKeyHasHooksSet
@ -469,18 +469,18 @@ func (server *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails,
return true return true
}) })
d.command = "rename" d.command = "rename"
newCol := server.getCol(d.newKey) newCol := s.getCol(d.newKey)
if newCol == nil { if newCol == nil {
d.updated = true d.updated = true
} else if nx { } else if nx {
d.updated = false d.updated = false
} else { } else {
server.deleteCol(d.newKey) s.deleteCol(d.newKey)
d.updated = true d.updated = true
} }
if d.updated { if d.updated {
server.deleteCol(d.key) s.deleteCol(d.key)
server.setCol(d.newKey, col) s.setCol(d.newKey, col)
} }
d.timestamp = time.Now() d.timestamp = time.Now()
switch msg.OutputType { switch msg.OutputType {
@ -498,7 +498,7 @@ func (server *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails,
return return
} }
func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails, err error) { func (s *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails, err error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
if len(vs) != 0 { if len(vs) != 0 {
@ -507,14 +507,14 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails
} }
// clear the entire database // clear the entire database
server.cols = btree.NewNonConcurrent(byCollectionKey) s.cols = btree.NewNonConcurrent(byCollectionKey)
server.groupHooks = btree.NewNonConcurrent(byGroupHook) s.groupHooks = btree.NewNonConcurrent(byGroupHook)
server.groupObjects = btree.NewNonConcurrent(byGroupObject) s.groupObjects = btree.NewNonConcurrent(byGroupObject)
server.hookExpires = btree.NewNonConcurrent(byHookExpires) s.hookExpires = btree.NewNonConcurrent(byHookExpires)
server.hooks = btree.NewNonConcurrent(byHookName) s.hooks = btree.NewNonConcurrent(byHookName)
server.hooksOut = btree.NewNonConcurrent(byHookName) s.hooksOut = btree.NewNonConcurrent(byHookName)
server.hookTree = &rtree.RTree{} s.hookTree = &rtree.RTree{}
server.hookCross = &rtree.RTree{} s.hookCross = &rtree.RTree{}
d.command = "flushdb" d.command = "flushdb"
d.updated = true d.updated = true
@ -528,7 +528,7 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails
return return
} }
func (server *Server) parseSetArgs(vs []string) ( func (s *Server) parseSetArgs(vs []string) (
d commandDetails, fields []string, values []float64, d commandDetails, fields []string, values []float64,
xx, nx bool, xx, nx bool,
ex int64, etype []byte, evs []string, err error, ex int64, etype []byte, evs []string, err error,
@ -737,7 +737,7 @@ func (server *Server) parseSetArgs(vs []string) (
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
d.obj, err = geojson.Parse(object, &server.geomParseOpts) d.obj, err = geojson.Parse(object, &s.geomParseOpts)
if err != nil { if err != nil {
return return
} }
@ -748,8 +748,8 @@ func (server *Server) parseSetArgs(vs []string) (
return return
} }
func (server *Server) cmdSet(msg *Message) (res resp.Value, d commandDetails, err error) { func (s *Server) cmdSet(msg *Message) (res resp.Value, d commandDetails, err error) {
if server.config.maxMemory() > 0 && server.outOfMemory.on() { if s.config.maxMemory() > 0 && s.outOfMemory.on() {
err = errOOM err = errOOM
return return
} }
@ -760,17 +760,17 @@ func (server *Server) cmdSet(msg *Message) (res resp.Value, d commandDetails, er
var values []float64 var values []float64
var xx, nx bool var xx, nx bool
var ex int64 var ex int64
d, fields, values, xx, nx, ex, _, _, err = server.parseSetArgs(vs) d, fields, values, xx, nx, ex, _, _, err = s.parseSetArgs(vs)
if err != nil { if err != nil {
return return
} }
col := server.getCol(d.key) col := s.getCol(d.key)
if col == nil { if col == nil {
if xx { if xx {
goto notok goto notok
} }
col = collection.New() col = collection.New()
server.setCol(d.key, col) s.setCol(d.key, col)
} }
if xx || nx { if xx || nx {
_, _, _, ok := col.Get(d.id) _, _, _, ok := col.Get(d.id)
@ -817,7 +817,7 @@ notok:
return return
} }
func (server *Server) parseFSetArgs(vs []string) ( func (s *Server) parseFSetArgs(vs []string) (
d commandDetails, fields []string, values []float64, xx bool, err error, d commandDetails, fields []string, values []float64, xx bool, err error,
) { ) {
var ok bool var ok bool
@ -860,8 +860,8 @@ func (server *Server) parseFSetArgs(vs []string) (
return return
} }
func (server *Server) cmdFset(msg *Message) (res resp.Value, d commandDetails, err error) { func (s *Server) cmdFset(msg *Message) (res resp.Value, d commandDetails, err error) {
if server.config.maxMemory() > 0 && server.outOfMemory.on() { if s.config.maxMemory() > 0 && s.outOfMemory.on() {
err = errOOM err = errOOM
return return
} }
@ -871,9 +871,9 @@ func (server *Server) cmdFset(msg *Message) (res resp.Value, d commandDetails, e
var values []float64 var values []float64
var xx bool var xx bool
var updateCount int var updateCount int
d, fields, values, xx, err = server.parseFSetArgs(vs) d, fields, values, xx, err = s.parseFSetArgs(vs)
col := server.getCol(d.key) col := s.getCol(d.key)
if col == nil { if col == nil {
err = errKeyNotFound err = errKeyNotFound
return return
@ -904,7 +904,7 @@ func (server *Server) cmdFset(msg *Message) (res resp.Value, d commandDetails, e
return return
} }
func (server *Server) cmdExpire(msg *Message) (res resp.Value, d commandDetails, err error) { func (s *Server) cmdExpire(msg *Message) (res resp.Value, d commandDetails, err error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
var key, id, svalue string var key, id, svalue string
@ -932,7 +932,7 @@ func (server *Server) cmdExpire(msg *Message) (res resp.Value, d commandDetails,
return return
} }
ok = false ok = false
col := server.getCol(key) col := s.getCol(key)
if col != nil { if col != nil {
ex := time.Now().Add(time.Duration(float64(time.Second) * value)).UnixNano() ex := time.Now().Add(time.Duration(float64(time.Second) * value)).UnixNano()
ok = col.SetExpires(id, ex) ok = col.SetExpires(id, ex)
@ -957,7 +957,7 @@ func (server *Server) cmdExpire(msg *Message) (res resp.Value, d commandDetails,
return return
} }
func (server *Server) cmdPersist(msg *Message) (res resp.Value, d commandDetails, err error) { func (s *Server) cmdPersist(msg *Message) (res resp.Value, d commandDetails, err error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
var key, id string var key, id string
@ -976,7 +976,7 @@ func (server *Server) cmdPersist(msg *Message) (res resp.Value, d commandDetails
} }
var cleared bool var cleared bool
ok = false ok = false
col := server.getCol(key) col := s.getCol(key)
if col != nil { if col != nil {
var ex int64 var ex int64
_, _, ex, ok = col.Get(id) _, _, ex, ok = col.Get(id)
@ -1009,7 +1009,7 @@ func (server *Server) cmdPersist(msg *Message) (res resp.Value, d commandDetails
return return
} }
func (server *Server) cmdTTL(msg *Message) (res resp.Value, err error) { func (s *Server) cmdTTL(msg *Message) (res resp.Value, err error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
var key, id string var key, id string
@ -1029,7 +1029,7 @@ func (server *Server) cmdTTL(msg *Message) (res resp.Value, err error) {
var v float64 var v float64
ok = false ok = false
var ok2 bool var ok2 bool
col := server.getCol(key) col := s.getCol(key)
if col != nil { if col != nil {
var ex int64 var ex int64
_, _, ex, ok = col.Get(id) _, _, ex, ok = col.Get(id)

View File

@ -20,20 +20,20 @@ type liveBuffer struct {
cond *sync.Cond cond *sync.Cond
} }
func (server *Server) processLives() { func (s *Server) processLives() {
server.lcond.L.Lock() s.lcond.L.Lock()
defer server.lcond.L.Unlock() defer s.lcond.L.Unlock()
for { for {
if server.stopServer.on() { if s.stopServer.on() {
return return
} }
for len(server.lstack) > 0 { for len(s.lstack) > 0 {
item := server.lstack[0] item := s.lstack[0]
server.lstack = server.lstack[1:] s.lstack = s.lstack[1:]
if len(server.lstack) == 0 { if len(s.lstack) == 0 {
server.lstack = nil s.lstack = nil
} }
for lb := range server.lives { for lb := range s.lives {
lb.cond.L.Lock() lb.cond.L.Lock()
if lb.key != "" && lb.key == item.key { if lb.key != "" && lb.key == item.key {
lb.details = append(lb.details, item) lb.details = append(lb.details, item)
@ -42,7 +42,7 @@ func (server *Server) processLives() {
lb.cond.L.Unlock() lb.cond.L.Unlock()
} }
} }
server.lcond.Wait() s.lcond.Wait()
} }
} }
@ -72,7 +72,7 @@ func writeLiveMessage(
return err return err
} }
func (server *Server) goLive( func (s *Server) goLive(
inerr error, conn net.Conn, rd *PipelineReader, msg *Message, websocket bool, inerr error, conn net.Conn, rd *PipelineReader, msg *Message, websocket bool,
) error { ) error {
addr := conn.RemoteAddr().String() addr := conn.RemoteAddr().String()
@ -80,15 +80,15 @@ func (server *Server) goLive(
defer func() { defer func() {
log.Info("not live " + addr) log.Info("not live " + addr)
}() }()
switch s := inerr.(type) { switch lfs := inerr.(type) {
default: default:
return errors.New("invalid live type switches") return errors.New("invalid live type switches")
case liveAOFSwitches: case liveAOFSwitches:
return server.liveAOF(s.pos, conn, rd, msg) return s.liveAOF(lfs.pos, conn, rd, msg)
case liveSubscriptionSwitches: case liveSubscriptionSwitches:
return server.liveSubscription(conn, rd, msg, websocket) return s.liveSubscription(conn, rd, msg, websocket)
case liveMonitorSwitches: case liveMonitorSwitches:
return server.liveMonitor(conn, rd, msg) return s.liveMonitor(conn, rd, msg)
case liveFenceSwitches: case liveFenceSwitches:
// fallthrough // fallthrough
} }
@ -100,27 +100,27 @@ func (server *Server) goLive(
var err error var err error
var sw *scanWriter var sw *scanWriter
var wr bytes.Buffer var wr bytes.Buffer
s := inerr.(liveFenceSwitches) lfs := inerr.(liveFenceSwitches)
lb.glob = s.glob lb.glob = lfs.glob
lb.key = s.key lb.key = lfs.key
lb.fence = &s lb.fence = &lfs
server.mu.RLock() s.mu.RLock()
sw, err = server.newScanWriter( sw, err = s.newScanWriter(
&wr, msg, s.key, s.output, s.precision, s.glob, false, &wr, msg, lfs.key, lfs.output, lfs.precision, lfs.glob, false,
s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) lfs.cursor, lfs.limit, lfs.wheres, lfs.whereins, lfs.whereevals, lfs.nofields)
server.mu.RUnlock() s.mu.RUnlock()
// everything below if for live SCAN, NEARBY, WITHIN, INTERSECTS // everything below if for live SCAN, NEARBY, WITHIN, INTERSECTS
if err != nil { if err != nil {
return err return err
} }
server.lcond.L.Lock() s.lcond.L.Lock()
server.lives[lb] = true s.lives[lb] = true
server.lcond.L.Unlock() s.lcond.L.Unlock()
defer func() { defer func() {
server.lcond.L.Lock() s.lcond.L.Lock()
delete(server.lives, lb) delete(s.lives, lb)
server.lcond.L.Unlock() s.lcond.L.Unlock()
conn.Close() conn.Close()
}() }()
@ -187,8 +187,8 @@ func (server *Server) goLive(
var msgs []string var msgs []string
func() { func() {
// safely lock the fence because we are outside the main loop // safely lock the fence because we are outside the main loop
server.mu.RLock() s.mu.RLock()
defer server.mu.RUnlock() defer s.mu.RUnlock()
msgs = FenceMatch("", sw, fence, nil, details) msgs = FenceMatch("", sw, fence, nil, details)
}() }()
for _, msg := range msgs { for _, msg := range msgs {
@ -196,7 +196,7 @@ func (server *Server) goLive(
return nil // nil return is fine here return nil // nil return is fine here
} }
} }
server.statsTotalMsgsSent.add(len(msgs)) s.statsTotalMsgsSent.add(len(msgs))
lb.cond.L.Lock() lb.cond.L.Lock()
} }

View File

@ -76,7 +76,8 @@ type ScanWriterParams struct {
func (s *Server) newScanWriter( func (s *Server) newScanWriter(
wr *bytes.Buffer, msg *Message, key string, output outputT, wr *bytes.Buffer, msg *Message, key string, output outputT,
precision uint64, globPattern string, matchValues bool, precision uint64, globPattern string, matchValues bool,
cursor, limit uint64, wheres []whereT, whereins []whereinT, whereevals []whereevalT, nofields bool, cursor, limit uint64, wheres []whereT, whereins []whereinT,
whereevals []whereevalT, nofields bool,
) ( ) (
*scanWriter, error, *scanWriter, error,
) { ) {
@ -96,12 +97,12 @@ func (s *Server) newScanWriter(
s: s, s: s,
wr: wr, wr: wr,
msg: msg, msg: msg,
cursor: cursor,
limit: limit, limit: limit,
whereevals: whereevals, cursor: cursor,
output: output, output: output,
nofields: nofields, nofields: nofields,
precision: precision, precision: precision,
whereevals: whereevals,
globPattern: globPattern, globPattern: globPattern,
matchValues: matchValues, matchValues: matchValues,
} }

View File

@ -42,18 +42,18 @@ type roamMatch struct {
meters float64 meters float64
} }
func (s liveFenceSwitches) Error() string { func (lfs liveFenceSwitches) Error() string {
return goingLive return goingLive
} }
func (s liveFenceSwitches) Close() { func (lfs liveFenceSwitches) Close() {
for _, whereeval := range s.whereevals { for _, whereeval := range lfs.whereevals {
whereeval.Close() whereeval.Close()
} }
} }
func (s liveFenceSwitches) usingLua() bool { func (lfs liveFenceSwitches) usingLua() bool {
return len(s.whereevals) > 0 return len(lfs.whereevals) > 0
} }
func parseRectArea(ltyp string, vs []string) (nvs []string, rect *geojson.Rect, err error) { func parseRectArea(ltyp string, vs []string) (nvs []string, rect *geojson.Rect, err error) {
@ -169,29 +169,29 @@ func parseRectArea(ltyp string, vs []string) (nvs []string, rect *geojson.Rect,
return return
} }
func (server *Server) cmdSearchArgs( func (s *Server) cmdSearchArgs(
fromFenceCmd bool, cmd string, vs []string, types []string, fromFenceCmd bool, cmd string, vs []string, types []string,
) (s liveFenceSwitches, err error) { ) (lfs liveFenceSwitches, err error) {
var t searchScanBaseTokens var t searchScanBaseTokens
if fromFenceCmd { if fromFenceCmd {
t.fence = true t.fence = true
} }
vs, t, err = server.parseSearchScanBaseTokens(cmd, t, vs) vs, t, err = s.parseSearchScanBaseTokens(cmd, t, vs)
if err != nil { if err != nil {
return return
} }
s.searchScanBaseTokens = t lfs.searchScanBaseTokens = t
var typ string var typ string
var ok bool var ok bool
if vs, typ, ok = tokenval(vs); !ok || typ == "" { if vs, typ, ok = tokenval(vs); !ok || typ == "" {
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
if s.searchScanBaseTokens.output == outputBounds { if lfs.searchScanBaseTokens.output == outputBounds {
if cmd == "within" || cmd == "intersects" { if cmd == "within" || cmd == "intersects" {
if _, err := strconv.ParseFloat(typ, 64); err == nil { if _, err := strconv.ParseFloat(typ, 64); err == nil {
// It's likely that the output was not specified, but rather the search bounds. // It's likely that the output was not specified, but rather the search bounds.
s.searchScanBaseTokens.output = defaultSearchOutput lfs.searchScanBaseTokens.output = defaultSearchOutput
vs = append([]string{typ}, vs...) vs = append([]string{typ}, vs...)
typ = "BOUNDS" typ = "BOUNDS"
} }
@ -205,7 +205,7 @@ func (server *Server) cmdSearchArgs(
break break
} }
} }
if !found && s.searchScanBaseTokens.fence && ltyp == "roam" && cmd == "nearby" { if !found && lfs.searchScanBaseTokens.fence && ltyp == "roam" && cmd == "nearby" {
// allow roaming for nearby fence searches. // allow roaming for nearby fence searches.
found = true found = true
} }
@ -217,7 +217,7 @@ func (server *Server) cmdSearchArgs(
case "point": case "point":
fallthrough fallthrough
case "circle": case "circle":
if s.clip { if lfs.clip {
err = errInvalidArgument("cannot clip with " + ltyp) err = errInvalidArgument("cannot clip with " + ltyp)
return return
} }
@ -267,9 +267,9 @@ func (server *Server) cmdSearchArgs(
return return
} }
} }
s.obj = geojson.NewCircle(geometry.Point{X: lon, Y: lat}, meters, defaultCircleSteps) lfs.obj = geojson.NewCircle(geometry.Point{X: lon, Y: lat}, meters, defaultCircleSteps)
case "object": case "object":
if s.clip { if lfs.clip {
err = errInvalidArgument("cannot clip with object") err = errInvalidArgument("cannot clip with object")
return return
} }
@ -278,12 +278,12 @@ func (server *Server) cmdSearchArgs(
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
s.obj, err = geojson.Parse(obj, &server.geomParseOpts) lfs.obj, err = geojson.Parse(obj, &s.geomParseOpts)
if err != nil { if err != nil {
return return
} }
case "sector": case "sector":
if s.clip { if lfs.clip {
err = errInvalidArgument("cannot clip with " + ltyp) err = errInvalidArgument("cannot clip with " + ltyp)
return return
} }
@ -338,17 +338,17 @@ func (server *Server) cmdSearchArgs(
origin := sectr.Point{Lng: lon, Lat: lat} origin := sectr.Point{Lng: lon, Lat: lat}
sector := sectr.NewSector(origin, meters, b1, b2) sector := sectr.NewSector(origin, meters, b1, b2)
s.obj, err = geojson.Parse(string(sector.JSON()), &server.geomParseOpts) lfs.obj, err = geojson.Parse(string(sector.JSON()), &s.geomParseOpts)
if err != nil { if err != nil {
return return
} }
case "bounds", "hash", "tile", "quadkey": case "bounds", "hash", "tile", "quadkey":
vs, s.obj, err = parseRectArea(ltyp, vs) vs, lfs.obj, err = parseRectArea(ltyp, vs)
if err != nil { if err != nil {
return return
} }
case "get": case "get":
if s.clip { if lfs.clip {
err = errInvalidArgument("cannot clip with get") err = errInvalidArgument("cannot clip with get")
} }
var key, id string var key, id string
@ -360,33 +360,33 @@ func (server *Server) cmdSearchArgs(
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
col := server.getCol(key) col := s.getCol(key)
if col == nil { if col == nil {
err = errKeyNotFound err = errKeyNotFound
return return
} }
s.obj, _, _, ok = col.Get(id) lfs.obj, _, _, ok = col.Get(id)
if !ok { if !ok {
err = errIDNotFound err = errIDNotFound
return return
} }
case "roam": case "roam":
s.roam.on = true lfs.roam.on = true
if vs, s.roam.key, ok = tokenval(vs); !ok || s.roam.key == "" { if vs, lfs.roam.key, ok = tokenval(vs); !ok || lfs.roam.key == "" {
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
if vs, s.roam.id, ok = tokenval(vs); !ok || s.roam.id == "" { if vs, lfs.roam.id, ok = tokenval(vs); !ok || lfs.roam.id == "" {
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
s.roam.pattern = glob.IsGlob(s.roam.id) lfs.roam.pattern = glob.IsGlob(lfs.roam.id)
var smeters string var smeters string
if vs, smeters, ok = tokenval(vs); !ok || smeters == "" { if vs, smeters, ok = tokenval(vs); !ok || smeters == "" {
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
if s.roam.meters, err = strconv.ParseFloat(smeters, 64); err != nil { if lfs.roam.meters, err = strconv.ParseFloat(smeters, 64); err != nil {
err = errInvalidArgument(smeters) err = errInvalidArgument(smeters)
return return
} }
@ -400,7 +400,7 @@ func (server *Server) cmdSearchArgs(
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
s.roam.scan = scan lfs.roam.scan = scan
} }
} }
@ -430,7 +430,7 @@ func (server *Server) cmdSearchArgs(
if err != nil { if err != nil {
return return
} }
s.obj = clip.Clip(s.obj, clip_rect, &server.geomIndexOpts) lfs.obj = clip.Clip(lfs.obj, clip_rect, &s.geomIndexOpts)
default: default:
err = errInvalidArgument("cannot clipby " + ltok) err = errInvalidArgument("cannot clipby " + ltok)
return return
@ -443,13 +443,13 @@ var nearbyTypes = []string{"point"}
var withinOrIntersectsTypes = []string{ var withinOrIntersectsTypes = []string{
"geo", "bounds", "hash", "tile", "quadkey", "get", "object", "circle", "sector"} "geo", "bounds", "hash", "tile", "quadkey", "get", "object", "circle", "sector"}
func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) { func (s *Server) cmdNearby(msg *Message) (res resp.Value, err error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
wr := &bytes.Buffer{} wr := &bytes.Buffer{}
s, err := server.cmdSearchArgs(false, "nearby", vs, nearbyTypes) sargs, err := s.cmdSearchArgs(false, "nearby", vs, nearbyTypes)
if s.usingLua() { if sargs.usingLua() {
defer s.Close() defer sargs.Close()
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
res = NOMessage res = NOMessage
@ -461,13 +461,13 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) {
if err != nil { if err != nil {
return NOMessage, err return NOMessage, err
} }
s.cmd = "nearby" sargs.cmd = "nearby"
if s.fence { if sargs.fence {
return NOMessage, s return NOMessage, sargs
} }
sw, err := server.newScanWriter( sw, err := s.newScanWriter(
wr, msg, s.key, s.output, s.precision, s.glob, false, wr, msg, sargs.key, sargs.output, sargs.precision, sargs.glob, false,
s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) sargs.cursor, sargs.limit, sargs.wheres, sargs.whereins, sargs.whereevals, sargs.nofields)
if err != nil { if err != nil {
return NOMessage, err return NOMessage, err
} }
@ -482,14 +482,14 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) {
o: o, o: o,
fields: fields, fields: fields,
distance: meters, distance: meters,
distOutput: s.distance, distOutput: sargs.distance,
noLock: true, noLock: true,
ignoreGlobMatch: true, ignoreGlobMatch: true,
skipTesting: true, skipTesting: true,
}) })
} }
maxDist := s.obj.(*geojson.Circle).Meters() maxDist := sargs.obj.(*geojson.Circle).Meters()
if s.sparse > 0 { if sargs.sparse > 0 {
if maxDist < 0 { if maxDist < 0 {
// error cannot use SPARSE and KNN together // error cannot use SPARSE and KNN together
return NOMessage, return NOMessage,
@ -498,24 +498,24 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) {
// An intersects operation is required for SPARSE // An intersects operation is required for SPARSE
iter := func(id string, o geojson.Object, fields []float64) bool { iter := func(id string, o geojson.Object, fields []float64) bool {
var meters float64 var meters float64
if s.distance { if sargs.distance {
meters = o.Distance(s.obj) meters = o.Distance(sargs.obj)
} }
return iterStep(id, o, fields, meters) return iterStep(id, o, fields, meters)
} }
sw.col.Intersects(s.obj, s.sparse, sw, msg.Deadline, iter) sw.col.Intersects(sargs.obj, sargs.sparse, sw, msg.Deadline, iter)
} else { } else {
iter := func(id string, o geojson.Object, fields []float64, dist float64) bool { iter := func(id string, o geojson.Object, fields []float64, dist float64) bool {
if maxDist > 0 && dist > maxDist { if maxDist > 0 && dist > maxDist {
return false return false
} }
var meters float64 var meters float64
if s.distance { if sargs.distance {
meters = dist meters = dist
} }
return iterStep(id, o, fields, meters) return iterStep(id, o, fields, meters)
} }
sw.col.Nearby(s.obj, sw, msg.Deadline, iter) sw.col.Nearby(sargs.obj, sw, msg.Deadline, iter)
} }
} }
sw.writeFoot() sw.writeFoot()
@ -526,22 +526,22 @@ func (server *Server) cmdNearby(msg *Message) (res resp.Value, err error) {
return sw.respOut, nil return sw.respOut, nil
} }
func (server *Server) cmdWithin(msg *Message) (res resp.Value, err error) { func (s *Server) cmdWithin(msg *Message) (res resp.Value, err error) {
return server.cmdWithinOrIntersects("within", msg) return s.cmdWithinOrIntersects("within", msg)
} }
func (server *Server) cmdIntersects(msg *Message) (res resp.Value, err error) { func (s *Server) cmdIntersects(msg *Message) (res resp.Value, err error) {
return server.cmdWithinOrIntersects("intersects", msg) return s.cmdWithinOrIntersects("intersects", msg)
} }
func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp.Value, err error) { func (s *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp.Value, err error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
wr := &bytes.Buffer{} wr := &bytes.Buffer{}
s, err := server.cmdSearchArgs(false, cmd, vs, withinOrIntersectsTypes) sargs, err := s.cmdSearchArgs(false, cmd, vs, withinOrIntersectsTypes)
if s.usingLua() { if sargs.usingLua() {
defer s.Close() defer sargs.Close()
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
res = NOMessage res = NOMessage
@ -553,13 +553,13 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp.
if err != nil { if err != nil {
return NOMessage, err return NOMessage, err
} }
s.cmd = cmd sargs.cmd = cmd
if s.fence { if sargs.fence {
return NOMessage, s return NOMessage, sargs
} }
sw, err := server.newScanWriter( sw, err := s.newScanWriter(
wr, msg, s.key, s.output, s.precision, s.glob, false, wr, msg, sargs.key, sargs.output, sargs.precision, sargs.glob, false,
s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) sargs.cursor, sargs.limit, sargs.wheres, sargs.whereins, sargs.whereevals, sargs.nofields)
if err != nil { if err != nil {
return NOMessage, err return NOMessage, err
} }
@ -569,7 +569,7 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp.
sw.writeHead() sw.writeHead()
if sw.col != nil { if sw.col != nil {
if cmd == "within" { if cmd == "within" {
sw.col.Within(s.obj, s.sparse, sw, msg.Deadline, func( sw.col.Within(sargs.obj, sargs.sparse, sw, msg.Deadline, func(
id string, o geojson.Object, fields []float64, id string, o geojson.Object, fields []float64,
) bool { ) bool {
return sw.writeObject(ScanWriterParams{ return sw.writeObject(ScanWriterParams{
@ -580,7 +580,7 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp.
}) })
}) })
} else if cmd == "intersects" { } else if cmd == "intersects" {
sw.col.Intersects(s.obj, s.sparse, sw, msg.Deadline, func( sw.col.Intersects(sargs.obj, sargs.sparse, sw, msg.Deadline, func(
id string, id string,
o geojson.Object, o geojson.Object,
fields []float64, fields []float64,
@ -591,8 +591,8 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp.
fields: fields, fields: fields,
noLock: true, noLock: true,
} }
if s.clip { if sargs.clip {
params.clip = s.obj params.clip = sargs.obj
} }
return sw.writeObject(params) return sw.writeObject(params)
}) })
@ -606,15 +606,15 @@ func (server *Server) cmdWithinOrIntersects(cmd string, msg *Message) (res resp.
return sw.respOut, nil return sw.respOut, nil
} }
func (server *Server) cmdSeachValuesArgs(vs []string) ( func (s *Server) cmdSeachValuesArgs(vs []string) (
s liveFenceSwitches, err error, lfs liveFenceSwitches, err error,
) { ) {
var t searchScanBaseTokens var t searchScanBaseTokens
vs, t, err = server.parseSearchScanBaseTokens("search", t, vs) vs, t, err = s.parseSearchScanBaseTokens("search", t, vs)
if err != nil { if err != nil {
return return
} }
s.searchScanBaseTokens = t lfs.searchScanBaseTokens = t
if len(vs) != 0 { if len(vs) != 0 {
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
@ -622,14 +622,14 @@ func (server *Server) cmdSeachValuesArgs(vs []string) (
return return
} }
func (server *Server) cmdSearch(msg *Message) (res resp.Value, err error) { func (s *Server) cmdSearch(msg *Message) (res resp.Value, err error) {
start := time.Now() start := time.Now()
vs := msg.Args[1:] vs := msg.Args[1:]
wr := &bytes.Buffer{} wr := &bytes.Buffer{}
s, err := server.cmdSeachValuesArgs(vs) sargs, err := s.cmdSeachValuesArgs(vs)
if s.usingLua() { if sargs.usingLua() {
defer s.Close() defer sargs.Close()
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
res = NOMessage res = NOMessage
@ -641,9 +641,9 @@ func (server *Server) cmdSearch(msg *Message) (res resp.Value, err error) {
if err != nil { if err != nil {
return NOMessage, err return NOMessage, err
} }
sw, err := server.newScanWriter( sw, err := s.newScanWriter(
wr, msg, s.key, s.output, s.precision, s.glob, true, wr, msg, sargs.key, sargs.output, sargs.precision, sargs.glob, true,
s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) sargs.cursor, sargs.limit, sargs.wheres, sargs.whereins, sargs.whereevals, sargs.nofields)
if err != nil { if err != nil {
return NOMessage, err return NOMessage, err
} }
@ -653,15 +653,15 @@ func (server *Server) cmdSearch(msg *Message) (res resp.Value, err error) {
sw.writeHead() sw.writeHead()
if sw.col != nil { if sw.col != nil {
if sw.output == outputCount && len(sw.wheres) == 0 && sw.globEverything { if sw.output == outputCount && len(sw.wheres) == 0 && sw.globEverything {
count := sw.col.Count() - int(s.cursor) count := sw.col.Count() - int(sargs.cursor)
if count < 0 { if count < 0 {
count = 0 count = 0
} }
sw.count = uint64(count) sw.count = uint64(count)
} else { } else {
g := glob.Parse(sw.globPattern, s.desc) g := glob.Parse(sw.globPattern, sargs.desc)
if g.Limits[0] == "" && g.Limits[1] == "" { if g.Limits[0] == "" && g.Limits[1] == "" {
sw.col.SearchValues(s.desc, sw, msg.Deadline, sw.col.SearchValues(sargs.desc, sw, msg.Deadline,
func(id string, o geojson.Object, fields []float64) bool { func(id string, o geojson.Object, fields []float64) bool {
return sw.writeObject(ScanWriterParams{ return sw.writeObject(ScanWriterParams{
id: id, id: id,
@ -675,7 +675,7 @@ func (server *Server) cmdSearch(msg *Message) (res resp.Value, err error) {
// must disable globSingle for string value type matching because // must disable globSingle for string value type matching because
// globSingle is only for ID matches, not values. // globSingle is only for ID matches, not values.
sw.globSingle = false sw.globSingle = false
sw.col.SearchValuesRange(g.Limits[0], g.Limits[1], s.desc, sw, sw.col.SearchValuesRange(g.Limits[0], g.Limits[1], sargs.desc, sw,
msg.Deadline, msg.Deadline,
func(id string, o geojson.Object, fields []float64) bool { func(id string, o geojson.Object, fields []float64) bool {
return sw.writeObject(ScanWriterParams{ return sw.writeObject(ScanWriterParams{

View File

@ -155,8 +155,8 @@ func Serve(opts Options) error {
} }
log.Infof("Server started, Tile38 version %s, git %s", core.Version, core.GitSHA) log.Infof("Server started, Tile38 version %s, git %s", core.Version, core.GitSHA)
// Initialize the server // Initialize the s
server := &Server{ s := &Server{
unix: opts.UnixSocketPath, unix: opts.UnixSocketPath,
host: opts.Host, host: opts.Host,
port: opts.Port, port: opts.Port,
@ -182,42 +182,42 @@ func Serve(opts Options) error {
hookExpires: btree.NewNonConcurrent(byHookExpires), hookExpires: btree.NewNonConcurrent(byHookExpires),
} }
server.epc = endpoint.NewManager(server) s.epc = endpoint.NewManager(s)
server.luascripts = server.newScriptMap() s.luascripts = s.newScriptMap()
server.luapool = server.newPool() s.luapool = s.newPool()
defer server.luapool.Shutdown() defer s.luapool.Shutdown()
if err := os.MkdirAll(opts.Dir, 0700); err != nil { if err := os.MkdirAll(opts.Dir, 0700); err != nil {
return err return err
} }
var err error var err error
server.config, err = loadConfig(filepath.Join(opts.Dir, "config")) s.config, err = loadConfig(filepath.Join(opts.Dir, "config"))
if err != nil { if err != nil {
return err return err
} }
// Send "500 Internal Server" error instead of "200 OK" for json responses // Send "500 Internal Server" error instead of "200 OK" for json responses
// with `"ok":false`. T38HTTP500ERRORS=1 // with `"ok":false`. T38HTTP500ERRORS=1
server.http500Errors, _ = strconv.ParseBool(os.Getenv("T38HTTP500ERRORS")) s.http500Errors, _ = strconv.ParseBool(os.Getenv("T38HTTP500ERRORS"))
// Allow for geometry indexing options through environment variables: // Allow for geometry indexing options through environment variables:
// T38IDXGEOMKIND -- None, RTree, QuadTree // T38IDXGEOMKIND -- None, RTree, QuadTree
// T38IDXGEOM -- Min number of points in a geometry for indexing. // T38IDXGEOM -- Min number of points in a geometry for indexing.
// T38IDXMULTI -- Min number of object in a Multi/Collection for indexing. // T38IDXMULTI -- Min number of object in a Multi/Collection for indexing.
server.geomParseOpts = *geojson.DefaultParseOptions s.geomParseOpts = *geojson.DefaultParseOptions
server.geomIndexOpts = *geometry.DefaultIndexOptions s.geomIndexOpts = *geometry.DefaultIndexOptions
n, err := strconv.ParseUint(os.Getenv("T38IDXGEOM"), 10, 32) n, err := strconv.ParseUint(os.Getenv("T38IDXGEOM"), 10, 32)
if err == nil { if err == nil {
server.geomParseOpts.IndexGeometry = int(n) s.geomParseOpts.IndexGeometry = int(n)
server.geomIndexOpts.MinPoints = int(n) s.geomIndexOpts.MinPoints = int(n)
} }
n, err = strconv.ParseUint(os.Getenv("T38IDXMULTI"), 10, 32) n, err = strconv.ParseUint(os.Getenv("T38IDXMULTI"), 10, 32)
if err == nil { if err == nil {
server.geomParseOpts.IndexChildren = int(n) s.geomParseOpts.IndexChildren = int(n)
} }
requireValid := os.Getenv("REQUIREVALID") requireValid := os.Getenv("REQUIREVALID")
if requireValid != "" { if requireValid != "" {
server.geomParseOpts.RequireValid = true s.geomParseOpts.RequireValid = true
} }
indexKind := os.Getenv("T38IDXGEOMKIND") indexKind := os.Getenv("T38IDXGEOMKIND")
switch indexKind { switch indexKind {
@ -225,31 +225,31 @@ func Serve(opts Options) error {
log.Errorf("Unknown index kind: %s", indexKind) log.Errorf("Unknown index kind: %s", indexKind)
case "": case "":
case "None": case "None":
server.geomParseOpts.IndexGeometryKind = geometry.None s.geomParseOpts.IndexGeometryKind = geometry.None
server.geomIndexOpts.Kind = geometry.None s.geomIndexOpts.Kind = geometry.None
case "RTree": case "RTree":
server.geomParseOpts.IndexGeometryKind = geometry.RTree s.geomParseOpts.IndexGeometryKind = geometry.RTree
server.geomIndexOpts.Kind = geometry.RTree s.geomIndexOpts.Kind = geometry.RTree
case "QuadTree": case "QuadTree":
server.geomParseOpts.IndexGeometryKind = geometry.QuadTree s.geomParseOpts.IndexGeometryKind = geometry.QuadTree
server.geomIndexOpts.Kind = geometry.QuadTree s.geomIndexOpts.Kind = geometry.QuadTree
} }
if server.geomParseOpts.IndexGeometryKind == geometry.None { if s.geomParseOpts.IndexGeometryKind == geometry.None {
log.Debugf("Geom indexing: %s", log.Debugf("Geom indexing: %s",
server.geomParseOpts.IndexGeometryKind, s.geomParseOpts.IndexGeometryKind,
) )
} else { } else {
log.Debugf("Geom indexing: %s (%d points)", log.Debugf("Geom indexing: %s (%d points)",
server.geomParseOpts.IndexGeometryKind, s.geomParseOpts.IndexGeometryKind,
server.geomParseOpts.IndexGeometry, s.geomParseOpts.IndexGeometry,
) )
} }
log.Debugf("Multi indexing: RTree (%d points)", server.geomParseOpts.IndexChildren) log.Debugf("Multi indexing: RTree (%d points)", s.geomParseOpts.IndexChildren)
nerr := make(chan error) nerr := make(chan error)
go func() { go func() {
// Start the server in the background // Start the server in the background
nerr <- server.netServe() nerr <- s.netServe()
}() }()
// Load the queue before the aof // Load the queue before the aof
@ -276,9 +276,9 @@ func Serve(opts Options) error {
return err return err
} }
server.qdb = qdb s.qdb = qdb
server.qidx = qidx s.qidx = qidx
if err := server.migrateAOF(); err != nil { if err := s.migrateAOF(); err != nil {
return err return err
} }
if core.AppendOnly { if core.AppendOnly {
@ -286,75 +286,75 @@ func Serve(opts Options) error {
if err != nil { if err != nil {
return err return err
} }
server.aof = f s.aof = f
if err := server.loadAOF(); err != nil { if err := s.loadAOF(); err != nil {
return err return err
} }
defer func() { defer func() {
server.flushAOF(false) s.flushAOF(false)
server.aof.Sync() s.aof.Sync()
}() }()
} }
// Start background routines // Start background routines
if server.config.followHost() != "" { if s.config.followHost() != "" {
go server.follow(server.config.followHost(), server.config.followPort(), go s.follow(s.config.followHost(), s.config.followPort(),
server.followc.get()) s.followc.get())
} }
if opts.MetricsAddr != "" { if opts.MetricsAddr != "" {
log.Infof("Listening for metrics at: %s", opts.MetricsAddr) log.Infof("Listening for metrics at: %s", opts.MetricsAddr)
go func() { go func() {
http.HandleFunc("/", server.MetricsIndexHandler) http.HandleFunc("/", s.MetricsIndexHandler)
http.HandleFunc("/metrics", server.MetricsHandler) http.HandleFunc("/metrics", s.MetricsHandler)
log.Fatal(http.ListenAndServe(opts.MetricsAddr, nil)) log.Fatal(http.ListenAndServe(opts.MetricsAddr, nil))
}() }()
} }
go server.processLives() go s.processLives()
go server.watchOutOfMemory() go s.watchOutOfMemory()
go server.watchLuaStatePool() go s.watchLuaStatePool()
go server.watchAutoGC() go s.watchAutoGC()
go server.backgroundExpiring() go s.backgroundExpiring()
go server.backgroundSyncAOF() go s.backgroundSyncAOF()
defer func() { defer func() {
// Stop background routines // Stop background routines
server.followc.add(1) // this will force any follow communication to die s.followc.add(1) // this will force any follow communication to die
server.stopServer.set(true) s.stopServer.set(true)
// notify the live geofence connections that we are stopping. // notify the live geofence connections that we are stopping.
server.lcond.L.Lock() s.lcond.L.Lock()
server.lcond.Wait() s.lcond.Wait()
server.lcond.L.Lock() s.lcond.L.Lock()
}() }()
// Server is now loaded and ready. Wait for network error messages. // Server is now loaded and ready. Wait for network error messages.
server.loadedAndReady.set(true) s.loadedAndReady.set(true)
return <-nerr return <-nerr
} }
func (server *Server) isProtected() bool { func (s *Server) isProtected() bool {
if core.ProtectedMode == "no" { if core.ProtectedMode == "no" {
// --protected-mode no // --protected-mode no
return false return false
} }
if server.host != "" && server.host != "127.0.0.1" && if s.host != "" && s.host != "127.0.0.1" &&
server.host != "::1" && server.host != "localhost" { s.host != "::1" && s.host != "localhost" {
// -h address // -h address
return false return false
} }
is := server.config.protectedMode() != "no" && server.config.requirePass() == "" is := s.config.protectedMode() != "no" && s.config.requirePass() == ""
return is return is
} }
func (server *Server) netServe() error { func (s *Server) netServe() error {
var ln net.Listener var ln net.Listener
var err error var err error
if server.unix != "" { if s.unix != "" {
os.RemoveAll(server.unix) os.RemoveAll(s.unix)
ln, err = net.Listen("unix", server.unix) ln, err = net.Listen("unix", s.unix)
} else { } else {
tcpAddr := fmt.Sprintf("%s:%d", server.host, server.port) tcpAddr := fmt.Sprintf("%s:%d", s.host, s.port)
ln, err = net.Listen("tcp", tcpAddr) ln, err = net.Listen("tcp", tcpAddr)
} }
if err != nil { if err != nil {
@ -378,17 +378,17 @@ func (server *Server) netServe() error {
client.remoteAddr = conn.RemoteAddr().String() client.remoteAddr = conn.RemoteAddr().String()
// add client to server map // add client to server map
server.connsmu.Lock() s.connsmu.Lock()
server.conns[client.id] = client s.conns[client.id] = client
server.connsmu.Unlock() s.connsmu.Unlock()
server.statsTotalConns.add(1) s.statsTotalConns.add(1)
// set the client keep-alive, if needed // set the client keep-alive, if needed
if server.config.keepAlive() > 0 { if s.config.keepAlive() > 0 {
if conn, ok := conn.(*net.TCPConn); ok { if conn, ok := conn.(*net.TCPConn); ok {
conn.SetKeepAlive(true) conn.SetKeepAlive(true)
conn.SetKeepAlivePeriod( conn.SetKeepAlivePeriod(
time.Duration(server.config.keepAlive()) * time.Second, time.Duration(s.config.keepAlive()) * time.Second,
) )
} }
} }
@ -397,9 +397,9 @@ func (server *Server) netServe() error {
defer func() { defer func() {
// close connection // close connection
// delete from server map // delete from server map
server.connsmu.Lock() s.connsmu.Lock()
delete(server.conns, client.id) delete(s.conns, client.id)
server.connsmu.Unlock() s.connsmu.Unlock()
log.Debugf("Closed connection: %s", client.remoteAddr) log.Debugf("Closed connection: %s", client.remoteAddr)
conn.Close() conn.Close()
}() }()
@ -410,7 +410,7 @@ func (server *Server) netServe() error {
// check if the connection is protected // check if the connection is protected
if !strings.HasPrefix(client.remoteAddr, "127.0.0.1:") && if !strings.HasPrefix(client.remoteAddr, "127.0.0.1:") &&
!strings.HasPrefix(client.remoteAddr, "[::1]:") { !strings.HasPrefix(client.remoteAddr, "[::1]:") {
if server.isProtected() { if s.isProtected() {
// This is a protected server. Only loopback is allowed. // This is a protected server. Only loopback is allowed.
conn.Write(deniedMessage) conn.Write(deniedMessage)
return // close connection return // close connection
@ -437,7 +437,7 @@ func (server *Server) netServe() error {
for _, msg := range msgs { for _, msg := range msgs {
// Just closing connection if we have deprecated HTTP or WS connection, // Just closing connection if we have deprecated HTTP or WS connection,
// And --http-transport = false // And --http-transport = false
if !server.http && (msg.ConnType == WebSocket || if !s.http && (msg.ConnType == WebSocket ||
msg.ConnType == HTTP) { msg.ConnType == HTTP) {
close = true // close connection close = true // close connection
break break
@ -460,10 +460,10 @@ func (server *Server) netServe() error {
client.mu.Unlock() client.mu.Unlock()
// update total command count // update total command count
server.statsTotalCommands.add(1) s.statsTotalCommands.add(1)
// handle the command // handle the command
err := server.handleInputCommand(client, msg) err := s.handleInputCommand(client, msg)
if err != nil { if err != nil {
if err.Error() == goingLive { if err.Error() == goingLive {
client.goLiveErr = err client.goLiveErr = err
@ -484,7 +484,7 @@ func (server *Server) netServe() error {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
err := server.goLive( err := s.goLive(
client.goLiveErr, client.goLiveErr,
&liveConn{conn.RemoteAddr(), rwc}, &liveConn{conn.RemoteAddr(), rwc},
&client.pr, &client.pr,
@ -520,14 +520,14 @@ func (server *Server) netServe() error {
// write to client // write to client
if len(client.out) > 0 { if len(client.out) > 0 {
if atomic.LoadInt32(&server.aofdirty) != 0 { if atomic.LoadInt32(&s.aofdirty) != 0 {
func() { func() {
// prewrite // prewrite
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
server.flushAOF(false) s.flushAOF(false)
}() }()
atomic.StoreInt32(&server.aofdirty, 0) atomic.StoreInt32(&s.aofdirty, 0)
} }
conn.Write(client.out) conn.Write(client.out)
client.out = nil client.out = nil
@ -592,19 +592,19 @@ func (conn *liveConn) SetWriteDeadline(deadline time.Time) error {
panic("not supported") panic("not supported")
} }
func (server *Server) watchAutoGC() { func (s *Server) watchAutoGC() {
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
s := time.Now() start := time.Now()
for range t.C { for range t.C {
if server.stopServer.on() { if s.stopServer.on() {
return return
} }
autoGC := server.config.autoGC() autoGC := s.config.autoGC()
if autoGC == 0 { if autoGC == 0 {
continue continue
} }
if time.Since(s) < time.Second*time.Duration(autoGC) { if time.Since(start) < time.Second*time.Duration(autoGC) {
continue continue
} }
var mem1, mem2 runtime.MemStats var mem1, mem2 runtime.MemStats
@ -619,23 +619,23 @@ func (server *Server) watchAutoGC() {
log.Debugf("autogc(after): "+ log.Debugf("autogc(after): "+
"alloc: %v, heap_alloc: %v, heap_released: %v", "alloc: %v, heap_alloc: %v, heap_released: %v",
mem2.Alloc, mem2.HeapAlloc, mem2.HeapReleased) mem2.Alloc, mem2.HeapAlloc, mem2.HeapReleased)
s = time.Now() start = time.Now()
} }
} }
func (server *Server) watchOutOfMemory() { func (s *Server) watchOutOfMemory() {
t := time.NewTicker(time.Second * 2) t := time.NewTicker(time.Second * 2)
defer t.Stop() defer t.Stop()
var mem runtime.MemStats var mem runtime.MemStats
for range t.C { for range t.C {
func() { func() {
if server.stopServer.on() { if s.stopServer.on() {
return return
} }
oom := server.outOfMemory.on() oom := s.outOfMemory.on()
if server.config.maxMemory() == 0 { if s.config.maxMemory() == 0 {
if oom { if oom {
server.outOfMemory.set(false) s.outOfMemory.set(false)
} }
return return
} }
@ -643,33 +643,33 @@ func (server *Server) watchOutOfMemory() {
runtime.GC() runtime.GC()
} }
runtime.ReadMemStats(&mem) runtime.ReadMemStats(&mem)
server.outOfMemory.set(int(mem.HeapAlloc) > server.config.maxMemory()) s.outOfMemory.set(int(mem.HeapAlloc) > s.config.maxMemory())
}() }()
} }
} }
func (server *Server) watchLuaStatePool() { func (s *Server) watchLuaStatePool() {
t := time.NewTicker(time.Second * 10) t := time.NewTicker(time.Second * 10)
defer t.Stop() defer t.Stop()
for range t.C { for range t.C {
func() { func() {
server.luapool.Prune() s.luapool.Prune()
}() }()
} }
} }
// backgroundSyncAOF ensures that the aof buffer is does not grow too big. // backgroundSyncAOF ensures that the aof buffer is does not grow too big.
func (server *Server) backgroundSyncAOF() { func (s *Server) backgroundSyncAOF() {
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
for range t.C { for range t.C {
if server.stopServer.on() { if s.stopServer.on() {
return return
} }
func() { func() {
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
server.flushAOF(true) s.flushAOF(true)
}() }()
} }
} }
@ -686,21 +686,21 @@ func byCollectionKey(a, b interface{}) bool {
return a.(*collectionKeyContainer).key < b.(*collectionKeyContainer).key return a.(*collectionKeyContainer).key < b.(*collectionKeyContainer).key
} }
func (server *Server) setCol(key string, col *collection.Collection) { func (s *Server) setCol(key string, col *collection.Collection) {
server.cols.Set(&collectionKeyContainer{key, col}) s.cols.Set(&collectionKeyContainer{key, col})
} }
func (server *Server) getCol(key string) *collection.Collection { func (s *Server) getCol(key string) *collection.Collection {
if v := server.cols.Get(&collectionKeyContainer{key: key}); v != nil { if v := s.cols.Get(&collectionKeyContainer{key: key}); v != nil {
return v.(*collectionKeyContainer).col return v.(*collectionKeyContainer).col
} }
return nil return nil
} }
func (server *Server) scanGreaterOrEqual( func (s *Server) scanGreaterOrEqual(
key string, iterator func(key string, col *collection.Collection) bool, key string, iterator func(key string, col *collection.Collection) bool,
) { ) {
server.cols.Ascend(&collectionKeyContainer{key: key}, s.cols.Ascend(&collectionKeyContainer{key: key},
func(v interface{}) bool { func(v interface{}) bool {
vcol := v.(*collectionKeyContainer) vcol := v.(*collectionKeyContainer)
return iterator(vcol.key, vcol.col) return iterator(vcol.key, vcol.col)
@ -708,8 +708,8 @@ func (server *Server) scanGreaterOrEqual(
) )
} }
func (server *Server) deleteCol(key string) *collection.Collection { func (s *Server) deleteCol(key string) *collection.Collection {
if v := server.cols.Delete(&collectionKeyContainer{key: key}); v != nil { if v := s.cols.Delete(&collectionKeyContainer{key: key}); v != nil {
return v.(*collectionKeyContainer).col return v.(*collectionKeyContainer).col
} }
return nil return nil
@ -743,7 +743,7 @@ func rewriteTimeoutMsg(msg *Message) (err error) {
return return
} }
func (server *Server) handleInputCommand(client *Client, msg *Message) error { func (s *Server) handleInputCommand(client *Client, msg *Message) error {
start := time.Now() start := time.Now()
serializeOutput := func(res resp.Value) (string, error) { serializeOutput := func(res resp.Value) (string, error) {
var resStr string var resStr string
@ -768,7 +768,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
return WriteWebSocketMessage(client, []byte(res)) return WriteWebSocketMessage(client, []byte(res))
case HTTP: case HTTP:
status := "200 OK" status := "200 OK"
if (server.http500Errors || msg._command == "healthz") && if (s.http500Errors || msg._command == "healthz") &&
!gjson.Get(res, "ok").Bool() { !gjson.Get(res, "ok").Bool() {
status = "500 Internal Server Error" status = "500 Internal Server Error"
} }
@ -821,7 +821,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
} }
return writeOutput("+PONG\r\n") return writeOutput("+PONG\r\n")
} }
server.sendMonitor(nil, msg, client, false) s.sendMonitor(nil, msg, client, false)
return nil return nil
} }
@ -853,7 +853,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
return nil return nil
} }
if !server.loadedAndReady.on() { if !s.loadedAndReady.on() {
switch msg.Command() { switch msg.Command() {
case "output", "ping", "echo": case "output", "ping", "echo":
default: default:
@ -870,7 +870,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
var write bool var write bool
if (!client.authd || cmd == "auth") && cmd != "output" { if (!client.authd || cmd == "auth") && cmd != "output" {
if server.config.requirePass() != "" { if s.config.requirePass() != "" {
password := "" password := ""
// This better be an AUTH command or the Message should contain an Auth // This better be an AUTH command or the Message should contain an Auth
if cmd != "auth" && msg.Auth == "" { if cmd != "auth" && msg.Auth == "" {
@ -884,7 +884,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
password = msg.Args[1] password = msg.Args[1]
} }
} }
if server.config.requirePass() != strings.TrimSpace(password) { if s.config.requirePass() != strings.TrimSpace(password) {
return writeErr("invalid password") return writeErr("invalid password")
} }
client.authd = true client.authd = true
@ -900,30 +900,30 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
// choose the locking strategy // choose the locking strategy
switch msg.Command() { switch msg.Command() {
default: default:
server.mu.RLock() s.mu.RLock()
defer server.mu.RUnlock() defer s.mu.RUnlock()
case "set", "del", "drop", "fset", "flushdb", case "set", "del", "drop", "fset", "flushdb",
"setchan", "pdelchan", "delchan", "setchan", "pdelchan", "delchan",
"sethook", "pdelhook", "delhook", "sethook", "pdelhook", "delhook",
"expire", "persist", "jset", "pdel", "rename", "renamenx": "expire", "persist", "jset", "pdel", "rename", "renamenx":
// write operations // write operations
write = true write = true
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
if server.config.followHost() != "" { if s.config.followHost() != "" {
return writeErr("not the leader") return writeErr("not the leader")
} }
if server.config.readOnly() { if s.config.readOnly() {
return writeErr("read only") return writeErr("read only")
} }
case "eval", "evalsha": case "eval", "evalsha":
// write operations (potentially) but no AOF for the script command itself // write operations (potentially) but no AOF for the script command itself
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
if server.config.followHost() != "" { if s.config.followHost() != "" {
return writeErr("not the leader") return writeErr("not the leader")
} }
if server.config.readOnly() { if s.config.readOnly() {
return writeErr("read only") return writeErr("read only")
} }
case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", case "get", "keys", "scan", "nearby", "within", "intersects", "hooks",
@ -931,16 +931,16 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
"evalro", "evalrosha", "healthz": "evalro", "evalrosha", "healthz":
// read operations // read operations
server.mu.RLock() s.mu.RLock()
defer server.mu.RUnlock() defer s.mu.RUnlock()
if server.config.followHost() != "" && !server.fcuponce { if s.config.followHost() != "" && !s.fcuponce {
return writeErr("catching up to leader") return writeErr("catching up to leader")
} }
case "follow", "slaveof", "replconf", "readonly", "config": case "follow", "slaveof", "replconf", "readonly", "config":
// system operations // system operations
// does not write to aof, but requires a write lock. // does not write to aof, but requires a write lock.
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
case "output": case "output":
// this is local connection operation. Locks not needed. // this is local connection operation. Locks not needed.
case "echo": case "echo":
@ -948,18 +948,18 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
// dev operation // dev operation
case "sleep": case "sleep":
// dev operation // dev operation
server.mu.RLock() s.mu.RLock()
defer server.mu.RUnlock() defer s.mu.RUnlock()
case "shutdown": case "shutdown":
// dev operation // dev operation
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
case "aofshrink": case "aofshrink":
server.mu.RLock() s.mu.RLock()
defer server.mu.RUnlock() defer s.mu.RUnlock()
case "client": case "client":
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
case "evalna", "evalnasha": case "evalna", "evalnasha":
// No locking for scripts, otherwise writes cannot happen within scripts // No locking for scripts, otherwise writes cannot happen within scripts
case "subscribe", "psubscribe", "publish": case "subscribe", "psubscribe", "publish":
@ -987,7 +987,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
} }
}() }()
} }
res, d, err = server.command(msg, client) res, d, err = s.command(msg, client)
if msg.Deadline != nil { if msg.Deadline != nil {
msg.Deadline.Check() msg.Deadline.Check()
} }
@ -1003,7 +1003,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
return writeErr(err.Error()) return writeErr(err.Error())
} }
if write { if write {
if err := server.writeAOF(msg.Args, &d); err != nil { if err := s.writeAOF(msg.Args, &d); err != nil {
if _, ok := err.(errAOFHook); ok { if _, ok := err.(errAOFHook); ok {
return writeErr(err.Error()) return writeErr(err.Error())
} }
@ -1040,55 +1040,55 @@ func randomKey(n int) string {
return fmt.Sprintf("%x", b) return fmt.Sprintf("%x", b)
} }
func (server *Server) reset() { func (s *Server) reset() {
server.aofsz = 0 s.aofsz = 0
server.cols = btree.NewNonConcurrent(byCollectionKey) s.cols = btree.NewNonConcurrent(byCollectionKey)
} }
func (server *Server) command(msg *Message, client *Client) ( func (s *Server) command(msg *Message, client *Client) (
res resp.Value, d commandDetails, err error, res resp.Value, d commandDetails, err error,
) { ) {
switch msg.Command() { switch msg.Command() {
default: default:
err = fmt.Errorf("unknown command '%s'", msg.Args[0]) err = fmt.Errorf("unknown command '%s'", msg.Args[0])
case "set": case "set":
res, d, err = server.cmdSet(msg) res, d, err = s.cmdSet(msg)
case "fset": case "fset":
res, d, err = server.cmdFset(msg) res, d, err = s.cmdFset(msg)
case "del": case "del":
res, d, err = server.cmdDel(msg) res, d, err = s.cmdDel(msg)
case "pdel": case "pdel":
res, d, err = server.cmdPdel(msg) res, d, err = s.cmdPdel(msg)
case "drop": case "drop":
res, d, err = server.cmdDrop(msg) res, d, err = s.cmdDrop(msg)
case "flushdb": case "flushdb":
res, d, err = server.cmdFlushDB(msg) res, d, err = s.cmdFlushDB(msg)
case "rename": case "rename":
res, d, err = server.cmdRename(msg) res, d, err = s.cmdRename(msg)
case "renamenx": case "renamenx":
res, d, err = server.cmdRename(msg) res, d, err = s.cmdRename(msg)
case "sethook": case "sethook":
res, d, err = server.cmdSetHook(msg) res, d, err = s.cmdSetHook(msg)
case "delhook": case "delhook":
res, d, err = server.cmdDelHook(msg) res, d, err = s.cmdDelHook(msg)
case "pdelhook": case "pdelhook":
res, d, err = server.cmdPDelHook(msg) res, d, err = s.cmdPDelHook(msg)
case "hooks": case "hooks":
res, err = server.cmdHooks(msg) res, err = s.cmdHooks(msg)
case "setchan": case "setchan":
res, d, err = server.cmdSetHook(msg) res, d, err = s.cmdSetHook(msg)
case "delchan": case "delchan":
res, d, err = server.cmdDelHook(msg) res, d, err = s.cmdDelHook(msg)
case "pdelchan": case "pdelchan":
res, d, err = server.cmdPDelHook(msg) res, d, err = s.cmdPDelHook(msg)
case "chans": case "chans":
res, err = server.cmdHooks(msg) res, err = s.cmdHooks(msg)
case "expire": case "expire":
res, d, err = server.cmdExpire(msg) res, d, err = s.cmdExpire(msg)
case "persist": case "persist":
res, d, err = server.cmdPersist(msg) res, d, err = s.cmdPersist(msg)
case "ttl": case "ttl":
res, err = server.cmdTTL(msg) res, err = s.cmdTTL(msg)
case "shutdown": case "shutdown":
if !core.DevMode { if !core.DevMode {
err = fmt.Errorf("unknown command '%s'", msg.Args[0]) err = fmt.Errorf("unknown command '%s'", msg.Args[0])
@ -1100,70 +1100,70 @@ func (server *Server) command(msg *Message, client *Client) (
err = fmt.Errorf("unknown command '%s'", msg.Args[0]) err = fmt.Errorf("unknown command '%s'", msg.Args[0])
return return
} }
res, err = server.cmdMassInsert(msg) res, err = s.cmdMassInsert(msg)
case "sleep": case "sleep":
if !core.DevMode { if !core.DevMode {
err = fmt.Errorf("unknown command '%s'", msg.Args[0]) err = fmt.Errorf("unknown command '%s'", msg.Args[0])
return return
} }
res, err = server.cmdSleep(msg) res, err = s.cmdSleep(msg)
case "follow", "slaveof": case "follow", "slaveof":
res, err = server.cmdFollow(msg) res, err = s.cmdFollow(msg)
case "replconf": case "replconf":
res, err = server.cmdReplConf(msg, client) res, err = s.cmdReplConf(msg, client)
case "readonly": case "readonly":
res, err = server.cmdReadOnly(msg) res, err = s.cmdReadOnly(msg)
case "stats": case "stats":
res, err = server.cmdStats(msg) res, err = s.cmdStats(msg)
case "server": case "server":
res, err = server.cmdServer(msg) res, err = s.cmdServer(msg)
case "healthz": case "healthz":
res, err = server.cmdHealthz(msg) res, err = s.cmdHealthz(msg)
case "info": case "info":
res, err = server.cmdInfo(msg) res, err = s.cmdInfo(msg)
case "scan": case "scan":
res, err = server.cmdScan(msg) res, err = s.cmdScan(msg)
case "nearby": case "nearby":
res, err = server.cmdNearby(msg) res, err = s.cmdNearby(msg)
case "within": case "within":
res, err = server.cmdWithin(msg) res, err = s.cmdWithin(msg)
case "intersects": case "intersects":
res, err = server.cmdIntersects(msg) res, err = s.cmdIntersects(msg)
case "search": case "search":
res, err = server.cmdSearch(msg) res, err = s.cmdSearch(msg)
case "bounds": case "bounds":
res, err = server.cmdBounds(msg) res, err = s.cmdBounds(msg)
case "get": case "get":
res, err = server.cmdGet(msg) res, err = s.cmdGet(msg)
case "jget": case "jget":
res, err = server.cmdJget(msg) res, err = s.cmdJget(msg)
case "jset": case "jset":
res, d, err = server.cmdJset(msg) res, d, err = s.cmdJset(msg)
case "jdel": case "jdel":
res, d, err = server.cmdJdel(msg) res, d, err = s.cmdJdel(msg)
case "type": case "type":
res, err = server.cmdType(msg) res, err = s.cmdType(msg)
case "keys": case "keys":
res, err = server.cmdKeys(msg) res, err = s.cmdKeys(msg)
case "output": case "output":
res, err = server.cmdOutput(msg) res, err = s.cmdOutput(msg)
case "aof": case "aof":
res, err = server.cmdAOF(msg) res, err = s.cmdAOF(msg)
case "aofmd5": case "aofmd5":
res, err = server.cmdAOFMD5(msg) res, err = s.cmdAOFMD5(msg)
case "gc": case "gc":
runtime.GC() runtime.GC()
debug.FreeOSMemory() debug.FreeOSMemory()
res = OKMessage(msg, time.Now()) res = OKMessage(msg, time.Now())
case "aofshrink": case "aofshrink":
go server.aofshrink() go s.aofshrink()
res = OKMessage(msg, time.Now()) res = OKMessage(msg, time.Now())
case "config get": case "config get":
res, err = server.cmdConfigGet(msg) res, err = s.cmdConfigGet(msg)
case "config set": case "config set":
res, err = server.cmdConfigSet(msg) res, err = s.cmdConfigSet(msg)
case "config rewrite": case "config rewrite":
res, err = server.cmdConfigRewrite(msg) res, err = s.cmdConfigRewrite(msg)
case "config", "script": case "config", "script":
// These get rewritten into "config foo" and "script bar" // These get rewritten into "config foo" and "script bar"
err = fmt.Errorf("unknown command '%s'", msg.Args[0]) err = fmt.Errorf("unknown command '%s'", msg.Args[0])
@ -1171,32 +1171,32 @@ func (server *Server) command(msg *Message, client *Client) (
msg.Args[1] = msg.Args[0] + " " + msg.Args[1] msg.Args[1] = msg.Args[0] + " " + msg.Args[1]
msg.Args = msg.Args[1:] msg.Args = msg.Args[1:]
msg._command = "" msg._command = ""
return server.command(msg, client) return s.command(msg, client)
} }
case "client": case "client":
res, err = server.cmdClient(msg, client) res, err = s.cmdClient(msg, client)
case "eval", "evalro", "evalna": case "eval", "evalro", "evalna":
res, err = server.cmdEvalUnified(false, msg) res, err = s.cmdEvalUnified(false, msg)
case "evalsha", "evalrosha", "evalnasha": case "evalsha", "evalrosha", "evalnasha":
res, err = server.cmdEvalUnified(true, msg) res, err = s.cmdEvalUnified(true, msg)
case "script load": case "script load":
res, err = server.cmdScriptLoad(msg) res, err = s.cmdScriptLoad(msg)
case "script exists": case "script exists":
res, err = server.cmdScriptExists(msg) res, err = s.cmdScriptExists(msg)
case "script flush": case "script flush":
res, err = server.cmdScriptFlush(msg) res, err = s.cmdScriptFlush(msg)
case "subscribe": case "subscribe":
res, err = server.cmdSubscribe(msg) res, err = s.cmdSubscribe(msg)
case "psubscribe": case "psubscribe":
res, err = server.cmdPsubscribe(msg) res, err = s.cmdPsubscribe(msg)
case "publish": case "publish":
res, err = server.cmdPublish(msg) res, err = s.cmdPublish(msg)
case "test": case "test":
res, err = server.cmdTest(msg) res, err = s.cmdTest(msg)
case "monitor": case "monitor":
res, err = server.cmdMonitor(msg) res, err = s.cmdMonitor(msg)
} }
server.sendMonitor(err, msg, client, false) s.sendMonitor(err, msg, client, false)
return return
} }