2015-01-24 15:12:48 +03:00
package redis_test
import (
2019-07-04 11:18:06 +03:00
"context"
2021-10-16 07:04:25 +03:00
"crypto/tls"
"errors"
2015-11-14 16:54:16 +03:00
"fmt"
2015-04-28 18:14:19 +03:00
"net"
2021-10-16 07:04:25 +03:00
"reflect"
2015-12-16 17:11:52 +03:00
"strconv"
2015-11-14 16:54:16 +03:00
"strings"
2015-12-16 17:11:52 +03:00
"sync"
2021-10-16 07:04:25 +03:00
"testing"
2015-03-18 13:41:24 +03:00
"time"
2015-01-24 15:12:48 +03:00
2016-12-16 17:26:48 +03:00
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
2021-09-08 16:00:52 +03:00
"github.com/go-redis/redis/v8"
"github.com/go-redis/redis/v8/internal/hashtag"
2015-01-24 15:12:48 +03:00
)
2015-04-28 18:14:19 +03:00
type clusterScenario struct {
ports [ ] string
2019-07-25 13:53:00 +03:00
nodeIDs [ ] string
2015-04-28 18:14:19 +03:00
processes map [ string ] * redisProcess
clients map [ string ] * redis . Client
}
func ( s * clusterScenario ) masters ( ) [ ] * redis . Client {
result := make ( [ ] * redis . Client , 3 )
for pos , port := range s . ports [ : 3 ] {
result [ pos ] = s . clients [ port ]
2015-01-24 15:12:48 +03:00
}
2015-04-28 18:14:19 +03:00
return result
}
2015-01-24 15:12:48 +03:00
2015-04-28 18:14:19 +03:00
func ( s * clusterScenario ) slaves ( ) [ ] * redis . Client {
result := make ( [ ] * redis . Client , 3 )
for pos , port := range s . ports [ 3 : ] {
result [ pos ] = s . clients [ port ]
}
return result
}
2015-01-24 15:12:48 +03:00
2017-02-17 13:12:06 +03:00
func ( s * clusterScenario ) addrs ( ) [ ] string {
2015-04-28 18:14:19 +03:00
addrs := make ( [ ] string , len ( s . ports ) )
for i , port := range s . ports {
addrs [ i ] = net . JoinHostPort ( "127.0.0.1" , port )
}
2017-02-17 13:12:06 +03:00
return addrs
}
2020-09-05 11:34:37 +03:00
func ( s * clusterScenario ) newClusterClientUnstable ( opt * redis . ClusterOptions ) * redis . ClusterClient {
2017-02-17 13:12:06 +03:00
opt . Addrs = s . addrs ( )
2018-12-13 13:26:02 +03:00
return redis . NewClusterClient ( opt )
}
2020-03-11 17:26:42 +03:00
func ( s * clusterScenario ) newClusterClient (
ctx context . Context , opt * redis . ClusterOptions ,
) * redis . ClusterClient {
2020-09-05 11:34:37 +03:00
client := s . newClusterClientUnstable ( opt )
2018-07-23 12:01:13 +03:00
2018-07-22 10:50:26 +03:00
err := eventually ( func ( ) error {
2018-07-23 12:01:13 +03:00
if opt . ClusterSlots != nil {
return nil
}
2020-03-11 17:26:42 +03:00
state , err := client . LoadState ( ctx )
2018-05-17 16:09:56 +03:00
if err != nil {
2018-07-22 10:50:26 +03:00
return err
}
2018-07-23 12:01:13 +03:00
2020-03-11 17:26:42 +03:00
if ! state . IsConsistent ( ctx ) {
2018-11-24 14:16:21 +03:00
return fmt . Errorf ( "cluster state is not consistent" )
2018-07-23 12:01:13 +03:00
}
2018-07-22 10:50:26 +03:00
return nil
} , 30 * time . Second )
if err != nil {
panic ( err )
}
2018-07-23 12:01:13 +03:00
2018-05-17 16:09:56 +03:00
return client
2015-04-28 18:14:19 +03:00
}
2015-01-24 15:12:48 +03:00
2020-09-09 17:39:13 +03:00
func ( s * clusterScenario ) Close ( ) error {
for _ , port := range s . ports {
processes [ port ] . Close ( )
delete ( processes , port )
}
return nil
}
2020-03-11 17:26:42 +03:00
func startCluster ( ctx context . Context , scenario * clusterScenario ) error {
2015-11-14 16:54:16 +03:00
// Start processes and collect node ids
2015-04-28 18:14:19 +03:00
for pos , port := range scenario . ports {
process , err := startRedis ( port , "--cluster-enabled" , "yes" )
if err != nil {
return err
2015-01-24 15:12:48 +03:00
}
2016-03-16 17:57:24 +03:00
client := redis . NewClient ( & redis . Options {
Addr : ":" + port ,
} )
2020-03-11 17:26:42 +03:00
info , err := client . ClusterNodes ( ctx ) . Result ( )
2015-04-28 18:14:19 +03:00
if err != nil {
return err
2015-01-24 15:12:48 +03:00
}
2015-04-28 18:14:19 +03:00
scenario . processes [ port ] = process
scenario . clients [ port ] = client
2019-07-25 13:53:00 +03:00
scenario . nodeIDs [ pos ] = info [ : 40 ]
2015-04-28 18:14:19 +03:00
}
2017-07-09 13:10:07 +03:00
// Meet cluster nodes.
2015-04-28 18:14:19 +03:00
for _ , client := range scenario . clients {
2020-03-11 17:26:42 +03:00
err := client . ClusterMeet ( ctx , "127.0.0.1" , scenario . ports [ 0 ] ) . Err ( )
2015-04-28 18:14:19 +03:00
if err != nil {
return err
2015-01-24 15:12:48 +03:00
}
2015-04-28 18:14:19 +03:00
}
2015-01-24 15:12:48 +03:00
2017-07-09 13:10:07 +03:00
// Bootstrap masters.
2015-04-28 18:14:19 +03:00
slots := [ ] int { 0 , 5000 , 10000 , 16384 }
2015-11-14 16:54:16 +03:00
for pos , master := range scenario . masters ( ) {
2020-03-11 17:26:42 +03:00
err := master . ClusterAddSlotsRange ( ctx , slots [ pos ] , slots [ pos + 1 ] - 1 ) . Err ( )
2015-04-28 18:14:19 +03:00
if err != nil {
return err
}
}
2015-01-24 15:12:48 +03:00
2017-07-09 13:10:07 +03:00
// Bootstrap slaves.
2015-11-14 16:54:16 +03:00
for idx , slave := range scenario . slaves ( ) {
2019-07-25 13:53:00 +03:00
masterID := scenario . nodeIDs [ idx ]
2015-11-14 16:54:16 +03:00
// Wait until master is available
err := eventually ( func ( ) error {
2020-03-11 17:26:42 +03:00
s := slave . ClusterNodes ( ctx ) . Val ( )
2019-07-25 13:53:00 +03:00
wanted := masterID
2015-11-14 16:54:16 +03:00
if ! strings . Contains ( s , wanted ) {
return fmt . Errorf ( "%q does not contain %q" , s , wanted )
}
return nil
} , 10 * time . Second )
2015-04-28 18:14:19 +03:00
if err != nil {
return err
2015-01-24 15:12:48 +03:00
}
2020-03-11 17:26:42 +03:00
err = slave . ClusterReplicate ( ctx , masterID ) . Err ( )
2015-04-28 18:14:19 +03:00
if err != nil {
return err
2015-01-24 15:12:48 +03:00
}
2015-04-28 18:14:19 +03:00
}
2015-01-24 15:12:48 +03:00
2017-07-09 13:10:07 +03:00
// Wait until all nodes have consistent info.
2018-05-17 16:09:56 +03:00
wanted := [ ] redis . ClusterSlot { {
Start : 0 ,
End : 4999 ,
Nodes : [ ] redis . ClusterNode { {
2019-07-25 13:53:00 +03:00
ID : "" ,
2018-05-17 16:09:56 +03:00
Addr : "127.0.0.1:8220" ,
} , {
2019-07-25 13:53:00 +03:00
ID : "" ,
2018-05-17 16:09:56 +03:00
Addr : "127.0.0.1:8223" ,
} } ,
} , {
Start : 5000 ,
End : 9999 ,
Nodes : [ ] redis . ClusterNode { {
2019-07-25 13:53:00 +03:00
ID : "" ,
2018-05-17 16:09:56 +03:00
Addr : "127.0.0.1:8221" ,
} , {
2019-07-25 13:53:00 +03:00
ID : "" ,
2018-05-17 16:09:56 +03:00
Addr : "127.0.0.1:8224" ,
} } ,
} , {
Start : 10000 ,
End : 16383 ,
Nodes : [ ] redis . ClusterNode { {
2019-07-25 13:53:00 +03:00
ID : "" ,
2018-05-17 16:09:56 +03:00
Addr : "127.0.0.1:8222" ,
} , {
2019-07-25 13:53:00 +03:00
ID : "" ,
2018-05-17 16:09:56 +03:00
Addr : "127.0.0.1:8225" ,
} } ,
} }
2015-04-28 18:14:19 +03:00
for _ , client := range scenario . clients {
2015-11-14 16:54:16 +03:00
err := eventually ( func ( ) error {
2020-03-11 17:26:42 +03:00
res , err := client . ClusterSlots ( ctx ) . Result ( )
2015-11-22 15:44:38 +03:00
if err != nil {
return err
2015-11-21 14:16:13 +03:00
}
2016-04-09 12:52:43 +03:00
return assertSlotsEqual ( res , wanted )
2016-03-14 17:51:46 +03:00
} , 30 * time . Second )
2015-04-28 18:14:19 +03:00
if err != nil {
return err
}
}
return nil
}
2016-04-09 12:52:43 +03:00
func assertSlotsEqual ( slots , wanted [ ] redis . ClusterSlot ) error {
2018-10-11 13:58:31 +03:00
outerLoop :
2016-04-09 12:52:43 +03:00
for _ , s2 := range wanted {
for _ , s1 := range slots {
if slotEqual ( s1 , s2 ) {
2018-10-11 13:58:31 +03:00
continue outerLoop
2016-04-09 12:52:43 +03:00
}
}
return fmt . Errorf ( "%v not found in %v" , s2 , slots )
}
return nil
}
func slotEqual ( s1 , s2 redis . ClusterSlot ) bool {
if s1 . Start != s2 . Start {
return false
}
if s1 . End != s2 . End {
return false
}
2016-12-16 17:26:48 +03:00
if len ( s1 . Nodes ) != len ( s2 . Nodes ) {
return false
}
2016-04-09 12:52:43 +03:00
for i , n1 := range s1 . Nodes {
if n1 . Addr != s2 . Nodes [ i ] . Addr {
return false
}
}
return true
}
2015-04-28 18:14:19 +03:00
//------------------------------------------------------------------------------
2016-10-02 15:44:01 +03:00
var _ = Describe ( "ClusterClient" , func ( ) {
2018-05-17 16:09:56 +03:00
var failover bool
2016-12-16 17:26:48 +03:00
var opt * redis . ClusterOptions
2016-10-02 15:44:01 +03:00
var client * redis . ClusterClient
2015-01-24 15:12:48 +03:00
2016-12-16 17:26:48 +03:00
assertClusterClient := func ( ) {
2019-07-04 11:18:06 +03:00
It ( "supports WithContext" , func ( ) {
2020-03-11 17:26:42 +03:00
ctx , cancel := context . WithCancel ( ctx )
2019-07-04 11:18:06 +03:00
cancel ( )
2020-03-11 17:26:42 +03:00
err := client . Ping ( ctx ) . Err ( )
2019-07-04 11:18:06 +03:00
Expect ( err ) . To ( MatchError ( "context canceled" ) )
} )
2015-01-24 15:12:48 +03:00
It ( "should GET/SET/DEL" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Get ( ctx , "A" ) . Err ( )
2015-01-24 15:12:48 +03:00
Expect ( err ) . To ( Equal ( redis . Nil ) )
2020-03-11 17:26:42 +03:00
err = client . Set ( ctx , "A" , "VALUE" , 0 ) . Err ( )
2015-01-24 15:12:48 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2017-07-09 13:10:07 +03:00
Eventually ( func ( ) string {
2020-03-11 17:26:42 +03:00
return client . Get ( ctx , "A" ) . Val ( )
2017-08-31 15:22:47 +03:00
} , 30 * time . Second ) . Should ( Equal ( "VALUE" ) )
2015-01-24 15:12:48 +03:00
2020-03-11 17:26:42 +03:00
cnt , err := client . Del ( ctx , "A" ) . Result ( )
2015-01-24 15:12:48 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( cnt ) . To ( Equal ( int64 ( 1 ) ) )
} )
2018-05-17 16:09:56 +03:00
It ( "GET follows redirects" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Set ( ctx , "A" , "VALUE" , 0 ) . Err ( )
2018-05-17 16:09:56 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2015-05-01 10:42:58 +03:00
2018-05-17 16:09:56 +03:00
if ! failover {
Eventually ( func ( ) int64 {
2020-03-11 17:26:42 +03:00
nodes , err := client . Nodes ( ctx , "A" )
2018-05-17 16:09:56 +03:00
if err != nil {
return 0
}
2020-03-11 17:26:42 +03:00
return nodes [ 1 ] . Client . DBSize ( ctx ) . Val ( )
2018-05-17 16:09:56 +03:00
} , 30 * time . Second ) . Should ( Equal ( int64 ( 1 ) ) )
2015-01-24 15:12:48 +03:00
2018-05-17 16:09:56 +03:00
Eventually ( func ( ) error {
2020-03-11 17:26:42 +03:00
return client . SwapNodes ( ctx , "A" )
2018-05-17 16:09:56 +03:00
} , 30 * time . Second ) . ShouldNot ( HaveOccurred ( ) )
}
2020-03-11 17:26:42 +03:00
v , err := client . Get ( ctx , "A" ) . Result ( )
2018-05-17 16:09:56 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( v ) . To ( Equal ( "VALUE" ) )
} )
It ( "SET follows redirects" , func ( ) {
if ! failover {
Eventually ( func ( ) error {
2020-03-11 17:26:42 +03:00
return client . SwapNodes ( ctx , "A" )
2018-05-17 16:09:56 +03:00
} , 30 * time . Second ) . ShouldNot ( HaveOccurred ( ) )
}
2020-03-11 17:26:42 +03:00
err := client . Set ( ctx , "A" , "VALUE" , 0 ) . Err ( )
2018-05-17 16:09:56 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2020-03-11 17:26:42 +03:00
v , err := client . Get ( ctx , "A" ) . Result ( )
2018-05-17 16:09:56 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( v ) . To ( Equal ( "VALUE" ) )
2015-05-10 16:01:38 +03:00
} )
2015-12-16 17:11:52 +03:00
2016-10-09 14:12:32 +03:00
It ( "distributes keys" , func ( ) {
for i := 0 ; i < 100 ; i ++ {
2020-03-11 17:26:42 +03:00
err := client . Set ( ctx , fmt . Sprintf ( "key%d" , i ) , "value" , 0 ) . Err ( )
2016-10-09 14:12:32 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
}
2020-03-11 17:26:42 +03:00
client . ForEachMaster ( ctx , func ( ctx context . Context , master * redis . Client ) error {
2018-05-17 16:09:56 +03:00
defer GinkgoRecover ( )
2017-07-09 13:10:07 +03:00
Eventually ( func ( ) string {
2020-03-11 17:26:42 +03:00
return master . Info ( ctx , "keyspace" ) . Val ( )
2017-08-31 15:22:47 +03:00
} , 30 * time . Second ) . Should ( Or (
2017-07-09 13:10:07 +03:00
ContainSubstring ( "keys=31" ) ,
ContainSubstring ( "keys=29" ) ,
ContainSubstring ( "keys=40" ) ,
) )
2018-05-17 16:09:56 +03:00
return nil
} )
2016-10-09 14:12:32 +03:00
} )
It ( "distributes keys when using EVAL" , func ( ) {
script := redis . NewScript ( `
local r = redis . call ( ' SET ' , KEYS [ 1 ] , ARGV [ 1 ] )
return r
` )
var key string
for i := 0 ; i < 100 ; i ++ {
key = fmt . Sprintf ( "key%d" , i )
2020-03-11 17:26:42 +03:00
err := script . Run ( ctx , client , [ ] string { key } , "value" ) . Err ( )
2016-10-09 14:12:32 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
}
2020-03-11 17:26:42 +03:00
client . ForEachMaster ( ctx , func ( ctx context . Context , master * redis . Client ) error {
2018-06-29 10:45:05 +03:00
defer GinkgoRecover ( )
2017-07-09 13:10:07 +03:00
Eventually ( func ( ) string {
2020-03-11 17:26:42 +03:00
return master . Info ( ctx , "keyspace" ) . Val ( )
2017-08-31 15:22:47 +03:00
} , 30 * time . Second ) . Should ( Or (
2017-07-09 13:10:07 +03:00
ContainSubstring ( "keys=31" ) ,
ContainSubstring ( "keys=29" ) ,
ContainSubstring ( "keys=40" ) ,
) )
2018-06-29 10:45:05 +03:00
return nil
} )
2016-10-09 14:12:32 +03:00
} )
2021-04-16 16:58:11 +03:00
It ( "distributes scripts when using Script Load" , func ( ) {
client . ScriptFlush ( ctx )
script := redis . NewScript ( ` return 'Unique script' ` )
script . Load ( ctx , client )
client . ForEachShard ( ctx , func ( ctx context . Context , shard * redis . Client ) error {
defer GinkgoRecover ( )
val , _ := script . Exists ( ctx , shard ) . Result ( )
Expect ( val [ 0 ] ) . To ( Equal ( true ) )
return nil
} )
} )
It ( "checks all shards when using Script Exists" , func ( ) {
client . ScriptFlush ( ctx )
script := redis . NewScript ( ` return 'First script' ` )
lostScriptSrc := ` return 'Lost script' `
lostScript := redis . NewScript ( lostScriptSrc )
script . Load ( ctx , client )
client . Do ( ctx , "script" , "load" , lostScriptSrc )
val , _ := client . ScriptExists ( ctx , script . Hash ( ) , lostScript . Hash ( ) ) . Result ( )
Expect ( val ) . To ( Equal ( [ ] bool { true , false } ) )
} )
It ( "flushes scripts from all shards when using ScriptFlush" , func ( ) {
script := redis . NewScript ( ` return 'Unnecessary script' ` )
script . Load ( ctx , client )
val , _ := client . ScriptExists ( ctx , script . Hash ( ) ) . Result ( )
Expect ( val ) . To ( Equal ( [ ] bool { true } ) )
client . ScriptFlush ( ctx )
val , _ = client . ScriptExists ( ctx , script . Hash ( ) ) . Result ( )
Expect ( val ) . To ( Equal ( [ ] bool { false } ) )
} )
2016-06-17 15:09:38 +03:00
It ( "supports Watch" , func ( ) {
2015-12-16 17:11:52 +03:00
var incr func ( string ) error
// Transactionally increments key using GET and SET commands.
incr = func ( key string ) error {
2020-03-11 17:26:42 +03:00
err := client . Watch ( ctx , func ( tx * redis . Tx ) error {
n , err := tx . Get ( ctx , key ) . Int64 ( )
2016-05-02 15:54:15 +03:00
if err != nil && err != redis . Nil {
return err
}
2020-03-11 17:26:42 +03:00
_ , err = tx . TxPipelined ( ctx , func ( pipe redis . Pipeliner ) error {
pipe . Set ( ctx , key , strconv . FormatInt ( n + 1 , 10 ) , 0 )
2016-05-02 15:54:15 +03:00
return nil
} )
2015-12-16 17:11:52 +03:00
return err
2016-05-02 15:54:15 +03:00
} , key )
2015-12-16 17:11:52 +03:00
if err == redis . TxFailedErr {
return incr ( key )
}
return err
}
var wg sync . WaitGroup
for i := 0 ; i < 100 ; i ++ {
wg . Add ( 1 )
go func ( ) {
2016-07-02 15:52:10 +03:00
defer GinkgoRecover ( )
2015-12-16 17:11:52 +03:00
defer wg . Done ( )
err := incr ( "key" )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
} ( )
}
wg . Wait ( )
2018-02-15 14:00:54 +03:00
Eventually ( func ( ) string {
2020-03-11 17:26:42 +03:00
return client . Get ( ctx , "key" ) . Val ( )
2018-02-15 14:00:54 +03:00
} , 30 * time . Second ) . Should ( Equal ( "100" ) )
2015-12-16 17:11:52 +03:00
} )
2016-04-06 14:01:08 +03:00
2016-12-13 18:28:39 +03:00
Describe ( "pipelining" , func ( ) {
var pipe * redis . Pipeline
2016-04-06 14:01:08 +03:00
2016-12-13 18:28:39 +03:00
assertPipeline := func ( ) {
2016-12-16 17:26:48 +03:00
keys := [ ] string { "A" , "B" , "C" , "D" , "E" , "F" , "G" }
2016-04-06 14:01:08 +03:00
2016-12-16 17:26:48 +03:00
It ( "follows redirects" , func ( ) {
2018-05-17 16:09:56 +03:00
if ! failover {
for _ , key := range keys {
Eventually ( func ( ) error {
2020-03-11 17:26:42 +03:00
return client . SwapNodes ( ctx , key )
2018-05-17 16:09:56 +03:00
} , 30 * time . Second ) . ShouldNot ( HaveOccurred ( ) )
}
2016-12-16 17:26:48 +03:00
}
2016-06-05 14:30:56 +03:00
2016-12-13 18:28:39 +03:00
for i , key := range keys {
2020-03-11 17:26:42 +03:00
pipe . Set ( ctx , key , key + "_value" , 0 )
pipe . Expire ( ctx , key , time . Duration ( i + 1 ) * time . Hour )
2016-12-13 18:28:39 +03:00
}
2020-03-11 17:26:42 +03:00
cmds , err := pipe . Exec ( ctx )
2016-12-13 18:28:39 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( cmds ) . To ( HaveLen ( 14 ) )
2016-06-05 14:30:56 +03:00
2020-06-10 15:04:12 +03:00
_ = client . ForEachShard ( ctx , func ( ctx context . Context , node * redis . Client ) error {
2018-02-12 17:15:40 +03:00
defer GinkgoRecover ( )
Eventually ( func ( ) int64 {
2020-03-11 17:26:42 +03:00
return node . DBSize ( ctx ) . Val ( )
2018-02-12 17:15:40 +03:00
} , 30 * time . Second ) . ShouldNot ( BeZero ( ) )
return nil
} )
2018-05-17 16:09:56 +03:00
if ! failover {
for _ , key := range keys {
Eventually ( func ( ) error {
2020-03-11 17:26:42 +03:00
return client . SwapNodes ( ctx , key )
2018-05-17 16:09:56 +03:00
} , 30 * time . Second ) . ShouldNot ( HaveOccurred ( ) )
}
2016-12-16 17:26:48 +03:00
}
2016-12-13 18:28:39 +03:00
for _ , key := range keys {
2020-03-11 17:26:42 +03:00
pipe . Get ( ctx , key )
pipe . TTL ( ctx , key )
2016-12-13 18:28:39 +03:00
}
2020-03-11 17:26:42 +03:00
cmds , err = pipe . Exec ( ctx )
2016-12-13 18:28:39 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( cmds ) . To ( HaveLen ( 14 ) )
2016-12-16 17:26:48 +03:00
for i , key := range keys {
get := cmds [ i * 2 ] . ( * redis . StringCmd )
Expect ( get . Val ( ) ) . To ( Equal ( key + "_value" ) )
ttl := cmds [ ( i * 2 ) + 1 ] . ( * redis . DurationCmd )
2017-08-15 10:34:05 +03:00
dur := time . Duration ( i + 1 ) * time . Hour
2018-07-23 15:55:13 +03:00
Expect ( ttl . Val ( ) ) . To ( BeNumerically ( "~" , dur , 30 * time . Second ) )
2016-12-16 17:26:48 +03:00
}
2016-12-13 18:28:39 +03:00
} )
2016-04-06 14:01:08 +03:00
2016-12-13 18:28:39 +03:00
It ( "works with missing keys" , func ( ) {
2020-03-11 17:26:42 +03:00
pipe . Set ( ctx , "A" , "A_value" , 0 )
pipe . Set ( ctx , "C" , "C_value" , 0 )
_ , err := pipe . Exec ( ctx )
2016-12-16 17:26:48 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2020-03-11 17:26:42 +03:00
a := pipe . Get ( ctx , "A" )
b := pipe . Get ( ctx , "B" )
c := pipe . Get ( ctx , "C" )
cmds , err := pipe . Exec ( ctx )
2016-12-13 18:28:39 +03:00
Expect ( err ) . To ( Equal ( redis . Nil ) )
Expect ( cmds ) . To ( HaveLen ( 3 ) )
Expect ( a . Err ( ) ) . NotTo ( HaveOccurred ( ) )
Expect ( a . Val ( ) ) . To ( Equal ( "A_value" ) )
Expect ( b . Err ( ) ) . To ( Equal ( redis . Nil ) )
Expect ( b . Val ( ) ) . To ( Equal ( "" ) )
Expect ( c . Err ( ) ) . NotTo ( HaveOccurred ( ) )
Expect ( c . Val ( ) ) . To ( Equal ( "C_value" ) )
} )
}
2017-07-09 13:10:07 +03:00
Describe ( "with Pipeline" , func ( ) {
2016-12-13 18:28:39 +03:00
BeforeEach ( func ( ) {
2017-05-01 18:42:58 +03:00
pipe = client . Pipeline ( ) . ( * redis . Pipeline )
2016-10-09 14:12:32 +03:00
} )
2016-04-06 14:01:08 +03:00
2016-12-13 18:28:39 +03:00
AfterEach ( func ( ) {
Expect ( pipe . Close ( ) ) . NotTo ( HaveOccurred ( ) )
} )
assertPipeline ( )
} )
2016-04-06 14:01:08 +03:00
2017-07-09 13:10:07 +03:00
Describe ( "with TxPipeline" , func ( ) {
2016-12-13 18:28:39 +03:00
BeforeEach ( func ( ) {
2017-05-01 18:42:58 +03:00
pipe = client . TxPipeline ( ) . ( * redis . Pipeline )
2016-12-13 18:28:39 +03:00
} )
AfterEach ( func ( ) {
Expect ( pipe . Close ( ) ) . NotTo ( HaveOccurred ( ) )
} )
2016-04-06 14:01:08 +03:00
2016-12-13 18:28:39 +03:00
assertPipeline ( )
2016-10-09 14:12:32 +03:00
} )
2016-04-06 14:01:08 +03:00
} )
2016-06-17 15:09:38 +03:00
2017-07-09 10:07:20 +03:00
It ( "supports PubSub" , func ( ) {
2020-03-11 17:26:42 +03:00
pubsub := client . Subscribe ( ctx , "mychannel" )
2017-07-09 10:07:20 +03:00
defer pubsub . Close ( )
2017-07-09 13:10:07 +03:00
Eventually ( func ( ) error {
2020-03-11 17:26:42 +03:00
_ , err := client . Publish ( ctx , "mychannel" , "hello" ) . Result ( )
2017-07-09 13:10:07 +03:00
if err != nil {
return err
}
2017-07-09 10:07:20 +03:00
2020-03-11 17:26:42 +03:00
msg , err := pubsub . ReceiveTimeout ( ctx , time . Second )
2017-07-09 13:10:07 +03:00
if err != nil {
return err
}
2017-07-09 10:07:20 +03:00
2017-07-09 13:10:07 +03:00
_ , ok := msg . ( * redis . Message )
if ! ok {
return fmt . Errorf ( "got %T, wanted *redis.Message" , msg )
}
return nil
} , 30 * time . Second ) . ShouldNot ( HaveOccurred ( ) )
} )
2019-03-07 13:19:03 +03:00
It ( "supports PubSub.Ping without channels" , func ( ) {
2020-03-11 17:26:42 +03:00
pubsub := client . Subscribe ( ctx )
2019-03-07 13:19:03 +03:00
defer pubsub . Close ( )
2020-03-11 17:26:42 +03:00
err := pubsub . Ping ( ctx )
2019-03-07 13:19:03 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
} )
2020-09-11 12:46:38 +03:00
}
Describe ( "ClusterClient" , func ( ) {
BeforeEach ( func ( ) {
opt = redisClusterOptions ( )
client = cluster . newClusterClient ( ctx , opt )
err := client . ForEachMaster ( ctx , func ( ctx context . Context , master * redis . Client ) error {
return master . FlushDB ( ctx ) . Err ( )
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
} )
AfterEach ( func ( ) {
_ = client . ForEachMaster ( ctx , func ( ctx context . Context , master * redis . Client ) error {
return master . FlushDB ( ctx ) . Err ( )
} )
Expect ( client . Close ( ) ) . NotTo ( HaveOccurred ( ) )
} )
It ( "returns pool stats" , func ( ) {
stats := client . PoolStats ( )
Expect ( stats ) . To ( BeAssignableToTypeOf ( & redis . PoolStats { } ) )
} )
It ( "returns an error when there are no attempts left" , func ( ) {
opt := redisClusterOptions ( )
opt . MaxRedirects = - 1
client := cluster . newClusterClient ( ctx , opt )
Eventually ( func ( ) error {
return client . SwapNodes ( ctx , "A" )
} , 30 * time . Second ) . ShouldNot ( HaveOccurred ( ) )
err := client . Get ( ctx , "A" ) . Err ( )
Expect ( err ) . To ( HaveOccurred ( ) )
Expect ( err . Error ( ) ) . To ( ContainSubstring ( "MOVED" ) )
Expect ( client . Close ( ) ) . NotTo ( HaveOccurred ( ) )
} )
It ( "calls fn for every master node" , func ( ) {
for i := 0 ; i < 10 ; i ++ {
Expect ( client . Set ( ctx , strconv . Itoa ( i ) , "" , 0 ) . Err ( ) ) . NotTo ( HaveOccurred ( ) )
}
err := client . ForEachMaster ( ctx , func ( ctx context . Context , master * redis . Client ) error {
return master . FlushDB ( ctx ) . Err ( )
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
size , err := client . DBSize ( ctx ) . Result ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( size ) . To ( Equal ( int64 ( 0 ) ) )
} )
It ( "should CLUSTER SLOTS" , func ( ) {
res , err := client . ClusterSlots ( ctx ) . Result ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( res ) . To ( HaveLen ( 3 ) )
wanted := [ ] redis . ClusterSlot { {
Start : 0 ,
End : 4999 ,
Nodes : [ ] redis . ClusterNode { {
ID : "" ,
Addr : "127.0.0.1:8220" ,
} , {
ID : "" ,
Addr : "127.0.0.1:8223" ,
} } ,
} , {
Start : 5000 ,
End : 9999 ,
Nodes : [ ] redis . ClusterNode { {
ID : "" ,
Addr : "127.0.0.1:8221" ,
} , {
ID : "" ,
Addr : "127.0.0.1:8224" ,
} } ,
} , {
Start : 10000 ,
End : 16383 ,
Nodes : [ ] redis . ClusterNode { {
ID : "" ,
Addr : "127.0.0.1:8222" ,
} , {
ID : "" ,
Addr : "127.0.0.1:8225" ,
} } ,
} }
Expect ( assertSlotsEqual ( res , wanted ) ) . NotTo ( HaveOccurred ( ) )
} )
It ( "should CLUSTER NODES" , func ( ) {
res , err := client . ClusterNodes ( ctx ) . Result ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( len ( res ) ) . To ( BeNumerically ( ">" , 400 ) )
} )
It ( "should CLUSTER INFO" , func ( ) {
res , err := client . ClusterInfo ( ctx ) . Result ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( res ) . To ( ContainSubstring ( "cluster_known_nodes:6" ) )
} )
It ( "should CLUSTER KEYSLOT" , func ( ) {
hashSlot , err := client . ClusterKeySlot ( ctx , "somekey" ) . Result ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( hashSlot ) . To ( Equal ( int64 ( hashtag . Slot ( "somekey" ) ) ) )
} )
It ( "should CLUSTER GETKEYSINSLOT" , func ( ) {
keys , err := client . ClusterGetKeysInSlot ( ctx , hashtag . Slot ( "somekey" ) , 1 ) . Result ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( len ( keys ) ) . To ( Equal ( 0 ) )
} )
It ( "should CLUSTER COUNT-FAILURE-REPORTS" , func ( ) {
n , err := client . ClusterCountFailureReports ( ctx , cluster . nodeIDs [ 0 ] ) . Result ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( n ) . To ( Equal ( int64 ( 0 ) ) )
} )
It ( "should CLUSTER COUNTKEYSINSLOT" , func ( ) {
n , err := client . ClusterCountKeysInSlot ( ctx , 10 ) . Result ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( n ) . To ( Equal ( int64 ( 0 ) ) )
} )
It ( "should CLUSTER SAVECONFIG" , func ( ) {
res , err := client . ClusterSaveConfig ( ctx ) . Result ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( res ) . To ( Equal ( "OK" ) )
} )
It ( "should CLUSTER SLAVES" , func ( ) {
nodesList , err := client . ClusterSlaves ( ctx , cluster . nodeIDs [ 0 ] ) . Result ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( nodesList ) . Should ( ContainElement ( ContainSubstring ( "slave" ) ) )
Expect ( nodesList ) . Should ( HaveLen ( 1 ) )
} )
It ( "should RANDOMKEY" , func ( ) {
const nkeys = 100
for i := 0 ; i < nkeys ; i ++ {
err := client . Set ( ctx , fmt . Sprintf ( "key%d" , i ) , "value" , 0 ) . Err ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
}
var keys [ ] string
addKey := func ( key string ) {
for _ , k := range keys {
if k == key {
return
}
}
keys = append ( keys , key )
}
for i := 0 ; i < nkeys * 10 ; i ++ {
key := client . RandomKey ( ctx ) . Val ( )
addKey ( key )
}
Expect ( len ( keys ) ) . To ( BeNumerically ( "~" , nkeys , nkeys / 10 ) )
} )
2020-02-14 15:30:07 +03:00
It ( "supports Process hook" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Ping ( ctx ) . Err ( )
2020-02-14 15:30:07 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2020-06-10 15:04:12 +03:00
err = client . ForEachShard ( ctx , func ( ctx context . Context , node * redis . Client ) error {
2020-03-11 17:26:42 +03:00
return node . Ping ( ctx ) . Err ( )
2020-02-14 15:30:07 +03:00
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
var stack [ ] string
clusterHook := & hook {
beforeProcess : func ( ctx context . Context , cmd redis . Cmder ) ( context . Context , error ) {
Expect ( cmd . String ( ) ) . To ( Equal ( "ping: " ) )
stack = append ( stack , "cluster.BeforeProcess" )
return ctx , nil
} ,
afterProcess : func ( ctx context . Context , cmd redis . Cmder ) error {
Expect ( cmd . String ( ) ) . To ( Equal ( "ping: PONG" ) )
stack = append ( stack , "cluster.AfterProcess" )
return nil
} ,
}
client . AddHook ( clusterHook )
2020-02-14 16:37:35 +03:00
nodeHook := & hook {
2020-02-14 15:30:07 +03:00
beforeProcess : func ( ctx context . Context , cmd redis . Cmder ) ( context . Context , error ) {
Expect ( cmd . String ( ) ) . To ( Equal ( "ping: " ) )
stack = append ( stack , "shard.BeforeProcess" )
return ctx , nil
} ,
afterProcess : func ( ctx context . Context , cmd redis . Cmder ) error {
Expect ( cmd . String ( ) ) . To ( Equal ( "ping: PONG" ) )
stack = append ( stack , "shard.AfterProcess" )
return nil
} ,
}
2020-06-10 15:04:12 +03:00
_ = client . ForEachShard ( ctx , func ( ctx context . Context , node * redis . Client ) error {
2020-02-14 16:37:35 +03:00
node . AddHook ( nodeHook )
return nil
} )
2020-02-14 15:30:07 +03:00
2020-03-11 17:26:42 +03:00
err = client . Ping ( ctx ) . Err ( )
2020-02-14 15:30:07 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( stack ) . To ( Equal ( [ ] string {
"cluster.BeforeProcess" ,
"shard.BeforeProcess" ,
"shard.AfterProcess" ,
"cluster.AfterProcess" ,
} ) )
clusterHook . beforeProcess = nil
clusterHook . afterProcess = nil
2020-02-14 16:37:35 +03:00
nodeHook . beforeProcess = nil
nodeHook . afterProcess = nil
2020-02-14 15:30:07 +03:00
} )
It ( "supports Pipeline hook" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Ping ( ctx ) . Err ( )
2020-02-14 15:30:07 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2020-06-10 15:04:12 +03:00
err = client . ForEachShard ( ctx , func ( ctx context . Context , node * redis . Client ) error {
2020-03-11 17:26:42 +03:00
return node . Ping ( ctx ) . Err ( )
2020-02-14 15:30:07 +03:00
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
var stack [ ] string
client . AddHook ( & hook {
beforeProcessPipeline : func ( ctx context . Context , cmds [ ] redis . Cmder ) ( context . Context , error ) {
Expect ( cmds ) . To ( HaveLen ( 1 ) )
Expect ( cmds [ 0 ] . String ( ) ) . To ( Equal ( "ping: " ) )
stack = append ( stack , "cluster.BeforeProcessPipeline" )
return ctx , nil
} ,
afterProcessPipeline : func ( ctx context . Context , cmds [ ] redis . Cmder ) error {
Expect ( cmds ) . To ( HaveLen ( 1 ) )
Expect ( cmds [ 0 ] . String ( ) ) . To ( Equal ( "ping: PONG" ) )
stack = append ( stack , "cluster.AfterProcessPipeline" )
return nil
} ,
} )
2020-06-10 15:04:12 +03:00
_ = client . ForEachShard ( ctx , func ( ctx context . Context , node * redis . Client ) error {
2020-02-14 16:37:35 +03:00
node . AddHook ( & hook {
2020-02-14 15:30:07 +03:00
beforeProcessPipeline : func ( ctx context . Context , cmds [ ] redis . Cmder ) ( context . Context , error ) {
Expect ( cmds ) . To ( HaveLen ( 1 ) )
Expect ( cmds [ 0 ] . String ( ) ) . To ( Equal ( "ping: " ) )
stack = append ( stack , "shard.BeforeProcessPipeline" )
return ctx , nil
} ,
afterProcessPipeline : func ( ctx context . Context , cmds [ ] redis . Cmder ) error {
Expect ( cmds ) . To ( HaveLen ( 1 ) )
Expect ( cmds [ 0 ] . String ( ) ) . To ( Equal ( "ping: PONG" ) )
stack = append ( stack , "shard.AfterProcessPipeline" )
return nil
} ,
} )
2020-02-14 16:37:35 +03:00
return nil
} )
2020-02-14 15:30:07 +03:00
2020-03-11 17:26:42 +03:00
_ , err = client . Pipelined ( ctx , func ( pipe redis . Pipeliner ) error {
pipe . Ping ( ctx )
2020-02-14 15:30:07 +03:00
return nil
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( stack ) . To ( Equal ( [ ] string {
"cluster.BeforeProcessPipeline" ,
"shard.BeforeProcessPipeline" ,
"shard.AfterProcessPipeline" ,
"cluster.AfterProcessPipeline" ,
} ) )
} )
It ( "supports TxPipeline hook" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Ping ( ctx ) . Err ( )
2020-02-14 15:30:07 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2020-06-10 15:04:12 +03:00
err = client . ForEachShard ( ctx , func ( ctx context . Context , node * redis . Client ) error {
2020-03-11 17:26:42 +03:00
return node . Ping ( ctx ) . Err ( )
2020-02-14 15:30:07 +03:00
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
var stack [ ] string
client . AddHook ( & hook {
beforeProcessPipeline : func ( ctx context . Context , cmds [ ] redis . Cmder ) ( context . Context , error ) {
2021-01-09 10:27:42 +03:00
Expect ( cmds ) . To ( HaveLen ( 3 ) )
Expect ( cmds [ 1 ] . String ( ) ) . To ( Equal ( "ping: " ) )
2020-02-14 15:30:07 +03:00
stack = append ( stack , "cluster.BeforeProcessPipeline" )
return ctx , nil
} ,
afterProcessPipeline : func ( ctx context . Context , cmds [ ] redis . Cmder ) error {
2021-01-09 10:27:42 +03:00
Expect ( cmds ) . To ( HaveLen ( 3 ) )
Expect ( cmds [ 1 ] . String ( ) ) . To ( Equal ( "ping: PONG" ) )
2020-02-14 15:30:07 +03:00
stack = append ( stack , "cluster.AfterProcessPipeline" )
return nil
} ,
} )
2020-06-10 15:04:12 +03:00
_ = client . ForEachShard ( ctx , func ( ctx context . Context , node * redis . Client ) error {
2020-02-14 16:37:35 +03:00
node . AddHook ( & hook {
2020-02-14 15:30:07 +03:00
beforeProcessPipeline : func ( ctx context . Context , cmds [ ] redis . Cmder ) ( context . Context , error ) {
2020-02-14 16:37:35 +03:00
Expect ( cmds ) . To ( HaveLen ( 3 ) )
Expect ( cmds [ 1 ] . String ( ) ) . To ( Equal ( "ping: " ) )
2020-02-14 15:30:07 +03:00
stack = append ( stack , "shard.BeforeProcessPipeline" )
return ctx , nil
} ,
afterProcessPipeline : func ( ctx context . Context , cmds [ ] redis . Cmder ) error {
2020-02-14 16:37:35 +03:00
Expect ( cmds ) . To ( HaveLen ( 3 ) )
Expect ( cmds [ 1 ] . String ( ) ) . To ( Equal ( "ping: PONG" ) )
2020-02-14 15:30:07 +03:00
stack = append ( stack , "shard.AfterProcessPipeline" )
return nil
} ,
} )
2020-02-14 16:37:35 +03:00
return nil
} )
2020-02-14 15:30:07 +03:00
2020-03-11 17:26:42 +03:00
_ , err = client . TxPipelined ( ctx , func ( pipe redis . Pipeliner ) error {
pipe . Ping ( ctx )
2020-02-14 15:30:07 +03:00
return nil
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
Expect ( stack ) . To ( Equal ( [ ] string {
"cluster.BeforeProcessPipeline" ,
"shard.BeforeProcessPipeline" ,
"shard.AfterProcessPipeline" ,
"cluster.AfterProcessPipeline" ,
} ) )
} )
2017-12-28 17:41:05 +03:00
2020-12-16 14:25:40 +03:00
It ( "should return correct replica for key" , func ( ) {
2020-12-16 18:45:06 +03:00
client , err := client . SlaveForKey ( ctx , "test" )
2020-12-16 14:25:40 +03:00
Expect ( err ) . ToNot ( HaveOccurred ( ) )
info := client . Info ( ctx , "server" )
Expect ( info . Val ( ) ) . Should ( ContainSubstring ( "tcp_port:8224" ) )
} )
It ( "should return correct master for key" , func ( ) {
client , err := client . MasterForKey ( ctx , "test" )
Expect ( err ) . ToNot ( HaveOccurred ( ) )
info := client . Info ( ctx , "server" )
Expect ( info . Val ( ) ) . Should ( ContainSubstring ( "tcp_port:8221" ) )
} )
2017-07-09 13:10:07 +03:00
assertClusterClient ( )
} )
2016-06-05 14:30:56 +03:00
Describe ( "ClusterClient with RouteByLatency" , func ( ) {
BeforeEach ( func ( ) {
2016-12-16 17:26:48 +03:00
opt = redisClusterOptions ( )
2016-10-02 15:44:01 +03:00
opt . RouteByLatency = true
2020-03-11 17:26:42 +03:00
client = cluster . newClusterClient ( ctx , opt )
2016-06-17 15:09:38 +03:00
2020-03-11 17:26:42 +03:00
err := client . ForEachMaster ( ctx , func ( ctx context . Context , master * redis . Client ) error {
return master . FlushDB ( ctx ) . Err ( )
2016-06-17 15:09:38 +03:00
} )
2018-05-17 16:09:56 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2017-08-31 15:22:47 +03:00
2020-03-11 17:26:42 +03:00
err = client . ForEachSlave ( ctx , func ( ctx context . Context , slave * redis . Client ) error {
2017-08-31 15:22:47 +03:00
Eventually ( func ( ) int64 {
2020-03-11 17:26:42 +03:00
return client . DBSize ( ctx ) . Val ( )
2017-08-31 15:22:47 +03:00
} , 30 * time . Second ) . Should ( Equal ( int64 ( 0 ) ) )
return nil
} )
2018-05-17 16:09:56 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2016-06-05 14:30:56 +03:00
} )
AfterEach ( func ( ) {
2020-03-11 17:26:42 +03:00
err := client . ForEachSlave ( ctx , func ( ctx context . Context , slave * redis . Client ) error {
return slave . ReadWrite ( ctx ) . Err ( )
2017-08-31 15:22:47 +03:00
} )
2018-05-17 16:09:56 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
err = client . Close ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2016-06-05 14:30:56 +03:00
} )
2016-12-16 17:26:48 +03:00
assertClusterClient ( )
2016-04-06 14:01:08 +03:00
} )
2018-06-29 10:45:05 +03:00
Describe ( "ClusterClient with ClusterSlots" , func ( ) {
BeforeEach ( func ( ) {
failover = true
opt = redisClusterOptions ( )
2020-09-09 15:27:17 +03:00
opt . ClusterSlots = func ( ctx context . Context ) ( [ ] redis . ClusterSlot , error ) {
2018-06-29 10:45:05 +03:00
slots := [ ] redis . ClusterSlot { {
Start : 0 ,
End : 4999 ,
Nodes : [ ] redis . ClusterNode { {
Addr : ":" + ringShard1Port ,
} } ,
} , {
Start : 5000 ,
End : 9999 ,
Nodes : [ ] redis . ClusterNode { {
Addr : ":" + ringShard2Port ,
} } ,
} , {
Start : 10000 ,
End : 16383 ,
Nodes : [ ] redis . ClusterNode { {
Addr : ":" + ringShard3Port ,
} } ,
} }
return slots , nil
}
2020-03-11 17:26:42 +03:00
client = cluster . newClusterClient ( ctx , opt )
2018-06-29 10:45:05 +03:00
2020-03-11 17:26:42 +03:00
err := client . ForEachMaster ( ctx , func ( ctx context . Context , master * redis . Client ) error {
return master . FlushDB ( ctx ) . Err ( )
2018-06-29 10:45:05 +03:00
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2020-03-11 17:26:42 +03:00
err = client . ForEachSlave ( ctx , func ( ctx context . Context , slave * redis . Client ) error {
2018-09-20 12:49:43 +03:00
Eventually ( func ( ) int64 {
2020-03-11 17:26:42 +03:00
return client . DBSize ( ctx ) . Val ( )
2018-09-20 12:49:43 +03:00
} , 30 * time . Second ) . Should ( Equal ( int64 ( 0 ) ) )
return nil
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
} )
AfterEach ( func ( ) {
failover = false
err := client . Close ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
} )
assertClusterClient ( )
} )
Describe ( "ClusterClient with RouteRandomly and ClusterSlots" , func ( ) {
BeforeEach ( func ( ) {
failover = true
opt = redisClusterOptions ( )
opt . RouteRandomly = true
2020-09-09 15:27:17 +03:00
opt . ClusterSlots = func ( ctx context . Context ) ( [ ] redis . ClusterSlot , error ) {
2018-09-20 12:49:43 +03:00
slots := [ ] redis . ClusterSlot { {
Start : 0 ,
End : 4999 ,
Nodes : [ ] redis . ClusterNode { {
Addr : ":" + ringShard1Port ,
} } ,
} , {
Start : 5000 ,
End : 9999 ,
Nodes : [ ] redis . ClusterNode { {
Addr : ":" + ringShard2Port ,
} } ,
} , {
Start : 10000 ,
End : 16383 ,
Nodes : [ ] redis . ClusterNode { {
Addr : ":" + ringShard3Port ,
} } ,
} }
return slots , nil
}
2020-03-11 17:26:42 +03:00
client = cluster . newClusterClient ( ctx , opt )
2018-09-20 12:49:43 +03:00
2020-03-11 17:26:42 +03:00
err := client . ForEachMaster ( ctx , func ( ctx context . Context , master * redis . Client ) error {
return master . FlushDB ( ctx ) . Err ( )
2018-09-20 12:49:43 +03:00
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
2020-03-11 17:26:42 +03:00
err = client . ForEachSlave ( ctx , func ( ctx context . Context , slave * redis . Client ) error {
2018-06-29 10:45:05 +03:00
Eventually ( func ( ) int64 {
2020-03-11 17:26:42 +03:00
return client . DBSize ( ctx ) . Val ( )
2018-06-29 10:45:05 +03:00
} , 30 * time . Second ) . Should ( Equal ( int64 ( 0 ) ) )
2020-12-11 19:21:25 +03:00
return nil
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
} )
AfterEach ( func ( ) {
failover = false
err := client . Close ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
} )
assertClusterClient ( )
} )
Describe ( "ClusterClient with ClusterSlots with multiple nodes per slot" , func ( ) {
BeforeEach ( func ( ) {
failover = true
opt = redisClusterOptions ( )
opt . ReadOnly = true
opt . ClusterSlots = func ( ctx context . Context ) ( [ ] redis . ClusterSlot , error ) {
slots := [ ] redis . ClusterSlot { {
Start : 0 ,
End : 4999 ,
Nodes : [ ] redis . ClusterNode { {
Addr : ":8220" ,
} , {
Addr : ":8223" ,
} } ,
} , {
Start : 5000 ,
End : 9999 ,
Nodes : [ ] redis . ClusterNode { {
Addr : ":8221" ,
} , {
Addr : ":8224" ,
} } ,
} , {
Start : 10000 ,
End : 16383 ,
Nodes : [ ] redis . ClusterNode { {
Addr : ":8222" ,
} , {
Addr : ":8225" ,
} } ,
} }
return slots , nil
}
client = cluster . newClusterClient ( ctx , opt )
err := client . ForEachMaster ( ctx , func ( ctx context . Context , master * redis . Client ) error {
return master . FlushDB ( ctx ) . Err ( )
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
err = client . ForEachSlave ( ctx , func ( ctx context . Context , slave * redis . Client ) error {
Eventually ( func ( ) int64 {
return client . DBSize ( ctx ) . Val ( )
} , 30 * time . Second ) . Should ( Equal ( int64 ( 0 ) ) )
2018-06-29 10:45:05 +03:00
return nil
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
} )
AfterEach ( func ( ) {
failover = false
err := client . Close ( )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
} )
assertClusterClient ( )
} )
2016-12-03 18:30:13 +03:00
} )
2016-09-23 14:52:19 +03:00
2016-12-03 18:30:13 +03:00
var _ = Describe ( "ClusterClient without nodes" , func ( ) {
var client * redis . ClusterClient
BeforeEach ( func ( ) {
client = redis . NewClusterClient ( & redis . ClusterOptions { } )
} )
AfterEach ( func ( ) {
Expect ( client . Close ( ) ) . NotTo ( HaveOccurred ( ) )
} )
2017-08-31 15:22:47 +03:00
It ( "Ping returns an error" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Ping ( ctx ) . Err ( )
2016-12-03 18:30:13 +03:00
Expect ( err ) . To ( MatchError ( "redis: cluster has no nodes" ) )
} )
It ( "pipeline returns an error" , func ( ) {
2020-03-11 17:26:42 +03:00
_ , err := client . Pipelined ( ctx , func ( pipe redis . Pipeliner ) error {
pipe . Ping ( ctx )
2016-12-03 18:30:13 +03:00
return nil
2016-10-09 14:12:32 +03:00
} )
2016-12-03 18:30:13 +03:00
Expect ( err ) . To ( MatchError ( "redis: cluster has no nodes" ) )
} )
} )
var _ = Describe ( "ClusterClient without valid nodes" , func ( ) {
var client * redis . ClusterClient
BeforeEach ( func ( ) {
client = redis . NewClusterClient ( & redis . ClusterOptions {
Addrs : [ ] string { redisAddr } ,
} )
} )
AfterEach ( func ( ) {
Expect ( client . Close ( ) ) . NotTo ( HaveOccurred ( ) )
} )
It ( "returns an error" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Ping ( ctx ) . Err ( )
2018-02-15 14:00:54 +03:00
Expect ( err ) . To ( MatchError ( "ERR This instance has cluster support disabled" ) )
2016-12-03 18:30:13 +03:00
} )
It ( "pipeline returns an error" , func ( ) {
2020-03-11 17:26:42 +03:00
_ , err := client . Pipelined ( ctx , func ( pipe redis . Pipeliner ) error {
pipe . Ping ( ctx )
2016-12-03 18:30:13 +03:00
return nil
} )
2018-02-15 14:00:54 +03:00
Expect ( err ) . To ( MatchError ( "ERR This instance has cluster support disabled" ) )
2016-12-03 18:30:13 +03:00
} )
} )
2018-12-13 13:26:02 +03:00
var _ = Describe ( "ClusterClient with unavailable Cluster" , func ( ) {
var client * redis . ClusterClient
BeforeEach ( func ( ) {
for _ , node := range cluster . clients {
2020-03-11 17:26:42 +03:00
err := node . ClientPause ( ctx , 5 * time . Second ) . Err ( )
2018-12-13 13:26:02 +03:00
Expect ( err ) . NotTo ( HaveOccurred ( ) )
}
opt := redisClusterOptions ( )
opt . ReadTimeout = 250 * time . Millisecond
opt . WriteTimeout = 250 * time . Millisecond
opt . MaxRedirects = 1
2020-09-05 11:34:37 +03:00
client = cluster . newClusterClientUnstable ( opt )
2018-12-13 13:26:02 +03:00
} )
AfterEach ( func ( ) {
Expect ( client . Close ( ) ) . NotTo ( HaveOccurred ( ) )
} )
It ( "recovers when Cluster recovers" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Ping ( ctx ) . Err ( )
2018-12-13 13:26:02 +03:00
Expect ( err ) . To ( HaveOccurred ( ) )
Eventually ( func ( ) error {
2020-03-11 17:26:42 +03:00
return client . Ping ( ctx ) . Err ( )
2018-12-13 13:26:02 +03:00
} , "30s" ) . ShouldNot ( HaveOccurred ( ) )
} )
} )
2016-12-03 18:30:13 +03:00
var _ = Describe ( "ClusterClient timeout" , func ( ) {
var client * redis . ClusterClient
2016-10-09 14:12:32 +03:00
2016-12-03 18:30:13 +03:00
AfterEach ( func ( ) {
2017-03-04 14:04:27 +03:00
_ = client . Close ( )
2016-12-03 18:30:13 +03:00
} )
testTimeout := func ( ) {
It ( "Ping timeouts" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Ping ( ctx ) . Err ( )
2016-12-03 18:30:13 +03:00
Expect ( err ) . To ( HaveOccurred ( ) )
Expect ( err . ( net . Error ) . Timeout ( ) ) . To ( BeTrue ( ) )
2016-10-09 14:12:32 +03:00
} )
2016-12-03 18:30:13 +03:00
It ( "Pipeline timeouts" , func ( ) {
2020-03-11 17:26:42 +03:00
_ , err := client . Pipelined ( ctx , func ( pipe redis . Pipeliner ) error {
pipe . Ping ( ctx )
2016-10-09 14:12:32 +03:00
return nil
} )
2016-12-03 18:30:13 +03:00
Expect ( err ) . To ( HaveOccurred ( ) )
Expect ( err . ( net . Error ) . Timeout ( ) ) . To ( BeTrue ( ) )
2016-10-09 14:12:32 +03:00
} )
2016-12-03 18:30:13 +03:00
It ( "Tx timeouts" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Watch ( ctx , func ( tx * redis . Tx ) error {
return tx . Ping ( ctx ) . Err ( )
2017-08-31 15:22:47 +03:00
} , "foo" )
2016-12-03 18:30:13 +03:00
Expect ( err ) . To ( HaveOccurred ( ) )
Expect ( err . ( net . Error ) . Timeout ( ) ) . To ( BeTrue ( ) )
2016-09-23 14:52:19 +03:00
} )
2016-12-03 18:30:13 +03:00
It ( "Tx Pipeline timeouts" , func ( ) {
2020-03-11 17:26:42 +03:00
err := client . Watch ( ctx , func ( tx * redis . Tx ) error {
_ , err := tx . TxPipelined ( ctx , func ( pipe redis . Pipeliner ) error {
pipe . Ping ( ctx )
2016-12-03 18:30:13 +03:00
return nil
} )
return err
2017-08-31 15:22:47 +03:00
} , "foo" )
2016-12-03 18:30:13 +03:00
Expect ( err ) . To ( HaveOccurred ( ) )
Expect ( err . ( net . Error ) . Timeout ( ) ) . To ( BeTrue ( ) )
2016-09-23 14:52:19 +03:00
} )
2016-12-03 18:30:13 +03:00
}
2016-10-09 14:12:32 +03:00
2018-12-13 13:26:02 +03:00
const pause = 5 * time . Second
2016-12-03 18:30:13 +03:00
2017-08-31 15:22:47 +03:00
Context ( "read/write timeout" , func ( ) {
2016-12-03 18:30:13 +03:00
BeforeEach ( func ( ) {
opt := redisClusterOptions ( )
2018-06-18 12:55:26 +03:00
opt . ReadTimeout = 250 * time . Millisecond
opt . WriteTimeout = 250 * time . Millisecond
2017-08-31 15:22:47 +03:00
opt . MaxRedirects = 1
2020-03-11 17:26:42 +03:00
client = cluster . newClusterClient ( ctx , opt )
2016-12-12 18:30:08 +03:00
2020-06-10 15:04:12 +03:00
err := client . ForEachShard ( ctx , func ( ctx context . Context , client * redis . Client ) error {
2020-03-11 17:26:42 +03:00
return client . ClientPause ( ctx , pause ) . Err ( )
2016-12-12 18:30:08 +03:00
} )
Expect ( err ) . NotTo ( HaveOccurred ( ) )
} )
AfterEach ( func ( ) {
2020-06-10 15:04:12 +03:00
_ = client . ForEachShard ( ctx , func ( ctx context . Context , client * redis . Client ) error {
2018-02-15 14:00:54 +03:00
defer GinkgoRecover ( )
2017-08-15 10:34:05 +03:00
Eventually ( func ( ) error {
2020-03-11 17:26:42 +03:00
return client . Ping ( ctx ) . Err ( )
2017-08-15 10:34:05 +03:00
} , 2 * pause ) . ShouldNot ( HaveOccurred ( ) )
return nil
} )
2016-12-12 18:30:08 +03:00
} )
testTimeout ( )
} )
2015-01-24 15:12:48 +03:00
} )
2021-10-16 07:04:25 +03:00
2021-10-18 17:07:36 +03:00
func TestParseClusterURL ( t * testing . T ) {
2021-10-16 07:04:25 +03:00
cases := [ ] struct {
test string
2021-10-18 17:07:36 +03:00
url string
2021-10-16 07:04:25 +03:00
o * redis . ClusterOptions // expected value
err error
} {
{
test : "ParseRedisURL" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost:123" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" } } ,
} , {
test : "ParseRedissURL" ,
2021-10-18 17:07:36 +03:00
url : "rediss://localhost:123" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" } , TLSConfig : & tls . Config { /* no deep comparison */ } } ,
} , {
test : "MissingRedisPort" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:6379" } } ,
} , {
test : "MissingRedissPort" ,
2021-10-18 17:07:36 +03:00
url : "rediss://localhost" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:6379" } , TLSConfig : & tls . Config { /* no deep comparison */ } } ,
} , {
test : "MultipleRedisURLs" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost:123?addr=localhost:1234&addr=localhost:12345" ,
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" , "localhost:12345" , "localhost:1234" } } ,
2021-10-16 07:04:25 +03:00
} , {
test : "MultipleRedissURLs" ,
2021-10-18 17:07:36 +03:00
url : "rediss://localhost:123?addr=localhost:1234&addr=localhost:12345" ,
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" , "localhost:12345" , "localhost:1234" } , TLSConfig : & tls . Config { /* no deep comparison */ } } ,
2021-10-16 07:04:25 +03:00
} , {
test : "OnlyPassword" ,
2021-10-18 17:07:36 +03:00
url : "redis://:bar@localhost:123" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" } , Password : "bar" } ,
} , {
test : "OnlyUser" ,
2021-10-18 17:07:36 +03:00
url : "redis://foo@localhost:123" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" } , Username : "foo" } ,
} , {
test : "RedisUsernamePassword" ,
2021-10-18 17:07:36 +03:00
url : "redis://foo:bar@localhost:123" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" } , Username : "foo" , Password : "bar" } ,
} , {
test : "RedissUsernamePassword" ,
2021-10-18 17:07:36 +03:00
url : "rediss://foo:bar@localhost:123?addr=localhost:1234" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" , "localhost:1234" } , Username : "foo" , Password : "bar" , TLSConfig : & tls . Config { /* no deep comparison */ } } ,
} , {
test : "QueryParameters" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost:123?read_timeout=2&pool_fifo=true&addr=localhost:1234" ,
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" , "localhost:1234" } , ReadTimeout : 2 * time . Second , PoolFIFO : true } ,
2021-10-16 07:04:25 +03:00
} , {
test : "DisabledTimeout" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost:123?idle_timeout=0" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" } , IdleTimeout : - 1 } ,
} , {
test : "DisabledTimeoutNeg" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost:123?idle_timeout=-1" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" } , IdleTimeout : - 1 } ,
} , {
test : "UseDefault" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost:123?idle_timeout=" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" } , IdleTimeout : 0 } ,
} , {
test : "UseDefaultMissing=" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost:123?idle_timeout" ,
2021-10-16 07:04:25 +03:00
o : & redis . ClusterOptions { Addrs : [ ] string { "localhost:123" } , IdleTimeout : 0 } ,
} , {
2021-10-18 17:07:36 +03:00
test : "InvalidQueryAddr" ,
url : "rediss://foo:bar@localhost:123?addr=rediss://foo:barr@localhost:1234" ,
err : errors . New ( ` redis: unable to parse addr param: rediss://foo:barr@localhost:1234 ` ) ,
2021-10-16 07:04:25 +03:00
} , {
test : "InvalidInt" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost?pool_size=five" ,
2021-10-16 07:04:25 +03:00
err : errors . New ( ` redis: invalid pool_size number: strconv.Atoi: parsing "five": invalid syntax ` ) ,
} , {
test : "InvalidBool" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost?pool_fifo=yes" ,
2021-10-16 07:04:25 +03:00
err : errors . New ( ` redis: invalid pool_fifo boolean: expected true/false/1/0 or an empty string, got "yes" ` ) ,
} , {
test : "UnknownParam" ,
2021-10-18 17:07:36 +03:00
url : "redis://localhost?abc=123" ,
2021-10-16 07:04:25 +03:00
err : errors . New ( "redis: unexpected option: abc" ) ,
} , {
test : "InvalidScheme" ,
2021-10-18 17:07:36 +03:00
url : "https://google.com" ,
2021-10-16 07:04:25 +03:00
err : errors . New ( "redis: invalid URL scheme: https" ) ,
} ,
}
for i := range cases {
tc := cases [ i ]
t . Run ( tc . test , func ( t * testing . T ) {
t . Parallel ( )
2021-10-18 17:07:36 +03:00
actual , err := redis . ParseClusterURL ( tc . url )
2021-10-16 07:04:25 +03:00
if tc . err == nil && err != nil {
t . Fatalf ( "unexpected error: %q" , err )
return
}
2021-10-18 17:07:36 +03:00
if tc . err != nil && err == nil {
t . Fatalf ( "expected error: got %+v" , actual )
return
}
2021-10-16 07:04:25 +03:00
if tc . err != nil && err != nil {
if tc . err . Error ( ) != err . Error ( ) {
t . Fatalf ( "got %q, expected %q" , err , tc . err )
}
return
}
comprareOptions ( t , actual , tc . o )
} )
}
}
func comprareOptions ( t * testing . T , actual , expected * redis . ClusterOptions ) {
t . Helper ( )
if ! reflect . DeepEqual ( actual . Addrs , expected . Addrs ) {
t . Errorf ( "got %q, want %q" , actual . Addrs , expected . Addrs )
}
if actual . TLSConfig == nil && expected . TLSConfig != nil {
t . Errorf ( "got nil TLSConfig, expected a TLSConfig" )
}
if actual . TLSConfig != nil && expected . TLSConfig == nil {
t . Errorf ( "got TLSConfig, expected no TLSConfig" )
}
if actual . Username != expected . Username {
t . Errorf ( "Username: got %q, expected %q" , actual . Username , expected . Username )
}
if actual . Password != expected . Password {
t . Errorf ( "Password: got %q, expected %q" , actual . Password , expected . Password )
}
if actual . MaxRetries != expected . MaxRetries {
t . Errorf ( "MaxRetries: got %v, expected %v" , actual . MaxRetries , expected . MaxRetries )
}
if actual . MinRetryBackoff != expected . MinRetryBackoff {
t . Errorf ( "MinRetryBackoff: got %v, expected %v" , actual . MinRetryBackoff , expected . MinRetryBackoff )
}
if actual . MaxRetryBackoff != expected . MaxRetryBackoff {
t . Errorf ( "MaxRetryBackoff: got %v, expected %v" , actual . MaxRetryBackoff , expected . MaxRetryBackoff )
}
if actual . DialTimeout != expected . DialTimeout {
t . Errorf ( "DialTimeout: got %v, expected %v" , actual . DialTimeout , expected . DialTimeout )
}
if actual . ReadTimeout != expected . ReadTimeout {
t . Errorf ( "ReadTimeout: got %v, expected %v" , actual . ReadTimeout , expected . ReadTimeout )
}
if actual . WriteTimeout != expected . WriteTimeout {
t . Errorf ( "WriteTimeout: got %v, expected %v" , actual . WriteTimeout , expected . WriteTimeout )
}
if actual . PoolFIFO != expected . PoolFIFO {
t . Errorf ( "PoolFIFO: got %v, expected %v" , actual . PoolFIFO , expected . PoolFIFO )
}
if actual . PoolSize != expected . PoolSize {
t . Errorf ( "PoolSize: got %v, expected %v" , actual . PoolSize , expected . PoolSize )
}
if actual . MinIdleConns != expected . MinIdleConns {
t . Errorf ( "MinIdleConns: got %v, expected %v" , actual . MinIdleConns , expected . MinIdleConns )
}
if actual . MaxConnAge != expected . MaxConnAge {
t . Errorf ( "MaxConnAge: got %v, expected %v" , actual . MaxConnAge , expected . MaxConnAge )
}
if actual . PoolTimeout != expected . PoolTimeout {
t . Errorf ( "PoolTimeout: got %v, expected %v" , actual . PoolTimeout , expected . PoolTimeout )
}
if actual . IdleTimeout != expected . IdleTimeout {
t . Errorf ( "IdleTimeout: got %v, expected %v" , actual . IdleTimeout , expected . IdleTimeout )
}
if actual . IdleCheckFrequency != expected . IdleCheckFrequency {
t . Errorf ( "IdleCheckFrequency: got %v, expected %v" , actual . IdleCheckFrequency , expected . IdleCheckFrequency )
}
}