feat: add ParseClusterURLs to allow for parsing of redis cluster urls into cluster options

This commit is contained in:
Stephanie Hingtgen 2021-10-15 23:04:25 -05:00
parent ca2b16333e
commit 7daa7f91fd
No known key found for this signature in database
GPG Key ID: C38C0397CA26D3E5
2 changed files with 337 additions and 0 deletions

View File

@ -6,8 +6,10 @@ import (
"fmt" "fmt"
"math" "math"
"net" "net"
"net/url"
"runtime" "runtime"
"sort" "sort"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -131,6 +133,143 @@ func (opt *ClusterOptions) init() {
} }
} }
// ParseClusterURLs parses an array of URLs into ClusterOptions that can be used to connect to Redis.
// The strings in the array must be in the form:
// redis://<user>:<password>@<host>:<port>
// or
// rediss://<user>:<password>@<host>:<port>
// All strings in the array must use the same scheme, username, and password.
//
// Most Option fields can be set using query parameters, with the following restrictions:
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
// - only scalar type fields are supported (bool, int, time.Duration)
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
// - to disable a duration field, use value less than or equal to 0; to use the default
// value, leave the value blank or remove the parameter
// - only the last value is interpreted if a parameter is given multiple times
// - fields "network", "addr", "username" and "password" can only be set using other
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
// names will be treated as unknown parameters
// - unknown parameter names will result in an error
// - if query parameters differ between urls, the last one in the array will be used
// Examples:
// [
// redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&max_retries=2,
// redis://user:password@localhost:6790?dial_timeout=3&read_timeout=6s&max_retries=2,
// redis://user:password@localhost:6791?dial_timeout=3&read_timeout=6s&max_retries=5,
// ]
// is equivalent to:
// &ClusterOptions{
// Addr: ["localhost:6789", "localhost:6790", "localhost:6791"]
// DialTimeout: 3 * time.Second, // no time unit = seconds
// ReadTimeout: 6 * time.Second,
// MaxRetries: 5, // last one in the array is used
// }
func ParseClusterURLs(redisURLs []string) (*ClusterOptions, error) {
o := &ClusterOptions{}
previousScheme := ""
// loop through all the URLs and retrieve the addresses as well as the
// cluster options
for _, redisURL := range redisURLs {
u, err := url.Parse(redisURL)
if err != nil {
return nil, err
}
h, p, err := net.SplitHostPort(u.Host)
if err != nil {
h = u.Host
}
if h == "" {
h = "localhost"
}
if p == "" {
p = "6379"
}
o.Addrs = append(o.Addrs, net.JoinHostPort(h, p))
// all URLS must use the same scheme
if previousScheme != "" && u.Scheme != previousScheme {
return nil, fmt.Errorf("redis: mismatch schemes: %s and %s", previousScheme, u.Scheme)
}
previousScheme = u.Scheme
// setup username, password, and other configurations
o, err = setupClusterConn(u, h, o)
if err != nil {
return nil, err
}
}
return o, nil
}
// setupClusterConn gets the username and password from the URL and the query parameters.
func setupClusterConn(u *url.URL, host string, o *ClusterOptions) (*ClusterOptions, error) {
// retrieve the configuration from the query parameters
o, err := setupClusterQueryParams(u, o)
if err != nil {
return nil, err
}
switch u.Scheme {
case "rediss":
o.TLSConfig = &tls.Config{ServerName: host}
fallthrough
case "redis":
// get the username & password - they must be consistent across urls
u, p := getUserPassword(u)
if o.Username != "" && o.Username != u {
return nil, fmt.Errorf("redis: mismatch usernames: %s and %s", o.Username, u)
}
if o.Password != "" && o.Password != p {
return nil, fmt.Errorf("redis: mismatch passwords")
}
o.Username, o.Password = u, p
return o, nil
default:
return nil, fmt.Errorf("redis: invalid URL scheme: %s", u.Scheme)
}
}
// setupClusterQueryParams converts query parameters in u to option value in o.
func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) {
q := queryOptions{q: u.Query()}
o.MaxRedirects = q.int("max_redirects")
o.ReadOnly = q.bool("read_only")
o.RouteByLatency = q.bool("route_by_latency")
o.RouteByLatency = q.bool("route_randomly")
o.MaxRetries = q.int("max_retries")
o.MinRetryBackoff = q.duration("min_retry_backoff")
o.MaxRetryBackoff = q.duration("max_retry_backoff")
o.DialTimeout = q.duration("dial_timeout")
o.ReadTimeout = q.duration("read_timeout")
o.WriteTimeout = q.duration("write_timeout")
o.PoolFIFO = q.bool("pool_fifo")
o.PoolSize = q.int("pool_size")
o.MinIdleConns = q.int("min_idle_conns")
o.MaxConnAge = q.duration("max_conn_age")
o.PoolTimeout = q.duration("pool_timeout")
o.IdleTimeout = q.duration("idle_timeout")
o.IdleCheckFrequency = q.duration("idle_check_frequency")
if q.err != nil {
return nil, q.err
}
// any parameters left?
if r := q.remaining(); len(r) > 0 {
return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
}
return o, nil
}
func (opt *ClusterOptions) clientOptions() *Options { func (opt *ClusterOptions) clientOptions() *Options {
const disableIdleCheck = -1 const disableIdleCheck = -1

View File

@ -2,11 +2,15 @@ package redis_test
import ( import (
"context" "context"
"crypto/tls"
"errors"
"fmt" "fmt"
"net" "net"
"reflect"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"testing"
"time" "time"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -1281,3 +1285,197 @@ var _ = Describe("ClusterClient timeout", func() {
testTimeout() testTimeout()
}) })
}) })
func TestParseClusterURLs(t *testing.T) {
cases := []struct {
test string
urls []string
o *redis.ClusterOptions // expected value
err error
}{
{
test: "ParseRedisURL",
urls: []string{"redis://localhost:123"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}},
}, {
test: "ParseRedissURL",
urls: []string{"rediss://localhost:123"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, TLSConfig: &tls.Config{ /* no deep comparison */ }},
}, {
test: "MissingRedisPort",
urls: []string{"redis://localhost"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}},
}, {
test: "MissingRedissPort",
urls: []string{"rediss://localhost"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}, TLSConfig: &tls.Config{ /* no deep comparison */ }},
}, {
test: "MultipleRedisURLs",
urls: []string{"redis://localhost:123", "redis://localhost:1234"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}},
}, {
test: "MultipleRedissURLs",
urls: []string{"rediss://localhost:123", "rediss://localhost:1234"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, TLSConfig: &tls.Config{ /* no deep comparison */ }},
}, {
test: "OnlyPassword",
urls: []string{"redis://:bar@localhost:123"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Password: "bar"},
}, {
test: "OnlyUser",
urls: []string{"redis://foo@localhost:123"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo"},
}, {
test: "RedisUsernamePassword",
urls: []string{"redis://foo:bar@localhost:123"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo", Password: "bar"},
}, {
test: "RedissUsernamePassword",
urls: []string{"rediss://foo:bar@localhost:123", "rediss://foo:bar@localhost:1234"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, Username: "foo", Password: "bar", TLSConfig: &tls.Config{ /* no deep comparison */ }},
}, {
test: "QueryParameters",
urls: []string{"redis://localhost:123?read_timeout=2&pool_fifo=true"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ReadTimeout: 2 * time.Second, PoolFIFO: true},
}, {
test: "UseFinalQueryParameters",
urls: []string{"redis://localhost:123?read_timeout=2&pool_fifo=true", "redis://localhost:1234?read_timeout=3&pool_fifo=true"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, ReadTimeout: 3 * time.Second, PoolFIFO: true},
}, {
test: "DisabledTimeout",
urls: []string{"redis://localhost:123?idle_timeout=0"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, IdleTimeout: -1},
}, {
test: "DisabledTimeoutNeg",
urls: []string{"redis://localhost:123?idle_timeout=-1"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, IdleTimeout: -1},
}, {
test: "UseDefault",
urls: []string{"redis://localhost:123?idle_timeout="},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, IdleTimeout: 0},
}, {
test: "UseDefaultMissing=",
urls: []string{"redis://localhost:123?idle_timeout"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, IdleTimeout: 0},
}, {
test: "RedisPasswordMismatch",
urls: []string{"redis://foo:bar@localhost:123", "redis://foo:barr@localhost:1234"},
err: errors.New(`redis: mismatch passwords`),
}, {
test: "RedisUsernameMismatch",
urls: []string{"redis://fooo:bar@localhost:123", "redis://foo:bar@localhost:1234"},
err: errors.New(`redis: mismatch usernames: fooo and foo`),
}, {
test: "RedissPasswordMismatch",
urls: []string{"rediss://foo:bar@localhost:123", "rediss://foo:barr@localhost:1234"},
err: errors.New(`redis: mismatch passwords`),
}, {
test: "RedissUsernameMismatch",
urls: []string{"rediss://foo:bar@localhost:123", "rediss://fooo:bar@localhost:1234"},
err: errors.New(`redis: mismatch usernames: foo and fooo`),
}, {
test: "SchemeMismatch",
urls: []string{"rediss://foo:bar@localhost:123", "redis://foo:bar@localhost:1234"},
err: errors.New(`redis: mismatch schemes: rediss and redis`),
}, {
test: "SchemeMismatch",
urls: []string{"redis://foo:bar@localhost:123", "localhost:1234"},
err: errors.New(`redis: mismatch schemes: redis and localhost`),
}, {
test: "InvalidInt",
urls: []string{"redis://localhost?pool_size=five"},
err: errors.New(`redis: invalid pool_size number: strconv.Atoi: parsing "five": invalid syntax`),
}, {
test: "InvalidBool",
urls: []string{"redis://localhost?pool_fifo=yes"},
err: errors.New(`redis: invalid pool_fifo boolean: expected true/false/1/0 or an empty string, got "yes"`),
}, {
test: "UnknownParam",
urls: []string{"redis://localhost?abc=123"},
err: errors.New("redis: unexpected option: abc"),
}, {
test: "InvalidScheme",
urls: []string{"https://google.com"},
err: errors.New("redis: invalid URL scheme: https"),
},
}
for i := range cases {
tc := cases[i]
t.Run(tc.test, func(t *testing.T) {
t.Parallel()
actual, err := redis.ParseClusterURLs(tc.urls)
if tc.err == nil && err != nil {
t.Fatalf("unexpected error: %q", err)
return
}
if tc.err != nil && err != nil {
if tc.err.Error() != err.Error() {
t.Fatalf("got %q, expected %q", err, tc.err)
}
return
}
comprareOptions(t, actual, tc.o)
})
}
}
func comprareOptions(t *testing.T, actual, expected *redis.ClusterOptions) {
t.Helper()
if !reflect.DeepEqual(actual.Addrs, expected.Addrs) {
t.Errorf("got %q, want %q", actual.Addrs, expected.Addrs)
}
if actual.TLSConfig == nil && expected.TLSConfig != nil {
t.Errorf("got nil TLSConfig, expected a TLSConfig")
}
if actual.TLSConfig != nil && expected.TLSConfig == nil {
t.Errorf("got TLSConfig, expected no TLSConfig")
}
if actual.Username != expected.Username {
t.Errorf("Username: got %q, expected %q", actual.Username, expected.Username)
}
if actual.Password != expected.Password {
t.Errorf("Password: got %q, expected %q", actual.Password, expected.Password)
}
if actual.MaxRetries != expected.MaxRetries {
t.Errorf("MaxRetries: got %v, expected %v", actual.MaxRetries, expected.MaxRetries)
}
if actual.MinRetryBackoff != expected.MinRetryBackoff {
t.Errorf("MinRetryBackoff: got %v, expected %v", actual.MinRetryBackoff, expected.MinRetryBackoff)
}
if actual.MaxRetryBackoff != expected.MaxRetryBackoff {
t.Errorf("MaxRetryBackoff: got %v, expected %v", actual.MaxRetryBackoff, expected.MaxRetryBackoff)
}
if actual.DialTimeout != expected.DialTimeout {
t.Errorf("DialTimeout: got %v, expected %v", actual.DialTimeout, expected.DialTimeout)
}
if actual.ReadTimeout != expected.ReadTimeout {
t.Errorf("ReadTimeout: got %v, expected %v", actual.ReadTimeout, expected.ReadTimeout)
}
if actual.WriteTimeout != expected.WriteTimeout {
t.Errorf("WriteTimeout: got %v, expected %v", actual.WriteTimeout, expected.WriteTimeout)
}
if actual.PoolFIFO != expected.PoolFIFO {
t.Errorf("PoolFIFO: got %v, expected %v", actual.PoolFIFO, expected.PoolFIFO)
}
if actual.PoolSize != expected.PoolSize {
t.Errorf("PoolSize: got %v, expected %v", actual.PoolSize, expected.PoolSize)
}
if actual.MinIdleConns != expected.MinIdleConns {
t.Errorf("MinIdleConns: got %v, expected %v", actual.MinIdleConns, expected.MinIdleConns)
}
if actual.MaxConnAge != expected.MaxConnAge {
t.Errorf("MaxConnAge: got %v, expected %v", actual.MaxConnAge, expected.MaxConnAge)
}
if actual.PoolTimeout != expected.PoolTimeout {
t.Errorf("PoolTimeout: got %v, expected %v", actual.PoolTimeout, expected.PoolTimeout)
}
if actual.IdleTimeout != expected.IdleTimeout {
t.Errorf("IdleTimeout: got %v, expected %v", actual.IdleTimeout, expected.IdleTimeout)
}
if actual.IdleCheckFrequency != expected.IdleCheckFrequency {
t.Errorf("IdleCheckFrequency: got %v, expected %v", actual.IdleCheckFrequency, expected.IdleCheckFrequency)
}
}