From 3cb8e0509a4dc79df7793a87d0f1bbf98693182b Mon Sep 17 00:00:00 2001 From: tidwall Date: Mon, 26 Sep 2022 10:02:02 -0700 Subject: [PATCH] Thread safe log and support for concurrent tile38 instances --- cmd/tile38-server/main.go | 12 ++-- internal/endpoint/kafka.go | 2 +- internal/endpoint/sqs.go | 2 +- internal/log/log.go | 131 +++++++++++++++++++++++-------------- internal/log/log_test.go | 14 ++-- tests/metrics_test.go | 6 +- tests/mock_test.go | 27 +++++--- tests/tests_test.go | 61 +++++++---------- 8 files changed, 144 insertions(+), 111 deletions(-) diff --git a/cmd/tile38-server/main.go b/cmd/tile38-server/main.go index 0479b5b8..58a1fd54 100644 --- a/cmd/tile38-server/main.go +++ b/cmd/tile38-server/main.go @@ -302,7 +302,7 @@ Developer Options: flag.Parse() if logEncoding == "json" { - log.LogJSON = true + log.SetLogJSON(true) data, _ := os.ReadFile(filepath.Join(dir, "config")) if gjson.GetBytes(data, "logconfig.encoding").String() == "json" { c := gjson.GetBytes(data, "logconfig").String() @@ -320,13 +320,13 @@ Developer Options: log.SetOutput(logw) if quiet { - log.Level = 0 + log.SetLevel(0) } else if veryVerbose { - log.Level = 3 + log.SetLevel(3) } else if verbose { - log.Level = 2 + log.SetLevel(2) } else { - log.Level = 1 + log.SetLevel(1) } showDebugMessages = veryVerbose @@ -445,7 +445,7 @@ Developer Options: saddr = fmt.Sprintf("Port: %d", port) } - if log.LogJSON { + if log.LogJSON() { log.Printf(`Tile38 %s%s %d bit (%s/%s) %s%s, PID: %d. Visit tile38.com/sponsor to support the project`, core.Version, gitsha, strconv.IntSize, runtime.GOARCH, runtime.GOOS, hostd, saddr, os.Getpid()) } else { diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index 2cf808ea..4379d24d 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -68,7 +68,7 @@ func (conn *KafkaConn) Send(msg string) error { } conn.t = time.Now() - if log.Level > 2 { + if log.Level() > 2 { sarama.Logger = lg.New(log.Output(), "[sarama] ", 0) } diff --git a/internal/endpoint/sqs.go b/internal/endpoint/sqs.go index 148f2a3f..fae871e5 100644 --- a/internal/endpoint/sqs.go +++ b/internal/endpoint/sqs.go @@ -90,7 +90,7 @@ func (conn *SQSConn) Send(msg string) error { sess := session.Must(session.NewSession(&aws.Config{ Region: ®ion, Credentials: creds, - CredentialsChainVerboseErrors: aws.Bool(log.Level >= 3), + CredentialsChainVerboseErrors: aws.Bool(log.Level() >= 3), MaxRetries: aws.Int(5), })) svc := sqs.New(sess) diff --git a/internal/log/log.go b/internal/log/log.go index 4250d84c..16d8df87 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -6,91 +6,120 @@ import ( "io" "os" "sync" + "sync/atomic" "time" "go.uber.org/zap" "golang.org/x/term" ) -var mu sync.Mutex +var wmu sync.Mutex var wr io.Writer -var tty bool -var LogJSON = false -var logger *zap.SugaredLogger + +var zmu sync.Mutex +var zlogger *zap.SugaredLogger + +var tty atomic.Bool +var ljson atomic.Bool +var llevel atomic.Int32 + +func init() { + SetOutput(os.Stderr) + SetLevel(1) +} // Level is the log level // 0: silent - do not log // 1: normal - show everything except debug and warn // 2: verbose - show everything except debug // 3: very verbose - show everything -var Level = 1 +func SetLevel(level int) { + if level < 0 { + level = 0 + } else if level > 3 { + level = 3 + } + llevel.Store(int32(level)) +} + +// Level returns the log level +func Level() int { + return int(llevel.Load()) +} + +func SetLogJSON(logJSON bool) { + ljson.Store(logJSON) +} + +func LogJSON() bool { + return ljson.Load() +} // SetOutput sets the output of the logger func SetOutput(w io.Writer) { f, ok := w.(*os.File) - tty = ok && term.IsTerminal(int(f.Fd())) + tty.Store(ok && term.IsTerminal(int(f.Fd()))) + wmu.Lock() wr = w + wmu.Unlock() } // Build a zap logger from default or custom config func Build(c string) error { + var zcfg zap.Config if c == "" { - zcfg := zap.NewProductionConfig() + zcfg = zap.NewProductionConfig() // to be able to filter with Tile38 levels zcfg.Level.SetLevel(zap.DebugLevel) // disable caller because caller is always log.go zcfg.DisableCaller = true - core, err := zcfg.Build() - if err != nil { - return err - } - defer core.Sync() - logger = core.Sugar() } else { - var zcfg zap.Config err := json.Unmarshal([]byte(c), &zcfg) if err != nil { return err } - // to be able to filter with Tile38 levels zcfg.Level.SetLevel(zap.DebugLevel) // disable caller because caller is always log.go zcfg.DisableCaller = true - - core, err := zcfg.Build() - if err != nil { - return err - } - defer core.Sync() - logger = core.Sugar() } + core, err := zcfg.Build() + if err != nil { + return err + } + defer core.Sync() + zmu.Lock() + zlogger = core.Sugar() + zmu.Unlock() return nil } // Set a zap logger func Set(sl *zap.SugaredLogger) { - logger = sl + zmu.Lock() + zlogger = sl + zmu.Unlock() } // Get a zap logger func Get() *zap.SugaredLogger { - return logger + zmu.Lock() + sl := zlogger + zmu.Unlock() + return sl } -func init() { - SetOutput(os.Stderr) -} - -// Output retuns the output writer +// Output returns the output writer func Output() io.Writer { + wmu.Lock() + defer wmu.Unlock() return wr } func log(level int, tag, color string, formatted bool, format string, args ...interface{}) { - if Level < level { + if llevel.Load() < int32(level) { return } var msg string @@ -99,30 +128,32 @@ func log(level int, tag, color string, formatted bool, format string, args ...in } else { msg = fmt.Sprint(args...) } - if LogJSON { + if ljson.Load() { + zmu.Lock() + defer zmu.Unlock() switch tag { case "ERRO": - logger.Error(msg) + zlogger.Error(msg) case "FATA": - logger.Fatal(msg) + zlogger.Fatal(msg) case "WARN": - logger.Warn(msg) + zlogger.Warn(msg) case "DEBU": - logger.Debug(msg) + zlogger.Debug(msg) default: - logger.Info(msg) + zlogger.Info(msg) } return } s := []byte(time.Now().Format("2006/01/02 15:04:05")) s = append(s, ' ') - if tty { + if tty.Load() { s = append(s, color...) } s = append(s, '[') s = append(s, tag...) s = append(s, ']') - if tty { + if tty.Load() { s = append(s, "\x1b[0m"...) } s = append(s, ' ') @@ -130,79 +161,79 @@ func log(level int, tag, color string, formatted bool, format string, args ...in if s[len(s)-1] != '\n' { s = append(s, '\n') } - mu.Lock() + wmu.Lock() wr.Write(s) - mu.Unlock() + wmu.Unlock() } var emptyFormat string // Infof ... func Infof(format string, args ...interface{}) { - if Level >= 1 { + if llevel.Load() >= 1 { log(1, "INFO", "\x1b[36m", true, format, args...) } } // Info ... func Info(args ...interface{}) { - if Level >= 1 { + if llevel.Load() >= 1 { log(1, "INFO", "\x1b[36m", false, emptyFormat, args...) } } // HTTPf ... func HTTPf(format string, args ...interface{}) { - if Level >= 1 { + if llevel.Load() >= 1 { log(1, "HTTP", "\x1b[1m\x1b[30m", true, format, args...) } } // HTTP ... func HTTP(args ...interface{}) { - if Level >= 1 { + if llevel.Load() >= 1 { log(1, "HTTP", "\x1b[1m\x1b[30m", false, emptyFormat, args...) } } // Errorf ... func Errorf(format string, args ...interface{}) { - if Level >= 1 { + if llevel.Load() >= 1 { log(1, "ERRO", "\x1b[1m\x1b[31m", true, format, args...) } } // Error .. func Error(args ...interface{}) { - if Level >= 1 { + if llevel.Load() >= 1 { log(1, "ERRO", "\x1b[1m\x1b[31m", false, emptyFormat, args...) } } // Warnf ... func Warnf(format string, args ...interface{}) { - if Level >= 1 { + if llevel.Load() >= 1 { log(2, "WARN", "\x1b[33m", true, format, args...) } } // Warn ... func Warn(args ...interface{}) { - if Level >= 1 { + if llevel.Load() >= 1 { log(2, "WARN", "\x1b[33m", false, emptyFormat, args...) } } // Debugf ... func Debugf(format string, args ...interface{}) { - if Level >= 3 { + if llevel.Load() >= 3 { log(3, "DEBU", "\x1b[35m", true, format, args...) } } // Debug ... func Debug(args ...interface{}) { - if Level >= 3 { + if llevel.Load() >= 3 { log(3, "DEBU", "\x1b[35m", false, emptyFormat, args...) } } diff --git a/internal/log/log_test.go b/internal/log/log_test.go index 6c86128d..75f30be9 100644 --- a/internal/log/log_test.go +++ b/internal/log/log_test.go @@ -13,7 +13,7 @@ import ( func TestLog(t *testing.T) { f := &bytes.Buffer{} - LogJSON = false + SetLogJSON(false) SetOutput(f) Printf("hello %v", "everyone") if !strings.HasSuffix(f.String(), "hello everyone\n") { @@ -23,7 +23,7 @@ func TestLog(t *testing.T) { func TestLogJSON(t *testing.T) { - LogJSON = true + SetLogJSON(true) Build("") type tcase struct { @@ -40,7 +40,7 @@ func TestLogJSON(t *testing.T) { return func(t *testing.T) { observedZapCore, observedLogs := observer.New(zap.DebugLevel) Set(zap.New(observedZapCore).Sugar()) - Level = tc.level + SetLevel(tc.level) if tc.format != "" { tc.fops(tc.format, tc.args) @@ -187,8 +187,8 @@ func TestLogJSON(t *testing.T) { } func BenchmarkLogPrintf(t *testing.B) { - LogJSON = false - Level = 1 + SetLogJSON(false) + SetLevel(1) SetOutput(io.Discard) t.ResetTimer() for i := 0; i < t.N; i++ { @@ -197,8 +197,8 @@ func BenchmarkLogPrintf(t *testing.B) { } func BenchmarkLogJSONPrintf(t *testing.B) { - LogJSON = true - Level = 1 + SetLogJSON(true) + SetLevel(1) ec := zap.NewProductionEncoderConfig() ec.EncodeDuration = zapcore.NanosDurationEncoder diff --git a/tests/metrics_test.go b/tests/metrics_test.go index 429369d0..28b1b744 100644 --- a/tests/metrics_test.go +++ b/tests/metrics_test.go @@ -27,12 +27,14 @@ func downloadURLWithStatusCode(u string) (int, string, error) { func metrics_basic_test(mc *mockServer) error { + maddr := fmt.Sprintf("http://127.0.0.1:%d/", mc.metricsPort()) + mc.Do("SET", "metrics_test_1", "1", "FIELD", "foo", 5.5, "POINT", 5, 5) mc.Do("SET", "metrics_test_2", "2", "FIELD", "foo", 19.19, "POINT", 19, 19) mc.Do("SET", "metrics_test_2", "3", "FIELD", "foo", 19.19, "POINT", 19, 19) mc.Do("SET", "metrics_test_2", "truck1:driver", "STRING", "John Denton") - status, index, err := downloadURLWithStatusCode("http://127.0.0.1:4321/") + status, index, err := downloadURLWithStatusCode(maddr) if err != nil { return err } @@ -43,7 +45,7 @@ func metrics_basic_test(mc *mockServer) error { return fmt.Errorf("missing link on index page") } - status, metrics, err := downloadURLWithStatusCode("http://127.0.0.1:4321/metrics") + status, metrics, err := downloadURLWithStatusCode(maddr + "metrics") if err != nil { return err } diff --git a/tests/mock_test.go b/tests/mock_test.go index bd0597c2..d9e006d3 100644 --- a/tests/mock_test.go +++ b/tests/mock_test.go @@ -13,7 +13,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/tidwall/sjson" - tlog "github.com/tidwall/tile38/internal/log" + "github.com/tidwall/tile38/internal/log" "github.com/tidwall/tile38/internal/server" ) @@ -36,6 +36,7 @@ func mockCleanup(silent bool) { type mockServer struct { port int + mport int conn redis.Conn ioJSON bool dir string @@ -46,6 +47,10 @@ func (mc *mockServer) readAOF() ([]byte, error) { return os.ReadFile(filepath.Join(mc.dir, "appendonly.aof")) } +func (mc *mockServer) metricsPort() int { + return mc.mport +} + type MockServerOptions struct { AOFData []byte Silent bool @@ -53,6 +58,14 @@ type MockServerOptions struct { } func mockOpenServer(opts MockServerOptions) (*mockServer, error) { + + logOutput := io.Discard + if os.Getenv("PRINTLOG") == "1" { + logOutput = os.Stderr + log.SetLevel(3) + } + log.SetOutput(logOutput) + rand.Seed(time.Now().UnixNano()) port := rand.Int()%20000 + 20000 dir := fmt.Sprintf("data-mock-%d", port) @@ -69,14 +82,12 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) { return nil, err } } - logOutput := io.Discard - if os.Getenv("PRINTLOG") == "1" { - logOutput = os.Stderr - tlog.Level = 3 - } + shutdown := make(chan bool) s := &mockServer{port: port, dir: dir, shutdown: shutdown} - tlog.SetOutput(logOutput) + if opts.Metrics { + s.mport = rand.Int()%20000 + 20000 + } var ferrt int32 // atomic flag for when ferr has been set var ferr error // ferr for when the server fails to start go func() { @@ -91,7 +102,7 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) { ShowDebugMessages: true, } if opts.Metrics { - sopts.MetricsAddr = ":4321" + sopts.MetricsAddr = fmt.Sprintf(":%d", s.mport) } err := server.Serve(sopts) if err != nil { diff --git a/tests/tests_test.go b/tests/tests_test.go index 0ee8b4a9..b96b15bc 100644 --- a/tests/tests_test.go +++ b/tests/tests_test.go @@ -26,19 +26,8 @@ const ( white = "\x1b[37m" ) -// type mockTest struct { -// } - -// func mockTestInit() *mockTest { -// mt := &mockTest{} -// return mt -// } - -// func (mt *mockTest) Cleanup() { - -// } - func TestAll(t *testing.T) { + mockCleanup(false) defer mockCleanup(false) @@ -50,39 +39,39 @@ func TestAll(t *testing.T) { os.Exit(1) }() - mc, err := mockOpenServer(MockServerOptions{ - Silent: false, - Metrics: true, - }) - if err != nil { - t.Fatal(err) - } - // log.Printf("Waiting a second for everything to cleanly start...") - // time.Sleep(time.Second * 2) - defer mc.Close() - - runSubTest(t, "keys", mc, subTestKeys) - runSubTest(t, "json", mc, subTestJSON) - runSubTest(t, "search", mc, subTestSearch) - runSubTest(t, "testcmd", mc, subTestTestCmd) - runSubTest(t, "client", mc, subTestClient) - runSubTest(t, "scripts", mc, subTestScripts) - runSubTest(t, "fence", mc, subTestFence) - runSubTest(t, "info", mc, subTestInfo) - runSubTest(t, "timeouts", mc, subTestTimeout) - runSubTest(t, "metrics", mc, subTestMetrics) - runSubTest(t, "aof", mc, subTestAOF) + runSubTest(t, "keys", subTestKeys) + runSubTest(t, "json", subTestJSON) + runSubTest(t, "search", subTestSearch) + runSubTest(t, "testcmd", subTestTestCmd) + runSubTest(t, "client", subTestClient) + runSubTest(t, "scripts", subTestScripts) + runSubTest(t, "fence", subTestFence) + runSubTest(t, "info", subTestInfo) + runSubTest(t, "timeouts", subTestTimeout) + runSubTest(t, "metrics", subTestMetrics) + runSubTest(t, "aof", subTestAOF) } -func runSubTest(t *testing.T, name string, mc *mockServer, test func(t *testing.T, mc *mockServer)) { +func runSubTest(t *testing.T, name string, test func(t *testing.T, mc *mockServer)) { t.Run(name, func(t *testing.T) { + // t.Parallel() + t.Helper() + + mc, err := mockOpenServer(MockServerOptions{ + Silent: true, + Metrics: true, + }) + if err != nil { + t.Fatal(err) + } + defer mc.Close() + fmt.Printf(bright+"Testing %s\n"+clear, name) test(t, mc) }) } func runStep(t *testing.T, mc *mockServer, name string, step func(mc *mockServer) error) { - t.Helper() t.Run(name, func(t *testing.T) { t.Helper() if err := func() error {