feat: ring.SetAddrs to add and remove shards by the ring client and reuse old connections

test: ring scale-in and scale-out

rewrite as suggested by @AlexanderYastrebov
Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
This commit is contained in:
Sandor Szücs 2022-05-23 22:14:49 +02:00
parent ce016ed85f
commit 6f7f800107
No known key found for this signature in database
GPG Key ID: 2D7B996673E41107
3 changed files with 156 additions and 15 deletions

View File

@ -93,3 +93,11 @@ func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) []
} }
return parseReplicaAddrs(addrs, false) return parseReplicaAddrs(addrs, false)
} }
func (c *Ring) GetAddr(addr string) *ringShard {
return c.shards.GetAddr(addr)
}
func (c *ringShards) GetAddr(addr string) *ringShard {
return c.shards[addr]
}

90
ring.go
View File

@ -160,6 +160,7 @@ func (opt *RingOptions) clientOptions() *Options {
type ringShard struct { type ringShard struct {
Client *Client Client *Client
down int32 down int32
addr string
} }
func newRingShard(opt *RingOptions, name, addr string) *ringShard { func newRingShard(opt *RingOptions, name, addr string) *ringShard {
@ -168,6 +169,7 @@ func newRingShard(opt *RingOptions, name, addr string) *ringShard {
return &ringShard{ return &ringShard{
Client: opt.NewClient(name, clopt), Client: opt.NewClient(name, clopt),
addr: addr,
} }
} }
@ -212,33 +214,68 @@ type ringShards struct {
opt *RingOptions opt *RingOptions
mu sync.RWMutex mu sync.RWMutex
muClose sync.Mutex
hash ConsistentHash hash ConsistentHash
shards map[string]*ringShard // read only shards map[string]*ringShard // read only, updated by SetAddrs
list []*ringShard // read only list []*ringShard // read only, updated by SetAddrs
numShard int numShard int
closed bool closed bool
} }
func newRingShards(opt *RingOptions) *ringShards { func newRingShards(opt *RingOptions) *ringShards {
shards := make(map[string]*ringShard, len(opt.Addrs)) c := &ringShards{
opt: opt,
}
c.SetAddrs(opt.Addrs)
return c
}
// SetAddrs replaces the shards in use, such that you can increase and
// decrease number of shards, that you use. It will reuse shards that
// existed before and close the ones that will not be used anymore.
func (c *ringShards) SetAddrs(addrs map[string]string) {
c.muClose.Lock()
defer c.muClose.Unlock()
if c.closed {
return
}
shards := make(map[string]*ringShard)
unusedShards := make(map[string]*ringShard)
for k, shard := range c.shards {
if addr, ok := addrs[k]; ok && shard.addr == addr {
shards[k] = shard
} else {
unusedShards[k] = shard
}
}
for k, addr := range addrs {
if shard, ok := c.shards[k]; !ok || shard.addr != addr {
shards[k] = newRingShard(c.opt, k, addr)
}
}
list := make([]*ringShard, 0, len(shards)) list := make([]*ringShard, 0, len(shards))
for _, shard := range shards {
for name, addr := range opt.Addrs {
shard := newRingShard(opt, name, addr)
shards[name] = shard
list = append(list, shard) list = append(list, shard)
} }
c := &ringShards{ c.mu.Lock()
opt: opt, c.shards = shards
c.list = list
shards: shards, c.rebalanceLocked()
list: list, c.mu.Unlock()
for k, shard := range unusedShards {
err := shard.Client.Close()
if err != nil {
internal.Logger.Printf(context.Background(), "Failed to close ring shard client %s %s: %v", k, shard.addr, err)
}
} }
c.rebalance()
return c
} }
func (c *ringShards) List() []*ringShard { func (c *ringShards) List() []*ringShard {
@ -355,6 +392,23 @@ func (c *ringShards) rebalance() {
c.mu.Unlock() c.mu.Unlock()
} }
// rebalanceLocked removes dead shards from the Ring and callers need to hold the locl
func (c *ringShards) rebalanceLocked() {
shards := c.shards
liveShards := make([]string, 0, len(shards))
for name, shard := range shards {
if shard.IsUp() {
liveShards = append(liveShards, name)
}
}
hash := c.opt.NewConsistentHash(liveShards)
c.hash = hash
c.numShard = len(liveShards)
}
func (c *ringShards) Len() int { func (c *ringShards) Len() int {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -363,6 +417,8 @@ func (c *ringShards) Len() int {
} }
func (c *ringShards) Close() error { func (c *ringShards) Close() error {
c.muClose.Lock()
defer c.muClose.Unlock()
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -436,6 +492,10 @@ func NewRing(opt *RingOptions) *Ring {
return &ring return &ring
} }
func (c *Ring) SetAddrs(ctx context.Context, addrs map[string]string) {
c.shards.SetAddrs(addrs)
}
// Do creates a Cmd from the args and processes the cmd. // Do creates a Cmd from the args and processes the cmd.
func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd { func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args...) cmd := NewCmd(ctx, args...)

View File

@ -113,6 +113,79 @@ var _ = Describe("Redis Ring", func() {
Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=100")) Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=100"))
}) })
Describe("[new] dynamic setting ring shards", func() {
It("downscale shard and check reuse shard, upscale shard and check reuse", func() {
Expect(ring.Len(), 2)
wantShard := ring.GetAddr("ringShardOne")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ring.SetAddrs(ctx, map[string]string{
"ringShardOne": ":" + ringShard1Port,
})
Expect(ring.Len(), 1)
gotShard := ring.GetAddr("ringShardOne")
Expect(gotShard).To(Equal(wantShard))
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ring.SetAddrs(ctx, map[string]string{
"ringShardOne": ":" + ringShard1Port,
"ringShardTwo": ":" + ringShard2Port,
})
Expect(ring.Len(), 2)
gotShard = ring.GetAddr("ringShardOne")
Expect(gotShard).To(Equal(wantShard))
})
It("uses 3 shards after setting it to 3 shards", func() {
Expect(ring.Len(), 2)
// Start ringShard3.
var err error
ringShard3, err = startRedis(ringShard3Port)
Expect(err).NotTo(HaveOccurred())
shardName1 := "ringShardOne"
shardAddr1 := ":" + ringShard1Port
wantShard1 := ring.GetAddr(shardName1)
shardName2 := "ringShardTwo"
shardAddr2 := ":" + ringShard2Port
wantShard2 := ring.GetAddr(shardName2)
shardName3 := "ringShardThree"
shardAddr3 := ":" + ringShard3Port
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ring.SetAddrs(ctx, map[string]string{
shardName1: shardAddr1,
shardName2: shardAddr2,
shardName3: shardAddr3,
})
Expect(ring.Len(), 3)
gotShard1 := ring.GetAddr(shardName1)
gotShard2 := ring.GetAddr(shardName2)
gotShard3 := ring.GetAddr(shardName3)
Expect(gotShard1).To(Equal(wantShard1))
Expect(gotShard2).To(Equal(wantShard2))
Expect(gotShard3).ToNot(BeNil())
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ring.SetAddrs(ctx, map[string]string{
shardName1: shardAddr1,
shardName2: shardAddr2,
})
Expect(ring.Len(), 2)
gotShard1 = ring.GetAddr(shardName1)
gotShard2 = ring.GetAddr(shardName2)
gotShard3 = ring.GetAddr(shardName3)
Expect(gotShard1).To(Equal(wantShard1))
Expect(gotShard2).To(Equal(wantShard2))
Expect(gotShard3).To(BeNil())
})
})
Describe("pipeline", func() { Describe("pipeline", func() {
It("doesn't panic closed ring, returns error", func() { It("doesn't panic closed ring, returns error", func() {
pipe := ring.Pipeline() pipe := ring.Pipeline()