Thread safe log and support for concurrent tile38 instances

This commit is contained in:
tidwall 2022-09-26 10:02:02 -07:00
parent 97da6d70c4
commit 3cb8e0509a
8 changed files with 144 additions and 111 deletions

View File

@ -302,7 +302,7 @@ Developer Options:
flag.Parse() flag.Parse()
if logEncoding == "json" { if logEncoding == "json" {
log.LogJSON = true log.SetLogJSON(true)
data, _ := os.ReadFile(filepath.Join(dir, "config")) data, _ := os.ReadFile(filepath.Join(dir, "config"))
if gjson.GetBytes(data, "logconfig.encoding").String() == "json" { if gjson.GetBytes(data, "logconfig.encoding").String() == "json" {
c := gjson.GetBytes(data, "logconfig").String() c := gjson.GetBytes(data, "logconfig").String()
@ -320,13 +320,13 @@ Developer Options:
log.SetOutput(logw) log.SetOutput(logw)
if quiet { if quiet {
log.Level = 0 log.SetLevel(0)
} else if veryVerbose { } else if veryVerbose {
log.Level = 3 log.SetLevel(3)
} else if verbose { } else if verbose {
log.Level = 2 log.SetLevel(2)
} else { } else {
log.Level = 1 log.SetLevel(1)
} }
showDebugMessages = veryVerbose showDebugMessages = veryVerbose
@ -445,7 +445,7 @@ Developer Options:
saddr = fmt.Sprintf("Port: %d", port) saddr = fmt.Sprintf("Port: %d", port)
} }
if log.LogJSON { if log.LogJSON() {
log.Printf(`Tile38 %s%s %d bit (%s/%s) %s%s, PID: %d. Visit tile38.com/sponsor to support the project`, log.Printf(`Tile38 %s%s %d bit (%s/%s) %s%s, PID: %d. Visit tile38.com/sponsor to support the project`,
core.Version, gitsha, strconv.IntSize, runtime.GOARCH, runtime.GOOS, hostd, saddr, os.Getpid()) core.Version, gitsha, strconv.IntSize, runtime.GOARCH, runtime.GOOS, hostd, saddr, os.Getpid())
} else { } else {

View File

@ -68,7 +68,7 @@ func (conn *KafkaConn) Send(msg string) error {
} }
conn.t = time.Now() conn.t = time.Now()
if log.Level > 2 { if log.Level() > 2 {
sarama.Logger = lg.New(log.Output(), "[sarama] ", 0) sarama.Logger = lg.New(log.Output(), "[sarama] ", 0)
} }

View File

@ -90,7 +90,7 @@ func (conn *SQSConn) Send(msg string) error {
sess := session.Must(session.NewSession(&aws.Config{ sess := session.Must(session.NewSession(&aws.Config{
Region: &region, Region: &region,
Credentials: creds, Credentials: creds,
CredentialsChainVerboseErrors: aws.Bool(log.Level >= 3), CredentialsChainVerboseErrors: aws.Bool(log.Level() >= 3),
MaxRetries: aws.Int(5), MaxRetries: aws.Int(5),
})) }))
svc := sqs.New(sess) svc := sqs.New(sess)

View File

@ -6,91 +6,120 @@ import (
"io" "io"
"os" "os"
"sync" "sync"
"sync/atomic"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/term" "golang.org/x/term"
) )
var mu sync.Mutex var wmu sync.Mutex
var wr io.Writer var wr io.Writer
var tty bool
var LogJSON = false var zmu sync.Mutex
var logger *zap.SugaredLogger var zlogger *zap.SugaredLogger
var tty atomic.Bool
var ljson atomic.Bool
var llevel atomic.Int32
func init() {
SetOutput(os.Stderr)
SetLevel(1)
}
// Level is the log level // Level is the log level
// 0: silent - do not log // 0: silent - do not log
// 1: normal - show everything except debug and warn // 1: normal - show everything except debug and warn
// 2: verbose - show everything except debug // 2: verbose - show everything except debug
// 3: very verbose - show everything // 3: very verbose - show everything
var Level = 1 func SetLevel(level int) {
if level < 0 {
level = 0
} else if level > 3 {
level = 3
}
llevel.Store(int32(level))
}
// Level returns the log level
func Level() int {
return int(llevel.Load())
}
func SetLogJSON(logJSON bool) {
ljson.Store(logJSON)
}
func LogJSON() bool {
return ljson.Load()
}
// SetOutput sets the output of the logger // SetOutput sets the output of the logger
func SetOutput(w io.Writer) { func SetOutput(w io.Writer) {
f, ok := w.(*os.File) f, ok := w.(*os.File)
tty = ok && term.IsTerminal(int(f.Fd())) tty.Store(ok && term.IsTerminal(int(f.Fd())))
wmu.Lock()
wr = w wr = w
wmu.Unlock()
} }
// Build a zap logger from default or custom config // Build a zap logger from default or custom config
func Build(c string) error { func Build(c string) error {
var zcfg zap.Config
if c == "" { if c == "" {
zcfg := zap.NewProductionConfig() zcfg = zap.NewProductionConfig()
// to be able to filter with Tile38 levels // to be able to filter with Tile38 levels
zcfg.Level.SetLevel(zap.DebugLevel) zcfg.Level.SetLevel(zap.DebugLevel)
// disable caller because caller is always log.go // disable caller because caller is always log.go
zcfg.DisableCaller = true zcfg.DisableCaller = true
core, err := zcfg.Build()
if err != nil {
return err
}
defer core.Sync()
logger = core.Sugar()
} else { } else {
var zcfg zap.Config
err := json.Unmarshal([]byte(c), &zcfg) err := json.Unmarshal([]byte(c), &zcfg)
if err != nil { if err != nil {
return err return err
} }
// to be able to filter with Tile38 levels // to be able to filter with Tile38 levels
zcfg.Level.SetLevel(zap.DebugLevel) zcfg.Level.SetLevel(zap.DebugLevel)
// disable caller because caller is always log.go // disable caller because caller is always log.go
zcfg.DisableCaller = true zcfg.DisableCaller = true
}
core, err := zcfg.Build() core, err := zcfg.Build()
if err != nil { if err != nil {
return err return err
} }
defer core.Sync() defer core.Sync()
logger = core.Sugar() zmu.Lock()
} zlogger = core.Sugar()
zmu.Unlock()
return nil return nil
} }
// Set a zap logger // Set a zap logger
func Set(sl *zap.SugaredLogger) { func Set(sl *zap.SugaredLogger) {
logger = sl zmu.Lock()
zlogger = sl
zmu.Unlock()
} }
// Get a zap logger // Get a zap logger
func Get() *zap.SugaredLogger { func Get() *zap.SugaredLogger {
return logger zmu.Lock()
sl := zlogger
zmu.Unlock()
return sl
} }
func init() { // Output returns the output writer
SetOutput(os.Stderr)
}
// Output retuns the output writer
func Output() io.Writer { func Output() io.Writer {
wmu.Lock()
defer wmu.Unlock()
return wr return wr
} }
func log(level int, tag, color string, formatted bool, format string, args ...interface{}) { func log(level int, tag, color string, formatted bool, format string, args ...interface{}) {
if Level < level { if llevel.Load() < int32(level) {
return return
} }
var msg string var msg string
@ -99,30 +128,32 @@ func log(level int, tag, color string, formatted bool, format string, args ...in
} else { } else {
msg = fmt.Sprint(args...) msg = fmt.Sprint(args...)
} }
if LogJSON { if ljson.Load() {
zmu.Lock()
defer zmu.Unlock()
switch tag { switch tag {
case "ERRO": case "ERRO":
logger.Error(msg) zlogger.Error(msg)
case "FATA": case "FATA":
logger.Fatal(msg) zlogger.Fatal(msg)
case "WARN": case "WARN":
logger.Warn(msg) zlogger.Warn(msg)
case "DEBU": case "DEBU":
logger.Debug(msg) zlogger.Debug(msg)
default: default:
logger.Info(msg) zlogger.Info(msg)
} }
return return
} }
s := []byte(time.Now().Format("2006/01/02 15:04:05")) s := []byte(time.Now().Format("2006/01/02 15:04:05"))
s = append(s, ' ') s = append(s, ' ')
if tty { if tty.Load() {
s = append(s, color...) s = append(s, color...)
} }
s = append(s, '[') s = append(s, '[')
s = append(s, tag...) s = append(s, tag...)
s = append(s, ']') s = append(s, ']')
if tty { if tty.Load() {
s = append(s, "\x1b[0m"...) s = append(s, "\x1b[0m"...)
} }
s = append(s, ' ') s = append(s, ' ')
@ -130,79 +161,79 @@ func log(level int, tag, color string, formatted bool, format string, args ...in
if s[len(s)-1] != '\n' { if s[len(s)-1] != '\n' {
s = append(s, '\n') s = append(s, '\n')
} }
mu.Lock() wmu.Lock()
wr.Write(s) wr.Write(s)
mu.Unlock() wmu.Unlock()
} }
var emptyFormat string var emptyFormat string
// Infof ... // Infof ...
func Infof(format string, args ...interface{}) { func Infof(format string, args ...interface{}) {
if Level >= 1 { if llevel.Load() >= 1 {
log(1, "INFO", "\x1b[36m", true, format, args...) log(1, "INFO", "\x1b[36m", true, format, args...)
} }
} }
// Info ... // Info ...
func Info(args ...interface{}) { func Info(args ...interface{}) {
if Level >= 1 { if llevel.Load() >= 1 {
log(1, "INFO", "\x1b[36m", false, emptyFormat, args...) log(1, "INFO", "\x1b[36m", false, emptyFormat, args...)
} }
} }
// HTTPf ... // HTTPf ...
func HTTPf(format string, args ...interface{}) { func HTTPf(format string, args ...interface{}) {
if Level >= 1 { if llevel.Load() >= 1 {
log(1, "HTTP", "\x1b[1m\x1b[30m", true, format, args...) log(1, "HTTP", "\x1b[1m\x1b[30m", true, format, args...)
} }
} }
// HTTP ... // HTTP ...
func HTTP(args ...interface{}) { func HTTP(args ...interface{}) {
if Level >= 1 { if llevel.Load() >= 1 {
log(1, "HTTP", "\x1b[1m\x1b[30m", false, emptyFormat, args...) log(1, "HTTP", "\x1b[1m\x1b[30m", false, emptyFormat, args...)
} }
} }
// Errorf ... // Errorf ...
func Errorf(format string, args ...interface{}) { func Errorf(format string, args ...interface{}) {
if Level >= 1 { if llevel.Load() >= 1 {
log(1, "ERRO", "\x1b[1m\x1b[31m", true, format, args...) log(1, "ERRO", "\x1b[1m\x1b[31m", true, format, args...)
} }
} }
// Error .. // Error ..
func Error(args ...interface{}) { func Error(args ...interface{}) {
if Level >= 1 { if llevel.Load() >= 1 {
log(1, "ERRO", "\x1b[1m\x1b[31m", false, emptyFormat, args...) log(1, "ERRO", "\x1b[1m\x1b[31m", false, emptyFormat, args...)
} }
} }
// Warnf ... // Warnf ...
func Warnf(format string, args ...interface{}) { func Warnf(format string, args ...interface{}) {
if Level >= 1 { if llevel.Load() >= 1 {
log(2, "WARN", "\x1b[33m", true, format, args...) log(2, "WARN", "\x1b[33m", true, format, args...)
} }
} }
// Warn ... // Warn ...
func Warn(args ...interface{}) { func Warn(args ...interface{}) {
if Level >= 1 { if llevel.Load() >= 1 {
log(2, "WARN", "\x1b[33m", false, emptyFormat, args...) log(2, "WARN", "\x1b[33m", false, emptyFormat, args...)
} }
} }
// Debugf ... // Debugf ...
func Debugf(format string, args ...interface{}) { func Debugf(format string, args ...interface{}) {
if Level >= 3 { if llevel.Load() >= 3 {
log(3, "DEBU", "\x1b[35m", true, format, args...) log(3, "DEBU", "\x1b[35m", true, format, args...)
} }
} }
// Debug ... // Debug ...
func Debug(args ...interface{}) { func Debug(args ...interface{}) {
if Level >= 3 { if llevel.Load() >= 3 {
log(3, "DEBU", "\x1b[35m", false, emptyFormat, args...) log(3, "DEBU", "\x1b[35m", false, emptyFormat, args...)
} }
} }

View File

@ -13,7 +13,7 @@ import (
func TestLog(t *testing.T) { func TestLog(t *testing.T) {
f := &bytes.Buffer{} f := &bytes.Buffer{}
LogJSON = false SetLogJSON(false)
SetOutput(f) SetOutput(f)
Printf("hello %v", "everyone") Printf("hello %v", "everyone")
if !strings.HasSuffix(f.String(), "hello everyone\n") { if !strings.HasSuffix(f.String(), "hello everyone\n") {
@ -23,7 +23,7 @@ func TestLog(t *testing.T) {
func TestLogJSON(t *testing.T) { func TestLogJSON(t *testing.T) {
LogJSON = true SetLogJSON(true)
Build("") Build("")
type tcase struct { type tcase struct {
@ -40,7 +40,7 @@ func TestLogJSON(t *testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
observedZapCore, observedLogs := observer.New(zap.DebugLevel) observedZapCore, observedLogs := observer.New(zap.DebugLevel)
Set(zap.New(observedZapCore).Sugar()) Set(zap.New(observedZapCore).Sugar())
Level = tc.level SetLevel(tc.level)
if tc.format != "" { if tc.format != "" {
tc.fops(tc.format, tc.args) tc.fops(tc.format, tc.args)
@ -187,8 +187,8 @@ func TestLogJSON(t *testing.T) {
} }
func BenchmarkLogPrintf(t *testing.B) { func BenchmarkLogPrintf(t *testing.B) {
LogJSON = false SetLogJSON(false)
Level = 1 SetLevel(1)
SetOutput(io.Discard) SetOutput(io.Discard)
t.ResetTimer() t.ResetTimer()
for i := 0; i < t.N; i++ { for i := 0; i < t.N; i++ {
@ -197,8 +197,8 @@ func BenchmarkLogPrintf(t *testing.B) {
} }
func BenchmarkLogJSONPrintf(t *testing.B) { func BenchmarkLogJSONPrintf(t *testing.B) {
LogJSON = true SetLogJSON(true)
Level = 1 SetLevel(1)
ec := zap.NewProductionEncoderConfig() ec := zap.NewProductionEncoderConfig()
ec.EncodeDuration = zapcore.NanosDurationEncoder ec.EncodeDuration = zapcore.NanosDurationEncoder

View File

@ -27,12 +27,14 @@ func downloadURLWithStatusCode(u string) (int, string, error) {
func metrics_basic_test(mc *mockServer) error { func metrics_basic_test(mc *mockServer) error {
maddr := fmt.Sprintf("http://127.0.0.1:%d/", mc.metricsPort())
mc.Do("SET", "metrics_test_1", "1", "FIELD", "foo", 5.5, "POINT", 5, 5) mc.Do("SET", "metrics_test_1", "1", "FIELD", "foo", 5.5, "POINT", 5, 5)
mc.Do("SET", "metrics_test_2", "2", "FIELD", "foo", 19.19, "POINT", 19, 19) mc.Do("SET", "metrics_test_2", "2", "FIELD", "foo", 19.19, "POINT", 19, 19)
mc.Do("SET", "metrics_test_2", "3", "FIELD", "foo", 19.19, "POINT", 19, 19) mc.Do("SET", "metrics_test_2", "3", "FIELD", "foo", 19.19, "POINT", 19, 19)
mc.Do("SET", "metrics_test_2", "truck1:driver", "STRING", "John Denton") mc.Do("SET", "metrics_test_2", "truck1:driver", "STRING", "John Denton")
status, index, err := downloadURLWithStatusCode("http://127.0.0.1:4321/") status, index, err := downloadURLWithStatusCode(maddr)
if err != nil { if err != nil {
return err return err
} }
@ -43,7 +45,7 @@ func metrics_basic_test(mc *mockServer) error {
return fmt.Errorf("missing link on index page") return fmt.Errorf("missing link on index page")
} }
status, metrics, err := downloadURLWithStatusCode("http://127.0.0.1:4321/metrics") status, metrics, err := downloadURLWithStatusCode(maddr + "metrics")
if err != nil { if err != nil {
return err return err
} }

View File

@ -13,7 +13,7 @@ import (
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/tidwall/sjson" "github.com/tidwall/sjson"
tlog "github.com/tidwall/tile38/internal/log" "github.com/tidwall/tile38/internal/log"
"github.com/tidwall/tile38/internal/server" "github.com/tidwall/tile38/internal/server"
) )
@ -36,6 +36,7 @@ func mockCleanup(silent bool) {
type mockServer struct { type mockServer struct {
port int port int
mport int
conn redis.Conn conn redis.Conn
ioJSON bool ioJSON bool
dir string dir string
@ -46,6 +47,10 @@ func (mc *mockServer) readAOF() ([]byte, error) {
return os.ReadFile(filepath.Join(mc.dir, "appendonly.aof")) return os.ReadFile(filepath.Join(mc.dir, "appendonly.aof"))
} }
func (mc *mockServer) metricsPort() int {
return mc.mport
}
type MockServerOptions struct { type MockServerOptions struct {
AOFData []byte AOFData []byte
Silent bool Silent bool
@ -53,6 +58,14 @@ type MockServerOptions struct {
} }
func mockOpenServer(opts MockServerOptions) (*mockServer, error) { func mockOpenServer(opts MockServerOptions) (*mockServer, error) {
logOutput := io.Discard
if os.Getenv("PRINTLOG") == "1" {
logOutput = os.Stderr
log.SetLevel(3)
}
log.SetOutput(logOutput)
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
port := rand.Int()%20000 + 20000 port := rand.Int()%20000 + 20000
dir := fmt.Sprintf("data-mock-%d", port) dir := fmt.Sprintf("data-mock-%d", port)
@ -69,14 +82,12 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) {
return nil, err return nil, err
} }
} }
logOutput := io.Discard
if os.Getenv("PRINTLOG") == "1" {
logOutput = os.Stderr
tlog.Level = 3
}
shutdown := make(chan bool) shutdown := make(chan bool)
s := &mockServer{port: port, dir: dir, shutdown: shutdown} s := &mockServer{port: port, dir: dir, shutdown: shutdown}
tlog.SetOutput(logOutput) if opts.Metrics {
s.mport = rand.Int()%20000 + 20000
}
var ferrt int32 // atomic flag for when ferr has been set var ferrt int32 // atomic flag for when ferr has been set
var ferr error // ferr for when the server fails to start var ferr error // ferr for when the server fails to start
go func() { go func() {
@ -91,7 +102,7 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) {
ShowDebugMessages: true, ShowDebugMessages: true,
} }
if opts.Metrics { if opts.Metrics {
sopts.MetricsAddr = ":4321" sopts.MetricsAddr = fmt.Sprintf(":%d", s.mport)
} }
err := server.Serve(sopts) err := server.Serve(sopts)
if err != nil { if err != nil {

View File

@ -26,19 +26,8 @@ const (
white = "\x1b[37m" white = "\x1b[37m"
) )
// type mockTest struct {
// }
// func mockTestInit() *mockTest {
// mt := &mockTest{}
// return mt
// }
// func (mt *mockTest) Cleanup() {
// }
func TestAll(t *testing.T) { func TestAll(t *testing.T) {
mockCleanup(false) mockCleanup(false)
defer mockCleanup(false) defer mockCleanup(false)
@ -50,39 +39,39 @@ func TestAll(t *testing.T) {
os.Exit(1) os.Exit(1)
}() }()
runSubTest(t, "keys", subTestKeys)
runSubTest(t, "json", subTestJSON)
runSubTest(t, "search", subTestSearch)
runSubTest(t, "testcmd", subTestTestCmd)
runSubTest(t, "client", subTestClient)
runSubTest(t, "scripts", subTestScripts)
runSubTest(t, "fence", subTestFence)
runSubTest(t, "info", subTestInfo)
runSubTest(t, "timeouts", subTestTimeout)
runSubTest(t, "metrics", subTestMetrics)
runSubTest(t, "aof", subTestAOF)
}
func runSubTest(t *testing.T, name string, test func(t *testing.T, mc *mockServer)) {
t.Run(name, func(t *testing.T) {
// t.Parallel()
t.Helper()
mc, err := mockOpenServer(MockServerOptions{ mc, err := mockOpenServer(MockServerOptions{
Silent: false, Silent: true,
Metrics: true, Metrics: true,
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// log.Printf("Waiting a second for everything to cleanly start...")
// time.Sleep(time.Second * 2)
defer mc.Close() defer mc.Close()
runSubTest(t, "keys", mc, subTestKeys)
runSubTest(t, "json", mc, subTestJSON)
runSubTest(t, "search", mc, subTestSearch)
runSubTest(t, "testcmd", mc, subTestTestCmd)
runSubTest(t, "client", mc, subTestClient)
runSubTest(t, "scripts", mc, subTestScripts)
runSubTest(t, "fence", mc, subTestFence)
runSubTest(t, "info", mc, subTestInfo)
runSubTest(t, "timeouts", mc, subTestTimeout)
runSubTest(t, "metrics", mc, subTestMetrics)
runSubTest(t, "aof", mc, subTestAOF)
}
func runSubTest(t *testing.T, name string, mc *mockServer, test func(t *testing.T, mc *mockServer)) {
t.Run(name, func(t *testing.T) {
fmt.Printf(bright+"Testing %s\n"+clear, name) fmt.Printf(bright+"Testing %s\n"+clear, name)
test(t, mc) test(t, mc)
}) })
} }
func runStep(t *testing.T, mc *mockServer, name string, step func(mc *mockServer) error) { func runStep(t *testing.T, mc *mockServer, name string, step func(mc *mockServer) error) {
t.Helper()
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
t.Helper() t.Helper()
if err := func() error { if err := func() error {