Extract ringShards

This commit is contained in:
Vladimir Mihailenco 2018-03-07 14:08:40 +02:00
parent c2fb5132c0
commit db04210af4
1 changed files with 175 additions and 148 deletions

323
ring.go
View File

@ -87,6 +87,8 @@ func (opt *RingOptions) clientOptions() *Options {
}
}
//------------------------------------------------------------------------------
type ringShard struct {
Client *Client
down int32
@ -127,6 +129,150 @@ func (shard *ringShard) Vote(up bool) bool {
return shard.IsDown()
}
//------------------------------------------------------------------------------
type ringShards struct {
mu sync.RWMutex
hash *consistenthash.Map
shards map[string]*ringShard // read only
list []*ringShard // read only
closed bool
}
func newRingShards() *ringShards {
return &ringShards{
hash: consistenthash.New(nreplicas, nil),
shards: make(map[string]*ringShard),
}
}
func (c *ringShards) Add(name string, cl *Client) {
shard := &ringShard{Client: cl}
c.hash.Add(name)
c.shards[name] = shard
c.list = append(c.list, shard)
}
func (c *ringShards) List() []*ringShard {
c.mu.RLock()
list := c.list
c.mu.RUnlock()
return list
}
func (c *ringShards) Hash(key string) string {
c.mu.RLock()
hash := c.hash.Get(key)
c.mu.RUnlock()
return hash
}
func (c *ringShards) GetByKey(key string) (*ringShard, error) {
key = hashtag.Key(key)
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
return nil, pool.ErrClosed
}
hash := c.hash.Get(key)
if hash == "" {
c.mu.RUnlock()
return nil, errRingShardsDown
}
shard := c.shards[hash]
c.mu.RUnlock()
return shard, nil
}
func (c *ringShards) GetByHash(name string) (*ringShard, error) {
if name == "" {
return c.Random()
}
c.mu.RLock()
shard := c.shards[name]
c.mu.RUnlock()
return shard, nil
}
func (c *ringShards) Random() (*ringShard, error) {
return c.GetByKey(strconv.Itoa(rand.Int()))
}
// heartbeat monitors state of each shard in the ring.
func (c *ringShards) Heartbeat(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for range ticker.C {
var rebalance bool
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
break
}
shards := c.list
c.mu.RUnlock()
for _, shard := range shards {
err := shard.Client.Ping().Err()
if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
internal.Logf("ring shard state changed: %s", shard)
rebalance = true
}
}
if rebalance {
c.rebalance()
}
}
}
// rebalance removes dead shards from the Ring.
func (c *ringShards) rebalance() {
hash := consistenthash.New(nreplicas, nil)
for name, shard := range c.shards {
if shard.IsUp() {
hash.Add(name)
}
}
c.mu.Lock()
c.hash = hash
c.mu.Unlock()
}
func (c *ringShards) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil
}
c.closed = true
var firstErr error
for _, shard := range c.shards {
if err := shard.Client.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
c.hash = nil
c.shards = nil
c.list = nil
return firstErr
}
//------------------------------------------------------------------------------
// Ring is a Redis client that uses constistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
@ -143,18 +289,11 @@ func (shard *ringShard) Vote(up bool) bool {
// Otherwise you should use Redis Cluster.
type Ring struct {
cmdable
opt *RingOptions
shards *ringShards
cmdsInfoCache *cmdsInfoCache
mu sync.RWMutex
hash *consistenthash.Map
shards map[string]*ringShard // read only
shardsList []*ringShard // read only
processPipeline func([]Cmder) error
closed bool
}
func NewRing(opt *RingOptions) *Ring {
@ -162,10 +301,8 @@ func NewRing(opt *RingOptions) *Ring {
ring := &Ring{
opt: opt,
shards: newRingShards(),
cmdsInfoCache: newCmdsInfoCache(),
hash: consistenthash.New(nreplicas, nil),
shards: make(map[string]*ringShard),
}
ring.processPipeline = ring.defaultProcessPipeline
ring.cmdable.setProcessor(ring.Process)
@ -173,19 +310,17 @@ func NewRing(opt *RingOptions) *Ring {
for name, addr := range opt.Addrs {
clopt := opt.clientOptions()
clopt.Addr = addr
ring.addShard(name, NewClient(clopt))
ring.shards.Add(name, NewClient(clopt))
}
go ring.heartbeat()
go ring.shards.Heartbeat(opt.HeartbeatFrequency)
return ring
}
func (c *Ring) addShard(name string, cl *Client) {
shard := &ringShard{Client: cl}
c.hash.Add(name)
c.shards[name] = shard
c.shardsList = append(c.shardsList, shard)
func (c *Ring) copy() *Ring {
cp := *c
return &cp
}
// Options returns read-only Options that were used to create the client.
@ -199,10 +334,7 @@ func (c *Ring) retryBackoff(attempt int) time.Duration {
// PoolStats returns accumulated connection pool stats.
func (c *Ring) PoolStats() *PoolStats {
c.mu.RLock()
shards := c.shardsList
c.mu.RUnlock()
shards := c.shards.List()
var acc PoolStats
for _, shard := range shards {
s := shard.Client.connPool.Stats()
@ -221,7 +353,7 @@ func (c *Ring) Subscribe(channels ...string) *PubSub {
panic("at least one channel is required")
}
shard, err := c.shardByKey(channels[0])
shard, err := c.shards.GetByKey(channels[0])
if err != nil {
// TODO: return PubSub with sticky error
panic(err)
@ -235,7 +367,7 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub {
panic("at least one channel is required")
}
shard, err := c.shardByKey(channels[0])
shard, err := c.shards.GetByKey(channels[0])
if err != nil {
// TODO: return PubSub with sticky error
panic(err)
@ -246,10 +378,7 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub {
// ForEachShard concurrently calls the fn on each live shard in the ring.
// It returns the first error if any.
func (c *Ring) ForEachShard(fn func(client *Client) error) error {
c.mu.RLock()
shards := c.shardsList
c.mu.RUnlock()
shards := c.shards.List()
var wg sync.WaitGroup
errCh := make(chan error, 1)
for _, shard := range shards {
@ -281,10 +410,7 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error {
func (c *Ring) cmdInfo(name string) *CommandInfo {
cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) {
c.mu.RLock()
shards := c.shardsList
c.mu.RUnlock()
shards := c.shards.List()
firstErr := errRingShardsDown
for _, shard := range shards {
cmdsInfo, err := shard.Client.Command().Result()
@ -307,50 +433,14 @@ func (c *Ring) cmdInfo(name string) *CommandInfo {
return info
}
func (c *Ring) shardByKey(key string) (*ringShard, error) {
key = hashtag.Key(key)
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
return nil, pool.ErrClosed
}
name := c.hash.Get(key)
if name == "" {
c.mu.RUnlock()
return nil, errRingShardsDown
}
shard := c.shards[name]
c.mu.RUnlock()
return shard, nil
}
func (c *Ring) randomShard() (*ringShard, error) {
return c.shardByKey(strconv.Itoa(rand.Int()))
}
func (c *Ring) shardByName(name string) (*ringShard, error) {
if name == "" {
return c.randomShard()
}
c.mu.RLock()
shard := c.shards[name]
c.mu.RUnlock()
return shard, nil
}
func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
cmdInfo := c.cmdInfo(cmd.Name())
pos := cmdFirstKeyPos(cmd, cmdInfo)
if pos == 0 {
return c.randomShard()
return c.shards.Random()
}
firstKey := cmd.stringArg(pos)
return c.shardByKey(firstKey)
return c.shards.GetByKey(firstKey)
}
func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
@ -369,77 +459,6 @@ func (c *Ring) Process(cmd Cmder) error {
return shard.Client.Process(cmd)
}
// rebalance removes dead shards from the Ring.
func (c *Ring) rebalance() {
hash := consistenthash.New(nreplicas, nil)
for name, shard := range c.shards {
if shard.IsUp() {
hash.Add(name)
}
}
c.mu.Lock()
c.hash = hash
c.mu.Unlock()
}
// heartbeat monitors state of each shard in the ring.
func (c *Ring) heartbeat() {
ticker := time.NewTicker(c.opt.HeartbeatFrequency)
defer ticker.Stop()
for range ticker.C {
var rebalance bool
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
break
}
shards := c.shardsList
c.mu.RUnlock()
for _, shard := range shards {
err := shard.Client.Ping().Err()
if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
internal.Logf("ring shard state changed: %s", shard)
rebalance = true
}
}
if rebalance {
c.rebalance()
}
}
}
// Close closes the ring client, releasing any open resources.
//
// It is rare to Close a Ring, as the Ring is meant to be long-lived
// and shared between many goroutines.
func (c *Ring) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil
}
c.closed = true
var firstErr error
for _, shard := range c.shards {
if err := shard.Client.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
c.hash = nil
c.shards = nil
c.shardsList = nil
return firstErr
}
func (c *Ring) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.processPipeline,
@ -462,11 +481,11 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
cmdInfo := c.cmdInfo(cmd.Name())
name := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
if name != "" {
name = c.hash.Get(hashtag.Key(name))
hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
if hash != "" {
hash = c.shards.Hash(hashtag.Key(hash))
}
cmdsMap[name] = append(cmdsMap[name], cmd)
cmdsMap[hash] = append(cmdsMap[hash], cmd)
}
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
@ -476,8 +495,8 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
var failedCmdsMap map[string][]Cmder
for name, cmds := range cmdsMap {
shard, err := c.shardByName(name)
for hash, cmds := range cmdsMap {
shard, err := c.shards.GetByHash(hash)
if err != nil {
setCmdsErr(cmds, err)
continue
@ -500,7 +519,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
}
failedCmdsMap[name] = cmds
failedCmdsMap[hash] = cmds
}
}
@ -520,3 +539,11 @@ func (c *Ring) TxPipeline() Pipeliner {
func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
panic("not implemented")
}
// Close closes the ring client, releasing any open resources.
//
// It is rare to Close a Ring, as the Ring is meant to be long-lived
// and shared between many goroutines.
func (c *Ring) Close() error {
return c.shards.Close()
}