diff --git a/cluster.go b/cluster.go index 15cba8fd..c39b3467 100644 --- a/cluster.go +++ b/cluster.go @@ -622,7 +622,7 @@ func (c *clusterStateHolder) LazyReload(ctx context.Context) { if err != nil { return } - time.Sleep(250 * time.Millisecond) + time.Sleep(200 * time.Millisecond) }() } diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 5bb34c3e..ea1b229c 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -63,11 +63,13 @@ func (cn *Conn) RemoteAddr() net.Addr { func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error { return internal.WithSpan(ctx, "with_reader", func(ctx context.Context) error { - err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)) - if err != nil { - return err + if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { + return internal.RecordError(ctx, err) } - return fn(cn.rd) + if err := fn(cn.rd); err != nil { + return internal.RecordError(ctx, err) + } + return nil }) } @@ -75,21 +77,23 @@ func (cn *Conn) WithWriter( ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error, ) error { return internal.WithSpan(ctx, "with_writer", func(ctx context.Context) error { - err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)) - if err != nil { - return err + if err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)); err != nil { + return internal.RecordError(ctx, err) } if cn.bw.Buffered() > 0 { cn.bw.Reset(cn.netConn) } - err = fn(cn.wr) - if err != nil { - return err + if err := fn(cn.wr); err != nil { + return internal.RecordError(ctx, err) } - return cn.bw.Flush() + if err := cn.bw.Flush(); err != nil { + return internal.RecordError(ctx, err) + } + + return nil }) } diff --git a/internal/util.go b/internal/util.go index e8cda059..d0c00dc7 100644 --- a/internal/util.go +++ b/internal/util.go @@ -62,17 +62,6 @@ func Unwrap(err error) error { return u.Unwrap() } -func WithSpan(ctx context.Context, name string, fn func(context.Context) error) error { - if !trace.SpanFromContext(ctx).IsRecording() { - return fn(ctx) - } - - ctx, span := global.Tracer("go-redis").Start(ctx, name) - defer span.End() - - return fn(ctx) -} - func AppendArg(b []byte, v interface{}) []byte { switch v := v.(type) { case nil: @@ -143,3 +132,21 @@ func appendRune(b []byte, r rune) []byte { return b } + +//------------------------------------------------------------------------------ + +func WithSpan(ctx context.Context, name string, fn func(context.Context) error) error { + if !trace.SpanFromContext(ctx).IsRecording() { + return fn(ctx) + } + + ctx, span := global.Tracer("github.com/go-redis/redis").Start(ctx, name) + defer span.End() + + return fn(ctx) +} + +func RecordError(ctx context.Context, err error) error { + trace.SpanFromContext(ctx).RecordError(ctx, err) + return err +}