mirror of https://github.com/go-redis/redis.git
Merge pull request #350 from go-redis/fix/ring-heartbeat-frequency
ring: reduce HeartbeatFrequency.
This commit is contained in:
commit
0b8675fa45
31
options.go
31
options.go
|
@ -18,43 +18,42 @@ type Options struct {
|
||||||
// Network and Addr options.
|
// Network and Addr options.
|
||||||
Dialer func() (net.Conn, error)
|
Dialer func() (net.Conn, error)
|
||||||
|
|
||||||
// An optional password. Must match the password specified in the
|
// Optional password. Must match the password specified in the
|
||||||
// requirepass server configuration option.
|
// requirepass server configuration option.
|
||||||
Password string
|
Password string
|
||||||
// A database to be selected after connecting to server.
|
// Database to be selected after connecting to the server.
|
||||||
DB int
|
DB int
|
||||||
|
|
||||||
// The maximum number of retries before giving up.
|
// Maximum number of retries before giving up.
|
||||||
// Default is to not retry failed commands.
|
// Default is to not retry failed commands.
|
||||||
MaxRetries int
|
MaxRetries int
|
||||||
|
|
||||||
// Sets the deadline for establishing new connections. If reached,
|
// Dial timeout for establishing new connections.
|
||||||
// dial will fail with a timeout.
|
|
||||||
// Default is 5 seconds.
|
// Default is 5 seconds.
|
||||||
DialTimeout time.Duration
|
DialTimeout time.Duration
|
||||||
// Sets the deadline for socket reads. If reached, commands will
|
// Timeout for socket reads. If reached, commands will fail
|
||||||
// fail with a timeout instead of blocking.
|
// with a timeout instead of blocking.
|
||||||
ReadTimeout time.Duration
|
ReadTimeout time.Duration
|
||||||
// Sets the deadline for socket writes. If reached, commands will
|
// Timeout for socket writes. If reached, commands will fail
|
||||||
// fail with a timeout instead of blocking.
|
// with a timeout instead of blocking.
|
||||||
WriteTimeout time.Duration
|
WriteTimeout time.Duration
|
||||||
|
|
||||||
// The maximum number of socket connections.
|
// Maximum number of socket connections.
|
||||||
// Default is 10 connections.
|
// Default is 10 connections.
|
||||||
PoolSize int
|
PoolSize int
|
||||||
// Specifies amount of time client waits for connection if all
|
// Amount of time client waits for connection if all connections
|
||||||
// connections are busy before returning an error.
|
// are busy before returning an error.
|
||||||
// Default is 1 second.
|
// Default is 1 second.
|
||||||
PoolTimeout time.Duration
|
PoolTimeout time.Duration
|
||||||
// Specifies amount of time after which client closes idle
|
// Amount of time after which client closes idle connections.
|
||||||
// connections. Should be less than server's timeout.
|
// Should be less than server's timeout.
|
||||||
// Default is to not close idle connections.
|
// Default is to not close idle connections.
|
||||||
IdleTimeout time.Duration
|
IdleTimeout time.Duration
|
||||||
// The frequency of idle checks.
|
// Frequency of idle checks.
|
||||||
// Default is 1 minute.
|
// Default is 1 minute.
|
||||||
IdleCheckFrequency time.Duration
|
IdleCheckFrequency time.Duration
|
||||||
|
|
||||||
// Enables read queries for a connection to a Redis Cluster slave node.
|
// Enables read only queries on slave nodes.
|
||||||
ReadOnly bool
|
ReadOnly bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
22
ring.go
22
ring.go
|
@ -12,16 +12,18 @@ import (
|
||||||
"gopkg.in/redis.v4/internal/pool"
|
"gopkg.in/redis.v4/internal/pool"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var errRingShardsDown = errors.New("redis: all ring shards are down")
|
||||||
errRingShardsDown = errors.New("redis: all ring shards are down")
|
|
||||||
)
|
|
||||||
|
|
||||||
// RingOptions are used to configure a ring client and should be
|
// RingOptions are used to configure a ring client and should be
|
||||||
// passed to NewRing.
|
// passed to NewRing.
|
||||||
type RingOptions struct {
|
type RingOptions struct {
|
||||||
// A map of name => host:port addresses of ring shards.
|
// Map of name => host:port addresses of ring shards.
|
||||||
Addrs map[string]string
|
Addrs map[string]string
|
||||||
|
|
||||||
|
// Frequency of PING commands sent to check shards availability.
|
||||||
|
// Shard is considered down after 3 subsequent failed checks.
|
||||||
|
HeartbeatFrequency time.Duration
|
||||||
|
|
||||||
// Following options are copied from Options struct.
|
// Following options are copied from Options struct.
|
||||||
|
|
||||||
DB int
|
DB int
|
||||||
|
@ -39,7 +41,11 @@ type RingOptions struct {
|
||||||
IdleCheckFrequency time.Duration
|
IdleCheckFrequency time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *RingOptions) init() {}
|
func (opt *RingOptions) init() {
|
||||||
|
if opt.HeartbeatFrequency == 0 {
|
||||||
|
opt.HeartbeatFrequency = 500 * time.Millisecond
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (opt *RingOptions) clientOptions() *Options {
|
func (opt *RingOptions) clientOptions() *Options {
|
||||||
return &Options{
|
return &Options{
|
||||||
|
@ -73,7 +79,7 @@ func (shard *ringShard) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (shard *ringShard) IsDown() bool {
|
func (shard *ringShard) IsDown() bool {
|
||||||
const threshold = 5
|
const threshold = 3
|
||||||
return shard.down >= threshold
|
return shard.down >= threshold
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +114,7 @@ func (shard *ringShard) Vote(up bool) bool {
|
||||||
// uses shards that are available to the client and does not do any
|
// uses shards that are available to the client and does not do any
|
||||||
// coordination when shard state is changed.
|
// coordination when shard state is changed.
|
||||||
//
|
//
|
||||||
// Ring should be used when you use multiple Redis servers for caching
|
// Ring should be used when you need multiple Redis servers for caching
|
||||||
// and can tolerate losing data when one of the servers dies.
|
// and can tolerate losing data when one of the servers dies.
|
||||||
// Otherwise you should use Redis Cluster.
|
// Otherwise you should use Redis Cluster.
|
||||||
type Ring struct {
|
type Ring struct {
|
||||||
|
@ -224,7 +230,7 @@ func (c *Ring) rebalance() {
|
||||||
|
|
||||||
// heartbeat monitors state of each shard in the ring.
|
// heartbeat monitors state of each shard in the ring.
|
||||||
func (c *Ring) heartbeat() {
|
func (c *Ring) heartbeat() {
|
||||||
ticker := time.NewTicker(100 * time.Millisecond)
|
ticker := time.NewTicker(c.opt.HeartbeatFrequency)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for _ = range ticker.C {
|
for _ = range ticker.C {
|
||||||
var rebalance bool
|
var rebalance bool
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Describe("Redis ring", func() {
|
var _ = Describe("Redis ring", func() {
|
||||||
|
const heartbeat = 100 * time.Millisecond
|
||||||
var ring *redis.Ring
|
var ring *redis.Ring
|
||||||
|
|
||||||
setRingKeys := func() {
|
setRingKeys := func() {
|
||||||
|
@ -27,6 +28,7 @@ var _ = Describe("Redis ring", func() {
|
||||||
"ringShardOne": ":" + ringShard1Port,
|
"ringShardOne": ":" + ringShard1Port,
|
||||||
"ringShardTwo": ":" + ringShard2Port,
|
"ringShardTwo": ":" + ringShard2Port,
|
||||||
},
|
},
|
||||||
|
HeartbeatFrequency: heartbeat,
|
||||||
})
|
})
|
||||||
|
|
||||||
// Shards should not have any keys.
|
// Shards should not have any keys.
|
||||||
|
@ -53,10 +55,9 @@ var _ = Describe("Redis ring", func() {
|
||||||
// Stop ringShard2.
|
// Stop ringShard2.
|
||||||
Expect(ringShard2.Close()).NotTo(HaveOccurred())
|
Expect(ringShard2.Close()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
// Ring needs 5 * heartbeat time to detect that node is down.
|
// Ring needs 3 * heartbeat time to detect that node is down.
|
||||||
// Give it more to be sure.
|
// Give it more to be sure.
|
||||||
heartbeat := 100 * time.Millisecond
|
time.Sleep(2 * 3 * heartbeat)
|
||||||
time.Sleep(2 * 5 * heartbeat)
|
|
||||||
|
|
||||||
setRingKeys()
|
setRingKeys()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue