diff --git a/cluster.go b/cluster.go index aeba3a5d..648baf9a 100644 --- a/cluster.go +++ b/cluster.go @@ -1046,7 +1046,7 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { for _, node := range nodes { _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() if err != nil { - internal.Logger.Printf("ReapStaleConns failed: %s", err) + internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err) } } } @@ -1576,7 +1576,7 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo { info := cmdsInfo[name] if info == nil { - internal.Logger.Printf("info for cmd=%s not found", name) + internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name) } return info } diff --git a/commands.go b/commands.go index d32442a0..46b88eb9 100644 --- a/commands.go +++ b/commands.go @@ -13,9 +13,10 @@ func usePrecise(dur time.Duration) bool { return dur < time.Second || dur%time.Second != 0 } -func formatMs(dur time.Duration) int64 { +func formatMs(ctx context.Context, dur time.Duration) int64 { if dur > 0 && dur < time.Millisecond { internal.Logger.Printf( + ctx, "specified duration is %s, but minimal supported value is %s - truncating to 1ms", dur, time.Millisecond, ) @@ -24,9 +25,10 @@ func formatMs(dur time.Duration) int64 { return int64(dur / time.Millisecond) } -func formatSec(dur time.Duration) int64 { +func formatSec(ctx context.Context, dur time.Duration) int64 { if dur > 0 && dur < time.Second { internal.Logger.Printf( + ctx, "specified duration is %s, but minimal supported value is %s - truncating to 1s", dur, time.Second, ) @@ -457,7 +459,7 @@ func (c cmdable) Exists(ctx context.Context, keys ...string) *IntCmd { } func (c cmdable) Expire(ctx context.Context, key string, expiration time.Duration) *BoolCmd { - cmd := NewBoolCmd(ctx, "expire", key, formatSec(expiration)) + cmd := NewBoolCmd(ctx, "expire", key, formatSec(ctx, expiration)) _ = c(ctx, cmd) return cmd } @@ -482,7 +484,7 @@ func (c cmdable) Migrate(ctx context.Context, host, port, key string, db int, ti port, key, db, - formatMs(timeout), + formatMs(ctx, timeout), ) cmd.setReadTimeout(timeout) _ = c(ctx, cmd) @@ -520,7 +522,7 @@ func (c cmdable) Persist(ctx context.Context, key string) *BoolCmd { } func (c cmdable) PExpire(ctx context.Context, key string, expiration time.Duration) *BoolCmd { - cmd := NewBoolCmd(ctx, "pexpire", key, formatMs(expiration)) + cmd := NewBoolCmd(ctx, "pexpire", key, formatMs(ctx, expiration)) _ = c(ctx, cmd) return cmd } @@ -565,7 +567,7 @@ func (c cmdable) Restore(ctx context.Context, key string, ttl time.Duration, val ctx, "restore", key, - formatMs(ttl), + formatMs(ctx, ttl), value, ) _ = c(ctx, cmd) @@ -577,7 +579,7 @@ func (c cmdable) RestoreReplace(ctx context.Context, key string, ttl time.Durati ctx, "restore", key, - formatMs(ttl), + formatMs(ctx, ttl), value, "replace", ) @@ -761,9 +763,9 @@ func (c cmdable) Set(ctx context.Context, key string, value interface{}, expirat args[2] = value if expiration > 0 { if usePrecise(expiration) { - args = append(args, "px", formatMs(expiration)) + args = append(args, "px", formatMs(ctx, expiration)) } else { - args = append(args, "ex", formatSec(expiration)) + args = append(args, "ex", formatSec(ctx, expiration)) } } cmd := NewStatusCmd(ctx, args...) @@ -781,9 +783,9 @@ func (c cmdable) SetNX(ctx context.Context, key string, value interface{}, expir cmd = NewBoolCmd(ctx, "setnx", key, value) } else { if usePrecise(expiration) { - cmd = NewBoolCmd(ctx, "set", key, value, "px", formatMs(expiration), "nx") + cmd = NewBoolCmd(ctx, "set", key, value, "px", formatMs(ctx, expiration), "nx") } else { - cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(expiration), "nx") + cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(ctx, expiration), "nx") } } _ = c(ctx, cmd) @@ -799,9 +801,9 @@ func (c cmdable) SetXX(ctx context.Context, key string, value interface{}, expir cmd = NewBoolCmd(ctx, "set", key, value, "xx") } else { if usePrecise(expiration) { - cmd = NewBoolCmd(ctx, "set", key, value, "px", formatMs(expiration), "xx") + cmd = NewBoolCmd(ctx, "set", key, value, "px", formatMs(ctx, expiration), "xx") } else { - cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(expiration), "xx") + cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(ctx, expiration), "xx") } } _ = c(ctx, cmd) @@ -1088,7 +1090,7 @@ func (c cmdable) BLPop(ctx context.Context, timeout time.Duration, keys ...strin for i, key := range keys { args[1+i] = key } - args[len(args)-1] = formatSec(timeout) + args[len(args)-1] = formatSec(ctx, timeout) cmd := NewStringSliceCmd(ctx, args...) cmd.setReadTimeout(timeout) _ = c(ctx, cmd) @@ -1101,7 +1103,7 @@ func (c cmdable) BRPop(ctx context.Context, timeout time.Duration, keys ...strin for i, key := range keys { args[1+i] = key } - args[len(keys)+1] = formatSec(timeout) + args[len(keys)+1] = formatSec(ctx, timeout) cmd := NewStringSliceCmd(ctx, args...) cmd.setReadTimeout(timeout) _ = c(ctx, cmd) @@ -1114,7 +1116,7 @@ func (c cmdable) BRPopLPush(ctx context.Context, source, destination string, tim "brpoplpush", source, destination, - formatSec(timeout), + formatSec(ctx, timeout), ) cmd.setReadTimeout(timeout) _ = c(ctx, cmd) @@ -1694,7 +1696,7 @@ func (c cmdable) BZPopMax(ctx context.Context, timeout time.Duration, keys ...st for i, key := range keys { args[1+i] = key } - args[len(args)-1] = formatSec(timeout) + args[len(args)-1] = formatSec(ctx, timeout) cmd := NewZWithKeyCmd(ctx, args...) cmd.setReadTimeout(timeout) _ = c(ctx, cmd) @@ -1708,7 +1710,7 @@ func (c cmdable) BZPopMin(ctx context.Context, timeout time.Duration, keys ...st for i, key := range keys { args[1+i] = key } - args[len(args)-1] = formatSec(timeout) + args[len(args)-1] = formatSec(ctx, timeout) cmd := NewZWithKeyCmd(ctx, args...) cmd.setReadTimeout(timeout) _ = c(ctx, cmd) @@ -2165,7 +2167,7 @@ func (c cmdable) ClientList(ctx context.Context) *StringCmd { } func (c cmdable) ClientPause(ctx context.Context, dur time.Duration) *BoolCmd { - cmd := NewBoolCmd(ctx, "client", "pause", formatMs(dur)) + cmd := NewBoolCmd(ctx, "client", "pause", formatMs(ctx, dur)) _ = c(ctx, cmd) return cmd } diff --git a/internal/instruments.go b/internal/instruments.go index 87935e5c..e837526d 100644 --- a/internal/instruments.go +++ b/internal/instruments.go @@ -1,6 +1,8 @@ package internal import ( + "context" + "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/metric" ) @@ -15,7 +17,7 @@ var ( func init() { defer func() { if r := recover(); r != nil { - Logger.Printf("Error creating meter github.com/go-redis/redis for Instruments", r) + Logger.Printf(context.Background(), "Error creating meter github.com/go-redis/redis for Instruments", r) } }() diff --git a/internal/log.go b/internal/log.go index 49cef75f..3810f9e4 100644 --- a/internal/log.go +++ b/internal/log.go @@ -1,12 +1,24 @@ package internal import ( + "context" + "fmt" "log" "os" ) type Logging interface { - Printf(format string, v ...interface{}) + Printf(ctx context.Context, format string, v ...interface{}) } -var Logger Logging = log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile) +type logger struct { + log *log.Logger +} + +func (l *logger) Printf(ctx context.Context, format string, v ...interface{}) { + _ = l.log.Output(2, fmt.Sprintf(format, v...)) +} + +var Logger Logging = &logger{ + log: log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile), +} diff --git a/internal/pool/pool.go b/internal/pool/pool.go index e97f4366..064efa43 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -320,7 +320,7 @@ func (p *ConnPool) popIdle() *Conn { func (p *ConnPool) Put(cn *Conn) { if cn.rd.Buffered() > 0 { - internal.Logger.Printf("Conn has unread data") + internal.Logger.Printf(context.Background(), "Conn has unread data") p.Remove(cn, BadConnError{}) return } @@ -458,7 +458,7 @@ func (p *ConnPool) reaper(frequency time.Duration) { } _, err := p.ReapStaleConns() if err != nil { - internal.Logger.Printf("ReapStaleConns failed: %s", err) + internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", err) continue } case <-p.closedCh: diff --git a/pubsub.go b/pubsub.go index aac53103..61735032 100644 --- a/pubsub.go +++ b/pubsub.go @@ -162,7 +162,7 @@ func (c *PubSub) closeTheCn(reason error) error { return nil } if !c.closed { - internal.Logger.Printf("redis: discarding bad PubSub connection: %s", reason) + internal.Logger.Printf(c.getContext(), "redis: discarding bad PubSub connection: %s", reason) } err := c.closeConn(c.cn) c.cn = nil @@ -450,6 +450,14 @@ func (c *PubSub) ChannelWithSubscriptions(ctx context.Context, size int) <-chan return c.allCh } +func (c *PubSub) getContext() context.Context { + if c.cmd != nil { + return c.cmd.ctx + } + + return context.Background() +} + func (c *PubSub) initPing() { ctx := context.TODO() c.ping = make(chan struct{}, 1) @@ -531,10 +539,11 @@ func (c *PubSub) initMsgChan(size int) { } case <-timer.C: internal.Logger.Printf( + c.getContext(), "redis: %s channel is full for %s (message is dropped)", c, pingTimeout) } default: - internal.Logger.Printf("redis: unknown message type: %T", msg) + internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg) } } }() @@ -579,7 +588,7 @@ func (c *PubSub) initAllChan(size int) { case *Message: c.sendMessage(msg, timer) default: - internal.Logger.Printf("redis: unknown message type: %T", msg) + internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg) } } }() @@ -594,6 +603,7 @@ func (c *PubSub) sendMessage(msg interface{}, timer *time.Timer) { } case <-timer.C: internal.Logger.Printf( + c.getContext(), "redis: %s channel is full for %s (message is dropped)", c, pingTimeout) } } diff --git a/ring.go b/ring.go index c5164e59..f248a877 100644 --- a/ring.go +++ b/ring.go @@ -304,7 +304,7 @@ func (c *ringShards) Heartbeat(frequency time.Duration) { err := shard.Client.Ping(ctx).Err() isUp := err == nil || err == pool.ErrPoolTimeout if shard.Vote(isUp) { - internal.Logger.Printf("ring shard state changed: %s", shard) + internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard) rebalance = true } } @@ -558,7 +558,7 @@ func (c *Ring) cmdInfo(name string) *CommandInfo { } info := cmdsInfo[name] if info == nil { - internal.Logger.Printf("info for cmd=%s not found", name) + internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name) } return info } diff --git a/sentinel.go b/sentinel.go index a6dbbae5..0f0959b3 100644 --- a/sentinel.go +++ b/sentinel.go @@ -342,7 +342,7 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) { if err != nil { return "", err } - c.switchMaster(addr) + c.switchMaster(ctx, addr) return addr, nil } @@ -393,7 +393,7 @@ func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) { masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result() if err != nil { - internal.Logger.Printf("sentinel: GetMasterAddrByName master=%q failed: %s", + internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s", c.masterName, err) _ = sentinel.Close() continue @@ -413,14 +413,14 @@ func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) { func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string { addr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result() if err != nil { - internal.Logger.Printf("sentinel: GetMasterAddrByName name=%q failed: %s", + internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s", c.masterName, err) return "" } return net.JoinHostPort(addr[0], addr[1]) } -func (c *sentinelFailover) switchMaster(addr string) { +func (c *sentinelFailover) switchMaster(ctx context.Context, addr string) { c.mu.RLock() masterAddr := c._masterAddr c.mu.RUnlock() @@ -435,7 +435,7 @@ func (c *sentinelFailover) switchMaster(addr string) { return } - internal.Logger.Printf("sentinel: new master=%q addr=%q", + internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q", c.masterName, addr) _ = c.Pool().Filter(func(cn *pool.Conn) bool { return cn.RemoteAddr().String() != addr @@ -457,7 +457,7 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl func (c *sentinelFailover) discoverSentinels(ctx context.Context) { sentinels, err := c.sentinel.Sentinels(ctx, c.masterName).Result() if err != nil { - internal.Logger.Printf("sentinel: Sentinels master=%q failed: %s", c.masterName, err) + internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.masterName, err) return } for _, sentinel := range sentinels { @@ -467,7 +467,7 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) { if key == "name" { sentinelAddr := vals[i+1].(string) if !contains(c.sentinelAddrs, sentinelAddr) { - internal.Logger.Printf("sentinel: discovered new sentinel=%q for master=%q", + internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q", sentinelAddr, c.masterName) c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr) } @@ -487,11 +487,11 @@ func (c *sentinelFailover) listen(pubsub *PubSub) { if msg.Channel == "+switch-master" { parts := strings.Split(msg.Payload, " ") if parts[0] != c.masterName { - internal.Logger.Printf("sentinel: ignore addr for master=%q", parts[0]) + internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0]) continue } addr := net.JoinHostPort(parts[3], parts[4]) - c.switchMaster(addr) + c.switchMaster(pubsub.getContext(), addr) } } }