diff --git a/internal/endpoint/amqp.go b/internal/endpoint/amqp.go index 546dee78..b69a22ef 100644 --- a/internal/endpoint/amqp.go +++ b/internal/endpoint/amqp.go @@ -27,13 +27,21 @@ func (conn *AMQPConn) Expired() bool { defer conn.mu.Unlock() if !conn.ex { if time.Since(conn.t) > amqpExpiresAfter { - conn.ex = true conn.close() + conn.ex = true } } return conn.ex } +// ExpireNow forces the connection to expire +func (conn *AMQPConn) ExpireNow() { + conn.mu.Lock() + defer conn.mu.Unlock() + conn.close() + conn.ex = true +} + func (conn *AMQPConn) close() { if conn.conn != nil { conn.conn.Close() diff --git a/internal/endpoint/disque.go b/internal/endpoint/disque.go index 35300ec6..664262f8 100644 --- a/internal/endpoint/disque.go +++ b/internal/endpoint/disque.go @@ -33,15 +33,21 @@ func (conn *DisqueConn) Expired() bool { defer conn.mu.Unlock() if !conn.ex { if time.Since(conn.t) > disqueExpiresAfter { - if conn.conn != nil { - conn.close() - } + conn.close() conn.ex = true } } return conn.ex } +// ExpireNow forces the connection to expire +func (conn *DisqueConn) ExpireNow() { + conn.mu.Lock() + defer conn.mu.Unlock() + conn.close() + conn.ex = true +} + func (conn *DisqueConn) close() { if conn.conn != nil { conn.conn.Close() diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index 8e5d1274..f13feeb7 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/streadway/amqp" @@ -136,6 +137,7 @@ type Endpoint struct { // Conn is an endpoint connection type Conn interface { + ExpireNow() Expired() bool Send(val string) error } @@ -145,6 +147,8 @@ type Manager struct { mu sync.RWMutex conns map[string]Conn publisher LocalPublisher + shutdown int32 // atomic bool + wg sync.WaitGroup // run wait group } // NewManager returns a new manager @@ -153,13 +157,29 @@ func NewManager(publisher LocalPublisher) *Manager { conns: make(map[string]Conn), publisher: publisher, } - go epc.Run() + epc.wg.Add(1) + go epc.run() return epc } +func (epc *Manager) Shutdown() { + defer epc.wg.Wait() + atomic.StoreInt32(&epc.shutdown, 1) + // expire the connections + epc.mu.Lock() + defer epc.mu.Unlock() + for _, conn := range epc.conns { + conn.ExpireNow() + } +} + // Run starts the managing of endpoints -func (epc *Manager) Run() { +func (epc *Manager) run() { + defer epc.wg.Done() for { + if atomic.LoadInt32(&epc.shutdown) != 0 { + return + } time.Sleep(time.Second) func() { epc.mu.Lock() diff --git a/internal/endpoint/eventHub.go b/internal/endpoint/eventHub.go index 5de50616..83ea0775 100644 --- a/internal/endpoint/eventHub.go +++ b/internal/endpoint/eventHub.go @@ -28,6 +28,10 @@ func (conn *EvenHubConn) Expired() bool { return false } +// ExpireNow forces the connection to expire +func (conn *EvenHubConn) ExpireNow() { +} + // Send sends a message func (conn *EvenHubConn) Send(msg string) error { hub, err := eventhub.NewHubFromConnectionString(conn.ep.EventHub.ConnectionString) diff --git a/internal/endpoint/grpc.go b/internal/endpoint/grpc.go index 12de3d7e..d40a53fa 100644 --- a/internal/endpoint/grpc.go +++ b/internal/endpoint/grpc.go @@ -36,14 +36,21 @@ func (conn *GRPCConn) Expired() bool { defer conn.mu.Unlock() if !conn.ex { if time.Since(conn.t) > grpcExpiresAfter { - if conn.conn != nil { - conn.close() - } + conn.close() conn.ex = true } } return conn.ex } + +// ExpireNow forces the connection to expire +func (conn *GRPCConn) ExpireNow() { + conn.mu.Lock() + defer conn.mu.Unlock() + conn.close() + conn.ex = true +} + func (conn *GRPCConn) close() { if conn.conn != nil { conn.conn.Close() diff --git a/internal/endpoint/http.go b/internal/endpoint/http.go index 52deba24..c89bb986 100644 --- a/internal/endpoint/http.go +++ b/internal/endpoint/http.go @@ -38,6 +38,10 @@ func (conn *HTTPConn) Expired() bool { return false } +// ExpireNow forces the connection to expire +func (conn *HTTPConn) ExpireNow() { +} + // Send sends a message func (conn *HTTPConn) Send(msg string) error { req, err := http.NewRequest("POST", conn.ep.Original, bytes.NewBufferString(msg)) diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index ef083380..2cf808ea 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -34,15 +34,21 @@ func (conn *KafkaConn) Expired() bool { defer conn.mu.Unlock() if !conn.ex { if time.Since(conn.t) > kafkaExpiresAfter { - if conn.conn != nil { - conn.close() - } + conn.close() conn.ex = true } } return conn.ex } +// ExpireNow forces the connection to expire +func (conn *KafkaConn) ExpireNow() { + conn.mu.Lock() + defer conn.mu.Unlock() + conn.close() + conn.ex = true +} + func (conn *KafkaConn) close() { if conn.conn != nil { conn.conn.Close() diff --git a/internal/endpoint/local.go b/internal/endpoint/local.go index 1038fd79..2c1c7d6d 100644 --- a/internal/endpoint/local.go +++ b/internal/endpoint/local.go @@ -23,6 +23,10 @@ func (conn *LocalConn) Expired() bool { return false } +// ExpireNow forces the connection to expire +func (conn *LocalConn) ExpireNow() { +} + // Send sends a message func (conn *LocalConn) Send(msg string) error { conn.publisher.Publish(conn.ep.Local.Channel, msg) diff --git a/internal/endpoint/mqtt.go b/internal/endpoint/mqtt.go index 5f80765c..d7410649 100644 --- a/internal/endpoint/mqtt.go +++ b/internal/endpoint/mqtt.go @@ -40,12 +40,19 @@ func (conn *MQTTConn) Expired() bool { return conn.ex } +// ExpireNow forces the connection to expire +func (conn *MQTTConn) ExpireNow() { + conn.mu.Lock() + defer conn.mu.Unlock() + conn.close() + conn.ex = true +} + func (conn *MQTTConn) close() { if conn.conn != nil { if conn.conn.IsConnected() { conn.conn.Disconnect(250) } - conn.conn = nil } } diff --git a/internal/endpoint/nats.go b/internal/endpoint/nats.go index 6c0c7815..ae254802 100644 --- a/internal/endpoint/nats.go +++ b/internal/endpoint/nats.go @@ -32,15 +32,21 @@ func (conn *NATSConn) Expired() bool { defer conn.mu.Unlock() if !conn.ex { if time.Since(conn.t) > natsExpiresAfter { - if conn.conn != nil { - conn.close() - } + conn.close() conn.ex = true } } return conn.ex } +// ExpireNow forces the connection to expire +func (conn *NATSConn) ExpireNow() { + conn.mu.Lock() + defer conn.mu.Unlock() + conn.close() + conn.ex = true +} + func (conn *NATSConn) close() { if conn.conn != nil { conn.conn.Close() diff --git a/internal/endpoint/pubsub.go b/internal/endpoint/pubsub.go index db6dac33..b4926474 100644 --- a/internal/endpoint/pubsub.go +++ b/internal/endpoint/pubsub.go @@ -83,13 +83,21 @@ func (conn *PubSubConn) Expired() bool { defer conn.mu.Unlock() if !conn.ex { if time.Since(conn.t) > pubsubExpiresAfter { - conn.ex = true conn.close() + conn.ex = true } } return conn.ex } +// ExpireNow forces the connection to expire +func (conn *PubSubConn) ExpireNow() { + conn.mu.Lock() + defer conn.mu.Unlock() + conn.close() + conn.ex = true +} + func newPubSubConn(ep Endpoint) *PubSubConn { return &PubSubConn{ ep: ep, diff --git a/internal/endpoint/redis.go b/internal/endpoint/redis.go index 8064d166..d5101b57 100644 --- a/internal/endpoint/redis.go +++ b/internal/endpoint/redis.go @@ -32,15 +32,21 @@ func (conn *RedisConn) Expired() bool { defer conn.mu.Unlock() if !conn.ex { if time.Since(conn.t) > redisExpiresAfter { - if conn.conn != nil { - conn.close() - } + conn.close() conn.ex = true } } return conn.ex } +// ExpireNow forces the connection to expire +func (conn *RedisConn) ExpireNow() { + conn.mu.Lock() + defer conn.mu.Unlock() + conn.close() + conn.ex = true +} + func (conn *RedisConn) close() { if conn.conn != nil { conn.conn.Close() diff --git a/internal/endpoint/sqs.go b/internal/endpoint/sqs.go index 8a1098b7..148f2a3f 100644 --- a/internal/endpoint/sqs.go +++ b/internal/endpoint/sqs.go @@ -39,13 +39,21 @@ func (conn *SQSConn) Expired() bool { defer conn.mu.Unlock() if !conn.ex { if time.Since(conn.t) > sqsExpiresAfter { - conn.ex = true conn.close() + conn.ex = true } } return conn.ex } +// ExpireNow forces the connection to expire +func (conn *SQSConn) ExpireNow() { + conn.mu.Lock() + defer conn.mu.Unlock() + conn.close() + conn.ex = true +} + func (conn *SQSConn) close() { if conn.svc != nil { conn.svc = nil diff --git a/internal/server/expire.go b/internal/server/expire.go index 39abd412..5f812913 100644 --- a/internal/server/expire.go +++ b/internal/server/expire.go @@ -1,6 +1,7 @@ package server import ( + "sync" "time" "github.com/tidwall/tile38/internal/collection" @@ -12,20 +13,15 @@ const bgExpireDelay = time.Second / 10 // backgroundExpiring deletes expired items from the database. // It's executes every 1/10 of a second. -func (s *Server) backgroundExpiring() { - for { - if s.stopServer.on() { - return - } - func() { - s.mu.Lock() - defer s.mu.Unlock() - now := time.Now() - s.backgroundExpireObjects(now) - s.backgroundExpireHooks(now) - }() - time.Sleep(bgExpireDelay) - } +func (s *Server) backgroundExpiring(wg *sync.WaitGroup) { + defer wg.Done() + s.loopUntilServerStops(bgExpireDelay, func() { + s.mu.Lock() + defer s.mu.Unlock() + now := time.Now() + s.backgroundExpireObjects(now) + s.backgroundExpireHooks(now) + }) } func (s *Server) backgroundExpireObjects(now time.Time) { diff --git a/internal/server/live.go b/internal/server/live.go index eb66ec64..338523b5 100644 --- a/internal/server/live.go +++ b/internal/server/live.go @@ -21,10 +21,16 @@ type liveBuffer struct { cond *sync.Cond } -func (s *Server) processLives() { - defer s.lwait.Done() +func (s *Server) processLives(wg *sync.WaitGroup) { + defer wg.Done() + var done abool + wg.Add(1) go func() { + defer wg.Done() for { + if done.on() { + break + } s.lcond.Broadcast() time.Sleep(time.Second / 4) } @@ -33,6 +39,7 @@ func (s *Server) processLives() { defer s.lcond.L.Unlock() for { if s.stopServer.on() { + done.set(true) return } for len(s.lstack) > 0 { diff --git a/internal/server/server.go b/internal/server/server.go index 7c19bac3..7a367592 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -79,6 +79,7 @@ type Server struct { started time.Time config *Config epc *endpoint.Manager + ln net.Listener // server listener // env opts geomParseOpts geojson.ParseOptions @@ -114,7 +115,6 @@ type Server struct { lstack []*commandDetails lives map[*liveBuffer]bool lcond *sync.Cond - lwait sync.WaitGroup fcup bool // follow caught up fcuponce bool // follow caught up once shrinking bool // aof shrinking flag @@ -165,6 +165,9 @@ type Options struct { // QueueFileName allows for custom queue.db file path QueueFileName string + + // Shutdown allows for shutting down the server. + Shutdown <-chan bool } // Serve starts a new tile38 server @@ -180,6 +183,15 @@ func Serve(opts Options) error { } log.Infof("Server started, Tile38 version %s, git %s", core.Version, core.GitSHA) + defer func() { + log.Warn("Server has shutdown, bye now") + if false { + // prints the stack, looking for running goroutines. + buf := make([]byte, 10000) + n := runtime.Stack(buf, true) + println(string(buf[:n])) + } + }() // Initialize the s s := &Server{ @@ -210,6 +222,7 @@ func Serve(opts Options) error { } s.epc = endpoint.NewManager(s) + defer s.epc.Shutdown() s.luascripts = s.newScriptMap() s.luapool = s.newPool() defer s.luapool.Shutdown() @@ -279,6 +292,13 @@ func Serve(opts Options) error { nerr <- s.netServe() }() + go func() { + <-opts.Shutdown + s.stopServer.set(true) + log.Warnf("Shutting down...") + s.ln.Close() + }() + // Load the queue before the aof qdb, err := buntdb.Open(opts.QueueFileName) if err != nil { @@ -324,32 +344,60 @@ func Serve(opts Options) error { } // Start background routines - if s.config.followHost() != "" { - go s.follow(s.config.followHost(), s.config.followPort(), - s.followc.get()) - } + var bgwg sync.WaitGroup - if opts.MetricsAddr != "" { - log.Infof("Listening for metrics at: %s", opts.MetricsAddr) + if s.config.followHost() != "" { + bgwg.Add(1) go func() { - http.HandleFunc("/", s.MetricsIndexHandler) - http.HandleFunc("/metrics", s.MetricsHandler) - log.Fatal(http.ListenAndServe(opts.MetricsAddr, nil)) + defer bgwg.Done() + s.follow(s.config.followHost(), s.config.followPort(), + s.followc.get()) }() } - s.lwait.Add(1) - go s.processLives() - go s.watchOutOfMemory() - go s.watchLuaStatePool() - go s.watchAutoGC() - go s.backgroundExpiring() - go s.backgroundSyncAOF() + var mln net.Listener + if opts.MetricsAddr != "" { + log.Infof("Listening for metrics at: %s", opts.MetricsAddr) + mln, err = net.Listen("tcp", opts.MetricsAddr) + if err != nil { + return err + } + bgwg.Add(1) + go func() { + defer bgwg.Done() + smux := http.NewServeMux() + smux.HandleFunc("/", s.MetricsIndexHandler) + smux.HandleFunc("/metrics", s.MetricsHandler) + err := http.Serve(mln, smux) + if err != nil { + if !s.stopServer.on() { + log.Fatalf("metrics server: %s", err) + } + } + }() + } + + bgwg.Add(1) + go s.processLives(&bgwg) + bgwg.Add(1) + go s.watchOutOfMemory(&bgwg) + bgwg.Add(1) + go s.watchLuaStatePool(&bgwg) + bgwg.Add(1) + go s.watchAutoGC(&bgwg) + bgwg.Add(1) + go s.backgroundExpiring(&bgwg) + bgwg.Add(1) + go s.backgroundSyncAOF(&bgwg) defer func() { + log.Debug("Stopping background routines") // Stop background routines s.followc.add(1) // this will force any follow communication to die s.stopServer.set(true) - s.lwait.Wait() + if mln != nil { + mln.Close() // Stop the metrics server + } + bgwg.Wait() }() // Server is now loaded and ready. Wait for network error messages. @@ -384,16 +432,37 @@ func (s *Server) netServe() error { if err != nil { return err } - defer ln.Close() + + var wg sync.WaitGroup + defer func() { + log.Debug("Closing client connections...") + s.connsmu.RLock() + for _, c := range s.conns { + c.closer.Close() + } + s.connsmu.RUnlock() + wg.Wait() + ln.Close() + log.Debug("Client connection closed") + }() + s.ln = ln + log.Infof("Ready to accept connections at %s", ln.Addr()) var clientID int64 for { conn, err := ln.Accept() if err != nil { - return err + if s.stopServer.on() { + return nil + } + log.Warn(err) + time.Sleep(time.Second / 5) + continue } - + wg.Add(1) go func(conn net.Conn) { + defer wg.Done() + // open connection // create the client client := new(Client) @@ -617,20 +686,16 @@ func (conn *liveConn) SetWriteDeadline(deadline time.Time) error { panic("not supported") } -func (s *Server) watchAutoGC() { - t := time.NewTicker(time.Second) - defer t.Stop() +func (s *Server) watchAutoGC(wg *sync.WaitGroup) { + defer wg.Done() start := time.Now() - for range t.C { - if s.stopServer.on() { - return - } + s.loopUntilServerStops(time.Second, func() { autoGC := s.config.autoGC() if autoGC == 0 { - continue + return } if time.Since(start) < time.Second*time.Duration(autoGC) { - continue + return } var mem1, mem2 runtime.MemStats runtime.ReadMemStats(&mem1) @@ -645,7 +710,7 @@ func (s *Server) watchAutoGC() { "alloc: %v, heap_alloc: %v, heap_released: %v", mem2.Alloc, mem2.HeapAlloc, mem2.HeapReleased) start = time.Now() - } + }) } func (s *Server) checkOutOfMemory() { @@ -667,40 +732,45 @@ func (s *Server) checkOutOfMemory() { s.outOfMemory.set(int(mem.HeapAlloc) > s.config.maxMemory()) } -func (s *Server) watchOutOfMemory() { - t := time.NewTicker(time.Second * 2) - defer t.Stop() - for range t.C { - s.checkOutOfMemory() - } -} - -func (s *Server) watchLuaStatePool() { - t := time.NewTicker(time.Second * 10) - defer t.Stop() - for range t.C { - func() { - s.luapool.Prune() - }() - } -} - -// backgroundSyncAOF ensures that the aof buffer is does not grow too big. -func (s *Server) backgroundSyncAOF() { - t := time.NewTicker(time.Second) - defer t.Stop() - for range t.C { +func (s *Server) loopUntilServerStops(dur time.Duration, op func()) { + var last time.Time + for { if s.stopServer.on() { return } - func() { - s.mu.Lock() - defer s.mu.Unlock() - s.flushAOF(true) - }() + now := time.Now() + if now.Sub(last) > dur { + op() + last = now + } + time.Sleep(time.Second / 5) } } +func (s *Server) watchOutOfMemory(wg *sync.WaitGroup) { + defer wg.Done() + s.loopUntilServerStops(time.Second*4, func() { + s.checkOutOfMemory() + }) +} + +func (s *Server) watchLuaStatePool(wg *sync.WaitGroup) { + defer wg.Done() + s.loopUntilServerStops(time.Second*10, func() { + s.luapool.Prune() + }) +} + +// backgroundSyncAOF ensures that the aof buffer is does not grow too big. +func (s *Server) backgroundSyncAOF(wg *sync.WaitGroup) { + defer wg.Done() + s.loopUntilServerStops(time.Second, func() { + s.mu.Lock() + defer s.mu.Unlock() + s.flushAOF(true) + }) +} + func isReservedFieldName(field string) bool { switch field { case "z", "lat", "lon": diff --git a/scripts/test.sh b/scripts/test.sh index 93f63322..acfc97f0 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -9,9 +9,8 @@ cd tests go test -coverpkg=../internal/server -coverprofile=/tmp/coverage.out $GOTEST -# go test \ -# -coverpkg=../internal/... -coverprofile=/tmp/coverage.out \ -# -v . -v ../... $GOTEST +# go test -coverpkg=../internal/... -coverprofile=/tmp/coverage.out \ +# -v ./... $GOTEST go tool cover -html=/tmp/coverage.out -o /tmp/coverage.html echo "details: file:///tmp/coverage.html" diff --git a/tests/mock_test.go b/tests/mock_test.go index a67e5f17..bd0597c2 100644 --- a/tests/mock_test.go +++ b/tests/mock_test.go @@ -35,10 +35,11 @@ func mockCleanup(silent bool) { } type mockServer struct { - port int - conn redis.Conn - ioJSON bool - dir string + port int + conn redis.Conn + ioJSON bool + dir string + shutdown chan bool } func (mc *mockServer) readAOF() ([]byte, error) { @@ -71,24 +72,29 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) { logOutput := io.Discard if os.Getenv("PRINTLOG") == "1" { logOutput = os.Stderr + tlog.Level = 3 } - s := &mockServer{port: port, dir: dir} + shutdown := make(chan bool) + s := &mockServer{port: port, dir: dir, shutdown: shutdown} tlog.SetOutput(logOutput) var ferrt int32 // atomic flag for when ferr has been set var ferr error // ferr for when the server fails to start go func() { sopts := server.Options{ - Host: "localhost", - Port: port, - Dir: dir, - UseHTTP: true, - DevMode: true, - AppendOnly: true, + Host: "localhost", + Port: port, + Dir: dir, + UseHTTP: true, + DevMode: true, + AppendOnly: true, + Shutdown: shutdown, + ShowDebugMessages: true, } if opts.Metrics { sopts.MetricsAddr = ":4321" } - if err := server.Serve(sopts); err != nil { + err := server.Serve(sopts) + if err != nil { ferr = err atomic.StoreInt32(&ferrt, 1) } @@ -133,6 +139,7 @@ func (s *mockServer) waitForStartup(ferr *error, ferrt *int32) error { } func (mc *mockServer) Close() { + mc.shutdown <- true if mc.conn != nil { mc.conn.Close() } diff --git a/tests/tests_test.go b/tests/tests_test.go index 5894bb20..0ee8b4a9 100644 --- a/tests/tests_test.go +++ b/tests/tests_test.go @@ -26,6 +26,18 @@ const ( white = "\x1b[37m" ) +// type mockTest struct { +// } + +// func mockTestInit() *mockTest { +// mt := &mockTest{} +// return mt +// } + +// func (mt *mockTest) Cleanup() { + +// } + func TestAll(t *testing.T) { mockCleanup(false) defer mockCleanup(false) @@ -45,6 +57,8 @@ func TestAll(t *testing.T) { if err != nil { t.Fatal(err) } + // log.Printf("Waiting a second for everything to cleanly start...") + // time.Sleep(time.Second * 2) defer mc.Close() runSubTest(t, "keys", mc, subTestKeys)