Allow some basic client commands before AOF data loads

This commit accepts incoming connections even before the AOF
dataset has been loaded into memory. Though only a very limited
command set is allowed.

Allowed commands:
  PING, ECHO, OUTPUT, QUIT

All other commands will return:
  LOADING Tile38 is loading the dataset in memory

This is useful for establishing connections for the purpose of
checking process and network state.
This commit is contained in:
tidwall 2021-10-01 17:18:07 -07:00
parent a3c808e97f
commit 9e552c3629
1 changed files with 33 additions and 5 deletions

View File

@ -94,6 +94,7 @@ type Server struct {
lastShrinkDuration aint lastShrinkDuration aint
stopServer abool stopServer abool
outOfMemory abool outOfMemory abool
loadedAndReady abool // server is loaded and ready for commands
connsmu sync.RWMutex connsmu sync.RWMutex
conns map[int]*Client conns map[int]*Client
@ -245,6 +246,12 @@ func Serve(opts Options) error {
} }
log.Debugf("Multi indexing: RTree (%d points)", server.geomParseOpts.IndexChildren) log.Debugf("Multi indexing: RTree (%d points)", server.geomParseOpts.IndexChildren)
nerr := make(chan error)
go func() {
// Start the server in the background
nerr <- server.netServe()
}()
// Load the queue before the aof // Load the queue before the aof
qdb, err := buntdb.Open(core.QueueFileName) qdb, err := buntdb.Open(core.QueueFileName)
if err != nil { if err != nil {
@ -288,7 +295,6 @@ func Serve(opts Options) error {
server.aof.Sync() server.aof.Sync()
}() }()
} }
// server.fillExpiresList()
// Start background routines // Start background routines
if server.config.followHost() != "" { if server.config.followHost() != "" {
@ -322,8 +328,9 @@ func Serve(opts Options) error {
server.lcond.L.Lock() server.lcond.L.Lock()
}() }()
// Start the network server // Server is now loaded and ready. Wait for network error messages.
return server.netServe() server.loadedAndReady.set(true)
return <-nerr
} }
func (server *Server) isProtected() bool { func (server *Server) isProtected() bool {
@ -426,7 +433,6 @@ func (server *Server) netServe() error {
rdbuf := bytes.NewBuffer(packet) rdbuf := bytes.NewBuffer(packet)
pr.rd = rdbuf pr.rd = rdbuf
pr.wr = client pr.wr = client
msgs, err := pr.ReadMessages() msgs, err := pr.ReadMessages()
for _, msg := range msgs { for _, msg := range msgs {
// Just closing connection if we have deprecated HTTP or WS connection, // Just closing connection if we have deprecated HTTP or WS connection,
@ -827,12 +833,34 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
if errMsg == errInvalidNumberOfArguments.Error() { if errMsg == errInvalidNumberOfArguments.Error() {
return writeOutput("-ERR wrong number of arguments for '" + cmd + "' command\r\n") return writeOutput("-ERR wrong number of arguments for '" + cmd + "' command\r\n")
} }
v, _ := resp.ErrorValue(errors.New("ERR " + errMsg)).MarshalRESP() var ucprefix bool
word := strings.Split(errMsg, " ")[0]
if len(word) > 0 {
ucprefix = true
for i := 0; i < len(word); i++ {
if word[i] < 'A' || word[i] > 'Z' {
ucprefix = false
break
}
}
}
if !ucprefix {
errMsg = "ERR " + errMsg
}
v, _ := resp.ErrorValue(errors.New(errMsg)).MarshalRESP()
return writeOutput(string(v)) return writeOutput(string(v))
} }
return nil return nil
} }
if !server.loadedAndReady.on() {
switch msg.Command() {
case "output", "ping", "echo":
default:
return writeErr("LOADING Tile38 is loading the dataset in memory")
}
}
if cmd == "timeout" { if cmd == "timeout" {
if err := rewriteTimeoutMsg(msg); err != nil { if err := rewriteTimeoutMsg(msg); err != nil {
return writeErr(err.Error()) return writeErr(err.Error())