More graceful Tile38 shutdown

tidwall 2022-09-25 06:28:17 -07:00
parent f2c3b3924a
commit 906824323b
19 changed files with 298 additions and 111 deletions
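At a high level, the change gives every background routine a shared stop signal and a wait group, so shutdown can be requested and then awaited instead of leaving goroutines running free. Below is a minimal standalone sketch of that pattern, using illustrative names rather than the real Tile38 identifiers:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// pool mirrors the shutdown pattern this commit threads through
// Tile38's background goroutines: an atomic stop flag plus a wait
// group that shutdown can block on. All names here are illustrative.
type pool struct {
	stopped int32          // atomic bool, set once shutdown begins
	wg      sync.WaitGroup // counts running background goroutines
}

func (p *pool) start(name string, every time.Duration, op func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for atomic.LoadInt32(&p.stopped) == 0 {
			op()
			time.Sleep(every)
		}
		fmt.Println(name, "stopped cleanly")
	}()
}

func (p *pool) shutdown() {
	atomic.StoreInt32(&p.stopped, 1)
	p.wg.Wait() // returns only after every goroutine has exited
}

func main() {
	var p pool
	p.start("expire", time.Second/10, func() { /* expire items */ })
	p.start("syncAOF", time.Second, func() { /* flush aof buffer */ })
	time.Sleep(300 * time.Millisecond)
	p.shutdown()
}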

View File

@ -27,13 +27,21 @@ func (conn *AMQPConn) Expired() bool {
defer conn.mu.Unlock()
if !conn.ex {
if time.Since(conn.t) > amqpExpiresAfter {
conn.ex = true
conn.close()
conn.ex = true
}
}
return conn.ex
}
// ExpireNow forces the connection to expire
func (conn *AMQPConn) ExpireNow() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.close()
conn.ex = true
}
func (conn *AMQPConn) close() {
if conn.conn != nil {
conn.conn.Close()

View File

@ -33,15 +33,21 @@ func (conn *DisqueConn) Expired() bool {
defer conn.mu.Unlock()
if !conn.ex {
if time.Since(conn.t) > disqueExpiresAfter {
if conn.conn != nil {
conn.close()
}
conn.close()
conn.ex = true
}
}
return conn.ex
}
// ExpireNow forces the connection to expire
func (conn *DisqueConn) ExpireNow() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.close()
conn.ex = true
}
func (conn *DisqueConn) close() {
if conn.conn != nil {
conn.conn.Close()

View File

@ -6,6 +6,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/streadway/amqp"
@ -136,6 +137,7 @@ type Endpoint struct {
// Conn is an endpoint connection
type Conn interface {
ExpireNow()
Expired() bool
Send(val string) error
}
@ -145,6 +147,8 @@ type Manager struct {
mu sync.RWMutex
conns map[string]Conn
publisher LocalPublisher
shutdown int32 // atomic bool
wg sync.WaitGroup // run wait group
}
// NewManager returns a new manager
@ -153,13 +157,29 @@ func NewManager(publisher LocalPublisher) *Manager {
conns: make(map[string]Conn),
publisher: publisher,
}
go epc.Run()
epc.wg.Add(1)
go epc.run()
return epc
}
func (epc *Manager) Shutdown() {
defer epc.wg.Wait()
atomic.StoreInt32(&epc.shutdown, 1)
// expire the connections
epc.mu.Lock()
defer epc.mu.Unlock()
for _, conn := range epc.conns {
conn.ExpireNow()
}
}
// Run starts the managing of endpoints
func (epc *Manager) Run() {
func (epc *Manager) run() {
defer epc.wg.Done()
for {
if atomic.LoadInt32(&epc.shutdown) != 0 {
return
}
time.Sleep(time.Second)
func() {
epc.mu.Lock()

View File

@ -28,6 +28,10 @@ func (conn *EvenHubConn) Expired() bool {
return false
}
// ExpireNow forces the connection to expire
func (conn *EvenHubConn) ExpireNow() {
}
// Send sends a message
func (conn *EvenHubConn) Send(msg string) error {
hub, err := eventhub.NewHubFromConnectionString(conn.ep.EventHub.ConnectionString)

View File

@ -36,14 +36,21 @@ func (conn *GRPCConn) Expired() bool {
defer conn.mu.Unlock()
if !conn.ex {
if time.Since(conn.t) > grpcExpiresAfter {
if conn.conn != nil {
conn.close()
}
conn.close()
conn.ex = true
}
}
return conn.ex
}
// ExpireNow forces the connection to expire
func (conn *GRPCConn) ExpireNow() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.close()
conn.ex = true
}
func (conn *GRPCConn) close() {
if conn.conn != nil {
conn.conn.Close()

View File

@ -38,6 +38,10 @@ func (conn *HTTPConn) Expired() bool {
return false
}
// ExpireNow forces the connection to expire
func (conn *HTTPConn) ExpireNow() {
}
// Send sends a message
func (conn *HTTPConn) Send(msg string) error {
req, err := http.NewRequest("POST", conn.ep.Original, bytes.NewBufferString(msg))

View File

@ -34,15 +34,21 @@ func (conn *KafkaConn) Expired() bool {
defer conn.mu.Unlock()
if !conn.ex {
if time.Since(conn.t) > kafkaExpiresAfter {
if conn.conn != nil {
conn.close()
}
conn.close()
conn.ex = true
}
}
return conn.ex
}
// ExpireNow forces the connection to expire
func (conn *KafkaConn) ExpireNow() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.close()
conn.ex = true
}
func (conn *KafkaConn) close() {
if conn.conn != nil {
conn.conn.Close()

View File

@ -23,6 +23,10 @@ func (conn *LocalConn) Expired() bool {
return false
}
// ExpireNow forces the connection to expire
func (conn *LocalConn) ExpireNow() {
}
// Send sends a message
func (conn *LocalConn) Send(msg string) error {
conn.publisher.Publish(conn.ep.Local.Channel, msg)

View File

@ -40,12 +40,19 @@ func (conn *MQTTConn) Expired() bool {
return conn.ex
}
// ExpireNow forces the connection to expire
func (conn *MQTTConn) ExpireNow() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.close()
conn.ex = true
}
func (conn *MQTTConn) close() {
if conn.conn != nil {
if conn.conn.IsConnected() {
conn.conn.Disconnect(250)
}
conn.conn = nil
}
}

View File

@ -32,15 +32,21 @@ func (conn *NATSConn) Expired() bool {
defer conn.mu.Unlock()
if !conn.ex {
if time.Since(conn.t) > natsExpiresAfter {
if conn.conn != nil {
conn.close()
}
conn.close()
conn.ex = true
}
}
return conn.ex
}
// ExpireNow forces the connection to expire
func (conn *NATSConn) ExpireNow() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.close()
conn.ex = true
}
func (conn *NATSConn) close() {
if conn.conn != nil {
conn.conn.Close()

View File

@ -83,13 +83,21 @@ func (conn *PubSubConn) Expired() bool {
defer conn.mu.Unlock()
if !conn.ex {
if time.Since(conn.t) > pubsubExpiresAfter {
conn.ex = true
conn.close()
conn.ex = true
}
}
return conn.ex
}
// ExpireNow forces the connection to expire
func (conn *PubSubConn) ExpireNow() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.close()
conn.ex = true
}
func newPubSubConn(ep Endpoint) *PubSubConn {
return &PubSubConn{
ep: ep,

View File

@ -32,15 +32,21 @@ func (conn *RedisConn) Expired() bool {
defer conn.mu.Unlock()
if !conn.ex {
if time.Since(conn.t) > redisExpiresAfter {
if conn.conn != nil {
conn.close()
}
conn.close()
conn.ex = true
}
}
return conn.ex
}
// ExpireNow forces the connection to expire
func (conn *RedisConn) ExpireNow() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.close()
conn.ex = true
}
func (conn *RedisConn) close() {
if conn.conn != nil {
conn.conn.Close()

View File

@ -39,13 +39,21 @@ func (conn *SQSConn) Expired() bool {
defer conn.mu.Unlock()
if !conn.ex {
if time.Since(conn.t) > sqsExpiresAfter {
conn.ex = true
conn.close()
conn.ex = true
}
}
return conn.ex
}
// ExpireNow forces the connection to expire
func (conn *SQSConn) ExpireNow() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.close()
conn.ex = true
}
func (conn *SQSConn) close() {
if conn.svc != nil {
conn.svc = nil

View File

@ -1,6 +1,7 @@
package server
import (
"sync"
"time"
"github.com/tidwall/tile38/internal/collection"
@ -12,20 +13,15 @@ const bgExpireDelay = time.Second / 10
// backgroundExpiring deletes expired items from the database.
// It executes every 1/10 of a second.
func (s *Server) backgroundExpiring() {
for {
if s.stopServer.on() {
return
}
func() {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
s.backgroundExpireObjects(now)
s.backgroundExpireHooks(now)
}()
time.Sleep(bgExpireDelay)
}
func (s *Server) backgroundExpiring(wg *sync.WaitGroup) {
defer wg.Done()
s.loopUntilServerStops(bgExpireDelay, func() {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
s.backgroundExpireObjects(now)
s.backgroundExpireHooks(now)
})
}
func (s *Server) backgroundExpireObjects(now time.Time) {

View File

@ -21,10 +21,16 @@ type liveBuffer struct {
cond *sync.Cond
}
func (s *Server) processLives() {
defer s.lwait.Done()
func (s *Server) processLives(wg *sync.WaitGroup) {
defer wg.Done()
var done abool
wg.Add(1)
go func() {
defer wg.Done()
for {
if done.on() {
break
}
s.lcond.Broadcast()
time.Sleep(time.Second / 4)
}
@ -33,6 +39,7 @@ func (s *Server) processLives() {
defer s.lcond.L.Unlock()
for {
if s.stopServer.on() {
done.set(true)
return
}
for len(s.lstack) > 0 {

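The extra broadcaster goroutine exists because a goroutine parked in s.lcond.Wait() cannot observe stopServer on its own; something must wake it periodically so it can re-check the flag and exit. A self-contained sketch of that wake-up pattern, with illustrative names:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var (
		mu   sync.Mutex
		cond = sync.NewCond(&mu)
		stop bool
		wg   sync.WaitGroup
	)

	// Waiter: blocks on the condition variable until stop is set.
	wg.Add(1)
	go func() {
		defer wg.Done()
		mu.Lock()
		defer mu.Unlock()
		for !stop {
			cond.Wait() // only a Broadcast can wake this up
		}
		fmt.Println("waiter exited")
	}()

	// Periodic waker: the counterpart of the broadcast loop above,
	// nudging the waiter so it can notice the stop flag.
	go func() {
		for {
			mu.Lock()
			done := stop
			mu.Unlock()
			if done {
				return
			}
			cond.Broadcast()
			time.Sleep(time.Second / 4)
		}
	}()

	time.Sleep(time.Second / 2)
	mu.Lock()
	stop = true
	mu.Unlock()
	cond.Broadcast()
	wg.Wait()
}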
View File

@ -79,6 +79,7 @@ type Server struct {
started time.Time
config *Config
epc *endpoint.Manager
ln net.Listener // server listener
// env opts
geomParseOpts geojson.ParseOptions
@ -114,7 +115,6 @@ type Server struct {
lstack []*commandDetails
lives map[*liveBuffer]bool
lcond *sync.Cond
lwait sync.WaitGroup
fcup bool // follow caught up
fcuponce bool // follow caught up once
shrinking bool // aof shrinking flag
@ -165,6 +165,9 @@ type Options struct {
// QueueFileName allows for custom queue.db file path
QueueFileName string
// Shutdown allows for shutting down the server.
Shutdown <-chan bool
}
// Serve starts a new tile38 server
@ -180,6 +183,15 @@ func Serve(opts Options) error {
}
log.Infof("Server started, Tile38 version %s, git %s", core.Version, core.GitSHA)
defer func() {
log.Warn("Server has shutdown, bye now")
if false {
// prints the stack, looking for running goroutines.
buf := make([]byte, 10000)
n := runtime.Stack(buf, true)
println(string(buf[:n]))
}
}()
// Initialize the s
s := &Server{
@ -210,6 +222,7 @@ func Serve(opts Options) error {
}
s.epc = endpoint.NewManager(s)
defer s.epc.Shutdown()
s.luascripts = s.newScriptMap()
s.luapool = s.newPool()
defer s.luapool.Shutdown()
@ -279,6 +292,13 @@ func Serve(opts Options) error {
nerr <- s.netServe()
}()
go func() {
<-opts.Shutdown
s.stopServer.set(true)
log.Warnf("Shutting down...")
s.ln.Close()
}()
// Load the queue before the aof
qdb, err := buntdb.Open(opts.QueueFileName)
if err != nil {
@ -324,32 +344,60 @@ func Serve(opts Options) error {
}
// Start background routines
if s.config.followHost() != "" {
go s.follow(s.config.followHost(), s.config.followPort(),
s.followc.get())
}
var bgwg sync.WaitGroup
if opts.MetricsAddr != "" {
log.Infof("Listening for metrics at: %s", opts.MetricsAddr)
if s.config.followHost() != "" {
bgwg.Add(1)
go func() {
http.HandleFunc("/", s.MetricsIndexHandler)
http.HandleFunc("/metrics", s.MetricsHandler)
log.Fatal(http.ListenAndServe(opts.MetricsAddr, nil))
defer bgwg.Done()
s.follow(s.config.followHost(), s.config.followPort(),
s.followc.get())
}()
}
s.lwait.Add(1)
go s.processLives()
go s.watchOutOfMemory()
go s.watchLuaStatePool()
go s.watchAutoGC()
go s.backgroundExpiring()
go s.backgroundSyncAOF()
var mln net.Listener
if opts.MetricsAddr != "" {
log.Infof("Listening for metrics at: %s", opts.MetricsAddr)
mln, err = net.Listen("tcp", opts.MetricsAddr)
if err != nil {
return err
}
bgwg.Add(1)
go func() {
defer bgwg.Done()
smux := http.NewServeMux()
smux.HandleFunc("/", s.MetricsIndexHandler)
smux.HandleFunc("/metrics", s.MetricsHandler)
err := http.Serve(mln, smux)
if err != nil {
if !s.stopServer.on() {
log.Fatalf("metrics server: %s", err)
}
}
}()
}
bgwg.Add(1)
go s.processLives(&bgwg)
bgwg.Add(1)
go s.watchOutOfMemory(&bgwg)
bgwg.Add(1)
go s.watchLuaStatePool(&bgwg)
bgwg.Add(1)
go s.watchAutoGC(&bgwg)
bgwg.Add(1)
go s.backgroundExpiring(&bgwg)
bgwg.Add(1)
go s.backgroundSyncAOF(&bgwg)
defer func() {
log.Debug("Stopping background routines")
// Stop background routines
s.followc.add(1) // this will force any follow communication to die
s.stopServer.set(true)
s.lwait.Wait()
if mln != nil {
mln.Close() // Stop the metrics server
}
bgwg.Wait()
}()
// Server is now loaded and ready. Wait for network error messages.
@ -384,16 +432,37 @@ func (s *Server) netServe() error {
if err != nil {
return err
}
defer ln.Close()
var wg sync.WaitGroup
defer func() {
log.Debug("Closing client connections...")
s.connsmu.RLock()
for _, c := range s.conns {
c.closer.Close()
}
s.connsmu.RUnlock()
wg.Wait()
ln.Close()
log.Debug("Client connection closed")
}()
s.ln = ln
log.Infof("Ready to accept connections at %s", ln.Addr())
var clientID int64
for {
conn, err := ln.Accept()
if err != nil {
return err
if s.stopServer.on() {
return nil
}
log.Warn(err)
time.Sleep(time.Second / 5)
continue
}
wg.Add(1)
go func(conn net.Conn) {
defer wg.Done()
// open connection
// create the client
client := new(Client)
@ -617,20 +686,16 @@ func (conn *liveConn) SetWriteDeadline(deadline time.Time) error {
panic("not supported")
}
func (s *Server) watchAutoGC() {
t := time.NewTicker(time.Second)
defer t.Stop()
func (s *Server) watchAutoGC(wg *sync.WaitGroup) {
defer wg.Done()
start := time.Now()
for range t.C {
if s.stopServer.on() {
return
}
s.loopUntilServerStops(time.Second, func() {
autoGC := s.config.autoGC()
if autoGC == 0 {
continue
return
}
if time.Since(start) < time.Second*time.Duration(autoGC) {
continue
return
}
var mem1, mem2 runtime.MemStats
runtime.ReadMemStats(&mem1)
@ -645,7 +710,7 @@ func (s *Server) watchAutoGC() {
"alloc: %v, heap_alloc: %v, heap_released: %v",
mem2.Alloc, mem2.HeapAlloc, mem2.HeapReleased)
start = time.Now()
}
})
}
func (s *Server) checkOutOfMemory() {
@ -667,40 +732,45 @@ func (s *Server) checkOutOfMemory() {
s.outOfMemory.set(int(mem.HeapAlloc) > s.config.maxMemory())
}
func (s *Server) watchOutOfMemory() {
t := time.NewTicker(time.Second * 2)
defer t.Stop()
for range t.C {
s.checkOutOfMemory()
}
}
func (s *Server) watchLuaStatePool() {
t := time.NewTicker(time.Second * 10)
defer t.Stop()
for range t.C {
func() {
s.luapool.Prune()
}()
}
}
// backgroundSyncAOF ensures that the aof buffer does not grow too big.
func (s *Server) backgroundSyncAOF() {
t := time.NewTicker(time.Second)
defer t.Stop()
for range t.C {
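// loopUntilServerStops runs op immediately and then at most once per
// dur, checking the server stop flag five times per second so that it
// returns promptly once shutdown begins.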
func (s *Server) loopUntilServerStops(dur time.Duration, op func()) {
var last time.Time
for {
if s.stopServer.on() {
return
}
func() {
s.mu.Lock()
defer s.mu.Unlock()
s.flushAOF(true)
}()
now := time.Now()
if now.Sub(last) > dur {
op()
last = now
}
time.Sleep(time.Second / 5)
}
}
func (s *Server) watchOutOfMemory(wg *sync.WaitGroup) {
defer wg.Done()
s.loopUntilServerStops(time.Second*4, func() {
s.checkOutOfMemory()
})
}
func (s *Server) watchLuaStatePool(wg *sync.WaitGroup) {
defer wg.Done()
s.loopUntilServerStops(time.Second*10, func() {
s.luapool.Prune()
})
}
// backgroundSyncAOF ensures that the aof buffer does not grow too big.
func (s *Server) backgroundSyncAOF(wg *sync.WaitGroup) {
defer wg.Done()
s.loopUntilServerStops(time.Second, func() {
s.mu.Lock()
defer s.mu.Unlock()
s.flushAOF(true)
})
}
func isReservedFieldName(field string) bool {
switch field {
case "z", "lat", "lon":

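For reference, driving the new Shutdown option end to end looks roughly like the sketch below. It would have to live inside the tile38 module itself (internal packages are not importable from outside), and the option values are illustrative, mirroring the test harness changes further down:

package main

import (
	"log"

	"github.com/tidwall/tile38/internal/server"
)

func main() {
	shutdown := make(chan bool)
	done := make(chan error, 1)
	go func() {
		// Serve now blocks until the goroutine it starts on
		// opts.Shutdown sets the stop flag and closes the listener,
		// then waits for the background routines to drain.
		done <- server.Serve(server.Options{
			Host:     "localhost",
			Port:     9851,          // illustrative
			Dir:      "tile38-data", // illustrative data dir
			Shutdown: shutdown,      // the new option
		})
	}()

	// ... later, request a graceful stop and wait for Serve to return.
	shutdown <- true
	if err := <-done; err != nil {
		log.Fatal(err)
	}
}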
View File

@ -9,9 +9,8 @@ cd tests
go test -coverpkg=../internal/server -coverprofile=/tmp/coverage.out $GOTEST
# go test \
# -coverpkg=../internal/... -coverprofile=/tmp/coverage.out \
# -v . -v ../... $GOTEST
# go test -coverpkg=../internal/... -coverprofile=/tmp/coverage.out \
# -v ./... $GOTEST
go tool cover -html=/tmp/coverage.out -o /tmp/coverage.html
echo "details: file:///tmp/coverage.html"

View File

@ -35,10 +35,11 @@ func mockCleanup(silent bool) {
}
type mockServer struct {
port int
conn redis.Conn
ioJSON bool
dir string
port int
conn redis.Conn
ioJSON bool
dir string
shutdown chan bool
}
func (mc *mockServer) readAOF() ([]byte, error) {
@ -71,24 +72,29 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) {
logOutput := io.Discard
if os.Getenv("PRINTLOG") == "1" {
logOutput = os.Stderr
tlog.Level = 3
}
s := &mockServer{port: port, dir: dir}
shutdown := make(chan bool)
s := &mockServer{port: port, dir: dir, shutdown: shutdown}
tlog.SetOutput(logOutput)
var ferrt int32 // atomic flag for when ferr has been set
var ferr error // ferr for when the server fails to start
go func() {
sopts := server.Options{
Host: "localhost",
Port: port,
Dir: dir,
UseHTTP: true,
DevMode: true,
AppendOnly: true,
Host: "localhost",
Port: port,
Dir: dir,
UseHTTP: true,
DevMode: true,
AppendOnly: true,
Shutdown: shutdown,
ShowDebugMessages: true,
}
if opts.Metrics {
sopts.MetricsAddr = ":4321"
}
if err := server.Serve(sopts); err != nil {
err := server.Serve(sopts)
if err != nil {
ferr = err
atomic.StoreInt32(&ferrt, 1)
}
@ -133,6 +139,7 @@ func (s *mockServer) waitForStartup(ferr *error, ferrt *int32) error {
}
func (mc *mockServer) Close() {
mc.shutdown <- true
if mc.conn != nil {
mc.conn.Close()
}

View File

@ -26,6 +26,18 @@ const (
white = "\x1b[37m"
)
// type mockTest struct {
// }
// func mockTestInit() *mockTest {
// mt := &mockTest{}
// return mt
// }
// func (mt *mockTest) Cleanup() {
// }
func TestAll(t *testing.T) {
mockCleanup(false)
defer mockCleanup(false)
@ -45,6 +57,8 @@ func TestAll(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// log.Printf("Waiting a second for everything to cleanly start...")
// time.Sleep(time.Second * 2)
defer mc.Close()
runSubTest(t, "keys", mc, subTestKeys)