Add OpenTelemetryHook

This commit is contained in:
Vladimir Mihailenco 2020-06-09 16:29:53 +03:00
parent 0f875ec0f6
commit 8eed1e67ff
8 changed files with 265 additions and 98 deletions

View File

@ -74,13 +74,13 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
} }
func cmdString(cmd Cmder, val interface{}) string { func cmdString(cmd Cmder, val interface{}) string {
b := make([]byte, 0, 32) b := make([]byte, 0, 64)
for i, arg := range cmd.Args() { for i, arg := range cmd.Args() {
if i > 0 { if i > 0 {
b = append(b, ' ') b = append(b, ' ')
} }
b = appendArg(b, arg) b = internal.AppendArg(b, arg)
} }
if err := cmd.Err(); err != nil { if err := cmd.Err(); err != nil {
@ -88,48 +88,10 @@ func cmdString(cmd Cmder, val interface{}) string {
b = append(b, err.Error()...) b = append(b, err.Error()...)
} else if val != nil { } else if val != nil {
b = append(b, ": "...) b = append(b, ": "...)
b = internal.AppendArg(b, val)
switch val := val.(type) {
case []byte:
b = append(b, val...)
default:
b = appendArg(b, val)
}
} }
return string(b) return internal.String(b)
}
func appendArg(b []byte, v interface{}) []byte {
switch v := v.(type) {
case nil:
return append(b, "<nil>"...)
case string:
return append(b, v...)
case []byte:
return append(b, v...)
case int:
return strconv.AppendInt(b, int64(v), 10)
case int32:
return strconv.AppendInt(b, int64(v), 10)
case int64:
return strconv.AppendInt(b, v, 10)
case uint:
return strconv.AppendUint(b, uint64(v), 10)
case uint32:
return strconv.AppendUint(b, uint64(v), 10)
case uint64:
return strconv.AppendUint(b, v, 10)
case bool:
if v {
return append(b, "true"...)
}
return append(b, "false"...)
case time.Time:
return v.AppendFormat(b, time.RFC3339Nano)
default:
return append(b, fmt.Sprint(v)...)
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@ -1,6 +1,7 @@
package pool package pool
import ( import (
"bufio"
"context" "context"
"net" "net"
"sync/atomic" "sync/atomic"
@ -13,15 +14,16 @@ import (
var noDeadline = time.Time{} var noDeadline = time.Time{}
type Conn struct { type Conn struct {
usedAt int64 // atomic
netConn net.Conn netConn net.Conn
rd *proto.Reader rd *proto.Reader
bw *bufio.Writer
wr *proto.Writer wr *proto.Writer
Inited bool Inited bool
pooled bool pooled bool
createdAt time.Time createdAt time.Time
usedAt int64 // atomic
} }
func NewConn(netConn net.Conn) *Conn { func NewConn(netConn net.Conn) *Conn {
@ -30,7 +32,8 @@ func NewConn(netConn net.Conn) *Conn {
createdAt: time.Now(), createdAt: time.Now(),
} }
cn.rd = proto.NewReader(netConn) cn.rd = proto.NewReader(netConn)
cn.wr = proto.NewWriter(netConn) cn.bw = bufio.NewWriter(netConn)
cn.wr = proto.NewWriter(cn.bw)
cn.SetUsedAt(time.Now()) cn.SetUsedAt(time.Now())
return cn return cn
} }
@ -47,7 +50,7 @@ func (cn *Conn) SetUsedAt(tm time.Time) {
func (cn *Conn) SetNetConn(netConn net.Conn) { func (cn *Conn) SetNetConn(netConn net.Conn) {
cn.netConn = netConn cn.netConn = netConn
cn.rd.Reset(netConn) cn.rd.Reset(netConn)
cn.wr.Reset(netConn) cn.bw.Reset(netConn)
} }
func (cn *Conn) Write(b []byte) (int, error) { func (cn *Conn) Write(b []byte) (int, error) {
@ -77,8 +80,8 @@ func (cn *Conn) WithWriter(
return err return err
} }
if cn.wr.Buffered() > 0 { if cn.bw.Buffered() > 0 {
cn.wr.Reset(cn.netConn) cn.bw.Reset(cn.netConn)
} }
err = fn(cn.wr) err = fn(cn.wr)
@ -86,7 +89,7 @@ func (cn *Conn) WithWriter(
return err return err
} }
return cn.wr.Flush() return cn.bw.Flush()
}) })
} }

View File

@ -1,7 +1,6 @@
package proto package proto
import ( import (
"bufio"
"encoding" "encoding"
"fmt" "fmt"
"io" "io"
@ -11,16 +10,22 @@ import (
"github.com/go-redis/redis/v8/internal/util" "github.com/go-redis/redis/v8/internal/util"
) )
type writer interface {
io.Writer
io.ByteWriter
io.StringWriter
}
type Writer struct { type Writer struct {
wr *bufio.Writer writer
lenBuf []byte lenBuf []byte
numBuf []byte numBuf []byte
} }
func NewWriter(wr io.Writer) *Writer { func NewWriter(wr writer) *Writer {
return &Writer{ return &Writer{
wr: bufio.NewWriter(wr), writer: wr,
lenBuf: make([]byte, 64), lenBuf: make([]byte, 64),
numBuf: make([]byte, 64), numBuf: make([]byte, 64),
@ -28,19 +33,16 @@ func NewWriter(wr io.Writer) *Writer {
} }
func (w *Writer) WriteArgs(args []interface{}) error { func (w *Writer) WriteArgs(args []interface{}) error {
err := w.wr.WriteByte(ArrayReply) if err := w.WriteByte(ArrayReply); err != nil {
if err != nil {
return err return err
} }
err = w.writeLen(len(args)) if err := w.writeLen(len(args)); err != nil {
if err != nil {
return err return err
} }
for _, arg := range args { for _, arg := range args {
err := w.writeArg(arg) if err := w.WriteArg(arg); err != nil {
if err != nil {
return err return err
} }
} }
@ -51,11 +53,11 @@ func (w *Writer) WriteArgs(args []interface{}) error {
func (w *Writer) writeLen(n int) error { func (w *Writer) writeLen(n int) error {
w.lenBuf = strconv.AppendUint(w.lenBuf[:0], uint64(n), 10) w.lenBuf = strconv.AppendUint(w.lenBuf[:0], uint64(n), 10)
w.lenBuf = append(w.lenBuf, '\r', '\n') w.lenBuf = append(w.lenBuf, '\r', '\n')
_, err := w.wr.Write(w.lenBuf) _, err := w.Write(w.lenBuf)
return err return err
} }
func (w *Writer) writeArg(v interface{}) error { func (w *Writer) WriteArg(v interface{}) error {
switch v := v.(type) { switch v := v.(type) {
case nil: case nil:
return w.string("") return w.string("")
@ -108,18 +110,15 @@ func (w *Writer) writeArg(v interface{}) error {
} }
func (w *Writer) bytes(b []byte) error { func (w *Writer) bytes(b []byte) error {
err := w.wr.WriteByte(StringReply) if err := w.WriteByte(StringReply); err != nil {
if err != nil {
return err return err
} }
err = w.writeLen(len(b)) if err := w.writeLen(len(b)); err != nil {
if err != nil {
return err return err
} }
_, err = w.wr.Write(b) if _, err := w.Write(b); err != nil {
if err != nil {
return err return err
} }
@ -146,21 +145,8 @@ func (w *Writer) float(f float64) error {
} }
func (w *Writer) crlf() error { func (w *Writer) crlf() error {
err := w.wr.WriteByte('\r') if err := w.WriteByte('\r'); err != nil {
if err != nil {
return err return err
} }
return w.wr.WriteByte('\n') return w.WriteByte('\n')
}
func (w *Writer) Buffered() int {
return w.wr.Buffered()
}
func (w *Writer) Reset(wr io.Writer) {
w.wr.Reset(wr)
}
func (w *Writer) Flush() error {
return w.wr.Flush()
} }

View File

@ -3,7 +3,6 @@ package proto_test
import ( import (
"bytes" "bytes"
"encoding" "encoding"
"io/ioutil"
"testing" "testing"
"time" "time"
@ -41,9 +40,6 @@ var _ = Describe("WriteBuffer", func() {
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = wr.Flush()
Expect(err).NotTo(HaveOccurred())
Expect(buf.Bytes()).To(Equal([]byte("*6\r\n" + Expect(buf.Bytes()).To(Equal([]byte("*6\r\n" +
"$6\r\nstring\r\n" + "$6\r\nstring\r\n" +
"$2\r\n12\r\n" + "$2\r\n12\r\n" +
@ -59,9 +55,6 @@ var _ = Describe("WriteBuffer", func() {
err := wr.WriteArgs([]interface{}{tm}) err := wr.WriteArgs([]interface{}{tm})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = wr.Flush()
Expect(err).NotTo(HaveOccurred())
Expect(buf.Len()).To(Equal(41)) Expect(buf.Len()).To(Equal(41))
}) })
@ -69,26 +62,32 @@ var _ = Describe("WriteBuffer", func() {
err := wr.WriteArgs([]interface{}{&MyType{}}) err := wr.WriteArgs([]interface{}{&MyType{}})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = wr.Flush()
Expect(err).NotTo(HaveOccurred())
Expect(buf.Len()).To(Equal(15)) Expect(buf.Len()).To(Equal(15))
}) })
}) })
type discard struct{}
func (discard) Write(b []byte) (int, error) {
return len(b), nil
}
func (discard) WriteString(s string) (int, error) {
return len(s), nil
}
func (discard) WriteByte(c byte) error {
return nil
}
func BenchmarkWriteBuffer_Append(b *testing.B) { func BenchmarkWriteBuffer_Append(b *testing.B) {
buf := proto.NewWriter(ioutil.Discard) buf := proto.NewWriter(discard{})
args := []interface{}{"hello", "world", "foo", "bar"} args := []interface{}{"hello", "world", "foo", "bar"}
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
err := buf.WriteArgs(args) err := buf.WriteArgs(args)
if err != nil { if err != nil {
panic(err) b.Fatal(err)
}
err = buf.Flush()
if err != nil {
panic(err)
} }
} }
} }

11
internal/safe.go Normal file
View File

@ -0,0 +1,11 @@
// +build appengine
package internal
func String(b []byte) string {
return string(b)
}
func Bytes(s string) []byte {
return []byte(s)
}

20
internal/unsafe.go Normal file
View File

@ -0,0 +1,20 @@
// +build !appengine
package internal
import "unsafe"
// String converts byte slice to string.
func String(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
// Bytes converts string to byte slice.
func Bytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(
&struct {
string
Cap int
}{s, len(s)},
))
}

View File

@ -2,7 +2,10 @@ package internal
import ( import (
"context" "context"
"fmt"
"strconv"
"time" "time"
"unicode/utf8"
"github.com/go-redis/redis/v8/internal/util" "github.com/go-redis/redis/v8/internal/util"
"go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/global"
@ -69,3 +72,74 @@ func WithSpan(ctx context.Context, name string, fn func(context.Context) error)
return fn(ctx) return fn(ctx)
} }
func AppendArg(b []byte, v interface{}) []byte {
switch v := v.(type) {
case nil:
return append(b, "<nil>"...)
case string:
return appendUTF8String(b, v)
case []byte:
return appendUTF8String(b, String(v))
case int:
return strconv.AppendInt(b, int64(v), 10)
case int8:
return strconv.AppendInt(b, int64(v), 10)
case int16:
return strconv.AppendInt(b, int64(v), 10)
case int32:
return strconv.AppendInt(b, int64(v), 10)
case int64:
return strconv.AppendInt(b, v, 10)
case uint:
return strconv.AppendUint(b, uint64(v), 10)
case uint8:
return strconv.AppendUint(b, uint64(v), 10)
case uint16:
return strconv.AppendUint(b, uint64(v), 10)
case uint32:
return strconv.AppendUint(b, uint64(v), 10)
case uint64:
return strconv.AppendUint(b, v, 10)
case float32:
return strconv.AppendFloat(b, float64(v), 'f', -1, 64)
case float64:
return strconv.AppendFloat(b, v, 'f', -1, 64)
case bool:
if v {
return append(b, "true"...)
}
return append(b, "false"...)
case time.Time:
return v.AppendFormat(b, time.RFC3339Nano)
default:
return append(b, fmt.Sprint(v)...)
}
}
func appendUTF8String(b []byte, s string) []byte {
for _, r := range s {
b = appendRune(b, r)
}
return b
}
func appendRune(b []byte, r rune) []byte {
if r < utf8.RuneSelf {
switch c := byte(r); c {
case '\n':
return append(b, "\\n"...)
case '\r':
return append(b, "\\r"...)
default:
return append(b, c)
}
}
l := len(b)
b = append(b, make([]byte, utf8.UTFMax)...)
n := utf8.EncodeRune(b[l:l+utf8.UTFMax], r)
b = b[:l+n]
return b
}

112
redisext/otel.go Normal file
View File

@ -0,0 +1,112 @@
package redisext
import (
"context"
"strings"
"github.com/go-redis/redis/v8"
"github.com/go-redis/redis/v8/internal"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/trace"
)
type OpenTelemetryHook struct{}
var _ redis.Hook = OpenTelemetryHook{}
func (OpenTelemetryHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
if !trace.SpanFromContext(ctx).IsRecording() {
return ctx, nil
}
b := make([]byte, 32)
b = appendCmd(b, cmd)
tracer := global.Tracer("github.com/go-redis/redis")
ctx, span := tracer.Start(ctx, cmd.FullName())
span.SetAttributes(
kv.String("db.system", "redis"),
kv.String("redis.cmd", internal.String(b)),
)
return ctx, nil
}
func (OpenTelemetryHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
trace.SpanFromContext(ctx).End()
return nil
}
func (OpenTelemetryHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
if !trace.SpanFromContext(ctx).IsRecording() {
return ctx, nil
}
const numCmdLimit = 100
const numNameLimit = 10
seen := make(map[string]struct{}, len(cmds))
unqNames := make([]string, 0, len(cmds))
b := make([]byte, 0, 32*len(cmds))
for i, cmd := range cmds {
if i > numCmdLimit {
break
}
if i > 0 {
b = append(b, '\n')
}
b = appendCmd(b, cmd)
if len(unqNames) >= numNameLimit {
continue
}
name := cmd.FullName()
if _, ok := seen[name]; !ok {
seen[name] = struct{}{}
unqNames = append(unqNames, name)
}
}
tracer := global.Tracer("github.com/go-redis/redis")
ctx, span := tracer.Start(ctx, "pipeline "+strings.Join(unqNames, " "))
span.SetAttributes(
kv.String("db.system", "redis"),
kv.Int("redis.num_cmd", len(cmds)),
kv.String("redis.cmds", internal.String(b)),
)
return ctx, nil
}
func (OpenTelemetryHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
trace.SpanFromContext(ctx).End()
return nil
}
func appendCmd(b []byte, cmd redis.Cmder) []byte {
const lenLimit = 64
for i, arg := range cmd.Args() {
if i > 0 {
b = append(b, ' ')
}
start := len(b)
b = internal.AppendArg(b, arg)
if len(b)-start > lenLimit {
b = append(b[:start+lenLimit], "..."...)
}
}
if err := cmd.Err(); err != nil {
b = append(b, ": "...)
b = append(b, err.Error()...)
}
return b
}