test: `TestRingSetAddrsContention` changed to Benchmark (#2316)

This commit is contained in:
Monkey 2022-12-16 23:34:43 +08:00 committed by GitHub
parent 603e972266
commit 53cc4b4c6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 76 additions and 88 deletions

2
.gitignore vendored
View File

@ -1,3 +1,3 @@
*.rdb *.rdb
testdata/*/ testdata/*
.idea/ .idea/

View File

@ -365,3 +365,78 @@ func BenchmarkClusterSetString(b *testing.B) {
} }
}) })
} }
func BenchmarkExecRingSetAddrsCmd(b *testing.B) {
const (
ringShard1Name = "ringShardOne"
ringShard2Name = "ringShardTwo"
)
for _, port := range []string{ringShard1Port, ringShard2Port} {
if _, err := startRedis(port); err != nil {
b.Fatal(err)
}
}
b.Cleanup(func() {
for _, p := range processes {
if err := p.Close(); err != nil {
b.Errorf("Failed to stop redis process: %v", err)
}
}
processes = nil
})
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"ringShardOne": ":" + ringShard1Port,
},
NewClient: func(opt *redis.Options) *redis.Client {
// Simulate slow shard creation
time.Sleep(100 * time.Millisecond)
return redis.NewClient(opt)
},
})
defer ring.Close()
if _, err := ring.Ping(context.Background()).Result(); err != nil {
b.Fatal(err)
}
// Continuously update addresses by adding and removing one address
updatesDone := make(chan struct{})
defer func() { close(updatesDone) }()
go func() {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for i := 0; ; i++ {
select {
case <-ticker.C:
if i%2 == 0 {
ring.SetAddrs(map[string]string{
ringShard1Name: ":" + ringShard1Port,
})
} else {
ring.SetAddrs(map[string]string{
ringShard1Name: ":" + ringShard1Port,
ringShard2Name: ":" + ringShard2Port,
})
}
case <-updatesDone:
return
}
}
}()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := ring.Ping(context.Background()).Result(); err != nil {
if err == redis.ErrClosed {
// The shard client could be closed while ping command is in progress
continue
} else {
b.Fatal(err)
}
}
}
}

View File

@ -7,7 +7,6 @@ import (
"net" "net"
"strconv" "strconv"
"sync" "sync"
"testing"
"time" "time"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -740,89 +739,3 @@ var _ = Describe("Ring Tx timeout", func() {
testTimeout() testTimeout()
}) })
}) })
func TestRingSetAddrsContention(t *testing.T) {
const (
ringShard1Name = "ringShardOne"
ringShard2Name = "ringShardTwo"
)
for _, port := range []string{ringShard1Port, ringShard2Port} {
if _, err := startRedis(port); err != nil {
t.Fatal(err)
}
}
t.Cleanup(func() {
for _, p := range processes {
if err := p.Close(); err != nil {
t.Errorf("Failed to stop redis process: %v", err)
}
}
processes = nil
})
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"ringShardOne": ":" + ringShard1Port,
},
NewClient: func(opt *redis.Options) *redis.Client {
// Simulate slow shard creation
time.Sleep(100 * time.Millisecond)
return redis.NewClient(opt)
},
})
defer ring.Close()
if _, err := ring.Ping(context.Background()).Result(); err != nil {
t.Fatal(err)
}
// Continuously update addresses by adding and removing one address
updatesDone := make(chan struct{})
defer func() { close(updatesDone) }()
go func() {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for i := 0; ; i++ {
select {
case <-ticker.C:
if i%2 == 0 {
ring.SetAddrs(map[string]string{
ringShard1Name: ":" + ringShard1Port,
})
} else {
ring.SetAddrs(map[string]string{
ringShard1Name: ":" + ringShard1Port,
ringShard2Name: ":" + ringShard2Port,
})
}
case <-updatesDone:
return
}
}
}()
var pings, errClosed int
timer := time.NewTimer(1 * time.Second)
for running := true; running; pings++ {
select {
case <-timer.C:
running = false
default:
if _, err := ring.Ping(context.Background()).Result(); err != nil {
if err == redis.ErrClosed {
// The shard client could be closed while ping command is in progress
errClosed++
} else {
t.Fatal(err)
}
}
}
}
t.Logf("Number of pings: %d, errClosed: %d", pings, errClosed)
if pings < 10_000 {
t.Errorf("Expected at least 10k pings, got: %d", pings)
}
}