redis/main_test.go

495 lines
12 KiB
Go
Raw Normal View History

package redis_test
import (
2020-02-14 15:30:07 +03:00
"context"
2015-12-22 12:44:49 +03:00
"errors"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"sync"
"testing"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
2021-09-08 16:00:52 +03:00
"github.com/go-redis/redis/v8"
)
const (
redisPort = "6380"
redisAddr = ":" + redisPort
redisSecondaryPort = "6381"
)
2015-05-25 16:22:27 +03:00
const (
ringShard1Port = "6390"
ringShard2Port = "6391"
2018-06-29 10:45:05 +03:00
ringShard3Port = "6392"
2015-05-25 16:22:27 +03:00
)
const (
sentinelName = "mymaster"
2020-03-11 17:26:42 +03:00
sentinelMasterPort = "9123"
sentinelSlave1Port = "9124"
sentinelSlave2Port = "9125"
sentinelPort1 = "9126"
sentinelPort2 = "9127"
sentinelPort3 = "9128"
)
const (
aclSentinelUsername = "sentinel-user"
aclSentinelPassword = "sentinel-pass"
aclSentinelName = "my_server"
aclServerPort = "10001"
aclSentinelPort1 = "10002"
aclSentinelPort2 = "10003"
aclSentinelPort3 = "10004"
)
2015-05-25 16:22:27 +03:00
var (
2020-09-09 17:39:13 +03:00
sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}
aclSentinelAddrs = []string {":" + aclSentinelPort1, ":" + aclSentinelPort2, ":" + aclSentinelPort3}
2020-09-09 17:39:13 +03:00
processes map[string]*redisProcess
redisMain, aclServer *redisProcess
ringShard1, ringShard2, ringShard3 *redisProcess
sentinelMaster, sentinelSlave1, sentinelSlave2 *redisProcess
sentinel1, sentinel2, sentinel3 *redisProcess
aclSentinel1, aclSentinel2, aclSentinel3 *redisProcess
2015-05-25 16:22:27 +03:00
)
var cluster = &clusterScenario{
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
2019-07-25 13:53:00 +03:00
nodeIDs: make([]string, 6),
processes: make(map[string]*redisProcess, 6),
clients: make(map[string]*redis.Client, 6),
}
2020-09-09 17:39:13 +03:00
func registerProcess(port string, p *redisProcess) {
if processes == nil {
processes = make(map[string]*redisProcess)
}
processes[port] = p
}
var _ = BeforeSuite(func() {
var err error
redisMain, err = startRedis(redisPort)
Expect(err).NotTo(HaveOccurred())
2015-05-25 16:22:27 +03:00
ringShard1, err = startRedis(ringShard1Port)
Expect(err).NotTo(HaveOccurred())
ringShard2, err = startRedis(ringShard2Port)
Expect(err).NotTo(HaveOccurred())
2018-06-29 10:45:05 +03:00
ringShard3, err = startRedis(ringShard3Port)
Expect(err).NotTo(HaveOccurred())
sentinelMaster, err = startRedis(sentinelMasterPort)
Expect(err).NotTo(HaveOccurred())
sentinel1, err = startSentinel(sentinelPort1, sentinelName, sentinelMasterPort)
Expect(err).NotTo(HaveOccurred())
sentinel2, err = startSentinel(sentinelPort2, sentinelName, sentinelMasterPort)
Expect(err).NotTo(HaveOccurred())
sentinel3, err = startSentinel(sentinelPort3, sentinelName, sentinelMasterPort)
Expect(err).NotTo(HaveOccurred())
sentinelSlave1, err = startRedis(
sentinelSlave1Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
Expect(err).NotTo(HaveOccurred())
sentinelSlave2, err = startRedis(
sentinelSlave2Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
Expect(err).NotTo(HaveOccurred())
2020-03-11 17:26:42 +03:00
Expect(startCluster(ctx, cluster)).NotTo(HaveOccurred())
aclServer, err = startRedis(aclServerPort)
Expect(err).NotTo(HaveOccurred())
aclSentinel1, err = startSentinelWithAcl(aclSentinelPort1, aclSentinelName, aclServerPort)
Expect(err).NotTo(HaveOccurred())
aclSentinel2, err = startSentinelWithAcl(aclSentinelPort2, aclSentinelName, aclServerPort)
Expect(err).NotTo(HaveOccurred())
aclSentinel3, err = startSentinelWithAcl(aclSentinelPort3, aclSentinelName, aclServerPort)
Expect(err).NotTo(HaveOccurred())
})
var _ = AfterSuite(func() {
2020-09-09 17:39:13 +03:00
Expect(cluster.Close()).NotTo(HaveOccurred())
2020-09-09 17:39:13 +03:00
for _, p := range processes {
Expect(p.Close()).NotTo(HaveOccurred())
}
processes = nil
})
func TestGinkgoSuite(t *testing.T) {
RegisterFailHandler(Fail)
2017-02-18 17:42:34 +03:00
RunSpecs(t, "go-redis")
}
//------------------------------------------------------------------------------
func redisOptions() *redis.Options {
return &redis.Options{
2020-09-11 11:24:38 +03:00
Addr: redisAddr,
DB: 15,
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
MaxRetries: -1,
2016-03-17 19:00:47 +03:00
PoolSize: 10,
PoolTimeout: 30 * time.Second,
IdleTimeout: time.Minute,
IdleCheckFrequency: 100 * time.Millisecond,
}
}
func redisClusterOptions() *redis.ClusterOptions {
return &redis.ClusterOptions{
2020-09-11 11:24:38 +03:00
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
MaxRedirects: 8,
PoolSize: 10,
PoolTimeout: 30 * time.Second,
IdleTimeout: time.Minute,
IdleCheckFrequency: 100 * time.Millisecond,
}
}
func redisRingOptions() *redis.RingOptions {
return &redis.RingOptions{
Addrs: map[string]string{
"ringShardOne": ":" + ringShard1Port,
"ringShardTwo": ":" + ringShard2Port,
},
2020-09-11 11:24:38 +03:00
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
MaxRetries: -1,
PoolSize: 10,
PoolTimeout: 30 * time.Second,
IdleTimeout: time.Minute,
IdleCheckFrequency: 100 * time.Millisecond,
}
}
2018-03-08 11:16:53 +03:00
func performAsync(n int, cbs ...func(int)) *sync.WaitGroup {
var wg sync.WaitGroup
2016-03-17 19:00:47 +03:00
for _, cb := range cbs {
2020-12-06 12:05:42 +03:00
wg.Add(n)
2016-03-17 19:00:47 +03:00
for i := 0; i < n; i++ {
go func(cb func(int), i int) {
defer GinkgoRecover()
defer wg.Done()
cb(i)
}(cb, i)
}
}
2018-03-08 11:16:53 +03:00
return &wg
}
func perform(n int, cbs ...func(int)) {
wg := performAsync(n, cbs...)
wg.Wait()
}
func eventually(fn func() error, timeout time.Duration) error {
2018-08-16 13:25:19 +03:00
errCh := make(chan error, 1)
done := make(chan struct{})
2018-07-22 10:50:26 +03:00
exit := make(chan struct{})
go func() {
2018-07-22 10:50:26 +03:00
for {
err := fn()
2015-11-14 16:54:16 +03:00
if err == nil {
close(done)
return
}
2018-07-22 10:50:26 +03:00
select {
case errCh <- err:
default:
}
2018-07-22 10:50:26 +03:00
select {
case <-exit:
return
case <-time.After(timeout / 100):
}
}
}()
select {
2015-11-14 16:54:16 +03:00
case <-done:
return nil
case <-time.After(timeout):
2018-07-22 10:50:26 +03:00
close(exit)
select {
case err := <-errCh:
return err
default:
2018-08-16 13:25:19 +03:00
return fmt.Errorf("timeout after %s without an error", timeout)
}
}
}
func execCmd(name string, args ...string) (*os.Process, error) {
cmd := exec.Command(name, args...)
if testing.Verbose() {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
return cmd.Process, cmd.Start()
}
2015-12-22 12:44:49 +03:00
func connectTo(port string) (*redis.Client, error) {
client := redis.NewClient(&redis.Options{
2020-09-11 11:24:38 +03:00
Addr: ":" + port,
MaxRetries: -1,
})
2015-12-22 12:44:49 +03:00
err := eventually(func() error {
2020-03-11 17:26:42 +03:00
return client.Ping(ctx).Err()
}, 30*time.Second)
2015-12-22 12:44:49 +03:00
if err != nil {
return nil, err
}
2015-12-22 12:44:49 +03:00
return client, nil
}
type redisProcess struct {
*os.Process
*redis.Client
}
func (p *redisProcess) Close() error {
2015-12-22 12:44:49 +03:00
if err := p.Kill(); err != nil {
return err
}
err := eventually(func() error {
2020-03-11 17:26:42 +03:00
if err := p.Client.Ping(ctx).Err(); err != nil {
2015-12-22 12:44:49 +03:00
return nil
}
return errors.New("client is not shutdown")
}, 10*time.Second)
if err != nil {
return err
}
p.Client.Close()
2015-12-22 12:44:49 +03:00
return nil
}
var (
2016-03-12 13:25:59 +03:00
redisServerBin, _ = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server"))
redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis", "redis.conf"))
)
func redisDir(port string) (string, error) {
2016-03-12 13:25:59 +03:00
dir, err := filepath.Abs(filepath.Join("testdata", "instances", port))
if err != nil {
return "", err
2015-12-22 12:44:49 +03:00
}
if err := os.RemoveAll(dir); err != nil {
return "", err
2015-12-22 12:44:49 +03:00
}
2021-09-08 16:00:52 +03:00
if err := os.MkdirAll(dir, 0o775); err != nil {
return "", err
}
return dir, nil
}
func startRedis(port string, args ...string) (*redisProcess, error) {
dir, err := redisDir(port)
if err != nil {
return nil, err
}
if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
return nil, err
}
baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir}
process, err := execCmd(redisServerBin, append(baseArgs, args...)...)
if err != nil {
return nil, err
}
client, err := connectTo(port)
if err != nil {
process.Kill()
return nil, err
}
2020-09-09 17:39:13 +03:00
p := &redisProcess{process, client}
registerProcess(port, p)
return p, err
}
func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
dir, err := redisDir(port)
if err != nil {
return nil, err
}
2020-09-09 17:39:13 +03:00
process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
if err != nil {
return nil, err
}
2020-09-09 17:39:13 +03:00
client, err := connectTo(port)
if err != nil {
process.Kill()
return nil, err
}
2020-09-09 17:39:13 +03:00
// set down-after-milliseconds=2000
// link: https://github.com/redis/redis/issues/8607
for _, cmd := range []*redis.StatusCmd{
redis.NewStatusCmd(ctx, "SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "2"),
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "down-after-milliseconds", "2000"),
2020-03-11 17:26:42 +03:00
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "failover-timeout", "1000"),
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "parallel-syncs", "1"),
} {
2020-03-11 17:26:42 +03:00
client.Process(ctx, cmd)
if err := cmd.Err(); err != nil {
process.Kill()
return nil, err
}
}
2020-09-09 17:39:13 +03:00
p := &redisProcess{process, client}
registerProcess(port, p)
return p, nil
}
func startSentinelWithAcl(port, masterName, masterPort string) (*redisProcess, error) {
process, err := startSentinel(port, masterName, masterPort)
if err != nil {
return nil, err
}
for _, cmd := range []*redis.StatusCmd{
redis.NewStatusCmd(ctx, "ACL", "SETUSER", aclSentinelUsername, "ON", ">" + aclSentinelPassword, "-@all",
"+auth", "+client|getname", "+client|id", "+client|setname", "+command", "+hello", "+ping", "+role",
"+sentinel|get-master-addr-by-name", "+sentinel|master", "+sentinel|myid", "+sentinel|replicas",
"+sentinel|sentinels"),
} {
process.Client.Process(ctx, cmd)
if err := cmd.Err(); err != nil {
process.Kill()
return nil, err
}
}
return process, nil
}
//------------------------------------------------------------------------------
type badConnError string
func (e badConnError) Error() string { return string(e) }
2020-07-24 14:57:12 +03:00
func (e badConnError) Timeout() bool { return true }
func (e badConnError) Temporary() bool { return false }
2015-09-06 13:50:16 +03:00
type badConn struct {
net.TCPConn
2015-09-06 13:50:16 +03:00
readDelay, writeDelay time.Duration
readErr, writeErr error
}
2015-09-06 13:50:16 +03:00
var _ net.Conn = &badConn{}
func (cn *badConn) SetReadDeadline(t time.Time) error {
return nil
}
func (cn *badConn) SetWriteDeadline(t time.Time) error {
return nil
}
2015-09-06 13:50:16 +03:00
func (cn *badConn) Read([]byte) (int, error) {
if cn.readDelay != 0 {
time.Sleep(cn.readDelay)
}
if cn.readErr != nil {
return 0, cn.readErr
}
return 0, badConnError("bad connection")
}
2015-09-06 13:50:16 +03:00
func (cn *badConn) Write([]byte) (int, error) {
if cn.writeDelay != 0 {
time.Sleep(cn.writeDelay)
}
if cn.writeErr != nil {
return 0, cn.writeErr
}
return 0, badConnError("bad connection")
}
2020-02-14 15:30:07 +03:00
//------------------------------------------------------------------------------
2020-02-14 15:30:07 +03:00
type hook struct {
beforeProcess func(ctx context.Context, cmd redis.Cmder) (context.Context, error)
afterProcess func(ctx context.Context, cmd redis.Cmder) error
beforeProcessPipeline func(ctx context.Context, cmds []redis.Cmder) (context.Context, error)
afterProcessPipeline func(ctx context.Context, cmds []redis.Cmder) error
}
func (h *hook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
if h.beforeProcess != nil {
return h.beforeProcess(ctx, cmd)
}
return ctx, nil
}
func (h *hook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
if h.afterProcess != nil {
return h.afterProcess(ctx, cmd)
}
return nil
}
func (h *hook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
if h.beforeProcessPipeline != nil {
return h.beforeProcessPipeline(ctx, cmds)
}
return ctx, nil
}
func (h *hook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
if h.afterProcessPipeline != nil {
return h.afterProcessPipeline(ctx, cmds)
}
return nil
}