mirror of https://github.com/go-redis/redis.git
Log with context to associate all log by traceID (#1413)
* Log with context to associate all log by traceID
This commit is contained in:
parent
f2645d373d
commit
8a3f304b25
|
@ -1046,7 +1046,7 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
|
_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf("ReapStaleConns failed: %s", err)
|
internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1576,7 +1576,7 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
|
||||||
|
|
||||||
info := cmdsInfo[name]
|
info := cmdsInfo[name]
|
||||||
if info == nil {
|
if info == nil {
|
||||||
internal.Logger.Printf("info for cmd=%s not found", name)
|
internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
|
||||||
}
|
}
|
||||||
return info
|
return info
|
||||||
}
|
}
|
||||||
|
|
40
commands.go
40
commands.go
|
@ -13,9 +13,10 @@ func usePrecise(dur time.Duration) bool {
|
||||||
return dur < time.Second || dur%time.Second != 0
|
return dur < time.Second || dur%time.Second != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func formatMs(dur time.Duration) int64 {
|
func formatMs(ctx context.Context, dur time.Duration) int64 {
|
||||||
if dur > 0 && dur < time.Millisecond {
|
if dur > 0 && dur < time.Millisecond {
|
||||||
internal.Logger.Printf(
|
internal.Logger.Printf(
|
||||||
|
ctx,
|
||||||
"specified duration is %s, but minimal supported value is %s - truncating to 1ms",
|
"specified duration is %s, but minimal supported value is %s - truncating to 1ms",
|
||||||
dur, time.Millisecond,
|
dur, time.Millisecond,
|
||||||
)
|
)
|
||||||
|
@ -24,9 +25,10 @@ func formatMs(dur time.Duration) int64 {
|
||||||
return int64(dur / time.Millisecond)
|
return int64(dur / time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
func formatSec(dur time.Duration) int64 {
|
func formatSec(ctx context.Context, dur time.Duration) int64 {
|
||||||
if dur > 0 && dur < time.Second {
|
if dur > 0 && dur < time.Second {
|
||||||
internal.Logger.Printf(
|
internal.Logger.Printf(
|
||||||
|
ctx,
|
||||||
"specified duration is %s, but minimal supported value is %s - truncating to 1s",
|
"specified duration is %s, but minimal supported value is %s - truncating to 1s",
|
||||||
dur, time.Second,
|
dur, time.Second,
|
||||||
)
|
)
|
||||||
|
@ -457,7 +459,7 @@ func (c cmdable) Exists(ctx context.Context, keys ...string) *IntCmd {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c cmdable) Expire(ctx context.Context, key string, expiration time.Duration) *BoolCmd {
|
func (c cmdable) Expire(ctx context.Context, key string, expiration time.Duration) *BoolCmd {
|
||||||
cmd := NewBoolCmd(ctx, "expire", key, formatSec(expiration))
|
cmd := NewBoolCmd(ctx, "expire", key, formatSec(ctx, expiration))
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
@ -482,7 +484,7 @@ func (c cmdable) Migrate(ctx context.Context, host, port, key string, db int, ti
|
||||||
port,
|
port,
|
||||||
key,
|
key,
|
||||||
db,
|
db,
|
||||||
formatMs(timeout),
|
formatMs(ctx, timeout),
|
||||||
)
|
)
|
||||||
cmd.setReadTimeout(timeout)
|
cmd.setReadTimeout(timeout)
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
@ -520,7 +522,7 @@ func (c cmdable) Persist(ctx context.Context, key string) *BoolCmd {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c cmdable) PExpire(ctx context.Context, key string, expiration time.Duration) *BoolCmd {
|
func (c cmdable) PExpire(ctx context.Context, key string, expiration time.Duration) *BoolCmd {
|
||||||
cmd := NewBoolCmd(ctx, "pexpire", key, formatMs(expiration))
|
cmd := NewBoolCmd(ctx, "pexpire", key, formatMs(ctx, expiration))
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
@ -565,7 +567,7 @@ func (c cmdable) Restore(ctx context.Context, key string, ttl time.Duration, val
|
||||||
ctx,
|
ctx,
|
||||||
"restore",
|
"restore",
|
||||||
key,
|
key,
|
||||||
formatMs(ttl),
|
formatMs(ctx, ttl),
|
||||||
value,
|
value,
|
||||||
)
|
)
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
@ -577,7 +579,7 @@ func (c cmdable) RestoreReplace(ctx context.Context, key string, ttl time.Durati
|
||||||
ctx,
|
ctx,
|
||||||
"restore",
|
"restore",
|
||||||
key,
|
key,
|
||||||
formatMs(ttl),
|
formatMs(ctx, ttl),
|
||||||
value,
|
value,
|
||||||
"replace",
|
"replace",
|
||||||
)
|
)
|
||||||
|
@ -761,9 +763,9 @@ func (c cmdable) Set(ctx context.Context, key string, value interface{}, expirat
|
||||||
args[2] = value
|
args[2] = value
|
||||||
if expiration > 0 {
|
if expiration > 0 {
|
||||||
if usePrecise(expiration) {
|
if usePrecise(expiration) {
|
||||||
args = append(args, "px", formatMs(expiration))
|
args = append(args, "px", formatMs(ctx, expiration))
|
||||||
} else {
|
} else {
|
||||||
args = append(args, "ex", formatSec(expiration))
|
args = append(args, "ex", formatSec(ctx, expiration))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cmd := NewStatusCmd(ctx, args...)
|
cmd := NewStatusCmd(ctx, args...)
|
||||||
|
@ -781,9 +783,9 @@ func (c cmdable) SetNX(ctx context.Context, key string, value interface{}, expir
|
||||||
cmd = NewBoolCmd(ctx, "setnx", key, value)
|
cmd = NewBoolCmd(ctx, "setnx", key, value)
|
||||||
} else {
|
} else {
|
||||||
if usePrecise(expiration) {
|
if usePrecise(expiration) {
|
||||||
cmd = NewBoolCmd(ctx, "set", key, value, "px", formatMs(expiration), "nx")
|
cmd = NewBoolCmd(ctx, "set", key, value, "px", formatMs(ctx, expiration), "nx")
|
||||||
} else {
|
} else {
|
||||||
cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(expiration), "nx")
|
cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(ctx, expiration), "nx")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
@ -799,9 +801,9 @@ func (c cmdable) SetXX(ctx context.Context, key string, value interface{}, expir
|
||||||
cmd = NewBoolCmd(ctx, "set", key, value, "xx")
|
cmd = NewBoolCmd(ctx, "set", key, value, "xx")
|
||||||
} else {
|
} else {
|
||||||
if usePrecise(expiration) {
|
if usePrecise(expiration) {
|
||||||
cmd = NewBoolCmd(ctx, "set", key, value, "px", formatMs(expiration), "xx")
|
cmd = NewBoolCmd(ctx, "set", key, value, "px", formatMs(ctx, expiration), "xx")
|
||||||
} else {
|
} else {
|
||||||
cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(expiration), "xx")
|
cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(ctx, expiration), "xx")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
@ -1088,7 +1090,7 @@ func (c cmdable) BLPop(ctx context.Context, timeout time.Duration, keys ...strin
|
||||||
for i, key := range keys {
|
for i, key := range keys {
|
||||||
args[1+i] = key
|
args[1+i] = key
|
||||||
}
|
}
|
||||||
args[len(args)-1] = formatSec(timeout)
|
args[len(args)-1] = formatSec(ctx, timeout)
|
||||||
cmd := NewStringSliceCmd(ctx, args...)
|
cmd := NewStringSliceCmd(ctx, args...)
|
||||||
cmd.setReadTimeout(timeout)
|
cmd.setReadTimeout(timeout)
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
@ -1101,7 +1103,7 @@ func (c cmdable) BRPop(ctx context.Context, timeout time.Duration, keys ...strin
|
||||||
for i, key := range keys {
|
for i, key := range keys {
|
||||||
args[1+i] = key
|
args[1+i] = key
|
||||||
}
|
}
|
||||||
args[len(keys)+1] = formatSec(timeout)
|
args[len(keys)+1] = formatSec(ctx, timeout)
|
||||||
cmd := NewStringSliceCmd(ctx, args...)
|
cmd := NewStringSliceCmd(ctx, args...)
|
||||||
cmd.setReadTimeout(timeout)
|
cmd.setReadTimeout(timeout)
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
@ -1114,7 +1116,7 @@ func (c cmdable) BRPopLPush(ctx context.Context, source, destination string, tim
|
||||||
"brpoplpush",
|
"brpoplpush",
|
||||||
source,
|
source,
|
||||||
destination,
|
destination,
|
||||||
formatSec(timeout),
|
formatSec(ctx, timeout),
|
||||||
)
|
)
|
||||||
cmd.setReadTimeout(timeout)
|
cmd.setReadTimeout(timeout)
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
@ -1694,7 +1696,7 @@ func (c cmdable) BZPopMax(ctx context.Context, timeout time.Duration, keys ...st
|
||||||
for i, key := range keys {
|
for i, key := range keys {
|
||||||
args[1+i] = key
|
args[1+i] = key
|
||||||
}
|
}
|
||||||
args[len(args)-1] = formatSec(timeout)
|
args[len(args)-1] = formatSec(ctx, timeout)
|
||||||
cmd := NewZWithKeyCmd(ctx, args...)
|
cmd := NewZWithKeyCmd(ctx, args...)
|
||||||
cmd.setReadTimeout(timeout)
|
cmd.setReadTimeout(timeout)
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
@ -1708,7 +1710,7 @@ func (c cmdable) BZPopMin(ctx context.Context, timeout time.Duration, keys ...st
|
||||||
for i, key := range keys {
|
for i, key := range keys {
|
||||||
args[1+i] = key
|
args[1+i] = key
|
||||||
}
|
}
|
||||||
args[len(args)-1] = formatSec(timeout)
|
args[len(args)-1] = formatSec(ctx, timeout)
|
||||||
cmd := NewZWithKeyCmd(ctx, args...)
|
cmd := NewZWithKeyCmd(ctx, args...)
|
||||||
cmd.setReadTimeout(timeout)
|
cmd.setReadTimeout(timeout)
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
@ -2165,7 +2167,7 @@ func (c cmdable) ClientList(ctx context.Context) *StringCmd {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c cmdable) ClientPause(ctx context.Context, dur time.Duration) *BoolCmd {
|
func (c cmdable) ClientPause(ctx context.Context, dur time.Duration) *BoolCmd {
|
||||||
cmd := NewBoolCmd(ctx, "client", "pause", formatMs(dur))
|
cmd := NewBoolCmd(ctx, "client", "pause", formatMs(ctx, dur))
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package internal
|
package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"go.opentelemetry.io/otel/api/global"
|
"go.opentelemetry.io/otel/api/global"
|
||||||
"go.opentelemetry.io/otel/api/metric"
|
"go.opentelemetry.io/otel/api/metric"
|
||||||
)
|
)
|
||||||
|
@ -15,7 +17,7 @@ var (
|
||||||
func init() {
|
func init() {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
Logger.Printf("Error creating meter github.com/go-redis/redis for Instruments", r)
|
Logger.Printf(context.Background(), "Error creating meter github.com/go-redis/redis for Instruments", r)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
|
@ -1,12 +1,24 @@
|
||||||
package internal
|
package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Logging interface {
|
type Logging interface {
|
||||||
Printf(format string, v ...interface{})
|
Printf(ctx context.Context, format string, v ...interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
var Logger Logging = log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)
|
type logger struct {
|
||||||
|
log *log.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *logger) Printf(ctx context.Context, format string, v ...interface{}) {
|
||||||
|
_ = l.log.Output(2, fmt.Sprintf(format, v...))
|
||||||
|
}
|
||||||
|
|
||||||
|
var Logger Logging = &logger{
|
||||||
|
log: log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile),
|
||||||
|
}
|
||||||
|
|
|
@ -320,7 +320,7 @@ func (p *ConnPool) popIdle() *Conn {
|
||||||
|
|
||||||
func (p *ConnPool) Put(cn *Conn) {
|
func (p *ConnPool) Put(cn *Conn) {
|
||||||
if cn.rd.Buffered() > 0 {
|
if cn.rd.Buffered() > 0 {
|
||||||
internal.Logger.Printf("Conn has unread data")
|
internal.Logger.Printf(context.Background(), "Conn has unread data")
|
||||||
p.Remove(cn, BadConnError{})
|
p.Remove(cn, BadConnError{})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -458,7 +458,7 @@ func (p *ConnPool) reaper(frequency time.Duration) {
|
||||||
}
|
}
|
||||||
_, err := p.ReapStaleConns()
|
_, err := p.ReapStaleConns()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf("ReapStaleConns failed: %s", err)
|
internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
case <-p.closedCh:
|
case <-p.closedCh:
|
||||||
|
|
16
pubsub.go
16
pubsub.go
|
@ -162,7 +162,7 @@ func (c *PubSub) closeTheCn(reason error) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if !c.closed {
|
if !c.closed {
|
||||||
internal.Logger.Printf("redis: discarding bad PubSub connection: %s", reason)
|
internal.Logger.Printf(c.getContext(), "redis: discarding bad PubSub connection: %s", reason)
|
||||||
}
|
}
|
||||||
err := c.closeConn(c.cn)
|
err := c.closeConn(c.cn)
|
||||||
c.cn = nil
|
c.cn = nil
|
||||||
|
@ -450,6 +450,14 @@ func (c *PubSub) ChannelWithSubscriptions(ctx context.Context, size int) <-chan
|
||||||
return c.allCh
|
return c.allCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *PubSub) getContext() context.Context {
|
||||||
|
if c.cmd != nil {
|
||||||
|
return c.cmd.ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
return context.Background()
|
||||||
|
}
|
||||||
|
|
||||||
func (c *PubSub) initPing() {
|
func (c *PubSub) initPing() {
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
c.ping = make(chan struct{}, 1)
|
c.ping = make(chan struct{}, 1)
|
||||||
|
@ -531,10 +539,11 @@ func (c *PubSub) initMsgChan(size int) {
|
||||||
}
|
}
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
internal.Logger.Printf(
|
internal.Logger.Printf(
|
||||||
|
c.getContext(),
|
||||||
"redis: %s channel is full for %s (message is dropped)", c, pingTimeout)
|
"redis: %s channel is full for %s (message is dropped)", c, pingTimeout)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
internal.Logger.Printf("redis: unknown message type: %T", msg)
|
internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -579,7 +588,7 @@ func (c *PubSub) initAllChan(size int) {
|
||||||
case *Message:
|
case *Message:
|
||||||
c.sendMessage(msg, timer)
|
c.sendMessage(msg, timer)
|
||||||
default:
|
default:
|
||||||
internal.Logger.Printf("redis: unknown message type: %T", msg)
|
internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -594,6 +603,7 @@ func (c *PubSub) sendMessage(msg interface{}, timer *time.Timer) {
|
||||||
}
|
}
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
internal.Logger.Printf(
|
internal.Logger.Printf(
|
||||||
|
c.getContext(),
|
||||||
"redis: %s channel is full for %s (message is dropped)", c, pingTimeout)
|
"redis: %s channel is full for %s (message is dropped)", c, pingTimeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
4
ring.go
4
ring.go
|
@ -304,7 +304,7 @@ func (c *ringShards) Heartbeat(frequency time.Duration) {
|
||||||
err := shard.Client.Ping(ctx).Err()
|
err := shard.Client.Ping(ctx).Err()
|
||||||
isUp := err == nil || err == pool.ErrPoolTimeout
|
isUp := err == nil || err == pool.ErrPoolTimeout
|
||||||
if shard.Vote(isUp) {
|
if shard.Vote(isUp) {
|
||||||
internal.Logger.Printf("ring shard state changed: %s", shard)
|
internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard)
|
||||||
rebalance = true
|
rebalance = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -558,7 +558,7 @@ func (c *Ring) cmdInfo(name string) *CommandInfo {
|
||||||
}
|
}
|
||||||
info := cmdsInfo[name]
|
info := cmdsInfo[name]
|
||||||
if info == nil {
|
if info == nil {
|
||||||
internal.Logger.Printf("info for cmd=%s not found", name)
|
internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
|
||||||
}
|
}
|
||||||
return info
|
return info
|
||||||
}
|
}
|
||||||
|
|
18
sentinel.go
18
sentinel.go
|
@ -342,7 +342,7 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
c.switchMaster(addr)
|
c.switchMaster(ctx, addr)
|
||||||
return addr, nil
|
return addr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -393,7 +393,7 @@ func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) {
|
||||||
|
|
||||||
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result()
|
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf("sentinel: GetMasterAddrByName master=%q failed: %s",
|
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
|
||||||
c.masterName, err)
|
c.masterName, err)
|
||||||
_ = sentinel.Close()
|
_ = sentinel.Close()
|
||||||
continue
|
continue
|
||||||
|
@ -413,14 +413,14 @@ func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) {
|
||||||
func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
|
func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
|
||||||
addr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result()
|
addr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf("sentinel: GetMasterAddrByName name=%q failed: %s",
|
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
|
||||||
c.masterName, err)
|
c.masterName, err)
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
return net.JoinHostPort(addr[0], addr[1])
|
return net.JoinHostPort(addr[0], addr[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sentinelFailover) switchMaster(addr string) {
|
func (c *sentinelFailover) switchMaster(ctx context.Context, addr string) {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
masterAddr := c._masterAddr
|
masterAddr := c._masterAddr
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
@ -435,7 +435,7 @@ func (c *sentinelFailover) switchMaster(addr string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
internal.Logger.Printf("sentinel: new master=%q addr=%q",
|
internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
|
||||||
c.masterName, addr)
|
c.masterName, addr)
|
||||||
_ = c.Pool().Filter(func(cn *pool.Conn) bool {
|
_ = c.Pool().Filter(func(cn *pool.Conn) bool {
|
||||||
return cn.RemoteAddr().String() != addr
|
return cn.RemoteAddr().String() != addr
|
||||||
|
@ -457,7 +457,7 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl
|
||||||
func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
|
func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
|
||||||
sentinels, err := c.sentinel.Sentinels(ctx, c.masterName).Result()
|
sentinels, err := c.sentinel.Sentinels(ctx, c.masterName).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
|
internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.masterName, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, sentinel := range sentinels {
|
for _, sentinel := range sentinels {
|
||||||
|
@ -467,7 +467,7 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
|
||||||
if key == "name" {
|
if key == "name" {
|
||||||
sentinelAddr := vals[i+1].(string)
|
sentinelAddr := vals[i+1].(string)
|
||||||
if !contains(c.sentinelAddrs, sentinelAddr) {
|
if !contains(c.sentinelAddrs, sentinelAddr) {
|
||||||
internal.Logger.Printf("sentinel: discovered new sentinel=%q for master=%q",
|
internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
|
||||||
sentinelAddr, c.masterName)
|
sentinelAddr, c.masterName)
|
||||||
c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
|
c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
|
||||||
}
|
}
|
||||||
|
@ -487,11 +487,11 @@ func (c *sentinelFailover) listen(pubsub *PubSub) {
|
||||||
if msg.Channel == "+switch-master" {
|
if msg.Channel == "+switch-master" {
|
||||||
parts := strings.Split(msg.Payload, " ")
|
parts := strings.Split(msg.Payload, " ")
|
||||||
if parts[0] != c.masterName {
|
if parts[0] != c.masterName {
|
||||||
internal.Logger.Printf("sentinel: ignore addr for master=%q", parts[0])
|
internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
addr := net.JoinHostPort(parts[3], parts[4])
|
addr := net.JoinHostPort(parts[3], parts[4])
|
||||||
c.switchMaster(addr)
|
c.switchMaster(pubsub.getContext(), addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue